Loading
0

使用Cloudera Machine Learning实施深度学习的分布式模型训练

许多企业数据科学团队正在使用Cloudera的机器学习平台进行模型探索和培训,包括使用Tensorflow,PyTorch等创建深度学习模型。但是,训练深度学习模型通常是一个耗时的过程,因此采用GPU和分布式模型训练方法来加快训练速度。

这是我们关于在Cloudera机器学习平台上进行深度学习的分布式模型训练的博客系列的第一篇文章,其中包括Cloudera数据科学工作台(CDSW)和Cloudera机器学习(CML),这是为云构建的新一代CDSW。在下文中,为简单起见,我们仅指“CML”,且此文的内容也适用于CDSW安装。

在这篇文章中,我们将介绍:

  • 深度学习的分布式模型训练的基本方法
  • 如何将这些方法集成到CML中的方法
  • 新功能可自动执行基于参数服务器的分布式Tensorflow模型训练
  • 以及使用即将推出的CML功能启用MPI Allreduce分布式模型训练的预览。

深度学习的分布式模型训练的技术基础

通常使用随机梯度后裔(SGD)算法训练深度学习模型。对于SGD的每次迭代,我们将从训练集中采样一个小批量,将其输入到训练模型中,计算观察值和实际值的损失函数的梯度,并更新模型参数(或权重)。众所周知,SGD迭代必须顺序执行,因此不可能通过并行化迭代来加快训练过程。但是,由于使用CIFAR10或IMAGENET等许多常用模型处理一次迭代要花费很长时间,即使使用最先进的GPU,我们仍然可以尝试并行化前馈计算以及每次迭代中的梯度计算以加快速度加快模型训练过程。

在实践中,我们将训练数据的微型批次分为几个部分,例如4、8、16等(在本文中,我们将使用术语“子批次”来指代这些拆分的部分),并且每个培训工人分一个批次。然后,培训人员分别使用子批进行前馈、梯度计算和模型更新,就像在整体培训模式中一样。在这些步骤之后,将调用称为模型平均的过程,对参与培训的所有工作人员的模型参数求平均,以便在新的培训迭代开始时使模型参数完全相同。然后,新一轮的训练迭代又从数据采样和拆分步骤开始。

形式上,上面的分布式模型训练过程的模型平均的一般思想可以使用以下伪代码表示。

# Suppose w0 is the initial global parameters, K is the number of workers, T is the overall iterating number, ftk(wt) is the output of the kth worker under the parameters wt at time t, and lr is the learning rate.
FOR t = 0, 1, …, T-1 DO
    Read the current model parameters wt
    Stochastically sample a batch of data itk
    Compute the stochastic gradients 𝛻ftk(wt) at each worker
    Accumulate all of the gradients of K workers
    Update the global parameters wt+1=wt-ltK𝛻ftk(wt)
END FOR

如上所示,在每次迭代结束时,我们一直等到模型参数达到一致性为止,因此可以在新的迭代开始之前使模型同步。这种方法称为同步SGD,这是我们将在本文中考虑的方法。(另一种方法是异步SGD,在异步SGD中,模型参数存储在称为参数服务器的集中位置,并且工作线程在每次迭代结束时自行独立于参数服务器进行更新,而与其他工作线程的状态无关。同步SGD、异步SGD的整体训练速度不会受到单个“慢”工人的影响,但是,如果训练集群中的GPU处理速度大致相同(通常是这种情况),在实际情况中显然很“慢”,因此,同步SGD在ML应用领域中是一个不错的选择。)

上述算法中的每个工作人员在训练过程中还具有模型参数的完整副本,只有训练数据分配给不同的工作人员。这种方法称为数据并行性,这是我们将在此处考虑的方法。另一种称为模型并行性的方法也可以拆分模型参数。模型并行性的优点是能够训练大于内存容量(主内存或GPU内存)的模型。但是,如果模型的大小小于内存容量,则数据并行性将更加高效,因为在每次迭代的前馈期间它不需要工作人员之间的通信。

“参数服务器”对比“MPI Allreduce”

许多深度学习框架,例如Tensorflow,PyTorch和Horovod,都支持分布式模型训练。它们在模型参数的平均或同步方式上有很大不同。当前,有两种模型同步方法:1)基于参数服务器,和2)MPI Allreduce。

