【大数据计算引擎】流式计算引擎Flink3

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
简介: 【大数据计算引擎】流式计算引擎Flink

8.Flink增量聚合和全窗口函数

8.1.AggregateFunction增量聚合函数

  • 增量聚合函数
aggregate(agg函数,WindowFunction(){  })
  • 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
  • 常见的增量聚合函数有 reduceFunction、aggregateFunction
  • min、max、sum 都是简单的聚合操作,不需要自定义规则
AggregateFunction<IN, ACC, OUT>
IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
  • 滚动窗口聚合案例
/**
 * @author lixiang
 * Tumbling-Window滚动窗口
 */
public class FlinkWindow1Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        stream.aggregate(new AggregateFunction<VideoOrderDO, Map<String, Object>, Map<String, Object>>() {
            //初始化累加器
            @Override
            public Map<String, Object> createAccumulator() {
                return new HashMap<>();
            }
            //聚合方式
            @Override
            public Map<String, Object> add(VideoOrderDO value, Map<String, Object> accumulator) {
                if (accumulator.size() == 0) {
                    accumulator.put("title", value.getTitle());
                    accumulator.put("money", value.getMoney());
                    accumulator.put("num", 1);
                    accumulator.put("createTime", value.getCreateTime());
                } else {
                    accumulator.put("title", value.getTitle());
                    accumulator.put("money", value.getMoney() + Integer.parseInt(accumulator.get("money").toString()));
                    accumulator.put("num", 1 + Integer.parseInt(accumulator.get("num").toString()));
                    accumulator.put("createTime", value.getCreateTime());
                }
                return accumulator;
            }
            //返回结果
            @Override
            public Map<String, Object> getResult(Map<String, Object> accumulator) {
                return accumulator;
            }
            //合并内容
            @Override
            public Map<String, Object> merge(Map<String, Object> a, Map<String, Object> b) {
                return null;
            }
        }).print();
        env.execute("Tumbling Window job");
    }
}

d47e2532fdeb4d189ce95a5ffa663c8a.jpg

8.2.WindowFunction全窗口函数

  • 全窗口函数
apply(new WindowFunction(){ })
  • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
  • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 
WindowFunction<IN, OUT, KEY, W extends Window>
  • 案例实战
/**
 * @author lixiang
 * apply
 */
public class FlinkWindow2Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        stream.apply(new WindowFunction<VideoOrderDO, Map<String,Object>, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow timeWindow, Iterable<VideoOrderDO> iterable, Collector<Map<String, Object>> collector) throws Exception {
                List<VideoOrderDO> list = IterableUtils.toStream(iterable).collect(Collectors.toList());
                long sum = list.stream().collect(Collectors.summarizingInt(VideoOrderDO::getMoney)).getSum();
                Map<String,Object> map = new HashMap<>();
                map.put("sumMoney",sum);
                map.put("title",key);
                collector.collect(map);
            }
        }).print();
        env.execute("apply Window job");
    }
}


958b3dee8c2e43cfa0de360a9bc01849.jpg

8.3.processWindowFunction全窗口函数

  • 全窗口函数
process(new ProcessWindowFunction(){})
  • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
  • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息,包括窗口信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 
ProcessWindowFunction<IN, OUT, KEY, W extends Window>
  • 案例实战
/**
 * @author lixiang
 * process-Window滚动窗口
 */
public class FlinkWindow3Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        WindowedStream<VideoOrderDO, String, TimeWindow> stream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
        stream.process(new ProcessWindowFunction<VideoOrderDO, Map<String,Object>, String, TimeWindow>() {
            @Override
            public void process(String key, ProcessWindowFunction<VideoOrderDO, Map<String, Object>, String, TimeWindow>.Context context, Iterable<VideoOrderDO> iterable, Collector<Map<String, Object>> collector) throws Exception {
                List<VideoOrderDO> list = IterableUtils.toStream(iterable).collect(Collectors.toList());
                long sum = list.stream().collect(Collectors.summarizingInt(VideoOrderDO::getMoney)).getSum();
                Map<String,Object> map = new HashMap<>();
                map.put("sumMoney",sum);
                map.put("title",key);
                collector.collect(map);
            }
        }).print();
        env.execute("process Window job");
    }
}


d4d4294eb1f146c983f7e8735a3ea666.jpg

窗口函数对比

  • 增量聚合
aggregate(new AggregateFunction(){});
  • 全窗口聚合
apply(new WindowFunction(){})
process(new ProcessWindowFunction(){}) //比WindowFunction功能强大

9.迟到无序数据处理watermark

9.1.Watermark简介和应用

(1)基本概念

  • Window:Window是处理无界流的关键,Windows将流拆分为一个个有限大小的buckets,可以可以在每一个buckets中进行计算
  • start_time,end_time:当Window时时间窗口的时候,每个window都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间
  • event-time: 事件发生时间,是事件发生所在设备的当地时间,
  • 比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间
  • Watermarks:可以把他理解为一个水位线,等于evevtTime - delay(比如规定为20分钟),一旦Watermarks大于了某个window的end_time,

