怎样做多数据源的混合计算

本文涉及的产品
云数据库 MongoDB,独享型 2核8GB
推荐场景:
构建全方位客户视图
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: 早期应用通常只会连接一个数据库,计算也都由数据库完成,基本不存在多数据源混合计算的问题。而现代应用的数据源变得很丰富,同一个应用也可能访问多种数据源,各种 SQL 和 NoSQL 数据库、文本 /XLS、WebService/Restful、Kafka、Hadoop、…。多数据源上的混合计算就是个摆在桌面需要解决的问题了。直接在应用中硬编码实现是很繁琐的,Java 这些常用的应用开发语言很不擅长做这类事,和 SQL 比,简洁性差得很远。

早期应用通常只会连接一个数据库,计算也都由数据库完成,基本不存在多数据源混合计算的问题。而现代应用的数据源变得很丰富,同一个应用也可能访问多种数据源,各种 SQL 和 NoSQL 数据库、文本 /XLS、WebService/Restful、Kafka、Hadoop、…。多数据源上的混合计算就是个摆在桌面需要解决的问题了。

直接在应用中硬编码实现是很繁琐的,Java 这些常用的应用开发语言很不擅长做这类事,和 SQL 比,简洁性差得很远。
把多源数据导入一个数据库再计算也不合适,且不说导入需要时间导致丧失数据实时性,有些数据想要无损地导入关系数据库(比如 Mongodb 支持和多层文档数据)是件非常困难且成本高昂的任务。毕竟这些五花八门的数据源之所以存在都是有其道理的,不可能轻松被关系数据库取代。否则人没必要发明 Mongodb,直接用 MySQL 算了。
逻辑数仓呢?听起来就很沉重。使用前先需要定义元数据来映射这些多样性的数据源,很繁琐。而且大部分逻辑数仓还是 SQL 型的,依然很难无损地映射这些多样性的数据。
那一堆计算框架呢?特别是流计算框架。倒是能接入不少数据源,但计算本身的功能却提供得很少。要么用 SQL,就会有逻辑数仓一样的映射困难问题;要想自由接入各种数据源,就自己用 Java 写计算代码。

面对多数据源上的混合计算问题,esProc SPL 才是个好方法。
esProc SPL 是纯 Java 开发的开源计算引擎,在这里 https://github.com/SPLWare/esProc

esProc SPL 怎么解决这个问题呢?主要是两个方面。

  1. 抽象多样性数据源的访问接口,可以将五花八门的数据映射成少数几个数据对象。
  2. 基于 1 中的数据对象,自行实现足够丰富且不依赖于数据源的计算能力。
    有了这两方面能力,碰到新的数据源只要封装一下接口,不断补充就可以了。

esProc SPL 提供了两个基本的数据对象:序表和游标,分别对应内存数据表和流式数据表。
包括关系数据库在内,几乎所有的数据源都会提供返回这两种数据对象的接口:小数据一次性读出,使用内存数据表(序表);大数据要逐步返回,使用流式数据表(游标)。有了这两种数据对象,就可以覆盖几乎所有的数据源了。
这样,不需要事先定义元数据做映射,直接使用数据源本身提供的方法来访问数据,然后封装成这两种数据对象之一即可。这样可以保留数据源的特点,充分利用其存储和计算能力。当然更不需要先把数据做“某种”入库动作,实时访问就可以。这两种数据对象就是多样性数据源访问接口共有的能力,而逻辑数仓采用的映射数据表方法并没有正确抽象出多样性数据源的公共特征,适用面要窄很多。
需要特别指出的是,SPL 的序表和游标都支持多层结构化数据以及文本数据,这就能接收和处理 json 数据(或其二进制变种)。

看一些例子:
关系数据库,A2 返回序表,A3 返回游标

A