上图显示了基于参数服务器的体系结构。在这种方法中,计算节点被划分为工作程序和参数服务器。每个工作人员“拥有”一部分数据和工作负载,并且参数服务器共同维护全局共享的参数(使用参数服务器扩展分布式机器学习)。在每次迭代的开始,工作人员会提取完整的模型参数副本,并在迭代结束时将新更新的模型推回参数服务器。对于同步SGD,参数服务器将平均所有工作人员推送的模型参数,从而创建更新的“全局”模型供工作人员在下一次迭代开始时提取。

另一方面,MPI Allreduce方法不需要一组专用服务器来存储参数。取而代之的是,它利用环减少(将HPC技术带入深度学习)算法和MPI(消息传递接口)API来实现模型同步。对于由N个节点组成的模型训练集群,模型参数将被划分为N个块,并且参与环归约算法的每个节点都将与其两个对等节点进行2∗(N−1)次通信。因此,从理论上讲,模型平均时间仅与模型的大小有关,而与节点的数量无关。在此通信期间,节点发送和接收数据缓冲区的块。在前N-1次迭代中,将接收到的值添加到节点缓冲区中的值。在第二次N-1迭代中,接收到的值替换了保存在节点缓冲区中的值。MPI API是由高性能计算社区开发的,用于实现模型参数同步,而Open MPI是由学术,研究和行业合作伙伴组成的联盟开发和维护的,广泛使用的MPI实现之一。

关于与MPI Allreduce方法相比基于参数服务器的方法的性能,在Uber和MXNet中的基准测试结果表明,在小数量的节点(8-64)上,MPI Allreduce的性能优于参数服务器(Horovod:快速而轻松在TensorFlow中进行分布式深度学习,并通过MPI AllReduce扩展MXNet分布式培训)。

上图是Uber的基准测试,参数服务器(Tensorflow本机)与MPI Allreduce(Horovod)的结果,当在不同数量的NVIDIA Pascal GPU上运行分布式培训作业时,将每秒处理的图像与标准分布式TensorFlow和Horovod进行比较适用于基于25GbE TCP的Inception V3和ResNet-101 TensorFlow模型。同时,下面的MXNet基准测试结果还显示,即使参数服务器和辅助服务器的数量均为8,MPI Allreduce方法的性能仍高于参数服务器方法。

从性能数据中,我们可以得出以下结论(由MPI AllReduce扩展MXNet分布式培训):1)MPI Allreduce方法不需要额外的服务器节点,并且可以获得比基于参数服务器的方法更好的性能。同步SGD多节点训练。(在基于参数服务器的方法中,如果配置不当,则不足的服务器将成为网络带宽的热点。)2)此外,MPI Allreduce方法更易于硬件部署。(在基于参数服务器的方法中,需要精心计算服务器:工人比率的配置,并且该比率不是固定的(取决于拓扑和网络带宽)。)

传统上,Tensorflow支持基于参数服务器的方法,而PyTorch和Horovod支持MPI Allreduce方法。但是,从r1.3开始,Tensorflow也开始支持MPI Allreduce方法(在r1.4中具有实验支持)。

注意:基于参数服务器的方法能够支持同步和异步SGD,例如Tensorflow。据我们所知,MPI Allreduce方法的所有当前实现仅支持同步SGD。

有了这些基础知识,让我们继续进行分布式模型训练的编程部分,同时使用基于参数服务器的参数和MPI Allreduce方法,并了解如何在CML中使用这两种方法。

使用CML中基于参数服务器的方法进行编程

本节将概述用CML编写基于参数服务器的分布式模型训练代码的概述。我们将使用Tensorflow本机分布式API和CML的分布式API(cdsw.launch_worker)进行演示。

首先,分布式Tensorflow中的每个参数服务器或工作程序都是一个Python进程。因此,我们很自然地使用CML工作器(或容器)来表示TF参数服务器或TF工作器,并使用cdsw.launch_workers(…)函数在主CML会话中调用这些CML工作器。在cdsw.launch_workers(…)中,我们还可以为TF参数服务器和TF工作者指定不同的Python程序文件。然后,主要的CML会话需要收集每个容器的主机名或IP地址,并将它们发送给所有CML子工作程序,以创建集群规范(tf.train.ClusterSpec)。在CML中,实际上有许多方法可以获取每个子工作者的IP地址,我们将介绍一种使用新的await_workers函数的方法,该函数在CML Docker引擎V10中正式可用。

