Flink on zepplien的安装配置

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Zeppelin 是一个让交互式数据分析变得可行的基于网页的notebook。Zeppelin提供了数据可视化的框架。Flink结合zepplien使用可以让提交Flink任务变的简单化. 从Zeppelin 0.9开始将正式支持Flink 1.10。Flink是一个批流统一的计算引擎,本文将从第一个wordcount的例子为起点来介绍一下Flink on zeplien(on yarn)的配置和使用.版本说明:Flink 1.11.0

Apache Zeppelin 是一个让交互式数据分析变得可行的基于网页的notebook。Zeppelin提供了数据可视化的框架。Flink结合zepplien使用可以让提交Flink任务变的简单化. 从Zeppelin 0.9开始将正式支持Flink 1.10。Flink是一个批流统一的计算引擎,本文将从第一个wordcount的例子为起点来介绍一下Flink on zeplien(on yarn)的配置和使用.


版本说明:


Flink 1.11.0


zepplien: 0.9.0


hadoop 2.9.0


安装配置zepplien


下载安装完后要进行几个简单的配置,不能直接启动,否则后面使用会遇到报错,这里我也是遇到了几个问题,搞了差不多2个小时,才把第一个任务跑起来.


1, tar -zxvf zeppelin-0.9.0-SNAPSHOT.tar.gz
cd zeppelin-0.9.0-SNAPSHOT/conf
mv zeppelin-env.sh.template zeppelin-env.sh


这里面需要添加两个配置


export JAVA_HOME=/home/jason/bigdata/jdk/jdk1.8.0_221
  export ZEPPELIN_ADDR=storm1

JAVA_HOME需要指定JDK的路径,并且JDK的版本需要高一点的版本,不然提交任务的时候会遇到下面的报错


org.apache.zeppelin.interpreter.InterpreterException: java.io.IOException: Fail to launch interpreter process:
Apache Zeppelin requires either Java 8 update 151 or newer
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.open(RemoteInterpreter.java:134)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getFormType(RemoteInterpreter.java:298)
  at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:433)
  at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:75)
  at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
  at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
  at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:159)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Fail to launch interpreter process:
Apache Zeppelin requires either Java 8 update 151 or newer
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess.start(RemoteInterpreterManagedProcess.java:130)
  at org.apache.zeppelin.interpreter.ManagedInterpreterGroup.getOrCreateInterpreterProcess(ManagedInterpreterGroup.java:65)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getOrCreateInterpreterProcess(RemoteInterpreter.java:110)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.internal_create(RemoteInterpreter.java:163)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.open(RemoteInterpreter.java:131)
  ... 13 more

我的JDK版本之前是jdk1.8.0_111,改成jdk1.8.0_221后才可以运行的.


ZEPPELIN_ADDR需要配置成ip地址或者host,如果你的zepplien不是安装在本机的话,否则无法通过ip+port进行访问,或者也可以修改zeppelin-site.xml配置文件里面的zeppelin.server.addr也可以的.


<property>
  <name>zeppelin.server.addr</name>
  <value>storm1</value>
  <description>Server binding address</description>
</property>

添加hadoop的依赖包


因为要用yarn来调度Flink任务,所以还需要把 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar 这个jar包添加到zeppelin的lib下面.


然后就可以启动zeppelin了,到bin目录下执行


zeppelin-daemon.sh start 如果看到控制台正常输出Zeppelin start [ OK ],那就说明安装完成, 为了确保启动成功了,可以再到log下面看下日志,有没有报错的信息.


在浏览器里面输入ip+port就可以看到zeppelin的UI界面了,如下所示:



配置 Flink Interpreter


在 Zeppelin 中可以使用 3 种不同的 Flink 集群模式。


1, Local


2, Remote


3, Yarn


这里主要介绍yarn的配置方法,local和remote的配置自己可以去试一下.



点击右上角的Interpreters进入配置页面,然后在左上角的搜索框输入Flink就可以看到下面有很多Flink相关的配置,点击edit就可以配置了.


其实on yarn模式主要配置前面3个就可以了.


FLINK_HOME  /home/jason/bigdata/flink/flink-1.11.0  
HADOOP_CONF_DIR  /home/jason/bigdata/hadoop/hadoop-2.9.0/etc/hadoop  
flink.execution.mode  yarn
flink.yarn.queue  flink


第4个 flink.yarn.queue 队列其实可以不用配置默认的是default,但是我的队列把default去掉了.所以是需要配置的.


其次还需要注意下面的几个配置,需要根据自己的集群资源合理的设置,不要超过集群可用的资源.




点击首页面的Flink Basics,zeppelin自带了几个Flink的demo,先跑一下demo看是否能跑通.




直接点击运行就可以了,我这里是自己修改了,原始的有batch,streaming的demo.随便跑一个就行.




你会发现任务启动不了,还是报上面的JDK版本过低的问题,虽然我们在上面已经设置过了环境变量,但是zeppelin启动Interpreter的时候,没有把环境变量传入,具体的解决方法如下: 需要修改zeppelin/bin目录下的common.sh文件.


直接找到66行,把java_ver_output=$("${JAVA:- 后面的改为自己的JDK的路径就可以了,重启一下zeppelin. 然后再点刚才的运行按钮, zeppelin会先在yarn上启动一个session模式的集群,然后在把任务提交到这个集群上.


点击右上角的Flink job就可以跳到Flink的WEB UI看到Job的详细信息如下图所示.




再来看一下最后打印的结果是否正确,在taskmanager的stdout可以看到输出的结果.




结果也没有问题,还需要注意任务的并行度不能大于slot的总数,如果想要修改并行度的话可以在paragraph里面设置如下所示:


%flink(parallelism=4)
val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .keyBy(0)
  .sum(1)
  .print
senv.execute("jason_test")


在zeppelin里面点击完运行后,需要耐心的等待一会儿,整个任务提交的过程感觉还是有点慢,多等一会儿就行了,如果等了很长时间任务还没提交成功,可以到zeppelin的log下面看下打印的日志有没有报错,一定要看日志,可以帮助我们快速的定位问题.


这篇文章主要介绍了Flink on zeppelin的安装以及配置,如何在on yarn模式上提交一个streaming的任务.后面会有更多关于zeppelin的使用介绍.


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
4天前
|
消息中间件 SQL Kafka
实时计算 Flink版产品使用合集之brokers可以配置多个,但是只要第1个brokers挂了任务就挂了如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
22 3
|
4天前
|
消息中间件 关系型数据库 Kafka
实时计算 Flink版产品使用合集之想要加快消费 Kafka 数据的速度,该怎么配置参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
17 2
|
4天前
|
Oracle 关系型数据库 数据库
实时计算 Flink版产品使用合集之用于实时同步整个数据库时,该如何配置DB2
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
28 5
|
6天前
|
SQL 资源调度 Java
Flink问题之动态配置如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
52 1
|
3天前
|
SQL 数据处理 API
实时计算 Flink版产品使用合集之配置的Managed Memory不生效如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
10 0
|
3天前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用合集之可以通过配置Oracle数据库的schema注册表来监测表结构的变化吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
12 1
|
4天前
|
SQL 存储 Java
实时计算 Flink版产品使用合集之怎么配置日志的输出格式和文件大小
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
17 1
|
4天前
|
消息中间件 关系型数据库 Java
实时计算 Flink版产品使用合集之在生产环境中配置Flink CDC,主要需要考虑什么
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
25 1
|
4天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之有没有什么好的配置方案
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
9 1
|
4天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用合集之是否支持对每个表的增量快照键列进行单独配置
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
10 1