码迷,mamicode.com
首页 > 其他好文 > 详细

Homework 2: UDF Caching in Spark

时间:2015-03-05 15:01:48      阅读:306      评论:0      收藏:0      [点我收藏+]

标签:spark   udfcache   sqark sql   scala   bigdata   

    为spark编写UDF cache:

    作业介绍 https://github.com/cs186-spring15/course/tree/master/hw2

    我花了点时间做了下,觉得是学习spark sql和scala的好材料。现在把我写的作业记录如下:

Task #1: Implementing DiskPartition and GeneralDiskHashedRelation

Task #2: Implementing object DiskHashedRelation


DiskPartition.scala

package org.apache.spark.sql.execution
import java.io._
import java.nio.file.{Path, StandardOpenOption, Files}
import java.util.{ArrayList => JavaArrayList}
import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions.{Projection, Row}
import org.apache.spark.sql.execution.CS186Utils._
import scala.collection.JavaConverters._
/**
 * This trait represents a regular relation that is hash partitioned and spilled to
 * disk.
 */
private[sql] sealed trait DiskHashedRelation {
  /**
   *
   * @return an iterator of the [[DiskPartition]]s that make up this relation.
   */
  def getIterator(): Iterator[DiskPartition]
  /**
   * Close all the partitions for this relation. This should involve deleting the files hashed into.
   */
  def closeAllPartitions()
}
/**
 * A general implementation of [[DiskHashedRelation]].
 *
 * @param partitions the disk partitions that we are going to spill to
 */
