8张图带你彻底理解Pulsar的跨地域复制

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
容器服务 Serverless 版 ACK Serverless,952元额度 多规格
简介: 8张图带你彻底理解Pulsar的跨地域复制

地域复制是 Apache Pulsar 企业级特性的重要组成部分,它保证了系统的高可用,在操作和管理上也非常便捷,今天用 5 张图来带大家学习这个功能。

1 多机房部署

Pulsar 多机房部署如下图:

微信图片_20221213103903.png

上面的 Pulsar 架构中,Pulsar 3 个集群分别部署在北京、上海、贵阳 3 个机房,每个机房一套集群,每个集群中都有一个 Topic1,并且对应着订阅是 Subscription1。但是 3 个集群之间并没有数据同步。如果某一个机房发生故障,那这个机房的存量消息将不能被消费掉。

2 跨地域复制 (GEO-Replication)

Pulsar 最初是在 Yahoo 内部开发,在设计之初就加入了对 Yahoo 全球十多个机房的跨地域复制的需求。在上面的例子中,如果这 3 个机房可以相互同步数据,那即使某一个机房发生故障了,这个机房的存量数据因为已经被同步到其他机房,可以被其他机房的消费者消费掉。如下图:

微信图片_20221213103927.png

那 Pulsar 的跨地域复制到底是怎么做的呢?

2.1 存储模型回顾

首先我们回顾一下 Pulsar 的存储模型。我们知道,Pulsar 的消息持久化用到了存储系统 BookKeeper,如下图:

微信图片_20221213103951.png

Producer 生产完消息后,会刷到底层的 BookKeeper 存储引擎进行持久化。

Consumer 创建的时候要订阅一个 Topic,Pulsar 就会给它分配一个 Subscription 进行绑定,如上图 Consumer 绑定了 Subscription2。

Subscription 会持续从 Ledger 中获取消息推给 Consumer,当然前提是 Consumer 要有消息缓存空间。

Consumer 消费完成一个消息后,回复给 Subscription 一个 ACK,Subscription 收到 ACK 后把游标向后推一位。这个游标也是保存在了 BookKeeper,BookKeeper 会专门为这个游标开一个 Ledger。

2.2 跨地域复制过程

Pulsar 的跨地域复制跟上面的存储模型很类似,集群中多了一个 Replicator。以上海机房复制到北京机房为例,如下图:

微信图片_20221213104015.png

上海机房的 Pulsar 集群中有一个 Replicator,这个 Replicator 中有一个 Producer-R,绑定的了北京机房的 Topic1,把数据用生产者的方式发送到北京机房。

上海机房集群中生产的消息首先在本地集群中持久化,然后再被异步转发到北京集群。

上海机房 Replicator 中的 Producer-R 跟集群中的 Producer1 没有任何关系,它配置的集群地址是北京机房集群地址。

整个复制流程如下:

  • Producer1 生产消息到上海机房 Topic1;
  • 上海机房把消息持久化到 BookKeeper;
  • BookKeeper 返回成功后把消息推给 Replicator 的 Cursor;
  • Replicator 的 Cursor 通过 Producer-R 把消息发给北京机房 Topic1;
  • 北京机房 Topic1 写入 BookKeeper 成功后给上海机房 Replicator 的 Cursor 回复一个 ACK,上海机房 Cursor 收到 ACK 后通过  Producer-R 推送下一条消息。

2.3 消息丢失和幂等

因为在 Replicator 中维护了一个 Cursor,如果一条消息没有收到北京机房的 ACK,Replicator 可以通过 Producer-R 再次把这条消息发送北京机房,这样可以防止消息丢失。

如果因为网络问题,Producer-R 给北京机房推送消息后,北京机房回复的 ACK 上海机房没有收到,怎么处理呢?Producer-R 会再次给北京机房发送同一条消息,这种场景很容易导致消息重复。为了解决消息幂等的问题,Pulsar 提供了一个 Producer 幂等配置,北京机房开启这个设置后,broker 中会缓存一个内部 Cursor,用于保存收到的上一条消息的 MessageId ,如果收到一条新消息的 MessageId 小于等于当前 Cursor 中缓存的 MessageId,这条消息就会被丢掉。

