定义无处不在的 Event 事件之 CloudEvents

简介: Event 事件无处不在,然而如果没有一套事件统一的定义标准,那么对于事件处理的开发者来说无疑是痛苦的。CloudEvent 的出现统一了事件的标准,本篇文章简要介绍了这一标准规范协议,以及实际场景中使用的方式。

背景

Event 事件无处不在,然而每个事件提供者产生的事件各部相同。由于缺乏事件的统一描述,对于事件的开发者来说需要不断的重复学习如何消费不同类型的事件。这也限制了类库、工具和基础设施在跨环境(如 SDK、事件路由或跟踪系统)提供事件数据方面的潜力。从事件数据本身实现的可移植性和生产力上受到了阻碍。

什么是 CloudEvents

CloudEvents 是一种规范,用于以通用格式描述事件数据,以提供跨服务、平台和系统的交互能力。
事件格式指定了如何使用某些编码格式来序列化 CloudEvent。支持这些编码的兼容 CloudEvents 实现必须遵循在相应的事件格式中指定的编码规则。所有实现都必须支持 JSON 格式。

协议规范

命名规范

CloudEvents 属性名称必须由 ASCII 字符集的小写字母(“a”到“z”)或数字(“0”到“9”)组成,并且必须以小写字母开头。属性名称应具有描述性和简洁性,长度不应超过20个字符。

术语定义

本规范定义如下术语:

  • Occurrence: “Occurrence”是指在软件系统运行期间捕获描述信息。
  • Event: "Event" 是表示事件及其上下文的数据记录。
  • Context: Context 表示上下文,元数据将封装在 Context 属性中。应用程序代码可以使用这些信息来标识事件与系统或其他事件之间的关系。
  • Data: 实际事件中有效信息载荷。
  • Message: 事件通过 Message 从数据源传输到目标地址。
  • Protocol: 消息可以通过各种行业标准协议(如http、amqp、mqtt、smtp)、开源协议(如kafka、nats)或平台/供应商特定协议(aws-kineis、azure-event-grid)进行传递。

上下文属性(Context Attributes)

符合本规范的每个 CloudEvent 必须包括根据需要指定的上下文属性,并且可以包括一个或多个可选的上下文属性。
参考示例:

  specversion: 0.2
  type: dev.knative.k8s.event
  source: /apis/serving.knative.dev/v1alpha1/namespaces/default/routes/sls-cloudevent
  id: 269345ff-7d0a-11e9-b1f1-00163f005e02
  time: 2019-05-23T03:23:36Z
  contenttype: application/json
  • type: 事件类型, 通常此属性用于路由、监控、安全策略等。
  • specversion: 表示CloudEvents 规范的版本。引用 0.2 版本的规范时,事件生产者必须使用 0.2 设置此值。
  • source:表示事件的产生者, 也就是事件源。
  • id: 事件的 id
  • time: 事件的产生时间
  • contenttype: 表示Data 的数据内容格式

扩展属性(Extension Attributes)

CloudEvents 生产者可以在事件中包含其他上下文属性,这些属性可能用于与事件处理相关的辅助操作。

Data

正如术语Data所定义的,CloudEvents 产生具体事件的内容信息封装在数据属性中。例如,KubernetesEventSource所产生的 CloudEvent 的Data信息如下:

data:
  {
    "metadata": {
      "name": "event-display.15a0a2b54007189b",
      "namespace": "default",
      "selfLink": "/api/v1/namespaces/default/events/event-display.15a0a2b54007189b",
      "uid": "9195ff11-7b9b-11e9-b1f1-00163f005e02",
      "resourceVersion": "18070551",
      "creationTimestamp": "2019-05-21T07:39:30Z"
    },
    "involvedObject": {
      "kind": "Route",
      "namespace": "default",
      "name": "event-display",
      "uid": "31c68419-675b-11e9-a087-00163e08f3bc",
      "apiVersion": "serving.knative.dev/v1alpha1",
      "resourceVersion": "9242540"
    },
    "reason": "InternalError",
    "message": "Operation cannot be fulfilled on clusteringresses.networking.internal.knative.dev \"route-31c68419-675b-11e9-a087-00163e08f3bc\": the object has been modified; please apply your changes to the latest version and try again",
    "source": {
      "component": "route-controller"
    },
    "firstTimestamp": "2019-05-21T07:39:30Z",
    "lastTimestamp": "2019-05-26T07:10:51Z",
    "count": 5636,
    "type": "Warning",
    "eventTime": null,
    "reportingComponent": "",
    "reportingInstance": ""
  }

实现

以 Go SDK 实现 CloudEvent 0.2 规范为例:

事件接收服务

  • 导入cloudevents
import "github.com/cloudevents/sdk-go"

-通过 HTTP 协议接收 CloudEvent 事件

func Receive(event cloudevents.Event) {
    fmt.Printf("cloudevents.Event\n%s", event.String())
}

