学习资料 – 第四章 Flink任务提交与架构模型 枫叶云笔记 (fynote.com)
Flink任务运行时架构


上图是Flink运行时架构流程,涉及集群启动、任务提交、资源申请分配整个流程,大体步骤如下:
1.启动Flink集群首先会启动JobManager,Standalone集群模式下同时启动TaskManager,该模式资源也就固定;其他集群部署模式会根据提交任务来动态启动TaskManager;
2.当在客户端提交任务后,客户端会将任务转换成JobGraph提交给JobManager;
3.JobManager首先启动Dispatcher用于分发作业,运行Flink WebUI提供作业执行信息;
4.Dispatcher启动后会启动JobMaster并将JobGraph提交给JobMaster,JobMaster会将JobGraph转换成可执行的ExecutionGraph。
5.JobMaster向对应的资源管理器ResourceManager为当前任务申请Slot资源;
6.在Standalone资源管理器中会直接找到启动的TaskManager来申请Slot资源,如果资源不足,那么任务执行失败;
7.其他资源管理器会启动新的TaskManager,新启动的TaskManager会向ResourceManager进行注册资源,然后ResourceManager再向
8.TaskManager申请Slot资源,如果资源不足会启动新的TaskManager来满足资源;
TaskManager为对应的JobMaster offer Slot资源;
9.JobMaster将要执行的task发送到对应的TaskManager上执行,TaskManager之间可以进行数据交换。
向Flink集群中提交任务有三种任务部署模式
会话模式 - Session Mode
单作业模式 - Per-Job Mode(过时)
应用模式 - Application Mode
1.会话模式(Session Mode)
Session模式下我们首先会启动一个集群,保持一个会话,这个会话中通过客户端提交作业,集群启动时所有的资源都已经确定,所以所有的提交的作业会竞争集群中的资源。这种模式适合单个作业规模小、执行时间短的大量作业

优势:只需要一个集群,所有作业提交之后都运行在这一个集群中,所有任务共享集群资源,每个任务执行完成后就释放资源。
缺点:因为集群资源是共享的,所以资源不够了,提交新的作业就会失败,如果一个作业发生故障导致TaskManager宕机,那么所有的作业都会受到影响。
2.单作业模式(Per-Job Mode)
为了更好的隔离资源,Per-job模式是每提交一个作业会启动一个集群,集群只为这个作业而生,这种模式下客户端运行应用程序,然后启动集群,作业被提交给JobManager,进而分发给TaskManager执行,作业执行完成之后集群就会关闭,所有资源也会释放。

优势:这种模式下每个作业都有自己的JobManager管理,独享当下这个集群的资源,就算作业发生故障,对应的TaskManager宕机也不影响其他作业。如果一个Application有多个job组成,那么每个job都有自己独立的集群。
缺点:每个作业都在客户端向集群JobManager提交,如果一个时间点大量提交Flink作业会造成客户端占用大量的网络带宽,会加重客户端所在节点的资源消耗。
注意:Per-Job 模式目前只有yarn支持,Per-job模式在Flink1.15中已经被弃用,后续版本可能会完全剔除,替代的是Application模式,主要原因就是Application模式把main方法的初始化放到了集群组件的JobManager中,这样对于客户端来说从性能上有很大优化。
3.应用模式(Application Mode)
Application模式与Per-job类似,只是不需要客户端,每个Application提交之后就会启动一个JobManager,也就是创建一个集群,这个JobManager只为执行这一个Flink Application而存在,Application中的多个job都会共用该集群,Application执行结束之后JobManager也就关闭了。这种模式下一个Application会动态创建自己的专属集群(JobManager),所有任务共享该集群,不同Application之间是完全隔离的,在生产环境中建议使用Application模式提交任务。

以上三种Flink任务部署方式生产环境中优先选择Application模式,三者区别总结如下:
1.Session 模式是先有Flink集群后再提交任务,任务在客户端提交运行,提交的多个作业共享Flink集群;
2.Per-Job模式和Application模式都是提交Flink任务后创建集群;
3.Per-Job模式通过客户端提交Flink任务,每个Flink任务对应一个Flink集群,每个任务有很好的资源隔离性;
4.Application模式是在JobManager上执行main方法,为每个Flink的Application创建一个Flink集群,如果该Application有多个任务,这些Flink任务共享一个集群。
Flink On Yarn任务提交
Flink On Yarn即Flink任务运行在Yarn集群中,Flink On Yarn的内部实现原理如下图:

1.当启动一个新的Flink YARN Client会话时,客户端首先会检查所请求的资源(容器和内存)是否可用,之后,它会上传Flink配置和JAR文件到HDFS。
2.客户端的下一步是向ResourceManager请求一个YARN容器启动ApplicationMaster。JobManager和ApplicationMaster(AM)运行在同一个容器中,一旦它们成功地启动了,AM就能够知道JobManager的地址,它会为TaskManager生成一个新的Flink配置文件(这样它才能连上JobManager),该文件也同样会被上传到HDFS。另外,AM容器还提供了Flink的Web界面服务。Flink用来提供服务的端口是由用户和应用程序ID作为偏移配置的,这使得用户能够并行执行多个YARN会话。
3.之后,AM开始为Flink的TaskManager分配容器(Container),从HDFS下载JAR文件和修改过的配置文件,一旦这些步骤完成了,Flink就可以基于Yarn运行任务了。
Yarn Application模式
Yarn Application模式提交任务命令如下:
#Yarn Application模式提交任务命令
[root@node5 bin]# ./flink run-application -t yarn-application -c com.mashibing.flinkjava.code.chapter3.FlinkAppWithMultiJob /root/FlinkJavaCode-1.0-SNAPSHOT-jar-with-dependencies.jar

1.客户端提交Flink任务,Flink会将jar包和配置上传HDFS并向Yarn请求Container启动JobManager
2.Yarn资源管理器分配Container资源,启动JobManager,并启动Dispatcher、ResourceManager对象。
3.客户端会将任务转换成JobGraph提交给JobManager。
4.Dispatcher启动JobMaster并将JobGraph提交给JobMaster。
5.JobMaster向ResourceManager申请Slot资源。
6.ResourceManager会向Yarn请求Container计算资源
7.Yarn分配Container启动TaskManager,TaskManager启动后会向ResourceManager注册Slot
8.ResourceManager会在对应的TaskManager上划分Slot资源。
9.TaskManager向JobMaster offer Slot资源。
10.JobMaster将任务对应的task发送到TaskManager上执行。
Apache Flink术语
Application
一个完整的Flink程序代码叫做一个Flink Application
一个完整的Flink Application一般由Source(数据来源)、Transformation(转换)、Sink(数据输出)三部分组成,Flink中一个或者多个Operator(算子)组合对数据进行转换形成Transformation,一个Flink Application 开始于一个或者多个Source,结束于一个或者多个Sink。
Job
一个Flink Application中可以有多个Flink Job,每次调用execute()或者executeAsyc()方法可以触发一个Flink Job
一个Flink Application中可以执行多次以上两个方法来触发多个job执行。但往往我们在编写一个Flink Application时只需要一个Job即可
DataFlow数据流图
一个Flink Job 执行时会按照Source、Transformatioin、Sink顺序来执行,这就形成了Stream DataFlow(数据流图),数据流图是整体展示Flink作业执行流程的高级视图,通过WebUI我们可以看到提交应用程序的DataFlow

Subtask子任务与并行度
在集群中运行Flink代码本质上是以并行和分布式方式来执行,这样可以提高处理数据的吞吐量和速度,处理一个Flink流过程中涉及多个Operator,每个Operator有一个或者多个Subtask(子任务),不同的Operator的Subtask个数可以不同,一个Operator有几个Subtask就代表当前算子的并行度(Parallelism)是多少,Subtask在不同的线程、不同的物理机或不同的容器中完全独立执行

上图下半部分是多并行度DataFlow视图,Source、Map、KeyBy等操作有2个并行度,对应2个subtask分布式执行,Sink操作并行度为1,只有一个subtask,一共有7个Subtask,每个Subtask处理的数据也经常说成处理一个分区(Stream Partition)的数据。 一个 Flink Application 的并行度通常认为是所有Operator中最大的并行度 。上图中的Application并行度就为2
Flink中并行度可以从以下四个层面指定:(优先级从高到低)
1.Operator Level (算子层面)
2.Execution Environment Level(执行环境层面)
3.Client Level(客户端层面)
4.System Level(系统层面)
#算子层面设置并行度
ds.flatMap(line=>{line.split(" ")}).setParallelism(2)
#执行环境层面设置并行度
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
#客户端层面设置并行度
在客户端提交Flink任务时通过指定命令参数-p来动态设置并行度,并行度作用于全局代码
#系统层面设置并行度
#配置flink-conf.yaml文件
parallelism.default: 5
以上四种不同方式指定Flink 并行度的优先级为: Operator Level>Execution Environment Level>Client Level>System Level,本地编写代码时如果没有指定并行度,默认的并行度是当前机器的cpu core数。
Operator Chains 算子链
在Flink作业中,用户可以指定Operator Chains(算子链)将相关性非常强的算子操作绑定在一起,这样能够让转换过程上下游的Task数据处理逻辑由一个Task执行,进而避免因为数据在网络或者线程间传输导致的开销,减少数据处理延迟提高数据吞吐量。默认情况下,Flink开启了算子链

