什么是消息队列MQ
消息队列(Message Queue),简称MQ。是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品。基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)、监控报警等一套完整的消息云服务,实现分布式计算场景中所有异步解耦功能。
实时计算 Flink可以将消息队列作为流式数据输入,如下:
示例
create table metaq_stream(
x varchar,
y varchar,
z varchar
) with (
type='mq',
topic='blink_dXXXXXXX',
endpoint='onsaddr.aliyun.com',
pullIntervalMs='100',
accessId='xxx',
accessKey='xxx',
startMessageOffset='547',
consumerGroup='CID_BLINK_SOURCE_001',
fieldDelimiter='####'
);
注意:MQ实际上是一个非结构化存储格式,对于数据的Schema不提供强制定义,完全由业务层指定。目前实时计算支持类CSV格式文本和二进制格式。
CSV类格式
对于CSV类格式,假定你的一条MQ消息记录格式如下:
1,name,male
2,name,female
一条MQ消息可以包括0条到多条数据记录,记录与记录之间使用\n
分隔。您在实时计算必须定义该MQ的DDL:
create table metaq_stream(
id varchar,
name varchar,
sex varchar
) with (
type='mq',
topic='blink_dXXXXXXX',
endpoint='onsaddr.aliyun.com',
pullIntervalMs='100',
accessId='xxx',
accessKey='xxx',
startMessageOffset='547',
consumerGroup='CID_BLINK_SOURCE_001',
fieldDelimiter='####'
);
WITH参数
参数 | 注释说明 | 备注 |
---|---|---|
topic | topic名 | 无 |
endPoint | endPoint地址 | 公共云内网接入(阿里云经典网络/VPC):华东1、华东2、华北1、华北2、华南1、 香港的区域endpoint的地址是: onsaddr-internal.aliyun.com:8080 公共云公网接入地址是: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet |
accessId | accessId | 无 |
accessKey | accessKey | 无 |
consumerGroup | 订阅消费group名 | 无 |
pullIntervalMs | 拉取时间间隔,毫秒 | 无 |
startTime | 可选,消息消费启动的时间点 | 无 |
startMessageOffset | 可选,消息开始的偏移量 如果填了优先以startMessageoffset的位点开始加载 | 无 |
tag | 订阅的标签 | 可选 |
lineDelimiter | 解析TT block时行分隔符 | 可选,默认为 “\n” |
fieldDelimiter | 字段分隔符 | 可选,默认为”\u0001” 表示 Crtl+A 和 \001 |
encoding | 编码格式 | 可选,默认为 “utf-8” |
lengthCheck | 单行字段条数检查策略 | 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。 |
columnErrorDebug | 是否打开调试开关,如果打开,会把解析异常的log打印出来 | 可选,默认为false |
类型映射
MQ字段类型 | 建议实时计算字段类型 |
---|---|
STRING | VARCHAR |
本文转自实时计算——
创建消息队列(MQ)源表