Topics covered so far in this series:
Hadoop (complete)
HDFS (complete)
MapReduce (complete)
Hive (complete)
Flume (complete)
Sqoop (complete)
Zookeeper (complete)
HBase (complete)
Redis (complete)
Kafka (complete)
Spark (complete)
Flink (in progress!)
Section Overview
In the previous section we covered:
Flink CEP core components
CEP application scenarios
Advantages of CEP
Timeout event extraction
When a pattern defines its detection window with the within keyword, partial event sequences may be discarded because they exceed the window length. To handle these timed-out partial matches, the select and flatSelect API calls allow you to specify a timeout handler.
FlinkCEP Development Workflow
Convert the data from the DataSource into a DataStream.
Define a Pattern, and combine the DataStream with the Pattern into a PatternStream.
Transform the PatternStream into a DataStream with operators such as select and process.
After further processing, sink that DataStream to the target store.
The select method:
SingleOutputStreamOperator<PayEvent> result = patternStream.select(
        orderTimeoutOutput,
        new PatternTimeoutFunction<PayEvent, PayEvent>() {
            @Override
            public PayEvent timeout(Map<String, List<PayEvent>> map, long timeoutTimestamp) throws Exception {
                return map.get("begin").get(0);
            }
        },
        new PatternSelectFunction<PayEvent, PayEvent>() {
            @Override
            public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {
                return map.get("pay").get(0);
            }
        });
The select function is applied to each detected pattern sequence: for every matched sequence, the provided PatternSelectFunction is invoked, and a pattern select function produces exactly one result element.
The timeout function is applied to each timed-out partial pattern sequence: for every partial match, the provided PatternTimeoutFunction is invoked, and a pattern timeout function likewise produces exactly one result element.
You can obtain the stream of timed-out matches as a side output of the SingleOutputStreamOperator returned by select, using the same OutputTag that was passed into the select call.
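As a minimal sketch of that retrieval step (assuming the same hypothetical PayEvent class and patternStream as above, and flink-cep on the classpath; it is not runnable standalone):

```java
// Hypothetical tag name; must be the same OutputTag instance passed to select()
OutputTag<PayEvent> orderTimeoutOutput = new OutputTag<PayEvent>("order-timeout") {};

SingleOutputStreamOperator<PayEvent> result = patternStream.select(
        orderTimeoutOutput,
        // Timed-out partial matches go to the side output
        (PatternTimeoutFunction<PayEvent, PayEvent>) (map, ts) -> map.get("begin").get(0),
        // Complete matches go to the main output
        (PatternSelectFunction<PayEvent, PayEvent>) map -> map.get("pay").get(0));

// Retrieve the timed-out stream with the same OutputTag
DataStream<PayEvent> timeoutStream = result.getSideOutput(orderTimeoutOutput);
```

Note that the OutputTag is created as an anonymous subclass so Flink can capture its generic type.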
Non-deterministic Finite Automaton (NFA)
At runtime, FlinkCEP compiles the user's pattern logic into such an NFA graph (an NFA object).
A finite state machine therefore works by starting from the initial state and transitioning automatically according to each input.
The state machine in the figure above detects whether a binary number contains an even number of 0s. As the figure shows, the only possible inputs are 1 and 0.
Starting from state S1, only an input of 0 moves the machine to S2; likewise, from S2 only an input of 0 moves it back to S1. So once the binary input has been consumed, if the machine ends in the final state S1, the input contains an even number of 0s.
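The even-zeros automaton above can be simulated in a few lines of plain Java (a standalone illustration of the state-machine idea, not part of FlinkCEP):

```java
public class EvenZerosDfa {
    /**
     * Simulates the two-state machine from the figure:
     * S1 = even number of 0s seen so far (accepting state),
     * S2 = odd number of 0s.
     * A '0' toggles between S1 and S2; a '1' is a self-loop.
     */
    public static boolean hasEvenZeros(String binary) {
        boolean inS1 = true; // start in state S1
        for (char c : binary.toCharArray()) {
            if (c == '0') {
                inS1 = !inS1; // 0 switches S1 <-> S2
            }
            // 1 keeps the current state
        }
        return inS1; // accept iff we end in S1
    }

    public static void main(String[] args) {
        System.out.println(hasEvenZeros("1010")); // two 0s  -> true
        System.out.println(hasEvenZeros("1000")); // three 0s -> false
    }
}
```

Tracing "1010" by hand: S1 →(1) S1 →(0) S2 →(1) S2 →(0) S1, ending in the accepting state.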
CEP Development Workflow
The FlinkCEP development workflow:
Convert the data from the DataSource into a DataStream, assign Watermarks, and apply keyBy.
Define a Pattern, and combine the DataStream with the Pattern into a PatternStream.
Transform the PatternStream into a DataStream with operators such as select and process.
After further processing, sink that DataStream to the target store.
Add the Dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
Case 1: Malicious Login Detection
Find accounts with consecutive login failures within 5 seconds.
The test data is as follows:
new CepLoginBean(1L, "fail", 1597905234000L),
new CepLoginBean(1L, "success", 1597905235000L),
new CepLoginBean(2L, "fail", 1597905236000L),
new CepLoginBean(2L, "fail", 1597905237000L),
new CepLoginBean(2L, "fail", 1597905238000L),
new CepLoginBean(3L, "fail", 1597905239000L),
new CepLoginBean(3L, "success", 1597905240000L)
Overall Approach
- Read the data
- Assign Watermarks on the data source
- keyBy the watermarked stream by user ID
- Build the Pattern
- Match the Pattern against the data stream
- Extract the successfully matched data
Writing the Code
package icu.wzk;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class FlinkCepLoginTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<CepLoginBean> data = env.fromElements(
                new CepLoginBean(1L, "fail", 1597905234000L),
                new CepLoginBean(1L, "success", 1597905235000L),
                new CepLoginBean(2L, "fail", 1597905236000L),
                new CepLoginBean(2L, "fail", 1597905237000L),
                new CepLoginBean(2L, "fail", 1597905238000L),
                new CepLoginBean(3L, "fail", 1597905239000L),
                new CepLoginBean(3L, "success", 1597905240000L)
        );

        // Assign event-time timestamps and watermarks (500 ms allowed out-of-orderness)
        SingleOutputStreamOperator<CepLoginBean> watermarks = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<CepLoginBean>() {
                    @Override
                    public WatermarkGenerator<CepLoginBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<CepLoginBean>() {
                            // Start at MIN_VALUE so the first event sets the max
                            long maxTimestamp = Long.MIN_VALUE;
                            long maxOutOfOrderness = 500L;

                            @Override
                            public void onEvent(CepLoginBean event, long eventTimestamp, WatermarkOutput output) {
                                maxTimestamp = Math.max(maxTimestamp, event.getTimestamp());
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()));

        // Key the stream by user ID
        KeyedStream<CepLoginBean, Long> keyed = watermarks
                .keyBy(new KeySelector<CepLoginBean, Long>() {
                    @Override
                    public Long getKey(CepLoginBean value) throws Exception {
                        return value.getUserId();
                    }
                });

        // Pattern: two consecutive "fail" events within 5 seconds
        Pattern<CepLoginBean, CepLoginBean> pattern = Pattern
                .<CepLoginBean>begin("start")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .next("next")
                .where(new IterativeCondition<CepLoginBean>() {
                    @Override
                    public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) throws Exception {
                        return cepLoginBean.getOperation().equals("fail");
                    }
                })
                .within(Time.seconds(5));

        PatternStream<CepLoginBean> patternStream = CEP.pattern(keyed, pattern);

        // Emit the first event of each matched pattern
        SingleOutputStreamOperator<CepLoginBean> process = patternStream
                .process(new PatternProcessFunction<CepLoginBean, CepLoginBean>() {
                    @Override
                    public void processMatch(Map<String, List<CepLoginBean>> map, Context context, Collector<CepLoginBean> collector) throws Exception {
                        System.out.println("map: " + map);
                        List<CepLoginBean> start = map.get("start");
                        collector.collect(start.get(0));
                    }
                });

        process.print();
        env.execute("FlinkCepLoginTest");
    }
}

class CepLoginBean {
    private Long userId;
    private String operation;
    private Long timestamp;

    public CepLoginBean(Long userId, String operation, Long timestamp) {
        this.userId = userId;
        this.operation = operation;
        this.timestamp = timestamp;
    }

    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getOperation() { return operation; }
    public void setOperation(String operation) { this.operation = operation; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "CepLoginBean{" +
                "userId=" + userId +
                ", operation='" + operation + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
Run Results
The program prints:
map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}
map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905238000}]}
CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}
Process finished with exit code 0