1 =connect("MyCompany")
2 =A1.query("select from employees order by hire_date asc limit 100")
3 =A1.cursor("select
from salaries")
4 >A1.close()
本地文件,A1/A3 返回序表,A2 返回游标

A

1 =T("Orders.csv")
2 =file("sales.txt").cursor@t()
3 =file("Orders.xls").xlsimport@t()
Restful,A1 返回 json 格式的文本

A

1 =httpfile("http://127.0.0.1:6868/restful/emp_orders").read()
2 =json(A1)
Elastic Search

A

1 >apikey="Authorization:ApiKey a2x6aEF……KZ29rT2hoQQ=="
2
'{

"counter" : 1,

"tags" : ["red"]

,"beginTime":"2022-01-03"

,"endTime":"2022-02-15"

}

3 =es_rest("https://localhost:9200/index1/_doc/1", "PUT",A2;"Content-Type: application/x-ndjson",apikey)
Mongodb,A2 返回序表,A3 返回游标

A

1 =mongo_open("mongodb://127.0.0.1:27017/mymongo")
2 =mongo_shell(A1,"{'find':'orders',filter:{OrderID: {$gte: 50}},batchSize:100}")
3 =mongo_shell@dc(A1,"{'find':'orders',filter:{OrderID: { $gte: 50}},batchSize:10}")
4 =mongo_close(A1)
Kafka,A2 返回含有 json 数据的序表,A3 返回游标

A

1 =kafka_open("/kafka/my.properties", "topic1")
2 =kafka_poll(A1)
3 =kafka_poll@c(A1)
4 =kafka_close(A1)
HBase,A2/A3 返回序表,A4 返回游标

A

1 =hbase_open("hdfs://192.168.0.8", "192.168.0.8")
2 =hbase_get(A1,"Orders","row1","datas:Amount":number:amt,"datas:OrderDate": date:od)
3 =hbase_scan(A1,"Orders")
4 =hbase_scan@c(A1,"Orders")
5 =hbase_close(A1)
esProc SPL 封装过的数据源已经很多,还在不断增加中:

esProc SPL 针对序表提供了完善的计算能力,包括过滤、分组、排序、连接等,丰富程度还远远超过 SQL,大部分运算只要一句就可以完成:

Filter:T.select(Amount>1000 && Amount<=3000 && like(Client,"s"))
Sort:T.sort(Client,-Amount)
Distinct:T.id(Client)
Group:T.groups(year(OrderDate);sum(Amount))
Join:join(T1:O,SellerId; T2:E,EId)
TopN:T.top(-3;Amount)
TopN in group:T.groups(Client;top(3,Amount))
游标上也有类似的计算,语法几乎完全一样,这里就不详细举例了。感兴趣的小伙伴可以参考 esProc SPL 官网上的材料。

在这些基础上,混合计算就非常容易实现了:

两个关系数据库

A

1 =oracle.query("select EId,Name from employees")
2 =mysql.query("select SellerId, sum(Amount) subtotal from Orders group by SellerId")
3 =join(A1:O,SellerId; A2:E,EId)
4 =A3.new(O.Name,E.subtotal)
关系数据库与 json

A

1 =json(file("/data/EO.json").read())
2 =A1.conj(Orders)
3 =A2.select(Amount>1000 &&Amount<=3000 && like@c(Client,"s"))
4 =db.query@x("select ID,Name,Area from Client")
5 =join(A3:o,Client;A4:c,ID)
Mongodb 和关系数据库

A

1 =mongo_open("mongodb://127.0.0.1:27017/mongo")
2 =mongo_shell(A1,"test1.find()")
3 =A2.new(Orders.OrderID,Orders.Client,Name,Gender,Dept).fetch()
4 =mongo_close(A1)
5 =db.query@x("select ID,Name,Area from Client")
6 =join(A3:o, Orders.Client;A4:c,ID)
Restful 和本地文本文件

A

1 =httpfile("http://127.0.0.1:6868/api/getData").read()
2 =json(A1)
3 =T("/data/Client.csv")
4 =join(A2:o,Client;A3:c,ClientID)
有了混合计算能力,还可以顺便解决 T+0 计算。

单体的 TP 数据库天然就支持 T+0 计算。数据积累太多时,会影响 TP 数据库的性能。这时候通常会把一部分历史数据移到专业的 AP 数据库中,也就是冷热分离,TP 数据库只保存近期产生的热数据,AP 数据库保存历史冷数据,TP 数据库压力减轻后即可流畅运行。

