码迷,mamicode.com
首页 > 其他好文 > 详细

spark-------------RDD 转换算子-----value类型(一)

时间:2021-01-27 12:50:51      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:add   max   转换   合规   rgs   als   pre   http   simple   

引言

RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型,本文主要讲一些Value 类型

正文

资源路径和资源内容

技术图片

map

  • 函数签名:def map[U: ClassTag](f: T => U): RDD[U]

  • 函数说明:将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

案例:从服务器日志数据 apache.log 中获取用户请求 URL 资源路径

package com.xiao.spark.core.rdd.operator.tranform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Map {
  def main(args: Array[String]): Unit = {
      // TODO 准备环境
      val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
      val sc = new SparkContext(sparkConf)

    val rdd: RDD[String] = sc.textFile("datas/apache.log")
    val value: RDD[String] = rdd.map({
      line => {
        val address: String = line.split(" ")(6)
        address
      }
    })
    value.collect().foreach(println)
      // TODO 关闭环境
      sc.stop()
  }
}

运行结果:

技术图片

mapPartitions

  • 函数签名:def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]

  • 函数说明:将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

案例:获取每个数据分区的最大值

package com.xiao.spark.core.rdd.operator.tranform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark02_RDD_Operator_MapPartitions {
  def main(args: Array[String]): Unit = {
      // TODO 准备环境
      val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
      val sc = new SparkContext(sparkConf)
      // 求每个分区的最大值
      val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
    val value: RDD[Int] = rdd.mapPartitions(
      iter => {
        List(iter.max).iterator
      }
    )
      value.collect().foreach(println)
        // TODO 关闭环境
      sc.stop()
  }
}

运行结果

技术图片

mapPartitionsWithIndex

  • 函数签名:def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]

  • 函数说明:将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

案例:打印数据和以及所在的分区

package com.xiao.spark.core.rdd.operator.tranform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_MapPartitionsWithIndex {
  def main(args: Array[String]): Unit = {
      // TODO 准备环境
      val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
      val sc = new SparkContext(sparkConf)
      val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
      val value: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex(
        (index, iter) => {
          iter.map(
            num => {
              (index, num)
            }
          )
        }
      )
      value.collect().foreach(println)
      // TODO 关闭环境
      sc.stop()
  }
}

运行截图

技术图片

flatMap

  • 函数签名:def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

  • 函数说明:将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

案例:将 List(List(1,2),3,List(4,5))进行扁平化操作

package com.xiao.spark.core.rdd.operator.tranform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_FlatMap {
  def main(args: Array[String]): Unit = {
      // TODO 准备环境
      val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
      val sc = new SparkContext(sparkConf)
      val rdd: RDD[Any] = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
      val value: RDD[Any] = rdd.flatMap(
        // 用模式匹配
        data => {
          data match {
            case list: List[_] => list
            case dat => List(dat)
          }
        }
      )
    value.collect().foreach(println)
      // TODO 关闭环境
      sc.stop()
  }
}

运行截图

技术图片

glom

  • 函数签名:def glom(): RDD[Array[T]]

  • 函数说明:将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

案例:计算所有分区最大值求和(分区内取最大值,分区间最大值求和)

package com.xiao.spark.core.rdd.operator.tranform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Glom {
  def main(args: Array[String]): Unit = {
      // TODO 准备环境
      val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
      val sc = new SparkContext(sparkConf)
      // 【1,2】【3,4】
      // 【2】【4】
      // 6
      val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
      // 变成数组
      val glomRDD: RDD[Array[Int]] = rdd.glom()
      // 取最大值
      val maxRDD: RDD[Int] = glomRDD.map(
        array => {
          array.max
        }
      )
    println(maxRDD.collect().sum)
      // TODO 关闭环境
      sc.stop()
  }
}

运行截图:

技术图片

groupBy

  • 函数签名:def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

  • 函数说明:将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中

案例:

  • 从服务器日志数据 apache.log 中获取每个时间段访问量
  • 将 List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组。
package com.xiao.spark.core.rdd.operator.tranform

import java.text.SimpleDateFormat
import java.util.Date

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_GroupBy {
  def main(args: Array[String]): Unit = {
      // TODO 准备环境
      val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
      val sc = new SparkContext(sparkConf)
    // 按照首字母进行分组
//    val rdd: RDD[String] = sc.makeRDD(List("Hadoop","Spark","Scala","Hive"))
//
//    val value: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))

    // 从服务器日志数据 apache.log 中获取每个时间段访问量
    val rdd: RDD[String] = sc.textFile("datas/apache.log")
    val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
      line => {
        val time: String = line.split(" ")(3)
        val sdf = new SimpleDateFormat("dd/MM/yy:HH:mm:ss")
        val date: Date = sdf.parse(time)
        val sdf1 = new SimpleDateFormat("HH")
        val hour: String = sdf1.format(date)
        (hour, 1)
      }
    ).groupBy(_._1) // _._1 获取元组第一个元素  ._1:取第一个元素
    val value: RDD[(String, Int)] = timeRDD.map {
      case (hour, iter) => (hour, iter.size)
    }
    value.collect().foreach(println)
      // TODO 关闭环境
      sc.stop()
  }
}

运行截图:

技术图片
技术图片

filter

  • 函数签名:def filter(f: T => Boolean): RDD[T]

  • 函数说明:将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

案例:从服务器日志数据 apache.log 中获取 2015 年 5 月 17 日的请求路径

package com.xiao.spark.core.rdd.operator.tranform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark01_RDD_Operator_Filter {
  def main(args: Array[String]): Unit = {
      // TODO 准备环境
      val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("transform")
      val sc = new SparkContext(sparkConf)

    val rdd: RDD[String] = sc.textFile("datas/apache.log")
    val timeRDD: RDD[String] = rdd.filter(
      line => {
        val time: String = line.split(" ")(3)
        time.startsWith("17/05/2015")
      }
    )
    timeRDD.collect().foreach(println)
      // TODO 关闭环境
      sc.stop()
  }
}

运行结果:

技术图片

spark-------------RDD 转换算子-----value类型(一)

标签:add   max   转换   合规   rgs   als   pre   http   simple   

原文地址:https://www.cnblogs.com/yangxiao-/p/14326296.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!