保姆级教程!玩转 ChunJun 详细指南

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 「chunJun 新手入门」系列的第三篇,本文将为大家介绍如何配置一个 ChunJun 任务,获取 ChunJun 以及通过 ChunJun Client 端提交任务的流程等内容,教会大家更好地玩转 ChunJun。ChunJun 是一款稳定、易用、高效、批流一体的数据集成框架,⽀持海量数据的同步与计算,对ChunJun 感兴趣的小伙伴不要错过~

ChunJun 是一款稳定、易用、高效、批流一体的数据集成框架,⽀持海量数据的同步与计算。ChunJun 既可以采集静态的数据,比如 MySQL,HDFS 等,也可以采集实时变化的数据,比如 binlog,Kafka 等。同时 ChunJun 也是一个支持原生 FlinkSQL 所有语法和特性的计算框架。

经过5年的迭代和开发,ChunJun 已经帮助很多公司快速进行数据整合,并解决数据开发人员需要过多进行繁琐的数据抽取工作的问题,可以专注在企业业务场景的构建。

之前的内容当中,我们已经介绍过 ChunJun 的技术力、优势,及如何提交 pr、Issue 的方法。作为「chunJun 新手入门」系列的第三篇,本文将为大家介绍如何配置一个 ChunJun 任务以及通过 ChunJun Client 端提交任务的流程等内容,教会大家更好地玩转 ChunJun。

ChunJun 新手入门

Hi,我是ChunJun,一个有趣好用的开源项目

Ding!您有一份ChunJun实用指南,请查收

ChunJun 地址

官网:

https://dtstack.github.io/chunjun/

GitHub:

https://github.com/DTStack/chunjun

Gitee:

https://gitee.com/dtstack_dev_0/chunjun

配置一个 ChunJun 任务

ChunJun 的任务脚本⽀持两种模式:Sync(Json) 和 SQL,前者配置更加丰富,底层使⽤的是 StreamAPI,在同步场景使⽤的较多;后者借助 Flink SQL 本身的能⼒,利⽤ SQL 实现对数据的聚合等计算操作,底层使⽤的是 TableAPI。

Sync

同步任务使⽤的 Json 格式的配置⽂件,通过配置 Source/Sink 来完成数据的 EL 流程。⼀个同步任务的基本结构如下:

{
"job": {
"content": [
{
"nameMapping": {},
"reader": {
"parameter": {},
"name": "reader"
},
"writer": {
"parameter": {},
"name": "writer"
},
"restoration": {
"cache": {
"properties": {}
},
"workerMax": 3,
"workerSize": 3,
"workerNum": 2,
"ddl": {
"properties": {}
}
}
}
],
"setting": {
"restore": {},

● Job 整个任务的参数配置

1)同步任务的算⼦配置,如 Reader/Writer/Restoration 等。

• nameMapping:表名映射配置,⽤在 CDC 场景

• reader:同步任务 reader 的配置

• writer:同步任务writer的配置

• restoration:数据还原相关配置

2)setting 系统的⼀些参数配置,如增量同步(restore)、流控(speed)等。

SQL

ChunJun 的 SQL 任务直接沿⽤了 FlinkSQL 的引擎。详细⽂档请看:

https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/overview/

● DDL

CREATE TABLE xx(xxx) WITH(xxx); 
CREATE VIEW xxx

● DML

INSERT INTO xxx;

获取 ChunJun

前置准备

· Java(JDK8);

· Maven(3.6.3,版本太低会找不到对应的 jar,另外,⾼版本的 Maven 对仓库地址强制要求是 HTTPS,会存在仓库地址访问失败的情况)

ChunJun 下载

● release 下载

ChunJun release 下载地址:

https://github.com/DTStack/chunjun/releases

● 源码编译

源码下载:

https://github.com/DTStack/chunjun.git

ChunJun 是通过 Maven 来进⾏代码依赖管理,对应的打包命令是:

mvn clean package -Dmaven.test.skip

ChunJun 使⽤的是 spotless 插件来进⾏代码⻛格管理,在修改源码之后打包,需要对源码先执⾏下 mvn spotless:apply 命令来进⾏代码格式化,否则会出现格式化不合规问题。

● 目录结构

chunjun-dist
├── chunjun-core.jar
├── connector
├── ddl
├── dirty-data-collector
├── docker-build
├── metrics
└── restore-plugins

通过 ChunJun Client 端提交任务

通过 LocalTest、Standalone、Yarn Session、Yarn Perjob 四种模式为大家介绍如何通过ChunJun Client 端提交任务。

LocalTest 模式(适⽤于本地调试)

Local Test 模式是针对开发者同学⽤来进行本地测试验证的模块,只需要修改 main() 中的 jobPath 路径即可,需要注意,同步任务的脚本请以 json ⽂件结尾,计算任务的脚本请以 sql ⽂件结尾。

Standalone 模式

● 环境准备

下载 Flink 并解压

wget "http://archive.apache.org/dist/flink/flink-<flink.version>/flink-<flink.version>-bin-scala_<scala.version>.tgz"      
tar -zxvf flink-<flink.version>-bin-scala_<scala.version>.tgz

● 配置 ChunJun

1)下载 ChunJun 并解压