在集群中提交Flink任务后,可以通过Flink WebUI中查看到形成的算子链:

在Flink中哪些算子操作可以合并在一起形成算子链进行优化?
主要取决于算子之间的并行度与算子之间数据传递的模式
一个数据流在算子之间传递数据可以是一对一(One-to-one)的模式传递,也可以是重分区(Redistributing)的模式传递
即无shuffle 和 有 shuffle的操作
在Flink中 One-to-one 的算子操作且并行度一致,默认自动合并在一起形成一个算子链
通过以下方式来禁用算子链
#禁用算子链
StreamExecutionEnvironment.disableOperatorChaining()

设置新的算子链
#从当前算子开始一个新的算子链
someStream.filter(...).map(...).startNewChain().map(...);
在算子上禁用算子链
#打断算子链
someStream.map(...).disableChaining();
Flink执行图
Flink代码提交到集群执行时最终会被转换成task分布式的在各个节点上运行,在前面我们学习到DataFlow数据流图,DataFlow是一个Flink应用程序执行的高级视图,展示了Flink应用程序执行的总体流程,在Flink底层由DataFlow最终转换成执行的task的过程还涉及一些对象转换。下图以一个普通的Flink处理数据流程展示了一个Flink任务提交到集群后内部对象转换关系和流程,其中每个虚线框代表一个task,p代表并行度,这里假设为2

1.在Flink中一个Task一般对应的就是一个算子或者多个算子逻辑。多个算子逻辑经过Operator Chains优化后也是由一个Task执行的。
2.Flink分布式运行中,Task会按照并行度划分成多个Subtask,每个Subtask由一个Thread线程执行,多个Subtask分布在不同的线程不同节点形成Flink分布式的执行。
3.Subtask是Flink任务调度的基本单元
TaskSlot任务槽
一个Flink 任务有多个Subtask,这些Subtask能否正常在该TaskManager上启动?到底一个TaskManager上能同时执行多少个SubtasK?
Flink集群中每个TaskManager是一个JVM进程,可以在TaskManagr中执行一个或者多个subtask,为了能控制一个TaskManager中接收多少个Task,TaskManager节点上可以提供taskslot(任务槽),一个TaskManager上可以划分多个taskslot,taskslot是Flink系统中资源调度的最小单元,可以对TaskManager上的资源进行明确划分,每个taskslot可以运行一个或者多个subtask,每个JobManager上至少有一个taskSlot。

每个taskSlot都有固定的资源,假设一个TaskManager有三个TaskSlots,那么每个TaskSlot会将TaskMananger中的内存均分,即每个任务槽的内存是总内存的1/3,分配资源意味着subtask不会与其他作业的subtask竞争内存,taskslot的作用就是分离任务的托管内存,不会发生cpu隔离
通过调整taskSlot的数据量,用户可以指定每个TaskManager有多少task slot,TaskManager可以配置成单Slot模式,这样这个JobManager上运行的任务就独占了整个JVM进程,更多的taskSlot意味着更多的subtask可以共享同一个JVM,同一个JVM中的task共享TCP连接和心跳信息,共享数据集和数据结构,从而减少TaskManager中的task开销
#flink-conf.yamml文件中配置每个taskmanager拥有的taskslot个数
taskmanager.numberOfTaskSlots: 3
我们可以通过配置每个TaskManager上taskslot的数量来决定每个TaskManager上可以执行多少subtask,由于taskslot只会对内存进行隔离不会对CPU进行隔离,一台TaskManager taskslot越多意味着越多的taskslot争夺CPU资源,所以 taskslot的值设置建议和该 TaskManager 节点 CPU core 的数量保持一致
TaskSlot共享&SlotSharingGroup共享组
默认情况下,Flink 允许 subtask 共享 taskSlot,即便它们是不同的 subtask,只要是来自于同一Flink作业即可(Flink不允许属于不同作业的task共享同一个slot),结果就是一个 slot 可以持有整个作业管道。

