Quicksql
为什么要统一sql?
数据作业是多兵种合作的战场
公司内部围绕数据开展的工作,需要数据仓库工程师、数据分析师、数据研发工程师、运维工程师、甚至产品运营人员共同完成,沟通和协作的效率成为关键性因素。而sql语言也是大家相对更容易理解和接受,最易上手的查询语言。
数据存储引擎眼花缭乱
随着大数据行业的发展,数据存储引擎目前处于百花齐放的阶段,新兴技术层出不穷:如关系型数据库,包括MySQL、Oracle、SQL Server以及蚂蚁推出的OceanBase等;NoSQL方案,包括Redis、Aerospike、MongoDB等;基于Hadoop体系的方案,包括Hive、HBase等;以及目前比较火热的NewSQL方向,包括Greenplum、TiDB、Doris、ClickHouse等。
各式各样的存储引擎让不少参与数据作业的人感到茫然,不知道该选择什么样的方式。开发者要花非常多的成本不停地尝试各种方案,以应对市场需求。开发者需要准确了解数据存在哪里、字段格式怎样、数据表间的关系怎样、数据如何操作等信息,技术人员和业务人员还需要花很多时间掌握各种存储引擎的性能和特性。
计算引擎选择类型多、学习成本高
除了存储引擎,计算引擎的选择也给大家带来困扰。Spark和Flink各有千秋,也各自在快速发展和相互学习融合。另外机器学习引擎也有很多方案,比如Tensorflow、PyTorch以及计算引擎中携带的机器学习算法库,但这些方案的学习成本比较高,常常令开发者感到纠结,难以抉择。
工作语言不同意,编码风格差异化明显
存储引擎和计算引擎具有繁多的方案,会给协同工作带来较大的语言障碍。对于分析师来说,日常大量使用的主要是SQL,但是有些时候也会使用Python、Shell脚本等方式完成数据处理。数据建模人员主要依赖Python,而数据研发人员则主要使用Java和Scala来开发实时任务。产品运营人员甚至会使用Excel来完成一些简单的分析任务。当然,大多数时候他们还是将需求表达给数据分析师,由分析师来协助完成。
语言障碍也在一定程度上限制了协作的效率,在资源调配上也缺乏灵活性。比如基于Spark或Flink的实时任务目前只能由数据研发同学完成,这很容易造成工作积压。
因此统一sql显得尤为必要。如果我们仔细思考,其实会发现数据处理本质上是数仓的加工处理,而各类数据作业都可抽象为数仓的ETL过程,即数据的提取(extract)、转换(transform)和加载(load)。目前来看,SQL是描述数据处理流程最优的DSL(Domain Specific Language),是事实标准。
在目前大环境下推行统一SQL的解决方案是大势所趋,具备天时地利人和的基本条件:
天时:数据体量增长,计算、人力、沟通成本增加,为企业不能承受之重; 地利:主流关系型数据库、MPP数据库、计算引擎、ES、甚至NoSQL方案已经或者计划支持SQL语法; 人和:SQL语言易于上手,核心功能只有9个动词;分析师、建模师、数据研发甚至产品运营等非技术人员都可以快速掌握SQL这门语言。
我们认为可以通过统一SQL的方式去完成二八原则的转换,即从目前把80%的时间花在20%的常规数据作业上 ,转变成用20%的时间就可以完成80%的常规数据作业。这样我们就可以有更多的时间去解决更复杂的工作,去思考数据的价值。
统一sql需要支持哪些?
1、打通元数据,支持元数据采集和管理功能 2、兼容两种计算引擎(Spark/Flink),不对计算引擎进行修改; 3、可灵活扩展数据源; 4、对SQL优化过程自主可控,逻辑上与引擎解耦,物理上可以与引擎绑定; 5、使用统一的抽象屏蔽底层计算存储细节
什么是Quicksql?
Quicksql是由360团队开源的一款跨计算引擎的统一联邦查询中间件,用户可以使用标准SQL语法对各类数据源进行联合分析查询。其目标是构建实时\离线全数据源统一的数据处理范式,屏蔽底层物理存储和计算层,最大化业务处理数据的效率。同时能够提供给开发人员可插拔接口,由开发人员自行对接新数据源。
支持以下功能:
支持8种数据源查询:Hive, MySQL, Kylin, Elasticsearch, Oracle, MongoDB, PostgreSQL, GBase-8s; 支持Spark、Flink双计算引擎; 支持基础CLI命令行查询和JDBC远程连接查询; JDBC类型数据源可通过YAML配置快速接入,无需修改代码; 提供方言/语法对接模板,支持用户对新数据源的语法自定义; 提供元数据采集功能,批量拉取预置元数据; 支持落地HDFS,支持可配置的异步响应机制;
设计理念
使用统一的抽象屏蔽底层计算存储细节
针对业务痛点进行深入分析后,我们发现:数据分析人员的技能树通常向两个方向发展,一类是技术型,Spark、Flink 信手拈来,Python、R 语言一日千行,他们对业务和数据的理解可能不深入,但他们一定懂得怎样快速清洗加工数据。另一类是业务型,线性回归、残差网络各类算法应用自如,方差期望、概率分布准确求解,他们对技术栈的了解可能不完备,但他们可以从海量数据中抓出关键的信息打动老板。然而,日趋复杂的数据分析场景逐步提升了处理数据的难度,如果底层数据处理平台无法跟随场景进行演进,业务会陷入数据加工细节,导致产品迭代速度变缓,最终湮没在互联网快速迭代的大潮中。
因此,我们考虑对外提供统一的查询范式,屏蔽底层数据源和操作细节,让业务人员摆脱加工数据的技术束缚,不论是对成本控制还是效率提升都是一本万利的事情。
二次解析衔接所有数据源和引擎
对于统一查询的场景,我们参考了业界已有的引擎,调研发现:受限于当下计算体系结构,每一种引擎都存在优劣势。面临选择的场景,我们考虑使用动态调度的思想,使用 Apache Calcite 作为上层解析器,通过多一次解析搞清楚用户的查询意图,再向对应引擎解释路由,让合适的引擎做合适的事儿。QuickSQL 对外提供多种接口,用户可以直接通过 CLI 进行分析,也可以在平台中通过其他接口进行远程调用。
架构设计
整体架构包含三层:
语法解析层:负责 SQL 语句的解析、校验、优化、混算 SQL 的切分以及最终生成 Query Plan; 计算引擎层:负责 Query Plan 路由到具体的执行计划中,将 Query Plan 解释为具体的执行引擎可识别的语言; 数据存储层:负责数据的提取、存储;
解析层主要通过与元数据库交互进行语句解析和校验,结合独立的权限系统,可以在校验时进行表和字段级别的权限判断。解析生成的逻辑计划会经历联邦查询优化和逻辑树切分。切分后的逻辑子树由解释层进行方言解释和语句路由。对于特定源的查询直接由 JDBC 完成,混查场景则借助 Spark 或 Flink 作为分布式计算中间引擎进行数据周转处理。运行时层提供了下推语句的预聚合及抽取计算。
基本用法1. 将所有结构化数据查询统一为 SQL 语法
都用sql,如查询 Elasticsearch:
SELECT state, pop FROM geo_mapping WHERE state = 'CA' ORDER BY state; SELECT approx_count_distinct(city), state FROM geo_mapping GROUP BY state LIMIT 10;
屏蔽方言,直接写sql,不用指定数据库方言
SELECT * FROM geo_mapping LIMIT 10 OFFSET 10 -- Run Anywhere
2. 屏蔽不同数据源之间的隔离性
下面展示的是es关联hive的sql语法:
SELECT * FROM (SELECT * FROM es_raw.profile AS profile //index.tpye on Elasticsearch WHERE note IS NOT NULL )AS es_profile INNER JOIN (SELECT * FROM hive_db.employee AS emp //database.table on Hive INNER JOIN hive_db.action AS act //database.table on Hive ON emp.name = act.name) AS tmp ON es_profile.prefer = tmp.prefer
3. 选择最合适的方式来执行查询
涉及多个引擎的查询可以以很多种方式执行,作为一个长期的计划,Quicksql 希望结合每个引擎的优势来找到最合适的一个。
使用案例
demo1:CsvJoinWithEsExample
import com.qihoo.qsql.api.SqlRunner; import com.qihoo.qsql.api.SqlRunner.Builder.RunnerType; import com.qihoo.qsql.env.RuntimeEnv; import java.io.IOException; public class CsvJoinWithEsExample { /** * If you want to execute in the IDE, adjust the scope of the spark package in the parent pom to compile. * 如果希望在IDE中执行spark和flink的样例代码,请调整父pom中的spark、flink的scope值为compile。 * @param args nothing */ public static void main(String[] args) throws IOException { RuntimeEnv.init(); String sql = "SELECT * FROM depts " + "INNER JOIN (SELECT * FROM student " + "WHERE city in ('FRAMINGHAM', 'BROCKTON', 'CONCORD')) FILTERED " + "ON depts.name = FILTERED.type "; System.out.println("Iput: " + sql); SqlRunner.Builder.RunnerType runnerType = RunnerType.value(args.length < 1 ? "spark" : args[0]); SqlRunner runner = SqlRunner.builder() .setTransformRunner(runnerType) .setSchemaPath(RuntimeEnv.metadata) .setAppName("spark-mixed-app") .setAcceptedResultsNum(100) .ok(); runner.sql(sql).show(); System.exit(0); } }
demo2:SimpleQueryByFlinkExample
import com.qihoo.qsql.api.SqlRunner; import com.qihoo.qsql.api.SqlRunner.Builder.RunnerType; import com.qihoo.qsql.env.RuntimeEnv; import java.io.IOException; public class SimpleQueryByFlinkExample { /** * If you want to execute in the IDE, adjust the scope of the spark package in the parent pom to compile. * 如果希望在IDE中执行spark和flink的样例代码,请调整父pom中的spark、flink的scope值为compile。 * @param args nothing */ public static void main(String[] args) throws IOException { RuntimeEnv.init(); String sql = "select 1"; SqlRunner.Builder.RunnerType runnerType = RunnerType.value(args.length < 1 ? "flink" : args[0]); SqlRunner runner = SqlRunner.builder() .setTransformRunner(runnerType) .setSchemaPath(RuntimeEnv.metadata) .setAppName("test_csv_app") .setAcceptedResultsNum(100) .ok(); runner.sql(sql).show(); RuntimeEnv.close(); System.exit(0); } }
common:RuntimeEnv
import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.google.common.io.LineProcessor; import com.google.common.io.Resources; import com.qihoo.qsql.CsvJoinWithEsExample; import com.qihoo.qsql.utils.PropertiesReader; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; public class RuntimeEnv { static { PropertiesReader.configLogger(); } private static final String TEST_DATA_URL = PropertiesReader.getTestDataFilePath(); private static final EmbeddedElasticsearchPolicy NODE = EmbeddedElasticsearchPolicy.create(); public static void init() throws IOException { System.out.println("Elasticsearch Embedded Server is starting up, waiting...."); final Map<String, String> mapping = ImmutableMap.of("stu_id", "keyword", "type", "keyword", "city", "keyword", "digest", "long", "province", "keyword"); NODE.createIndex("student", mapping); // load records from file final List<ObjectNode> bulk = new ArrayList<>(); Resources.readLines(CsvJoinWithEsExample.class.getResource("/student.json"), StandardCharsets.UTF_8, new LineProcessor<Void>() { @Override public boolean processLine(String line) throws IOException { bulk.add((ObjectNode) NODE.mapper().readTree(line)); return true; } @Override public Void getResult() { return null; } }); if (bulk.isEmpty()) { throw new IllegalStateException("No records to index. Empty file ?"); } NODE.insertBulk("student", bulk); System.out.println("Elasticsearch Embedded Server has started!! Your query is running..."); } public static final String metadata = "inline:\n" + "{\n" + " \"version\": \"1.0\",\n" + " \"defaultSchema\": \"QSql\",\n" + " \"schemas\": [{\n" + " \"type\": \"custom\",\n" + " \"name\": \"custom_name\",\n" + " \"factory\": \"com.qihoo.qsql.org.apache.calcite.adapter.csv.CsvSchemaFactory\",\n" + " \"operand\": {\n" + " \"directory\": \"\"\n" + " },\n" + " \"tables\": [{\n" + " \"name\": \"depts\",\n" + " \"type\": \"custom\",\n" + " \"factory\": \"com.qihoo.qsql.org.apache.calcite.adapter.csv.CsvTableFactory\",\n" + " \"operand\": {\n" + " \"file\": \"" + TEST_DATA_URL + "\",\n" + " \"flavor\": \"scannable\"\n" + " },\n" + " \"columns\": [{\n" + " \"name\": \"deptno:int\"\n" + " },\n" + " {\n" + " \"name\": \"name:string\"\n" + " }\n" + " ]\n" + " }]\n" + " },\n" + " {\n" + " \"type\": \"custom\",\n" + " \"name\": \"esTest\",\n" + " \"factory\": \"com.qihoo.qsql.org.apache.calcite.adapter.elasticsearch.ElasticsearchCustomSchemaFactory\",\n" + " \"operand\": {\n" + " \"coordinates\": \"{'localhost': 9025}\",\n" + " \"userConfig\": \"{'bulk.flush.max.actions': 10, 'bulk.flush.max.size.mb': 1," + "'esUser':'username','esPass':'password'}\",\n" + " \"index\": \"student\"\n" + " },\n" + " \"tables\": [{\n" + " \"name\": \"student\",\n" + " \"factory\": \"com.qihoo.qsql.org.apache.calcite.adapter.elasticsearch.ElasticsearchTableFactory\",\n" + " \"operand\": {\n" + " \"dbName\": \"esTest\",\n" + " \"tableName\": \"student\",\n" + " \"esNodes\": \"localhost\",\n" + " \"esPort\": \"9025\",\n" + " \"esUser\": \"username\",\n" + " \"esPass\": \"password\",\n" + " \"esScrollNum\": \"1\",\n" + " \"esName\": \"esTest\"\n" + " },\n" + " \"columns\": [{\n" + " \"name\": \"city:string\"\n" + " },\n" + " {\n" + " \"name\": \"province:string\"\n" + " },\n" + " {\n" + " \"name\": \"digest:int\"\n" + " },\n" + " {\n" + " \"name\": \"type:string\"\n" + " },\n" + " {\n" + " \"name\": \"stu_id:string\"\n" + " }\n" + " ]\n" + " }]\n" + " }\n" + " ]\n" + "}"; public static final String metaDataSQLIET = "inline:\n" + "{\n" + " \"version\": \"1.0\",\n" + " \"defaultSchema\": \"QSql\",\n" + " \"schemas\": [{\n" + " \"type\": \"custom\",\n" + " \"name\": \"main\",\n" + " \"factory\": \"com.qihoo.qsql.org.apache.calcite.adapter.custom.JdbcSchemaFactory\",\n" + " \"operand\": {\n" + " \"dbName\": \"testdata\",\n" +" \"tableName\": \"t_test_data\",\n" +" \"dbType\": \"mysql\",\n" +" \"jdbcDriver\": \"org.sqlite.JDBC\",\n" +" \"jdbcUrl\": \"jdbc:sqlite:E://////test.db\",\n" +" \"jdbcUser\": \"xx\",\n" +" \"jdbcPassword\": \"xx\"\n" + " },\n" + " \"tables\": [{\n" + " \"name\": \"t_test_data\",\n" + " \"factory\": \"com.qihoo.qsql.org.apache.calcite.adapter.custom.JdbcTableFactory\",\n" + " \"operand\": {\n" + " \"dbName\": \"testdata\",\n" +" \"tableName\": \"t_test_data\",\n" +" \"dbType\": \"mysql\",\n" +" \"jdbcDriver\": \"org.sqlite.JDBC\",\n" +" \"jdbcUrl\": \"jdbc:sqlite:E://////test.db\",\n" +" \"jdbcUser\": \"xx\",\n" +" \"jdbcPassword\": \"xx\"\n" + " },\n" + " \"columns\": [{\n" + " \"name\": \"id:int\"\n" + " },\n" + " {\n" + " \"name\": \"dataid:int\"\n" + " },\n" + " {\n" + " \"name\": \"data:string\"\n" + " }\n" + " ]\n" + " }]\n" + " }\n" + " ]\n" + "}"; public static void close() { NODE.close(); } }
链接资料地址:
https://github.com/Qihoo360/Quicksql