wget "https://github.com/DTStack/chunjun/releases/download/<chunjun-tag>/chunjun-dist.tar.gz"  
tar -zxvf chunjun-dist.tar.gz

2)将 ChunJun-Dist 内容复制到 Flink Lib ⽬录下并启动 Flink Standalone 集群

# copy the chunjun-dist to the flink_lib
cp -r chunjun-dist $FLINK_HOME/lib
# start flink standalone cluster
sh $FLINK_HOME/bin/start-cluster.sh

3)在 Flink classpath 中可以看到 ChunJun 相关 jar,表示启动成功;

● 提交任务

sh $CHUNJUN_DIST/bin/chunjun-standalone.sh <task-script path>

命令执⾏成功之后,即可在 Flink WEB UI 中看到对应的任务。

Yarn Session 模式

● 环境准备

1)下载 ChunJun 并解压

wget "https://github.com/DTStack/chunjun/releases/download/<chunjun-tag>/chunjun-dist.tar.gz"  
tar -zxvf chunjun-dist.tar.gz

2)下载 ChunJun 并提交到 Yarn Session 集群中

sh $FLINK_HOME/bin?yarn-session.sh -t $CHUNJUN_DIST -d

· 执⾏命令成功之后,即可在Yarn Session ⽇志,对应Classpath 部分中看到 ChunJun 相关的jar, 表示启动成功;

· 记录当前 Yarn Session 的,并将任务提交到指定 Session中;

