如何在ASP.NET Core中使用Azure Service Bus Queue

简介: 如何在ASP.NET Core中使用Azure Service Bus Queue原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES作者:damienbod译文:如何在ASP.

如何在ASP.NET Core中使用Azure Service Bus Queue
原文:USING AZURE SERVICE BUS QUEUES WITH ASP.NET CORE SERVICES
作者:damienbod
译文:如何在ASP.NET Core中使用Azure Service Bus Queue
地址:https://www.cnblogs.com/lwqlun/p/10760227.html
作者:Lamond Lu
源代码: https://github.com/lamondlu/AzureServiceBusMessaging

本文展示了如何使用Azure Service Bus Queue, 实现2个ASP.NET Core Api应用之间的消息传输。

配置Azure Service Bus Queue#
你可以从官网文档中了解到如何配置一个Azure Service Bus Queue.

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal

这里我们使用Queue或者Topic来实现消息传输。Queue是一种消息传输类型,一旦一个消息被一个消费者接收了,该消息就会从Queue中被移除。

与Queue不同,Topic提供的是一对多的通讯方式。

架构图#
整个应用的实现如下:

Api 1负责发送消息
Api 2负责监听Azure Service Bus,并处理接收到的消息
实现一个Service Bus Queue#
这里我们首先需要引入 Microsoft.Azure.ServiceBus 程序集。Microsoft.Azure.ServiceBus是Azure Service Bus的客户端库。针对Service Bus的连接字符串我们保存在项目的User Secret中。当部署项目的时候,我们可以使用Azure Key Valut来设置这个Secret值。

在Visual Studio中,右键点击API1, API2项目属性,选择Manage User Secrets就可以管理当前项目使用的所有私密信息。

为了发送向Azure Service Bus Queue发送消息,我们需要创建一个SendMessage方法,并接收一个消息参数。这里我们创建了一个我们自己的消息内容类型MyPayload, 将当前该MyPayload对象序列化成Json字符串, 添加到一个Message对象中。

Copy
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using System.Text;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{

public class ServiceBusSender
{
    private readonly QueueClient _queueClient;
    private readonly IConfiguration _configuration;
    private const string QUEUE_NAME = "simplequeue";

    public ServiceBusSender(IConfiguration configuration)
    {
        _configuration = configuration;
        _queueClient = new QueueClient(
        _configuration
            .GetConnectionString("ServiceBusConnectionString"), 
            QUEUE_NAME);
    }
     
    public async Task SendMessage(MyPayload payload)
    {
        string data = JsonConvert.SerializeObject(payload);
        Message message = new Message(Encoding.UTF8.GetBytes(data));

        await _queueClient.SendAsync(message);
    }
}

}
在API 1和API 2中,我们需要将ServiceBusSender注册到应用程序的IOC容器中。这里为了测试方便,我们同时注册Swagger服务。

Copy
public void ConfigureServices(IServiceCollection services)
{

services.AddMvc();

services.AddScoped<ServiceBusSender>();

services.AddSwaggerGen(c =>
{
    c.SwaggerDoc("v1", new Info
    {
        Version = "v1",
        Title = "Payload View API",
    });
});

}
接下来,我们就可以在控制器中通过构造函数注入的方式使用这个服务了。

在API1中,我们创建一个POST方法,这个方法会将API接收到Payload对象发送到Azure Service Bus Queue中。

Copy
[HttpPost]
[ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]
public async Task Create(FromBodyPayload request)
{

if (data.Any(d => d.Id == request.Id))
{
    return Conflict($"data with id {request.Id} already exists");
}

data.Add(request);

// Send this to the bus for the other services
await _serviceBusSender.SendMessage(new MyPayload
{
    Goals = request.Goals,
    Name = request.Name,
    Delete = false
});

return Ok(request);

}
从Queue中获取消息#
为了监听Azure Service Bus Queue, 并处理接收到的消息,我们创建了一个新类ServiceBusConsumer,ServiceBusConsumer实现了IServiceBusConsumer接口。

Queue的连接字符串是使用IConfiguration读取的。 RegisterOnMessageHandlerAndReceiveMessages方法负责注册消息处理程序ProcessMessagesAsync处理消息。ProcessMessagesAsync方法会将得到的消息转换成对象,并调用IProcessData接口完成最终的消息处理。

Copy
using Microsoft.Azure.ServiceBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ServiceBusMessaging
{

public interface IServiceBusConsumer
{
    void RegisterOnMessageHandlerAndReceiveMessages();
    Task CloseQueueAsync();
}

public class ServiceBusConsumer : IServiceBusConsumer
{
    private readonly IProcessData _processData;
    private readonly IConfiguration _configuration;
    private readonly QueueClient _queueClient;
    private const string QUEUE_NAME = "simplequeue";
    private readonly ILogger _logger;

    public ServiceBusConsumer(IProcessData processData, 
        IConfiguration configuration, 
        ILogger<ServiceBusConsumer> logger)
    {
        _processData = processData;
        _configuration = configuration;
        _logger = logger;
        _queueClient = new QueueClient(
          _configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME);
    }

    public void RegisterOnMessageHandlerAndReceiveMessages()
    {
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            MaxConcurrentCalls = 1,
            AutoComplete = false
        };

        _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    }

    private async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body));
        _processData.Process(myPayload);
        await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
    }

    private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        _logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;

        _logger.LogDebug($"- Endpoint: {context.Endpoint}");
        _logger.LogDebug($"- Entity Path: {context.EntityPath}");
        _logger.LogDebug($"- Executing Action: {context.Action}");

        return Task.CompletedTask;
    }

    public async Task CloseQueueAsync()
    {
        await _queueClient.CloseAsync();
    }
}

}
其中IProcessData接口存在于类库项目ServiceBusMessaging中,它是用来处理消息的。

