让CarbonData使用更简单

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Apache CarbonData是一种新的高性能数据存储格式,针对当前大数据领域分析场景需求各异而导致的存储冗余问题,CarbonData提供了一种新的融合数据存储方案,以一份数据同时支持“任意维度组合的过滤查询、快速扫描、详单查询等”多种应用场景,并通过多级索引、字典编码、列存等特性提升了IO扫描和计算性能,实现百亿数据级秒级响应。
CarbonData 是什么
引用官方的说法:
Apache CarbonData是一种新的高性能数据存储格式,针对当前大数据领域分析场景需求各异而导致的存储冗余问题,CarbonData提供了一种新的融合数据存储方案,以一份数据同时支持“任意维度组合的过滤查询、快速扫描、详单查询等”多种应用场景,并通过多级索引、字典编码、列存等特性提升了IO扫描和计算性能,实现百亿数据级秒级响应。

CarbonData的使用

我之前写过一篇使用的文章。CarbonData集群模式体验。到0.3.0版本,已经把kettle去掉了,并且我提交的PR已经能够让其在Spark Streaming中运行。之后将其集成到StreamingPro中,可以简单通过配置即可完成数据的流式写入和作为SQL服务被读取。


准备工作
CarbonData 使用了Hive的MetaStore。
  • MySQL数据库
  • hive-site.xml 文件
  • 下载StreamingPro with CarbonData
MySQL

创建一个库:

create database hive CHARACTER SET latin1;
AI 代码解读

hdfs-site.xml
新建文件 /tmp/hdfs-site.xml,然后写入如下内容:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

<property>
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://127.0.0.1:3306/hive?createDatabaseIfNoExist=true</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>你的账号</value>
</property>

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>你的密码</value>
</property>

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>file:///tmp/user/hive/warehouse</value>
</property>

<property>
<name>hive.exec.scratchdir</name>
<value>file:///tmp/hive/scratchdir</value>
</property>

<property>
 <name>hive.metastore.uris</name>
 <value></value>
</property>

<property>
  <name>datanucleus.autoCreateSchema</name>
  <value>true</value>
</property>


</configuration>
AI 代码解读


启动Spark Streaming写入数据
新建一个文件,/tmp/streaming-test-carbondata.json,内容如下:

{
  "test": {
    "desc": "测试",
    "strategy": "spark",
    "algorithm": [],
    "ref": [
      "testJoinTable"
    ],
    "compositor": [
      {
        "name": "streaming.core.compositor.spark.streaming.source.MockInputStreamCompositor",
        "params": [
          {
            "data1": [
              "1",
              "2",
              "3"
            ],
            "data2": [
              "1",
              "2",
              "3"
            ],
            "data3": [
              "1",
              "2",
              "3"
            ],
            "data4": [
              "1",
              "2",
              "3"
            ]
          }
        ]
      },
      {
        "name": "streaming.core.compositor.spark.streaming.transformation.SingleColumnJSONCompositor",
        "params": [
          {
            "name": "a"
          }
        ]
      },
      {
        "name": "stream.table",
        "params": [
          {
            "tableName": "test"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select a, \"5\" as b from test",
            "outputTableName": "test2"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select t2.a,t2.b from test2 t2, testJoinTable t3 where t2.a = t3.a"
          }
        ]
      },
      {
        "name": "stream.output.carbondata",
        "params": [
          {
            "format": "carbondata",
            "mode": "Append",
            "tableName": "carbon4",
            "compress": "true",
            "useKettle": "false",
            "tempCSV":"false"
          }
        ]
      }
    ],
    "configParams": {
    }
  },
  "testJoinTable": {
    "desc": "测试",
    "strategy": "refTable",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "streaming.core.compositor.spark.source.MockJsonCompositor",
        "params": [
          {
            "a": "3"
          },
          {
            "a": "4"
          },
          {
            "a": "5"
          }
        ]
      },
      {
        "name": "batch.refTable",
        "params": [
          {
            "tableName": "testJoinTable"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}
AI 代码解读


运行即可(spark 1.6 都可以)

./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
--files /tmp/hdfs-site.xml \
/Users/allwefantasy/CSDNWorkSpace/streamingpro/target/streamingpro-0.4.7-SNAPSHOT-online-1.6.1-carbondata-0.3.0.jar    \
-streaming.name test    \
-streaming.platform  spark_streaming  \
-streaming.job.file.path file:///tmp/streaming-test-carbondata.json \
-streaming.enableCarbonDataSupport true \
-streaming.carbondata.store /tmp/carbondata/store \
-streaming.carbondata.meta /tmp/carbondata/meta
AI 代码解读

如果/tmp/carbondata/store/default/ 目录生成了文件就代表数据已经写入。


启动SQL查询服务
新建一个/tmp/empty.json文件,内容为:
{}
AI 代码解读

启动命令:

./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
--files /tmp/hdfs-site.xml \
/Users/allwefantasy/CSDNWorkSpace/streamingpro/target/streamingpro-0.4.7-SNAPSHOT-online-1.6.1-carbondata-0.3.0.jar    \
-streaming.name test    \
-streaming.rest true \
-streaming.spark.service true \
-streaming.platform  spark  \
-streaming.job.file.path file:///tmp/empty.json \
-streaming.enableCarbonDataSupport true \
-streaming.carbondata.store /tmp/carbondata/store \
-streaming.carbondata.meta /tmp/carbondata/meta
AI 代码解读
查询方式:
curl --request POST \
  --url http://127.0.0.1:9003/sql \
  --header 'cache-control: no-cache' \
  --header 'content-type: application/x-www-form-urlencoded' \
  --data 'sql=select%20*%20from%20carbon4%20where%20a%3D%223%22&resultType=json'
AI 代码解读
如果放在PostMan之类的东西里,是这样子的:

69bd64f4bf84fb8560667e4147e854cc4de6cbb2

常见问题
如果出现类似

File does not exist: /tmp/carbondata/store/default/carbon3/Fact/Part0/Segment_0
AI 代码解读

则是因为在你的环境里找到了hadoop相关的配置文件,比如hdfs-site.xml之类的。去掉或者自己写一个,比如新建一个 hdfs-site.xml,然后写入如下内容:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

    <property>
        <name>fs.default.name</name>
        <value>file:///</value>
    </property>
</configuration>
AI 代码解读
这样就会读本地文件了。

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
目录
打赏
0
0
0
0
110
分享
相关文章
Hive起源
Hive起源于Facebook,它最初是Facebook的一个内部项目,用于处理大规模数据。2009年,Facebook将Hive捐赠给了Apache软件基金会,成为Apache的一个开源项目。【2月更文挑战第9天】
166 1
|
10月前
|
Apache Hudi PMC畅谈Hudi未来演进之路
Apache Hudi PMC畅谈Hudi未来演进之路
103 0
大数据面试题百日更新_Hadoop专题_Yarn专题(Day11)
大数据面试题百日更新_Hadoop专题_Yarn专题(Day11)
85 0
大数据面试题百日更新_Hive专题(Day12)
大数据面试题百日更新_Hive专题(Day12)
55 0
Flink Table Store 独立孵化启动 ,Apache Paimon 诞生
2023 年 3 月 12 日,Flink Table Store 项目顺利通过投票,正式进入 Apache 软件基金会 (ASF) 的孵化器,改名为 Apache Paimon (incubating)。
Flink Table Store 独立孵化启动 ,Apache Paimon 诞生
carbondata1.5.1编译
What is CarbonData? Apache CarbonData is an indexed columnar data format for fast analytics on big data platform, e.g. Apache Hadoop, Apache Spark, etc. 因为我的spark是2.3.1的版本,而最新版的carbondata1.5.1才支持,但是官网没有编译好的,需要我们自己编译,在编译的时候遇到一些问题,记录一下.
carbondata1.5.1编译
Apache Carbondata on Preto
1.download apache carbondata - 1.5.3apache carbondata - 1.5.4apache spark - 2.3.2apache hadoop - 2.7.
1269 0
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等