当前位置: 首页 > news >正文

Spring Boot 配置Kafka

1 Kafka

        Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。

2 Maven依赖

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

3 Spring Boot配置

spring:kafka:bootstrap-servers: localhost:9092producer:batch-size: 16384buffer-memory: 67108864acks: 1compression-type: lz4key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:enable-auto-commit: trueauto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringSerializervalue-deserializer: org.apache.kafka.common.serialization.StringSerializer

4 生产者配置

4.1 KafkaProducerConfig

        生产者的相关配置,指定kafka的地址,消息序列化器。

        topic的分区数、副本数。

package com.xudongbase.kafka.producer;import com.xudongbase.kafka.constant.KafkaTopicConstant;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;@Configuration
public class KafkaProducerConfig {@Value(value = "${spring.kafka.bootstrap-servers:}")private String bootstrapAddress;/*** 分区(分区数需要慎重设置,一般分区数为消费者的倍数,要不然在消费高峰时刻会出现消费速度不一样的情况)*/private static final int NUM_PARTITIONS = 5;/*** 副本*/private static final short REPLICATION_FACTOR = (short) 2;

http://www.mrgr.cn/news/80816.html

相关文章:

  • 阿里巴巴前端面试经验
  • 【Maven】基础(一)
  • golang中的值传递与引用传递,如何理解结构体的方法?为什么 T 和 *T 有不同的方法集?
  • RHEL 7.5 源码安装 mysql-5.7.17 数据库
  • phpSpider如何应对网页结构的变化
  • C语言-稀疏数组转置
  • 【0371】Postgres内核 实现构建一个 WAL record
  • 【java面向对象编程】第二弹----成员方法
  • 基于DockerCompose搭建Redis主从哨兵模式
  • js分页功能
  • 【Python】使用Selenium 操作浏览器 自动化测试 记录
  • regression里面的误差来源
  • Pytorch | 从零构建AlexNet对CIFAR10进行分类
  • Linux函数栈帧
  • windows上安装Redis
  • 对AI现状和未来发展的浅见
  • 关于代码注释
  • 分布式算法(一):从ACID和BASE到CAP
  • 面试题整理6----什么是进程最大数、最大线程数、进程打开的文件数,怎么调整
  • 百度飞桨:零基础入门深度学习
  • cocos creator制作2dTop-down游戏(虚拟摇杆、地图加载)
  • C# 基本信息介绍
  • python之OpenGL应用(1)入门篇
  • TCP拥塞控制
  • 2024年12月英语六级CET6写作与翻译笔记
  • 实现线程同步的方法