如何在ASP.NET Core中使用Azure Service Bus Queue

简介: 如何在ASP.NET Core中使用Azure Service Bus Queue原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES作者:damienbod译文:如何在ASP.

如何在ASP.NET Core中使用Azure Service Bus Queue
原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES
作者:damienbod
译文:如何在ASP.NET Core中使用Azure Service Bus Queue
地址:https://www.cnblogs.com/lwqlun/p/10760227.html
作者:Lamond Lu
源代码: https://github.com/lamondlu/AzureServiceBusMessaging

本文展示了如何使用Azure Service Bus Queue, 实现2个ASP.NET Core Api应用之间的消息传输。

配置Azure Service Bus Queue#
你可以从官网文档中了解到如何配置一个Azure Service Bus Queue.

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal

这里我们使用Queue或者Topic来实现消息传输。Queue是一种消息传输类型,一旦一个消息被一个消费者接收了,该消息就会从Queue中被移除。

与Queue不同,Topic提供的是一对多的通讯方式。

架构图#
整个应用的实现如下:

Api 1负责发送消息
Api 2负责监听Azure Service Bus,并处理接收到的消息
实现一个Service Bus Queue#
这里我们首先需要引入 Microsoft.Azure.ServiceBus 程序集。Microsoft.Azure.ServiceBus是Azure Service Bus的客户端库。针对Service Bus的连接字符串我们保存在项目的User Secret中。当部署项目的时候,我们可以使用Azure Key Valut来设置这个Secret值。

在Visual Studio中,右键点击API1, API2项目属性,选择Manage User Secrets就可以管理当前项目使用的所有私密信息。

为了发送向Azure Service Bus Queue发送消息,我们需要创建一个SendMessage方法,并接收一个消息参数。这里我们创建了一个我们自己的消息内容类型MyPayload, 将当前该MyPayload对象序列化成Json字符串, 添加到一个Message对象中。

Copy
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using System.Text;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{

public class ServiceBusSender
{
    private readonly QueueClient _queueClient;
    private readonly IConfiguration _configuration;
    private const string QUEUE_NAME = "simplequeue";

    public ServiceBusSender(IConfiguration configuration)
    {
        _configuration = configuration;
        _queueClient = new QueueClient(
        _configuration
            .GetConnectionString("ServiceBusConnectionString"), 
            QUEUE_NAME);
    }
     
    public async Task SendMessage(MyPayload payload)
    {
        string data = JsonConvert.SerializeObject(payload);
        Message message = new Message(Encoding.UTF8.GetBytes(data));

        await _queueClient.SendAsync(message);
    }
}

}
在API 1和API 2中,我们需要将ServiceBusSender注册到应用程序的IOC容器中。这里为了测试方便,我们同时注册Swagger服务。

Copy
public void ConfigureServices(IServiceCollection services)
{

services.AddMvc();

services.AddScoped<ServiceBusSender>();

services.AddSwaggerGen(c =>
{
    c.SwaggerDoc("v1", new Info
    {
        Version = "v1",
        Title = "Payload View API",
    });
});

}
接下来,我们就可以在控制器中通过构造函数注入的方式使用这个服务了。

在API1中,我们创建一个POST方法,这个方法会将API接收到Payload对象发送到Azure Service Bus Queue中。

Copy
[HttpPost]
[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]
public async Task Create(FromBodyPayload request)
{

if (data.Any(d => d.Id == request.Id))
{
    return Conflict($"data with id {request.Id} already exists");
}

data.Add(request);

// Send this to the bus for the other services
await _serviceBusSender.SendMessage(new MyPayload
{
    Goals = request.Goals,
    Name = request.Name,
    Delete = false
});

return Ok(request);

}
从Queue中获取消息#
为了监听Azure Service Bus Queue, 并处理接收到的消息,我们创建了一个新类ServiceBusConsumer,ServiceBusConsumer实现了IServiceBusConsumer接口。

Queue的连接字符串是使用IConfiguration读取的。 RegisterOnMessageHandlerAndReceiveMessages方法负责注册消息处理程序ProcessMessagesAsync处理消息。ProcessMessagesAsync方法会将得到的消息转换成对象,并调用IProcessData接口完成最终的消息处理。

Copy
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{

public interface IServiceBusConsumer
{
    void RegisterOnMessageHandlerAndReceiveMessages();
    Task CloseQueueAsync();
}

public class ServiceBusConsumer : IServiceBusConsumer
{
    private readonly IProcessData _processData;
    private readonly IConfiguration _configuration;
    private readonly QueueClient _queueClient;
    private const string QUEUE_NAME = "simplequeue";
    private readonly ILogger _logger;

    public ServiceBusConsumer(IProcessData processData, 
        IConfiguration configuration, 
        ILogger<ServiceBusConsumer> logger)
    {
        _processData = processData;
        _configuration = configuration;
        _logger = logger;
        _queueClient = new QueueClient(
          _configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME);
    }

    public void RegisterOnMessageHandlerAndReceiveMessages()
    {
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            MaxConcurrentCalls = 1,
            AutoComplete = false
        };

        _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    }

    private async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body));
        _processData.Process(myPayload);
        await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
    }

    private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;

        _logger.LogDebug($"- Endpoint: {context.Endpoint}");
        _logger.LogDebug($"- Entity Path: {context.EntityPath}");
        _logger.LogDebug($"- Executing Action: {context.Action}");

        return Task.CompletedTask;
    }

    public async Task CloseQueueAsync()
    {
        await _queueClient.CloseAsync();
    }
}

}
其中IProcessData接口存在于类库项目ServiceBusMessaging中,它是用来处理消息的。

