女朋友问阿里双十一实时大屏如何实现,我惊呆一会,马上手把手教她背后的大数据技术(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 女朋友问阿里双十一实时大屏如何实现,我惊呆一会,马上手把手教她背后的大数据技术(二)

2、Flink简单介绍


2009年Flink 诞生于柏林工业大学的一个大数据研究项目 StratoSphere。


2014 年孵化出 Flink捐献给Apache,并成为 Apache 顶级项目,同时 Flink 的主流方向被定位为流式计算并大数据行业内崭露头角。


2015 年阿里巴巴开始使用 Flink 并持续贡献社区(阿里内部还基于Flink做了一套Blink)


2019年1月8日,阿里巴巴以 9000 万欧元(7亿元人民币)收购了创业公司 Data Artisans。 从此Flink开始了新一轮的乘风破浪!在国内流行的一发不可收拾!


image.png


3、Flink官网介绍:https://flink.apache.org/


image.png


四、Flink实现双十一实时大屏



在大数据的实时处理中,实时的大屏展示已经成了一个很重要的展示项,比如最有名的双十一大屏实时销售总价展示。  


今天就做一个最简单的模拟电商统计大屏的小例子,需求如下:


1.实时计算出当天零点截止到当前时间的销售总额


2.计算出各个分类的销售top3


3.每秒钟更新一次统计结果


image.png


实现代码


package cn.lanson.action;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;
/**
 * Author Lansonli
 * Desc 模拟双十一电商实时大屏显示:
 * 1.实时计算出当天零点截止到当前时间的销售总额
 * 2.计算出各个分类的销售top3
 * 3.每秒钟更新一次统计结果
 */
public class DoubleElevenBigScreem {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.source
        DataStream<Tuple2<String, Double>> dataStream = env.addSource(new MySource());
        //3.transformation
        DataStream<CategoryPojo> result = dataStream
                .keyBy(0)
                .window(
                        //定义大小为一天的窗口,第二个参数表示中国使用的UTC+08:00时区比UTC时间早8小时
                        TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))
                )
                .trigger(
                        ContinuousProcessingTimeTrigger.of(Time.seconds(1))//定义一个1s的触发器
                )
                .aggregate(new PriceAggregate(), new WindowResult());
        //看一下聚合结果
        //result.print("初步聚合结果");
        //4.使用上面聚合的结果,实现业务需求:
        // * 1.实时计算出当天零点截止到当前时间的销售总额
        // * 2.计算出各个分类的销售top3
        // * 3.每秒钟更新一次统计结果
        result.keyBy("dateTime")
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))//每秒钟更新一次统计结果
                .process(new WindowResultProcess());//在ProcessWindowFunction中实现该复杂业务逻辑
        env.execute();
    }
    /**
     * 自定义价格聚合函数,其实就是对price的简单sum操作
     */
    private static class PriceAggregate implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
        @Override
        public Double createAccumulator() {
            return 0D;
        }
        @Override
        public Double add(Tuple2<String, Double> value, Double accumulator) {
            return accumulator + value.f1;
        }
        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }
        @Override
        public Double merge(Double a, Double b) {
            return a + b;
        }
    }
    /**
     * 自定义WindowFunction,实现如何收集窗口结果数据
     */
    private static class WindowResult implements WindowFunction<Double, CategoryPojo, Tuple, TimeWindow> {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<Double> input, Collector<CategoryPojo> out) throws Exception {
            BigDecimal bg = new BigDecimal(input.iterator().next());
            double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();//四舍五入
            CategoryPojo categoryPojo = new CategoryPojo();
            categoryPojo.setCategory(((Tuple1<String>) key).f0);
            categoryPojo.setTotalPrice(p);
            categoryPojo.setDateTime(simpleDateFormat.format(new Date()));
            out.collect(categoryPojo);
        }
    }
    /**
     * 实现ProcessWindowFunction
     * 在这里我们做最后的结果统计,
     * 把各个分类的总价加起来,就是全站的总销量金额,
     * 然后我们同时使用优先级队列计算出分类销售的Top3,
     * 最后打印出结果,在实际中我们可以把这个结果数据存储到hbase或者redis中,以供前端的实时页面展示。
     */
    private static class WindowResultProcess extends ProcessWindowFunction<CategoryPojo, Object, Tuple, TimeWindow> {
        @Override
        public void process(Tuple tuple, Context context, Iterable<CategoryPojo> elements, Collector<Object> out) throws Exception {
            String date = ((Tuple1<String>) tuple).f0;
            //优先级队列
            //实际开发中常使用PriorityQueue实现大小顶堆来解决topK问题
            //求最大k个元素的问题:使用小顶堆
            //求最小k个元素的问题:使用大顶堆
            //https://blog.csdn.net/hefenglian/article/details/81807527
            Queue<CategoryPojo> queue = new PriorityQueue<>(3,
                    (c1, c2) -> c1.getTotalPrice() >= c2.getTotalPrice() ? 1 : -1);//小顶堆
            double price = 0D;
            Iterator<CategoryPojo> iterator = elements.iterator();
            int s = 0;
            while (iterator.hasNext()) {
                CategoryPojo categoryPojo = iterator.next();
                //使用优先级队列计算出top3
                if (queue.size() < 3) {
                    queue.add(categoryPojo);
                } else {
                    //计算topN的时候需要小顶堆,也就是要去掉堆顶比较小的元素
                    CategoryPojo tmp = queue.peek();//取出堆顶元素
                    if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()) {
                        queue.poll();//移除
                        queue.add(categoryPojo);
                    }
                }
                price += categoryPojo.getTotalPrice();
            }
            //按照TotalPrice逆序
            List<String> list = queue.stream().sorted((c1, c2) -> c1.getTotalPrice() <= c2.getTotalPrice() ? 1 : -1)//逆序
                    .map(c -> "(分类:" + c.getCategory() + " 销售额:" + c.getTotalPrice() + ")")
                    .collect(Collectors.toList());
            System.out.println("时间 :" + date);
            System.out.println("总价 : " + new BigDecimal(price).setScale(2, RoundingMode.HALF_UP));
            System.out.println("Top3 : \n" + StringUtils.join(list, ",\n"));
            System.out.println("-------------");
        }
    }
    /**
     * 用于存储聚合的结果
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;//分类名称
        private double totalPrice;//该分类总销售额
        private String dateTime;// 截止到当前时间的时间
    }
    /**
     * 模拟生成某一个分类下的订单
     */
    public static class MySource implements SourceFunction<Tuple2<String, Double>> {
        private volatile boolean isRunning = true;
        private Random random = new Random();
        String category[] = {
                "女装", "男装",
                "图书", "家电",
                "洗护", "美妆",
                "运动", "游戏",
                "户外", "家具",
                "乐器", "办公"
        };
        @Override
        public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
            while (isRunning) {
                Thread.sleep(10);
                //随机生成一个分类
                String c = category[(int) (Math.random() * (category.length - 1))];
                //随机生成一个该分类下的随机金额的成交订单
                double price = random.nextDouble() * 100;
                ctx.collect(Tuple2.of(c, price));
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}


五、Flink实现超时订单自动好评



在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后一定时间之内没有做出评价,系统自动给与五星好评, 接下来我使用Flink的定时器来实现这一功能。


image.png


实现代码


package cn.lanson.action;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
/**
 * Author Lansonli
 * Desc 在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,一定时间之内没有做出评价,系统自动给与五星好评,
 * 今天我们使用Flink的定时器来实现这一功能。
 */
public class OrderAutomaticFavorableComments {
    public static void main(String[] args) throws Exception {
        //env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.enableCheckpointing(5000);
        //source
        DataStream<Tuple2<String, Long>> dataStream = env.addSource(new MySource());
        //经过interval毫秒用户未对订单做出评价,自动给与好评.为了演示方便,设置了5s的时间
        long interval = 5000L;
        //分组后使用自定义KeyedProcessFunction完成定时判断超时订单并自动好评
        dataStream.keyBy(0).process(new TimerProcessFuntion(interval));
        env.execute();
    }
    /**
     * 定时处理逻辑
     * 1.首先我们定义一个MapState类型的状态,key是订单号,value是订单完成时间
     * 2.在processElement处理数据的时候,把每个订单的信息存入状态中,这个时候不做任何处理,
     * 并且注册一个定时器(在订单完成时间+间隔时间(interval)时触发).
     * 3.注册的时器到达了订单完成时间+间隔时间(interval)时就会触发onTimer方法,我们主要在这个里面进行处理。
     * 我们调用外部的接口来判断用户是否做过评价,
     * 如果没做评价,调用接口给与五星好评,如果做过评价,则什么也不处理,最后记得把相应的订单从MapState删除
     */
    public static class TimerProcessFuntion extends KeyedProcessFunction<Tuple, Tuple2<String, Long>, Object> {
        //定义MapState类型的状态,key是订单号,value是订单完成时间
        private MapState<String, Long> mapState;
        //超过多长时间(interval,单位:毫秒) 没有评价,则自动五星好评
        private long interval = 0L;
        public TimerProcessFuntion(long interval) {
            this.interval = interval;
        }
        //创建MapState
        @Override
        public void open(Configuration parameters) {
            MapStateDescriptor<String, Long> mapStateDesc =
                    new MapStateDescriptor<>("mapStateDesc", String.class, Long.class);
            mapState = getRuntimeContext().getMapState(mapStateDesc);
        }
        //注册定时器
        @Override
        public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Object> out) throws Exception {
            mapState.put(value.f0, value.f1);
            ctx.timerService().registerProcessingTimeTimer(value.f1 + interval);
        }
        //定时器被触发时执行
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
            Iterator iterator = mapState.iterator();
            while (iterator.hasNext()) {
                Map.Entry<String, Long> entry = (Map.Entry<String, Long>) iterator.next();
                String orderid = entry.getKey();
                boolean evaluated = isEvaluation(entry.getKey()); //调用方法判断订单是否已评价?
                mapState.remove(orderid);
                if (evaluated) {
                    System.out.println("订单(orderid: "+orderid+")在"+interval+"毫秒时间内已经评价,不做处理");
                }
                if (evaluated) {
                    //如果用户没有做评价,在调用相关的接口给与默认的五星评价
                    System.out.println("订单(orderid: "+orderid+")超过"+interval+"毫秒未评价,调用接口自动给与五星好评");
                }
            }
        }
        //自定义方法实现查询用户是否对该订单进行了评价,我们这里只是随便做了一个判断
        //在生产环境下,可以去查询相关的订单系统.
        private boolean isEvaluation(String key) {
            return key.hashCode() % 2 == 0;
        }
    }
    /**
     * 自定义source模拟生成一些订单数据.
     * 在这里,我们生了一个最简单的二元组Tuple2,包含订单id和订单完成时间两个字段.
     */
    public static class MySource implements SourceFunction<Tuple2<String, Long>> {
        private volatile boolean isRunning = true;
        @Override
        public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
            Random random = new Random();
            while (isRunning) {
                Thread.sleep(1000);
                //订单id
                String orderid = UUID.randomUUID().toString();
                //订单完成时间
                long orderFinishTime = System.currentTimeMillis();
                ctx.collect(Tuple2.of(orderid, orderFinishTime));
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}


六、大数据行业趋势分析



1、新基建和数字化转型助力大数据+AI多场景落地


新型基础建设场景分析

image.png


各行业数字化转型场景分析


image.png


发展新机遇,产业新高度


image.png


2、多行业场景大数据应用占比


image.png


3、从传统物流到智慧物流演变之旅


image.png


  • 赋能新零售:这种收集物流数据、利用物流数据的手段普及到整个零售行业。
  • 加持传统物流:物流不等同于快递,与传统物流联手,一定是未来智慧物流的必经之路。


4、智慧物流大数据行业级解决方案


  • 基于大型物流公司研发的智慧物流大数据平台,日订单上千万
  • 围绕订单、运输、仓储、搬运装卸、包装以及流通加工等物流环节中涉及的数据信息等
  • 提高运输以及配送效率、减少物流成本、更有效地满足客户服务要求,并针对数据分析结果,提出具有中观指导意义的解决方案


image.png


5、大数据行业趋势分析


image.png


6、大数据技术框架应用


image.png


7、大数据开发岗位


image.png


七、大数据技术知识体系



image.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
打赏
0
0
0
0
19
分享
相关文章
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
从“笨重大象”到“敏捷火花”:Hadoop与Spark的大数据技术进化之路
171 79
大数据项目成功的秘诀——不只是技术,更是方法论!
大数据项目成功的秘诀——不只是技术,更是方法论!
67 8
大数据项目成功的秘诀——不只是技术,更是方法论!
大数据在电子健康记录中的潜力与挑战:一次技术和伦理的深度碰撞
大数据在电子健康记录中的潜力与挑战:一次技术和伦理的深度碰撞
61 12
随着云计算和大数据技术的发展,Hyper-V在虚拟化领域的地位日益凸显
随着云计算和大数据技术的发展,Hyper-V在虚拟化领域的地位日益凸显。作为Windows Server的核心组件,Hyper-V具备卓越的技术性能,支持高可用性、动态迁移等功能,确保虚拟机稳定高效运行。它与Windows深度集成,管理便捷,支持远程管理和自动化部署,降低管理成本。内置防火墙、RBAC等安全功能,提供全方位安全保障。作为内置组件,Hyper-V无需额外购买软件,降低成本。其广泛的生态系统支持和持续增长的市场需求,使其成为企业虚拟化解决方案的首选。
大数据基础工程技术团队4篇论文入选ICLR,ICDE,WWW
大数据基础工程技术团队4篇论文入选ICLR,ICDE,WWW
技术评测:MaxCompute MaxFrame——阿里云自研分布式计算框架的Python编程接口
随着大数据和人工智能技术的发展,数据处理的需求日益增长。阿里云推出的MaxCompute MaxFrame(简称“MaxFrame”)是一个专为Python开发者设计的分布式计算框架,它不仅支持Python编程接口,还能直接利用MaxCompute的云原生大数据计算资源和服务。本文将通过一系列最佳实践测评,探讨MaxFrame在分布式Pandas处理以及大语言模型数据处理场景中的表现,并分析其在实际工作中的应用潜力。
179 2
轻量级的大数据处理技术
现代大数据应用架构中,数据中心作为核心,连接数据源与应用,承担着数据处理与服务的重要角色。然而,随着数据量的激增,数据中心面临运维复杂、体系封闭及应用间耦合性高等挑战。为缓解这些问题,一种轻量级的解决方案——esProc SPL应运而生。esProc SPL通过集成性、开放性、高性能、数据路由和敏捷性等特性,有效解决了现有架构的不足,实现了灵活高效的数据处理,特别适用于应用端的前置计算,降低了整体成本和复杂度。
从湖仓分离到湖仓一体,四川航空基于 SelectDB 的多源数据联邦分析实践
川航选择引入 SelectDB 建设湖仓一体大数据分析引擎,取得了数据导入效率提升 3-6 倍,查询分析性能提升 10-18 倍、实时性提升至 5 秒内等收益。
从湖仓分离到湖仓一体,四川航空基于 SelectDB 的多源数据联邦分析实践
别急着上算法,咱先把数据整明白:大数据分析的5个基本步骤,你都搞对了吗?
别急着上算法,咱先把数据整明白:大数据分析的5个基本步骤,你都搞对了吗?
30 4
数据驱动智能,智能优化数据——大数据与人工智能的双向赋能
数据驱动智能,智能优化数据——大数据与人工智能的双向赋能
62 4

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等