创建DataHub源表
数据总线(DATAHUB)
DataHub作为一个流式数据总线,为阿里云数加平台提供了大数据的入口服务。结合阿里云众多云产品,可以构建一站式的数据处理平台。流计算通常使用DataHub作为流式数据存储头和输出目的端。同时,上游众多流式数据,包括DTS、IOT等均选择DataHub作为大数据平台的数据入口。
DataHub本身是流数据存储,Flink可将其作为批处理的输入。示例如下:
create table datahub_stream(name varchar,age BIGINT,birthday BIGINT) with (type='datahub',endPoint='http://dh-et2.aliyun-inc.com',project='blink_datahub_test',topic='test_topic_1',accessId='0i70RRFJXXXX',accessKey='yF60EwURseo1UAn4NiXXXXXhCfHU',startTime='2017-07-21 00:00:00',endTime='2017-07-21 01:00:00',blinkEnvironmentTypeKey='batchExec');
注意: 作为批处理的输入时,必须指定起始及结束时间。
属性字段
目前默认支持的属性字段如下,也支持其他自定义写入的字段:
| 字段名 | 注释说明 |
|---|---|
timestamp |
每条记录入datahub的systemtime |
属性字段的定义和获取详见获取源表属性字段。
WITH参数
目前只支持tuple模式的topic。
| 参数 | 注释说明 | 备注 |
|---|---|---|
| endPoint | 消费端点信息 | 无 |
| accessId | 读取的accessId | 无 |
| accessKey | 读取的密钥 | 无 |
| project | 读取的项目 | 无 |
| topic | project下的具体的topic | 无 |
| startTime | 日志开始时间 | 格式为yyyy-MM-dd hh:mm:ss |
| maxRetryTimes | 读取最大尝试次数 | 可选,默认为20。 |
| retryIntervalMs | 重试间隔 | 可选,默认为1000。 |
| batchReadSize | 单次读取条数 | 可选,默认为10。 |
| lengthCheck | 单行字段条数检查策略 | 可选,默认为SKIP。其它可选值为EXCEPTION、PAD。SKIP表示字段数目不符合时跳过 。 EXCEPTION表示字段数目不符合时抛出异常。 PAD表示按顺序填充,不存在的置为null。 |
| columnErrorDebug | 是否打开调试开关,如果打开,会把解析异常的log打印出来。 | 可选,默认为false。 |
| isBlob | datahub是否为blob类型 | 可选,默认为false。2.0.x版本开始支持 |
| endTime | datahub日志数据结束时间 | 批处理方式读取datahub数据时,必须声明该参数。 |
注意:使用blob类型时,字段需要声明成varbinary类型,与metaq类似。
类型映射
DataHub和流计算字段类型对应关系,建议您使用该对应关系进行DDL声明。
| DataHub字段类型 | 流计算字段类型 |
|---|---|
| BIGINT | BIGINT |
| DOUBLE | DOUBLE |
| TIMESTAMP | BIGINT |
| BOOLEAN | BOOLEAN |
| DECIMAL | DECIMAL |
本文转自实时计算——
创建DataHub源表