CEP案例
1.入门案例
需求:
有一个业务系统,用户要使用该业务系统必须先登录。
过滤出在2秒内连续登录失败的用户。
在test源码目录下创建测试类:cn.itcast.LoginFailDemo
开发步骤:
1.获取流处理执行环境
2.设置并行度,设置事件时间
3.加载数据源,提取事件时间
4.定义匹配模式,设置时间长度
5.匹配模式(分组)
6.数据处理
7.打印
8.触发执行
数据源:
// Sample login events. Timestamps are epoch milliseconds; the trailing comments show them in UTC+8.
Arrays.asList(
        new LoginUser(1, "192.168.0.1", "fail", 1558430842000L),      // 2019-05-21 17:27:22
        new LoginUser(1, "192.168.0.2", "fail", 1558430843000L),      // 2019-05-21 17:27:23
        new LoginUser(1, "192.168.0.3", "fail", 1558430844000L),      // 2019-05-21 17:27:24
        new LoginUser(2, "192.168.10.10", "success", 1558430845000L)  // 2019-05-21 17:27:25
)
参考代码
/** * 使用CEP实现三秒内登录失败两次的用户 */ public class LoginFailDemo { public static void main(String[] args) throws Exception { //1:初始化流式运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2:设置并行度为1 env.setParallelism(1); //3:指定数据按照事件时间进行处理 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //4:构建数据源 DataStream<LoginUser > LoginUserStream = env.fromCollection(Arrays.asList( new LoginUser (1, "192.168.0.1", "fail", 1558430842000L),//2019-05-21 17:27:22 new LoginUser (1, "192.168.0.2", "fail", 1558430843000L),//2019-05-21 17:27:23 new LoginUser (1, "192.168.0.3", "fail", 1558430844000L),//2019-05-21 17:27:24 new LoginUser (2, "192.168.10.10", "success", 1558430845000L)//2019-05-21 17:27:25 )).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginUser>(Time.seconds(0)) { @Override public long extractTimestamp(LoginUser element) { return element.getEventTime(); } }); //5.1:定义规则模型 Pattern<LoginUser, LoginUser > LoginUserPattern = Pattern.<LoginUser >begin("begin") .where(new IterativeCondition<LoginUser>() { @Override public boolean filter(LoginUser loginUser, Context<LoginUser > context) throws Exception { return loginUser.getEventType().equals("fail"); } })//匹配第一个事件,匹配的是登录失败 .next("next") //匹配到第一个事件以后,紧跟着一个事件数据,next表示两个事件必须严格的临近 .where(new IterativeCondition<LoginUser >() { @Override public boolean filter(LoginUser loginUser, Context<LoginUser> context) throws Exception { return loginUser.getEventType().equals("fail"); } })//匹配第二个事件,匹配的是登录失败 .within(Time.seconds(3));//定义结束状态,结束状态可以是时间触发也可以是满足了某个事件触发 //5.2:将规则模型应用到数据流中 PatternStream<LoginUser > patternDataStream = CEP.pattern(LoginUserStream.keyBy(LoginUser ::getUserId), LoginUserPattern); //5.3:获取到符合规则模型的数据 /** * IN:传入的数据类型 * OUT:返回值的数据类型 * (Long, String, String, Long):(用户id, 登录ip,登录状态,登录时间) */ SingleOutputStreamOperator<Tuple4<Integer, String, String, Long>> loginFailDataStream = patternDataStream.select(new PatternSelectFunction<LoginUser, 
Tuple4<Integer, String, String, Long>>() { @Override public Tuple4<Integer, String, String, Long> select(Map<String, List<LoginUser>> pattern) throws Exception { //根据刚才的分析,符合规则的数据会存储到状态集合中,也就是state中,所以查找匹配的时候需要在state中获取数据 LoginUser loginUser = pattern.getOrDefault("next", null).iterator().next(); //返回匹配到的数据 return Tuple4.of(loginUser.getUserId(), loginUser.getIp(), loginUser.getEventType(), loginUser.getEventTime()); } }); //打印出来符合条件的数据 loginFailDataStream.print("连续两次登录失败的用户>>>"); //执行任务 env.execute(); }
登陆对象:
public int userId; //用户id public String ip;//用户Ip public String eventType; //状态 public Long eventTime;//事件时间 /** * 构建登录对象 */ public static class LoginUser implements Serializable { public int userId; //用户id public String ip;//用户Ip public String eventType; //状态 public Long eventTime;//事件时间 public int getUserId() { return userId; } public void setUserId(int userId) { this.userId = userId; } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public String getEventType() { return eventType; } public void setEventType(String eventType) { this.eventType = eventType; } public Long getEventTime() { return eventTime; } public void setEventTime(Long eventTime) { this.eventTime = eventTime; } public LoginEvent(int userId, String ip, String eventType, Long eventTime) { this.userId = userId; this.ip = ip; this.eventType = eventType; this.eventTime = eventTime; } @Override public String toString() { return "LoginEvent{" + "userId=" + userId + ", ip='" + ip + '\'' + ", eventType='" + eventType + '\'' + ", eventTime=" + eventTime + '}'; } } }
2.监控市场价格
需求:
物价局和工商局会监督市场上各种商品的销售价格。随着市场行情和商品供需的变化,商品价格会有一定程度的浮动:如果商品价格在指定的价格区间内波动,政府部门是不会干预的;如果商品价格在一定的时间范围内波动幅度超出了指定的区间范围,并且上行幅度过大,物价局会上报敏感数据信息,并规范市场价格。
在此,我们假定:如果某商品的售价在1分钟之内连续两次超过预定的商品价格阈值,就发送告警信息。
测试数据
{"goodsId":100001,"goodsPrice":6,"goodsName":"apple","alias":"苹果","orderTime":1558430843000}
{"goodsId":100007,"goodsPrice":0.5,"goodsName":"mask","alias":"口罩","orderTime":1558430844000}
{"goodsId":100002,"goodsPrice":2,"goodsName":"rice","alias":"大米","orderTime":1558430845000}
{"goodsId":100003,"goodsPrice":2,"goodsName":"flour","alias":"面粉","orderTime":1558430846000}
{"goodsId":100004,"goodsPrice":12,"goodsName":"rice","alias":"大米","orderTime":1558430847000}
{"goodsId":100005,"goodsPrice":20,"goodsName":"apple","alias":"苹果","orderTime":1558430848000}
{"goodsId":100006,"goodsPrice":3,"goodsName":"banana","alias":"香蕉","orderTime":1558430849000}
{"goodsId":100007,"goodsPrice":10,"goodsName":"mask","alias":"口罩","orderTime":1558430850000}
{"goodsId":100001,"goodsPrice":16,"goodsName":"apple","alias":"苹果","orderTime":1558430852000}
{"goodsId":100007,"goodsPrice":15,"goodsName":"mask","alias":"口罩","orderTime":1558430853000}
{"goodsId":100002,"goodsPrice":12,"goodsName":"rice","alias":"大米","orderTime":1558430854000}
{"goodsId":100003,"goodsPrice":12,"goodsName":"flour","alias":"面粉","orderTime":1558430855000}
{"goodsId":100004,"goodsPrice":12,"goodsName":"rice","alias":"大米","orderTime":1558430856000}
{"goodsId":100005,"goodsPrice":20,"goodsName":"apple","alias":"苹果","orderTime":1558430857000}
{"goodsId":100006,"goodsPrice":13,"goodsName":"banana","alias":"香蕉","orderTime":1558430858000}
{"goodsId":100007,"goodsPrice":10,"goodsName":"mask","alias":"口罩","orderTime":1558430859000}
创建kafka topic
./kafka-topics.sh --create --topic cep --zookeeper node01:2181 --partitions 1 --replication-factor 1
生产数据
./kafka-console-producer.sh --broker-list node01:9092 --topic cep
在Redis中保存各商品的限制价格(价格阈值)
// Price limits per goods name, stored in the Redis hash "product" (goodsName -> limit).
// Plain ASCII double quotes are required; curly quotes are not valid Java string delimiters.
jedisCluster.hset("product", "apple", "10");
jedisCluster.hset("product", "rice", "6");
jedisCluster.hset("product", "flour", "6");
jedisCluster.hset("product", "banana", "8");
jedisCluster.hset("product", "mask", "5");
开发步骤
在test源码目录下创建测试类:cn.itcast.CepMarkets
1.获取流处理执行环境
2.设置事件时间、并行度
3.整合Kafka
4.数据转换
5.process获取bean,设置status,并设置事件时间
6.定义匹配模式,设置时间长度
7.匹配模式(分组)
8.查询告警数据
2.1.代码开发
public class CepMarkets { public static void main(String[] args) throws Exception { //1.获取流处理执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.设置事件时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //3.整合kafka Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "node01:9092"); //broker地址 properties.setProperty("group.id", "cep"); //消费组 properties.setProperty("enable.auto.commit", "true"); properties.setProperty("auto.commit.interval.ms", "5000"); FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>("cep", new SimpleStringSchema(), properties); kafkaConsumer.setStartFromEarliest(); DataStreamSource<String> source = env.addSource(kafkaConsumer); //4.数据转换 SingleOutputStreamOperator<Product> mapData = source.map(new MapFunction<String, Product>() { @Override public Product map(String value) throws Exception { JSONObject json = JSON.parseObject(value); Product product = new Product( json.getLong("goodsId"), json.getDouble("goodsPrice"), json.getString("goodsName"), json.getString("alias"), json.getLong("orderTime"), false ); return product; } }); //5.保留告警数据(设置时间) SingleOutputStreamOperator<Product> waterData = mapData.keyBy(Product::getGoodsId) .process(new KeyedProcessFunction<Long, Product, Product>() { Map<String, String> map = null; @Override public void open(Configuration parameters) throws Exception { JedisCluster jedisCluster = RedisUtil.getJedisCluster(); map = jedisCluster.hgetAll("product"); } @Override public void processElement(Product value, Context ctx, Collector<Product> out) throws Exception { long priceAlert = Long.parseLong(map.get(value.getGoodsName())); if (value.getGoodsPrice() > priceAlert) { value.setStatus(true); } out.collect(value); } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Product>(Time.seconds(0)) { @Override public long extractTimestamp(Product element) { return 
element.getOrderTime(); } }) ; //6.定义匹配模式,设置时间长度 Pattern<Product, Product> pattern = Pattern.<Product>begin("begin") .where(new SimpleCondition<Product>() { @Override public boolean filter(Product value) throws Exception { return value.getStatus() == true; } }) .next("next") .where(new SimpleCondition<Product>() { @Override public boolean filter(Product value) throws Exception { return value.getStatus() == true; } }) .within(Time.seconds(60)); //7.匹配模式(分组) PatternStream<Product> cep = CEP.pattern(waterData.keyBy(Product::getGoodsId), pattern); //8.查询告警数据 cep.select(new PatternSelectFunction<Product, Object>() { @Override public Object select(Map<String, List<Product>> pattern) throws Exception { List<Product> result = pattern.get("next"); return result; } }).print("告警数据:"); env.execute(); } } 2.2.Bean对象 属性:goodsId、goodsPrice、goodsName、alias、orderTime、status public class Product { private Long goodsId; private Double goodsPrice; private String goodsName; private String alias; private Long orderTime; private Boolean status; public Product(Long goodsId, Double goodsPrice, String goodsName, String alias, Long orderTime, Boolean status) { this.goodsId = goodsId; this.goodsPrice = goodsPrice; this.goodsName = goodsName; this.alias = alias; this.orderTime = orderTime; this.status = status; } @Override public String toString() { return "Product{" + "goodsId=" + goodsId + ", goodsPrice=" + goodsPrice + ", goodsName='" + goodsName + '\'' + ", alias='" + alias + '\'' + ", orderTime=" + orderTime + ", status=" + status + '}'; } public Long getGoodsId() { return goodsId; } public void setGoodsId(Long goodsId) { this.goodsId = goodsId; } public Double getGoodsPrice() { return goodsPrice; } public void setGoodsPrice(Double goodsPrice) { this.goodsPrice = goodsPrice; } public String getGoodsName() { return goodsName; } public void setGoodsName(String goodsName) { this.goodsName = goodsName; } public String getAlias() { return alias; } public void setAlias(String alias) { 
this.alias = alias; } public Long getOrderTime() { return orderTime; } public void setOrderTime(Long orderTime) { this.orderTime = orderTime; } public Boolean getStatus() { return status; } public void setStatus(Boolean status) { this.status = status; } }