DataX 概述、部署、数据同步运用示例

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
大数据开发治理平台 DataWorks,不限时长
简介: DataX是阿里巴巴开源的离线数据同步工具,支持多种数据源之间的高效传输。其特点是多数据源支持、可扩展性、灵活配置、高效传输、任务调度监控和活跃的开源社区支持。DataX通过Reader和Writer插件实现数据源的读取和写入,采用Framework+plugin架构。部署简单,解压即可用。示例展示了如何配置DataX同步MySQL到HDFS,并提供了速度和内存优化建议。此外,还解决了NULL值同步问题及配置文件变量传参的方法。

@[toc]

什么是 DataX?

DataX 是阿里巴巴集团开源的、通用的数据抽取工具,广泛使用的离线数据同步工具/平台。它设计用于支持多种数据源之间的高效数据传输,可以实现不同数据源之间的数据同步、迁移、ETL(抽取、转换、加载)等数据操作。



主要特点和功能包括:

  1. 多数据源支持:DataX支持多种数据源,包括关系型数据库(如 MySQL、Oracle、SQL Server)、NoSQL 数据库(如 MongoDB、HBase、Redis)、HDFS、FTP、Hive 等。

  2. 扩展性:DataX具有良好的扩展性,可以通过编写插件来支持新的数据源或者数据目的地,以满足不同数据存储系统的需求。

  3. 灵活配置:通过配置文件,用户可以灵活地定义数据抽取任务,包括数据源、目的地、字段映射、转换规则等,以适应不同的数据处理需求。

  4. 高效传输:DataX 采用分布式、并行的数据传输策略,可以将数据以高效、快速的方式传输到目标数据源,提高数据处理的效率。

  5. 任务调度和监控:DataX 提供了任务调度和监控的功能,可以通过控制台查看任务运行情况,监控任务的健康状态,以及处理异常情况。

  6. 开源社区支持:DataX 是开源项目,拥有活跃的开源社区支持,可以获取到丰富的文档、示例和开发者社区的帮助。

image.png

DataX 设计框架

DataX 本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的 Reader 插件,以及向目标端写入数据的 Writer 插件,理论上 DataX 框架可以支持任意数据源类型的数据同步工作。同时 DataX 插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。



DataX 本身作为离线数据同步框架,采用 Framework + plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。

  • Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给Framework。

  • Writer: Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。

  • Framework:Framework 用于连接 readerwriter,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX 核心架构

  1. DataX 完成单个数据同步的作业,我们称之为 Job,DataX 接受到一个 Job 之后,将启动一个进程来完成整个作业同步过程。DataX Job 模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。

  2. DataXJob 启动后,会根据不同的源端切分策略,将 Job 切分成多个小的 Task(子任务),以便于并发执行。Task 便是 DataX 作业的最小单元,每一个 Task 都会负责一部分数据的同步工作。

  3. 切分多个 Task 之后,DataX Job 会调用 Scheduler 模块,根据配置的并发数据量,将拆分成的 Task 重新组合,组装成 TaskGroup(任务组)。

  4. 每一个 TaskGroup 负责以一定的并发运行完毕分配好的所有 Task,默认单个任务组的并发数量为 5

  1. DataX 作业运行起来之后, Job 监控并等待多个 TaskGroup 模块任务完成,等待所有 TaskGroup 任务完成后 Job 成功退出。否则,异常退出,进程退出值非 0


DataX 部署

DataX 3.0 下载地址

1. 解压压缩包

tar -zxvf datax.tar.gz -C /opt/module/

2. 配置环境变量

sudo vim /etc/profile.d/my.sh

# 添加如下内容:
#DATAX_HOME
export DATAX_HOME=/opt/module/datax

刷新环境变量:source /etc/profile.d/my.sh

3. 运行测试程序

运行 DataX 自带的测试程序。

cd $DATAX_HOME

python bin/datax.py job/job.json

其中 datax.py 是执行的主程序,job.json 中存储的是数据源插件配置(其中有数据源的连接信息、存储信息、配置信息等)。

正常运行完成后,会看到输出相关日志内容:

image.png

DataX 无需进行其它配置,解压后即可快速使用。

DataX 数据同步 MySQL —> HDFS

1.创建 MySQL 测试库表

CREATE DATABASE `test_datax` CHARACTER SET 'utf8mb4';
USE `test_datax`;

