[基础架构] [Flink] Flink/Flink-CDC代码实现业务接入

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: [基础架构] [Flink] Flink/Flink-CDC代码实现业务接入

简介

DataStream 和 FlinkSQL 方式的对比
DataStream 在 Flink1.12 和 1.13 都可以用,而 FlinkSQL 只能在 Flink1.13 使用。
DataStream 可以同时监控多库多表,而 FlinkSQL 只能监控单表。

方法 / 步骤

一:进行编码

1.1 导入相关依赖

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.1.3</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.49</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.75</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <!-- 可以将依赖打到jar包中 -->
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

1.2 业务编码

1.2.1 入口类

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Description:
 *
 * @author: YangGC
 */
public class FlinkCDC2 {
    public static void main(String[] args) throws Exception {

        //1.获取Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 开启 Checkpoint
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        //2.通过FlinkCDC构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("192.168.1.220")
                .port(3308)
                .username("root")
                .password("useradmin")
                //flinkcdc 下面的所有表
                .databaseList("flinkcdc.*")
//                .tableList("flinkcdc.user_info")
                //使用自定义的反序列化器
                .deserializer(new CustomerDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //3.数据打印
        dataStreamSource.print();

        //4.启动任务
        env.execute("FlinkCDC2");
    }

}

1.2.2 自定义反序列化器

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;


/**
 * 自定义反序列化器
 * Description:
 *
 * @author: YangGC
 */
public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {


    /**
     * {
     * "db":"",
     * "tableName":"",
     * "before":{"id":"1001","name":""...},
     * "after":{"id":"1001","name":""...},
     * "op":""
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //创建JSON对象用于封装结果数据
        JSONObject result = new JSONObject();

        //获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        result.put("db", fields[1]);
        result.put("tableName", fields[2]);

        //获取before数据
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            //获取列信息
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();

            for (Field field : fieldList) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);

        //获取after数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            //获取列信息
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();

            for (Field field : fieldList) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);

        //获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("op", operation);

        //输出数据
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

1.3 业务打包

在这里插入图片描述
打包完成:
在这里插入图片描述

二:Flink 作业到任务面板

2.1 通过命令行添加任务

把cdc-connector-1.0-SNAPSHOT-jar-with-dependencies.jar 包上传到 到flink主目录
并运行下面命令行

# 主要是配置入口类 指定flink的运行地址
bin/flink run -m 127.0.0.1:8081 -c com.yanggc.cdc.FlinkCDC2  ./cdc-connector-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 作业面板查看job正在运行

查看业务进行正常监控输出

2.2 上传Jar包进行任务添加

  • 添加相关参数

和命令行启动相关效果, 正常成功启动进行监控

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
20天前
|
Cloud Native PHP Docker
PHP 中的异常处理:提升代码的健壮性云原生时代的微服务架构实践
【7月更文挑战第31天】在PHP开发中,异常处理是确保应用程序稳定性和可靠性的关键。本文将引导您了解如何在PHP中实现有效的异常处理机制,通过实际代码示例展示如何捕获和处理异常,以及如何使用自定义异常类来增强错误管理的灵活性。我们将探索不同的异常处理策略,并讨论它们对提升代码质量的影响。 【7月更文挑战第31天】在数字化浪潮的推动下,云原生技术正成为企业转型的关键。本文将深入探讨如何在云平台上利用微服务架构实现敏捷开发和高效运维,通过具体的代码示例,揭示微服务与容器化部署的协同优势,同时指出在实施过程中可能遇到的挑战及应对策略。
22 1
|
26天前
|
供应链 负载均衡 数据库
软件架构一致性问题之分析代码修改的 Scalability如何解决
软件架构一致性问题之分析代码修改的 Scalability如何解决
29 1
|
1月前
|
移动开发 前端开发 架构师
前端架构师需要具备什么能力以及代码能力?
【7月更文挑战第17天】 前端架构师是技术、领导与管理的融合,需精通HTML/CSS/JS及React/Vue等框架,擅长工程化、跨平台开发与安全。他们设计高效架构,优化性能,领导团队,做技术选型,并持续学习分享,确保代码质量和团队成长。
62 7
|
29天前
|
监控 前端开发 UED
软件交付问题之架构让代码组织更有序,如何解决
软件交付问题之架构让代码组织更有序,如何解决
|
1月前
编程之路:从代码到架构的心路历程
【7月更文挑战第9天】在数字世界的迷宫中,每一行代码都承载着创造者的梦想与挑战。本文将通过个人技术感悟的镜头,探索编程实践的深层次价值,从最初的代码编写到复杂的系统架构设计,揭示技术成长的内在逻辑和情感变迁。我们将一同穿梭在技术的森林里,寻找那些让代码生动起来的秘密。
22 2
|
29天前
|
运维 Java Docker
业务系统架构实践问题之在某些情况下,将能力代码和业务逻辑严格分层可能是一个挑战问题如何解决
业务系统架构实践问题之在某些情况下,将能力代码和业务逻辑严格分层可能是一个挑战问题如何解决
|
29天前
|
SQL 索引
业务系统架构实践问题之想要再SQL代码中生成递增序列,那么步骤问题如何解决
业务系统架构实践问题之想要再SQL代码中生成递增序列,那么步骤问题如何解决
|
30天前
|
算法 Java 调度
高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决
高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决
|
30天前
|
缓存 算法 Java
高并发架构设计三大利器:缓存、限流和降级问题之使用代码实现漏桶算法问题如何解决
高并发架构设计三大利器:缓存、限流和降级问题之使用代码实现漏桶算法问题如何解决
|
30天前
业务系统架构实践问题之代码应该主要放在biz层还是domain层
业务系统架构实践问题之代码应该主要放在biz层还是domain层