创建MetaQ (MQ)源表
消息队列(Message Queue)简称MQ,是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品,基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)、监控报警等一套完整的消息云服务。MQ的历史已经超过了7年,帮您实现分布式计算场景中所有异步解耦功能,是阿里双11使用的核心产品。
Flink可以将消息队列作为批处理数据输入,作为批处理输入源时,必须写endTime,否则作业不会正常结束,示例如下。
create table metaq_batch(
x varchar,
y varchar,
z varchar
) with (
type='metaq',
topic='blink_dXXXXXXX',
pullIntervalMs='100',
consumerGroup='CID_BLINK_SOURCE_001',
fieldDelimiter='#',
startTime='20180806 00:00:00',
endTime='20180806 01:00:00'
);
注意:预发环境访问metaq时unitName需要置为
pre
。
WITH参数
参数 | 注释说明 | 备注 |
---|---|---|
topic | topic名 | 无 |
consumerGroup | 订阅消费group名 | 无 |
pullIntervalMs | 拉取时间间隔,毫秒 | 无 |
startTime | 可选,消息消费启动的时间点 | 无 |
unitName | 跨单元访问时需指明app所在单元 | 默认为空,访问预发环境metaq时unitName需要置为pre 。 |
tag | 订阅的标签 | 可选 |
lineDelimiter | 解析message body时的行分隔符 | 可选,默认为 \n 。 |
fieldDelimiter | 字段分隔符 | 可选,默认为\u0001 ,表示 Crtl+A 和 \001 ,(暂不支持\001 写法)。 |
encoding | 编码格式 | 可选,默认为 utf-8 。 |
lengthCheck | 单行字段条数检查策略 | 可选,默认为SKIP。其它可选值为EXCEPTION 、PAD 。SKIP 表示字段数目不符合时跳过 。EXCEPTION 表示字段数目不符合时抛出异常。PAD 表示按顺序填充,不存在的置为null 。 |
columnErrorDebug | 是否打开调试开关,如果打开,会把解析异常的log打印出来。 | 可选,默认为false 。 |
startMessageOffset | 可选,消息开始的偏移量 如果填了优先以startMessageoffset的位点开始加载。 | 1.4.5之后不推荐使用。 |
endTime | 读取结束时间 | 以batch方式读取时,endTime必填。 |
FAQ
-
怎么样自己解析MetaQ表的数据?
对于有些场景来说,MetaQ表里面存的可能是二进制,或者json格式。如果您希望自己去解析,可以参考自定义源表解析。
-
MetaQ控制台报警消费堆积怎么办?
Flink消费MetaQ采用的是pull模式。在pull模式下,metaq控制台的消费堆积报警意义不大,您可以忽略。您可以在bayes平台上配置延迟等指标来监控metaq的消费情况。
本文转自实时计算——
创建MetaQ (MQ)源表