Zeppelin SDK :Flink 平台建设的基石-阿里云开发者社区

开发者社区> 阿里云实时计算Flink> 正文

Zeppelin SDK :Flink 平台建设的基石

简介: 用过 Zeppelin 的人应该比较熟悉 Zeppelin 的 UI,因为 Zeppelin 的主要使用场景都是交互式,用户需要手动来操作。那除了这种手动的方式,还有其他的方式吗?如果你不想用 Zeppelin UI,但又想用 Zeppelin 提交和管理大数据作业 (比如 Flink Job)的能力该怎么办?或者是你在 Zeppelin 里写好了代码,想定时调度起来,或者集成到其他系统里,该怎么办?
+关注继续查看

作者:章剑锋(简锋),阿里巴巴高级技术专家

用过 Zeppelin 的人应该比较熟悉 Zeppelin 的 UI,因为 Zeppelin 的主要使用场景都是交互式,用户需要手动来操作。那除了这种手动的方式,还有其他的方式吗?如果你不想用 Zeppelin UI,但又想用 Zeppelin 提交和管理大数据作业 (比如 Flink Job)的能力该怎么办?或者是你在 Zeppelin 里写好了代码,想定时调度起来,或者集成到其他系统里,该怎么办?

如果你有这样的诉求,那么 Zeppelin Client API (SDK)就是你所需要的东西。

Zeppelin 简介

对于不熟悉 Zeppelin 的人,可以用一句话来解释 Zeppelin:大数据引擎的入口,交互式大数据分析平台底座。Zeppelin 最大的特点是连接多种引擎,具有可插拔式,下面这张图例举了一些常用的引擎,当然 Zeppelin 还支持其他很多引擎,这里就不一一例举。

image3.png

虽然 Zeppelin 有 Rest API,但是 Zeppelin 的 Rest API 太多,对于很多不熟悉 Zeppelin 的人来说使用 Rest API 门槛太高,所以 Zeppelin 专门开发了一个 Client API (SDK),方便大家做集成。Zeppelin Client API (SDK)分为 2 个层面的的东西(接下来会逐个详细介绍):

  • Zeppelin Client API (Low Level API)
  • Session API (High Level API)

Zeppelin Client API (Low Level API)

Zeppelin Client API 可以在 Note 和 Paragraph 的粒度进行操作。你可以先在 notebook 里写好代码 (比如开发阶段在 notebook 里写代码,做测试),然后用 Low Level API 用编程的方式把 Job 跑起来(比如生产阶段把作业定时调度起来)。Zeppelin Client API 最重要的 class 是 ZeppelinClient,也是 Zeppelin Client API 的入口。下面例举几个重要的接口(这些 API 都比较直观,我就不多做解释了)。

public String createNote(String notePath) throws Exception 

public void deleteNote(String noteId) throws Exception 

public NoteResult executeNote(String noteId) throws Exception 

public NoteResult executeNote(String noteId, 
                              Map<String, String> parameters) throws Exception
                              
public NoteResult queryNoteResult(String noteId) throws Exception 

public NoteResult submitNote(String noteId) throws Exception

public NoteResult submitNote(String noteId, 
                             Map<String, String> parameters) throws Exception 
                             
public NoteResult waitUntilNoteFinished(String noteId) throws Exception

public String addParagraph(String noteId, 
                           String title, 
                           String text) throws Exception
                           
public void updateParagraph(String noteId, 
                            String paragraphId, 
                            String title, 
                            String text) throws Exception
                            
public ParagraphResult executeParagraph(String noteId,
                                        String paragraphId,
                                        String sessionId,
                                        Map<String, String> parameters) throws Exception
                                        
public ParagraphResult submitParagraph(String noteId,
                                       String paragraphId,
                                       String sessionId,
                                       Map<String, String> parameters) throws Exception
                                       
public void cancelParagraph(String noteId, String paragraphId)
    
public ParagraphResult queryParagraphResult(String noteId, String paragraphId) 
    
public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId)

那这些 API 能用来做什么呢?

