首页> 标签> RocketMQ
"RocketMQ"
共 3187 条结果
全部 问答 文章 公开课 课程 电子书 技术圈 体验
基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台
作者 | 陈厚道 冯庆来源 | 阿里巴巴云原生公众号导读:本文将对 RocketMQ-Exporter 的设计实现做一个简单的介绍,读者可通过本文了解到 RocketMQ-Exporter 的实现过程,以及通过 RocketMQ-Exporter 来搭建自己的 RocketMQ 监控系统。RocketMQ 在线可交互教程现已登录知行动手实验室,PC 端登录 start.aliyun.com 即可直达。RocketMQ 云原生系列文章:阿里的 RocketMQ 如何让双十一峰值之下 0 故障当 RocketMQ 遇上 Serverless,会碰撞出怎样的火花?云原生时代 RocketMQ 运维管控的利器 - RocketMQ Operator云原生时代消息中间件的演进路线基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台(本文)RocketMQ-Exporter 项目的 GitHub 地址:https://github.com/apache/rocketmq-exporter文章主要内容包含以下几个方面:RocketMQ 介绍Prometheus 简介RocketMQ-Exporter 的具体实现RocketMQ-Exporter 的监控指标和告警指标RocketMQ-Exporter 使用示例RocketMQ 介绍RocketMQ 是一个分布式消息和流数据平台,具有低延迟、高性能、高可靠性、万亿级容量和灵活的可扩展性。简单的来说,它由 Broker 服务器和客户端两部分组成,其中客户端一个是消息发布者客户端(Producer),它负责向 Broker 服务器发送消息;另外一个是消息的消费者客户端(Consumer),多个消费者可以组成一个消费组,来订阅和拉取消费 Broker 服务器上存储的消息。正由于它具有高性能、高可靠性和高实时性的特点,与其他协议组件在 MQTT 等各种消息场景中的结合也越来越多,应用越来越广泛。而对于这样一个强大的消息中间件平台,在实际使用的时候还缺少一个监控管理平台。当前在开源界,使用最广泛监控解决方案的就是 Prometheus。与其它传统监控系统相比较,Prometheus 具有易于管理,监控服务的内部运行状态,强大的数据模型,强大的查询语言 PromQL,高效的数据处理,可扩展,易于集成,可视化,开放性等优点。并且借助于 Prometheus 可以很快速的构建出一个能够监控 RocketMQ 的监控平台。Prometheus 简介下图展示了 Prometheus 的基本架构:1. Prometheus ServerPrometheus Server 是 Prometheus 组件中的核心部分,负责实现对监控数据的获取,存储以及查询。Prometheus Server 可以通过静态配置管理监控目标,也可以配合使用 Service Discovery 的方式动态管理监控目标,并从这些监控目标中获取数据。其次 Prometheus Server 需要对采集到的监控数据进行存储,Prometheus Server 本身就是一个时序数据库,将采集到的监控数据按照时间序列的方式存储在本地磁盘当中。最后 Prometheus Server 对外提供了自定义的 PromQL 语言,实现对数据的查询以及分析。2. ExportersExporter 将监控数据采集的端点通过 HTTP 服务的形式暴露给 Prometheus Server,Prometheus Server 通过访问该 Exporter 提供的 Endpoint 端点,即可获取到需要采集的监控数据。RocketMQ-Exporter 就是这样一个 Exporter,它首先从 RocketMQ 集群采集数据,然后借助 Prometheus 提供的第三方客户端库将采集的数据规范化成符合 Prometheus 系统要求的数据,Prometheus 定时去从 Exporter 拉取数据即可。当前 RocketMQ Exporter 已被 Prometheus 官方收录,其地址为:https://github.com/apache/rocketmq-exporter。RocketMQ-Exporter 的具体实现当前在 Exporter 当中,实现原理如下图所示:整个系统基于 spring boot 框架来实现。由于 MQ 内部本身提供了比较全面的数据统计信息,所以对于 Exporter 而言,只需要将 MQ 集群提供的统计信息取出然后进行加工而已。所以 RocketMQ-Exporter 的基本逻辑是内部启动多个定时任务周期性的从 MQ 集群拉取数据,然后将数据规范化后通过端点暴露给 Prometheus 即可。其中主要包含如下主要的三个功能部分:MQAdminExt 模块通过封装 MQ 系统客户端提供的接口来获取 MQ 集群内部的统计信息。MetricService 负责将 MQ 集群返回的结果数据进行加工,使其符合 Prometheus 要求的格式化数据。Collect 模块负责存储规范化后的数据,最后当 Prometheus 定时从 Exporter 拉取数据的时候,Exporter 就将 Collector 收集的数据通过 HTTP 的形式在/metrics 端点进行暴露。RocketMQ-Exporter 的监控指标和告警指标RocketMQ-Exporter 主要是配合 Prometheus 来做监控,下面来看看当前在 Expoter 中定义了哪些监控指标和告警指标。监控指标rocketmq_message_accumulation 是一个聚合指标,需要根据其它上报指标聚合生成。告警指标消费者堆积告警指标也是一个聚合指标,它根据消费堆积的聚合指标生成,value 这个阈值对每个消费者是不固定的,当前是根据过去 5 分钟生产者生产的消息数量来定,用户也可以根据实际情况自行设定该阈值。告警指标设置的值只是个阈值只是象征性的值,用户可根据在实际使用 RocketMQ 的情况下自行设定。这里重点介绍一下消费者堆积告警指标,在以往的监控系统中,由于没有像 Prometheus 那样有强大的 PromQL 语言,在处理消费者告警问题时势必需要为每个消费者设置告警,那这样就需要 RocketMQ 系统的维护人员为每个消费者添加,要么在系统后台检测到有新的消费者创建时自动添加。在 Prometheus 中,这可以通过一条如下的语句来实现:(sum(rocketmq_producer_offset) by (topic) - on(topic) group_right sum(rocketmq_consumer_offset) by (group,topic)) - ignoring(group) group_left sum (avg_over_time(rocketmq_producer_tps[5m])) by (topic)*5*60 > 0借助 PromQL 这一条语句不仅可以实现为任意一个消费者创建消费告警堆积告警,而且还可以使消费堆积的阈值取一个跟生产者发送速度相关的阈值。这样大大增加了消费堆积告警的准确性。RocketMQ-Exporter 使用示例1. 启动 NameServer 和 Broker要验证 RocketMQ 的 Spring-Boot 客户端,首先要确保 RocketMQ 服务正确的下载并启动。可以参考 RocketMQ 主站的快速开始来进行操作。确保启动 NameServer 和 Broker 已经正确启动。2. 编译 RocketMQ-Exporter用户当前使用,需要自行下载 git 源码编译:git clone https://github.com/apache/rocketmq-exporter cd rocketmq-exporter mvn clean install3. 配置和运行RocketMQ-Exporter 有如下的运行选项:以上的运行选项既可以在下载代码后在配置文件中更改,也可以通过命令行来设置。编译出来的 jar 包就叫 rocketmq-exporter-0.0.1-SNAPSHOT.jar,可以通过如下的方式来运行。java -jar rocketmq-exporter-0.0.1-SNAPSHOT.jar [--rocketmq.config.namesrvAddr="127.0.0.1:9876" ...]4. 安装 Prometheus首先到 Prometheus 官方下载地址去下载 Prometheus 安装包,当前以 linux 系统安装为例,选择的安装包为 prometheus-2.7.0-rc.1.linux-amd64.tar.gz,经过如下的操作步骤就可以启动 prometheus 进程。tar -xzf prometheus-2.7.0-rc.1.linux-amd64.tar.gzcd prometheus-2.7.0-rc.1.linux-amd64/./prometheus --config.file=prometheus.yml --web.listen-address=:5555Prometheus 默认监听端口号为 9090,为了不与系统上的其它进程监听端口冲突,我们在启动参数里面重新设置了监听端口号为 5555。然后通过浏览器访问 http://<服务器 IP 地址>:5555,就可以验证 Prometheus 是否已成功安装,显示界面如下:由于 RocketMQ-Exporter 进程已启动,这个时候可以通过 Prometheus 来抓取 RocketMQ-Exporter 的数据,这个时候只需要更改 Prometheus 启动的配置文件即可。整体配置文件如下:# my global config global: scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute. evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute. # scrape_timeout is set to the global default (10s). # Load rules once and periodically evaluate them according to the global 'evaluation_interval'. rule_files: # - "first_rules.yml" # - "second_rules.yml" scrape_configs: - job_name: 'prometheus' static_configs: - targets: ['localhost:5555'] - job_name: 'exporter' static_configs: - targets: ['localhost:5557']更改配置文件后,重启服务即可。重启后就可以在 Prometheus 界面查询 RocketMQ-Exporter 上报的指标,例如查询 rocketmq_broker_tps 指标,其结果如下:5. 告警规则添加在 Prometheus 可以展示 RocketMQ-Exporter 的指标后,就可以在 Prometheus 中配置 RocketMQ 的告警指标了。在 Prometheus 的配置文件中添加如下的告警配置项,*.rules 表示可以匹配多个后缀为 rules 的文件。rule_files: # - "first_rules.yml" # - "second_rules.yml" - /home/prometheus/prometheus-2.7.0-rc.1.linux-amd64/rules/*.rules当前设置的告警配置文件为 warn.rules,其文件具体内容如下所示。其中的阈值只起一个示例的作用,具体的阈值还需用户根据实际使用情况来自行设定。### # Sample prometheus rules/alerts for rocketmq. # ### # Galera Alerts groups: - name: GaleraAlerts rules: - alert: RocketMQClusterProduceHigh expr: sum(rocketmq_producer_tps) by (cluster) >= 10 for: 3m labels: severity: warning annotations: description: '{{$labels.cluster}} Sending tps too high.' summary: cluster send tps too high - alert: RocketMQClusterProduceLow expr: sum(rocketmq_producer_tps) by (cluster) < 1 for: 3m labels: severity: warning annotations: description: '{{$labels.cluster}} Sending tps too low.' summary: cluster send tps too low - alert: RocketMQClusterConsumeHigh expr: sum(rocketmq_consumer_tps) by (cluster) >= 10 for: 3m labels: severity: warning annotations: description: '{{$labels.cluster}} consuming tps too high.' summary: cluster consume tps too high - alert: RocketMQClusterConsumeLow expr: sum(rocketmq_consumer_tps) by (cluster) < 1 for: 3m labels: severity: warning annotations: description: '{{$labels.cluster}} consuming tps too low.' summary: cluster consume tps too low - alert: ConsumerFallingBehind expr: (sum(rocketmq_producer_offset) by (topic) - on(topic) group_right sum(rocketmq_consumer_offset) by (group,topic)) - ignoring(group) group_left sum (avg_over_time(rocketmq_producer_tps[5m])) by (topic)*5*60 > 0 for: 3m labels: severity: warning annotations: description: 'consumer {{$labels.group}} on {{$labels.topic}} lag behind and is falling behind (behind value {{$value}}).' summary: consumer lag behind - alert: GroupGetLatencyByStoretime expr: rocketmq_group_get_latency_by_storetime > 1000 for: 3m labels: severity: warning annotations: description: 'consumer {{$labels.group}} on {{$labels.broker}}, {{$labels.topic}} consume time lag behind message store time and (behind value is {{$value}}).' summary: message consumes time lag behind message store time too much 最终,可以在 Prometheus 的看一下告警展示效果,红色表示当前处于告警状态的项,绿色表示正常状态。6. Grafana dashboard for RocketMQPrometheus 自身的指标展示平台没有当前流行的展示平台 Grafana 好, 为了更好的展示 RocketMQ 的指标,可以使用 Grafana 来展示 Prometheus 获取的指标。首先到官网去下载:https://grafana.com/grafana/download,这里仍以二进制文件安装为例进行介绍。wget https://dl.grafana.com/oss/release/grafana-6.2.5.linux-amd64.tar.gz tar -zxvf grafana-6.2.5.linux-amd64.tar.gz cd grafana-5.4.3/同样为了不与其它进程的使用端口冲突,可以修改 conf 目录下的 defaults.ini 文件的监听端口,当前将 grafana 的监听端口改为 55555,然后使用如下的命令启动即可:./bin/grafana-server web然后通过浏览器访问 http://<服务器 IP 地址>:55555,就可以验证 grafana 是否已成功安装。系统默认用户名和密码为 admin/admin,第一次登陆系统会要求修改密码,修改密码后登陆,界面显示如下:点击 Add data source 按钮,会要求选择数据源。选择数据源为 Prometheus,设置数据源的地址为前面步骤启动的 Prometheus 的地址。回到主界面会要求创建新的 Dashboard。点击创建 dashboard,创建 dashboard 可以自己手动创建,也可以以配置文件导入的方式创建,当前已将 RocketMQ 的 dashboard 配置文件上传到 Grafana 的官网,这里以配置文件导入的方式进行创建。点击 New dashboard 下拉按钮。选择 import dashboard。这个时候可以到 Grafana 官网去下载当前已为 RocketMQ 创建好的配置文件,地址为:https://grafana.com/dashboards/10477/revisions,如下图所示:点击 download 就可以下载配置文件,下载配置文件然后,复制配置文件中的内容粘贴到上图的粘贴内容处。最后按上述方式就将配置文件导入到 Grafana 了。最终的效果如下所示:作者简介陈厚道,曾就职于腾讯、盛大、斗鱼等互联网公司。目前就职于尚德机构,在尚德机构负责基础架构方面的设计和开发工作。对分布式消息队列、微服务架构和落地、DevOps 和监控平台有比较深入的研究。冯庆,曾就职于华为。目前就职于尚德机构,在尚德机构基础架构团队负责基础组件的开发工作。在 PC 端登录 start.aliyun.com 知行动手实验室,沉浸式体验在线交互教程。
文章
消息中间件  ·  存储  ·  Prometheus  ·  监控  ·  Cloud Native  ·  Devops  ·  Java  ·  Apache  ·  数据安全/隐私保护  ·  RocketMQ
2023-02-02
阿里的 RocketMQ 如何让双十一峰值之下 0 故障?
作者 | 愈安来源 | 阿里巴巴云原生公众号2020 年双十一交易峰值达到 58.3 W 笔/秒,消息中间件 RocketMQ 继续数年 0 故障丝般顺滑地完美支持了整个集团大促的各类业务平稳。2020 年双十一大促中,消息中间件 RocketMQ 发生了以下几个方面的变化:云原生化实践:完成运维层面的云原生化改造,实现 Kubernetes 化。性能优化:消息过滤优化交易集群性能提升 30%。全新的消费模型:对于延迟敏感业务提供新的消费模式,降低因发布、重启等场景下导致的消费延迟。云原生化实践1. 背景Kubernetes 作为目前云原生化技术栈实践中重要的一环,其生态已经逐步建立并日益丰富。目前,服务于集团内部的 RocketMQ 集群拥有巨大的规模以及各种历史因素,因此在运维方面存在相当一部分痛点,我们希望能够通过云原生技术栈来尝试找到对应解决方案,并同时实现降本提效,达到无人值守的自动化运维。消息中间件早在 2016 年,通过内部团队提供的中间件部署平台实现了容器化和自动化发布,整体的运维比 2016 年前已经有了很大的提高,但是作为一个有状态的服务,在运维层面仍然存在较多的问题。中间件部署平台帮我们完成了资源的申请,容器的创建、初始化、镜像安装等一系列的基础工作,但是因为中间件各个产品都有自己不同的部署逻辑,所以在应用的发布上,就是各应用自己的定制化了。中间件部署平台的开发也不完全了解集团内 RocketMQ 的部署过程是怎样的。因此在 2016 年的时候,部署平台需要我们去亲自实现消息中间件的应用发布代码。虽然部署平台大大提升了我们的运维效率,甚至还能实现一键发布,但是这样的方案也有不少的问题。比较明显的就是,当我们的发布逻辑有变化的时候,还需要去修改部署平台对应的代码,需要部署平台升级来支持我们,用最近比较流行的一个说法,就是相当不云原生。同样在故障机替换、集群缩容等操作中,存在部分人工参与的工作,如切流,堆积数据的确认等。我们尝试过在部署平台中集成更多消息中间件自己的运维逻辑,不过在其他团队的工程里写自己的业务代码,确实也是一个不太友好的实现方案,因此我们希望通过 Kubernetes 来实现消息中间件自己的 operator 。我们同样希望利用云化后云盘的多副本能力来降低我们的机器成本并降低主备运维的复杂程度。经过一段时间的跟进与探讨,最终再次由内部团队承担了建设云原生应用运维平台的任务,并依托于中间件部署平台的经验,借助云原生技术栈,实现对有状态应用自动化运维的突破。2. 实现整体的实现方案如上图所示,通过自定义的 CRD 对消息中间件的业务模型进行抽象,将原有的在中间件部署平台的业务发布部署逻辑下沉到消息中间件自己的 operator 中,托管在内部 Kubernetes 平台上。该平台负责所有的容器生产、初始化以及集团内一切线上环境的基线部署,屏蔽掉 IaaS 层的所有细节。Operator 承担了所有的新建集群、扩容、缩容、迁移的全部逻辑,包括每个 pod 对应的 brokerName 自动生成、配置文件,根据集群不同功能而配置的各种开关,元数据的同步复制等等。同时之前一些人工的相关操作,比如切流时候的流量观察,下线前的堆积数据观察等也全部集成到了 operator 中。当我们有需求重新修改各种运维逻辑的时候,也再也不用去依赖通用的具体实现,修改自己的 operator 即可。最后线上的实际部署情况去掉了图中的所有的 replica 备机。在 Kubernetes 的理念中,一个集群中每个实例的状态是一致的,没有依赖关系,而如果按照消息中间件原有的主备成对部署的方案,主备之间是有严格的对应关系,并且在上下线发布过程中有严格的顺序要求,这种部署模式在 Kubernetes 的体系下是并不提倡的。若依然采用以上老的架构方式,会导致实例控制的复杂性和不可控性,同时我们也希望能更多的遵循 Kubernetes 的运维理念。云化后的 ECS 使用的是高速云盘,底层将对数据做了多备份,因此数据的可用性得到了保障。并且高速云盘在性能上完全满足 MQ 同步刷盘,因此,此时就可以把之前的异步刷盘改为同步,保证消息写入时的不丢失问题。云原生模式下,所有的实例环境均是一致性的,依托容器技术和 Kubernetes 的技术,可实现任何实例挂掉(包含宕机引起的挂掉),都能自动自愈,快速恢复。解决了数据的可靠性和服务的可用性后,整个云原生化后的架构可以变得更加简单,只有 broker 的概念,再无主备之分。3. 大促验证上图是 Kubernetes 上线后双十一大促当天的发送 RT 统计,可见大促期间的发送 RT 较为平稳,整体符合预期,云原生化实践完成了关键性的里程碑。性能优化1. 背景RocketMQ 至今已经连续七年 0 故障支持集团的双十一大促。自从 RocketMQ 诞生以来,为了能够完全承载包括集团业务中台交易消息等核心链路在内的各类关键业务,复用了原有的上层协议逻辑,使得各类业务方完全无感知的切换到 RocketMQ 上,并同时充分享受了更为稳定和强大的 RocketMQ 消息中间件的各类特性。当前,申请订阅业务中台的核心交易消息的业务方一直都在不断持续增加,并且随着各类业务复杂度提升,业务方的消息订阅配置也变得更加复杂繁琐,从而使得交易集群的进行过滤的计算逻辑也变得更为复杂。这些业务方部分沿用旧的协议逻辑(Header 过滤),部分使用 RocketMQ 特有的 SQL 过滤。2. 主要成本目前集团内部 RocketMQ 的大促机器成本绝大部分都是交易消息相关的集群,在双十一零点峰值期间,交易集群的峰值和交易峰值成正比,叠加每年新增的复杂订阅带来了额外 CPU 过滤计算逻辑,交易集群都是大促中机器成本增长最大的地方。3. 优化过程由于历史原因,大部分的业务方主要还是使用 Header 过滤,内部实现其实是aviator 表达式。仔细观察交易消息集群的业务方过滤表达式,可以发现绝大部分都指定类似MessageType == xxxx这样的条件。翻看 aviator 的源码可以发现这样的条件最终会调用 Java 的字符串比较String.compareTo()。由于交易消息包括大量不同业务的 MessageType,光是有记录的起码有几千个,随着交易业务流程复杂化,MessageType 的增长更是繁多。随着交易峰值的提高,交易消息峰值正比增长,叠加这部分更加复杂的过滤,持续增长的将来,交易集群的成本极可能和交易峰值指数增长,因此决心对这部分进行优化。原有的过滤流程如下,每个交易消息需要逐个匹配不同 group 的订阅关系表达式,如果符合表达式,则选取对应的 group 的机器进行投递。如下图所示:对此流程进行优化的思路需要一定的灵感,在这里借助数据库索引的思路:原有流程可以把所有订阅方的过滤表达式看作数据库的记录,每次消息过滤就相当于一个带有特定条件的数据库查询,把所有匹配查询(消息)的记录(过滤表达式)选取出来作为结果。为了加快查询结果,可以选择 MessageType 作为一个索引字段进行索引化,每次查询变为先匹配 MessageType 主索引,然后把匹配上主索引的记录再进行其它条件(如下图的 sellerId 和 testA )匹配,优化流程如下图所示:以上优化流程确定后,要关注的技术点有两个:如何抽取每个表达式中的 MessageType 字段?如何对 MessageType 字段进行索引化?对于技术点 1 ,需要针对 aviator 的编译流程进行 hook ,深入 aviator 源码后,可以发现 aviator 的编译是典型的Recursive descent,同时需要考虑到提取后父表达式的短路问题。在编译过程中针对 messageType==XXX 这种类型进行提取后,把原有的 message==XXX 转变为 true/false 两种情况,然后针对 true、false 进行表达式的短路即可得出表达式优化提取后的情况。例如:表达式: messageType=='200-trade-paid-done' && buyerId==123456 提取为两个子表达式: 子表达式1(messageType==200-trade-paid-done):buyerId==123456 子表达式2(messageType!=200-trade-paid-done):false具体到 aviator 的实现里,表达式编译会把每个 token 构建一个 List ,类似如下图所示(为方便理解,绿色方框的是 token ,其它框表示表达式的具体条件组合):提取了 messageType ,有两种情况:情况一:messageType == '200-trade-paid-done',则把之前 token 的位置合并成true,然后进行表达式短路计算,最后优化成 buyerId==123456 ,具体如下:情况二:messageType != '200-trade-paid-done',则把之前 token 的位置合并成 false ,表达式短路计算后,最后优化成 false ,具体如下:这样就完成 messageType 的提取。这里可能有人就有一个疑问,为什么要考虑到上面的情况二,messageType != '200-trade-paid-done',这是因为必须要考虑到多个条件的时候,比如:(messageType=='200-trade-paid-done' && buyerId==123456) || (messageType=='200-trade-success' && buyerId==3333)就必须考虑到不等于的情况了。同理,如果考虑到多个表达式嵌套,需要逐步进行短路计算。但整体逻辑是类似的,这里就不再赘述。说完技术点 1,我们继续关注技术点 2,考虑到高效过滤,直接使用 HashMap 结构进行索引化即可,即把 messageType 的值作为 HashMap 的 key ,把提取后的子表达式作为 HashMap 的 value ,这样每次过滤直接通过一次 hash 计算即可过滤掉绝大部分不适合的表达式,大大提高了过滤效率。3. 优化效果该优化最主要降低了 CPU 计算逻辑,根据优化前后的性能情况对比,我们发现不同的交易集群中的订阅方订阅表达式复杂度越高,优化效果越好,这个是符合我们的预期的,其中最大的 CPU 优化有32%的提升,大大降低了本年度 RocketMQ 的部署机器成本。全新的消费模型 —— POP 消费1. 背景RocketMQ 的 PULL 消费对于机器异常 hang 时并不十分友好。如果遇到客户端机器 hang 住,但处于半死不活的状态,与 broker 的心跳没有断掉的时候,客户端 rebalance 依然会分配消费队列到 hang 机器上,并且 hang 机器消费速度很慢甚至无法消费的时候,这样会导致消费堆积。另外类似还有服务端 Broker 发布时,也会由于客户端多次 rebalance 导致消费延迟影响等无法避免的问题。如下图所示:当 Pull Client 2 发生 hang 机器的时候,它所分配到的三个 Broker 上的 Q2 都出现严重的红色堆积。对于此,我们增加了一种新的消费模型 —— POP 消费,能够解决此类稳定性问题。如下图所示:POP 消费中,三个客户端并不需要 rebalance 去分配消费队列,取而代之的是,它们都会使用 POP 请求所有的 broker 获取消息进行消费。broker 内部会把自身的三个队列的消息根据一定的算法分配给请求的 POP Client。即使 Pop Client 2 出现 hang,但内部队列的消息也会让 Pop Client1 和 Pop Client2 进行消费。这样就 hang 机器造成的避免了消费堆积。2. 实现POP 消费和原来 PULL 消费对比,最大的一点就是弱化了队列这个概念,PULL 消费需要客户端通过 rebalance 把 broker 的队列分配好,从而去消费分配到自己专属的队列,新的 POP 消费中,客户端的机器会直接到每个 broker 的队列进行请求消费, broker 会把消息分配返回给等待的机器。随后客户端消费结束后返回对应的 Ack 结果通知 broker,broker 再标记消息消费结果,如果超时没响应或者消费失败,再会进行重试。POP 消费的架构图如上图所示。Broker 对于每次 POP 的请求,都会有以下三个操作:对应的队列进行加锁,然后从 store 层获取该队列的消息;然后写入 CK 消息,表明获取的消息要被 POP 消费;最后提交当前位点,并释放锁。CK 消息实际上是记录了 POP 消息具体位点的定时消息,当客户端超时没响应的时候,CK 消息就会重新被 broker 消费,然后把 CK 消息的位点的消息写入重试队列。如果 broker 收到客户端的消费结果的 Ack ,删除对应的 CK 消息,然后根据具体结果判断是否需要重试。从整体流程可见,POP 消费并不需要 reblance ,可以避免 rebalance 带来的消费延时,同时客户端可以消费 broker 的所有队列,这样就可以避免机器 hang 而导致堆积的问题。
文章
消息中间件  ·  运维  ·  Kubernetes  ·  Cloud Native  ·  中间件  ·  双11  ·  数据库  ·  RocketMQ  ·  索引  ·  容器
2023-02-02
如何在 Spring 生态中玩转 RocketMQ?
在 Spring 生态中玩转 RocketMQ 系列教程现已登陆知行动手实验室,点击这里立即体验!移动端同学,需要在PC端登录 start.aliyun.com 进行体验。RocketMQ 作为业务消息的首选,在消息和流处理领域被广泛应用。而微服务生态 Spring 框架也是业务开发中最受欢迎的框架,两者的完美契合使得 RocketMQ 成为 Spring Messaging 实现中最受欢迎的消息实现。本文展示了 5 种在 Spring 生态中文玩转 RocketMQ 的方式,并描述了每个项目的特点和使用场景。一、前言上世纪 90 年代末,随着 Java EE(Enterprise Edition)的出现,特别是 Enterprise Java Beans 的使用需要复杂的描述符配置和死板复杂的代码实现,增加了广大开发者的学习曲线和开发成本,由此基于简单的 XML 配置和普通 Java 对象(Plain Old Java Objects)的 Spring 技术应运而生,依赖注入(Dependency Injection),控制反转(Inversion of Control)和面向切面编程(AOP)的技术更加敏捷地解决了传统 Java 企业及版本的不足。随着 Spring 的持续演进,基于注解(Annotation)的配置逐渐取代了 XML 文件配置。除了依赖注入、控制翻转、AOP 这些技术,Spring 后续衍生出 AMQP、Transactional、Security、Batch、Data Access 等模块,涉及开发的各个领域。2014 年 4 月 1 日,Spring Boot 1.0.0 正式发布。它基于“约定大于配置”(Convention over configuration)这一理念来快速地开发,测试,运行和部署 Spring 应用,并能通过简单地与各种启动器(如spring-boot-web-starter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。Spring Boot 的出现可以说是 Spring 框架的第二春,它不但简化了开发的流程,目前更是事实标准。下面这幅图可以看出相同功能的 Spring 和 Spring Boot 的代码实现对比。Apache RocketMQ 是一款是业界知名的分布式消息和流处理中间件,它主要功能是消息分发、异步解耦、削峰填谷等。RocketMQ 是一款金融级消息及流数据平台,RocketMQ 在交易、支付链路上用的很多,主要是对消息链路质量要求非常高的场景,能够支持万亿级消息洪峰。RocketMQ 在业务消息中被广泛应用,并衍生出顺序消息、事务消息、延迟消息等匹配各类业务场景的特殊消息。本文的主角就是 Spring 和 RocketMQ,那几乎每个 Java 程序员都会使用 Spring 框架与支持丰富业务场景的 RocketMQ 会碰撞出怎么样的火花?二、RocketMQ 与 Spring 的碰撞在介绍 RocketMQ 与 Spring 故事之前,不得不提到 Spring 中的两个关于消息的框架,Spring Messaging 和 Spring Cloud Stream。它们都能够与 Spring Boot 整合并提供了一些参考的实现。和所有的实现框架一样,消息框架的目的是实现轻量级的消息驱动的微服务,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。1. Spring MessagingSpring Messaging 是 Spring Framework 4 中添加的模块,是 Spring 与消息系统集成的一个扩展性的支持。它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异步接收消息的一整套完整的基础架构,Spring AMQP 提供了该协议所要求的类似的功能集。在与 Spring Boot 的集成后,它拥有了自动配置能力,能够在测试和运行时与相应的消息传递系统进行集成。单纯对于客户端而言,Spring Messaging 提供了一套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,比如消息 Messaging 对应的模型就包括一个消息体 Payload 和消息头 Header。不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。在 Apache RocketMQ 生态中,RocketMQ-Spring-Boot-Starter(下文简称 RocketMQ-Spring)就是一个支持 Spring Messaging API 标准的项目。该项目把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费,也支持扩展出 RocketMQ 原生 API 来支持更加丰富的消息类型。在 RocketMQ-Spring 毕业初期,RocketMQ 社区同学请 Spring 社区的同学对 RocketMQ-Spring 代码进行 review,引出一段罗美琪(RocketMQ)和春波特(Spring Boot)故事的佳话[1],著名 Spring 布道师 Josh Long 向国外同学介绍如何使用 RocketMQ-Spring 收发消息[2]。RocketMQ-Spring 也在短短两年时间超越 Spring-Kafka 和 Spring-AMQP(注:两者均由 Spring 社区维护),成为 Spring Messaging 生态中最活跃的消息项目。2. Spring Cloud StreamSpring Cloud Stream 结合了 Spring Integration 的注解和功能,它的应用模型如下:Spring Cloud Stream 框架中提供一个独立的应用内核,它通过输入(@Input)和输出(@Output)通道与外部世界进行通信,消息源端(Source)通过输入通道发送消息,消费目标端(Sink)通过监听输出通道来获取消费的消息。这些通道通过专用的 Binder 实现与外部代理连接。开发人员的代码只需要针对应用内核提供的固定的接口和注解方式进行编程,而不需要关心运行时具体的 Binder 绑定的消息中间件。在运行时,Spring Cloud Stream 能够自动探测并使用在 classpath 下找到的 Binder。这样开发人员可以轻松地在相同的代码中使用不同类型的中间件:仅仅需要在构建时包含进不同的 Binder。在更加复杂的使用场景中,也可以在应用中打包多个 Binder 并让它自己选择 Binder,甚至在运行时为不同的通道使用不同的 Binder。 Binder 抽象使得 Spring Cloud Stream 应用可以灵活的连接到中间件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的灵活配置配置能力,这样的配置可以通过外部配置的属性和 Spring Boot 支持的任何形式来提供(包括应用启动参数、环境变量和 application.yml 或者 application.properties 文件),部署人员可以在运行时动态选择通道连接 destination(例如,RocketMQ 的 topic 或者 RabbitMQ 的 exchange)。Spring Cloud Stream 屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。Spring 官方实现了 Rabbit binder 和 Kafka Binder。Spring Cloud Alibaba 实现了 RocketMQ Binder[3],其主要实现原理是把发送消息最终代理给了 RocketMQ-Spring 的 RocketMQTemplate,在消费端则内部会启动 RocketMQ-Spring Consumer Container 来接收消息。以此为基础,Spring Cloud Alibaba 还实现了 Spring Cloud Bus RocketMQ, 用户可以使用 RocketMQ 作为 Spring Cloud 体系内的消息总线,来连接分布式系统的所有节点。通过 Spring Cloud Stream RocketMQ Binder,RocketMQ 可以与 Spring Cloud 生态更好的结合。比如与 Spring Cloud Data Flow、Spring Cloud Funtion 结合,让 RocketMQ 可以在 Spring 流计算生态、Serverless(FaaS) 项目中被使用。如今 Spring Cloud Stream RocketMQ Binder 和 Spring Cloud Bus RocketMQ 做为 Spring Cloud Alibaba 的实现已登陆 Spring 的官网[4],Spring Cloud Alibaba 也成为 Spring Cloud 最活跃的实现。三、如何在 Spring 生态中选择 RocketMQ 实现?通过介绍 Spring 中的消息框架,介绍了以 RocketMQ 为基础与 Spring 消息框架结合的几个项目,主要是 RocketMQ-Spring、Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ、Spring Data Flow 和 Spring Cloud Function。它们之间的关系可以如下图表示。如何在实际业务开发中选择相应项目进行使用?下面分别列出每个项目的特点和使用场景。1. RocketMQ-Spring特点:作为起步依赖,简单引入一个包就能在 Spring 生态用到 RocketMQ 客户端的所有功能。利用了大量自动配置和注解简化了编程模型,并且支持 Spring Messaging API。与 RocketMQ 原生 Java SDK 的功能完全对齐。使用场景:适合在 Spring Boot 中使用 RocketMQ 的用户,希望能用到 RocketMQ 原生 java 客户端的所有功能,并通过 Spring 注解和自动配置简化编程模型。2. Spring Cloud Stream RocketMQ Binder特点:屏蔽底层 MQ 实现细节,上层 Spring Cloud Stream 的 API 是统一的。如果想从 Kafka 切到 RocketMQ,直接改个配置即可。与 Spring Cloud 生态整合更加方便。比如 Spring Cloud Data Flow,这上面的流计算都是基于 Spring Cloud Stream;Spring Cloud Bus 消息总线内部也是用的 Spring Cloud Stream。Spring Cloud Stream 提供的注解,编程体验都是非常棒。使用场景:在代码层面能完全屏蔽底层消息中间件的用户,并且希望能项目能更好的接入 Spring Cloud 生态(Spring Cloud Data Flow、Spring Cloud Funtcion 等)。3. Spring Cloud Bus RocketMQ特点:将 RocketMQ 作为事件的“传输器”,通过发送事件(消息)到消息队列上,从而广播到订阅该事件(消息)的所有节点上,完成事件的分发和通知。使用场景:在 Spring 生态中希望用 RocketMQ 做消息总线的用户,可以用在应用间事件的通信,配置中心客户端刷新等场景。4. Spring Cloud Data Flow特点:以 Source/Processor/Sink 组件进行流式任务处理。RocketMQ 作为流处理过程中的中间存储组件。使用场景:流处理,大数据处理场景。5. Spring Cloud Function特点:消息的消费/生产/处理都是一次函数调用,融合 Java 生态的 Function 模型。使用场景:Serverless 场景。本文整体介绍了在 Spring 生态中接入 RockeMQ 的 5 种方法,让各位开发者对几种经典场景有宏观的了解。后续会有专栏详细介绍上述各个项目的具体使用方法和应用场景,真正地在 Spring 生态中玩转 RocketMQ!在 Spring 生态中玩转 RocketMQ 系列教程现已登陆知行动手实验室,点击这里立即体验!移动端同学,需要在PC端登录 start.aliyun.com 进行体验。相关链接:[1] https://www.infoq.cn/article/G-og5V8x3BK8i4z90y6P[2] https://spring.io/blog/2020/02/25/spring-tips-apache-rocketmq[3] https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ[4] https://spring.io/projects/spring-cloud-alibaba
文章
消息中间件  ·  Java  ·  中间件  ·  Serverless  ·  API  ·  Apache  ·  RocketMQ  ·  数据格式  ·  微服务  ·  Spring
2023-02-02
RocketMQ-Spring 毕业两周年,为什么能成为 Spring 生态中最受欢迎的 messaging 实现?
作者 | RocketMQ 官微来源|阿里巴巴云原生公众号2019 年 1 月,孵化 6 个月的 RocketMQ-Spring 作为 Apache RocketMQ 的子项目正式毕业,发布了第一个 Release 版本 2.0.1。该项目是把 RocketMQ 的客户端使用 Spring Boot 的方式进行了封装,可以让用户通过简单的 annotation 和标准的 Spring Messaging API 编写代码来进行消息的发送和消费。当时 RocketMQ 社区同学请 Spring 社区的同学对 RocketMQ-Spring 代码进行 review,引出一段罗美琪(RocketMQ)和春波特(Spring Boot)的故事。时隔两年,RocketMQ-Spring 正式发布 2.2.0。在这期间,RocketMQ-Spring 迭代了数个版本,以 RocketMQ-Spring 为基础实现的 Spring Cloud Stream RocketMQ Binder、Spring Cloud Bus RocketMQ 登上了 Spring 的官网,Spring 布道师 baeldung 向国外同学介绍如何使用 RocketMQ-Spring,越来越多国内外的同学开始使用 RocketMQ-Spring 收发消息,RocketMQ-Spring 仓库的 star 数也在短短两年时间内超越了 Spring-Kafka 和 Spring-AMQP(注:两者均由 Spring 社区维护),成为 Apache RocketMQ 最受欢迎的生态项目之一。RocketMQ-Spring 的受欢迎一方面得益于支持丰富业务场景的 RocketMQ 与微服务生态 Spring 的完美契合,另一方面也与 RocketMQ-Spring 本身严格遵循 Spring Messaging API 规范,支持丰富的消息类型分不开。遵循 Spring Messaging API 规范Spring Messaging 提供了一套抽象的 API,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性。1. 发送端RocketMQ-Spring 在遵循 Spring Messaging API 规范的基础上结合 RocketMQ 自身的功能特点提供了相应的 API。在消息的发送端,RocketMQ-Spring 通过实现 RocketMQTemplate 完成消息的发送。如下图所示,RocketMQTemplate 继承 AbstractMessageSendingTemplate 抽象类,来支持 Spring Messaging API 标准的消息转换和发送方法,这些方法最终会代理给 doSend 方法,doSend 方法会最终调用 syncSend,由 DefaultMQProducer 实现。除 Spring Messaging API 规范中的方法,RocketMQTemplate 还实现了 RocketMQ 原生客户端的一些方法,来支持更加丰富的消息类型。值得注意的是,相比于原生客户端需要自己去构建 RocketMQ Message(比如将对象序列化成 byte 数组放入 Message 对象),RocketMQTemplate 可以直接将对象、字符串或者 byte 数组作为参数发送出去(对象序列化操作由 RocketMQ-Spring 内置完成),在消费端约定好对应的 Schema 即可正常收发。RocketMQTemplate Send API: SendResult syncSend(String destination, Object payload) SendResult syncSend(String destination, Message<?> message) void asyncSend(String destination, Message<?> message, SendCallback sendCallback) void asyncSend(String destination, Message<?> message, SendCallback sendCallback) ……2. 消费端在消费端,需要实现一个包含 @RocketMQMessageListener 注解的类(需要实现 RocketMQListener 接口,并实现 onMessage 方法,在注解中进行 topic、consumerGroup 等属性配置),这个 Listener 会一对一的被放置到 DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的方式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接口实现。在容器中创建 RocketMQ DefaultPushConsumer 对象,启动并监听定制的 Topic 消息,完成约定 Schema 对象的转换,回调到 Listener 的 onMessage 方法。@Service @RocketMQMessageListener(topic = "${demo.rocketmq.topic}", consumerGroup = "string_consumer", selectorExpression = "${demo.rocketmq.tag}") public class StringConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.printf("------- StringConsumer received: %s \n", message); } }除此 Push 接口之外,在最新的 2.2.0 版本中,RocketMQ-Spring 实现了 RocketMQ Lite Pull Consumer。通过在配置文件中进行 consumer 的配置,利用 RocketMQTemplate 的 Recevie 方法即可主动 Pull 消息。配置文件resource/application.properties: rocketmq.name-server=localhost:9876 rocketmq.consumer.group=my-group1 rocketmq.consumer.topic=test Pull Consumer代码: while(!isStop) { List<String> messages = rocketMQTemplate.receive(String.class); System.out.println(messages); }丰富的消息类型RocketMQ Spring 消息类型支持方面与 RocketMQ 原生客户端完全对齐,包括同步/异步/one-way、顺序、延迟、批量、事务以及 Request-Reply 消息。在这里,主要介绍较为特殊的事务消息和 request-reply 消息。1. 事务消息RocketMQ 的事务消息不同于 Spring Messaging 中的事务消息,依然采用 RocketMQ 原生事务消息的方案。如下所示,发送事务消息时需要实现一个包含 @RocketMQTransactionListener 注解的类,并实现 executeLocalTransaction 和 checkLocalTransaction 方法,从而来完成执行本地事务以及检查本地事务执行结果。// Build a SpringMessage for sending in transaction Message msg = MessageBuilder.withPayload(..)...; // In sendMessageInTransaction(), the first parameter transaction name ("test") // must be same with the @RocketMQTransactionListener's member field 'transName' rocketMQTemplate.sendMessageInTransaction("test-topic", msg, null); // Define transaction listener with the annotation @RocketMQTransactionListener @RocketMQTransactionListener class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { // ... local transaction process, return bollback, commit or unknown return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // ... check transaction status and return bollback, commit or unknown return RocketMQLocalTransactionState.COMMIT; } }在 2.1.0 版本中,RocketMQ-Spring 重构了事务消息的实现,如下图所示,旧版本中每一个 group 对应一个 TransactionProducer,而在新版本中改为每一个 RocketMQTemplate 对应一个 TransationProducer,从而解决了并发使用多个事务消息的问题。当用户需要在单进程使用多个事务消息时,可以使用 ExtRocketMQTemplate 来完成(一般情况下,推荐一个进程使用一个 RocketMQTemplate,ExtRocketMQTemplate 可以使用在同进程中需要使用多个 Producer / LitePullConsumer 的场景,可以为 ExtRocketMQTemplate 指定与标准模版 RocketMQTemplate 不同的 nameserver、group 等配置),并在对应的 RocketMQTransactionListener 注解中指定 rocketMQTemplateBeanName 为 ExtRocketMQTemplate 的 BeanName。2. Request-Reply 消息在 2.1.0 版本中,RocketMQ-Spring 开始支持 Request-Reply 消息。Request-Reply 消息指的是上游服务投递消息后进入等待被通知的状态,直到消费端返回结果并返回给发送端。在 RocketMQ-Spring 中,发送端通过 RocketMQTemplate 的 sendAndReceivce 方法进行发送,如下所示,主要有同步和异步两种方式。异步方式中通过实现 RocketMQLocalRequestCallback 进行回调。// 同步发送request并且等待String类型的返回值 String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class); // 异步发送request并且等待User类型的返回值 rocketMQTemplate.sendAndReceive("objectRequestTopic", new User("requestUserName",(byte) 9), new RocketMQLocalRequestCallback<User>() { @Override public void onSuccess(User message) { …… } @Override public void onException(Throwable e) { …… } });在消费端,仍然需要实现一个包含 @RocketMQMessageListener 注解的类,但需要实现的接口是 RocketMQReplyListener<T, R> 接口(普通消息为 RocketMQListener 接口),其中 T 表示接收值的类型,R 表示返回值的类型,接口需要实现带返回值的 onMessage 方法,返回值的内容返回给对应的 Producer。@Service @RocketMQMessageListener(topic = "stringRequestTopic", consumerGroup = "stringRequestConsumer") public class StringConsumerWithReplyString implements RocketMQReplyListener<String, String> { @Override public String onMessage(String message) { …… return "reply string"; } }RocketMQ-Spring 遵循 Spring 约定大于配置(Convention over configuration)的理念,通过启动器(Spring Boot Starter)的方式,在 pom 文件引入依赖(groupId:org.apache.rocketmq,artifactId:rocketmq-spring-boot-starter)便可以在 Spring Boot 中集成所有 RocketMQ 客户端的所有功能,通过简单的注解使用即可完成消息的收发。在 RocketMQ-Spring Github Wiki 中有更加详细的用法和常见问题解答。据统计,从 RocketMQ-Spring 发布第一个正式版本以来,RocketMQ-Spring 完成 16 个 bug 修复,37 个 imporvement,其中包括事务消息重构,消息过滤、消息序列化、多实例 RocketMQTemplate 优化等重要优化,欢迎更多的小伙伴能参与到 RocketMQ 社区的建设中来,罗美琪(RocketMQ)和春波特(Spring Boot)的故事还在继续...钉钉搜索群号:21982288,即可进群和众多开发者交流!
文章
消息中间件  ·  Cloud Native  ·  Java  ·  API  ·  Apache  ·  RocketMQ  ·  开发者  ·  微服务  ·  Spring  ·  容器
2023-02-02
再见 2020!Apache RocketMQ 发布 4.8.0,DLedger 模式全面提升!
作者 | RocketMQ社区来源|阿里巴巴云原生公众号“童年的雨天最是泥泞,却是记忆里最干净的曾经。凛冬散尽,星河长明,新的一年,万事顺遂,再见,2020!”走过这个岁末,万众期待的 Apache RocketMQ 4.8.0 终于发布了,在这个版本中社区对 RocketMQ 完成大量的优化和问题修复。更重要的是,该版本从性能、稳定性、功能三个方面大幅度提升 DLedger 模式能力。DLedger 是 OpenMessaging 中一个基于 Raft 的 CommitLog 存储库实现,从 RocketMQ 4.5.0 版本开始,RocketMQ 引入 DLedger 模式来解决了 Broker 组内自动故障转移的问题,而在 4.8.0 版本中社区也对 RocketMQ DLedger 模式进行了全面升级。性能升级异步化 pipeline 模式RocketMQ 4.7.0 重新升级了同步双写的架构,利用异步化 pipeline 模式大幅提升了同步双写的性能。在 RocketMQ 4.8.0 中,社区将这一改进应用到 DLedger 模式中, 下图展示了 DLedger 模式下 broker 处理发送消息的过程。在原本的架构中, SendMessageProcessor 线程对每一个消息的处理,都需要等待多数派复制成功确认,才会返回给客户端,而在新版本中,利用 CompletableFuture 对线程处理消息的过程进行异步化改造,不再等待多数派的确认即可对下一个请求进行处理,Ack 操作由其他线程确认之后再进行结果处理并返回给客户端。通过对复制过程进行切分并将其流水线化,减少线程的长时间等待,充分利用 CPU,从而大幅提高吞吐量。批量日志复制Batch 一直是性能优化的重要方式,在新版本中,可以通过设置 isEnableBatchPush=true 来开启 DLedger 模式的批量复制。通过将多条数据聚合在一个包中进行发送,可以降低收发包的个数,从而降低系统调用和上下文的切换。在数据发送压力比较大,并且可能达到系统收发包瓶颈的情况下,批量复制能显著提高吞吐量。值得注意的是,DLedger 模式下的批量复制并不会对单个包进行延时的攒批处理,因此不会影响单个消息的发送时延。除了上述的性能优化,社区还对 DLedger 模式下影响性能的锁、缓存等做了数项性能优化,使 DLedger 模式下的性能提升数倍。稳定性升级为了验证和测试 Dledger 模式的可靠性,除了本地对 DLedger 模式进行了各种各样的测试,社区利用 OpenMessaging-Chaos 框架对 RocketMQ DLedger 模式进行了大量 Chaos 测试。OpenMessaging-Chaos 是一个利用故障注入来验证各种消息平台一致性和高可用性的测试框架,在 OpenMessaging-Chaos 的测试中,客户端并发地向待测试集群发送和接收消息,中间会间隔性地对集群进行故障注入,最后给出测试结果,包括是否丢消息,是否有重复消息,集群平均故障恢复时间等。利用 OpenMessaging-Chaos,我们验证了 DLedger 模式在以下故障注入场景下的表现:random-partition(fixed-partition)故障随机挑选节点进行网络隔离,模拟常见的对称网络分区。random-loss 故障随机挑选节点并对这些节点接收和发送的网络包进行按比例丢弃,模拟一些节点网络较差的情况。random-kill(minor-kill、major-kill、fixed-kill)故障模拟常见的进程崩溃情况。random-suspend(minor-suspend、major-suspend、fixed-suspend)故障模拟一些慢节点的情况,比如发生 Full GC、OOM 等。bridge 和 partition-majorities-ring 故障模拟比较极端的非对称网络分区。以 minor-kill 故障注入为例,我们部署 5 个节点组成一组 DLedger 模式的 RocketMQ broker 进行 Chaos 测试。minor-kill 故障注入会随机挑选集群中少数节点进程杀死,由于杀死少数节点,即使集群不可用也能在一段时间内恢复,方便测试集群平均故障恢复时间。测试过程中我们设置四个客户端并发向 RocketMQ DLedger 集群发送和接收消息,故障注入时间间隔为 100s,即 100s 正常运行,100s 故障注入,一直循环,整个阶段一共持续 2400s。测试结束后,OpenMessaging-Chaos 会给出测试结果和时延图。下图展示了整个测试过程中入队操作的时延情况。图中纵坐标是是时延,横坐标是测试时间,绿色框表示数据发送成功,红色框表示数据发送失败,蓝色框表示不确定是否数据添加成功,灰色部分表示故障注入的时间段。可以看出一些故障注入时间段造成了集群短暂的不可用,一些故障时间段则没有,这是合理的。由于是随机网络分区,所以只有杀死的少数节点包含 leader 节点时才会造成集群重新选举,但即使造成集群重新选举, DLedger 模式在一段时间后也会恢复可用性。下图是测试结束后 OpenMessaging-Chaos 给出的统计结果,可以看到一共成功发送了 11W 个数据,没有数据丢失,这表明即使在故障下,RocketMQ DLedger 模式仍旧满足 At Least Once 的投递语义。此外,RTOTestResult 表明 12 次故障时间段一共发生了 3 次集群不可用的情况(与上述时延图一致),但每次集群都能在 30 秒以内恢复,平均故障恢复时间在 22 秒左右。在 OpenMessaging Chaos 测试过程中,我们发现了 DLedger 模式存在的数个隐性问题并进行了修复,提高了 DLedger 模式下对进程异常崩溃、慢节点、对称/非对称网络分区、网络丢包的容错能力,也进一步验证了 DLedger 模式的可靠性。功能升级DLedger 模式支持 Preferred Leader在旧版本中一组 Broker 中选谁作为 Leader 完全是随机的。但是在新版中我们可以通过配置 preferredLeaderId 来指定优先选举某个节点为 Leader,如下图所示,通过在三个机房部署 DLedger 模式的 broker 组,利用 preferred leader 可以更好的实现机房对齐的功能,图中 DC1 中服务更好,我们可以优先选择在 DC1 中部署 Master。此外,利用 preferred leader 还可以更好实现 DLedger 集群混部,从而充分利用机器资源。DLedger 模式支持批量消息从 RocketMQ 4.8.0 开始,DLedger 模式支持批量消息发送,从而在消息类型的支持上完全与 Master-Slave 部署形态保持一致。除了对 DLedger 模式的大量优化,本次 RocketMQ 版本一共包含 Improvement 25 个,Bug Fix 11 个,文档和代码格式优化 16 个。据不完全统计,这些贡献来自近 40 位 RocketMQ 社区的 Contributor,感谢大家的积极参与。也非常期待有更多的用户、厂商、开发者参与到 RocketMQ 建设中来,加入 Apache RocketMQ 社区,永远不会太迟!
文章
消息中间件  ·  存储  ·  缓存  ·  Cloud Native  ·  Java  ·  Apache  ·  RocketMQ  ·  开发者  ·  混合部署
2023-02-02
Kratos微服务框架实现IoT功能:设备实时地图
Kratos微服务框架实现IoT功能:设备实时地图IoT,也就是物联网,万物互联,在未来肯定是一个热点——实际上,现在物联网已经很热了。那好,既然这一块这么有前途。那我们就来学习怎么开发物联网系统吧。可是,作为一个小白,两眼一抹黑:我想学,可是我该如何开始?这玩意儿到底该咋整呢?于是,我各种找资料,各种学习——此处省略一亿个字,其中的艰辛,其中的曲折,总之就是:说来都是泪,欲哭却无声——总算是有了基础的认知,有了一个模糊的方向。我知道了物联网设备通讯协议MQTT、CoAP、LwM2M,知道了微服务,知道了MQ,知道了Websocket,知道了REST,知道了gRPC……有了这些认知,看起来可以开始做技术选型了。在这个时候,我发现了B站开源的微服务框架go-kratos。那么,Kratos能否实现物联网的系统和功能呢?答案是:必须可以。我们现在要开发一个物联网的系统,Kratos能够为我们提供什么技术支撑呢?有以下功能模块可供使用:MQTT,用于设备与物联网服务之间的同异步通讯;gRPC,用于微服务之间的同步通讯;MQ消息队列(RabbitMQ、Kafka、Pulsar、NATS、RocketMQ等),用于微服务之间的异步通讯;REST(基于gRPC gateway),用于后端跟前端的同步通讯;Websocket,用于后端跟前端的异步通讯。物联网一个最基础的功能就是实时地图了,也就是在地图上展现设备的动态,比如:位置、轨迹、方向……在我查找资料的时候,发现了一个实时地图的示例程序 realtimemap-go,它是Actor模型框架 Proto.Actor 的展示程序。该示例程序显示的是芬兰首都赫尔辛基公共交通车辆的实时位置。Proto.Actor,它是一种用于 Go、C# 和 Java/Kotlin 的超快速分布式 Actor 解决方案。你可能会问,那为什么不用它来进行开发?因为,它实现起来太复杂了,维护起来就更加复杂。如果你用过Erlang编程语言,那么你就能够深深体会到当中的困难。Proto.Actor该示例有一个在线演示:https://realtimemap.skyrise.cloud/该示例程序有以下特性:车辆的实时位置;车辆的轨迹;地理围栏通知(车辆进出该地理区域);每个公交公司在地理围栏区域的车辆;水平缩放。本文基于此示例程序,在Kratos下面重新实现了一遍。先决条件KratosVue.jsMQTTWebsocketgRPCRESTful示例程序的后端基于Kratos开发,需要有一定的Kratos的基础。前端基于Vue3和Typescript进行开发,需要有一定的相关基础。它是如何工作的?设备使用MQTT通讯协议将数据推送给服务端;服务端使用REST和Websocket将设备数据推送给前端。服务端基于Kratos框架进行开发,为了简便演示,本示例只有一个单体服务,实际运用时,拆分服务也是容易的。服务端接收MQTT数据数据源由于这个应用程序是关于跟踪车辆的,我们需要从某个地方获取它们的位置。在此应用程序中,位置是从赫尔辛基地区交通局的高频车辆定位 MQTT 代理接收的。有关数据的更多信息:赫尔辛基地区交通 - 开放数据。赫尔辛基地区交通局的高频定位。此数据已根据 © Helsinki Region Transport 2021、Creative Commons BY 4.0 International 获得许可Topic定义如下:0/1 /2 /3 /4 /5 /6 /7 /8 /9 /10 /11 /12 /13 /14 /15 /16 /<prefix>/<version>/<journey_type>/<temporal_type>/<event_type>/<transport_mode>/<operator_id>/<vehicle_number>/<route_id>/<direction_id>/<headsign>/<start_time>/<next_stop>/<geohash_level>/<geohash>/<sid>/#type Topic struct { Prefix string // /hfp/ is the root of the topic tree. Version string // v2 is the current version of the HFP topic and the payload format. JourneyType string // The type of the journey. Either journey, deadrun or signoff. TemporalType string // The status of the journey, ongoing or upcoming. EventType string // One of vp, due, arr, dep, ars, pde, pas, wait, doo, doc, tlr, tla, da, dout, ba, bout, vja, vjout. TransportMode string // The type of the vehicle. One of bus, tram, train, ferry, metro, ubus (used by U-line buses and other vehicles with limited realtime information) or robot (used by robot buses). // operator_id/vehicle_number uniquely identifies the vehicle. OperatorId string // The unique ID of the operator that owns the vehicle. VehicleNumber string // The vehicle number that can be seen painted on the side of the vehicle, often next to the front door. Different operators may use overlapping vehicle numbers. RouteId string // The ID of the route the vehicle is running on. DirectionId string // The line direction of the trip, either 1 or 2. Headsign string // The destination name, e.g. Aviapolis. StartTime string // The scheduled start time of the trip NextStop string // The ID of next stop or station. GeohashLevel string // The geohash level represents the magnitude of change in the GPS coordinates since the previous message from the same vehicle. Geohash string // The latitude and the longitude of the vehicle. Sid string // Junction ID, corresponds to sid in the payload. }载体数据结构定义如下:package hfp type Payload struct { Longitude *float64 `json:"long"` // 经度(WGS84) Latitude *float64 `json:"lat"` // 纬度(WGS84) Heading *int32 `json:"hdg"` // 朝向角度[0, 360] DoorState *int32 `json:"drst"` // 门状态 0:所有门都已关闭 1:有门打开 Timestamp *time.Time `json:"tst"` // 时间戳 Speed *float64 `json:"spd"` // 车速(m/s) Odometer *int32 `json:"odo"` // 里程(m) } type Event struct { VehicleId string // 车辆ID OperatorId string // 司机ID VehiclePosition *Payload `json:"VP"` // 坐标 DoorOpen *Payload `json:"DOO"` // 开门 DoorClosed *Payload `json:"DOC"` // 关门 }需要注意的是,我测试时发现,MQTT接收数据时只要接收一段时间就自动断开了,一开始我还以为是我这边出问题了,后来做了一些测试才发现,是对方限制了使用,应该是测试账号的ClientID只允许接收一定时长的数据。编写代码首先创建MQTT服务端,它本质上是一个MQTT的客户端,它具有全双工、双向的数据流,所以实现为服务端也并无问题。package server import ( "context" "github.com/go-kratos/kratos/v2/log" "github.com/tx7do/kratos-transport/transport/mqtt" "kratos-realtimemap/app/admin/internal/conf" "kratos-realtimemap/app/admin/internal/service" ) // NewMQTTServer create a mqtt server. func NewMQTTServer(c *conf.Server, _ log.Logger, svc *service.AdminService) *mqtt.Server { ctx := context.Background() srv := mqtt.NewServer( mqtt.WithAddress([]string{c.Mqtt.Addr}), mqtt.WithCodec("json"), ) _ = srv.RegisterSubscriber(ctx, "/hfp/v2/journey/ongoing/vp/bus/#", registerSensorDataHandler(svc.TransitPostTelemetry), hfpEventCreator, ) svc.SetMqttBroker(srv) return srv }以上代码创建了一个MQTT的服务器,使用JSON编解码器进行编解码,监听了Topic为/hfp/v2/journey/ongoing/vp/bus/#的MQTT推送消息。接着实现服务,对设备通过MQTT推送的消息进行处理:package service import ( "context" "github.com/tx7do/kratos-transport/broker" "kratos-realtimemap/api/hfp" "kratos-realtimemap/app/admin/internal/pkg/data" ) func (s *RealtimeMapService) SetMqttBroker(b broker.Broker) { s.mb = b } func (s *RealtimeMapService) TransitPostTelemetry(_ context.Context, topic string, headers broker.Headers, msg *hfp.Event) error { //fmt.Println("Topic: ", topic) topicInfo := hfp.Topic{} topicInfo.Parse(topic) msg.OperatorId = topicInfo.OperatorId msg.VehicleId = topicInfo.GetVehicleUID() position := msg.MapToPosition() if position != nil { s.positionHistory.Update(position) turnovers := data.AllOrganizations.Update(position) s.BroadcastVehicleTurnoverNotification(turnovers) s.BroadcastVehiclePosition(s.positionHistory.GetPositionsHistory(position.VehicleId)) } s.log.Infof("事件类型: %s 交通工具类型: %s 司机ID: %s 车辆ID: %s", topicInfo.EventType, topicInfo.TransportMode, topicInfo.OperatorId, msg.VehicleId) return nil }以上代码对Topic和载体数据进行了解析,将设备状态存入内存当中,旋即把状态通过Websocket广播给前端。好了,我们对MQTT的处理就完成了。处理MQTT的课结束,下课!嗯?这就完了?这么简单?没错,就这么点代码,就这么的容易,我也想多叨叨几句,扩充点篇幅,只可惜,它确实就是这么容易就搞定了。服务端推送数据到前端服务端与前端的通讯主要靠REST和Websocket来实现。那些更新频率不高,实时性要求也不高的数据都可以走REST,由前端主动拉取。而实时性和更新频率都比较高的数据则可以通过Websocket由服务端主动推送。数据结构别看设备与服务端的通讯很简单,但是,服务端到前端的数据就复杂多了。有以下数据:Organization(组织),指的是汽车的所属公司。Geofence(地理围栏),它是地图上的一个几何区域,用于标定汽车的停车场或者运营区域,出入都将会发送一个通知给前端。Position(汽车坐标),它是汽车的一个坐标点,包含了汽车在该点上的状态,比如:开关门,速度,朝向等。Viewport(视口),它是地图上的一个裁剪矩形,浅显的描述就是你在前端看到的地图区域,前端只接收该视口之内的汽车数据,否则服务器会向前端发送系统所有的汽车数据,不论服务器还是网络都将会吃不消。Notification(通知),服务端通知前端一些事件,主要是:汽车进出地理围栏的事件,汽车上线下线通知。其中,Position和Notification都是通过Websocket推送给前端,其他数据则是前端通过REST主动拉取。以上数据结构通过Protobuf定义:syntax = "proto3"; // 地理点 message GeoPoint { double longitude = 1;// 经度(WGS84) double latitude = 2;// 纬度(WGS84) } // 组织 message Organization { string id = 1;// 组织ID string name = 2;// 组织名称 } // 地理围栏 message Geofence { string name = 1;// 围栏名称 double radius_in_meters = 2;// 半径长度(圆形地理围栏) double longitude = 3;// 经度(WGS84) double latitude = 4;// 纬度(WGS84) string org_id = 5;// 组织ID repeated string vehicles_in_zone = 6;// 区域内所有的车辆 } // 车辆坐标 message Position { string vehicle_id = 1;// 车辆ID string org_id = 2;// 组织ID int64 timestamp = 3;// 时间戳 double longitude = 4;// 经度(WGS84) double latitude = 5;// 纬度(WGS84) int32 heading = 6;// 朝向角度[0, 360] bool doors_open = 7;// 门状态 0:所有门都已关闭 1:有门打开 double speed = 8;// 车速(m/s) } // 视口 message Viewport { GeoPoint south_west = 1;// 西南点(左下点) GeoPoint north_east = 2;// 东北点(右上点) } // 通知 message Notification { string message = 1;// 通知内容 }REST像拉取组织列表、获取某一个组织的详情、获取某一车辆的行车轨迹,都属于低频的操作,所以都走REST。REST的功能是通过gRPC的gateway实现的,所以我们可以通过protobuf来定义API:syntax = "proto3"; // 实时地图服务 service RealtimeMapService { // 获取组织列表 rpc ListOrganizations (google.protobuf.Empty) returns (ListOrganizationsReply) { option (google.api.http) = { get: "/api/organizations" }; } // 获取组织详情 rpc GetOrganization (GetOrganizationReq) returns (GetOrganizationReply) { option (google.api.http) = { get: "/api/organizations/{org_id}" }; } // 获取车辆轨迹 rpc GetVehicleTrail (GetVehicleTrailReq) returns (GetVehicleTrailReply) { option (google.api.http) = { get: "/api/trail/{id}" }; } }下面就可以创建REST服务器了:package server // NewMiddleware 创建中间件 func NewMiddleware(ac *conf.Auth, logger log.Logger) http.ServerOption { return http.Middleware( recovery.Recovery(), tracing.Server(), logging.Server(logger), ) } // NewHTTPServer new an HTTP server. func NewHTTPServer(c *conf.Server, ac *conf.Auth, logger log.Logger, s *service.RealtimeMapService) *http.Server { var opts = []http.ServerOption{ NewMiddleware(ac, logger), http.Filter(handlers.CORS( handlers.AllowedHeaders([]string{"" + "", "Content-Type", "Authorization"}), handlers.AllowedMethods([]string{"GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS"}), handlers.AllowedOrigins([]string{"*"}), )), } if c.Http.Network != "" { opts = append(opts, http.Network(c.Http.Network)) } if c.Http.Addr != "" { opts = append(opts, http.Address(c.Http.Addr)) } if c.Http.Timeout != nil { opts = append(opts, http.Timeout(c.Http.Timeout.AsDuration())) } srv := http.NewServer(opts...) h := openapiv2.NewHandler() srv.HandlePrefix("/q/", h) v1.RegisterRealtimeMapServiceHTTPServer(srv, s) return srv }其服务很简单,也就是一些非常简单的内存数据查询:package service func (s *RealtimeMapService) ListOrganizations(_ context.Context, _ *emptypb.Empty) (*v1.ListOrganizationsReply, error) { reply := &v1.ListOrganizationsReply{ Organizations: data.AllOrganizations.MapToBaseInfoArray(), } return reply, nil } func (s *RealtimeMapService) GetOrganization(_ context.Context, req *v1.GetOrganizationReq) (*v1.GetOrganizationReply, error) { if org, ok := data.AllOrganizations[req.OrgId]; ok { return &v1.GetOrganizationReply{ Id: org.Id, Name: org.Name, Geofences: org.MapToGeofenceArray(), }, nil } else { return nil, v1.ErrorResourceNotFound(fmt.Sprintf("Organization %s not found", req.OrgId)) } } func (s *RealtimeMapService) GetVehicleTrail(_ context.Context, req *v1.GetVehicleTrailReq) (*v1.GetVehicleTrailReply, error) { his := s.positionHistory.GetVehicleTrail(req.Id) if his == nil { return nil, v1.ErrorResourceNotFound(fmt.Sprintf("%s positions history not found", req.Id)) } return &v1.GetVehicleTrailReply{Positions: his}, nil }WebsocketWebsocket适合需要服务端主动推送消息的应用场景之下。REST肯定是做不到的,长轮询的效率之低下,令人发指。在Kratos下创建一个Websocket的服务器是容易的,只需要以下代码即可实现:package server import ( "github.com/go-kratos/kratos/v2/log" "github.com/tx7do/kratos-transport/transport/websocket" "kratos-realtimemap/app/admin/internal/conf" "kratos-realtimemap/app/admin/internal/service" ) // NewWebsocketServer create a websocket server. func NewWebsocketServer(c *conf.Server, _ log.Logger, svc *service.RealtimeMapService) *websocket.Server { srv := websocket.NewServer( websocket.WithAddress(c.Websocket.Addr), websocket.WithPath(c.Websocket.Path), websocket.WithConnectHandle(svc.OnWebsocketConnect), websocket.WithCodec("json"), ) svc.SetWebsocketServer(srv) return srv }向前端推送消息,我简单处理了,调用Broadcast方法直接广播全部前端了:func (s *RealtimeMapService) BroadcastToWebsocketClient(eventId string, payload interface{}) { if payload == nil { return } bufPayload, _ := json.Marshal(&payload) var proto v1.WebsocketProto proto.EventId = eventId proto.Payload = string(bufPayload) bufProto, _ := json.Marshal(&proto) var msg websocket.Message msg.Body = bufProto s.ws.Broadcast(websocket.MessageType(v1.MessageType_Notify), &msg) }只有两个推送:BroadcastVehiclePosition方法是推送车辆的位置信息的:func (s *RealtimeMapService) BroadcastVehiclePosition(positions data.PositionArray) { s.BroadcastToWebsocketClient("positions", positions) }BroadcastVehicleTurnoverNotification是推送车辆进出物理围栏通知的:func (s *RealtimeMapService) BroadcastVehicleTurnoverNotification(turnovers data.TurnoverArray) { for _, turnover := range turnovers { var str string if turnover.Status { str = fmt.Sprintf("%s from %s entered the zone %s", turnover.VehicleId, turnover.OrganizationName, turnover.GeofenceName) } else { str = fmt.Sprintf("%s from %s left the zone %s", turnover.VehicleId, turnover.OrganizationName, turnover.GeofenceName) } s.BroadcastToWebsocketClient("notification", str) } }在程序里面,我们只处理了一个前端推送的消息,是前端视口改变的更新消息:func (s *RealtimeMapService) OnWebsocketMessage(sessionId websocket.SessionID, message *websocket.Message) error { s.log.Infof("[%s] Payload: %s\n", sessionId, string(message.Body)) var proto v1.WebsocketProto if err := json.Unmarshal(message.Body, &proto); err != nil { s.log.Error("Error unmarshalling proto json %v", err) return nil } switch proto.EventId { case "viewport": var msg v1.Viewport if err := json.Unmarshal([]byte(proto.Payload), &msg); err != nil { s.log.Error("Error unmarshalling payload json %v", err) return nil } _ = s.OnWsSetViewport(sessionId, &msg) } return nil } func (s *RealtimeMapService) OnWsSetViewport(sessionId websocket.SessionID, msg *v1.Viewport) error { s.viewports[sessionId] = msg return nil }到这里,服务端基本上就实现了。虽然还很粗糙,但是该有的功能是实现了。实现前端前端基于Vue.js和Typescript开发。REST客户端REST客户端基于axios封装而成:import axios, { AxiosInstance, AxiosRequestConfig } from 'axios'; import {deepMerge} from '@/util'; export interface CreateAxiosOptions extends AxiosRequestConfig { authenticationScheme?: string; } export class VAxios { private axiosInstance: AxiosInstance; private readonly options: CreateAxiosOptions; constructor(options: CreateAxiosOptions) { this.options = options; this.axiosInstance = axios.create(options); } private createAxios(config: CreateAxiosOptions): void { this.axiosInstance = axios.create(config); } getAxios(): AxiosInstance { return this.axiosInstance; } configAxios(config: CreateAxiosOptions) { if (!this.axiosInstance) { return; } this.createAxios(config); } setHeader(headers: any): void { if (!this.axiosInstance) { return; } Object.assign(this.axiosInstance.defaults.headers, headers); } get<T = any>(url: string): Promise<T> { return this.axiosInstance.get(url); } } function createAxios(opt?: Partial<CreateAxiosOptions>) { return new VAxios( deepMerge( { authenticationScheme: '', withCredentials: false, timeout: 10 * 1000, baseURL: process.env.VUE_APP_API_URL || 'http://localhost:8800/api/', headers: { 'Content-Type': 'application/json;charset=UTF-8', }, // 配置项,下面的选项都可以在独立的接口请求中覆盖 requestOptions: { // 默认将prefix 添加到url joinPrefix: true, // 是否返回原生响应头 比如:需要获取响应头时使用该属性 isReturnNativeResponse: false, // 需要对返回数据进行处理 isTransformResponse: true, // post请求的时候添加参数到url joinParamsToUrl: false, // 格式化提交参数时间 formatDate: true, // 是否加入时间戳 joinTime: true, // 忽略重复请求 ignoreCancelToken: true, // 是否携带token withToken: true, }, }, opt || {}, ), ); } export const apiInstance = createAxios();Websocket客户端Websocket基于WebSocket类开发:export interface PositionsDto { positions: PositionDto[]; } export interface PositionDto { vehicle_id: string; longitude: number; latitude: number; heading: number; speed: number; doors_open: boolean; } export interface WebsocketProto { event_id: string; payload: string; } export interface GeoPoint { longitude: number; latitude: number; } export interface Viewport { southWest: GeoPoint; northEast: GeoPoint; } export interface UpdateViewport { viewport: Viewport; } export interface Notification { message: string; } export interface HubConnection { setViewport(swLng: number, swLat: number, neLng: number, neLat: number); onPositions(callback: (positions: PositionDto[]) => void); onNotification(callback: (notification: string) => void); disconnect(): Promise<void>; } function ByteBufferToObject(buff) { const enc = new TextDecoder('utf-8'); const uint8Array = new Uint8Array(buff); const decodedString = enc.decode(uint8Array); // console.log(decodedString); return JSON.parse(decodedString); } function StringToArrayBuffer(str) { return new TextEncoder().encode(str); } class WebsocketConnect implements HubConnection { private connection: WebSocket; private onPositionsCallback?: (positions: PositionDto[]) => void; private onNotificationCallback?: (notification: string) => void; constructor() { const wsURL = `ws://localhost:7700/`; this.connection = new WebSocket(wsURL); this.connection.binaryType = 'arraybuffer'; this.connection.onopen = this.onWebsocketOpen.bind(this); this.connection.onerror = this.onWebsocketError.bind(this); this.connection.onmessage = this.onWebsocketMessage.bind(this); this.connection.onclose = this.onWebsocketClose.bind(this); } onWebsocketOpen(event) { console.log('ws连接成功', event); } onWebsocketError(event) { console.error('ws错误', event); } onWebsocketMessage(event) { const proto = ByteBufferToObject(event.data); // console.log(proto); const data = JSON.parse(proto['payload']); // console.log(data); const eventId = proto['event_id']; if (eventId == 'positions') { if (this.onPositionsCallback != null) { this.onPositionsCallback(data); } } else if (eventId == 'notification') { if (this.onNotificationCallback != null) { this.onNotificationCallback(data); } } } onWebsocketClose(event) { console.log('ws连接关闭', event); } sendMessage(eventId, data) { const x: WebsocketProto = { event_id: eventId, payload: JSON.stringify(data), }; const str = JSON.stringify(x); // console.log(str); this.connection.send(StringToArrayBuffer(str)); } setViewport(swLng: number, swLat: number, neLng: number, neLat: number) { const x: Viewport = { southWest: { longitude: swLng, latitude: swLat, }, northEast: { longitude: neLng, latitude: neLat, }, }; this.sendMessage('viewport', x); } onPositions(callback: (positions: PositionDto[]) => void) { this.onPositionsCallback = callback; } onNotification(callback: (notification: string) => void) { this.onNotificationCallback = callback; } async disconnect() { await this.connection.close(1000); } } export const connectToHub = new WebsocketConnect;地图客户端地图是使用的Mapbox开发的,这一块是直接从realtimemap-go中拷贝出来的。本来是想自己基于高德或者百度地图重新做一个,但是基于坐标系的考虑,就没有采用高德或者百度地图来开发了。要使用Mapbox,首先需要去 Mapbox 注册一个账号。然后在mapboxConfig.ts当中把你自己账号的AccessToken填写到mapboxAccessToken常量。项目代码GithubGiteeKratos 官方示例参考资料GTFS Realtime ReferenceHigh-frequency positioningrealtimemap-go
文章
消息中间件  ·  前端开发  ·  JavaScript  ·  物联网  ·  定位技术  ·  Go  ·  C#  ·  网络架构  ·  RocketMQ  ·  微服务
2023-02-02
rocketMQ配置了消费者,生产者发送成功了,但是无法消费
今天在开发环境测试了下功能,早上10点的时候发送了个消息消费成功了,但是下午的时候,发现没有消费,就去控制台查看了下,显示group在线,但是在topic下的消息查询中却没有group消费
问答
消息中间件  ·  RocketMQ
2023-02-01
消息中间件MQ知识点
为什么使用MQ?MQ的优点简答异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。日志处理 - 解决大量日志传输。消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。详答主要是:解耦、异步、削峰。**解耦:**A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。**异步:**A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。削峰:减少高峰时期对服务器压力。1.1 消息队列有什么优缺点?RabbitMQ有什么优缺点?优点上面已经说了,就是在特殊场景下有其对应的好处,解耦、异步、削峰。缺点有以下几个:系统可用性降低本来系统运行好好的,现在你非要加入个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性会降低;系统复杂度提高加入了消息队列,要多考虑很多方面的问题,比如:一致性问题、如何保证消息不被重复消费、如何保证消息可靠性传输等。因此,需要考虑的东西更多,复杂性增大。一致性问题A 系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 BCD 三个系统那里,BD 两个系统写库成功了,结果 C 系统写库失败了,咋整?你这数据就不一致了。所以消息队列实际是一种非常复杂的架构,你引入它有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,妈呀,系统复杂度提升了一个数量级,也许是复杂了 10 倍。但是关键时刻,用,还是得用的。1.2 你们公司生产环境用的是什么消息中间件?这个首先你可以说下你们公司选用的是什么消息中间件,比如用的是RabbitMQ,然后可以初步给一些你对不同MQ中间件技术的选型分析。举个例子:比如说ActiveMQ是老牌的消息中间件,国内很多公司过去运用的还是非常广泛的,功能很强大。但是问题在于没法确认ActiveMQ可以支撑互联网公司的高并发、高负载以及高吞吐的复杂场景,在国内互联网公司落地较少。而且使用较多的是一些传统企业,用ActiveMQ做异步调用和系统解耦。然后你可以说说RabbitMQ,他的好处在于可以支撑高并发、高吞吐、性能很高,同时有非常完善便捷的后台管理界面可以使用。另外,他还支持集群化、高可用部署架构、消息高可靠支持,功能较为完善。而且经过调研,国内各大互联网公司落地大规模RabbitMQ集群支撑自身业务的case较多,国内各种中小型互联网公司使用RabbitMQ的实践也比较多。除此之外,RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化,因此综合考虑过后,公司采取了RabbitMQ。但是RabbitMQ也有一点缺陷,就是他自身是基于erlang语言开发的,所以导致较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为扎实的erlang语言功底才可以。然后可以聊聊RocketMQ,是阿里开源的,经过阿里的生产环境的超高并发、高吞吐的考验,性能卓越,同时还支持分布式事务等特殊场景。而且RocketMQ是基于Java语言开发的,适合深入阅读源码,有需要可以站在源码层面解决线上生产问题,包括源码的二次开发和改造。另外就是Kafka。Kafka提供的消息中间件的功能明显较少一些,相对上述几款MQ中间件要少很多。但是Kafka的优势在于专为超高吞吐量的实时日志采集、实时数据同步、实时数据计算等场景来设计。因此Kafka在大数据领域中配合实时计算技术(比如Spark Streaming、Storm、Flink)使用的较多。但是在传统的MQ中间件使用场景中较少采用。1.3 Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?综上,各种对比之后,有如下建议:一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。1.4 MQ 有哪些常见问题?如何解决这些问题?MQ 的常见问题有:1.消息的顺序问题2.消息的重复问题消息的顺序问题消息有序指的是可以按照消息的发送顺序来消费。假如生产者产生了 2 条消息:M1、M2,假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,怎么做?解决方案:(1)保证生产者 - MQServer - 消费者是一对一对一的关系缺陷:并行度就会成为消息系统的瓶颈(吞吐量不够)更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。 (2)通过合理的设计或者将问题分解来规避。不关注乱序的应用实际大量存在队列无序并不意味着消息无序 所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理的方式。消息的重复问题造成消息重复的根本原因是:网络不可达。所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。2 什么是RabbitMQ?RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的消息中间件2.1 rabbitmq 的使用场景(1)服务间异步通信(2)顺序消费(3)定时任务(4)请求削峰2.2 RabbitMQ基本概念Broker: 简单来说就是消息队列服务器实体Exchange: 消息交换机,它指定消息按什么规则,路由到哪个队列Queue: 消息队列载体,每个消息都会被投入到一个或多个队列Binding: 绑定,它的作用就是把exchange和queue按照路由规则绑定起来Routing Key: 路由关键字,exchange根据这个关键字进行消息投递VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。Producer: 消息生产者,就是投递消息的程序Consumer: 消息消费者,就是接受消息的程序Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。2.3 RabbitMQ的工作模式一.simple模式(即最简单的收发模式)1.消息产生消息,将消息放入队列2.消息的消费者(consumer) 监听 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。二.work工作模式(资源的竞争)1.消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize) 保证一条消息只能被一个消费者使用)。三.publish/subscribe发布订阅(共享资源)1、每个消费者监听自己的队列;2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。四.routing路由模式1.消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;2.根据业务功能定义路由字符串3.从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。4.业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;五.topic 主题模式(路由模式的一种)1.星号井号代表通配符2.星号代表多个单词,井号代表一个单词3.路由功能添加模糊匹配4.消息产生者产生消息,把消息交给交换机5.交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费(在我的理解看来就是routing查询的一种模糊匹配,就类似sql的模糊查询方式)2.4 如何保证RabbitMQ消息的顺序性?拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。2.5 消息如何分发?若该队列至少有一个消费者订阅,消息将以循环(round-robin)的方式发送给消费者。每条消息只会分发给一个订阅的消费者(前提是消费者能够正常处理消息并进行确认)。通过路由可实现多消费的功能2.6 消息怎么路由?消息提供方->路由->一至多个队列消息发布到交换器时,消息将拥有一个路由键(routing key),在消息创建时设定。通过队列路由键,可以把队列绑定到交换器上。消息到达交换器后,RabbitMQ 会将消息的路由键与队列的路由键进行匹配(针对不同的交换器有不同的路由规则);常用的交换器主要分为一下三种:fanout:如果交换器收到消息,将会广播到所有绑定的队列上direct:如果路由键完全匹配,消息就被投递到相应的队列topic:可以使来自不同源头的消息能够到达同一个队列。 使用 topic 交换器时,可以使用通配符2.7 消息基于什么传输?由于 TCP 连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ 使用信道的方式来传输数据。信道是建立在真实的 TCP 连接内的虚拟连接,且每条 TCP 连接上的信道数量没有限制。2.8 如何保证消息不被重复消费?或者说,如何保证消息消费时的幂等性?先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。针对以上问题,一个解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;比如:在写入消息队列的数据做唯一标示,消费消息时,根据唯一标识判断是否消费过;假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。2.9 如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?发送方确认模式将信道设置成 confirm 模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的 ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条 nack(notacknowledged,未确认)消息。发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。接收方确认机制消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。这里并没有用到超时机制,RabbitMQ 仅通过 Consumer 的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ 给了 Consumer 足够长的时间来处理消息。保证数据的最终一致性;下面罗列几种特殊情况如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ 会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)如果消费者接收到消息却没有确认消息,连接也未断开,则 RabbitMQ 认为该消费者繁忙,将不会给该消费者分发更多的消息。2.10 如何保证RabbitMQ消息的可靠传输?消息不可靠的情况可能是消息丢失,劫持等原因;丢失又分为:生产者丢失消息、消息列表丢失消息、消费者丢失消息;生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。消息队列丢数据:消息持久化。处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢?这里顺便说一下吧,其实也很容易,就下面两步1.将queue的持久化标识durable设置为true,则代表是一个持久的队列2.发送消息的时候将deliveryMode=2这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;如果这时处理消息失败,就会丢失该消息;解决方案:处理消息成功后,手动回复确认消息。2.11 为什么不应该对所有的 message 都使用持久化机制?首先,必然导致性能的下降,因为写磁盘比写 RAM 慢的多,message 的吞吐量可能有 10 倍的差距。其次,message 的持久化机制用在 RabbitMQ 的内置 cluster 方案时会出现“坑爹”问题。矛盾点在于,若 message 设置了 persistent 属性,但 queue 未设置 durable 属性,那么当该 queue 的 owner node 出现异常后,在未重建该 queue 前,发往该 queue 的 message 将被 blackholed ;若 message 设置了 persistent 属性,同时 queue 也设置了 durable 属性,那么当 queue 的 owner node 异常且无法重启的情况下,则该 queue 无法在其他 node 上重建,只能等待其 owner node 重启后,才能恢复该 queue 的使用,而在这段时间内发送给该 queue 的 message 将被 blackholed 。所以,是否要对 message 进行持久化,需要综合考虑性能需要,以及可能遇到的问题。若想达到 100,000 条/秒以上的消息吞吐量(单 RabbitMQ 服务器),则要么使用其他的方式来确保 message 的可靠 delivery ,要么使用非常快速的存储系统以支持全持久化(例如使用 SSD)。另外一种处理原则是:仅对关键消息作持久化处理(根据业务重要程度),且应该保证关键消息的量不会导致性能瓶颈。2.12 如何保证高可用的?RabbitMQ 的集群RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。镜像集群模式:这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。2.13 如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?消息积压处理办法:临时紧急扩容:先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。2.14 设计MQ思路比如说这个消息队列系统,我们从以下几个角度来考虑一下:首先这个 mq 得支持可伸缩性吧,就是需要的时候快速扩容,就可以增加吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下 kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了?其次你得考虑一下这个 mq 的数据要不要落地磁盘吧?那肯定要了,落磁盘才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是 kafka 的思路。其次你考虑一下你的 mq 的可用性啊?这个事儿,具体参考之前可用性那个环节讲解的 kafka 的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。能不能支持数据 0 丢失啊?可以的,参考我们之前说的那个 kafka 数据零丢失方案。
文章
消息中间件  ·  SQL  ·  网络协议  ·  固态存储  ·  中间件  ·  Java  ·  大数据  ·  Kafka  ·  RocketMQ  ·  流计算
2023-01-31
Apache RocketMQ谁知道这个是什么问题导致的呢 ?
Apache RocketMQ谁知道这个是什么问题导致的呢 ?
问答
消息中间件  ·  Apache  ·  RocketMQ
2023-01-29
Fastjson官方再次披露高危漏洞,包括rocketmq、jeecg-boot等近15%的github开源项目受影响
漏洞简述2022年5月23日,fastjson 官方发布安全通报,fastjson <= 1.2.80 存在反序列化任意代码执行漏洞,在特定条件下可绕过默认autoType关闭限制,可能会导致远程服务器被攻击,风险影响较大。建议使用了 fastjson 的用户尽快采取安全措施保障系统安全。漏洞评级:高危影响组件:com.alibaba:fastjson影响版本:<= 1.2.80墨菲安全通过murphysec开源工具对github 7000多个java开源项目进行了检测,发现本次漏洞至少影响1031个项目,其中star大于500的项目达到290个。/jeecgboot/jeecg-boot/alibaba/Sentinel/xkcoding/spring-boot-demo/Tencent/APIJSON/apache/rocketmq/alibaba/DataX/zhaojun1998/zfile/alibaba/jetcache/alibaba/yugong/alibaba/GraphScope等项目均受到此次漏洞影响;处置建议方式一:升级到1.2.83版本,该版本涉及autotype行为变更,在某些场景会出现不兼容的情况,需要注意。方式二:开启safeMode来禁用autoType,开启方式参考官方说明:https://github.com/alibaba/fastjson/wiki/fastjson_safemode方式三:使用fastjson V2版本,不完全兼容1.x,升级需要做认真的兼容测试。如何快速排查墨菲安全提供了一系列检测工具,能够帮助您快速排查项目是否收到影响。墨菲安全IDE插件在 IDE 中即可检测代码依赖的安全问题,并通过准确的修复方案和一键修复功能,快速解决安全问题。使用方式:IDE插件中搜索“murphysec”即可安装选择“点击开始扫描”,即可检测出代码中存在哪些安全缺陷组件墨菲安全开源CLI工具使用CLI工具,在命令行检测指定目录代码的依赖安全问题工具地址:https://github.com/murphysecurity/murphysec具体使用方式可参考项目 README 或官方文档说明:检测仅发生在您的本地环境中,不会上传任何代码至服务端以上几种检测方式均可在墨菲安全平台上查看详细的检测结果,并可以查看项目的直接或间接依赖信息。参考链接:https://github.com/alibaba/fastjson/wiki/security_update_20220523
文章
消息中间件  ·  安全  ·  IDE  ·  fastjson  ·  Java  ·  Apache  ·  开发工具  ·  DataX  ·  RocketMQ  ·  Sentinel
2023-01-30
1 2 3 4 5 6 7 8 9
...
20
跳转至:
消息队列
0 人关注 | 0 讨论 | 5 内容
+ 订阅
  • 基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台
  • 阿里的 RocketMQ 如何让双十一峰值之下 0 故障?
  • 如何在 Spring 生态中玩转 RocketMQ?
