当前位置: 首页 > news >正文

java_将数据存入elasticsearch进行高效搜索

使用技术简介:

(1) 使用Nginx实现反向代理,使前端可以调用多个微服务

(2) 使用nacos将多个服务管理关联起来

(3) 将数据存入elasticsearch进行高效搜索

(4) 使用消息队列rabbitmq进行消息的传递 

(5) 使用 openfeign 进行多个服务之间的api调用

参考: 56 尚上优选项目-平台管理端-整合ES+MQ实现商品上下架-功能最终测试_哔哩哔哩_bilibili

1. 使用Nginx实现反向代理

使用Nginx实现反向代理,使前端可以通过一个端口,调用多个微服务(的端口)

前端中的配置的base api端口 9901:

反向代理逻辑图:

图示 /acl /sys 为两个服务的名称中的路径字符串。

在nginx中的配置如下:

2. 使用nacos将多个服务管理关联起来

 通过nacos将多个服务关联起来,这里实现一个产品上架(存入elasticsearch仓库,简称es)下架(从es仓库删除)的功能.service-product提供商品的信息,service-search通过远程调用(FeignClient)调用service-product的接口,获取商品的具体信息,存入或者从es中删除。

service-product 和 service-search 两个服务通过消息队列进行通讯(rabbitmq消息队列, pom名称:spring-cloud-starter-bus-amqp)

nacos部分:pom依赖:

<!--服务注册 -->
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

在各个服务中添加在naco的注册:示例 service-produce模块的application.yml中添加cloud.nacos.discovery.server-addr

 在 main主程序中添加 启用Nacos服务发现功能:

同理在service-search模块中:

启动nacos服务,同时启动自己的服务,看自己的服务是否注册成功:

下载安装合适版本的nacos(版本不对应会出兼容性问题):

到nacos目录,执行:

.\bin\startup.cmd -m standalone

启动nacos后,启动自己的服务,如自己的服务注册成功,会在log中有如下提示:

在浏览器中输入: http://192.168.136.1:8848/nacos/index.html 也可看是否启动成功,若你的目标服务在“服务列表”中,说明服务注册成功,nacos正常运行,服务可以被关联起来,可以通过restful风格进行数据的传递了。

3. 使用消息队列rabbitmq进行消息的传递 

在需要使用的服务service-search的pom.xml中添加依赖:

<!--rabbitmq消息队列-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

安装rabbitmq服务,示例在虚拟机docker中安装 rabbitmq:3.8-management:

#拉取镜像
docker pull rabbitmq:3.8-management
#创建容器启动
docker run -d --restart=always -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8-management

查看安装rabbitmq的虚拟机ip地址: 

docker中安装好后,可以在本机(win11)上测试下安装是否成功:浏览器中输入:http://192.168.68.132:15672/ (注意这里是自己虚拟机的ip地址)

根据自己虚拟机的地址,在需要使用的服务service-search 和 serivice-product 中配置ip地址和端口号等信息:

 

编写代码,使用 RabbitTemplate, 进行消息的发送:

进行消息发送:指定j交换机exchange 字符串(自定义一个),路由器routing, 发送内容 Object message (这里是 Long skuId)

为了让发送的消息能构正确解析,需要定义一个@Configuration, 使用Jackson2JsonMessageConverter,进行消息的类型转换,这里是将Long skuId 转换成Json格式:

package com.atguigu.ssyx.mq.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MQConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}

注意rabbit端还需要对 RabbitTemplate 做一些初始化操作,参考init():