Copy
public interface IProcessData
{

void Process(MyPayload myPayload);

}
在Api 2中,我们创建一个ProcessData类,它实现了IProcessData接口。

Copy
public class ProcessData : IProcessData
{

public void Process(MyPayload myPayload)
{
    DataServiceSimi.Data.Add(new Payload
    {
        Name = myPayload.Name,
        Goals = myPayload.Goals
    });
}

}
这里为了简单测试,我们创建了一个静态类DataServiceSimi,其中存放了API2中所有保存Payload对象。同时,我们还创建了一个新的控制器ViewPayloadMessagesController,在其中添加了一个GET Action,并返回了静态类DataServiceSimi中的所有数据。

Copy
[Route("api/[controller]")]
[ApiController]
public class ViewPayloadMessagesController : ControllerBase
{

[HttpGet]
[ProducesResponseType(StatusCodes.Status200OK)]
public ActionResult<List<Payload>> Get()
{
    return Ok(DataServiceSimi.Data);
}

}
最后我们还需要将ProcessData注册到API2的IOC容器中。

Copy
public void ConfigureServices(IServiceCollection services)
{

services.AddMvc();

services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
services.AddTransient<IProcessData, ProcessData>();

}
最终效果#
现在我们分别启用2个Api项目,并在Api 1的Swagger文档界面,调用POST请求,添加一个Payload

操作完成之后,我们访问Api 2的/api/ViewPayloadMessages, 获得结果如下,Api 1发出的消息出现在了Api 2的结果集中,这说明Api 2从Azure Service Bus Queue中获取了消息,并保存在了自己的静态类DataServiceSimi中。

作者:LamondLu

出处:https://www.cnblogs.com/lwqlun/p/10760227.html

相关文章
|
7天前
|
开发框架 监控 前端开发
在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
【9月更文挑战第27天】操作筛选器是ASP.NET Core MVC和Web API中的一种过滤器,可在操作方法执行前后运行代码,适用于日志记录、性能监控和验证等场景。通过实现`IActionFilter`接口的`OnActionExecuting`和`OnActionExecuted`方法,可以统一处理日志、验证及异常。创建并注册自定义筛选器类,能提升代码的可维护性和复用性。
|
7天前
|
开发框架 .NET 中间件
ASP.NET Core Web 开发浅谈
本文介绍ASP.NET Core,一个轻量级、开源的跨平台框架,专为构建高性能Web应用设计。通过简单步骤,你将学会创建首个Web应用。文章还深入探讨了路由配置、依赖注入及安全性配置等常见问题,并提供了实用示例代码以助于理解与避免错误,帮助开发者更好地掌握ASP.NET Core的核心概念。
21 3
|
29天前
|
开发框架 NoSQL .NET
利用分布式锁在ASP.NET Core中实现防抖
【9月更文挑战第5天】在 ASP.NET Core 中,可通过分布式锁实现防抖功能,仅处理连续相同请求中的首个请求,其余请求返回 204 No Content,直至锁释放。具体步骤包括:安装分布式锁库如 `StackExchange.Redis`;创建分布式锁服务接口及其实现;构建防抖中间件;并在 `Startup.cs` 中注册相关服务和中间件。这一机制有效避免了短时间内重复操作的问题。
|
2月前
|
开发框架 监控 .NET
开发者的革新利器:ASP.NET Core实战指南,构建未来Web应用的高效之道
【8月更文挑战第28天】本文探讨了如何利用ASP.NET Core构建高效、可扩展的Web应用。ASP.NET Core是一个开源、跨平台的框架,具有依赖注入、配置管理等特性。文章详细介绍了项目结构规划、依赖注入配置、中间件使用及性能优化方法,并讨论了安全性、可扩展性以及容器化的重要性。通过这些技术要点,开发者能够快速构建出符合现代Web应用需求的应用程序。
35 0
|
28天前
|
开发框架 前端开发 JavaScript
ASP.NET MVC 教程
ASP.NET 是一个使用 HTML、CSS、JavaScript 和服务器脚本创建网页和网站的开发框架。
28 7
|
27天前
|
存储 开发框架 前端开发
ASP.NET MVC 迅速集成 SignalR
ASP.NET MVC 迅速集成 SignalR
38 0
|
2月前
|
开发框架 前端开发 .NET
ASP.NET MVC WebApi 接口返回 JOSN 日期格式化 date format
ASP.NET MVC WebApi 接口返回 JOSN 日期格式化 date format
36 0
|
2月前
|
开发框架 前端开发 安全
ASP.NET MVC 如何使用 Form Authentication?
ASP.NET MVC 如何使用 Form Authentication?
|
2月前
|
开发框架 .NET
Asp.Net Core 使用X.PagedList.Mvc.Core分页 & 搜索
Asp.Net Core 使用X.PagedList.Mvc.Core分页 & 搜索
91 0
|
5月前
|
开发框架 前端开发 .NET
ASP.NET CORE 3.1 MVC“指定的网络名不再可用\企图在不存在的网络连接上进行操作”的问题解决过程
ASP.NET CORE 3.1 MVC“指定的网络名不再可用\企图在不存在的网络连接上进行操作”的问题解决过程
158 0
下一篇
无影云桌面