MLSQL(Byzer)的快速入门

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: MLSQL(Byzer)的快速入门

建议在阅读本教程之前先阅读官方文档

https://docs.byzer.org/#/byzer-lang/zh-cn/


b849bd45ae334ee3a05d0db8a257cde1.png

数据加载/Load


load,代表读入数据源的行为


load一张mysql表

load jdbc.`stg_sxyxpz_nw.DecDeclareFlowConf` options and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/stg_sxyxpz_nw?useSSL=false"
and dbtable="`stg_sxyxpz_nw`.`DecDeclareFlowConf`"
and user="fs_nwbj"
and password="123456"
as DecDeclareFlowConf;

load多张mysql表

-- 01加载数据源
connect jdbc where
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false"
and user="fs_nwbj"
and password="123456"
 as ods_yxjc_prod;
load jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` as ods_SmpOnethingModule;
load jdbc.`ods_yxjc_prod.ods_SmpOnethingCatalog` as ods_SmpOnethingCatalog;

load的时候添加原生语句的支持

-- 01加载数据源t_item_implement
connect jdbc where 
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/stg_sx?useSSL=false"
and user="fs_nwbj"
and password="123456"
 as stg_sx;
load jdbc.`stg_sx.t_item_implement` where
directQuery = '''
SELECT * FROM t_item_implement WHERE `STATUS` = "1" and IN_CURRENT = "1" AND ITEM_TYPE = "01"
'''


这样就避免了全量加载导致的OOM

load一张Doris

load doris.`stg_sx.t_item_material`   --注意表名和doris.table.identifier一致
options doris.table.identifier="stg_sx.t_item_material"  --需指定,与上一行要load的表名一致
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456" 
and doris.request.tablet.size="3"  -- 可选参数
as t_item_material;


可选参数说明:

1)doris.request.tablet.size

默认值:

Integer.MAX_VALUE

说明:

一个RDD Partition对应的Doris Tablet个数。

此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。

(如一个表的tablet=10,设置的doris.request.tablet.size=3,则spark按4个并行度进行数据读取(10/3=3.3≈4),即读取数据的时候可按tablet分成4个并行任务,提高效率,但“同时会对Doris造成更大的压力”,适当调整)


load多张Doris表


load doris.`ods_yxjc_prod.ods_SmpOnethingFlowNode`   --注意表名和doris.table.identifier一致
options doris.table.identifier="ods_yxjc_prod.ods_SmpOnethingFlowNode"  --需指定,与上一行要load的表名一致
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456"
and doris.request.tablet.size="3"
as ods_SmpOnethingFlowNode;
load doris.`ods_yxjc_prod.ods_SmpOnethingHditmDir`   --注意表名和doris.table.identifier一致
options doris.table.identifier="ods_yxjc_prod.ods_SmpOnethingHditmDir"  --需指定,与上一行要load的表名一致
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456"
and doris.request.tablet.size="3"
as ods_SmpOnethingHditmDir;



每张表都是一个源,需要单独写连接和用户密码,不支持一个connetor重复使用load表

注册函数,模型/Register


Register动态注册 Java/Scala 写的 UDF/UDAF 函数


注册一个java函数


-- 设置java函数
set decDeclareProxyAcceptConfighandleScope ='''
public class DecDeclareProxyAcceptConfighandleScope {
    public String is_full_city_handle(String value) {
        if (value == null || "".equals(value)) {
            return "0";
        }
        return value.contains("city") ? "1" : "0";
    }
}
''';
load script.`decDeclareProxyAcceptConfighandleScope` as scriptTable;
register ScriptUDF.`scriptTable` as is_full_city_handle
options lang="java"
and className = "DecDeclareProxyAcceptConfighandleScope"
and methodName = "is_full_city_handle";


注册多个java函数

-- 设置java函数
set smpOnethingModuleExtendContent ='''
public class SmpOnethingModuleExtendContent {
    public String project_type(String value) {
        if (value == null || "".equals(value)) {
            return null;
        }
        int valueIndex = value.indexOf("infoType=");
        int indexOf = value.indexOf(",", valueIndex);
        int beginIndex = valueIndex + "infoType=".length();
        if (indexOf < beginIndex) {
            return null;
        }
        String substring = value.substring(beginIndex, indexOf);
        switch (substring) {
            case "承诺上报件":
                return "0";
            case "承诺件":
                return "1";
            case "即办件":
                return "2";
            case "联办件":
                return "3";
            default:
                return null;
        }
    }
    public String project_type_text(String value) {
        if (value == null || "".equals(value)) {
            return null;
        }
        int valueIndex = value.indexOf("infoType=");
        if (valueIndex == -1) {
            return null;
        }
        int indexOf = value.indexOf(",", valueIndex);
        return value.substring(valueIndex + "infoType=".length(), indexOf);
    }
''';
load script.`smpOnethingModuleExtendContent` as scriptTable;
register ScriptUDF.`scriptTable` as project_type
options lang="java"
and className = "SmpOnethingModuleExtendContent"
and methodName = "project_type";
register ScriptUDF.`scriptTable` as project_type_text
options lang="java"
and className = "SmpOnethingModuleExtendContent"
and methodName = "project_type_text";

数据转换/Select

这个select就和我们mysql中的sql没有区别,他完全兼容 Spark SQL,除了select 句式最后 as 表名

select一张表

select
    unid,
    jointTrialModel,
    jointTrialOvertime
from SmpOnethingJointTrialConfig as res01;

select多张表

select
    SmpOnethingModule.unid,
    SmpOnethingModule.moduleName,
    core_tag.value as module_class_tag_name
from SmpOnethingModule
         left join core_tag on core_tag.unid = SmpOnethingModule.moduleClassTag
    as res01


select使用自带函数

select
    SmpOnethingModule.unid,
    SUBSTRING(SmpOnethingModule.areaCode,1,6) as region_city_code,
from SmpOnethingModule as res01;

select使用上方注册的java函数

select
    SmpOnethingModule.unid,
    project_type(SmpOnethingModule.extendContent) as project_type,
    project_type_text(SmpOnethingModule.extendContent) as project_type_text
from SmpOnethingModule as res01;

select使用自带语句


select
    catalogName,
    catalogType,
    catalogCode,
    legalStandard,
    case status
    when 'Y' then '1'
        when 'N' then '0'
        else null end as status,
    deptName,
    deptUnid,
    exerciseLevel,
    creditCode,
    case onethingType
        when 'L0' then '0'
        when 'L1' then '1'
        else null end as onethingType
from SmpOnethingCatalog as res01;

select使用子查询

select
    SmpOnethingModule.unid,
    (SELECT count(*) FROM SmpOnethingFlowNode WHERE moduleUnid = SmpOnethingModule.unid) as integrate_single_item_num
from SmpOnethingModule as res01;


保存数据/Save


save 句式类似传统 SQL 中的 insert 语法


保存select结果到mysql数据表中

save overwrite res01 as jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` options
and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
and dbtable="`ods_yxjc_prod`.`ods_SmpOnethingModule`"
and user="fs_nwbj"
and password="123456";