一个典型的用途是我们在 Zeppelin 里写好代码,做好测试,然后在第三方系统里集成进来。比如下面的代码就是把 Zeppelin 自带的 Spark Basic Features 用编程的方式跑起来,你不仅可以跑 Zeppelin Note,还可以拿到运行结果 (ParagraphResult)。怎么处理运行结果,就留给你发挥想象的空间吧(可以在你的系统里展示出来,或者可视化出来,或者传给其他系统做消费等等)。

此外,对于 Dynamic forms(动态控件,比如文本框,下拉框等等),你还可以动态的提供参数,如下面例子里的 maxAge 和 marital。

ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
ZeppelinClient zClient = new ZeppelinClient(clientConfig);

String zeppelinVersion = zClient.getVersion();
System.out.println("Zeppelin version: " + zeppelinVersion);

ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");
System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);

paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");
System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);

Map<String, String> parameters = new HashMap<>();
parameters.put("maxAge", "40");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);
System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);

parameters = new HashMap<>();
parameters.put("marital", "married");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);
System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);

这下面这张图就是上面我们要 Zeppelin Client API 跑的 Zeppelin 自带的 Spark Basic Features。

image2.png

Session API (High Level API)

Session API 是 Zeppelin 的high level api,Session API 里没有 Note,Paragraph 的概念,粒度是你提交的代码。Session API里最重要的class就是 ZSession,这也是Session API的入口,一个 ZSession 代表一个独立的Zeppelin Interpreter 进程,对于 Flink 来说就是一个独立的 Flink Session Cluster。下面例举一些典型的接口(这些 API 都比较直观,我就不多做解释了)。

public void start() throws Exception

public void start(MessageHandler messageHandler) throws Exception

public void stop() throws Exception

public ExecuteResult execute(String code) throws Exception

public ExecuteResult execute(String subInterpreter,
                             Map<< span="">String, String> localProperties,
                             String code,
                             StatementMessageHandler messageHandler) throws Exception

public ExecuteResult submit(String code) throws Exception

public ExecuteResult submit(String subInterpreter,
                            Map<< span="">String, String> localProperties,
                            String code,
                            StatementMessageHandler messageHandler) throws Exception
                           
public void cancel(String statementId) throws Exception
 
public ExecuteResult queryStatement(String statementId) throws Exception

public ExecuteResult waitUntilFinished(String statementId) throws Exception

那这个 API 能用来做什么呢? 一个典型的用途是就是我们动态创建 Session (Zeppelin Interpreter 进程),动态的提交运行代码,并拿到运行结果。比如你不想用 Zeppelin 的 UI,要自己做一个 Flink 的开发管理平台,那么你就可以自己做 UI,让用户在 UI 上配置 Flink Job,输入 SQL,然后把所有的这些信息发送到后端,后端调用 ZSession 来运行 Flink Job。

下面的 Java 代码就是用编程的方式调用了 2 条 Flink SQL 语句,并且在 MyStatementMessageHandler1 和 MyStatementMessageHandler2 中读取源源不断发送过来更新的 SQL 运行结果 (怎么来使用这个结果就靠你的想象力了)。

需要说明的是像 Flink Interpreter 这种流式结果数据更新是通过 WebSocket 实现的,所以下面的代码里有会有 CompositeMessageHandler,MyStatementMessageHandler1 以及 MyStatementMessageHandler2,这些 MessageHandler 就是用来处理通过 WebSocket 发送过来的流式数据结果。下面是 2 条我们在 Zeppelin 里运行的 Flink SQL。

image5.png
image8.png

接下来我们会用 Zeppelin Session API 来跑着这 2 条 Flink SQL,然后我们会在MyStatementMessageHandler1,MyStatementMessageHandler2 里拿到结果展示出来。

ZSession session = null;
try {
    ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
    Map<< span="">String, String> intpProperties = new HashMap<>();

    session = ZSession.builder()
        .setClientConfig(clientConfig)
        .setInterpreter("flink")
        .setIntpProperties(intpProperties)
        .build();

    // CompositeMessageHandler allow you to add StatementMessageHandler for each statement.
    // otherwise you have to use a global MessageHandler.
    session.start(new CompositeMessageHandler());
    System.out.println("Flink Web UI: " + session.getWeburl());

    System.out.println("-----------------------------------------------------------------------------");
    String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
    ExecuteResult result = session.execute(initCode);
    System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());

    // run flink ssql
    Map<< span="">String, String> localProperties = new HashMap<>();
    localProperties.put("type", "update");
    result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",
                            new MyStatementMessageHandler1());
    session.waitUntilFinished(result.getStatementId());

    result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",
                            new MyStatementMessageHandler2());
    session.waitUntilFinished(result.getStatementId());

} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (session != null) {
        try {
            session.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public static class MyStatementMessageHandler1 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler1, append output: " + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler1, update output: " + output);
    }
}

