Apache Carbondata接入Kafka实时流数据-阿里云开发者社区

开发者社区> 北斗云> 正文

Apache Carbondata接入Kafka实时流数据

简介: 1.导入carbondata依赖的jar包 将apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar导入$SPARKHOME/jars;或将apache-carbondata-1.
+关注继续查看

1.导入carbondata依赖的jar包

apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar导入$SPARKHOME/jars;或将apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar导入在$SPARKHOME创建的carbondlib目录

2.导入kafka依赖的jar包

接入kafka数据需要依赖kafka的jars,将以下jars导入$SPARKHOME/jars

kafka-clients-0.10.0.1.jar
spark-sql-kafka-0-10_2.11-2.3.2.jar

3.spark-shell启动服务

./bin/spark-shell --master spark://hostname:7077 --jars apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.7.2.jar

a).导入依赖

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

b).创建session

启动第一个目录是数据存储目录,第二个目录是元数据目录;都可以是hdfs目录

val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("/home/bigdata/carbondata/data","/home/bigdata/carbondata/carbon.metastore")

c).创建source表

carbon.sql(
            s"""
                | CREATE TABLE IF NOT EXISTS kafka_json_source(
                |     id STRING,
                |    name STRING,
                |    age INT,
                |    brithday TIMESTAMP)
                | STORED AS carbondata
                | TBLPROPERTIES(
                |    'streaming'='source',
                |    'format'='kafka',
                |    'kafka.bootstrap.servers'='hostname:9092',
                |    'subscribe'='kafka_json',
                |    'record_format'='json',
                |    'comment'='get kafka data')
            """.stripMargin).show()

d).创建sink表

carbon.sql(
            s"""
                | CREATE TABLE IF NOT EXISTS kafka_json_sink(
                |     id STRING,
                |    name STRING,
                |    age INT,
                |    brithday TIMESTAMP)
                | STORED AS carbondata
                | TBLPROPERTIES(
                |    'streaming'='sink')
            """.stripMargin).show()

e).创建job任务

carbon.sql(
            s"""
                | CREATE STREAM kafka_json_job ON TABLE kafka_json_sink(
                | STMPROPERTIES(
                |    'trigger'='ProcessingTime',
                |    'interval'='10 seconds')
                | AS SELECT * FROM kafka_json_source
            """.stripMargin).show()

f).创建DATAMAP

carbon.sql(
            s"""
                | CREATE DATAMAP agg_kafka_json_sink 
                | ON TABLE kafka_json_sink(
                | USING "preaggregate"
                | AS
                |    SELECT id,name,sum(age),max(age),min(age),avg(age)
                |    FROM kafka_json_sink
                |    GRPUP BY id,name
            """.stripMargin).show()

4.常用SQL命令

a).导入本地数据

carbon.sql("LOAD DATA INPATH '/home/bigdata/carbondata/sample.csv' INTO TABLE kafka_json_source").show()

b).查看表结构

carbon.sql("DESC kafka_json_source").show()

c).查看表数据

carbon.sql("SELECT * FROM kafka_json_source WHERE id=1").show()

d).清理表数据

carbon.sql("TRUNCATE TABLE kafka_json_sink").show()

e).删除表

carbon.sql("DROP TABLE IF EXISTS kafka_json_source").show()

f).查看job任务状态

carbon.sql("SHOW STREAMS ON TABLE kafka_json_sink").show()

g).删除job任务

carbon.sql("DROP STREAM kafka_json_job").show()

h).查询DATAMAP表信息

carbon.sql("DESC agg_kafka_json_sink_kafka_json_sink").show()

i).查询表Segments信息

carbon.sql("SHOW SEGMENTS FOR TABLE kafka_json_sink").show()

j).条件查询

carbon.sql("SELECT * FROM kafka_json_sink WHERE agent_id=499 AND signature=''").show()

k).聚合查询

carbon.sql("SELECT agent_id,signature,method_type,sum(elapse_time),max(elapse_time),min(elapse_time) FROM kafka_json_sink GROUP BY agent_id,signature,method_type").show()

5.注意事项

a).kafka使用配置

由于Carbondata的kafka-consumer反序列化配置如下,所以在kafka-producer应该使用对于配置,否则无法解析数据

key.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
【译】Databricks使用Spark Streaming和Delta Lake对流式数据进行数据质量监控介绍
本文主要对Databricks如何使用Spark Streaming和Delta Lake对流式数据进行数据质量监控的方法和架构进行了介绍,本文探讨了一种数据管理架构,该架构可以在数据到达时,通过主动监控和分析来检测流式数据中损坏或不良的数据,并且不会造成瓶颈。
1183 0
DataHub:流行的元数据架构介绍
时至今日,我们正生活在数据的黄金时代。当数据科学家加入数据驱动型公司时,他们希望找到一种数据发现工具(即数据目录),可以用来找出公司中存在哪些数据集,以及如何使用这些数据集来测试新假设和产生新见解。大多数数据科学家并不真正关心这个工具在幕后是如何工作的,只要它能使他们富有成效。
162 0
EMR Spark Relational Cache 利用数据预组织加速查询
本文介绍了EMR Spark的Relational Cache如何从数据量较大的Cube中快速提取出所需数据加速查询的原理。通过列式存储、文件索引、Z-Order等技术,我们可以快速过滤数据,大大减少实际发生的IO数据量,避免IO瓶颈的出现,从而优化整体查询性能。
919 0
C#.Net组件开发 - 使用Attach to Process实时调试设计器代码(转)
本文转载于:CS框架网http://www.csframework.com/archive/2/arc-2-20110829-1811.htm C#.Net组件开发 - 使用Attach to Process实时调试设计器代码 组件设计的调试对象为两个:一是我们关注的组件本身(Component/Control),二是组件的设计器(Component Designer/Control Designer)。
798 0
Kafka零数据丢失的配置方案
这两年大数据行业发展前景较好,行业工程师薪资高、人才少、竞争压力小,很多人也因此想要转型成为大数据工程师,但也正是因为行业新、人才少,很多技术解决方案也是缺少很优质的答案。
342 0
+关注
北斗云
风舒云卷,,北斗不移
52
文章
8
问答
文章排行榜
最热
最新
相关电子书
更多
《2021云上架构与运维峰会演讲合集》
立即下载
《零基础CSS入门教程》
立即下载
《零基础HTML入门教程》
立即下载