StarRocks 【新一代MPP数据库】(3)https://developer.aliyun.com/article/1534283
3.3、Rountine Load
如果您需要将消息流不间断地导入至 StarRocks,则可以将消息流存储在 Kafka 的 Topic 中,并向 StarRocks 提交一个 Routine Load 导入作业。 StarRocks 会常驻地运行这个导入作业,持续生成一系列导入任务,消费 Kafka 集群中该 Topic 中的全部或部分分区的消息并导入到 StarRocks 中。
Routine Load 支持 Exactly-Once 语义,能够保证数据不丢不重。
Routine Load 目前支持从 Kakfa 集群中消费 CSV、JSON 格式的数据。
环境要求:
(1)支持访问无认证或使用 SSL 方式认证的 Kafka 集群。
(2)支持的消息格式为 CSV 文本格式,每一个 message 为一行,且行尾不包含换行符。
(3)仅支持 Kafka 0.10.0.0(含) 以上版本。
3.3.1、测试把Kafka的数据导入到StarRocks
StarRocks 创建表格:
CREATE TABLE test_kafka_routine( `order_id` bigint NOT NULL COMMENT "订单编号", `pay_dt` date NOT NULL COMMENT "支付日期", `customer_name` varchar(26) NULL COMMENT "顾客姓名", `nationality` varchar(26) NULL COMMENT "国籍", `price`double NULL COMMENT "支付金额" ) ENGINE=OLAP DUPLICATE KEY (order_id,pay_dt) DISTRIBUTED BY HASH(`order_id`) BUCKETS 5;
创建作业(配置Kafka输入源参数):
CREATE ROUTINE LOAD test_db.test_kafka_routine_like ON test_kafka_routine COLUMNS TERMINATED BY ",", COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price) PROPERTIES ( "desired_concurrent_number" = "3" #并行度(BE的数量) ) FROM KAFKA ( "kafka_broker_list" ="hadoop102:9092,hadoop103:9092,hadoop104:9092", "kafka_topic" = "like", "kafka_partitions" ="0,1,2", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );
查询所有任务:
启动 Kafka 生产者:
查询:
可以看到,我们写入到 Kafka like 分区的数据被成功写入到表格中。
继续测试:
3.3.2、查看导入作业
除了上面的 show routine load\G 查看所有导入的作业之外,还可以通过下面的这些方法查看导入的作业:
通过名称查看导入作业
3.3.3、查看导入任务
3.3.4、暂停导入作业
3.3.5、恢复导入作业
RESUME ROUTINE LOAD FOR test_kafka_routine_like \G
3.3.6、停止导入作业
3.3.7、修改导入作业
官方文档写得很详细
4、使用 StarRocks
4.1、Colocate join
Colocate Join 功能是分布式系统实现 Join 数据分布的策略之一,能够减少数据多节点分布时 Join 操作引起的数据移动和网络传输,从而提高查询性能。(也就是把要 join 的数据尽可能放到相同的 BE 节点)
在 StarRocks 中使用 Colocate Join 功能,您需要在建表时为其指定一个 Colocation Group(CG),同一 CG 内的表需遵循相同的 Colocation Group Schema(CGS),即表对应的分桶副本具有一致的分桶键、副本数量和副本放置方式。如此可以保证同一 CG 内,所有表的数据分布在相同一组 BE 节点上。当 Join 列为分桶键时,计算节点只需做本地 Join,从而减少数据在节点间的传输耗时,提高查询性能。因此,Colocate Join,相对于其他 Join,例如 Shuffle Join 和 Broadcast Join,可以有效避免数据网络传输开销,提高查询性能。
Colocate Join 支持等值 Join。
StarRocks 【新一代MPP数据库】(5)https://developer.aliyun.com/article/1534285