Kafka - SQL 代码实现

简介:

1.概述

  上次给大家分享了关于 Kafka SQL 的实现思路,这次给大家分享如何实现 Kafka SQL。要实现 Kafka SQL,在上一篇《Kafka - SQL 引擎分享》中分享了其实现的思路,核心包含数据源的加载,以及 SQL 树的映射。今天笔者给大家分享相关实现的代码。

2.内容

  这里,将数据映射成 SQL Tree 是使用了 Apache Calcite 来承接这部分工作。在实现代码之前,我们首先来了解下 Apache Calcite 的相关内容,Apache Calcite 是一个面向 Hadoop 的查询引擎,它提供了业界标准的 SQL 语言,以及多种查询优化和连接各种存储介质的适配器。另外,还能处理 OLAP 和流处理场景。因为存在这么多优秀和闪光的特性, Hadoop 生态圈中 Apache Calcite 越发引人注目,被诸多项目所集成,常见的有:

  • Apache Drill:基于大数据的实时查询引擎
  • Apache Spark:继 Hadoop 之后的新一代大数据分布式处理框架。
  • 更多详情,这里就不一一列举了,详情查看地址:《Adapters

2.1 数据类型

  这里数据源的数据类型,我们分为两种,一种是 SQL,另一种是基于编程语言的,这里我们所使用的是 Java,定义内容如下:

复制代码
public static Map<String, SqlTypeName> SQLTYPE_MAPPING = new HashMap<String, SqlTypeName>();
public static Map<String, Class> JAVATYPE_MAPPING = new HashMap<String, Class>();

public static void initRowType() {
        SQLTYPE_MAPPING.put("char", SqlTypeName.CHAR);
        JAVATYPE_MAPPING.put("char", Character.class);
        SQLTYPE_MAPPING.put("varchar", SqlTypeName.VARCHAR);
        JAVATYPE_MAPPING.put("varchar", String.class);
        // ......     
}
复制代码

2.2 表的相关描述

  另外,我们需要对表进行一个描述,在关系型数据库中,一个正常的表由行列组成,定义内容如下:

复制代码
    public static class Database {
        public List<Table> tables = new LinkedList<Table>();
    }

    public static class Table {
        public String tableName;
        public List<Column> columns = new LinkedList<Column>();
        public List<List<String>> data = new LinkedList<List<String>>();
    }

    public static class Column {
        public String name;
        public String type;
    }
复制代码

  在每个集合中存储数据库相关名称,每个数据库存储多个集合的表对象,每个表对象下面又有一系列的列以及绑定的数据源。在每个列对象中包含字段名和类型,层层递进,依次关联。在使用 Calcite 是,需要遵循其 JSON Model,上篇博客我们已经定义过其 JSON Model,这里我们直接拿来使用,内容如下:

复制代码
{
    version: '1.0',
    defaultSchema: 'kafka',  
    schemas: [  
        {
            name: 'kafka',  
            type: 'custom',
            factory: 'cn.smartloli.kafka.visual.engine.KafkaMemorySchemaFactory',  
            operand: {
                database: 'kafka_db'
            }  
        } 
    ]
}
复制代码

   要实现其 Model ,这里需要我们去实现 org.apache.calcite.schema.SchemaFactory 的接口,内容如下所示:

public class KafkaMemorySchemaFactory implements SchemaFactory {
    @Override
    public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
        return new KafkaMemorySchema(name);
    }
}

  而在 KafkaMemorySchema 类中,我们只需要实现它的 getTableMap 方法,内容如下所示:

复制代码
 @Override
 protected Map<String, Table> getTableMap() {
   Map<String, Table> tables = new HashMap<String, Table>();
    Database database = KafkaMemoryData.MAP.get(this.dbName);
    if (database == null)
      return tables;
    for (KafkaMemoryData.Table table : database.tables) {
      tables.put(table.tableName, new KafkaMemoryTable(table));
    }
    return tables;
 }
复制代码

  从上述代码中,可以知道通过内存中的 Map 表查看对应的数据库对象,然后根据数据库对象中的表作为 Schema 中的表,而表的类型为 KafkaMemoryTable。

