码迷,mamicode.com
首页 > 其他好文 > 详细

Spark小象学院笔记

时间:2016-07-22 06:37:20      阅读:184      评论:0      收藏:0      [点我收藏+]

标签:

---小象学院陈超视频教程笔记------陈超讲
第一节
Scala基础与实践
基于JVM的FP+OO
静态类型
和Java互操作
函数式编程和面向对象的结合,纯静态的语言。


解释器(interpreter)
值与变量(val & var)
函数(Function)


1、常量 val
2、变量 var
3、main函数要定义在object里面
实例:
object Basic{
  def hello(name : String): String ={
    "Hello :" + name //block的最后一行就是输出值
  }
  def helloScala(){
    println("hello Scala!!")
  }
  val add = (x: Int,y :Int) => x + y
  def add2(x:Int)(y:Int) = x + y
  def printEveryChar(c : String*)={
    c.foreach(x => println(x))
  }
  def main(args : Array[String]){
    println("hello Scala")
    //println(hello("Scala"))
    helloScala()
    add(1,2)
    println(add2(4)(6))
    printEveryChar("a","b","c","d")
//if表达式
    val x=1
    val a=if(x>0) 1 else 0
    println(a)
//循环
    var(n,r)=(10,0)
    while(n>0){
      r = r + n
      n = n - 1
    }
    println(r)


    for(i<- 1 to(10)){
      println(i)
    }
  }
}
------------------------------------------------------------
条件表达式(if)
循环表达式(no continue,no break)
语句终止(;no need)
实例2
class Basic2{


}


class Person{
  var name : String = _ //会生成getter和setter方法
  val age = 10 //只会生成getter方法
  private[this] val  gender = "male"
}


object Basic2 {
  def main(args: Array[String]): Unit ={
    val p = new Person //括号可省略
    p.name = "Jack"
    println(p.name + ":" + p.age)
  }
}
----------------------------------------------------------
类(class)
声明类(一个源文件中可以包含很多类,并且都是public级别)
getter和setter
构造函数(primary constructor & auxiliary constructor)
继承(extends)
重写父类方法(override def)
重写字段(override val,override var)
实例3
class Basic2{


}
class Person{
  var name : String = _ //会生成getter和setter方法
  val age = 10 //只会生成getter方法
  private[this] val  gender = "male"
}


object Basic2 {
  def main(args: Array[String]): Unit ={
    val p = new Person //括号可省略
    p.name = "Jack"
    println(p.name + ":" + p.age)
  }
}
实例4
class Basic2{


}
//1、主构造器直接跟在类名后面,主构造器中的参数,最后会被编译成字段
//2、主构造器执行的时候,会执行类中的所有语句
//3、假设参数声明时不带val和var,那么相当于private(this)!!!
class Person(var name:String,val age:Int){
 /* var name : String = _ //会生成getter和setter方法
  val age = 10 //只会生成getter方法
  private[this] val  gender = "male"*/


  println("this is the primary constructor!")
}


object Basic2 {
  def main(args: Array[String]): Unit ={
/*    val p = new Person
    p.name = "Jack"
    println(p.name + ":" + p.age)*/
    val p = new Person("Jack",20)
    println(p.name + ":" + p.age)
  }
}
--------------------------------------------------------------------
抽象类(abstract class)
类的一个或者多个方法没有没完整的定义
声明抽象方法不需要加abstract关键字,只需要不写方法体
子类重写父类的抽象方法时不需要加override
父类可以声明抽象字段(没有初始值的字段)
子类重写父类的抽象字段时不需要加override


class Basic3{
}
abstract class Person1{
    def speak
    val name : String
    var age : Int
}
abstract Student1 extends Persion1{
    def speak{
        print("speak!!!")
    }
    val name = "AAA"
    var age = 100
}
trait Logger{
    def log(msg : String){
        println("log" + msg)
    }
}


class Test extends Logger{
    def test{
        log("xxx")
    }
}


trait logger{
    def log(msg:String)
}


trait ConsoleLogger extends Logger{
    def log(msg:String){
        println(msg)
    }
}


class Test extends ConsoleLogger{
    def test{
        log("PPP")
    }
}


trait ConsoleLogger{
    def log(ms : String){
        println("save money:" + msg)
    }
}


trait MessageLogger extends ConsoleLogger{
    def log(msg : String){
        println("save money to bank :" + msg)
    }
}


