val sc = new SparkContext(...)
val file = sc.textFile("file.txt")
// Create an Accumulator[Int] initialized to 0
val blankLines = sc.accumulator(0)
val callSigns = file.flatMap(line => {
  if (line == "") {
    blankLines += 1  // Add to the accumulator
  }
  line.split(" ")
})
callSigns.saveAsTextFile("output.txt")
println("Blank lines: " + blankLines.value)
Note that the count in blankLines is only correct after saveAsTextFile has been called: transformations are lazy, so the accumulator is only updated once an action actually runs the computation.
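As a sketch of this pitfall (reusing the sc and file from the example above; blankLines2 and words are hypothetical names), reading the accumulator before any action has run simply returns its initial value:

val blankLines2 = sc.accumulator(0)
val words = file.flatMap { line =>
  if (line == "") blankLines2 += 1
  line.split(" ")
}
println(blankLines2.value)   // prints 0: flatMap is lazy, nothing has run yet
words.count()                // an action forces the computation
println(blankLines2.value)   // now reflects the blank lines that were seen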
An operation op is associative if (a op b) op c = a op (b op c) for all values a, b, and c.
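For instance, with arbitrary sample values a = 1, b = 2, c = 3, integer addition satisfies the law while subtraction does not, which is why a sum is a safe operation for accumulators and reduce, where Spark may group the values in any order:

val (a, b, c) = (1, 2, 3)
((a + b) + c) == (a + (b + c))   // true:  6 == 6, addition is associative
((a - b) - c) == (a - (b - c))   // false: -4 != 2, subtraction is not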
# Look up the locations of the call signs on the
# RDD contactCounts. We load a list of call sign
# prefixes to country code to support this lookup.
signPrefixes = loadCallSignTable()

def processSignCount(sign_count):
    # signPrefixes is captured in the closure and shipped with every task
    country = lookupCountry(sign_count[0], signPrefixes)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts
                        .map(processSignCount)
                        .reduceByKey(lambda x, y: x + y))
In the code above, if signPrefixes is a large table, shipping it from the master to every worker is expensive. Worse, if signPrefixes is needed again later, it will be re-sent to every node. Turning signPrefixes into a broadcast variable solves this problem, as follows:
// Look up the countries for each call sign for the
// contactCounts RDD. We load an array of call sign
// prefixes to country code to support this lookup.
val signPrefixes = sc.broadcast(loadCallSignTable())
val countryContactCounts = contactCounts.map { case (sign, count) =>
  val country = lookupInArray(sign, signPrefixes.value)
  (country, count)
}.reduceByKey((x, y) => x + y)
countryContactCounts.saveAsTextFile(outputDir + "/countries.txt")
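A minimal follow-up sketch (prefixCount is a hypothetical name; lookupInArray and outputDir come from the example above): a later job can read signPrefixes.value again and reuse the copy already cached on the executors instead of having the table re-shipped from the driver, and Broadcast.unpersist() releases those cached copies once the table is no longer needed.

val prefixCount = contactCounts.map { case (sign, _) =>
  (lookupInArray(sign, signPrefixes.value), 1)   // reuses the already-distributed table
}.reduceByKey(_ + _)
prefixCount.saveAsTextFile(outputDir + "/prefixCounts.txt")

signPrefixes.unpersist()   // free the cached copies on the executors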
Original post: http://www.cnblogs.com/wttttt/p/6844918.html