女朋友问阿里双十一实时大屏如何实现,我惊呆一会,马上手把手教她背后的大数据技术(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 女朋友问阿里双十一实时大屏如何实现,我惊呆一会,马上手把手教她背后的大数据技术(二)

2、Flink简单介绍


2009年Flink 诞生于柏林工业大学的一个大数据研究项目 StratoSphere。


2014 年孵化出 Flink捐献给Apache,并成为 Apache 顶级项目,同时 Flink 的主流方向被定位为流式计算并大数据行业内崭露头角。


2015 年阿里巴巴开始使用 Flink 并持续贡献社区(阿里内部还基于Flink做了一套Blink)


2019年1月8日,阿里巴巴以 9000 万欧元(7亿元人民币)收购了创业公司 Data Artisans。 从此Flink开始了新一轮的乘风破浪!在国内流行的一发不可收拾!


image.png


3、Flink官网介绍:https://flink.apache.org/


image.png


四、Flink实现双十一实时大屏



在大数据的实时处理中,实时的大屏展示已经成了一个很重要的展示项,比如最有名的双十一大屏实时销售总价展示。  


今天就做一个最简单的模拟电商统计大屏的小例子,需求如下:


1.实时计算出当天零点截止到当前时间的销售总额


2.计算出各个分类的销售top3


3.每秒钟更新一次统计结果


image.png


实现代码


package cn.lanson.action;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Author Lansonli
 * Desc 模拟双十一电商实时大屏显示:
 * 1.实时计算出当天零点截止到当前时间的销售总额
 * 2.计算出各个分类的销售top3
 * 3.每秒钟更新一次统计结果
 */
public class DoubleElevenBigScreem {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.source
        DataStream<Tuple2<String, Double>> dataStream = env.addSource(new MySource());
        //3.transformation
        DataStream<CategoryPojo> result = dataStream
                .keyBy(0)
                .window(
                        //定义大小为一天的窗口,第二个参数表示中国使用的UTC+08:00时区比UTC时间早8小时
                        TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))
                )
                .trigger(
                        ContinuousProcessingTimeTrigger.of(Time.seconds(1))//定义一个1s的触发器
                )
                .aggregate(new PriceAggregate(), new WindowResult());
        //看一下聚合结果
        //result.print("初步聚合结果");
        //4.使用上面聚合的结果,实现业务需求:
        // * 1.实时计算出当天零点截止到当前时间的销售总额
        // * 2.计算出各个分类的销售top3
        // * 3.每秒钟更新一次统计结果
        result.keyBy("dateTime")
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))//每秒钟更新一次统计结果
                .process(new WindowResultProcess());//在ProcessWindowFunction中实现该复杂业务逻辑
        env.execute();
    }
    /**
     * 自定义价格聚合函数,其实就是对price的简单sum操作
     */
    private static class PriceAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
        @Override
        public Double createAccumulator() {
            return 0D;
        }
        @Override
        public Double add(Tuple2<String, Double> value, Double accumulator) {
            return accumulator + value.f1;
        }
        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }
        @Override
        public Double merge(Double a, Double b) {
            return a + b;
        }
    }
    /**
     * 自定义WindowFunction,实现如何收集窗口结果数据
     */
    private static class WindowResult implements WindowFunction<Double, CategoryPojo, Tuple, TimeWindow> {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
            BigDecimal bg = new BigDecimal(input.iterator().next());
            double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();//四舍五入
            CategoryPojo categoryPojo = new CategoryPojo();
            categoryPojo.setCategory(((Tuple1<String>) key).f0);
            categoryPojo.setTotalPrice(p);
            categoryPojo.setDateTime(simpleDateFormat.format(new Date()));
            out.collect(categoryPojo);
        }
    }
    /**
     * 实现ProcessWindowFunction
     * 在这里我们做最后的结果统计,
     * 把各个分类的总价加起来,就是全站的总销量金额,
     * 然后我们同时使用优先级队列计算出分类销售的Top3,
     * 最后打印出结果,在实际中我们可以把这个结果数据存储到hbase或者redis中,以供前端的实时页面展示。
     */
    private static class WindowResultProcess extends ProcessWindowFunction<CategoryPojo, Object, Tuple, TimeWindow> {
        @Override
        public void process(Tuple tuple, Context context, Iterable<CategoryPojo> elements, Collector<Object> out) throws Exception {
            String date = ((Tuple1<String>) tuple).f0;
            //优先级队列
            //实际开发中常使用PriorityQueue实现大小顶堆来解决topK问题
            //求最大k个元素的问题:使用小顶堆
            //求最小k个元素的问题:使用大顶堆
            //https://blog.csdn.net/hefenglian/article/details/81807527
            Queue<CategoryPojo> queue = new PriorityQueue<>(3,
                    (c1, c2) -> c1.getTotalPrice() >= c2.getTotalPrice() ? 1 : -1);//小顶堆
            double price = 0D;
            Iterator<CategoryPojo> iterator = elements.iterator();
            int s = 0;
            while (iterator.hasNext()) {
                CategoryPojo categoryPojo = iterator.next();
                //使用优先级队列计算出top3
                if (queue.size() < 3) {
                    queue.add(categoryPojo);
                } else {
                    //计算topN的时候需要小顶堆,也就是要去掉堆顶比较小的元素
                    CategoryPojo tmp = queue.peek();//取出堆顶元素
                    if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()) {
                        queue.poll();//移除
                        queue.add(categoryPojo);
                    }
                }
                price += categoryPojo.getTotalPrice();
            }
            //按照TotalPrice逆序
            List<String> list = queue.stream().sorted((c1, c2) -> c1.getTotalPrice() <= c2.getTotalPrice() ? 1 : -1)//逆序
                    .map(c -> "(分类:" + c.getCategory() + " 销售额:" + c.getTotalPrice() + ")")
                    .collect(Collectors.toList());
            System.out.println("时间 :" + date);
            System.out.println("总价 : " + new BigDecimal(price).setScale(2, RoundingMode.HALF_UP));
            System.out.println("Top3 : \n" + StringUtils.join(list, ",\n"));
            System.out.println("-------------");
        }
    }
    /**
     * 用于存储聚合的结果
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;//分类名称
        private double totalPrice;//该分类总销售额
        private String dateTime;// 截止到当前时间的时间
    }
    /**
     * 模拟生成某一个分类下的订单
     */
    public static class MySource implements SourceFunction<Tuple2<String, Double>> {
        private volatile boolean isRunning = true;
        private Random random = new Random();
        String category[] = {
                "女装", "男装",
                "图书", "家电",
                "洗护", "美妆",
                "运动", "游戏",
                "户外", "家具",
                "乐器", "办公"
        };
        @Override
        public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
            while (isRunning) {
                Thread.sleep(10);
                //随机生成一个分类
                String c = category[(int) (Math.random() * (category.length - 1))];
                //随机生成一个该分类下的随机金额的成交订单
                double price = random.nextDouble() * 100;
                ctx.collect(Tuple2.of(c, price));
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}


五、Flink实现超时订单自动好评



在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后一定时间之内没有做出评价,系统自动给与五星好评, 接下来我使用Flink的定时器来实现这一功能。


image.png


实现代码


package cn.lanson.action;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
/**
 * Author Lansonli
 * Desc 在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,一定时间之内没有做出评价,系统自动给与五星好评,
 * 今天我们使用Flink的定时器来实现这一功能。
 */
public class OrderAutomaticFavorableComments {
    public static void main(String[] args) throws Exception {
        //env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.enableCheckpointing(5000);
        //source
        DataStream<Tuple2<String, Long>> dataStream = env.addSource(new MySource());
        //经过interval毫秒用户未对订单做出评价,自动给与好评.为了演示方便,设置了5s的时间
        long interval = 5000L;
        //分组后使用自定义KeyedProcessFunction完成定时判断超时订单并自动好评
        dataStream.keyBy(0).process(new TimerProcessFuntion(interval));
        env.execute();
    }
    /**
     * 定时处理逻辑
     * 1.首先我们定义一个MapState类型的状态,key是订单号,value是订单完成时间
     * 2.在processElement处理数据的时候,把每个订单的信息存入状态中,这个时候不做任何处理,
     * 并且注册一个定时器(在订单完成时间+间隔时间(interval)时触发).
     * 3.注册的时器到达了订单完成时间+间隔时间(interval)时就会触发onTimer方法,我们主要在这个里面进行处理。
     * 我们调用外部的接口来判断用户是否做过评价,
     * 如果没做评价,调用接口给与五星好评,如果做过评价,则什么也不处理,最后记得把相应的订单从MapState删除
     */
    public static class TimerProcessFuntion extends KeyedProcessFunction<Tuple, Tuple2<String, Long>, Object> {
        //定义MapState类型的状态,key是订单号,value是订单完成时间
        private MapState<String, Long> mapState;
        //超过多长时间(interval,单位:毫秒) 没有评价,则自动五星好评
        private long interval = 0L;
        public TimerProcessFuntion(long interval) {
            this.interval = interval;
        }
        //创建MapState
        @Override
        public void open(Configuration parameters) {
            MapStateDescriptor<String, Long> mapStateDesc =
                    new MapStateDescriptor<>("mapStateDesc", String.class, Long.class);
            mapState = getRuntimeContext().getMapState(mapStateDesc);
        }
        //注册定时器
        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Object> out) throws Exception {
            mapState.put(value.f0, value.f1);
            ctx.timerService().registerProcessingTimeTimer(value.f1 + interval);
        }
        //定时器被触发时执行
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
            Iterator iterator = mapState.iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Long> entry = (Map.Entry<String, Long>) iterator.next();
                String orderid = entry.getKey();
                boolean evaluated = isEvaluation(entry.getKey()); //调用方法判断订单是否已评价?
                mapState.remove(orderid);
                if (evaluated) {
                    System.out.println("订单(orderid: "+orderid+")在"+interval+"毫秒时间内已经评价,不做处理");
                }
                if (evaluated) {
                    //如果用户没有做评价,在调用相关的接口给与默认的五星评价
                    System.out.println("订单(orderid: "+orderid+")超过"+interval+"毫秒未评价,调用接口自动给与五星好评");
                }
            }
        }
        //自定义方法实现查询用户是否对该订单进行了评价,我们这里只是随便做了一个判断
        //在生产环境下,可以去查询相关的订单系统.
        private boolean isEvaluation(String key) {
            return key.hashCode() % 2 == 0;
        }
    }
    /**
     * 自定义source模拟生成一些订单数据.
     * 在这里,我们生了一个最简单的二元组Tuple2,包含订单id和订单完成时间两个字段.
     */
    public static class MySource implements SourceFunction<Tuple2<String, Long>> {
        private volatile boolean isRunning = true;
        @Override
        public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
            Random random = new Random();
            while (isRunning) {
                Thread.sleep(1000);
                //订单id
                String orderid = UUID.randomUUID().toString();
                //订单完成时间
                long orderFinishTime = System.currentTimeMillis();
                ctx.collect(Tuple2.of(orderid, orderFinishTime));
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}


六、大数据行业趋势分析



1、新基建和数字化转型助力大数据+AI多场景落地


新型基础建设场景分析

image.png


各行业数字化转型场景分析


image.png


发展新机遇,产业新高度


image.png


2、多行业场景大数据应用占比


image.png


3、从传统物流到智慧物流演变之旅


image.png


  • 赋能新零售:这种收集物流数据、利用物流数据的手段普及到整个零售行业。
  • 加持传统物流:物流不等同于快递,与传统物流联手,一定是未来智慧物流的必经之路。


4、智慧物流大数据行业级解决方案


  • 基于大型物流公司研发的智慧物流大数据平台,日订单上千万
  • 围绕订单、运输、仓储、搬运装卸、包装以及流通加工等物流环节中涉及的数据信息等
  • 提高运输以及配送效率、减少物流成本、更有效地满足客户服务要求,并针对数据分析结果,提出具有中观指导意义的解决方案


image.png


5、大数据行业趋势分析


image.png


6、大数据技术框架应用


image.png


7、大数据开发岗位


image.png


七、大数据技术知识体系



image.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
5天前
|
存储 机器学习/深度学习 SQL
大数据处理与分析技术
大数据处理与分析技术
23 2
|
7天前
|
存储 分布式计算 NoSQL
【赵渝强老师】大数据技术的理论基础
本文介绍了大数据平台的核心思想,包括Google的三篇重要论文:Google文件系统(GFS)、MapReduce分布式计算模型和BigTable大表。这些论文奠定了大数据生态圈的技术基础,进而发展出了Hadoop、Spark和Flink等生态系统。文章详细解释了GFS的架构、MapReduce的计算过程以及BigTable的思想和HBase的实现。
|
7天前
|
SQL 存储 算法
比 SQL 快出数量级的大数据计算技术
SQL 是大数据计算中最常用的工具,但在实际应用中,SQL 经常跑得很慢,浪费大量硬件资源。例如,某银行的反洗钱计算在 11 节点的 Vertica 集群上跑了 1.5 小时,而用 SPL 重写后,单机只需 26 秒。类似地,电商漏斗运算和时空碰撞任务在使用 SPL 后,性能也大幅提升。这是因为 SQL 无法写出低复杂度的算法,而 SPL 提供了更强大的数据类型和基础运算,能够实现高效计算。
|
10天前
|
存储 大数据 定位技术
大数据 数据索引技术
【10月更文挑战第26天】
24 3
|
10天前
|
存储 大数据 OLAP
大数据数据分区技术
【10月更文挑战第26天】
38 2
|
13天前
|
消息中间件 分布式计算 大数据
数据为王:大数据处理与分析技术在企业决策中的力量
【10月更文挑战第29天】在信息爆炸的时代,大数据处理与分析技术为企业提供了前所未有的洞察力和决策支持。本文探讨了大数据技术在企业决策中的重要性和实际应用,包括数据的力量、实时分析、数据驱动的决策以及数据安全与隐私保护。通过这些技术,企业能够从海量数据中提取有价值的信息,预测市场趋势,优化业务流程,从而在竞争中占据优势。
44 2
|
15天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
58 2
|
1月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
3天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
41 7
|
3天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
13 2