Flink on yarn

上一次我们实践了 Local 模式和Standalone 模式 :Flink多种集群模式部署实践
我们来实践一下推荐生产环境部署的 Flink on yarn模式

这篇贼辛苦,因为涉及到 hadoop和flink的部署,不过收获也颇多。

在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:
Yarn的资源可以按需使用,提高集群的资源利用率
Yarn的任务有优先级,根据优先级运行作业
基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)

  • JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
  • 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
  • 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManage

Flink如何和Yarn进行交互?

sh7K78

  • 1.Client上传jar包和配置文件到HDFS集群上

  • 2.Client向Yarn ResourceManager提交任务并申请资源

  • 3.ResourceManager分配Container资源并启动ApplicationMaster,然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager

    JobManager和ApplicationMaster运行在同一个container上。一旦他们被成功启动,AppMaster就知道JobManager的地址(AM它自己所在的机器)。它就会为TaskManager生成一个新的Flink配置文件(他们就可以连接到JobManager)。这个配置文件也被上传到HDFS上。此外,AppMaster容器也提供了Flink的web服务接口。YARN所分配的所有端口都是临时端口,这允许用户并行执行多个Flink

  • 4.ApplicationMaster向ResourceManager申请工作资源,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager

  • 5.TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务

部署模式

首先明确下面两个术语:

  • Flink Application:Flink Application 是一个 Java 应用程序,它通过 main 方法(或其他方式)提交一个或多个 Flink Job。提交作业通常通过调用 execute 来完成。
  • Flink Job:是 Flink DAG 图在运行时的具体表现,一个 DAG 对应一个 Flink Job,一般是通过在 Flink Application 中调用 execute 来创建和提交的。

然后再来看 Flink 如何区分不同的部署模式:

  • 集群的生命周期和隔离性保证
  • application 的 main 方法在哪里执行

KpoPTd

Session Mode

多个job共享同一个集群<jobmanager/tastmanager>、job退出集群也不会退出,用户类的main方法再client端运行
适用:需要频繁提交小job,因为每次提交新job时候,不需要去向yarn注册应用

  1. Session模式是预分配资源的,也就是提前根据指定的资源参数初始化一个Flink集群,并常驻在YARN系统中,拥有固定数量的JobManager和TaskManager,该资源中JobManager有且只有一个。
  2. 提交到这个集群的作业可以直接运行,免去了每个提交作业都启动一个完整集群的资源开销,但是Session的资源总量有限,多个作业之间又不是隔离的,故可能会造成资源的争用,如果有一个作业因为异常导致TaskManager宕机,则它上面承载着的所有作业也都会受到影响。
  3. 另外,启动的作业越多,JobManager的负载也就越大。所以,Session模式一般用来部署那些对延迟非常敏感但运行时长较短的作业。

Per-Job Mode (1.15 deprecated).

每个job独享一个集群,job退出集群则退出,用户类的main方法再client端运行
适用:大job,运行时间很长,因为每起一个job,都要去向yarn申请容器启动jm,tm,比较耗时

  1. 为了提供更好的资源隔离保证,在Per-Job模式下,每个提交到YARN上的作业会各自形成单独的Flink集群,拥有专属的JobManager和TaskManager。
  2. 当作业完成时,分配给当前作业的集群将被销毁,所有缓存在集群中的资源(文件等)将被清除。作业之间的资源完全隔离,一个作业的TaskManager失败不会影响其他作业的运行。
  3. Per-Job模式一般用来部署那些长时间运行的作业。

每个job独享一个集群,job退出集群则退出,用户类的main方法再集群上运行

  1. application模式,在该模式下会为每个提交的应用创建一个集群,用户程序的 main 方法将在JobManager集群中而不是客户端运行。
  2. Application模式的会话集群,仅在特定应用程序的作业之间共享,并在应用程序完成时终止。
  3. 在这种体系结构中,Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main() 可以节省所需的 CPU 周期,还可以节省本地下载依赖项所需的带宽。

部署实践运行与测试

1
2
3
4
5
6
#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

需要说明的是,Flink on yarn模式部署时,实际上不需要对Flink做任何修改配置,只需要将其解压传输到各个节点之上
如果要实现高可用的方案,这个时候就需要到Flink相应的配置修改参数,具体的配置文件是FLINK_HOME/conf/flink-conf.yaml。

