【大数据计算引擎】流式计算引擎Flink2

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
实时计算 Flink 版,1000CU*H 3个月
简介: 【大数据计算引擎】流式计算引擎Flink

5.Flink里的Sink Operator实战

5.1.Sink Operator速览

  • Flink编程模型


86383c057b25424498c1917effbe6f4d.jpg


(1)Sink输出源

  • 预定义
  • print
  • writeAsText(过期)
  • 自定义
  • SinkFunction
  • RichSinkFunction
  • Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
  • Flink官方提供的Bundle Connector
  • Kafka、ES等
  • Apache Bahir
  • Kafka、ES、Redis等

5.2.自定义Sink连接Mysql

(1)部署MySQL环境

docker pull mysql:5.7
docker run -itd -p 3306:3306 --name my-mysql -e MYSQL_ROOT_PASSWORD=123456 mysql:5.7

(2)连接MySQL创建表


0482af245b6d4d839d5b6fff2225538d.jpg


CREATE TABLE `video_order` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `user_id` int(11) DEFAULT NULL,
  `money` int(11) DEFAULT NULL,
  `title` varchar(32) DEFAULT NULL,
  `trade_no` varchar(64) DEFAULT NULL,
  `create_time` date DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

(3)加入flink-mysql依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.25</version>
</dependency>

(4)编写MySQLSink

/**
 * @author lixiang
 */
public class MysqlSink extends RichSinkFunction<VideoOrderDO> {
    private Connection conn = null;
    private PreparedStatement ps = null;
    @Override
    public void invoke(VideoOrderDO videoOrder, Context context) throws Exception {
        //给ps中的?设置具体值
        ps.setInt(1, videoOrder.getUserId());
        ps.setInt(2, videoOrder.getMoney());
        ps.setString(3, videoOrder.getTitle());
        ps.setString(4, videoOrder.getTradeNo());
        ps.setDate(5, new Date(videoOrder.getCreateTime().getTime()));
        int i = ps.executeUpdate();
        System.out.println("处理数据,插入数据库结果:" + (i > 0));
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("---open---");
        conn = DriverManager.getConnection("jdbc:mysql://192.168.139.20:3306/flink?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "123456");
        String sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";
        ps = conn.prepareStatement(sql);
    }
    @Override
    public void close() throws Exception {
        if (conn != null) {
            conn.close();
        }
        if (ps != null) {
            ps.close();
        }
        System.out.println("---close---");
    }
}

(5)整合MySQLSink

public class FlinkMainSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(1);
        DataStreamSource<VideoOrderDO> source = env.addSource(new VideoOrderSource());
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // 尝试重启的次数
                Time.of(10, TimeUnit.SECONDS) // 间隔
        ));
        source.print("接收的数据");
        source.addSink(new MysqlSink());
        //流程启动
        env.execute("custom sink job");
    }
}

(6)运行结果

19504c4f93dd4d44962a1f17af4558ef.jpg


ba4b80553c9140549dfb4ded76ec7701.jpg

5.3.自定义Sink连接Redis

(1)部署Redis环境

拉取镜像:docker pull redis
启动容器:docker run -d --name redis -p 6379:6379 redis --requirepass "123456"

(2)Flink怎么操作redis?

  • 方式一:自定义sink
  • 方式二:使用connector

(3)Redis Sink 核心是RedisMapper 是一个接口,使用时要编写自己的redis操作类实现这个接口中的三个方法

  • getCommandDescription 选择对应的数据结构和key名称配置
  • getKeyFromData 获取key
  • getValueFromData 获取value

(4)添加redis的connector依赖,使用connector整合redis

<dependency>
     <groupId>org.apache.bahir</groupId>
     <artifactId>flink-connector-redis_2.11</artifactId>
     <version>1.0</version>
</dependency>

(5)自定义RedisSink

/**
 * 定义泛型,就是要返回的类型
 * @author lixiang
 */
public class MyRedisSink implements RedisMapper<Tuple2<String,Integer>> {
    /**
     * 选择对应的数据结构,和key的名称
     * @return
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET,"VIDEO_ORDER_COUNTER");
    }
    /**
     * 返回key
     * @param value
     * @return
     */
    @Override
    public String getKeyFromData(Tuple2<String,Integer> value) {
        return value.f0;
    }
    /**
     * 返回value
     * @param value
     * @return
     */
    @Override
    public String getValueFromData(Tuple2<String,Integer> value) {
        return value.f1.toString();
    }
}

(5)编写Flink任务类

/**
 * @author lixiang
 */
public class FlinkRedisDemo {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //自己构建的数据源
        /*DataStream<VideoOrderDO> ds = env.fromElements(new VideoOrderDO(5, 32, "java", "2123143432", new Date()),
                new VideoOrderDO(5, 40, "spring", "2123143432", new Date()),
                new VideoOrderDO(5, 60, "springBoot", "2233143432", new Date()),
                new VideoOrderDO(5, 29, "springBoot", "2125643432", new Date()),
                new VideoOrderDO(5, 67, "docker", "2129843432", new Date()),
                new VideoOrderDO(5, 89, "java", "2120943432", new Date()));*/
        //使用自定义的source
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        //map转换。来一个记录一个,方便后续统计
        DataStream<Tuple2<String,Integer>> mapDS = ds.map(new MapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String,Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(), 1);
            }
        });
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = mapDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        DataStream<Tuple2<String, Integer>> sumDS = keyedStream.sum(1);
        //输出统计
        sumDS.print();
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("192.168.139.20").setPassword("123456").setPort(6379).build();
        sumDS.addSink(new RedisSink<>(conf,new MyRedisSink()));
        //DataStream需要调用execute,可以取这个名称
        env.execute("custom redis job");
    }
}
/**
 * 自定义的source
 * @author lixiang
 */
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrderDO>
{
    private volatile Boolean flag = true;
    private Random random = new Random();
    private static List<String> list = new ArrayList<>();
    static
    {
        list.add("SpringBoot2.x");
        list.add("Linux");
        list.add("Flink");
        list.add("Kafka");
        list.add("SpringCloud");
        list.add("SpringBoot");
        list.add("Docker");
        list.add("Netty");
    }
    @Override
    public void run(SourceContext<VideoOrderDO> sourceContext) throws Exception {
        int x = 0;
        while (flag)
        {
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            String uuid = UUID.randomUUID().toString();
            sourceContext.collect(new VideoOrderDO(userId, money, title,uuid, new Date()));
        }
    }
    /**
     * 取消任务
     */
    @Override
    public void cancel()
    {
        flag = false;
    }
}

a7d640c2d2c74b7dbe254c9933d408ed.jpg

5.4.Flink整合KafkaConnetor

(1)Kafka环境搭建

  • 拉取镜像,部署容器
#zk部署
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
#kafka部署,换成自己的IP
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.139.20:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.139.20:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
  • 进入容器内部,创建Topic
#进入容器内部,创建topic
docker exec -it kafka /bin/bash
cd /opt/kafka
bin/kafka-topics.sh --create --zookeeper 192.168.139.20:2181 --replication-factor 1 --partitions 1 --topic test-topic
  • 生产者生产消息,消费者消费消息
#创建生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
#运行一个消费者,注意--from-beginning从开头第一个开始消费
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

(2)Flink整合Kafka读取消息,发送消息

  • 之前自定义SourceFunction,Flink官方也有提供对接外部系统的,比如读取Kafka
  • flink官方提供的连接器
  • 添加依赖
<dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-connector-kafka_${scala.version}</artifactId>
     <version>${flink.version}</version>
</dependency>
  • 编写Flink任务类
/**
 * @author lixiang
 */
public class FlinkKafka {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动入口
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        //kafka地址
        props.setProperty("bootstrap.servers", "192.168.139.20:9092");
        //组名
        props.setProperty("group.id", "video-order-group");
        //字符串序列化和反序列化规则
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //offset重置规则
        props.setProperty("auto.offset.reset", "latest");
        //自动提交
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        //有后台线程每隔10s检测一下Kafka的分区变化情况
        props.setProperty("flink.partition-discovery.interval-millis","10000");
        //监听test-topic发送的消息
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic",new SimpleStringSchema(),props);
        consumer.setStartFromGroupOffsets();
        DataStream<String> consumerDS = env.addSource(consumer);
        consumerDS.print("test-topic接收的消息");
        //接到消息后,处理
        DataStream<String> dataStream = consumerDS.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return "新来一个订单课程:"+value;
            }
        });
    //处理后的消息发送到order-topic
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>("order-topic",new SimpleStringSchema(),props);
        dataStream.addSink(producer);
        env.execute("kafka job");
    }
}
  • 测试testtopic发送消息,order-topic消费消息
#创建生产者发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
#运行一个消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order-topic --from-beginning

0cb6f130f91c4d64b26cbaf5e47680f2.jpg

52d3f2a1b0814429ac175f39047b9e1a.jpg

