点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)
HDFS(已更完)
MapReduce(已更完)
Hive(已更完)
Flume(已更完)
Sqoop(已更完)
Zookeeper(已更完)
HBase(已更完)
Redis (已更完)
Kafka(已更完)
Spark(已更完)
Flink(已更完)
ClickHouse(已更完)
Kudu(已更完)
Druid(已更完)
Kylin(正在更新…)
章节内容
上节我们完成了如下的内容:
Cube 的优化
案例 2 定义衍生维度及对比
聚合组 详细讲解
RowKeys 详细讲解
基本概念
实时数据更新是一种普遍的需求,快速分析趋势才能做正确的决策。
KylinV1.6发布了扩展StreamingCubing功能,它利用Hadoop消费Kafka数据的方式构建Cube,这种方式构建的Cube能满足分钟级的更新需求。
实现步骤
步骤:项目 => 定义数据源(Kafka)=> 定义Model => 定义Cube => Build Cube => 作业调度(频率高)
生成数据
从Kafka消费消息,每条消息都需要包含:
维度信息
度量信息
业务时间戳
每条消息的数据结构都应该相同,并且可以用同一个分析器将每条消息中的维度、度量和时间戳信息提取出来。
目前默认的分析器为:org.apache.kylin.source.kafka.TimedJsonStreamParser
创建数据
# 创建名为kylin_streaming_topic的topic kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partitions 1
执行结果如下图所示:
数据采样
设置采样器
kylin.sh org.apache.kylin.source.kafka.util.KafkaSampleProducer --topic kylin_streaming_topic1
发了一大批数据,如下图所示:
检查数据
检查数据是否发送成功:
kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic kylin_streaming_topic
数据样例如下所示:
{"country":"INDIA","amount":44.47793969871658,"qty":3,"currency":"USD","order_time":1723358207350,"category":"TOY","device":"iOS","user":{"gender":"Female","id":"1c54f68e-f89a-b5d2-f802-45b60ffccf60","first_name":"unknown","age":15}} {"country":"AUSTRALIA","amount":64.86505054935878,"qty":9,"currency":"USD","order_time":1723358207361,"category":"TOY","device":"iOS","user":{"gender":"Female","id":"de11d872-e843-19c9-6b35-9263f1d1a2a1","first_name":"unknown","age":19}} {"country":"CANADA","amount":90.1591854077722,"qty":4,"currency":"USD","order_time":1723358207371,"category":"Other","device":"Andriod","user":{"gender":"Male","id":"4387ee8b-c8c1-1df4-f2ed-c4541cb97621","first_name":"unknown","age":26}} {"country":"INDIA","amount":59.17956535472526,"qty":2,"currency":"USD","order_time":172335820738
定义数据源
数据源选择Add Streaming Table:
点击之后,把刚才的JSON填写进去,就可以解析出来:
定义Kafka信息,填写对应的内容,如下图所示:
可以看到我们刚才添加的内容如下图所示:
定义Model
新建Model,如下图所示,名称随意:
原则DataModel,如下图所示:
选择维度Dimension信息:
选择度量Measures,如下图所示:
设置Setting中,设置对应的PartitionDateColumn信息,如下图:
定义Cube
名字随意,自己能分清就可以,如下图:
设置Dimensions信息如下图所示:
设置度量Measure信息如下图所示:
RefreshSetting设置信息如下图所示:
设置Aggregation Groups信息:
RowKeys的设置如下图所示:
StreamingCube 和 普通的Cube大致上一样,以下几点需要注意:
分区时间列应该Cube的一个Dimension,在SteamingOLAP中时间总是一个查询条件,Kylin利用它来缩小扫描分区的范围
不要使用order time作为dimmension 因为它非常精细,建议使用minute_start、hour_start或其他,取决于用户如何查询数据
定义 year_start、quarter_start、month_start、day_start、hour_start、minute_start或其他,取决于用户如何查询数据
在RefreshSetting设置中,创建更多合并的范围,如0.5时、4小时、1天、7天,这样设置有助于控制CubeSegment的数量
在RowKeys部分,拖拽minute_start到最上面的位置,对于Streaming查询,时间条件会一直显示,将其放到前面将会缩小扫描范围。
构建Cube
可以通过 HTTP 的方式完成构建
curl -X PUT --user ADMIN:KYLIN -H "Content-Type: application/json;charset=utf-8" -d '{ "sourceOffsetStart": 0, "sourceOffsetEnd": 92233720368
也可以使用WebUI,我比较喜欢用页面来构建:
执行查询
select minute_start, count(*), sum(amount), sum(qty) from streamingds1 group by minute_start order by minute_start
自动构建
用 crontab 来定时任务,让其定时执行:
crontab -e */20 * * * * curl -X PUT --user ADMIN:KYLIN -H "Content-Type:application/json;cha