就会触发此window的计算,Watermarks就是用来触发1window计算的。

  • Watermaker = 当前计算窗口最大的事件时间 - 允许乱序延迟的时间

推迟窗口触发的时间,实现方式:通过当前窗口中最大的eventTime-延迟时间所得到的Watermark与窗口原始触发时间进行对比,当Watermark大于窗口原始触发时间时则触发窗口执行!!!我们知道,流处理从事件产生,

到流经source,再到operator,中间是有一个过程和时间的,虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生,所谓乱序,

就是指Flink接收到的事件的先后顺序不是严格按照事件的Event Time顺序排列的。

c62015c208e843c7b16a70925a15fee9.jpg

那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去此时必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了,这个特别的机制,就是Watermark。

(2)Watermark水位线介绍

  • 由flink的某个operator操作生成后,就在整个程序中随event数据流转
  • With Periodic Watermarks(周期生成,可以定义一个最大允许乱序的时间,用的很多)
  • With Punctuated Watermarks(标点水位线,根据数据流中某些特殊标记事件来生成,相对少)
  • 衡量数据是否乱序的时间,什么时候不用等早之前的数据
  • 是一个全局时间戳,不是某一个key下的值
  • 是一个特殊字段,单调递增的方式,主要是和数据本身的时间戳做比较
  • 用来确定什么时候不再等待更早的数据了,可以触发窗口进行计算,忍耐是有限度的,给迟到的数据一些机会
  • 注意
  • Watermark 设置太小会影响数据准确性,设置太大会影响数据的实时性,更加会加重Flink作业的负担
  • 需要经过测试,和业务相关联,得出一个较合适的值即可
  • 触发计算后,其他窗口内数据再到达也被丢弃

9.2.Watermark案例实战

  • 概念很抽象,下面用一个案例给解释下watermark的作用。
  • 需求:每10s分组统计不同视频的成交总价,数据有乱序延迟,允许5秒的时间。

(1)时间工具类

public class TimeUtil {
    /**
     * 时间处理
     * @param date
     * @return
     */
    public static String toDate(Date date){
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        ZoneId zoneId = ZoneId.systemDefault();
        return formatter.format(date.toInstant().atZone(zoneId));
    }
    /**
     * 字符串转日期类型
     * @param time
     * @return
     */
    public static Date strToDate(String time){
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        LocalDateTime dateTime = LocalDateTime.parse(time, formatter);
        return Date.from(dateTime.atZone(ZoneId.systemDefault()).toInstant());
    }
    /**
     * 时间处理
     * @param date
     * @return
     */
    public static String format(long date){
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        ZoneId zoneId = ZoneId.systemDefault();
        return formatter.format(new Date(date).toInstant().atZone(zoneId));
    }
}

(2)Flink入口函数

/**
 * @author lixiang
 */
public class FlinkWaterDemo {
    public static void main(String[] args) throws Exception {
        //初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //监听socket输入
        DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888);
        //一对多转换,将输入的字符串转成Tuple类型
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] split = value.split(",");
                out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2])));
            }
        });
        //设置watermark,官方文档直接拿来的,注意修改自己的时间参数
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> {
                    return TimeUtil.strToDate(element.f1).getTime();
                }));
        //根据标题进行分组
        watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
            //滚动窗口,10s一统计,全窗口函数
            @Override
            public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception {
                List<String> eventTimeList = new ArrayList<>();
                int total = 0;
                for (Tuple3<String, String, Integer> order : iterable) {
                    eventTimeList.add(order.f1);
                    total = total + order.f2;
                }
                String outStr = "分组key:"+key+",总价:"+total+",窗口开始时间:"+TimeUtil.format(timeWindow.getStart())+",窗口结束时间:"+TimeUtil.format(timeWindow.getEnd())+",窗口所有事件时间:"+eventTimeList;
                collector.collect(outStr);
            }
        }).print();
        env.execute("watermark job");
    }
}

(3)测试数据,nc -lk 8888监听8888端口,一条一条的输入

[root@flink ~]# nc -lk 8888
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
mysql,2022-11-11 23:12:13,20
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:17,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:20,10
java,2022-11-11 23:12:22,10
java,2022-11-11 23:12:25,10


0531d8e63a2d4cbea40fcc26c96571c3.jpg


d2008403aace4ae1b7309510dc8ae276.jpg

9.3.二次兜底延迟数据处理

  • 超过了watermark的等待后,还有延迟数据到达怎么办?
  • 上一个案例我们发现,第七个窗口本是[0-10s)窗口的数据,
  • 但是[0-10s)窗口的数据已经被统计了,所以数据丢失了,这就需要allowedLateness 来做二次兜底延迟数据处理。
  • 编码很简单,只需要在开窗函数那设置allowedLateness即可
  • 60b28a975ea34c3d9658ccd2957dcf04.jpg