Copy
public interface IProcessData
{

void Process(MyPayload myPayload);

}
在Api 2中,我们创建一个ProcessData类,它实现了IProcessData接口。

Copy
public class ProcessData : IProcessData
{

public void Process(MyPayload myPayload)
{
    DataServiceSimi.Data.Add(new Payload
    {
        Name = myPayload.Name,
        Goals = myPayload.Goals
    });
}

}
这里为了简单测试,我们创建了一个静态类DataServiceSimi,其中存放了API2中所有保存Payload对象。同时,我们还创建了一个新的控制器ViewPayloadMessagesController,在其中添加了一个GET Action,并返回了静态类DataServiceSimi中的所有数据。

Copy
[Route("api/[controller]")]
[ApiController]
public class ViewPayloadMessagesController : ControllerBase
{

[HttpGet]
[ProducesResponseType(StatusCodes.Status200OK)]
public ActionResult<List<Payload>> Get()
{
    return Ok(DataServiceSimi.Data);
}

}
最后我们还需要将ProcessData注册到API2的IOC容器中。

Copy
public void ConfigureServices(IServiceCollection services)
{

services.AddMvc();

services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();
services.AddTransient<IProcessData, ProcessData>();

}
最终效果#
现在我们分别启用2个Api项目,并在Api 1的Swagger文档界面,调用POST请求,添加一个Payload

操作完成之后,我们访问Api 2的/api/ViewPayloadMessages, 获得结果如下,Api 1发出的消息出现在了Api 2的结果集中,这说明Api 2从Azure Service Bus Queue中获取了消息,并保存在了自己的静态类DataServiceSimi中。

作者:LamondLu

出处:https://www.cnblogs.com/lwqlun/p/10760227.html

相关文章
|
1月前
|
存储 开发框架 JSON
ASP.NET Core OData 9 正式发布
【10月更文挑战第8天】Microsoft 在 2024 年 8 月 30 日宣布推出 ASP.NET Core OData 9,此版本与 .NET 8 的 OData 库保持一致,改进了数据编码以符合 OData 规范,并放弃了对旧版 .NET Framework 的支持,仅支持 .NET 8 及更高版本。新版本引入了更快的 JSON 编写器 `System.Text.UTF8JsonWriter`,优化了内存使用和序列化速度。
|
14天前
|
开发框架 监控 .NET
【Azure App Service】部署在App Service上的.NET应用内存消耗不能超过2GB的情况分析
x64 dotnet runtime is not installed on the app service by default. Since we had the app service running in x64, it was proxying the request to a 32 bit dotnet process which was throwing an OutOfMemoryException with requests >100MB. It worked on the IaaS servers because we had the x64 runtime install
|
1月前
|
安全 网络安全 数据安全/隐私保护
【Azure Developer】System.Net.WebException: The request was aborted: Could not create SSL/TLS secure channel.
System.Net.WebException: The request was aborted: Could not create SSL/TLS secure channel.
|
1月前
mcr.microsoft.com/dotnet/core/aspnet:2.1安装libgdiplus
mcr.microsoft.com/dotnet/core/aspnet:2.1安装libgdiplus
30 1
|
2月前
|
开发框架 监控 前端开发
在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
【9月更文挑战第27天】操作筛选器是ASP.NET Core MVC和Web API中的一种过滤器,可在操作方法执行前后运行代码,适用于日志记录、性能监控和验证等场景。通过实现`IActionFilter`接口的`OnActionExecuting`和`OnActionExecuted`方法,可以统一处理日志、验证及异常。创建并注册自定义筛选器类,能提升代码的可维护性和复用性。
|
2月前
|
开发框架 .NET 中间件
ASP.NET Core Web 开发浅谈
本文介绍ASP.NET Core,一个轻量级、开源的跨平台框架,专为构建高性能Web应用设计。通过简单步骤,你将学会创建首个Web应用。文章还深入探讨了路由配置、依赖注入及安全性配置等常见问题,并提供了实用示例代码以助于理解与避免错误,帮助开发者更好地掌握ASP.NET Core的核心概念。
100 3
|
1月前
|
开发框架 JavaScript 前端开发
一个适用于 ASP.NET Core 的轻量级插件框架
一个适用于 ASP.NET Core 的轻量级插件框架
|
2月前
|
开发框架 前端开发 JavaScript
ASP.NET MVC 教程
ASP.NET 是一个使用 HTML、CSS、JavaScript 和服务器脚本创建网页和网站的开发框架。
43 7
|
2月前
|
存储 开发框架 前端开发
ASP.NET MVC 迅速集成 SignalR
ASP.NET MVC 迅速集成 SignalR
63 0
|
3月前
|
开发框架 前端开发 .NET
ASP.NET MVC WebApi 接口返回 JOSN 日期格式化 date format
ASP.NET MVC WebApi 接口返回 JOSN 日期格式化 date format
49 0
下一篇
无影云桌面