SpringBoot整合分布式消息平台Pulsar

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: SpringBoot整合分布式消息平台Pulsar

大家好,我是君哥。

作为优秀的消息流平台,Pulsar 的使用越来越多,这篇文章讲解 Pulsar 的 Java 客户端。

部署 Pulsar

Pulsar 的部署方式主要有 3 种,本地安装二进制文件、docker 部署、在 Kubernetes 上部署。

本文采用 docker 部署一个单节点的 Pulsar 集群。实验环境是 2 核 CPU 和 4G 内存。

部署命令如下:

docker run -it -p 6650:6650  -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone

安装过程可能会出现下面的错误:

unknown flag: --mount
See 'docker run --help'.

这是因为 docker 版本低,不支持 mount 参数,把 docker 版本升级到 17.06 以上就可以了。

部署过程中可能会因为网络的原因失败,多试几次就可以成功了。如果看到下面的日志,就说明启动成功了。

2022-01-08T22:27:58,726+0000 [main] INFO  org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone

本地单节点集群启动后,会创建一个 namespace,名字叫 public/default

Pulsar 客户端

目前 Pulsar 支持多种语言的客户端,包括:

Java 客户端Go 客户端Python 客户端C++ 客户端Node.js 客户端WebSocket 客户端C# 客户端

SpringBoot 配置

使用 SpringBoot 整合 Pulsar 客户端,首先引入 Pulsar 客户端依赖,代码如下:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.9.1</version>
</dependency>

然后在 properties 文件中添加配置:

# Pulsar 地址
pulsar.url=pulsar://192.168.59.155:6650
# topic
pulsar.topic=testTopic
# consumer group
pulsar.subscription=topicGroup

创建 Client

创建客户端非常简单,代码如下:

client = PulsarClient.builder()
                .serviceUrl(url)
                .build();

上面的 url 就是 properties 文件中定义的 pulsar.url 。

创建 Client 时,即使集群没有启成功,程序也不会报错,因为这时还没有真正地去连接集群。

创建 Producer

producer = client.newProducer()
                .topic(topic)
                .compressionType(CompressionType.LZ4)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(true)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .batchingMaxMessages(1000)
                .maxPendingMessages(1000)
                .blockIfQueueFull(true)
                .roundRobinRouterBatchingPartitionSwitchFrequency(10)
                .batcherBuilder(BatcherBuilder.DEFAULT)
                .create();

创建 Producer,会真正的连接集群,这时如果集群有问题,就会报连接错误。

下面解释一下创建 Producer 的参数:

topic:Producer 要写入的 topic。compressionType:压缩策略,目前支持 4 种策略 (NONE、LZ4、ZLIB、ZSTD),从 Pulsar2.3 开始,只有 Consumer 的版本在 2.3 以上,这个策略才会生效。sendTimeout:超时时间,如果 Producer 在超时时间为收到 ACK,会进行重新发送。enableBatching:是否开启消息批量处理,这里默认 true,这个参数只有在异步发送 (sendAsync) 时才能生效,选择同步发送会失效。batchingMaxPublishDelay:批量发送消息的时间段,这里定义的是 10ms,需要注意的是,设置了批量时间,就不会受消息数量的影响。批量发送会把要发送的批量消息放在一个网络包里发送出去,减少网络 IO 次数,大大提高网卡的发送效率。batchingMaxMessages:批量发送消息的最大数量。maxPendingMessages:等待从 broker 接收 ACK 的消息队列最大长度。如果这个队列满了,producer 所有的 sendAsync 和 send 都会失败,除非设置了 blockIfQueueFull 值是 true。blockIfQueueFull:Producer 发送消息时会把消息先放入本地 Queue 缓存,如果缓存满了,就会阻塞消息发送。roundRobinRouterBatchingPartition-SwitchFrequency:如果发送消息时没有指定 key,那默认采用 round robin 的方式发送消息,使用 round robin 的方式,切换 partition 的周期是 (frequency * batchingMaxPublishDelay)。

创建 Consumer

Pulsar 的消费模型如下图:

微信图片_20221213104826.png

大家好,我是君哥。

作为优秀的消息流平台,Pulsar 的使用越来越多,这篇文章讲解 Pulsar 的 Java 客户端。

部署 Pulsar

Pulsar 的部署方式主要有 3 种,本地安装二进制文件、docker 部署、在 Kubernetes 上部署。

本文采用 docker 部署一个单节点的 Pulsar 集群。实验环境是 2 核 CPU 和 4G 内存。

部署命令如下:

docker run -it -p 6650:6650  -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar standalone

安装过程可能会出现下面的错误:

unknown flag: --mount
See 'docker run --help'.

这是因为 docker 版本低,不支持 mount 参数,把 docker 版本升级到 17.06 以上就可以了。

部署过程中可能会因为网络的原因失败,多试几次就可以成功了。如果看到下面的日志,就说明启动成功了。

2022-01-08T22:27:58,726+0000 [main] INFO  org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone

本地单节点集群启动后,会创建一个 namespace,名字叫 public/default

Pulsar 客户端

目前 Pulsar 支持多种语言的客户端,包括:

Java 客户端Go 客户端Python 客户端C++ 客户端Node.js 客户端WebSocket 客户端C# 客户端

SpringBoot 配置

使用 SpringBoot 整合 Pulsar 客户端,首先引入 Pulsar 客户端依赖,代码如下:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.9.1</version>
</dependency>

然后在 properties 文件中添加配置:

# Pulsar 地址
pulsar.url=pulsar://192.168.59.155:6650
# topic
pulsar.topic=testTopic
# consumer group
pulsar.subscription=topicGroup

创建 Client

创建客户端非常简单,代码如下:

client = PulsarClient.builder()
                .serviceUrl(url)
                .build();

上面的 url 就是 properties 文件中定义的 pulsar.url 。

创建 Client 时,即使集群没有启成功,程序也不会报错,因为这时还没有真正地去连接集群。

创建 Producer

producer = client.newProducer()
                .topic(topic)
                .compressionType(CompressionType.LZ4)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(true)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .batchingMaxMessages(1000)
                .maxPendingMessages(1000)
                .blockIfQueueFull(true)
                .roundRobinRouterBatchingPartitionSwitchFrequency(10)
                .batcherBuilder(BatcherBuilder.DEFAULT)
                .create();

创建 Producer,会真正的连接集群,这时如果集群有问题,就会报连接错误。

下面解释一下创建 Producer 的参数:

topic:Producer 要写入的 topic。compressionType:压缩策略,目前支持 4 种策略 (NONE、LZ4、ZLIB、ZSTD),从 Pulsar2.3 开始,只有 Consumer 的版本在 2.3 以上,这个策略才会生效。sendTimeout:超时时间,如果 Producer 在超时时间为收到 ACK,会进行重新发送。enableBatching:是否开启消息批量处理,这里默认 true,这个参数只有在异步发送 (sendAsync) 时才能生效,选择同步发送会失效。batchingMaxPublishDelay:批量发送消息的时间段,这里定义的是 10ms,需要注意的是,设置了批量时间,就不会受消息数量的影响。批量发送会把要发送的批量消息放在一个网络包里发送出去,减少网络 IO 次数,大大提高网卡的发送效率。batchingMaxMessages:批量发送消息的最大数量。maxPendingMessages:等待从 broker 接收 ACK 的消息队列最大长度。如果这个队列满了,producer 所有的 sendAsync 和 send 都会失败,除非设置了 blockIfQueueFull 值是 true。blockIfQueueFull:Producer 发送消息时会把消息先放入本地 Queue 缓存,如果缓存满了,就会阻塞消息发送。roundRobinRouterBatchingPartition-SwitchFrequency:如果发送消息时没有指定 key,那默认采用 round robin 的方式发送消息,使用 round robin 的方式,切换 partition 的周期是 (frequency * batchingMaxPublishDelay)。

创建 Consumer

Pulsar 的消费模型如下图:

微信图片_20221213104850.png

Shared:共享模式,同一个 Topic 可以由多个消费者订阅和消费。消息通过 round robin 轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开,如果发送给它消息没有被消费,这些消息会被重新分发给其它存活的消费者。如下图:

微信图片_20221213104913.png

Key_Shared:消息和消费者都会绑定一个key,消息只会发送给绑定同一个key的消费者。如果有新消费者建立连接或者有消费者断开连接,就需要更新一些消息的 key。跟 Shared 模式相比,Key_Shared 的好处是既可以让消费者并发地消费消息,又能保证同一Key下的消息顺序。如下图:

微信图片_20221213104940.png

subscriptionInitialPosition:创建新的 subscription 时从哪里开始消费,有两个选项:

    Latest :从最新的消息开始消费 Earliest :从最早的消息开始消费

negativeAckRedeliveryDelay:消费失败后间隔多久 broker 重新发送。

receiverQueueSize:在调用 receive 方法之前,最多能累积多少条消息。可以设置为 0,这样每次只从 broker 拉取一条消息。在 Shared 模式下,receiverQueueSize 设置为 0,可以防止批量消息多发给一个 Consumer 而导致其他 Consumer 空闲。

Consumer 接收消息有四种方式:同步单条、同步批量、异步单条和异步批量,代码如下:

Message message = consumer.receive()
CompletableFuture<Message> message = consumer.receiveAsync();
Messages message = consumer.batchReceive();
CompletableFuture<Messages> message = consumer.batchReceiveAsync();

对于批量接收,也可以设置批量接收的策略,代码如下:

consumer = client.newConsumer()
    .topic(topic)
    .subscriptionName(subscription)
        .batchReceivePolicy(BatchReceivePolicy.builder()
        .maxNumMessages(100)
        .maxNumBytes(1024 * 1024)
        .timeout(200, TimeUnit.MILLISECONDS)
        .build())
    .subscribe();

代码中的参数说明如下:

maxNumMessages:批量接收的最大消息数量。maxNumBytes:批量接收消息的大小,这里是 1MB。

测试

首先编写 Producer 发送消息的代码,如下:

public void sendMsg(String key, String data) {
    CompletableFuture<MessageId> future = producer.newMessage()
        .key(key)
        .value(data.getBytes()).sendAsync();
    future.handle((v, ex) -> {
        if (ex == null) {
            logger.info("发送消息成功, key:{}, msg: {}", key, data);
        } else {
            logger.error("发送消息失败, key:{}, msg: {}", key, data);
        }
        return null;
    });
    future.join();
    logger.info("发送消息完成, key:{}, msg: {}", key, data);
}

然后编写一个 Consumer 消费消息的代码,如下:

public void start() throws Exception{
    while (true) {
        Message message = consumer.receive();
        String key = message.getKey();
        String data = new String(message.getData());
        String topic = message.getTopicName();
        if (StringUtils.isNotEmpty(data)) {
            try{
                logger.info("收到消息, topic:{}, key:{}, data:{}", topic, key, data);
            }catch(Exception e){
                logger.error("接收消息异常,topic:{}, key:{}, data:{}", topic, key, data, e);
            }
        }
        consumer.acknowledge(message);
    }
}

最后编写一个 Controller 类,调用 Producer 发送消息,代码如下:

@RequestMapping("/send")
@ResponseBody
public String send(@RequestParam String key, @RequestParam String data) {
    logger.info("收到消息发送请求, key:{}, value:{}", key, data);
    pulsarProducer.sendMsg(key, data);
    return "success";
}

调用 Producer 发送一条消息,key=key1,data=data1,具体操作为在浏览器中输入下面的 url 后回车:

http://192.168.157.1:8083/pulsar/send?key=key1&data=data1

可以看到控制台输出下面日志:

2022-01-08 22:42:33,199 [pulsar-client-io-6-1] [INFO] boot.pulsar.PulsarProducer - 发送消息成功, key:key1, msg: data1
2022-01-08 22:42:33,200 [http-nio-8083-exec-1] [INFO] boot.pulsar.PulsarProducer - 发送消息完成, key:key1, msg: data1
2022-01-08 22:42:33,232 [Thread-22] [INFO] boot.pulsar.PulsarConsumer - 收到消息, topic:persistent://public/default/testTopic, key:key1, data:data1
2022-01-08 22:43:14,498 [pulsar-timer-5-1] [INFO] org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testTopic] [topicGroup] [7def6] Prefetched messages: 0 --- Consume throughput received: 0.02 msgs/s --- 0.00 Mbit/s --- Ack sent rate: 0.02 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2022-01-08 22:43:14,961 [pulsar-timer-9-1] [INFO] org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [testTopic] [standalone-9-0] Pending messages: 0 --- Publish throughput: 0.02 msg/s --- 0.00 Mbit/s --- Latency: med: 69.000 ms - 95pct: 69.000 ms - 99pct: 69.000 ms - 99.9pct: 69.000 ms - max: 69.000 ms --- Ack received rate: 0.02 ack/s --- Failed messages: 0

从日志中看到,这里使用的 namespace 就是创建集群时生成的public/default。

总结

从 SpringBoot 整合 Java 客户端使用来看,Pulsar 的 api 是非常友好的,使用起来方便简洁。Consumer 的使用需要考虑多一些,需要考虑到批量、异步以及订阅类型。

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
4月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
338 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
4月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
168 11
|
7天前
|
消息中间件 监控 Java
Apache Kafka 分布式流处理平台技术详解与实践指南
本文档全面介绍 Apache Kafka 分布式流处理平台的核心概念、架构设计和实践应用。作为高吞吐量、低延迟的分布式消息系统,Kafka 已成为现代数据管道和流处理应用的事实标准。本文将深入探讨其生产者-消费者模型、主题分区机制、副本复制、流处理API等核心机制,帮助开发者构建可靠、可扩展的实时数据流处理系统。
101 4
|
15天前
|
NoSQL Java 调度
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
分布式锁是分布式系统中用于同步多节点访问共享资源的机制,防止并发操作带来的冲突。本文介绍了基于Spring Boot和Redis实现分布式锁的技术方案,涵盖锁的获取与释放、Redis配置、服务调度及多实例运行等内容,通过Docker Compose搭建环境,验证了锁的有效性与互斥特性。
分布式锁与分布式锁使用 Redis 和 Spring Boot 进行调度锁(不带 ShedLock)
|
4月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
476 4
|
3月前
|
运维 监控 Linux
WGCLOUD运维平台的分布式计划任务功能介绍
WGCLOUD是一款免费开源的运维监控平台,支持主机与服务器性能监控,具备实时告警和自愈功能。本文重点介绍其计划任务功能模块,可统一管理Linux和Windows主机的定时任务。相比手动配置crontab或Windows任务计划,WGCLOUD提供直观界面,通过添加cron表达式、执行指令或脚本并选择主机,即可轻松完成任务设置,大幅提升多主机任务管理效率。
|
5月前
|
SQL 监控 Go
新一代 Cron-Job分布式调度平台,v1.0.8版本发布,支持Go执行器SDK!
现代化的Cron-Job分布式任务调度平台,支持Go语言执行器SDK,多项核心优势优于其他调度平台。
97 8
|
6月前
|
存储 Java 文件存储
🗄️Spring Boot 3 整合 MinIO 实现分布式文件存储
本文介绍了如何基于Spring Boot 3和MinIO实现分布式文件存储。随着应用规模扩大,传统的单机文件存储方案难以应对大规模数据和高并发访问,分布式文件存储系统成为更好的选择。文章详细讲解了MinIO的安装、配置及与Spring Boot的整合步骤,包括Docker部署、MinIO控制台操作、Spring Boot项目中的依赖引入、配置类编写及工具类封装等内容。最后通过一个上传头像的接口示例展示了具体的开发和测试过程,强调了将API操作封装成通用工具类以提高代码复用性和可维护性的重要性。
1257 7
🗄️Spring Boot 3 整合 MinIO 实现分布式文件存储
|
6月前
|
数据采集 监控 数据可视化
11.7K Star!这个分布式爬虫管理平台让多语言协作如此简单!
分布式爬虫管理平台Crawlab,支持任何编程语言和框架的爬虫管理,提供可视化界面、任务调度、日志监控等企业级功能,让爬虫开发管理效率提升300%!
198 1
|
9天前
|
前端开发 安全 Java
基于springboot+vue开发的会议预约管理系统
一个完整的会议预约管理系统,包含前端用户界面、管理后台和后端API服务。 ### 后端 - **框架**: Spring Boot 2.7.18 - **数据库**: MySQL 5.6+ - **ORM**: MyBatis Plus 3.5.3.1 - **安全**: Spring Security + JWT - **Java版本**: Java 11 ### 前端 - **框架**: Vue 3.3.4 - **UI组件**: Element Plus 2.3.8 - **构建工具**: Vite 4.4.5 - **状态管理**: Pinia 2.1.6 - **HTTP客户端
86 4
基于springboot+vue开发的会议预约管理系统