如何利用.NETCore向Azure EventHubs准实时批量发送数据?

简介: Azure事件中心的基础用法.NET Core准实时分批向Azure事件中心发送数据,其中用到的TPL Dataflow以actor模型:提供了粗粒度的数据流和流水线任务,提高了高并发程序的健壮性。

最近在做一个基于Azure云的物联网分析项目:


2c17a94413e779c1f743f6c16e12c3a7.png


.netcore采集程序向Azure事件中心(EventHubs)发送数据,通过Azure EventHubs Capture转储到Azure BlogStorage,供数据科学团队分析。


为什么使用Azure事件中心?


Azure事件中心是一种Azure上完全托管的实时数据摄取服务, 每秒可流式传输来自website、app、device任何源的数百万个事件。提供的统一流式处理平台和时间保留缓冲区,将事件生成者和事件使用者分开。


  • 事件生成者:可使用https、AQMP协议发布事件


  • 分区:事件中心通过分区使用者模式提供消息流式处理功能,提高可用性和并行化


  • 事件接收者:所有事件中心使用者通过AMQP 1.0会话进行连接,读取数据


956b9f1c3258de7860560512d42a6829.png


例如,如果事件中心具有四个分区,并且其中一个分区要在负载均衡操作中从一台服务器移动到另一台服务器,则仍可以通过其他三个分区进行发送和接收。此外,具有更多分区可以让更多并发读取器处理数据,从而提高聚合吞吐量。了解分布式系统中分区和排序的意义是解决方案设计的重要方面。 为了帮助说明排序与可用性之间的权衡,请参阅 CAP 定理


最直观的方式:请在portal.azure.cn门户站点---->创建事件中心命名空间---> 创建事件中心


a7f05a192378dc4f4f49525ffde6a56f.png


.NetCore 准实时批量发送数据到事件中心


.NET库 (Azure.Messaging.EventHubs)


我们使用Asp.NetCore以Azure App Service形式部署,依赖Azure App Service的自动缩放能录应对物联网的潮汐大流量。


通常推荐批量发送到事件中心,能有效增加web服务的吞吐量和响应能力。

目前新版SDk:Azure.Messaging.EventHubs仅支持分批发送。


  1. nuget上引入Azure.Messaging.EventHubs库


  1. EventHubProducerClient客户端负责分批发送数据到事件中心,根据发送时指定的选项,事件数据可能会自动路由到可用分区或发送到特定请求的分区。


在以下情况下,建议允许自动路由分区:


1) 事件的发送必须高度可用


2) 事件数据应在所有可用分区之间平均分配。


自动路由分区的规则:


1)使用循环法将事件平均分配到所有可用分区中


2)如果某个分区不可用,事件中心将自动检测到该分区并将消息转发到另一个可用分区。


我们要注意,根据选定的 命令空间定价层, 每批次发给事件中心的最大消息字节大小也不一样:


fc38a91633514817d5f51c61d4c82b32.png


分段批量发送策略


这里我们就需要思考:web程序收集数据是以个数为单位;但是我们分批发送时要根据分批的字节大小来切分。


我的方案是:因引入TPL Dataflow 管道:


9501b98b120fa7873306b72192c566e6.png


  1. web程序收到数据,立刻丢入TransformBlock<string, EventData>


  1. 转换到EventData之后,使用BatchBlock<EventData>按照配置的个数打包


  1. 利用ActionBlock<EventData[]>在包内 累积指定字节大小批量发送


  • 最后我们设置一个定时器(5min),强制在BatchBlock的前置队列未满时打包发送。

核心的TPL Dataflow代码如下:


public class MsgBatchSender
    {
        private readonly EventHubProducerClient Client;
        private readonly TransformBlock<string, EventData> _transformBlock;
        private readonly BatchBlock<EventData> _packer;
        private readonly ActionBlock<EventData[]> _batchSender;
        private readonly DataflowOption _dataflowOption;
        private readonly Timer _trigger;
        private readonly ILogger _logger;
        public MsgBatchSender(EventHubProducerClient client, IOptions<DataflowOption> option,ILoggerFactory loggerFactory)
        {
            Client = client;
            _dataflowOption = option.Value;
            var dfLinkoption = new DataflowLinkOptions { PropagateCompletion = true };
            _transformBlock = new TransformBlock<string, EventData>(
                text => new EventData(Encoding.UTF8.GetBytes(text)),
                   new ExecutionDataflowBlockOptions
                   {
                       MaxDegreeOfParallelism = _dataflowOption.MaxDegreeOfParallelism
                   });
            _packer = new BatchBlock<EventData>(_dataflowOption.BatchSize);
            _batchSender = new ActionBlock<EventData[]>(msgs=> BatchSendAsync(msgs));
            _packer.LinkTo(_batchSender, dfLinkoption);
            _transformBlock.LinkTo(_packer, dfLinkoption, x => x != null);
            _trigger = new Timer(_ => _packer.TriggerBatch(), null, TimeSpan.Zero, TimeSpan.FromSeconds(_dataflowOption.TriggerInterval));
            _logger = loggerFactory.CreateLogger<DataTrackerMiddleware>();
        }
        private async Task BatchSendAsync(EventData[] msgs)
        {
            try
            {
                if (msgs != null)
                {
                    var i = 0;
                    while (i < msgs.Length)
                    {
                        var batch = await Client.CreateBatchAsync();
                        while (i < msgs.Length)
                        {
                            if (batch.TryAdd(msgs[i++]) == false)
                            {
                                break;
                            }
                        }
                        if(batch!= null && batch.Count>0)
                        {
                            await Client.SendAsync(batch);
                            batch.Dispose();
                        }
                    }
                }
            }
             catch (Exception ex)
            {
                // ignore and log any exception
                _logger.LogError(ex, "SendEventsAsync: {error}", ex.Message);
            }
        }
        public  async Task<bool> PostMsgsync(string txt)
        {
            return await _transformBlock.SendAsync(txt);
        }
        public async Task CompleteAsync()
        {
            _transformBlock.Complete();
            await _transformBlock.Completion;
            await _batchSender.Completion;
            await _batchSender.Completion;
        }
    }


