标签继承实践--RocketMQ实例标签继承Topic或者Group

简介: 基于运维编排将RocketMQ实例上的标签同步到实例所包含的topic 和 group 上。

需求

需要将RocketMQ实例上的标签同步到实例所包含的topic 和 group 上


实现原理

  • 查询RocketMQ实例标签
  • 是否继承topic
  • 将指定实例标签继承到topic
  • 是否继承group
  • 将指定实例标签继承到group

image.png


操作步骤

  • 先创建子模版 模板名称为Ons_topicExtendInstanceTags,具体脚本参考下面子模板代码。
  • 再创建子模板 模板名称为Ons_groupExtendInstanceTags,具体脚本参考下面子模板代码。
  • 最后创建父模版,并将父模版中的子模版执行名称,改成第1、2步里自己创建模版时的名称,如果使用上述名称就不需要修改。具体脚本参考下面父模板代码。
  • 创建执行
  • 选择需要继承的地域
  • 选择需要继承的标签
  • 选择需要继承资源类型
  • 选择相关key是否覆盖
  • 在 消息队列 RocketMQ 版控制台查看执行效果。

image.png



模板

子模版:根据实例Id以及其他条件更新Topic 的标签

模板名称:Ons_topicExtendInstanceTags

FormatVersion: OOS-2019-06-01
Description: 标签继承
Parameters:  regionId:    Type: String
    Description:      en: The id of region
      zh-cn: 地域ID
    Label:      en: Region
      zh-cn: 地域
    AssociationProperty: RegionId
    Default: '{{ ACS::RegionId }}'  instanceId:    Type: String
    Description:      en: The id of OnsInstance
      zh-cn: RocketMQ实例ID
    Label:      en: InstanceId
      zh-cn: RocketMQ实例ID
  tagKeys:    Type: List
    Description: 标签Key的集合
  isUpdate:    Type: Boolean
    Description: 是否覆盖
    Default: falseRamRole: ''Tasks:  - Name: getOnsInstanceAndTags
    Action: ACS::ExecuteAPI
    Description: ListTagResources
    Properties:      Service: ONS
      API: ListTagResources
      Parameters:        RegionId: '{{ regionId }}'        ResourceId:          - '{{instanceId}}'        ResourceType: instance
    Outputs:      onsTags:        Type: Json
        ValueSelector: '.TagResources | map(select( .TagKey | test("^(?!acs).*"))) | map(select(.TagKey as $tagKey | {{tagKeys}} | index($tagKey) >=0)) '  - Name: getTopicAndTags
    Action: ACS::ExecuteAPI
    Description: OnsTopicList
    Properties:      Service: Ons
      API: OnsTopicList
      Parameters:        RegionId: '{{ regionId }}'        InstanceId: '{{instanceId}}'    Outputs:      updateTopicAndTags:        Type: List
        ValueSelector: .Data.PublishInfoDo[] | {"Topic":.Topic,"Tags":.Tags}  - Name: tagResources
    Action: ACS::ExecuteAPI
    Description: TagTopics
    Properties:      Service: Ons
      API: TagResources
      Parameters:        RegionId: '{{ regionId }}'        ResourceId:          Fn::Jq:            - All
            - .Topic
            - '{{ACS::TaskLoopItem}}'        Tag:          Fn::If:            - '{{isUpdate}}'            - Fn::Jq:                - All
                - '.[] |{"Key":.TagKey, "Value": .TagValue} '                - '{{getOnsInstanceAndTags.onsTags}}'            - Fn::Jq:                - All
                - '.Tags.Tag as $topicTags | {{getOnsInstanceAndTags.onsTags}} | map(select(.TagKey as $tagKey | [$topicTags[].Key] | index($tagKey) <0)) |.[] |{"Key":.TagKey, "Value": .TagValue}'                - '{{ACS::TaskLoopItem}}'        ResourceType: topic
        InstanceId: '{{instanceId}}'    Outputs:      reqResult:        Type: Json
        ValueSelector: .RequestId
    Loop:      RateControl:        Mode: Concurrency
        MaxErrors: 100        Concurrency: 1      Items: '{{getTopicAndTags.updateTopicAndTags}}'      Outputs:        tagResult:          AggregateType: Fn::ListJoin
          AggregateField: reqResult

子模版:根据实例Id 以及输入条件更新Group 的标签

模板名称:Ons_groupExtendInstanceTags