2.4 消息顺序

上图中,上海机房的 Producer-R 和 北京机房的 Producer2 都往北京机房的 Topic1 写消息,消息的顺序怎么保证呢?

因为跨机房复制是异步的过程,Pulsar 只能保证上海机房和北京机房各自写入消息的顺序性,比如上海机房Producer-R 写入 msg1~msg5 这 5 条消息,北京机房 Producer2 写入 msgA~msgE 这5条消息,最终消息顺序可能如下:

微信图片_20221213104042.png

2.5 低延迟

跨区域复制的低延迟从两个方面来保证:

  • Replicator 和 broker是在一个进程中,这样减少了数据拷贝
  • 跨地域复制采用异步方式

2.6 ZooKeeper 集群

跨机房复制可以采用全局 ZooKeeper 集群,把 Pulsar 集群信息注册到 ZooKeeper 集群。如下图:

微信图片_20221213104106.png

这样每个集群就可以根据 ZooKeeper 中保存的信息来创建本地的 Replicator。

但是如果没有全局 ZooKeeper 集群,因为保存的数据是轻量级的,使用本地 ZooKeeper 集群也是可以的。如下图:

微信图片_20221213104129.png

这样每个机房的 Pulsar 集群从本地 ZooKeeper 中获取到需要复制的远程集群信息,就可以创建 Replicator 了。这种情况反而更加灵活。因为下面这种方式的 Pulsar 集群,全局 ZooKeeper 是不能满足要求的。

比如现在有一个西安机房的 Pulsar 集群自己不生产消息,只接受从北京、上海、贵阳三个机房的复制数据,如下图:

微信图片_20221213104151.png

3 复制原理

Pulsar 中 Topic 的格式如下:

persistent://tenant/namespace/topic

一个 Topic 的上级目录有 namespace 和 tenant。要允许两个集群间消息跨地域复制,首先要允许 tenant(租户) 有权限访问两个集群。而跨地域复制是在 namespace 级别进行管理的,如果允许一个 namespace 跨地域复制,那发布到这个 namespace 上的任意一个 topic 的消息,都会被复制到指定集合的所有集群中。

3.1 tenant 授权

要使用跨地域复制,首先要给租户设置访问权限。下面命令给 my-tenant 这个租户授予了 pulsar-shanghai、pulsar-beijing 和 pulsar-guiyang 的访问权限。

bin/pulsar-admin tenants create my-tenant --admin-roles my-admin-role --allowed-clusters pulsar-shanghai,pulsar-beijing,pulsar-guiyang

3.2 namespace 级别启动

跨地域复制是在 namespace 级别进行管理的,租户拥有了权限后,把 namespace 指定给要复制的集群:

bin/pulsar-admin namespaces set-clusters my-tenant/my-namespace --clusters pulsar-shanghai,pulsar-beijing,pulsar-guiyang

namespace 级别的复制可以随时改变,改变后立刻生效。

namespace  配置跨地域复制后,默认该 namespace 下创建的所有 Topic 都会复制到列表中其他集群。如果要选择固定的集群进行复制,可以使用 Pulsar Client 来指定,比如 Java Client 下面的代码只允许 my-topic 这个 topic 在pulsar-shanghai,pulsar-beijing 这两个集群间复制。

List<String>restrictReplicationTo=Arrays.asList(

       "pulsar-shanghai",

       "pulsar-beijing"

);

Producerproducer=client.newProducer()

       .topic("my-topic")

       .create();

producer.newMessage()

       .value("my-payload".getBytes())

       .setReplicationClusters(restrictReplicationTo)

       .send();

3.3 Topic 级别启动

要让一个 Topic 能够跨地域复制,要在 Topic 级别启动:

bin/pulsar-admin topics set-replication-clusters --clusters pulsar-shanghai,pulsar-beijing,pulsar-guiyang my-tenant/my-namespace/Topic1

