01 引言
在前面的博客,我们已经对Flink
的程序模型里的Sink
使用有了一定的了解了,有兴趣的同学可以参阅下:
- 《Flink教程(01)- Flink知识图谱》
- 《Flink教程(02)- Flink入门》
- 《Flink教程(03)- Flink环境搭建》
- 《Flink教程(04)- Flink入门案例》
- 《Flink教程(05)- Flink原理简单分析》
- 《Flink教程(06)- Flink批流一体API(Source示例)》
- 《Flink教程(07)- Flink批流一体API(Transformation示例)》
- 《Flink教程(08)- Flink批流一体API(Sink示例)》
在前面的章节,我们知道了一些比较基本的 Source 和 Sink 已经内置在 Flink 里。
- 预定义 data sources :支持从文件、目录、
socket
,以及collections
和iterators
中读取数据。 - 预定义 data sinks :支持把数据写入文件、标准输出(
stdout
)、标准错误输出(stderr
)和socket
。
而连接器(Connectors
)可以和多种多样的第三方系统进行交互,本文来讲解下。
02 Connectors
Connectors连接器参考:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/connectors/
2.1 Flink目前支持的Connectors
Flink目前支持以下系统:
系统 | 使用地方 |
Apache Kafka | source/sink |
Apache Cassandra | sink |
Amazon Kinesis Streams | source/sink |
Elasticsearch | sink |
FileSystem(包括 Hadoop ) - 仅支持流 | sink |
FileSystem(包括 Hadoop ) - 流批统一 | sink |
RabbitMQ | source/sink |
Apache NiFi | source/sink |
Twitter Streaming API | source |
Google PubSub | source/sink |
JDBC | sink |
在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列,要注意这些列举的连接器是Flink
工程的一部分,包含在发布的源码中,但是不包含在二进制发行版中。
Flink
还有些一些额外的连接器通过 Apache Bahir 发布, 包括:
系统 | 使用地方 |
Apache ActiveMQ | source/sink |
Apache Flume | sink |
Redis | sink |
Akka | sink |
Netty | source |
2.2 JDBC案例
参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html
代码如下:
/** * Connectors -JDBC * * @author : YangLinWei * @createTime: 2022/3/7 4:59 下午 */ public class ConnectorsDemo { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source env.fromElements(new Student(null, "tonyma111", 18)) //3.Transformation //4.Sink .addSink(JdbcSink.sink( "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)", (ps, s) -> { ps.setString(1, s.getName()); ps.setInt(2, s.getAge()); }, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:mysql://localhost:3306/big_data") .withUsername("root") .withPassword("123456") .withDriverName("com.mysql.jdbc.Driver") .build())); //5.execute env.execute(); } @Data @NoArgsConstructor @AllArgsConstructor public static class Student { private Integer id; private String name; private Integer age; } }
运行结果:
2.3 Kafa案例
参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
--------------
Flink
里已经提供了一些绑定的Connector
,例如kafka source
和sink
,Es sink
等。读写kafka
、es
、rabbitMQ
时可以直接使用相应connector
的api
即可,虽然该部分是Flink
项目源代码里的一部分,但是真正意义上不算作Flink
引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交Job
时候需要注意,job
代码jar
包中一定要将相应的connetor
相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常。
2.3.1 Kafa相关命令
● 查看当前服务器中的所有topic
:
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181 • 1
● 创建topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka • 1
● 查看某个Topic的详情
/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181 • 1
● 删除topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka • 1
● 通过shell命令发送消息
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka • 1
● 通过shell消费消息
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka --from-beginning • 1
● 修改分区
/export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper node1:2181 • 1
2.3.2 Kafka Consumer代码
需求:使用flink-connector-kafka_2.12
中的FlinkKafkaConsumer
消费Kafka
中的数据做WordCount
需要设置如下参数:
- 1.订阅的主题
- 2.反序列化规则
- 3.消费者属性-集群地址
- 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)
- 5.消费者属性-offset重置规则,如earliest/latest…
- 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)
- 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中
/** * KafkaConsumer * * @author : YangLinWei * @createTime: 2022/3/7 5:17 下午 */ public class KafkaConsumer { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source Properties props = new Properties(); props.setProperty("bootstrap.servers", "node1:9092"); props.setProperty("group.id", "flink"); props.setProperty("auto.offset.reset", "latest"); props.setProperty("flink.partition-discovery.interval-millis", "5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况 props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "2000"); //kafkaSource就是KafkaConsumer FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props); kafkaSource.setStartFromGroupOffsets();//设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费 //kafkaSource.setStartFromEarliest();//设置直接从Earliest消费,和auto.offset.reset配置无关 DataStreamSource<String> kafkaDS = env.addSource(kafkaSource); //3.Transformation //3.1切割并记为1 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = value.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1)); } } }); //3.2分组 KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0); //3.3聚合 SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1); //4.Sink result.print(); //5.execute env.execute(); } }
2.3.3 Kafka Producer代码
需求:将Flink
集合中的数据通过自定义Sink
保存到Kafka
/** * KafkaProducer * * @author : YangLinWei * @createTime: 2022/3/7 5:19 下午 */ public class KafkaProducer { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source DataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18)); //3.Transformation //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串 //可以直接调用Student的toString,也可以转为JSON SingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() { @Override public String map(Student value) throws Exception { //String str = value.toString(); String jsonStr = JSON.toJSONString(value); return jsonStr; } }); //4.Sink jsonDS.print(); //根据参数创建KafkaProducer/KafkaSink Properties props = new Properties(); props.setProperty("bootstrap.servers", "node1:9092"); FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props); jsonDS.addSink(kafkaSink); //5.execute env.execute(); // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka } @Data @NoArgsConstructor @AllArgsConstructor public static class Student { private Integer id; private String name; private Integer age; } }
2.4 Redis案例
参考:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
2.4.1 相关API
通过flink
操作redis
其实我们可以通过传统的redis
连接池Jpoools
进行redis
的相关操作,但是flink
提供了专门操作redis
的RedisSink
,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink
如何使用。
RedisSink
核心类是RedisMapper
是一个接口,使用时我们要编写自己的redis
操作类实现这个接口中的三个方法,如下所示:
- getCommandDescription() : 设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型
- String getKeyFromData(T data):设置value 中的键值对key的值
- String getValueFromData(T data):设置value 中的键值对value的值
使用RedisCommand
设置数据结构类型时和redis
结构对应关系:
Data Type | Redis Command [Sink] |
HASH | HSET |
LIST | RPUSH, LPUSH |
SET | SADD |
PUBSUB | PUBLISH |
STRING | SET |
HYPER_LOG_LOG | PFADD |
SORTED_SET | ZADD |
SORTED_SET | ZADD |
SORTED_SET | ZREM |
2.4.2 示例代码
需求:将Flink
集合中的数据通过自定义Sink
保存到Redis
代码如下:
/** * Connector-Redis * * @author : YangLinWei * @createTime: 2022/3/7 5:25 下午 */ public class ConnectorsDemoRedis { public static void main(String[] args) throws Exception { //1.env StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.Source DataStream<String> linesDS = env.socketTextStream("node1", 9999); //3.Transformation //3.1切割并记为1 SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = value.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1)); } } }); //3.2分组 KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0); //3.3聚合 SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1); //4.Sink result.print(); // * 最后将结果保存到Redis // * 注意:存储到Redis的数据结构:使用hash也就是map // * key value // * WordCount (单词,数量) //-1.创建RedisSink之前需要创建RedisConfig //连接单机版Redis FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build(); //连接集群版Redis //HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379))); //FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build(); //连接哨兵版Redis //Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379")); //FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build(); //-3.创建并使用RedisSink result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper())); //5.execute env.execute(); } /** * -2.定义一个Mapper用来指定存储到Redis中的数据结构 */ public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> { @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET, "WordCount"); } @Override public String getKeyFromData(Tuple2<String, Integer> data) { return data.f0; } @Override public String getValueFromData(Tuple2<String, Integer> data) { return data.f1.toString(); } } }
03 文末
本文主要讲解与Flink
批流一体API
相关的连接器Connectors
(连接器),谢谢大家的阅读,本文完!