什么是数据总线(DataHub)
DataHub作为一个流式数据总线,为阿里云数加平台提供了大数据的入口服务,共同构建一站式的数据处理平台。实时计算 Flink通常使用DataHub作为流式数据存储头和输出目的端。同时,上游众多流式数据,包括DTS、IOT等均选择DataHub作为大数据平台的数据入口。DataHub本身是流数据存储,实时计算只能将其作为流式数据输入。示例如下:
create table datahub_stream(
name varchar,
age BIGINT,
birthday BIGINT
) with (
type='datahub',
endPoint='http://dXXXXXXXX.com',
project='blink_datahub_test',
topic='test_topic_1',
accessId='0i70XXXXXXXXs',
accessKey='yF60EwXXXXXXXXXnvQPJ2zhCfHU',
startTime='2017-07-21 00:00:00'
);
属性字段
Flink SQL支持获取DataHub的属性字段。能够记录每条信息写入DataHub的系统时间。
字段名 | 注释说明 |
---|---|
timestamp | 每条记录入datahub的systemtime |
示例
通过 HEADER
关键字获取属性字段。
例如,属性字段并不存在于DATAHUB的字段声明里。想获取每条记录入datahub的systemtime,可以将timestamp作为字段名,在后面加上HEADER
就可以取出想要的属性值。
测试数据
name(VARCHAT) | MsgID(VARCHAT) |
---|---|
ens_altar_flow | ems0a |
测试案例
CREATE TABLE datahub_log (
`timestamp` varchar HEADER,
result varchar,
MsgID varchar
)
WITH
(
type ='datahub'
);
CREATE TABLE RDS_out (
`timestamp` varchar,
MsgID varchar,
Version varchar
)
WITH
(
type ='RDS'
);
INSERT INTO RDS_out
SELECT
`timestamp`,
result,
MsgID
FROM
datahub_log;
测试结果
TIME(VARCHAT) | MsgID(VARCHAT) | Version(VARCHAT) |
---|---|---|
1522652455625 | ems0a | 0.0.1 |
WITH参数
目前只支持tuple模式的topic
参数 | 注释说明 | 备注 |
---|---|---|
endPoint | 消费端点信息 | DATAHUB的Endpoint地址 |
accessId | 读取的accessId | 无 |
accessKey | 读取的密钥 | 无 |
project | 读取的项目 | 无 |
topic | project下的具体的topic | 无 |
startTime | 启动位点的时间 | 格式为”yyyy-MM-dd hh:mm:ss” |
maxRetryTimes | 读取最大尝试次数 | 可选,默认为20。 |
retryIntervalMs | 重试间隔 | 可选,默认为1000。 |
batchReadSize | 单次读取条数 | 可选,默认为10,可设置的最大值为1000。 |
lengthCheck | 单行字段条数检查策略 | 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。 |
columnErrorDebug | 是否打开调试开关,如果打开,会把解析异常的log打印出来 | 可选,默认为false。 |
类型映射
DataHub和实时计算字段类型对应关系如下,建议使用该对应关系时进行DDL声明:
DataHub字段类型 | 实时计算字段类型 |
---|---|
BIGINT | BIGINT |
STRING | VARCHAR |
DOUBLE | DOUBLE |
TIMESTAMP | BIGINT |
BOOLEAN | BOOLEAN |
DECIMAL | DECIMAL |
注意:DataHub的TIMESTAMP是精确到微妙级别的,在Unix时间戳里是16位的。而实时计算定义的TIMESTAMP是精确到毫秒级别的,在Unix时间戳里是13位的所以建议大家使用BIGINT来映射。如果一定是要用TIMESTAMP建议使用计算列来做转换。
本文转自实时计算——
创建数据总线(DataHub)源表