Flink教程(09)- Flink批流一体API(Connectors示例)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
实时计算 Flink 版,5000CU*H 3个月
简介: Flink教程(09)- Flink批流一体API(Connectors示例)

01 引言

在前面的博客,我们已经对Flink的程序模型里的Sink使用有了一定的了解了,有兴趣的同学可以参阅下:

在前面的章节,我们知道了一些比较基本的 Source 和 Sink 已经内置在 Flink 里。

  • 预定义 data sources :支持从文件、目录、socket,以及 collectionsiterators 中读取数据。
  • 预定义 data sinks :支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket

而连接器(Connectors)可以和多种多样的第三方系统进行交互,本文来讲解下。

02 Connectors

Connectors连接器参考:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/connectors/

2.1 Flink目前支持的Connectors

Flink目前支持以下系统:

在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列,要注意这些列举的连接器是Flink 工程的一部分,包含在发布的源码中,但是不包含在二进制发行版中。

Flink 还有些一些额外的连接器通过 Apache Bahir 发布, 包括:

系统 使用地方
Apache ActiveMQ source/sink
Apache Flume sink
Redis sink
Akka sink
Netty source

2.2 JDBC案例

参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/jdbc.html

代码如下:

/**
 * Connectors -JDBC
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 4:59 下午
 */
public class ConnectorsDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        env.fromElements(new Student(null, "tonyma111", 18))
                //3.Transformation
                //4.Sink
                .addSink(JdbcSink.sink(
                        "INSERT INTO `t_student` (`id`, `name`, `age`) VALUES (null, ?, ?)",
                        (ps, s) -> {
                            ps.setString(1, s.getName());
                            ps.setInt(2, s.getAge());
                        },
                        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                                .withUrl("jdbc:mysql://localhost:3306/big_data")
                                .withUsername("root")
                                .withPassword("123456")
                                .withDriverName("com.mysql.jdbc.Driver")
                                .build()));
        //5.execute
        env.execute();
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

运行结果:

2.3 Kafa案例

参考:https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

--------------Flink 里已经提供了一些绑定的Connector,例如kafka sourcesinkEs sink等。读写 kafkaesrabbitMQ时可以直接使用相应connectorapi即可,虽然该部分是 Flink项目源代码里的一部分,但是真正意义上不算作Flink引擎相关逻辑,并且该部分没有打包在二进制的发布包里面。所以在提交Job 时候需要注意, job代码jar包中一定要将相应的connetor相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常

2.3.1 Kafa相关命令

● 查看当前服务器中的所有topic:

/export/server/kafka/bin/kafka-topics.sh --list --zookeeper  node1:2181
• 1

● 创建topic

/export/server/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181 --replication-factor 2 --partitions 3 --topic flink_kafka
• 1

● 查看某个Topic的详情

/export/server/kafka/bin/kafka-topics.sh --topic flink_kafka --describe --zookeeper node1:2181
• 1

● 删除topic

/export/server/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181 --topic flink_kafka
• 1

● 通过shell命令发送消息

/export/server/kafka/bin/kafka-console-producer.sh --broker-list node1:9092 --topic flink_kafka
• 1

● 通过shell消费消息

/export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka --from-beginning 
• 1

● 修改分区

/export/server/kafka/bin/kafka-topics.sh --alter --partitions 4 --topic flink_kafka --zookeeper node1:2181
• 1

2.3.2 Kafka Consumer代码

需求:使用flink-connector-kafka_2.12中的FlinkKafkaConsumer消费Kafka中的数据做WordCount

需要设置如下参数:

  • 1.订阅的主题
  • 2.反序列化规则
  • 3.消费者属性-集群地址
  • 4.消费者属性-消费者组id(如果不设置,会有默认的,但是默认的不方便管理)
  • 5.消费者属性-offset重置规则,如earliest/latest…
  • 6.动态分区检测(当kafka的分区数变化/增加时,Flink能够检测到!)
  • 7.如果没有设置Checkpoint,那么可以设置自动提交offset,后续学习了Checkpoint会把offset随着做Checkpoint的时候提交到Checkpoint和默认主题中
/**
 * KafkaConsumer
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:17 下午
 */