/**
 * @author lixiang
 */
public class FlinkWaterDemo {
    public static void main(String[] args) throws Exception {
        //初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //监听socket输入
        DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888);
        //一对多转换,将输入的字符串转成Tuple类型
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] split = value.split(",");
                out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2])));
            }
        });
        //设置watermark,官方文档直接拿来的,注意修改自己的时间参数
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> {
                    return TimeUtil.strToDate(element.f1).getTime();
                }));
        //根据标题进行分组
        watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        }).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                //1min的容忍时间,即使时间段窗口被统计了,只要数据没有超过1min就可以再次被统计进去
                .allowedLateness(Time.minutes(1))
                .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
            //滚动窗口,10s一统计,全窗口函数
            @Override
            public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception {
                List<String> eventTimeList = new ArrayList<>();
                int total = 0;
                for (Tuple3<String, String, Integer> order : iterable) {
                    eventTimeList.add(order.f1);
                    total = total + order.f2;
                }
                String outStr = "分组key:"+key+",总价:"+total+",窗口开始时间:"+TimeUtil.format(timeWindow.getStart())+",窗口结束时间:"+TimeUtil.format(timeWindow.getEnd())+",窗口所有事件时间:"+eventTimeList;
                collector.collect(outStr);
            }
        }).print();
        env.execute("watermark job");
    }
}
  • 测试

07d07273944c428f9f2d52ae901750b6.jpg

9.4.最后的兜底延迟数据处理

  • watermark先输出,然后配置allowedLateness 再延长时间,然后到了后更新之前的窗口数据。
  • 数据超过了allowedLateness 后,怎么办?这会就用侧输出流 SideOutput,最终的一个兜底。
  • 侧输出流不会在统计到之前的窗口上,类似于独立存储起来,就是说没有被统计的数据会被单独存放在一个容器中,自定义的去进行最终一致性的操作。我们的数据输出会存放到redis或者mysql,侧输出的那一部分也存放在redis或者mysql,这样等统计之后,我们可以手动的讲结果进行整合。
  • 编码实战
  • 3a6e3c5bbf9642bdbd025444ecc6bb6a.jpg
/**
 * @author lixiang
 */
public class FlinkWaterDemo {
    public static void main(String[] args) throws Exception {
        //初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置并行度
        env.setParallelism(1);
        //监听socket输入
        DataStreamSource<String> source = env.socketTextStream("192.168.139.20", 8888);
        //一对多转换,将输入的字符串转成Tuple类型
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMap = source.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] split = value.split(",");
                out.collect(new Tuple3<String,String,Integer>(split[0],split[1],Integer.parseInt(split[2])));
            }
        });
        //设置watermark,官方文档直接拿来的,注意修改自己的时间参数
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMap.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((element, recordTimestamp) -> {
                    return TimeUtil.strToDate(element.f1).getTime();
                }));
        //new 一个OutputTag Bean
        OutputTag<Tuple3<String,String,Integer>> lateData = new OutputTag<Tuple3<String,String,Integer>>("lateData"){};
        //根据标题进行分组
        SingleOutputStreamOperator<String> operator = watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }).window(TumblingEventTimeWindows.of(Time.seconds(10)))
                //1min的容忍时间,即使时间段窗口被统计了,只要数据没有超过1min就可以再次被统计进去
                .allowedLateness(Time.minutes(1))
                //侧输入,最后的兜底数据
                .sideOutputLateData(lateData)
                .apply(new WindowFunction<Tuple3<String, String, Integer>, String, String, TimeWindow>() {
                    //滚动窗口,10s一统计,全窗口函数
                    @Override
                    public void apply(String key, TimeWindow timeWindow, Iterable<Tuple3<String, String, Integer>> iterable, Collector<String> collector) throws Exception {
                        List<String> eventTimeList = new ArrayList<>();
                        int total = 0;
                        for (Tuple3<String, String, Integer> order : iterable) {
                            eventTimeList.add(order.f1);
                            total = total + order.f2;
                        }
                        String outStr = "分组key:" + key + ",总价:" + total + ",窗口开始时间:" + TimeUtil.format(timeWindow.getStart()) + ",窗口结束时间:" + TimeUtil.format(timeWindow.getEnd()) + ",窗口所有事件时间:" + eventTimeList;
                        collector.collect(outStr);
                    }
                });
        operator.print();
        //侧输出流数据
        operator.getSideOutput(lateData).print();
        env.execute("watermark job");
    }
}
  • 测试
[root@flink ~]# nc -lk 8888
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,10
mysql,2022-11-11 23:12:13,20
java,2022-11-11 23:12:13,10
java,2022-11-11 23:12:17,10
java,2022-11-11 23:12:09,10
java,2022-11-11 23:12:20,10
java,2022-11-11 23:14:22,10 
java,2022-11-11 23:12:25,10 #设置一个超过1分钟的数据,测试

