标签:耗资源 get bsp spark 其他 node 采样 执行 data
Spark(分布式的计算平台),分布式:指计算节点之间不共享内存,需要通过网络通信的方式交换数据。Spark最典型的应用方式是建立在大量廉价计算节点(廉价主机、虚拟的docker container)上;但这种方式区别于CPU+GPU的架构和共享内存多处理器的高性能服务器架构。
从Spark架构图看出,Manager node调度组织Spark程序,而Worker Node(可分为不同的partition(数据分片),是spark的基本出来单元)执行具体的计算任务,然后将结果返回给Drive Program。
执行具体的程序时,Spark将程序拆解成一个任务DAG(有向无环图),再根据DAG决定程序各步骤执行的方法。如下图所示,该程序先分别从textFile和HadoopFile读取文件,经过一些列操作后再进行join,最终得到处理结果。
在Spark平台上并行处理DAG时,最关键的过程是找到哪些是可以并行处理的部分,哪些是shuffle和reduce。
shuffle指的是所有partition的数据必须进行洗牌后才能得到下一步的数据,最典型的操作就是groupByKey和join操作。拿join操作来说,必须通过在textFile数据中和hadoopFile数据中做全量的匹配才可以得到join后的dataframe。而groupby操作需要对数据中所有相同的key进行合并,也需要全局的shuffle才能够完成。
map,filter等操作仅需要逐条的进行数据处理和转换就可以,不需要进行数据间的操作,因此各partition之间可以并行处理。
在得到最终的计算结果之前,程序需要进行reduce的操作,从各partition上汇总统计结果,随着partition的数量逐渐减小,reduce操作的并行程度逐渐降低,直到将最终的计算结果汇总到master节点上。
DAG根据宽依赖划分成stag(如图2粉色框),而shuffle操作是宽依赖,需要在不同计算节点之间进行数据交换,非常消耗计算、通信及存储资源,因此shuffle操作是spark程序应该尽量避免的。Spark的计算过程:Stage内部数据高效并行计算,Stage边界处进行消耗资源的shuffle操作或者最终的reduce操作。
Spark MLlib如何实现Random Forest(完全可以实现数据并行,而GBDT智能并行)这里不讨论,是讨论如何实现深度学习的分布式训练。梯度下降的并行程度实现质量直接决定了深度学习模型的训练速度。
MiniBatch梯度下降的源码(猜测在sparkcontext上运行)
while (!converged && i <= numIterations) { val bcWeights = data.context.broadcast(weights) // Sample a subset (fraction miniBatchFraction) of the total data // compute and sum up the subgradients on this subset (this is one map-reduce) val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i) .treeAggregate((BDV.zeros[Double](n), 0.0, 0L))( seqOp = (c, v) => { // c: (grad, loss, count), v: (label, features) val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1)) (c._1, c._2 + l, c._3 + 1) }, combOp = (c1, c2) => { // c: (grad, loss, count) (c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3) }) bcWeights.destroy(blocking = false) if (miniBatchSize > 0) { /** * lossSum is computed using the weights from the previous iteration * and regVal is the regularization value computed in the previous iteration as well. */ stochasticLossHistory += lossSum / miniBatchSize + regVal val update = updater.compute( weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble), stepSize, i, regParam) weights = update._1 regVal = update._2 previousWeights = currentWeights currentWeights = Some(weights) if (previousWeights != None && currentWeights != None) { converged = isConverged(previousWeights.get, currentWeights.get, convergenceTol) } } else { logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero") } i += 1 }
简化版:
while (i <= numIterations) { //迭代次数不超过上限 val bcWeights = data.context.broadcast(weights) //广播模型所有权重参数 val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i) .treeAggregate() //各节点采样后计算梯度,通过treeAggregate汇总梯度 val weights = updater.compute(weights, gradientSum / miniBatchSize) //根据梯度更新权重 i += 1 //迭代次数+1 }
Spark只有简单的数据平行,没有参数并行,mini batch过程制作了三件事,:
1. 把当前的模型参数广播到各个数据partition(可当作虚拟的计算节点)
2. 各计算节点进行数据抽样得到mini batch的数据,分别计算梯度(可并行),再通过treeAggregate操作汇总梯度,得到最终梯度gradientSum
3. 利用gradientSum更新模型权重
其中treeAggregate,逐层汇总求和,这个操作是一次reduce操作,本身并不包含shuffle操作,再加上采用分层的树形操作,在每层中都是并行执行的,因此整个过程是相对高效的。
1. 采用全局广播的方式,在每轮迭代前广播全部模型参数。众所周知Spark的广播过程非常消耗带宽资源,特别是当模型的参数规模过大时,广播过程和在每个节点都维护一个权重参数副本的过程都是极消耗资源的过程,这导致了Spark在面对复杂模型时的表现不佳;
2. 采用阻断式的梯度下降方式,每轮梯度下降由最慢的节点决定。从上面的分析可知,Spark MLlib的mini batch的过程是在所有节点计算完各自的梯度之后,逐层Aggregate最终汇总生成全局的梯度。也就是说,如果由于数据倾斜等问题导致某个节点计算梯度的时间过长,那么这一过程将block其他所有节点无法执行新的任务。这种同步阻断的分布式梯度计算方式,是Spark MLlib并行训练效率较低的主要原因;
3. Spark MLlib并不支持复杂网络结构和大量可调超参。事实上,Spark MLlib在其标准库里只支持标准的多层感知机神经网络的训练,并不支持RNN,LSTM等复杂网络结构,而且也无法选择不同的activation function等大量超参。这就导致Spark MLlib在支持深度学习方面的能力欠佳。
参考资料:
1.https://zhuanlan.zhihu.com/p/81784947
分布式深度学习(Spark MLlib,Parameter Server、Ring-allreduce和Tensorflow )
标签:耗资源 get bsp spark 其他 node 采样 执行 data
原文地址:https://www.cnblogs.com/yutingmoran/p/12092815.html