码迷,mamicode.com
首页 > 编程语言 > 详细

第2天Python实战Spark大数据分析及调度-RDD编程

时间:2020-05-31 16:03:22      阅读:92      评论:0      收藏:0      [点我收藏+]

标签:两种   orm   set   data   running   after   显示   创建   BMI   

Spark提供的主要抽象是resilient distributed dataset(RDD) 弹性分布式数据集,它是跨集群节点划分的元素的集合,可以并行操作。通过从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件或驱动程序中现有的Scala集合开始并进行转换来创建RDD。用户还可以要求SparkRDD 保留在内存中,以使其能够在并行操作中有效地重用。最后,RDD自动从节点故障中恢复。

Spark中的第二个抽象是可以在并行操作中使用的共享变量。默认情况下,当Spark作为一组任务在不同节点上并行运行一个函数时,它会将函数中使用的每个变量的副本传送给每个任务。有时,需要在任务之间或任务与驱动程序之间共享变量。Spark支持两种类型的共享变量:广播变量(可用于在所有节点上的内存中缓存值)和累加器(accumulator),这些变量仅被“添加”到其上,例如计数器和总和

 

RDD五大特性

  • A list of partitions

    一组分区:RDD由很多partition构成,有多少partition就对应有多少task

  • A function for computing each split

    一个函数:对RDD做计算,相当于对RDD的每个split或partition做计算

  • A list of dependencies on other RDDs

    RDD之间有依赖关系,可溯源

  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

    一个Partitioner:即RDD的分片函数,如果RDD里面存的数据是key-value形式,则可以传递一个自定义的Partitioner进行重新分区

  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)

    一个列表:存储存取每个Partition的优先位置(preferred location),计算每个split时,在split所在机器的本地上运行task是最好的,避免了数据的移动,split有多个副本,所以preferred location不止一个

 

初始化Spark

Spark程序做的第一件事情就是创建一个SparkContext对象,该对象告诉Spark如何访问集群,要创建一个SparkContext首先需要构建一个SparkConf对象,其中包含应用程序程序的信息

from pyspark import SparkConf, SparkContext
conf
= SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)

# 业务逻辑

sc.stop()

appName参数是应用程序显示在集群UI上的名称

master是Spark,Mesos或YARN群集URL或特殊的“本地”字符串,以本地模式运行

当在集群上运行时,您将不希望master在程序中进行硬编码,而是在其中启动应用程序spark-submit并在其中接收。但是,对于本地测试和单元测试,您可以传递“ local”以在内部运行Spark

注意:

  在PySpark Shell中,已经为我们初始化了Spark, 变量为sc, 我们自己配置的SparkContext将不起作用,也就是我们自己不用再初始化了

 

创建RDD的两种方式

方式一:  通过现有的可迭代对象或集合调用SparkContextparallelize创建

data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)

创建rdd后可以并行操作。例如调用distData.reduce(lambda a, b: a + b)计算集合元素的和

>>> rdd.reduce(lambda a,b: a+b)
15

并行集合的一个重要参数就是将数据集切入分区,Spark将为集群的每个分区运行一个任务。通常,群集中的每个CPU都需要2-4个分区。通常,Spark会尝试根据您的集群自动设置分区数。但是,您也可以通过将其作为第二个参数传递给parallelize(例如sc.parallelize(data, 10))来手动设置它。

 

方式二: 外部数据集

PySpark可以从Hadoop支持的任何存储源创建分布式数据集,包括您的本地文件系统,HDFSCassandraHBaseAmazon S3等。Spark支持文本文件,SequenceFiles和任何其他Hadoop InputFormat

可以使用SparkContexttextFile方法创建文本文件RDD 。此方法需要一个URI的文件(本地路径的机器上,或一个hdfs://s3a://等URI),并读取其作为行的集合。这是一个示例调用:

rdd = sc.textFile("data.txt")

 

RDD操作

RDD支持两种类型操作:

  1.  transformation(转换): create a new dataset from an existing one 从现有的数据集中创建新数据集
  2.  action(动作): return a value to the driver program after running a conputation on the dataset  对数据集执行计算后,将值返回给驱动程序

 

常用的transformation

...

 

第2天Python实战Spark大数据分析及调度-RDD编程

标签:两种   orm   set   data   running   after   显示   创建   BMI   

原文地址:https://www.cnblogs.com/sellsa/p/13019123.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!