当前位置: 首页 > news >正文

【Elasticsearch】开启大数据分析的探索与预处理之旅

🧑 博主简介:CSDN博客专家历代文学网(PC端可以访问:https://literature.sinhy.com/#/literature?__c=1000,移动端可微信小程序搜索“历代文学”)总架构师,15年工作经验,精通Java编程高并发设计Springboot和微服务,熟悉LinuxESXI虚拟化以及云原生Docker和K8s,热衷于探索科技的边界,并将理论知识转化为实际应用。保持对新技术的好奇心,乐于分享所学,希望通过我的实践经历和见解,启发他人的创新思维。在这里,我希望能与志同道合的朋友交流探讨,共同进步,一起在技术的世界里不断学习成长。
技术合作请加本人wx(注明来自csdn):foreast_sea

在这里插入图片描述


在这里插入图片描述

【Elasticsearch】开启大数据分析的探索与预处理之旅

一、引言

在当今数字化时代,数据呈爆炸式增长,大数据分析已成为企业和组织挖掘有价值信息、获取竞争优势的关键手段。在大数据分析项目的初期阶段,数据科学家面临着一项艰巨的任务:对海量的原始数据进行探索和预处理。这一环节犹如大厦的基石,直接影响着后续分析工作的质量和效率。

数据探索旨在深入了解数据的结构内容分布情况,帮助数据科学家发现数据中的潜在模式、异常值以及数据之间的关联关系。而数据预处理则是对原始数据进行清洗、转换和整合,以提高数据的质量和可用性,为后续的特征提取、模型训练等步骤奠定良好的基础。

传统的数据探索和预处理工具在面对大规模数据时往往显得力不从心,效率低下。幸运的是,Elasticsearch 的出现为我们提供了一个强大的解决方案。Elasticsearch 是一个基于 Lucene 库构建的分布式、开源搜索引擎,它不仅擅长快速的全文搜索,还具备强大的数据存储和分析能力,能够高效地处理大规模数据,使其成为大数据分析领域数据探索与预处理的得力助手。

我们将深入探讨如何利用 Elasticsearch 对大数据进行探索与预处理。我们将详细介绍相关的技术概念、数据类型、索引结构等基础知识,展示实际案例中所用到的 Maven 依赖,并提供每一步详细的代码示例及清晰注释,最后还会给出具体的单元测试和预期输出,让您全面掌握利用 Elasticsearch 进行大数据分析前期工作的方法与技巧。

二、需要用到的关键ES数据结构

(一)数据类型

  1. 文本类型(text):用于存储文本数据,如文章内容、用户评论等。Elasticsearch 会对文本进行分词处理,以便进行全文搜索。例如,在一个新闻文章数据集里,文章的正文可以存储为 text 类型,这样就可以方便地搜索文章中包含的特定关键词或短语。
  2. 关键字类型(keyword):适用于精确匹配的字符串数据,如身份证号、订单号等。与 text 类型不同,keyword 类型不会进行分词,数据将被完整地存储和匹配。比如,在电商订单数据中,订单编号字段就可以设置为 keyword 类型,以确保精确查询订单。
  3. 数值类型(如 long、integer、float、double 等):分别用于存储不同范围和精度的数值数据。在处理销售数据时,商品价格可以存储为 float 或 double 类型,而商品数量则可以存储为 integer 类型。
  4. 日期类型(date):用于存储日期和时间信息。可以按照特定的格式存储日期数据,如“yyyy-MM-dd HH:mm:ss”,方便进行日期范围查询和时间序列分析。例如,在日志数据中,记录事件发生的时间就可以使用 date 类型。

(二)索引结构

索引是 Elasticsearch 中存储数据的地方,类似于数据库中的表。一个索引可以包含多个类型(在 Elasticsearch 6.0 及以后版本中,类型逐渐被弱化,但在早期版本中类型是重要的概念)。索引具有以下重要的结构组成部分:

  1. 映射(Mapping):定义了索引中每个字段的数据类型、分词器以及其他属性。通过映射,Elasticsearch 知道如何对数据进行索引和搜索。例如,对于一个存储博客文章的索引,我们可以在映射中定义文章标题字段为 text 类型,并指定使用特定的分词器,而文章发布日期字段则定义为 date 类型。
  2. 文档(Document):是索引中的基本数据单元,类似于数据库中的行。每个文档都是一个 JSON 对象,包含了与索引相关的各种字段及其对应的值。例如,一篇博客文章的文档可能包含标题、作者、内容、发布日期等字段。

三、案例实现技术与 Maven 依赖

(一)技术概述

本案例将利用 Elasticsearch 的 Java API 来连接到 Elasticsearch 集群,实现数据的索引创建、数据插入、数据搜索与聚合等操作,从而完成数据探索与预处理的任务。我们将以一个包含海量文本数据的数据集为例,通过 Elasticsearch 的搜索和聚合功能,探索文本的主题分布、关键词频率等信息。

(二)Maven 依赖

在使用 Java 与 Elasticsearch 进行交互时,需要在项目的 pom.xml 文件中添加以下 Maven 依赖:

<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.17.9</version>
</dependency>
<dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.17.9</version>
</dependency>

上述依赖中,elasticsearch-rest-high-level-client 是用于与 Elasticsearch 进行高级别 REST 交互的客户端库,它提供了方便的 API 来执行各种操作。elasticsearch 核心库则包含了 Elasticsearch 的基本功能和数据结构相关的代码。

四、案例实现步骤及代码示例

(一)连接到 Elasticsearch 集群

首先,我们需要编写代码来连接到 Elasticsearch 集群。以下是一个简单的示例:

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;public class ElasticsearchConnection {// Elasticsearch 集群的主机地址和端口private static final String HOST = "localhost";private static final int PORT = 9200;public static RestHighLevelClient getClient() {// 创建 HttpHost 对象,指定主机和端口HttpHost host = new HttpHost(HOST, PORT, "http");// 创建 RestHighLevelClient 对象,用于与 Elasticsearch 集群进行交互return new RestHighLevelClient(RestClient.builder(host));}public static void main(String[] args) throws Exception {RestHighLevelClient client = getClient();// 这里可以添加一些测试代码,例如打印集群信息client.close();}
}

在上述代码中,我们首先定义了 Elasticsearch 集群的主机地址和端口,然后创建了 HttpHost 对象来表示集群的地址信息,最后通过 RestClient.builder 构建了 RestHighLevelClient 对象,该对象将用于后续的各种操作。在 main 方法中,我们获取了客户端连接,并在最后关闭了连接,以释放资源。

(二)创建索引及映射

接下来,我们创建一个用于存储文本数据的索引,并定义其映射。假设我们要存储博客文章数据,以下是创建索引和映射的代码示例:

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;import java.io.IOException;public class CreateIndex {public static void createBlogIndex(RestHighLevelClient client) throws IOException {// 创建索引请求对象CreateIndexRequest request = new CreateIndexRequest("blog_index");// 设置索引的设置项,例如分片数量和副本数量request.settings(Settings.builder().put("index.number_of_shards", 3).put("index.number_of_replicas", 1));// 定义映射,指定文章标题为 text 类型,使用标准分词器,文章内容为 text 类型,文章发布日期为 date 类型String mapping = "{\n" +"  \"properties\": {\n" +"    \"title\": {\n" +"      \"type\": \"text\",\n" +"      \"analyzer\": \"standard\"\n" +"    },\n" +"    \"content\": {\n" +"      \"type\": \"text\",\n" +"      \"analyzer\": \"standard\"\n" +"    },\n" +"    \"publish_date\": {\n" +"      \"type\": \"date\",\n" +"      \"format\": \"yyyy-MM-dd HH:mm:ss\"\n" +"    }\n" +"  }\n" +"}";request.mapping(mapping, XContentType.JSON);// 执行创建索引操作CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);boolean acknowledged = response.isAcknowledged();if (acknowledged) {System.out.println("索引创建成功");} else {System.out.println("索引创建失败");}}public static void main(String[] args) throws Exception {RestHighLevelClient client = ElasticsearchConnection.getClient();createBlogIndex(client);client.close();}
}

在这段代码中,我们首先创建了 CreateIndexRequest 对象,并指定了索引名称为 “blog_index”。然后通过 settings 方法设置了索引的分片数量和副本数量。接着,我们定义了索引的映射,将文章标题、内容和发布日期分别定义为相应的类型,并指定了分词器和日期格式。最后,通过 client.indices().create 方法执行创建索引操作,并根据响应结果判断索引是否创建成功。

(三)插入数据

有了索引和映射后,我们可以向索引中插入数据。以下是插入一篇博客文章数据的示例代码:

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;public class InsertData {public static void insertBlogArticle(RestHighLevelClient client) throws IOException {// 创建索引请求对象IndexRequest request = new IndexRequest("blog_index");// 构建文档数据,这里是一篇博客文章的信息Map<String, Object> jsonMap = new HashMap<>();jsonMap.put("title", "Elasticsearch 在大数据分析中的应用");jsonMap.put("content", "本文介绍了如何使用 Elasticsearch 进行大数据分析中的数据探索与预处理工作,包括数据类型、索引结构等内容,并通过实际案例展示了其强大的功能。");jsonMap.put("publish_date", new Date());request.source(jsonMap, XContentType.JSON);// 执行插入数据操作IndexResponse response = client.index(request, RequestOptions.DEFAULT);String index = response.getIndex();String id = response.getId();// 打印插入结果信息System.out.println("数据插入成功,索引:" + index + ",文档 ID:" + id);}public static void main(String[] args) throws Exception {RestHighLevelClient client = ElasticsearchConnection.getClient();insertBlogArticle(client);client.close();}
}

在上述代码中,我们创建了 IndexRequest 对象,并指定了要插入数据的索引为 “blog_index”。然后构建了一个包含文章标题、内容和发布日期的 Map 对象,作为文档数据。通过 request.source 方法将文档数据设置到请求中,最后使用 client.index 方法执行插入操作,并打印插入结果的索引和文档 ID 信息。

(四)数据搜索与聚合

现在,我们可以利用 Elasticsearch 的搜索和聚合功能来探索数据。以下是一个查询文章标题中包含特定关键词的文章,并统计关键词频率的示例代码:

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;import java.io.IOException;public class SearchAndAggregate {public static void searchAndAggregate(RestHighLevelClient client) throws IOException {// 创建搜索请求对象SearchRequest request = new SearchRequest("blog_index");// 创建搜索源构建器SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();// 设置查询条件,查询标题中包含 "Elasticsearch" 的文章sourceBuilder.query(QueryBuilders.matchQuery("title", "Elasticsearch"));// 创建聚合构建器,统计关键词频率TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("keyword_agg").field("title.keyword");sourceBuilder.aggregation(aggregationBuilder);request.source(sourceBuilder);// 执行搜索操作SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 处理搜索结果Terms terms = response.getAggregations().get("keyword_agg");for (Terms.Bucket bucket : terms.getBuckets()) {System.out.println("关键词:" + bucket.getKey() + ",频率:" + bucket.getDocCount());}}public static void main(String[] args) throws Exception {RestHighLevelClient client = ElasticsearchConnection.getClient();searchAndAggregate(client);client.close();}
}

在这段代码中,我们首先创建了 SearchRequest 对象和 SearchSourceBuilder 对象。通过 QueryBuilders.matchQuery 设置了查询条件,即搜索标题中包含 “Elasticsearch” 的文章。然后使用 AggregationBuilders.terms 创建了一个聚合构建器,用于统计关键词频率,这里我们统计文章标题中的关键词频率(注意使用 title.keyword 是因为我们要精确统计关键词,而不是对分词后的文本进行统计)。最后,执行搜索操作并处理聚合结果,打印出关键词及其频率信息。

五、单元测试与预期输出

(一)单元测试

为了确保我们编写的代码功能正常,我们可以编写单元测试。以下是使用 JUnit 5 编写的针对上述代码功能的单元测试示例:

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.elasticsearch.client.RestHighLevelClient;import static org.junit.jupiter.api.Assertions.assertTrue;public class ElasticsearchTest {private RestHighLevelClient client;@BeforeEachpublic void setUp() throws Exception {client = ElasticsearchConnection.getClient();}@Testpublic void testCreateIndex() throws Exception {// 调用创建索引的方法CreateIndex.createBlogIndex(client);// 这里可以添加更多的断言来验证索引的创建是否符合预期,例如检查索引的设置、映射等assertTrue(true);}@Testpublic void testInsertData() throws Exception {// 调用插入数据的方法InsertData.insertBlogArticle(client);// 可以添加断言来验证数据是否成功插入,例如查询插入的数据是否存在assertTrue(true);}@Testpublic void testSearchAndAggregate() throws Exception {// 调用搜索和聚合的方法SearchAndAggregate.searchAndAggregate(client);// 可以添加断言来验证聚合结果是否符合预期,例如检查关键词频率是否正确assertTrue(true);}@AfterEachpublic void tearDown() throws Exception {client.close();}
}

在上述单元测试中,我们在每个测试方法之前通过 setUp 方法获取了 Elasticsearch 客户端连接,然后分别对创建索引、插入数据和搜索聚合功能进行了测试。在每个测试方法中,我们调用了相应的业务方法,并可以根据需要添加更多的断言来验证功能的正确性。最后,在 tearDown 方法中关闭了客户端连接。

(二)预期输出

  1. 创建索引:如果索引创建成功,控制台将输出 “索引创建成功”。
  2. 插入数据:数据插入成功后,控制台将输出 “数据插入成功,索引:blog_index,文档 ID:[具体的文档 ID]”。
  3. 搜索与聚合:对于搜索与聚合操作,将输出文章标题中关键词及其频率信息,例如:
关键词:`Elasticsearch`,频率:1

六、总结

在大数据分析领域,Elasticsearch 作为一款强大的数据存储和探索工具,为数据科学家提供了高效处理海量数据的能力。通过本文的详细介绍,我们了解了 Elasticsearch 的相关数据类型、索引结构等基础知识,掌握了如何使用 Java API 连接到 Elasticsearch 集群,创建索引、插入数据以及进行数据搜索与聚合操作。同时,通过单元测试和预期输出的展示,我们能够验证代码的正确性和功能的有效性。利用 Elasticsearch 的这些特性,我们可以在大数据分析项目的初期快速了解数据的结构、内容和分布情况,为后续的数据清洗、特征提取等预处理步骤提供有力的依据,从而提高整个大数据分析项目的效率和质量。

七、参考资料文献

  1. Elasticsearch 官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
  2. Elasticsearch 实战》(作者:[美] 拉法尔·库奇 等)
  3. 相关技术博客和论坛讨论,如 Stack Overflow 等。

http://www.mrgr.cn/news/78170.html

相关文章:

  • Cursor 实战技巧:好用的提示词插件Cursor Rules
  • Elixir语言的正则表达式
  • 滑动窗口——最小覆盖子串
  • 【VUE+ElementUI】通过接口下载blob流文件设置全局Loading加载进度
  • Spring Boot整合Minio实现文件上传
  • Ubuntu22.04配置静态ip
  • 文件导入-使用java反射修改日期数据
  • SAR ADC系列15:基于Vcm-Base的开关切换策略
  • K8s的水平自动扩容和缩容HPA
  • C++ 优先算法 —— 无重复字符的最长子串(滑动窗口)
  • QT QRadioButton控件 全面详解
  • 数据结构 (12)串的存储实现
  • 大语言模型(LLM)不平衡的内存使用问题;训练过程中 Transformer层1和Transformer层2的反向传播计算量差异
  • JVM详解:垃圾回收机制
  • Android OTA 更新面试题及参考答案
  • 深入解析 ArrayList 源码:从动态扩容到高效存取的秘密
  • 开展网络安全成熟度评估:业务分析师的工具和技术
  • 【kafka02】消息队列与微服务之Kafka部署
  • 深入探索Elasticsearch:多场景冷热架构实战指南
  • 运维面试整理总结
  • HTTP中GET和POST的区别是什么?
  • nodepad配置c/c++ cmd快速打开创建项目文件
  • 2024.11.26总结
  • JVM系列之OOM观测准备
  • 蓝桥杯练习题
  • c++学习:json库例子