Flink on zepplien的安装配置

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Zeppelin 是一个让交互式数据分析变得可行的基于网页的notebook。Zeppelin提供了数据可视化的框架。Flink结合zepplien使用可以让提交Flink任务变的简单化. 从Zeppelin 0.9开始将正式支持Flink 1.10。Flink是一个批流统一的计算引擎,本文将从第一个wordcount的例子为起点来介绍一下Flink on zeplien(on yarn)的配置和使用.版本说明:Flink 1.11.0

Apache Zeppelin 是一个让交互式数据分析变得可行的基于网页的notebook。Zeppelin提供了数据可视化的框架。Flink结合zepplien使用可以让提交Flink任务变的简单化. 从Zeppelin 0.9开始将正式支持Flink 1.10。Flink是一个批流统一的计算引擎,本文将从第一个wordcount的例子为起点来介绍一下Flink on zeplien(on yarn)的配置和使用.


版本说明:


Flink 1.11.0


zepplien: 0.9.0


hadoop 2.9.0


安装配置zepplien


下载安装完后要进行几个简单的配置,不能直接启动,否则后面使用会遇到报错,这里我也是遇到了几个问题,搞了差不多2个小时,才把第一个任务跑起来.


1, tar -zxvf zeppelin-0.9.0-SNAPSHOT.tar.gz
cd zeppelin-0.9.0-SNAPSHOT/conf
mv zeppelin-env.sh.template zeppelin-env.sh


这里面需要添加两个配置


export JAVA_HOME=/home/jason/bigdata/jdk/jdk1.8.0_221
  export ZEPPELIN_ADDR=storm1

JAVA_HOME需要指定JDK的路径,并且JDK的版本需要高一点的版本,不然提交任务的时候会遇到下面的报错


org.apache.zeppelin.interpreter.InterpreterException: java.io.IOException: Fail to launch interpreter process:
Apache Zeppelin requires either Java 8 update 151 or newer
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.open(RemoteInterpreter.java:134)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getFormType(RemoteInterpreter.java:298)
  at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:433)
  at org.apache.zeppelin.notebook.Paragraph.jobRun(Paragraph.java:75)
  at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
  at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:130)
  at org.apache.zeppelin.scheduler.RemoteScheduler$JobRunner.run(RemoteScheduler.java:159)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Fail to launch interpreter process:
Apache Zeppelin requires either Java 8 update 151 or newer
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess.start(RemoteInterpreterManagedProcess.java:130)
  at org.apache.zeppelin.interpreter.ManagedInterpreterGroup.getOrCreateInterpreterProcess(ManagedInterpreterGroup.java:65)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.getOrCreateInterpreterProcess(RemoteInterpreter.java:110)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.internal_create(RemoteInterpreter.java:163)
  at org.apache.zeppelin.interpreter.remote.RemoteInterpreter.open(RemoteInterpreter.java:131)
  ... 13 more

我的JDK版本之前是jdk1.8.0_111,改成jdk1.8.0_221后才可以运行的.


ZEPPELIN_ADDR需要配置成ip地址或者host,如果你的zepplien不是安装在本机的话,否则无法通过ip+port进行访问,或者也可以修改zeppelin-site.xml配置文件里面的zeppelin.server.addr也可以的.


<property>
  <name>zeppelin.server.addr</name>
  <value>storm1</value>
  <description>Server binding address</description>
</property>

添加hadoop的依赖包


因为要用yarn来调度Flink任务,所以还需要把 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar 这个jar包添加到zeppelin的lib下面.


然后就可以启动zeppelin了,到bin目录下执行


zeppelin-daemon.sh start 如果看到控制台正常输出Zeppelin start [ OK ],那就说明安装完成, 为了确保启动成功了,可以再到log下面看下日志,有没有报错的信息.


在浏览器里面输入ip+port就可以看到zeppelin的UI界面了,如下所示:



配置 Flink Interpreter


在 Zeppelin 中可以使用 3 种不同的 Flink 集群模式。


1, Local


2, Remote


3, Yarn


这里主要介绍yarn的配置方法,local和remote的配置自己可以去试一下.



点击右上角的Interpreters进入配置页面,然后在左上角的搜索框输入Flink就可以看到下面有很多Flink相关的配置,点击edit就可以配置了.


其实on yarn模式主要配置前面3个就可以了.


FLINK_HOME  /home/jason/bigdata/flink/flink-1.11.0  
HADOOP_CONF_DIR  /home/jason/bigdata/hadoop/hadoop-2.9.0/etc/hadoop  
flink.execution.mode  yarn
flink.yarn.queue  flink