FormatVersion: OOS-2019-06-01
Description: 标签继承
Parameters:  regionId:    Type: String
    Description:      en: The id of region
      zh-cn: 地域ID
    Label:      en: Region
      zh-cn: 地域
    AssociationProperty: RegionId
    Default: '{{ ACS::RegionId }}'  instanceId:    Type: String
    Description:      en: The id of OnsInstance
      zh-cn: 实例ID
    Label:      en: InstanceId
      zh-cn: 实例ID
  tagKeys:    Type: List
    Description: 标签Key的集合
  isUpdate:    Type: Boolean
    Description: 是否覆盖
    Default: falseRamRole: ''Tasks:  - Name: getOnsInstanceAndTags
    Action: ACS::ExecuteAPI
    Description: ListTagResources
    Properties:      Service: ONS
      API: ListTagResources
      Parameters:        RegionId: '{{ regionId }}'        ResourceId:          - '{{instanceId}}'        ResourceType: instance
    Outputs:      onsTags:        Type: Json
        ValueSelector: '.TagResources | map(select( .TagKey | test("^(?!acs).*"))) | map(select(.TagKey as $tagKey | {{tagKeys}} | index($tagKey) >=0)) '  - Name: getGroupAndTags
    Action: ACS::ExecuteAPI
    Description: OnsGroupList
    Properties:      Service: Ons
      API: OnsGroupList
      Parameters:        RegionId: '{{ regionId }}'        InstanceId: '{{instanceId}}'    Outputs:      updateGroupAndTags:        Type: List
        ValueSelector: .Data.SubscribeInfoDo[] | {"Group":.GroupId,"Tags":.Tags}  - Name: tagResources
    Action: ACS::ExecuteAPI
    Description: TagGroups
    Properties:      Service: Ons
      API: TagResources
      Parameters:        RegionId: '{{ regionId }}'        ResourceId:          Fn::Jq:            - All
            - .Group
            - '{{ACS::TaskLoopItem}}'        Tag:          Fn::If:            - '{{isUpdate}}'            - Fn::Jq:                - All
                - '.[] |{"Key":.TagKey, "Value": .TagValue} '                - '{{getOnsInstanceAndTags.onsTags}}'            - Fn::Jq:                - All
                - '.Tags.Tag as $groupTags | {{getOnsInstanceAndTags.onsTags}} | map(select(.TagKey as $tagKey | [$groupTags[].Key] | index($tagKey) <0)) |.[] |{"Key":.TagKey, "Value": .TagValue}'                - '{{ACS::TaskLoopItem}}'        ResourceType: group
        InstanceId: '{{instanceId}}'    Outputs:      reqResult:        Type: Json
        ValueSelector: .RequestId
    Loop:      RateControl:        Mode: Concurrency
        MaxErrors: 100        Concurrency: 1      Items: '{{getGroupAndTags.updateGroupAndTags}}'      Outputs:        tagResult:          AggregateType: Fn::ListJoin
          AggregateField: reqResult

父模版:按照输入条件同步某个地域下所有Ons 实例的Topic 和Group 的标签

模板名称:RocketMQInstanceTagInherit