protected [sql] final class GeneralDiskHashedRelation(partitions: Array[DiskPartition])
    extends DiskHashedRelation with Serializable {
  override def getIterator() = {
    // IMPLEMENT ME
    partitions.iterator
    //null
  }
  override def closeAllPartitions() = {
    // IMPLEMENT ME
    partitions.foreach((_: DiskPartition).closePartition())
  }
}
private[sql] class DiskPartition (
                                  filename: String,
                                  blockSize: Int) {
  private val path: Path = Files.createTempFile("", filename)
  private val data: JavaArrayList[Row] = new JavaArrayList[Row]
  private val outStream: OutputStream = Files.newOutputStream(path)
  private val inStream: InputStream = Files.newInputStream(path)
  private val chunkSizes: JavaArrayList[Int] = new JavaArrayList[Int]()
  private var writtenToDisk: Boolean = false
  private var inputClosed: Boolean = false
  /**
   * This method inserts a new row into this particular partition. If the size of the partition
   * exceeds the blockSize, the partition is spilled to disk.
   *
   * @param row the [[Row]] we are adding
   */
  def insert(row: Row) = {
    // IMPLEMENT ME
    if (inputClosed) {
      throw new SparkException("The partition is closed!")
    }
    data.add(row)
    val partitionDataSize = measurePartitionSize()
    if (partitionDataSize >blockSize) {
      spillPartitionToDisk()
      data.clear()
    }
  }
  /**
   * This method converts the data to a byte array and returns the size of the byte array
   * as an estimation of the size of the partition.
   *
   * @return the estimated size of the data
   */
  private[this] def measurePartitionSize(): Int = {
    CS186Utils.getBytesFromList(data).size
  }
  /**
   * Uses the [[Files]] API to write a byte array representing data to a file.
   */
  private[this] def spillPartitionToDisk() = {
    val bytes: Array[Byte] = getBytesFromList(data)
    // This array list stores the sizes of chunks written in order to read them back correctly.
    chunkSizes.add(bytes.size)
    Files.write(path, bytes, StandardOpenOption.APPEND)
    writtenToDisk = true
  }
  /**
   * If this partition has been closed, this method returns an Iterator of all the
   * data that was written to disk by this partition.
   *
   * @return the [[Iterator]] of the data
   */
  def getData(): Iterator[Row] = {
    if (!inputClosed) {
      throw new SparkException("Should not be reading from file before closing input. Bad things will happen!")
    }
    new Iterator[Row] {
      var currentIterator: Iterator[Row] = data.iterator.asScala
      val chunkSizeIterator: Iterator[Int] = chunkSizes.iterator().asScala
      var byteArray: Array[Byte] = null
      override def next() = {
        // IMPLEMENT ME
        currentIterator.next()
        //null
      }
      override def hasNext() = {
        // IMPLEMENT ME
        var hasNext = currentIterator.hasNext
        if(!hasNext){
          hasNext = chunkSizeIterator.hasNext
          if(hasNext){
            fetchNextChunk()
          }
        }
        hasNext
       // false
      }
      /**
       * Fetches the next chunk of the file and updates the iterator. Should return true
       * unless the iterator is empty.
       *
       * @return true unless the iterator is empty.
       */
      private[this] def fetchNextChunk(): Boolean = {
        // IMPLEMENT ME
        if (!chunkSizeIterator.hasNext) {
          return false
        }
        val size = chunkSizeIterator.next()
        if (size <= 0) {
          return false
        }
        byteArray = CS186Utils.getNextChunkBytes(inStream, size,byteArray)
        currentIterator = CS186Utils.getListFromBytes(byteArray).iterator.asScala
          true
      }
    }
  }
  /**
   * Closes this partition, implying that no more data will be written to this partition. If getData()
   * is called without closing the partition, an error will be thrown.
   *
   * If any data has not been written to disk yet, it should be written. The output stream should
   * also be closed.
   */
  def closeInput() = {
    // IMPLEMENT ME
    if(!data.isEmpty){
      //println(data.size())
      spillPartitionToDisk()
      data.clear()
    }
    inputClosed = true
  }
  /**
   * Closes this partition. This closes the input stream and deletes the file backing the partition.
   */
  private[sql] def closePartition() = {
    inStream.close()
    Files.deleteIfExists(path)
  }
}
private[sql] object DiskHashedRelation {
  /**
   * Given an input iterator, partitions each row into one of a number of [[DiskPartition]]s
   * and constructors a [[DiskHashedRelation]].
   *
   * This executes the first phase of external hashing -- using a course-grained hash function
   * to partition the tuples to disk.
   *
   * The block size is approximately set to 64k because that is a good estimate of the average
   * buffer page.
   *
   * @param input the input [[Iterator]] of [[Row]]s
   * @param keyGenerator a [[Projection]] that generates the keys for the input
   * @param size the number of [[DiskPartition]]s
   * @param blockSize the threshold at which each partition will spill
   * @return the constructed [[DiskHashedRelation]]
   */
  def apply (
                input: Iterator[Row],
                keyGenerator: Projection,
                size: Int = 64,
                blockSize: Int = 64000) = {
    // IMPLEMENT ME
    val partitionList: JavaArrayList[DiskPartition] = genDiskPartition(size, blockSize)
    input.foreach { (row: Row) => {
      val rowWithKey = keyGenerator(row)
      val index = rowWithKey.hashCode() % size
      partitionList.get(index).insert(row)
    }
    }
    val partitions: Array[DiskPartition] = partitionList.toArray(new Array[DiskPartition](size))
    partitionList.toArray(new Array[DiskPartition](size)).foreach((_: DiskPartition).closeInput())
    new GeneralDiskHashedRelation(partitions)
    //null
  }
  def genDiskPartition(size: Int, blockSize: Int): JavaArrayList[DiskPartition] = {
    val partitionList: JavaArrayList[DiskPartition] = new JavaArrayList[DiskPartition]
    (0 to size-1).foreach { (i: Int) => {
      partitionList.add(new DiskPartition("partition" + i, blockSize))
    }
    }
    partitionList
  }
}


Task #3: Implementing CS186Utils methods

CS186Utils.scala


