Spring Boot 配置Kafka
1 Kafka
Kafka 是由 Linkedin
公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。
2 Maven依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
3 Spring Boot配置
spring:kafka:bootstrap-servers: localhost:9092producer:batch-size: 16384buffer-memory: 67108864acks: 1compression-type: lz4key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:enable-auto-commit: trueauto-offset-reset: latestkey-deserializer: org.apache.kafka.common.serialization.StringSerializervalue-deserializer: org.apache.kafka.common.serialization.StringSerializer
4 生产者配置
4.1 KafkaProducerConfig
生产者的相关配置,指定kafka的地址,消息序列化器。
topic的分区数、副本数。
package com.xudongbase.kafka.producer;import com.xudongbase.kafka.constant.KafkaTopicConstant;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;
import java.util.Properties;@Configuration
public class KafkaProducerConfig {@Value(value = "${spring.kafka.bootstrap-servers:}")private String bootstrapAddress;/*** 分区(分区数需要慎重设置,一般分区数为消费者的倍数,要不然在消费高峰时刻会出现消费速度不一样的情况)*/private static final int NUM_PARTITIONS = 5;/*** 副本*/private static final short REPLICATION_FACTOR = (short) 2;