本文由社区志愿者陈政羽整理,Apache Flink Committer、阿里巴巴技术专家宋辛童,Apache Flink Contributor、阿里巴巴高级开发工程师郭旸泽分享,主要介绍 Flink 1.12 资源管理的一些特性。内容主要分为 4 部分:
1.内存管理
2.资源调度扩展
3.资源框架
4.未来规划
首先回顾 Flink 的内存模型变迁。下图左边分别为 Flink 1.10、Flink 1.11 引入的新的内存模型。尽管涉及的模块较多,但 80% - 90% 的用户仅需关注真正用于任务执行的 Task Heap Memory、Task Off-Heap Memory、Network Memory、Managed Memory 四部分。
其它模块大部分是 Flink 的框架内存,正常不需要调整,即使遇到问题也可以通过社区文档来解决。除此之外,“一个作业究竟需要多少内存才能满足实际生产需求” 也是大家不得不面临的问题,比如其他指标的功能使用、作业是否因为内存不足影响了性能,是否存在资源浪费等。
针对上述内容,社区在 Flink 1.12 版本提供了一个全新的, 关于 Task manager 和 Job
manager 的 Web UI。
在新的 Web UI 中,可以直接将每一项监控指标配置值、实际使用情况对应到内存模型中进行直观的展示。在此基础上,可以更清楚的了解到作业的运行情况、该如何调整、用哪些配置参数调整等 (社区也有相应的文档提供支持)。通过新的 Web UI,大家能更好的了解作业的使用情况,内存管理也更方便。
1. 本地内存(Managed Memory)
Flink 托管内存实际上是 Flink 特有的一种本地内存,不受 JVM 和 GC 的管理,而是由 Flink 自行进行管理。
本地内存的特点主要体现在两方面:
当前,针对托管内存,Flink 的使用场景如下:
2. Job Graph 编译阶段
Flink 对于 management memory 的管理主要分为两个阶段。
2.1 作业的 Job Graph 编译阶段
在这个阶段需要注意三个问题:
2.2 执行阶段
第一个步骤是根据 State Backend 的类型去判断是否有 RocksDB。如上图所示,比如一个 slot,有 ABC 三个算子,B 跟 C 都用到了 Python,C 还用到了 Stateful 的 Operator。这种情况下,如果是在 heap 的情况下,我们走上面的分支,整个 slot 当中只有一种在使用,就是Python。之后会存在两种使用方式:
批处理的情况跟流的情况有两个不同的地方,首先它不需要去判断 State Backend 的类型,这是一个简化; 其次对于 batch 的算子,上文提到每一个算子有自己独享的资源的预算,这种情况下我们会去根据使用率算出不同的使用场景需要多少的 Shared 之后,还要把比例进一步的细分到每个 Operator。
3. 参数配置
上方图表展示了我们需要的是 manager,memory 大小有两种配置方式:
taskmanager.memory.managed.consumer-weight 是一个新加的配置项,它的数据类型是 map 的类型,也就是说我们在这里面实际上是给了一个 key 冒号 value,然后逗号再加上下一组 key 冒号 value 的这样的一个数据的结构。这里面我们目前支持两种 consumer 的 key:
部分资源调度相关的 Feature 是其他版本或者邮件列表里面大家询问较多的,这里我们也做对应的介绍。
1. 最大 Slot 数
Flink 在 1.12 支持了最大 slot 数的一个限制(slotmanager.number-of-slots.max),在之前我们也有提到过对于流式作业我们要求所有的 operator 同时执行起来,才能够保证数据的顺畅的运行。在这种情况下,作业的并发度决定了我们的任务需要多少个 slot 和资源去执行作业。
然而对于批处理其实并不是这样的,批处理作业往往可以有一个很大的并发度,但实际并不需要这么多的资源,批处理用很少的资源,跑完前面的任务腾出 Slot 给后续的任务使用。通过这种串行的方式去执行任务能避免 YARN/K8s 集群的资源过多的占用。目前这个参数支持在 yarn/mesos/native k8 使用。
2. TaskManager 容错
在我们实际生产中有可能会有程序的错误、网络的抖动、硬件的故障等问题造成 TaskManager 无法连接,甚至直接挂掉。我们在日志中常见的就是 TaskManagerLost 这样的报错。对于这种情况需要进行作业重启,在重启的过程中需要重新申请资源和重启 TaskManager 进程,这种性能消耗代价是非常高昂的。
对于稳定性要求相对比较高的作业,Flink1.12 提供了一个新的 feature,能够支持在 Flink 集群当中始终持有少量的冗余的 TaskManager,这些冗余的 TaskManager 可以用于在单点故障的时候快速的去恢复,而不需要等待一个重新的资源申请的过程。
通过配置 slotmanager.redundant-taskmanager-num 可以实现冗余 TaskManager。这里所谓的冗余 TaskManager 并不是完完全全有两个 TaskManager 是空负载运行的,而是说相比于我所需要的总共的资源数量,会多出两个 TaskManager。
任务可能是相对比较均匀的分布在上面,在能够在利用空闲 TaskManager 的同时,也能够达到一个相对比较好的负载。 一旦发生故障的时候,可以去先把任务快速的调度到现有的还存活的 TaskManager 当中,然后再去进行新一轮的资源申请。目前这个参数支持在 yarn/mesos/native k8 使用。
3. 任务平铺分布
任务平铺问题主要出现在 Flink Standalone 模式下或者是比较旧版本的 k8s 模式部署下的。在这种模式下因为事先定义好了有多少个 TaskManager,每个 TaskManager 上有多少 slot,这样会导致经常出现调度不均的问题,可能部分 manager 放的任务很满,有的则放的比较松散。
在 1.11 的版本当中引入了参数 cluster.evenly-spread-out-slots,这样的参数能够控制它,去进行一个相对比较均衡的调度。
注意:
1. 背景
近年来,随着人工智能领域的不断发展,深度学习模型已经被应用到了各种各样的生产需求中,比较典型的场景如推荐系统,广告推送,智能风险控制。这些也是 Flink 一直以来被广泛使用的场景,因此,支持人工智能一直以来都是 Flink 社区的长远目标之一。针对这个目标,目前已经有了很多第三方的开源扩展工作。由阿里巴巴开源的工作主要有两个:
以上的两个工作都是从功能性上对 Flink 进行扩展,然而从算力的角度上讲,深度学习模型亦或机器学习算法,通常都是整个任务的计算瓶颈所在。GPU 则是这个领域被广泛使用用来加速训练或者预测的资源。因此,支持 GPU 资源来加速计算是 Flink 在 AI 领域的发展过程中必不可少的功能。
2. 使用扩展资源
目前 Flink 支持用户配置的资源维度只有 CPU 与内存,而在实际使用中,不仅是 GPU,我们还会遇到其他资源需求,如 SSD 或 RDMA 等网络加速设备。因此,我们希望提供一个通用的扩展资源框架,任何扩展资源都可以以插件的形式来加入这个框架,GPU 只是其中的一种扩展资源。
对于扩展资源的使用,可以抽象出两个通用需求:
3. 扩展资源框架使用方法
使用资源框架我们可以分为以下这 3 个步骤:
3.1 配置参数
# 定义扩展资源名称,“gpu”external-resources: gpu# 定义每个 TaskManager 所需的 GPU 数量external-resource.gpu.amount: 1 # 定义Yarn或Kubernetes中扩展资源的配置键external-resource.gpu.yarn.config-key: yarn.io/gpuexternal-resource.gpu.kubernetes.config-key: nvidia.com/gpu# 定义插件 GPUDriver 的工厂类。external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
以上是使用 GPU 资源的配置示例:
3.2 前置准备
在实际使用扩展资源前,还需要做一些前置准备工作,以 GPU 为例:
3.3 扩展资源框架插件
完成了对扩展资源的调度后,用户自定义算子可能还需要运行时扩展资源的信息才能使用它。扩展资源框架中的插件负责完成该信息的获取,它的接口如下:
public interface ExternalResourceDriverFactory { /** * 根据提供的设置创建扩展资源的Driver */ ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;}public interface ExternalResourceDriver { /** * 获取所需数量的扩展资源信息 */ Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;}
ExternalResourceDriver 会在各个 TaskManager 上启动,扩展资源框架会调用各个 Driver 的 retrieveResourceInfo 接口来获得 TaskManager 上的扩展资源信息,并将得到的信息传到算子的 RuntimeContext。ExternalResourceDriverFactory 则为插件的工厂类。
4. GPU 插件
Flink 目前内置了针对 GPU 资源的插件,其内部通过执行名为 Discovery Script 的脚本来获取当前环境可用的 GPU 信息,目前信息中包含了 GPU 设备的 Index。
Flink 提供了一个默认脚本,位于项目的 "plugins/external-resource-gpu/" 目录,用户也可以实现自定义的 Discovery Script 并通过配置来指定使用自定义脚本。该脚本与 GPU 插件的协议为:
Flink 提供的默认脚本是通过 "nvidia-smi" 工具来获取当前的机器中可用的 GPU 数量以及 index,并根据所需要的 GPU 数量返回对应数量的 GPU Index 列表。当无法获取到所需数量的 GPU 时,脚本将以非零值退出。
GPU 设备的资源分为两个维度,流处理器与显存,其显存资源只支持独占使用。因此,当多个 TaskManager 运行在同一台机器上时,若一块 GPU 被多个进程使用,可能导致其显存 OOM。因此,Standalone 模式下,需要 TaskManager 级别的资源隔离机制。
默认脚本提供了 Coordination Mode 来支持单机中多个 TaskManager 进程之间的 GPU 资源隔离。该模式通过使用文件锁来实现多进程间 GPU 使用信息同步,协调同一台机器上多个 TaskManager 进程对 GPU 资源的使用。
5. 在算子中获取扩展资源信息
在用户自定义算子中,可使用在 "external-resources" 中定义的资源名称来调用 RuntimeContext 的 getExternalResourceInfos 接口获取对应扩展资源的信息。以 GPU 为例,得到的每个 ExternalResourceInfo 代表一块 GPU 卡,而其中包含名为 "index" 的字段代表该 GPU 卡的设备 Index。
public class ExternalResourceMapFunction extends RichMapFunction<String, String> { private static finalRESOURCE_NAME="gpu"; @Override public String map(String value) { Set<ExternalResourceInfo> gpuInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME); List<String> indexes = gpuInfos.stream() .map(gpuInfo -> gpuInfo.getProperty("index").get()).collect(Collectors.toList()); // Map function with GPU// ... }}
6. MNIST Demo
下图以 MNIST 数据集的识别任务来演示使用 GPU 加速 Flink 作业。
MNIST 如上图所示,为手写数字图片数据集,每个图片可表示为为 28*28 的矩阵。在该任务中,我们使用预训练好的 DNN 模型,图片输入经过一层全连接网络得到一个 10 维向量,该向量最大元素的下标即为识别结果。
我们在一台拥有两块 GPU 卡的 ECS 上启动一个有两个 TaskManager 进程的 Standalone 集群。借助默认脚本提供的 Coordination Mode 功能,我们可以保证每个 TaskManager 各使用其中一块 GPU 卡。
该任务的核心算子为图像识别函数 MNISTClassifier,核心实现如下所示
class MNISTClassifier extends RichMapFunction<List<Float>, Integer> { @Override public void open(Configuration parameters) { //获取GPU信息并且选择第一块GPU Set<ExternalResourceInfo> externalResourceInfos = getRuntimeContext().getExternalResourceInfos(resourceName); final Optional<String> firstIndexOptional = externalResourceInfos.iterator().next().getProperty("index"); // 使用第一块GPU的index初始化JCUDA组件 JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get())); JCublas.cublasInit(); }}
在 Open 方法中,从 RuntimeContext 获取当前 TaskManager 可用的 GPU,并选择第一块来初始化 JCuda 以及 JCublas 库。
class MNISTClassifier extends RichMapFunction<List<Float>, Integer> { @Override public Integer map(List<Float> value) { // 使用Jucblas做矩阵算法 JCublas.cublasSgemv('n', DIMENSIONS.f1, DIMENSIONS.f0, 1.0f, matrixPointer, DIMENSIONS.f1, inputPointer, 1, 0.0f, outputPointer, 1); // 获得乘法结果并得出该图所表示的数字 JCublas.cublasGetVector(DIMENSIONS.f1, Sizeof.FLOAT, outputPointer, 1, Pointer.to(output), 1); JCublas.cublasFree(inputPointer); JCublas.cublasFree(outputPointer); int result = 0; for (int i = 0; i < DIMENSIONS.f1; ++i) { result = output[i] > output[result] ? i : result; } return result; }}
在 Map 方法中,将预先训练好的模型参数与输入矩阵放入 GPU 显存,使用 JCublas 进行 GPU 中的矩阵乘法运算,最后将结果向量从 GPU 显存中取出并得到识别结果数字。
具体案例演示流程可以前往观看视频或者参考 Github 上面的链接动手尝试。
除了上文介绍的这些已经发布的特性外,Apache Flink 社区也正在积极准备更多资源管理方面的优化特性,在未来的版本中将陆续和大家见面。
通过文章的介绍,相信大家对 Flink 内存管理有了更加清晰的认知。
原文链接:http://click.aliyun.com/m/1000284093/
本文为阿里云原创内容,未经允许不得转载。