package org.apache.spark.sql.execution
import java.io._
import java.util.{ArrayList => JavaArrayList, HashMap => JavaHashMap}
import org.apache.spark.sql.catalyst.expressions._
object CS186Utils {
  /**
   * Returns a Scala array that contains the bytes representing a Java ArrayList.
   *
   * @param data the Java ArrayList we are converting
   * @return an array of bytes
   */
  def getBytesFromList(data: JavaArrayList[Row]): Array[Byte] = {
    // create a ObjectOutputStream backed by a ByteArrayOutputStream
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    // write the object to the output
    out.writeObject(data)
    out.flush()
    out.close()
    bytes.close()
    // return the byte array
    bytes.toByteArray
  }
  /**
   * Converts an array of bytes into a JavaArrayList of type [[Row]].
   *
   * @param bytes the input byte array
   * @return a [[JavaArrayList]] of Rows
   */
  def getListFromBytes(bytes: Array[Byte]): JavaArrayList[Row] = {
    val result: JavaArrayList[Row] = new JavaArrayList[Row]()
    var temp: JavaArrayList[Row] = null
    // create input streams based on the input bytes
    val bytesIn = new ByteArrayInputStream(bytes)
    var in = new ObjectInputStream(bytesIn)
    try {
      // read each object in and attempt to interpret it as a JavaArrayList[Row]
      while ((temp = in.readObject() match {
        case value: JavaArrayList[Row] => value
        case _: Throwable => throw new RuntimeException(s"Unexpected casting exception while reading from file.")
      }) != null) {
        // if it succeeds, add it to the result
        result.addAll(temp)
        // we need to create a new ObjectInputStream for each new object we read because of Java stream quirks
        in = new ObjectInputStream(bytesIn)
      }
    } catch {
      // ObjectInputStream control flow dictates that an EOFException will be thrown when the file is over -- this is expected
      case e: EOFException => // do nothing
      case other: Throwable => throw other
    }
    result
  }
  /**
   * Reads the next nextChunkSize bytes from the input stream provided. If the previous array read into is availab
   * please provide it so as to avoid allocating new object unless absolutely necessary.
   *
   * @param inStream the input stream we are reading from
   * @param nextChunkSize the number of bytes to read
   * @param previousArray the previous array we read into
   * @return
   */
  def getNextChunkBytes(inStream: InputStream, nextChunkSize: Int, previousArray: Array[Byte] = null): Array[Byte] = {
    var byteArray = previousArray
    if (byteArray == null || byteArray.size != nextChunkSize) {
      byteArray = new Array[Byte](nextChunkSize)
    }
    // Read the bytes in.
    inStream.read(byteArray)
    byteArray
  }
  /**
   * Return a new projection operator.
   *
   * @param expressions
   * @param inputSchema
   * @return
   */
  def getNewProjection(
                      expressions: Seq[Expression],
                      inputSchema: Seq[Attribute]) = new InterpretedProjection(expressions, inputSchema)
  /**
   * This function returns the [[ScalaUdf]] from a sequence of expressions. If there is no UDF in the
   * sequence of expressions then it returns null. If there is more than one, it returns the one that is
   * sequentially last.
   *
   * @param expressions
   * @return
   */
  def getUdfFromExpressions(expressions: Seq[Expression]): ScalaUdf = {
    // IMPLEMENT ME
    var udf: ScalaUdf = null
    expressions.foreach { (expression: Expression) => {
      if (expression.isInstanceOf[ScalaUdf]) udf = expression.asInstanceOf[ScalaUdf]
    }
    }
    udf
  }
  /**
   * This function takes a sequence of expressions. If there is no UDF in the sequence of expressions, it does
   * a regular projection operation.
   *
   * If there is a UDF, then it creates a caching iterator that caches the result of the UDF.
   *
   * NOTE: This only works for a single UDF. If there are multiple UDFs, then it will only cache for the last UDF
   * and execute all other UDFs regularly.
   *
   * @param expressions
   * @param inputSchema
   * @return
   */
  def generateCachingIterator(
                                expressions: Seq[Expression],
                                inputSchema: Seq[Attribute]): (Iterator[Row] => Iterator[Row]) = {
    // Get the UDF from the expressions.
    val udf: ScalaUdf = CS186Utils.getUdfFromExpressions(expressions)
    udf match {
      /* If there is no UDF, then do a regular projection operation. Note that this is very similar to Project in
         basicOperators.scala */
      case null => {
        { input =>
          val projection = CS186Utils.getNewProjection(expressions, inputSchema)
          input.map(projection)
        }
       /* def aaa (input: Iterator[Row]) : Iterator[Row] = {
          val projection = CS186Utils.getNewProjection(expressions, inputSchema)
          input.map(projection)
        }
        aaa*/
      }
      // Otherwise, separate the expressions appropriately and creating a caching iterator.
      case u: ScalaUdf => {
        val udfIndex: Int = expressions.indexOf(u)
        val preUdfExpressions = expressions.slice(0, udfIndex)
        val postUdfExpressions = expressions.slice(udfIndex + 1, expressions.size)
        CachingIteratorGenerator(udf.children, udf, preUdfExpressions, postUdfExpressions, inputSchema)
      }
    }
  }
}
object CachingIteratorGenerator {
  /**
   * This function takes an input iterator and returns an iterator that does in-memory memoization
   * as it evaluates the projection operator over each input row. The result is the concatenation of
   * the projection of the preUdfExpressions, the evaluation of the udf, and the projection of the
   * postUdfExpressions, in that order.
   *
   * The UDF should only be evaluated if the inputs to the UDF have never been seen before.
   *
   * This method only needs to worry about caching for the UDF that is specifically passed in. If
   * there are any other UDFs in the expression lists, then they can and should be evaluated
   * without any caching.
   *
   * @param cacheKeys the keys on which we will cache -- the inputs to the UDF
   * @param udf the udf we are caching for
   * @param preUdfExpressions the expressions that come before the UDF in the projection
   * @param postUdfExpressions the expressions that come after the UDF in the projection
   * @param inputSchema the schema of the rows -- useful for creating projections
   * @return
   */
  //CachingIteratorGenerator(studentAttributes, udf, Seq(studentAttributes(1)), Seq(), studentAttributes)
  //Student(sid: Int, gpa: Float)
  def apply(
               cacheKeys: Seq[Expression],
               udf: ScalaUdf,
               preUdfExpressions: Seq[Expression],
               postUdfExpressions: Seq[Expression],
               inputSchema: Seq[Attribute]): (Iterator[Row] => Iterator[Row]) = {
    { input =>
      new Iterator[Row] {
        val udfProject = CS186Utils.getNewProjection(Seq(udf), inputSchema)
        val cacheKeyProjection = CS186Utils.getNewProjection(udf.children, inputSchema)
        val preUdfProjection = CS186Utils.getNewProjection(preUdfExpressions, inputSchema)
        val postUdfProjection = CS186Utils.getNewProjection(postUdfExpressions, inputSchema)
        val cache: JavaHashMap[Row, Row] = new JavaHashMap[Row, Row]()
        def hasNext() = {
          // IMPLEMENT ME
          val hasNext = input.hasNext
          if(!hasNext){
            cache.clear()
          }
          hasNext
         // false
        }
        def next() = {
          // IMPLEMENT ME
          val row = input.next()
         // print("(b "+row+",")
          val computedKey:Row = cacheKeyProjection(row)
          var computedValues: Row = cache.get(computedKey)
         /* if(computedValues!=null){
            print("effect key"+computedKey +" val "+computedValues)
          }*/
          if(computedValues == null){
            val values: JavaArrayList[Any] = new JavaArrayList()
            preUdfProjection(row).iterator.foreach { (i: Any) => {
              values.add(i)
            }
            }
            udfProject(row).iterator.foreach { (i: Any) => {
              values.add(i)
            }
            }
            postUdfProjection(row).iterator.foreach { (i: Any) => {
              values.add(i)
            }
            }
            computedValues = Row.fromSeq(values.toArray)
            cache.put(computedKey,computedValues)
          }
        //  print(" a "+computedValues+")")
        //  print(cache.size())
            computedValues
        }
      }
    }
  }
}