保存的时候清空mysql数据表

-- --save保存
save overwrite res01 as jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` options
and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
and dbtable="`ods_yxjc_prod`.`ods_SmpOnethingModule`"
and user="fs_nwbj"
and password="123456"
and truncate="true";

更新一条数据

如果主键id一样直接append就会覆盖

save append res01 as jdbc.`dws_yxjc_nw.yxjc_index_full` options 
and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/dws_yxjc_nw?useSSL=false&rewriteBatchedStatements=true"
and dbtable="`dws_yxjc_nw`.`yxjc_index_full`"
and user="fs_nwbj"
and password="123456"
and truncate="true";

清空Doris数据表

执行DDL语句truncate表格

run command as JDBC.`byzer_demo._` where 
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
and user="fs_nwbj"
and password="123456"
and `driver-statement-0`="truncate table ods_yxjc_prod.ods_item_material_test";


保存select结果到Doris数据表中

-- 执行写入语句(暂时不支持overwrite .... truncate=true,有需要添加上面那句)
save append res01 as doris.`ods_yxjc_prod.ods_item_material_test` 
and doris.table.identifier="ods_yxjc_prod.ods_item_material_test"
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456" ;


相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
网络协议 Linux 网络安全
openstack 云平台一体化部署(超详细)
openstack 云平台一体化部署(超详细)
2264 0
openstack 云平台一体化部署(超详细)
|
6月前
|
存储 监控 NoSQL
【干货满满】电商API数据抓取实战:从商品信息到订单管理全链路实现
本文详解构建电商API数据抓取系统,涵盖商品采集、订单管理、防封策略、数据存储与分析,适用于价格监控、供应链管理等场景。
|
10月前
|
JavaScript Linux 网络安全
Termux安卓终端美化与开发实战:从下载到插件优化,小白也能玩转Linux
Termux是一款安卓平台上的开源终端模拟器,支持apt包管理、SSH连接及Python/Node.js/C++开发环境搭建,被誉为“手机上的Linux系统”。其特点包括零ROOT权限、跨平台开发和强大扩展性。本文详细介绍其安装准备、基础与高级环境配置、必备插件推荐、常见问题解决方法以及延伸学习资源,帮助用户充分利用Termux进行开发与学习。适用于Android 7+设备,原创内容转载请注明来源。
2642 77
|
SQL 存储 关系型数据库
IDEA中居然有碾压Navicat的数据库管理工具
【8月更文挑战第12天】IDEA中居然有碾压Navicat的数据库管理工具
745 3
IDEA中居然有碾压Navicat的数据库管理工具
一文搞懂正则表达式之零宽断言
零宽断言: 用于查找在某些内容之前或之后的东西,也就是说它们像\b,^,$那样用于指定一个位置,这个位置应该满足一定的条件(即断言),因此它们也被称为零宽断言。
670 1
一文搞懂正则表达式之零宽断言
|
自然语言处理 Java 关系型数据库
ElasticSearch 实现分词全文检索 - 聚合查询 cardinality
ElasticSearch 实现分词全文检索 - 聚合查询 cardinality
451 1
|
数据安全/隐私保护
vuex数据持久化、加密(vuex-persistedstate、secure-ls)
本文介绍了如何在Vuex中使用`vuex-persistedstate`和`secure-ls`库进行数据的持久化和加密,确保在Vite打包上线后,Vuex中的数据安全。
491 1
|
中间件 编译器 数据处理
在web开发中应用管道过滤器
【9月更文挑战第1天】本文介绍管道-过滤器架构将数据处理流程分解为一系列独立组件,通过管道连接,适用于数据流处理如图像处理、编译器设计等。通过具体实例说明了Gin如何有效支持管道-过滤器风格的设计,构建高性能Web服务。
309 10
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。
1637 6
|
JavaScript 前端开发 小程序
uniapp一个人开发APP关键步骤和考虑因素
uniapp一个人开发APP关键步骤和考虑因素
401 1
uniapp一个人开发APP关键步骤和考虑因素