关于 Broker/Trigger 事件模型

本文涉及的产品
可观测监控 Prometheus 版,每月50GB免费额度
应用实时监控服务-应用监控,每月50GB免费额度
注册配置 MSE Nacos/ZooKeeper,182元/月
简介: Broker 提供一个事件集,可以通过属性选择该事件集。它负责接收事件并将其转发给由一个或多个匹配 Trigger 定义的订阅者。Trigger 描述基于事件属性的过滤器。同时可以根据需要创建多个 Trigger。本文将为大家详细讲解 Broker/Trigger 事件模型。

本文选自《Knative 云原生应用开发指南》
knative海报.png
更多云原生技术资讯可关注阿里巴巴云原生技术圈

Broker 和 Trigger

从 v0.5 开始,Knative Eventing 定义 Broker 和 Trigger 对象,从而能方便的对事件进行过滤(亦如通过 ingress 和 ingress controller 对网络流量的过滤一样)。

  • Broker 提供一个事件集,可以通过属性选择该事件集。它负责接收事件并将其转发给由一个或多个匹配 Trigger 定义的订阅者。
  • Trigger 描述基于事件属性的过滤器。同时可以根据需要创建多个 Trigger。

    如图:

6.png

当前实现方式

Namespace

通过Namespace Reconciler (代码:eventing/pkg/reconciler/v1alpha1/namespace/namespace.go)创建 broker。Namespace Reconciler 会查询所有带knative-eventing-injection: enabled 标签的 namespace。如果存在这样标签的 namespace, 那么Namespace Reconciler将会进行如下处理操作:

  1. 创建 Broker 过滤器的 ServiceAccount:eventing-broker-filter
  2. 通过 RoleBinding 确保 ServiceAccount 的 RBAC 权限
  3. 创建名称为 default 的 Broker
// newBroker creates a placeholder default Broker object for Namespace 'ns'.
func newBroker(ns *corev1.Namespace) *v1alpha1.Broker {
    return &v1alpha1.Broker{
        ObjectMeta: metav1.ObjectMeta{
            Namespace: ns.Name,
            Name:      defaultBroker,
            Labels:    injectedLabels(),
        },
    }
}

Broker (事件代理)

通过Broker Reconciler进行处理 broker,对于每一个 broker, 会进行一下处理操作:

  1. 创建 'trigger'Channel。所有在 Broker 中的 event 事件都会发送到这个Channel, 所有的 Trigger 会订阅这个Channel
  2. 创建'filter'Deployment。这个 Deployment 会运行cmd/broker/filter。其目的是处理与此 Broker 相关的所有 Trigger 的数据平面。说白了其实就做了两件事情,从 Channel 中接收事件,然后转发给事件的订阅者。
  3. 创建'filter' Kubernetes Service。通过该 Service 提供'filter' Deployment的服务访问。
  4. 创建'ingress' Deployment。这个 Deployment 会运行 cmd/broker/ingress。其目的是检查进入 Broker的所有事件
  5. 创建'ingress' Kubernetes Service。通过该 Service提供'Ingress' Deployment的服务访问。
  6. 创建'ingress' Channel。这是一个 Trigger 应答的 Channel。目的是将 Trigger 中返回的事件通过 Ingress Deployment 回写到 Broker。理想情况下,其实不需要这个,可以直接将 Trigger 的响应发送给 k8s Service。但是作为订阅的场景,只允许我们向 Channel 发送响应信息,所以我们需要这个作为中介。
  7. 创建'ingress' Subscription。它通过'ingress' Channel来订阅'ingress' Service

    代码如下:
func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconcile.Result, error) {
    // 1. Trigger Channel is created for all events. Triggers will Subscribe to this Channel.
    // 2. Filter Deployment.
    // 3. Ingress Deployment.
    // 4. K8s Services that point at the Deployments.
    // 5. Ingress Channel is created to get events from Triggers back into this Broker via the
    //    Ingress Deployment.
    //   - Ideally this wouldn't exist and we would point the Trigger's reply directly to the K8s
    //     Service. However, Subscriptions only allow us to send replies to Channels, so we need
    //     this as an intermediary.
    // 6. Subscription from the Ingress Channel to the Ingress Service.

    if b.DeletionTimestamp != nil {
        // Everything is cleaned up by the garbage collector.
        return nil
    }

    if b.Spec.ChannelTemplate == nil {
        r.Logger.Error("Broker.Spec.ChannelTemplate is nil",
            zap.String("namespace", b.Namespace), zap.String("name", b.Name))
        return nil
    }

    gvr, _ := meta.UnsafeGuessKindToResource(b.Spec.ChannelTemplate.GetObjectKind().GroupVersionKind())
    channelResourceInterface := r.DynamicClientSet.Resource(gvr).Namespace(b.Namespace)
    if channelResourceInterface == nil {
        return fmt.Errorf("unable to create dynamic client for: %+v", b.Spec.ChannelTemplate)
    }

    track := r.channelableTracker.TrackInNamespace(b)

    triggerChannelName := resources.BrokerChannelName(b.Name, "trigger")
    triggerChannelObjRef := corev1.ObjectReference{
        Kind:       b.Spec.ChannelTemplate.Kind,
        APIVersion: b.Spec.ChannelTemplate.APIVersion,
        Name:       triggerChannelName,
        Namespace:  b.Namespace,
    }
    // Start tracking the trigger channel.
    if err := track(triggerChannelObjRef); err != nil {
        return fmt.Errorf("unable to track changes to the trigger Channel: %v", err)
    }

    logging.FromContext(ctx).Info("Reconciling the trigger channel")
    triggerChan, err := r.reconcileTriggerChannel(ctx, channelResourceInterface, triggerChannelObjRef, b)
    if err != nil {
        logging.FromContext(ctx).Error("Problem reconciling the trigger channel", zap.Error(err))
        b.Status.MarkTriggerChannelFailed("ChannelFailure", "%v", err)
        return err
    }
   ......

    filterDeployment, err := r.reconcileFilterDeployment(ctx, b)
    if err != nil {
        logging.FromContext(ctx).Error("Problem reconciling filter Deployment", zap.Error(err))
        b.Status.MarkFilterFailed("DeploymentFailure", "%v", err)
        return err
    }
    _, err = r.reconcileFilterService(ctx, b)
    if err != nil {
        logging.FromContext(ctx).Error("Problem reconciling filter Service", zap.Error(err))
        b.Status.MarkFilterFailed("ServiceFailure", "%v", err)
        return err
    }
    b.Status.PropagateFilterDeploymentAvailability(filterDeployment)

    ingressDeployment, err := r.reconcileIngressDeployment(ctx, b, triggerChan)
    if err != nil {
        logging.FromContext(ctx).Error("Problem reconciling ingress Deployment", zap.Error(err))
        b.Status.MarkIngressFailed("DeploymentFailure", "%v", err)
        return err
    }

    svc, err := r.reconcileIngressService(ctx, b)
    if err != nil {
        logging.FromContext(ctx).Error("Problem reconciling ingress Service", zap.Error(err))
        b.Status.MarkIngressFailed("ServiceFailure", "%v", err)
        return err
    }
    b.Status.PropagateIngressDeploymentAvailability(ingressDeployment)
    b.Status.SetAddress(&apis.URL{
        Scheme: "http",
        Host:   names.ServiceHostName(svc.Name, svc.Namespace),
    })

    ingressChannelName := resources.BrokerChannelName(b.Name, "ingress")
    ingressChannelObjRef := corev1.ObjectReference{
        Kind:       b.Spec.ChannelTemplate.Kind,
        APIVersion: b.Spec.ChannelTemplate.APIVersion,
        Name:       ingressChannelName,
        Namespace:  b.Namespace,
    }

    // Start tracking the ingress channel.
    if err = track(ingressChannelObjRef); err != nil {
        return fmt.Errorf("unable to track changes to the ingress Channel: %v", err)
    }

    ingressChan, err := r.reconcileIngressChannel(ctx, channelResourceInterface, ingressChannelObjRef, b)
    if err != nil {
        logging.FromContext(ctx).Error("Problem reconciling the ingress channel", zap.Error(err))
        b.Status.MarkIngressChannelFailed("ChannelFailure", "%v", err)
        return err
    }
    b.Status.IngressChannel = &ingressChannelObjRef
    b.Status.PropagateIngressChannelReadiness(&ingressChan.Status)

    ingressSub, err := r.reconcileIngressSubscription(ctx, b, ingressChan, svc)
    if err != nil {
        logging.FromContext(ctx).Error("Problem reconciling the ingress subscription", zap.Error(err))
        b.Status.MarkIngressSubscriptionFailed("SubscriptionFailure", "%v", err)
        return err
    }
    b.Status.PropagateIngressSubscriptionReadiness(&ingressSub.Status)

    return nil
}

