使用 eKuiper 按需桥接 CAN Bus 数据至 MQTT

简介: 本文将通过使用开源边缘流式 SQL 引擎 eKuiper ,灵活地从 CAN Bus 提取有意义的数据和所需的信号,实现从 CAN Bus 到 MQTT 的无缝桥接。

CAN Bus 是一种广泛应用于汽车和工业领域的通信协议,它能够让多个设备在同一网络中进行交互。而 MQTT 是一种广泛应用于物联网领域的通信协议,作为一种轻量级的发布-订阅消息传输协议,它有效地促进了机器之间的通信。

通过将 CAN Bus 数据桥接到 MQTT,能够实现 CAN Bus 设备与物联网平台和应用的集成。尽管市场上存在多种解决方案和工具可以实现这一目标,但它们通常只传输原始的二进制 CAN 数据,这导致对信号进行过滤和处理非常不方便。

在本文中,我们将介绍一种全新的解决方案,通过使用 开源边缘流式 SQL 引擎 eKuiper,灵活地从 CAN Bus 提取有意义的数据和所需的信号,实现从 CAN Bus 到 MQTT 的无缝桥接。

CAN Bus 的工作原理

CAN Bus 是一种通信系统,它能够让车辆中的不同设备相互传递数据。它还能提供许多有用的车辆信息,例如速度、油量、发动机温度和诊断码等。然而,从 CAN Bus 中获取和解读这些信息并不是一件容易的事情,因为它们通常以二进制形式进行存储。

深入了解 CAN Bus ,欢迎阅读:车联网 CAN Bus 协议介绍与数据实时流处理

CAN 帧

我们可以从 CAN Bus 接收 CAN 帧流,其中包含我们感兴趣的二进制形式的信号。每个 CAN 帧都包含 ID、数据长度码(DLC)和有效载荷。

  • ID 用来标识帧中数据的类型。
  • DLC 用来指定帧中数据的字节数。
  • 有效载荷是帧中携带的实际数据。

CAN 协议有多种类型,它们在 ID 和有效载荷长度的定义上略有不同。下面是一个 CAN 2.0A 帧的例子,其 ID 为 11 位,有效载荷长度最多为 8 字节。

CAN 2.0A 帧例子

有效载荷由一系列信号组成。每个信号都有名称、长度和值。

  • 长度是信号在有效载荷里占用的位数。
  • 值是信号里包含的实际数据。

为了把二进制数据转换成有意义的信息,我们需要提取这些信号。

信号提取

CAN 数据库(DBC)是一个文本文件,用于描述 CAN 帧有效载荷中信号的组织方式。它相当于一个字典,提供了每个信号的名称、长度和值的计算方法,这样我们就可以通过 CAN 帧进行通信。

下面是 DBC 文件的一段内容。它定义了一个 ID 为 544,DLC 为 8 的 CAN 帧。该帧包含 5 个信号,每个信号都有名称、长度和值。例如,信号 EngineSpeed 的长度为 16 位,值的范围是 0 到 16383.75。信号的值是通过把原始数据乘以 0.25 再加上 0 来计算得出。

BO_ 544 EMS_220h: 8 EMS
SG_ EngineSpeed : 0|16@1+ (0.25,0) [0|16383.75] "rpm" Vector__XXX
SG_ CurrentEngineTorque : 16|16@1+ (0.25,-500) [-500|1547.5] "Nm" Vector__XXX
SG_ DriverRequestTorque : 32|16@1+ (0.25,-500) [-500|1547.5] "Nm" Vector__XXX
SG_ CurrentEngineTorqueStatus : 48|1@1+ (1,0) [0|1] "" Vector__XXX
SG_ DriverRequestTorqueStatus : 49|1@1+ (1,0) [0|1] "" Vector__XXX

CAN 帧的解码流程如下:

CAN 帧解码流程

完成解码后,我们可以得知发动机的转速为每分钟 1000 转。然而通过编写应用进行信号解码的话,一旦信号发生变化或更新,我们就必须重新开发和部署整个解码过程,并通过 OTA 进行更新。使用 eKuiper 可以帮助您省去这些繁琐的工作。

保护您的 DBC

