MLSQL(Byzer)的快速入门

本文涉及的产品
RDS MySQL DuckDB 分析主实例,基础系列 4核8GB
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
RDS AI 助手,专业版
简介: MLSQL(Byzer)的快速入门

建议在阅读本教程之前先阅读官方文档

https://docs.byzer.org/#/byzer-lang/zh-cn/


b849bd45ae334ee3a05d0db8a257cde1.png

数据加载/Load


load,代表读入数据源的行为


load一张mysql表

load jdbc.`stg_sxyxpz_nw.DecDeclareFlowConf` options and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/stg_sxyxpz_nw?useSSL=false"
and dbtable="`stg_sxyxpz_nw`.`DecDeclareFlowConf`"
and user="fs_nwbj"
and password="123456"
as DecDeclareFlowConf;

load多张mysql表

-- 01加载数据源
connect jdbc where
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false"
and user="fs_nwbj"
and password="123456"
 as ods_yxjc_prod;
load jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` as ods_SmpOnethingModule;
load jdbc.`ods_yxjc_prod.ods_SmpOnethingCatalog` as ods_SmpOnethingCatalog;

load的时候添加原生语句的支持

-- 01加载数据源t_item_implement
connect jdbc where 
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/stg_sx?useSSL=false"
and user="fs_nwbj"
and password="123456"
 as stg_sx;
load jdbc.`stg_sx.t_item_implement` where
directQuery = '''
SELECT * FROM t_item_implement WHERE `STATUS` = "1" and IN_CURRENT = "1" AND ITEM_TYPE = "01"
'''


这样就避免了全量加载导致的OOM

load一张Doris

load doris.`stg_sx.t_item_material`   --注意表名和doris.table.identifier一致
options doris.table.identifier="stg_sx.t_item_material"  --需指定,与上一行要load的表名一致
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456" 
and doris.request.tablet.size="3"  -- 可选参数
as t_item_material;


可选参数说明:

1)doris.request.tablet.size

默认值:

Integer.MAX_VALUE

说明:

一个RDD Partition对应的Doris Tablet个数。

此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。

(如一个表的tablet=10,设置的doris.request.tablet.size=3,则spark按4个并行度进行数据读取(10/3=3.3≈4),即读取数据的时候可按tablet分成4个并行任务,提高效率,但“同时会对Doris造成更大的压力”,适当调整)


load多张Doris表


load doris.`ods_yxjc_prod.ods_SmpOnethingFlowNode`   --注意表名和doris.table.identifier一致
options doris.table.identifier="ods_yxjc_prod.ods_SmpOnethingFlowNode"  --需指定,与上一行要load的表名一致
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456"
and doris.request.tablet.size="3"
as ods_SmpOnethingFlowNode;
load doris.`ods_yxjc_prod.ods_SmpOnethingHditmDir`   --注意表名和doris.table.identifier一致
options doris.table.identifier="ods_yxjc_prod.ods_SmpOnethingHditmDir"  --需指定,与上一行要load的表名一致
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456"
and doris.request.tablet.size="3"
as ods_SmpOnethingHditmDir;



每张表都是一个源,需要单独写连接和用户密码,不支持一个connetor重复使用load表

注册函数,模型/Register


Register动态注册 Java/Scala 写的 UDF/UDAF 函数


注册一个java函数


-- 设置java函数
set decDeclareProxyAcceptConfighandleScope ='''
public class DecDeclareProxyAcceptConfighandleScope {
    public String is_full_city_handle(String value) {
        if (value == null || "".equals(value)) {
            return "0";
        }
        return value.contains("city") ? "1" : "0";
    }
}
''';
load script.`decDeclareProxyAcceptConfighandleScope` as scriptTable;
register ScriptUDF.`scriptTable` as is_full_city_handle
options lang="java"
and className = "DecDeclareProxyAcceptConfighandleScope"
and methodName = "is_full_city_handle";


注册多个java函数

-- 设置java函数
set smpOnethingModuleExtendContent ='''
public class SmpOnethingModuleExtendContent {
    public String project_type(String value) {
        if (value == null || "".equals(value)) {
            return null;
        }
        int valueIndex = value.indexOf("infoType=");
        int indexOf = value.indexOf(",", valueIndex);
        int beginIndex = valueIndex + "infoType=".length();
        if (indexOf < beginIndex) {
            return null;
        }
        String substring = value.substring(beginIndex, indexOf);
        switch (substring) {
            case "承诺上报件":
                return "0";
            case "承诺件":
                return "1";
            case "即办件":
                return "2";
            case "联办件":
                return "3";
            default:
                return null;
        }
    }
    public String project_type_text(String value) {
        if (value == null || "".equals(value)) {
            return null;
        }
        int valueIndex = value.indexOf("infoType=");
        if (valueIndex == -1) {
            return null;
        }
        int indexOf = value.indexOf(",", valueIndex);
        return value.substring(valueIndex + "infoType=".length(), indexOf);
    }
''';
load script.`smpOnethingModuleExtendContent` as scriptTable;
register ScriptUDF.`scriptTable` as project_type
options lang="java"
and className = "SmpOnethingModuleExtendContent"
and methodName = "project_type";
register ScriptUDF.`scriptTable` as project_type_text
options lang="java"
and className = "SmpOnethingModuleExtendContent"
and methodName = "project_type_text";

数据转换/Select

这个select就和我们mysql中的sql没有区别,他完全兼容 Spark SQL,除了select 句式最后 as 表名

select一张表

select
    unid,
    jointTrialModel,
    jointTrialOvertime
from SmpOnethingJointTrialConfig as res01;

select多张表

select
    SmpOnethingModule.unid,
    SmpOnethingModule.moduleName,
    core_tag.value as module_class_tag_name
from SmpOnethingModule
         left join core_tag on core_tag.unid = SmpOnethingModule.moduleClassTag
    as res01


select使用自带函数

select
    SmpOnethingModule.unid,
    SUBSTRING(SmpOnethingModule.areaCode,1,6) as region_city_code,
from SmpOnethingModule as res01;

select使用上方注册的java函数

select
    SmpOnethingModule.unid,
    project_type(SmpOnethingModule.extendContent) as project_type,
    project_type_text(SmpOnethingModule.extendContent) as project_type_text
from SmpOnethingModule as res01;

select使用自带语句


select
    catalogName,
    catalogType,
    catalogCode,
    legalStandard,
    case status
    when 'Y' then '1'
        when 'N' then '0'
        else null end as status,
    deptName,
    deptUnid,
    exerciseLevel,
    creditCode,
    case onethingType
        when 'L0' then '0'
        when 'L1' then '1'
        else null end as onethingType
from SmpOnethingCatalog as res01;

select使用子查询

select
    SmpOnethingModule.unid,
    (SELECT count(*) FROM SmpOnethingFlowNode WHERE moduleUnid = SmpOnethingModule.unid) as integrate_single_item_num
from SmpOnethingModule as res01;


保存数据/Save


save 句式类似传统 SQL 中的 insert 语法


保存select结果到mysql数据表中

save overwrite res01 as jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` options
and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
and dbtable="`ods_yxjc_prod`.`ods_SmpOnethingModule`"
and user="fs_nwbj"
and password="123456";