8b0747ca405e4d10b7fc295788db8d2d.jpg

6.Flink常用算子Transformation

6.1.Map和FlatMap实战

(1)java里面的Map操作

  • 一对一转换对象
/**
 * @author lixiang
 * flink map算子demo
 */
public class FlinkMapDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<VideoOrderDO> streamSource = env.addSource(new VideoOrderSource());
        streamSource.print("处理前数据");
        DataStream<Tuple2<String,Integer>> dataStream = streamSource.map(new MapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(),value.getMoney());
            }
        });
        dataStream.print("处理后");
        env.execute("map job");
    }
}

6758e3e159574b1ea6abc540223e227c.jpg

(2)java里面的FlatMap操作

  • 一对多转换对象
/**
 * @author lixiang
 * flatMap 算子demo
 */
public class FlinkFlatMapDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> ds = env.fromElements("java&35,spring&20,springboot&30", "springcloud&21,shiro&39,docker&56,linux&87", "netty&98,kafka&48");
        ds.print("处理前");
        DataStream<Tuple2<String,Integer>> out = ds.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] element = value.split(",");
                for (String s : element) {
                    String[] eles = s.split("&");
                    out.collect(new Tuple2<>(eles[0],Integer.parseInt(eles[1])));
                }
            }
        });
        out.print("处理后");
        env.execute("flatMap job");
    }
}


177975d5e71548e7828c2f67da8bab5c.jpg

6.2.RichMap和RichFlatMap实战

  • Rich相关的api多了open、close方法,用于初始化连接
  • RichXXX相关open、close、setRuntimeContext等Api方法会根据并行度进行操作的
  • 比如并行度是4,那就有4次触发对应的open、close方法等,是4个不同的subtask
  • 如:RichMapFunction、RichFlatMapFunction、RichSourceFunction等

(1)RichMap实战

/**
 * @author lixiang
 * flink map算子demo
 */
public class FlinkMapDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<VideoOrderDO> streamSource = env.addSource(new VideoOrderSource());
        streamSource.print("处理前数据");
        DataStream<Tuple2<String,Integer>> dataStream = streamSource.map(new RichMapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(),value.getMoney());
            }
            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("open方法执行");
            }
            @Override
            public void close() throws Exception {
                System.out.println("close方法执行");
            }
        });
        dataStream.print("处理后");
        env.execute("map job");
    }
}

b369277c5c594712bd234473a1290d2e.jpg

(2)RichFlatMapFunction实战

/**
 * @author lixiang
 * flatMap 算子demo
 */
public class FlinkFlatMapDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> ds = env.fromElements("java&35,spring&20,springboot&30", "springcloud&21,shiro&39,docker&56,linux&87", "netty&98,kafka&48");
        ds.print("处理前");
        DataStream<Tuple2<String,Integer>> out = ds.flatMap(new RichFlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] element = value.split(",");
                for (String s : element) {
                    String[] eles = s.split("&");
                    out.collect(new Tuple2<>(eles[0],Integer.parseInt(eles[1])));
                }
            }
            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("open方法执行");
            }
            @Override
            public void close() throws Exception {
                System.out.println("close方法执行");
            }
        });
        out.print("处理后");
        env.execute("flatMap job");
    }
}

7298c174292747fd8488bf0a2928c284.jpg

6.3.KeyBy分组实战

  • KeyBy分组是把数据流按照某个字段分区,指定字段相同的数据放在同个组中,在进行组内统计
/**
 * @author lixiang
 * keyBy 算子demo
 */
public class FlinkKeyByDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<VideoOrderDO> dataStream = env.addSource(new VideoOrderSource());
        //根据title进行分组
        KeyedStream<VideoOrderDO,String> keyedStream = dataStream.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        });
        //分组后将相同标题的money进行累加
        SingleOutputStreamOperator<VideoOrderDO> sumDS = keyedStream.sum("money");
        //map转换
        DataStream<Tuple2<String, Integer>> outputStreamOperator = sumDS.map(new MapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(),value.getMoney());
            }
        });
        outputStreamOperator.print();
        env.execute("keyBy job");
    }
}

cb49e779bd3d4fc19b2971c19b4ee1f7.jpg

6.4.filter和sum实战

  • filter过滤算子
  • sum求和算子
/**
 * @author lixiang
 * flink filter算子demo
 * 先过滤money大于30的,然后根据标题进行分组,然后求每组money总和,最后map转换
 */
