Flink - YARN集群模式使用详解(安装Flink客户端提交任务)
在实际工作中,当 Flink 代码开发完成并在本地调试完毕后,需要将其打包提交到集群中执行。前文我介绍了如何将任务提交到 Standalone 模式的集群上(点击查看)。在而生产环境下通常采用 on yarn 这种模式比较多,因为这样可以综合利用集群资源。此时就需要安装一个 Flink 客户端用于向集群提交任务。本文以 Flink ON YARN 模式为例,演示 Flink 客户端节点的安装,以及如何提交任务。
一、安装 Flink 客户端
1,准备工作
既然是 ON YARN 模式,那首先就需要有 Hadoop 环境,具体安装方法可以参考我之前写的文章:
2,下载 Flink 安装包
注意:
- 针对 Flink ON YARN 模式,Flink 客户端需要和 Hadoop 客户端安装在一起,即在 Flink 客户端节点上需要有 Hadoop 的相关环境,这样 Flink 才能找到 YARN 集群。
- 这里我下载的是 1.14.6 版本,也是最后一个支持 JDK1.8 的版本,后续版本最低要求 JDK 11。
wget https://archive.apache.org/dist/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
(2)接着执行如下命令解压下载下来的压缩:
tar -zxvf flink-1.14.6-bin-scala_2.12.tgz
(3)最后将解压出来的文件夹移动到合适的位置,这个可以根据个人习惯修改:
mv flink-1.14.6 /usr/local/flink
3,修改环境变量
(1)首先执行如下命令编辑系统 profile 文件:
vi /etc/profile
(2)查看 profile 文件中是否配置了 HADOOP_HOME 和 HADOOP_CLASSPATH 这两个环境变量,如果没有的话需要加上,否则 Flink 无法识别 Hadoop 中的一些依赖。
export HADOOP_HOME=/usr/local/hadoop export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath` export PATH=$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$PATH
(3)然后执行如下命令让配置生效:
source /etc/profile
二、任务打包
1,创建任务应用
(1)这里我们创建一个简单的流处理任务,使用 Flink 实时接收 Socket 数据,对指定时间窗口内的单词数据进行聚合统计,并且把时间窗口内计算的结果打印出来。具体代码参考我之前写的文章:
(2)当然 Flink 安装包内已经包含一些示例 Jar 包,位于 examples 目录下。比如我们直接选择里面的 SocketWindowWordCount.jar 进行测试也是可以的,那么就直接跳过下面打包步骤,进入任务提交环境。

2,对 Flink 任务打 Jar 包
(1)在 IDEA 中调试通过之后,就可以将代码提交到生产环境的集群中运行了。首先修改 pom.xml 文件中依赖的作用范围:
注意:此处在 flink 相关依赖中增加 scope 属性,值为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。因为 flink 依赖在 Storm 集群中已经存在了,所以在打 Jar 包时就不需要将其打包进去了。如果我们使用了集群中没有的第三方依赖包,则需要将其打进 Jar 包里。
<!-- Flink 依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.14.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.14.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.14.0</version> <scope>provided</scope> </dependency>
(2)接着在 pom.xml 文件中添加 Maven 的编译打包插件配置:
<build> <plugins> <!-- 编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <!-- scala 编译插件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.1.6</version> <configuration> <scalaCompatVersion>2.12</scalaCompatVersion> <scalaVersion>2.12.11</scalaVersion> <encoding>UTF-8</encoding> </configuration> <executions> <execution> <id>compile-scala</id> <phase>compile</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>test-compile-scala</id> <phase>test-compile</phase> <goals> <goal>add-source</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <!-- 打 Jar 包的插件(包含所有依赖) --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.6</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <!-- 可以设置 Jar 包的入口类(可选) --> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
(3)然后执行打 Jar 包的操作:

(4)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。

三、任务的提交、查看、停止
1,提交任务
(1)将打包后的 jar 包上传到服务器,由于我的这个任务是读取 socket 数据,在提交任务之前,应先开启 Socket。nc -l 9999
(2)然后执行如下命令提交任务:
cd /usr/local/flink bin/flink run -m yarn-cluster -c SocketWindowWordCountJava -yjm 1024 -ytm 1024 MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar
- 我们也可以将命令放在后台执行,这样不受终端关闭的影响:
nohup bin/flink run -m yarn-cluster -c SocketWindowWordCountJava -yjm 1024 -ytm 1024 MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar > output.log 2>&1 &
(3)如果我们想要使用 flink 自带的 SocketWindowWordCount.jar,则可以执行如下命令提交:
bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 ./examples/streaming/SocketWindowWordCount.jar --hostname 172.16.8.51 --port 9999
2,查看任务
(1)此时到 YARN 的 Web 界面上查看,可以看到确实新增了一个任务:


(2)点击这个任务最后面的链接进去:

(3)可以看到 Flink 的任务界面:

(4)我们通过前面开启的 Socket 输入一串测试内容:
hello hangge hello world
(5)要查看 Flink 任务通过 print() 输出的结果,则需要到 Flink 的日志界面中查看:




3,停止任务
(1)我们可以使用 YARN 的命令通过指定程序的 Application ID 来停止 Flink 任务:
yarn application -kill application_1705825903812_0001
(2)我们也使用 Flink 的命令通过指定任务的 JobID 来停止 Flink 任务:
bin/flink cancel 8990c7bc59987803272b149594886df2

(3)还可以在 Flink 的任务界面中停止:
