客户端操作(一)| 学习笔记

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
日志服务 SLS,月写入数据量 50GB 1个月
简介: 快速学习客户端操作。

开发者学堂课程【Apache Flink 入门到实战 - Flink 开源社区出品 客户端操作(一)】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址:https://developer.aliyun.com/learning/course/632/detail/10035


客户端操作

 

目录

一、 环境说明

二、 课程概要

(1) Scala Shell

(2)SQL Client

(3) CommendLine

(4) Restful

(5) Web

三、flink 命令行

四、基本用法

五、结果展示

 

一、环境说明

本期课程基于社区 fink-172 版本运行环境的准备和集群(Slandalone,yarn)的部参考前面的课程本文所有操作的演示都是基于 Mac/ linux 系统和  Chrome 浏览器

 

二、课程概要

Flink 提供了丰富的客户端操作来提交任务和与任务进行交互。本期课程将分为5个部分进行讲解包括 Fink 行, Scala Shell SQL Client Restful API Web

在 fink 安装目录的 bin 目录下面可以看到有link,start-scala-shell-- sh和s -clientsh 等文件,这些都是客户端操作的入口。

 

三、 Flink 命令行

flink 命行参数多,输入 flink-h 能看到完整的说明

+fiink-1.7. bin/fink-h

如果想看某一个命的参数,比如 run 命令,输入:

fink-1.7.2 bin/ run-h

(1)standalone

启动 standalone 集群

flink-1.7 bin/start-cluster .sh

Starting cluster

Starting standalonesession daemon on host zkb-MBP.local

Starting taskexecutor daemon on host zkb-MBP.local.

打开 tt127.0.0.1:8081 能看到 Web 界面。

Run

运行任务

4flink-1..2 bin/fink run-d examples/streaming/TopSpeedWindowing jar

Starting execution of program

Executing TopSpeedWindowing example with default input data set

Use -input to specily file input.

Printing result to stdout. Use-output to specify output path.

Job has been submitted with JoblDe20cb6b357591172eeade

默认是1个开发。

使用官方自带的例子。任务启动后,可以在页面上看到已经启动完成。在任务里面有两个 winters。

点左侧『 Task Manager,然后点 Stdout 能看到输出日志。

在 log 下面点out文件,在这里也可以看到程序的输出。

list 命令,当前一个任务正在运行。

查看任务列表

flink-1.7.2 bin/fink list-m 127.0.0.1:8081

Waiting for response...

------Running/Restarting Jobs ---------

24.03201910:14:06:5e20cb6b0f575911712eeao9de:

------------------------------

CarTopSpeedWindowingExample (RUNNING)

No scheduled jobs.

(2)stop

停止任务

在 stop 任务的时候发现有一个错误,这个任务不能被停止。

一个 job 能够 stop 要求所有的  sourco 都是可以  stoppatle 的,即实现了StoppableFunchon 接口。

需要能 stoppable 的函数必须实现这个接口例如流式任务的 source

stop 方法在任务收到 STOP 信号的时候调用。

source 在接收到这个信号后,必须停止发送新的数据且优雅的停止。

(3)PublicEvolving

public interfacs StoppableFunction

停止 source,与 cance)不同的是:这是一个 source 优停止的请求。

等待中的数据可以继续发送出去,不需要立即停止。

void stop

)

cancel

取消任务如果在 con/ink-cont.yaml 里面配 state savepointsdir会保存 savepoi nt否则不会保存 savepoint

flink-1.7.2 bin/fink cancel -m 127.0.0.1:8081 5e20cb6b0r357591171dfcca2eea09de

Cancelling job 5e20cb6b0f357591171dfcca2eea09de.

Cancelled job 5e20cb6b0f357591171dfcca2eea09de

也可以在停止的时候显示指定 savepoint 目录

可以看到这个任务已经停掉了。把任务再启动,可以看到现在任务在启动,准备一个等级目录,目录已经存在,可以把数据清空,调一下 cancel,可以指定savepoint,已经有数据在里面。