standalone模式还是需要配置 Flink多种模式部署实践中的Standalone 模式
对于Flink on yarn模式,我们并不需要在conf配置下配置 masters和slaves。因为在指定TM的时候可以通过参数“-n”来标识需要启动几个TM;Flink on yarn启动后,如果是在分离式模式你会发现,在所有的节点只会出现一个 YarnSessionClusterEntrypoint进程;如果是客户端模式会出现2个进程一个YarnSessionClusterEntrypoint和一个FlinkYarnSessionCli进程。

Session Mode

Session Mode的运行主要有两个步骤。第一步是启动yarn session,开辟资源,第二步是flink run运行job。
用yarn session启动集群时,有两种方式可以使用,分别是客户端模式和分离模式

客户端模式启动yarn session

使用如下启动命令

1
./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -s 2

r3hxgX

  • -n: TaskManager的数量,相当于executor的数量
  • -s: 每个JobManager的core的数量,executor-core,建议将slot的数量设置成每台机器处理器数量
  • -tm: 每个TaskManager的内存大小
  • -jm: JobManager的内存大小
  1. 上面的命令的意思是,同时向Yarn申请3个container,其中2个Container启动TaskManager(-n 2),每个TaskManager 拥有两个Task Slot(-s 2),并且向每个TaskManager的Container申请1024 M 的内存,以及一个ApplicationMaster(Job Manager)。

  2. 在客户端模式下,那作业提交后,资源的大小是由yarn的队列所决定的,多个job提交,资源的占用和竞争都是由yarn所控制。

  3. 客户端模式下,jps查看的进程有两个

    1
    2
    FlinkYarnSessionCli
    YarnSessionClusterEntrypoint

    wolnIQ

  4. hadoop application查看
    JsJP0d

  5. yarn命令行查看

    1
    2
    3
    4
    5
    [root@hadoop103 flink-1.15.2]# yarn application -list
    2022-09-28 17:13:04,195 INFO client.RMProxy: Connecting to ResourceManager at hadoop103/192.168.10.103:8032
    Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
    Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
    application_1664355973790_0001 Flink session cluster Apache Flink root default RUNNING UNDEFINED 100% http://192.168.10.103:39719
  6. 从flink daskboard查看
    aF0TY7

分离模式启动yarn session

使用启动命令如下

1
./bin/yarn-session.sh -d -n 2 -jm 1024 -tm 1024 -s 2 

tE8rCO

  • -d: 指定分离模式
  1. 分离模式在启动的时候会在yarn中常驻一个进程,并且已经确定了之后提交的job的内存等资源的大小,比如8G内存,如果某一个job把8G内存全部占完了,只能是第一个job执行完成把资源释放了,第二个job才能继续执行。

  2. 分离模式下,jps查看进程有一个

    1
    YarnSessionClusterEntrypoint

    KKrpSE

  3. hadoop application查看
    B7txuj

  4. yarn命令行查看

    1
    2
    3
    4
    5
    [root@hadoop102 flink-1.15.2]# yarn application -list
    2022-09-28 17:28:38,451 INFO client.RMProxy: Connecting to ResourceManager at hadoop103/192.168.10.103:8032
    Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
    Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
    application_1664357090524_0002 Flink session cluster Apache Flink root default RUNNING UNDEFINED 100% http://192.168.10.103:41395
  5. 从flink daskboard查看
    23S1K2

提交任务

1
2
./bin/flink run ./examples/batch/WordCount.jar
./bin/flink run ./examples/batch/WordCount.jar --input /opt/tmp/words --output /opt/tmp/test5.txt

Flink 作业提交后,若要在Flink WebUI上查看作业的详细可以通过如下操作

  • 方式1:从终端查看链接
    h6kefu
  • 方式2:从hadoop查看
    点击yarn集群管理器中启动的application_ID,进入如下页面,点击红色方框
    0UT1hK

rphVSR

eAJvh2

yarn session资源释放

1
yarn application -kill application_1664357090524_0002

kEWUT4

启动完整参数