fbebfff67a7ce8ade8fac545815c3035.png


总结


  • Azure事件中心的基础用法


  • .NET Core准实时分批向Azure事件中心发送数据,其中用到的TPL Dataflow以actor模型:提供了粗粒度的数据流和流水线任务,提高了高并发程序的健壮性。
相关文章
|
1月前
|
存储 开发框架 JSON
ASP.NET Core OData 9 正式发布
【10月更文挑战第8天】Microsoft 在 2024 年 8 月 30 日宣布推出 ASP.NET Core OData 9,此版本与 .NET 8 的 OData 库保持一致,改进了数据编码以符合 OData 规范,并放弃了对旧版 .NET Framework 的支持,仅支持 .NET 8 及更高版本。新版本引入了更快的 JSON 编写器 `System.Text.UTF8JsonWriter`,优化了内存使用和序列化速度。
|
2月前
|
开发框架 监控 前端开发
在 ASP.NET Core Web API 中使用操作筛选器统一处理通用操作
【9月更文挑战第27天】操作筛选器是ASP.NET Core MVC和Web API中的一种过滤器,可在操作方法执行前后运行代码,适用于日志记录、性能监控和验证等场景。通过实现`IActionFilter`接口的`OnActionExecuting`和`OnActionExecuted`方法,可以统一处理日志、验证及异常。创建并注册自定义筛选器类,能提升代码的可维护性和复用性。
|
2月前
|
开发框架 .NET 中间件
ASP.NET Core Web 开发浅谈
本文介绍ASP.NET Core,一个轻量级、开源的跨平台框架,专为构建高性能Web应用设计。通过简单步骤,你将学会创建首个Web应用。文章还深入探讨了路由配置、依赖注入及安全性配置等常见问题,并提供了实用示例代码以助于理解与避免错误,帮助开发者更好地掌握ASP.NET Core的核心概念。
89 3
|
27天前
|
开发框架 JavaScript 前端开发
一个适用于 ASP.NET Core 的轻量级插件框架
一个适用于 ASP.NET Core 的轻量级插件框架
|
2月前
|
开发框架 NoSQL .NET
利用分布式锁在ASP.NET Core中实现防抖
【9月更文挑战第5天】在 ASP.NET Core 中,可通过分布式锁实现防抖功能,仅处理连续相同请求中的首个请求,其余请求返回 204 No Content,直至锁释放。具体步骤包括:安装分布式锁库如 `StackExchange.Redis`;创建分布式锁服务接口及其实现;构建防抖中间件;并在 `Startup.cs` 中注册相关服务和中间件。这一机制有效避免了短时间内重复操作的问题。
|
3月前
|
开发框架 监控 .NET
开发者的革新利器:ASP.NET Core实战指南,构建未来Web应用的高效之道
【8月更文挑战第28天】本文探讨了如何利用ASP.NET Core构建高效、可扩展的Web应用。ASP.NET Core是一个开源、跨平台的框架,具有依赖注入、配置管理等特性。文章详细介绍了项目结构规划、依赖注入配置、中间件使用及性能优化方法,并讨论了安全性、可扩展性以及容器化的重要性。通过这些技术要点,开发者能够快速构建出符合现代Web应用需求的应用程序。
53 0
|
3月前
|
缓存 数据库连接 API
Entity Framework Core——.NET 领域的 ORM 利器,深度剖析其最佳实践之路
【8月更文挑战第28天】在软件开发领域,高效的数据访问与管理至关重要。Entity Framework Core(EF Core)作为一款强大的对象关系映射(ORM)工具,在 .NET 开发中扮演着重要角色。本文通过在线书店应用案例,展示了 EF Core 的核心特性和优势。我们定义了 `Book` 实体类及其属性,并通过 `BookStoreContext` 数据库上下文配置了数据库连接。EF Core 提供了简洁的 API,支持数据的查询、插入、更新和删除操作。
110 0
|
开发框架 前端开发 .NET
ASP.NET Core 核心特性学习笔记「下」
ASP.NET Core 核心特性学习笔记「下」
|
开发框架 前端开发 中间件
ASP.NET Core 核心特性学习笔记「上」
ASP.NET Core 核心特性学习笔记「上」
|
SQL 机器学习/深度学习 Cloud Native
.NET 云原生架构师训练营(模块二 基础巩固 EF Core 更新和迁移)--学习笔记
- 状态 - 自动变更检测 - 不查询删除和更新 - 并发
250 0
.NET 云原生架构师训练营(模块二 基础巩固 EF Core 更新和迁移)--学习笔记
下一篇
无影云桌面