public class FlinkFliterDemo {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        DataStreamSource<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        DataStream<Tuple2<String,Integer>> out = ds.filter(new FilterFunction<VideoOrderDO>() {
            @Override
            public boolean filter(VideoOrderDO value) throws Exception {
                return value.getMoney()>30;
            }
        }).keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).sum("money").map(new MapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(), value.getMoney());
            }
        });
        out.print();
        env.execute("filter sum job");
    }
}

744a13f977e84f9eb26d697d9523f500.jpg

6.5.reduce聚合实战

  • reduce函数
  • keyBy分组后聚合统计sum和reduce实现一样的效果
  • reduce和sum区别
  • sum(“xxx”)使用的时候,如果是tuple元组则用序号,POJO则用属性名称
  • keyBy分组后聚合统计sum和reduce实现一样的效果
  • sum是简单聚合,reduce是可以自定义聚合,aggregate支持复杂的自定义聚合
/**
 * @author lixiang
 * reduce 算子demo
 */
public class FlinkReduceDemo {
    public static void main(String[] args) throws Exception {
        //构建执行任务环境以及任务的启动的入口, 存储全局相关的参数
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);
        DataStreamSource<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        SingleOutputStreamOperator<Tuple2<String,Integer>> reduce = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).reduce(new AggregationFunction<VideoOrderDO>() {
            //value1是历史对象,value2是加入统计的对象,所以value1.f1是历史值,value2.f1是新值,不断累加
            @Override
            public VideoOrderDO reduce(VideoOrderDO value1, VideoOrderDO value2) throws Exception {
                value1.setMoney(value1.getMoney() + value2.getMoney());
                return value1;
            }
        }).map(new MapFunction<VideoOrderDO, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String,Integer> map(VideoOrderDO value) throws Exception {
                return new Tuple2<>(value.getTitle(),value.getMoney());
            }
        });
        reduce.print();
        env.execute("reduce job");
    }
}

b67a8a9cbd3e41f8b369d0e014fffb9e.jpg

6.6.maxBy-max-minBy-min实战

  • 如果是用了keyBy,在后续算子要用maxBy,minBy类型,才可以再分组里面找对应的数据
  • 如果用max、min等,就不确定是哪个key中选了
  • 如果是keyBy的是对象的某个属性,则分组用max/min聚合统计,只有聚合的字段会更新,其他字段还是旧的,导致对象不准确
  • 需要用maxBy/minBy才对让整个对象的属性都是最新的
  • max、min出现的问题
/**
 * @author lixiang
 * maxBy-max-minBy-min的使用
 */
public class FlinkMinMaxDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //env.setParallelism(1);
        DataStream<VideoOrderDO> ds = env.fromElements(new VideoOrderDO(5, 32, "java", "2123143432", new Date()),
                new VideoOrderDO(25, 40, "spring", "2123143432", new Date()),
                new VideoOrderDO(45, 60, "springBoot", "2233143432", new Date()),
                new VideoOrderDO(15, 29, "springBoot", "2125643432", new Date()),
                new VideoOrderDO(54, 67, "java", "2129843432", new Date()),
                new VideoOrderDO(59, 89, "java", "2120943432", new Date()));
        SingleOutputStreamOperator<VideoOrderDO> out = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).max("money");
        out.print();
        env.execute("max job");
    }
}

e67e55bee01540b3a4766be5542b4061.jpg

  • maxBy、minBy就不会出现这种问题
/**
 * @author lixiang
 * maxBy-max-minBy-min的使用
 */
public class FlinkMinMaxDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //env.setParallelism(1);
        DataStream<VideoOrderDO> ds = env.fromElements(new VideoOrderDO(5, 32, "java", "2123143432", new Date()),
                new VideoOrderDO(25, 40, "spring", "2123143432", new Date()),
                new VideoOrderDO(45, 60, "springBoot", "2233143432", new Date()),
                new VideoOrderDO(15, 29, "springBoot", "2125643432", new Date()),
                new VideoOrderDO(54, 67, "java", "2129843432", new Date()),
                new VideoOrderDO(59, 89, "java", "2120943432", new Date()));
        SingleOutputStreamOperator<VideoOrderDO> out = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        }).maxBy("money");
        out.print();
        env.execute("max job");
    }
}

d30dc4e94b074d0fa7d9ec8fee7f735d.jpg

7.Flink滑动-滚动时间窗和触发器

