Flink教程(09)- Flink批流一体API(Connectors示例)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
实时计算 Flink 版,5000CU*H 3个月
云数据库 Tair(兼容Redis),内存型 2GB
简介: Flink教程(09)- Flink批流一体API(Connectors示例)

01 引言

在前面的博客,我们已经对Flink的程序模型里的Sink使用有了一定的了解了,有兴趣的同学可以参阅下:

在前面的章节,我们知道了一些比较基本的 Source 和 Sink 已经内置在 Flink 里。

  • 预定义 data sources :支持从文件、目录、socket,以及 collectionsiterators 中读取数据。
  • 预定义 data sinks :支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket

而连接器(Connectors)可以和多种多样的第三方系统进行交互,本文来讲解下。

02 Connectors

Connectors连接器参考:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/connectors/

2.1 Flink目前支持的Connectors

Flink目前支持以下系统:

在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列,要注意这些列举的连接器是Flink 工程的一部分,包含在发布的源码中,但是不包含在二进制发行版中。

Flink 还有些一些额外的连接器通过 Apache Bahir 发布, 包括:

系统 使用地方
Apache ActiveMQ source/sink
Apache Flume sink
Redis sink
Akka sink
Netty source

2.2 JDBC案例

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html

代码如下:

/**
 * Connectors -JDBC
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 4:59 下午
 */
public class ConnectorsDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        env.fromElements(new Student(null, "tonyma111", 18))
                //3.Transformation
                //4.Sink
                .addSink(JdbcSink.sink(
                        "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
                        (ps, s) -> {
                            ps.setString(1, s.getName());
                            ps.setInt(2, s.getAge());
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/big_data")
                                .withUsername("root")
                                .withPassword("123456")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .build()));
        //5.execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

运行结果:

2.3 Kafa案例

参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

--------------Flink 里已经提供了一些绑定的Connector,例如kafka sourcesinkEs sink等。读写 kafkaesrabbitMQ时可以直接使用相应connectorapi即可,虽然该部分是 Flink项目源代码里的一部分,但是真正意义上不算作Flink引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交Job 时候需要注意, job代码jar包中一定要将相应的connetor相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常

2.3.1 Kafa相关命令

● 查看当前服务器中的所有topic:

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper  node1:2181
• 1

● 创建topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
• 1

● 查看某个Topic的详情

/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181
• 1

● 删除topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka
• 1

● 通过shell命令发送消息

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
• 1

● 通过shell消费消息

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka --from-beginning 
• 1

● 修改分区

/export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper node1:2181
• 1

2.3.2 Kafka Consumer代码

需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消费Kafka中的数据做WordCount

需要设置如下参数:

  • 1.订阅的主题
  • 2.反序列化规则
  • 3.消费者属性-集群地址
  • 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)
  • 5.消费者属性-offset重置规则,如earliest/latest…
  • 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)
  • 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中
/**
 * KafkaConsumer
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:17 下午
 */
public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        //kafkaSource就是KafkaConsumer
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        kafkaSource.setStartFromGroupOffsets();//设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费
        //kafkaSource.setStartFromEarliest();//设置直接从Earliest消费,和auto.offset.reset配置无关
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
}

2.3.3 Kafka Producer代码

需求:将Flink集合中的数据通过自定义Sink保存到Kafka

/**
 * KafkaProducer
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:19 下午
 */
public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));
        //3.Transformation
        //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
        //可以直接调用Student的toString,也可以转为JSON
        SingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {
            @Override
            public String map(Student value) throws Exception {
                //String str = value.toString();
                String jsonStr = JSON.toJSONString(value);
                return jsonStr;
            }
        });
        //4.Sink
        jsonDS.print();
        //根据参数创建KafkaProducer/KafkaSink
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        jsonDS.addSink(kafkaSink);
        //5.execute
        env.execute();
        // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

2.4 Redis案例

参考:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

2.4.1 相关API

通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是flink提供了专门操作redisRedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink 如何使用。

RedisSink 核心类是RedisMapper是一个接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法,如下所示:

  • getCommandDescription() : 设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型
  • String getKeyFromData(T data):设置value 中的键值对key的值
  • String getValueFromData(T data):设置value 中的键值对value的值

使用RedisCommand设置数据结构类型时和redis结构对应关系:

Data Type Redis Command [Sink]
HASH HSET
LIST RPUSH, LPUSH
SET SADD
PUBSUB PUBLISH
STRING SET
HYPER_LOG_LOG PFADD
SORTED_SET ZADD
SORTED_SET ZADD
SORTED_SET ZREM

2.4.2 示例代码

需求:将Flink集合中的数据通过自定义Sink保存到Redis

代码如下:

/**
 * Connector-Redis
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:25 下午
 */
public class ConnectorsDemoRedis {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        // * 最后将结果保存到Redis
        // * 注意:存储到Redis的数据结构:使用hash也就是map
        // * key            value
        // * WordCount      (单词,数量)
        //-1.创建RedisSink之前需要创建RedisConfig
        //连接单机版Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        //连接集群版Redis
        //HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));
        //FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
        //连接哨兵版Redis
        //Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));
        //FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();
        //-3.创建并使用RedisSink
        result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
        //5.execute
        env.execute();
    }
    /**
     * -2.定义一个Mapper用来指定存储到Redis中的数据结构
     */
    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}

03 文末

本文主要讲解与Flink批流一体API相关的连接器Connectors(连接器),谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
13天前
|
消息中间件 分布式计算 大数据
大数据-121 - Flink Time Watermark 详解 附带示例详解
大数据-121 - Flink Time Watermark 详解 附带示例详解
40 0
|
16天前
|
机器学习/深度学习 PyTorch 算法框架/工具
揭秘深度学习中的微调难题:如何运用弹性权重巩固(EWC)策略巧妙应对灾难性遗忘,附带实战代码详解助你轻松掌握技巧
【10月更文挑战第1天】深度学习中,模型微调虽能提升性能,但常导致“灾难性遗忘”,即模型在新任务上训练后遗忘旧知识。本文介绍弹性权重巩固(EWC)方法,通过在损失函数中加入正则项来惩罚对重要参数的更改,从而缓解此问题。提供了一个基于PyTorch的实现示例,展示如何在训练过程中引入EWC损失,适用于终身学习和在线学习等场景。
36 4
揭秘深度学习中的微调难题:如何运用弹性权重巩固(EWC)策略巧妙应对灾难性遗忘,附带实战代码详解助你轻松掌握技巧
|
5天前
|
JSON API 数据安全/隐私保护
拍立淘按图搜索json数据格式示例(API接口)
拍立淘按图搜索API接口为电商平台和购物应用提供了强大的图像搜索功能,能够显著提升用户的购物体验和搜索效率。开发者可以根据自己的需求调用此接口,并处理返回的JSON格式数据来展示推荐商品
|
15天前
|
API 微服务
Traefik 微服务 API 网关教程(全)
Traefik 微服务 API 网关教程(全)
|
29天前
|
JSON Go API
使用Go语言和Gin框架构建RESTful API:GET与POST请求示例
使用Go语言和Gin框架构建RESTful API:GET与POST请求示例
|
6天前
|
JSON API 数据格式
商品详情数据JSON格式示例参考(api接口)
JSON数据格式的商品详情数据通常包含商品的多个层级信息,以下是一个综合多个来源信息的JSON数据格式的商品详情数据示例参考:
|
6天前
|
存储 前端开发 API
Restful API 设计示例
Restful API 设计示例
13 0
|
1月前
|
API iOS开发 开发者
Snapchat API 访问:Objective-C 实现示例
Snapchat API 访问:Objective-C 实现示例
|
1月前
|
存储 JSON API
实战派教程!Python Web开发中RESTful API的设计哲学与实现技巧,一网打尽!
在数字化时代,Web API成为连接前后端及构建复杂应用的关键。RESTful API因简洁直观而广受欢迎。本文通过实战案例,介绍Python Web开发中的RESTful API设计哲学与技巧,包括使用Flask框架构建一个图书管理系统的API,涵盖资源定义、请求响应设计及实现示例。通过准确使用HTTP状态码、版本控制、错误处理及文档化等技巧,帮助你深入理解RESTful API的设计与实现。希望本文能助力你的API设计之旅。
56 3
|
2月前
|
JSON API 数据格式
python 使用 Stable Diffusion API 生成图片示例
本文提供了一个使用Python调用Stable Diffusion API生成图片的示例程序,包括启动API设置、发送POST请求、保存生成的图片和JSON数据,以及如何通过API调用特定模型的说明。
python 使用 Stable Diffusion API 生成图片示例