《Storm分布式实时计算模式》——3.4 Trident运算

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介:

本节书摘来自华章计算机《Storm分布式实时计算模式》一书中的第3章,第3.4节,作者:(美)P. Taylor Goetz Brian O’Neill 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

3.4 Trident运算

时间戳已经生成好了,下一步是加入处理事件的逻辑组件。在Trident中,这些组件称为运算(operation)。在我们的topology中,使用两种不同的运算:filter和function。
运算通过Stream对象的方法来调用。这个例子中,我们使用了Stream对象的下述方法:


<a href=https://yqfile.alicdn.com/46e7bea64528ebdb3f64b3cb53140489e03591ff.png
" >

注意前面代码中列出的方法返回形式为Stream对象或者TridentState对象,返回可以用来创建新的数据流。因此,运算可以连在一起使用流式接口形式的Java代码。让我们再看看示例topology中的关键代码:

<a href=https://yqfile.alicdn.com/f0d2d1524271a5239e587c3c0700475aeec9a7b1.png
" >

通常,应用运算需要声明一个输入域集合和一个输出域集合,也就是funcition域。上面代码中topology第二行声明我们需要CityAssignment对数据流中的每个tuple执行操作。在每个tuple中,CityAssignment会在event字段上运算并且增加一个叫做city的新字段,这个字段会附在tuple中向后发射。
每个操作在流式风格的语法上略有不同,这取决于操作需要哪些信息。下面将介绍不同操作的详细语法和语义。
3.4.1 Trident filter
我们topology逻辑中的第一部分就是个过滤器filter,它会忽略掉我们不关心的疾病事件。在这个例子中,系统只关心脑膜炎(meningitis)的病情,从前面表格中看到,脑膜炎对应的疾病代码是320、321和322。
为了通过疾病代码过滤事件,我们需要利用Trident filter。Trident通过提供BaseFilter类,我们通过实现子类就可以方便地对tuple进行过滤,滤除系统不需要的tuple。BaseFilter类实现了Filter接口,这个接口如下面代码片段所示:


446ccb57e74454f084a446537c716ba03d54a5fa

为了在数据流中过滤tuple,应用需要通过继承BaseFilter类来实现这个接口。这个例子中,我们使用下述过滤器过滤事件:

ba130363a5f4f5faf6ab6c07be99f8374d7aa01b

上面的代码中,我们从tuple中提取了DiagnosisEvent并且检查疾病代码。因为所有的脑膜炎代码小于等于322,我们也没有发送其他代码,所以只需要简单地检查代码是否小于322,就可以决定事件是否和脑膜炎有关。
Filter操作结果返回True的tuple将会被发送到下游进行操作。如果方法返回False,该tuple就不会发送到下游。
在我们的topology中,我们在数据流上使用each(inputFields,filter)方法,将这个过滤器应用到数据流的每个tuple中:

3d4e63bda34a7fb899bf489211c25f75393fe723

3.4.2 Trident function
在filter之外,Storm还提供了一个更通用功能的接口function。function和Storm的bolt类似,读取tuple并且发送新的tuple。其中一个区别是,Trident function只能添加数据。function发送数据时,将新字段添加在tuple中,并不会删除或者变更已有的字段。
function接口如下代码片段所示:

<a href=https://yqfile.alicdn.com/beff14fd4d5d776524b631d21ff364c68ce26b01.png
" >

和Storm的bolt类似,function实现了一个包括实际逻辑的方法execute。function的实现也可以选用TridentCollector来发送tuple到新的function中。用这种方式,function也可以用来过滤tuple,起到filter的作用。
我们topology中的第一个function是CityAssignment,如下所示:

<a href=https://yqfile.alicdn.com/96f5af484cf5f33107478974af7e25235245791e.png
" >


b7288346e87f87e02aa44920100f5cb20fb00024

在这个function中,我们使用静态初始化的方式建立了一个我们关心的城市的地图。示例中,function包括一个地图,存储了的坐标信息包括:Philadelphia(PHL)、New York City(NYC)、San Francisco(SF)和LosAngeles(LA)。
在execute()方法中,函数遍历城市计算事件和城市之间的距离。现实系统中,地理空间的索引效率会高很多。
function声明的字段数量必须和它发射出值的字段数一致。如果不一致,Storm就会抛出IndexOutOfBoundsException异常。
我们topology中的下一个function是HourAssignment,用来转化Unix时间戳为纪元时间的小时,可以用来对事件发生进行时间上的分组操作。HourAssignment的代码如下:

df7858b7101a4526516be6950ab431194768bbc0


f4e9bbc2d64df03fdfebda9ef1f69a322c9a8848

我们重写了这个function,同时发射了小时的数值,以及由城市、疾病代码、小时组合而成的key。实际上,这个组合值会作为聚合计数的唯一标识符,后面会详细解释。
我们topology中最后两个funciton是用来侦测疾病暴发并且告警的。OutbreakDetector类的代码如下:

7ae5e5fe1010d6caa04985485d3f6f79e48c8dcf

这个function提取出了特定城市、疾病、时间的发生次数,并且检查计数是否超过了设定的阈值。如果超过,发送一个新的字段包括一条告警信息。在上面代码里,注意这个function实际上扮演了一个过滤器的角色,但是却作为一个function的形式来实现,是因为需要在tuple中添加新的字段。因为filter不能改变tuple,当我们既想过滤又想添加字段时必须使用function。
最后一个function的功能就是发布一个告警(并且结束程序)。代码如下:


6a036fcc8d96405ffa8d79ac6bf8e3858987c9f3


<a href=https://yqfile.alicdn.com/f6fa126f5a8efa4d10f9909240f12104e8f1e658.png" >

这个方法非常简单,提取了告警的内容,并写入日志,最后结束整个程序。
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
6月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
507 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
6月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
224 12
|
6月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
1170 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
452 3
|
8月前
|
关系型数据库 MySQL 数据库
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
1774 5
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
|
9月前
|
SQL 数据建模 BI
【YashanDB 知识库】用 yasldr 配置 Bulkload 模式作单线程迁移 300G 的业务数据到分布式数据库,迁移任务频繁出错
问题描述 详细版本:YashanDB Server Enterprise Edition Release 23.2.4.100 x86_64 6db1237 影响范围: 离线数据迁移场景,影响业务数据入库。 外场将部分 NewCIS 的报表业务放到分布式数据库,验证 SQL 性能水平。 操作系统环境配置: 125G 内存 32C CPU 2T 的 HDD 磁盘 问题出现的步骤/操作: 1、部署崖山分布式数据库 1mm 1cn 3dn 单线启动 yasldr 数据迁移任务,设置 32 线程的 bulk load 模式 2、观察 yasldr.log 是否出现如下错
|
监控
Saga模式在分布式系统中保证事务的隔离性
Saga模式在分布式系统中保证事务的隔离性
254 4
|
资源调度 分布式计算 大数据
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
大数据-111 Flink 安装部署 YARN部署模式 FlinkYARN模式申请资源、提交任务
427 0
|
3月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
317 2

热门文章

最新文章

下一篇
oss云网关配置