public class KafkaConsumer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        props.setProperty("group.id", "flink");
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//会开启一个后台线程每隔5s检测一下Kafka的分区情况
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "2000");
        //kafkaSource就是KafkaConsumer
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("flink_kafka", new SimpleStringSchema(), props);
        kafkaSource.setStartFromGroupOffsets();//设置从记录的offset开始消费,如果没有记录从auto.offset.reset配置开始消费
        //kafkaSource.setStartFromEarliest();//设置直接从Earliest消费,和auto.offset.reset配置无关
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        //5.execute
        env.execute();
    }
}

2.3.3 Kafka Producer代码

需求:将Flink集合中的数据通过自定义Sink保存到Kafka

/**
 * KafkaProducer
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:19 下午
 */
public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStreamSource<Student> studentDS = env.fromElements(new Student(1, "tonyma", 18));
        //3.Transformation
        //注意:目前来说我们使用Kafka使用的序列化和反序列化都是直接使用最简单的字符串,所以先将Student转为字符串
        //可以直接调用Student的toString,也可以转为JSON
        SingleOutputStreamOperator<String> jsonDS = studentDS.map(new MapFunction<Student, String>() {
            @Override
            public String map(Student value) throws Exception {
                //String str = value.toString();
                String jsonStr = JSON.toJSONString(value);
                return jsonStr;
            }
        });
        //4.Sink
        jsonDS.print();
        //根据参数创建KafkaProducer/KafkaSink
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");
        FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>("flink_kafka", new SimpleStringSchema(), props);
        jsonDS.addSink(kafkaSink);
        //5.execute
        env.execute();
        // /export/server/kafka/bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic flink_kafka
    }
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student {
        private Integer id;
        private String name;
        private Integer age;
    }
}

2.4 Redis案例

参考:https://bahir.apache.org/docs/flink/current/flink-streaming-redis/

2.4.1 相关API

通过flink 操作redis 其实我们可以通过传统的redis 连接池Jpoools 进行redis 的相关操作,但是flink提供了专门操作redisRedisSink,使用起来更方便,而且不用我们考虑性能的问题,接下来将主要介绍RedisSink 如何使用。

RedisSink 核心类是RedisMapper是一个接口,使用时我们要编写自己的redis操作类实现这个接口中的三个方法,如下所示:

  • getCommandDescription() : 设置使用的redis 数据结构类型,和key 的名称,通过RedisCommand 设置数据结构类型
  • String getKeyFromData(T data):设置value 中的键值对key的值
  • String getValueFromData(T data):设置value 中的键值对value的值

使用RedisCommand设置数据结构类型时和redis结构对应关系:

Data Type Redis Command [Sink]
HASH HSET
LIST RPUSH, LPUSH
SET SADD
PUBSUB PUBLISH
STRING SET
HYPER_LOG_LOG PFADD
SORTED_SET ZADD
SORTED_SET ZADD
SORTED_SET ZREM

2.4.2 示例代码

需求:将Flink集合中的数据通过自定义Sink保存到Redis

代码如下:

/**
 * Connector-Redis
 *
 * @author : YangLinWei
 * @createTime: 2022/3/7 5:25 下午
 */
public class ConnectorsDemoRedis {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.Source
        DataStream<String> linesDS = env.socketTextStream("node1", 9999);
        //3.Transformation
        //3.1切割并记为1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //3.2分组
        KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOneDS.keyBy(0);
        //3.3聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = groupedDS.sum(1);
        //4.Sink
        result.print();
        // * 最后将结果保存到Redis
        // * 注意:存储到Redis的数据结构:使用hash也就是map
        // * key            value
        // * WordCount      (单词,数量)
        //-1.创建RedisSink之前需要创建RedisConfig
        //连接单机版Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        //连接集群版Redis
        //HashSet<InetSocketAddress> nodes = new HashSet<>(Arrays.asList(new InetSocketAddress(InetAddress.getByName("node1"), 6379),new InetSocketAddress(InetAddress.getByName("node2"), 6379),new InetSocketAddress(InetAddress.getByName("node3"), 6379)));
        //FlinkJedisClusterConfig conf2 = new FlinkJedisClusterConfig.Builder().setNodes(nodes).build();
        //连接哨兵版Redis
        //Set<String> sentinels = new HashSet<>(Arrays.asList("node1:26379", "node2:26379", "node3:26379"));
        //FlinkJedisSentinelConfig conf3 = new FlinkJedisSentinelConfig.Builder().setMasterName("mymaster").setSentinels(sentinels).build();
        //-3.创建并使用RedisSink
        result.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisWordCountMapper()));
        //5.execute
        env.execute();
    }
    /**
     * -2.定义一个Mapper用来指定存储到Redis中的数据结构
     */
    public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "WordCount");
        }
        @Override
        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }
        @Override
        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}