查看更多 >
阿里云云原生
8670 人关注 | 6434 讨论 | 1961 内容
+ 订阅
  • OpenYurt v1.2 新版本深度解读(一): 聚焦边云网络优化
  • Sealer 0.9 :帮助集群和分布式应用实现 Build、 Share、Run
  • 全景剖析阿里云容器网络数据链路(二):Terway EN
查看更多 >
微服务
22994 人关注 | 11313 讨论 | 33211 内容
+ 订阅
  • DCM:中间件家族迎来新成员,属实牛逼
  • DCM:中间件家族迎来新成员,属实牛逼
  • Jenkins集群配置/并发构建
查看更多 >
开发与运维
5611 人关注 | 131418 讨论 | 301452 内容
+ 订阅
  • 阿里云国际站怎么样怎么注册怎么USDT冲值怎么实名怎么购买服务器准备工作(迁移前必读)
  • 【MySQL从入门到精通】【高级篇】(二十八)子查询优化,排序优化,GROUP BY优化和分页查询优化
  • 【云原生】为什么 BI 软件都搞不定关联分析,到底为什么呢?
查看更多 >
云原生
233850 人关注 | 11318 讨论 | 45139 内容
+ 订阅
  • 【云原生】为什么 BI 软件都搞不定关联分析,到底为什么呢?
  • 【云原生】SPL 提速天体聚类任务 2000 倍【文末送书】
  • Apifox --- 全套服务提升了团队效率,让研测之间充满了爱(记Apifox在工程中的实际应用)【云原生】
查看更多 >