1
./bin/yarn-session.sh 参数
  • -n: 指定TaskManager的数量;
  • -d: 以分离模式运行;
  • -id: 指定yarn的任务ID;
  • -j: Flink jar文件的路径;
  • -jm: JobManager容器的内存(默认值:MB);
  • -nl: 为YARN应用程序指定YARN节点标签;
  • -nm: 在YARN上为应用程序设置自定义名称;
  • -q: 显示可用的YARN资源(内存,内核);
  • -qu: 指定YARN队列;
  • -s: 指定TaskManager中slot的数量;
  • -st: 以流模式启动Flink;
  • -tm: 每个TaskManager容器的内存(默认值:MB);
  • -z: 命名空间,用于为高可用性模式创建Zookeeper子路径;

Per-Job Mode 单作业模式部署

启动

为了方便查看测试过程,我们使用flink包中的SocketWindowWordCount项目,我们需要提前启动socket服务及端口。
1、启动socket服务

1
nc -lk 9002

2、启动Per-Job命令并启动任务

1
./bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 examples/streaming/SocketWindowWordCount.jar --hostname hadoop102 --port 9002

官方:

1
./bin/flink run -t yarn-per-job  -yjm 1024 -ytm 1024 ./examples/streaming/SocketWindowWordCount.jar --hostname hadoop102 --port 9002
  • -t: 表示启动方式(–target),可取yarn-per-job、yarn-session、run-application等
  • -yjm: JobManager内存
  • -ytm: TaskManager内存大小申请

启动日志
m0T8UX
JPS查看
IBuAys
yarn集群查看
y4cGPP
从上图中我们可以看出,yarn cluster上管理了一个ID为application_1664357090524_0003、name为Flink per-job cluster的资源环境。鼠标点击ID,可计入详细信息界面。
dBLRnB
点击上图中的Tracking URL:ApplicationMaster,可以打开Flink web ui管理界面。
XpuUYQ

测试数据

dX69uU

OtTtLH

当前示例是一个实时流处理,只要socket服务和端口一直处于开启状态,Flink作业就会一直处于运行状态。我们在前面介绍中说过,Per-Job模式的作业,执行完成就会释放yarn资源,停掉socket作业和端口服务,当前yarn所管理的flink 作业和资源就会得到释放。

在socket服务界面 CTR+C 结束服务, Flink作业日志界面会打印如下信息
wQfSaZ

6Jc0Dk

Yarn集群
ATVVuX

Application Mode 应用模式部署

1
2
3
4
5
6
7
8
bin/flink run-application -t yarn-application -p 3\
-Dparallelism.default=3 \
-Djobmanager.memory.process.size=2048m \
-Dtaskmanager.memory.process.size=4096m \
-Dtaskmanager.numberOfTaskSlots=2 \
-Dparallelism.default=10 \
-Dyarn.application.name="application_test" \
./exapmles/batch/WorldCount.jar
  • -t 支持yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application)
  • -D 参数指定通用的 运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等
  1. jobmanager.memory.process.size=2048m,taskmanager.memory.process.size=4096m是JM和TM内存参数设置
  2. taskmanager.numberOfTaskSlots=2为设置TaskManager slots个数为3
  3. 指定并发还可以使用 -Dparallelism.default=3,-p的作用与这个相同,社区目前倾向使用-D+通用配置代替客户端命令参数-p
  4. yarn.provided.lib.dirs参数:可以预先上传 flink 客户端依赖包到远端共享存储中(一般是hdfs),flink运行时的依赖包省去了上传的过程,避免了网络和存储资源的浪费。同时运行在集群中的所有作业都会使用一份远端 flink 依赖包,并且每个 yarn nodemanager 都会缓存一份,提交速度也会大大提升,对于跨机房提交作业会有很大的优化。
    1
    -Dyarn.provided.lib.dirs="hdfs://flink/libs;hdfs://flink/hotfix" \
    5.应用程序也可以提前上传到hdfs的/flink/examples目录下,将上例中./exapmles/batch/WorldCount.jar修改为hdfs://flink/examples/WorldCount.jar,也可以将yarn.provided.lib.dirs 配置到 conf/flink-conf.yaml(如果没有增加一个)可参考官网配置说明,这时提交作业就和上面示例的普通作业没有区别了

常用命令

查看Flink Yarn Application

1
yarn application -list

停止Flink Yarn Application

1
yarn application -kill appID

相关链接

Flink 1.14.2 on Yarn 遇到的坑
flink1.10三节点集群之yarn-session模式