1 Predefined Sinks
1.1 Console and File Sinks
⚫ API
1. ds.print() prints directly to the console (stdout)
2. ds.printToErr() prints directly to the console via stderr (shown in red in the IDE)
3. ds.writeAsText("local/HDFS path", WriteMode.OVERWRITE).setParallelism(1) writes to a local or HDFS path
⚫ Note:
When writing to a path, the parallelism set before the sink determines the output layout:
if parallelism > 1, the path is created as a directory;
if parallelism = 1, the path is a single file.
This is demonstrated in the code below.
⚫ Code example:
package cn.oldlu.sink;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author oldlu
 * Desc
 * 1. ds.print prints directly to the console
 * 2. ds.printToErr() prints directly to the console via stderr (red)
 * 3. ds.collect collects the distributed data into a local collection
 * 4. ds.setParallelism(1).writeAsText("local/HDFS path", WriteMode.OVERWRITE)
 */
public class SinkDemo01 {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. source
        //DataStream<String> ds = env.fromElements("hadoop", "flink");
        DataStream<String> ds = env.readTextFile("data/input/words.txt");
        // 3. transformation
        // 4. sink
        ds.print();
        ds.printToErr();
        ds.writeAsText("data/output/test", FileSystem.WriteMode.OVERWRITE).setParallelism(2);
        // Note:
        // parallelism = 1 -> the path is a single file
        // parallelism > 1 -> the path is a directory
        // 5. execute
        env.execute();
    }
}
2 Custom Sinks
2.1 MySQL
⚫ Requirement:
Save data from a Flink collection to MySQL through a custom Sink.
⚫ Implementation:
package cn.oldlu.sink;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Author oldlu
 * Desc
 * Save data to MySQL using a custom sink
 */
public class SinkDemo02_MySQL {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Source
        DataStream<Student> studentDS = env.fromElements(new Student(null, "tonyma", 18));
        // 3. Transformation
        // 4. Sink
        studentDS.addSink(new MySQLSink());
        // 5. execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }

    public static class MySQLSink extends RichSinkFunction<Student> {
        private Connection conn = null;
        private PreparedStatement ps = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Load the driver and open the connection
            //Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root");
            String sql = "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void invoke(Student value, Context context) throws Exception {
            // Fill in the ? placeholders
            ps.setString(1, value.getName());
            ps.setInt(2, value.getAge());
            // Execute the SQL
            ps.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            // Close the statement before the connection
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
3. Connectors
3.1 JDBC
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html
package cn.oldlu.connectors;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Author oldlu
 * Desc
 */
public class ConnectorsDemo_JDBC {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Source
        env.fromElements(new Student(null, "tonyma", 18))
                // 3. Transformation
                // 4. Sink
                .addSink(JdbcSink.sink(
                        "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
                        (ps, s) -> {
                            ps.setString(1, s.getName());
                            ps.setInt(2, s.getAge());
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/bigdata")
                                .withUsername("root")
                                .withPassword("root")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .build()));
        // 5. execute
        env.execute();
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}
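To package and run this JDBC example, the job jar needs the Flink JDBC connector and a MySQL driver on the classpath. A minimal pom sketch, assuming Flink 1.12.x with Scala 2.12 and a MySQL 5.x server (the versions are assumptions; align them with your environment):

<!-- Flink JDBC connector (version assumed to match the Flink 1.12.x docs linked above) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<!-- MySQL JDBC driver that provides com.mysql.jdbc.Driver (version is an assumption) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>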
3.2 Kafka
3.2.1 pom dependency
Flink already ships some bundled connectors, such as the Kafka source/sink and the Elasticsearch sink. To read from or write to Kafka, ES, RabbitMQ, etc., you can use the corresponding connector API directly. Although these connectors live in the Flink project's source tree, they are not really part of the Flink engine itself and are not included in the binary distribution. So when submitting a job, make sure the relevant connector classes are packaged into the job jar; otherwise submission fails with "class not found" errors or class-initialization exceptions.
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
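A minimal dependency sketch for the Kafka connector used in the examples below, assuming Flink 1.12.x with Scala 2.12 (the version is an assumption; match it to your Flink distribution):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.12.0</version>
</dependency>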
3.2.2 Parameter settings
The following parameters must (or are strongly recommended to) be set; all of them appear in the consumer code in 3.2.5:
1. The topic(s) to subscribe to
2. The deserialization schema
3. Consumer property: cluster address (bootstrap.servers)
4. Consumer property: consumer group id (a default is used if unset, but defaults are hard to manage)
5. Consumer property: offset reset policy, e.g. earliest/latest...
6. Dynamic partition discovery (so Flink detects when the number of Kafka partitions changes or grows)
7. If checkpointing is not enabled, automatic offset committing can be turned on; once checkpointing (covered later) is enabled, offsets are stored in the checkpoint and committed to Kafka's default offsets topic as part of each checkpoint
3.2.3 Parameter details
Production environments often have requirements such as the following:
Scenario 1: a Flink job aggregates five data feeds, each backed by its own Kafka topic. As the business grows, a new feed and a new Kafka topic are added. How can the job pick up the new topic automatically without a restart?
Scenario 2: a job reads from a fixed Kafka topic that starts with 10 partitions. As data volume grows, the topic is scaled out from 10 to 20 partitions. How can the job discover the newly added partitions without a restart?
For both scenarios, first set the flink.partition-discovery.interval-millis property to a non-negative value in the properties passed to the FlinkKafkaConsumer. This enables dynamic discovery and defines the discovery interval; the FlinkKafkaConsumer then runs a dedicated thread that periodically fetches the latest metadata from Kafka.
For scenario 1, additionally pass the topics as a regular-expression pattern when constructing the FlinkKafkaConsumer; on every metadata refresh, the list of topics matching the pattern is updated.
For scenario 2, the dynamic discovery setting alone is enough: each periodic metadata fetch picks up the new partitions. To guarantee correctness, newly discovered partitions are read from the earliest offset. A sketch of both settings follows below.
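A minimal sketch of both settings, assuming a hypothetical topic-name pattern flink_kafka.* and a 5-second discovery interval (class name, pattern, and interval are placeholders):

package cn.oldlu.connectors;

import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaDiscoverySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        // non-negative value = dynamic discovery enabled; metadata is re-fetched every 5 s,
        // which covers scenario 2 (newly added partitions of the subscribed topics)
        props.setProperty("flink.partition-discovery.interval-millis", "5000");

        // scenario 1: subscribe with a regex pattern so newly created matching topics are consumed as well
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
                Pattern.compile("flink_kafka.*"), new SimpleStringSchema(), props);

        env.addSource(kafkaSource).print();
        env.execute();
    }
}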
Note:
When checkpointing is enabled, offsets are managed and restored from Flink state, not from the offsets committed to Kafka. Under the checkpoint mechanism a job restarts from the most recent checkpoint, which inherently replays some historical data, so some records are consumed again. Flink only guarantees exactly-once for its internal computation state; end-to-end exactly-once additionally requires an idempotent storage system or transactional writes. The sketch below shows the related settings.
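For reference, a hedged sketch of these settings, reusing the env and kafkaSource variables from the consumer code in 3.2.5 (the checkpoint interval is a placeholder):

// offsets are snapshotted into Flink state on every checkpoint (here every 5 s)
env.enableCheckpointing(5000);
// additionally commit offsets back to Kafka at checkpoint time; this is only for monitoring,
// recovery still uses the offsets stored in the checkpointed state
kafkaSource.setCommitOffsetsOnCheckpoints(true);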
3.2.4 Kafka commands
● List all topics on the server
/export/server/kafka/bin/kafka-topics.sh --list --zookeeper node1:2181
● Create a topic
/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
● Describe a topic
/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181
● Delete a topic
/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka
● Produce messages from the shell
/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
● Consume messages from the shell
/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka --from-beginning
● Increase the number of partitions
/export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper node1:2181
3.2.5 Implementation: Kafka Consumer
package cn.oldlu.connectors;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Author oldlu
 * Desc
 * Requirement: use the FlinkKafkaConsumer from flink-connector-kafka_2.12 to consume data from Kafka and do a WordCount.
 * The following parameters need to be set:
 * 1. the topic to subscribe to
 * 2. the deserialization schema
 * 3. consumer property: cluster address (bootstrap.servers)
 * 4. consumer property: consumer group id (a default is used if unset, but defaults are hard to manage)
 * 5. consumer property: offset reset policy, e.g. earliest/latest...
 * 6. dynamic partition discovery (so Flink detects when Kafka partitions are added)
 * 7. without checkpointing, auto offset commit can be used; with checkpointing (covered later), offsets are committed as part of each checkpoint and to Kafka's default offsets topic
 */
public class ConnectorsDemo_KafkaConsumer {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Source
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset", "latest");
        // starts a background thread that re-checks the Kafka partitions every 5 s
        props.setProperty("flink.partition-discovery.interval-millis", "5000");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        // kafkaSource is the KafkaConsumer
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        // start from the committed group offsets; if there are none, fall back to the auto.offset.reset setting
        kafkaSource.setStartFromGroupOffsets();
        //kafkaSource.setStartFromEarliest(); // start from the earliest offset, ignoring auto.offset.reset
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        // 3. Transformation
        // 3.1 split and map each word to (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        // 3.2 group by word
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        // 3.3 aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        // 4. Sink
        result.print();
        // 5. execute
        env.execute();
    }
}
3.2.6 Implementation: Kafka Producer
⚫ Requirement:
Save data from a Flink collection to Kafka through a sink (the official flink-connector-kafka_2.12 producer).
⚫ Implementation
package cn.oldlu.connectors;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * Author oldlu
 * Desc
 * Save data to Kafka using the sink provided by the official flink-connector-kafka_2.12 connector
 */
public class ConnectorsDemo_KafkaProducer {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Source
        DataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));
        // 3. Transformation
        // Note: the simplest string (de)serialization schema is used for Kafka here,
        // so convert Student to a String first; toString() would work, but JSON is used.
        SingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {
            @Override
            public String map(Student value) throws Exception {
                //String str = value.toString();
                String jsonStr = JSON.toJSONString(value);
                return jsonStr;
            }
        });
        // 4. Sink
        jsonDS.print();
        // create the KafkaProducer/KafkaSink from the given properties
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        jsonDS.addSink(kafkaSink);
        // 5. execute
        env.execute();
        // verify: /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}
3.3 Redis
⚫ API
Redis can be written to from Flink with a plain Jedis connection pool (JedisPool), but Flink's Apache Bahir connector provides a dedicated RedisSink that is more convenient and takes care of connection handling and performance concerns. The following introduces how to use RedisSink.
https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
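The RedisSink comes from the Apache Bahir connector, so the job jar needs that dependency. A minimal pom sketch based on the Bahir documentation linked above (artifact name and version should be verified against your setup):

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>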
The core of RedisSink is the RedisMapper interface. To use the sink, write your own mapper class implementing its three methods, as described below and shown in the RedisWordCountMapper of the code example:
1. getCommandDescription():
specifies the Redis data structure type to use and the name of the (outer) key, via RedisCommand.
2. String getKeyFromData(T data):
extracts from a record the key of the key/value pair stored inside that structure (e.g. the hash field).
3. String getValueFromData(T data):
extracts from a record the value of the key/value pair.
⚫ Mapping between RedisCommand and Redis data structures (per the connector documentation linked above): HSET writes to a hash (used in the example below), LPUSH/RPUSH to a list, SADD to a set, ZADD to a sorted set, SET to a plain string, PFADD to a HyperLogLog, and PUBLISH to a pub/sub channel.
⚫ Requirement
Save data from Flink to Redis through RedisSink (here, the result of a WordCount over socket input).
⚫ Implementation
package cn.oldlu.connectors;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

/**
 * Author oldlu
 * Desc
 * Requirement:
 * receive messages, do a WordCount,
 * and save the result to Redis.
 * Note the data structure used in Redis: a hash (i.e. a map)
 * key          value
 * WordCount    (word, count)
 */
public class ConnectorsDemo_Redis {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);
        // 3. Transformation
        // 3.1 split and map each word to (word, 1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        // 3.2 group by word
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        // 3.3 aggregate
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        // 4. Sink
        result.print();
        // Save the result to Redis as a hash:
        // key          value
        // WordCount    (word, count)
        // -1. A RedisConfig is needed before creating the RedisSink
        // standalone Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        // Redis cluster
        //HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(
        //        new InetSocketAddress(InetAddress.getByName("node1"), 6379),
        //        new InetSocketAddress(InetAddress.getByName("node2"), 6379),
        //        new InetSocketAddress(InetAddress.getByName("node3"), 6379)));
        //FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
        // Redis sentinel
        //Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));
        //FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();
        // -3. Create and use the RedisSink
        result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
        // 5. execute
        env.execute();
    }

    /**
     * -2. Define a Mapper that specifies the data structure used in Redis
     */
    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
        }

        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}