大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka

目录 作用
app 产生各层数据的 flink 任务
bean 数据对象
common 公共常量
utils 工具类

app.ods.FlinkCDC.java

package com.atguigu.app.ods;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.atguigu.app.function.CustomerDeserialization;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.1 设置CK&状态后端
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
        //env.enableCheckpointing(5000L);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart());
        //2.通过FlinkCDC构建SourceFunction并读取数据
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("gmall-210325-flink")
                .deserializer(new CustomerDeserialization())
                .startupOptions(StartupOptions.latest())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);
        //3.打印数据并将数据写入Kafka
        streamSource.print();
        String sinkTopic = "ods_base_db";
        streamSource.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));
        //4.启动任务
        env.execute("FlinkCDC");
    }
}

CustomerDeserialization

package com.atguigu.app.function;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
    /**
     * 封装的数据格式
     * {
     * "database":"",
     * "tableName":"",
     * "before":{"id":"","tm_name":""....},
     * "after":{"id":"","tm_name":""....},
     * "type":"c u d",
     * //"ts":156456135615
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //1.创建JSON对象用于存储最终数据
        JSONObject result = new JSONObject();
        //2.获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        Struct value = (Struct) sourceRecord.value();
        //3.获取"before"数据
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }
        //4.获取"after"数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }
        //5.获取操作类型  CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }
        //6.将字段写入JSON对象
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);
        //7.输出数据
        collector.collect(result.toJSONString());
    }
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

MyKafkaUtil

package com.atguigu.utils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.util.Properties;
public class MyKafkaUtil {
    private static String brokers = "hadoop102:9092,hadoop103:9092,hadoop104:9092";
    private static String default_topic = "DWD_DEFAULT_TOPIC";
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>(brokers,
                topic,
                new SimpleStringSchema());
    }
    public static <T> FlinkKafkaProducer<T> getKafkaProducer(KafkaSerializationSchema<T> kafkaSerializationSchema) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        return new FlinkKafkaProducer<T>(default_topic,
                kafkaSerializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        return new FlinkKafkaConsumer<String>(topic,
                new SimpleStringSchema(),
                properties);
    }
    //拼接Kafka相关属性到DDL
    public static String getKafkaDDL(String topic, String groupId) {
        return  " 'connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = '" + brokers + "', " +
                " 'properties.group.id' = '" + groupId + "', " +
                " 'format' = 'json', " +
                " 'scan.startup.mode' = 'latest-offset'  ";
    }
}

尚硅谷 源代码

https://gitee.com/wh-alex/gmall-flink-210325

目录
相关文章
|
10天前
|
关系型数据库 MySQL 数据库
ORM对mysql数据库中数据进行操作报错解决
ORM对mysql数据库中数据进行操作报错解决
34 2
|
10天前
|
SQL 关系型数据库 MySQL
MySQL如何排查和删除重复数据
该文章介绍了在MySQL中如何排查和删除重复数据的方法,包括通过组合字段生成唯一标识符以及使用子查询和聚合函数来定位并删除重复记录的具体步骤。
27 2
|
1月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
207 4
|
2月前
|
关系型数据库 MySQL 数据库
RDS MySQL灾备服务协同解决方案构建问题之数据库备份数据的云上云下迁移如何解决
RDS MySQL灾备服务协同解决方案构建问题之数据库备份数据的云上云下迁移如何解决
|
5天前
|
消息中间件 canal 关系型数据库
Maxwell:binlog 解析器,轻松同步 MySQL 数据
Maxwell:binlog 解析器,轻松同步 MySQL 数据
38 11
|
3天前
|
关系型数据库 MySQL 数据库
MySQL的语法涵盖了数据定义、数据操作、数据查询和数据控制等多个方面
MySQL的语法涵盖了数据定义、数据操作、数据查询和数据控制等多个方面
17 5
|
10天前
|
关系型数据库 MySQL 数据库
Python MySQL查询返回字典类型数据的方法
通过使用 `mysql-connector-python`库并选择 `MySQLCursorDict`作为游标类型,您可以轻松地将MySQL查询结果以字典类型返回。这种方式提高了代码的可读性,使得数据操作更加直观和方便。上述步骤和示例代码展示了如何实现这一功能,希望对您的项目开发有所帮助。
33 4
|
18天前
|
存储 关系型数据库 MySQL
技术解析:MySQL中取最新一条重复数据的方法
以上提供的两种方法都可以有效地从MySQL数据库中提取每个类别最新的重复数据。选择哪种方法取决于具体的使用场景和MySQL版本。子查询加分组的方法兼容性更好,适用于所有版本的MySQL;而窗口函数方法代码更简洁,执行效率可能更高,但需要MySQL 8.0及以上版本。在实际应用中,应根据数据量大小、查询性能需求以及MySQL版本等因素综合考虑,选择最合适的实现方案。
89 6
|
18天前
|
关系型数据库 MySQL 数据处理
针对MySQL亿级数据的高效插入策略与性能优化技巧
在处理MySQL亿级数据的高效插入和性能优化时,以上提到的策略和技巧可以显著提升数据处理速度,减少系统负担,并保持数据的稳定性和一致性。正确实施这些策略需要深入理解MySQL的工作原理和业务需求,以便做出最适合的配置调整。
70 6
|
2月前
|
SQL 存储 缓存
MySQL是如何保证数据不丢失的?
文章详细阐述了InnoDB存储引擎中Buffer Pool与DML操作的关系。在执行插入、更新或删除操作时,InnoDB为了减少磁盘I/O,会在Buffer Pool中缓存数据页进行操作,随后将更新后的“脏页”刷新至磁盘。为防止服务宕机导致数据丢失,InnoDB采用了日志先行(WAL)机制,通过将DML操作记录为Redo Log并异步刷新到磁盘,结合双写机制和合理的日志刷新策略,确保数据的持久性和一致性。尽管如此,仍需合理配置参数以平衡性能与数据安全性。
MySQL是如何保证数据不丢失的?

热门文章

最新文章

下一篇
无影云桌面