abstract class Account{
    def save
}


class MyAccount extends Account with ConsoleLogger{
    def save{
        log(100)
    }
}


object Basic3 extends App{
    val t = new Test
    t.test
    
    val acc = new MyAccount with MessageLogger
    acc.save


    var s = new Student1
    s.speak
    println(s.name + ":" + s.age)
}


特质(trait)-对比下JAVA8的接口
字段和行为的集合
混入类中
通过with关键字,一个类可以扩展多个特质


trait续
当做接口
带有具体实现的接口
带有特质的对象
特质从左到右被构造


apply方法
单例对象
class ApplyTest{
      def test{
      println("test")
    }
}
class Basic4{


}


object Basic4 extends App{
    val a = ApplyTest
}


--------包(package com.xx.data)
支持嵌套,下层可以访问上层作用域中的名称
可串联
顶部标记
包对象
包可见性
包在任何地方都可以引入,作用域至该语句所在块的末尾
重命名引入成员(xx => yy)
隐藏方法(xx => _)
自动引入(java.lang._ scala._ Predef._)
--------模式匹配-----------------
标准用法(match)
使用守卫
匹配类型
class Basic5{
}


case class Book(name : String,author : String)
object Basic5 extends App{
    val value = 1
    value match{
        case 1 => "one"
case 2 => "two"
case _ => "some other number"
    }
    val result = value match{
        case i if i == 1 => "one"
case i if i == 2 => "two"
case _ => "some other number"
    }
    println("result of match is :" + result)
    println("result2 of match is :" + result2)
    def t(obj:Any) = obj match{
        case x : Int => println("Int")
case s : String => println("String")
case _ => println("unknow type")
    }
    t("1")
    t(1)
    t(1L)


    val macTalk = Book("MacTalk","CJQ")
    macTalk match{
        case Book(name,author) => println("this is book")
case _ =>println("unknown")
    }
}
---case class(多用在模式匹配中)
构造器中的每一个类型都为val,不建议用var
不用new就可以直接生产对象(为什么?apply方法)
-----------------------------
高阶函数
匿名函数 val double = (x: Int)=> 2 * x
函数作为参数
参数类型推断
常用高阶函数
    map、filter、reduce等等
----------------------------
集合
List
Set
Tuple
Map
----------------
集合操作
foreach(类似于map,但是没有返回值)
map(迭代)
filter(过滤)
zip(聚合)
partition(列表分割)
flatten(扁平化)
flatMap(map + flatten)
------------------------
泛型
泛型类
class Pair[T,S](val first:T,val second:S)
泛型方法
def computer[T](list : List[T])= ...
------------------------
class Basic6{
}
class A{
}
class Rich(a : A){
    def rich{
        println("rich...")
    }
}
object Basic6 extends App{
    implicit def a2Rich(a : A) = new Rich(a)
    val a = new A
    a.rich


    def testParam(implicit name : String){
      println(name)
    }
    implicit val name = "implicit!!!"
    testParam
implicit class Calculator(x : Int){
    def add(a : Int) : Int = a + 1
}
println(1.add(1))
}
---------------Spark概述与编程模型---------------------
Spark的快只是因为内存?
内存计算
DAG
支持三种语言的API:Scala、Python、Java
有四种运行模式:local()
--------------集群配置-----------------------------------------
spark-env.sh
export JAVA_HOME=
export SPARK_MASTER_IP=
export SPARK_WORKER_CORES=
export SPARK_WORKER_INSTANCES=
export SPARK_WORKER_MEMORY=
export SPARK_MASTER_PORT=
export SPAEK_JAVA_OPTS="-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps"