Broker 示例:

apiVersion: eventing.knative.dev/v1alpha1
kind: Broker
metadata:
  name: default
spec:
  channelTemplateSpec:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: InMemoryChannel

Trigger (触发器)

通过Trigger Reconciler进行处理 trigger,对于每一个 trigger, 会进行一下处理操作:

  1. 验证 Broker 是否存在
  2. 获取对应 Broker 的 Trigger Channel、 Ingress Channel 以及 Filter Service
  3. 确定订阅者的 URI
  4. 创建一个从 Broker 特定的 Channel 到这个 Trigger 的 kubernetes Service 的订阅。reply 被发送到

    Broker 的 ingress Channel。
  5. 检查是否包含 knative.dev/dependency 的注释。

    代码如下:
func (r *reconciler) reconcile(ctx context.Context, t *v1alpha1.Trigger) error {
    ......
    // 1. Verify the Broker exists.
    // 2. Get the Broker's:
    //   - Trigger Channel
    //   - Ingress Channel
    //   - Filter Service
    // 3. Find the Subscriber's URI.
    // 4. Creates a Subscription from the Broker's Trigger Channel to this Trigger via the Broker's
    //    Filter Service with a specific path, and reply set to the Broker's Ingress Channel.
    // 5. Find whether there is annotation with key "knative.dev/dependency".
    // If not, mark Dependency to be succeeded, else figure out whether the dependency is ready and mark Dependency correspondingly

    if t.DeletionTimestamp != nil {
        // Everything is cleaned up by the garbage collector.
        return nil
    }
    // Tell tracker to reconcile this Trigger whenever the Broker changes.
    brokerObjRef := corev1.ObjectReference{
        Kind:       brokerGVK.Kind,
        APIVersion: brokerGVK.GroupVersion().String(),
        Name:       t.Spec.Broker,
        Namespace:  t.Namespace,
    }

    if err := r.tracker.Track(brokerObjRef, t); err != nil {
        logging.FromContext(ctx).Error("Unable to track changes to Broker", zap.Error(err))
        return err
    }

    b, err := r.brokerLister.Brokers(t.Namespace).Get(t.Spec.Broker)
    if err != nil {
        logging.FromContext(ctx).Error("Unable to get the Broker", zap.Error(err))
        if apierrs.IsNotFound(err) {
            t.Status.MarkBrokerFailed("DoesNotExist", "Broker does not exist")
            _, needDefaultBroker := t.GetAnnotations()[v1alpha1.InjectionAnnotation]
            if t.Spec.Broker == "default" && needDefaultBroker {
                if e := r.labelNamespace(ctx, t); e != nil {
                    logging.FromContext(ctx).Error("Unable to label the namespace", zap.Error(e))
                }
            }
        } else {
            t.Status.MarkBrokerFailed("BrokerGetFailed", "Failed to get broker")
        }
        return err
    }
    t.Status.PropagateBrokerStatus(&b.Status)

    brokerTrigger := b.Status.TriggerChannel
    if brokerTrigger == nil {
        logging.FromContext(ctx).Error("Broker TriggerChannel not populated")
        r.Recorder.Eventf(t, corev1.EventTypeWarning, triggerChannelFailed, "Broker's Trigger channel not found")
        return errors.New("failed to find Broker's Trigger channel")
    }

    brokerIngress := b.Status.IngressChannel
    if brokerIngress == nil {
        logging.FromContext(ctx).Error("Broker IngressChannel not populated")
        r.Recorder.Eventf(t, corev1.EventTypeWarning, ingressChannelFailed, "Broker's Ingress channel not found")
        return errors.New("failed to find Broker's Ingress channel")
    }

    // Get Broker filter service.
    filterSvc, err := r.getBrokerFilterService(ctx, b)
    ......

    subscriberURI, err := r.uriResolver.URIFromDestination(*t.Spec.Subscriber, t)
    if err != nil {
        logging.FromContext(ctx).Error("Unable to get the Subscriber's URI", zap.Error(err))
        return err
    }
    t.Status.SubscriberURI = subscriberURI

    sub, err := r.subscribeToBrokerChannel(ctx, t, brokerTrigger, brokerIngress, filterSvc)
    if err != nil {
        logging.FromContext(ctx).Error("Unable to Subscribe", zap.Error(err))
        t.Status.MarkNotSubscribed("NotSubscribed", "%v", err)
        return err
    }
    t.Status.PropagateSubscriptionStatus(&sub.Status)

    if err := r.checkDependencyAnnotation(ctx, t); err != nil {
        return err
    }

    return nil
}

