透过 In-memory Channel 看 Knative Eventing 中 Broker/Trigger 工作机制

本文涉及的产品
容器镜像服务 ACR,镜像仓库100个 不限时长
简介: In-memory Channel是当前Knative Eventing中默认的Channel, 也是一般刚接触Knative Eventing首先了解到的Channel。本文通过分析 In-memory Channel 来进一步了解 Knative Eventing 中Broker/Trigger事件处理机制。

In-memory Channel是当前Knative Eventing中默认的Channel, 也是一般刚接触Knative Eventing首先了解到的Channel。本文通过分析 In-memory Channel 来进一步了解 Knative Eventing 中Broker/Trigger事件处理机制。

事件处理概览

我们先整体看一下Knative Eventing 工作机制示意图:
image

通过 namespace 创建默认 Broker 如果不指定Channel,会使用默认的 Inmemory Channel。

下面我们从数据平面开始分析Event事件是如何通过In-memory Channel分发到Knative Service

Ingress

Ingress是事件进入Channel前的第一级过滤,但目前的功能仅仅是接收事件然后转发到Channel。过滤功能处理TODO状态。

func (h *handler) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error {
    tctx := cloudevents.HTTPTransportContextFrom(ctx)
    if tctx.Method != http.MethodPost {
        resp.Status = http.StatusMethodNotAllowed
        return nil
    }

    // tctx.URI is actually the path...
    if tctx.URI != "/" {
        resp.Status = http.StatusNotFound
        return nil
    }

    ctx, _ = tag.New(ctx, tag.Insert(TagBroker, h.brokerName))
    defer func() {
        stats.Record(ctx, MeasureEventsTotal.M(1))
    }()

    send := h.decrementTTL(&event)
    if !send {
        ctx, _ = tag.New(ctx, tag.Insert(TagResult, "droppedDueToTTL"))
        return nil
    }

    // TODO Filter.

    ctx, _ = tag.New(ctx, tag.Insert(TagResult, "dispatched"))
    return h.sendEvent(ctx, tctx, event)
}

In-memory Channel

Broker 字面意思为代理者,那么它代理的是谁呢?是Channel。为什么要代理Channel呢,而不直接发给访问Channel。这个其实涉及到Broker/Trigger设计的初衷:对事件过滤处理。我们知道Channel(消息通道)负责事件传递,Subscription(订阅)负责订阅事件,通常这二者的模型如下:
image

这里就涉及到消息队列和订阅分发的实现。那么在In-memory Channel中如何实现的呢?
其实 In-memory 的核心处理在Fanout Handler中,它负责将接收到的事件分发到不同的 Subscription。
In-memory Channel处理示意图:
image

事件接收并分发核心代码如下:

func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {
    return func(_ provisioners.ChannelReference, m *provisioners.Message) error {
        if f.config.AsyncHandler {
            go func() {
                // Any returned error is already logged in f.dispatch().
                _ = f.dispatch(m)
            }()
            return nil
        }
        return f.dispatch(m)
    }
}

当前分发机制默认是异步机制(可通过AsyncHandler参数控制分发机制)。

消息分发机制:

// dispatch takes the request, fans it out to each subscription in f.config. If all the fanned out
// requests return successfully, then return nil. Else, return an error.
func (f *Handler) dispatch(msg *provisioners.Message) error {
    errorCh := make(chan error, len(f.config.Subscriptions))
    for _, sub := range f.config.Subscriptions {
        go func(s eventingduck.SubscriberSpec) {
            errorCh <- f.makeFanoutRequest(*msg, s)
        }(sub)
    }

    for range f.config.Subscriptions {
        select {
        case err := <-errorCh:
            if err != nil {
                f.logger.Error("Fanout had an error", zap.Error(err))
                return err
            }
        case <-time.After(f.timeout):
            f.logger.Error("Fanout timed out")
            return errors.New("fanout timed out")
        }
    }
    // All Subscriptions returned err = nil.
    return nil
}

通过这里的代码,我们可以看到分发处理超时机制。默认为60s。也就是说如果分发的请求响应超过60s,那么In-memory会报错:Fanout timed out。

Filter

一般的消息分发会将消息发送给订阅的服务,但在 Broker/Trigger 模型中需要对事件进行过滤处理,这个处理的地方就是在Filter 中。

  • 根据请求获取Trigger信息。Filter中会根据请求的信息拿到 Trigger 名称,然后通过获取Trigger对应的资源信息拿到过滤规则
  • 根据Trigger 过滤规则进行事件的过滤处理
  • 最后将满足过滤规则的分发到Kservice

其中过滤规则处理代码如下:

func (r *Receiver) shouldSendMessage(ctx context.Context, ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool {
    if ts.Filter == nil || ts.Filter.SourceAndType == nil {
        r.logger.Error("No filter specified")
        ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "empty-fail"))
        return false
    }

    // Record event count and filtering time
    startTS := time.Now()
    defer func() {
        filterTimeMS := int64(time.Now().Sub(startTS) / time.Millisecond)
        stats.Record(ctx, MeasureTriggerFilterTime.M(filterTimeMS))
    }()

    filterType := ts.Filter.SourceAndType.Type
    if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != event.Type() {
        r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("event.Type()", event.Type()))
        ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))
        return false
    }
    filterSource := ts.Filter.SourceAndType.Source
    s := event.Context.AsV01().Source
    actualSource := s.String()
    if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != actualSource {
        r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", actualSource))
        ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "fail"))

        return false
    }

    ctx, _ = tag.New(ctx, tag.Upsert(TagFilterResult, "pass"))
    return true
}

当前的机制是所有的订阅事件都会通过 Filter 集中进行事件过滤,如果一个Broker有大量的订阅Trigger,是否会给Filter带来性能上的压力? 这个在实际场景 Broker/Trigger 的运用中需要考虑到这个问题。

结论

作为内置的默认Channel实现,In-memory 可以说很好的完成了事件接收并转发的使命,并且 Knative Eventing 在接下来的迭代中会支持部署时指定设置默认的Channel。有兴趣的同学可以持续关注一下。

欢迎加入 Knative 交流群

image

相关实践学习
使用ACS算力快速搭建生成式会话应用
阿里云容器计算服务 ACS(Container Compute Service)以Kubernetes为使用界面,采用Serverless形态提供弹性的算力资源,使您轻松高效运行容器应用。本文将指导您如何通过ACS控制台及ACS集群证书在ACS集群中快速部署并公开一个容器化生成式AI会话应用,并监控应用的运行情况。
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
存储 监控 安全
推荐5款极具效率的实用工具软件
每次分享实用的软件,都会给人一种踏实和喜悦的感觉,这也是我热衷于搜集和推荐高效工具软件的原因。
397 1
|
机器学习/深度学习 算法 测试技术
Python中实现多层感知机(MLP)的深度学习模型
Python中实现多层感知机(MLP)的深度学习模型
829 0
|
安全 调度 数据安全/隐私保护
PCIe访问控制服务(ACS)
PCIe访问控制服务(ACS)
6879 0
PCIe访问控制服务(ACS)
|
5月前
|
供应链 安全 Go
Go Modules 详解 -《Go语言实战指南》
Go Modules 是 Go 语言官方推出的依赖管理工具,自 Go 1.11 起引入,Go 1.16 成为默认方式。它解决了第三方依赖版本控制、项目脱离 GOPATH 限制及多模块管理等问题。本文全面讲解了 Go Modules 的基本原理、初始化方法、常用命令(如 `go mod init`、`go get` 等)、依赖管理(添加/升级/删除)、子模块开发以及常见问题排查,帮助开发者高效使用 Go Modules 进行项目管理。
|
6月前
|
数据可视化 Rust 机器学习/深度学习
mlop.ai 无脑使用教程 (机器学习工具 WandB/ClearML 的首个国区开源平替)
mlop.ai 是首个为国区用户优化的机器学习工具,全栈免费开源,是主流付费解决方案 ClearML/WandB 的开源平替。常规实验追踪的工具经常大幅人为降速,mlop因为底层为Rust代码,能轻松支持高频数据写入。如需更多开发者帮助或企业支持,敬请联系cn@mlop.ai
365 12
mlop.ai 无脑使用教程 (机器学习工具 WandB/ClearML 的首个国区开源平替)
|
Java
用java实现Client和Server之间的互相通信
本文介绍了如何使用Java实现客户端和服务器之间的通信,包括服务器端创建ServerSocket、接受客户端连接、读取和发送消息,以及客户端创建Socket连接、发送和接收消息的完整过程。
444 1
用java实现Client和Server之间的互相通信
|
NoSQL Redis 数据库
Redis 从入门到精通之Redis事务实现原理
Redis 通过 MULTI 、 DISCARD 、 EXEC 和 WATCH 四个命令来实现事务功能,本章首先讨论使用 MULTI 、 DISCARD 和 EXEC 三个命令实现的一般事务,然后再来讨论带有 WATCH 的事务的实现。因为事务的安全性也非常重要,所以本章最后通过常见的 ACID 性质对 Redis 事务的安全性进行了说明
850 91
Redis 从入门到精通之Redis事务实现原理
|
Linux 网络安全 开发者
深入探索Linux命令:`dmesg`
`dmesg`是Linux命令,用于显示和控制内核环形缓冲区的系统消息,包含驱动加载、硬件错误和启动消息。基本用法包括:无参数运行以显示所有内核消息,使用`-c`清除消息,`-n`限制显示数量,以及`-T`按时间戳排序。适用于系统启动诊断、硬件问题排查和内核模块调试。注意,内核消息可能因新消息覆盖而丢失,及时查看很重要。
|
JSON Java 数据格式
JAVA获取GET和POST请求参数
JAVA获取GET和POST请求参数
2387 0
|
监控 固态存储 安全
源码剖析:Elasticsearch 段合并调度及优化手段
源码剖析:Elasticsearch 段合并调度及优化手段