DBC 是解码 CAN 帧的关键。即使 CAN Bus 数据泄露,没有 DBC 也几乎无法解码。因此,DBC 是您的重要资产,不应向任何人泄露,包括参与解码开发的工程师。eKuiper 可以在运行时加载 DBC 文件,从而可以避免让开发者看到它。此外,当场景发生变化时,它可以在不重启进程的情况下热加载 DBC 文件。这有助于保护您的 DBC 文件,使其保持私密。

eKuiper「理解」CAN Bus 数据

作为一个边缘流式引擎,eKuiper 非常轻巧,可以部署在 CAN Bus 设备附近。它能够从 HTTP、文件系统、MQTT,以及本文所提到的 CAN Bus 等各种南向数据源收集数据。收集到的数据可以高效地进行处理,并发布到北向数据源(例如 MQTT 和 HTTP)。

eKuiper 具备对 CAN Bus 数据的理解能力。它简化了 CAN 帧的解码过程,并将其转化为一些配置信息。要处理 CAN Bus 数据,您可以使用以下 SQL 语句创建一个流:

CREATE STREAM canDemo () WITH (TYPE="can", FORMAT="can", SHARED="TRUE", SCHEMAID="dbc")

这条语句创建了一个名为 canDemo 的流,用于从 CAN Bus 中获取数据。该语句还指定了连接方式和数据格式,并指定使用 DBC 模式将 CAN 帧解码成信号。

DBC 设置

DBC 文件在解码 CAN 帧时扮演了模式的角色。就像为 protobuf 格式指定 *.proto 文件一样,您可以在 SCHEMAID 属性中指定 DBC 文件,它可以是文件路径也可以是目录路径。这意味着您可以指定一个单独的 DBC 文件或一个包含多个 DBC 文件的目录。eKuiper 会加载目录中的所有 DBC 文件,并将它们作为模式使用。

在运行时,用户可以通过替换文件或向目录中添加新文件来更新 DBC 文件。eKuiper 能够热加载 DBC 文件,并通过重启规则来使用新的模式解码 CAN 帧。这可以帮助您保护 DBC 文件,让它保持私密。

连接和格式分离

在创建流的语句中,我们将 type 属性和 format 属性都设置为"can"。这是因为 eKuiper 将数据源的连接方式和数据格式进行了分离。

  • type 属性指定了连接方式,本案例中是 CAN Bus 。
  • format 属性指定了数据格式,本案例中是 CAN 帧。

这种分离使得 eKuiper 能够支持 CAN 帧和传输协议的各种组合,这在使用一些 CAN 适配器时非常常见。CAN 适配器可能会将 CAN 帧记录到文件中,或者将原始的 CAN 帧发送到 MQTT Broker,或者通过 TCP 或 UDP 以批量的形式发送 CAN 帧。在这些情况下,type 属性将是"file"或"mqtt",而 format 属性将是"can"。

如果 type 是“can”,eKuiper 会通过 socketCan 连接到 CAN Bus 。在下面的例子中,eKuiper 从文件中读取 CAN 帧:

CREATE STREAM canDemo () WITH (TYPE="file", FORMAT="can", SHARED="TRUE", SCHEMAID="dbc")

将 CAN Bus 灵活地桥接到 MQTT

CAN Bus 设备会以高频率(如 100HZ)在总线上周期性地发送消息。由于存储或带宽的限制,我们可能只想以较低的频率对数据进行采样,并有选择的保留信号。有了 eKuiper,我们可以:

  • 通过指定采样率来对数据进行采样。
  • 通过选择所需的信号来在信号层面对数据进行过滤。
  • 只桥接发生变化的信号。
  • 将不同 CAN 帧中的信号合并成一个消息。

所有这些功能都可以通过规则 SQL 来实现,并且由于具备规则热加载的能力,所以变更几乎没有成本。下面让我们看一些例子。

## 过滤信号
SELECT EnginSpeed, DriverRequestTorqueStatus FROM canDemo
## 将不同 CAN 帧中的信号合并
SELECT latest(EnginSpeed) as speed, latest(anotherSignal) as anotherSignal FROM canDemo
## 只桥接发生变化的信号
SELECT CHANGED_COLS(EngineSpeed, DriverRequestTorqueStatus) FROM canDemo