slaves
xxx.xxx.xxx.2
xxx.xxx.xxx.3
xxx.xxx.xxx.4
xxx.xxx.xxx.5
----------------------------------------------------------------------------------------
shell 运行
MASTER=local[4] ADD_JARS=code.jar ./spark-shell
MASTER=spark://host:port
指定executor内存:export SPARK_MEM=25g
-------------------------------------------------------------------
class Analysis{
}
object Analysis{
    def main(arg : Array[String]){
    if(args.length !=3){
        println("Usage : java -jar code.jar dependency_jars file_location save_location")
System.exit(0)
    }
    val jars = ListBuffer[String]()
    args(0).split(‘.‘).map(jar += _)


    val conf = new SparkConf()
    conf.setMaster("spark://master:8080")
        .setSparkHome("/usr/hadoop/spark-1.6.0-cdh5.7.1")
.setAppName("analysis")
.setJar(jars)
.set("spark.executor.memory","2g")


    val sc = new SparkContext(conf)
    val data = sc.textFile(args(1))


    data.cache


    println(data.count)


    data.file(_.split(‘ ‘).length == 3).map(_.split(‘ ‘)(1)).map((_,1)).reduceByKey(_+_)
    .map(x=>(x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile(args(2))
    }
}
-----------------------------------------------------------------------------
java -jar /home/cloudera/IdeaProjects/scala/out/artifacts/scala_jar/scala.jar /home/cloudera/IdeaProjects/scala/out/artifacts/scala_jar/scala.jar hdfs://quickstart.cloudera:8020/spark/test.txt hdfs://quickstart.cloudera:8020/spark/


val rdd1 = sc.textFile("hdfs://192.168.1.198:8020/spark/SogouQ1.txt")
rdd1.toDebugString
val words=rdd1.flatMap(_.split(" "))
val wordscount=words.map(x=>(x,1)).reduceByKey(_+_)
wordscount.collect
wordscount.toDebugString


val rdd1 = sc.textFile("hdfs://192.168.1.198:8020/spark/test.txt",9)
val rdd2=rdd1.map(_.split("\t")).filter(_.length==6)
val rdd3=rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2)
rdd3.count()


spark-shell --executor-memory 2g --driver-memory 1g --master spark://192.168.1.198:7077


val rdd = sc.textFile("hdfs://192.168.1.198:8020/spark/test1.txt")
rdd.cache()
val wordcount = rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)
wordcount.take(10)
val wordsort = wordcount.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
wordsort.take(10)


val data=sc.textFile("hdfs://192.168.1.198:8020/spark/SogouQ1.txt")
data.count
data.map(_.split(‘\t‘)(0)).filter(_ < "20111230010101").count
data.map(_.split(‘\t‘)(3)).filter(_.toInt == 1).count
data.map(_.split(‘\t‘)).filter(_(3).toInt == 1).filter(_(4).toInt == 1).count
data.map(_.split(‘\t‘)).filter(_(2).contains("baidu")).count
--------maven--依赖-----包----------
<project>
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.5.0-incubating</version>
  </dependency>


  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
  </dependency>
<dependencies>
</project>


-----------------------------------------------------
??学习下maven
------深入Spark内核-----------
术语解释
Application 基于Spark的用户,包含了driver程序
Driver Program 运行main函数并且新建SparkContext的程序
Cluster Manager 在集群上获取资源的外部服务(例如:standalone.Mesos,Yarn)
Worker Node 集群中任何可以运行应用代码的节点
Executor 是在一个worker node上为某应用启动的一个进展,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的executors
Task 被送到某个executor上的工作单元
Job 包含很多任务的并行计算,可以看做和Spark的action对应
Stage 一个Job会被拆分很多组任务,每组任务被称为Stage(就像Marpreduce分map任务和reduce任务一样)
|--------------------------------------------------------------------------|
|Cluster Overview                                                          |
|--------------------------------------------------------------------------|
|Driver Program                                                            |
|SparkContext <---------->   Cluster Manager  <-----------> Worker Node    |
|                                                           Executor Cache |
|                    Task Task      |
|                                                                          |
|                    Worker Node    |
|                    Executor Cache |
|                    Task Task      |
|--------------------------------------------------------------------------|


---------数据本地性---------------
第一次运行时数据不在内存中,所以从HDFS上取,任务最好运行在数据所在的节点上         文件系统本地性
第二次运行,数据已经在内存中,所以任务最好运行在该数据所在内存的节点上             内存本地性
万一有数据被置换出内存,则任然从HDFS上取                                           LRU置换
-------------------------------------------------------------------------------------------
再看RDD
分区 protected def getPartitions:Array[Partition]
依赖 protected def getDependencies:Seq[Dependency[]] = deps
函数 def computes(split:Partition,context:TaskContext):Iterator[T]
最佳位置(可选) protected def getPreferredLocations(split:Partition):Seq[String]=Nil
分区策略(可选)@transient val partitioner: Option[Partitioner] = None
------------------------------------------------------------------------------------------
最常见的HadoopRDD
分区:每个HDFS block
依赖:无
函数:读取每一个block
最佳位置:HDFS block所在位置
分区策略:无
----------------------------------------------------------------------
FilteredRDD
分区:与父RDD一致
依赖:与父RDD一对一
函数:计算父RDD的每个分区并过滤
最佳位置:无(与父RDD一致)
分区策略:无
------------------------------------------------------
JoinedRdd
分区:每个reduce任务一个分区
依赖:依赖所有父RDD
函数:读取suffle数据并计算
最佳位置:无
分区策略:HashPartitioner(partitions:Int)
---------------------------------------------
细看DAG Scheduler
目标RDD    计算每个分区的函数     结果监听器
           DAG Scheduler