sh ./bin/chunjun-yarn-session.sh -job <task-script path> -confProp {\"yarn.application.id\":\"<ApplicationID>\"}

之后就可以在 Yarn Session 中看到对应的任务,注意以下两点:

• 如果将 yarn.application.id 配置到 flink-conf.yaml,那么使⽤这份配置⽂件的任务都会提交到这个 id 的 session 中;

• 如果将 yarn.application.id 配置到 confProp,那么仅有当前任务会提交到这个 id 的 session 中。

Yarn Perjob 模式

后续会废弃这种模式,改⽤ Application 模式。

● 环境准备

下载 Flink 并解压

wget "http://archive.apache.org/dist/flink/flink-<flink.version>/flink-<flink.version>-bin-scala_<scala.version>.tgz"      
tar -zxvf flink-<flink.version>-bin-scala_<scala.version>.tgz

● 配置 ChunJun

下载 ChunJun 并解压

wget "https://github.com/DTStack/chunjun/releases/download/<chunjun-tag>/chunjun-dist.tar.gz"  
tar -zxvf chunjun-dist.tar.gz

● 提交任务

sh ./bin/chunjun-yarn-perjob.sh -job <task-script path>

执⾏成功之后,可以在 Yarn Web UI 中看到相关任务。

调试 ChunJun 代码

调试代码能够更好地定位问题,并解决问题。下⾯将为开发者介绍如何快速调试 ChunJun 代码:

本地调试

ChunJun 为开发者准备了⼀个 local-test 模块,替换 main ⽅法中的 jobPath 即可。需要提前将相关插件配置在 local-test 模块的 pom 中,部分插件相互存在依赖冲突,需要开发者关注下。

远程调试

在 flink-conf.yaml 中配置 debug 端⼝即可(端⼝号可以⾃⼰定义)。

# debug jobmanager
env.java.opts.jobmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005
# debug taskmanager
env.java.opts.taskmanager: -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5006


《数据治理行业实践白皮书》下载地址:https://fs80.cn/380a4b

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=szalykfz

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术 qun」,交流最新开源技术信息,qun 号码:30537511,项目地址:https://github.com/DTStack

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
API Apache 数据库
Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
Flink CDC 于 2023 年 12 月 7 日重磅推出了其全新的 3.0 版本 ~
108473 8
 Flink CDC 3.0 正式发布,详细解读新一代实时数据集成框架
|
SQL 存储 数据采集
【技术分享】元数据与数据血缘实现思路
【技术分享】元数据与数据血缘实现思路
5873 0
|
关系型数据库 MySQL API
|
SQL API Apache
官宣|Apache Flink 1.20 发布公告
Apache Flink 1.20.0 已发布,这是迈向 Flink 2.0 的最后一个小版本,后者预计年底发布。此版本包含多项改进和新功能,涉及 13 个 FLIPs 和 300 多个问题解决。亮点包括引入物化表简化 ETL 管道开发,统一检查点文件合并机制减轻文件系统压力,以及 SQL 语法增强如支持 `DISTRIBUTED BY` 语句。此外,还进行了大量的配置项清理工作,为 Flink 2.0 铺平道路。这一版本得益于 142 位贡献者的共同努力,其中包括来自中国多家知名企业的开发者。
1785 7
官宣|Apache Flink 1.20 发布公告
|
SQL 分布式计算 Hadoop
Hadoop学习笔记(HDP)-Part.08 部署Ambari集群
01 关于HDP 02 核心组件原理 03 资源规划 04 基础环境配置 05 Yum源配置 06 安装OracleJDK 07 安装MySQL 08 部署Ambari集群 09 安装OpenLDAP 10 创建集群 11 安装Kerberos 12 安装HDFS 13 安装Ranger 14 安装YARN+MR 15 安装HIVE 16 安装HBase 17 安装Spark2 18 安装Flink 19 安装Kafka 20 安装Flume
447 0
Hadoop学习笔记(HDP)-Part.08 部署Ambari集群
|
SQL 关系型数据库 MySQL
数据集成框架FlinkX(纯钧)入门
数据集成框架FlinkX(纯钧)入门
821 0
|
存储 固态存储 关系型数据库
Apache Doris 系列: 入门篇-安装部署
Apache Doris 系列: 入门篇-安装部署
3529 0
|
存储 监控 关系型数据库
DataX 概述、部署、数据同步运用示例
DataX是阿里巴巴开源的离线数据同步工具,支持多种数据源之间的高效传输。其特点是多数据源支持、可扩展性、灵活配置、高效传输、任务调度监控和活跃的开源社区支持。DataX通过Reader和Writer插件实现数据源的读取和写入,采用Framework+plugin架构。部署简单,解压即可用。示例展示了如何配置DataX同步MySQL到HDFS,并提供了速度和内存优化建议。此外,还解决了NULL值同步问题及配置文件变量传参的方法。
7835 5
|
数据采集 数据管理 大数据
推荐 | AllData开源数据中台技术分享
AllData数据中台架构师团队全面解析开源项目[alldata](https://github.com/alldatacenter/alldata),涵盖功能设计、架构分析及源码解读。团队分享了项目总结、发展规划,推荐关注公众号“大数据商业驱动引擎”以获取更多信息。他们讨论了数据治理、调度引擎、商业化探索及未来规划,涉及元数据管理、数据安全、Airflow调度引擎等。此外,还介绍了数据平台功能,如用户管理、权限控制,并提到了商业化版本的源码支持。鼓励用户参与社区交流,共同推动项目发展。
推荐 | AllData开源数据中台技术分享
|
消息中间件 关系型数据库 MySQL
Flink CDC 最佳实践(以 MySQL 为例)
Flink CDC 最佳实践(以 MySQL 为例)
3382 0