大数据——Flink学习

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 1. Flink简介

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.


Apache Flink是一个框架和分布式处理引擎,用于对无边界和有边界的数据流进行有状态的计算。Flink被设计为可以在所有常见的集群环境中运行,以内存速度和任何规模执行计算。


1.2 为什么选择Flink


流数据 源源不断的数据


目标: 低延迟 高吞吐 准确性 容错性


1.3 Flink特点


事件驱动


有界流 使用DataSet


无界流  使用 DataStreamAPI


分层API


支持事件时间和处理时间


精确一次的状态一致性保证


低延迟 每秒百万个事件 毫秒级延迟


高可用


与众多常用存储系统的链接


1.4 Flink VS Spark Streaming


流处理 vs 微批处理


  1. 数据模型
  2. 运行时架构


2.Flink 部署


2.1 Standalone单机


可以使用webui界面部署


也可以使用shell命令


#启动命令

/bin/start-cluster.sh

#停止

/bin/stop-cluster.sh


#提交任务

/bin/flink run -c [指定启动类] -p [并行度] [要提交的jar包地址] [指定jvm参数]

#查看当前所有作业

/bin/flink list

#取消作业

/bin/flink cancel [jobId]


2.2. Yarn


需要hadoop集群


没有安装条件 略


2.3 k8s



3. Flink 运行架构


3.1 运行时组件


3.1.1 作业管理器JobManager

image.png

3.1.2 任务管理器TaskManager

image.png

3.1.3 资源管理器ResourceManager

image.png

3.1.4 分发器Dispatcher

image.png

3.2 任务提交流程

image.png

image.png

3.3. 任务调度原理

image.png

3.4  Slot


并行度: 一个特定算子的子任务的个数称为其并行度


一般情况下,一个stream的并行度,可以认为就是其所有算子中最大的并行度


Slots 是指Flink计算中执行一个线程所需要资源(CPU,内存)的最小单元


所以Slot的数量一般设置为TaskManager(JVM)的核心数


Slot 有分组的概念


如果是不同的组,必须使用不同的Slot


3.5 程序与数据流DataFlow


Flinke程序分为三大块: Source transform sink


数据传输的形式:


  1. One-to-one 必须是同共享组,并行度也相同的情况下才会One-to-one
  2. Redistributing  重新分区操作, 当并行度不一样时会进行重新分区轮询操作


4. 流处理API


流处理过程


Environment => source => transform => sink


4.1 Environment


执行环境


//流处理执行环境

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

// 批处理执行环境

ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();


//创建本地执行环境

ExecutionEnvironment.createLocalEnvironment([并行度]);


//创建远程执行环境

ExecutionEnvironment.createRemoteEnvironment(host,port,jar包地址 );


4.2 Source


Flink可以从不同数据源读取数据


4.2.1 从集合和元素中读取数据


API executionEnvironment.fromCollection(list);


public static void main(String[] args) throws Exception {


       // 创建流处理执行环境

       StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();


       //设置并行度1

       executionEnvironment.setParallelism(1);


       // 创造集合数据

       List<SensorReading> list = new ArrayList<>();

       for (int i = 0; i < 5; i++) {

           list.add(new SensorReading("Sensor" + i, LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(), ThreadLocalRandom.current().nextDouble(35, 40)));

       }

       // 从集合中收集数据

       DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(list);

       // 打印集合数据

       sensorReadingDataStreamSource.print("sensor");


       // 从元素中收集数据

       DataStreamSource<Integer> integerDataStreamSource = executionEnvironment.fromElements(1, 2, 3, 4, 56, 7);

       // 打印从元素中收集到数据

       integerDataStreamSource.print("element");


       // 执行Flink程序

       executionEnvironment.execute();


   }


4.2.2 从文件中读取数据


API executionEnvironment.readTextFile(inputPath);


public static void main(String[] args) throws Exception {

       StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

       String inputPath = "E:\\张尧\\idea项目\\FlinkTutorial\\src\\main\\resources\\word.txt";

  DataStreamSource<String> dataStreamSource = executionEnvironment.readTextFile(inputPath);


       SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource.flatMap(new WorkCount.MyFlagMapFunction()).keyBy(0).sum(1);

       sum.print();


       executionEnvironment.execute();

   }


4.2.3 从Kafka中读取数据


4.2.3.1 kafka配置