取消和停止(流作业)的区别如下:

cancel 调用立即调用作业算子的 cancel 方法,以快取消它们,如果算子在到 cancel 调用后没有停止 flink 将开始定期中断算子线程的执行,到所有算子停止为止。

stop 调用,是更优雅的停止正在运行流作业的方式。 stop仅适用于 source实现了StoppableFunction 接口的作业。当用户请求停止作业时,作业的所有 sourca都将接收 stop 方法调用。直到所有  source 正常关闭时,作业才会正常结束。这种方式使作业正常处理完所有作业。

(4) savepoint

触发 savepoint

flink-1.7.2 bin/ savepoint-m 127.0.0.1:8081 ec53edcfaeb96b2a5dadbfbe5f52btb

说明: savepoint 和 checkpoint 的区别(详见文档):

checkpoint是 增量做的每次的时间较短,数据量较小,只要在程序里面后用后会自动触发,用户无须感知; savepoint 是全量做的,每次的时间较长,数据量较大需要用户主动去触发。

checkpoint 是作业  failover 的时候自动使用,不需要用户指定 savepoint 一般用干程序的版本更新(详见文档),bug 修复,A/B Test 等场景,需要用户指定。

从指定的 savepoint 启动

先把任务停掉,指定 savepoint,任务启动,看一下日志文件。这个任务在启动的时候可以看到它在恢复。

查看 JobManager 的日志,能够看到类似这样的 log:

2019-03-2810:30:53,957INFo

org apache.fink. runtime. checkpoint CheckpointC-Starting j

(5)modify

修改任务并行度。

为了方便演示我们修改 conflink-c-conf.yal 将 task slot 数调为

4.并置 savepoint 目录

(modify 参数后面接 -s 指定 savepoint 路径可能有 bug.提示无法识别)

taskmanager. numberOfTaskSlots: 4

State. savepoint. dir file:// tmp /savepoint

修改参数后需要重后集群生效,然后再后动任务

从页面上能看到 Task Slots 变为了4,这时候任务的默认并发度是1。

通过 modify 命并发度修改为4和3.可以看到每次 modily 命都会触发一次 savepoint

查看 JobManager 的日志,可以看到:

2019-03-2810:33:11.179INFo

(6)info

拷贝输出的 json 内容,粘贴到这个网站 ison : http://flink.ap org/visualizer

可以和实际运行的物理执行计划对比:

(7) yarn per-job

单任务 attach 模式

默认是 attach 模式即客户端会一等待值到程序结束才会退出

通过 myam- -cluster 指定 yam 模式

客户端能看到结果输出

yam 上显示 Flink session cluster这个 batch 任务行完会 FINISHED

[adminz17.sqa. zth /home/admin/llink/flink-1.7.2)

Secho SHADOOP CONF_DIR

/etc/hadoop/cont/

[adminez17. sqa. zth /home/admin/ink/llink-1.7.2)

s /bin/ run -m yarn-cluster Jexamplesbatch/WordCount in

单任务 detached

由于是 detached 模式,客户端提交完任务就退出了

yarn 上显示为 Flink per-job- cluster

S. /bin/flink run-yd-m

(8)启动 session

/bin/yam-session.sh -tm 2048-s 3

表示动一个  yarn session 集群,每个 TM 的内存是2G每个 TM 有3个 slot(-n参数不生效)

客户端默认是 attach 模式,不会退出

a.可以 ctr+c 退出,然后再通过 bin/yar--sessionsh-id

application1532332183347_0708连上来。

b.或者启动的时候用 d 则为 detached 模式

yam 上显示为 Flink session cluster

/tmp下生成了一个文件

提交任务

/bin/flink run Jexamples/batch/WordCount.jar

将会根据 /tmp/yam -properties-admin--文件内容提交到了刚动的 session

运行结束后 TM 的资源会释放。

提交到指定的 session

通过 yid 参数来提交到指定的 session

blink 的 session 与 fink 的  session 的区别:

flink session-n 参数不生效,而且不会提前启动 TM

