2、Flink简单介绍
2009年Flink 诞生于柏林工业大学的一个大数据研究项目 StratoSphere。
2014 年孵化出 Flink捐献给Apache,并成为 Apache 顶级项目,同时 Flink 的主流方向被定位为流式计算并大数据行业内崭露头角。
2015 年阿里巴巴开始使用 Flink 并持续贡献社区(阿里内部还基于Flink做了一套Blink)
2019年1月8日,阿里巴巴以 9000 万欧元(7亿元人民币)收购了创业公司 Data Artisans。 从此Flink开始了新一轮的乘风破浪!在国内流行的一发不可收拾!
3、Flink官网介绍:https://flink.apache.org/
四、Flink实现双十一实时大屏
在大数据的实时处理中,实时的大屏展示已经成了一个很重要的展示项,比如最有名的双十一大屏实时销售总价展示。
今天就做一个最简单的模拟电商统计大屏的小例子,需求如下:
1.实时计算出当天零点截止到当前时间的销售总额
2.计算出各个分类的销售top3
3.每秒钟更新一次统计结果
实现代码
package cn.lanson.action; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.SimpleDateFormat; import java.util.*; import java.util.stream.Collectors; /** * Author Lansonli * Desc 模拟双十一电商实时大屏显示: * 1.实时计算出当天零点截止到当前时间的销售总额 * 2.计算出各个分类的销售top3 * 3.每秒钟更新一次统计结果 */ public class DoubleElevenBigScreem { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.source DataStream<Tuple2<String, Double>> dataStream = env.addSource(new MySource()); //3.transformation DataStream<CategoryPojo> result = dataStream .keyBy(0) .window( //定义大小为一天的窗口,第二个参数表示中国使用的UTC+08:00时区比UTC时间早8小时 TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)) ) .trigger( ContinuousProcessingTimeTrigger.of(Time.seconds(1))//定义一个1s的触发器 ) .aggregate(new PriceAggregate(), new WindowResult()); //看一下聚合结果 //result.print("初步聚合结果"); //4.使用上面聚合的结果,实现业务需求: // * 1.实时计算出当天零点截止到当前时间的销售总额 // * 2.计算出各个分类的销售top3 // * 3.每秒钟更新一次统计结果 result.keyBy("dateTime") .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))//每秒钟更新一次统计结果 .process(new WindowResultProcess());//在ProcessWindowFunction中实现该复杂业务逻辑 env.execute(); } /** * 自定义价格聚合函数,其实就是对price的简单sum操作 */ private static class PriceAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> { @Override public Double createAccumulator() { return 0D; } @Override public Double add(Tuple2<String, Double> value, Double accumulator) { return accumulator + value.f1; } @Override public Double getResult(Double accumulator) { return accumulator; } @Override public Double merge(Double a, Double b) { return a + b; } } /** * 自定义WindowFunction,实现如何收集窗口结果数据 */ private static class WindowResult implements WindowFunction<Double, CategoryPojo, Tuple, TimeWindow> { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override public void apply(Tuple key, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception { BigDecimal bg = new BigDecimal(input.iterator().next()); double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();//四舍五入 CategoryPojo categoryPojo = new CategoryPojo(); categoryPojo.setCategory(((Tuple1<String>) key).f0); categoryPojo.setTotalPrice(p); categoryPojo.setDateTime(simpleDateFormat.format(new Date())); out.collect(categoryPojo); } } /** * 实现ProcessWindowFunction * 在这里我们做最后的结果统计, * 把各个分类的总价加起来,就是全站的总销量金额, * 然后我们同时使用优先级队列计算出分类销售的Top3, * 最后打印出结果,在实际中我们可以把这个结果数据存储到hbase或者redis中,以供前端的实时页面展示。 */ private static class WindowResultProcess extends ProcessWindowFunction<CategoryPojo, Object, Tuple, TimeWindow> { @Override public void process(Tuple tuple, Context context, Iterable<CategoryPojo> elements, Collector<Object> out) throws Exception { String date = ((Tuple1<String>) tuple).f0; //优先级队列 //实际开发中常使用PriorityQueue实现大小顶堆来解决topK问题 //求最大k个元素的问题:使用小顶堆 //求最小k个元素的问题:使用大顶堆 //https://blog.csdn.net/hefenglian/article/details/81807527 Queue<CategoryPojo> queue = new PriorityQueue<>(3, (c1, c2) -> c1.getTotalPrice() >= c2.getTotalPrice() ? 1 : -1);//小顶堆 double price = 0D; Iterator<CategoryPojo> iterator = elements.iterator(); int s = 0; while (iterator.hasNext()) { CategoryPojo categoryPojo = iterator.next(); //使用优先级队列计算出top3 if (queue.size() < 3) { queue.add(categoryPojo); } else { //计算topN的时候需要小顶堆,也就是要去掉堆顶比较小的元素 CategoryPojo tmp = queue.peek();//取出堆顶元素 if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()) { queue.poll();//移除 queue.add(categoryPojo); } } price += categoryPojo.getTotalPrice(); } //按照TotalPrice逆序 List<String> list = queue.stream().sorted((c1, c2) -> c1.getTotalPrice() <= c2.getTotalPrice() ? 1 : -1)//逆序 .map(c -> "(分类:" + c.getCategory() + " 销售额:" + c.getTotalPrice() + ")") .collect(Collectors.toList()); System.out.println("时间 :" + date); System.out.println("总价 : " + new BigDecimal(price).setScale(2, RoundingMode.HALF_UP)); System.out.println("Top3 : \n" + StringUtils.join(list, ",\n")); System.out.println("-------------"); } } /** * 用于存储聚合的结果 */ @Data @AllArgsConstructor @NoArgsConstructor public static class CategoryPojo { private String category;//分类名称 private double totalPrice;//该分类总销售额 private String dateTime;// 截止到当前时间的时间 } /** * 模拟生成某一个分类下的订单 */ public static class MySource implements SourceFunction<Tuple2<String, Double>> { private volatile boolean isRunning = true; private Random random = new Random(); String category[] = { "女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公" }; @Override public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception { while (isRunning) { Thread.sleep(10); //随机生成一个分类 String c = category[(int) (Math.random() * (category.length - 1))]; //随机生成一个该分类下的随机金额的成交订单 double price = random.nextDouble() * 100; ctx.collect(Tuple2.of(c, price)); } } @Override public void cancel() { isRunning = false; } } }
五、Flink实现超时订单自动好评
在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后一定时间之内没有做出评价,系统自动给与五星好评, 接下来我使用Flink的定时器来实现这一功能。
实现代码
package cn.lanson.action; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.util.Collector; import java.util.Iterator; import java.util.Map; import java.util.Random; import java.util.UUID; /** * Author Lansonli * Desc 在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,一定时间之内没有做出评价,系统自动给与五星好评, * 今天我们使用Flink的定时器来实现这一功能。 */ public class OrderAutomaticFavorableComments { public static void main(String[] args) throws Exception { //env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); env.enableCheckpointing(5000); //source DataStream<Tuple2<String, Long>> dataStream = env.addSource(new MySource()); //经过interval毫秒用户未对订单做出评价,自动给与好评.为了演示方便,设置了5s的时间 long interval = 5000L; //分组后使用自定义KeyedProcessFunction完成定时判断超时订单并自动好评 dataStream.keyBy(0).process(new TimerProcessFuntion(interval)); env.execute(); } /** * 定时处理逻辑 * 1.首先我们定义一个MapState类型的状态,key是订单号,value是订单完成时间 * 2.在processElement处理数据的时候,把每个订单的信息存入状态中,这个时候不做任何处理, * 并且注册一个定时器(在订单完成时间+间隔时间(interval)时触发). * 3.注册的时器到达了订单完成时间+间隔时间(interval)时就会触发onTimer方法,我们主要在这个里面进行处理。 * 我们调用外部的接口来判断用户是否做过评价, * 如果没做评价,调用接口给与五星好评,如果做过评价,则什么也不处理,最后记得把相应的订单从MapState删除 */ public static class TimerProcessFuntion extends KeyedProcessFunction<Tuple, Tuple2<String, Long>, Object> { //定义MapState类型的状态,key是订单号,value是订单完成时间 private MapState<String, Long> mapState; //超过多长时间(interval,单位:毫秒) 没有评价,则自动五星好评 private long interval = 0L; public TimerProcessFuntion(long interval) { this.interval = interval; } //创建MapState @Override public void open(Configuration parameters) { MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<>("mapStateDesc", String.class, Long.class); mapState = getRuntimeContext().getMapState(mapStateDesc); } //注册定时器 @Override public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Object> out) throws Exception { mapState.put(value.f0, value.f1); ctx.timerService().registerProcessingTimeTimer(value.f1 + interval); } //定时器被触发时执行 @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception { Iterator iterator = mapState.iterator(); while (iterator.hasNext()) { Map.Entry<String, Long> entry = (Map.Entry<String, Long>) iterator.next(); String orderid = entry.getKey(); boolean evaluated = isEvaluation(entry.getKey()); //调用方法判断订单是否已评价? mapState.remove(orderid); if (evaluated) { System.out.println("订单(orderid: "+orderid+")在"+interval+"毫秒时间内已经评价,不做处理"); } if (evaluated) { //如果用户没有做评价,在调用相关的接口给与默认的五星评价 System.out.println("订单(orderid: "+orderid+")超过"+interval+"毫秒未评价,调用接口自动给与五星好评"); } } } //自定义方法实现查询用户是否对该订单进行了评价,我们这里只是随便做了一个判断 //在生产环境下,可以去查询相关的订单系统. private boolean isEvaluation(String key) { return key.hashCode() % 2 == 0; } } /** * 自定义source模拟生成一些订单数据. * 在这里,我们生了一个最简单的二元组Tuple2,包含订单id和订单完成时间两个字段. */ public static class MySource implements SourceFunction<Tuple2<String, Long>> { private volatile boolean isRunning = true; @Override public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception { Random random = new Random(); while (isRunning) { Thread.sleep(1000); //订单id String orderid = UUID.randomUUID().toString(); //订单完成时间 long orderFinishTime = System.currentTimeMillis(); ctx.collect(Tuple2.of(orderid, orderFinishTime)); } } @Override public void cancel() { isRunning = false; } } }
六、大数据行业趋势分析
1、新基建和数字化转型助力大数据+AI多场景落地
新型基础建设场景分析
各行业数字化转型场景分析
发展新机遇,产业新高度
2、多行业场景大数据应用占比
3、从传统物流到智慧物流演变之旅
- 赋能新零售:这种收集物流数据、利用物流数据的手段普及到整个零售行业。
- 加持传统物流:物流不等同于快递,与传统物流联手,一定是未来智慧物流的必经之路。
4、智慧物流大数据行业级解决方案
- 基于大型物流公司研发的智慧物流大数据平台,日订单上千万
- 围绕订单、运输、仓储、搬运装卸、包装以及流通加工等物流环节中涉及的数据信息等
- 提高运输以及配送效率、减少物流成本、更有效地满足客户服务要求,并针对数据分析结果,提出具有中观指导意义的解决方案
5、大数据行业趋势分析
6、大数据技术框架应用
7、大数据开发岗位
七、大数据技术知识体系