当前位置: 首页 > news >正文

001 Kafka入门及安装

Kafka入门及安装

文章目录

  • Kafka入门及安装
    • 1.介绍
      • Kafka的基本概念和核心组件
    • 2.安装
      • 1.docker快速安装
          • zookeeper安装
          • kafka安装
        • 添加topic
        • 删除topic
        • kafka-ui安装
      • 2.Docker安装(SASL/PLAIN认证配置-用户名密码)

来源参考的deepseek,如有侵权联系立删

1.介绍

Kafka的基本概念和核心组件

Kafka是分布式流处理平台,由 Scala 和 Java 编写。Kafka 的核心概念和组件包括:

  1. Producer:发布消息的客户端,向Broker发送数据。
  2. Consumer:订阅消息的客户端,从Broker读取数据。
  3. Broker:Kafka服务节点,负责消息存储和转发。
  4. Topic:消息的逻辑分类单位,类似数据库表。
  5. Partition:主题的分区,实现并行处理和扩展性。
  6. Consumer Group:多个消费者组成的组,协同消费同一主题的不同分区。

详细介绍

  1. 主题(Topic) :主题是消息的逻辑分类单位,类似于数据库中的表。每个主题可以包含多个分区(Partition),用于实现水平扩展和高吞吐量。
  2. 分区(Partition) :分区是物理上的消息分组,每个分区是一个有序且不可变的消息队列。分区可以分布在不同的服务器上,以实现数据的分布式存储和处理。
  3. Broker(服务器) :Broker 是 Kafka 集群中的节点,负责接收和存储消息。一个集群可以由多个 Broker 组成,每个 Broker 可以管理多个分区和副本。
  4. 生产者(Producer) :生产者是向 Kafka 主题发送消息的客户端。生产者将消息发布到指定的主题中,消息会被路由到相应的分区。
  5. 消费者(Consumer) :消费者是从 Kafka 主题中拉取消息的客户端。消费者可以订阅一个或多个主题,并从这些主题中读取消息。消费者可以组成消费者组(Consumer Group),以实现消息的并行消费和故障恢复。
  6. 消费者组(Consumer Group) :消费者组由多个消费者组成,组内的每个消费者负责消费不同分区的消息,以实现高效的消息分发和容错能力。
  7. ZooKeeper:ZooKeeper 是一个分布式协调服务,用于管理 Kafka 集群的元数据,如主题、分区和副本的配置信息。ZooKeeper 还负责选举和维护集群的领导者。
  8. 日志(Log) :Kafka 使用日志来存储消息,每个主题的日志由多个分区组成,每个分区包含一系列有序的消息。日志可以进行压缩和持久化,以提高存储效率和数据可靠性。
  9. 偏移量(Offset) :偏移量用于跟踪消息的消费位置。消费者通过偏移量来确定从何处继续消费消息。
  10. 复制机制:Kafka 通过复制机制提高容错能力。每个分区可以有多个副本,即使部分 Broker 失效,服务仍可继续运行。

这些组件共同构成了 Kafka 的核心架构,使其能够实现高吞吐量、低延迟、可扩展性和持久化的消息传递和处理。

2.安装

1.docker快速安装

zookeeper安装
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
kafka安装
docker pull wurstmeister/kafka
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper  \
-v /etc/localtime:/etc/localtime  \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
--name kafka: 设置容器的名字为“kafka”。-p 9092:9092: 将容器的9092端口映射到宿主机的9092端口。--link zookeeper:zookeeper: 连接到名为“zookeeper”的另一个Docker容器,并且在当前的容器中可以通过zookeeper这个别名来访问它。--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181: 设置环境变量,指定ZooKeeper的连接字符串。--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092: 设置环境变量,指定Kafka的advertised listeners。--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: 设置环境变量,指定Kafka的listeners。--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1: 设置环境变量,指定offsets topic的副本因子。wurstmeister/kafka: 使用的Docker镜像名字。/opt/kafka/config/server.properties  #镜像内部配置文件地址
./kafka-server-start.sh -daemon ./config/server.properties  #启动命令
添加topic
kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 \
--replication-factor 1 --partitions 1 --topic mytest
删除topic
kafka-topics.sh \--delete --topic lx \--zookeeper 127.0.0.1:2181
kafka-ui安装
docker run --name kafka-ui  -p 19092:8080 \-e KAFKA_CLUSTERS_0_NAME=local \-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=127.0.0.1:9092 \-d provectuslabs/kafka-ui:latest         

2.Docker安装(SASL/PLAIN认证配置-用户名密码)

1.宿主机创建用户密码配置文件

#创建文件
touch   /dockerData/kafka/config/kafka_server_jaas.conf

2.编辑kafka_server_jaas.conf配置

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="123456"user_admin="123456"user_moshangshang="123456";
};                            
user_admin="123456"
user_后面跟的是用户名,然后123456为密码,可配置多个用户密码

3.运行kafka容器

docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper  \
-v /etc/localtime:/etc/localtime  \
-v /dockerData/kafka/config/kafka_server_jaas.conf:/opt/kafka/config/kafka_server_jaas.conf  \
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env 192.168.1.250:9092 \
--env KAFKA_LISTENERS=SASL_PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_OPTS="-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf" \
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka

4.宿主机创建kafka的server.properties 配置

5.修改并添加认证相关配置

listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://192.168.1.250:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN

6.拷贝宿主机配置文件覆盖容器内部原本文件

docker  cp /dockerData/kafka/config/server.properties   kafka:/opt/kafka/config

7.重启容器
8.整合springboot的yml配置

#kafka配置
spring:kafka:#kafka集群地址bootstrap-servers: 192.168.25.100:9092producer:#批量发送的数据量大小batch-size: 1#可用发送数量的最大缓存buffer-memory: 33554432#key序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer#value序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer#达到多少时间后,会发送properties:linger.ms: 1security.protocol: SASL_PLAINTEXTsasl.mechanism: PLAINsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";#代表集群中从节点都持久化后才认为发送成功acks: -1consumer:enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000security.protocol: SASL_PLAINTEXTsasl.mechanism: PLAINsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="123456";group-id: testauto-offset-reset: earliestlistener:ack-mode: MANUAL # 精准控制offset提交concurrency: 3 # 并发消费者数type: batch

http://www.mrgr.cn/news/92526.html

相关文章:

  • 弱监督语义分割学习计划(1)-简单实现Open Vocabulary Label但是效果不好
  • Vue 3 搭建前端模板并集成 Ant Design Vue(2025)
  • 编写Redis开机自启动脚本
  • Imagination GPU 3D Graphics Wrokload
  • 蓝桥杯第十六届嵌入式模拟编程题解析
  • lattice hdl实现spi接口
  • Flask笔记
  • lowagie(itext)老版本手绘PDF,包含页码、水印、图片、复选框、复杂行列合并、行高设置等。
  • kubernetes 初学命令
  • Docker 部署 MinIO 对象存储服务
  • 【Qt】为程序增加闪退crash报告日志
  • 27.[前端开发-JavaScript基础]Day04-函数基本使用-递归-变量作用域-函数式编程
  • 24.[前端开发-JavaScript基础]Day01-插件配置-变量-数据
  • SpringBoot项目注入 traceId 来追踪整个请求的日志链路
  • RAG 阿里云
  • 数据开发的简历及面试
  • C# Unity 唐老狮 No.2 模拟面试题
  • Oracle 12c Docker安装问题排查 sga_target 1536M is too small
  • Vue2+Element实现Excel文件上传下载预览【超详细图解】
  • deepseek-r1-centos-本地服务器配置方法