下载kafka 1.0.0版本以上


需要配置kafka的监听地址(本机除外)

image.png

修改config/server.properties


advertised.listeners=PLAINTEXT://192.168.164.205:9092


#启动kafka bin目录下

#启动zookeeper

./zookeeper-server-start.sh ../config/zookeeper.properties

#启动kafka

./kafka-server-start.sh config/server.properties


package com.zy.flink.source;


import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;


import java.util.Properties;


/**

* @author: zhangyao

* @create:2020-12-21 10:04

* @Description: 测试kafka数据源

**/

public class KafkaSourceTest {


   public static void main(String[] args) throws Exception {


       // 创建kafka连接配置信息

       Properties properties = new Properties();

       properties.setProperty("bootstrap.servers", "192.168.164.205:9092");

//        properties.setProperty("group.id", "")


       // 创建流处理执行环境

       StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

       executionEnvironment.setParallelism(1);


       // 从kafka从获取数据

       DataStreamSource<String> dataStreamSource = executionEnvironment.addSource(new FlinkKafkaConsumer<String>("sourcetest",

               new SimpleStringSchema(), properties));


       dataStreamSource.print();


       executionEnvironment.execute();

   }

}


4.2.4 自定义数据源


package com.zy.Flink.source;


import com.sun.org.apache.xpath.internal.operations.Bool;

import com.zy.Flink.entity.SensorReading;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.functions.source.SourceFunction;


import java.time.LocalDateTime;

import java.time.ZoneOffset;

import java.util.HashMap;

import java.util.Random;

import java.util.concurrent.ThreadLocalRandom;


/**

* @author: zhangyao

* @create:2020-12-19 17:21

* @Description: 自定义数据源测试

**/

public class UDFSourceTest {


   public static void main(String[] args) throws Exception {

       StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

       executionEnvironment.setParallelism(1);

       DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.addSource(new MySensorSource());

       sensorReadingDataStreamSource.print();

       executionEnvironment.execute();

   }



   public static class MySensorSource implements SourceFunction<SensorReading>{


       //定义属性控制数据的生成

       private Boolean running = true;




       @Override

       public void run(SourceContext<SensorReading> sourceContext) throws Exception {

           //定义传感器集合

           HashMap<String, Double> map = new HashMap<>();

           for (int i = 0; i < 10; i++) {

               map.put("sensor"+i, 60 + ThreadLocalRandom.current().nextGaussian() * 20);

           }

           while (running){

               for (String s : map.keySet()) {

                   sourceContext.collect(new SensorReading(s, LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(),map.get(s)+ThreadLocalRandom.current().nextGaussian()));

               }

               Thread.sleep(1000L);

           }

       }


       @Override

       public void cancel() {

           running = false;

       }

   }

}


4.3 Transform


转换算子


4.3.1 基本转换算子


map flatMap filter 这三个是基本转换算子


package com.zy.Flink.transform;


import com.zy.Flink.entity.SensorReading;

import org.apache.flink.api.common.functions.FilterFunction;

import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.util.Collector;


import java.time.LocalDateTime;

import java.time.ZoneOffset;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.ThreadLocalRandom;


/**

* @author: zhangyao

* @create:2020-12-19 17:55

* @Description:

**/

public class TransormTest {


   public static void main(String[] args) throws Exception {

       StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

       executionEnvironment.setParallelism(1);


       // 创造集合数据

       List<SensorReading> list = new ArrayList<>();

       for (int i = 0; i < 5; i++) {

           list.add(new SensorReading("Sensor" + i, LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(), ThreadLocalRandom.current().nextDouble(35, 40)));

       }


       // 使用集合收集数据

       DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(list);



       // map转换 返回sensorReading的sensorId

       SingleOutputStreamOperator<Object> map = sensorReadingDataStreamSource.map(new MapFunction<SensorReading, Object>() {

           @Override

           public Object map(SensorReading sensorReading) throws Exception {

               return sensorReading.getId();

           }

       });


       // flatMap转换 将各个属性拆分输出

       SingleOutputStreamOperator<Object> flatMap = sensorReadingDataStreamSource.flatMap(new FlatMapFunction<SensorReading, Object>() {

           @Override

           public void flatMap(SensorReading sensorReading, Collector<Object> collector) throws Exception {

               String[] split = sensorReading.toString().split(", ");

               for (String s : split) {

                   collector.collect(s);

               }

           }

       });



       // filter 过滤转换

       SingleOutputStreamOperator<SensorReading> filter = sensorReadingDataStreamSource.filter(new FilterFunction<SensorReading>() {

           @Override

           public boolean filter(SensorReading sensorReading) throws Exception {

               return sensorReading.getId().equals("Sensor2");


           }

       });


       // 输出map转换后的数据

       map.print("map");


       // 输出flatmao转换后的数据

       flatMap.print("flatMap");


       // 输出filter转换后的数据

       filter.print("filter");


       executionEnvironment.execute();

   }

}