保存的时候清空mysql数据表

-- --save保存
save overwrite res01 as jdbc.`ods_yxjc_prod.ods_SmpOnethingModule` options
and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
and dbtable="`ods_yxjc_prod`.`ods_SmpOnethingModule`"
and user="fs_nwbj"
and password="123456"
and truncate="true";

更新一条数据

如果主键id一样直接append就会覆盖

save append res01 as jdbc.`dws_yxjc_nw.yxjc_index_full` options 
and driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/dws_yxjc_nw?useSSL=false&rewriteBatchedStatements=true"
and dbtable="`dws_yxjc_nw`.`yxjc_index_full`"
and user="fs_nwbj"
and password="123456"
and truncate="true";

清空Doris数据表

执行DDL语句truncate表格

run command as JDBC.`byzer_demo._` where 
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:9030/ods_yxjc_prod?useSSL=false&rewriteBatchedStatements=true"
and user="fs_nwbj"
and password="123456"
and `driver-statement-0`="truncate table ods_yxjc_prod.ods_item_material_test";


保存select结果到Doris数据表中

-- 执行写入语句(暂时不支持overwrite .... truncate=true,有需要添加上面那句)
save append res01 as doris.`ods_yxjc_prod.ods_item_material_test` 
and doris.table.identifier="ods_yxjc_prod.ods_item_material_test"
and doris.fenodes="127.0.0.1:8030"
and user="fs_nwbj"
and password="123456" ;


