
Kafka 4.0 from Beginner to Proficient

1. Cluster Deployment

1.1 JDK

cat >> /etc/ansible/playbook/install-jdk.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    - name: Distribute the JDK
      copy: src=/opt/software/jdk-21.0.5_linux-x64_bin.tar.gz dest=/opt/software
    - name: Unpack the JDK
      shell: tar -xvzf /opt/software/jdk-21.0.5_linux-x64_bin.tar.gz -C /usr/local/java
    - name: Configure environment variables
      blockinfile:
        path: /etc/profile
        block: |
          export JAVA_HOME=/usr/local/java/jdk-21.0.5
          export PATH=$JAVA_HOME/bin:$PATH
        marker: "# {mark} JDK"
EOF
ansible-playbook /etc/ansible/playbook/install-jdk.yml
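
The unpack step assumes /usr/local/java (and /opt/software on the remote side) already exists on every node; a minimal sketch to create them beforehand and verify the install afterwards, assuming the inventory group is named cluster as in the playbook:

# Create the target directories on all nodes (the copy and tar -C steps fail without them)
ansible cluster -m file -a 'path=/opt/software state=directory'
ansible cluster -m file -a 'path=/usr/local/java state=directory'
# After the playbook has run, confirm the JDK is on the PATH everywhere
ansible cluster -m shell -a 'source /etc/profile && java -version'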

1.2 Basic Environment Setup

cat >> /etc/ansible/playbook/modify_env.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    # Set the hostname
    - name: set hostname
      shell: hostnamectl set-hostname {{ hostname }}
    - name: distribute hosts to nodes
      copy:
        src: /etc/hosts
        dest: /etc
    # Stop the firewall
    - name: stop firewalld
      service:
        name: firewalld
        state: stopped
        enabled: no
    # Disable SELinux
    - name: setenforce 0
      shell: "setenforce 0"
      failed_when: false
    - name: set selinux disabled
      replace:
        path: /etc/selinux/config
        regexp: '^SELINUX=enforcing'
        replace: 'SELINUX=disabled'
    # Raise the maximum number of file handles
    - name: raise file handle limits
      lineinfile:
        path: /etc/security/limits.conf
        insertafter: '### AFTER THIS LINE'
        line: "{{ item }}"
        state: present
      with_items:
        - '*       soft   noproc  65536'
        - '*       hard   noproc  65536'
        - '*       soft   nofile  131072'
        - '*       hard   nofile  131072'
        - '*       hard   memlock unlimited'
        - '*       soft   memlock unlimited'
    # Disable transparent huge pages
    - name: disable THP
      blockinfile:
        path: /etc/rc.local
        block: |
          echo never > /sys/kernel/mm/transparent_hugepage/enabled
          echo never > /sys/kernel/mm/transparent_hugepage/defrag
        marker: "# {mark} THP"
    - name: Change permissions
      shell: chmod +x /etc/rc.d/rc.local
    # Disable swap
    - name: comment out swap in fstab
      replace:
        path: /etc/fstab
        regexp: '^(\s*)([^#\n]+\s+)(\w+\s+)swap(\s+.*)$'
        replace: '#\1\2\3swap\4'
        backup: yes
    - name: Disable SWAP
      shell: swapoff -a
EOF
ansible-playbook /etc/ansible/playbook/modify_env.yml
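
To spot-check that the kernel settings took effect, an ad-hoc command along these lines works (a sketch; THP should report [never] and swapon should print nothing):

ansible cluster -m shell -a 'cat /sys/kernel/mm/transparent_hugepage/enabled; swapon --show'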

1.3 Passwordless SSH

cat >> /etc/ansible/playbook/ssh-pubkey.yml << 'EOF'
- hosts: cluster
  gather_facts: no
  remote_user: root
  tasks:
    - name: Authorize passwordless login
      authorized_key:
        user: root
        key: "{{ lookup('file', '/root/.ssh/id_rsa.pub') }}"
        state: present
EOF
ansible-playbook /etc/ansible/playbook/ssh-pubkey.yml
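
The playbook reads /root/.ssh/id_rsa.pub on the control node, so the key pair has to exist before the play runs; a minimal sketch, including a post-run check:

# Generate a key pair on the control node if one does not exist yet
[ -f /root/.ssh/id_rsa ] || ssh-keygen -t rsa -N '' -f /root/.ssh/id_rsa
# Every node should now answer without a password prompt
ansible cluster -m ping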

1.4 Deploy Kafka

cat >> /etc/ansible/playbook/install-kafka.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    - name: Distribute Kafka
      copy: src=/opt/software/kafka_2.13-4.0.0.tgz dest=/opt/software
    - name: Unpack Kafka
      shell: tar -xvzf /opt/software/kafka_2.13-4.0.0.tgz -C /opt/module/
    - name: rename to kafka
      shell: mv /opt/module/kafka_2.13-4.0.0 /opt/module/kafka
    - name: Grant ownership to the starrocks user
      shell: chown -R starrocks:starrocks /opt/module/kafka
    - name: Configure environment variables
      blockinfile:
        path: /home/starrocks/.bashrc
        block: |
          # kafka
          export KAFKA_HOME=/opt/module/kafka
          export PATH=$KAFKA_HOME/bin:$PATH
        marker: "# {mark} KAFKA"
EOF
ansible-playbook /etc/ansible/playbook/install-kafka.yml
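
As with the JDK, /opt/module and the /data/kafka directory used below are assumed to exist, and the starrocks user must already be present for the chown step; a sketch to prepare them:

# Prepare the Kafka install and data directories on all nodes
ansible cluster -m file -a 'path=/opt/module state=directory'
ansible cluster -m file -a 'path=/data/kafka state=directory owner=starrocks group=starrocks'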

1.5 Edit the Kafka Configuration

vi /opt/module/kafka/config/server.properties

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
# Give each Kafka server its own id; on the other nodes the id increases along with the IP
node.id=1

# List of controller endpoints used to connect to the controller cluster
controller.quorum.bootstrap.servers=kylin-01:9093

# The id, host and port of every Kafka server joining the cluster; the management/controller port 9093 is used here
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093

listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker or the controller will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093

# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/data

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# Maximum message size: 512 MB
message.max.bytes=536870912
replica.fetch.max.bytes=536870912
# Maximum request size: 1 GB
max.request.size=1073741824
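
Note that max.request.size is a producer-side setting rather than a broker one, so clients that send large batches must set it themselves; for example with the console producer (a sketch):

# Override the producer's request size limit to match the broker settings above
kafka-console-producer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --producer-property max.request.size=1073741824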

1.6 Edit the Other Nodes' Configuration

# kylin-02
node.id=2
controller.quorum.bootstrap.servers=kylin-02:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
advertised.listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093

# kylin-03
node.id=3
controller.quorum.bootstrap.servers=kylin-03:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
advertised.listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
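
Rather than editing each node by hand, the per-node differences can be derived from the kylin-01 file with sed, for example on kylin-02 (a sketch; adjust the ids and hostnames for each node, and note the voters line must stay identical everywhere):

sed -i -e 's/^node.id=1/node.id=2/' \
       -e 's/kylin-01:9092/kylin-02:9092/g' \
       -e 's/^controller.quorum.bootstrap.servers=kylin-01/controller.quorum.bootstrap.servers=kylin-02/' \
       -e 's|CONTROLLER://kylin-01|CONTROLLER://kylin-02|g' \
       /opt/module/kafka/config/server.properties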

1.7 Change the Log Directory

vi bin/kafka-run-class.sh

# Log directory to use
# Change the log directory
LOG_DIR=/data/kafka/log

if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi
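
This edit only changes the local copy; push the modified script to the rest of the cluster as well, e.g. (a sketch):

# Distribute the modified launcher script to all nodes
ansible cluster -m copy -a 'src=/opt/module/kafka/bin/kafka-run-class.sh dest=/opt/module/kafka/bin/ mode=0755'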

1.8 Initialize the Cluster

Generate a unique ID for the storage directories:

bin/kafka-storage.sh random-uuid

Format the Kafka storage directory (run on every node):

bin/kafka-storage.sh format -t m6BZb8yRSzmdNvW9kzOoQg -c config/server.properties
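
Every node must be formatted with the same cluster ID, so it is convenient to capture the generated UUID once and fan the format command out with Ansible (a sketch, assuming Kafka lives in /opt/module/kafka on all nodes):

# Generate the ID once on the control node, then format every node's storage with it
KAFKA_CLUSTER_ID=$(/opt/module/kafka/bin/kafka-storage.sh random-uuid)
ansible cluster -m shell -a "/opt/module/kafka/bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c /opt/module/kafka/config/server.properties"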

1.9 Start the Cluster

Run the service start command on every node:

bin/kafka-server-start.sh -daemon config/server.properties

Check the server log:

tail -f /data/kafka/log/server.log

Check the Kafka node status:

bin/kafka-broker-api-versions.sh --bootstrap-server kylin-01:9092

Check Kafka's listening ports:

netstat -tuln | grep 9092

Use the ps command to view the Kafka process:

ps aux | grep kafka

Use top or htop to inspect the Kafka process:

top -p <PID>

2. Operational Commands

## List topics
kafka-topics.sh --bootstrap-server kylin-01:9092 --list

## Describe a topic
kafka-topics.sh --bootstrap-server kylin-01:9092 --describe --topic <topic-id>

## Create a topic with 5 partitions and a replication factor of 2; the broker count must be >= the replication factor (brokers need not be specified explicitly when creating a topic)
kafka-topics.sh --bootstrap-server kylin-01:9092 --create --topic <topic-id> --partitions 5 --replication-factor 2

## Delete a topic
kafka-topics.sh --bootstrap-server kylin-01:9092 --delete --topic <topic-id>

## List consumer groups (--list)
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --list

## Describe a specific consumer group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --group <group-id>

## Delete a specific group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --delete --group <group-id>

## Start a console producer
kafka-console-producer.sh --bootstrap-server kylin-01:9092 --topic <topic-id>

## Start a console consumer
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --consumer-property group.id=<group-id> --from-beginning

## Describe all consumer groups (--all-groups)
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups

## Query consumer member info (--members)
## Member info for all consumer groups
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups --members

## Member info for a specific group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --members --group <group-id>

## Reset offsets to the latest position
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-latest --execute

## Dry run of resetting offsets to a given position
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --dry-run

## Reset offsets to a given position
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --execute

## Get the offset for a given timestamp (kafka.tools.GetOffsetShell is gone in Kafka 4.0; use kafka-get-offsets.sh)
kafka-get-offsets.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --time 1740499200000

## Expand a topic's partition count
kafka-topics.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --alter --partitions 16

## Fetch messages within a time range (filter on the printed CreateTime timestamp)
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --max-messages 3 | awk -F 'CreateTime:|\t' '$2>=1742450425000 && $2<=1742464825000 {print $0}'

## Read from a given partition and offset, filtering by message content
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --partition 0 --offset 100 --property print.timestamp=true | grep '1027729757'
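
Putting the last few commands together, a common task is replaying a topic from a point in time: look up the offset for the timestamp, then consume from it (a sketch; the timestamp and <returned-offset> are placeholders):

## 1. Ask the broker which offset corresponds to the timestamp (output: topic:partition:offset)
kafka-get-offsets.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --time 1740499200000
## 2. Consume partition 0 starting from the returned offset
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --partition 0 --offset <returned-offset>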

3. API

