Pulsar消息服务之Java工具类
Pulsar介绍
Pulsar是一种多租户、高性能的服务器到服务器消息传递解决方案。Pulsar最初由雅虎开发,目前由Apache软件基金会管理。
官方网站
https://pulsar.apache.org/
目的
基于Pulsar客户端jar包,用Java开发MQ消息发送与接收工具类;
JDK
17+
pom.xml
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client-all</artifactId><version>3.3.2</version>
</dependency>
PulsarMqUtils.java
package com.example;import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;/*** @Description Pulsar MQ服务生产与消费客户端工具类* @Version V1.0*/
public class PulsarMqUtils {private static Logger logger = LoggerFactory.getLogger(PulsarMqUtils.class);/*** pulsar服务地址*/private static final String SERVER_URL = "pulsar://%s:%s";/*** topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,示例:persistent://public/space/topic* 注意:* persistent 表示持久化主题,无消费ack确认的消息会被broker持久化存储在节点磁盘中;* non-persistent 表示非持久化主题,消息将在生产者确认后存储在内存中,一旦服务broker关闭或重启、或主题无任何订阅者(消费者)则消息丢失。*/private static final String TOPIC_NAME = "persistent://%s/%s/%s";private PulsarClient client;private PulsarMqUtils() {}/*** 创建Pulsar工具类对象* @param host 主机* @param port 端口* @return* @throws PulsarClientException*/public static PulsarMqUtils buildServer(String host, int port) throws PulsarClientException {return new PulsarMqUtils().createPulsarClient(host, port);}/*** 创建Pulsar工具类对象* @param serviceUrl 示例:pulsar://localhost:6650,多个服务示列:pulsar://localhost:6550,localhost:6651* @return* @throws PulsarClientException*/public static PulsarMqUtils buildServer(String serviceUrl) throws PulsarClientException {return new PulsarMqUtils().createPulsarClient(serviceUrl);}/*** 创建Pulsar客户端连接* @param host 主机* @param port 端口* @return* @throws PulsarClientException*/private PulsarMqUtils createPulsarClient(String host, int port) throws PulsarClientException {return createPulsarClient(String.format(SERVER_URL, host, port));}/*** 创建Pulsar客户端连接* @param serviceUrl* @return* @throws PulsarClientException*/private PulsarMqUtils createPulsarClient(String serviceUrl) throws PulsarClientException {client = PulsarClient.builder()//pulsar服务地址.serviceUrl(serviceUrl)// 如果超过60秒未使用链接,则释放连接.connectionMaxIdleSeconds(60)// 超时连接5秒.connectionTimeout(5, TimeUnit.SECONDS)// 每个代理连接最大并发请求数(默认5000),按需调整以防占用pulsar资源过高.maxConcurrentLookupRequests(1000).build();return this;}/*** 创建Pulsar客户端指定(租户/命名空间/主题)生产者对象* @param tenant* @param namespace* @param topic* @param enableBatching* @return* @throws PulsarClientException*/public Producer<byte[]> newProducer(String tenant, String namespace, String topic, boolean enableBatching) throws PulsarClientException {return newProducer(String.format(TOPIC_NAME, tenant, namespace, topic), enableBatching);}/*** 创建Pulsar客户端指定(租户/命名空间/主题)生产者对象* @param topicName* @param enableBatching* @return* @throws PulsarClientException*/private Producer<byte[]> newProducer(String topicName, boolean enableBatching) throws PulsarClientException {return client.newProducer(Schema.BYTES).topic(topicName)// 消息发送超时时间3s(默认值:30秒);设置为零将使超时设置为无穷大,对重复数据删除功能时非常有用.sendTimeout(0, TimeUnit.SECONDS)// 启用了批处理消息,一次性产生多个消息.enableBatching(enableBatching)// 批次模式DEFAULT | KEY_BASED.batcherBuilder(BatcherBuilder.DEFAULT)// 默认值为1ms,如果设置为非零值,消息将排队,直到此时间间隔到期.batchingMaxPublishDelay(5, TimeUnit.MILLISECONDS)// 批处理中的最大消息数(默认值为1000).batchingMaxMessages(1000).create();}/*** 创建Pulsar客户端指定(租户/命名空间/主题)消者者对象* @param topicNames* @param subscriptionName* @param properties* @param messageListener* @return* @throws PulsarClientException*/private Consumer<byte[]> newConsumer(String [] topicNames, String subscriptionName, Map<String, String> properties, MessageListener<byte[]> messageListener) throws PulsarClientException {return client.newConsumer(Schema.BYTES)// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic.topic(topicNames)// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名.subscriptionName(subscriptionName)// 声明消费模式为共享模式// Exclusive:独占模式(默认),此subscriptionName只能被一个客户端消费,有其它客户端订阅同名则会抛错// Shared: 多用户共享模式,客户端之间的循环分配,不保证消费顺序// Failover: 故障切换模,多个客户端连接,只能其中一个进行消费,待客户端产生故障后自动分配一个可用的新消费者// Key_Shared: 多个用户将能够使用相同的订阅,消息发给指定密钥的消费者.subscriptionType(SubscriptionType.Shared)// 订阅相关参数,tag订阅等.subscriptionProperties(properties)// Earliest 配置从最早开始消费,否则可能会消费不到历史消息// Latest 配置从最新开始消费,会丢失已到达的历史消息.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)// 监听消息.messageListener(messageListener)// ack确认超时.ackTimeout(5, TimeUnit.SECONDS)// 检查ack确认超时的间隔.ackTimeoutTickTime(2, TimeUnit.SECONDS)// 消费消息ack否定确认后,延迟指定时间后,重新投递回broker,再重新分配消费.negativeAckRedeliveryDelay(5, TimeUnit.SECONDS)// 设置批量接收消息策略.batchReceivePolicy(BatchReceivePolicy.builder()// 接收消息数量.maxNumMessages(100)// 接收消息大小.maxNumBytes(10 * 1024 * 1024)// 等待超时.timeout(200, TimeUnit.MILLISECONDS).build()).subscribe();}/*** 生产者发送批量消息(发送完即关闭对象)* @param tenant* @param namespace* @param topic* @param messages* @param properties* @throws PulsarClientException*/public void sendBatchMq(String tenant, String namespace, String topic, List<String> messages, Map<String, String> properties) throws PulsarClientException {Producer<byte[]> producer = newProducer(tenant, namespace, topic, true);for (String message : messages) {send(producer, message, properties, 0);}closeProducer(producer);}/*** 生产者发送消息(发送完即关闭对象)* @param tenant* @param namespace* @param topic* @param message* @param properties* @param delayTime* @throws PulsarClientException*/public void sendMq(String tenant, String namespace, String topic, String message, Map<String, String> properties, long delayTime) throws PulsarClientException {Producer<byte[]> producer = newProducer(tenant, namespace, topic, false);sendMq(producer, message, properties, delayTime);closeProducer(producer);}/*** 生产者发送消息(需手动关闭生产者对象)* @param producer* @param message* @param properties* @param delayTime* @throws PulsarClientException*/public void sendMq(Producer<byte[]> producer, String message, Map<String, String> properties, long delayTime) throws PulsarClientException {send(producer, message, properties, delayTime);}/*** 生产者发送消息核心方法* @param producer* @param message* @param properties* @param delayTime* @throws PulsarClientException*/private void send(Producer<byte[]> producer, String message, Map<String, String> properties, long delayTime) throws PulsarClientException {MessageId messageId = producer.newMessage().properties(properties)// 为消息设置密钥//.key("").value(message.getBytes(StandardCharsets.UTF_8))// deliverAt : 定时方法, deliverAfter:延时方法// 延时消息的时长取值范围为0 - 864000秒(0秒 - 10天)。如10月1日12:00开始,最长可以设置864000秒。定时和延时消息在精度上会有1秒左右的偏差。// 延时消息的消费模式仅支持使用 Shared 模式进行消费,否则会失去延时效果(Key-shared 也不支持)。// 设定定时时间后,TTL 的时间依旧会从发送消息的时间点开始算消息的最长保留时间;要确保 TTL 的时间要大于延时的时间,否则 TTL 到期时,消息会被删除。.deliverAfter(delayTime, TimeUnit.SECONDS).send();logger.info("pulsar client sened message! id : {}", messageId);}/*** 消费者(订阅者)接收Pulsar服务主题下的消息* @param tenant* @param namespace* @param topic* @param subscriptionName* @param properties* @param headerInterface* @throws IOException*/public void receiveMq(String tenant, String namespace, String topic, String subscriptionName, Map<String, String> properties, HeaderInterface headerInterface) throws IOException {receiveMq(new String[] {String.format(TOPIC_NAME, tenant, namespace, topic)}, subscriptionName, properties, headerInterface);}/*** 消费者(订阅者)接收Pulsar服务主题下的消息* @param topics 多个主题:"persistent://public/space/topic1,persistent://public/space/topic2,..."* @param subscriptionName* @param properties* @param headerInterface* @throws IOException*/public void receiveMq(String topics, String subscriptionName, Map<String, String> properties, HeaderInterface headerInterface) throws IOException {receiveMq(topics.split(","), subscriptionName, properties, headerInterface);}/*** 消费者(订阅者)接收Pulsar服务主题下的消息* @param topicNames 多个主题:["persistent://public/space/topic1","persistent://public/space/topic2",...]* @param subscriptionName* @param properties* @param headerInterface* @throws IOException*/public void receiveMq(String [] topicNames, String subscriptionName, Map<String, String> properties, HeaderInterface headerInterface) throws IOException {MessageListener<byte[]> messageListener = (consumer, message) -> {try {// 回调逻辑方法headerInterface.execute(message);//ack应答消息已消费成功consumer.acknowledge(message);} catch (IOException ioe) {logger.error("pulsar consumer receive message error! " + ioe.getMessage(), ioe);// ack应答消息消费失败consumer.negativeAcknowledge(message);}};Consumer<byte[]> consumer = newConsumer(topicNames, subscriptionName, properties, messageListener);logger.info("pulsar consumer start listener message!");addShutdownHook(consumer);}/*** 关闭生产者对象* @param producer*/public void closeProducer(Producer<byte[]> producer) {if (producer != null && producer.isConnected()) {try {producer.flush();producer.close();logger.info("pulsar producer is close!");} catch(PulsarClientException e) {logger.error("pulsar producer close error!", e);}}}/*** 关闭消费者对象* @param consumer*/private void closeConsumer(Consumer<byte[]> consumer) {if (consumer != null && consumer.isConnected()) {try {consumer.close();logger.info("pulsar consumer is close!");} catch(PulsarClientException e) {logger.error("pulsar consumer close error!", e);}}}/*** 关闭客户端连接*/public void close() {if (client != null && !client.isClosed()) {try {client.close();logger.info("pulsar client is close!");} catch(PulsarClientException e) {logger.error("pulsar client close error!", e);}}}/*** 函数式回调接口*/@FunctionalInterfaceinterface HeaderInterface{void execute(Message<byte[]> message) throws IOException;}/*** 注册虚拟机器关机钩子事件* @param consumer*/private void addShutdownHook(Consumer<byte[]> consumer){Runtime.getRuntime().addShutdownHook(new Thread(() -> {logger.info("jvm shutdown hook: close pulsar consumer server ...");closeConsumer(consumer);close();}));}/*** 示例* @param args* @throws IOException*/public static void main(String[] args) throws IOException {// 配置生产者/消费者相关参数HashMap<String, String> properties = new HashMap<>();properties.put("tag1","1");// PulsarMqUtils pulsarMqUtils = PulsarMqUtils.buildServer("localhost", 6650);PulsarMqUtils pulsarMqUtils = PulsarMqUtils.buildServer("pulsar://localhost:6650");//生产消息(延迟10s)pulsarMqUtils.sendMq("public", "default", "topic1", "test Message1, time:" + System.currentTimeMillis(), properties, 10L);//生产消息pulsarMqUtils.sendMq("public", "default", "topic2", "test Message2, time:" + System.currentTimeMillis(), properties, 0L);//生产批量消息
// List<String> messages = new ArrayList<>();
// for (int i=0;i< 1000; i++) {
// messages.add("test Message, time:" + System.currentTimeMillis() + "," + i);
// }
// pulsarMqUtils.sendBatchMq("public", "default", "topic1", messages, properties);//消费消息
// pulsarMqUtils.receiveMq("public", "default", "topic1", "topic_sub1", properties, (message) -> {
// logger.info("message:{}" , new String(message.getValue(), StandardCharsets.UTF_8));
// });//多个主题订阅消费消息String topics = "persistent://public/default/topic1,persistent://public/default/topic2";pulsarMqUtils.receiveMq(topics, "topic_sub1", properties, (message) -> {logger.info("message:{}" , new String(message.getValue(), StandardCharsets.UTF_8));});//关闭客户端
// pulsarMqUtils.close();}}
基于Pulsar客户端jar实现Java创建生产者与消费者操作工具类,支持发送mq消息、监听消费mq消息,简单易用引入到项目代码中使用;
此Java工具类有经过实战与测试,但难免有不足之处,欢迎交流与指正;