Flink on yarn
上一次我们实践了 Local 模式和Standalone 模式 :Flink多种集群模式部署实践
我们来实践一下推荐生产环境部署的 Flink on yarn模式
这篇贼辛苦,因为涉及到 hadoop和flink的部署,不过收获也颇多。
为什么使用Flink on Yarn
在实际开发中,使用Flink时,更多的使用方式是Flink On Yarn模式,原因如下:
Yarn的资源可以按需使用,提高集群的资源利用率
Yarn的任务有优先级,根据优先级运行作业
基于Yarn调度系统,能够自动化地处理各个角色的 Failover(容错)
- JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控
- 如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器
- 如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManage
Flink如何和Yarn进行交互?
1.Client上传jar包和配置文件到HDFS集群上
2.Client向Yarn ResourceManager提交任务并申请资源
3.ResourceManager分配Container资源并启动ApplicationMaster,然后AppMaster加载Flink的Jar包和配置构建环境,启动JobManager
JobManager和ApplicationMaster运行在同一个container上。一旦他们被成功启动,AppMaster就知道JobManager的地址(AM它自己所在的机器)。它就会为TaskManager生成一个新的Flink配置文件(他们就可以连接到JobManager)。这个配置文件也被上传到HDFS上。此外,AppMaster容器也提供了Flink的web服务接口。YARN所分配的所有端口都是临时端口,这允许用户并行执行多个Flink
4.ApplicationMaster向ResourceManager申请工作资源,NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
5.TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务
部署模式
首先明确下面两个术语:
- Flink Application:Flink Application 是一个 Java 应用程序,它通过 main 方法(或其他方式)提交一个或多个 Flink Job。提交作业通常通过调用 execute 来完成。
- Flink Job:是 Flink DAG 图在运行时的具体表现,一个 DAG 对应一个 Flink Job,一般是通过在 Flink Application 中调用 execute 来创建和提交的。
然后再来看 Flink 如何区分不同的部署模式:
- 集群的生命周期和隔离性保证
- application 的 main 方法在哪里执行
Session Mode
多个job共享同一个集群<jobmanager/tastmanager>、job退出集群也不会退出,用户类的main方法再client端运行
适用:需要频繁提交小job,因为每次提交新job时候,不需要去向yarn注册应用
- Session模式是预分配资源的,也就是提前根据指定的资源参数初始化一个Flink集群,并常驻在YARN系统中,拥有固定数量的JobManager和TaskManager,该资源中JobManager有且只有一个。
- 提交到这个集群的作业可以直接运行,免去了每个提交作业都启动一个完整集群的资源开销,但是Session的资源总量有限,多个作业之间又不是隔离的,故可能会造成资源的争用,如果有一个作业因为异常导致TaskManager宕机,则它上面承载着的所有作业也都会受到影响。
- 另外,启动的作业越多,JobManager的负载也就越大。所以,Session模式一般用来部署那些对延迟非常敏感但运行时长较短的作业。
Per-Job Mode (1.15 deprecated).
每个job独享一个集群,job退出集群则退出,用户类的main方法再client端运行
适用:大job,运行时间很长,因为每起一个job,都要去向yarn申请容器启动jm,tm,比较耗时
- 为了提供更好的资源隔离保证,在Per-Job模式下,每个提交到YARN上的作业会各自形成单独的Flink集群,拥有专属的JobManager和TaskManager。
- 当作业完成时,分配给当前作业的集群将被销毁,所有缓存在集群中的资源(文件等)将被清除。作业之间的资源完全隔离,一个作业的TaskManager失败不会影响其他作业的运行。
- Per-Job模式一般用来部署那些长时间运行的作业。
Application Mode(Flink 1.11引入)
每个job独享一个集群,job退出集群则退出,用户类的main方法再集群上运行
- application模式,在该模式下会为每个提交的应用创建一个集群,用户程序的 main 方法将在JobManager集群中而不是客户端运行。
- Application模式的会话集群,仅在特定应用程序的作业之间共享,并在应用程序完成时终止。
- 在这种体系结构中,Application 模式在不同应用之间提供了资源隔离和负载平衡保证。在特定一个应用程序上,JobManager 执行 main() 可以节省所需的 CPU 周期,还可以节省本地下载依赖项所需的带宽。
部署实践运行与测试
1 | #HADOOP_HOME |
需要说明的是,Flink on yarn模式部署时,实际上不需要对Flink做任何修改配置,只需要将其解压传输到各个节点之上
如果要实现高可用的方案,这个时候就需要到Flink相应的配置修改参数,具体的配置文件是FLINK_HOME/conf/flink-conf.yaml。
standalone模式还是需要配置 Flink多种模式部署实践中的Standalone 模式
对于Flink on yarn模式,我们并不需要在conf配置下配置 masters和slaves。因为在指定TM的时候可以通过参数“-n”来标识需要启动几个TM;Flink on yarn启动后,如果是在分离式模式你会发现,在所有的节点只会出现一个 YarnSessionClusterEntrypoint进程;如果是客户端模式会出现2个进程一个YarnSessionClusterEntrypoint和一个FlinkYarnSessionCli进程。
Session Mode
Session Mode的运行主要有两个步骤。第一步是启动yarn session,开辟资源,第二步是flink run运行job。
用yarn session启动集群时,有两种方式可以使用,分别是客户端模式和分离模式
客户端模式启动yarn session
使用如下启动命令
1 | ./bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 -s 2 |
- -n: TaskManager的数量,相当于executor的数量
- -s: 每个JobManager的core的数量,executor-core,建议将slot的数量设置成每台机器处理器数量
- -tm: 每个TaskManager的内存大小
- -jm: JobManager的内存大小
上面的命令的意思是,同时向Yarn申请3个container,其中2个Container启动TaskManager(-n 2),每个TaskManager 拥有两个Task Slot(-s 2),并且向每个TaskManager的Container申请1024 M 的内存,以及一个ApplicationMaster(Job Manager)。
在客户端模式下,那作业提交后,资源的大小是由yarn的队列所决定的,多个job提交,资源的占用和竞争都是由yarn所控制。
客户端模式下,jps查看的进程有两个
1
2FlinkYarnSessionCli
YarnSessionClusterEntrypointhadoop application查看
yarn命令行查看
1
2
3
4
5[root@hadoop103 flink-1.15.2]# yarn application -list
2022-09-28 17:13:04,195 INFO client.RMProxy: Connecting to ResourceManager at hadoop103/192.168.10.103:8032
Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_1664355973790_0001 Flink session cluster Apache Flink root default RUNNING UNDEFINED 100% http://192.168.10.103:39719从flink daskboard查看
分离模式启动yarn session
使用启动命令如下
1 | ./bin/yarn-session.sh -d -n 2 -jm 1024 -tm 1024 -s 2 |
- -d: 指定分离模式
分离模式在启动的时候会在yarn中常驻一个进程,并且已经确定了之后提交的job的内存等资源的大小,比如8G内存,如果某一个job把8G内存全部占完了,只能是第一个job执行完成把资源释放了,第二个job才能继续执行。
分离模式下,jps查看进程有一个
1
YarnSessionClusterEntrypoint
hadoop application查看
yarn命令行查看
1
2
3
4
5[root@hadoop102 flink-1.15.2]# yarn application -list
2022-09-28 17:28:38,451 INFO client.RMProxy: Connecting to ResourceManager at hadoop103/192.168.10.103:8032
Total number of applications (application-types: [], states: [SUBMITTED, ACCEPTED, RUNNING] and tags: []):1
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_1664357090524_0002 Flink session cluster Apache Flink root default RUNNING UNDEFINED 100% http://192.168.10.103:41395从flink daskboard查看
提交任务
1 | ./bin/flink run ./examples/batch/WordCount.jar |
Flink 作业提交后,若要在Flink WebUI上查看作业的详细可以通过如下操作
- 方式1:从终端查看链接
- 方式2:从hadoop查看
点击yarn集群管理器中启动的application_ID,进入如下页面,点击红色方框
yarn session资源释放
1 | yarn application -kill application_1664357090524_0002 |
启动完整参数
1 | ./bin/yarn-session.sh 参数 |
- -n: 指定TaskManager的数量;
- -d: 以分离模式运行;
- -id: 指定yarn的任务ID;
- -j: Flink jar文件的路径;
- -jm: JobManager容器的内存(默认值:MB);
- -nl: 为YARN应用程序指定YARN节点标签;
- -nm: 在YARN上为应用程序设置自定义名称;
- -q: 显示可用的YARN资源(内存,内核);
- -qu: 指定YARN队列;
- -s: 指定TaskManager中slot的数量;
- -st: 以流模式启动Flink;
- -tm: 每个TaskManager容器的内存(默认值:MB);
- -z: 命名空间,用于为高可用性模式创建Zookeeper子路径;
Per-Job Mode 单作业模式部署
启动
为了方便查看测试过程,我们使用flink包中的SocketWindowWordCount项目,我们需要提前启动socket服务及端口。
1、启动socket服务
1 | nc -lk 9002 |
2、启动Per-Job命令并启动任务
1 | ./bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 examples/streaming/SocketWindowWordCount.jar --hostname hadoop102 --port 9002 |
官方:
1 | ./bin/flink run -t yarn-per-job -yjm 1024 -ytm 1024 ./examples/streaming/SocketWindowWordCount.jar --hostname hadoop102 --port 9002 |
- -t: 表示启动方式(–target),可取yarn-per-job、yarn-session、run-application等
- -yjm: JobManager内存
- -ytm: TaskManager内存大小申请
启动日志
JPS查看
yarn集群查看
从上图中我们可以看出,yarn cluster上管理了一个ID为application_1664357090524_0003
、name为Flink per-job cluster
的资源环境。鼠标点击ID,可计入详细信息界面。
点击上图中的Tracking URL:ApplicationMaster,可以打开Flink web ui管理界面。
测试数据
当前示例是一个实时流处理,只要socket服务和端口一直处于开启状态,Flink作业就会一直处于运行状态。我们在前面介绍中说过,Per-Job模式的作业,执行完成就会释放yarn资源,停掉socket作业和端口服务,当前yarn所管理的flink 作业和资源就会得到释放。
在socket服务界面 CTR+C 结束服务, Flink作业日志界面会打印如下信息
Yarn集群
Application Mode 应用模式部署
1 | bin/flink run-application -t yarn-application -p 3\ |
- -t 支持yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application)
- -D 参数指定通用的 运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间间隔等
- jobmanager.memory.process.size=2048m,taskmanager.memory.process.size=4096m是JM和TM内存参数设置
- taskmanager.numberOfTaskSlots=2为设置TaskManager slots个数为3
- 指定并发还可以使用 -Dparallelism.default=3,-p的作用与这个相同,社区目前倾向使用-D+通用配置代替客户端命令参数-p
- yarn.provided.lib.dirs参数:可以预先上传 flink 客户端依赖包到远端共享存储中(一般是hdfs),flink运行时的依赖包省去了上传的过程,避免了网络和存储资源的浪费。同时运行在集群中的所有作业都会使用一份远端 flink 依赖包,并且每个 yarn nodemanager 都会缓存一份,提交速度也会大大提升,对于跨机房提交作业会有很大的优化。5.应用程序也可以提前上传到hdfs的/flink/examples目录下,将上例中./exapmles/batch/WorldCount.jar修改为hdfs://flink/examples/WorldCount.jar,也可以将yarn.provided.lib.dirs 配置到 conf/flink-conf.yaml(如果没有增加一个)可参考官网配置说明,这时提交作业就和上面示例的普通作业没有区别了
1
-Dyarn.provided.lib.dirs="hdfs://flink/libs;hdfs://flink/hotfix" \
常用命令
查看Flink Yarn Application
1 | yarn application -list |
停止Flink Yarn Application
1 | yarn application -kill appID |