package com.atguigu.ssyx.mq.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class MQProducerAckConfig implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback {//  我们发送消息使用的是 private RabbitTemplate rabbitTemplate; 对象//  如果不做设置的话 当前的rabbitTemplate 与当前的配置类没有任何关系!@Autowiredprivate RabbitTemplate rabbitTemplate;//  设置 表示修饰一个非静态的void方法,在服务器加载Servlet的时候运行。并且只执行一次!// PostConstruct注解的函数:在 Spring 容器创建并初始化 Bean 后自动调用,一个类中只能有一个 @PostConstruct 注解的方法@PostConstructpublic void init(){rabbitTemplate.setReturnCallback(this);rabbitTemplate.setConfirmCallback(this);}/*** 表示消息是否正确发送到了交换机上** @param correlationData 消息的载体* @param ack             判断是否发送到交换机上* @param cause           原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.println("消息发送成功!");} else {System.out.println("消息发送失败!" + cause);}}/*** 消息如果没有正确发送到队列中,则会走这个方法!如果消息被正常处理,则这个方法不会走!** @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText,String exchange, String routingKey) {System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + replyCode);System.out.println("描述:" + replyText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);}
}

消息的接收端(消费者) ,通过RabbitListener 定位到到发送者发送的消息队列上:

package com.atguigu.ssyx.receiver;
import com.atguigu.ssyx.mq.constant.MqConst;
import com.rabbitmq.client.Channel;
import com.atguigu.ssyx.search.service.SkuService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class SkuReceiver {@Autowiredprivate SkuService skuService;/*** 商品上架* @param skuId* @param message* @param channel* @throws IOException*/@RabbitListener(bindings = @QueueBinding( //绑定接收什么消息 //消费者,接收消息value=@Queue(value = MqConst.QUEUE_GOODS_UPPER, durable = "true"), // durable 持久化exchange = @Exchange(value = MqConst.EXCHANGE_GOODS_DIRECT),key = {MqConst.ROUTING_GOODS_UPPER}))public void upperSku(Long skuId, Message message, Channel channel) throws IOException {// 发送者:   rabbitService.sendMsg(MqConst.EXCHANGE_GOODS_DIRECT,//                      MqConst.ROUTING_GOODS_UPPER,//                      skuId);//:发送者函数原型:public boolean sendMsg(String exchange, String routingKey, Object message)// 这里的Object message使用 public MessageConverter messageConverter() 方法转成了json格式,// 确保了生产者和消费者之间的消息序列化与反序列化逻辑一致,// 所以 Object message 对应这里接收者的 Long skuId 形参,实现了参数传递的目的。try{if(skuId != null) {skuService.upperSku(skuId);}/*** 第一个参数:表示收到的消息的标号* 第二个参数:如果为true表示可以签收多个消息*///DeliveryTag 交货标签channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {System.out.print("john:IOException occurred while processing message");throw e;  // 或者处理该异常}}/*** 商品下架* @param skuId*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = MqConst.QUEUE_GOODS_LOWER, durable = "true"),exchange = @Exchange(value = MqConst.EXCHANGE_GOODS_DIRECT), //交换器key = {MqConst.ROUTING_GOODS_LOWER} //路由器))public void lowerSku(Long skuId, Message message, Channel channel) throws IOException {try {if (skuId != null) {skuService.lowerSku(skuId);}channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (IOException e) {System.out.print("john:IOException occurred while processing message");throw e;  // 或者处理该异常}}
}

注意着的对应关系:

public boolean sendMsg(String exchange, String routingKey, Object message

sendMsg中的 Object message 被 public MessageConverter messageConverter() 正确解析成了 Long skuId

upperSku(Long skuId, Message message, Channel channel)

至此,rabbitmq完成了从service-product传递一个skuId到service-serach的动作。

若有消息发送送时,界面中会出翔对应的消息队列的名称:

4.将数据存入elasticsearch进行高效搜索

elasticsearch的配置

下载elasticsearch Past Releases of Elastic Stack Software | Elastic, elasticsearch-7.8.0-windows-x86_64.zip (注意版本不对,会有兼容性问题)

下载对应版本的分词器:elasticsearch-analysis-ik-7.8.0.zip 

下载 kibana来进行客户端操作 kibana-7.8.0-windows-x86_64.zip

解压 elasticsearch-7.8.0-windows-x86_64文件,在

path\elasticsearch-7.8.0-windows-x86_64\elasticsearch-7.8.0\plugins\ 目录下新建文件夹,命名为ik,解压elasticsearch-analysis-ik-7.8.0,将内容所有内容copy到ik目录中:

在 es目录下运行 .\bin\elasticsearch.bat 启动es:

可以看到启动成功了:

浏览器界面看下 http://localhost:9200/ ,出现下面界面,说明启动成功

配置kibana

下载,解压,修改如下3个配置 (不同版本可能不一样,有增加或者删除项):

启动kibana:

 .\bin\kibana.bat

浏览器访问看下:http://127.0.0.1:5601/ 

分词器的测试:

代码配置

首先定义一个存储仓库对应的类  SkuRepository, 继承自 ElasticsearchRepository:

package com.atguigu.ssyx.search.repository;
import com.atguigu.ssyx.model.search.SkuEs;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;public interface SkuRepository extends ElasticsearchRepository<SkuEs, Long> {
}

使用 skuRepository.save(skuEs) 方法,即可存入 es仓库中。

使用 skuRepository.deleteById(skuId); 即可从仓库中删除。 

使用 ProductFeignClient 进行跨服务访问

因为service-serach要使用service-product的信息,但时两个服务,这是需要使用 ProductFeignClient 进行restful风格的api传递参数,进行远程调用。

首先需要pom中引入依赖:

<!-- 服务调用feign -->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId><scope>provided </scope>
</dependency>

在对应的 service-serach的application-dev.yml中进行配置:

然后,定义service-product中的api,将参数传入到指定的api接口:

package com.atguigu.ssyx.product.api;import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import com.atguigu.ssyx.product.service.CategoryService;
import com.atguigu.ssyx.product.service.SkuInfoService;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
// 内部调用,不是前端调用
@RequestMapping("/api/product") //该类是为了内部调用,供service-search使用,所以命名为内部inner
public class ProductInnerController {@Autowiredprivate CategoryService categoryService;@Autowiredprivate SkuInfoService skuInfoService;//根据categoryId获取商品的category信息@ApiOperation(value = "根据categoryId获取商品的category信息")@GetMapping("inner/getCategory/{categoryId}")public Category getCategoryById(@PathVariable("categoryId") Long categoryId) {return categoryService.getById(categoryId);}//根据skuId获取skuInfo@ApiOperation(value = "根据skuId获取skuInfo信息")@GetMapping("inner/getSkuInfo/{skuId}")public SkuInfo getSkuInfoById(@PathVariable("skuId") Long skuId) {return skuInfoService.getById(skuId);}}

定义使用该api的函数,并使用@FeignClient 注明从哪个模块进行api的对接:

package com.atguigu.ssyx.client.product;import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;//与service\service-product\src\main\resources\application.yml中的application。name名称一致
@FeignClient(value = "service-product")
public interface ProductFeignClient {//作为\service\service-product中的\product\api\ProductInnerController.java//中函数的接口定义文件//注意要使用完整的restful风格的路径//用于远程调用(service-search远程调用service-product)@GetMapping("/api/product/inner/getCategory/{categoryId}")public Category getCategoryById(@PathVariable("categoryId") Long categoryId);@GetMapping("/api/product/inner/getSkuInfo/{skuId}")public SkuInfo getSkuInfoById(@PathVariable("skuId") Long skuId);}

定义feign对象 productFeignClient,调用api对应的函数接口:

SkuInfo skuInfo = productFeignClient.getSkuInfoById(skuId);
// ...
Category category = productFeignClient.getCategoryById(skuInfo.getCategoryId());
package com.atguigu.ssyx.search.service.impl;import com.alibaba.fastjson.JSON;
import com.atguigu.ssyx.client.product.ProductFeignClient;
import com.atguigu.ssyx.enums.SkuType;
import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import com.atguigu.ssyx.model.search.SkuEs;
import com.atguigu.ssyx.search.repository.SkuRepository;
import com.atguigu.ssyx.search.service.SkuService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import static jdk.nashorn.internal.runtime.regexp.joni.Config.log;@Slf4j
@Service
public class SkuServiceImpl implements SkuService {//通过service-product-client中的feign远程调用service-product中的方法@Autowiredprivate ProductFeignClient productFeignClient;//写入ES,即ElasticsearchRepository的接口类@Autowiredprivate SkuRepository skuRepository;@Overridepublic void upperSku(Long skuId) {SkuEs skuEs = new SkuEs();//为skuEs一个一个属性赋值SkuInfo skuInfo = productFeignClient.getSkuInfoById(skuId);if (skuInfo == null) {return;}Category category = productFeignClient.getCategoryById(skuInfo.getCategoryId());if (category != null) {skuEs.setCategoryId(category.getId());skuEs.setCategoryName(category.getName());}skuEs.setId(skuInfo.getId());skuEs.setKeyword(skuInfo.getSkuName() + "," + skuEs.getCategoryName()); //keyword不分词查询skuEs.setWareId(skuInfo.getWareId());skuEs.setIsNewPerson(skuInfo.getIsNewPerson());skuEs.setImgUrl(skuInfo.getImgUrl());//分词查询的字段数据类型必须是 FieldType.TextskuEs.setTitle(skuInfo.getSkuName());if (skuInfo.getSkuType() == SkuType.COMMON.getCode()) //普通还是秒杀{skuEs.setSkuType(0);skuEs.setPrice(skuInfo.getPrice().doubleValue());skuEs.setStock(skuInfo.getStock()); //仓库数量skuEs.setSale(skuInfo.getSale());skuEs.setPerLimit(skuInfo.getPerLimit()); //每人限购数量} else {//TODO 待完善-秒杀商品}//使用 ElasticsearchRepository 提供的方法保存ES信息SkuEs save = skuRepository.save(skuEs);System.out.print("upperSku:" + JSON.toJSONString(save));}@Overridepublic void lowerSku(Long skuId) {//使用 ElasticsearchRepository 提供的方法删除ES信息skuRepository.deleteById(skuId);}
}

5. 效果验证

实验中的表的名称是skues:

对应

public interface SkuRepository extends ElasticsearchRepository<SkuEs, Long> {
}

对应代码中的名称: 

当服务中的  public void upperSku(Long skuId) 被调用时,即 skuRepository.save(skuEs);被调用时,对应的数据就会写入 es中,可在 kibana中进行查看:

使用下面命令进行查看:

GET /_cat/indices?v

查看更多数据,(数据结构取决于自己数据库和代码中对数据的定义):

使用如下命令,其中skues为你的index (表名)名称

POST /skues/_search
{"query":{"match_all":{}}
}

结果有了:


http://www.mrgr.cn/news/83415.html

相关文章:

  • 自动驾驶---E2E架构演进
  • 论文阅读:Searching for Fast Demosaicking Algorithms
  • PyCharm文档管理
  • 【数据结构:前缀树Trie】
  • ue5玩家角色添加武器。切换武器位置,手上武器放到背上。演示一下人体插槽和武器的连接。仅仅演示,实际项目不是这么用的
  • 正则表达式{}和(),pyhton里的正则表达式,函数findall解析
  • Vue Router4
  • Flask----前后端不分离-登录
  • 【算法与数据结构】—— 回文问题
  • 有心力场的两体问题
  • 修改之前的代码使得利用设备树文件和Platform总线设备驱动实现对多个LED的驱动【只是假想对LED进行驱动,并没有实际的硬件操作】
  • 大模型WebUI:Gradio全解11——Chatbots:融合大模型的多模态聊天机器人(2)
  • android四大组件之一——Service
  • 探索 C++ Insights: 理解编译器背后的 C++ 实现
  • 树的模拟实现
  • python 个人学习笔记
  • RabbitMQ基础(简单易懂)
  • day06_Spark SQL
  • 【源码解析】Java NIO 包中的 ByteBuffer
  • 【Rust自学】11.7. 按测试的名称运行测试
  • Python|基于DeepSeek大模型,实现文本内容仿写(8)
  • MySql按年月日自动创建分区存储过程
  • 使用Struts2遇到的Context[项目名称]启动失败问题解决(Java Web学习笔记)
  • 《CPython Internals》阅读笔记:p96-p96
  • 20、Citrix 云桌面常见VDA注册问题汇总
  • HTTP 核心概念