第4个 flink.yarn.queue 队列其实可以不用配置默认的是default,但是我的队列把default去掉了.所以是需要配置的.


其次还需要注意下面的几个配置,需要根据自己的集群资源合理的设置,不要超过集群可用的资源.




点击首页面的Flink Basics,zeppelin自带了几个Flink的demo,先跑一下demo看是否能跑通.




直接点击运行就可以了,我这里是自己修改了,原始的有batch,streaming的demo.随便跑一个就行.




你会发现任务启动不了,还是报上面的JDK版本过低的问题,虽然我们在上面已经设置过了环境变量,但是zeppelin启动Interpreter的时候,没有把环境变量传入,具体的解决方法如下: 需要修改zeppelin/bin目录下的common.sh文件.


直接找到66行,把java_ver_output=$("${JAVA:- 后面的改为自己的JDK的路径就可以了,重启一下zeppelin. 然后再点刚才的运行按钮, zeppelin会先在yarn上启动一个session模式的集群,然后在把任务提交到这个集群上.


点击右上角的Flink job就可以跳到Flink的WEB UI看到Job的详细信息如下图所示.




再来看一下最后打印的结果是否正确,在taskmanager的stdout可以看到输出的结果.




结果也没有问题,还需要注意任务的并行度不能大于slot的总数,如果想要修改并行度的话可以在paragraph里面设置如下所示:


%flink(parallelism=4)
val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
data.flatMap(line => line.split("\\s"))
  .map(w => (w, 1))
  .keyBy(0)
  .sum(1)
  .print
senv.execute("jason_test")


在zeppelin里面点击完运行后,需要耐心的等待一会儿,整个任务提交的过程感觉还是有点慢,多等一会儿就行了,如果等了很长时间任务还没提交成功,可以到zeppelin的log下面看下打印的日志有没有报错,一定要看日志,可以帮助我们快速的定位问题.


这篇文章主要介绍了Flink on zeppelin的安装以及配置,如何在on yarn模式上提交一个streaming的任务.后面会有更多关于zeppelin的使用介绍.


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
1月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(一)
49 0
|
1月前
|
分布式计算 资源调度 大数据
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
大数据-110 Flink 安装部署 下载解压配置 Standalone模式启动 打包依赖(二)
67 0
|
9天前
|
消息中间件 资源调度 关系型数据库
如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理
本文介绍了如何在Flink on YARN环境中配置Debezium CDC 3.0,以实现实时捕获数据库变更事件并将其传输到Flink进行处理。主要内容包括安装Debezium、配置Kafka Connect、创建Flink任务以及启动任务的具体步骤,为构建实时数据管道提供了详细指导。
28 9
|
1月前
|
Java Shell Maven
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
Flink-11 Flink Java 3分钟上手 打包Flink 提交任务至服务器执行 JobSubmit Maven打包Ja配置 maven-shade-plugin
95 4
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
126 0
|
3月前
|
资源调度 调度 流计算
Flink 细粒度资源管理问题之为不同的SSG配置资源如何解决
Flink 细粒度资源管理问题之为不同的SSG配置资源如何解决
|
3月前
|
存储 NoSQL 分布式数据库
Flink 细粒度资源管理问题之调整 slot 配置来提高资源利用效率如何解决
Flink 细粒度资源管理问题之调整 slot 配置来提高资源利用效率如何解决
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
资源调度 关系型数据库 MySQL
【Flink on YARN + CDC 3.0】神操作!看完这篇教程,你也能成为数据流处理高手!从零开始,一步步教会你在Flink on YARN模式下如何配置Debezium CDC 3.0,让你的数据库变更数据瞬间飞起来!
【8月更文挑战第15天】随着Apache Flink的普及,企业广泛采用Flink on YARN部署流处理应用,高效利用集群资源。变更数据捕获(CDC)工具在现代数据栈中至关重要,能实时捕捉数据库变化并转发给下游系统处理。本文以Flink on YARN为例,介绍如何在Debezium CDC 3.0中配置MySQL连接器,实现数据流处理。首先确保YARN上已部署Flink集群,接着安装Debezium MySQL连接器并配置Kafka Connect。最后,创建Flink任务消费变更事件并提交任务到Flink集群。通过这些步骤,可以构建出从数据库变更到实时处理的无缝数据管道。
286 2
|
4月前
|
消息中间件 NoSQL Redis
实时计算 Flink版产品使用问题之配置了最大连续失败数不为1,在Kafka的精准一次sink中,如果ck失败了,这批数据是否会丢失
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。