这是彭文华的第100篇原创
Flink开始往各大厂渗透了,虽然批处理能力弱了一些,但是流式数据基本已经是标配了。Flink不仅有优秀的Checkpoint机制(流式数据快照)、Watermark机制(解决乱序问题),还有及其强大的规则引擎--CEP(Complex Event Processing复杂事件处理)。他就像是一个正则表达式一样,从一串串流动的数据中,按照规则提取所需的数据进行加工处理。
CEP的使用场景
CEP其实就是一个规则引擎,把符合规则的所有数据都拉出来。Flink在实时处理的超高性能非常适合做一些实时逻辑判断的事情,比如风控。
如上图所示,原始数据流中源源不断的走过非常多的数据,我们定义一个CEP,假设是某个IP地址连续抢5个红包。一旦数据流中出现这种情况,Flink会迅速锁定,交给后续处理。
所以CEP会非常适合流式数据中的各种逻辑判断,适用一些实时性要求非常高的场景,比如异常行为监测(风控)、策略营销(抢单模式)、运维(流量抖动)。
风控的例子可以举出很多来,视频网站刷流量、电商网站抢红包、黑产等等,只要数据流中有相关数据,简单设置一个CEP就能瞬间监测出来,立刻处理掉。比如所有账号,短时间内购买优惠产品超过100个,基本可以判断是黑产了。
策略营销的场景,滴滴分享过一些实时营销场景:
- 乘客线上冒泡1分钟没发单;
- 乘客下单后2分钟内没司机接单;
- 乘客在不同业务线之间比价。
我们bykey之后,对每个客户设置一个CEP规则,监控他的动作,发现下单之后没有后续接单,则进行后续营销动作:
CEP的定义和使用
CEP的实现比较简单,主要是三个步骤:
- 定义模式
- 绑定DataStream
- 匹配结果输出
定义模式代码如下:
pattern.next("newP").where( //Pattern:前一个模式,用来组装多模式用的 //next:是一种模式类型,分为严格连续、宽松连续和非确定宽松连续。next是严格连续 new SimpleCondition<Event>() { @Override public boolean filter(Event event) { return event.getid() ==1000; //filter:核心处理逻辑 } } )
定义一个模式的主要核心属性其实就几个:属性、有效期和模式序列。
- 模式属性
- 匹配固定次数,times
- 匹配1次以上,oneOrMore
- 匹配发送多次以上,timesOrMore
- 模式有效期
- 根据业务需求,设定有效期
- 如果不设置,匹配事件一直会持续
- 模式序列
- 严格连续(next/notNext)
- 宽松连续性(followedBy/notFollowedBy)
- 和非确定宽松连续性(followedByAny)
这里解释一下模式序列。严格连续就是必须两个事件前后紧挨着;宽松连续就是两个事件中可以隔着其他事件;非确定宽松连续就是可以重复判定。
举个例子:
如上图所示,原始数据流是12334,定义的模式是找到1、3事件。
对于严格连续来说,数据流中只有12,没有13,所以无法匹配出结果。
对于宽松连续来说,数据流中有123,这就能找到1、3事件了,输出结果1,3。
对于非确定宽松连续来说,数据流中有123,判定出一个1,3,数据流中还有一个1233,又可以判定出一个1,3,所以会输出两个1,3。
定义好一个模式之后,可以再继续定义其他的模式,这样穿起来可以组装成比较复杂的逻辑。
比如我们bykey之后,一个id在10分钟内购买超过100个优惠商品,这就需要浏览、下单、付款三个事件,然后设定时间范围,超过100次,就处罚报警,作出各种防黑产动作。
总结
实时数据处理有很多的应用场景,这些场景需要一种灵活、高效、简单易操作的能力来应对。
Flink通过类似于正则表达式的CEP来完成这些功能。我们可以通过很少的一段代码,定义一个CEP规则,规则可以设置模式匹配类型,比如只匹配一次、匹配多次等,还需要设置模式有效期。
为了应对流数据的扰动,Flink的CEP还设置了严格连续、宽松连续和非确定宽松连续三种匹配方式。
为了满足多条件、复杂逻辑的应用场景,Flink还可以进行多个模式连接在一起,形成模式组。
这样我们就能完成风控、实时营销等实时性要求非常高的需求。