工业物联网或系统集成中应用消息队列(ActiveMQ,C#的demo)的场景全面分析

简介: 1.[连载]《C#通讯(串口和网络)框架的设计与实现》 2.[开源]C#跨平台物联网通讯框架ServerSuperIO(SSIO)介绍 2.应用SuperIO(SIO)和开源跨平台物联网框架ServerSuperIO(SSIO)构建系统的整体方案 3.

1.[连载]《C#通讯(串口和网络)框架的设计与实现》

2.[开源]C#跨平台物联网通讯框架ServerSuperIO(SSIO)介绍

2.应用SuperIO(SIO)和开源跨平台物联网框架ServerSuperIO(SSIO)构建系统的整体方案

3.C#工业物联网和集成系统解决方案的技术路线(数据源、数据采集、数据上传与接收、ActiveMQ、Mongodb、WebApi、手机App)

5.ServerSuperIO开源地址:https://github.com/wxzz/ServerSuperIO

 

目       录

工业物联网或系统集成中应用消息队列(ActiveMQ)的场景全面分析... 1

前言... 1

第一章           终端/交互场景... 3

1.1           终端设备... 3

1.2           通讯机制... 3

第二章           ActvieMQ应用场景... 4

2.1           发布/订阅(Publish/Subscribe)... 4

2.2           生产者/消费者(Producer/Consumer)... 7

2.3           请求/应答(Request/Response)... 10

第三章           假定场景分析... 16

3.1           通讯层... 16

3.2           数据业务层... 16

3.3           综述... 16

 

前言

     互联网技术已经发展的很成熟了,各种开源的代码、框架和解决方案等。鉴于互联网技术的通用性,势必向其他领域延展。不管是工业4.0,还是互联网+  工业,互联网技术向工业领域传导也是必然的。

     所以,对于工业方面的应用场景的技术储备和技术线路调研也是日常工作很重要的一部分,为公司的横向和纵向发展提供技术平台和保障,当然也取决于领导的视野。

第一章     终端/交互场景

    任何技术都是为业务服务,而业务是有特定的应用场景。离开了实现环境去谈技术是没有实际意义的,解决实际问题而又能保证相当长时间内的稳定性是我们努力实现的目标。同时要从多个角度来考虑问题,以及做出平衡。

1.1    终端设备

(1)    终端种类:嵌入式硬件/传感器、PC机(监测站、大型监控设备等)、手机终端等。

(2)    交互方式:单向交互,数据上传,可能服务端会有返回确认信息,证明数据已经收到了;双向交互,服务端不仅仅会返回确认信息,同时还要主动下发给指定终端命令信息,例如:控制硬件设备机械动作命令、修改硬件设备参数命令、以及补传相关数据信息命令等等。

(3)    设备管理:这里指的设备管理是说设备的状态,包括两个方面:设备IO状态和设备通讯状态。设备IO状态包括:IO打开和IO关闭。设备通讯状态包括:通讯中断、通讯干扰和通讯正常。为了判断故障,这里的逻辑关系是:IO打开的时候不一定代表通讯正常;IO关闭不一定代表通讯中断;通讯中断不一定代表IO关闭;通讯干扰不一定代表IO打开。

(4)    数据完整性:允许数据缺失,一般在原来数据基础上的增量数据是可以允许丢失的;不允许数据缺失,一般脉冲数据是不允许数据丢失的。

1.2    通讯机制

(1)主动请求数据:服务器端主动下发命令给终端,让谁上传数据、上传什么数据都由服务器端决定。

(2)被动接收数据:服务器端被动接收终端上传的数据,根据数据信息进行数据处理,以及返回确认信息。

第二章     ActvieMQ应用场景

     消息队列比较多,本文以ActiveMQ为例进行介绍,全部代码实现C#为主,主要考虑到常见的应用模式。事例代码下载:http://pan.baidu.com/s/1qXZ1sU4

2.1    发布/订阅(Publish/Subscribe)

     一个信息发布者在某一个主题上发布消息,所有订阅该主题的订阅都会收到相同的消息,这种模式是一对多的关系,如下图:

发布端代码:

static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    using (ISession session = connection.CreateSession())
                    {
                        IMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("Topic"));

                        string text = Console.ReadLine();
                        while (text!="exit")
                        {
                            ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = text;
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
                            Console.WriteLine("Sending: " + text);
                            System.Threading.Thread.Sleep(2000);
                        }
                    }
                }
                Console.ReadLine();
            }
            catch (System.Exception e)
            {
                Console.WriteLine("{0}", e.Message);
                Console.ReadLine();
            }
        }

 订阅端代码:

static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    connection.ClientId = "testing listener1";
                    connection.Start();

                    using (ISession session = connection.CreateSession())
                    {
                        IMessageConsumer consumer = session.CreateDurableConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQTopic("Topic"), "testing listener1", null, false);
                        consumer.Listener += new MessageListener(consumer_Listener);
                        Console.ReadLine();
                    }
                    connection.Stop();
                    connection.Close();
                }
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = (ITextMessage)message;
                Console.WriteLine("Receive: " + msg.Text);
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

2.2    生产者/消费者(Producer/Consumer)

    生产者生产了一块香皂,消费者购买了该块香皂,使用完了,就在这个世界上消息了,生产者和消费者之间的关系存在一种偶然性,这是一对一的关系,如下图:

 

生产端代码:

 static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    using (ISession session = connection.CreateSession())
                    {
                        IMessageProducer prod = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("Queue"));

                        string text = Console.ReadLine();
                        while (text != "exit")
                        {
                            ITextMessage msg = prod.CreateTextMessage();
                            msg.Text = text;
                            prod.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
                            Console.WriteLine("Sending: " + text);
                            System.Threading.Thread.Sleep(2000);
                        }
                    }
                }
                Console.ReadLine();
            }
            catch (System.Exception e)
            {
                Console.WriteLine("{0}", e.Message);
                Console.ReadLine();
            }
        }

 消费端代码:

static void Main(string[] args)
        {
            try
            {
                IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
                using (IConnection connection = factory.CreateConnection())
                {
                    //connection.ClientId = "testing listener2";
                    connection.Start(); 
                    using (ISession session = connection.CreateSession())
                    {
                        IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("Queue"));
                        consumer.Listener += new MessageListener(consumer_Listener);
                        Console.ReadLine();
                    }
                    connection.Stop();
                    connection.Close();
                }
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = (ITextMessage)message;
                Console.WriteLine("Receive: " + msg.Text);
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

 

2.3    请求/应答(Request/Response)

请求-应答的通信方式应用很普遍,客户端向服务端上传实时数据或参数,服务端处理完之后,要返回确认信息,这种交互关系如下图:

 

客户端代码:

static void Main(string[] args)
        {
            IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
            try
            {
                using (IConnection connection = factory.CreateConnection())
                {
                    connection.Start();
                    using (ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        IDestination destination =  new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("client.messages");

                        IMessageProducer producer = session.CreateProducer(destination);
                        producer.DeliveryMode=MsgDeliveryMode.NonPersistent;

                        IDestination tempDest = session.CreateTemporaryQueue();
                        IMessageConsumer responseConsumer = session.CreateConsumer(tempDest);
                        responseConsumer.Listener += new MessageListener(consumer_Listener);
                       
                        string text = Console.ReadLine();
                        while (text != "exit")
                        {
                            ITextMessage msg = session.CreateTextMessage();
                            msg.Text = text;
                            msg.NMSReplyTo = tempDest;
                            msg.NMSCorrelationID = DateTime.Now.ToString("yyyyMMddHHmmss");
                            producer.Send(msg, Apache.NMS.MsgDeliveryMode.NonPersistent, Apache.NMS.MsgPriority.Normal, TimeSpan.MinValue);
                            Console.WriteLine("Sending: " + text);
                            System.Threading.Thread.Sleep(2000);
                        }

                        Console.ReadLine();
                    }
                    connection.Stop();
                    connection.Close();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage msg = (ITextMessage)message;
                Console.WriteLine("Receive: " + msg.Text);
            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }

 服务端代码:

 private static ISession session;

        private static IMessageProducer replyProducer;
        static void Main(string[] args)
        {
            IConnectionFactory factory = new ConnectionFactory("tcp://localhost:61616/");
            try
            {
                    IConnection connection = factory.CreateConnection();
                    connection.Start();
                    session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);

                    IDestination adminQueue = new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("client.messages");
                    replyProducer = session.CreateProducer();
                    replyProducer.DeliveryMode=MsgDeliveryMode.NonPersistent;

                    IMessageConsumer consumer = session.CreateConsumer(adminQueue);
                    consumer.Listener += new MessageListener(consumer_Listener);
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                Console.ReadLine();
            }
        }

        static void consumer_Listener(IMessage message)
        {
            try
            {
                ITextMessage response = session.CreateTextMessage();
                if (message is ITextMessage) {
                    ITextMessage txtMsg = (ITextMessage)message;
                    string messageText = txtMsg.Text;
                    response.Text = messageText;

                    Console.WriteLine("Receive:" + messageText);
                }

                response.NMSCorrelationID=message.NMSCorrelationID;

                replyProducer.Send(message.NMSReplyTo, response);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);

            }
        }

第三章     假定场景分析

     我们以系统建设过程中的构架来分析消息队列在两个层面的问题,通讯层和数据业务层。

3.1    通讯层

     通讯层是否可以用消息队列(ActiveMQ)?这个问题取决于两方面:1、如果终端设备有嵌入式硬件,甚至还是C51开发的,那么在系统集成和物联的过程中,就涉及到兼容性的问题。显然和消息队列进行对接是一件头痛的事,用C51写一个对接的驱动不是不可能,但是要评估工作量和稳定性。2、服务端与指定某个终端双向交互频繁的情况,特别是服务端实时发送设备校准命令的情况,这种情况消息队列是不如通讯框架的。

3.2    数据业务层

     服务端接收到数据后,完全可以使用消息队列的生产者和消费者模式处理数据,对系统的业务进行解耦。

     下发命令也可以通过消息队列,这样可以统一控制端的接口,再由通讯框架下发到指定的终端。

3.3    综述

     综合考虑,建议在通讯层使用通讯框架,对于设备的IO状态和通讯状态能够及时反应,通讯效率也是能够得到保障的;对于数据业务层,建议不要放在通讯框架内部进行处理,可以使用消息队列,配合通讯框架使用。

    整体架构图如下:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

文章得到了群友支持:

 

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
相关文章
|
1月前
|
前端开发 JavaScript UED
探索Python Django中的WebSocket集成:为前后端分离应用添加实时通信功能
通过在Django项目中集成Channels和WebSocket,我们能够为前后端分离的应用添加实时通信功能,实现诸如在线聊天、实时数据更新等交互式场景。这不仅增强了应用的功能性,也提升了用户体验。随着实时Web应用的日益普及,掌握Django Channels和WebSocket的集成将为开发者开启新的可能性,推动Web应用的发展迈向更高层次的实时性和交互性。
75 1
|
1月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
20天前
|
编译器 C#
c# - 运算符<<不能应用于long和long类型的操作数
在C#中,左移运算符的第二个操作数必须是 `int`类型,因此需要将 `long`类型的位移计数显式转换为 `int`类型。这种转换需要注意数据丢失和负值处理的问题。通过本文的详细说明和示例代码,相信可以帮助你在实际开发中正确使用左移运算符。
28 3
|
19天前
|
编译器 C#
c# - 运算符<<不能应用于long和long类型的操作数
在C#中,左移运算符的第二个操作数必须是 `int`类型,因此需要将 `long`类型的位移计数显式转换为 `int`类型。这种转换需要注意数据丢失和负值处理的问题。通过本文的详细说明和示例代码,相信可以帮助你在实际开发中正确使用左移运算符。
34 1
|
30天前
|
存储 JSON Ubuntu
时序数据库 TDengine 支持集成开源的物联网平台 ThingsBoard
本文介绍了如何结合 Thingsboard 和 TDengine 实现设备管理和数据存储。Thingsboard 中的“设备配置”与 TDengine 中的超级表相对应,每个设备对应一个子表。通过创建设备配置和设备,实现数据的自动存储和管理。具体操作包括创建设备配置、添加设备、写入数据,并展示了车辆实时定位追踪和车队维护预警两个应用场景。
51 3
|
17天前
|
SQL 监控 物联网
ClickHouse在物联网(IoT)中的应用:实时监控与分析
【10月更文挑战第27天】随着物联网(IoT)技术的快速发展,越来越多的设备被连接到互联网上,产生了海量的数据。这些数据不仅包含了设备的状态信息,还包括用户的使用习惯、环境参数等。如何高效地处理和分析这些数据,成为了一个重要的挑战。作为一位数据工程师,我在一个物联网项目中深入使用了ClickHouse,以下是我的经验和思考。
42 0
|
18天前
|
编译器 C#
c# - 运算符<<不能应用于long和long类型的操作数
在C#中,左移运算符的第二个操作数必须是 `int`类型,因此需要将 `long`类型的位移计数显式转换为 `int`类型。这种转换需要注意数据丢失和负值处理的问题。通过本文的详细说明和示例代码,相信可以帮助你在实际开发中正确使用左移运算符。
10 0
|
2月前
|
设计模式 开发框架 前端开发
MVC 模式在 C# 中的应用
MVC(Model-View-Controller)模式是广泛应用于Web应用程序开发的设计模式,将应用分为模型(存储数据及逻辑)、视图(展示数据给用户)和控制器(处理用户输入并控制模型与视图交互)三部分,有助于管理复杂应用并提高代码可读性和维护性。在C#中,ASP.NET MVC框架常用于构建基于MVC模式的Web应用,通过定义模型、控制器和视图,实现结构清晰且易维护的应用程序。
51 2
|
1月前
|
安全 网络协议 物联网
物联网僵尸网络和 DDoS 攻击的 CERT 分析
物联网僵尸网络和 DDoS 攻击的 CERT 分析
|
2月前
|
编译器 C# Android开发
Uno Platform 是一个用于构建跨平台应用程序的强大框架,它允许开发者使用 C# 和 XAML 来创建适用于多个平台的应用
Uno Platform 是一个用于构建跨平台应用程序的强大框架,它允许开发者使用 C# 和 XAML 来创建适用于多个平台的应用
258 8