8b23cbd2b4734c7ab2b07f249152bf89.jpg

9.5.Flink多层保证措施归纳

(1)如何保证在需要的窗口内获取指定的数据?数据有乱序延迟

  • flink采用watermark、allowedLateness()、sideOutputLateData()三个机制来保证获取数据。
  • watermark的作用是防止出现延迟乱序,允许等待一会在触发窗口计算。
  • allowLateness,是将窗口关闭时间在延迟一段时间,允许有一个最大迟到时间,allowLateness的数据会重新触发窗口计算
  • sideOutPut是最后的兜底操作,超过allowLateness后,窗口已经彻底关闭,就会把数据放到侧输出流,侧输出流OutputTag tag = new OutputTag(){},由于泛型擦除的问题,需要重写方法,加花括号。

(2)应用场景:实时监控平台

  • 可以用watermark及时输出数据
  • allowLateness 做短期的更新迟到数据
  • sideOutPut做兜底更新保证数据准确性

(3)总结Flink的机制

  • 第一层 窗口window 的作用是从DataStream数据流里指定范围获取数据。
  • 第二层 watermark的作用是防止数据出现乱序延迟,允许窗口等待延迟数据达到,再触发计算
  • 第三层 allowLateness 会让窗口关闭时间再延迟一段时间, 如果还有数据达到,会局部修复数据并主动更新窗口的数据输出
  • 第四层 sideOutPut侧输出流是最后兜底操作,在窗口已经彻底关闭后,所有过期延迟数据放到侧输出流,可以单独获取,存储到某个地方再批量更新之前的聚合的数据
  • 注意
  • Flink 默认的处理方式直接丢弃迟到的数据
  • sideOutPut还可以进行分流功能
  • DataStream没有getSideOutput方法,SingleOutputStreamOperator才有

(4)版本弃用API

新接口,`WatermarkStrategy`,`TimestampAssigner` 和 `WatermarkGenerator` 因为其对时间戳和 watermark 等重点的抽象和分离很清晰,并且还统一了周期性和标记形式的 watermark 生成方式
新接口之前是用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks ,现在可以弃用了

10.Flink状态State管理和Checkpoint

10.1.Flink的状态State管理简介

(1)什么是State状态

  • 数据流处理离不开状态管理,比如窗口聚合统计、去重、排序等
  • 是一个Operator的运行的状态\历史值,是维护在内存中
  • 流程:一个算子的子任务接收输入流,获取对应的状态,计算新的结果然后把结果更新到状态里面

(2)有状态和无状态介绍

  • 无状态计算: 同个数据进到算子里面多少次,都是一样的输出,比如 filter
  • 有状态计算:需要考虑历史状态,同个输入会有不同的输出,比如sum、reduce聚合操作

(3)状态管理分类

  • ManagedState
  • Flink管理,自动存储恢复
  • 细分两类
  • Keyed State 键控状态(用的多)
  • 有KeyBy才用这个,仅限用在KeyStream中,每个key都有state ,是基于KeyedStream上的状态
  • 一般是用richFlatFunction,或者其他richfunction里面,在open()声明周期里面进行初始化
  • ValueState、ListState、MapState等数据结构
  • Operator State 算子状态(用的少,部分source会用)
  • ListState、UnionListState、BroadcastState等数据结构
  • RawState
  • 用户自己管理和维护
  • 存储结构:二进制数组
  • State数据结构(状态值可能存在内存、磁盘、DB或者其他分布式存储中)
  • ValueState 简单的存储一个值(ThreadLocal / String)
  • ValueState.value()
  • ValueState.update(T value)
  • ListState 列表
  • ListState.add(T value)
  • ListState.get() //得到一个Iterator
  • MapState 映射类型
  • MapState.get(key)
  • MapState.put(key, value)

10.2.Flink状态State后端存储讲解

从Flink 1.13开始,社区重新设计了其公共状态后端类,以帮助用户更好地理解本地状态存储和检查点存储的分离 用户可以迁移现有应用程序以使用新 API,⽽不会丢失任何状态或⼀致性。

(1)Flink内置了以下这些开箱即用的state backends :

  • (新版)HashMapStateBackend、EmbeddedRocksDBStateBackend
  • 如果没有其他配置,系统将使用 HashMapStateBackend。
  • (旧版)MemoryStateBackend、FsStateBackend、RocksDBStateBackend
  • 如果不设置,默认使用 MemoryStateBackend

(2)State状态详解

HashMapStateBackend 保存数据在内部作为Java堆的对象。

  • 键/值状态和窗口操作符持有哈希表,用于存储值、触发器等
  • 非常快,因为每个状态访问和更新都对 Java 堆上的对象进行操作
  • 但是状态大小受集群内可用内存的限制
  • 场景:
  • 具有大状态、长窗口、大键/值状态的作业。
  • 所有高可用性设置。

EmbeddedRocksDBStateBackend 在RocksDB数据库中保存状态数据

  • 该数据库(默认)存储在 TaskManager 本地数据目录中
  • 与HashMapStateBackend在java存储 对象不同,数据存储为序列化的字节数组
  • RocksDB可以根据可用磁盘空间进行扩展,并且是唯一支持增量快照的状态后端。
  • 但是每个状态访问和更新都需要(反)序列化并可能从磁盘读取,这导致平均性能比内存状态后端慢一个数量级
  • 场景
  • 具有非常大状态、长窗口、大键/值状态的作业。
  • 所有高可用性设置
  • 旧版的状态管理
MemoryStateBackend(内存,不推荐在生产场景使用)
FsStateBackend(文件系统上,本地文件系统、HDFS, 性能更好,常用)
RocksDBStateBackend (无需担心 OOM 风险,是大部分时候的选择)

(3)配置方式

方式一:可以flink-conf.yaml使用配置键在中配置默认状态后端state.backend。

配置条目的可能值是hashmap (HashMapStateBackend)、rocksdb (EmbeddedRocksDBStateBackend) 
或实现状态后端工厂StateBackendFactory的类的完全限定类名
#全局配置例子一
# The backend that will be used to store operator state checkpoints
state.backend: hashmap
# Optional, Flink will automatically default to JobManagerCheckpointStorage
# when no checkpoint directory is specified.
state.checkpoint-storage: jobmanager
#全局配置例子二
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

方式二:代码 单独job配置例子

//代码配置一
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
//代码配置二
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
//或者
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
  • 备注:使用 RocksDBStateBackend 需要加依赖
<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
            <version>1.13.1</version>
</dependency>

10.3.Flink的状态State管理实战

  • sum()、maxBy() 等函数底层源码也是有ValueState进行状态存储
  • 需求:
  • 根据订单进行分组,统计找出每个商品最大的订单成交额
  • 不用maxBy实现,用ValueState实现
  • 编码实战
/**
 * 使用valueState实现maxBy功能,统计分组内订单金额最高的订单
 * @author lixiang
 */
public class FlinkStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> ds = env.socketTextStream("192.168.139.20", 8888);
        DataStream<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new RichFlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] arr = value.split(",");
                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
            }
        });
        //一定要key by后才可以使用键控状态ValueState
        SingleOutputStreamOperator<Tuple2<String, Integer>> maxVideoOrder = flatMapDS.keyBy(new KeySelector<Tuple3<String,String,Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        }).map(new RichMapFunction<Tuple3<String, String, Integer>, Tuple2<String, Integer>>() {
            private ValueState<Integer> valueState = null;
            @Override
            public void open(Configuration parameters) throws Exception {
                valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("total", Integer.class));
            }
            @Override
            public Tuple2<String, Integer> map(Tuple3<String, String, Integer> tuple3) throws Exception {
                // 取出State中的最大值
                Integer stateMaxValue = valueState.value();
                Integer currentValue = tuple3.f2;
                if (stateMaxValue == null || currentValue > stateMaxValue) {
                    //更新状态,把当前的作为新的最大值存到状态中
                    valueState.update(currentValue);
                    return Tuple2.of(tuple3.f0, currentValue);
                } else {
                    //历史值更大
                    return Tuple2.of(tuple3.f0, stateMaxValue);
                }
            }
        });
        maxVideoOrder.print();
        env.execute("valueState job");
    }
}
  • 测试
[root@flink ~]# nc -lk 8888
java,2022-11-11 23:12:07,10
java,2022-11-11 23:12:11,10
java,2022-11-11 23:12:08,30
mysql,2022-11-11 23:12:13,20
java,2022-11-11 23:12:13,10


1cc87424018c43fca76b3b75fed7f414.jpg

10.4.Flink的Checkpoint-SavePoint

  • 什么是Checkpoint 检查点
  • Flink中所有的Operator的当前State的全局快照
  • 默认情况下 checkpoint 是禁用的
  • Checkpoint是把State数据定时持久化存储,防止丢失
  • 手工调用checkpoint,叫 savepoint,主要是用于flink集群维护升级等
  • 底层使用了Chandy-Lamport 分布式快照算法,保证数据在分布式环境下的一致性
  • 开箱即用,Flink 捆绑了这些检查点存储类型:
  • 作业管理器检查点存储 JobManagerCheckpointStorage
  • 文件系统检查点存储 FileSystemCheckpointStorage
  • 配置
//全局配置checkpoints
state.checkpoints.dir: hdfs:///checkpoints/
//作业单独配置checkpoints
env.getCheckpointConfig().setCheckpointStorage("hdfs:///checkpoints-data/");
//全局配置savepoint
state.savepoints.dir: hdfs:///flink/savepoints

