Flink教程(09)- Flink批流一体API(Connectors示例)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
实时计算 Flink 版,5000CU*H 3个月
云数据库 Tair(兼容Redis),内存型 2GB
简介: Flink教程(09)- Flink批流一体API(Connectors示例)

01 引言

在前面的博客,我们已经对Flink的程序模型里的Sink使用有了一定的了解了,有兴趣的同学可以参阅下:

在前面的章节,我们知道了一些比较基本的 Source 和 Sink 已经内置在 Flink 里。

  • 预定义 data sources :支持从文件、目录、socket,以及 collectionsiterators 中读取数据。
  • 预定义 data sinks :支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket

而连接器(Connectors)可以和多种多样的第三方系统进行交互,本文来讲解下。

02 Connectors

Connectors连接器参考:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/connectors/

2.1 Flink目前支持的Connectors

Flink目前支持以下系统:

在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列,要注意这些列举的连接器是Flink 工程的一部分,包含在发布的源码中,但是不包含在二进制发行版中。

Flink 还有些一些额外的连接器通过 Apache Bahir 发布, 包括:

系统 使用地方
Apache ActiveMQ source/sink
Apache Flume sink
Redis sink
Akka sink
Netty source

2.2 JDBC案例

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html

代码如下:

/**
 * Connectors -JDBC
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 4:59 下午
 */
public class ConnectorsDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        env.fromElements(new Student(null, "tonyma111", 18))
                //3.Transformation
                //4.Sink
                .addSink(JdbcSink.sink(
                        "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
                        (ps, s) -> {
                            ps.setString(1, s.getName());
                            ps.setInt(2, s.getAge());
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/big_data")
                                .withUsername("root")
                                .withPassword("123456")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .build()));
        //5.execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

运行结果:

2.3 Kafa案例

参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

--------------Flink 里已经提供了一些绑定的Connector,例如kafka sourcesinkEs sink等。读写 kafkaesrabbitMQ时可以直接使用相应connectorapi即可,虽然该部分是 Flink项目源代码里的一部分,但是真正意义上不算作Flink引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交Job 时候需要注意, job代码jar包中一定要将相应的connetor相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常

2.3.1 Kafa相关命令

● 查看当前服务器中的所有topic:

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper  node1:2181
• 1

● 创建topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
• 1

● 查看某个Topic的详情

/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181
• 1

● 删除topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka
• 1

● 通过shell命令发送消息

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
• 1

● 通过shell消费消息

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka --from-beginning 
• 1

● 修改分区

/export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper node1:2181
• 1

2.3.2 Kafka Consumer代码

需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消费Kafka中的数据做WordCount

需要设置如下参数:

  • 1.订阅的主题
  • 2.反序列化规则
  • 3.消费者属性-集群地址
  • 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)
  • 5.消费者属性-offset重置规则,如earliest/latest…
  • 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)
  • 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中
/**
 * KafkaConsumer
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:17 下午
 */
public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        //kafkaSource就是KafkaConsumer
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        kafkaSource.setStartFromGroupOffsets();//设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费
        //kafkaSource.setStartFromEarliest();//设置直接从Earliest消费,和auto.offset.reset配置无关
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
}

2.3.3 Kafka Producer代码

需求:将Flink集合中的数据通过自定义Sink保存到Kafka

/**
 * KafkaProducer
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:19 下午
 */
public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));
        //3.Transformation
        //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
        //可以直接调用Student的toString,也可以转为JSON
        SingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {
            @Override
            public String map(Student value) throws Exception {
                //String str = value.toString();
                String jsonStr = JSON.toJSONString(value);
                return jsonStr;
            }
        });
        //4.Sink
        jsonDS.print();
        //根据参数创建KafkaProducer/KafkaSink
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        jsonDS.addSink(kafkaSink);
        //5.execute
        env.execute();
        // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

2.4 Redis案例

参考:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

2.4.1 相关API

通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是flink提供了专门操作redisRedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink 如何使用。

RedisSink 核心类是RedisMapper是一个接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法,如下所示:

  • getCommandDescription() : 设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型
  • String getKeyFromData(T data):设置value 中的键值对key的值
  • String getValueFromData(T data):设置value 中的键值对value的值

使用RedisCommand设置数据结构类型时和redis结构对应关系:

Data Type Redis Command [Sink]
HASH HSET
LIST RPUSH, LPUSH
SET SADD
PUBSUB PUBLISH
STRING SET
HYPER_LOG_LOG PFADD
SORTED_SET ZADD
SORTED_SET ZADD
SORTED_SET ZREM

2.4.2 示例代码

需求:将Flink集合中的数据通过自定义Sink保存到Redis

代码如下:

/**
 * Connector-Redis
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:25 下午
 */
