9.3DDD之集成事件

简介: 和领域事件不同,集成事件主要用于在微服务之间进行事件传递,即可能在多个服务器之间进行通信。本文讲解`RabbitMQ`中间件来完成集成事件的处理。

9.3DDD之集成事件

和领域事件不同,集成事件主要用于在微服务之间进行事件传递,即可能在多个服务器之间进行通信。本文讲解RabbitMQ中间件来完成集成事件的处理。

  1. RabbitMQ的基本概念:
  • 信道(channel),信道是消息的生产者,消费者和服务器之间进行通信的虚拟连接。
  • 队列,队列是用来进行消息收发的地方,生产者将消息放到队列中,消费者从队列中获取消息。
  • 交换机,交换机用于把消息路由到队列中。
  1. RabbitMQ的routing模式:
  • 生产者把消息发布到交换机中,消息会携带routingKey属性,交互机根据routingKey的值吧消息发送到一个或者多个队列,然后消费者从队列中获取消息。这种模式的优点是交换机和队列都位于RabbitMQ服务器的内部,即使消费者不在线,相关消息也会保存在队列中,等消费者上线后就可以获取到消息了。

使用步骤

Nuget安装RabbitMQ.Client

  • 发送消息端

usingRabbitMQ.Client;

usingSystem.Text;

 

varfactory=newConnectionFactory();

factory.HostName="127.0.0.1";//RabbitMQ服务器地址

factory.DispatchConsumersAsync=true;//兼容消费者异步使用

stringexchangeName="exchange1";//交换机的名字

stringeventName="myEvent";// routingKey的值

usingvarconn=factory.CreateConnection();//创建一个客户端到RabbitMQ的TCP连接,尽量重复使用

while (true)

{

   stringmsg=DateTime.Now.TimeOfDay.ToString();//待发送消息

   using (varchannel=conn.CreateModel())//创建信道,信道可以关闭,关闭后消息才会发出

   {

       varproperties=channel.CreateBasicProperties;//创建一个空的内容头

       properties.DeliveryMode=2;//1为非持久2为持久

       //声明指定名字的交换机,如果已有同名的交换机则不再创建

       //type:direct表示交换机会根据消息routingKey的值进行相等性匹配,消息会发布到和它的routingKey绑定的队列

       channel.ExchangeDeclare(exchange: exchangeName, type: "direct");

       byte[] body=Encoding.UTF8.GetBytes(msg);//RabbitMQ的消息只能按照byte[]类型传递

       channel.BasicPublish(exchange: exchangeName, routingKey: eventName,

           mandatory: true, basicProperties: properties, body: body);//发布消息        

   }

   Console.WriteLine("发布了消息:"+msg);

   Thread.Sleep(1000);

}

  • 接收消息端:

usingRabbitMQ.Client;

usingRabbitMQ.Client.Events;

usingSystem.Text;

 

varfactory=newConnectionFactory();

factory.HostName="127.0.0.1";

factory.DispatchConsumersAsync=true;

stringexchangeName="exchange1";

stringeventName="myEvent";

usingvarconn=factory.CreateConnection();

usingvarchannel=conn.CreateModel();

stringqueueName="queue1";

//声明和发送端相同的交换机,如果发送端已经声明了相同的,则这天语句会被忽略

//但是仍然要写,因为不确定是哪端先启动

channel.ExchangeDeclare(exchange: exchangeName, type: "direct");

//声明一个队列来接收交换机转发来的消息,如果已经指定了同名的队列,则自动忽略

//消息从队列中取走后则队列中就没有这个消息了

//如果A、B两程序都想取同一条消息,则需要声明两个不同名字的队列

channel.QueueDeclare(queue: queueName, durable: true,

       exclusive: false, autoDelete: false, arguments: null);

//将队列绑定到交换机上,并设定routingKey参数,这样当交换机收到routingKey的值和设定的值相同时

//会把消息转发到我们指定的队列,一个交换机可绑定多个队列,如果这些队列的routingKey的值相同

//那么交换机收到同一个routingKey的时候,会发送给多个队列

channel.QueueBind(queue: queueName,

   exchange: exchangeName, routingKey: eventName);

//AsyncEventingBasicConsumer用于从队列中接收消息,当一天消息被接收时,Received事件就会被触发

varconsumer=newAsyncEventingBasicConsumer(channel);

//Received是阻塞执行的,也就是一条回调方法执行完成后才会触发下一条Received事件

consumer.Received+=Consumer_Received;//增加处理事件

 

channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);//执行

Console.ReadLine();

 

 

asyncTaskConsumer_Received(objectsender, BasicDeliverEventArgsargs)

{

   //RabbitMQ支持消息的失败重发

   try

   {

       varbytes=args.Body.ToArray();

       stringmsg=Encoding.UTF8.GetString(bytes);

       Console.WriteLine(DateTime.Now+"收到了消息"+msg);

       //如果消息处理成功,则调用BasicAck通知队列

       //如果消息没有处理成功,则抵用BasicReject通知队列

       channel.BasicAck(args.DeliveryTag, multiple: false);

       awaitTask.Delay(800);

   }

   catch (Exceptionex)

   {

       channel.BasicReject(args.DeliveryTag, true);

       Console.WriteLine("处理收到的消息出错"+ex);

   }

}

简化框架

  1. Nuget安装Zack.EventBus
  2. 在配置系统下创建EventBus节点

"EventBus": {

   "HostName": "127.0.01",//RabbitMQ服务器地址

   "ExchangeName": "EventBusDemo1"//交换机的名字

 }

  1. 在program.cs中进行配置
  • 发送端

vareventBusSec=builder.Configuration.GetSection("EventBus");

builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);

//第一个参数用来设定程序绑定的队列的名字,一般一个微服务使用一个名字