Task 4: Implementing PartitionProject

basicOperators.scala

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkEnv, HashPartitioner, SparkConf}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution}
import org.apache.spark.util.MutablePair
import org.apache.spark.util.collection.ExternalSorter
/**
 * :: DeveloperApi ::
 */
@DeveloperApi
case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryNode {
  override def output = projectList.map(_.toAttribute)
  @transient lazy val buildProjection = newMutableProjection(projectList, child.output)
  def execute() = child.execute().mapPartitions { iter =>
    val resuableProjection = buildProjection()
    iter.map(resuableProjection)
  }
}
/**
 * A projection operator that is tailored to improve performance of UDF execution using
 * in-memory memoization.
 *
 * NOTE: This assumes that we are only caching for a single UDF. If there are multiple
 * UDFs, it will only cache for the last UDF. All other UDFs will be executed regularly.
 *
 * Once you have completed implementing the functions in [[CS186Utils]], this operator
 * should work.
 */
@DeveloperApi
case class CacheProject(projectList: Seq[Expression], child: SparkPlan) extends UnaryNode {
  override def output = child.output
  def execute() = {
    /* Generate the caching iterator. You should trace this code to understand it!
       You have to implement parts of the stack to make this work. */
    val generator: (Iterator[Row] => Iterator[Row]) = CS186Utils.generateCachingIterator(projectList, child.output)
    /* This is Spark magic. In short, it applies the generator function to each of the slices of an RDD.
       For the purposes of CS 186, we will only ever have one slice. */
    child.execute().mapPartitions(generator)
  }
}
/**
 * A projection operator that is tailor to improve performance of UDF execution by using
 * external hashing.
 *
 * @param projectList
 * @param child
 */