public class ConnectorsDemoRedis {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        // * 最后将结果保存到Redis
        // * 注意:存储到Redis的数据结构:使用hash也就是map
        // * key            value
        // * WordCount      (单词,数量)
        //-1.创建RedisSink之前需要创建RedisConfig
        //连接单机版Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        //连接集群版Redis
        //HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));
        //FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
        //连接哨兵版Redis
        //Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));
        //FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();
        //-3.创建并使用RedisSink
        result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
        //5.execute
        env.execute();
    }
    /**
     * -2.定义一个Mapper用来指定存储到Redis中的数据结构
     */
    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}

03 文末

本文主要讲解与Flink批流一体API相关的连接器Connectors(连接器),谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
打赏
0
0
0
0
237
分享
相关文章
满血 DeepSeek 免费用?附联网搜索教程!暨第三方 API 平台全面横评
满血 DeepSeek 免费用!支持联网搜索!创作声明:真人攥写-非AI生成,Written-By-Human-Not-By-AI
716 8
满血 DeepSeek 免费用?附联网搜索教程!暨第三方 API 平台全面横评
Python 中调用 DeepSeek-R1 API的方法介绍,图文教程
本教程详细介绍了如何使用 Python 调用 DeepSeek 的 R1 大模型 API,适合编程新手。首先登录 DeepSeek 控制台获取 API Key,安装 Python 和 requests 库后,编写基础调用代码并运行。文末包含常见问题解答和更简单的可视化调用方法,建议收藏备用。 原文链接:[如何使用 Python 调用 DeepSeek-R1 API?](https://apifox.com/apiskills/how-to-call-the-deepseek-r1-api-using-python/)
Flink CDC YAML:面向数据集成的 API 设计
本文整理自阿里云智能集团 Flink PMC Member & Committer 徐榜江(雪尽)在 FFA 2024 分论坛的分享,涵盖四大主题:Flink CDC、YAML API、Transform + AI 和 Community。文章详细介绍了 Flink CDC 的发展历程及其优势,特别是 YAML API 的设计与实现,以及如何通过 Transform 和 AI 模型集成提升数据处理能力。最后,分享了社区动态和未来规划,欢迎更多开发者加入开源社区,共同推动 Flink CDC 的发展。
319 12
Flink CDC YAML:面向数据集成的 API 设计
快速调用 Deepseek API!【超详细教程】
Deepseek 强大的功能,在本教程中,将指导您如何获取 DeepSeek API 密钥,并演示如何使用该密钥调用 DeepSeek API 以进行调试。
如何调用 DeepSeek-R1 API ?图文教程
首先登录 DeepSeek 开放平台,创建并保存 API Key。接着,在 Apifox 中设置环境变量,导入 DeepSeek 提供的 cURL 并配置 Authorization 为 `Bearer {{API_KEY}}`。通过切换至正式环境发送请求,可实现对话功能,支持流式或整体输出。
检测指定TCP端口开放状态免费API接口教程
此API用于检测指定TCP端口是否开放,支持POST/GET请求。需提供用户ID、KEY、目标主机,可选指定端口(默认80)和地区(默认国内)。返回状态码、信息提示、检测主机、端口及状态(开放或关闭)。示例中ID和KEY为公共测试用,建议使用个人ID和KEY以享受更高调用频率。
79 14
提取网页所有链接免费API接口教程
此API用于提取指定网页内的所有链接信息并进行分类,支持POST和GET请求方式。需提供用户ID、KEY及目标网址等参数,可选指定访问节点。返回结果包括状态码、信息提示及各类链接集合,如图片、视频、文档等。示例中展示了请求格式与返回数据结构。
Flink CDC 在阿里云实时计算Flink版的云上实践
本文整理自阿里云高级开发工程师阮航在Flink Forward Asia 2024的分享,重点介绍了Flink CDC与实时计算Flink的集成、CDC YAML的核心功能及应用场景。主要内容包括:Flink CDC的发展及其在流批数据处理中的作用;CDC YAML支持的同步链路、Transform和Route功能、丰富的监控指标;典型应用场景如整库同步、Binlog原始数据同步、分库分表同步等;并通过两个Demo展示了MySQL整库同步到Paimon和Binlog同步到Kafka的过程。最后,介绍了未来规划,如脏数据处理、数据限流及扩展数据源支持。
199 0
Flink CDC 在阿里云实时计算Flink版的云上实践
实时计算UniFlow:Flink+Paimon构建流批一体实时湖仓
实时计算架构中,传统湖仓架构在数据流量管控和应用场景支持上表现良好,但在实际运营中常忽略细节,导致新问题。为解决这些问题,提出了流批一体的实时计算湖仓架构——UniFlow。该架构通过统一的流批计算引擎、存储格式(如Paimon)和Flink CDC工具,简化开发流程,降低成本,并确保数据一致性和实时性。UniFlow还引入了Flink Materialized Table,实现了声明式ETL,优化了调度和执行模式,使用户能灵活调整新鲜度与成本。最终,UniFlow不仅提高了开发和运维效率,还提供了更实时的数据支持,满足业务决策需求。
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
212 56

热门文章

最新文章

AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等