//但是同一个微服务项目的每个集群实例都要收到消息则不能使用一个名字

//第二个参数为含有监听继承事件的处理者代码的程序集

builder.Services.AddEventBus("EventBusDemo1_Q1", Assembly.GetExecutingAssembly());

varapp=builder.Build();

app.UseEventBus();

  • 接收端

vareventBusSec=builder.Configuration.GetSection("EventBus");

builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);

builder.Services.AddEventBus("EventBusDemo1_Q2", Assembly.GetExecutingAssembly());

varapp=builder.Build();

app.UseEventBus();

  1. 在需要发布事件的类中注入IEventBus服务,调用Publish方法

[Route("api/[controller]")]

[ApiController]

publicclassDemoController : ControllerBase

{

   privateIEventBuseventBus;

 

   publicDemoController(IEventBuseventBus)

   {

       this.eventBus=eventBus;

   }

 

   [HttpPost]

   publicstringPublish()

   {

       eventBus.Publish("UserAdded", new { UserName="yzk", Age=18 });

       return"ok";

   }

}

  1. 编写事件处理者
  • 实现IIntegrationEventHandler接口

[EventName("UserAdded")] //设定监听的事件名称,和publish中的名称一致,可以增加多个[EventName]来监听多个事件

publicclassUserAddesEventHandler : IIntegrationEventHandler

{

    privatereadonlyILogger<UserAddesEventHandler>logger;

    publicUserAddesEventHandler(ILogger<UserAddesEventHandler>logger)

    {

        this.logger=logger;

    }

   //当收到一个事件后,Handle方法就会被调用,第一个参数为事件的名字,第二个是publish设置的数据,事件数据是以JSON格式传入

    publicTaskHandle(stringeventName, stringeventData)

    {

        logger.LogInformation("新建了用户:"+eventData);

        returnTask.CompletedTask;

    }

}

  • 事件数据是以JSON格式传入,可以使用JsonIntegrationEventHandler<T>接口来解析成.net对象

publicrecordUserData(stringUserName, intAge);

 

[EventName("UserAdded")]

publicclassUserAddesEventHandler3 : JsonIntegrationEventHandler<UserData>

{

    privatereadonlyILogger<UserAddesEventHandler3>logger;

    publicUserAddesEventHandler3(ILogger<UserAddesEventHandler3>logger)

    {

        this.logger=logger;

    }

    publicoverrideTaskHandleJson(stringeventName, UserDataeventData)

    {

        logger.LogInformation($"Json:{eventData.UserName}");

        returnTask.CompletedTask;

    }

}

  • 进行微服务开发时,为了降低耦合,一般不会新建一个UserData类供多个微服务使用。则可以使用DynamicIntegrationEventHandler接口来将JSON解析为dynamic类型

[EventName("UserAdded")]

publicclassUserAddesEventHandler2 : DynamicIntegrationEventHandler

{

   privatereadonlyILogger<UserAddesEventHandler2>logger;

   publicUserAddesEventHandler2(ILogger<UserAddesEventHandler2>logger)

   {

       this.logger=logger;

   }

   publicoverrideTaskHandleDynamic(stringeventName, dynamiceventData)

   {

       logger.LogInformation($"Dynamic:{eventData.UserName}");

       returnTask.CompletedTask;

   }

}


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
存储 NoSQL Java
大事件后端项目34_登录优化----redis_SpringBoot集成redis
大事件后端项目34_登录优化----redis_SpringBoot集成redis
大事件后端项目34_登录优化----redis_SpringBoot集成redis
|
7月前
|
对象存储
大事件后端项目32--------文件上传_阿里云OSS_程序集成
大事件后端项目32--------文件上传_阿里云OSS_程序集成
|
新零售 敏捷开发 Cloud Native
“全”事件触发:阿里云函数计算与事件总线产品完成全面深度集成
目前,函数计算已具备接入EventBridge所有事件源的触发能力,实现触达阿里云全系产品服务的“最后一公里”。
313 0
|
jenkins 持续交付
【Jenkins】Jenkins集成slack实现事件实时通知
本文做以下事情: 一、首先创建Slack账号 二、在Slack中配置Jenkins集成 三、在Jenkins中安装Slack插件 四、Create a Webhook by visiting Integrations 五、Jenkins中配置Slack 六、调试配置是否成功 .
6047 0
|
3月前
|
Java Maven Docker
gitlab-ci 集成 k3s 部署spring boot 应用
gitlab-ci 集成 k3s 部署spring boot 应用
|
2月前
|
消息中间件 监控 Java
您是否已集成 Spring Boot 与 ActiveMQ?
您是否已集成 Spring Boot 与 ActiveMQ?
56 0
|
6月前
|
监控 druid Java
spring boot 集成配置阿里 Druid监控配置
spring boot 集成配置阿里 Druid监控配置
324 6
|
6月前
|
Java 关系型数据库 MySQL
如何实现Springboot+camunda+mysql的集成
【7月更文挑战第2天】集成Spring Boot、Camunda和MySQL的简要步骤: 1. 初始化Spring Boot项目,添加Camunda和MySQL驱动依赖。 2. 配置`application.properties`,包括数据库URL、用户名和密码。 3. 设置Camunda引擎属性,指定数据源。 4. 引入流程定义文件(如`.bpmn`)。 5. 创建服务处理流程操作,创建控制器接收请求。 6. Camunda自动在数据库创建表结构。 7. 启动应用,测试流程启动,如通过服务和控制器开始流程实例。 示例代码包括服务类启动流程实例及控制器接口。实际集成需按业务需求调整。
451 4
|
6月前
|
消息中间件 Java 测试技术
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
【RocketMQ系列八】SpringBoot集成RocketMQ-实现普通消息和事务消息
405 1

热门文章

最新文章