标签:友好 eset err set over map cat scala ons
scala> val notice = sc.textFile("./NOTICE")
notice: org.apache.spark.rdd.RDD[String] = ./NOTICE MapPartitionsRDD[40] at textFile at <console>:32

scala> val blanklines = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
blanklines: org.apache.spark.Accumulator[Int] = 0

scala> val tmp = notice.flatMap(line => {
     |   if (line == "") {
     |     blanklines += 1
     |   }
     |   line.split(" ")
     | })
tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at flatMap at <console>:36

scala> tmp.count()
res31: Long = 3213

scala> blanklines.value
res32: Int = 171
package com.lxl.spark
import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.JavaConversions._
/**
 * Accumulator that gathers the distinct strings passed to `add` into a
 * `java.util.HashSet`.
 *
 * Input type is `String`; the accumulated value is exposed as a read-only
 * `java.util.Set[String]`.
 */
class LogAccumulator extends org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] {

  // Backing store for the accumulated strings; duplicates collapse because it is a set.
  private val _logArray: java.util.Set[String] = new java.util.HashSet[String]()

  /** True while nothing has been accumulated. */
  override def isZero: Boolean = _logArray.isEmpty

  /** Drops every accumulated string. */
  override def reset(): Unit = _logArray.clear()

  /** Records one string. */
  override def add(v: String): Unit = {
    _logArray.add(v)
  }

  /**
   * Folds the contents of another `LogAccumulator` into this one.
   *
   * @throws UnsupportedOperationException if `other` is not a `LogAccumulator`
   */
  override def merge(other: org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]]): Unit = {
    other match {
      case o: LogAccumulator =>
        _logArray.addAll(o.value)
      case _ =>
        // Fail with a descriptive error instead of a bare scala.MatchError.
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
  }

  /** Read-only view of the accumulated strings. */
  override def value: java.util.Set[String] =
    java.util.Collections.unmodifiableSet(_logArray)

  /** Returns a new accumulator pre-populated with the strings held by this one. */
  override def copy(): org.apache.spark.util.AccumulatorV2[String, java.util.Set[String]] = {
    val newAcc = new LogAccumulator()
    _logArray.synchronized {
      newAcc._logArray.addAll(_logArray)
    }
    newAcc
  }
}

/**
 * Demo driver: sums the purely numeric entries of a small data set and uses a
 * [[LogAccumulator]] to collect the entries that were rejected (the ones
 * containing letters).
 */
object LogAccumulator {

  /**
   * Runs the demo job, then prints the sum and the rejected entries.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Explicit .asScala decorator instead of the deprecated implicit JavaConversions.
    import scala.collection.JavaConverters._

    val conf = new SparkConf().setAppName("LogAccumulator")
    val sc = new SparkContext(conf)
    try {
      val accum = new LogAccumulator
      sc.register(accum, "logAccum")

      // Optional leading minus followed by digits; String.matches tests the whole string.
      val pattern = """^-?(\d+)"""

      val sum = sc
        .parallelize(Array("1", "2a", "3", "4b", "5", "6", "7cd", "8", "9"), 2)
        .filter { line =>
          val flag = line.matches(pattern)
          // Remember every entry that is not a plain integer.
          if (!flag) {
            accum.add(line)
          }
          flag
        }
        .map(_.toInt)
        .reduce(_ + _)

      println("sum: " + sum)
      // Separate the rejected entries with a space so they do not run together.
      println(accum.value.asScala.mkString(" "))
    } finally {
      // Release the context even when the job fails.
      sc.stop()
    }
  }
}
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)

scala> broadcastVar.value
res33: Array[Int] = Array(1, 2, 3)
标签:友好 eset err set over map cat scala ons
原文地址:https://www.cnblogs.com/LXL616/p/11148328.html