Kafka 4.0: From Beginner to Proficient
1. Cluster Deployment
1.1 JDK
cat > /etc/ansible/playbook/install-jdk.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    - name: Create the JDK install directory
      file: path=/usr/local/java state=directory
    - name: Distribute the JDK
      copy: src=/opt/software/jdk-21.0.5_linux-x64_bin.tar.gz dest=/opt/software
    - name: Unpack the JDK
      shell: tar -xvzf /opt/software/jdk-21.0.5_linux-x64_bin.tar.gz -C /usr/local/java
    - name: Configure environment variables
      blockinfile:
        path: /etc/profile
        block: |
          export JAVA_HOME=/usr/local/java/jdk-21.0.5
          export PATH=$JAVA_HOME/bin:$PATH
        marker: "# {mark} JDK"
EOF
ansible-playbook install-jdk.yml
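A quick ad-hoc check that the JDK landed on every node (this assumes the same `cluster` inventory group used by the playbook):
ansible cluster -m shell -a 'source /etc/profile && java -version'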
1.2 Base Environment Configuration
cat > /etc/ansible/playbook/modify_env.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    # Set the hostname; the hostname variable is expected per host (e.g. in inventory host_vars)
    - name: set hostname
      shell: hostnamectl set-hostname {{ hostname }}
    - name: distribute hosts to nodes
      copy:
        src: /etc/hosts
        dest: /etc
    # Disable the firewall
    - name: stop firewalld
      service:
        name: firewalld
        state: stopped
        enabled: no
    # Disable SELinux
    - name: setenforce 0
      shell: "setenforce 0"
      failed_when: false
    - name: set selinux disabled
      replace:
        path: /etc/selinux/config
        regexp: '^SELINUX=enforcing'
        replace: 'SELINUX=disabled'
    # Raise the max file handle and process limits
    - name: set resource limits
      lineinfile:
        path: /etc/security/limits.conf
        insertafter: '### AFTER THIS LINE'
        line: "{{ item }}"
        state: present
      with_items:
        - '* soft nproc 65536'
        - '* hard nproc 65536'
        - '* soft nofile 131072'
        - '* hard nofile 131072'
        - '* hard memlock unlimited'
        - '* soft memlock unlimited'
    # Disable transparent huge pages (THP) on boot
    - name: disable THP
      lineinfile:
        path: /etc/rc.local
        line: "{{ item }}"
        state: present
      with_items:
        - 'echo never > /sys/kernel/mm/transparent_hugepage/enabled'
        - 'echo never > /sys/kernel/mm/transparent_hugepage/defrag'
    - name: Change permissions
      shell: chmod +x /etc/rc.d/rc.local
    # Disable swap
    - name: comment out swap mounts in fstab
      replace:
        path: /etc/fstab
        regexp: '^(\s*)([^#\n]+\s+)(\w+\s+)swap(\s+.*)$'
        replace: '#\1\2\3swap\4'
        backup: yes
    - name: Disable SWAP
      shell: swapoff -a
EOF
ansible-playbook modify_env.yml
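A spot-check that the changes took effect (an empty swapon listing means swap is off; the limits.conf values only apply to new login sessions):
ansible cluster -m shell -a 'cat /sys/kernel/mm/transparent_hugepage/enabled; swapon --show; ulimit -n'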
1.3 Passwordless SSH
cat > /etc/ansible/playbook/ssh-pubkey.yml << 'EOF'
- hosts: cluster
  gather_facts: no
  remote_user: root
  tasks:
    - name: Distribute the public key for passwordless login
      authorized_key:
        user: root
        key: "{{ lookup('file', '/root/.ssh/id_rsa.pub') }}"
        state: present
EOF
ansible-playbook ssh-pubkey.yml
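If the control node has no key pair yet, generate one first (the playbook's lookup reads /root/.ssh/id_rsa.pub), then verify that passwordless login works:
ssh-keygen -t rsa -b 4096 -N '' -f /root/.ssh/id_rsa
ssh root@kylin-02 hostname   # should print kylin-02 with no password prompt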
1.4 Deploy Kafka
cat > /etc/ansible/playbook/install-kafka.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    - name: Create the module directory
      file: path=/opt/module state=directory
    - name: Distribute the Kafka tarball
      copy: src=/opt/software/kafka_2.13-4.0.0.tgz dest=/opt/software
    - name: Unpack Kafka
      shell: tar -xvzf /opt/software/kafka_2.13-4.0.0.tgz -C /opt/module/
    - name: rename to kafka
      shell: mv /opt/module/kafka_2.13-4.0.0 /opt/module/kafka
    - name: Grant ownership to the starrocks user
      shell: chown -R starrocks:starrocks /opt/module/kafka
    - name: Configure environment variables
      blockinfile:
        path: /home/starrocks/.bashrc
        block: |
          # kafka
          export KAFKA_HOME=/opt/module/kafka
          export PATH=$KAFKA_HOME/bin:$PATH
        marker: "# {mark} KAFKA"
EOF
ansible-playbook install-kafka.yml
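To confirm the install on every node (sourcing /etc/profile so JAVA_HOME is set for the non-interactive shell):
ansible cluster -m shell -a 'source /etc/profile && /opt/module/kafka/bin/kafka-topics.sh --version'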
1.5 Modify the Kafka Configuration
vi /opt/module/kafka/config/server.properties
############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
# Give each Kafka server a distinct id; assign the other nodes increasing ids along with their IPs
node.id=1

# List of controller endpoints used to connect to the controller cluster
controller.quorum.bootstrap.servers=kylin-01:9093

# id, host, and port of every Kafka server joining the controller quorum; port 9093 is the controller (monitoring and management) port
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093

listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker or the controller will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093

# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/data

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# Maximum message size: 512 MB
message.max.bytes=536870912
replica.fetch.max.bytes=536870912

# Maximum request size: 1 GB (note: max.request.size is a producer-side client setting; it has no effect in server.properties)
max.request.size=1073741824
1.6 Modify the Other Nodes' Configuration
# kylin-02
node.id=2
controller.quorum.bootstrap.servers=kylin-02:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
advertised.listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093

# kylin-03
node.id=3
controller.quorum.bootstrap.servers=kylin-03:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
advertised.listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
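Instead of editing each node by hand, the per-node values can be patched from kylin-01 with a small loop; a sketch, assuming the kylin-01 copy of server.properties has already been distributed to the same path on every node and hostnames follow the kylin-0N pattern:
for i in 2 3; do
  ssh kylin-0$i "sed -i \
    -e 's/^node.id=.*/node.id=$i/' \
    -e 's/^controller.quorum.bootstrap.servers=.*/controller.quorum.bootstrap.servers=kylin-0$i:9093/' \
    -e 's|^listeners=.*|listeners=PLAINTEXT://kylin-0$i:9092,CONTROLLER://kylin-0$i:9093|' \
    -e 's|^advertised.listeners=.*|advertised.listeners=PLAINTEXT://kylin-0$i:9092,CONTROLLER://kylin-0$i:9093|' \
    /opt/module/kafka/config/server.properties"
done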
1.7 Change the Log Directory
vi bin/kafka-run-class.sh
# Log directory to use
# Point the log directory at the data disk
LOG_DIR=/data/kafka/log

if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi
1.8 Initialize the Cluster
Generate a unique ID for the storage directories
bin/kafka-storage.sh random-uuid
Format the Kafka storage directories (run on every node)
bin/kafka-storage.sh format -t m6BZb8yRSzmdNvW9kzOoQg -c config/server.properties
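The ID printed by random-uuid must be reused verbatim on every node; one way to script the whole step, assuming the ansible `cluster` group from earlier:
CLUSTER_ID=$(/opt/module/kafka/bin/kafka-storage.sh random-uuid)
ansible cluster -m shell -a "/opt/module/kafka/bin/kafka-storage.sh format -t $CLUSTER_ID -c /opt/module/kafka/config/server.properties"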
1.9 Start the Cluster
Run the start command on every node
bin/kafka-server-start.sh -daemon config/server.properties
Check the service log
tail -f /data/kafka/log/server.log
Check the Kafka node status
bin/kafka-broker-api-versions.sh --bootstrap-server kylin-01:9092
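For a KRaft cluster it is also worth checking the controller quorum itself; kafka-metadata-quorum.sh ships with Kafka:
bin/kafka-metadata-quorum.sh --bootstrap-server kylin-01:9092 describe --status
bin/kafka-metadata-quorum.sh --bootstrap-server kylin-01:9092 describe --replication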
Check Kafka's listening ports
netstat -tuln | grep 9092
Check the Kafka process with ps
ps aux | grep kafka
Inspect the Kafka process with top or htop
top -p <PID>
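For production, running the broker under systemd survives reboots better than the -daemon flag; a minimal unit sketch (the paths and the starrocks user follow the assumptions made in section 1.4):
cat > /etc/systemd/system/kafka.service << 'EOF'
[Unit]
Description=Apache Kafka (KRaft)
After=network.target

[Service]
Type=simple
User=starrocks
Environment=JAVA_HOME=/usr/local/java/jdk-21.0.5
ExecStart=/opt/module/kafka/bin/kafka-server-start.sh /opt/module/kafka/config/server.properties
ExecStop=/opt/module/kafka/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload && systemctl enable --now kafka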
2. Operational Commands
## List topics
kafka-topics.sh --bootstrap-server kylin-01:9092 --list

## Describe a topic
kafka-topics.sh --bootstrap-server kylin-01:9092 --describe --topic <topic-id>

## Create a topic with 5 partitions and a replication factor of 2; the broker count must be >= the replication factor (brokers need not be specified explicitly when creating a topic)
kafka-topics.sh --bootstrap-server kylin-01:9092 --create --topic <topic-id> --partitions 5 --replication-factor 2

## Delete a topic
kafka-topics.sh --bootstrap-server kylin-01:9092 --delete --topic <topic-id>
## List consumer groups
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --list

## Describe a specific consumer group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --group <group-id>

## Delete a specific group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --delete --group <group-id>

## Start a console producer
kafka-console-producer.sh --bootstrap-server kylin-01:9092 --topic <topic-id>

## Start a console consumer
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --consumer-property group.id=<group-id> --from-beginning

## Describe all consumer groups (--all-groups)
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups

## Query member info (--members): all consumer groups
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups --members

## Member info for a specific consumer group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --members --group <group-id>
## Reset offsets to the latest
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-latest --execute

## Dry run: preview an offset reset
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --dry-run

## Reset offsets to a specific position
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --execute
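Offsets can also be reset by wall-clock time instead of an absolute offset; the same tool accepts --to-datetime (format YYYY-MM-DDTHH:mm:SS.sss):
## Reset offsets to the positions at a given point in time
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-datetime 2025-03-20T08:00:00.000 --execute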
## Get the offsets for a given timestamp (use the bundled kafka-get-offsets.sh; the old kafka.tools.GetOffsetShell class no longer ships with Kafka 4.0)
kafka-get-offsets.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --time 1740499200000
## Expand a topic to 16 partitions (partitions can only be increased, never decreased)
kafka-topics.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --alter --partitions 16
## Fetch messages within a time range: print the CreateTime timestamp and filter on it with awk ($2 is the timestamp field)
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --max-messages 3 | awk -F 'CreateTime:|\t' '$2>=1742450425000 && $2<=1742464825000 {print $0}'

## Read from a specific partition and offset, then filter for a keyword
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --partition 0 --offset 100 | grep '1027729757'
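Putting the pieces together, a quick end-to-end smoke test (a sketch using a hypothetical topic named smoke-test):
kafka-topics.sh --bootstrap-server kylin-01:9092 --create --topic smoke-test --partitions 3 --replication-factor 2
echo "hello kafka" | kafka-console-producer.sh --bootstrap-server kylin-01:9092 --topic smoke-test
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic smoke-test --from-beginning --max-messages 1
kafka-topics.sh --bootstrap-server kylin-01:9092 --delete --topic smoke-test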