2.3 表类型

  这里笔者就直接使用全表扫描,使用 org.apache.calcite.schema.impl.AbstractTable 的默认方式,实现其 getRowType 方法和 scan 方法,内容如下所示:

复制代码
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
  if(dataType == null) {
     RelDataTypeFactory.FieldInfoBuilder fieldInfo = typeFactory.builder();
      for (KafkaMemoryData.Column column : this.sourceTable.columns) {
        RelDataType sqlType = typeFactory.createJavaType(
        KafkaMemoryData.JAVATYPE_MAPPING.get(column.type));
        sqlType = SqlTypeUtil.addCharsetAndCollation(sqlType, typeFactory);
        fieldInfo.add(column.name, sqlType);
      }
      this.dataType = typeFactory.createStructType(fieldInfo);
   }
   return this.dataType;
}
复制代码
复制代码
public Enumerable<Object[]> scan(DataContext root) {
        final List<String> types = new ArrayList<String>(sourceTable.columns.size());
        for(KafkaMemoryData.Column column : sourceTable.columns) {
            types.add(column.type);
        }
        final int[] fields = identityList(this.dataType.getFieldCount());
        return new AbstractEnumerable<Object[]>() {
            public Enumerator<Object[]> enumerator() {
                return new KafkaMemoryEnumerator<Object[]>(fields, types, sourceTable.data);
            }
        };
    }
复制代码

  代码中,表中的字段名和类型是根据初始化时,每个表中的数据类型映射匹配的,在 KafkaMemoryData.SQLTYPE_MAPPING 和 KafkaMemoryData.JAVATYPE_MAPPING 中有描述相关自定义类型映射,这里就不多做赘述了。

  实现流程大致就是这个样子,将每次的 SQL 查询,通过 Calcite 解析成标准可执行的 SQL 计划,执行期间会根据定义的信息,初始化每一个 Schema,在通过调用 getTableMap 获取字段名和类型,根据这些信息判断查询的表,字段名,类型以及 SQL 语法是否标准规范。然后在使用 Calcite 内部机制,生成物理执行计划。查询计划是 Tree 形式的,底层是进行扫表操作(可看作为 FROM),获取每个表的数据,之后在根据表数据进行上层的关联操作,如 JOIN,GROUP BY,LIMIT 等操作。

3.测试

  完成上述流程后,进行代码测试,测试代码如下所示:

复制代码
public static void main(String[] args) {
        try {
            Class.forName("org.apache.calcite.jdbc.Driver");
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        Properties info = new Properties();
        try {
            Connection connection = DriverManager.getConnection("jdbc:calcite:model=/Users/dengjie/hadoop/workspace/kafka/kafka-visual/src/main/resources/plugins.json",info);    
            Statement st = connection.createStatement();
            // String sql = "select * from \"Kafka\" where \"_plat\"='1004' limit 1";
            String sql = "select * from \"Kafka\" limit 10";

            long start = System.currentTimeMillis();
            result = st.executeQuery(sql);
            ResultSetMetaData rsmd = result.getMetaData();
            List<Map<String, Object>> ret = new ArrayList<Map<String,Object>>();
            
            while (result.next()) {
                Map<String, Object> map = new HashMap<String, Object>();
                for (int i = 1; i <= rsmd.getColumnCount(); i++) {
                    System.out.print(result.getString(rsmd.getColumnName(i)) + " ");
                    map.put(rsmd.getColumnName(i), result.getString(rsmd.getColumnName(i)));
                }
                ret.add(map);
                System.out.println();
            }
            System.out.println(new Gson().toJson(ret));       
            result.close();
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
复制代码

4.总结

  以上便是将 Kafka 中数据消费后,作为数据源加载和 SQL Tree 映射的实现代码,实现不算太困难,在编写 SQL 查询的时候,需要遵循标准的 SQL 语法来操作数据源。

5.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

 

联系方式: 
邮箱:smartloli.org@gmail.com 
Twitter: https://twitter.com/smartloli 
QQ群(Hadoop - 交流社区1): 424769183 
温馨提示:请大家加群的时候写上加群理由(姓名+公司/学校),方便管理员审核,谢谢! 

热爱生活,享受编程,与君共勉!



本文转自哥不是小萝莉博客园博客,原文链接:http://www.cnblogs.com/smartloli/,如需转载请自行联系原作者

相关文章
|
1月前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
44 3
|
1月前
|
SQL 关系型数据库 MySQL
创建SQL数据库的基本步骤与代码指南
在信息时代,数据管理显得尤为重要,其中数据库系统已成为信息技术架构的关键部分。而当我们谈论数据库系统时,SQL(结构化查询语言)无疑是其中最核心的工具之一。本文将详细介绍如何使用SQL创建数据库,包括编写相应的代码和必要的步骤。由于篇幅限制,本文可能无法达到您要求的2000字长度,但会尽量涵盖创建数
44 3
|
1月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
65 2
|
1月前
|
SQL 监控 关系型数据库
SQL错误代码1303解析与处理方法
在SQL编程和数据库管理中,遇到错误代码是常有的事,其中错误代码1303在不同数据库系统中可能代表不同的含义
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
55 0
|
1月前
|
SQL 安全 关系型数据库
SQL错误代码1303解析与解决方案:深入理解并应对权限问题
在数据库管理和开发过程中,遇到错误代码是常见的事情,每个错误代码都代表着一种特定的问题
|
3月前
|
存储 SQL 安全
【数据库高手的秘密武器:深度解析SQL视图与存储过程的魅力——封装复杂逻辑,实现代码高复用性的终极指南】
【8月更文挑战第31天】本文通过具体代码示例介绍 SQL 视图与存储过程的创建及应用优势。视图作为虚拟表,可简化复杂查询并提升代码可维护性;存储过程则预编译 SQL 语句,支持复杂逻辑与事务处理,增强代码复用性和安全性。通过创建视图 `high_earners` 和存储过程 `get_employee_details` 及 `update_salary` 的实例,展示了二者在实际项目中的强大功能。
41 1
|
2月前
|
SQL 安全 数据库
基于SQL Server事务日志的数据库恢复技术及实战代码详解
基于事务日志的数据库恢复技术是SQL Server中一个非常强大的功能,它能够帮助数据库管理员在数据丢失或损坏的情况下,有效地恢复数据。通过定期备份数据库和事务日志,并在需要时按照正确的步骤恢复,可以最大限度地减少数据丢失的风险。需要注意的是,恢复数据是一个需要谨慎操作的过程,建议在执行恢复操作之前,详细了解相关的操作步骤和注意事项,以确保数据的安全和完整。
116 0
|
3月前
|
JSON 数据格式 Java
化繁为简的魔法:Struts 2 与 JSON 联手打造超流畅数据交换体验,让应用飞起来!
【8月更文挑战第31天】在现代 Web 开发中,JSON 成为数据交换的主流格式,以其轻量、易读和易解析的特点受到青睐。Struts 2 内置对 JSON 的支持,结合 Jackson 库可便捷实现数据传输。本文通过具体示例展示了如何在 Struts 2 中进行 JSON 数据的序列化与反序列化,并结合 AJAX 技术提升 Web 应用的响应速度和用户体验。
115 0
|
3月前
|
SQL 数据库 索引
SQL 编程最佳实践简直太牛啦!带你编写高效又可维护的 SQL 代码,轻松应对数据库挑战!
【8月更文挑战第31天】在SQL编程中,高效与可维护的代码至关重要,不仅能提升数据库性能,还降低维护成本。本文通过案例分析探讨SQL最佳实践:避免全表扫描,利用索引加速查询;合理使用JOIN,避免性能问题;避免使用`SELECT *`,减少不必要的数据传输;使用`COMMIT`和`ROLLBACK`确保事务一致性;添加注释提高代码可读性。遵循这些实践,不仅提升性能,还便于后期维护和扩展。应根据具体情况选择合适方法并持续优化SQL代码。
55 0