Trigger 示例:

apiVersion: eventing.knative.dev/v1alpha1
kind: Trigger
metadata:
  name: my-service-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.foo.bar
      myextension: my-extension-value
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service

总结

Broker/Trigger 模型出现的意义不仅在于其提供了消息过滤机制,更充分解耦了消息通道的实现,目前除了系统自身支持的基于内存的消息通道 InMemoryChannel 之外,还支持 Kafka、NATS Streaming 等消息服务。
此外结合 CloudEvent 进行事件统一标准传输,无论对于客户端接入事件源,还是消费端提供的消费事件服务,都能极大的提升了应用的跨平台可移植性。

阿里巴巴云原生关注微服务、Serverless、容器、Service Mesh 等技术领域、聚焦云原生流行技术趋势、云原生大规模的落地实践,做最懂云原生开发者的技术圈。”

相关实践学习
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。     相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
相关文章
|
SQL JavaScript 关系型数据库
MySQL Shell 使用指南
MySQL Shell 是一个强大且灵活的工具,它扩展了 MySQL 客户端的功能,使得数据库管理和运维工作更加便捷高效。
1119 0
Web server failed to start. Port XXX was already in use.【完美解决方案】
Web server failed to start. Port XXX was already in use.【完美解决方案】
Web server failed to start. Port XXX was already in use.【完美解决方案】
|
测试技术 API
WebDriver 中的 driver.close() 和 driver.quit()
【8月更文挑战第27天】
855 5
|
12月前
|
SQL Java 数据库连接
Pagehelper超级好用的分页插件
Pagehelper超级好用的分页插件
2003 0
|
安全 Java Spring
Spring Boot中的环境配置和管理
Spring Boot中的环境配置和管理
|
消息中间件 存储 容灾
AutoMQ 云上十倍成本节约的奥秘: SPOT 实例
AutoMQ Kafka 优化设计,充分利用云基础设施,尤其是成本低廉的Spot实例,实现公有云成本节约。尽管Spot实例的不确定性可能导致服务中断,AutoMQ通过Broker无状态化、快速弹性扩展和Serverless支持,以及应对Spot实例回收的优雅停机和容灾机制,确保了可靠的Kafka服务。混合使用按需实例以保证关键服务稳定,同时在面临Spot实例库存不足时,具备回退到按需实例的能力。AutoMQ Kafka通过创新技术在稳定性与成本之间找到了平衡,为用户提供灵活且经济高效的解决方案。
277 0
AutoMQ 云上十倍成本节约的奥秘: SPOT 实例
|
网络协议 安全 Linux
iptables的原理和使用样例
iptables是Linux系统中用于配置和管理网络包过滤规则的工具。它可以用于设置防火墙、网络地址转换(NAT)以及网络包的源地址、目标地址和端口的过滤等功能。以下是一些iptables的用法示例: 1. 允许特定IP地址的入站连接: ``` iptables -A INPUT -s 192.168.0.1 -j ACCEPT ``` 这条规则将允许来自IP地址为192.168.0.1的主机的所有入站连接。 2. 允许特定端口的入站连接: ``` iptables -A INPUT -p tcp --dport 22 -j ACCEPT ``` 这条规则将允许所有TCP协议的目标端口为2
322 2
|
安全 中间件 C++
【C++运算符重载】运算符重载的艺术与实践:何时使用以及使用示例
【C++运算符重载】运算符重载的艺术与实践:何时使用以及使用示例
321 5
|
开发者 Python
Django的信号机制:实现应用间的通信与响应
【4月更文挑战第15天】Django信号机制实现跨组件通信,基于订阅/发布模式,允许在事件(如模型保存、删除)发生时触发自定义函数。内置信号如`pre_save`、`post_save`,也可自定义信号。使用包括定义信号、连接处理器和触发信号。常用于模型操作监听、第三方应用集成和跨应用通信。注意避免滥用和保证处理器健壮性。信号机制提升代码可维护性和扩展性。
|
C语言
###51单片机学习-----如何通过C语言运用延时函数设计LED流水灯
###51单片机学习-----如何通过C语言运用延时函数设计LED流水灯
522 0