Knative Eventing 之 Parallel 介绍

简介: 从 Knative Eventing 0.8 开始,支持根据不同的过滤条件对事件进行选择处理。通过 Parallel 提供了这样的能力。本文就给大家介绍一下这个特性。

从 Knative Eventing 0.8 开始,支持根据不同的过滤条件对事件进行选择处理。通过 Parallel 提供了这样的能力。本文就给大家介绍一下这个特性。

资源定义

我们先看一下 Parallel 资源定义,典型的 Parallel Spec描述如下:

apiVersion: messaging.knative.dev/v1alpha1
kind: Parallel
metadata:
  name: me-odd-even-parallel
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: InMemoryChannel
  cases:
    - filter:
        uri: "http://me-even-odd-switcher.default.svc.cluster.local/0"
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1alpha1
          kind: Service
          name: me-even-transformer
    - filter:
        uri: "http://me-even-odd-switcher.default.svc.cluster.local/1"
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1alpha1
          kind: Service
          name: me-odd-transformer
  reply:
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    name: me-event-display

主要包括如下 3 部分:

  • cases 定义了一系列 filter 和 subscriber。对于每个条件分支:

    • 首先判断 filter, 当返回事件时,调用 subscriber。filter和subscriber要求都是可访问的。
    • subscriber 执行返回的事件会发生到 reply。如果 reply 为空,则发送到 spec.reply
  • channelTemplate 定义了当前 Parallel 中使用的Channel类型
  • reply 定义了全局响应的目标函数。

逻辑架构如图所示:
image

代码实现

关键代码实现如下:

  1. 首先为 Parallel 创建一个全局的 Channel。然后为每一个case创建一个过滤 Channel
  2. 在每个case中做了如下处理:

    • 为全局的 Channel创建一个 Subscription,订阅条件为filter信息,并且把 reply 响应发送给当前 case中的过滤 Channel
    • 为过滤 Channel 创建一个 Subscription,将订阅信息发送给每个case中的 Reply。如果当前case中没有设置Reply,则发送的全局Reply
func (r *Reconciler) reconcile(ctx context.Context, p *v1alpha1.Parallel) error {
    p.Status.InitializeConditions()

    // Reconciling parallel is pretty straightforward, it does the following things:
    // 1. Create a channel fronting the whole parallel and one filter channel per branch.
    // 2. For each of the Branches:
    //     2.1 create a Subscription to the fronting Channel, subscribe the filter and send reply to the filter Channel
    //     2.2 create a Subscription to the filter Channel, subcribe the subscriber and send reply to
    //         either the branch Reply. If not present, send reply to the global Reply. If not present, do not send reply.
    // 3. Rinse and repeat step #2 above for each branch in the list
    if p.DeletionTimestamp != nil {
        // Everything is cleaned up by the garbage collector.
        return nil
    }

    channelResourceInterface := r.DynamicClientSet.Resource(duckroot.KindToResource(p.Spec.ChannelTemplate.GetObjectKind().GroupVersionKind())).Namespace(p.Namespace)

    if channelResourceInterface == nil {
        msg := fmt.Sprintf("Unable to create dynamic client for: %+v", p.Spec.ChannelTemplate)
        logging.FromContext(ctx).Error(msg)
        return errors.New(msg)
    }

    // Tell tracker to reconcile this Parallel whenever my channels change.
    track := r.resourceTracker.TrackInNamespace(p)

    var ingressChannel *duckv1alpha1.Channelable
    channels := make([]*duckv1alpha1.Channelable, 0, len(p.Spec.Branches))
    for i := -1; i < len(p.Spec.Branches); i++ {
        var channelName string
        if i == -1 {
            channelName = resources.ParallelChannelName(p.Name)
        } else {
            channelName = resources.ParallelBranchChannelName(p.Name, i)
        }

        c, err := r.reconcileChannel(ctx, channelName, channelResourceInterface, p)
        if err != nil {
            logging.FromContext(ctx).Error(fmt.Sprintf("Failed to reconcile Channel Object: %s/%s", p.Namespace, channelName), zap.Error(err))
            return err

        }
        // Convert to Channel duck so that we can treat all Channels the same.
        channelable := &duckv1alpha1.Channelable{}
        err = duckapis.FromUnstructured(c, channelable)
        if err != nil {
            logging.FromContext(ctx).Error(fmt.Sprintf("Failed to convert to Channelable Object: %s/%s", p.Namespace, channelName), zap.Error(err))
            return err

        }
        // Track channels and enqueue parallel when they change.
        if err = track(utils.ObjectRef(channelable, channelable.GroupVersionKind())); err != nil {
            logging.FromContext(ctx).Error("Unable to track changes to Channel", zap.Error(err))
            return err
        }
        logging.FromContext(ctx).Info(fmt.Sprintf("Reconciled Channel Object: %s/%s %+v", p.Namespace, channelName, c))

        if i == -1 {
            ingressChannel = channelable
        } else {
            channels = append(channels, channelable)
        }
    }
    p.Status.PropagateChannelStatuses(ingressChannel, channels)

    filterSubs := make([]*v1alpha1.Subscription, 0, len(p.Spec.Branches))
    subs := make([]*v1alpha1.Subscription, 0, len(p.Spec.Branches))
    for i := 0; i < len(p.Spec.Branches); i++ {
        filterSub, sub, err := r.reconcileBranch(ctx, i, p)
        if err != nil {
            return fmt.Errorf("Failed to reconcile Subscription Objects for branch: %d : %s", i, err)
        }
        subs = append(subs, sub)
        filterSubs = append(filterSubs, filterSub)
        logging.FromContext(ctx).Debug(fmt.Sprintf("Reconciled Subscription Objects for branch: %d: %+v, %+v", i, filterSub, sub))
    }
    p.Status.PropagateSubscriptionStatuses(filterSubs, subs)

    return nil
}

