使用 eKuiper 按需桥接 CAN Bus 数据至 MQTT

简介: 本文将通过使用开源边缘流式 SQL 引擎 eKuiper ,灵活地从 CAN Bus 提取有意义的数据和所需的信号,实现从 CAN Bus 到 MQTT 的无缝桥接。

CAN Bus 是一种广泛应用于汽车和工业领域的通信协议,它能够让多个设备在同一网络中进行交互。而 MQTT 是一种广泛应用于物联网领域的通信协议,作为一种轻量级的发布-订阅消息传输协议,它有效地促进了机器之间的通信。

通过将 CAN Bus 数据桥接到 MQTT,能够实现 CAN Bus 设备与物联网平台和应用的集成。尽管市场上存在多种解决方案和工具可以实现这一目标,但它们通常只传输原始的二进制 CAN 数据,这导致对信号进行过滤和处理非常不方便。

在本文中,我们将介绍一种全新的解决方案,通过使用 开源边缘流式 SQL 引擎 eKuiper,灵活地从 CAN Bus 提取有意义的数据和所需的信号,实现从 CAN Bus 到 MQTT 的无缝桥接。

CAN Bus 的工作原理

CAN Bus 是一种通信系统,它能够让车辆中的不同设备相互传递数据。它还能提供许多有用的车辆信息,例如速度、油量、发动机温度和诊断码等。然而,从 CAN Bus 中获取和解读这些信息并不是一件容易的事情,因为它们通常以二进制形式进行存储。

深入了解 CAN Bus ,欢迎阅读:车联网 CAN Bus 协议介绍与数据实时流处理

CAN 帧

我们可以从 CAN Bus 接收 CAN 帧流,其中包含我们感兴趣的二进制形式的信号。每个 CAN 帧都包含 ID、数据长度码(DLC)和有效载荷。

  • ID 用来标识帧中数据的类型。
  • DLC 用来指定帧中数据的字节数。
  • 有效载荷是帧中携带的实际数据。

CAN 协议有多种类型,它们在 ID 和有效载荷长度的定义上略有不同。下面是一个 CAN 2.0A 帧的例子,其 ID 为 11 位,有效载荷长度最多为 8 字节。

CAN 2.0A 帧例子

有效载荷由一系列信号组成。每个信号都有名称、长度和值。

  • 长度是信号在有效载荷里占用的位数。
  • 值是信号里包含的实际数据。

为了把二进制数据转换成有意义的信息,我们需要提取这些信号。

信号提取

CAN 数据库(DBC)是一个文本文件,用于描述 CAN 帧有效载荷中信号的组织方式。它相当于一个字典,提供了每个信号的名称、长度和值的计算方法,这样我们就可以通过 CAN 帧进行通信。

下面是 DBC 文件的一段内容。它定义了一个 ID 为 544,DLC 为 8 的 CAN 帧。该帧包含 5 个信号,每个信号都有名称、长度和值。例如,信号 EngineSpeed 的长度为 16 位,值的范围是 0 到 16383.75。信号的值是通过把原始数据乘以 0.25 再加上 0 来计算得出。

BO_ 544 EMS_220h: 8 EMS
SG_ EngineSpeed : 0|16@1+ (0.25,0) [0|16383.75] "rpm" Vector__XXX
SG_ CurrentEngineTorque : 16|16@1+ (0.25,-500) [-500|1547.5] "Nm" Vector__XXX
SG_ DriverRequestTorque : 32|16@1+ (0.25,-500) [-500|1547.5] "Nm" Vector__XXX
SG_ CurrentEngineTorqueStatus : 48|1@1+ (1,0) [0|1] "" Vector__XXX
SG_ DriverRequestTorqueStatus : 49|1@1+ (1,0) [0|1] "" Vector__XXX

CAN 帧的解码流程如下:

CAN 帧解码流程

完成解码后,我们可以得知发动机的转速为每分钟 1000 转。然而通过编写应用进行信号解码的话,一旦信号发生变化或更新,我们就必须重新开发和部署整个解码过程,并通过 OTA 进行更新。使用 eKuiper 可以帮助您省去这些繁琐的工作。

保护您的 DBC