基于Stage构建DAG,决定每个任务的最佳位置
记录哪个RDD或者Stage输出被物化
将taskset传给底层调度器TaskScheduler
重新提交shuffle输出丢失的stage
--------------------------------------------
调度器优化
一个Stage内的窄依赖进行pipeline操作
1+1+1+1=4  1+1=2 2+1=3 3+1=4
基于partition选择最优的join算法使shuffle的数据最小化
重用已经缓存过的数据
--------------------------------------------
Task细节
外部存储           Task         
shuffle数据      f1->f2->f3  ---------> 输出文件


Stage边界只出现在外部输入及取shuffle数据的时候
为了容错,会把suffle输出写在磁盘或者内存
任何一个任务可以运行在任何一个节点
允许任务使用那些被缓存但是已经被置换出去的数据
--------------------------------------------
TaskScheduler
提交taskset(一组task)到集群运行并汇报结果
出现shuffle输出lost要报告fetch failed错误
碰到straggle任务需要放到别的节点上重试
为每一个TaskSet维护一个TaskSetManager(追踪本地性及错误信息)
------------------------------------------
广播变量 Broadcast variables
BT形式的广播变量
使用场景:lookup表,mapside join
注意点:只读、存于每台worker的cache,不随task发送
使用方式:val broadcastVar = sc.broadcast(Array(1,2,3))
              broadcastVar.value
--------------------------------------------------
累加器Accumulators
只增
类似与MapReduce中的counter
用法:val accum = sc.accumulator(0)
          sc.parallelize(Array(1,2,3,4)).foreach(x=>accum +=x)
 accum.value
----------------------------------------------
性能调优
优化点1
问题:Task序列化后太大
解决:使用广播变量
优化点2
问题:val rdd=data.filter(f1).filter(f2).reduceBy...
经过以上语句会有很多空任务或者小任务
解决:使用coalesce或者repartition去减少RDD中partition数量
--------------------------------------------------------
优化点3
问题:每个记录的开销太大
rdd.map{x=conn=getDBConn;conn.write(x.toString);conn.close}
解决:rdd.mapPartitions(records=>conn.getDBConn;for(item <- records))
      write(item:toString);
      conn.close)
-------------------------------------------------------
优化点4
问题:任务执行速度倾斜
解决:1、数据倾斜(一般是partition key取的不好) 考虑其他的并行处理方式 中间可以加入一步aggregation
      2、Worker倾斜(在某些worker上的executor不给力)
      设置spark.speculation=true把哪些持续不给力的node去掉
------------------------------------------------------------
优化点5
问题:不设置spark.local.dir 这是spark写shuffle输出的地方
解决:设置一组磁盘
spark.local.dir=/mn1/spark,/mnt2/spark,/mnt3/spark
--------------------------------------------------------
优化点6
问题:reducer数量不合适
解决:需要按照实际情况调整
太多的reducer,造成很多的小任务,以此产生很多启动任务的开销
太少的reducer,任务执行慢
--------------------------------------------------------
优化7
问题:collect输出大量结果慢
解决:直接输出到分布式文件系统
-------------------------------------------
优化8
问题:序列化 Spark默认使用JDK自带的ObjectOutputStream
解决:使用Kryo serialization
---开源框架---------------------------------------
-----DB---
Cassandra
HBase
MongoDB
Terrastore
Redis
SSDB
MySQL
-----SQL on Hadoop(Spark)----------------------
Hive
Shark(Catalyst)
Impala
------------日志(数据)收集------------------
Sqoop
Flume
Chukwa
Kafka
DataX
Dbsync
TimeTunnel
------------ML---------------------
Mahout
MLib
--------other---------------
Zookeeper
Ooize
Hue
-----------------------





























































Spark小象学院笔记

标签:

原文地址:http://blog.csdn.net/china_demon/article/details/51991039

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!