9.3DDD之集成事件

简介: 和领域事件不同,集成事件主要用于在微服务之间进行事件传递,即可能在多个服务器之间进行通信。本文讲解`RabbitMQ`中间件来完成集成事件的处理。

9.3DDD之集成事件

和领域事件不同,集成事件主要用于在微服务之间进行事件传递,即可能在多个服务器之间进行通信。本文讲解RabbitMQ中间件来完成集成事件的处理。

  1. RabbitMQ的基本概念:
  • 信道(channel),信道是消息的生产者,消费者和服务器之间进行通信的虚拟连接。
  • 队列,队列是用来进行消息收发的地方,生产者将消息放到队列中,消费者从队列中获取消息。
  • 交换机,交换机用于把消息路由到队列中。
  1. RabbitMQ的routing模式:
  • 生产者把消息发布到交换机中,消息会携带routingKey属性,交互机根据routingKey的值吧消息发送到一个或者多个队列,然后消费者从队列中获取消息。这种模式的优点是交换机和队列都位于RabbitMQ服务器的内部,即使消费者不在线,相关消息也会保存在队列中,等消费者上线后就可以获取到消息了。

使用步骤

Nuget安装RabbitMQ.Client

  • 发送消息端

usingRabbitMQ.Client;

usingSystem.Text;

 

varfactory=newConnectionFactory();

factory.HostName="127.0.0.1";//RabbitMQ服务器地址

factory.DispatchConsumersAsync=true;//兼容消费者异步使用

stringexchangeName="exchange1";//交换机的名字

stringeventName="myEvent";// routingKey的值

usingvarconn=factory.CreateConnection();//创建一个客户端到RabbitMQ的TCP连接,尽量重复使用

while (true)

{

   stringmsg=DateTime.Now.TimeOfDay.ToString();//待发送消息

   using (varchannel=conn.CreateModel())//创建信道,信道可以关闭,关闭后消息才会发出

   {

       varproperties=channel.CreateBasicProperties;//创建一个空的内容头

       properties.DeliveryMode=2;//1为非持久2为持久

       //声明指定名字的交换机,如果已有同名的交换机则不再创建

       //type:direct表示交换机会根据消息routingKey的值进行相等性匹配,消息会发布到和它的routingKey绑定的队列

       channel.ExchangeDeclare(exchange: exchangeName, type: "direct");

       byte[] body=Encoding.UTF8.GetBytes(msg);//RabbitMQ的消息只能按照byte[]类型传递

       channel.BasicPublish(exchange: exchangeName, routingKey: eventName,

           mandatory: true, basicProperties: properties, body: body);//发布消息        

   }

   Console.WriteLine("发布了消息:"+msg);

   Thread.Sleep(1000);

}

  • 接收消息端:

usingRabbitMQ.Client;

usingRabbitMQ.Client.Events;

usingSystem.Text;

 

varfactory=newConnectionFactory();

factory.HostName="127.0.0.1";

factory.DispatchConsumersAsync=true;

stringexchangeName="exchange1";

stringeventName="myEvent";

usingvarconn=factory.CreateConnection();

usingvarchannel=conn.CreateModel();

stringqueueName="queue1";

//声明和发送端相同的交换机,如果发送端已经声明了相同的,则这天语句会被忽略

//但是仍然要写,因为不确定是哪端先启动

channel.ExchangeDeclare(exchange: exchangeName, type: "direct");

//声明一个队列来接收交换机转发来的消息,如果已经指定了同名的队列,则自动忽略

//消息从队列中取走后则队列中就没有这个消息了

//如果A、B两程序都想取同一条消息,则需要声明两个不同名字的队列

channel.QueueDeclare(queue: queueName, durable: true,

       exclusive: false, autoDelete: false, arguments: null);

//将队列绑定到交换机上,并设定routingKey参数,这样当交换机收到routingKey的值和设定的值相同时

//会把消息转发到我们指定的队列,一个交换机可绑定多个队列,如果这些队列的routingKey的值相同

//那么交换机收到同一个routingKey的时候,会发送给多个队列

