创建消息队列(MQ)源表

简介:

什么是消息队列MQ

消息队列(Message Queue),简称MQ。是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品。基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)、监控报警等一套完整的消息云服务,实现分布式计算场景中所有异步解耦功能。
实时计算 Flink可以将消息队列作为流式数据输入,如下:

示例

 
  
  1. create table metaq_stream(
  2. x varchar,
  3. y varchar,
  4. z varchar
  5. ) with (
  6. type='mq',
  7. topic='blink_dXXXXXXX',
  8. endpoint='onsaddr.aliyun.com',
  9. pullIntervalMs='100',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. startMessageOffset='547',
  13. consumerGroup='CID_BLINK_SOURCE_001',
  14. fieldDelimiter='####'
  15. );

注意:MQ实际上是一个非结构化存储格式,对于数据的Schema不提供强制定义,完全由业务层指定。目前实时计算支持类CSV格式文本和二进制格式。

CSV类格式

对于CSV类格式,假定你的一条MQ消息记录格式如下:

 
  
  1. 1,name,male
  2. 2,name,female

一条MQ消息可以包括0条到多条数据记录,记录与记录之间使用\n分隔。您在实时计算必须定义该MQ的DDL:

 
  
  1. create table metaq_stream(
  2. id varchar,
  3. name varchar,
  4. sex varchar
  5. ) with (
  6. type='mq',
  7. topic='blink_dXXXXXXX',
  8. endpoint='onsaddr.aliyun.com',
  9. pullIntervalMs='100',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. startMessageOffset='547',
  13. consumerGroup='CID_BLINK_SOURCE_001',
  14. fieldDelimiter='####'
  15. );

WITH参数

参数 注释说明 备注
topic topic名
endPoint endPoint地址 公共云内网接入(阿里云经典网络/VPC):华东1、华东2、华北1、华北2、华南1、
香港的区域endpoint的地址是:onsaddr-internal.aliyun.com:8080 
公共云公网接入地址是:http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
accessId accessId
accessKey accessKey
consumerGroup 订阅消费group名
pullIntervalMs 拉取时间间隔,毫秒
startTime 可选,消息消费启动的时间点
startMessageOffset 可选,消息开始的偏移量 如果填了优先以startMessageoffset的位点开始加载
tag 订阅的标签 可选
lineDelimiter 解析TT block时行分隔符 可选,默认为 “\n”
fieldDelimiter 字段分隔符 可选,默认为”\u0001” 表示 Crtl+A 和 \001
encoding 编码格式 可选,默认为 “utf-8”
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false

类型映射

MQ字段类型 建议实时计算字段类型
STRING VARCHAR
本文转自实时计算—— 创建消息队列(MQ)源表
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
消息中间件 数据管理 Serverless
阿里云消息队列 Apache RocketMQ 创新论文入选顶会 ACM FSE 2025
阿里云消息团队基于 Apache RocketMQ 构建 Serverless 消息系统,适配多种主流消息协议(如 RabbitMQ、MQTT 和 Kafka),成功解决了传统中间件在可伸缩性、成本及元数据管理等方面的难题,并据此实现 ApsaraMQ 全系列产品 Serverless 化,助力企业提效降本。
|
2月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
192 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
827 98
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
338 113
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!
195 1
|
消息中间件 存储 弹性计算
云消息队列RabbitMQ实践
云消息队列RabbitMQ实践
|
消息中间件 存储 监控
解决方案 | 云消息队列RabbitMQ实践
在实际业务中,网站因消息堆积和高流量脉冲导致系统故障。为解决这些问题,云消息队列 RabbitMQ 版提供高性能的消息处理和海量消息堆积能力,确保系统在流量高峰时仍能稳定运行。迁移前需进行技术能力和成本效益评估,包括功能、性能、限制值及费用等方面。迁移步骤包括元数据迁移、创建用户、网络打通和数据迁移。
327 4
|
消息中间件 运维 监控
云消息队列RabbitMQ实践解决方案评测报告
本报告旨在对《云消息队列RabbitMQ实践》解决方案进行综合评测。通过对该方案的原理理解、部署体验、设计验证以及实际应用价值等方面进行全面分析,为用户提供详尽的反馈与建议。
252 16
|
消息中间件 弹性计算 运维
阿里云云消息队列RabbitMQ实践解决方案评测报告
阿里云云消息队列RabbitMQ实践解决方案评测报告
231 9

相关产品

  • 云消息队列 MQ