@DeveloperApi
case class PartitionProject(projectList: Seq[Expression], child: SparkPlan) extends UnaryNode {
  override def output = child.output
  def execute() = {
    child.execute().mapPartitions(generateIterator)
  }
  /**
   * This method takes an iterator as an input. It should first partition the whole input to disk.
   * It should then read each partition from disk and construct do in-memory memoization over each
   * partition to avoid recomputation of UDFs.
   *
   * @param input the input iterator
   * @return the result of applying the projection
   */
  def generateIterator(input: Iterator[Row]): Iterator[Row] = {
    // This is the key generator for the course-grained external hashing.
    val keyGenerator = CS186Utils.getNewProjection(projectList, child.output)
    // IMPLEMENT ME
    val hashedRelation: DiskHashedRelation = DiskHashedRelation(input, keyGenerator,4,64000)
    val partitions: Iterator[DiskPartition] = hashedRelation.getIterator()
    var diskPartition:DiskPartition = null
    var cachingIterator: Iterator[Row] =null
    new Iterator[Row] {
      def hasNext() = {
        var hasNext = false
        if(cachingIterator != null && cachingIterator.hasNext){
          hasNext = true
        }else{
          hasNext = fetchNextPartition
        }
       /* else if(cachingIterator != null&& !cachingIterator.hasNext){
          hasNext = fetchNextPartition
        }
        else if(cachingIterator == null){
          hasNext = fetchNextPartition
        }*/
        // IMPLEMENT ME
        hasNext
      }
      def next() = {
        // IMPLEMENT ME
        cachingIterator.next()
      }
      /**
       * This fetches the next partition over which we will iterate or returns false if there are no more partitions
       * over which we can iterate.
       *
       * @return
       */
      private def fetchNextPartition(): Boolean  = {
        // IMPLEMENT ME
        var hasNext = partitions.hasNext
        if(hasNext){
          diskPartition = partitions.next()
          val data:Iterator[Row]=diskPartition.getData()
          if(data.hasNext){
            cachingIterator = CS186Utils.generateCachingIterator(projectList, child.output)(data)
            hasNext = true
          }else{
            hasNext = false
          }
        }
        //println(hasNext)
        hasNext
      }
    }
  }
}
/**
 * :: DeveloperApi ::
 */