Savepoint 与 Checkpoint 的不同之处

  • 类似于传统数据库中的备份与恢复日志之间的差异
  • Checkpoint 的主要目的是为意外失败的作业提供【重启恢复机制】,
  • Checkpoint 的生命周期由 Flink 管理,即 Flink 创建,管理和删除 Checkpoint - 无需用户交互
  • Savepoint 由用户创建,拥有和删除, 主要是【升级 Flink 版本】,调整用户逻辑
  • 除去概念上的差异,Checkpoint 和 Savepoint 的当前实现基本上使用相同的代码并生成相同的格式
  • 端到端(end-to-end)状态一致性
数据一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的
在真实应用中,了流处理器以外还包含了数据源(例如Kafka、Mysql)和输出到持久化系统(Kafka、Mysql、Hbase、CK)
端到端的一致性保证,是意味着结果的正确性贯穿了整个流处理应用的各个环节,每一个组件都要保证自己的一致性。
  • Source
  • 需要外部数据源可以重置读取位置,当发生故障的时候重置偏移量到故障之前的位置
  • 内部
  • 依赖Checkpoints机制,在发生故障的时可以恢复各个环节的数据
  • Sink:
  • 当故障恢复时,数据不会重复写入外部系统,常见的就是 幂等和事务写入(和checkpoint配合)

10.5.Flink的Checkpoint代码配置

//两个检查点之间间隔时间,默认是0,单位毫秒
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
//Checkpoint过程中出现错误,是否让整体任务都失败,默认值为0,表示不容忍任何Checkpoint失败
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
//Checkpoint是进行失败恢复,当一个 Flink 应用程序失败终止、人为取消等时,它的 Checkpoint 就会被清除
//可以配置不同策略进行操作
// DELETE_ON_CANCELLATION: 当作业取消时,Checkpoint 状态信息会被删除,因此取消任务后,不能从 Checkpoint 位置进行恢复任务
// RETAIN_ON_CANCELLATION(多): 当作业手动取消时,将会保留作业的 Checkpoint 状态信息,要手动清除该作业的 Checkpoint 状态信息
       env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//Flink 默认提供 Extractly-Once 保证 State 的一致性,还提供了 Extractly-Once,At-Least-Once 两种模式,
// 设置checkpoint的模式为EXACTLY_ONCE,也是默认的,
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//设置checkpoint的超时时间, 如果规定时间没完成则放弃,默认是10分钟
env.getCheckpointConfig().setCheckpointTimeout(60000);
//设置同一时刻有多少个checkpoint可以同时执行,默认为1就行,以避免占用太多正常数据处理资源
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//设置了重启策略, 作业在失败后能自动恢复,失败后最多重启3次,每次重启间隔10s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));

11.Flink复杂事件处理CEP

11.1.Flink复杂事件处理CEP介绍

(1)什么是FlinkCEP

  • CEP全称 Complex event processing 复杂事件处理

  • FlinkCEP 是在 Flink 之上实现的复杂事件处理(CEP)库
  • 擅长高吞吐、低延迟的处理,市场上有多种CEP的解决方案,例如Spark,但是Flink专门类库更方便使用

(2)FlinkCEP用途

  • 检测和发现无边界事件流中多个记录的关联规则,得到满足规则的复杂事件
  • 允许业务定义要从输入流中提取的复杂模式序列

(3)FlinkCEP使用流程

  • 定义pattern
  • pattern应用到数据流,得到模式流
  • 从模式流 获取结果

(4)CEP并不包含在flink中,使用前需要自己导入

 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-cep-scala_2.11</artifactId>
     <version>1.7.0</version>
</dependency>

11.2.Flink的复杂事件CEP常见概念

(1)模式(Pattern):定义处理事件的规则

  • 三种模式PatternAPI
  • 个体模式(Individual Patterns):组成复杂规则的每一个单独的模式定义,就是个体模式

组合模式(Combining Patterns):很多个体模式组合起来,形成组合模式

模式组(Groups of Patterns):将一个组合模式作为条件嵌套在个体模式里,就是模式组

近邻模式


严格近邻:期望所有匹配事件严格地一个接一个出现,中间没有任何不匹配的事件, API是.next()

宽松近邻:允许中间出现不匹配的事件,API是.followedBy()

非确定性宽松近邻:可以忽略已经匹配的条件,API是followedByAny()

指定时间约束:指定模式在多长时间内匹配有效,API是within

如果您不希望事件类型直接跟随另一个,notNext()

如果您不希望事件类型介于其他两种事件类型之间,notFollowedBy()

模式分类

单次模式:接收一次一个事件

循环模式:接收一个或多个事件

(2)其他参数

times:指定固定的循环执行次数

greedy:贪婪模式,尽可能多触发

oneOrMore:指定触发一次或多次

timesOrMore:指定触发固定以上的次数

optional:要么不触发要么触发指定的次数

11.3.Flink复杂事件CEP案例实战