channel.QueueBind(queue: queueName,

   exchange: exchangeName, routingKey: eventName);

//AsyncEventingBasicConsumer用于从队列中接收消息,当一天消息被接收时,Received事件就会被触发

varconsumer=newAsyncEventingBasicConsumer(channel);

//Received是阻塞执行的,也就是一条回调方法执行完成后才会触发下一条Received事件

consumer.Received+=Consumer_Received;//增加处理事件

 

channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);//执行

Console.ReadLine();

 

 

asyncTaskConsumer_Received(objectsender, BasicDeliverEventArgsargs)

{

   //RabbitMQ支持消息的失败重发

   try

   {

       varbytes=args.Body.ToArray();

       stringmsg=Encoding.UTF8.GetString(bytes);

       Console.WriteLine(DateTime.Now+"收到了消息"+msg);

       //如果消息处理成功,则调用BasicAck通知队列

       //如果消息没有处理成功,则抵用BasicReject通知队列

       channel.BasicAck(args.DeliveryTag, multiple: false);

       awaitTask.Delay(800);

   }

   catch (Exceptionex)

   {

       channel.BasicReject(args.DeliveryTag, true);

       Console.WriteLine("处理收到的消息出错"+ex);

   }

}

简化框架

  1. Nuget安装Zack.EventBus
  2. 在配置系统下创建EventBus节点

"EventBus": {

   "HostName": "127.0.01",//RabbitMQ服务器地址

   "ExchangeName": "EventBusDemo1"//交换机的名字

 }

  1. 在program.cs中进行配置
  • 发送端

vareventBusSec=builder.Configuration.GetSection("EventBus");

builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);

//第一个参数用来设定程序绑定的队列的名字,一般一个微服务使用一个名字

//但是同一个微服务项目的每个集群实例都要收到消息则不能使用一个名字

//第二个参数为含有监听继承事件的处理者代码的程序集

builder.Services.AddEventBus("EventBusDemo1_Q1", Assembly.GetExecutingAssembly());

varapp=builder.Build();

app.UseEventBus();

  • 接收端

vareventBusSec=builder.Configuration.GetSection("EventBus");

builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);

builder.Services.AddEventBus("EventBusDemo1_Q2", Assembly.GetExecutingAssembly());

varapp=builder.Build();

app.UseEventBus();

  1. 在需要发布事件的类中注入IEventBus服务,调用Publish方法

[Route("api/[controller]")]

[ApiController]

publicclassDemoController : ControllerBase

{

   privateIEventBuseventBus;

 

   publicDemoController(IEventBuseventBus)

   {

       this.eventBus=eventBus;

   }

 

   [HttpPost]

   publicstringPublish()

   {

       eventBus.Publish("UserAdded", new { UserName="yzk", Age=18 });

       return"ok";

   }

}

  1. 编写事件处理者
  • 实现IIntegrationEventHandler接口

[EventName("UserAdded")] //设定监听的事件名称,和publish中的名称一致,可以增加多个[EventName]来监听多个事件

publicclassUserAddesEventHandler : IIntegrationEventHandler

{

    privatereadonlyILogger<UserAddesEventHandler>logger;

    publicUserAddesEventHandler(ILogger<UserAddesEventHandler>logger)

    {

        this.logger=logger;

    }

   //当收到一个事件后,Handle方法就会被调用,第一个参数为事件的名字,第二个是publish设置的数据,事件数据是以JSON格式传入

    publicTaskHandle(stringeventName, stringeventData)

    {

        logger.LogInformation("新建了用户:"+eventData);

        returnTask.CompletedTask;

    }

}

  • 事件数据是以JSON格式传入,可以使用JsonIntegrationEventHandler<T>接口来解析成.net对象

publicrecordUserData(stringUserName, intAge);

 

[EventName("UserAdded")]

publicclassUserAddesEventHandler3 : JsonIntegrationEventHandler<UserData>