一旦获得了所需的信号,我们就需要决定将数据发布到哪个 MQTT 主题。用户可以指定一个固定的主题名称,或者使用从数据中派生出来的动态主题名称。

例如,在下面的规则中,每个解析出的 CAN 帧信号都会被桥接到 MQTT 主题 can/{ {CanId}}{ {CanId}} 是从数据中派生出的动态主题名称,比如一个 CAN ID 为 123 的 CAN 帧将被桥接到 MQTT 主题 can/123

{
 "id": "distributeRule",
 "sql": "SELECT *, meta(id) as canId FROM canDemo",
 "actions": [
  {
     "mqtt": {
       "server": "tcp://broker.emqx.io:1883",
       "topic": "can/{
  
  {.canId}}",
       "sendSingle": true
    }
  }
]
}

eKuiper 允许多个规则处理同一个流。因此,用户可以根据需要创建多个规则,将 CAN Bus 数据桥接到不同的 MQTT 主题。

结语

要实现 CAN Bus 和 MQTT 之间的桥接,我们的解决方案要能够从 CAN Bus 设备读取数据,根据需求对数据进行过滤和转换,并将数据发布到 MQTT Broker。这正是 eKuiper 的用武之地,它提供了一种简单、高效和灵活的方式来完成这项工作。

除了桥接功能,eKuiper 还可以在边缘规则引擎和边缘计算的多种场景中提供帮助。我们将在后续的文章中详细讨论这些场景。

版权声明: 本文为 EMQ 原创,转载请注明出处。
原文链接:https://www.emqx.com/zh/blog/bridging-demanded-signals-from-can-bus-to-mqtt-by-ekuiper

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
7月前
|
消息中间件 安全 物联网
MQTT常见问题之新增自定义主题后平台侧收不到发布的数据如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
15天前
|
消息中间件 测试技术
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
轻量消息队列(原MNS)以其简单队列模型、轻量化协议及按量后付费模式,成为阿里云产品间消息传输首选。本文通过创建主题、订阅、配置告警集成等步骤,展示了该产品在实际应用中的部分功能,确保消息的可靠传输。
38 2
|
5月前
|
消息中间件 Java 测试技术
消息队列 MQ使用问题之数据流出规则是否支持平台的云RabbitMQ
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 数据采集 数据库
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
小说爬虫-03 爬取章节的详细内容并保存 将章节URL推送至RabbitMQ Scrapy消费MQ 对数据进行爬取后写入SQLite
27 1
EMQ
|
5月前
|
传感器 人工智能 安全
EMQX 与 MQTT: AI 大模型时代的分布式数据中枢
在以数据为核心的 AI 时代,基于 MQTT 协议的消息服务器 EMQX 能帮助企业更好的利用人工智能和机器学习模型,是智能化系统中核心的数据基础软件。
EMQ
248 18
|
5月前
|
消息中间件 监控 物联网
消息队列 MQ使用问题之如何获取和处理消息堆积数据
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 Shell 数据处理
rocket mq 查看消费进度,消息堆积,清除堆积数据命令
该内容是关于RocketMQ的消费进度管理和堆积数据处理的指导。首先,需进入RocketMQ的bin目录,然后使用`mqadmin consumerProgress`命令查看消费者或生产者的消费进度。`broker offset`和`consumer offset`的差值表示未消费消息。通过`resetOffsetByTime`命令可重置消费位点来清除堆积数据,未消费消息默认3天后会被丢弃。此外,`CONSUME_FROM WHERE`枚举类定义了消费起点选项,包括从最后、最开始或指定时间点消费。
1711 3
|
7月前
|
消息中间件 JavaScript Java
MQ产品使用合集之视觉智能平台人脸搜索1:N怎么更新人脸数据
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 RocketMQ
消息队列 MQ产品使用合集之如何防止丢数据
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。
|
7月前
|
弹性计算 JSON 监控
使用Logtail采集MQTT协议数据
本文主要介绍如何使用Logtail来采集MQTT协议的数据。
157 0
使用Logtail采集MQTT协议数据