03 文末

本文主要讲解与Flink批流一体API相关的连接器Connectors(连接器),谢谢大家的阅读,本文完!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
14天前
|
JSON API 数据安全/隐私保护
淘宝评论API接口操作步骤详解,代码示例参考
淘宝评论API接口是淘宝开放平台提供的一项服务,通过该接口,开发者可以访问商品的用户评价和评论。这些评论通常包括评分、文字描述、图片或视频等内容。商家可以利用这些信息更好地了解消费者的需求和偏好,优化产品和服务。同时,消费者也可以从这些评论中获得准确的购买参考,做出更明智的购买决策。
|
25天前
|
API Python
【Azure Developer】分享一段Python代码调用Graph API创建用户的示例
分享一段Python代码调用Graph API创建用户的示例
48 11
|
1月前
|
API 开发工具 开发者
探究亚马逊国际获得AMAZON商品详情 API 接口功能、作用与实际应用示例
亚马逊提供的Amazon Product Advertising API或Selling Partner API,使开发者能编程访问亚马逊商品数据,包括商品标题、描述、价格等。支持跨境电商和数据分析,提供商品搜索和详情获取等功能。示例代码展示了如何使用Python和boto3库获取特定商品信息。使用时需遵守亚马逊政策并注意可能产生的费用。
|
1月前
|
JSON API 数据安全/隐私保护
拍立淘按图搜索API接口返回数据的JSON格式示例
拍立淘按图搜索API接口允许用户通过上传图片来搜索相似的商品,该接口返回的通常是一个JSON格式的响应,其中包含了与上传图片相似的商品信息。以下是一个基于淘宝平台的拍立淘按图搜索API接口返回数据的JSON格式示例,同时提供对其关键字段的解释
|
1月前
|
JSON API 数据格式
Amazon商品详情API,json数据格式示例参考
亚马逊商品详情API接口返回的JSON数据格式通常包含丰富的商品信息,以下是一个简化的JSON数据格式示例参考
|
2月前
|
机器学习/深度学习 PyTorch 算法框架/工具
揭秘深度学习中的微调难题:如何运用弹性权重巩固(EWC)策略巧妙应对灾难性遗忘,附带实战代码详解助你轻松掌握技巧
【10月更文挑战第1天】深度学习中,模型微调虽能提升性能,但常导致“灾难性遗忘”,即模型在新任务上训练后遗忘旧知识。本文介绍弹性权重巩固(EWC)方法,通过在损失函数中加入正则项来惩罚对重要参数的更改,从而缓解此问题。提供了一个基于PyTorch的实现示例,展示如何在训练过程中引入EWC损失,适用于终身学习和在线学习等场景。
140 4
揭秘深度学习中的微调难题:如何运用弹性权重巩固(EWC)策略巧妙应对灾难性遗忘,附带实战代码详解助你轻松掌握技巧
|
1月前
|
JSON API 数据格式
店铺所有商品列表接口json数据格式示例(API接口)
当然,以下是一个示例的JSON数据格式,用于表示一个店铺所有商品列表的API接口响应
|
1月前
|
JSON API 数据库
电商拍立淘按图搜索API接口,数据格式示例
电商拍立淘按图搜索API接口系列为电商平台和购物应用提供了强大的图像搜索功能,以下是其文档说明的详细参考
|
1月前
|
JSON API 数据格式
携程API接口系列,酒店景点详情请求示例参考
携程API接口系列涵盖了酒店预订、机票预订、旅游度假产品预订、景点门票预订等多个领域,其中酒店和景点详情请求是较为常用的功能。以下提供酒店和景点详情请求的示例参考
|
2月前
|
JSON API 数据安全/隐私保护
拍立淘按图搜索json数据格式示例(API接口)
拍立淘按图搜索API接口为电商平台和购物应用提供了强大的图像搜索功能,能够显著提升用户的购物体验和搜索效率。开发者可以根据自己的需求调用此接口,并处理返回的JSON格式数据来展示推荐商品