示例演示

接下来让我们通过一个实例具体了解一下 Parallel 。通过CronJobSource产生事件发送给 me-odd-even-parallel Parallel, Parallel 会将事件发送给每个case, Case中通过 filter 不同的参数访问 me-even-odd-switcher服务, me-even-odd-switcher服务会根据当前事件的创建时间随机计算0或1的值,如果计算值和请求参数值相匹配,则返回事件,否则不返回事件。

  • http://me-even-odd-switcher.default.svc.cluster.local/0匹配成功,返回事件到me-even-transformer服务进行处理
  • http://me-even-odd-switcher.default.svc.cluster.local/1匹配成功,返回事件到odd-transformer服务进行处理

不管哪个case处理完之后,将最终的事件发送给me-event-display服务进行事件显示。
具体操作步骤如下:

创建 Knative Service

apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: me-even-odd-switcher
spec:
  template:
    spec:
      containers:
      - image: villardl/switcher-nodejs:0.1
        env:
        - name: EXPRESSION
          value: Math.round(Date.parse(event.time) / 60000) % 2
        - name: CASES
          value: '[0, 1]'
---
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: even-transformer
spec:
  template:
    spec:
      containers:
      - image: villardl/transformer-nodejs:0.1
        env:
        - name: TRANSFORMER
          value: |
            ({"message": "we are even!"})

---
apiVersion: serving.knative.dev/v1alpha1
kind: Service
metadata:
  name: odd-transformer
spec:
  template:
    spec:
      containers:
      - image: villardl/transformer-nodejs:0.1
        env:
        - name: TRANSFORMER
          value: |
            ({"message": "this is odd!"})
.

创建 Parallel

apiVersion: messaging.knative.dev/v1alpha1
kind: Parallel
metadata:
  name: me-odd-even-parallel
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: InMemoryChannel
  cases:
    - filter:
        uri: "http://me-even-odd-switcher.default.svc.cluster.local/0"
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1alpha1
          kind: Service
          name: me-even-transformer
    - filter:
        uri: "http://me-even-odd-switcher.default.svc.cluster.local/1"
      subscriber:
        ref:
          apiVersion: serving.knative.dev/v1alpha1
          kind: Service
          name: me-odd-transformer
  reply:
    apiVersion: serving.knative.dev/v1alpha1
    kind: Service
    name: me-event-display

创建 CronJobSource 数据源

apiVersion: sources.eventing.knative.dev/v1alpha1
kind: CronJobSource
metadata:
  name: me-cronjob-source
spec:
  schedule: "*/1 * * * *"
  data: '{"message": "Even or odd?"}'
  sink:
    apiVersion: messaging.knative.dev/v1alpha1
    kind: Parallel
    name: me-odd-even-parallel

查看结果

运行之后可以看到类似如下结果:

kubectl logs -l serving.knative.dev/service=me-event-display --tail=30 -c user-container

️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 0.3
  type: dev.knative.cronjob.event
  source: /apis/v1/namespaces/default/cronjobsources/me-cronjob-source
  id: 48eea348-8cfd-4aba-9ead-cb024ce16a48
  time: 2019-07-31T20:56:00.000477587Z
  datacontenttype: application/json; charset=utf-8
Extensions,
  knativehistory: me-odd-even-parallel-kn-parallel-kn-channel.default.svc.cluster.local, me-odd-even-parallel-kn-parallel-0-kn-channel.default.svc.cluster.local
Data,
  {
    "message": "we are even!"
  }
