创建消息队列(MQ)源表

简介:

什么是消息队列MQ

消息队列(Message Queue),简称MQ。是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品。基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)、监控报警等一套完整的消息云服务,实现分布式计算场景中所有异步解耦功能。
实时计算 Flink可以将消息队列作为流式数据输入,如下:

示例

 
  
  1. create table metaq_stream(
  2. x varchar,
  3. y varchar,
  4. z varchar
  5. ) with (
  6. type='mq',
  7. topic='blink_dXXXXXXX',
  8. endpoint='onsaddr.aliyun.com',
  9. pullIntervalMs='100',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. startMessageOffset='547',
  13. consumerGroup='CID_BLINK_SOURCE_001',
  14. fieldDelimiter='####'
  15. );

注意:MQ实际上是一个非结构化存储格式,对于数据的Schema不提供强制定义,完全由业务层指定。目前实时计算支持类CSV格式文本和二进制格式。

CSV类格式

对于CSV类格式,假定你的一条MQ消息记录格式如下:

 
  
  1. 1,name,male
  2. 2,name,female

一条MQ消息可以包括0条到多条数据记录,记录与记录之间使用\n分隔。您在实时计算必须定义该MQ的DDL:

 
  
  1. create table metaq_stream(
  2. id varchar,
  3. name varchar,
  4. sex varchar
  5. ) with (
  6. type='mq',
  7. topic='blink_dXXXXXXX',
  8. endpoint='onsaddr.aliyun.com',
  9. pullIntervalMs='100',
  10. accessId='xxx',
  11. accessKey='xxx',
  12. startMessageOffset='547',
  13. consumerGroup='CID_BLINK_SOURCE_001',
  14. fieldDelimiter='####'
  15. );

WITH参数

参数 注释说明 备注
topic topic名
endPoint endPoint地址 公共云内网接入(阿里云经典网络/VPC):华东1、华东2、华北1、华北2、华南1、
香港的区域endpoint的地址是:onsaddr-internal.aliyun.com:8080 
公共云公网接入地址是:http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
accessId accessId
accessKey accessKey
consumerGroup 订阅消费group名
pullIntervalMs 拉取时间间隔,毫秒
startTime 可选,消息消费启动的时间点
startMessageOffset 可选,消息开始的偏移量 如果填了优先以startMessageoffset的位点开始加载
tag 订阅的标签 可选
lineDelimiter 解析TT block时行分隔符 可选,默认为 “\n”
fieldDelimiter 字段分隔符 可选,默认为”\u0001” 表示 Crtl+A 和 \001
encoding 编码格式 可选,默认为 “utf-8”
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false

类型映射

MQ字段类型 建议实时计算字段类型
STRING VARCHAR
本文转自实时计算—— 创建消息队列(MQ)源表
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
22天前
|
消息中间件 存储 Java
【揭秘】RocketMQ内部运作大揭秘:一探究竟,原来消息队列是这样工作的!
【8月更文挑战第19天】RocketMQ是一款高性能、高可用的消息中间件,在分布式系统中至关重要。它采用发布/订阅模式,支持高吞吐量的消息传递。核心组件包括管理元数据的NameServer、存储消息的Broker以及Producer和Consumer。RocketMQ支持发布/订阅与点对点两种模型,并具备复杂的消息持久化和路由机制。通过Java API示例,可轻松实现消息的发送与接收。RocketMQ凭借其出色的特性和可靠性,成为大型分布式系统首选的消息解决方案。
45 5
|
29天前
|
消息中间件 存储 缓存
一个用过消息队列的人,竟不知为何要用 MQ?
一个用过消息队列的人,竟不知为何要用 MQ?
62 1
|
2月前
|
消息中间件 Java 物联网
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
消息队列 MQ操作报错合集之建立连接时发生了超时错误,该如何解决
|
26天前
|
消息中间件 网络架构
RabbitMQ消息队列常见面试题
这篇文章总结了RabbitMQ的常见面试题,涵盖了消息模型、使用场景、实现功能、消息幂等性、顺序性、堆积和丢失的避免方法,以及推模式和拉模式的区别。
37 0
|
27天前
|
消息中间件 Java Kafka
MQ 消息队列 比较
MQ 消息队列 比较
27 0
|
2月前
|
消息中间件 JavaScript Linux
消息队列 MQ操作报错合集之客户端在启动时遇到了连接错误,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java 开发工具
消息队列 MQ使用问题之如何使用DefaultMQPushConsumer来消费消息
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 运维 Go
消息队列 MQ使用问题之如何配置生产环境
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。

相关产品

  • 云消息队列 MQ
  • 下一篇
    DDNS