#手动指定slotSharingGroup
someStream.filter(...).slotSharingGroup("name");
如果一个Flink 任务有多个共享组,那么该Flink任务所需的总slot个数就是每个共享组最大并行度的总和
允许 slot 共享有两个主要优点:
1.Flink 集群所需的 taskSlot 和作业中使用的最大并行度恰好一样,不需要关注Flink程序总共包含多少个 subtask。
2.容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(source/map())将阻塞和密集型 subtask(window())一样多的资源。通过 slot 共享,确保繁重的 subtask 在 TaskManager 之间公平分配
TaskSlot与并行度关系
taskslot是静态概念,指的是 Flink TaskManager 能够并发执行的 task 数
并行度是动态概念,指的是每个应用程序实际的并发能力
如果Flink集群中所有slot个数大于等于Flink 任务的并行度(Flink中所有算子最大并行度),那么Flink程序可以正常运行,否则Flink程序不能正常启动。我们结合下图来理解TaskSlot和并行度的关系:
首先我们的Flink集群有3个TaskManager,每个TaskManager根据配置划分3个slot,所以Flink整个集群Slot总数为9,代表了当前集群能够支持并发task的最高能力。

如图:example1中当我们向集群中提交Flink任务(WordCount)只有1个并行度时,这个任务只会占用集群中的1个taskslot。当我们向集群中提交的Flink任务有2个并行度时,这个任务占用集群2个taskslot,如图example2。

如上图example3中当我们提交的Flink任务有9个并行度时,任务在Flink集群中占用了所有的slot资源,当前集群不能再提交新的任务,因为当前集群中没有更多资源支撑新的Flink任务运行。
如果提交的Flink 任务所有算子并行度为9,就算其中有一些操作并行度为1(如example4中sink操作)同样占用Flink集群9个taskslot。
粗粒度资源管理
将任务被部署到预定义的、通常相同的Slot中,而无需了解每个Slot包含多少资源,使用粗粒度资源管理只是简单的将所有的task任务运行在一个SlotSharingGroup(SSG)中就可以很好地利用资源
由于不同的Task热点峰值不一定相同,当一个任务的消耗减少时,额外的资源可以被另一个消耗增加的任务使用,这被称为削峰填谷效应,通过这种效应将不同的task放到同一个大的Slot里,可以进一步降低整体的资源开销。
多数简单作业使用粗粒度资源调度的特点如下:
1.对于所有任务具有相同并行度的流作业处理,每个Slot将包含一个完整的管道。理想情况下,所有管道应该使用大致相同的资源,这可以通过调整相同Slot的资源轻松满足。
2.任务的资源消耗随时间而变化,当一个任务的消耗减少时,额外的资源可以被另一个消耗增加的任务使用,这被称为削峰填谷效应,减少了所需的整体资源
但是在如下情况中,粗粒度资源调度管理效果不佳:
1.任务可能有不同的并行度。有时,这种不同的并行是不可避免的。例如,Source/Sink/转换操作的tasks并行性可能会受到外部上游/下游系统的分区和IO负载的限制。在这种情况下,并行度小的task需要的Slot资源比整个任务管道需要的Slot的资源要少。
2.有时整个管道所需的资源可能太多,单个Slot/TaskManager无法提供。在这种情况下,需要将管道拆分为多个SSG(SlotSharingGroup),这些SSG可能并不总是具有相同的资源需求。
3.对于批处理作业,并非所有任务都可以同时执行,因此,管道的瞬时资源需求随时间而变化。
综上所述,尝试使用相同的Slot执行所有任务可能会导致资源利用率不佳

在以上这个作业中粗粒度资源管理就可能导致资源利用效率降低。
同时,整个作业的 pipeline 可能由于资源过大而无法放到一个 slot 或是 TM 中,比如上述算子的内存,再比如 Tensorflow 模块需要 GPU 来保证计算效率。由于 GPU 是一种非常昂贵的资源,集群上不一定有足够的数量,从而导致作业因为对齐并发而无法申请到足够的资源,最终无法执行。
我们可以将整个作业拆分成多个 SSG。如下图所示:

细粒度资源管理
基本思想是,每个 slot 的资源规格都可以单独定制,用户按需申请,最大化资源的利用效率
细粒度资源管理就是通过使作业各个模块按需申请和使用资源来提高资源的整体利用效率
注意:细粒度资源管理特性目前是一个MVP("最小可行产品")特性,只对DataStream API可用。
细粒度资源适用场景:
1.作业中上下游 task 并发有显著差异。
2.整个任务pipeline所需的资源太多,无法放入单个Slot/TaskManager中。
3.批作业,不同Stage消耗资源有显著差异。
几种情况都需要将作业拆分成多个 SSG,而不同的 SSG 资源需求存在差异,这时通过细粒度资源管理就能减少资源浪费。

资源分配策略-动态资源切割机制
Flink针对每个SSG进行资源分配采用的是动态资源切割机制

在细粒度资源管理中,Flink将从TaskManager中切割出一个完全匹配的Slot,用于指定资源的Slot请求。内部流程如上所示,TaskManager将使用全部资源启动,但没有预定义的Slot,当一个具有0.25 Core和1GB内存的Slot请求到达时,Flink将选择一个具有足够空闲资源的TaskManager,并使用请求的资源创建一个新的Slot。如果一个Slot被释放,它将其资源返回给TaskManager的可用资源。
在细粒度资源分配策略中,Flink将遍历所有已注册的Taskmanager,并选择第一个有足够空闲资源的Taskmanager来完成Slot请求。当没有足够可用资源的TaskManager时,Flink会Native Kubernetes或YARN上时尝试分配一个新的TaskManager。
在当前的策略中,Flink会根据用户的配置分配相同的Taskmanager。由于TaskManagers的资源规范是预定义的,需要注意以下问题:
1.集群中可能存在资源碎片。例如:如果有两个Slot请求,每个Slot具有3G堆内存,而TaskManager的总堆内存是4G, 则Flink将启动两个TaskManager,每个TaskManager中会有1G的堆内存被浪费。在将来,可能会有一种资源分配策略,可以根据作业的Slot请求分配异构的taskmanager,从而减轻资源碎片。
2.请确保为Slot共享组配置的资源数量不超过TaskManager的资源总量。否则,Flink job 将异常失败。
回到以上案例上来,我们可以根据细粒度资源管理针对不同的SSG分配不同的资源来最大化利用资源:

在集群中进行资源分配如下,我们只需要起8个同样规格的TM就能调度作业,每个TM上带一块GPU来满足SSG4,之后将CPU密集型的SSG1和内存密集型的SSG2和SSG3进行混布,对齐TM上整体的CPU内存比即可

细粒度资源管理的局限性
1.不支持平均分配Slot策略。这个策略试图在所有可用的taskmanager上平均分配Slot。细粒度资源管理和集群第一版不支持该策略。平均分配Slot目前不会在其中生效。
2.与Flink的Web UI的集成有限。细粒度资源管理中的slot可以具有不同的资源规格。web UI目前不显示slot详细信息。
3.与批处理作业的有限集成。目前,细粒度资源管理要求在所有边缘类型都被阻塞BLOCKING的情况下执行批处理工作负载。为此,您需要配置 fine-grained.shuffle-mode.all-blocking(在批处理作业中应用细粒度资源管理时,是否将所有的PIPELINE边缘转换为BLOCKING)为true。注意,这可能会影响性能。详见FLINK-20865。
4.不建议配置混合资源。不建议仅为作业的某些部分指定资源需求,而未指定其余部分的资源需求。目前,任何资源的slot都可以满足未指定的需求。它获取的实际资源可能在不同的作业执行或故障切换中不一致。
5.Slot分配结果可能不是最优的。由于Slot需求包含多个维度的资源,默认的资源分配策略可能无法实现最优的Slot分配,在某些场景下可能会导致资源分片或资源分配失败。
Flink 内存模型
由于在Flink中计算主要存在于TaskManager节点,这里说的Flink内存模型也就是TaskManager的内存模型,JobManager的内存模型与TaskManager的内存模型类似。

