Zeppelin SDK :Flink 平台建设的基石

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 用过 Zeppelin 的人应该比较熟悉 Zeppelin 的 UI,因为 Zeppelin 的主要使用场景都是交互式,用户需要手动来操作。那除了这种手动的方式,还有其他的方式吗?如果你不想用 Zeppelin UI,但又想用 Zeppelin 提交和管理大数据作业 (比如 Flink Job)的能力该怎么办?或者是你在 Zeppelin 里写好了代码,想定时调度起来,或者集成到其他系统里,该怎么办?

作者:章剑锋(简锋),阿里巴巴高级技术专家

用过 Zeppelin 的人应该比较熟悉 Zeppelin 的 UI,因为 Zeppelin 的主要使用场景都是交互式,用户需要手动来操作。那除了这种手动的方式,还有其他的方式吗?如果你不想用 Zeppelin UI,但又想用 Zeppelin 提交和管理大数据作业 (比如 Flink Job)的能力该怎么办?或者是你在 Zeppelin 里写好了代码,想定时调度起来,或者集成到其他系统里,该怎么办?

如果你有这样的诉求,那么 Zeppelin Client API (SDK)就是你所需要的东西。

Zeppelin 简介

对于不熟悉 Zeppelin 的人,可以用一句话来解释 Zeppelin:大数据引擎的入口,交互式大数据分析平台底座。Zeppelin 最大的特点是连接多种引擎,具有可插拔式,下面这张图例举了一些常用的引擎,当然 Zeppelin 还支持其他很多引擎,这里就不一一例举。

image3.png

虽然 Zeppelin 有 Rest API,但是 Zeppelin 的 Rest API 太多,对于很多不熟悉 Zeppelin 的人来说使用 Rest API 门槛太高,所以 Zeppelin 专门开发了一个 Client API (SDK),方便大家做集成。Zeppelin Client API (SDK)分为 2 个层面的的东西(接下来会逐个详细介绍):

  • Zeppelin Client API (Low Level API)
  • Session API (High Level API)

Zeppelin Client API (Low Level API)

Zeppelin Client API 可以在 Note 和 Paragraph 的粒度进行操作。你可以先在 notebook 里写好代码 (比如开发阶段在 notebook 里写代码,做测试),然后用 Low Level API 用编程的方式把 Job 跑起来(比如生产阶段把作业定时调度起来)。Zeppelin Client API 最重要的 class 是 ZeppelinClient,也是 Zeppelin Client API 的入口。下面例举几个重要的接口(这些 API 都比较直观,我就不多做解释了)。

public String createNote(String notePath) throws Exception 

public void deleteNote(String noteId) throws Exception 

public NoteResult executeNote(String noteId) throws Exception 

public NoteResult executeNote(String noteId, 
                              Map<String, String> parameters) throws Exception
                              
public NoteResult queryNoteResult(String noteId) throws Exception 

public NoteResult submitNote(String noteId) throws Exception

public NoteResult submitNote(String noteId, 
                             Map<String, String> parameters) throws Exception 
                             
public NoteResult waitUntilNoteFinished(String noteId) throws Exception

public String addParagraph(String noteId, 
                           String title, 
                           String text) throws Exception
                           
public void updateParagraph(String noteId, 
                            String paragraphId, 
                            String title, 
                            String text) throws Exception
                            
public ParagraphResult executeParagraph(String noteId,
                                        String paragraphId,
                                        String sessionId,
                                        Map<String, String> parameters) throws Exception
                                        
public ParagraphResult submitParagraph(String noteId,
                                       String paragraphId,
                                       String sessionId,
                                       Map<String, String> parameters) throws Exception
                                       
public void cancelParagraph(String noteId, String paragraphId)
    
public ParagraphResult queryParagraphResult(String noteId, String paragraphId) 
    
public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId)

那这些 API 能用来做什么呢?

一个典型的用途是我们在 Zeppelin 里写好代码,做好测试,然后在第三方系统里集成进来。比如下面的代码就是把 Zeppelin 自带的 Spark Basic Features 用编程的方式跑起来,你不仅可以跑 Zeppelin Note,还可以拿到运行结果 (ParagraphResult)。怎么处理运行结果,就留给你发挥想象的空间吧(可以在你的系统里展示出来,或者可视化出来,或者传给其他系统做消费等等)。