-- 商品属性表
DROP TABLE IF EXISTS sku_info;
CREATE TABLE sku_info(
    `sku_id`      varchar(100) COMMENT "商品id",
    `name`        varchar(200) COMMENT "商品名称",
    `category_id` varchar(100) COMMENT "所属分类id",
    `from_date`   varchar(100) COMMENT "上架日期",
    `price`       decimal(10,2) COMMENT "商品单价"
) COMMENT "商品属性表";

insert into sku_info
values ('1', 'xiaomi 10', '1', '2020-01-01', 2000),
       ('2', '手机壳', '1', '2020-02-01', 10),
       ('3', 'apple 12', '1', '2020-03-01', 5000),
       ('4', 'xiaomi 13', '1', '2020-04-01', 6000),
       ('5', '破壁机', '2', '2020-01-01', 500),
       ('6', '洗碗机', '2', '2020-02-01', 2000),
       ('7', '热水壶', '2', '2020-03-01', 100),
       ('8', '微波炉', '2', '2020-04-01', 600),
       ('9', '自行车', '3', '2020-01-01', 1000),
       ('10', '帐篷', '3', '2020-02-01', 100),
       ('11', '烧烤架', '3', '2020-02-01', 50),
       ('12', '遮阳伞', '3', '2020-03-01', 20);

2.配置 DataX 插件

我们进入 DataX 官网,下滑找到对应组件插件的文档。

我们这里是从 MySQL 中读数据,然后存储到 HDFS 上,所以我们这里就先去查找 MySQL Reader 插件文档。



然后就会找到一个 MySQL Reader 的插件配置样例,如下所示:

