9.3DDD之集成事件
和领域事件不同,集成事件主要用于在微服务之间进行事件传递,即可能在多个服务器之间进行通信。本文讲解RabbitMQ
中间件来完成集成事件的处理。
- RabbitMQ的基本概念:
- 信道(channel),信道是消息的生产者,消费者和服务器之间进行通信的虚拟连接。
- 队列,队列是用来进行消息收发的地方,生产者将消息放到队列中,消费者从队列中获取消息。
- 交换机,交换机用于把消息路由到队列中。
- RabbitMQ的routing模式:
- 生产者把消息发布到交换机中,消息会携带routingKey属性,交互机根据routingKey的值吧消息发送到一个或者多个队列,然后消费者从队列中获取消息。这种模式的优点是交换机和队列都位于RabbitMQ服务器的内部,即使消费者不在线,相关消息也会保存在队列中,等消费者上线后就可以获取到消息了。
使用步骤
Nuget安装RabbitMQ.Client
- 发送消息端
usingRabbitMQ.Client;
usingSystem.Text;
varfactory=newConnectionFactory();
factory.HostName="127.0.0.1";//RabbitMQ服务器地址
factory.DispatchConsumersAsync=true;//兼容消费者异步使用
stringexchangeName="exchange1";//交换机的名字
stringeventName="myEvent";// routingKey的值
usingvarconn=factory.CreateConnection();//创建一个客户端到RabbitMQ的TCP连接,尽量重复使用
while (true)
{
stringmsg=DateTime.Now.TimeOfDay.ToString();//待发送消息
using (varchannel=conn.CreateModel())//创建信道,信道可以关闭,关闭后消息才会发出
{
varproperties=channel.CreateBasicProperties;//创建一个空的内容头
properties.DeliveryMode=2;//1为非持久2为持久
//声明指定名字的交换机,如果已有同名的交换机则不再创建
//type:direct表示交换机会根据消息routingKey的值进行相等性匹配,消息会发布到和它的routingKey绑定的队列
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
byte[] body=Encoding.UTF8.GetBytes(msg);//RabbitMQ的消息只能按照byte[]类型传递
channel.BasicPublish(exchange: exchangeName, routingKey: eventName,
mandatory: true, basicProperties: properties, body: body);//发布消息
}
Console.WriteLine("发布了消息:"+msg);
Thread.Sleep(1000);
}
- 接收消息端:
usingRabbitMQ.Client;
usingRabbitMQ.Client.Events;
usingSystem.Text;
varfactory=newConnectionFactory();
factory.HostName="127.0.0.1";
factory.DispatchConsumersAsync=true;
stringexchangeName="exchange1";
stringeventName="myEvent";
usingvarconn=factory.CreateConnection();
usingvarchannel=conn.CreateModel();
stringqueueName="queue1";
//声明和发送端相同的交换机,如果发送端已经声明了相同的,则这天语句会被忽略
//但是仍然要写,因为不确定是哪端先启动
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
//声明一个队列来接收交换机转发来的消息,如果已经指定了同名的队列,则自动忽略
//消息从队列中取走后则队列中就没有这个消息了
//如果A、B两程序都想取同一条消息,则需要声明两个不同名字的队列
channel.QueueDeclare(queue: queueName, durable: true,
exclusive: false, autoDelete: false, arguments: null);
//将队列绑定到交换机上,并设定routingKey参数,这样当交换机收到routingKey的值和设定的值相同时
//会把消息转发到我们指定的队列,一个交换机可绑定多个队列,如果这些队列的routingKey的值相同
//那么交换机收到同一个routingKey的时候,会发送给多个队列
channel.QueueBind(queue: queueName,
exchange: exchangeName, routingKey: eventName);
//AsyncEventingBasicConsumer用于从队列中接收消息,当一天消息被接收时,Received事件就会被触发
varconsumer=newAsyncEventingBasicConsumer(channel);
//Received是阻塞执行的,也就是一条回调方法执行完成后才会触发下一条Received事件
consumer.Received+=Consumer_Received;//增加处理事件
channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);//执行
Console.ReadLine();
asyncTaskConsumer_Received(objectsender, BasicDeliverEventArgsargs)
{
//RabbitMQ支持消息的失败重发
try
{
varbytes=args.Body.ToArray();
stringmsg=Encoding.UTF8.GetString(bytes);
Console.WriteLine(DateTime.Now+"收到了消息"+msg);
//如果消息处理成功,则调用BasicAck通知队列
//如果消息没有处理成功,则抵用BasicReject通知队列
channel.BasicAck(args.DeliveryTag, multiple: false);
awaitTask.Delay(800);
}
catch (Exceptionex)
{
channel.BasicReject(args.DeliveryTag, true);
Console.WriteLine("处理收到的消息出错"+ex);
}
}
简化框架
- Nuget安装
Zack.EventBus
- 在配置系统下创建EventBus节点
"EventBus": {
"HostName": "127.0.01",//RabbitMQ服务器地址
"ExchangeName": "EventBusDemo1"//交换机的名字
}
- 在program.cs中进行配置
- 发送端
vareventBusSec=builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
//第一个参数用来设定程序绑定的队列的名字,一般一个微服务使用一个名字
//但是同一个微服务项目的每个集群实例都要收到消息则不能使用一个名字
//第二个参数为含有监听继承事件的处理者代码的程序集
builder.Services.AddEventBus("EventBusDemo1_Q1", Assembly.GetExecutingAssembly());
varapp=builder.Build();
app.UseEventBus();
- 接收端
vareventBusSec=builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
builder.Services.AddEventBus("EventBusDemo1_Q2", Assembly.GetExecutingAssembly());
varapp=builder.Build();
app.UseEventBus();
- 在需要发布事件的类中注入IEventBus服务,调用Publish方法
[Route("api/[controller]")]
[ApiController]
publicclassDemoController : ControllerBase
{
privateIEventBuseventBus;
publicDemoController(IEventBuseventBus)
{
this.eventBus=eventBus;
}
[HttpPost]
publicstringPublish()
{
eventBus.Publish("UserAdded", new { UserName="yzk", Age=18 });
return"ok";
}
}
- 编写事件处理者
- 实现
IIntegrationEventHandler
接口
[EventName("UserAdded")] //设定监听的事件名称,和publish中的名称一致,可以增加多个[EventName]来监听多个事件
publicclassUserAddesEventHandler : IIntegrationEventHandler
{
privatereadonlyILogger<UserAddesEventHandler>logger;
publicUserAddesEventHandler(ILogger<UserAddesEventHandler>logger)
{
this.logger=logger;
}
//当收到一个事件后,Handle方法就会被调用,第一个参数为事件的名字,第二个是publish设置的数据,事件数据是以JSON格式传入
publicTaskHandle(stringeventName, stringeventData)
{
logger.LogInformation("新建了用户:"+eventData);
returnTask.CompletedTask;
}
}
- 事件数据是以JSON格式传入,可以使用
JsonIntegrationEventHandler<T>
接口来解析成.net对象
publicrecordUserData(stringUserName, intAge);
[EventName("UserAdded")]
publicclassUserAddesEventHandler3 : JsonIntegrationEventHandler<UserData>
{
privatereadonlyILogger<UserAddesEventHandler3>logger;
publicUserAddesEventHandler3(ILogger<UserAddesEventHandler3>logger)
{
this.logger=logger;
}
publicoverrideTaskHandleJson(stringeventName, UserDataeventData)
{
logger.LogInformation($"Json:{eventData.UserName}");
returnTask.CompletedTask;
}
}
- 进行微服务开发时,为了降低耦合,一般不会新建一个UserData类供多个微服务使用。则可以使用
DynamicIntegrationEventHandler
接口来将JSON解析为dynamic类型
[EventName("UserAdded")]
publicclassUserAddesEventHandler2 : DynamicIntegrationEventHandler
{
privatereadonlyILogger<UserAddesEventHandler2>logger;
publicUserAddesEventHandler2(ILogger<UserAddesEventHandler2>logger)
{
this.logger=logger;
}
publicoverrideTaskHandleDynamic(stringeventName, dynamiceventData)
{
logger.LogInformation($"Dynamic:{eventData.UserName}");
returnTask.CompletedTask;
}
}