但这样,想做实时全量统计时就需要跨库计算了,而这一直是个麻烦事,特别是面对异构数据库(TP 库和 AP 库常常不是同一种)就更困难。有了 esProc SPL 这种混合数据源计算能力,这个问题就可以轻松解决了。

A    B

1 =[[connect@l("oracle"),"ORACLE"],[connect@l("hive"),"HIVE"]]
2 =SQL="select month(orderdate) ordermonth,sellerid,sum(amount) samount,count(amount) camount from sales group by month(orderdate),sellerid"
3 fork A1 =SQL.sqltranslate(A3(2))
4
=A3(1).query(B3)
5 =A3.conj().groups(ordermonth,sellerid;sum(samount):totalamount,sum(camount):totalcount)
TP 数据库 Oracle 和 AP 数据库 Hive 之间的混合运算,SPL 还能把 SQL 转换成不同数据库的方言语法。

那么,esProc SPL 写出来的代码如何集成到应用程序中呢?
很简单,esProc 提供了标准的 JDBC 驱动,被 Java 程序引入后,就可以使用 SPL 语句了,和调用数据库 SQL 一样。

Class.forName("com.esproc.jdbc.InternalDriver");
Connection conn =DriverManager.getConnection("jdbc:esproc:local://");
Statement statement = conn.createStatement();
ResultSet result = statement.executeQuery("=json(file(\"Orders.csv\")).select(Amount>1000 && like(Client,\"s\")
较复杂的 SPL 脚本可以存成文件,然后就像调用存储过程一样:

Class.forName("com.esproc.jdbc.InternalDriver");
Connection conn =DriverManager.getConnection("jdbc:esproc:local://");
CallableStatement statement = conn.prepareCall("call queryOrders()");
statement.execute();
这相当于提供了一个没有存储且不用 SQL 的逻辑数据库。

相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
6月前
|
存储 JSON 数据格式
通用研发提效问题之存储模型选择存储方式,如何解决
通用研发提效问题之存储模型选择存储方式,如何解决
|
6月前
|
Java Spring
通用研发提效问题之配置的若干场景下若干方案的变化该如何解决
通用研发提效问题之配置的若干场景下若干方案的变化该如何解决
|
8月前
|
调度
【核心完整复现】基于目标级联法的微网群多主体分布式优化调度
【核心完整复现】基于目标级联法的微网群多主体分布式优化调度
|
算法 调度
基于目标级联法的微网群多主体分布式优化调度(Matlab代码实现)
基于目标级联法的微网群多主体分布式优化调度(Matlab代码实现)
200 0
|
负载均衡 NoSQL Java
任务调度系统就该这么设计(万能通用),稳的一批! 上
任务调度系统就该这么设计(万能通用),稳的一批!上
|
消息中间件 资源调度 分布式计算
任务调度系统就该这么设计(万能通用),稳的一批! 下
任务调度系统就该这么设计(万能通用),稳的一批! 下
利用Dynamo和自适应常规模型建立多联空心板桥
利用Dynamo和自适应常规模型建立多联空心板桥
利用Dynamo和自适应常规模型建立多联空心板桥
|
安全 自动驾驶 中间件
为啥AUTOSAR自适应平台是一个安全的标准化软件框架
为啥AUTOSAR自适应平台是一个安全的标准化软件框架
为啥AUTOSAR自适应平台是一个安全的标准化软件框架
|
存储 算法 关系型数据库
「数据密集型系统搭建」原理篇|数据类型不怕精挑细选
本篇围绕MySQL数据库的底层存储模型、列类型来聊聊数据库表设计及建模中要注意的事项,剖析最根源的底层物理存储文件,用最真实的数据剖析来证明和解答开发过程中的疑惑。
166 0
「数据密集型系统搭建」原理篇|数据类型不怕精挑细选
|
数据采集 消息中间件 监控
最终整体回顾(代码-离线计算)|学习笔记
快速学习最终整体回顾(代码-离线计算)