7.1.Window窗口介绍和应用

  • 背景
  • 数据流是一直源源不断产生,业务需要聚合统计使用,比如每10s统计过去5分钟的点击量、成交额等。

  • Windows就可以将无限的数据流拆分成有限大小的桶"buckets",然后程序可以对其窗口内的数据进行计算
  • 窗口认为是bucket桶,一个窗口段就是一个桶,比如8到9点是一个桶,9到10点是一个桶
  • 分类
  • time Window时间窗口,即按照一定时间规则作为窗口统计
  • time-sliding-window时间滑动窗口
  • time-tumbing-window时间滚动窗口
  • session Window会话窗口,即一个会话内的数据进行统计
  • count Window数量窗口,即按照一定的数量作为窗口统计

(1)窗口属性

  • 滑动窗口 Sliding Windows
  • 窗口具有固定大小
  • 窗口数据有重叠
  • 例子:每10s统计一次最近1min内的订单数量

e8083055efda44889fd629cbc7546ada.jpg

  • 滚动窗口 Tumbling Windows
  • 窗口具有固定大小
  • 窗口数据不重叠
  • 例子:每10s统计一次最近10s内的订单数量

0d8af6bd47704b419598d9d3e9787ad4.jpg

(2)窗口大小size 和 滑动间隔 slide

  • tumbling-window:滚动窗口: size=slide,如:每隔10s统计最近10s的数据
  • sliding-window:滑动窗口: size>slide,如:每隔5s统计最近10s的数据

7.2.Window窗口API和使用流程

(1)什么情况下才可以使用WindowAPI

  • 有keyBy用window()api
  • 没keyBy用windowAll()api,并行度低

939d422cbcaf4d42a92d0bb3e843bb08.jpg

  • 一个窗口内 的是左闭右开
  • countWindow没过期,但timeWindow在1.12过期,统一使用window;

(2)窗口分配器Window Assigners

  • 定义了如何将元素分配给窗口,负责将每条数据分发到正确的window窗口上
  • window()的参数是一个WindowAssigner,flink本身提供了Tumbling、Sliding等Assigner
  • (3)窗口触发器trigger
  • 用来控制一个窗口是否被触发
  • 每个窗口分配器WindowAssigner都有一个默认的触发器,也支持自定义触发器
  • (4)窗口window function,对窗口内的数据操作
  • 增量聚合函数
aggregate(agg函数,WindowFunction(){  })
  • 窗口保存临时数据,每进入一个新数据,会与中间数据累加,生成新的中间数据,再保存到窗口中
  • 常见的增量聚合函数有 reduceFunction、aggregateFunction
  • min、max、sum 都是简单的聚合操作,不需要自定义规则
AggregateFunction<IN, ACC, OUT>
IN是输入类型,ACC是中间聚合状态类型,OUT是输出类型,是聚合统计当前窗口的数据
  • 全窗口函数
apply(new processWindowFunction(){})
  • 窗口先缓存该窗口所有元素,等窗口的全部数据收集起来后再触发条件计算
  • 常见的全窗口聚合函数 windowFunction(未来可能弃用)、processWindowFunction(可以获取到窗口上下文 更多信息)
IN是输入类型,OUT是输出类型,KEY是分组类型,W是时间窗 
WindowFunction<IN, OUT, KEY, W extends Window>

如果想处理每个元素更底层的API的时候用

//对数据进行解析 ,process对每个元素进行处理,相当于 map+flatMap+filter
process(new KeyedProcessFunction(){processElement、onTimer})

7.3.Tumbling-Window滚动时间窗

  • 滚动窗口 Tumbling Windows
  • 窗口具有固定大小
  • 窗口数据不重叠
  • 比如指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口
  • 代码实战
/**
 * @author lixiang
 * Tumbling-Window滚动窗口
 */
public class FlinkTumblingDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        KeyedStream<VideoOrderDO, String> keyedStream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        });
        SingleOutputStreamOperator<Map<String, Object>> map = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("money").map(new MapFunction<VideoOrderDO, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(VideoOrderDO value) throws Exception {
                Map<String, Object> map = new HashMap<>();
                map.put("title", value.getTitle());
                map.put("money", value.getMoney());
                map.put("createDate", TimeUtil.toDate(value.getCreateTime()));
                return map;
            }
        });
        map.print();
        env.execute("Tumbling Window job");
    }
}

2422e700faf04d14bfa15c60b34f9833.jpg

7.4.Sliding-Window滑动时间窗

  • 滑动窗口 Sliding Windows
  • 窗口具有固定大小
  • 窗口数据有重叠
  • 例子:每5s统计一次最近20s内的订单数量
/**
 * @author lixiang
 * Tumbling-Window滚动窗口
 */