DBC 是解码 CAN 帧的关键。即使 CAN Bus 数据泄露,没有 DBC 也几乎无法解码。因此,DBC 是您的重要资产,不应向任何人泄露,包括参与解码开发的工程师。eKuiper 可以在运行时加载 DBC 文件,从而可以避免让开发者看到它。此外,当场景发生变化时,它可以在不重启进程的情况下热加载 DBC 文件。这有助于保护您的 DBC 文件,使其保持私密。

eKuiper「理解」CAN Bus 数据

作为一个边缘流式引擎,eKuiper 非常轻巧,可以部署在 CAN Bus 设备附近。它能够从 HTTP、文件系统、MQTT,以及本文所提到的 CAN Bus 等各种南向数据源收集数据。收集到的数据可以高效地进行处理,并发布到北向数据源(例如 MQTT 和 HTTP)。

eKuiper 具备对 CAN Bus 数据的理解能力。它简化了 CAN 帧的解码过程,并将其转化为一些配置信息。要处理 CAN Bus 数据,您可以使用以下 SQL 语句创建一个流:

CREATE STREAM canDemo () WITH (TYPE="can", FORMAT="can", SHARED="TRUE", SCHEMAID="dbc")

这条语句创建了一个名为 canDemo 的流,用于从 CAN Bus 中获取数据。该语句还指定了连接方式和数据格式,并指定使用 DBC 模式将 CAN 帧解码成信号。

DBC 设置

DBC 文件在解码 CAN 帧时扮演了模式的角色。就像为 protobuf 格式指定 *.proto 文件一样,您可以在 SCHEMAID 属性中指定 DBC 文件,它可以是文件路径也可以是目录路径。这意味着您可以指定一个单独的 DBC 文件或一个包含多个 DBC 文件的目录。eKuiper 会加载目录中的所有 DBC 文件,并将它们作为模式使用。

在运行时,用户可以通过替换文件或向目录中添加新文件来更新 DBC 文件。eKuiper 能够热加载 DBC 文件,并通过重启规则来使用新的模式解码 CAN 帧。这可以帮助您保护 DBC 文件,让它保持私密。

连接和格式分离

在创建流的语句中,我们将 type 属性和 format 属性都设置为"can"。这是因为 eKuiper 将数据源的连接方式和数据格式进行了分离。

  • type 属性指定了连接方式,本案例中是 CAN Bus 。
  • format 属性指定了数据格式,本案例中是 CAN 帧。

这种分离使得 eKuiper 能够支持 CAN 帧和传输协议的各种组合,这在使用一些 CAN 适配器时非常常见。CAN 适配器可能会将 CAN 帧记录到文件中,或者将原始的 CAN 帧发送到 MQTT Broker,或者通过 TCP 或 UDP 以批量的形式发送 CAN 帧。在这些情况下,type 属性将是"file"或"mqtt",而 format 属性将是"can"。

如果 type 是“can”,eKuiper 会通过 socketCan 连接到 CAN Bus 。在下面的例子中,eKuiper 从文件中读取 CAN 帧:

CREATE STREAM canDemo () WITH (TYPE="file", FORMAT="can", SHARED="TRUE", SCHEMAID="dbc")

将 CAN Bus 灵活地桥接到 MQTT

CAN Bus 设备会以高频率(如 100HZ)在总线上周期性地发送消息。由于存储或带宽的限制,我们可能只想以较低的频率对数据进行采样,并有选择的保留信号。有了 eKuiper,我们可以:

  • 通过指定采样率来对数据进行采样。
  • 通过选择所需的信号来在信号层面对数据进行过滤。
  • 只桥接发生变化的信号。
  • 将不同 CAN 帧中的信号合并成一个消息。

所有这些功能都可以通过规则 SQL 来实现,并且由于具备规则热加载的能力,所以变更几乎没有成本。下面让我们看一些例子。

## 过滤信号
SELECT EnginSpeed, DriverRequestTorqueStatus FROM canDemo
## 将不同 CAN 帧中的信号合并
SELECT latest(EnginSpeed) as speed, latest(anotherSignal) as anotherSignal FROM canDemo
## 只桥接发生变化的信号
SELECT CHANGED_COLS(EngineSpeed, DriverRequestTorqueStatus) FROM canDemo

