本节书摘来自华章计算机《Storm分布式实时计算模式》一书中的第3章,第3.2节,作者:(美)P. Taylor Goetz Brian O’Neill 更多章节内容可以访问云栖社区“华章计算机”公众号查看。
3.2 Trident topology
为了满足这些需求,我们需要在topology中对疾病的发生进行统计。使用标准的Storm topology进行统计会遇到难题,因为tuple可能重复发送,这会导致重复计数的问题。下一节将会看到,Trident提供了操作原语来解决这个问题。
我们将使用的topology,如图3-1所示。
上述topology的代码如下:
![ba3180aa11462bdd8e15d929f470895871b6e135](https://yqfile.alicdn.com/ba3180aa11462bdd8e15d929f470895871b6e135.png?x-oss-process=image/resize,w_1400/format,webp)
" >
![faed3942e6aa21f4b3ca2e11c2697773aebf9916](https://yqfile.alicdn.com/faed3942e6aa21f4b3ca2e11c2697773aebf9916.png?x-oss-process=image/resize,w_1400/format,webp)
上述代码表现了不同Trident函数之间的布局关联方式。首先,DiagnosisEventSpout函数发射疾病事件。然后事件由DiseaseFilter函数过滤,过滤掉我们不关心的疾病事件。之后,事件由CityAssignment函数赋值一个对应的城市名。然后HourAssignment函数赋值一个表示小时的时间戳,并且增加一个key cityDiseaseHour到tuple的字段中,这个key包括城市、小时和疾病代码。后续就使用这个key进行分组统计并使用persistAggregate函数对统计量持久性存储。统计量传递给OutbreakDetector函数,如果统计量超过阈值,OutbreakDetector向后发送一个告警信息。最后DispatchAlert接收到告警信息,记录日志,并且结束流程。在后面,我们会深入了解每个步骤。