MLSQL(Byzer)的快速入门

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: MLSQL(Byzer)的快速入门

建议在阅读本教程之前先阅读官方文档

https://docs.byzer.org/#/byzer-lang/zh-cn/


b849bd45ae334ee3a05d0db8a257cde1.png

数据加载/Load


load,代表读入数据源的行为


load一张mysql表

load jdbc.`stg_sxyxpz_nw.DecDeclareFlowConf` options and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/stg_sxyxpz_nw?useSSL=false"
and dbtable="`stg_sxyxpz_nw`.`DecDeclareFlowConf`"
and user="fs_nwbj"
and password="123456"
as DecDeclareFlowConf;

load多张mysql表

-- 01加载数据源
connect jdbc where
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false"
and user="fs_nwbj"
and password="123456"
 as ods_yxjc_prod;
load jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` as ods_SmpOnethingModule;
load jdbc.`ods_yxjc_prod.ods_SmpOnethingCatalog` as ods_SmpOnethingCatalog;

load的时候添加原生语句的支持

-- 01加载数据源t_item_implement
connect jdbc where 
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/stg_sx?useSSL=false"
and user="fs_nwbj"
and password="123456"
 as stg_sx;
load jdbc.`stg_sx.t_item_implement` where
directQuery = '''
SELECT * FROM t_item_implement WHERE `STATUS` = "1" and IN_CURRENT = "1" AND ITEM_TYPE = "01"
'''


这样就避免了全量加载导致的OOM

load一张Doris

load doris.`stg_sx.t_item_material`   --注意表名和doris.table.identifier一致
options doris.table.identifier="stg_sx.t_item_material"  --需指定,与上一行要load的表名一致
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456" 
and doris.request.tablet.size="3"  -- 可选参数
as t_item_material;


可选参数说明:

1)doris.request.tablet.size

默认值:

Integer.MAX_VALUE

说明:

一个RDD Partition对应的Doris Tablet个数。

此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。

(如一个表的tablet=10,设置的doris.request.tablet.size=3,则spark按4个并行度进行数据读取(10/3=3.3≈4),即读取数据的时候可按tablet分成4个并行任务,提高效率,但“同时会对Doris造成更大的压力”,适当调整)


load多张Doris表


load doris.`ods_yxjc_prod.ods_SmpOnethingFlowNode`   --注意表名和doris.table.identifier一致
options doris.table.identifier="ods_yxjc_prod.ods_SmpOnethingFlowNode"  --需指定,与上一行要load的表名一致
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456"
and doris.request.tablet.size="3"
as ods_SmpOnethingFlowNode;
load doris.`ods_yxjc_prod.ods_SmpOnethingHditmDir`   --注意表名和doris.table.identifier一致
options doris.table.identifier="ods_yxjc_prod.ods_SmpOnethingHditmDir"  --需指定,与上一行要load的表名一致
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456"
and doris.request.tablet.size="3"
as ods_SmpOnethingHditmDir;



每张表都是一个源,需要单独写连接和用户密码,不支持一个connetor重复使用load表

注册函数,模型/Register


Register动态注册 Java/Scala 写的 UDF/UDAF 函数


注册一个java函数


-- 设置java函数
set decDeclareProxyAcceptConfighandleScope ='''
public class DecDeclareProxyAcceptConfighandleScope {
    public String is_full_city_handle(String value) {
        if (value == null || "".equals(value)) {
            return "0";
        }
        return value.contains("city") ? "1" : "0";
    }
}
''';
load script.`decDeclareProxyAcceptConfighandleScope` as scriptTable;
register ScriptUDF.`scriptTable` as is_full_city_handle
options lang="java"
and className = "DecDeclareProxyAcceptConfighandleScope"
and methodName = "is_full_city_handle";


注册多个java函数

-- 设置java函数
set smpOnethingModuleExtendContent ='''
public class SmpOnethingModuleExtendContent {
    public String project_type(String value) {
        if (value == null || "".equals(value)) {
            return null;
        }
        int valueIndex = value.indexOf("infoType=");
        int indexOf = value.indexOf(",", valueIndex);
        int beginIndex = valueIndex + "infoType=".length();
        if (indexOf < beginIndex) {
            return null;
        }
        String substring = value.substring(beginIndex, indexOf);
        switch (substring) {
            case "承诺上报件":
                return "0";
            case "承诺件":
                return "1";
            case "即办件":
                return "2";
            case "联办件":
                return "3";
            default:
                return null;
        }
    }
    public String project_type_text(String value) {
        if (value == null || "".equals(value)) {
            return null;
        }
        int valueIndex = value.indexOf("infoType=");
        if (valueIndex == -1) {
            return null;
        }
        int indexOf = value.indexOf(",", valueIndex);
        return value.substring(valueIndex + "infoType=".length(), indexOf);
    }
''';
load script.`smpOnethingModuleExtendContent` as scriptTable;
register ScriptUDF.`scriptTable` as project_type
options lang="java"
and className = "SmpOnethingModuleExtendContent"
and methodName = "project_type";
register ScriptUDF.`scriptTable` as project_type_text
options lang="java"
and className = "SmpOnethingModuleExtendContent"
and methodName = "project_type_text";

数据转换/Select

这个select就和我们mysql中的sql没有区别,他完全兼容 Spark SQL,除了select 句式最后 as 表名

select一张表

select
    unid,
    jointTrialModel,
    jointTrialOvertime
from SmpOnethingJointTrialConfig as res01;

select多张表

select
    SmpOnethingModule.unid,
    SmpOnethingModule.moduleName,
    core_tag.value as module_class_tag_name
from SmpOnethingModule
         left join core_tag on core_tag.unid = SmpOnethingModule.moduleClassTag
    as res01


select使用自带函数

select
    SmpOnethingModule.unid,
    SUBSTRING(SmpOnethingModule.areaCode,1,6) as region_city_code,
from SmpOnethingModule as res01;

select使用上方注册的java函数

select
    SmpOnethingModule.unid,
    project_type(SmpOnethingModule.extendContent) as project_type,
    project_type_text(SmpOnethingModule.extendContent) as project_type_text
from SmpOnethingModule as res01;

select使用自带语句


select
    catalogName,
    catalogType,
    catalogCode,
    legalStandard,
    case status
    when 'Y' then '1'
        when 'N' then '0'
        else null end as status,
    deptName,
    deptUnid,
    exerciseLevel,
    creditCode,
    case onethingType
        when 'L0' then '0'
        when 'L1' then '1'
        else null end as onethingType
from SmpOnethingCatalog as res01;

select使用子查询

select
    SmpOnethingModule.unid,
    (SELECT count(*) FROM SmpOnethingFlowNode WHERE moduleUnid = SmpOnethingModule.unid) as integrate_single_item_num
from SmpOnethingModule as res01;


保存数据/Save


save 句式类似传统 SQL 中的 insert 语法


保存select结果到mysql数据表中

save overwrite res01 as jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` options
and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
and dbtable="`ods_yxjc_prod`.`ods_SmpOnethingModule`"
and user="fs_nwbj"
and password="123456";

保存的时候清空mysql数据表

-- --save保存
save overwrite res01 as jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` options
and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
and dbtable="`ods_yxjc_prod`.`ods_SmpOnethingModule`"
and user="fs_nwbj"
and password="123456"
and truncate="true";

更新一条数据

如果主键id一样直接append就会覆盖

save append res01 as jdbc.`dws_yxjc_nw.yxjc_index_full` options 
and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/dws_yxjc_nw?useSSL=false&rewriteBatchedStatements=true"
and dbtable="`dws_yxjc_nw`.`yxjc_index_full`"
and user="fs_nwbj"
and password="123456"
and truncate="true";

清空Doris数据表

执行DDL语句truncate表格

run command as JDBC.`byzer_demo._` where 
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
and user="fs_nwbj"
and password="123456"
and `driver-statement-0`="truncate table ods_yxjc_prod.ods_item_material_test";


保存select结果到Doris数据表中

-- 执行写入语句(暂时不支持overwrite .... truncate=true,有需要添加上面那句)
save append res01 as doris.`ods_yxjc_prod.ods_item_material_test` 
and doris.table.identifier="ods_yxjc_prod.ods_item_material_test"
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456" ;


相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
算法 C语言 C++
快速入门C++
快速入门C++
86 0
|
Kubernetes 监控 调度
K8S快速入门
K8S快速入门
131 0
|
Kubernetes Linux API
[没接触过kubevirt?]15分钟快速入门kubevirt
什么是kubevirt? kubevirt是一个容器方式运行虚拟机的项目。`kubevirt`是附加`kubernetes`集群上的,它是通过 `CustomResourceDefinition(CRD)`部署到`Kubernetes API`变成资源对象。使用方式类似创建`deploy、pod`......这些资源清单。
4433 0
[没接触过kubevirt?]15分钟快速入门kubevirt
|
6月前
|
存储 Python
PythonOOP快速入门
PythonOOP快速入门
|
7月前
|
算法 数据可视化 Java
Gephi快速入门
Gephi快速入门
|
7月前
|
存储 编译器 Linux
C++:快速入门篇
C++:快速入门篇
77 0
|
Linux Windows
QMQTT快速入门
环境搭建 • 准备一台linux设备和一台windows设备虚拟机也是可以的; • 安装mosquitto ; • 准备QMQTT环境 - windows下;
174 0
|
SQL 负载均衡 NoSQL
DawnSql快速入门
DawnSql开源分布式数据库,快速入门
DawnSql快速入门
|
安全 编译器 C语言
【C++】—— 快速入门(2)
【C++】—— 快速入门(2)
106 0
【C++】—— 快速入门(2)
|
自然语言处理 编译器 Linux
【C++】—— 快速入门(1)
【C++】—— 快速入门(1)
266 0
【C++】—— 快速入门(1)