️  cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 0.3
  type: dev.knative.cronjob.event
  source: /apis/v1/namespaces/default/cronjobsources/me-cronjob-source
  id: 42717dcf-b194-4b36-a094-3ea20e565ad5
  time: 2019-07-31T20:57:00.000312243Z
  datacontenttype: application/json; charset=utf-8
Extensions,
  knativehistory: me-odd-even-parallel-kn-parallel-1-kn-channel.default.svc.cluster.local, me-odd-even-parallel-kn-parallel-kn-channel.default.svc.cluster.local
Data,
  {
    "message": "this is odd!"
  }

结论

通过上面的介绍,相信大家对 Parallel 如何进行事件条件处理有了更多的了解。有兴趣的同学可以一起交流。

欢迎加入 Knative 交流群

image

相关实践学习
使用ACS算力快速搭建生成式会话应用
阿里云容器计算服务 ACS(Container Compute Service)以Kubernetes为使用界面,采用Serverless形态提供弹性的算力资源,使您轻松高效运行容器应用。本文将指导您如何通过ACS控制台及ACS集群证书在ACS集群中快速部署并公开一个容器化生成式AI会话应用,并监控应用的运行情况。
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。 &nbsp; &nbsp; 相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
测试技术
软件测试的艺术:探索式测试的实践与思考
在软件开发的广阔海洋中,测试是确保航船稳健行驶的关键。本文将带你领略探索式测试的魅力,一种结合创造性思维和严格方法论的测试方式。我们将一起揭开探索式测试的神秘面纱,了解其核心概念、实施步骤和带来的效益。通过实际代码示例,你将学会如何将探索式测试融入日常的软件质量保证流程中,提升测试效率与质量。
|
Prometheus 监控 Kubernetes
Prometheus 在微服务架构中的应用
【8月更文第29天】随着微服务架构的普及,监控和跟踪各个服务的状态变得尤为重要。Prometheus 是一个开源的监控系统和时间序列数据库,非常适合用于微服务架构中的监控。本文将详细介绍 Prometheus 如何支持微服务架构下的监控需求,包括服务发现、服务间的监控指标收集以及如何配置 Prometheus 来适应这些需求。
585 1
|
存储 安全 Linux
Podman入门全指南:安装、配置与运行容器
Podman入门全指南:安装、配置与运行容器
10776 1
|
开发者 人工智能 自然语言处理
欢迎使用通义灵码
灵码使用指南!一键收藏。
145650 31
|
11月前
|
机器学习/深度学习 算法 数据安全/隐私保护
基于GA遗传优化TCN-LSTM时间卷积神经网络时间序列预测算法matlab仿真
本项目基于MATLAB 2022a实现了一种结合遗传算法(GA)优化的时间卷积神经网络(TCN)时间序列预测算法。通过GA全局搜索能力优化TCN超参数(如卷积核大小、层数等),显著提升模型性能,优于传统GA遗传优化TCN方法。项目提供完整代码(含详细中文注释)及操作视频,运行后无水印效果预览。 核心内容包括:1) 时间序列预测理论概述;2) TCN结构(因果卷积层与残差连接);3) GA优化流程(染色体编码、适应度评估等)。最终模型在金融、气象等领域具备广泛应用价值,可实现更精准可靠的预测结果。
|
JavaScript 前端开发 vr&ar
如何在 JavaScript 中对字符串进行索引、拆分和操作
如何在 JavaScript 中对字符串进行索引、拆分和操作
262 0
|
Kubernetes Docker 容器
registry.aliyuncs.com/google_containers这个镜像仓库都有啥镜像
registry.aliyuncs.com/google_containers这个镜像仓库都有啥镜像
4346 1
|
NoSQL MongoDB 关系型数据库
13个Mongodb GUI可视化管理工具,总有一款适合你
本文介绍了13个好用的MongoDB可视化工具。Robomongo,MongoDB Compass,phpMoAdmin等
115719 0
13个Mongodb GUI可视化管理工具,总有一款适合你
|
设计模式 Java 关系型数据库
【阿里规约】阿里开发手册解读——命名规范篇
本文中所有代码命名规范遵循《阿里规约》,从包名、类名、变量名等角度展开,详细阐述测试类、枚举类、数组、布尔型变量、方法等元素的命名规范。
【阿里规约】阿里开发手册解读——命名规范篇
anaconda下载安装,镜像源配置修改及虚拟环境的创建
这篇文章介绍了Anaconda的下载安装过程,包括Anaconda的简介、安装步骤、配置修改、创建虚拟环境以及一些常用命令的使用方法。文章还提供了如何修改conda的镜像源为国内镜像源以加速下载的步骤。
anaconda下载安装,镜像源配置修改及虚拟环境的创建