[基础架构] [Flink] Flink/Flink-CDC代码实现业务接入

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: [基础架构] [Flink] Flink/Flink-CDC代码实现业务接入

简介

DataStream 和 FlinkSQL 方式的对比
DataStream 在 Flink1.12 和 1.13 都可以用,而 FlinkSQL 只能在 Flink1.13 使用。
DataStream 可以同时监控多库多表,而 FlinkSQL 只能监控单表。

方法 / 步骤

一:进行编码

1.1 导入相关依赖

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.1.3</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.49</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>1.12.0</version>
    </dependency>
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.75</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <!-- 可以将依赖打到jar包中 -->
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

1.2 业务编码

1.2.1 入口类

import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Description:
 *
 * @author: YangGC
 */
public class FlinkCDC2 {
    public static void main(String[] args) throws Exception {

        //1.获取Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 开启 Checkpoint
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        //2.通过FlinkCDC构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("192.168.1.220")
                .port(3308)
                .username("root")
                .password("useradmin")
                //flinkcdc 下面的所有表
                .databaseList("flinkcdc.*")
//                .tableList("flinkcdc.user_info")
                //使用自定义的反序列化器
                .deserializer(new CustomerDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //3.数据打印
        dataStreamSource.print();

        //4.启动任务
        env.execute("FlinkCDC2");
    }

}

1.2.2 自定义反序列化器

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;


/**
 * 自定义反序列化器
 * Description:
 *
 * @author: YangGC
 */
public class CustomerDeserializationSchema implements DebeziumDeserializationSchema<String> {


