java_将数据存入elasticsearch进行高效搜索
使用技术简介:
(1) 使用Nginx实现反向代理,使前端可以调用多个微服务
(2) 使用nacos将多个服务管理关联起来
(3) 将数据存入elasticsearch进行高效搜索
(4) 使用消息队列rabbitmq进行消息的传递
(5) 使用 openfeign 进行多个服务之间的api调用
参考: 56 尚上优选项目-平台管理端-整合ES+MQ实现商品上下架-功能最终测试_哔哩哔哩_bilibili
1. 使用Nginx实现反向代理
使用Nginx实现反向代理,使前端可以通过一个端口,调用多个微服务(的端口)
前端中的配置的base api端口 9901:
反向代理逻辑图:
图示 /acl /sys 为两个服务的名称中的路径字符串。
在nginx中的配置如下:
2. 使用nacos将多个服务管理关联起来
通过nacos将多个服务关联起来,这里实现一个产品上架(存入elasticsearch仓库,简称es)下架(从es仓库删除)的功能.service-product提供商品的信息,service-search通过远程调用(FeignClient)调用service-product的接口,获取商品的具体信息,存入或者从es中删除。
service-product 和 service-search 两个服务通过消息队列进行通讯(rabbitmq消息队列, pom名称:spring-cloud-starter-bus-amqp)
nacos部分:pom依赖:
<!--服务注册 --> <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> </dependency>
在各个服务中添加在naco的注册:示例 service-produce模块的application.yml中添加cloud.nacos.discovery.server-addr
在 main主程序中添加 启用Nacos服务发现功能:
同理在service-search模块中:
启动nacos服务,同时启动自己的服务,看自己的服务是否注册成功:
下载安装合适版本的nacos(版本不对应会出兼容性问题):
到nacos目录,执行:
.\bin\startup.cmd -m standalone
启动nacos后,启动自己的服务,如自己的服务注册成功,会在log中有如下提示:
在浏览器中输入: http://192.168.136.1:8848/nacos/index.html 也可看是否启动成功,若你的目标服务在“服务列表”中,说明服务注册成功,nacos正常运行,服务可以被关联起来,可以通过restful风格进行数据的传递了。
3. 使用消息队列rabbitmq进行消息的传递
在需要使用的服务service-search的pom.xml中添加依赖:
<!--rabbitmq消息队列--> <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency>
安装rabbitmq服务,示例在虚拟机docker中安装 rabbitmq:3.8-management:
#拉取镜像
docker pull rabbitmq:3.8-management
#创建容器启动
docker run -d --restart=always -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.8-management
查看安装rabbitmq的虚拟机ip地址:
docker中安装好后,可以在本机(win11)上测试下安装是否成功:浏览器中输入:http://192.168.68.132:15672/ (注意这里是自己虚拟机的ip地址)
根据自己虚拟机的地址,在需要使用的服务service-search 和 serivice-product 中配置ip地址和端口号等信息:
编写代码,使用 RabbitTemplate, 进行消息的发送:
进行消息发送:指定j交换机exchange 字符串(自定义一个),路由器routing, 发送内容 Object message (这里是 Long skuId)
为了让发送的消息能构正确解析,需要定义一个@Configuration, 使用Jackson2JsonMessageConverter,进行消息的类型转换,这里是将Long skuId 转换成Json格式:
package com.atguigu.ssyx.mq.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MQConfig {@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
}
注意rabbit端还需要对 RabbitTemplate 做一些初始化操作,参考init():
package com.atguigu.ssyx.mq.config;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class MQProducerAckConfig implements RabbitTemplate.ReturnCallback, RabbitTemplate.ConfirmCallback {// 我们发送消息使用的是 private RabbitTemplate rabbitTemplate; 对象// 如果不做设置的话 当前的rabbitTemplate 与当前的配置类没有任何关系!@Autowiredprivate RabbitTemplate rabbitTemplate;// 设置 表示修饰一个非静态的void方法,在服务器加载Servlet的时候运行。并且只执行一次!// PostConstruct注解的函数:在 Spring 容器创建并初始化 Bean 后自动调用,一个类中只能有一个 @PostConstruct 注解的方法@PostConstructpublic void init(){rabbitTemplate.setReturnCallback(this);rabbitTemplate.setConfirmCallback(this);}/*** 表示消息是否正确发送到了交换机上** @param correlationData 消息的载体* @param ack 判断是否发送到交换机上* @param cause 原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {System.out.println("消息发送成功!");} else {System.out.println("消息发送失败!" + cause);}}/*** 消息如果没有正确发送到队列中,则会走这个方法!如果消息被正常处理,则这个方法不会走!** @param message* @param replyCode* @param replyText* @param exchange* @param routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText,String exchange, String routingKey) {System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + replyCode);System.out.println("描述:" + replyText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);}
}
消息的接收端(消费者) ,通过RabbitListener 定位到到发送者发送的消息队列上:
package com.atguigu.ssyx.receiver;
import com.atguigu.ssyx.mq.constant.MqConst;
import com.rabbitmq.client.Channel;
import com.atguigu.ssyx.search.service.SkuService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class SkuReceiver {@Autowiredprivate SkuService skuService;/*** 商品上架* @param skuId* @param message* @param channel* @throws IOException*/@RabbitListener(bindings = @QueueBinding( //绑定接收什么消息 //消费者,接收消息value=@Queue(value = MqConst.QUEUE_GOODS_UPPER, durable = "true"), // durable 持久化exchange = @Exchange(value = MqConst.EXCHANGE_GOODS_DIRECT),key = {MqConst.ROUTING_GOODS_UPPER}))public void upperSku(Long skuId, Message message, Channel channel) throws IOException {// 发送者: rabbitService.sendMsg(MqConst.EXCHANGE_GOODS_DIRECT,// MqConst.ROUTING_GOODS_UPPER,// skuId);//:发送者函数原型:public boolean sendMsg(String exchange, String routingKey, Object message)// 这里的Object message使用 public MessageConverter messageConverter() 方法转成了json格式,// 确保了生产者和消费者之间的消息序列化与反序列化逻辑一致,// 所以 Object message 对应这里接收者的 Long skuId 形参,实现了参数传递的目的。try{if(skuId != null) {skuService.upperSku(skuId);}/*** 第一个参数:表示收到的消息的标号* 第二个参数:如果为true表示可以签收多个消息*///DeliveryTag 交货标签channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (IOException e) {System.out.print("john:IOException occurred while processing message");throw e; // 或者处理该异常}}/*** 商品下架* @param skuId*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = MqConst.QUEUE_GOODS_LOWER, durable = "true"),exchange = @Exchange(value = MqConst.EXCHANGE_GOODS_DIRECT), //交换器key = {MqConst.ROUTING_GOODS_LOWER} //路由器))public void lowerSku(Long skuId, Message message, Channel channel) throws IOException {try {if (skuId != null) {skuService.lowerSku(skuId);}channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}catch (IOException e) {System.out.print("john:IOException occurred while processing message");throw e; // 或者处理该异常}}
}
注意着的对应关系:
public boolean sendMsg(String exchange, String routingKey, Object message)
sendMsg中的 Object message 被 public MessageConverter messageConverter() 正确解析成了 Long skuId
upperSku(Long skuId, Message message, Channel channel)
至此,rabbitmq完成了从service-product传递一个skuId到service-serach的动作。
若有消息发送送时,界面中会出翔对应的消息队列的名称:
4.将数据存入elasticsearch进行高效搜索
elasticsearch的配置
下载elasticsearch Past Releases of Elastic Stack Software | Elastic, elasticsearch-7.8.0-windows-x86_64.zip (注意版本不对,会有兼容性问题)
下载对应版本的分词器:elasticsearch-analysis-ik-7.8.0.zip
下载 kibana来进行客户端操作 kibana-7.8.0-windows-x86_64.zip
解压 elasticsearch-7.8.0-windows-x86_64文件,在
path\elasticsearch-7.8.0-windows-x86_64\elasticsearch-7.8.0\plugins\ 目录下新建文件夹,命名为ik,解压elasticsearch-analysis-ik-7.8.0,将内容所有内容copy到ik目录中:
在 es目录下运行 .\bin\elasticsearch.bat 启动es:
可以看到启动成功了:
浏览器界面看下 http://localhost:9200/ ,出现下面界面,说明启动成功
配置kibana
下载,解压,修改如下3个配置 (不同版本可能不一样,有增加或者删除项):
启动kibana:
.\bin\kibana.bat
浏览器访问看下:http://127.0.0.1:5601/
分词器的测试:
代码配置
首先定义一个存储仓库对应的类 SkuRepository, 继承自 ElasticsearchRepository:
package com.atguigu.ssyx.search.repository;
import com.atguigu.ssyx.model.search.SkuEs;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;public interface SkuRepository extends ElasticsearchRepository<SkuEs, Long> {
}
使用 skuRepository.save(skuEs) 方法,即可存入 es仓库中。
使用 skuRepository.deleteById(skuId); 即可从仓库中删除。
使用 ProductFeignClient 进行跨服务访问
因为service-serach要使用service-product的信息,但时两个服务,这是需要使用 ProductFeignClient 进行restful风格的api传递参数,进行远程调用。
首先需要pom中引入依赖:
<!-- 服务调用feign --> <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId><scope>provided </scope> </dependency>
在对应的 service-serach的application-dev.yml中进行配置:
然后,定义service-product中的api,将参数传入到指定的api接口:
package com.atguigu.ssyx.product.api;import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import com.atguigu.ssyx.product.service.CategoryService;
import com.atguigu.ssyx.product.service.SkuInfoService;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
// 内部调用,不是前端调用
@RequestMapping("/api/product") //该类是为了内部调用,供service-search使用,所以命名为内部inner
public class ProductInnerController {@Autowiredprivate CategoryService categoryService;@Autowiredprivate SkuInfoService skuInfoService;//根据categoryId获取商品的category信息@ApiOperation(value = "根据categoryId获取商品的category信息")@GetMapping("inner/getCategory/{categoryId}")public Category getCategoryById(@PathVariable("categoryId") Long categoryId) {return categoryService.getById(categoryId);}//根据skuId获取skuInfo@ApiOperation(value = "根据skuId获取skuInfo信息")@GetMapping("inner/getSkuInfo/{skuId}")public SkuInfo getSkuInfoById(@PathVariable("skuId") Long skuId) {return skuInfoService.getById(skuId);}}
定义使用该api的函数,并使用@FeignClient 注明从哪个模块进行api的对接:
package com.atguigu.ssyx.client.product;import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;//与service\service-product\src\main\resources\application.yml中的application。name名称一致
@FeignClient(value = "service-product")
public interface ProductFeignClient {//作为\service\service-product中的\product\api\ProductInnerController.java//中函数的接口定义文件//注意要使用完整的restful风格的路径//用于远程调用(service-search远程调用service-product)@GetMapping("/api/product/inner/getCategory/{categoryId}")public Category getCategoryById(@PathVariable("categoryId") Long categoryId);@GetMapping("/api/product/inner/getSkuInfo/{skuId}")public SkuInfo getSkuInfoById(@PathVariable("skuId") Long skuId);}
定义feign对象 productFeignClient,调用api对应的函数接口:
SkuInfo skuInfo = productFeignClient.getSkuInfoById(skuId); // ... Category category = productFeignClient.getCategoryById(skuInfo.getCategoryId());
package com.atguigu.ssyx.search.service.impl;import com.alibaba.fastjson.JSON;
import com.atguigu.ssyx.client.product.ProductFeignClient;
import com.atguigu.ssyx.enums.SkuType;
import com.atguigu.ssyx.model.product.Category;
import com.atguigu.ssyx.model.product.SkuInfo;
import com.atguigu.ssyx.model.search.SkuEs;
import com.atguigu.ssyx.search.repository.SkuRepository;
import com.atguigu.ssyx.search.service.SkuService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import static jdk.nashorn.internal.runtime.regexp.joni.Config.log;@Slf4j
@Service
public class SkuServiceImpl implements SkuService {//通过service-product-client中的feign远程调用service-product中的方法@Autowiredprivate ProductFeignClient productFeignClient;//写入ES,即ElasticsearchRepository的接口类@Autowiredprivate SkuRepository skuRepository;@Overridepublic void upperSku(Long skuId) {SkuEs skuEs = new SkuEs();//为skuEs一个一个属性赋值SkuInfo skuInfo = productFeignClient.getSkuInfoById(skuId);if (skuInfo == null) {return;}Category category = productFeignClient.getCategoryById(skuInfo.getCategoryId());if (category != null) {skuEs.setCategoryId(category.getId());skuEs.setCategoryName(category.getName());}skuEs.setId(skuInfo.getId());skuEs.setKeyword(skuInfo.getSkuName() + "," + skuEs.getCategoryName()); //keyword不分词查询skuEs.setWareId(skuInfo.getWareId());skuEs.setIsNewPerson(skuInfo.getIsNewPerson());skuEs.setImgUrl(skuInfo.getImgUrl());//分词查询的字段数据类型必须是 FieldType.TextskuEs.setTitle(skuInfo.getSkuName());if (skuInfo.getSkuType() == SkuType.COMMON.getCode()) //普通还是秒杀{skuEs.setSkuType(0);skuEs.setPrice(skuInfo.getPrice().doubleValue());skuEs.setStock(skuInfo.getStock()); //仓库数量skuEs.setSale(skuInfo.getSale());skuEs.setPerLimit(skuInfo.getPerLimit()); //每人限购数量} else {//TODO 待完善-秒杀商品}//使用 ElasticsearchRepository 提供的方法保存ES信息SkuEs save = skuRepository.save(skuEs);System.out.print("upperSku:" + JSON.toJSONString(save));}@Overridepublic void lowerSku(Long skuId) {//使用 ElasticsearchRepository 提供的方法删除ES信息skuRepository.deleteById(skuId);}
}
5. 效果验证
实验中的表的名称是skues:
对应
public interface SkuRepository extends ElasticsearchRepository<SkuEs, Long> {
}
对应代码中的名称:
当服务中的 public void upperSku(Long skuId) 被调用时,即 skuRepository.save(skuEs);被调用时,对应的数据就会写入 es中,可在 kibana中进行查看:
使用下面命令进行查看:
GET /_cat/indices?v
查看更多数据,(数据结构取决于自己数据库和代码中对数据的定义):
使用如下命令,其中skues为你的index (表名)名称
POST /skues/_search
{"query":{"match_all":{}}
}
结果有了: