Pulsar分布式消息处理平台实战

简介: Pulsar分布式消息处理平台实战

Pulsar分布式消息处理平台实战

什么是Pulsar?

Pulsar是由Apache基金会孵化的一款分布式消息处理平台,它具备如下特点:

  1. 分布式架构:Pulsar可以在多个集群节点上运行,消息可以在各个节点之间进行传递。多集群部署时,Pulsar还具备跨集群消息复制和应用级别的容错机制。
  2. 高性能:Pulsar的消息传递速度非常快,可以处理百万级消息。另外,Pulsar的消费模型支持多队列方式,可以充分利用CPU和网络带宽。
  3. 可扩展:Pulsar的设计理念是可扩展的,可以方便地扩展其处理能力。Pulsar还支持Kafka API,这意味着用户可以使用Kafka客户端来访问Pulsar。
  4. 多种语言支持:Pulsar的客户端支持多种编程语言,包括Java、C++、Python、Go等。

Pulsar的核心概念

在使用Pulsar之前,需要先了解Pulsar的一些核心概念。

Topic

Topic是消息发布和订阅的逻辑地址,可以理解为消息的目的地。Pulsar的Topic采用的命名规则是“pulsar://tenant/namespace/topic”,其中tenant是租户标识,namespace是命名空间标识,topic是Topic名称。

Producer

Producer是消息的生产者,用于往Topic中发送消息。一个Topic可以有多个Producer。

Consumer

Consumer是消息的消费者,用于从Topic中消费消息。一个Topic可以有多个Consumer。

Subscription

Subscription是一种消费模式,用于控制Consumer如何消费消息。Pulsar支持两种Subscription类型:Exclusive Subscription和Shared Subscription。

  1. Exclusive Subscription:一个Topic只能被一组Consumer消费,消费模式为Exclusive Subscription时,同一时间只能有一个Consumer消费Topic。
  2. Shared Subscription:一个Topic可以被多组Consumer消费,消费模式为Shared Subscription时,多个Consumer可以同时消费同一个Topic。

Message

Message是Pulsar中的消息对象,可以包含任何数据。在Pulsar中,一条消息由三个部分组成:Payload、Key、Properties。Payload是消息的内容,Key是消息的唯一标识,Properties是消息的一些属性信息,可以用于筛选和路由消息。

Pulsar的安装和使用

环境准备

在开始使用Pulsar之前,需要先安装Java 8+和Maven。另外,建议使用Linux系统来运行Pulsar。

Pulsar的安装

Pulsar可以通过源码编译和二进制发行版两种方式来安装。

源码编译安装
  1. 下载源代码
git clone https://github.com/apache/pulsar.git
cd pulsar
  1. 编译源代码
mvn clean install -DskipTests
  1. 解压二进制包

在pulsar/distribution/server/target目录下可以找到生成的二进制包。

tar xvfz apache-pulsar-2.8.0-incubating-bin.tar.gz
cd apache-pulsar-2.8.0-incubating
二进制发行版安装
  1. 下载二进制包
wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.8.0-incubating/apache-pulsar-2.8.0-incubating-bin.tar.gz
  1. 解压二进制包
tar xvfz apache-pulsar-2.8.0-incubating-bin.tar.gz
cd apache-pulsar-2.8.0-incubating

Pulsar的启动和关闭

启动Pulsar

在Pulsar的bin目录下,执行如下命令启动Pulsar服务:

./pulsar standalone

这里启动的是单机版的Pulsar服务,如果要启动分布式Pulsar服务,需要修改配置文件,具体配置方式请参考Pulsar官方文档。

关闭Pulsar

在Pulsar的bin目录下,执行如下命令关闭Pulsar服务:

./pulsar-daemon stop standalone

Pulsar的实战演练

使用Pulsar的Java客户端

Pulsar的Java客户端支持生产者和消费者的编程,可以通过Maven来引入Java客户端的依赖项。

引入Java客户端的依赖项:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.8.0-incubating</version>
</dependency>
发送消息

在Pulsar中发送消息需要创建一个Producer对象,然后使用Producer对象来发送消息。

创建Producer的代码如下:

Producer<String> producer = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build()
    .newProducer(Schema.STRING)
    .topic("my-topic");

其中,serviceUrl是Pulsar服务的地址,Schema.STRING表示发送的数据类型为字符串,topic是消息目的地。

使用Producer对象发送消息:

producer.send("hello world");
接收消息

在Pulsar中接收消息需要创建一个Consumer对象,然后使用Consumer对象来接收消息。

创建Consumer的代码如下:

Consumer<String> consumer = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build()
    .newConsumer(Schema.STRING)
    .topic("my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Exclusive)
    .subscribe();

其中,serviceUrl是Pulsar服务的地址,Schema.STRING表示接收的数据类型为字符串,topic是消息目的地,subscriptionName是订阅者名称,subscriptionType是订阅类型。

接收消息的代码如下:

while (true) {
    Message<String> message = consumer.receive();
    System.out.println("Received message: " + message.getValue());
    consumer.acknowledge(message);
}

Pulsar的Web管理界面

Pulsar提供了Web管理界面,可以通过Web界面来管理Pulsar服务。

访问Web管理界面的地址为:http://localhost:8080,其中localhost为Pulsar服务所在的主机名或IP地址。

操作Pulsar的命令行工具

Pulsar还提供了命令行工具pulsar-admin,可以通过pulsar-admin来管理Pulsar服务。

查看集群信息

使用pulsar-admin查看集群信息的命令如下:

bin/pulsar-admin clusters list
查看Tenant信息

使用pulsar-admin查看Tenant信息的命令如下:

bin/pulsar-admin tenants list
查看Namespace信息

使用pulsar-admin查看Namespace信息的命令如下:

bin/pulsar-admin namespaces list
查看Topic信息

使用pulsar-admin查看Topic信息的命令如下:

bin/pulsar-admin topics list
创建Topic

使用pulsar-admin创建Topic的命令如下:

bin/pulsar-admin topics create persistent://public/default/my-topic
删除Topic

使用pulsar-admin删除Topic的命令如下:

bin/pulsar-admin topics delete persistent://public/default/my-topic

总结

Pulsar是一款高性能、可扩展、可靠的分布式消息处理平台,具备多种语言支持和易于使用的API。通过本文的介绍,你应该已经掌握了Pulsar的核心概念、安装和使用方法,以及如何使用Pulsar的Java客户端来发送和接收消息。如果想要深入了解和使用Pulsar,建议参考P


相关文章
|
3月前
|
存储 监控 固态存储
【vSAN分布式存储服务器数据恢复】VMware vSphere vSAN 分布式存储虚拟化平台VMDK文件1KB问题数据恢复案例
在一例vSAN分布式存储故障中,因替换故障闪存盘后磁盘组失效,一台采用RAID0策略且未使用置备的虚拟机VMDK文件受损,仅余1KB大小。经分析发现,该VMDK文件与内部虚拟对象关联失效导致。恢复方案包括定位虚拟对象及组件的具体物理位置,解析分配空间,并手动重组RAID0结构以恢复数据。此案例强调了深入理解vSAN分布式存储机制的重要性,以及定制化数据恢复方案的有效性。
90 5
|
7天前
|
消息中间件 监控 数据可视化
Apache Airflow 开源最顶级的分布式工作流平台
Apache Airflow 是一个用于创作、调度和监控工作流的平台,通过将工作流定义为代码,实现更好的可维护性和协作性。Airflow 使用有向无环图(DAG)定义任务,支持动态生成、扩展和优雅的管道设计。其丰富的命令行工具和用户界面使得任务管理和监控更加便捷。适用于静态和缓慢变化的工作流,常用于数据处理。
Apache Airflow 开源最顶级的分布式工作流平台
|
4月前
|
机器学习/深度学习 人工智能 Shell
人工智能平台PAI操作报错合集之在分布式训练过程中遇到报错,是什么原因
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。
|
30天前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
5月前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
227 0
|
3月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
70 8
|
3月前
|
消息中间件 SQL 关系型数据库
go-zero微服务实战系列(十、分布式事务如何实现)
go-zero微服务实战系列(十、分布式事务如何实现)
|
4月前
|
存储 缓存 分布式计算
高并发架构设计三大利器:缓存、限流和降级问题之缓存的应对策略问题如何解决
高并发架构设计三大利器:缓存、限流和降级问题之缓存的应对策略问题如何解决
|
5月前
|
机器学习/深度学习 人工智能 自然语言处理
人工智能平台PAI产品使用合集之如何配置cluster系统自动生成分布式参数
阿里云人工智能平台PAI是一个功能强大、易于使用的AI开发平台,旨在降低AI开发门槛,加速创新,助力企业和开发者高效构建、部署和管理人工智能应用。其中包含了一系列相互协同的产品与服务,共同构成一个完整的人工智能开发与应用生态系统。以下是对PAI产品使用合集的概述,涵盖数据处理、模型开发、训练加速、模型部署及管理等多个环节。

热门文章

最新文章