{

    privatereadonlyILogger<UserAddesEventHandler3>logger;

    publicUserAddesEventHandler3(ILogger<UserAddesEventHandler3>logger)

    {

        this.logger=logger;

    }

    publicoverrideTaskHandleJson(stringeventName, UserDataeventData)

    {

        logger.LogInformation($"Json:{eventData.UserName}");

        returnTask.CompletedTask;

    }

}

  • 进行微服务开发时,为了降低耦合,一般不会新建一个UserData类供多个微服务使用。则可以使用DynamicIntegrationEventHandler接口来将JSON解析为dynamic类型

[EventName("UserAdded")]

publicclassUserAddesEventHandler2 : DynamicIntegrationEventHandler

{

   privatereadonlyILogger<UserAddesEventHandler2>logger;

   publicUserAddesEventHandler2(ILogger<UserAddesEventHandler2>logger)

   {

       this.logger=logger;

   }

   publicoverrideTaskHandleDynamic(stringeventName, dynamiceventData)

   {

       logger.LogInformation($"Dynamic:{eventData.UserName}");

       returnTask.CompletedTask;

   }

}


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
新零售 敏捷开发 Cloud Native
“全”事件触发:阿里云函数计算与事件总线产品完成全面深度集成
目前,函数计算已具备接入EventBridge所有事件源的触发能力,实现触达阿里云全系产品服务的“最后一公里”。
249 0
“全”事件触发:阿里云函数计算与事件总线产品完成全面深度集成
|
jenkins 持续交付
【Jenkins】Jenkins集成slack实现事件实时通知
本文做以下事情: 一、首先创建Slack账号 二、在Slack中配置Jenkins集成 三、在Jenkins中安装Slack插件 四、Create a Webhook by visiting Integrations 五、Jenkins中配置Slack 六、调试配置是否成功 .
5960 0
|
6天前
|
前端开发 Java 应用服务中间件
从零手写实现 tomcat-08-tomcat 如何与 springboot 集成?
该文是一系列关于从零开始手写实现 Apache Tomcat 的教程概述。作者希望通过亲自动手实践理解 Tomcat 的核心机制。文章讨论了 Spring Boot 如何实现直接通过 `main` 方法启动,Spring 与 Tomcat 容器的集成方式,以及两者生命周期的同步原理。文中还提出了实现 Tomcat 的启发,强调在设计启动流程时确保资源的正确加载和初始化。最后提到了一个名为 mini-cat(嗅虎)的简易 Tomcat 实现项目,开源于 [GitHub](https://github.com/houbb/minicat)。
|
6天前
|
消息中间件 Java Kafka
Springboot集成高低版本kafka
Springboot集成高低版本kafka
|
6天前
|
存储 JSON Java
SpringBoot集成AOP实现每个接口请求参数和返回参数并记录每个接口请求时间
SpringBoot集成AOP实现每个接口请求参数和返回参数并记录每个接口请求时间
50 2
|
6天前
|
前端开发 Java 应用服务中间件
从零手写实现 tomcat-08-tomcat 如何与 springboot 集成?
本文探讨了Spring Boot如何实现像普通Java程序一样通过main方法启动,关键在于Spring Boot的自动配置、内嵌Servlet容器(如Tomcat)以及`SpringApplication`类。Spring与Tomcat集成有两种方式:独立模式和嵌入式模式,两者通过Servlet规范、Spring MVC协同工作。Spring和Tomcat的生命周期同步涉及启动、运行和关闭阶段,通过事件和监听器实现。文章鼓励读者从实现Tomcat中学习资源管理和生命周期管理。此外,推荐了Netty权威指南系列文章,并提到了一个名为mini-cat的简易Tomcat实现项目。
|
5天前
|
Java 数据库连接 数据安全/隐私保护
springBoot集成token认证,最全Java面试知识点梳理
springBoot集成token认证,最全Java面试知识点梳理
|
6天前
|
消息中间件 JSON Java
RabbitMQ的springboot项目集成使用-01
RabbitMQ的springboot项目集成使用-01
|
6天前
|
搜索推荐 Java 数据库
springboot集成ElasticSearch的具体操作(系统全文检索)
springboot集成ElasticSearch的具体操作(系统全文检索)

热门文章

最新文章