Flink教程(09)- Flink批流一体API(Connectors示例)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Flink教程(09)- Flink批流一体API(Connectors示例)

01 引言

在前面的博客,我们已经对Flink的程序模型里的Sink使用有了一定的了解了,有兴趣的同学可以参阅下:

在前面的章节,我们知道了一些比较基本的 Source 和 Sink 已经内置在 Flink 里。

  • 预定义 data sources :支持从文件、目录、socket,以及 collectionsiterators 中读取数据。
  • 预定义 data sinks :支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket

而连接器(Connectors)可以和多种多样的第三方系统进行交互,本文来讲解下。

02 Connectors

Connectors连接器参考:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/connectors/

2.1 Flink目前支持的Connectors

Flink目前支持以下系统:

在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列,要注意这些列举的连接器是Flink 工程的一部分,包含在发布的源码中,但是不包含在二进制发行版中。

Flink 还有些一些额外的连接器通过 Apache Bahir 发布, 包括:

系统 使用地方
Apache ActiveMQ source/sink
Apache Flume sink
Redis sink
Akka sink
Netty source

2.2 JDBC案例

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html

代码如下:

/**
 * Connectors -JDBC
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 4:59 下午
 */
public class ConnectorsDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        env.fromElements(new Student(null, "tonyma111", 18))
                //3.Transformation
                //4.Sink
                .addSink(JdbcSink.sink(
                        "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
                        (ps, s) -> {
                            ps.setString(1, s.getName());
                            ps.setInt(2, s.getAge());
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/big_data")
                                .withUsername("root")
                                .withPassword("123456")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .build()));
        //5.execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

运行结果:

2.3 Kafa案例

参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

--------------Flink 里已经提供了一些绑定的Connector,例如kafka sourcesinkEs sink等。读写 kafkaesrabbitMQ时可以直接使用相应connectorapi即可,虽然该部分是 Flink项目源代码里的一部分,但是真正意义上不算作Flink引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交Job 时候需要注意, job代码jar包中一定要将相应的connetor相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常

2.3.1 Kafa相关命令

● 查看当前服务器中的所有topic:

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper  node1:2181
• 1

● 创建topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
• 1

● 查看某个Topic的详情

/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181
• 1

● 删除topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka
• 1

● 通过shell命令发送消息

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
• 1

● 通过shell消费消息

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka --from-beginning 
• 1

● 修改分区

/export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper node1:2181
• 1

2.3.2 Kafka Consumer代码

需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消费Kafka中的数据做WordCount

需要设置如下参数:

  • 1.订阅的主题
  • 2.反序列化规则
  • 3.消费者属性-集群地址
  • 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)
  • 5.消费者属性-offset重置规则,如earliest/latest…
  • 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)
  • 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中
/**
 * KafkaConsumer
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:17 下午
 */
public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        //kafkaSource就是KafkaConsumer
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        kafkaSource.setStartFromGroupOffsets();//设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费
        //kafkaSource.setStartFromEarliest();//设置直接从Earliest消费,和auto.offset.reset配置无关
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
}

2.3.3 Kafka Producer代码

需求:将Flink集合中的数据通过自定义Sink保存到Kafka

/**
 * KafkaProducer
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:19 下午
 */
public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));
        //3.Transformation
        //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
        //可以直接调用Student的toString,也可以转为JSON
        SingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {
            @Override
            public String map(Student value) throws Exception {
                //String str = value.toString();
                String jsonStr = JSON.toJSONString(value);
                return jsonStr;
            }
        });
        //4.Sink
        jsonDS.print();
        //根据参数创建KafkaProducer/KafkaSink
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        jsonDS.addSink(kafkaSink);
        //5.execute
        env.execute();
        // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

2.4 Redis案例

参考:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

2.4.1 相关API

通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是flink提供了专门操作redisRedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink 如何使用。

RedisSink 核心类是RedisMapper是一个接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法,如下所示:

  • getCommandDescription() : 设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型
  • String getKeyFromData(T data):设置value 中的键值对key的值
  • String getValueFromData(T data):设置value 中的键值对value的值

