简单分析一下GraphX是怎么为图数据建模和存储的。
可以看GraphLoader的函数,
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
numEdgePartitions: Int = -1,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
: Graph[Int, Int]
以下是代码,比较清晰地展现了内部存储结构。
private[graphx]
class EdgePartition[
@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
localSrcIds: Array[Int],
localDstIds: Array[Int],
data: Array[ED],
index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
global2local: GraphXPrimitiveKeyOpenHashMap[VertexId, Int],
local2global: Array[VertexId],
vertexAttrs: Array[VD],
activeSet: Option[VertexSet])
extends Serializable {
/**
* Stores the locations of edge-partition join sites for each vertex attribute in a particular
* vertex partition. This provides routing information for shipping vertex attributes to edge
* partitions.
*/
private[graphx]
class RoutingTablePartition(
private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) extends Serializable {
EdgeRDD的分区怎么切分的呢?因为数据是根据HadoopRDD从文件里根据offset扫出来的。可以理解为对边数据的切分是没有任何处理的,因为文件也没有特殊排列过,所以切分成多少个分区应该就是随机的。
VertexRDD的分区怎么切分的呢?EdgeRDD生成的vertexIdToPartitionId这份RDD数据是RDD[VertexId, Int]型,它根据hash分区规则,分成和EdgeRDD分区数一样大。所以VertexRDD的分区数和Edge一样,分区规则是Long取hash。
所以我可以想象的计算过程是:
对点操作的时候,首先对vertexId(是个Long)进行hash,找到对应分区的位置,在这个分区上,如果是内存存储的VertexRDD,那很快可以查到它的边所在的几个Edge分区的所在位置,然后把计算分到这几个Edge所在的分区上去计算。
第一步根据点hash后找边分区位置的过程就类似一次建好索引的查询。
配官方图方面理解:
对原生类型的存储和读写有比较好的数据结构支持,典型的是EdgePartition里使用的map:
/**
* A fast hash map implementation for primitive, non-null keys. This hash map supports
* insertions and updates, but not deletions. This map is about an order of magnitude
* faster than java.util.HashMap, while using much less space overhead.
*
* Under the hood, it uses our OpenHashSet implementation.
*/
private[graphx]
class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
@specialized(Long, Int, Double) V: ClassTag](
以及之前提到的vector
/**
* An append-only, non-threadsafe, array-backed vector that is optimized for primitive types.
*/
private[spark]
class PrimitiveVector[@specialized(Long, Int, Double) V: ClassTag](initialSize: Int = 64) {
private var _numElements = 0
private var _array: Array[V] = _
全文完 :)
版权声明:本文为博主原创文章,未经博主允许不得转载。
原文地址:http://blog.csdn.net/pelick/article/details/47293495