当前位置: 首页 > news >正文

SpringBoot日常:集成Kafka

文章目录

      • 1、pom.xml文件
      • 2、application.yml
      • 3、生产者配置类
      • 4、消费者配置类
      • 5、消息订阅
      • 6、生产者发送消息
      • 7、测试发送消息

本章内容主要介绍如何在springboot项目对kafka进行整合,最终能达到的效果就是能够在项目中通过配置相关的kafka配置,就能进行消息的生产和消费。

1、pom.xml文件

原本项目用 Spring Boot 的版本为2.6.X,所以这里用spring-cloud-starter-stream-kafka的版本用的是2.2.1.RELEASE,也可以用其他版本,但是注意兼容性,不然会编译运行报错

<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>2021.0.2</version>  <!-- 确保与 Spring Boot 2.6.x 兼容 --><scope>import</scope><type>pom</type></dependency></dependencies>
</dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>2.2.1.RELEASE</version></dependency>
</dependencies>

2、application.yml

添加kafka的相关配置

spring:kafka:bootstrap-servers: 192.168.102.179:9092producer:acks: 1retries: 0batch-size: 30720000buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializer#消费者配置consumer:group-id: test-kafka#是否开启手动提交 默认自动提交enable-auto-commit: true#如果enable.auto.commit为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000  自动提交已消费offset时间间隔auto-commit-interval: 5000#earliest:分区已经有提交的offset从提交的offset开始消费,如果没有提交的offset,从头开始消费,latest:分区下已有提交的offset从提交的offset开始消费,没有提交的offset从新产生的数据开始消费auto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer#一次调用 poll() 操作时返回的最大记录数 默认为 500 条max-poll-records: 2#kafka session timeoutsession:timeout:ms: 300000listener:#kafka 没有创建指定的 topic 下  项目启动是否报错 true  falsemissing-topics-fatal: false#Kafka 的消费模式 single 每次单条消费消息  batch  每次批量消费消息type: singleack-mode: manual_immediate

3、生产者配置类

添加一个生产者配置类KafkaProducerConfig ,主要设置消息的序列化方式等消息处理方式

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;/*** @Author 码至终章* @Date 2025/1/8 11:33* @Version 1.0*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Bean("myProducerKafkaProps")public Map<String, Object> getMyKafkaProps() {Map<String, Object> props = new HashMap<>(4);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return props;}@Beanpublic ProducerFactory<String, String> newProducerFactory() {return new DefaultKafkaProducerFactory<>(getMyKafkaProps());}@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(newProducerFactory());}}

4、消费者配置类

创建一个消费者配置类KafkaConsumerConfig,主要设置一些消息的接收处理配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;import java.util.HashMap;
import java.util.Map;/*** @Author 码至终章* @Date 2025/1/8 12:09* @Version 1.0*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {@Value("${spring.kafka.bootstrap-servers}")private String servers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.auto-offset-reset}")private String offsetReset;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.auto-commit-interval}")private String autoCommitIntervalMs;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Bean("myConsumerKafkaProps")public Map<String, Object> getMyKafkaProps() {Map<String, Object> props = new HashMap<>(12);//是否自动提交props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//kafak 服务器props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//不存在已经提交的offest时 earliest 表示从头开始消费,latest 表示从最新的数据消费props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);//消费组idprops.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//一次调用poll()操作时返回的最大记录数,默认值为500props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//自动提交时间间隔 默认 5秒props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);//props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);return props;}/*** 消费者工厂*/@Bean("myContainerFactory")public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(getMyKafkaProps()));// 并发创建的消费者数量factory.setConcurrency(3);// 开启批处理factory.setBatchListener(true);//拉取超时时间factory.getContainerProperties().setPollTimeout(1500);//是否自动提交 ACK kafka 默认是自动提交if (!enableAutoCommit) {//共有其中方式factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);}return factory;}
}

5、消息订阅

创建一个消费者监听消息类,里面对主题消息监听,这里的测试主题为testone

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** @Author 码至终章* @Date 2025/1/8 14:19* @Version 1.0*/
@Slf4j
@Component
public class MyKafkaConsumer {@KafkaListener(id = "my-kafka-consumer",idIsGroup = false, topics = "topicone",containerFactory = "myContainerFactory")public void listen(String message) {log.info("接收到主题消息,消息内容:{}", message);}
}

6、生产者发送消息

为了方便调用测试,这里在controller编写一个方法发送消息

@RestController
@Slf4j
public class TestController {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping ("/sendMessage")public void sendMessage(@RequestParam String message) {this.kafkaTemplate.send("topicone", message);}
}

7、测试发送消息

这里简单用postman调用接口发送一条消息
在这里插入图片描述
从idea的程序控制台可以看到消费者监听可以正常接收到消息
在这里插入图片描述


http://www.mrgr.cn/news/83104.html

相关文章:

  • Redis为 List/Set/Hash 的元素设置单独的过期时间
  • C++虚函数(八股总结)
  • (已开源-AAAI25) RCTrans:雷达相机融合3D目标检测模型
  • 超越Ray-Ban Meta?雷鸟V3 AI拍摄眼镜正式发布!定价1799元
  • 【MySQL高可用】读写分离与主从复制
  • Leetcode 3409. Longest Subsequence With Decreasing Adjacent Difference
  • Python —— 常用的字符串方法
  • JavaSE
  • UnityRenderStreaming使用记录(五)
  • 本地缓存:Guava Cache
  • Ubuntu平台虚拟机软件学习笔记
  • Linux驱动学习之第二个驱动程序(LED点亮关闭驱动程序-分层设计思想,使其能适应不同的板子-驱动程序模块为多个源文件怎么写Makefile)
  • 【深度学习】布匹寻边:抓边误差小于5px【附完整链接】
  • 【vue3封装element-plus的反馈组件el-drawer、el-dialog】
  • docker搭建atlassian-confluence:7.2.0
  • XS5037C一款应用于专业安防摄像机的图像信号处理芯片,支持MIPI和 DVP 接口,内置高性能ISP处理器,支持3D降噪和数字宽动态
  • Onedrive精神分裂怎么办(有变更却不同步)
  • 【Redis源码】 RedisObject结构体
  • 单片机-定时器中断
  • formik 的使用
  • 202305 青少年软件编程等级考试C/C++ 一级真题答案及解析(电子学会)
  • ESP32编译和双OTA分区问题
  • Ubuntu更改内核
  • 使用LinkedList手撕图的邻接表
  • eNSP之家----ACL实验入门实例详解(Access Control List访问控制列表)(重要重要重要的事说三遍)
  • (五)WebGL中vertexAttribPointer方法的使用详解