blink 的  session 可以通过-n指定启动多少个 TM 而且 TM 会提前起来

(9) Scala Shell

官方文档 :

https: llci.apache  org/projects/flink/flink-./scala shell. html.org/projectsfikfin-docs-rease-.7oosscalashell.html

Deploy 

任务运行说明

a.Batch 任务内置了 benv 变量,通 过 prit 结果输出到控制台

b. Streaming 任务内置了 senv 变量,通过 env v.executeob name )来提交任务且 Datastream的输出只有在 local 模式下打印到控制台。

remote

先启动一个 yam session cluster

. /bin/yamn-session.sh -Im 2048-s 3

(10) yarn

按 CTRL+C 退出后这个 flink cluster 还会继续运行不会退出

(11) Execute

Dataset

flink--1.7.2bin/stop-cluster-.sh

No taskexecutor daemon to stop on host zkb-MBP.local.

(that,)

(the,1)

to,2)

对 DataSet 任务来说  print() 会触发任务的执行。

也可以将结果输出到文件(先删除 /tmp/ou1, 不然会报错同名文件已经存在),继续执行以

下命令:

scala> counts writeAsText("/mp/out1")

res1: org. apache. flink. api.java.operatorsDataSink[(String. Int)] DataSink'<unnameds'

ITextOutputFormat (/mp/out1)-UTF-8)

scala> benv.execute("batch test")

res2: org apache flink. api. common.JobExecutionResult=

org-apache.fink.api. common.JobExecutionRes@737652a9

查看/tmp/out1文件就能看到输出结果。

4flink-1.7.2 cat /tmp/out1

be,2)

(is,1)

(not,)

(or1)

(question,1)

(that, 1)

(the,1)

(to,2)

(12)DataStream

(13)TableAPI

在 Blink 开源版本里面,支持了 TableAPI 方式提交任务(可以用  btenv sqlQuery提交 sq 查询)

社区版本1.8会支持  tableapi: https: /ssuesssues

radira/browse/FLINK-9555

SQL Client Beta

SOL Client 目前还只是测试版处干开发阶段只能用于 SQL 的原型验证不能在生产环境使用。

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
存储 SQL NoSQL
事务功能使用及原理介绍(二)|学习笔记
快速学习事务功能使用及原理介绍
197 0
事务功能使用及原理介绍(二)|学习笔记
|
SQL 存储 NoSQL
事务功能使用及原理介绍(一)|学习笔记
快速学习事务功能使用及原理介绍
312 0
事务功能使用及原理介绍(一)|学习笔记
|
存储 缓存 监控
ChangeStreams 使用及原理(二)|学习笔记
快速学习 ChangeStreams 使用及原理
661 0
ChangeStreams 使用及原理(二)|学习笔记
|
SQL 运维 监控
ChangeStreams 使用及原理(一)|学习笔记
快速学习 ChangeStreams 使用及原理
424 0
ChangeStreams 使用及原理(一)|学习笔记
|
Web App开发 缓存 负载均衡
缓存基础(一)|学习笔记
快速学习 缓存基础(一)
108 0
缓存基础(一)|学习笔记
|
存储 缓存 边缘计算
缓存基础(二)|学习笔记
快速学习 缓存基础(二)
123 0
缓存基础(二)|学习笔记
|
Kubernetes 监控 固态存储
阿里云 K8s 环境创建(下)|学习笔记
快速学习阿里云 K8s 环境创建(下)
阿里云 K8s 环境创建(下)|学习笔记
|
SQL Kubernetes 关系型数据库
阿里云 K8s 环境创建(上)|学习笔记
快速学习阿里云 K8s 环境创建(上)
阿里云 K8s 环境创建(上)|学习笔记
|
NoSQL Java 数据库
注册功能(接口)| 学习笔记
快速学习 注册功能(接口)
400 0
注册功能(接口)| 学习笔记
|
编译器 Go 开发者
包使用注意事项和细节(1)|学习笔记
快速学习包使用注意事项和细节(1)
包使用注意事项和细节(1)|学习笔记