让CarbonData使用更简单

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Apache CarbonData是一种新的高性能数据存储格式,针对当前大数据领域分析场景需求各异而导致的存储冗余问题,CarbonData提供了一种新的融合数据存储方案,以一份数据同时支持“任意维度组合的过滤查询、快速扫描、详单查询等”多种应用场景,并通过多级索引、字典编码、列存等特性提升了IO扫描和计算性能,实现百亿数据级秒级响应。
CarbonData 是什么
引用官方的说法:
Apache CarbonData是一种新的高性能数据存储格式,针对当前大数据领域分析场景需求各异而导致的存储冗余问题,CarbonData提供了一种新的融合数据存储方案,以一份数据同时支持“任意维度组合的过滤查询、快速扫描、详单查询等”多种应用场景,并通过多级索引、字典编码、列存等特性提升了IO扫描和计算性能,实现百亿数据级秒级响应。

CarbonData的使用

我之前写过一篇使用的文章。CarbonData集群模式体验。到0.3.0版本,已经把kettle去掉了,并且我提交的PR已经能够让其在Spark Streaming中运行。之后将其集成到StreamingPro中,可以简单通过配置即可完成数据的流式写入和作为SQL服务被读取。


准备工作
CarbonData 使用了Hive的MetaStore。
  • MySQL数据库
  • hive-site.xml 文件
  • 下载StreamingPro with CarbonData
MySQL

创建一个库:

create database hive CHARACTER SET latin1;

hdfs-site.xml
新建文件 /tmp/hdfs-site.xml,然后写入如下内容:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://127.0.0.1:3306/hive?createDatabaseIfNoExist=true</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>你的账号</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>你的密码</value>
</property>

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>file:///tmp/user/hive/warehouse</value>
</property>

<property>
<name>hive.exec.scratchdir</name>
<value>file:///tmp/hive/scratchdir</value>
</property>

<property>
 <name>hive.metastore.uris</name>
 <value></value>
</property>

<property>
  <name>datanucleus.autoCreateSchema</name>
  <value>true</value>
</property>


</configuration>


启动Spark Streaming写入数据
新建一个文件,/tmp/streaming-test-carbondata.json,内容如下:

{
  "test": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [
      "testJoinTable"
    ],
    "compositor": [
      {
        "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamCompositor",
        "params": [
          {
            "data1": [
              "1",
              "2",
              "3"
            ],
            "data2": [
              "1",
              "2",
              "3"
            ],
            "data3": [
              "1",
              "2",
              "3"
            ],
            "data4": [
              "1",
              "2",
              "3"
            ]
          }
        ]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.SingleColumnJSONCompositor",
        "params": [
          {
            "name": "a"
          }
        ]
      },
      {
        "name": "stream.table",
        "params": [
          {
            "tableName": "test"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select a, \"5\" as b from test",
            "outputTableName": "test2"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select t2.a,t2.b from test2 t2, testJoinTable t3 where t2.a = t3.a"
          }
        ]
      },
      {
        "name": "stream.output.carbondata",
        "params": [
          {
            "format": "carbondata",
            "mode": "Append",
            "tableName": "carbon4",
            "compress": "true",
            "useKettle": "false",
            "tempCSV":"false"
          }
        ]
      }
    ],
    "configParams": {
    }
  },
  "testJoinTable": {
    "desc": "测试",
    "strategy": "refTable",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "streaming.core.compositor.spark.source.MockJsonCompositor",
        "params": [
          {
            "a": "3"
          },
          {
            "a": "4"
          },
          {
            "a": "5"
          }
        ]
      },
      {
        "name": "batch.refTable",
        "params": [
          {
            "tableName": "testJoinTable"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}


运行即可(spark 1.6 都可以)

./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
--files /tmp/hdfs-site.xml \
/Users/allwefantasy/CSDNWorkSpace/streamingpro/target/streamingpro-0.4.7-SNAPSHOT-online-1.6.1-carbondata-0.3.0.jar    \
-streaming.name test    \
-streaming.platform  spark_streaming  \
-streaming.job.file.path file:///tmp/streaming-test-carbondata.json \
-streaming.enableCarbonDataSupport true \
-streaming.carbondata.store /tmp/carbondata/store \
-streaming.carbondata.meta /tmp/carbondata/meta

如果/tmp/carbondata/store/default/ 目录生成了文件就代表数据已经写入。


启动SQL查询服务
新建一个/tmp/empty.json文件,内容为:
{}

启动命令:

./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
--files /tmp/hdfs-site.xml \
/Users/allwefantasy/CSDNWorkSpace/streamingpro/target/streamingpro-0.4.7-SNAPSHOT-online-1.6.1-carbondata-0.3.0.jar    \
-streaming.name test    \
-streaming.rest true \
-streaming.spark.service true \
-streaming.platform  spark  \
-streaming.job.file.path file:///tmp/empty.json \
-streaming.enableCarbonDataSupport true \
-streaming.carbondata.store /tmp/carbondata/store \
-streaming.carbondata.meta /tmp/carbondata/meta
查询方式:
curl --request POST \
  --url http://127.0.0.1:9003/sql \
  --header 'cache-control: no-cache' \
  --header 'content-type: application/x-www-form-urlencoded' \
  --data 'sql=select%20*%20from%20carbon4%20where%20a%3D%223%22&resultType=json'
如果放在PostMan之类的东西里,是这样子的:

69bd64f4bf84fb8560667e4147e854cc4de6cbb2

常见问题
如果出现类似

File does not exist: /tmp/carbondata/store/default/carbon3/Fact/Part0/Segment_0

则是因为在你的环境里找到了hadoop相关的配置文件,比如hdfs-site.xml之类的。去掉或者自己写一个,比如新建一个 hdfs-site.xml,然后写入如下内容:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

    <property>
        <name>fs.default.name</name>
        <value>file:///</value>
    </property>
</configuration>
这样就会读本地文件了。

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
相关文章
|
3月前
|
存储 数据挖掘 Apache
Apache Doris + Iceberg 快速搭建指南|Lakehouse 使用手册(三)
如何在 Docker 环境下快速搭建 Apache Doris + Apache Iceberg 测试 & 演示环境,并展示各功能的使用操作
Apache Doris + Iceberg 快速搭建指南|Lakehouse 使用手册(三)
|
4月前
|
SQL 分布式计算 Apache
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
本文将在 Docker 环境下,为读者介绍如何快速搭建 Apache Doris + Apache Hudi 的测试及演示环境,并对各功能操作进行演示,帮助读者快速入门。
Apache Doris + Apache Hudi 快速搭建指南|Lakehouse 使用手册(一)
|
6月前
|
SQL 分布式计算 Java
Apache Hudi与Hive集成手册
Apache Hudi与Hive集成手册
355 0
|
SQL 存储 分布式计算
【Kylin】(一)初识 Apache Kylin 1
【Kylin】(一)初识 Apache Kylin 1
290 0
【Kylin】(一)初识 Apache Kylin 1
|
SQL 存储 分布式计算
【Kylin】(一)初识 Apache Kylin 2
【Kylin】(一)初识 Apache Kylin 2
315 0
【Kylin】(一)初识 Apache Kylin 2
|
SQL 消息中间件 Java
Flink on zeppelin 实时写入hive
概述 随着Flink1.11.0版本的发布,一个很重要的特性就是支持了流数据直接写入到hive中,用户可以非常方便的用SQL的方式把kafka的数据直接写入到hive里面.这篇文章会给出Flink on zeppelin里面实现流式写入hive的简单示例以及遇到问题的解决方案
|
SQL 存储 分布式计算
Apache Hudi和Presto的前世今生
一篇由Apache Hudi PMC Bhavani Sudha Saktheeswaran和AWS Presto团队工程师Brandon Scheller分享Apache Hudi和Presto集成的一篇文章。
293 0
Apache Hudi和Presto的前世今生
|
SQL 分布式计算 前端开发
|
API 数据安全/隐私保护 Hbase
Dremio与Drill的对比
1.简述 Dremio与Drill简述 2.区别 a).数据源支持 使用最新版本Dremio 3.3.1和Drill 1.16.0Dremio3.1.3版本开始不支持HBase,将来会开源社区版HBase连接器 b).
2999 0