当前位置: 首页 > news >正文

动态订阅kafka mq实现(消费者组动态上下线)

和上篇文章 动态订阅rocket mq实现(消费者组动态上下线) 目的一致,直接上代码

    /*** Kafka topic container集合*/private static final Map<String, ConcurrentMessageListenerContainer<String, String>> topics = new HashMap<>();public void registerKafkaListeners(BinlogPortDatabaseConfig binlogPortDatabaseConfig) {/*BinlogPortDatabaseConfig是自定义的数据结构,即需要动态注册的kafka配置包含topic、sever、client,自定义即可*/ConsumerFactory<String, String> consumerFactory = binlogPortDatabaseConfig.createConsumerFactory();ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setBatchListener(true);if (consumerFactory == null) {return;}factory.setConsumerFactory(consumerFactory);ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(binlogPortDatabaseConfig.getTopic());//设置为false,解决client后自动加-0的问题container.setAlwaysClientIdSuffix(false);container.setupMessageListener((MessageListener<String, String>) record -> {//TODO:你的消费逻辑,record即为消息体}} catch (IllegalArgumentException e) {log.error("registerKafkaListeners JSON解析失败", e);} catch (NullPointerException e) {log.error("registerKafkaListeners 消息为空或部分字段缺失", e);} catch (Exception e) {log.error("registerKafkaListeners 注册异常", e);}});container.start();topics.put(binlogPortDatabaseConfig.getTopic(), container);}public void factoryDel(String topic) {ConcurrentMessageListenerContainer<String, String> container = topics.get(topic);if (!topic.isEmpty()) {container.stop();topics.remove(topic);}}public ConsumerFactory<String, String> createConsumerFactory() {Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, /*你的kafka server*/);props.put(ConsumerConfig.CLIENT_ID_CONFIG, /*你的kafka client*/);if (SystemEnvUtil.isTest()) {props.put(ConsumerConfig.GROUP_ID_CONFIG, Constant.consumerGroupIdOffline + topic);}if (SystemEnvUtil.isProd() || SystemEnvUtil.isSandbox()) {props.put(ConsumerConfig.GROUP_ID_CONFIG,/*你的group id*/);}props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, String.valueOf(100));props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, String.valueOf(false));Map<String, Object> configMap = new java.util.HashMap<>();for (Map.Entry<Object, Object> entry : props.entrySet()) {configMap.put((String) entry.getKey(), entry.getValue());}return new DefaultKafkaConsumerFactory<>(configMap);}

http://www.mrgr.cn/news/91669.html

相关文章:

  • 【3.2JavaScript】JavaScript语法基础
  • git-提交时间和作者时间的区别
  • 字符串函数和结构题内存对齐
  • rknn 板端运行程序Invalid RKNN model version 6, Meet unsupported rknn target type
  • Java 面试笔记 - Java基础
  • 技术总结 | MySQL面试知识点
  • 技术解析 | 适用于TeamCity的Unreal Engine支持插件,提升游戏构建效率
  • Compose常用UI组件
  • 代码随想录算法训练营第六天| 242.有效的字母异位词 、349. 两个数组的交集、202. 快乐数 、1. 两数之和
  • SOME/IP--协议英文原文讲解6
  • IO模型与NIO基础--NIO网络传输选择器
  • 如何通过 Homebrew 安装 Qt 并配置环境变量
  • idea 2023.3.7常用插件
  • 面试题之手写call,apply,bind
  • 【OS安装与使用】part4-ubuntu22.04安装anaconda
  • virtualbox怎么把主机剪切板里的内容复制进来
  • 二、Three.js几何体BufferGeometry顶点笔记
  • 强化学习-价值学习算法
  • 如何查询网站是否被百度蜘蛛收录?
  • 计算机网络抄手 运输层