一旦获得了所需的信号,我们就需要决定将数据发布到哪个 MQTT 主题。用户可以指定一个固定的主题名称,或者使用从数据中派生出来的动态主题名称。

例如,在下面的规则中,每个解析出的 CAN 帧信号都会被桥接到 MQTT 主题 can/{ {CanId}}{ {CanId}} 是从数据中派生出的动态主题名称,比如一个 CAN ID 为 123 的 CAN 帧将被桥接到 MQTT 主题 can/123

{
 "id": "distributeRule",
 "sql": "SELECT *, meta(id) as canId FROM canDemo",
 "actions": [
  {
     "mqtt": {
       "server": "tcp://broker.emqx.io:1883",
       "topic": "can/{
  
  {.canId}}",
       "sendSingle": true
    }
  }
]
}

eKuiper 允许多个规则处理同一个流。因此,用户可以根据需要创建多个规则,将 CAN Bus 数据桥接到不同的 MQTT 主题。

结语

要实现 CAN Bus 和 MQTT 之间的桥接,我们的解决方案要能够从 CAN Bus 设备读取数据,根据需求对数据进行过滤和转换,并将数据发布到 MQTT Broker。这正是 eKuiper 的用武之地,它提供了一种简单、高效和灵活的方式来完成这项工作。

除了桥接功能,eKuiper 还可以在边缘规则引擎和边缘计算的多种场景中提供帮助。我们将在后续的文章中详细讨论这些场景。

版权声明: 本文为 EMQ 原创,转载请注明出处。
原文链接:https://www.emqx.com/zh/blog/bridging-demanded-signals-from-can-bus-to-mqtt-by-ekuiper

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
消息中间件 存储 监控
|
2月前
|
机器学习/深度学习 开发工具
DP活动:HMI-Board以太网数据监视器(二)MQTT和LVGL
DP活动:HMI-Board以太网数据监视器(二)MQTT和LVGL
32 1
|
30天前
|
消息中间件 物联网 关系型数据库
MQTT常见问题之消息对列mqtt的历史数据查看失败如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
2月前
|
消息中间件 Web App开发 监控
mqtt数据问题之如何实现webRTC 协议的监控视频压测
MQTT协议是一个轻量级的消息传输协议,设计用于物联网(IoT)环境中设备间的通信;本合集将详细阐述MQTT协议的基本原理、特性以及各种实际应用场景,供用户学习和参考。
63 0
|
4月前
|
消息中间件 Shell Docker
百度搜索:蓝易云【docker rabbitmq-清空queue队列数据】
通过以上步骤,您可以使用Docker清空RabbitMQ队列的数据。这将帮助您重置队列并清除旧数据,以进行新的测试或使用。
33 0
|
4月前
|
存储 JSON 数据库
从 MQTT、InfluxDB 将数据无缝接入 TDengine,接入功能与 Logstash 类似
利用 TDengine Enterprise 和 TDengine Cloud 的数据接入功能,我们现在能够将 MQTT、InfluxDB 中的数据通过规则无缝转换至 TDengine 中,由于该功能在实现及使用上与 Logstash 类似,本文将结合 Logstash 为大家进行解读。
83 1
|
5月前
|
消息中间件 算法 关系型数据库
RocketMQ中,对一个包含200万条数据的表进行新建索引时,通常会需要锁定该
RocketMQ中,对一个包含200万条数据的表进行新建索引时,通常会需要锁定该
29 2
|
5月前
|
消息中间件 Java Spring
Spring Boot使用RabbitMq消费数据较慢解决
Spring Boot使用RabbitMq消费数据较慢解决
112 0
|
6月前
|
数据采集 JSON 移动开发
【MQTT】Esp32数据上传采集:最新mqtt插件(支持掉线、真机调试错误等问题)
【MQTT】Esp32数据上传采集:最新mqtt插件(支持掉线、真机调试错误等问题)
|
7月前
|
NoSQL 关系型数据库 MySQL
同步 MySQL 数据至 ES/Redis/MQ 等的五种方式
同步 MySQL 数据至 ES/Redis/MQ 等的五种方式
421 0

热门文章

最新文章