TPL Dataflow组件应对高并发,低延迟要求

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 2C互联网业务增长,单机多核的共享内存模式带来的排障问题、编程困难;随着多核时代和分布式系统的到来,共享模型已经不太适合并发编程,因此actor-based模型又重新受到了人们的重视。

长话短说


2C互联网业务增长,单机多核的共享内存模式带来的排障问题、编程困难;随着多核时代和分布式系统的到来,共享模型已经不太适合并发编程,因此actor-based模型又重新受到了人们的重视。


---------------------------调试过多线程的都懂-----------------------------

  • 传统编程模型通常使用回调和同步对象(如锁)来协调任务和访问共享数据,从宏观看:若任务的执行需要某些共享资源,不可避免该任务需要关注并抢占资源


  • actor-based模型是一种流水线模型,actor-based模型share nothing。所有的线程(或进程)通过消息传递的方式进行合作,这些线程(或进程)称为参与者actor,预先定义任务流水线后,不关注数据什么时候流到这个任务,专注完成当前任务本身。


 .Net TPL Dataflow组件帮助我们快速实现actor-based模型,当有多个必须异步通信的操作或要等待数据可用再进一步处理时Dataflow组件非常有用


072ce97951cf7f549f55ee166abc85b1.jpg


TPL Dataflow是微软前几年给出的数据处理库, 内置常见的处理块,可将这些块组装成一个处理管道,"块"对应处理管道中的"阶段任务",可类比AspNetCore 中Middleware和Pipeline。

  • TPL Dataflow库为消息传递、CPU密集型/I-O密集型应用程序提供了编程基础, 可更明确控制数据的暂存方式、移动路线,达到高吞吐量和低延迟。

  • 需要注意的是:TPL Dataflow非分布式数据流,消息在进程内传递


TPL Dataflow核心概念


TPL Dataflow 内置的Block覆盖了常见的应用场景,如果内置块不能满足你的要求,你也可以自定“块”。


Block可以划分为下面3类


  • Buffering Only   [Buffer不是缓存Cache的概念,而是一个暂存区的概念]

  • Execution


  • Grouping


使用以上块混搭处理管道, 大多数的块都会执行一个操作,有些时候需要将消息分发到不同Block,这时可使用特殊类型的缓冲块给管道“”分叉”。


Execution Block


可执行的块有两个核心组件


  • 输入、输出消息的暂存区(一般称为Input,Output队列)


  • 在消息上执行动作的委托

ae958915c9ce2275c0aa9c74d93067fe.png


消息在输入和输出时能够被暂存:


当输入的消息速度比Func委托的执行速度比快,后续消息将在到达时暂存;


当下一个块的输入暂存区中无可用空间,将在当前块输出时暂存。


每个块我们可以配置:


  • 暂存区的总容量,默认无上限

  • 执行操作委托的并发度,默认情况下块按照顺序处理消息,一次一个。


将块链接在一起形成处理管道,生产者将消息推向管道。


TPL Dataflow有一个基于pull的机制(使用Receive和TryReceive方法),但我们将在管道中使用块连接和推送机制。


  • TransformBlock(Execution category)-- 由输入输出暂存区和一个Func委托组成,输入的每个消息,都会输为出另一个,可以使用这个Block去执行消息的转换,或者转发输出的消息到另外一个Block

  • TransformManyBlock (Execution category) -- 由输入输出暂存区和一个Func>委托组成, 它为输入的每个消息输出一个IEnumerable
  • BroadcastBlock (Buffering category)-- 只容纳最多1个消息的暂存区和Func委托组成(新消息到达会覆盖原消息),委托仅仅为了让你控制怎样克隆这个消息,不做消息转换

该块在需要将消息广播给多个块时很有用(管道分叉)


  • ActionBlock (Execution category)-- 由缓冲区和Action委托组成,它们不再给其他块转发消息,只处理输入的消息,一般作为管道结尾

  • BatchBlock (Grouping category)-- 告诉它你想要的每个批处理的大小,它将累积消息,直到它达到那个大小,然后将它作为一组消息转发到下一个块


其他内建Block类型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,暂时不会深入。


管道连锁反应


  当B块输入缓冲区达到上限容量,为其供货的上游A块的输出暂存区将开始被填充,当A块输出暂存区已满时,该块必须暂停处理,直到暂存区有空间,这意味着一个Block的处理瓶颈可能导致所有前面的块的暂存区被填满。


   但是不是所有的块暂存区满时都会暂停,BroadcastBlock有1个消息的暂存区,每个消息都会被覆盖, 因此如果这个广播块不能及时将消息转发到下游,则在下个消息到达的时候消息将丢失,某种意义上达到一种限流效果(比较残暴).


编程实践

33b149e288fc34d6c0d94287f262343e.png


生产者投递消息


可使用Post或者SendAsync方法向首块投递消息:


  • Post方法即时返回true/false,True意味着消息被block接收(暂存区有空余),false意味着拒绝了消息(暂存区已满或者Block已经出错)。

  • SendAsync方法返回一个Task, 将会以异步的方式阻塞直到块接收、拒绝、块出错。

Post、SendAsync的不同点在于SendAsync可以延迟投递(后置管道的输入buffer不空,得到异步通知后再投递)


定义流水线管道


按照上图业务定义流水线:



    public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory)        {            _httpClient = httpClientFactory.CreateClient("bce-request");            _redisDB0 = redisCache[0];            _redisDB = redisCache;            _logger = loggerFactory.CreateLogger(nameof(EqidPairHandler));            var option = new DataflowLinkOptions { PropagateCompletion = true };
                publisher = _redisDB.RedisConnection.GetSubscriber();            _eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel>              (                   // redis piublih 没有做在TransformBlock fun里面, 因为publih失败可能影响后续的block传递                   eqidPair => EqidResolverAsync(eqidPair),                   new ExecutionDataflowBlockOptions                   {                       MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism")                   }              );            // https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline            _logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory);            _logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) );            _broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容纳一个消息的缓存区和拷贝函数组成      _broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);                  _broadcastBlock.LinkTo(_logPublishBlock, option);                  _eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);        }

    仿IIS日志写入组件


    异常处理


    上述程序在生产部署时遇到相关的坑位:


    在测试环境_eqid2ModelTransformBlock块委托函数稳定执行,程序并未出现异样;

    部署到生产之后,该Pipeline运行一段时间就停止工作,一直很困惑。


    后来通过监测_eqid2ModelTransformBlock.Completion属性,发现该块在执行某次委托时报错,提前进入完成态


    当TPL Dataflow不再处理消息且保证不再处理消息的时候,就被定义为 "完成态", IDataflow.Completion属性(Task对象)标记该状态Task对象的TaskStatus枚举值描述此Block进入完成态的真实原因。


    • TaskStatus.RanToCompletion    "成功完成" 在Block中定义的任务  
    • TaskStatus.Fault    因未处理的异常导致"过早的完成"
    • TaskStatus.Canceled    因取消操作导致 "过早的完成"


    官方资料表明:某块进入Fault、Cancel状态,都会导致该块提前进入“完成态”,但因Fault、Canceled进入的“完成态”会导致输入暂存区和输出暂存区被清空。

    After Fault has been called on a dataflow block, that block will complete, and its Completion task will enter a final state. Faulting a block, as with canceling a block, causes buffered messages (unprocessed input messages as well as unoffered output messages) to be lost.


    故需要严肃对待异常,一般情况下我们使用try、catch包含所有的执行代码以确保所有的异常都被处理。

     

    本文作为TPL Dataflow的入门指南


    微软技术栈的可持续关注actor-based模型的流水线处理组件,应对单体程序中高并发,低延迟相当巴适。

    相关实践学习
    基于Redis实现在线游戏积分排行榜
    本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
    云数据库 Redis 版使用教程
    云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
    相关文章
    |
    SQL 安全 Java
    【高并发趣事三】——双重检查锁定与延迟初始化
    【高并发趣事三】——双重检查锁定与延迟初始化
    101 0
    【高并发趣事三】——双重检查锁定与延迟初始化
    |
    8月前
    |
    消息中间件 Java Linux
    2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
    2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
    |
    8月前
    |
    Java
    在高并发环境下,再次认识java 锁
    在高并发环境下,再次认识java 锁
    83 0
    |
    8月前
    |
    消息中间件 NoSQL Java
    Java高级开发:高并发+分布式+高性能+Spring全家桶+性能优化
    Java高架构师、分布式架构、高可扩展、高性能、高并发、性能优化、Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分布式项目实战学习架构师之路
    |
    7月前
    |
    缓存 NoSQL Java
    Java高并发实战:利用线程池和Redis实现高效数据入库
    Java高并发实战:利用线程池和Redis实现高效数据入库
    546 0
    |
    5月前
    |
    监控 算法 Java
    企业应用面临高并发等挑战,优化Java后台系统性能至关重要
    随着互联网技术的发展,企业应用面临高并发等挑战,优化Java后台系统性能至关重要。本文提供三大技巧:1)优化JVM,如选用合适版本(如OpenJDK 11)、调整参数(如使用G1垃圾收集器)及监控性能;2)优化代码与算法,减少对象创建、合理使用集合及采用高效算法(如快速排序);3)数据库优化,包括索引、查询及分页策略改进,全面提升系统效能。
    59 0
    |
    7月前
    |
    存储 NoSQL Java
    探索Java分布式锁:在高并发环境下的同步访问实现与优化
    【6月更文挑战第30天】Java分布式锁在高并发下确保数据一致性,通过Redis的SETNX、ZooKeeper的临时节点、数据库操作等方式实现。优化策略包括锁超时重试、续期、公平性及性能提升,关键在于平衡同步与效率,适应大规模分布式系统的需求。
    209 1
    |
    6月前
    |
    算法 Java 调度
    高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决
    高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决
    |
    6月前
    |
    监控 网络协议 Java
    Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
    Java面试题:解释Java NIO与BIO的区别,以及NIO的优势和应用场景。如何在高并发应用中实现NIO?
    90 0
    |
    6月前
    |
    设计模式 安全 NoSQL
    Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
    Java面试题:设计一个线程安全的单例模式,并解释其内存占用和垃圾回收机制;使用生产者消费者模式实现一个并发安全的队列;设计一个支持高并发的分布式锁
    78 0