创建数据总线(DataHub)源表

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介:

什么是数据总线(DataHub)

DataHub作为一个流式数据总线,为阿里云数加平台提供了大数据的入口服务,共同构建一站式的数据处理平台。实时计算 Flink通常使用DataHub作为流式数据存储头和输出目的端。同时,上游众多流式数据,包括DTS、IOT等均选择DataHub作为大数据平台的数据入口。DataHub本身是流数据存储,实时计算只能将其作为流式数据输入。示例如下:


  
  
  1. create table datahub_stream(
  2. name varchar,
  3. age BIGINT,
  4. birthday BIGINT
  5. ) with (
  6. type='datahub',
  7. endPoint='http://dXXXXXXXX.com',
  8. project='blink_datahub_test',
  9. topic='test_topic_1',
  10. accessId='0i70XXXXXXXXs',
  11. accessKey='yF60EwXXXXXXXXXnvQPJ2zhCfHU',
  12. startTime='2017-07-21 00:00:00'
  13. );

属性字段

Flink SQL支持获取DataHub的属性字段。能够记录每条信息写入DataHub的系统时间。

如图所示:12421

字段名 注释说明
timestamp 每条记录入datahub的systemtime

示例

通过 HEADER 关键字获取属性字段。

例如,属性字段并不存在于DATAHUB的字段声明里。想获取每条记录入datahub的systemtime,可以将timestamp作为字段名,在后面加上HEADER就可以取出想要的属性值。

测试数据

name(VARCHAT) MsgID(VARCHAT)
ens_altar_flow ems0a

测试案例


  
  
  1. CREATE TABLE datahub_log (
  2. `timestamp` varchar HEADER,
  3. result varchar
  4. MsgID varchar
  5. )
  6. WITH
  7. (
  8. type ='datahub'
  9. );
  10. CREATE TABLE RDS_out (
  11. `timestamp` varchar,
  12. MsgID varchar,
  13. Version varchar
  14. )
  15. WITH
  16. (
  17. type ='RDS'
  18. );
  19. INSERT INTO RDS_out
  20. SELECT
  21. `timestamp`,
  22. result,
  23. MsgID
  24. FROM
  25. datahub_log;

测试结果

TIME(VARCHAT) MsgID(VARCHAT) Version(VARCHAT)
1522652455625 ems0a 0.0.1

WITH参数

目前只支持tuple模式的topic

参数 注释说明 备注
endPoint 消费端点信息 DATAHUB的Endpoint地址
accessId 读取的accessId
accessKey 读取的密钥
project 读取的项目
topic project下的具体的topic
startTime 启动位点的时间 格式为”yyyy-MM-dd hh:mm:ss”
maxRetryTimes 读取最大尝试次数 可选,默认为20。
retryIntervalMs 重试间隔 可选,默认为1000。
batchReadSize 单次读取条数 可选,默认为10,可设置的最大值为1000。
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false。

类型映射

DataHub和实时计算字段类型对应关系如下,建议使用该对应关系时进行DDL声明:

DataHub字段类型 实时计算字段类型
BIGINT BIGINT
STRING VARCHAR
DOUBLE DOUBLE
TIMESTAMP BIGINT
BOOLEAN BOOLEAN
DECIMAL DECIMAL

注意:DataHub的TIMESTAMP是精确到微妙级别的,在Unix时间戳里是16位的。而实时计算定义的TIMESTAMP是精确到毫秒级别的,在Unix时间戳里是13位的所以建议大家使用BIGINT来映射。如果一定是要用TIMESTAMP建议使用计算列来做转换。


本文转自实时计算—— 创建数据总线(DataHub)源表
相关文章
|
8月前
|
消息中间件 分布式计算 DataWorks
DataWorks常见问题之kafka数据导入datahub失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
SQL 关系型数据库 数据管理
Datahub实践——Sqllineage解析Sql实现端到端数据血缘
Datahub实践——Sqllineage解析Sql实现端到端数据血缘
1574 1
|
数据采集 JSON 关系型数据库
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
将 MySQL 数据抽取并写入 DataHub,您可以按照以下步骤进行
588 2
|
7月前
|
数据采集 DataWorks 监控
DataWorks产品使用合集之mysql-cdc读取数据写入到datahub中,datahub如何转换时区
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
67 1
|
8月前
|
存储 监控 Apache
Flink整库同步 如何把数据丢入到 datahub 中
Flink整库同步 如何把数据丢入到 datahub 中
|
数据采集 大数据 数据挖掘
企业级数据治理工作怎么开展?Datahub这样做
企业级数据治理工作怎么开展?Datahub这样做
195 0
|
数据采集 JSON 关系型数据库
将 MySQL 数据抽取并写入 DataHub
将 MySQL 数据抽取并写入 DataHub
286 3
|
Java API Maven
Fink在处理DataHub数据源时无法正确识别RecordData类的字段
Fink在处理DataHub数据源时无法正确识别RecordData类的字段
116 1