public static class MyStatementMessageHandler2 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler2, append output: " + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler2, update output: " + output);
    }
}

除了编程方式跑 Flink Job,这个 Session API 还能给我们带来什么呢?

在 Zeppelin 里如果你可以通过 %flink.conf 来对你的 Flink Cluster 进行非常丰富的配置,但是 %flink.conf 是纯文本的配置,不熟悉 Flink 的人很容易配错(如下图)。如果你是自己做 Flink 开发平台的话就可以做一个更完整的 UI,用一些下拉框等等把一些配置选项固定下来,用户只要选择就行了,不需要自己输入文本来配置。

image7.png

还有下面这类 paragraph 的 local properties 配置,比如 type,template, resumeFromLatestCheckpoint 也是比较容易写错的,同理你可以在自己 UI 里用一些控件把这些选项提前固定下来,而不是让用户输入文本的方式。

image6.png

我相信 Zeppelin Client API 还有很多可以发挥和想象的空间,大家脑洞起来吧。

▼ 视频演示 ▼

视频演示链接
https://v.qq.com/x/page/m3146grr5e1.html

更多 Flink 技术干货及使用交流可加入 Flink 社区钉钉大群。

最新钉群二维码.jpeg

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
趣头条基于 Flink 的实时平台建设实践
本文由趣头条实时平台负责人席建刚分享趣头条实时平台的建设,整理者叶里君。文章将从平台的架构、Flink 现状,Flink 应用以及未来计划四部分分享。
3036 0
Zeppelin SDK :Flink 平台建设的基石
用过 Zeppelin 的人应该比较熟悉 Zeppelin 的 UI,因为 Zeppelin 的主要使用场景都是交互式,用户需要手动来操作。那除了这种手动的方式,还有其他的方式吗?如果你不想用 Zeppelin UI,但又想用 Zeppelin 提交和管理大数据作业 (比如 Flink Job)的能力该怎么办?或者是你在 Zeppelin 里写好了代码,想定时调度起来,或者集成到其他系统里,该怎么办?
1589 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
4518 0
使用OpenApi弹性释放和设置云服务器ECS释放
云服务器ECS的一个重要特性就是按需创建资源。您可以在业务高峰期按需弹性的自定义规则进行资源创建,在完成业务计算的时候释放资源。本篇将提供几个Tips帮助您更加容易和自动化的完成云服务器的释放和弹性设置。
7830 0
阿里云服务器安全组设置内网互通的方法
虽然0.0.0.0/0使用非常方便,但是发现很多同学使用它来做内网互通,这是有安全风险的,实例有可能会在经典网络被内网IP访问到。下面介绍一下四种安全的内网互联设置方法。 购买前请先:领取阿里云幸运券,有很多优惠,可到下文中领取。
9445 0
阿里云ECS云服务器初始化设置教程方法
阿里云ECS云服务器初始化是指将云服务器系统恢复到最初状态的过程,阿里云的服务器初始化是通过更换系统盘来实现的,是免费的,阿里云百科网分享服务器初始化教程: 服务器初始化教程方法 本文的服务器初始化是指将ECS云服务器系统恢复到最初状态,服务器中的数据也会被清空,所以初始化之前一定要先备份好。
10778 0
日均百亿级日志处理:微博基于 Flink 的实时计算平台建设
传统基于 Hadoop 生态的离线数据存储计算方案已在业界形成统一的默契,但受制于离线计算的时效性制约,越来越多的数据应用场景已从离线转为实时。微博广告实时数据平台以此为背景进行设计与构建,目前该系统已支持日均处理日志数量超过百亿,接入产品线、业务日志类型若干。
6637 0
574
文章
6
问答
来源圈子
更多
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载