标签:
我们都知道Spark内部提供了HashPartitioner
和RangePartitioner
两种分区策略,这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合咱们的需求,这时候我们就可以自定义分区策略。为此,Spark提供了相应的接口,我们只需要扩展Partitioner
抽象类,然后实现里面的三个方法:
01 |
package org.apache.spark |
02 |
03 |
/** |
04 |
* An object that defines how the elements in a key-value pair RDD are partitioned by key. |
05 |
* Maps each key to a partition ID, from 0 to `numPartitions - 1`. |
06 |
*/ |
07 |
abstract class Partitioner extends Serializable { |
08 |
def numPartitions : Int |
09 |
def getPartition(key : Any) : Int |
10 |
} |
def numPartitions: Int
:这个方法需要返回你想要创建分区的个数;
def getPartition(key: Any): Int
:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1
;
equals()
:这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。
假如我们想把来自同一个域名的URL放到一台节点上,比如:http://www.iteblog.com
和http://www.iteblog.com/archives/1368
,如果你使用HashPartitioner
,这两个URL的Hash值可能不一样,这就使得这两个URL被放到不同的节点上。所以这种情况下我们就需要自定义我们的分区策略,可以如下实现:
01 |
package com.iteblog.utils |
02 |
03 |
import org.apache.spark.Partitioner |
04 |
05 |
/** |
06 |
* User: 过往记忆 |
07 |
* Date: 2015-05-21 |
08 |
* Time: 下午23:34 |
09 |
* bolg: http://www.iteblog.com |
10 |
* 本文地址:http://www.iteblog.com/archives/1368 |
11 |
* 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货 |
12 |
* 过往记忆博客微信公共帐号:iteblog_hadoop |
13 |
*/ |
14 |
15 |
class IteblogPartitioner(numParts : Int) extends Partitioner { |
16 |
override def numPartitions : Int = numParts |
17 |
18 |
override def getPartition(key : Any) : Int = { |
19 |
val domain = new java.net.URL(key.toString).getHost() |
20 |
val code = (domain.hashCode % numPartitions) |
21 |
if (code < 0 ) { |
22 |
code + numPartitions |
23 |
} else { |
24 |
code |
25 |
} |
26 |
} |
27 |
28 |
override def equals(other : Any) : Boolean = other match { |
29 |
case iteblog : IteblogPartitioner = > |
30 |
iteblog.numPartitions == numPartitions |
31 |
case _ = > |
32 |
false |
33 |
} |
34 |
35 |
override def hashCode : Int = numPartitions |
36 |
} |
因为hashCode
值可能为负数,所以我们需要对他进行处理。然后我们就可以在partitionBy()
方法里面使用我们的分区:
1 |
iteblog.partitionBy( new IteblogPartitioner( 20 )) |
类似的,在Java中定义自己的分区策略和Scala类似,只需要继承org.apache.spark.Partitioner
,并实现其中的方法即可。
在Python中,你不需要扩展Partitioner类,我们只需要对iteblog.partitionBy()
加上一个额外的hash函数,如下:
1 |
import urlparse |
2 |
3 |
def iteblog_domain(url): |
4 |
return hash (urlparse.urlparse(url).netloc) |
5 |
6 |
iteblog.partitionBy( 20 , iteblog_domain) |
标签:
原文地址:http://www.cnblogs.com/shexinwei/p/4646557.html