await_workers函数用于等待其他由其会话ID指定的CML容器的启动。 await_workers的返回值是一个Python字典,带有一个项的键名是ip_address,并带有其IP地址。下面的代码显示了如何在CML主会话中使用cdsw_await_workers。请注意,如果某些容器在指定的时间(例如,以下代码为60秒)后无法启动,则await_workers的返回值将导致键名失败的项,其中包含失败人员的会话ID。

# CML main session
import cdsw
workers = cdsw.launch_workers(NUM_WORKERS,
                              cpu=0.5, memory=2,
                              script=”...”)
worker_ids = [worker["id"] for worker in workers]
running_workers = cdsw.await_workers(worker_ids,
                              wait_for_completion=False,
                              timeout_seconds=60)
worker_ips = [worker["ip_address"] for worker in \
                              running_workers["workers"]]

在获取并分配所有TF参数服务器和TF工作程序的IP地址之后,每个工作程序都需要构造实例。在下面的代码段中,PS1:PORT1代表第一个TF参数服务器进程的IP地址和端口号,PS2:PORT2代表第二个TF参数服务器进程的IP地址和端口号,而WORKER1:PORT1代表第二个TF参数服务器进程的IP地址和端口号。 第一个TF工作程序的IP地址和端口号等

cluster = tf.train.ClusterSpec({"ps": ["PS1:PORT1","PS2:PORT2",...],
                     "worker": ["WORKER1:PORT1","WORKER2:PORT2",...]})
server = tf.train.Server(cluster, job_name="PS or WORKER",
                                  task_index=NUM)

对于TF参数服务器容器,请调用server.join()等待,直到所有其他参数服务器进程和辅助进程都加入集群。

server.join()

对于TF工作人员,所有的建模和培训代码都需要进行编程。如果您使用数据并行性,那么建模部分实际上与整体式Tensorflow程序相同。但是,该训练代码与整体Tensorflow程序至少有2个明显的不同。

  • 使用tf.train.SyncReplicasOptimizer包装模型优化器,tf.train.SyncReplicasOptimizer是整体Tensorflow优化器的同步SGD版本。这是tf.train.SyncReplicasOptimizer的示例代码。
optimizer = tf.train.AdamOptimizer(learning_rate=...)
sr_optim = tf.train.SyncReplicasOptimizer(
                     optimizer,
                     replicas_to_aggregate=NUM_WORKER,
                     total_num_replicas=NUM_WORKER)
  • 通常,建议使用tf.train.MonitoredTrainingSession代替tf.Session进行培训,因为MonitoredTrainingSession提供了许多监视和自动管理功能,这些功能对于高效的分布式模型培训环境至关重要。

注意:在最新版本的Tensorflow中不推荐使用tf.train.Supervisor,现在建议使用tf.train.MonitoredTrainingSession代替tf.train.Supervisor。

每次我们编写分布式Tensorflow代码时,重复上述上述编程过程不仅很耗时,而且容易出错。因此,我们将它们包装在一个函数(cdsw_tensorflow_utils.run_cluster)中,该函数随此文章一起发布,从而使整个过程自动化,因此数据科学家仅需指定参数服务器,工作程序和培训代码的数量即可。可以在此处找到包含该功能的脚本。以下程序演示了如何使用cdsw_tensorflow_utils.run_cluster创建分布式Tensorflow集群。

cluster_spec, session_addr = cdsw_tensorflow_utils.run_cluster(
                                 n_workers=n_workers,
                                 n_ps=n_ps,
                                 cpu=0.5,
                                 memory=2,
                                 worker_script="train.py")

文件train.py是模型定义和训练代码所在的地方,它看起来很像单片Tensorflow代码。train.py的结构如下:

import sys, time
import tensorflow as tf

 
# config model training parameters
batch_size = 100
learning_rate = 0.0005
training_epochs = 20

 
# load data set
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)

 
# Define the run() function with the following arguments
# And this function will be invoked within CML API
def run(cluster, server, task_index):

 
    # Specify cluster and device in tf.device() function
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % task_index,
        cluster=cluster)):

 
        # Count the number of updates
        global_step = tf.get_variable(
            'global_step',
            [],
            initializer = tf.constant_initializer(0),
            trainable = False)


        # Model definition
        …

 
    # Define a tf.train.Supervisor instance
    # and use it to start model training
    sv = tf.train.Supervisor(is_chief=(task_index == 0),
                            global_step=global_step,
                            init_op=init_op)
    with sv.prepare_or_wait_for_session(server.target) as sess:

 
        # Model training code
        …

 
    # Stop the Supervisor instance
    sv.stop()

下面的屏幕快照显示了使用上面介绍的CML内置API在CML上平均异步模型的分布式模型训练程序的执行过程。

MPI Allreduce CML编程方法预览

本节概述了使用CML中的MPI Allreduce方法执行分布式模型训练的过程,并使用Horovod进行了实现。

使用Horovod时,驱动程序节点(在此上下文中为CML主会话)需要执行SSH无密码登录到Horovod辅助节点,以启动所有模型训练过程。 要在CML中为用户cdsw启用SSH无密码登录,需要两个设置步骤:1)为用户cdsw设置无密码身份验证,以及2)指定默认的SSH侦听端口从22到2222。

  • 在CML中为用户cdsw启用SSH无密码身份验证非常容易。您只需要转到用户设置页面,然后将公钥从“outbound ssh”标签复制到“remote editing”标签即可。在那之后,同一用户内的所有会话都可以使用无密码登录相互进行SSH交换。

注意:CML的April 14及更高版本不需要此步骤。

  • CML引擎的sshd侦听端口为2222。使用mpirun命令调用Horovod worker时,可以创建~/.ssh/config文件,并输入如下内容。
Host 100.66.0.29
Port 2222
Host 100.66.0.30
Port 2222

否则,使用horovodrun命令时,只需为horovodrun指定一个额外的参数-p 2222。

接下来,启动几个CML工作器容器,并等待直到获得启动的工作器的IP地址。 (此方法与基于参数服务器的分布式深度学习方法中使用的方法完全相同。)然后,在这些启动的工作程序中启动Horovod模型训练过程,可以通过在os.system中调用horovodrun命令来实现() Python函数。这两个步骤都可以通过CML主会话中的Python代码完成。以下是实现功能的示例代码,train.py只是用于模型训练的Python代码。

# CML main session
Import os
import cdsw

 
workers = cdsw.launch_workers(NUM_WORKERS,
                              cpu=0.5, memory=2,
                              script=”...”)
worker_ids = [worker["id"] for worker in workers]
running_workers = cdsw.await_workers(worker_ids,
                              wait_for_completion=False,
                              timeout_seconds=60)
worker_ips = [worker["ip_address"] for worker in \
                              Running_workers["workers"]]
cmd="horovodrun -np {} -H {} -p 2222 python train.py".format(
                              len(worker_ips),
                              ",".join(worker_ips))

 
os.system(cmd)

在MPI Allreduce方法中,仍然需要修改模型训练文件,即上面的示例代码的train.py。在train.py中,代码有2个主要修改:1)在工作进程之间创建集群环境,以及2)执行模型平均。除了对MPI Allreduce方法及其在CML中的实现进行技术深入研究之外,我们还将在本系列的下一篇文章中讨论这些主题,并介绍这些方法的性能基准测试结果。

Cloudera试用和报价


关于Cloudera

在 Cloudera,我们相信数据可以使今天的不可能,在明天成为可能。我们使人们能够将复杂的数据转换为清晰而可行的洞察力。Cloudera 为任何地方的任何数据从边缘到人工智能提供企业数据云平台服务。在开源社区不懈创新的支持下, Cloudera推动了全球最大型企业的数字化转型历程。了解更多,请联系在线客服

慧都大数据专业团队为企业提供Cloudera大数据平台搭建,免费业务咨询,定制开发等完整服务,快速、轻松、低成本将任何Hadoop集群从试用阶段转移到生产阶段。

欢迎拨打慧都热线023-68661681或咨询慧都在线客服,我们有专业的大数据团队,为您提供免费大数据相关业务咨询!