相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
8月前
|
存储 监控 NoSQL
【干货满满】电商API数据抓取实战:从商品信息到订单管理全链路实现
本文详解构建电商API数据抓取系统,涵盖商品采集、订单管理、防封策略、数据存储与分析,适用于价格监控、供应链管理等场景。
|
12月前
|
JavaScript Linux 网络安全
Termux安卓终端美化与开发实战:从下载到插件优化,小白也能玩转Linux
Termux是一款安卓平台上的开源终端模拟器,支持apt包管理、SSH连接及Python/Node.js/C++开发环境搭建,被誉为“手机上的Linux系统”。其特点包括零ROOT权限、跨平台开发和强大扩展性。本文详细介绍其安装准备、基础与高级环境配置、必备插件推荐、常见问题解决方法以及延伸学习资源,帮助用户充分利用Termux进行开发与学习。适用于Android 7+设备,原创内容转载请注明来源。
3282 77
|
SQL 存储 关系型数据库
IDEA中居然有碾压Navicat的数据库管理工具
【8月更文挑战第12天】IDEA中居然有碾压Navicat的数据库管理工具
865 3
IDEA中居然有碾压Navicat的数据库管理工具
|
资源调度 分布式计算 Kubernetes
Koordinator 支持 K8s 与 YARN 混部,小红书在离线混部实践分享
Koordinator 支持 K8s 与 YARN 混部,小红书在离线混部实践分享
一文搞懂正则表达式之零宽断言
零宽断言: 用于查找在某些内容之前或之后的东西,也就是说它们像\b,^,$那样用于指定一个位置,这个位置应该满足一定的条件(即断言),因此它们也被称为零宽断言。
768 0
一文搞懂正则表达式之零宽断言
|
消息中间件 存储 缓存
RabbitMQ 集群和镜像队列
【1月更文挑战第11天】 一、clustering(集群) 1、使用集群的原因 2、搭建步骤 2.1、拉取镜像 2.2、创建三个RabbitMQ容器节点 2.3、集群搭建 二、镜像队列 1、使用镜像的原因 2、搭建步骤
861 88
|
数据安全/隐私保护
vuex数据持久化、加密(vuex-persistedstate、secure-ls)
本文介绍了如何在Vuex中使用`vuex-persistedstate`和`secure-ls`库进行数据的持久化和加密,确保在Vite打包上线后,Vuex中的数据安全。
523 1
|
中间件 编译器 数据处理
在web开发中应用管道过滤器
【9月更文挑战第1天】本文介绍管道-过滤器架构将数据处理流程分解为一系列独立组件,通过管道连接,适用于数据流处理如图像处理、编译器设计等。通过具体实例说明了Gin如何有效支持管道-过滤器风格的设计,构建高性能Web服务。
341 10
|
JavaScript 前端开发 小程序
uniapp一个人开发APP关键步骤和考虑因素
uniapp一个人开发APP关键步骤和考虑因素
442 1
uniapp一个人开发APP关键步骤和考虑因素
|
机器学习/深度学习 分布式计算 算法
【大数据分析&机器学习】分布式机器学习
本文主要介绍分布式机器学习基础知识,并介绍主流的分布式机器学习框架,结合实例介绍一些机器学习算法。

热门文章

最新文章