@DeveloperApi
case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {
  override def output = child.output
  @transient lazy val conditionEvaluator = newPredicate(condition, child.output)
  def execute() = child.execute().mapPartitions { iter =>
    iter.filter(conditionEvaluator)
  }
}
/**
 * :: DeveloperApi ::
 */
@DeveloperApi
case class Sample(fraction: Double, withReplacement: Boolean, seed: Long, child: SparkPlan)
  extends UnaryNode
{
  override def output = child.output
  // TODO: How to pick seed?
  override def execute() = child.execute().sample(withReplacement, fraction, seed)
}
/**
 * :: DeveloperApi ::
 */
@DeveloperApi
case class Union(children: Seq[SparkPlan]) extends SparkPlan {
  // TODO: attributes output by union should be distinct for nullability purposes
  override def output = children.head.output
  override def execute() = sparkContext.union(children.map(_.execute()))
}
/**
 * :: DeveloperApi ::
 * Take the first limit elements. Note that the implementation is different depending on whether
 * this is a terminal operator or not. If it is terminal and is invoked using executeCollect,
 * this operator uses something similar to Spark‘s take method on the Spark driver. If it is not
 * terminal or is invoked using execute, we first take the limit on each partition, and then
 * repartition all the data to a single partition to compute the global limit.
 */
@DeveloperApi
case class Limit(limit: Int, child: SparkPlan)
  extends UnaryNode {
  // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan:
  // partition local limit -> exchange into one partition -> partition local limit again
  /** We must copy rows when sort based shuffle is on */
  private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
  override def output = child.output
  override def outputPartitioning = SinglePartition
  /**
   * A custom implementation modeled after the take function on RDDs but which never runs any job
   * locally.  This is to avoid shipping an entire partition of data in order to retrieve only a few
   * rows.
   */
  override def executeCollect(): Array[Row] = {
    if (limit == 0) {
      return new Array[Row](0)
    }
    val childRDD = child.execute().map(_.copy())
    val buf = new ArrayBuffer[Row]
    val totalParts = childRDD.partitions.length
    var partsScanned = 0
    while (buf.size < limit && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1
      if (partsScanned > 0) {
        // If we didn‘t find any rows after the first iteration, just try all partitions next.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
        // by 50%.
        if (buf.size == 0) {
          numPartsToTry = totalParts - 1
        } else {
          numPartsToTry = (1.5 * limit * partsScanned / buf.size).toInt
        }
      }
      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions
      val left = limit - buf.size
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      val sc = sqlContext.sparkContext
      val res =
        sc.runJob(childRDD, (it: Iterator[Row]) => it.take(left).toArray, p, allowLocal = false)
      res.foreach(buf ++= _.take(limit - buf.size))
      partsScanned += numPartsToTry
    }
    buf.toArray.map(ScalaReflection.convertRowToScala(_, this.schema))
  }
  override def execute() = {
    val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
      child.execute().mapPartitions { iter =>
        iter.take(limit).map(row => (false, row.copy()))
      }
    } else {
      child.execute().mapPartitions { iter =>
        val mutablePair = new MutablePair[Boolean, Row]()
        iter.take(limit).map(row => mutablePair.update(false, row))
      }
    }
    val part = new HashPartitioner(1)
    val shuffled = new ShuffledRDD[Boolean, Row, Row](rdd, part)
    shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
    shuffled.mapPartitions(_.take(limit).map(_._2))
  }
}
/**
 * :: DeveloperApi ::
 * Take the first limit elements as defined by the sortOrder. This is logically equivalent to
 * having a [[Limit]] operator after a [[Sort]] operator. This could have been named TopK, but
 * Spark‘s top operator does the opposite in ordering so we name it TakeOrdered to avoid confusion.
 */
