开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 :客户端操作(一)】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/632/detail/10035
客户端操作
目录
一、 环境说明
二、 课程概要
(1) Scala Shell
(2)SQL Client
(3) CommendLine
(4) Restful
(5) Web
三、flink 命令行
四、基本用法
五、结果展示
一、环境说明
本期课程基于社区 fink-172 版本,运行环境的准备和集群(Slandalone,yarn)的部参考前面的课程,本文所有操作的演示都是基于 Mac/ linux 系统和 Chrome 浏览器。
二、课程概要
Flink 提供了丰富的客户端操作来提交任务和与任务进行交互。本期课程将分为5个部分进行讲解,包括 Fink 行, Scala Shell ,SQL Client, Restful API 和 Web。
在 fink 安装目录的 bin 目录下面可以看到有link,start-scala-shell-- sh和s -clientsh 等文件,这些都是客户端操作的入口。
三、 Flink 命令行
flink 命行参数多,输入 flink-h 能看到完整的说明
+fiink-1.7. bin/fink-h
如果想看某一个命的参数,比如 run 命令,输入:
fink-1.7.2 bin/ run-h
(1)standalone
启动 standalone 集群
flink-1.7 bin/start-cluster .sh
Starting cluster
Starting standalonesession daemon on host zkb-MBP.local
Starting taskexecutor daemon on host zkb-MBP.local.
打开 tt127.0.0.1:8081 能看到 Web 界面。
Run
运行任务
4flink-1..2 bin/fink run-d examples/streaming/TopSpeedWindowing jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set
Use -input to specily file input.
Printing result to stdout. Use-output to specify output path.
Job has been submitted with JoblDe20cb6b357591172eeade
默认是1个开发。
使用官方自带的例子。任务启动后,可以在页面上看到已经启动完成。在任务里面有两个 winters。
点左侧『 Task Manager,然后点 Stdout 能看到输出日志。
在 log 下面点out文件,在这里也可以看到程序的输出。
list 命令,当前一个任务正在运行。
查看任务列表
flink-1.7.2 bin/fink list-m 127.0.0.1:8081
Waiting for response...
------Running/Restarting Jobs ---------
24.03201910:14:06:5e20cb6b0f575911712eeao9de:
------------------------------
CarTopSpeedWindowingExample (RUNNING)
No scheduled jobs.
(2)stop
停止任务
在 stop 任务的时候发现有一个错误,这个任务不能被停止。
一个 job 能够 stop 要求所有的 sourco 都是可以 stoppatle 的,即实现了StoppableFunchon 接口。
需要能 stoppable 的函数必须实现这个接口例如流式任务的 source
stop 方法在任务收到 STOP 信号的时候调用。
source 在接收到这个信号后,必须停止发送新的数据且优雅的停止。
(3)PublicEvolving
public interfacs StoppableFunction
停止 source,与 cance)不同的是:这是一个 source 优停止的请求。
等待中的数据可以继续发送出去,不需要立即停止。
void stop
)
cancel
取消任务。如果在 con/ink-cont.yaml 里面配 state savepointsdir,会保存 savepoi nt,否则不会保存 savepoint。
flink-1.7.2 bin/fink cancel -m 127.0.0.1:8081 5e20cb6b0r357591171dfcca2eea09de
Cancelling job 5e20cb6b0f357591171dfcca2eea09de.
Cancelled job 5e20cb6b0f357591171dfcca2eea09de
也可以在停止的时候显示指定 savepoint 目录。
可以看到这个任务已经停掉了。把任务再启动,可以看到现在任务在启动,准备一个等级目录,目录已经存在,可以把数据清空,调一下 cancel,可以指定savepoint,已经有数据在里面。
取消和停止(流作业)的区别如下:
cancel 调用立即调用作业算子的 cancel 方法,以快取消它们,如果算子在到 cancel 调用后没有停止 flink 将开始定期中断算子线程的执行,到所有算子停止为止。
stop 调用,是更优雅的停止正在运行流作业的方式。 stop仅适用于 source实现了StoppableFunction 接口的作业。当用户请求停止作业时,作业的所有 sourca都将接收 stop 方法调用。直到所有 source 正常关闭时,作业才会正常结束。这种方式使作业正常处理完所有作业。
(4) savepoint
触发 savepoint
flink-1.7.2 bin/ savepoint-m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5f52btb
说明: savepoint 和 checkpoint 的区别(详见文档):
checkpoint是 增量做的每次的时间较短,数据量较小,只要在程序里面后用后会自动触发,用户无须感知; savepoint 是全量做的,每次的时间较长,数据量较大需要用户主动去触发。
checkpoint 是作业 failover 的时候自动使用,不需要用户指定 savepoint 一般用干程序的版本更新(详见文档),bug 修复,A/B Test 等场景,需要用户指定。
从指定的 savepoint 启动
先把任务停掉,指定 savepoint,任务启动,看一下日志文件。这个任务在启动的时候可以看到它在恢复。
查看 JobManager 的日志,能够看到类似这样的 log:
2019-03-2810:30:53,957INFo
org apache.fink. runtime. checkpoint CheckpointC-Starting j
(5)modify
修改任务并行度。
为了方便演示,我们修改 conflink-c-conf.yal 将 task slot 数调为
4.并配置 savepoint 目录。
(modify 参数后面接 -s 指定 savepoint 路径可能有 bug.提示无法识别)
taskmanager. numberOfTaskSlots: 4
State. savepoint. dir
:
file
://
tmp /savepoint
修改参数后需要重后集群生效,然后再后动任务
从页面上能看到 Task Slots 变为了4,这时候任务的默认并发度是1。
通过 modify 命并发度修改为4和3.可以看到每次 modily 命都会触发一次 savepoint
查看 JobManager 的日志,可以看到:
2019-03-2810:33:11.179INFo
(6)info
拷贝输出的 json 内容,粘贴到这个网站 ison : http://flink.ap org/visualizer
可以和实际运行的物理执行计划对比:
(7) yarn per-job
单任务 attach 模式
默认是 attach 模式,即客户端会一等待值到程序结束才会退出
通过 myam- -cluster 指定 yam 模式
客户端能看到结果输出
yam 上显示 Flink session cluster,这个 batch 任务行完会 FINISHED
[adminz17.sqa. zth /home/admin/llink/flink-1.7.2)
Secho SHADOOP CONF_DIR
/etc/hadoop/cont/
[adminez17. sqa. zth /home/admin/ink/llink-1.7.2)
s /bin/ run -m yarn-cluster Jexamplesbatch/WordCount in
单任务 detached
由于是 detached 模式,客户端提交完任务就退出了
yarn 上显示为 Flink per-job- cluster
S.
/bin/flink run-yd-m
(8)启动 session
/bin/yam-session.sh -tm 2048-s 3
表示动一个 yarn session 集群,每个 TM 的内存是2G,每个 TM 有3个 slot。(-n参数不生效)
客户端默认是 attach 模式,不会退出
a.可以 ctr+c 退出,然后再通过 bin/yar--sessionsh-id
application1532332183347_0708
连上来。
b.或者启动的时候用 d 则为 detached 模式
yam 上显示为 Flink session cluster
/tmp
下生成了一个文件
提交任务
/bin/flink run Jexamples/batch/WordCount.jar
将会根据 /tmp/yam -properties-admin--文件内容提交到了刚动的 session
运行结束后 TM 的资源会释放。
提交到指定的 session
通过 yid 参数来提交到指定的 session
blink 的 session 与 fink 的 session 的区别:
flink session-n 参数不生效,而且不会提前启动 TM
blink 的 session 可以通过-n指定启动多少个 TM 而且 TM 会提前起来
(9) Scala Shell
官方文档 :
https: llci.apache org/projects/flink/flink-./scala shell. html.org/projectsfikfin-docs-rease-.7oosscalashell.html
Deploy
任务运行说明
a.Batch 任务内置了 benv 变量,通 过 prit 结果输出到控制台
b. Streaming 任务内置了 senv 变量,通过 env v.executeob name )来提交任务且 Datastream的输出只有在 local 模式下打印到控制台。
remote
先启动一个 yam session cluster
. /bin/yamn-session.sh -Im 2048-s 3
(10) yarn
按 CTRL+C 退出后,这个 flink cluster 还会继续运行不会退出。
(11) Execute
Dataset
flink--1.7.2bin/stop-cluster-.sh
No taskexecutor daemon to stop on host zkb-MBP.local.
(that,)
(the,1)
to,2)
对 DataSet 任务来说 print() 会触发任务的执行。
也可以将结果输出到文件(先删除 /tmp/ou1, 不然会报错同名文件已经存在),继续执行以
下命令:
scala> counts writeAsText("/mp/out1")
res1: org. apache. flink. api.java.operatorsDataSink[(String. Int)] DataSink'<unnameds'
ITextOutputFormat (/mp/out1)-UTF-8)
scala> benv.execute("batch test")
res2: org apache flink. api. common.JobExecutionResult=
org-apache.fink.api. common.JobExecutionRes@737652a9
查看/tmp/out1文件就能看到输出结果。
4flink-1.7.2 cat /tmp/out1
be,2)
(is,1)
(not,)
(or1)
(question,1)
(that, 1)
(the,1)
(to,2)
(12)DataStream
(13)TableAPI
在 Blink 开源版本里面,支持了 TableAPI 方式提交任务(可以用 btenv sqlQuery提交 sq 查询)
社区版本1.8会支持 tableapi: https: /ssuesssues
radira/browse/FLINK-9555
SQL Client Beta
SOL Client 目前还只是测试版,处干开发阶段只能用于 SQL 的原型验证不能在生产环境使用。