Standalone集群的部署方式
一、安装Flink:首先,需要下载并安装flink,可以从官方网站下载预编译的二进制文件,解压到指定的目录。
二、配置flink,进入到flink的安装目录,修改conf/flink-conf.yaml配置文件,主要配置项包括jobManager.rpc.address和taskManager.numberOfTaskSlots等。
rest.port: 18081 # The address to which the REST client will connect to # rest.address: cdh1 # Port range for the REST and web server to bind to. # #rest.bind-port: 8080-8090 # The address that the REST & web server binds to # By default, this is localhost, which prevents the REST & web server from # being able to communicate outside of the machine/container it is running on. # # To enable this, set the bind address to one that has access to outsidefacing # network interface, such as 0.0.0.0. # rest.bind-address: 0.0.0.0
三、启动JobManager:打开终端,进入flink安装目录,执行以下命令启动JobManager:
root@cdh1:/home/flink/flink-1.16.3# ./bin/start-cluster.shStarting cluster.Starting standalonesession daemon on host cdh1.Starting taskexecutor daemon on host cdh1.
四、启动TaskManager:打开终端,进入flink安装目录,执行以下命令启动TaskManager:
root@cdh1:/home/flink/flink-1.16.3# ./bin/taskmanager.sh start [INFO] 1 instance(s) of taskexecutor are already running on cdh1. Starting taskexecutor daemon on host cdh1.
五、提交作业:使用Flink客户端工具提交作业,可以使用以下命令提交jar文件中的作业:
echo -e "hello world\nhello flink" > input.txt
运行:
./bin/flink run examples/batch/WordCount.jar --input input.txt --output output.txt
或者在dashboard提交作业:
六、停止集群:可以使用以下命令停止整个Standalone集群
./bin/stop-cluster.sh
总之,Standalone集群是一个简单且易于部署的flink集群模式,适用于开发,测试和小规模应用场景,然而,由于资源共享和不支持高可用性的特点,不适合部署在生产环境中。
Apache Flink的exampls目录包含了一些示例程序,这些程序展示了Flink不同功能和API的用法。以下是exampls目录下各个子目录的介绍
root@cdh1:/home/flink/flink-1.16.3/examples# lsbatch gelly python streaming table
①、batch
batch目录包含了批处理作业的示例,这些示例展示了如何使用Flink 的DataSet API来处理静态数据集。批处理示例包含了经典的WordCount示例,KMeans聚类示例等。
示例:①、WordCount.jar:经典的WordCount示例,统计输入文本中每个单词的出现次数。②、KMeans.jar:一个实现KMeans聚类算法的示例。
②、gelly
gelly目录包含了Flink Gelly图处理库的示例,Gelly提供了丰富的API用于图数据的处理和分析,如图的创建,转换和算法应用等。
示例:Gelly.jar:含有PageRank,Connected Components等图算法的示例。
③、Python
python目录包含了使用PyFlink的示例,PyFlink是Flink的PythonAPI,允许用户使用Python编写Flink的作业。这些示例展示了如何使用PyFlink进行数据处理和分析。
示例:word_count.py:使用PyFlink实现的WordCount示例。
④、streaming
streaming目录包含了流处理作业的示例,这些示例展示了如何使用Flink的DataStream API来处理实时数据流,流处理示例包含了实时的WordCount示例,SocketTextStream示例等。
示例:wordCount.jar:经典的流式WordCount示例,从实时数据流中统计每个单词出现的次数。SocketTextStreamWordCount.jar:从socket流中读取数据并进行WordCount统计。
⑤、Table
table目录包含了使用Flink Table API和SQL的示例,这些示例展示了如何使用Flink的高级API 来进行关系型数据处理,既可以用于批处理也可以用于流处理。
示例:TableWordCount.jar:使用Table API实现的WordCount示例。SQLExample.jar:使用SQL查询实现的示例。
示例的运行方式
可以通过以下命令运行这些示例(以wordCount.jar为例)
# 启动 Flink 集群./bin/start-cluster.sh WordCount.jar 为例):# 运行批处理示例./bin/flink run examples/batch/WordCount.jar --input input.txt --output output.txt # 运行流处理示例./bin/flink run examples/streaming/WordCount.jar --input input.txt --output output.txt # 运行 Table API 示例./bin/flink run examples/table/TableWordCount.jar --input input.txt --output output.txt# 运行 Gelly 图处理示例./bin/flink run examples/gelly/PageRank.jar --input input.txt --output output.txt # 运行 PyFlink 示例./bin/flink run -py examples/python/word_count.py --input input.txt --output output.txt