鱼跟猫 2019-09-29 1567浏览量
从EMR-3.21.0版本开始,EMR正式发布了Spark Streaming SQL功能,支持使用Spark SQL进行流式数据处理。经过两个版本的迭代,不少用户反馈当使用SQL进行流式作业开发时,查询结果正确性的调试过程比较麻烦。当前,我们需要完成用户真实的数据流开发,才能在结果存储系统中查看结果是否正确。有些数据存储系统又不方便查看,例如Kafka。这里简单罗列几点不便于调试的问题:
除此外,还有一些高级功能也可以考虑到调试工具中,例如:
本文将介绍EMR提供的流式SQL调试功能,它可以很好地解决了SQL调试中的基本需求。高级调试功能也将会在后续的迭代中逐步放出。
注:本功能将在EMR-3.23.0版本提供出来。
我想实现以下功能,将kafka的一个binlog主题同步到Kafka的另一个主题中。
CREATE TABLE rds_binlog
USING kafka
OPTIONS (
kafka.bootstrap.servers='a.b.c.d:9092',
subscribe='rds-binlog')
CREATE TABLE result_table
USING kafka
OPTIONS (
kafka.bootstrap.servers='a.b.c.d:9092',
subscribe='result-table')
CREATE SCAN rds_binlog_stream_read ON rds_binlog USING stream;
CREATE STREAM sync_rds_binlog
OPTIONS(
triggerInterval=1000,
checkpointLocation='/tmp/checkpoint/sync_rds_binlog')
INSERT INTO result_table
SELECT cast(value as string), *
FROM rds_binlog_stream_read;
我现在不确定查询结果是不是符合预期,所以下面我就用到流式SQL的调试功能。上面的SQL需要做一点点修改,这里不能直接将查询结果写到“result_table”,而是改成调试表“stream_debug_table”。Spark会默认创建出这个表。即改成如下:
CREATE STREAM sync_rds_binlog
OPTIONS(
triggerInterval=1000,
checkpointLocation='/tmp/checkpoint/sync_rds_binlog')
INSERT INTO stream_debug_table
SELECT cast(value as string), *
FROM rds_binlog_stream_read;
当我们调试确认没问题后,我们再改回去,正式部署上线运行。执行后,我们就可以在控制台看到如下SQL执行结果:
流式SQL结果输出界面主要分为三个功能区域:
具体每个功能键的使用说明如下:
当我们需要跳转到某个批次页面时,会弹出输入框,输入需要跳转到哪一个批次。
这里会提示合法的Batch ID区间,超出区间会提示非法。我们可以使用“ECS”键或者不输入直接回车退出。
对于每一个批次,我们可以查看它执行阶段的Metrics信息,输入“【M】Metrics”键:
当信息比较多时会分页展示,我们可以通过“<”和“>”进行翻页。同样的,我们可以输入“ECS”,“Q”或者“q”退出。
使用过程中有一些注意事项:
CREATE TABLE my_debug_table USING MEMORY;
本文主要介绍了EMR提供的一个流式SQL控制台调试小工具,可以解决基本的SQL正确性调试需求。除此外,一些高级的调试功能也在开发中。下面是录制的一段视频,直观感受下整个工具的使用:
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
阿里巴巴开源大数据技术团队成立阿里云EMR技术圈, 每周推送前沿技术文章,直播分享经典案例、在线答疑,营造纯粹的开源大数据氛围,欢迎加入!加入钉钉群聊阿里云E-MapReduce交流2群,点击进入查看详情 https://qr.dingtalk.com/action/joingroup?code=v1,k1,cNBcqHn4TvG0iHpN3cSc1B86D1831SGMdvGu7PW+sm4=&_dt_no_comment=1&origin=11