2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql(二)

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS DuckDB + QuickBI 企业套餐,8核32GB + QuickBI 专业版
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
简介: 2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql(二)

前言

大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。

这次是上篇文章的续集,最新的Flink版本大大简化了之前复杂的写法~

之前的文章

首先准备模拟数据:

//1、准备配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1.itcast.cn:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("KafkaCustomPartitioner.class", "test.KafkaCustomPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Kafka的一系列配置,可以从官网直接copy过来@~@~

然后正式生产模拟数据:

//2、创建KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
        Random random = new Random();
        while (true){
            //随机生成分类和金额
            int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]
            String category = categorys[index];//获取的随机分类
            double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之间的随机数,*100之后表示[0~100)
            CategoryPojo categoryPojo = new CategoryPojo(category, price,System.currentTimeMillis());
            String data = JSON.toJSONString(categoryPojo);
            //3、发送数据
            kafkaProducer.send(new ProducerRecord<String, String>("topicDemo",data));
            System.out.println("数据是"+data);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

这里的实体类用Lombok,比较简单:

这是之前写的Lombok用法文章

@Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;//分类名称
        private double price;//该分类总销售额
        private long time;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可
    }

有了数据写入Kafka,我们开始消费“她”:

设置一下Flink运行环境:

//TODO 1.设置环境env
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //并行度为1,表示不分区
        env.setParallelism(1);

配置Kafka相关并从哪里开始读offset

//TODO 2设置Kafka相关参数
        Properties props = new Properties();
        //kafka的地址,消费组名
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.88.161:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"category");
        //Flink设置kafka的offset,从最新的开始
         FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "myDemo",
                new SimpleStringSchema(),
                props
        );
        consumer.setStartFromLatest();
        consumer.setCommitOffsetsOnCheckpoints(true);

第3步解析数据源并测试:

DataStreamSource<String> source = env.addSource(consumer);
         SingleOutputStreamOperator<Order> mapDS = source.map(new MapFunction<String, Order>() {
            @Override
            public Order map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                Order order = JSON.toJavaObject(jsonObject, Order.class);
                return order;
            }
        });
        //测试一下
        mapDS.print();

success!

最后存入Mysql

//sink输出到Mysql
        result.addSink(JdbcSink.sink(
                "INSERT INTO t_order(category,price,time) values(?,?,?)",
                (ps,order)->{
                    ps.setString(1,order.category);
                    ps.setDouble(2,order.price);
                    ps.setLong(3,order.time);
                },
                //批处理
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://192.168.88.163:3306/bigdata?characterEncoding=utf-8") //jdbc
                .withUsername("root")   //配置用户名
                .withPassword("123456") //密码
                .withDriverName("com.mysql.jdbc.Driver") //驱动类
                .build()
        ));
        env.execute();

以上就是全部内容了,感谢您的阅读!

另外补充一些不成熟的代码:双流Join

//双流Join
        SingleOutputStreamOperator<Order> order1watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem1WaterMark());
        SingleOutputStreamOperator<Order> order2watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem2WaterMark());
        //商品ID=订单ID
        final DataStream<Order> result = order1watermark.join(order2watermark)
                .where(o1 -> o1.category)
                .equalTo(o2 -> o2.category)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply((o1, o2) -> {
                    Order order = new Order();
                    order.setCategory(o1.category);
                    order.setPrice(o2.price);
                    order.setTime(o2.time);
                    return order;
                });
//        result.print();

水印机制,简化了直接使用系统时间

//水印机制
    public static class OrderItem2WaterMark implements WatermarkStrategy<Order>{
        @Override
        public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order order, long l, WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
        @Override
        public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element,recordTimestamp)->System.currentTimeMillis();
        }
    }
    public static class OrderItem1WaterMark implements WatermarkStrategy<Order> {
        @Override
        public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

好了,终于完成了✅

双流join不怎么会写,慢慢来吧,

毕竟对于考60分的人,下一次考80分已经是极大的进步~~

总结

以上便是Flink数据写入Kafka+从Kafka存入Mysql(二)~

喜欢的小伙伴欢迎一键三连!!!

我是manor,一枚相信技术改变世界的码农,我们下期再见~


目录
相关文章
消息中间件 存储 传感器
429 0
|
11月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
12月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
326 12
|
12月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
688 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
1365 0
|
消息中间件 关系型数据库 MySQL
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
本教程展示如何使用Flink CDC YAML快速构建从MySQL到Kafka的流式数据集成作业,涵盖整库同步和表结构变更同步。无需编写Java/Scala代码或安装IDE,所有操作在Flink CDC CLI中完成。首先准备Flink Standalone集群和Docker环境(包括MySQL、Kafka和Zookeeper),然后通过配置YAML文件提交任务,实现数据同步。教程还介绍了路由变更、写入多个分区、输出格式设置及上游表名到下游Topic的映射等功能,并提供详细的命令和示例。最后,包含环境清理步骤以确保资源释放。
1064 2
基于 Flink CDC YAML 的 MySQL 到 Kafka 流式数据集成
|
8月前
|
关系型数据库 MySQL 分布式数据库
阿里云PolarDB云原生数据库收费价格:MySQL和PostgreSQL详细介绍
阿里云PolarDB兼容MySQL、PostgreSQL及Oracle语法,支持集中式与分布式架构。标准版2核4G年费1116元起,企业版最高性能达4核16G,支持HTAP与多级高可用,广泛应用于金融、政务、互联网等领域,TCO成本降低50%。
|
8月前
|
SQL 关系型数据库 MySQL
Mysql数据恢复—Mysql数据库delete删除后数据恢复案例
本地服务器,操作系统为windows server。服务器上部署mysql单实例,innodb引擎,独立表空间。未进行数据库备份,未开启binlog。 人为误操作使用Delete命令删除数据时未添加where子句,导致全表数据被删除。删除后未对该表进行任何操作。需要恢复误删除的数据。 在本案例中的mysql数据库未进行备份,也未开启binlog日志,无法直接还原数据库。
|
8月前
|
关系型数据库 MySQL 数据库
阿里云数据库RDS费用价格:MySQL、SQL Server、PostgreSQL和MariaDB引擎收费标准
阿里云RDS数据库支持MySQL、SQL Server、PostgreSQL、MariaDB,多种引擎优惠上线!MySQL倚天版88元/年,SQL Server 2核4G仅299元/年,PostgreSQL 227元/年起。高可用、可弹性伸缩,安全稳定。详情见官网活动页。
1281 152
|
8月前
|
关系型数据库 分布式数据库 数据库
阿里云数据库收费价格:MySQL、PostgreSQL、SQL Server和MariaDB引擎费用整理
阿里云数据库提供多种类型,包括关系型与NoSQL,主流如PolarDB、RDS MySQL/PostgreSQL、Redis等。价格低至21元/月起,支持按需付费与优惠套餐,适用于各类应用场景。

推荐镜像

更多