    /**
     * {
     * "db":"",
     * "tableName":"",
     * "before":{"id":"1001","name":""...},
     * "after":{"id":"1001","name":""...},
     * "op":""
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //创建JSON对象用于封装结果数据
        JSONObject result = new JSONObject();

        //获取库名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        result.put("db", fields[1]);
        result.put("tableName", fields[2]);

        //获取before数据
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            //获取列信息
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();

            for (Field field : fieldList) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);

        //获取after数据
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            //获取列信息
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();

            for (Field field : fieldList) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);

        //获取操作类型
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("op", operation);

        //输出数据
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

1.3 业务打包

在这里插入图片描述
打包完成:
在这里插入图片描述

二:Flink 作业到任务面板

2.1 通过命令行添加任务

把cdc-connector-1.0-SNAPSHOT-jar-with-dependencies.jar 包上传到 到flink主目录
并运行下面命令行

# 主要是配置入口类 指定flink的运行地址
bin/flink run -m 127.0.0.1:8081 -c com.yanggc.cdc.FlinkCDC2  ./cdc-connector-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 作业面板查看job正在运行

查看业务进行正常监控输出

2.2 上传Jar包进行任务添加

  • 添加相关参数

和命令行启动相关效果, 正常成功启动进行监控

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
2月前
|
SQL 前端开发 关系型数据库
如何开发一套研发项目管理系统?(附架构图+流程图+代码参考)
研发项目管理系统助力企业实现需求、缺陷与变更的全流程管理,支持看板可视化、数据化决策与成本优化。系统以MVP模式快速上线,核心功能包括需求看板、缺陷闭环、自动日报及关键指标分析,助力中小企业提升交付效率与协作质量。
|
2月前
|
JSON 文字识别 BI
如何开发车辆管理系统中的加油管理板块(附架构图+流程图+代码参考)
本文针对中小企业在车辆加油管理中常见的单据混乱、油卡管理困难、对账困难等问题,提出了一套完整的系统化解决方案。内容涵盖车辆管理系统(VMS)的核心功能、加油管理模块的设计要点、数据库模型、系统架构、关键业务流程、API设计与实现示例、前端展示参考(React + Antd)、开发技巧与工程化建议等。通过构建加油管理系统,企业可实现燃油费用的透明化、自动化对账、异常检测与数据分析,从而降低运营成本、提升管理效率。适合希望通过技术手段优化车辆管理的企业技术人员与管理者参考。
|
2月前
|
消息中间件 缓存 JavaScript
如何开发ERP(离散制造-MTO)系统中的生产管理板块(附架构图+流程图+代码参考)
本文详解离散制造MTO模式下的ERP生产管理模块,涵盖核心问题、系统架构、关键流程、开发技巧及数据库设计,助力企业打通计划与执行“最后一公里”,提升交付率、降低库存与浪费。
|
1月前
|
前端开发 JavaScript BI
如何开发车辆管理系统中的车务管理板块(附架构图+流程图+代码参考)
本文介绍了中小企业如何通过车务管理模块提升车辆管理效率。许多企业在管理车辆时仍依赖人工流程,导致违章处理延误、年检过期、维修费用虚高等问题频发。将这些流程数字化,可显著降低合规风险、提升维修追溯性、优化调度与资产利用率。文章详细介绍了车务管理模块的功能清单、数据模型、系统架构、API与前端设计、开发技巧与落地建议,以及实现效果与验收标准。同时提供了数据库建表SQL、后端Node.js/TypeScript代码示例与前端React表单设计参考,帮助企业快速搭建并上线系统,实现合规与成本控制的双重优化。
|
2月前
|
消息中间件 JavaScript 前端开发
如何开发ERP(离散制造-MTO)系统中的技术管理板块(附架构图+流程图+代码参考)
本文详解ERP(离散制造-MTO)系统中的技术管理板块,涵盖产品定义、BOM、工序、工艺文件及变更控制的结构化与系统化管理。内容包括技术管理的核心目标、总体架构、关键组件、业务流程、开发技巧与最佳实践,并提供完整的参考代码,助力企业将技术数据转化为可执行的生产指令,提升制造效率与质量。
|
2月前
|
消息中间件 JavaScript 关系型数据库
如何开发一套ERP(离散制造-MTO)系统(附架构图+流程图+代码参考)
本文介绍了面向离散制造-MTO(按订单生产)模式的ERP系统设计与实现方法。内容涵盖ERP系统定义、总体架构设计、主要功能模块解析、关键业务流程(订单到交付、BOM展开、MRP逻辑、排产等)、开发技巧(DDD、微服务、事件驱动)、参考代码示例、部署上线注意事项及实施效果评估。旨在帮助企业与开发团队构建高效、灵活、可扩展的ERP系统,提升订单交付能力与客户满意度。
|
2月前
|
机器学习/深度学习 人工智能 搜索推荐
从零构建短视频推荐系统:双塔算法架构解析与代码实现
短视频推荐看似“读心”,实则依赖双塔推荐系统:用户塔与物品塔分别将行为与内容编码为向量,通过相似度匹配实现精准推送。本文解析其架构原理、技术实现与工程挑战,揭秘抖音等平台如何用AI抓住你的注意力。
480 7
从零构建短视频推荐系统:双塔算法架构解析与代码实现
|
2月前
|
监控 供应链 前端开发
如何开发ERP(离散制造-MTO)系统中的财务管理板块(附架构图+流程图+代码参考)
本文详解离散制造MTO企业ERP系统中财务管理模块的搭建,聚焦应收账款与应付账款管理,涵盖核心功能、业务流程、开发技巧及Python代码示例,助力企业实现财务数据准确、实时可控,提升现金流管理能力。
|
2月前
|
供应链 监控 JavaScript
如何开发ERP(离散制造-MTO)系统中的库存管理板块(附架构图+流程图+代码参考)
本文详解MTO模式下ERP库存管理的关键作用,涵盖核心模块、业务流程、开发技巧与代码示例,助力制造企业提升库存周转率、降低缺货风险,实现高效精准的库存管控。
|
2月前
|
前端开发 API 定位技术
如何开发车辆管理系统中的用车申请板块(附架构图+流程图+代码参考)
本文详细解析了如何将传统纸质车辆管理流程数字化,涵盖业务规则、审批流、调度决策及数据留痕等核心环节。内容包括用车申请模块的价值定位、系统架构设计、数据模型构建、前端表单实现及后端开发技巧,助力企业打造可落地、易扩展的车辆管理系统。

热门文章

最新文章