此外,对于 Dynamic forms(动态控件,比如文本框,下拉框等等),你还可以动态的提供参数,如下面例子里的 maxAge 和 marital。

ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
ZeppelinClient zClient = new ZeppelinClient(clientConfig);

String zeppelinVersion = zClient.getVersion();
System.out.println("Zeppelin version: " + zeppelinVersion);

ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");
System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);

paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");
System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);

Map<String, String> parameters = new HashMap<>();
parameters.put("maxAge", "40");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);
System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);

parameters = new HashMap<>();
parameters.put("marital", "married");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);
System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);

这下面这张图就是上面我们要 Zeppelin Client API 跑的 Zeppelin 自带的 Spark Basic Features。

image2.png

Session API (High Level API)

Session API 是 Zeppelin 的high level api,Session API 里没有 Note,Paragraph 的概念,粒度是你提交的代码。Session API里最重要的class就是 ZSession,这也是Session API的入口,一个 ZSession 代表一个独立的Zeppelin Interpreter 进程,对于 Flink 来说就是一个独立的 Flink Session Cluster。下面例举一些典型的接口(这些 API 都比较直观,我就不多做解释了)。

public void start() throws Exception

public void start(MessageHandler messageHandler) throws Exception

public void stop() throws Exception

public ExecuteResult execute(String code) throws Exception

public ExecuteResult execute(String subInterpreter,
                             Map<< span="">String, String> localProperties,
                             String code,
                             StatementMessageHandler messageHandler) throws Exception

public ExecuteResult submit(String code) throws Exception

public ExecuteResult submit(String subInterpreter,
                            Map<< span="">String, String> localProperties,
                            String code,
                            StatementMessageHandler messageHandler) throws Exception
                           
public void cancel(String statementId) throws Exception
 
public ExecuteResult queryStatement(String statementId) throws Exception

public ExecuteResult waitUntilFinished(String statementId) throws Exception

那这个 API 能用来做什么呢? 一个典型的用途是就是我们动态创建 Session (Zeppelin Interpreter 进程),动态的提交运行代码,并拿到运行结果。比如你不想用 Zeppelin 的 UI,要自己做一个 Flink 的开发管理平台,那么你就可以自己做 UI,让用户在 UI 上配置 Flink Job,输入 SQL,然后把所有的这些信息发送到后端,后端调用 ZSession 来运行 Flink Job。

下面的 Java 代码就是用编程的方式调用了 2 条 Flink SQL 语句,并且在 MyStatementMessageHandler1 和 MyStatementMessageHandler2 中读取源源不断发送过来更新的 SQL 运行结果 (怎么来使用这个结果就靠你的想象力了)。

需要说明的是像 Flink Interpreter 这种流式结果数据更新是通过 WebSocket 实现的,所以下面的代码里有会有 CompositeMessageHandler,MyStatementMessageHandler1 以及 MyStatementMessageHandler2,这些 MessageHandler 就是用来处理通过 WebSocket 发送过来的流式数据结果。下面是 2 条我们在 Zeppelin 里运行的 Flink SQL。

image5.png
image8.png

接下来我们会用 Zeppelin Session API 来跑着这 2 条 Flink SQL,然后我们会在MyStatementMessageHandler1,MyStatementMessageHandler2 里拿到结果展示出来。

ZSession session = null;
try {
    ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
    Map<< span="">String, String> intpProperties = new HashMap<>();

    session = ZSession.builder()
        .setClientConfig(clientConfig)
        .setInterpreter("flink")
        .setIntpProperties(intpProperties)
        .build();

    // CompositeMessageHandler allow you to add StatementMessageHandler for each statement.
    // otherwise you have to use a global MessageHandler.
    session.start(new CompositeMessageHandler());
    System.out.println("Flink Web UI: " + session.getWeburl());

    System.out.println("-----------------------------------------------------------------------------");
    String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
    ExecuteResult result = session.execute(initCode);
    System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());

    // run flink ssql
    Map<< span="">String, String> localProperties = new HashMap<>();
    localProperties.put("type", "update");
    result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",
                            new MyStatementMessageHandler1());
    session.waitUntilFinished(result.getStatementId());

    result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",
                            new MyStatementMessageHandler2());
    session.waitUntilFinished(result.getStatementId());

} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (session != null) {
        try {
            session.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public static class MyStatementMessageHandler1 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler1, append output: " + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler1, update output: " + output);
    }
}