3.4 防止循环复制

如果配置了上海机房和北京机房之间的跨地域复制,那从上海机房复制到北京机房后,消息有没有可能从北京机房再复制到上海机房呢?

当然不会。上海机房发送消息到北京机房时,会给消息加一个 Property,用来表示是哪个机房生产的数据。北京机房收到这个数据后,就会知道是从别的机房复制来的,Replicator 中的 Cursor 在订阅消息时就会把这部分消息过滤掉。

总结

一句话概括,Pulsar 的跨地域复制,其实就是在一个本地集群中创建一个 Producer,把异地的集群作为这个 Producer 的发送地址,将本地集群的消息发送过去,并且在本地维护一个 Cusor 来保证消息可靠性和幂等性。

相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
API 索引
es实战-分片分配失败解决方案
分片无法分配情况的一些解决办法
2013 0
|
2月前
|
SQL 关系型数据库 MySQL
实时数仓 Hologres操作报错合集之遇到报错:连接到 FE 失败,剩余的连接槽保留用于非复制超级用户连接,该怎么处理
在使用阿里云实时数仓Hologres时,可能会遇到不同类型的错误。例如:1.内存超限错误、2.字符串缓冲区扩大错误、3.分区导入错误、4.外部表访问错误、5.服务未开通或权限问题、6.数据类型范围错误,下面是一些常见错误案例及可能的原因与解决策略的概览。
|
24天前
|
关系型数据库 分布式数据库 数据库
PolarDB产品使用问题之“主集群和从集群地域映射表”指的是什么
PolarDB产品使用合集涵盖了从创建与管理、数据管理、性能优化与诊断、安全与合规到生态与集成、运维与支持等全方位的功能和服务,旨在帮助企业轻松构建高可用、高性能且易于管理的数据库环境,满足不同业务场景的需求。用户可以通过阿里云控制台、API、SDK等方式便捷地使用这些功能,实现数据库的高效运维与持续优化。
PolarDB产品使用问题之“主集群和从集群地域映射表”指的是什么
|
1月前
|
分布式计算 DataWorks 数据管理
DataWorks操作报错合集之资源组切换后仍然报错,并且提示了新的IP地址172.25.0.67,该如何排查
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
DataWorks API 调度
DataWorks产品使用合集之在调度配置配置了节点的上游节点输出,没办法自动生成这个flow的依赖,该怎么操作
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
2月前
|
SQL DataWorks 数据处理
DataWorks操作报错合集之在创建ES的数据源时,测试连通性提示无法连通,出现报错,如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
2月前
|
SQL 数据采集 DataWorks
DataWorks操作报错合集之数据集成里面的数据调度独享资源组测试通过了,但是数据地图里无法通过,该如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
|
3月前
|
存储 SQL 运维
跨节点参数的缘起与今生
Dataphin v3.13引入了跨节点参数功能,允许任务间传递消息。输出节点(如SQL、Shell、Python任务)能输出参数,输入节点可以接收并使用这些参数。此功能解决了通过公共存储中转消息的复杂性和低效问题。应用场景包括:金融企业的币种转换,其中汇率任务(输出节点)提供汇率,转换任务(输入节点)使用该汇率;以及产品目录更新检查,通过跨节点参数控制是否需要执行数据导入任务。用户可以通过任务编辑器设置和传递跨节点参数,并在运维中进行补数据操作。
135 2
跨节点参数的缘起与今生
|
2月前
|
SQL 关系型数据库 MySQL
PolarDB产品使用合集之当主节点发生切换后,客户端需要重新配置写入节点吗
PolarDB是阿里云推出的一种云原生数据库服务,专为云设计,提供兼容MySQL、PostgreSQL的高性能、低成本、弹性可扩展的数据库解决方案,可以有效地管理和优化PolarDB实例,确保数据库服务的稳定、高效运行。以下是使用PolarDB产品的一些建议和最佳实践合集。
|
3月前
|
SQL 分布式计算 DataWorks
DataWorks常见问题之使用连接串模式新建ES数据源报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。