太方便了!DLA消息回执,及时通知您的异步Query

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群版 2核4GB 100GB
推荐场景:
搭建个人博客
对象存储 OSS,20GB 3个月
简介: 一、DLA介绍 数据湖(Data Lake)是时下热门的概念,更多阅读可以参考:https://en.wikipedia.org/wiki/Data_lake。基于数据湖,可以不用做任何ETL、数据搬迁等过程,实现跨各种异构数据源进行大数据关联分析,从而极大的节省成本和提升用户体验。

一、DLA介绍

数据湖(Data Lake)是时下热门的概念,更多阅读可以参考:https://en.wikipedia.org/wiki/Data_lake。基于数据湖,可以不用做任何ETL、数据搬迁等过程,实现跨各种异构数据源进行大数据关联分析,从而极大的节省成本和提升用户体验。

终于,阿里云现在也有了自己的数据湖分析产品(Data Lake Analytics,后续简称DLA):https://www.aliyun.com/product/datalakeanalytics

产品文档:https://help.aliyun.com/product/70174.html

二、本案背景

因为一般OLAP的场景,SQL通常比较复杂也比较耗时,如果程序一直阻塞等待的话,会影响业务,因此DLA支持异步执行任务。在SQL上加一个Hint即可,相关的查询结果会默认写入到您的OSS目录中(如果OSS未开通则无法使用异步查询,目录是DLA默认规划好的),并且会返回这个Query对应的ID回执给您,这个回执在后续有用:

mysql> /*+ run-async=true */select * from test_table1;
+--------------------------------+
| ASYNC_TASK_ID                  |
+--------------------------------+
| q201811021109sh8d1a0b750000182 |
+--------------------------------+
1 row in set (0.04 sec)

如果您想要查看您的异步SQL任务执行状况,可以执行另一个SQL:

mysql> show query_task where id = 'q201811021109sh8d1a0b750000182'\G
*************************** 1. row ***************************
                  id: q201811021109sh8d1a0b750000182
        mpp_query_id: 20181102_030939_149_svnhw
              status: SUCCESS
           task_name: SELECT
        table_schema: sh_tpch
             command: /*+ run-async=true */select * from test_table1
          creator_id: ${您的dla账号}
         create_time: 2018-11-02 11:09:40.0
         update_time: 2018-11-02 11:09:40.0
       connection_id: 693929276088405
             message: 
           row_count: 2
         elapse_time: 692
  scanned_data_bytes: 147
result_file_oss_file: oss://aliyun-oa-query-results-${您的uid}-oss-cn-shanghai/DLA_Result/2018/11/02/q201811021109sh8d1a0b750000182/result.csv
    cancellable_task: 0
          mq_product: NULL
            mq_topic: NULL
      mq_producer_id: NULL
            mq_model: NULL
           mq_status: NULL
        mq_error_msg: NULL
       mq_message_id: NULL
       mq_total_time: NULL
1 row in set (0.02 sec)

其中status: SUCCESS(RUNNING表示运行中,FAILURE表示已失败,CANCELLED表示用户取消,等等)表示这个Query执行成功,result_file_oss_file: oss://aliyun-oa-query-results-.......就是存储到您的OSSQuery的结果集文件在您的OSS上,elapse_time: 692表示整体Query花费的时间,scanned_data_bytes: 147表示Query扫描的数据量(收费依据),等等信息;

关键问题是,用户程序需要一直轮询执行show query_task,才能知道Query的状态是不是从RUNNING转移到SUCCESS状态,这样对用户的程序影响很大,因此需要有通知机制来及时告知用户程序。

所以,DLA最近支持了消息回执的机制,帮助用户快速、异步感知任务执行状态。目前DLA已经支持将消息写入到阿里云上的__ONS__(https://www.aliyun.com/product/ons)服务,后续还会考虑支持kafka(https://www.aliyun.com/product/kafka)、mns(https://www.aliyun.com/product/mns)、http等接口或服务;

下面,就以我们的测试账号,来介绍下如何在DLA中使用ONS来做消息回执通知。

三、准备工作

a)开通ONS服务,进入控制台

进入https://www.aliyun.com/product/ons 开通服务,然后进入ons的控制台:https://ons.console.aliyun.com/

image.png | left | 827x355

b)选择与DLA相同的Region,创建相关的元信息

1. 在这里,我们选择华东2集群,作为我们的测试集群:

image.png | left | 827x368

2. 创建Topic,专门接收DLA的消息:

image.png | left | 827x375

3. 为DLA创建一个生产者:

image.png | left | 827x351

image.png | left | 827x353

c)为DLA开通角色授权,允许DLA给您的topic发送消息

1. 给DLA云账号授权:

访问连接:https://ram.console.aliyun.com/#/role/authorize?request=%7B%22Requests%22:%20%7B%22request1%22:%20%7B%22RoleName%22:%20%22AliyunOpenAnalyticsAccessingMQRole%22,%20%22TemplateId%22:%20%22MQRole%22%7D%7D,%20%22ReturnUrl%22:%20%22https:%2F%2Fopenanalytics.console.aliyun.com%2F%22,%20%22Service%22:%20%22OpenAnalytics%22%7D