public static class MyStatementMessageHandler2 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler2, append output: " + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler2, update output: " + output);
    }
}

除了编程方式跑 Flink Job,这个 Session API 还能给我们带来什么呢?

在 Zeppelin 里如果你可以通过 %flink.conf 来对你的 Flink Cluster 进行非常丰富的配置,但是 %flink.conf 是纯文本的配置,不熟悉 Flink 的人很容易配错(如下图)。如果你是自己做 Flink 开发平台的话就可以做一个更完整的 UI,用一些下拉框等等把一些配置选项固定下来,用户只要选择就行了,不需要自己输入文本来配置。

image7.png

还有下面这类 paragraph 的 local properties 配置,比如 type,template, resumeFromLatestCheckpoint 也是比较容易写错的,同理你可以在自己 UI 里用一些控件把这些选项提前固定下来,而不是让用户输入文本的方式。

image6.png

我相信 Zeppelin Client API 还有很多可以发挥和想象的空间,大家脑洞起来吧。

▼ 视频演示 ▼

视频演示链接
https://v.qq.com/x/page/m3146grr5e1.html

更多 Flink 技术干货及使用交流可加入 Flink 社区钉钉大群。

最新钉群二维码.jpeg

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
4月前
|
存储 消息中间件 人工智能
Lazada 如何用实时计算 Flink + Hologres 构建实时商品选品平台
本文整理自 Lazada Group EVP 及供应链技术负责人陈立群在 Flink Forward Asia 2025 新加坡实时分析专场的分享。作为东南亚领先的电商平台,Lazada 面临在六国管理数十亿商品 SKU 的挑战。为实现毫秒级数据驱动决策,Lazada 基于阿里云实时计算 Flink 和 Hologres 打造端到端实时商品选品平台,支撑日常运营与大促期间分钟级响应。本文深入解析该平台如何通过流式处理与实时分析技术重构电商数据架构,实现从“事后分析”到“事中调控”的跃迁。
438 55
Lazada 如何用实时计算 Flink + Hologres 构建实时商品选品平台
|
7月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
529 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
2月前
|
存储 消息中间件 人工智能
云栖实录|实时计算 Flink 全新升级 - 全栈流处理平台助力实时智能
本文根据 2025 云栖大会演讲整理而成,演讲信息如下 演讲人:黄鹏程 阿里云智能集团计算平台事业部实时计算Flink版产品负责人
175 1
云栖实录|实时计算 Flink 全新升级 - 全栈流处理平台助力实时智能
|
7月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
226 12
|
8月前
|
SQL 监控 Go
新一代 Cron-Job分布式调度平台,v1.0.8版本发布,支持Go执行器SDK!
现代化的Cron-Job分布式任务调度平台,支持Go语言执行器SDK,多项核心优势优于其他调度平台。
160 8
|
10月前
|
SQL 消息中间件 Kafka
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
本文介绍了阿里云实时数仓Hologres负责人姜伟华在Flink Forward Asia 2024上的分享,涵盖实时数仓的发展历程、从实时数仓到实时湖仓的演进,以及总结。文章通过三代实时数仓架构的演变,详细解析了Lambda架构、Kafka实时数仓分层+OLAP、Hologres实时数仓分层复用等方案,并探讨了未来从实时数仓到实时湖仓的演进方向。最后,结合实际案例和Demo展示了Hologres + Flink + Paimon在实时湖仓中的应用,帮助用户根据业务需求选择合适的方案。
1397 20
Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
|
10月前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
本文整理自鹰角网络大数据开发工程师朱正军在Flink Forward Asia 2024上的分享,主要涵盖四个方面:鹰角数据平台架构、数据湖选型、湖仓一体建设及未来展望。文章详细介绍了鹰角如何构建基于Paimon的数据湖,解决了Hudi入湖的痛点,并通过Trino引擎和Ranger权限管理实现高效的数据查询与管控。此外,还探讨了湖仓一体平台的落地效果及未来技术发展方向,包括Trino与Paimon的集成增强、StarRocks的应用以及Paimon全面替换Hive的计划。
1103 1
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
|
9月前
|
SQL 消息中间件 Serverless
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
​Flink+Paimon+Hologres,面向未来的一体化实时湖仓平台架构设计
275 4
|
9月前
|
SQL 存储 HIVE
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
鹰角基于 Flink + Paimon + Trino 构建湖仓一体化平台实践项目
593 2

相关产品

  • 实时计算 Flink版