2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql(二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 2021年最新Flink读写Kafka数据——Flink数据写入Kafka+从Kafka存入Mysql(二)

前言

大家好,我是ChinaManor,直译过来就是中国码农的意思,我希望自己能成为国家复兴道路的铺路人,大数据领域的耕耘者,平凡但不甘于平庸的人。

这次是上篇文章的续集,最新的Flink版本大大简化了之前复杂的写法~

之前的文章

首先准备模拟数据:

//1、准备配置文件
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1.itcast.cn:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("KafkaCustomPartitioner.class", "test.KafkaCustomPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Kafka的一系列配置,可以从官网直接copy过来@~@~

然后正式生产模拟数据:

//2、创建KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
        Random random = new Random();
        while (true){
            //随机生成分类和金额
            int index = random.nextInt(categorys.length);//[0~length) ==> [0~length-1]
            String category = categorys[index];//获取的随机分类
            double price = random.nextDouble() * 100;//注意nextDouble生成的是[0~1)之间的随机数,*100之后表示[0~100)
            CategoryPojo categoryPojo = new CategoryPojo(category, price,System.currentTimeMillis());
            String data = JSON.toJSONString(categoryPojo);
            //3、发送数据
            kafkaProducer.send(new ProducerRecord<String, String>("topicDemo",data));
            System.out.println("数据是"+data);
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

这里的实体类用Lombok,比较简单:

这是之前写的Lombok用法文章

@Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CategoryPojo {
        private String category;//分类名称
        private double price;//该分类总销售额
        private long time;// 截止到当前时间的时间,本来应该是EventTime,但是我们这里简化了直接用当前系统时间即可
    }

有了数据写入Kafka,我们开始消费“她”:

设置一下Flink运行环境:

//TODO 1.设置环境env
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         //并行度为1,表示不分区
        env.setParallelism(1);

配置Kafka相关并从哪里开始读offset

//TODO 2设置Kafka相关参数
        Properties props = new Properties();
        //kafka的地址,消费组名
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.88.161:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"category");
        //Flink设置kafka的offset,从最新的开始
         FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "myDemo",
                new SimpleStringSchema(),
                props
        );
        consumer.setStartFromLatest();
        consumer.setCommitOffsetsOnCheckpoints(true);

第3步解析数据源并测试:

DataStreamSource<String> source = env.addSource(consumer);
         SingleOutputStreamOperator<Order> mapDS = source.map(new MapFunction<String, Order>() {
            @Override
            public Order map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                Order order = JSON.toJavaObject(jsonObject, Order.class);
                return order;
            }
        });
        //测试一下
        mapDS.print();

success!

最后存入Mysql

//sink输出到Mysql
        result.addSink(JdbcSink.sink(
                "INSERT INTO t_order(category,price,time) values(?,?,?)",
                (ps,order)->{
                    ps.setString(1,order.category);
                    ps.setDouble(2,order.price);
                    ps.setLong(3,order.time);
                },
                //批处理
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://192.168.88.163:3306/bigdata?characterEncoding=utf-8") //jdbc
                .withUsername("root")   //配置用户名
                .withPassword("123456") //密码
                .withDriverName("com.mysql.jdbc.Driver") //驱动类
                .build()
        ));
        env.execute();

以上就是全部内容了,感谢您的阅读!

另外补充一些不成熟的代码:双流Join

//双流Join
        SingleOutputStreamOperator<Order> order1watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem1WaterMark());
        SingleOutputStreamOperator<Order> order2watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem2WaterMark());
        //商品ID=订单ID
        final DataStream<Order> result = order1watermark.join(order2watermark)
                .where(o1 -> o1.category)
                .equalTo(o2 -> o2.category)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply((o1, o2) -> {
                    Order order = new Order();
                    order.setCategory(o1.category);
                    order.setPrice(o2.price);
                    order.setTime(o2.time);
                    return order;
                });
//        result.print();

水印机制,简化了直接使用系统时间

//水印机制
    public static class OrderItem2WaterMark implements WatermarkStrategy<Order>{
        @Override
        public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order order, long l, WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                    watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
        @Override
        public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element,recordTimestamp)->System.currentTimeMillis();
        }
    }
    public static class OrderItem1WaterMark implements WatermarkStrategy<Order> {
        @Override
        public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Order>() {
                @Override
                public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

好了,终于完成了✅

双流join不怎么会写,慢慢来吧,

毕竟对于考60分的人,下一次考80分已经是极大的进步~~

总结

以上便是Flink数据写入Kafka+从Kafka存入Mysql(二)~

喜欢的小伙伴欢迎一键三连!!!

我是manor,一枚相信技术改变世界的码农,我们下期再见~


目录
相关文章
|
14天前
|
安全 关系型数据库 MySQL
如何将数据从MySQL同步到其他系统
【10月更文挑战第17天】如何将数据从MySQL同步到其他系统
85 0
|
20天前
|
SQL 前端开发 关系型数据库
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
36 0
全表数据核对 ,行数据核对,列数据核对,Mysql 8.0 实例(sample database classicmodels _No.3 )
|
10天前
|
SQL Java 关系型数据库
java连接mysql查询数据(基础版,无框架)
【10月更文挑战第12天】该示例展示了如何使用Java通过JDBC连接MySQL数据库并查询数据。首先在项目中引入`mysql-connector-java`依赖,然后通过`JdbcUtil`类中的`main`方法实现数据库连接、执行SQL查询及结果处理,最后关闭相关资源。
|
7天前
|
SQL 关系型数据库 MySQL
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
定时任务频繁插入数据导致锁表问题 -> 查询mysql进程
18 1
|
8天前
|
SQL 关系型数据库 MySQL
mysql数据误删后的数据回滚
【11月更文挑战第1天】本文介绍了四种恢复误删数据的方法:1. 使用事务回滚,通过 `pymysql` 库在 Python 中实现;2. 使用备份恢复,通过 `mysqldump` 命令备份和恢复数据;3. 使用二进制日志恢复,通过 `mysqlbinlog` 工具恢复特定位置的事件;4. 使用延迟复制从副本恢复,通过停止和重启从库复制来恢复数据。每种方法都有详细的步骤和示例代码。
|
20天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
20天前
|
存储 关系型数据库 MySQL
面试官:MySQL一次到底插入多少条数据合适啊?
本文探讨了数据库插入操作的基础知识、批量插入的优势与挑战,以及如何确定合适的插入数据量。通过面试对话的形式,详细解析了单条插入与批量插入的区别,磁盘I/O、内存使用、事务大小和锁策略等关键因素。最后,结合MyBatis框架,提供了实际应用中的批量插入策略和优化建议。希望读者不仅能掌握技术细节,还能理解背后的原理,从而更好地优化数据库性能。
|
29天前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
39 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
219 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
65 3