[基础架构] [Flink] Flink/Flink-CDC代码实现业务接入

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: [基础架构] [Flink] Flink/Flink-CDC代码实现业务接入

简介

DataStream 和 FlinkSQL 方式的对比
DataStream 在 Flink1.12 和 1.13 都可以用,而 FlinkSQL 只能在 Flink1.13 使用。
DataStream 可以同时监控多库多表,而 FlinkSQL 只能监控单表。

方法 / 步骤

一:进行编码

1.1 导入相关依赖

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.1.3</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.49</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.75</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <!-- 可以将依赖打到jar包中 -->
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

1.2 业务编码

1.2.1 入口类

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Description:
 *
 * @author: YangGC
 */
public class FlinkCDC2 {
    public static void main(String[] args) throws Exception {

        //1.获取Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 开启 Checkpoint
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        //2.通过FlinkCDC构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("192.168.1.220")
                .port(3308)
                .username("root")
                .password("useradmin")
                //flinkcdc 下面的所有表
                .databaseList("flinkcdc.*")
//                .tableList("flinkcdc.user_info")
                //使用自定义的反序列化器
                .deserializer(new CustomerDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //3.数据打印
        dataStreamSource.print();

        //4.启动任务
        env.execute("FlinkCDC2");
    }

}

1.2.2 自定义反序列化器

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;


/**
 * 自定义反序列化器
 * Description:
 *
 * @author: YangGC
 */
public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {


    /**
     * {
     * "db":"",
     * "tableName":"",
     * "before":{"id":"1001","name":""...},
     * "after":{"id":"1001","name":""...},
     * "op":""
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //创建JSON对象用于封装结果数据
        JSONObject result = new JSONObject();

        //获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        result.put("db", fields[1]);
        result.put("tableName", fields[2]);

        //获取before数据
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            //获取列信息
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();

            for (Field field : fieldList) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);

        //获取after数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            //获取列信息
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();

            for (Field field : fieldList) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);

        //获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("op", operation);

        //输出数据
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

1.3 业务打包

在这里插入图片描述
打包完成:
在这里插入图片描述

二:Flink 作业到任务面板

2.1 通过命令行添加任务

把cdc-connector-1.0-SNAPSHOT-jar-with-dependencies.jar 包上传到 到flink主目录
并运行下面命令行

# 主要是配置入口类 指定flink的运行地址
bin/flink run -m 127.0.0.1:8081 -c com.yanggc.cdc.FlinkCDC2  ./cdc-connector-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 作业面板查看job正在运行

查看业务进行正常监控输出

2.2 上传Jar包进行任务添加

  • 添加相关参数

和命令行启动相关效果, 正常成功启动进行监控

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
数据采集 机器学习/深度学习 大数据
行为检测代码(一):超详细介绍C3D架构训练+测试步骤
这篇文章详细介绍了C3D架构在行为检测领域的应用,包括训练和测试步骤,使用UCF101数据集进行演示。
44 1
行为检测代码(一):超详细介绍C3D架构训练+测试步骤
|
24天前
|
存储 安全 Java
系统安全架构的深度解析与实践:Java代码实现
【11月更文挑战第1天】系统安全架构是保护信息系统免受各种威胁和攻击的关键。作为系统架构师,设计一套完善的系统安全架构不仅需要对各种安全威胁有深入理解,还需要熟练掌握各种安全技术和工具。
68 10
|
1月前
|
机器学习/深度学习 网络架构 计算机视觉
目标检测笔记(一):不同模型的网络架构介绍和代码
这篇文章介绍了ShuffleNetV2网络架构及其代码实现,包括模型结构、代码细节和不同版本的模型。ShuffleNetV2是一个高效的卷积神经网络,适用于深度学习中的目标检测任务。
71 1
目标检测笔记(一):不同模型的网络架构介绍和代码
|
1月前
|
设计模式 人工智能 算法
编程之旅:从代码到架构的感悟
【9月更文挑战第33天】在编程的世界里,代码不仅是实现功能的工具,更是连接思想与现实的桥梁。本文将通过个人的编程经历,分享从编写第一行代码到设计系统架构的旅程,探索编程背后的哲学和技术演变。我们将一起思考,如何在代码的海洋中找到自己的航向,以及在这个过程中如何不断成长和适应变化。
|
1月前
|
机器学习/深度学习 大数据 PyTorch
行为检测(一):openpose、LSTM、TSN、C3D等架构实现或者开源代码总结
这篇文章总结了包括openpose、LSTM、TSN和C3D在内的几种行为检测架构的实现方法和开源代码资源。
43 0
|
2月前
|
机器学习/深度学习 测试技术 数据处理
KAN专家混合模型在高性能时间序列预测中的应用:RMoK模型架构探析与Python代码实验
Kolmogorov-Arnold网络(KAN)作为一种多层感知器(MLP)的替代方案,为深度学习领域带来新可能。尽管初期测试显示KAN在时间序列预测中的表现不佳,近期提出的可逆KAN混合模型(RMoK)显著提升了其性能。RMoK结合了Wav-KAN、JacobiKAN和TaylorKAN等多种专家层,通过门控网络动态选择最适合的专家层,从而灵活应对各种时间序列模式。实验结果显示,RMoK在多个数据集上表现出色,尤其是在长期预测任务中。未来研究将进一步探索RMoK在不同领域的应用潜力及其与其他先进技术的结合。
96 4
|
3月前
|
XML 开发框架 .NET
.NET框架:软件开发领域的瑞士军刀,如何让初学者变身代码艺术家——从基础架构到独特优势,一篇不可错过的深度解读。
【8月更文挑战第28天】.NET框架是由微软推出的统一开发平台,支持多种编程语言,简化应用程序的开发与部署。其核心组件包括公共语言运行库(CLR)和类库(FCL)。CLR负责内存管理、线程管理和异常处理等任务,确保代码稳定运行;FCL则提供了丰富的类和接口,涵盖网络、数据访问、安全性等多个领域,提高开发效率。此外,.NET框架还支持跨语言互操作,允许开发者使用C#、VB.NET等语言编写代码并无缝集成。这一框架凭借其强大的功能和广泛的社区支持,已成为软件开发领域的重要工具,适合初学者深入学习以奠定职业生涯基础。
102 1
|
3月前
|
设计模式 算法 PHP
深入理解PHP中的数组操作探索编程之美:从代码到架构的思维转变
【8月更文挑战第24天】在PHP编程中,数组是基础且强大的数据结构。本文将通过浅显易懂的方式,介绍如何在PHP中高效地操作数组,包括创建、遍历、排序和过滤等常见任务。无论你是初学者还是有经验的开发者,这篇文章都会带给你新的启示。 【8月更文挑战第24天】在编程的世界中,代码不仅仅是冰冷的字符排列,它承载着思想、解决问题的智慧和创新的灵魂。本文将通过个人的技术感悟,带领读者从编写单一功能的代码片段出发,逐步深入到整个软件架构的设计哲学,探索如何将代码块转化为高效、可维护和可扩展的系统。我们将一起见证,当代码与架构思维相结合时,如何引发技术实践的革命性飞跃。
|
3月前
|
前端开发 开发者 C#
WPF开发者必读:MVVM模式实战,轻松实现现代桌面应用架构,让你的代码更上一层楼!
【8月更文挑战第31天】在WPF应用程序开发中,MVVM(Model-View-ViewModel)模式通过分离应用程序的逻辑和界面,提高了代码的可维护性和可扩展性。本文介绍了MVVM模式的三个核心组件:Model(数据模型)、View(用户界面)和ViewModel(处理数据绑定和逻辑),并通过示例代码展示了如何在WPF项目中实现MVVM模式。通过这种方式,开发者可以构建更加高效和可扩展的桌面应用程序。
169 0
|
3月前
|
消息中间件 缓存 Java
如何优化大型Java后端系统的性能:从代码到架构
当面对大型Java后端系统时,性能优化不仅仅是简单地提高代码效率或硬件资源的投入,而是涉及到多层次的技术策略。本篇文章将从代码层面的优化到系统架构的调整,详细探讨如何通过多种方式来提升Java后端系统的性能。通过对常见问题的深入分析和实际案例的分享,我们将探索有效的性能优化策略,帮助开发者构建更高效、更可靠的后端系统。
下一篇
无影云桌面