Tags: check examples eve avg form inf allocation operation occurs
For how to use Horovod, see: https://github.com/uber/horovod#usage
To use Horovod to run a TensorFlow program in a distributed fashion, make the following additions to your program:
Run hvd.init().
Pin a server GPU to be used by this process using session_config.gpu_options.visible_device_list. With the typical setup of one GPU per process, this can be set to the local rank (the rank of the process within its own server), for example: session_config.gpu_options.visible_device_list = str(hvd.local_rank()). In that case, the first process on the server is allocated the first GPU, the second process is allocated the second GPU, and so forth.
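The following plain-Python sketch shows how processes map to GPUs. It needs neither TensorFlow nor Horovod. It assumes a hypothetical job on 2 servers with 4 GPUs each, with ranks handed out server by server; that layout is only an illustrative assumption. simulate_ranks is a hypothetical helper. In each process, visible_device_list is exactly what str(hvd.local_rank()) would produce there.

```python
def simulate_ranks(num_hosts, gpus_per_host):
    """Hypothetical helper: describe every process in the job."""
    processes = []
    for host in range(num_hosts):
        for local_rank in range(gpus_per_host):
            processes.append({
                "host": host,
                # Global rank, unique across the whole job
                # (assumes ranks are assigned server by server).
                "rank": host * gpus_per_host + local_rank,
                # Local rank, unique only within one server.
                "local_rank": local_rank,
                # The value str(hvd.local_rank()) would give this process.
                "visible_device_list": str(local_rank),
            })
    return processes


procs = simulate_ranks(num_hosts=2, gpus_per_host=4)
for p in procs:
    print(p)
```

On each server, local ranks 0 to 3 select GPUs "0" to "3". The global rank keeps counting across servers.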
Scale the learning rate by the number of workers. In synchronous distributed training, the effective batch size is scaled by the number of workers. An increase in learning rate compensates for the increased batch size: with n workers, multiply the original learning rate by n so that it keeps pace with the batch size. For example: opt = tf.train.RMSPropOptimizer(0.001 * hvd.size())
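Here is the arithmetic behind this scaling rule as a small sketch. scaled_hyperparams is a hypothetical helper, and the per-worker batch of 32 is an arbitrary example value.

```python
def scaled_hyperparams(base_lr, per_worker_batch, num_workers):
    """Linear scaling: the effective batch and the learning rate both grow
    with the number of workers (what 0.001 * hvd.size() does)."""
    effective_batch = per_worker_batch * num_workers
    scaled_lr = base_lr * num_workers
    return effective_batch, scaled_lr


for n in (1, 2, 4, 8):
    batch, lr = scaled_hyperparams(base_lr=0.001, per_worker_batch=32, num_workers=n)
    print(f"workers={n}: effective batch={batch}, learning rate={lr:g}")
```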
Wrap the optimizer in hvd.DistributedOptimizer. The distributed optimizer delegates gradient computation to the original optimizer by calling its compute_gradients() method. It then averages the gradients across workers using allreduce or allgather, and applies those averaged gradients through the original optimizer's apply_gradients(). The class is examined in detail later in this article.
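The averaging step can be simulated in plain Python. This sketch reproduces only the result of the collectives, not how Horovod moves data between processes, and the function names are hypothetical. Dense gradients are averaged element-wise, which is what an averaging allreduce yields. For sparse gradients (tf.IndexedSlices), Horovod's allreduce() instead allgathers the indices and values from every worker and divides the values by the number of workers. When those slices are applied, duplicate indices add up, which gives the same average as the dense path.

```python
def allreduce_average(per_worker_grads):
    """Dense case: every worker ends up with the element-wise mean."""
    num_workers = len(per_worker_grads)
    mean = [sum(values) / num_workers for values in zip(*per_worker_grads)]
    # After an allreduce, each worker holds an identical copy of the result.
    return [list(mean) for _ in range(num_workers)]


def allgather_average_sparse(per_worker_slices):
    """Sparse case: concatenate every worker's (indices, values) and divide
    the values by the number of workers."""
    n = len(per_worker_slices)
    indices, values = [], []
    for idx, vals in per_worker_slices:
        indices.extend(idx)
        values.extend(v / n for v in vals)
    return indices, values


def densify(indices, values, length):
    """Apply sparse slices to a dense vector; duplicate indices accumulate."""
    dense = [0.0] * length
    for i, v in zip(indices, values):
        dense[i] += v
    return dense


# Gradients of the same 3-element variable computed on 2 workers.
print(allreduce_average([[1.0, 2.0, 3.0], [3.0, 4.0, 5.0]]))
# [[2.0, 3.0, 4.0], [2.0, 3.0, 4.0]]

# Sparse version: worker 0 touched rows 0 and 2, worker 1 only row 2.
idx, vals = allgather_average_sparse([([0, 2], [2.0, 4.0]), ([2], [6.0])])
print(densify(idx, vals, 3))  # [1.0, 0.0, 5.0], the dense mean
```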
Add hvd.BroadcastGlobalVariablesHook(0) to broadcast initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint. Alternatively, if you're not using MonitoredTrainingSession, you can simply execute the hvd.broadcast_global_variables op after global variables have been initialized.
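The effect of the broadcast can be sketched in plain Python. Workers start from different random values, and after a broadcast from root rank 0 they all hold rank 0's copy. broadcast() here is a hypothetical single-process stand-in, not Horovod's implementation.

```python
import random


def broadcast(per_worker_values, root_rank=0):
    """Replace every worker's copy with a copy of root_rank's values."""
    root_value = per_worker_values[root_rank]
    return [list(root_value) for _ in per_worker_values]


random.seed(0)
# Each of 4 workers initializes its weights independently, so they differ.
workers = [[random.gauss(0.0, 1.0) for _ in range(3)] for _ in range(4)]
synced = broadcast(workers, root_rank=0)
print(all(w == workers[0] for w in synced))  # True
```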
Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them. This can be accomplished by passing checkpoint_dir=None to tf.train.MonitoredTrainingSession if hvd.rank() != 0, for example: checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None.
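Below, the rank-based choice of checkpoint directory is isolated in a hypothetical helper. With checkpoint_dir=None, tf.train.MonitoredTrainingSession neither saves nor restores checkpoints on that rank. When training resumes, only rank 0 loads the checkpoint, and the broadcast from rank 0 brings the other workers to the same state.

```python
def checkpoint_dir_for(rank, path="./checkpoints"):
    """Only rank 0 gets a real directory; every other rank gets None."""
    return path if rank == 0 else None


dirs = {rank: checkpoint_dir_for(rank) for rank in range(4)}
print(dirs)  # {0: './checkpoints', 1: None, 2: None, 3: None}
```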
Example (see the examples directory for full training examples):
```python
# Uses the TensorFlow 1.x API (tf.ConfigProto, tf.train.*).
import tensorflow as tf
import horovod.tensorflow as hvd


# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation. minimize() calls the overridden compute_gradients(),
# so gradients are averaged across workers before they are applied.
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
    while not mon_sess.should_stop():
        # Perform synchronous training.
        mon_sess.run(train_op)
```
The rest of this article focuses on the hvd.DistributedOptimizer class, whose source code is reproduced here with comments.
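The same wrap-then-average structure can be run in plain Python before turning to the TensorFlow source. Every name here is a hypothetical stand-in:

- SGD plays the role of the wrapped tf.train.Optimizer.
- ToyDistributedOptimizer mirrors the shape of the class's compute_gradients() and apply_gradients().
- toy_allreduce fakes the collective by reading all simulated workers' gradients from one shared table.

The real class's size() > 1 check is omitted.

```python
class SGD:
    """Hypothetical stand-in for a wrapped tf.train.Optimizer (plain SGD)."""

    def __init__(self, lr):
        self.lr = lr

    def compute_gradients(self, grad_fn, var_names):
        # Like TensorFlow, return a list of (gradient, variable) pairs.
        return [(grad_fn(name), name) for name in var_names]

    def apply_gradients(self, grads_and_vars, params):
        for grad, name in grads_and_vars:
            if grad is not None:  # variables without a gradient are left alone
                params[name] -= self.lr * grad


class ToyDistributedOptimizer:
    """Same structure as hvd.DistributedOptimizer: delegate gradient
    computation, average each non-None gradient, delegate the update."""

    def __init__(self, optimizer, allreduce):
        self._optimizer = optimizer
        self._allreduce = allreduce

    def compute_gradients(self, *args, **kwargs):
        gradients = self._optimizer.compute_gradients(*args, **kwargs)
        averaged = []
        for grad, var in gradients:
            if grad is not None:
                averaged.append((self._allreduce(var, grad), var))
            else:
                averaged.append((None, var))
        return averaged

    def apply_gradients(self, *args, **kwargs):
        return self._optimizer.apply_gradients(*args, **kwargs)


# Local gradients on two simulated workers (different data, different values).
# "b" has no gradient on either worker, e.g. a variable the loss does not use.
local_grads = [{"w": 1.0, "b": None}, {"w": 3.0, "b": None}]


def toy_allreduce(var, local_grad):
    """Hypothetical allreduce: reads every worker's gradient for `var` from
    the shared table (possible only because everything runs in one process),
    so local_grad is already part of the mean."""
    values = [worker_grads[var] for worker_grads in local_grads]
    return sum(values) / len(values)


replicas = []
for grads in local_grads:
    params = {"w": 0.0, "b": 0.0}  # identical start, as after the broadcast
    opt = ToyDistributedOptimizer(SGD(lr=0.1), toy_allreduce)
    grads_and_vars = opt.compute_gradients(grads.get, ["w", "b"])
    opt.apply_gradients(grads_and_vars, params)
    replicas.append(params)

print(replicas)  # [{'w': -0.2, 'b': 0.0}, {'w': -0.2, 'b': 0.0}]
```

Both replicas apply the same averaged gradient (2.0), so they stay identical. If each had applied only its own local gradient, w would end near -0.1 on one replica and -0.3 on the other, and the copies would drift apart.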
```python
import tensorflow as tf
# In horovod/tensorflow/__init__.py these are module-level names; import them
# explicitly when reading the class on its own.
from horovod.tensorflow import allreduce, size


# DistributedOptimizer subclasses tf.train.Optimizer
class DistributedOptimizer(tf.train.Optimizer):
    """An optimizer that wraps another tf.Optimizer, using an allreduce to
    average gradient values before applying gradients to model weights."""

    def __init__(self, optimizer, name=None, use_locking=False, device_dense='',
                 device_sparse=''):
        """Construct a new DistributedOptimizer, which uses another optimizer
        under the hood for computing single-process gradient values and
        applying gradient updates after the gradient values have been averaged
        across all the Horovod ranks.

        Args:
          optimizer:
            Optimizer to use for computing gradients and applying updates.
          name:
            Optional name prefix for the operations created when applying
            gradients. Defaults to "Distributed" followed by the provided
            optimizer type.
          use_locking:
            Whether to use locking when updating variables.
            See Optimizer.__init__ for more info.
          device_dense:
            Device to be used for dense tensors. Uses GPU by default
            if Horovod was built with HOROVOD_GPU_ALLREDUCE.
          device_sparse:
            Device to be used for sparse tensors. Uses GPU by default
            if Horovod was built with HOROVOD_GPU_ALLGATHER.
        """
        if name is None:
            name = "Distributed{}".format(type(optimizer).__name__)

        self._optimizer = optimizer
        self._device_dense = device_dense
        self._device_sparse = device_sparse
        super(DistributedOptimizer, self).__init__(
            name=name, use_locking=use_locking)

    def compute_gradients(self, *args, **kwargs):
        """Compute gradients of all trainable variables.

        See Optimizer.compute_gradients() for more info.

        In DistributedOptimizer, compute_gradients() is overridden to also
        allreduce the gradients before returning them.
        """
        # self._optimizer is the original (wrapped) optimizer. Its
        # compute_gradients() computes the gradients of all trainable
        # variables and returns a list of (gradient, variable) tuples.
        gradients = self._optimizer.compute_gradients(*args, **kwargs)
        # size() is the number of workers; size() > 1 means distributed training.
        if size() > 1:
            # In distributed mode, gradients must be averaged across workers.
            averaged_gradients = []
            with tf.name_scope(self._name + "_Allreduce"):
                # Iterate over the list of (gradient, variable) tuples.
                for grad, var in gradients:
                    if grad is not None:
                        # allreduce() synchronizes grad with the other workers:
                        # it performs an allreduce on a tf.Tensor or
                        # tf.IndexedSlices and returns the average.
                        avg_grad = allreduce(grad,
                                             device_dense=self._device_dense,
                                             device_sparse=self._device_sparse)
                        # Add the synchronized avg_grad to the list.
                        averaged_gradients.append((avg_grad, var))
                    else:
                        # Variables without a gradient are passed through as None.
                        averaged_gradients.append((None, var))
            return averaged_gradients
        else:
            return gradients

    def apply_gradients(self, *args, **kwargs):
        """Calls this same method on the underlying optimizer."""
        return self._optimizer.apply_gradients(*args, **kwargs)

    def get_slot(self, *args, **kwargs):
        """Calls this same method on the underlying optimizer."""
        return self._optimizer.get_slot(*args, **kwargs)

    def get_slot_names(self, *args, **kwargs):
        """Calls this same method on the underlying optimizer."""
        return self._optimizer.get_slot_names(*args, **kwargs)

    def variables(self, *args, **kwargs):
        """Calls this same method on the underlying optimizer."""
        return self._optimizer.variables(*args, **kwargs)
```
Learning Horovod, part 1 -- hvd.DistributedOptimizer(optimizer)
Original article: https://www.cnblogs.com/lixiaolun/p/9172364.html