使用RedisCommand设置数据结构类型时和redis结构对应关系:

Data Type Redis Command [Sink]
HASH HSET
LIST RPUSH, LPUSH
SET SADD
PUBSUB PUBLISH
STRING SET
HYPER_LOG_LOG PFADD
SORTED_SET ZADD
SORTED_SET ZADD
SORTED_SET ZREM

2.4.2 示例代码

需求:将Flink集合中的数据通过自定义Sink保存到Redis

代码如下:

/**
 * Connector-Redis
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:25 下午
 */
public class ConnectorsDemoRedis {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        // * 最后将结果保存到Redis
        // * 注意:存储到Redis的数据结构:使用hash也就是map
        // * key            value
        // * WordCount      (单词,数量)
        //-1.创建RedisSink之前需要创建RedisConfig
        //连接单机版Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        //连接集群版Redis
        //HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));
        //FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
        //连接哨兵版Redis
        //Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));
        //FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();
        //-3.创建并使用RedisSink
        result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
        //5.execute
        env.execute();
    }
    /**
     * -2.定义一个Mapper用来指定存储到Redis中的数据结构
     */
    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}

03 文末

本文主要讲解与Flink批流一体API相关的连接器Connectors(连接器),谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
数据采集 监控 安全
各种业务场景调用API代理的API接口教程
API代理的API接口在各种业务场景中具有广泛的应用,本文将介绍哪些业务场景可以使用API代理的API接口,并提供详细的调用教程和代码演示,同时,我们还将讨论在不同场景下使用API代理的API接口所带来的好处。
|
17天前
|
JSON 监控 API
在API接口对接中关键示例问题(1)
在API接口对接中,有几个关键的问题需要注意,以确保接口的稳定性、安全性和易用性。以下是这些问题及部分示例代码的简要概述
|
1月前
|
安全 API 数据安全/隐私保护
email api接口配置教程步骤详解
Email API是用于程序化访问邮件服务的工具,让开发者能集成邮件功能到应用中。配置Email API包括选择供应商(如SendGrid、Mailgun、AokSend),注册获取API密钥,配置API参数,及测试邮件发送。使用Email API能提升邮件发送的可靠性和效率,便于邮件管理及营销活动。AokSend支持大量验证码发送,适合高效邮件运营。
|
1月前
|
SQL 分布式计算 测试技术
Flink API的4个层次
【2月更文挑战第28天】
|
1月前
|
消息中间件 SQL Kafka
如何高效接入 Flink: Connecter / Catalog API 核心设计与社区进展
本文整理自阿里云实时计算团队 Apache Flink Committer 和 PMC Member 任庆盛在 FFA 2023 核心技术专场(二)中的分享。
289 0
如何高效接入 Flink: Connecter / Catalog API 核心设计与社区进展
|
1月前
|
Java API PHP
获取1688商品详情API:步骤与代码示例
在电子商务领域,阿里巴巴的1688平台是一个广受商家和开发者欢迎的批发交易市场。若您是一名开发者,希望建立自己的应用程序或网站来获取并展示1688上的商品信息,您可能需要使用到1688提供的API接口。以下是获取1688商品详情API的详细步骤说明。
|
1月前
|
分布式计算 API 数据处理
Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
【2月更文挑战第15天】Flink【基础知识 01】(简介+核心架构+分层API+集群架构+应用场景+特点优势)(一篇即可大概了解flink)
60 1
|
1月前
|
Java API
Java 日期和时间 API:实用技巧与示例 - 轻松处理日期和时间
简介 Scanner 类用于获取用户输入,它位于 java.util 包中。 使用 Scanner 类 要使用 Scanner 类,请执行以下步骤: 导入 java.util.Scanner 包。 创建一个 Scanner 对象,并将其初始化为 System.in。 使用 Scanner 对象的方法读取用户输入。
54 1
|
2月前
|
存储 负载均衡 API
部署大模型API的实战教程
部署大模型API的实战教程可以分为以下步骤:
|
2月前
|
消息中间件 Kafka Apache
Apache Flink 是一个开源的分布式流处理框架
Apache Flink 是一个开源的分布式流处理框架
482 5