Flink 总内存(Total Flink Memory)
TaskManager进程占用的所有与Flink相关的内存,不包括JVM特定内存部分,包含6个部分内存(Framework堆内存、Task堆内存、托管内存、Framework非堆内存、Task非堆内存、Network),关于Flink Framework和Flink Task使用的内存既有堆内内存也有堆外内存,托管内存和Network使用的仅是堆外内存。
Flink总内存配置参数根据不同的部署场景不同:taskmanager.memory.flink.size 或者 taskmanager.memory.process.size(容器部署指定参数),无默认值,需要用户指定。
Flink堆内存(JVM Heap)
Flink堆内存就是JVM堆内存(JVM Heap),分为Framework堆内存(Framework Heap)和Task堆内存(Task Heap),其中Framework 主要用于Flink框架本身需要的内存空间,Task堆内存则用于Flink算子及用户代码的执行,也被称为TaskExecutor使用的内存,两者的主要区别在于是否将内存计入Slot计算资源中,Framework堆内存不会将内存分配给Slot,Task堆内存会分配给Slot。
Framework堆内存(Framework Heap)
Framework堆内存配置参数为:taskmanager.memory.framework.heap.size,该值默认为128M。
Task 堆内存(Task Heap)
Task堆内存配置参数为:taskmanager.memory.task.heap.size,该值没有默认值,如果没有指定会自动用Flink总内存减去Framework堆内存(Framework Heap)、托管内存(Managed Memory)、Framework非堆内存(Framework Off-Heap)、Task非堆内存(Task Off-Heap)、NetWork的剩余内存。
Flink非堆内存(Off-Heap Memory)
非堆内存也可以叫做堆外内存,更准确来说是大部分的堆外内存,包含了托管内存(Managed Memory)、直接内存(Direct Memory)两部分。
托管内存(Managed Memory)
托管内存(Managed Memory)是由Flink负责分配和管理的本地堆外内存,在流处理作业中用于RocksDBstateBackend状态存储后端,在批处理作业中用于排序、哈希表及缓存中间结果。
托管内存(Managed Memory)配置参数有两个,分别如下:
taskmanager.memory.managed.fraction,默认值0.4,如果未显式指定托管内存大小,则使用总Flink内存的百分比作为托管内存。
taskmanager.memory.managed.size,无默认值,一般也不指定,而是按照比例来推定,更加灵活。
直接内存(Direct Memory)
直接内存(Direct Memory)分为Framework非堆内存(Framework Off-Heap)、Task 非堆内存(Task Off-Heap)和Network三个部分。直接内存主要作用是减少GC压力、提升性能效率。
Framework非堆内存(Framework Off-Heap)
Framework 非堆内存即taskexecutor的Framework 堆外内存大小,不会分配给slot,配置参数为:taskmanager.memory.framework.off-heap.size,默认值128M。
Task非堆内存(Task Off-Heap)
Task非堆内存,配置参数taskmanager.memory.task.off-heap.size,默认值为0,即不使用。
Network
Network内存存储空间主要用于基于Netty进行网络数据交换数据传输的本地缓存,例如:TaskManager之间Shuffle、广播、与外部组件的数据传输。Network的配置相关参数有3个,分别如下:
taskmanager.memory.network.min:网络缓存的最小值,默认64MB;
taskmanager.memory.network.max:网络缓存的最大值,默认1GB;
taskmanager.memory.network.fraction:网络缓存占Flink总内存taskmanager.memory.flink.size的比例,默认值0.1。若根据此比例算出的内存量比最小值小或比最大值大,就会限制到最小值或者最大值。
JVM 特定内存
JVM特定内存是JVM堆外内存的另一小部分内存,其不在Flink总内存范围之内,包括JVM元空间(JVM Metaspace)和JVM Overhead 两部分,其中JVM元空间存储JVM加载类的元数据,加载的类越多,需要的内存空间越大,JVM Overhead 则主要用于其他JVM开销,例如代码缓存、线程栈等。
Flink中将内存分成不同的区域,实现了更加精准地内存控制,在使用Flink过程中一般指定Flink总内存(Total Flink Memory,taskmanager.memory.flink.size)即可,其他额外指定JVM内存参数不需额外指定,如果需要根据Flink程序做一些调整建议有限调整fraction比例参数,例如:网络缓存占比taskmanager.memory.network.fraction(根据网络流量大小调节)与托管内存占比taskmanager.memory.managed.fraction(根据RocksDB状态大小调节),这样做可以间接影响任务内存的配额,需要特别注意的是如果手动指定较多的固定参数很有可能出现内存配额冲突导致Flink程序部署失败