public class FlinkSlidingDemo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        KeyedStream<VideoOrderDO, String> keyedStream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        });
        //每5s去统计过去20s的数据
        SingleOutputStreamOperator<Map<String, Object>> map = keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5))).sum("money").map(new MapFunction<VideoOrderDO, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(VideoOrderDO value) throws Exception {
                Map<String, Object> map = new HashMap<>();
                map.put("title", value.getTitle());
                map.put("money", value.getMoney());
                map.put("createDate", TimeUtil.toDate(value.getCreateTime()));
                return map;
            }
        });
        map.print();
        env.execute("Sliding Window job");
    }
}

ad0b16ac75384b5c935a2f3e37defda6.jpg

7.5.Count-Window数量窗口

  • 基于数量的滚动窗口, 滑动计数窗口
  • 统计分组后同个key内的数据超过5次则进行统计 countWindow(5)
/**
 * @author lixiang
 * Tumbling-Window滚动窗口
 */
public class FlinkWindow1Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        KeyedStream<VideoOrderDO, String> keyedStream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        });
        SingleOutputStreamOperator<Map<String, Object>> map = keyedStream.countWindow(5).sum("money").map(new MapFunction<VideoOrderDO, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(VideoOrderDO value) throws Exception {
                Map<String, Object> map = new HashMap<>();
                map.put("title", value.getTitle());
                map.put("money", value.getMoney());
                map.put("createDate", TimeUtil.toDate(value.getCreateTime()));
                return map;
            }
        });
        map.print();
        env.execute("Count Window job");
    }
}


6a04fb68607a4f1398e7c6103c451e59.jpg

  • 只要有2个数据到达后就可以往后统计5个数据的值, countWindow(5, 2)
/**
 * @author lixiang
 * Tumbling-Window滚动窗口
 */
public class FlinkWindow1Demo {
    public static void main(String[] args) throws Exception {
        //构建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStream<VideoOrderDO> ds = env.addSource(new VideoOrderSource());
        KeyedStream<VideoOrderDO, String> keyedStream = ds.keyBy(new KeySelector<VideoOrderDO, String>() {
            @Override
            public String getKey(VideoOrderDO value) throws Exception {
                return value.getTitle();
            }
        });
        SingleOutputStreamOperator<Map<String, Object>> map = keyedStream.countWindow(5,2).sum("money").map(new MapFunction<VideoOrderDO, Map<String, Object>>() {
            @Override
            public Map<String, Object> map(VideoOrderDO value) throws Exception {
                Map<String, Object> map = new HashMap<>();
                map.put("title", value.getTitle());
                map.put("money", value.getMoney());
                map.put("createDate", TimeUtil.toDate(value.getCreateTime()));
                return map;
            }
        });
        map.print();
        env.execute("Count Window job");
    }
}


110bc0cf16f64fd2aeb9a92360214e78.jpg

相关文章
|
2月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
393 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
8月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
837 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
9月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
946 0
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
920 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
3681 73
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
10月前
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
766 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
11月前
|
SQL 存储 大数据
Flink 基础详解:大数据处理的强大引擎
Apache Flink 是一个分布式流批一体化的开源平台,专为大规模数据处理设计。它支持实时流处理和批处理,具有高吞吐量、低延迟特性。Flink 提供统一的编程抽象,简化大数据应用开发,并在流处理方面表现卓越,广泛应用于实时监控、金融交易分析等场景。其架构包括 JobManager、TaskManager 和 Client,支持并行度、水位线、时间语义等基础属性。Flink 还提供了丰富的算子、状态管理和容错机制,如检查点和 Savepoint,确保作业的可靠性和一致性。此外,Flink 支持 SQL 查询和 CDC 功能,实现实时数据捕获与同步,广泛应用于数据仓库和实时数据分析领域。
7341 32
zdl
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
548 56
|
SQL 流计算 关系型数据库
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
阿里云OpenLake解决方案建立在开放可控的OpenLake湖仓之上,提供大数据搜索与AI一体化服务。通过元数据管理平台DLF管理结构化、半结构化和非结构化数据,提供湖仓数据表和文件的安全访问及IO加速,并支持大数据、搜索和AI多引擎对接。本文为您介绍以Flink作为Openlake方案的核心计算引擎,通过流式数据湖仓Paimon(使用DLF 2.0存储)和EMR StarRocks搭建流式湖仓。
1079 5
基于OpenLake的Flink+Paimon+EMR StarRocks流式湖仓分析
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
348 1