当前位置: > > > Flink - YARN集群模式使用详解(安装Flink客户端提交任务)

Flink - YARN集群模式使用详解(安装Flink客户端提交任务)

    在实际工作中,当 Flink 代码开发完成并在本地调试完毕后,需要将其打包提交到集群中执行。前文我介绍了如何将任务提交到 Standalone 模式的集群上(点击查看)。在而生产环境下通常采用 on yarn 这种模式比较多,因为这样可以综合利用集群资源。此时就需要安装一个 Flink 客户端用于向集群提交任务。本文以 Flink ON YARN 模式为例,演示 Flink 客户端节点的安装,以及如何提交任务。

一、安装 Flink 客户端

1,准备工作

既然是 ON YARN 模式,那首先就需要有 Hadoop 环境,具体安装方法可以参考我之前写的文章:
 

2,下载 Flink 安装包

(1)我们访问 Flink 的官网获取需要的版本安装包(点击访问),然后将其下载到 Hadoop 客户端服务器上:
注意:
  • 针对 Flink ON YARN 模式,Flink 客户端需要和 Hadoop 客户端安装在一起,即在 Flink 客户端节点上需要有 Hadoop 的相关环境,这样 Flink 才能找到 YARN 集群。
  • 这里我下载的是 1.14.6 版本,也是最后一个支持 JDK1.8 的版本,后续版本最低要求 JDK 11
wget https://archive.apache.org/dist/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz

(2)接着执行如下命令解压下载下来的压缩:
tar -zxvf flink-1.14.6-bin-scala_2.12.tgz

(3)最后将解压出来的文件夹移动到合适的位置,这个可以根据个人习惯修改:
mv flink-1.14.6 /usr/local/flink

3,修改环境变量

(1)首先执行如下命令编辑系统 profile 文件:
vi /etc/profile

(2)查看 profile 文件中是否配置了 HADOOP_HOMEHADOOP_CLASSPATH 这两个环境变量,如果没有的话需要加上,否则 Flink 无法识别 Hadoop 中的一些依赖。
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CLASSPATH=`${HADOOP_HOME}/bin/hadoop classpath`
export PATH=$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$PATH

(3)然后执行如下命令让配置生效:
source /etc/profile

二、任务打包

1,创建任务应用

(1)这里我们创建一个简单的流处理任务,使用 Flink 实时接收 Socket 数据,对指定时间窗口内的单词数据进行聚合统计,并且把时间窗口内计算的结果打印出来。具体代码参考我之前写的文章:

(2)当然 Flink 安装包内已经包含一些示例 Jar 包,位于 examples 目录下。比如我们直接选择里面的 SocketWindowWordCount.jar 进行测试也是可以的,那么就直接跳过下面打包步骤,进入任务提交环境。

2,对 Flink 任务打 Jar 包

(1)在 IDEA 中调试通过之后,就可以将代码提交到生产环境的集群中运行了。首先修改 pom.xml 文件中依赖的作用范围:
注意:此处在 flink 相关依赖中增加 scope 属性,值为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。因为 flink 依赖在 Storm 集群中已经存在了,所以在打 Jar 包时就不需要将其打包进去了。如果我们使用了集群中没有的第三方依赖包,则需要将其打进 Jar 包里。
<!-- Flink 依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.0</version>
    <scope>provided</scope>
</dependency>

(2)接着在 pom.xml 文件中添加 Maven 的编译打包插件配置:
<build>
    <plugins>
        <!-- 编译插件 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <!-- scala 编译插件 -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.1.6</version>
            <configuration>
                <scalaCompatVersion>2.12</scalaCompatVersion>
                <scalaVersion>2.12.11</scalaVersion>
                <encoding>UTF-8</encoding>
            </configuration>
            <executions>
                <execution>
                    <id>compile-scala</id>
                    <phase>compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>test-compile-scala</id>
                    <phase>test-compile</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- 打 Jar 包的插件(包含所有依赖) -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <!-- 可以设置 Jar 包的入口类(可选) -->
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

(3)然后执行打 Jar 包的操作:

(4)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。

三、任务的提交、查看、停止

1,提交任务

(1)将打包后的 jar 包上传到服务器,由于我的这个任务是读取 socket 数据,在提交任务之前,应先开启 Socket
nc -l 9999

(2)然后执行如下命令提交任务:
cd /usr/local/flink
bin/flink run -m yarn-cluster -c SocketWindowWordCountJava -yjm 1024 -ytm 1024 MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar
  • 我们也可以将命令放在后台执行,这样不受终端关闭的影响:
nohup bin/flink run -m yarn-cluster -c SocketWindowWordCountJava -yjm 1024 -ytm 1024 MyTask-1.0-SNAPSHOT-jar-with-dependencies.jar > output.log 2>&1 &

(3)如果我们想要使用 flink 自带的 SocketWindowWordCount.jar,则可以执行如下命令提交:
bin/flink run -m yarn-cluster  -yjm 1024 -ytm 1024 ./examples/streaming/SocketWindowWordCount.jar --hostname 172.16.8.51 --port 9999

2,查看任务

(1)此时到 YARNWeb 界面上查看,可以看到确实新增了一个任务:

(2)点击这个任务最后面的链接进去:

(3)可以看到 Flink 的任务界面:

(4)我们通过前面开启的 Socket 输入一串测试内容:
hello hangge hello world

(5)要查看 Flink 任务通过 print() 输出的结果,则需要到 Flink 的日志界面中查看:




3,停止任务

(1)我们可以使用 YARN 的命令通过指定程序的 Application ID 来停止 Flink 任务:
yarn application -kill application_1705825903812_0001

(2)我们也使用 Flink 的命令通过指定任务的 JobID 来停止 Flink 任务:
bin/flink cancel 8990c7bc59987803272b149594886df2

(3)还可以在 Flink 的任务界面中停止:
评论0