《Storm分布式实时计算模式》——3.3 Trident spout

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

本节书摘来自华章计算机《Storm分布式实时计算模式》一书中的第3章,第3.3节,作者:(美)P. Taylor Goetz Brian O’Neill 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

3.3 Trident spout

让我们先来看topology中的spout。和Storm相比,Trident引入了“数据批次”(batch)的概念。不像Storm的spout,Trident spout必须成批地发送tuple。
每个batch会分配一个唯一的事务标识符。spout基于约定决定batch的组成方式。spout有三种约定:非事务型(non-transactional)、事务型(transactional)、非透明型(opaque)。
非事务型spout对batch的组成部分不提供保障,并且可能出现重复。两个不同的batch可能含有相同的tuple。事务型spout保证batch是非重复的,并且batch总是包含相同的tuple。非透明型spout保证数据是非重复的,但不能保证batch的内容是不变的。
表3-1描述了这些特性。


e574f1d6817ad2083d67ad3afe2a4eefd2741b54

spout接口如下面代码片段所示:

3f85c26bd7fe342a2fc8a165e7544912e6c52500

在Trident中,spout没有真的发射tuple,而是把这项工作分解给了BatchCoordinator和Emitter方法。Emitter负责发送tuple,BatchCoordinator负责管理批次和元数据,Emitter需要依靠元数据来恰当地进行批次的数据重放。TridentSpout函数仅仅是简单地提供了到BatchCoordinator和Emitter的访问方法,并且声明发射的tuple包括哪些字段。下面列出了示例中的DiagnosisEventSpout方法:

<a href=https://yqfile.alicdn.com/5982a1410c9ced7b857f57df5b027a5f2acbd30b.png
" >

如上述代码中的getOutputFields()方法所示,在我们的实例topology中,spout发射一个字段event,值是一个DiagnosisEvent类。
BatchCoordinator类实现下述接口:

<a href=https://yqfile.alicdn.com/543b295a73f41c1b15d2ebf08d7fe93c925607da.png
" >

BatchCoordinator是一个泛型类。这个泛型类是重放一个batch所需要的元数据。在本例中,spout发送随机事件,因此元数据可以忽略。实际系统中,元数据可能包含组成了这个batch的消息或者对象的标识符。通过这个信息,非透明型和事务型spout可以实现约定,确保batch的内容不出现重复,在事务型spout中,batch的内容不会出现变化。
BatchCoordinator类作为一个Storm Bolt运行在一个单线程中。Storm会在ZooKeeper中持久化存储这个元数据。当事务处理完成时会通知到对应的coordinator。
在我们的例子中,没有做特定的协调操作,下面就是DiagnosisEventSpout类中使用的协调操作:


726dd1f565b0aee88ccc7f75d41efce744fe22a5


92bab81c270ef1bd437522b533394c48200c0656

Trident spout的第二个组成部分是Emitter。在Storm里,spout使用collector来发送tuple,Emmiter函数在Trident spout中执行这种功能。唯一的区别是,使用TridentCollector类,发送出去的tuple是通过BatchCoordinator类初始化的一批数据。
Emitter方法的接口格式如下所示:

f0cf181bef762cbbe94205b7febe7042d3d473ef

如前面代码所示,Emitter函数只有一个功能,将tuple打包发射出去。为了实现这个功能,函数接收的参数包括batch(由coordinator生成的)的元数据、事务的信息和Emitter用来发送tuple的collector。DiagnosisEventEmitter类的代码如下所示:

<a href=https://yqfile.alicdn.com/41005299c40f1557e89f4faa0a3025c6b13f2f5d.png
" >


1a80e7abeb7ca6e95f45eab6de66d386d4f846af

发送的工作在emitBatch()中进行。例子中,我们随机分配一个经度和纬度,大体保持在美国范围内,使用System.currentTimeMillis()方法生成诊断的时间戳。
实际场景中,ICD-9-CM的范围在000到999之间。针对本示例,我们仅使用320到327之间的诊断代码。这些代码如下所示:


d9cbb7b6ba555e1c939b2a0688b8e74a533c7e2d

这些诊断代码随机分配给事件。
在这个例子里,我们使用对象来封装诊断事件。为简化起见,我们将事件的每个组成部分作为tuple的一个独立字段。这里,对象封装还是使用tuple字段进行封装,需要权衡。通常会限制tuple的字段在易于管理的数量之内,但为了数据流控制或tuple的分组策略,将数据放在tuple的字段里还是有意义的。
在我们的例子中,DiagnosisEvent类表示topology处理的关键数据。对象的代码如下所示:


89ddeb0c03596fa01c6ded7721600d120efb6439


<a href=https://yqfile.alicdn.com/5a3c17d0c95fa54fc745c49201f50749647949fb.png" >

这个对象是一个简单的JavaBean。时间戳使用long变量存储,存储的是纪元时间的秒数。经度和纬度使用dobule存储。diagnosisCode类使用string,以防系统可能需要处理非ICD-9数据,比如有字母的代码。
至此,topology已经可以发射事件了。在实际场景中,我们可能将topology集成到一个医疗请求处理系统或者一个电子健康记录系统来进行实践演练。
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
1408
分享
相关文章
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
104 5
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
208 3
【YashanDB 知识库】用 yasldr 配置 Bulkload 模式作单线程迁移 300G 的业务数据到分布式数据库,迁移任务频繁出错
问题描述 详细版本:YashanDB Server Enterprise Edition Release 23.2.4.100 x86_64 6db1237 影响范围: 离线数据迁移场景,影响业务数据入库。 外场将部分 NewCIS 的报表业务放到分布式数据库,验证 SQL 性能水平。 操作系统环境配置: 125G 内存 32C CPU 2T 的 HDD 磁盘 问题出现的步骤/操作: 1、部署崖山分布式数据库 1mm 1cn 3dn 单线启动 yasldr 数据迁移任务,设置 32 线程的 bulk load 模式 2、观察 yasldr.log 是否出现如下错
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
181 0
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
146 0
|
5月前
|
Saga模式在分布式系统中保证事务的隔离性
Saga模式在分布式系统中保证事务的隔离性
101 4
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
大数据-38 Redis 高并发下的分布式缓存 Redis简介 缓存场景 读写模式 旁路模式 穿透模式 缓存模式 基本概念等
165 4
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
198 0
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等