需求:同个账号,在5秒内连续登录失败2次,则认为存在而已登录问题

数据格式 李祥,2022-11-11 12:01:01,-1

/**
 * cep-demo
 * @author lixiang
 */
public class FlinkCEPDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> ds = env.socketTextStream("192.168.139.20",8888);
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> flatMapDS = ds.flatMap(new FlatMapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                String[] arr = value.split(",");
                out.collect(Tuple3.of(arr[0], arr[1], Integer.parseInt(arr[2])));
            }
        });
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> watermarks = flatMapDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Integer>>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> TimeUtil.strToDate(event.f1).getTime()));
        KeyedStream<Tuple3<String, String, Integer>, String> keyedStream = watermarks.keyBy(new KeySelector<Tuple3<String, String, Integer>, String>() {
            @Override
            public String getKey(Tuple3<String, String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        //定义模式
        Pattern<Tuple3<String, String, Integer>, Tuple3<String, String, Integer>> pattern = Pattern.<Tuple3<String, String, Integer>>
                        begin("firstTimeLogin")
                .where(new SimpleCondition<Tuple3<String, String, Integer>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
                        return value.f2 == -1;
                    }
                })
                .next("secondTimeLogin")
                .where(new SimpleCondition<Tuple3<String, String, Integer>>() {
                    @Override
                    public boolean filter(Tuple3<String, String, Integer> value) throws Exception {
                        return value.f2 == -1;
                    }
                }).within(Time.seconds(5));
        //匹配检查
        PatternStream<Tuple3<String, String, Integer>> patternStream = CEP.pattern(keyedStream, pattern);
        SingleOutputStreamOperator<Tuple3<String, String, String>> select = patternStream.select(new PatternSelectFunction<Tuple3<String, String, Integer>, Tuple3<String, String, String>>() {
            @Override
            public Tuple3<String, String, String> select(Map<String, List<Tuple3<String, String, Integer>>> map) throws Exception {
                Tuple3<String, String, Integer> firstLoginFail = map.get("firstTimeLogin").get(0);
                Tuple3<String, String, Integer> secondLoginFail = map.get("secondTimeLogin").get(0);
                return Tuple3.of(firstLoginFail.f0, firstLoginFail.f1, secondLoginFail.f1);
            }
        });
        select.print("匹配结果");
        env.execute("CEP job");
    }
}
  • 测试
张三,2022-11-11 12:01:01,-1
李四,2022-11-11 12:01:10,-1
李四,2022-11-11 12:01:11,-1
张三,2022-11-11 12:01:13,-1
李四,2022-11-11 12:01:14,-1
李四,2022-11-11 12:01:15,1
张三,2022-11-11 12:01:16,-1
李四,2022-11-11 12:01:17,-1
张三,2022-11-11 12:01:20,1

12.Flink项目打包插件+部署

12.1.Flink服务端多种部署模式

Flink 部署方式是灵活,主要是对Flink计算时所需资源的管理方式不同

  • 直接部署启动服务
  • Standalone Cluster集群部署,flink自带集群模式
  • Hadoop YARN 计算资源统一由Hadoop YARN管理资源进行调度,按需使用提高集群的资源利用率
  • Kubernetes 部署
  • Docker部署
  • 53fa7860b2c244a9b9a3cfd323294a66.jpg

12.2.Flink本地模式部署Linux服务器

(1)安装JDK8环境

(1)上传jdk1.8安装包,解压到指目录
tar -xvf jdk-8u181-linux-x64.tar.gz -C /usr/local/
(2)查看解压后的文件
[root@flink ~]# cd /usr/local
[root@flink local]# ls
bin  etc  flink-1.13.1  games  include  jdk1.8.0_181  lib  lib64  libexec  sbin  share  src
(3)jdk1.8.0_181重命名为jdk1.8
mv jdk1.8.0_181/ jdk1.8
(4)配置环境变量
vi /etc/profile
添加配置:
JAVA_HOME=/usr/local/jdk1.8
CLASSPATH=$JAVA_HOME/lib/
PATH=$PATH:$JAVA_HOME/bin
export PATH JAVA_HOME CLASSPATH
(5)刷新配置
source /etc/profile
(6)查看java环境
[root@flink ~]# java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
  • 注意:如果不安装jdk环境的话启动flink会报这个错误
Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME.

(2)准备flink环境

tar -xvf flink-1.13.1-bin-scala_2.12.tgz -C /usr/local/ 
调整配置文件:conf/flink-conf.yaml
#web ui 端口
rest.port=8081
#调整jobmanager和taskmanager的大小,根据自己的机器进行调整
jobmanager.memory.process.size: 256m
taskmanager.memory.process.size: 256m


931200a761b14a7294202f335c9eb208.jpg