image.png | left | 827x313

2. 在访问控制(https://ram.console.aliyun.com)中,查看自己的角色列表:

image.png | left | 827x461

3. 确认一下,确实是只有PUB权限:

image.png | left | 827x300

四、开始使用:

a)目前消息回执,也是通过在SQL上增加hint的方式来通知:

mysql> /*+ run_async=true, mq-notify-by=ons, mq-topic=dla_shanghai_topic, mq-producer-id=PID_dla_shanghai */ select * from test_table1;
+--------------------------------+
| ASYNC_TASK_ID                  |
+--------------------------------+
| q201811021152sh8d1a0b750000191 |
+--------------------------------+
1 row in set (0.03 sec)

其中mq-notify-by=ons, mq-topic=dla_shanghai_topic, mq-producer-id=PID_dla_shanghai是上面已经配置过的。

b)执行show query_task看下效果:

mysql> show query_task where id = 'q201811021152sh8d1a0b750000191'\G
*************************** 1. row ***************************
                  id: q201811021152sh8d1a0b750000191
        mpp_query_id: 20181102_035233_150_svnhw
              status: SUCCESS
           task_name: SELECT
        table_schema: sh_tpch
             command: /*+ run_async=true, mq-notify-by=ons, mq-topic=dla_shanghai_topic, mq-producer-id=PID_dla_shanghai */ select * from test_table1
          creator_id: ${您的dla账号}
         create_time: 2018-11-02 11:52:34.0
         update_time: 2018-11-02 11:52:35.0
       connection_id: 693929276092181
             message: 
           row_count: 2
         elapse_time: 814
  scanned_data_bytes: 147
result_file_oss_file: oss://aliyun-oa-query-results-${您的uid}-oss-cn-shanghai/DLA_Result/2018/11/02/q201811021152sh8d1a0b750000191/result.csv
    cancellable_task: 0
          mq_product: ons
            mq_topic: dla_shanghai_topic
      mq_producer_id: PID_dla_shanghai
            mq_model: NULL
           mq_status: SUCCESS
        mq_error_msg: NULL
       mq_message_id: AC1314094B444E0E2F2A07B2C1E40000
       mq_total_time: 1834
1 row in set (0.02 sec)

上面的mq_xxx列,都是关于消息通知的信息。其中mq_status: SUCCESS表示消息发送成功,而mq_message_id: AC1314094B444E0E2F2A07B2C1E40000则是这个消息回执的消息id。

c)进入消息查询,查看消息情况:

image.png | left | 827x444

image.png | left | 827x436

d)消息消费:

消息是有3天保质期的,过期了消息就会被清理掉,请及时消费掉。具体的消息消费过程,请查看文档:https://help.aliyun.com/document_detail/29551.html,这里我不再赘述。

相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
2月前
|
JSON 机器人 数据安全/隐私保护
钉钉中,如何获取机器人发送群聊消息接口返回的加密消息id(processQueryKey)?
钉钉中,如何获取机器人发送群聊消息接口返回的加密消息id(processQueryKey)?【1月更文挑战第5天】【1月更文挑战第24篇】
112 5
|
1月前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用问题之在重试失败后如何通过回调的方式来手动关闭数据源连接
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
1月前
|
消息中间件 Java 测试技术
消息队列 MQ操作报错合集之设置了setKeepAliveInterval(1)但仍然出现客户端未连接,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
|
2月前
|
Windows
微信消息实现自动推送--方式一 成功啦 进来学
微信消息实现自动推送--方式一 成功啦 进来学
57 1
|
2月前
|
消息中间件 运维 Serverless
Serverless 应用引擎产品使用之在阿里云函数计算中,使用了RocketMQ的触发器,并且发送和接收消息都没有问题,但是消息轨迹中没有体现出来消费的情况如何解决
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
2月前
|
消息中间件 Kafka API
Kafka - 异步/同步发送API
Kafka - 异步/同步发送API
61 0
|
9月前
|
DataWorks 网络架构
DataWorks节点执行成功后,可以通过Rest通知进行消息通知
DataWorks节点执行成功后,可以通过Rest通知进行消息通知
55 1
|
消息中间件 关系型数据库 MySQL
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
SpringBoot-Kafka(生产者事务、手动提交offset、定时消费、消息转发、过滤消息内容、自定义分区器、提高吞吐量)
如何实现短信API批量发送
在短信实际的发送当中,不仅仅是发送单条验证码短信,还有很多通知短信,或者营销短信也要发送,而如果用单条提交的方式,发送效率会很低,本文将为您介绍,如何用接口实现批量发送短信通知、营销的方法。
如何实现短信API批量发送
|
消息中间件 存储 算法
多类型业务消息专题-定时消息| 学习笔记
快速学习多类型业务消息专题-定时消息
154 0
多类型业务消息专题-定时消息| 学习笔记