FormatVersion: OOS-2019-06-01
Description: 标签继承
Parameters:  regionIds:    Type: List
    Description:      en: The id of region
      zh-cn: 地域ID
    Label:      en: Region
      zh-cn: 地域
    AssociationProperty: RegionId
  tags:    Type: List
    AssociationProperty: Tags
    AssociationPropertyMetadata:      ResourceType: ALIYUN::MQ::INSTANCE
    Description: 消息队列RocketMQ实例上的标签,选择需要继承到topic/group上的标签键及标签值
  resourceTypes:    Type: List
    Description: 所选择继承的资源类型
    AllowedValues:      - topic
      - group
  isUpdate:    Type: Boolean
    Description: 如果被继承的资源存在相同key但是value不相同时是否覆盖value
    Default: falseRamRole: ''Tasks:  - Name: getOnsInstance
    Action: ACS::ExecuteAPI
    Description: OnsInstanceInServiceList
    Properties:      Service: ONS
      API: OnsInstanceInServiceList
      Parameters:        RegionId: '{{ACS::TaskLoopItem}}'    Outputs:      onsInstance:        Type: List
        ValueSelector: '.Data.InstanceVO[] |. + {"RegionId": "{{ACS::TaskLoopItem}}" }'    Loop:      RateControl:        Mode: Concurrency
        MaxErrors: 100        Concurrency: 1      Items: '{{regionIds}}'      Outputs:        allOnsInstances:          AggregateType: Fn::ListJoin
          AggregateField: onsInstance
  - Name: isTagTopic
    Action: ACS::Choice
    Description:      en: Choose next task by resource type
      zh-cn: 根据资源类型选择下一个任务
    Properties:      DefaultTask: isTagGroup
      Choices:        - When:            Fn::Equals:              - true
              - Fn::Jq:                  - First
                  - contains(["topic"])
                  - '{{resourceTypes}}'          NextTask: tagTopic
  - Name: tagTopic
    Action: ACS::Template
    Description:      en: Update topic tags by ons instance tags
      zh-cn: 通过RocketMQ实例标签更新Topic的标签
    Properties:      TemplateName: Ons_topicExtendInstanceTags
      Parameters:        regionId:          Fn::Jq:            - First
            - .RegionId
            - '{{ACS::TaskLoopItem}}'        instanceId:          Fn::Jq:            - First
            - .InstanceId
            - '{{ACS::TaskLoopItem}}'        tagKeys:          Fn::Jq:            - All
            - .[].Key
            - '{{tags}}'        isUpdate: '{{isUpdate}}'    Loop:      RateControl:        Mode: Concurrency
        MaxErrors: 100        Concurrency: 1      Items:        Fn::Jq:          - All
          - .[][]          - '{{getOnsInstance.allOnsInstances}}'      Outputs:        tagResult:          AggregateType: Fn::ListJoin
          AggregateField: reqResult
  - Name: isTagGroup
    Action: ACS::Choice
    Description:      en: Choose next task by resource type
      zh-cn: 根据资源类型选择下一个任务
    Properties:      DefaultTask: ACS::END
      Choices:        - When:            Fn::Equals:              - true
              - Fn::Jq:                  - First
                  - contains(["group"])
                  - '{{resourceTypes}}'          NextTask: tagGroup
  - Name: tagGroup
    Action: ACS::Template
    Description:      en: Update snapshot tags by ecs instance tags
      zh-cn: 通过RocketMQ实例标签更新Group的标签
    Properties:      TemplateName: Ons_groupExtendInstanceTags
      Parameters:        regionId:          Fn::Jq:            - First
            - .RegionId
            - '{{ACS::TaskLoopItem}}'        instanceId:          Fn::Jq:            - First
            - .InstanceId
            - '{{ACS::TaskLoopItem}}'        tagKeys:          Fn::Jq:            - All
            - .[].Key
            - '{{tags}}'        isUpdate: '{{isUpdate}}'    Loop:      RateControl:        Mode: Concurrency
        MaxErrors: 100        Concurrency: 1      Items:        Fn::Jq:          - All
          - .[][]          - '{{getOnsInstance.allOnsInstances}}'      Outputs:        tagResult:          AggregateType: Fn::ListJoin
          AggregateField: reqResult
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
22天前
|
消息中间件 Java Apache
RocketMQ消息回溯实践与解析
在分布式系统和高并发应用的开发中,消息队列扮演着至关重要的角色,而RocketMQ作为阿里巴巴开源的一款高性能消息中间件,以其高吞吐量、高可用性和灵活的配置能力,在业界得到了广泛应用。本文将围绕RocketMQ的消息回溯功能进行实践与解析,分享工作学习中的技术干货。
54 3
|
2月前
|
消息中间件 弹性计算 Kubernetes
RabbitMQ与容器化技术的集成实践
【8月更文第28天】RabbitMQ 是一个开源消息代理和队列服务器,用于在分布式系统中存储、转发消息。随着微服务架构的普及,容器化技术(如 Docker 和 Kubernetes)成为了部署和管理应用程序的标准方式。本文将探讨如何使用 Docker 和 Kubernetes 在生产环境中部署和管理 RabbitMQ 服务,同时保证高可用性和弹性伸缩能力。
39 3
|
22天前
|
消息中间件 存储 缓存
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
83 7
RabbitMQ:交换机详解(Fanout交换机、Direct交换机、Topic交换机)
|
17天前
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
48 15
|
17天前
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
43 9
|
12天前
|
消息中间件 监控 数据处理
解决方案 | 云消息队列RabbitMQ实践
解决方案 | 云消息队列RabbitMQ实践
26 1
|
14天前
|
消息中间件 弹性计算 运维
云消息队列RabbitMQ实践
本评测报告详细分析了阿里云云消息队列 RabbitMQ 版的实践原理、部署体验及核心优势。报告认为其在解决消息积压、脑裂难题及弹性伸缩方面表现优秀,但建议进一步细化架构优化策略和技术细节描述。部署文档详尽,对初学者友好,但仍需加强网络配置和版本兼容性说明。实际部署展示了其高可用性和成本优化能力,适用于高并发消息处理和分布式系统数据同步。为进一步提升方案,建议增加安全性配置指导、性能调优建议及监控告警系统设置。
|
2天前
|
消息中间件 监控 测试技术
云消息队列RabbitMQ实践 - 评测
根据反馈,对本解决方案的实践原理已有一定理解,描述整体清晰但需在消息队列配置与使用上增加更多示例和说明以助理解。部署体验中获得了一定的引导和文档支持,尽管文档仍有待完善;期间出现的配置文件错误及依赖库缺失等问题已通过查阅资料解决。设计验证展示了云消息队列RabbitMQ的核心优势,包括高可用性和灵活性,未来可通过增加自动化测试来提高系统稳定性。实践后,用户对方案解决问题的能力及适用场景有了明确认识,认为其具有实际生产价值,不过仍需在性能优化、安全性增强及监控功能上进行改进以适应高并发和大数据量环境。
9 0
|
2月前
|
消息中间件 开发者
【RabbitMQ深度解析】Topic交换器与模式匹配:掌握消息路由的艺术!
【8月更文挑战第24天】在消息队列(MQ)体系中,交换器作为核心组件之一负责消息路由。特别是`topic`类型的交换器,它通过模式匹配实现消息的精准分发,适用于发布-订阅模式。不同于直接交换器和扇形交换器,`topic`交换器支持更复杂的路由策略,通过带有通配符(如 * 和 #)的模式字符串来定义队列与交换器间的绑定关系。
47 2
|
3月前
|
消息中间件 存储 Java
消息队列 MQ使用问题之如何将RocketMQ中某个集群的topic迁移到另一个集群
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。