本地模式用到这两个脚本
start-cluster.sh
stop-cluster.sh
启动本地模式:./start-cluster.sh
注意这会可能会报错:
The derived from fraction jvm overhead memory (19.200mb (20132659 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
原因分析:
flink最少需要192M的内存才能启动,jdk1.8之后堆内存采用的是元空间,元空间是根据内存大小自动分配的。如果元空间给的太小,flink将会启动不起来。
调整如下配置:conf/flink-conf.yaml
taskmanager.memory.process.size: 512m
taskmanager.memory.framework.heap.size: 64m
taskmanager.memory.framework.off-heap.size: 64m
taskmanager.memory.jvm-metaspace.size: 64m
taskmanager.memory.jvm-overhead.fraction: 0.2
taskmanager.memory.jvm-overhead.min: 16m
taskmanager.memory.jvm-overhead.max: 64m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.min: 1mb
taskmanager.memory.network.max: 256mb
  • 重新启动,成功

72b2885066154f9f94b4f5315e1b79bb.jpg

  • 访问web UI ip:8081


ffb46c06437847e2a1cb18f6b8d0f9f6.jpg


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
存储 SQL 大数据
用实时计算释放当下企业大数据潜能
本文整理自阿里云高级产品解决方案架构师王启华(敖北)老师在 Flink Forward Asia 2023 中闭门会的分享。
300 8
用实时计算释放当下企业大数据潜能
|
2月前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
45 0
|
2月前
|
Java Spring 安全
Spring 框架邂逅 OAuth2:解锁现代应用安全认证的秘密武器,你准备好迎接变革了吗?
【8月更文挑战第31天】现代化应用的安全性至关重要,OAuth2 作为实现认证和授权的标准协议之一,被广泛采用。Spring 框架通过 Spring Security 提供了强大的 OAuth2 支持,简化了集成过程。本文将通过问答形式详细介绍如何在 Spring 应用中集成 OAuth2,包括 OAuth2 的基本概念、集成步骤及资源服务器保护方法。首先,需要在项目中添加 `spring-security-oauth2-client` 和 `spring-security-oauth2-resource-server` 依赖。
42 0
|
2月前
|
消息中间件 数据挖掘 Kafka
揭秘大数据时代的极速王者!Flink:颠覆性流处理引擎,让实时数据分析燃爆你的想象力!
【8月更文挑战第29天】Apache Flink 是一个高性能的分布式流处理框架,适用于高吞吐量和低延迟的实时数据处理。它采用统一执行引擎处理有界和无界数据流,具备精确状态管理和灵活窗口操作等特性。Flink 支持毫秒级处理和广泛生态集成,但学习曲线较陡峭,社区相对较小。通过实时日志分析示例,我们展示了如何利用 Flink 从 Kafka 中读取数据并进行词频统计,体现了其强大功能和灵活性。
32 0
|
2月前
|
机器学习/深度学习 监控 大数据
Serverless 应用的监控与调试问题之Flink在整个开源大数据生态中应该如何定位,差异化该如何保持
Serverless 应用的监控与调试问题之Flink在整个开源大数据生态中应该如何定位,差异化该如何保持
|
29天前
|
存储 大数据 数据挖掘
【数据新纪元】Apache Doris:重塑实时分析性能,解锁大数据处理新速度,引爆数据价值潜能!
【9月更文挑战第5天】Apache Doris以其卓越的性能、灵活的架构和高效的数据处理能力,正在重塑实时分析的性能极限,解锁大数据处理的新速度,引爆数据价值的无限潜能。在未来的发展中,我们有理由相信Apache Doris将继续引领数据处理的潮流,为企业提供更快速、更准确、更智能的数据洞察和决策支持。让我们携手并进,共同探索数据新纪元的无限可能!
81 11
|
2月前
|
存储 分布式计算 大数据
MaxCompute 数据分区与生命周期管理
【8月更文第31天】随着大数据分析需求的增长,如何高效地管理和组织数据变得至关重要。阿里云的 MaxCompute(原名 ODPS)是一个专为海量数据设计的计算服务,它提供了丰富的功能来帮助用户管理和优化数据。本文将重点讨论 MaxCompute 中的数据分区策略和生命周期管理方法,并通过具体的代码示例来展示如何实施这些策略。
82 1
|
2月前
数据平台问题之在数据影响决策的过程中,如何实现“决策/行动”阶段
数据平台问题之在数据影响决策的过程中,如何实现“决策/行动”阶段
|
2月前
|
存储 监控 安全
大数据架构设计原则:构建高效、可扩展与安全的数据生态系统
【8月更文挑战第23天】大数据架构设计是一个复杂而系统的工程,需要综合考虑业务需求、技术选型、安全合规等多个方面。遵循上述设计原则,可以帮助企业构建出既高效又安全的大数据生态系统,为业务创新和决策支持提供强有力的支撑。随着技术的不断发展和业务需求的不断变化,持续优化和调整大数据架构也将成为一项持续的工作。
|
2月前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之ODPS数据怎么Merge到MySQL数据库
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
下一篇
无影云桌面