{
   
   
    "job": {
   
   
        "setting": {
   
   
             // 指定通道数量(并发)
            "speed": {
   
   
                 "channel": 3
            },
            // 允许的误差范围
            "errorLimit": {
   
    
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
   
   
                // 读取配置
                "reader": {
   
   
                    // 固定写法——mysqlreader
                    "name": "mysqlreader",
                    "parameter": {
   
   
                        // 账号密码
                        "username": "root",
                        "password": "root",
                        // 选取需要进行同步的字段
                        "column": [
                            "id",
                            "name"
                        ],
                        // 通过数据切片进行数据同步
                        "splitPk": "db_id",

                        // 指定库表连接信息
                        "connection": [
                            {
   
   
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                },
                // 写入配置
               "writer": {
   
   
                    "name": "streamwriter",
                    "parameter": {
   
   
                        "print":true
                    }
                }
            }
        ]
    }
}

了解 MySQL Reader 插件配置后,我们现在来学习 HDFS Writer 插件配置如何编写。



HDFS Writer 插件配置样例如下所示:

{
   
   
    "setting": {
   
   },
    "job": {
   
   
         // 指定通道数量(并发)
        "setting": {
   
   
            "speed": {
   
   
                "channel": 2
            }
        },
        "content": [
            {
   
   
                // 读取配置
                "reader": {
   
   
                    // 固定写法——txtfilereader
                    "name": "txtfilereader",
                    // 指定读取路径、编码、字段
                    "parameter": {
   
   
                        "path": ["/Users/shf/workplace/txtWorkplace/job/dataorcfull.txt"],
                        "encoding": "UTF-8",
                        "column": [
                            {
   
   
                                "index": 0,
                                "type": "long"
                            },
                            {
   
   
                                "index": 1,
                                "type": "long"
                            },
                            {
   
   
                                "index": 2,
                                "type": "long"
                            },
                            {
   
   
                                "index": 3,
                                "type": "long"
                            },
                            {
   
   
                                "index": 4,
                                "type": "DOUBLE"
                            },
                            {
   
   
                                "index": 5,
                                "type": "DOUBLE"
                            },
                            {
   
   
                                "index": 6,
                                "type": "STRING"
                            },
                            {
   
   
                                "index": 7,
                                "type": "STRING"
                            },
                            {
   
   
                                "index": 8,
                                "type": "STRING"
                            },
                            {
   
   
                                "index": 9,
                                "type": "BOOLEAN"
                            },
                            {
   
   
                                "index": 10,
                                "type": "date"
                            },
                            {
   
   
                                "index": 11,
                                "type": "date"
                            }
                        ],
                        "fieldDelimiter": "\t"
                    }
                },
                // 写入配置
                "writer": {
   
   
                    // 固定写法——hdfswriter
                    "name": "hdfswriter",
                    // 指定存储路径、格式、文件前缀名、字段
                    "parameter": {
   
   
                        "defaultFS": "hdfs://xxx:port",
                        "fileType": "orc",
                        "path": "/user/hive/warehouse/writerorc.db/orcfull",
                        "fileName": "xxxx",
                        "column": [
                            {
   
   
                                "name": "col1",
                                "type": "TINYINT"
                            },
                            {
   
   
                                "name": "col2",
                                "type": "SMALLINT"
                            },
                            {
   
   
                                "name": "col3",
                                "type": "INT"
                            },
                            {
   
   
                                "name": "col4",
                                "type": "BIGINT"
                            },
                            {
   
   
                                "name": "col5",
                                "type": "FLOAT"
                            },
                            {
   
   
                                "name": "col6",
                                "type": "DOUBLE"
                            },
                            {
   
   
                                "name": "col7",
                                "type": "STRING"
                            },
                            {
   
   
                                "name": "col8",
                                "type": "VARCHAR"
                            },
                            {
   
   
                                "name": "col9",
                                "type": "CHAR"
                            },
                            {
   
   
                                "name": "col10",
                                "type": "BOOLEAN"
                            },
                            {
   
   
                                "name": "col11",
                                "type": "date"
                            },
                            {
   
   
                                "name": "col12",
                                "type": "TIMESTAMP"
                            }
                        ],
                        // 指定存储模式
                        "writeMode": "append",
                        // 指定存储间隔符
                        "fieldDelimiter": "\t",
                        // 指定数据压缩模式
                        "compress":"NONE"
                    }
                }
            }
        ]
    }
}

3. 编写 DataX 同步 MySQL 数据到 HDFS 插件配置

cd $DATAX_HOME/job

vim test.json

添加下列内容:

{
   
   
    "job": {
   
   
        "setting": {
   
   
            "speed": {
   
   
                 "channel": 1
            }
        },
        "content": [
            {
   
   
                "reader": {
   
   
                    "name": "mysqlreader",
                    "parameter": {
   
   
                        "username": "root",
                        "password": "000000",
                        "column": [
                            "sku_id",
                            "name",
                            "category_id",
                            "from_date",
                            "price"
                        ],
                        "splitPk": "",
                        "connection": [
                            {
   
   
                                "table": [
                                    "sku_info"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
   
   
                    "name": "hdfswriter",
                    "parameter": {
   
   
                        "defaultFS": "hdfs://hadoop120:8020",
                        "fileType": "text",
                        "path": "/testInputPath",
                        "fileName": "sku_info",
                        "column": [
                            {
   
   
                                "name": "sku_id",
                                "type": "STRING"
                            },
                            {
   
   
                                "name": "name",
                                "type": "STRING"
                            },
                            {
   
   
                                "name": "category_id",
                                "type": "STRING"
                            },
                            {
   
   
                                "name": "from_date",
                                "type": "STRING"
                            },
                            {
   
   
                                "name": "price",
                                "type": "DOUBLE"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress":"gzip"
                    }
                }
            }
        ]
    }
}

上面的插件配置信息表明,将 MySQL 中 test_datax.sku_info 表的全量数据同步到 HDFS 中的 /testInputPath 路径下,如果指定输出文件类型为 text,那么必须指定压缩模式为 gzipbzip2

启动 Hadoop 集群,提前创建 HDFS 中的存储路径

hadoop fs -mkdir /testInputPath

运行 DataX,进行数据同步

cd $DATAX_HOME

python bin/datax.py job/test.json

执行完成后如下所示:

image.png

在 HDFS 上查看存储路径下的文件:

image.png

如果想要直接查看 HDFS 文件中存储的内容,可以运行下列命令:

hadoop fs -cat /testInputPath/* | zcat

image.png

DataX 数据同步 HDFS —> MySQL

将 HDFS 中 /testInputPath 路径下的数据全量导入到 MySQL 中 test_datax 库中的 test_sku_info 表(该表需要提前创建)。

1. 编写 HDFS 读取插件

{
   
   
    "job": {
   
   
        "setting": {
   
   
            "speed": {
   
   
                "channel": 1
            }
        },
        "content": [
            {
   
   
                "reader": {
   
   
                    "name": "hdfsreader",
                    "parameter": {
   
   
                        "path": "/testInputPath/*",
                        "defaultFS": "hdfs://hadoop120:8020",
                        "column": [
                            "*"
                        ],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }

                },
                "writer": {
   
   
                    // 待编写
                }
            }
        ]
    }
}

这里需要注意 HDFS 的数据存放格式、间隔符、压缩方式等。

2. 编写 MySQL 写入插件

{
   
   
    "job": {
   
   
        "setting": {
   
   
            "speed": {
   
   
                "channel": 1
            }
        },
        "content": [
            {
   
   
                 "reader": {
   
   
                   // 待编写
                    }
                },
                "writer": {
   
   
                    "name": "mysqlwriter",
                    "parameter": {
   
   
                        "writeMode": "insert",
                        "username": "root",
                        "password": "000000",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
   
   
                                "jdbcUrl": "jdbc:mysql://hadoop120:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicRetrieval=true",
                                "table": [
                                    "test_sku_info"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

导入数据到 MySQL 时,需要注意写入模式 writeMode,因为 HDFS 中的存储数据内容并没有主键的概念,而 MySQL 中有主键,如果模式选择不正确,会带来不必要的麻烦,模式说明如下:

image.png

分别表示插入写 insert、覆盖写 replace、更新写 update

3. 合并读写插件

{
   
   
    "job": {
   
   
        "setting": {
   
   
            "speed": {
   
   
                "channel": 1
            }
        },
        "content": [
            {
   
   
                "reader": {
   
   
                    "name": "hdfsreader",
                    "parameter": {
   
   
                        "path": "/testInputPath/*",
                        "defaultFS": "hdfs://hadoop120:8020",
                        "column": [
                            "*"
                        ],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "compress": "gzip"
                    }

                },
                "writer": {
   
   
                    "name": "mysqlwriter",
                    "parameter": {
   
   
                        "writeMode": "insert",
                        "username": "root",
                        "password": "000000",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
   
   
                                "jdbcUrl": "jdbc:mysql://hadoop120:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicRetrieval=true",
                                "table": [
                                    "test_sku_info"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

合并完成后,将其存储到 $DATAX_HOME/job/hdfs_to_mysql.json 中。

4. 创建 MySQL 中的存储库

USE `test_datax`;

DROP TABLE IF EXISTS `test_sku_info`;
CREATE TABLE `test_sku_info`  (
  `sku_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '商品id',
  `name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '商品名称',
  `category_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '所属分类id',
  `from_date` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '上架日期',
  `price` decimal(10, 2) NULL DEFAULT NULL COMMENT '商品单价'
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci COMMENT = '商品属性表' ROW_FORMAT = Dynamic;

5. 执行数据同步

将 HDFS 中 /testInputPath 路径下的数据全量导入到 MySQL 中 test_datax 库中的 test_sku_info 表。

cd $DATAX_HOME

python bin/datax.py job/hdfs_to_mysql.json

image.png

运行完成后,进入 MySQL 中查看同步过来的内容:

select * from test_datax.test_sku_info limit 10;

image.png

数据同步完成,其它数据源数据同步方式也基本上差不多,多看看官方插件配置文档就会了。

更多内容请查看:DataX 官网

DataX 优化

DataX3.0 提供了包括通道(并发)、记录流、字节流三种流控模式,可以根据随意控制你的作业速度,让你的作业在库可以承受的范围内达到最佳的同步速度。

速度优化

"speed": {
   
   
   "channel": 5,
   "byte": 1048576,
   "record": 10000
}
  • "channel": 5 这个参数指定了通道的数量。通道可以理解为并行处理数据的通道数。在数据传输过程中,可以同时处理多个通道,提高数据传输效率。

  • "byte": 1048576 这个参数指定了每个通道传输的字节数,这里的数值是 1048576,即 1 MB

  • "record": 10000 这个参数指定了每个通道传输的记录数,这里的数值是 10000

配置时必须遵守以下规则:

  • 若配置了总 record 记录限速,则必须配置单个 channelrecord 记录限速。
  • 若配置了总 byte 字节限速,则必须配置单个 channelbyte 字节限速。

  • 若配置了总 record 记录限速和总 byte 字节限速,channel 并发数参数就会失效。

配置格式:

{
   
   
    "core": {
   
   
        "transport": {
   
   
            "channel": {
   
   
                "speed": {
   
   
                    "byte": 1048576 //单个channel的byte字节限速1M/s
                }
            }
        }
    },
    "job": {
   
   
        "setting": {
   
   
            "speed": {
   
   
                "byte" : 5242880 //总byte字节限速5M/s
            }
        },
        ......
    }
}

内存优化

在运行数据同步的时候直接指定堆内存大小。

cd $DATAX_HOME

python bin/datax.py --jvm="-Xms4G -Xmx4G" job/job.json

在这里,将 JVM 最大和最小堆内存都设置为 4GB

同步 MySQL 中 NULL 值数据到 HDFS 出现错误

解决datax抽mysql数据到hdfs之null值变成‘‘(引号)的问题

配置文件变量传参

有时候我们不想将 JSON 插件配置文件直接固定写死,那么我们就可以通过传参的方式来进行动态设置。

例如,我想要将 MySQL 数据同步到 HDFS 上,每天的同步日期都要进行修改,这时候就可以通过变量传参的方式来解决。

{
   
   
    "job": {
   
   
        "setting": {
   
   
            "speed": {
   
   
                 "channel": 1,
            },
        },
        "content": [
            {
   
   
                "reader": {
   
   
                    "name": "mysqlreader",
                    "parameter": {
   
   
                        "username": "root",
                        "password": "000000",
                        "connection": [
                            {
   
   
                                "querySql": [
                                    "select sku_id,name,category_id,from_date,price from sku_info;"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/test_datax?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowPublicKeyRetrieval=true"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
   
   
                    "name": "hdfswriter",
                    "parameter": {
   
   
                        "defaultFS": "hdfs://hadoop120:8020",
                        "fileType": "text",
                        "path": "/testInputPath/${dt}",
                        "fileName": "sku_info",
                        "column": [
                          "*"
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress":"gzip"
                    }
                }
            }
        ]
    }
}

这里使用了 MySQL 数据读取另一种写法(querySql

image.png

以 SQL 作为查询语句,同时也支持使用 Where 语句进行条件过滤。

在写入配置 "writer" 中设置了需要传参的日期变量 dt

    "path": "/testInputPath/${dt}",

模拟将其存储路径按照日期进行动态设置。

注意: 执行数据同步前需要在 HDFS 上先创建对应存储路径。

hadoop fs -mkdir /testInputPath/2023-09-16

现在来进行数据同步测试

cd $DATAX_HOME

python bin/datax.py job/var.json -p "-Ddt=2023-09-16"

正常执行完成后,结果如下所示:

image.png

数据同步完成

image.png

基于 DataX 开发的快速同步 MySQL 数据至 HDFS 上的工具

项目地址:基于 DataX 开发的快速同步 MySQL 数据至 HDFS 上的工具

image.png

相关文章
|
1月前
|
SQL 分布式计算 Oracle
数据同步工具DataX的安装
数据同步工具DataX的安装
585 0
|
9天前
|
SQL 存储 关系型数据库
|
9天前
|
SQL 关系型数据库 MySQL
|
21天前
|
消息中间件 关系型数据库 MySQL
Maxwell 概述、安装、数据同步【一篇搞定】!
Maxwell 是一个由 Zendesk 开源的用于 MySQL 数据库实时数据捕获和同步的工具,支持多种数据库系统,以 JSON 格式输出变更数据。它实时监控数据库中的更新,将变化传递给其他系统,常用于实时数据管道、数据仓库和事件驱动架构。Maxwell 具有实时性、可配置性和高性能等特点。其工作流程包括 Binlog 解析、数据解析、重构、发布到消息队列(如 Kafka)以及事件处理。安装时需注意 JDK 版本,并配置 MySQL、Zookeeper 和 Kafka。此外,Maxwell 支持定向监听特定库表,并能进行历史和增量数据同步。
|
1月前
|
SQL JSON DataWorks
DataWorks产品使用合集之DataWorks 数据集成任务中,将数据同步到 Elasticsearch(ES)中,并指定 NESTED 字段中的 properties 类型如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
33 0
|
1月前
|
Java Linux DataX
DataX入门指南:快速部署和安装指南
DataX入门指南:快速部署和安装指南
488 2
DataX入门指南:快速部署和安装指南
|
25天前
|
SQL Kubernetes 关系型数据库
实时计算 Flink版产品使用合集之如何实现MySQL单表数据同步到多个表
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
25天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之使用 MySQL CDC 进行数据同步时,设置 server_id 参数如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
30天前
|
消息中间件 关系型数据库 MySQL
MySQL 到 Kafka 实时数据同步实操分享(1),字节面试官职级
MySQL 到 Kafka 实时数据同步实操分享(1),字节面试官职级
|
30天前
|
机器学习/深度学习 关系型数据库 MySQL
MySQL 到 Greenplum 实时数据同步实操分享,2024年最新【Python面试题
MySQL 到 Greenplum 实时数据同步实操分享,2024年最新【Python面试题

热门文章

最新文章