4.3.2 聚合算子


keyBy 滚动聚合算子(Rolling Aggregation min() max() sum()  minBy() maxBy()) Reduce是聚合类的算子


package com.zy.Flink.transform;


import com.zy.Flink.TestUtil;

import com.zy.Flink.entity.SensorReading;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**

* @author: zhangyao

* @create:2020-12-20 09:27

* @Description: 滚动聚合算子转换

**/

public class TransormTest3 {


   public static void main(String[] args) throws Exception {

       // 创建流执行环境

       StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();


       // 设置并行度

       executionEnvironment.setParallelism(1);


       // 从集合收集数据

       DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());


       // 根据id keyBy 取最大温度

       SingleOutputStreamOperator<SensorReading> max = sensorReadingDataStreamSource.keyBy("id").maxBy("tmpperature");


       // 输出

       max.print();


       // 执行

       executionEnvironment.execute();


   }

}


聚合算子 min() 与 minBy() 的区别: min 只有聚合的字段是最小的,其他字段还是第一次收集 到的数据


minBy()是最小的聚合字段对应的数据


4.3.3 Reduce


package com.zy.Flink.transform;


import com.zy.Flink.TestUtil;

import com.zy.Flink.entity.SensorReading;

import org.apache.flink.api.java.functions.KeySelector;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.KeyedStream;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**

* @author: zhangyao

* @create:2020-12-20 11:37

* @Description: reduce操作

**/

public class TransormTest4_Reduce {


   public static void main(String[] args) throws Exception {

       StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

       executionEnvironment.setParallelism(1);


       DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());


       KeyedStream<SensorReading, Object> keyedStream = sensorReadingDataStreamSource.keyBy(new KeySelector<SensorReading, Object>() {

           @Override

           public Object getKey(SensorReading value) throws Exception {

               return value.getId();

           }

       });


       SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce((value1, value2) -> {

           return new SensorReading(value1.getId(), System.currentTimeMillis(), Math.max(value1.getTmpperature(), value2.getTmpperature()));

       });


       reduce.print();


       executionEnvironment.execute();


   }

}


4.3.4 多流转换


split(1.12移除)


connect map 只能合并两条流


union 合并多条流


4.3.5 数据类型


Flink支持的数据类型


  1. 支持Java和scale的所有基本数据类型(包括包装类)
  2. Java 和 Scale 元组
  3. Scale样例类 ?
  4. Java 简单对象 (空参构造)
  5. Java 集合 枚举


4.3.6 UDF 函数


  1. 函数类
  2. 匿名函数
  3. 富函数


4.3.7 数据重分区


4.3.7.1 shuffle


打乱分区顺序,重新分区


4.3.7.2 keyby


根据hash计算出分区,相同的key一定在同一分区(同一分区的key不一定相同)


4.3.7.3 global


当前的所有流发送到下一分区(同一个分区)


package com.zy.flink.transform;


import com.zy.flink.entity.SensorReading;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**

* @author: zhangyao

* @create:2020-12-21 09:11

* @Description: 分区操作

**/

public class TransfromTest6_partition {

   public static void main(String[] args) throws Exception {

       // 创建执行环境

       StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();


       // 要测试充分区操作,就不能设置并行度为1

       executionEnvironment.setParallelism(4);


       // 读取文件数据

       SingleOutputStreamOperator<SensorReading> dataStreamSource = executionEnvironment.readTextFile("E" +

               ":\\张尧\\idea项目\\tl\\Flink_learn\\learn_feature\\src\\main\\resources" +

               "\\sensorReading.txt").map(line -> {

           String[] split = line.split(",");

           return new SensorReading(split[0],new Long(split[1]),new Double(split[2]));

       });


       //输出源流

       dataStreamSource.print("input");


       // shuffle

       dataStreamSource.shuffle().print("shuffle");


       // keyBy

       dataStreamSource.keyBy("id").print("keyBy");


       // global

       dataStreamSource.global().print("global");


       // 执行作业

       executionEnvironment.execute();



   }

}


