建议在阅读本教程之前先阅读官方文档
https://docs.byzer.org/#/byzer-lang/zh-cn/
数据加载/Load
load,代表读入数据源的行为
load一张mysql表
-- Load a single table over JDBC and expose it under the table name DecDeclareFlowConf.
load jdbc.`stg_sxyxpz_nw.DecDeclareFlowConf`
options driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/stg_sxyxpz_nw?useSSL=false"
    and dbtable="`stg_sxyxpz_nw`.`DecDeclareFlowConf`"
    and user="fs_nwbj"
    and password="123456"
as DecDeclareFlowConf;
load多张mysql表
-- 01 Load the data sources.
-- Declare the JDBC connection once under the alias ods_yxjc_prod;
-- the comment is kept on its own line so it cannot swallow the statements.
connect jdbc
where driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false"
    and user="fs_nwbj"
    and password="123456"
as ods_yxjc_prod;

-- The part before the dot in each path is the connection alias declared above.
load jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` as ods_SmpOnethingModule;
load jdbc.`ods_yxjc_prod.ods_SmpOnethingCatalog` as ods_SmpOnethingCatalog;
load的时候添加原生语句的支持
-- 01 Load the data source t_item_implement.
-- Declare the JDBC connection under the alias stg_sx.
connect jdbc
where driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/stg_sx?useSSL=false"
    and user="fs_nwbj"
    and password="123456"
as stg_sx;

-- directQuery carries a native SQL statement, so only the filtered rows are loaded.
-- The statement must end with `as <table>;` to name the result.
load jdbc.`stg_sx.t_item_implement`
where directQuery = ''' SELECT * FROM t_item_implement WHERE `STATUS` = "1" and IN_CURRENT = "1" AND ITEM_TYPE = "01" '''
as t_item_implement;
这样就避免了全量加载导致的OOM
load一张Doris表
-- Load one Doris table and expose it as t_item_material.
-- The table named in the load path must be the same as doris.table.identifier.
load doris.`stg_sx.t_item_material`
-- doris.table.identifier is required and must match the table in the load path.
options doris.table.identifier="stg_sx.t_item_material"
    and doris.fenodes="127.0.0.1:8030"
    and user="fs_nwbj"
    and password="123456"
    -- Optional parameter.
    and doris.request.tablet.size="3"
as t_item_material;
可选参数说明:
1)doris.request.tablet.size
默认值:
Integer.MAX_VALUE
说明:
一个RDD Partition对应的Doris Tablet个数。
此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。
(例如一个表有10个tablet,设置doris.request.tablet.size=3,则Spark会按4个并行度读取数据(10/3≈3.3,向上取整为4),即读取时按tablet分成4个并行任务以提高效率;但这同时会对Doris造成更大的压力,请根据实际情况适当调整。)
load多张Doris表
-- Load several Doris tables; each load repeats its own connection settings.
-- In every statement the table in the load path must match doris.table.identifier.

-- Table 1: ods_SmpOnethingFlowNode
load doris.`ods_yxjc_prod.ods_SmpOnethingFlowNode`
-- doris.table.identifier is required and must match the table in the load path.
options doris.table.identifier="ods_yxjc_prod.ods_SmpOnethingFlowNode"
    and doris.fenodes="127.0.0.1:8030"
    and user="fs_nwbj"
    and password="123456"
    and doris.request.tablet.size="3"
as ods_SmpOnethingFlowNode;

-- Table 2: ods_SmpOnethingHditmDir
load doris.`ods_yxjc_prod.ods_SmpOnethingHditmDir`
-- doris.table.identifier is required and must match the table in the load path.
options doris.table.identifier="ods_yxjc_prod.ods_SmpOnethingHditmDir"
    and doris.fenodes="127.0.0.1:8030"
    and user="fs_nwbj"
    and password="123456"
    and doris.request.tablet.size="3"
as ods_SmpOnethingHditmDir;
每张表都是一个独立的数据源,需要单独填写连接地址和用户名、密码,不支持用一个connector重复load多张表
注册函数,模型/Register
Register动态注册 Java/Scala 写的 UDF/UDAF 函数
注册一个java函数
-- Define the Java source of the UDF in a script variable.
-- The comment is kept on its own line so it cannot swallow the statements.
set decDeclareProxyAcceptConfighandleScope ='''
public class DecDeclareProxyAcceptConfighandleScope {
    // Returns "1" when value contains "city"; returns "0" otherwise,
    // including for null or empty input.
    public String is_full_city_handle(String value) {
        if (value == null || "".equals(value)) {
            return "0";
        }
        return value.contains("city") ? "1" : "0";
    }
}
''';

-- Load the script variable as a table so it can be registered.
load script.`decDeclareProxyAcceptConfighandleScope` as scriptTable;

-- Register the Java method as the SQL function is_full_city_handle.
register ScriptUDF.`scriptTable` as is_full_city_handle
options lang="java"
    and className = "DecDeclareProxyAcceptConfighandleScope"
    and methodName = "is_full_city_handle";
注册多个java函数
-- Define the Java source holding both UDF methods in a script variable.
-- The comment is kept on its own line so it cannot swallow the statements.
set smpOnethingModuleExtendContent ='''
public class SmpOnethingModuleExtendContent {

    // Maps the text found after "infoType=" (up to the next comma) to a code "0".."3".
    // Returns null for null/empty input, when "infoType=" is absent, when no comma
    // follows it, or when the text is not one of the known labels.
    public String project_type(String value) {
        if (value == null || "".equals(value)) {
            return null;
        }
        int valueIndex = value.indexOf("infoType=");
        if (valueIndex == -1) {
            // Without this guard an unrelated slice of the string would be matched.
            return null;
        }
        int beginIndex = valueIndex + "infoType=".length();
        int indexOf = value.indexOf(",", valueIndex);
        if (indexOf < beginIndex) {
            return null;
        }
        String substring = value.substring(beginIndex, indexOf);
        switch (substring) {
            case "承诺上报件":
                return "0";
            case "承诺件":
                return "1";
            case "即办件":
                return "2";
            case "联办件":
                return "3";
            default:
                return null;
        }
    }

    // Returns the raw text found after "infoType=" (up to the next comma).
    // Returns null for null/empty input, when "infoType=" is absent, or when
    // no comma follows it.
    public String project_type_text(String value) {
        if (value == null || "".equals(value)) {
            return null;
        }
        int valueIndex = value.indexOf("infoType=");
        if (valueIndex == -1) {
            return null;
        }
        int beginIndex = valueIndex + "infoType=".length();
        int indexOf = value.indexOf(",", valueIndex);
        if (indexOf < beginIndex) {
            // No trailing comma: substring(beginIndex, -1) would throw.
            return null;
        }
        return value.substring(beginIndex, indexOf);
    }
}
''';

-- Load the script variable as a table so its methods can be registered.
load script.`smpOnethingModuleExtendContent` as scriptTable;

-- Register each Java method as its own SQL function.
register ScriptUDF.`scriptTable` as project_type
options lang="java"
    and className = "SmpOnethingModuleExtendContent"
    and methodName = "project_type";

register ScriptUDF.`scriptTable` as project_type_text
options lang="java"
    and className = "SmpOnethingModuleExtendContent"
    and methodName = "project_type_text";
数据转换/Select
select语句与我们在MySQL中写的SQL基本一致,它完全兼容 Spark SQL,唯一的区别是select句式最后需要以 as 表名 结尾,为查询结果命名
select一张表
-- Project three columns of SmpOnethingJointTrialConfig and name the result res01.
select
    unid,
    jointTrialModel,
    jointTrialOvertime
from SmpOnethingJointTrialConfig
as res01;
select多张表
-- Join each module to its class tag; modules without a matching tag are kept
-- (left join) with a null module_class_tag_name. The result is named res01.
select
    SmpOnethingModule.unid,
    SmpOnethingModule.moduleName,
    core_tag.value as module_class_tag_name
from SmpOnethingModule
left join core_tag
    on core_tag.unid = SmpOnethingModule.moduleClassTag
as res01;
select使用自带函数
-- Use the built-in SUBSTRING function: the first 6 characters of areaCode
-- become region_city_code. The result is named res01.
select
    SmpOnethingModule.unid,
    SUBSTRING(SmpOnethingModule.areaCode, 1, 6) as region_city_code
from SmpOnethingModule
as res01;
select使用上方注册的java函数
-- Apply the registered UDFs project_type and project_type_text to extendContent.
-- The result is named res01.
select
    SmpOnethingModule.unid,
    project_type(SmpOnethingModule.extendContent) as project_type,
    project_type_text(SmpOnethingModule.extendContent) as project_type_text
from SmpOnethingModule
as res01;
select使用自带语句
-- Recode status (Y/N -> 1/0) and onethingType (L0/L1 -> 0/1) with CASE;
-- any other value, including null, becomes null. The result is named res01.
select
    catalogName,
    catalogType,
    catalogCode,
    legalStandard,
    case
        when status = 'Y' then '1'
        when status = 'N' then '0'
        else null
    end as status,
    deptName,
    deptUnid,
    exerciseLevel,
    creditCode,
    case
        when onethingType = 'L0' then '0'
        when onethingType = 'L1' then '1'
        else null
    end as onethingType
from SmpOnethingCatalog
as res01;
select使用子查询
-- For every module, count the flow nodes whose moduleUnid equals the module's unid,
-- using a correlated scalar subquery. The result is named res01.
select
    SmpOnethingModule.unid,
    (
        SELECT count(*)
        FROM SmpOnethingFlowNode
        WHERE moduleUnid = SmpOnethingModule.unid
    ) as integrate_single_item_num
from SmpOnethingModule
as res01;
保存数据/Save
save 句式类似传统 SQL 中的 insert 语法
保存select结果到mysql数据表中
-- Write res01 to the target table over JDBC in overwrite mode.
save overwrite res01 as jdbc.`ods_yxjc_prod.ods_SmpOnethingModule`
options driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
    and dbtable="`ods_yxjc_prod`.`ods_SmpOnethingModule`"
    and user="fs_nwbj"
    and password="123456";
保存的时候清空mysql数据表
-- Save: write res01 in overwrite mode with truncate="true".
-- The comment is kept on its own line so it cannot swallow the statement.
save overwrite res01 as jdbc.`ods_yxjc_prod.ods_SmpOnethingModule`
options driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
    and dbtable="`ods_yxjc_prod`.`ods_SmpOnethingModule`"
    and user="fs_nwbj"
    and password="123456"
    and truncate="true";
更新一条数据
如果主键id相同,直接append即可覆盖原有记录(前提是目标表按主键去重,例如Doris的Unique Key模型表)
-- Append res01 to the target table over JDBC.
-- truncate is intentionally not set: the Spark JDBC writer only honours it in
-- overwrite mode, so it has no effect on an append.
save append res01 as jdbc.`dws_yxjc_nw.yxjc_index_full`
options driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/dws_yxjc_nw?useSSL=false&rewriteBatchedStatements=true"
    and dbtable="`dws_yxjc_nw`.`yxjc_index_full`"
    and user="fs_nwbj"
    and password="123456";
清空Doris数据表
执行DDL语句truncate表格
-- Send a truncate statement to the database through the JDBC command;
-- driver-statement-0 holds the statement to execute.
run command as JDBC.`byzer_demo._`
where driver="com.mysql.jdbc.Driver"
    and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
    and user="fs_nwbj"
    and password="123456"
    and `driver-statement-0`="truncate table ods_yxjc_prod.ods_item_material_test";
保存select结果到Doris数据表中
-- Write res01 into the Doris table in append mode.
-- overwrite with truncate=true is not supported here for now; if the table must be
-- emptied first, run a truncate statement through the JDBC command before this save.
save append res01 as doris.`ods_yxjc_prod.ods_item_material_test`
options doris.table.identifier="ods_yxjc_prod.ods_item_material_test"
    and doris.fenodes="127.0.0.1:8030"
    and user="fs_nwbj"
    and password="123456";