@DeveloperApi
case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) extends UnaryNode {
  override def output = child.output
  override def outputPartitioning = SinglePartition
  val ord = new RowOrdering(sortOrder, child.output)
  // TODO: Is this copying for no reason?
  override def executeCollect() = child.execute().map(_.copy()).takeOrdered(limit)(ord)
    .map(ScalaReflection.convertRowToScala(_, this.schema))
  // TODO: Terminal split should be implemented differently from non-terminal split.
  // TODO: Pick num splits based on |limit|.
  override def execute() = sparkContext.makeRDD(executeCollect(), 1)
}
/**
 * :: DeveloperApi ::
 * Performs a sort on-heap.
 * @param global when true performs a global sort of all partitions by shuffling the data first
 *               if necessary.
 */
@DeveloperApi
case class Sort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
  override def execute() = attachTree(this, "sort") {
    child.execute().mapPartitions( { iterator =>
      val ordering = newOrdering(sortOrder, child.output)
      iterator.map(_.copy()).toArray.sorted(ordering).iterator
    }, preservesPartitioning = true)
  }
  override def output = child.output
}
/**
 * :: DeveloperApi ::
 * Performs a sort, spilling to disk as needed.
 * @param global when true performs a global sort of all partitions by shuffling the data first
 *               if necessary.
 */
@DeveloperApi
case class ExternalSort(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    child: SparkPlan)
  extends UnaryNode {
  override def requiredChildDistribution =
    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
  override def execute() = attachTree(this, "sort") {
    child.execute().mapPartitions( { iterator =>
      val ordering = newOrdering(sortOrder, child.output)
      val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
      sorter.insertAll(iterator.map(r => (r, null)))
      sorter.iterator.map(_._1)
    }, preservesPartitioning = true)
  }
  override def output = child.output
}
/**
 * :: DeveloperApi ::
 * Computes the set of distinct input rows using a HashSet.
 * @param partial when true the distinct operation is performed partially, per partition, without
 *                shuffling the data.
 * @param child the input query plan.
 */
@DeveloperApi
case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
  override def output = child.output
  override def requiredChildDistribution =
    if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil
  override def execute() = {
    child.execute().mapPartitions { iter =>
      val hashSet = new scala.collection.mutable.HashSet[Row]()
      var currentRow: Row = null
      while (iter.hasNext) {
        currentRow = iter.next()
        if (!hashSet.contains(currentRow)) {
          hashSet.add(currentRow.copy())
        }
      }
      hashSet.iterator
    }
  }
}
/**
 * :: DeveloperApi ::
 * Returns a table with the elements from left that are not in right using
 * the built-in spark subtract function.
 */
@DeveloperApi
case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
  override def output = left.output
  override def execute() = {
    left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
  }
}
/**
 * :: DeveloperApi ::
 * Returns the rows in left that also appear in right using the built in spark
 * intersection function.
 */
@DeveloperApi
case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
  override def output = children.head.output
  override def execute() = {
    left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
  }
}
/**
 * :: DeveloperApi ::
 * A plan node that does nothing but lie about the output of its child.  Used to spice a
 * (hopefully structurally equivalent) tree from a different optimization sequence into an already
 * resolved tree.
 */
@DeveloperApi
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
  def children = child :: Nil
  def execute() = child.execute()
}




本文出自 “软件开发” 博客,请务必保留此出处http://tangjj.blog.51cto.com/1848040/1617574

Homework 2: UDF Caching in Spark

标签:spark   udfcache   sqark sql   scala   bigdata   

原文地址:http://tangjj.blog.51cto.com/1848040/1617574

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!