4.4 Sink


4.4.1 写入kafka


package com.zy.flink.sink;


import org.apache.flink.api.common.serialization.SimpleStringSchema;

import org.apache.flink.streaming.api.datastream.DataStreamSink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;


import java.util.Properties;


/**

* @author: zhangyao

* @create:2020-12-21 10:30

* @Description: 从kafka中读取数据,写入到kafka

**/

public class KafkaSink {


   public static void main(String[] args) throws Exception {

       // 创建流处理执行环境

       StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

       executionEnvironment.setParallelism(1);


       // 从kafka获取消息

       // 创建kafka配置信息

       Properties properties = new Properties();

       properties.setProperty("bootstrap.servers", "192.168.164.205:9092");


       // 消费消息

       DataStreamSource<String> sourcetest =

               executionEnvironment.addSource(new FlinkKafkaConsumer<String>("sourcetest",

               new SimpleStringSchema(), properties));



       // 写入kafka

       DataStreamSink sinktest = sourcetest.addSink(new FlinkKafkaProducer("192.168.164.205:9092", "sinktest", new SimpleStringSchema()));


       // 执行作业

       executionEnvironment.execute();


   }

}


需要依赖kafka connector连接器


<!-- kafka依赖 -->

       <dependency>

           <groupId>org.apache.flink</groupId>

           <artifactId>flink-connector-kafka_${flinks-clients.suffix.version}</artifactId>

       </dependency>


4.4.2 写入redis


<flinks-clients.suffix.version>2.11</flinks-clients.suffix.version>

       <flinks-redis.version>1.0</flinks-redis.version>



<!-- flink写入redis依赖-->

           <!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->

           <dependency>

               <groupId>org.apache.bahir</groupId>

               <artifactId>flink-connector-redis_${flinks-clients.suffix.version}</artifactId>

               <version>${flinks-redis.version}</version>

           </dependency>


package com.zy.flink.sink;


import com.zy.flink.TestUtil;

import com.zy.flink.entity.SensorReading;

import org.apache.flink.streaming.api.datastream.DataStreamSink;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.redis.RedisSink;

import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;

import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;

import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;

import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;

import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;


/**

* @author: zhangyao

* @create:2020-12-21 10:54

* @Description: 从文件中读取到数据输出到redis

**/

public class RedisSinkTest {


   public static void main(String[] args) throws Exception {

       StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();


       DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());


       // 创建jedis配置环境

       FlinkJedisPoolConfig flinkJedisPoolConfig =

               new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();

       DataStreamSink test = sensorReadingDataStreamSource.addSink(new RedisSink(flinkJedisPoolConfig,

               new RedisMapper<SensorReading>() {

                   // 创建执行方法的描述

                   @Override

                   public RedisCommandDescription getCommandDescription() {

                       return new RedisCommandDescription(RedisCommand.HSET, "test");

                   }


                   @Override

                   public String getKeyFromData(SensorReading sensorReading) {

                       return sensorReading.getId();

                   }


                   @Override

                   public String getValueFromData(SensorReading sensorReading) {

                       return sensorReading.getTmpperature().toString();

                   }

               }));


       executionEnvironment.execute();

   }

}


4.4.3 写入es


4.4.4 写入jdbc


就是写入数据库


5. window


5.1 window 概念


窗口


窗口就是无界流切割为有限流的一种方式


5.2  window 类型


  1. 时间窗口
  1. 滚动时间窗口 Tumbling Windows
  1. 时间对齐,窗口长度固定,一个数据只属于一个窗口
  1. image.png
  2. 滑动时间窗口
  3. image.png
  1. 滑动窗口有步长,一个数据可以存在多个窗口
  1. 会话窗口
  1. image.png
  2. 时间无对齐
  1. 计数窗口
  1. 滚动计数窗口
  2. 滑动计数窗口

5.3. window API


5.3.1 窗口分配器


window()


timeWindow()


countWindow()


都是开窗操作


5.3.2 window Function


当开窗之后,需要使用window Function进行聚合操作


  1. 增量聚合函数 bucket中只存储一个sum的结果,每来一条数据就计算一次
  1. ReduceFunction
  2. AggregateFunction
  3. ...
  1. 全窗口函数 收集所有的数据放入bucket 最终计算
  1. ProcessWindowFunction
  2. WindowFunction
  3. ...


package com.zy.flink.window;


import com.zy.flink.entity.SensorReading;

import org.apache.flink.api.common.functions.AggregateFunction;

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.api.windowing.time.Time;


/**

* @author: zhangyao

* @create:2020-12-21 14:38

* @Description: 时间窗口 滚动

**/

public class WindowApiTest1_TimeWindow {


   public static void main(String[] args) throws Exception {

       StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

//        DataStreamSource<SensorReading> sensorReadingDataStreamSource = executionEnvironment.fromCollection(TestUtil.createTestCollection());

       SingleOutputStreamOperator<SensorReading> dataStreamSource = executionEnvironment.socketTextStream(

               "192.168.164.205", 8888).map(line -> {

           String[] splits = line.split(",");

           return new SensorReading(splits[0], new Long(splits[1]), new Double(splits[2]));

       });

       SingleOutputStreamOperator<Integer> resultStream = dataStreamSource.keyBy("id")

               .timeWindow(Time.seconds(10))

               .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {

                   @Override

                   public Integer createAccumulator() {

                       return 0;

                   }


                   @Override

                   public Integer add(SensorReading s, Integer integer) {

                       return integer + 1;

                   }


                   @Override

                   public Integer getResult(Integer integer) {

                       return integer;

                   }


                   @Override

                   public Integer merge(Integer integer, Integer acc1) {

                       return integer + acc1;

                   }

               });


       resultStream.print();

       executionEnvironment.execute();

   }

}


5.3.3 可选API


  1. trigger 触发器
  1. 定义window的关闭时间,什么时候触发计算输出结果
  1. evictor 移除器
  1. 定义移除某些数据的逻辑
  1. allowLateness
  1. 允许迟到数据
  1. sideOutputLateDate
  1. 将迟到数据放入侧输出流
  1. getSideOutput
  1. 获取测输出流


6. 时间


6.1 时间语义


分为三个时间


  1. event Time 事件本身的时间,业务数据产生自身的时间
  2. Ingestion Time 数据进入Flink的时间
  3. Process Time flink操作算子进行计算的计算时间


6.2 EventTime


设置eventTime


executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


6.3 水位线 watemark


水位线用来处理乱序数据的延迟到达,当数据由于网络或分布式导致到达时间不是顺序时,要使用水位线平衡延迟


6.4 水位线的传递,引入和设定

目录
相关文章
|
28天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
82 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
数据采集 数据可视化 大数据
大数据体系知识学习(三):数据清洗_箱线图的概念以及代码实现
这篇文章介绍了如何使用Python中的matplotlib和numpy库来创建箱线图,以检测和处理数据集中的异常值。
49 1
大数据体系知识学习(三):数据清洗_箱线图的概念以及代码实现
|
29天前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
54 1
zdl
|
15天前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
51 0
|
2月前
|
存储 SQL 分布式计算
大数据学习
【10月更文挑战第15天】
44 1
|
2月前
|
分布式计算 大数据 Linux
大数据体系知识学习(二):WordCount案例实现及错误总结
这篇文章介绍了如何使用PySpark进行WordCount操作,包括环境配置、代码实现、运行结果和遇到的错误。作者在运行过程中遇到了Py4JJavaError和JAVA_HOME未设置的问题,并通过导入findspark初始化和设置环境变量解决了这些问题。文章还讨论了groupByKey和reduceByKey的区别。
33 1
|
2月前
|
分布式计算 Hadoop 大数据
大数据体系知识学习(一):PySpark和Hadoop环境的搭建与测试
这篇文章是关于大数据体系知识学习的,主要介绍了Apache Spark的基本概念、特点、组件,以及如何安装配置Java、PySpark和Hadoop环境。文章还提供了详细的安装步骤和测试代码,帮助读者搭建和测试大数据环境。
61 1
|
2月前
|
消息中间件 分布式计算 Kafka
大数据平台的毕业设计02:Spark与实时计算
大数据平台的毕业设计02:Spark与实时计算
|
3月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
27天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
859 17
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