func main() {
    c, err := cloudevents.NewDefaultClient()
    if err != nil {
        log.Fatalf("failed to create client, %v", err)
    }
    log.Fatal(c.StartReceiver(context.Background(), Receive));
}

事件发送服务

  • 创建一个基于 0.2 协议的 CloudEvents 事件
event := cloudevents.NewEvent()
event.SetID("ABC-123")
event.SetType("com.cloudevents.readme.sent")
event.SetSource("http://localhost:8080/")
event.SetData(data)
  • 通过HTTP协议发送这个CloudEvent
t, err := cloudevents.NewHTTPTransport(
    cloudevents.WithTarget("http://localhost:8080/"),
    cloudevents.WithEncoding(cloudevents.HTTPBinaryV02),
)
if err != nil {
    panic("failed to create transport, " + err.Error())
}

c, err := cloudevents.NewClient(t)
if err != nil {
    panic("unable to create cloudevent client: " + err.Error())
}
if err := c.Send(ctx, event); err != nil {
    panic("failed to send cloudevent: " + err.Error())
}

这样我们就通过 Go SDK 方式实现了 CloudEvent 事件的发送和接收

相关实践学习
使用ACS算力快速搭建生成式会话应用
阿里云容器计算服务 ACS(Container Compute Service)以Kubernetes为使用界面,采用Serverless形态提供弹性的算力资源,使您轻松高效运行容器应用。本文将指导您如何通过ACS控制台及ACS集群证书在ACS集群中快速部署并公开一个容器化生成式AI会话应用,并监控应用的运行情况。
深入解析Docker容器化技术
Docker是一个开源的应用容器引擎,让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。Docker是世界领先的软件容器平台。开发人员利用Docker可以消除协作编码时“在我的机器上可正常工作”的问题。运维人员利用Docker可以在隔离容器中并行运行和管理应用,获得更好的计算密度。企业利用Docker可以构建敏捷的软件交付管道,以更快的速度、更高的安全性和可靠的信誉为Linux和Windows Server应用发布新功能。 在本套课程中,我们将全面的讲解Docker技术栈,从环境安装到容器、镜像操作以及生产环境如何部署开发的微服务应用。本课程由黑马程序员提供。     相关的阿里云产品:容器服务 ACK 容器服务 Kubernetes 版(简称 ACK)提供高性能可伸缩的容器应用管理能力,支持企业级容器化应用的全生命周期管理。整合阿里云虚拟化、存储、网络和安全能力,打造云端最佳容器化应用运行环境。 了解产品详情: https://www.aliyun.com/product/kubernetes
目录
相关文章
|
Java 程序员 API
Java并发基础:concurrent Flow API全面解析
java.util.concurrent.Flow定义了响应式编程的核心接口,促进了Java在异步数据处理和背压机制方面的标准化,这使得第三方库如Reactor和RxJava能够基于这些接口提供丰富的实现和功能,同时简化了响应式编程在Java中的使用,Flow API增强了Java在并发编程领域的灵活性,使得处理异步数据流变得更加自然和高效。
849 0
Java并发基础:concurrent Flow API全面解析
|
Prometheus Kubernetes 负载均衡
Opentelemetry collector用法
本文详细介绍了Opentelemetry Collector的使用方法及其各个组件(receiver、processor、exporter、connector和服务配置)的功能与配置。Collector的核心组件通过官方仓库提供丰富的实现,涵盖了认证、健康监控等功能。
2359 63
Opentelemetry collector用法
|
SQL 关系型数据库 MySQL
MySQL:CTE 通用表达式
CTE(通用表表达式)为处理复杂查询提供了强大的工具。通过普通CTE,可以简化查询逻辑,提高可读性;通过递归CTE,可以优雅地处理层级结构数据。掌握CTE的使用,对于提升SQL查询能力和优化数据库操作有着重要意义。希望本文能帮助你更好地理解和使用MySQL中的CTE,提高工作效率和代码质量。
482 7
|
消息中间件 人工智能 Kubernetes
解密开源Serverless容器框架:事件驱动篇
Knative是一款基于Kubernetes的开源Serverless框架,提供了云原生、跨平台的Serverless编排标准。作为Serverless中必不可少的事件驱动能力,Knative Eventing提供了云原生的事件驱动能力。
|
缓存 NoSQL 应用服务中间件
高性能软件负载OpenResty整合Reids集群配置
高性能软件负载OpenResty整合Reids集群配置
775 1
|
Oracle Java 关系型数据库
EMT4J——让 Java 应用升级更轻松
EMT4J 是什么?如何使用 EMT4J 工具进行 Java 应用升级?
EMT4J——让 Java 应用升级更轻松
|
NoSQL Java Spring
教程:Spring Boot与ETCD键值存储的整合
教程:Spring Boot与ETCD键值存储的整合
|
消息中间件 测试技术 领域建模
DDD - 一文读懂DDD领域驱动设计
DDD - 一文读懂DDD领域驱动设计
49520 6
|
设计模式 供应链 数据可视化
DDD - 事件风暴从理论到落地
DDD - 事件风暴从理论到落地
1035 1