Apache Flink 零基础入门(四):客户端操作的 5 种模式

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 本文主要分享 Flink 的 5 种任务提交的方式。熟练掌握各种任务提交方式,有利于提高我们日常的开发和运维效率。

作者:周凯波(宝牛)

1.环境说明

在前面几期的课程里面讲过了 Flink 开发环境的搭建和应用的部署以及运行,今天的课程主要是讲 Flink 的客户端操作。本次讲解以实际操作为主。这次课程是基于社区的 Flink 1.7.2 版本,操作系统是 Mac 系统,浏览器是 Google Chrome 浏览器。有关开发环境的准备和集群的部署,请参考「开发环境搭建和应用的配置、部署及运行」的内容。

2.课程概要

如下图所示,Flink 提供了丰富的客户端操作来提交任务和与任务进行交互,包括 Flink 命令行,Scala Shell,SQL Client,Restful API 和 Web。Flink 首先提供的最重要的是命令行,其次是 SQL Client 用于提交 SQL 任务的运行,还有就是 Scala Shell 提交 Table API 的任务。同时,Flink 也提供了Restful 服务,用户可以通过 http 方式进行调用。此外,还有 Web 的方式可以提交任务。

flink_clients.png

在 Flink 安装目录的 bin 目录下面可以看到有 flink, start-scala-shell.sh 和 sql-client.sh 等文件,这些都是客户端操作的入口。

flink_1_7_2.jpg

3.Flink 客户端操作

3.1 Flink 命令行

Flink 的命令行参数很多,输入 flink - h 能看到完整的说明:

  flink-1.7.2 bin/flink -h

如果想看某一个命令的参数,比如 Run 命令,输入:

  flink-1.7.2 bin/flink run -h

本文主要讲解常见的一些操作,更详细的文档请参考: Flink 命令行官方文档

3.1.1 Standalone

首先启动一个 Standalone 的集群:

  flink-1.7.2 bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.

打开 http://127.0.0.1:8081 能看到 Web 界面。

Run

运行任务,以 Flink 自带的例子 TopSpeedWindowing 为例:

  flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 5e20cb6b0f357591171dfcca2eea09de

运行起来后默认是 1 个并发:

flink_run_1.jpg

点左侧「Task Manager」,然后点「Stdout」能看到输出日志:

flink_run_2.jpg

或者查看本地 Log 目录下的 *.out 文件:

flink_run_3.jpg

List

查看任务列表:

  flink-1.7.2 bin/flink list -m 127.0.0.1:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
24.03.2019 10:14:06 : 5e20cb6b0f357591171dfcca2eea09de : CarTopSpeedWindowingExample (RUNNING)
--------------------------------------------------------------
No scheduled jobs.

Stop

停止任务。通过 -m 来指定要停止的 JobManager 的主机地址和端口。

  flink-1.7.2 bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb
Stopping job d67420e52bd051fae2fddbaa79e046bb.
------------------------------------------------------------
The program finished with the following exception:
  org.apache.flink.util.FlinkException: Could not stop the job   d67420e52bd051fae2fddbaa79e046bb.
  at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:554)
  at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:985)
  at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:547)
  at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1062)
  at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
  at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
  at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
  at org.apache.flink.client.program.rest.RestClusterClient.stop(RestClusterClient.java:392)
  at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:552)
... 9 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job termination (STOP) failed: This job is not stoppable.]
  at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:380)
  at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:364)
  at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
  at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
  at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

从日志里面能看出 Stop 命令执行失败了。一个 Job 能够被 Stop 要求所有的 Source 都是可以 Stoppable 的,即实现了 StoppableFunction 接口。

 
/**
 * 需要能 stoppable 的函数必须实现这个接口,例如流式任务的 source。
 * stop() 方法在任务收到 STOP 信号的时候调用。
 * source 在接收到这个信号后,必须停止发送新的数据且优雅的停止。
 */
@PublicEvolving
public interface StoppableFunction {
    /**
      * 停止 source。与 cancel() 不同的是,这是一个让 source 优雅停止的请求。
     * 等待中的数据可以继续发送出去,不需要立即停止。
      */
    void stop();
}

Cancel

取消任务。如果在 conf/flink-conf.yaml 里面配置了 state.savepoints.dir,会保存 Savepoint,否则不会保存 Savepoint。

  flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 5e20cb6b0f357591171dfcca2eea09de
 
Cancelling job 5e20cb6b0f357591171dfcca2eea09de.
Cancelled job 5e20cb6b0f357591171dfcca2eea09de.

也可以在停止的时候显示指定 Savepoint 目录。

  flink-1.7.2 bin/flink cancel -m 127.0.0.1:8081 -s /tmp/savepoint 29da945b99dea6547c3fbafd57ed8759
 
Cancelling job 29da945b99dea6547c3fbafd57ed8759 with savepoint to /tmp/savepoint.
Cancelled job 29da945b99dea6547c3fbafd57ed8759. Savepoint stored in file:/tmp/savepoint/savepoint-29da94-88299bacafb7.
 
  flink-1.7.2 ll /tmp/savepoint/savepoint-29da94-88299bacafb7
total 32K
-rw-r--r-- 1 baoniu 29K Mar 24 10:33 _metadata

取消和停止(流作业)的区别如下:

  • cancel() 调用,立即调用作业算子的 cancel() 方法,以尽快取消它们。如果算子在接到 cancel() 调用后没有停止,Flink 将开始定期中断算子线程的执行,直到所有算子停止为止。
  • stop() 调用,是更优雅的停止正在运行流作业的方式。stop() 仅适用于 Source 实现了 StoppableFunction 接口的作业。当用户请求停止作业时,作业的所有 Source 都将接收 stop() 方法调用。直到所有 Source 正常关闭时,作业才会正常结束。这种方式,使作业正常处理完所有作业。

Savepoint

触发 Savepoint。

  flink-1.7.2 bin/flink savepoint -m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5ff62bbb /tmp/savepoint
Triggering savepoint for job ec53edcfaeb96b2a5dadbfbe5ff62bbb.
Waiting for response...
Savepoint completed. Path: file:/tmp/savepoint/savepoint-ec53ed-84b00ce500ee
You can resume your program from this savepoint with the run command.

说明:Savepoint 和 Checkpoint 的区别(详见文档):

  • Checkpoint 是增量做的,每次的时间较短,数据量较小,只要在程序里面启用后会自动触发,用户无须感知;Checkpoint 是作业 failover 的时候自动使用,不需要用户指定。
  • Savepoint 是全量做的,每次的时间较长,数据量较大,需要用户主动去触发。Savepoint 一般用于程序的版本更新(详见文档),Bug 修复,A/B Test 等场景,需要用户指定。

通过 -s 参数从指定的 Savepoint 启动:

  flink-1.7.2 bin/flink run -d -s /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ./examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.

查看 JobManager 的日志,能够看到类似这样的 Log:

2019-03-28 10:30:53,957 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     
- Starting job 790d7b98db6f6af55d04aec1d773852d from savepoint /tmp/savepoint/savepoint-f049ff-24ec0d3e0dc7 ()
2019-03-28 10:30:53,959 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    
 - Reset the checkpoint ID of job 790d7b98db6f6af55d04aec1d773852d to 2.
2019-03-28 10:30:53,959 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     
- Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 1 @ 0 for 790d7b98db6f6af55d04aec1d773852d.

Modify

修改任务并行度。

为了方便演示,我们修改 conf/flink-conf.yaml 将 Task Slot 数从默认的 1 改为 4,并配置 Savepoint 目录。(Modify 参数后面接 -s 指定 Savepoint 路径当前版本可能有 Bug,提示无法识别)

taskmanager.numberOfTaskSlots: 4
state.savepoints.dir: file:///tmp/savepoint

修改参数后需要重启集群生效,然后再启动任务:

  flink-1.7.2 bin/stop-cluster.sh && bin/start-cluster.sh
Stopping taskexecutor daemon (pid: 53139) on host zkb-MBP.local.
Stopping standalonesession daemon (pid: 52723) on host zkb-MBP.local.
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.
 
  flink-1.7.2 bin/flink run -d examples/streaming/TopSpeedWindowing.jar
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 7752ea7b0e7303c780de9d86a5ded3fa

从页面上能看到 Task Slot 变为了 4,这时候任务的默认并发度是 1。

standalone-modify-1.jpg

standalone-modify-2.jpg

通过 Modify 命令依次将并发度修改为 4 和 3,可以看到每次 Modify 命令都会触发一次 Savepoint。

  flink-1.7.2 bin/flink modify -p 4 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 4.
 
  flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x 3 baoniu 96 Jun 17 09:05 savepoint-7752ea-00c05b015836/
 
  flink-1.7.2 bin/flink modify -p 3 7752ea7b0e7303c780de9d86a5ded3fa
Modify job 7752ea7b0e7303c780de9d86a5ded3fa.
Rescaled job 7752ea7b0e7303c780de9d86a5ded3fa. Its new parallelism is 3.
 
  flink-1.7.2 ll /tmp/savepoint
total 0
drwxr-xr-x 3 baoniu 96 Jun 17 09:08 savepoint-7752ea-449b131b2bd4/

standalone-modify-3.jpg

查看 JobManager 的日志,可以看到:

2019-06-17 09:05:11,179 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Starting job 7752ea7b0e7303c780de9d86a5ded3fa from savepoint file:/tmp/savepoint/savepoint-790d7b-3581698f007e ()
2019-06-17 09:05:11,182 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Reset the checkpoint ID of job 7752ea7b0e7303c780de9d86a5ded3fa to 3.
2019-06-17 09:05:11,182 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Restoring job 790d7b98db6f6af55d04aec1d773852d from latest valid checkpoint: Checkpoint 2 @ 0 for 7752ea7b0e7303c780de9d86a5ded3fa.
2019-06-17 09:05:11,184 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - No master state to restore
2019-06-17 09:05:11,184 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job CarTopSpeedWindowingExample (7752ea7b0e7303c780de9d86a5ded3fa) switched from state RUNNING to SUSPENDING.
org.apache.flink.util.FlinkException: Job is being rescaled.

Info

Info 命令是用来查看 Flink 任务的执行计划(StreamGraph)的。

  flink-1.7.2 bin/flink info examples/streaming/TopSpeedWindowing.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Custom Source","pact":"Data Source","contents":"Source: Custom Source","parallelism":1},{"id":2,"type":"Timestamps/Watermarks","pact":"Operator","contents":"Timestamps/Watermarks","parallelism":1,"predecessors":[{"id":1,"ship_strategy":"FORWARD","side":"second"}]},{"id":4,"type":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","pact":"Operator","contents":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction)","parallelism":1,"predecessors":[{"id":2,"ship_strategy":"HASH","side":"second"}]},{"id":5,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":1,"predecessors":[{"id":4,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------

拷贝输出的 Json 内容,粘贴到这个网站:http://flink.apache.org/visualizer/

visualizer.jpg

可以和实际运行的物理执行计划对比:

physical-execute-plan.jpg

3.1.2 Yarn per-job

单任务 Attach 模式

默认是 Attach 模式,即客户端会一直等待直到程序结束才会退出。

  • 通过 -m yarn-cluster 指定 Yarn 模式
  • Yarn 上显示名字为 Flink session cluster,这个 Batch 的 Wordcount 任务运行完会 FINISHED。
  • 客户端能看到结果输出
[admin@z17.sqa.zth /home/admin/flink/flink-1.7.2]
$echo $HADOOP_CONF_DIR
/etc/hadoop/conf/
 
[admin@z17.sqa.zth /home/admin/flink/flink-1.7.2]
$./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
 
2019-06-17 09:15:24,511 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:15:24,690 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:15:24,690 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:15:24,907 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4}
2019-06-17 09:15:25,430 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-17 09:15:25,438 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-06-17 09:15:36,239 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1532332183347_0724
2019-06-17 09:15:36,276 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1532332183347_0724
2019-06-17 09:15:36,276 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-06-17 09:15:36,281 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-06-17 09:15:40,426 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
... ...
(would,2)
(wrong,1)
(you,1)
Program execution finished
Job with JobID 8bfe7568cb5c3254af30cbbd9cd5971e has finished.
Job Runtime: 9371 ms
Accumulator Results:
- 2bed2c5506e9237fb85625416a1bc508 (java.util.ArrayList) [170 elements]

yarn-1.jpg

yarn-2.jpg

如果我们以 Attach 模式运行 Streaming 的任务,客户端会一直等待不退出,可以运行以下的例子试验下:

./bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar

单任务 Detached 模式

  • 由于是 Detached 模式,客户端提交完任务就退出了
  • Yarn 上显示为 Flink per-job cluster
$./bin/flink run -yd -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar
 
2019-06-18 09:21:59,247 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-18 09:21:59,428 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-18 09:21:59,428 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-18 09:21:59,940 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=1, slotsPerTaskManager=4}
2019-06-18 09:22:00,427 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-18 09:22:00,436 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
^@2019-06-18 09:22:12,113 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1532332183347_0729
2019-06-18 09:22:12,151 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1532332183347_0729
2019-06-18 09:22:12,151 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-06-18 09:22:12,155 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-06-18 09:22:16,275 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2019-06-18 09:22:16,275 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1532332183347_0729
Please also note that the temporary files of the YARN session in the home directory will not be removed.
Job has been submitted with JobID e61b9945c33c300906ad50a9a11f36df

yarn-detached-1.jpg

yarn-detached-2.jpg

3.1.3 Yarn session

启动 Session

./bin/yarn-session.sh -tm 2048 -s 3

表示启动一个 Yarn session 集群,每个 TM 的内存是 2 G,每个 TM 有 3 个 Slot。(注意:-n 参数不生效)

  flink-1.7.2 ./bin/yarn-session.sh -tm 2048 -s 3
2019-06-17 09:21:50,177 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2019-06-17 09:21:50,179 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-06-17 09:21:50,179 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-06-17 09:21:50,179 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-06-17 09:21:50,179 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-06-17 09:21:50,179 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-06-17 09:21:50,180 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-06-17 09:21:50,180 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2019-06-17 09:21:50,644 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-06-17 09:21:50,746 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to baoniu (auth:SIMPLE)
2019-06-17 09:21:50,848 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:21:51,148 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=2048, numberTaskManagers=1, slotsPerTaskManager=3}
2019-06-17 09:21:51,588 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-06-17 09:21:51,596 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
^@2019-06-17 09:22:03,304 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1532332183347_0726
2019-06-17 09:22:03,336 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1532332183347_0726
2019-06-17 09:22:03,336 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-06-17 09:22:03,340 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-06-17 09:22:07,722 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2019-06-17 09:22:08,050 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
Flink JobManager is now running on z07.sqa.net:37109 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://z07.sqa.net:37109

客户端默认是 Attach 模式,不会退出:

  • 可以 ctrl + c 退出,然后再通过 ./bin/yarn-session.sh -id application_1532332183347_0726 连上来;
  • 或者启动的时候用 -d 则为 detached 模式
    Yarn 上显示为 Flink session cluster;

yarn-session-1.jpg

yarn-session-2.jpg

  • 在本机的临时目录(有些机器是 /tmp 目录)下会生成一个文件:
  flink-1.7.2 cat /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu
#Generated YARN properties file
#Mon Jun 17 09:22:08 CST 2019
parallelism=3
dynamicPropertiesString=
applicationID=application_1532332183347_0726

提交任务

./bin/flink run ./examples/batch/WordCount.jar

将会根据 /tmp/.yarn-properties-admin 文件内容提交到了刚启动的 Session。

  flink-1.7.2 ./bin/flink run ./examples/batch/WordCount.jar
2019-06-17 09:26:42,767 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu.
2019-06-17 09:26:42,767 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/.yarn-properties-baoniu.
2019-06-17 09:26:43,058 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 3
2019-06-17 09:26:43,058 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - YARN properties set default parallelism to 3
YARN properties set default parallelism to 3
2019-06-17 09:26:43,097 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-06-17 09:26:43,229 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:26:43,229 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-06-17 09:26:43,327 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'z05c07216.sqa.zth.tbsite.net' and port '37109' from supplied application id 'application_1532332183347_0726'
Starting execution of program
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
^@(a,5)
(action,1)
(after,1)
(against,1)
(all,2)
(and,12)
... ...
(wrong,1)
(you,1)
Program execution finished
Job with JobID ad9b0f1feed6d0bf6ba4e0f18b1e65ef has finished.
Job Runtime: 9152 ms
Accumulator Results:
- fd07c75d503d0d9a99e4f27dd153114c (java.util.ArrayList) [170 elements]

运行结束后 TM 的资源会释放。

session-sumit-job1.jpg

提交到指定的 Session

通过 -yid 参数来提交到指定的 Session。

$./bin/flink run -d -p 30 -m yarn-cluster -yid application_1532332183347_0708 ./examples/streaming/TopSpeedWindowing.jar
 
2019-03-24 12:36:33,668 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-03-24 12:36:33,773 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-24 12:36:33,773 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-24 12:36:33,837 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Found application JobManager host name 'z05c05218.sqa.zth.tbsite.net' and port '60783' from supplied application id 'application_1532332183347_0708'
Starting execution of program
Executing TopSpeedWindowing example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID 58d5049ebbf28d515159f2f88563f5fd

yarn-specifiy-session-1.jpg

注:Blink版本 的 Session 与 Flink 的 Session 的区别:

  • Flink 的 session -n 参数不生效,而且不会提前启动 TM;
  • Blink 的 session 可以通过 -n 指定启动多少个 TM,而且 TM 会提前起来;

3.2 Scala Shell

官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/scala_shell.html

3.2.1 Deploy

Local

$bin/start-scala-shell.sh local
Starting Flink Shell:
Starting local Flink cluster (host: localhost, port: 8081).
Connecting to Flink cluster (host: localhost, port: 8081).
... ...
scala>

任务运行说明:

  • Batch 任务内置了 benv 变量,通过 print() 将结果输出到控制台;
  • Streaming 任务内置了 senv 变量,通过 senv.execute("job name") 来提交任务,且 Datastream 的输出只有在 Local 模式下打印到控制台;

Remote

先启动一个 yarn session cluster:

$./bin/yarn-session.sh  -tm 2048 -s 3

2019-03-25 09:52:16,341 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-25 09:52:16,342 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-25 09:52:16,342 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-25 09:52:16,343 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-25 09:52:16,343 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-03-25 09:52:16,343 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-03-25 09:52:16,343 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-03-25 09:52:16,343 INFO  org.apache.flink.configuration.GlobalConfiguration            
… ...
Flink JobManager is now running on z054.sqa.net:28665 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://z054.sqa.net:28665

启动 scala shell,连到 jm:

$bin/start-scala-shell.sh remote z054.sqa.net 28665
Starting Flink Shell:
Connecting to Flink cluster (host: z054.sqa.net, port: 28665).
... ...
scala>

Yarn

$./bin/start-scala-shell.sh yarn -n 2 -jm 1024 -s 2 -tm 1024 -nm flink-yarn

Starting Flink Shell:
2019-03-25 09:47:44,695 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-25 09:47:44,697 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-25 09:47:44,697 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-25 09:47:44,697 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-25 09:47:44,697 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2019-03-25 09:47:44,698 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-03-25 09:47:44,698 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.savepoints.dir, file:///tmp/savepoint
2019-03-25 09:47:44,698 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2019-03-25 09:47:44,717 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - Found Yarn properties file under /tmp/.yarn-properties-admin.
2019-03-25 09:47:45,041 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at z05c05217.sqa.zth.tbsite.net/11.163.188.29:8050
2019-03-25 09:47:45,098 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-25 09:47:45,266 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-03-25 09:47:45,275 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument yn is deprecated in will be ignored.
2019-03-25 09:47:45,357 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=1024, numberTaskManagers=2, slotsPerTaskManager=2}
2019-03-25 09:47:45,711 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-03-25 09:47:45,718 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/home/admin/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-25 09:47:46,514 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1532332183347_0710
2019-03-25 09:47:46,534 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1532332183347_0710
2019-03-25 09:47:46,534 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-03-25 09:47:46,535 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-03-25 09:47:51,051 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2019-03-25 09:47:51,222 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.

Connecting to Flink cluster (host: 10.10.10.10, port: 56942).

scala-shell-yarn.jpg

按 CTRL + C 退出 Shell 后,这个 Flink cluster 还会继续运行,不会退出。

3.2.2 Execute

DataSet

  flink-1.7.2 bin/stop-cluster.sh
No taskexecutor daemon to stop on host zkb-MBP.local.
No standalonesession daemon to stop on host zkb-MBP.local.
  flink-1.7.2 bin/start-scala-shell.sh local
Starting Flink Shell:
Starting local Flink cluster (host: localhost, port: 8081).
Connecting to Flink cluster (host: localhost, port: 8081).

scala> val text = benv.fromElements("To be, or not to be,--that is the question:--")
text: org.apache.flink.api.scala.DataSet[String] = org.apache.flink.api.scala.DataSet@5b407336

scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
counts: org.apache.flink.api.scala.AggregateDataSet[(String, Int)] = org.apache.flink.api.scala.AggregateDataSet@6ee34fe4

scala> counts.print()
(be,2)
(is,1)
(not,1)
(or,1)
(question,1)
(that,1)
(the,1)
(to,2)

对 DataSet 任务来说,print() 会触发任务的执行。

scala-shell-dataset-run-1.jpg

也可以将结果输出到文件(先删除 /tmp/out1,不然会报错同名文件已经存在),继续执行以下命令:

scala> counts.writeAsText("/tmp/out1")
res1: org.apache.flink.api.java.operators.DataSink[(String, Int)] = DataSink '<unnamed>' (TextOutputFormat (/tmp/out1) - UTF-8)

scala> benv.execute("batch test")
res2: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@737652a9

查看 /tmp/out1 文件就能看到输出结果。

  flink-1.7.2 cat /tmp/out1
(be,2)
(is,1)
(not,1)
(or,1)
(question,1)
(that,1)
(the,1)
(to,2)

scala-shell-dataset-run-2.jpg

DataSteam

scala> val textStreaming = senv.fromElements("To be, or not to be,--that is the question:--")
textStreaming: org.apache.flink.streaming.api.scala.DataStream[String] = org.apache.flink.streaming.api.scala.DataStream@4970b93d

scala> val countsStreaming = textStreaming.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.keyBy(0).sum(1)
countsStreaming: org.apache.flink.streaming.api.scala.DataStream[(String, Int)] = org.apache.flink.streaming.api.scala.DataStream@6a478680

scala> countsStreaming.print()
res3: org.apache.flink.streaming.api.datastream.DataStreamSink[(String, Int)] = org.apache.flink.streaming.api.datastream.DataStreamSink@42bfc11f

scala> senv.execute("Streaming Wordcount")
(to,1)
(be,1)
(or,1)
(not,1)
(to,2)
(be,2)
(that,1)
(is,1)
(the,1)
(question,1)
res4: org.apache.flink.api.common.JobExecutionResult = org.apache.flink.api.common.JobExecutionResult@1878815a

对 DataStream 任务,print() 并不会触发任务的执行,需要显示调用 execute(“job name”) 才会执行任务。

scala-shell-datastream-1.jpg

scala-shell-datastream-2.jpg

TableAPI

在 Blink 开源版本里面,支持了 TableAPI 方式提交任务(可以用 btenv.sqlQuery 提交 SQL 查询),社区版本 Flink 1.8 会支持 TableAPI: https://issues.apache.org/jira/browse/FLINK-9555

3.3 SQL Client Beta

SQL Client 目前还只是测试版,处于开发阶段,只能用于 SQL 的原型验证,不推荐在生产环境使用。

3.3.1 基本用法

  flink-1.7.2 bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host zkb-MBP.local.
Starting taskexecutor daemon on host zkb-MBP.local.

  flink-1.7.2 ./bin/sql-client.sh embedded
No default environment specified.
Searching for '/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml
No session environment specified.
Validating current environment...done.
… …

Flink SQL> help;
The following commands are available:

QUIT        Quits the SQL CLI client.
CLEAR        Clears the current terminal.
HELP        Prints the available commands.
SHOW TABLES        Shows all registered tables.
SHOW FUNCTIONS        Shows all registered user-defined functions.
DESCRIBE        Describes the schema of a table with the given name.
EXPLAIN        Describes the execution plan of a query or table with the given name.
SELECT        Executes a SQL SELECT query on the Flink cluster.
INSERT INTO        Inserts the results of a SQL SELECT query into a declared table sink.
CREATE VIEW        Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'
DROP VIEW        Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'
SOURCE        Reads a SQL SELECT query from a file and executes it on the Flink cluster.
SET        Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties.
RESET        Resets all session configuration properties.

Hint: Make sure that a statement ends with ';' for finalizing (multi-line) statements.

Select 查询

Flink SQL> SELECT 'Hello World';

sql-select.jpg

按 ”Q” 退出这个界面
打开 http://127.0.0.1:8081 能看到这条 Select 语句产生的查询任务已经结束了。这个查询采用的是读取固定数据集的 Custom Source,输出用的是 Stream Collect Sink,且只输出一条结果。

注意:如果本机的临时目录存在类似 .yarn-properties-baoniu 的文件,任务会提交到 Yarn 上。

sql-select-run-1.jpg

sql-select-run-2.jpg

Explain

Explain 命令可以查看 SQL 的执行计划。

Flink SQL> explain SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

== Abstract Syntax Tree ==        // 抽象语法树
LogicalAggregate(group=[{0}], cnt=[COUNT()])
  LogicalValues(tuples=[[{ _UTF-16LE'Bob  ' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg ' }, { _UTF-16LE'Bob  ' }]])

== Optimized Logical Plan ==     // 优化后的逻辑执行计划
DataStreamGroupAggregate(groupBy=[name], select=[name, COUNT(*) AS cnt])
  DataStreamValues(tuples=[[{ _UTF-16LE'Bob  ' }, { _UTF-16LE'Alice' }, { _UTF-16LE'Greg ' }, { _UTF-16LE'Bob  ' }]])

== Physical Execution Plan ==   // 物理执行计划
Stage 3 : Data Source
    content : collect elements with CollectionInputFormat

    Stage 5 : Operator
        content : groupBy: (name), select: (name, COUNT(*) AS cnt)
        ship_strategy : HASH

3.3.2 结果展示

SQL Client 支持两种模式来维护并展示查询结果:

  • table mode: 在内存中物化查询结果,并以分页 table 形式展示。用户可以通过以下命令启用 table mode;
 SET execution.result-mode=table
  • changlog mode: 不会物化查询结果,而是直接对 continuous query 产生的添加和撤回(retractions)结果进行展示。
SET execution.result-mode=changelog

接下来通过实际的例子进行演示。

Table mode

Flink SQL> SET execution.result-mode=table;
[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

运行结果如下图所示:

sql-table-mode-1.jpg

sql-table-mode-2.jpg

sql-table-mode-3.jpg

Changlog mode

Flink SQL> SET execution.result-mode=changelog;
[INFO] Session property has been set.

Flink SQL> SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

运行结果如下图所示:

sql-change-mode-1.jpg

其中 ‘-’ 代表的就是撤回消息。

sql-change-mode-2.jpg

sql-change-mode-3.jpg

3.3.3 Environment Files

目前的 SQL Client 还不支持 DDL 语句,只能通过 yaml 文件的方式来定义 SQL 查询需要的表,UDF 和运行参数等信息。

首先,准备 env.yaml 和 input.csv 两个文件。

  flink-1.7.2 cat /tmp/env.yaml
tables:
  - name: MyTableSource
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: "/tmp/input.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR
      line-delimiter: "\n"
      comment-prefix: "#"
    schema:
      - name: MyField1
        type: INT
      - name: MyField2
        type: VARCHAR
  - name: MyCustomView
    type: view
    query: "SELECT MyField2 FROM MyTableSource"
  - name: MyTableSink
    type: sink-table
    update-mode: append
    connector:
      type: filesystem
      path: "/tmp/output.csv"
    format:
      type: csv
      fields:
        - name: MyField1
          type: INT
        - name: MyField2
          type: VARCHAR
    schema:
      - name: MyField1
        type: INT
      - name: MyField2
        type: VARCHAR
        
# Execution properties allow for changing the behavior of a table program.

execution:
  type: streaming                   # required: execution mode either 'batch' or 'streaming'
  result-mode: table                # required: either 'table' or 'changelog'
  max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
                                    #   'table' mode (1000000 by default, smaller 1 means unlimited)
  time-characteristic: event-time   # optional: 'processing-time' or 'event-time' (default)
  parallelism: 1                    # optional: Flink's parallelism (1 by default)
  periodic-watermarks-interval: 200 # optional: interval for periodic watermarks (200 ms by default)
  max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
  min-idle-state-retention: 0       # optional: table program's minimum idle state time
  max-idle-state-retention: 0       # optional: table program's maximum idle state time
  restart-strategy:                 # optional: restart strategy
    type: fallback                  #   "fallback" to global restart strategy by default

# Deployment properties allow for describing the cluster to which table programs are submitted to.

deployment:
  response-timeout: 5000

  flink-1.7.2 cat /tmp/input.csv
1,hello
2,world
3,hello world
1,ok
3,bye bye
4,yes

启动 SQL Client:

  flink-1.7.2 ./bin/sql-client.sh embedded -e /tmp/env.yaml
No default environment specified.
Searching for '/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/conf/sql-client-defaults.yaml
Reading session environment from: file:/tmp/env.yaml
Validating current environment...done.

Flink SQL> show tables;
MyCustomView
MyTableSink
MyTableSource

Flink SQL> describe MyTableSource;
root
 |-- MyField1: Integer
 |-- MyField2: String

Flink SQL> describe MyCustomView;
root
 |-- MyField2: String

Flink SQL> create view MyView1 as select MyField1 from MyTableSource;
[INFO] View has been created.

Flink SQL> show tables;
MyCustomView
MyTableSource
MyView1

Flink SQL> describe MyView1;
root
 |-- MyField1: Integer

Flink SQL> select * from MyTableSource;

sql_env_file_1.jpg

sql_env_file_2.jpg

sql_env_file_3.jpg

使用 insert into 写入结果表:

Flink SQL> insert into MyTableSink select * from MyTableSource;
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Cluster ID: StandaloneClusterId
Job ID: 3fac2be1fd891e3e07595c684bb7b7a0
Web interface: http://localhost:8081

sql_insert_into_1.jpg

sql_insert_into_2.jpg

查询生成的结果数据文件:

  flink-1.7.2 cat /tmp/output.csv
1,hello
2,world
3,hello world
1,ok
3,bye bye
4,yes

也可以在 Environment 文件里面定义 UDF,在 SQL Client 里面通过 「HOW FUNCTIONS」查询和使用,这里就不再说明了。

SQL Client 功能社区还在开发中,详见 FLIP-24

3.4 Restful API

接下来我们演示如何通过 Rest API 来提交 Jar 包和执行任务。

更详细的操作请参考 Flink 的 Restful API 文档:https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html

  flink-1.7.2 curl http://127.0.0.1:8081/overview
{"taskmanagers":1,"slots-total":4,"slots-available":0,"jobs-running":3,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0,"flink-version":"1.7.2","flink-commit":"ceba8af"}%

  flink-1.7.2 curl -X POST -H "Expect:" -F "jarfile=@/Users/baoniu/Documents/work/tool/flink/flink-1.7.2/examples/streaming/TopSpeedWindowing.jar" http://127.0.0.1:8081/jars/upload
{"filename":"/var/folders/2b/r6d49pcs23z43b8fqsyz885c0000gn/T/flink-web-124c4895-cf08-4eec-8e15-8263d347efc2/flink-web-upload/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar","status":"success"}%       
                                                                                                                                                                                                     flink-1.7.2 curl http://127.0.0.1:8081/jars
{"address":"http://localhost:8081","files":[{"id":"6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar","name":"TopSpeedWindowing.jar","uploaded":1553743438000,"entry":[{"name":"org.apache.flink.streaming.examples.windowing.TopSpeedWindowing","description":null}]}]}%
                                                                                                                                         
  flink-1.7.2 curl http://127.0.0.1:8081/jars/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar/plan
{"plan":{"jid":"41029eb3feb9132619e454ec9b2a89fb","name":"CarTopSpeedWindowingExample","nodes":[{"id":"90bea66de1c231edf33913ecd54406c1","parallelism":1,"operator":"","operator_strategy":"","description":"Window(GlobalWindows(), DeltaTrigger, TimeEvictor, ComparableAggregator, PassThroughWindowFunction) -> Sink: Print to Std. Out","inputs":[{"num":0,"id":"cbc357ccb763df2852fee8c4fc7d55f2","ship_strategy":"HASH","exchange":"pipelined_bounded"}],"optimizer_properties":{}},{"id":"cbc357ccb763df2852fee8c4fc7d55f2","parallelism":1,"operator":"","operator_strategy":"","description":"Source: Custom Source -> Timestamps/Watermarks","optimizer_properties":{}}]}}%                                                                                                                                                      flink-1.7.2 curl -X POST http://127.0.0.1:8081/jars/6077eca7-6db0-4570-a4d0-4c3e05a5dc59_TopSpeedWindowing.jar/run
{"jobid":"04d80a24b076523d3dc5fbaa0ad5e1ad"}%

rest_api_1.jpg

rest_api_2.jpg

Restful API 还提供了很多监控和 Metrics 相关的功能,对于任务提交的操作也支持的比较全面。

3.5 Web

在 Flink Dashboard 页面左侧可以看到有个「Submit new Job」的地方,用户可以上传 Jar 包和显示执行计划和提交任务。Web 提交功能主要用于新手入门和演示用。

flink_web_1.jpg

4.结束

本期的课程到这里就结束了,我们主要讲解了 Flink 的 5 种任务提交的方式。熟练掌握各种任务提交方式,有利于提高我们日常的开发和运维效率。

视频回顾:https://zh.ververica.com/developers/flink-training-course2/

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
1月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
431 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
10月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
697 33
The Past, Present and Future of Apache Flink
|
5月前
|
SQL 关系型数据库 MySQL
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
Apache Flink CDC 3.4.0 版本正式发布!经过4个月的开发,此版本强化了对高频表结构变更的支持,新增 batch 执行模式和 Apache Iceberg Sink 连接器,可将数据库数据全增量实时写入 Iceberg 数据湖。51位贡献者完成了259次代码提交,优化了 MySQL、MongoDB 等连接器,并修复多个缺陷。未来 3.5 版本将聚焦脏数据处理、数据限流等能力及 AI 生态对接。欢迎下载体验并提出反馈!
922 1
Flink CDC 3.4 发布, 优化高频 DDL 处理,支持 Batch 模式,新增 Iceberg 支持
|
7月前
|
SQL 存储 人工智能
Apache Flink 2.0.0: 实时数据处理的新纪元
Apache Flink 2.0.0 正式发布!这是自 Flink 1.0 发布九年以来的首次重大更新,凝聚了社区两年的努力。此版本引入分离式状态管理、物化表、流批统一等创新功能,优化云原生环境下的资源利用与性能表现,并强化了对人工智能工作流的支持。同时,Flink 2.0 对 API 和配置进行了全面清理,移除了过时组件,为未来的发展奠定了坚实基础。感谢 165 位贡献者的辛勤付出,共同推动实时计算进入新纪元!
851 1
Apache Flink 2.0.0: 实时数据处理的新纪元
|
7月前
|
关系型数据库 MySQL 数据库
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
1303 5
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
|
10月前
|
存储 SQL 人工智能
Apache Flink 2.0:Streaming into the Future
本文整理自阿里云智能高级技术专家宋辛童、资深技术专家梅源和高级技术专家李麟在 Flink Forward Asia 2024 主会场的分享。三位专家详细介绍了 Flink 2.0 的四大技术方向:Streaming、Stream-Batch Unification、Streaming Lakehouse 和 AI。主要内容包括 Flink 2.0 的存算分离云原生化、流批一体的 Materialized Table、Flink 与 Paimon 的深度集成,以及 Flink 在 AI 领域的应用。
1305 13
Apache Flink 2.0:Streaming into the Future
|
7月前
|
存储 大数据 数据处理
您有一份 Apache Flink 社区年度报告请查收~
您有一份 Apache Flink 社区年度报告请查收~
|
SQL 消息中间件 分布式计算
《Apache Flink 案例集(2022版)》——5.数字化转型——移动云Apache Flink 在移动云实时计算的实践(上)
《Apache Flink 案例集(2022版)》——5.数字化转型——移动云Apache Flink 在移动云实时计算的实践(上)
368 0
|
数据采集 分布式计算 Kubernetes
《Apache Flink 案例集(2022版)》——5.数字化转型——移动云Apache Flink 在移动云实时计算的实践(下)
《Apache Flink 案例集(2022版)》——5.数字化转型——移动云Apache Flink 在移动云实时计算的实践(下)
380 0

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多