标签:调用 als output tco lookup lambda equal 获取 效率
简介??spark执行操作时,可以使用驱动器程序Driver中定义的变量,但有时这种默认的使用方式却并不理想。
??spark为了解决这两个问题,提供了两种类型的共享变量:广播变量(broadcast variable)和累加器(accumulator)。
??查询每个国家的呼号个数
# 将呼号前缀(国家代码)作为广播变量
signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value)
count = sign_count[1]
return (country, count)
countryContactCounts = (contactCounts.map(processSignCount).reduceByKey((lambda x, y: x+y)))
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
// 将呼号前缀(国家代码)作为广播变量
val signPrefixes = sc.broadcast(loadCallSignTable())
def processSignCount(sign_count, signPrefixes):
country = lookupCountry(sign_count[0], signPrefixes.value)
count = sign_count[1]
return (country, count)
val countryContactCounts = contactCounts.map{case (sign, count) => {
val country = lookupInArray(sign, signPrefixes.value)
(country, count)
}}.reduceByKey((x, y) => x+y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
// 将呼号前缀(国家代码)作为广播变量
final Broadcast<String[]> signPrefixes = sc.broadcast(loadCallSignTable());
JavaPairRDD<String, Integer> countryContactCounts = contactCounts.mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
public Tuple2<String, Integer> call(Tuple2<String, Integer> callSignCount) {
String sign = callSignCount._1();
String country = lookupCountry(sign, signPrefixes.value());
return new Tuple2(country, callSignCount._2());
}
}).reduceByKey(new SumInts());
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt");
??累加空行
file = sc.textFile(inputFile)
# 创建Accumulator[Int]并初始化为0
blankLines = sc.accumulator(0)
def extractCallSigns(line):
global blankLines # 访问全局变量
if (line == ""):
blankLines += 1
return line.split(" ")
callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(outputDir + "/callsigns")
print "Blank lines: %d" % blankLines.value
val file = sc.textFile("file.txt")
val blankLines = sc.accumulator(0) //创建Accumulator[Int]并初始化为0
val callSigns = file.flatMap(line => {
if (line == "") {
blankLines += 1 //累加器加1
}
line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println("Blank lines:" + blankLines.value)
JavaRDD<String> rdd = sc.textFile(args[1]);
final Accumulator<Integer> blankLines = sc.accumulator(0);
JavaRDD<String> callSigns = rdd.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) {
if ("".equals(line)) {
blankLines.add(1);
}
return Arrays.asList(line.split(" "));
}
});
callSigns.saveAsTextFile("output.text");
System.out.println("Blank lines:" + blankLines.value());
标签:调用 als output tco lookup lambda equal 获取 效率
原文地址:http://blog.51cto.com/12967015/2172863