Kafka常用的一些命令
Kafka配置log.dirs为多目录时,若将数据目录下的文件全部删除,那么分区在当前broker下最终落在哪个数据目录下会重新分配
# 配置sasl用户名密码
cat > /tmp/client.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test";
EOFcat > /tmp/client.properties <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="u_dc_kafkapusher_cestc" password="qkj9Y3bzfa7er";
EOF# 查看topic/kafka/bin/kafka-topics.sh --topic log_type_linux_process --describe --bootstrap-server 10.255.225.16:30001 --command-config /tmp/client.properties
# 命令测试/kafka/bin/kafka-topics.sh --create --bootstrap-server 172.16.0.18:9093,172.16.0.19:9093,172.16.0.20:9093 --replication-factor 3 --partitions 3 --topic dblab01/kafka/bin/kafka-topics.sh --describe --bootstrap-server 172.16.0.9:9093,172.16.0.10:9093,172.16.0.11:9093 --topic dblab01/kafka/bin/kafka-topics.sh --list --bootstrap-server 192.168.109.131:9092 /kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9094 --topic test1 --producer.config /tmp/client.properties/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic test1 --from-beginning --consumer.config /tmp/client.properties #指定topic 从头消费 并打印offset/分区/时间戳/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic2 --property print.offset=true --property print.partition=true --property print.timestamp=true --from-beginning > topic2Log.txt
# 指定topic分区的offset消费/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --property print.offset=true --property print.partition=true --property print.timestamp=true --topic topic2 --partition 0 --offset 0 --max-messages 10# 指定topic分区从头消费10条消息/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --property print.offset=true --property print.partition=true --property print.timestamp=true --topic topic1 --partition 0 --from-beginning --max-messages 10# 性能测试/kafka/bin/kafka-producer-perf-test.sh --topic topic1 --num-records 100000000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9092 batch.size=20971520 linger.ms=1 max.request.size=67108864 buffer.memory=67108864 compression.type=lz4/kafka/bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9093 --topic topic1 --messages 10000000 --reporting-interval 1000 --show-detailed-stats --timeout 30000# 从文件中读取随机读取消息发送/kafka/bin/kafka-producer-perf-test.sh --topic topic1 --num-records 1000 --payload-file /tmp/testRecords.txt --throughput -1 --producer-props bootstrap.servers=localhost:9093 batch.size=20971520 linger.ms=1 max.request.size=67108864 buffer.memory=67108864 compression.type=lz4 /kafka/bin/kafka-producer-perf-test.sh --topic topic1 --num-records 1000000000 --record-size 20971520 --throughput -1 --producer-props bootstrap.servers=localhost:9093 batch.size=20971520 linger.ms=500 max.request.size=67108864 buffer.memory=67108864 compression.type=lz4/kafka/bin/kafka-producer-perf-test.sh --topic topic1 --num-records 1000000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9093 batch.size=20971520 linger.ms=50 max.request.size=67108864 buffer.memory=67108864 compression.type=lz4
##性能测试:
#生产 /kafka/bin/kafka-producer-perf-test.sh --topic topic1 --num-records 10000000000 --record-size 10 --throughput -1 --producer-props bootstrap.servers=localhost:9094 linger.ms=50 batch.size=524288 compression.type=lz4 acks=0 max.request.size=5242880 buffer.memory=268435456
#消费 /kafka/bin/kafka-consumer-perf-test.sh --broker-list localhost:9094 --topic test15 --group PerformanceTest_Group --fetch-size 1048576 --messages 10000000 --threads 10 --timeout 10000 # 使用分区方法/kafka/bin/kafka-producer-perf-test.sh --topic dblab01 --num-records 100000000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=localhost:9093 linger.ms=300 batch.size=524288 acks=-1 partitioner.class=com.test.cmqadapter.kafka.ProducerPartitioner &ps ux | grep -E 'topic'| grep -v grep |awk '{print $2}' |xargs kill -s 9offset操作/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --all-groups --all-topics --describe/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --group test-consumer-group --describe
# 消费组更新指定topic分区的消费位移/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9094 --group $groupName --reset-offsets --topic ${topicname}:${partition} --to-latest --execute#删除topic分区指定offset偏移量的消息
{"partitions": [{"topic": "topic", "partition": 0, "offset": 2}],"version":1
}
执行删除命令/kafka/bin/kafka-delete-records.sh --bootstrap-server localhost:9093 --offset-json-file /tmp/json.tmp
# 指定topic分区的offset消费/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic {topicName} --partition {partition} --offset {offset} --property print.offset=true --max-messages 5kafka 重置offset:/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9094 --group ${groupName} --topic {topicName}:{partition} --property print.offset=true --reset-offsets --to-offset {offset} --execute
#指定时间指定topic分区/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group testgp --topic yypt-cmdb-monitor-physical-sync:0 --reset-offsets --to-datetime 2017-08-04T14:30:00.000 --execute更新到当前group最初的offset位置/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --group test-group --reset-offsets --all-topics --to-earliest --execute
更新到当前group消费指定topic1到新的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --topic topic1 --to-latest --execute
更新到指定的offset位置
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
更新到当前offset位置(解决offset的异常)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
offset位置按设置的值进行位移
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
offset设置到指定时刻开始 需要考虑时区
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2017-08-04T14:30:00.000 --execute/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /bitnami/kafka4/data/zhuh_test-3/00000000000000000000.log --print-data-log --deep-iteration > /tmp/secLog.log# topic的partitions最大位移(可消费的最大位移)/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9093 --time -1 --topic perfA
# topic的partitions最小位移(可消费的最小位移)/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9093 --time -2 --topic perfA增加到6分区/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9093 --topic dblab01 --alter --partitions 6 --command-config /tmp/client.properties指定brokerid创建topic/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --replica-assignment 999:1000,999:998,1000:998 --topic test4修改消息时间戳为落盘时间/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --broker 1000/1001/1002 --add-config log.message.timestamp.type=LogAppendTime重分配迁移命令/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server 172.16.0.9:9093 --topics-to-move-json-file topic-to-move.json --broker-list "1000,1001" --generate
{"topics":[{"topic":"test1"},{"topic":"test2"},{"topic":"test3"},{"topic":"topic1"}],"version": 1
}/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server 172.16.0.9:9093 --reassignment-json-file comm_partitions-to-move.json --execute{"version": 1,"partitions": [{"topic": "topic2","partition": 0,"replicas": [1000,1001]},{"topic": "topic2","partition": 1,"replicas": [1002,1003]}]
}/kafka/bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic Topic1 --election-type PREFERRED --partition 1# mirrorMaker使用/kafka/bin/kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --num.streams 5 --whitelist=".*"
参数解释:
--producer.config:mirrormaker中的生产者,与我们常规的生产者配置是一样的,我们可以参照生产者配置文档进行参数配置
--consumer.config:mirrormaker中的消费者,与我们常规的消费者配置相同,可以参照消费者配置文档进行参数配置
--num.streams:mirrormaker中消费者线程的个数
--whitelist:mirrormaker中消费者指定消费的主题,.*表示所有主题,常规可以使用 | 分割开多个主题。
当然还有很多其他参数,我们可以执行bin/kafka-mirror-maker.sh --help来查看消费组操作/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --list/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --describe --group test-mm /kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --delete --group perf-consumer-87887/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --group test_topic_consumer --all-topics --reset-offsets --to-datetime 2021-11-29T12:00:00.000+08:00 --executebin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --group test-group --reset-offsets --all-topics --to-earliest --execute/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --group test-group --reset-offsets --all-topics –to-latest --execute
kafka集群中某个pod数据损坏启动失败,可以再持久化log目录的data下执行如下命令,然后重启
rm -rf !(meta.properties)
ACL权限、配置操作(deny权限比allow权限更大)
# server.properties文件中添加
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
super.users=User:cmq-kafka-ild1veqxgj # 配置sasl用户名密码
cat > /tmp/client.testwb <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test2" password="test2";
EOF# ACL操作
# 查看user信息/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9093 --describe --entity-type users# 添加topic的配置 消息保留时间/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9094 --add-config retentaion.ms=60000 --entity-type topics --entity-name test1 --alter
# 设置topic分区阈值/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9094 --add-config retention.bytes=-1 --entity-type topics --entity-name test1 --alter # 创建acl用户/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9094 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=admin@12345],SCRAM-SHA-512=[password=admin@12345]' --entity-type users --entity-name admin1# 列出所有acl权限/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9093 --list
# 查看user信息/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9093 --describe --entity-type users --command-config /tmp/client.properties# 增加topic的describe权限,可以控制消费堆积查看,需要添加涉及group的所有topic/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9093 -add --allow-principal User:testwb2 --operation DESCRIBE --topic test/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9093 -add --allow-principal User:testwb --operation READ --operation WRITE --topic test1 --command-config /tmp/client.properties# 删除用户u_wa1c_wa_jhs的topic的读写权限/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9093 --remove --allow-principal User:u_wa1c_wa_jhs --operation READ --operation WRITE --topic test --command-config /tmp/client.properties# 删除user信息/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9093 --alter --delete-config 'SCRAM-SHA-512,SCRAM-SHA-256' --entity-type users --entity-name test2 --command-config /tmp/client.properties
# 添加用户test的topic1的读写权限/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9093 -add --allow-principal User:test --operation READ --operation WRITE --topic topic1# 增加堆积查看能力/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9093 -add --allow-principal User:test --operation DESCRIBE_CONFIGS --topic topic1/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9092 -add --allow-principal User:test2 --operation DescribeConfigs --topic test#查询user1的权限列表/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9093 --list --principal User:user2 --command-config /tmp/client.properties
# 查看topic得权限列表./kafka-acls.sh --bootstrap-server localhost:9093 --list --topic topic1 --command-config /tmp/client.properties
# 设置某个用户某个ip为黑名单/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation All --topic Test-topic# 设置用户test不允许有管理操作权限/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9093 --add --deny-principal User:test --cluster --operation All --command-config /tmp/client.properties
# 允许用户user有管理操作权限/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9093 --add --allow-principal User:user --cluster --operation All --command-config /tmp/client.properties/kafka/bin/kafka-acls.sh --bootstrap-server localhost:9093 --add --deny-principal User:test --topic * --operation All --command-config /tmp/client.properties# 对用户 test 限流 10M/S/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9093 --alter --add-config 'producer_byte_rate=10485760' --entity-type users --entity-name test# 对 client id 为 clientA 的限流 10M/S/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9093 --alter --add-config 'producer_byte_rate=10485760' --entity-type clients --entity-name clientA# 重选举所有topic分区leader/kafka/bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions显示kafka目录的使用情况/kafka/bin/kafka-log-dirs.sh --bootstrap-server localhost:9093 --describe --broker-list 1000 --topic-list topic1日志级别操作
# 展示broker id的日志配置/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --broker-logger 1000 --describe# 将request日志设置成 debug/kafka/bin/kafka-configs.sh --bootstrap-server localhost:9092 --broker-logger 1000 --alter --add-config kafka.request.logger=DEBUG
--------------------------------------------------------------------------------
PS:常用的 Kafka Schema Registry 包括以下几种:
1.Confluent Schema Registry: 是由 Confluent 公司提供的 Kafka Schema Registry,是目前应用最为广泛的一个。
2.Apicurio Registry: 是一个开源的、基于 Java 的 Schema Registry 实现,拥有丰富的 API 支持和可扩展性。
3.IBM Event Streams Schema Registry: 是 IBM 公司提供的 Kafka Schema Registry,与 IBM Event Streams 云服务紧密集成。
4.AWS Glue Schema Registry: 是 Amazon Web Services(AWS)提供的 Schema Registry,适用于在 AWS 上部署的 Kafka 生态系统。
5.Alibaba Cloud Schemas API: 是阿里云提供的 Schema Registry 解决方案。
以上是常用的 Kafka Schema Registry,这些 Schema Registry 对应着不同的厂商和云平台,使用时需要根据实际情况进行选择。