标签:stream filter nbsp action rda apache pat 字符 1.0
http://blog.csdn.net/pipisorry/article/details/52366356
我们可通过如下方式调用 SparkContext 的简单构造函数,以默认的参数值来创建相应的对象。
val sc = new SparkContext("local[4]", "Test Spark App")
这段代码会创建一个4线程的 SparkContext 对象,并将其相应的任务命名为 Test Spark APP 。
[pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class ‘pyspark.profiler.BasicProfiler‘>)?]
RDD可从现有的集合创建。比如在Scala shell中:
val collection = List("a", "b", "c", "d", "e")
val rddFromCollection = sc.parallelize(collection)
RDD也可以基于Hadoop的输入源创建,比如本地文件系统、 HDFS和Amazon S3。
用一个本地文件系统里的文件创建RDD:
val rddFromTextFile = sc.textFile("LICENSE")
在Spark编程模式下,所有的操作被分为转换(transformation)和执行(action)两种。
一般来说,转换操作是对一个数据集里的所有记录执行某种函数,从而使记录发生改变;而执行通常是运行某些计算或聚合操作,并将结果返回运行 SparkContext 的那个驱动程序。
Spark中的转换操作是延后的,也就是说,在RDD上调用一个转换操作并不会立即触发相应的计算。相反,这些转换操作会链接起来,并只在有执行操作被调用时才被高效地计算。这样,大部分操作可以在集群上并行执行,只有必要时才计算结果并将其返回给驱动程序(如执行操作count),从而提高了Spark的效率。
map 函数,我们将每一个字符串都转换为一个整数,从而返回一个由若干 Int 构成的RDD对象。
val intsFromStringsRDD = rddFromTextFile.map(line => line.size)
转换
reduce(func) | 通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行 |
collect() | 在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM。返回所有元素到驱动程序 |
count() | 返回数据集的元素个数 |
take(n) | 返回一个数组,由数据集的前n个元素组成。注意,这个操作目前并非在多个节点上,并行执行,而是Driver程序所在机器,单机计算所有的元素(Gateway的内存压力会增大,需要谨慎使用)。返回前k个元素到驱动程序 |
first() | 返回数据集的第一个元素(类似于take(1);也就是返回第1个元素到驱动程序 |
saveAsTextFile(path) | 将数据集的元素,以textfile的形式,保存到本地文件系统,hdfs或者任何其它hadoop支持的文件系统。Spark将会调用每个元素的toString方法,并将它转换为文件中的一行文本 |
saveAsSequenceFile(path) | 将数据集的元素,以sequencefile的格式,保存到指定的目录下,本地系统,hdfs或者任何其它hadoop支持的文件系统。RDD的元素必须由key-value对组成,并都实现了Hadoop的Writable接口,或隐式可以转换为Writable(Spark包括了基本类型的转换,例如Int,Double,String等等) |
foreach(func) | 在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互 |
操作
map(func) | 返回一个新的分布式数据集,由每个原元素经过func函数转换后组成 |
filter(func) | 返回一个新的数据集,由经过func函数后返回值为true的原元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素) |
sample(withReplacement, frac, seed) | 根据给定的随机种子seed,随机抽样出数量为frac的数据 |
union(otherDataset) | 返回一个新的数据集,由原数据集和参数联合而成 |
groupByKey([numTasks]) | 在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task |
reduceByKey(func, [numTasks]) | 在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集 |
groupWith(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)类型的数据集上调用,返回一个数据集,组成元素为(K, Seq[V], Seq[W]) Tuples。这个操作在其它框架,称为CoGroup |
cartesian(otherDataset) | 笛卡尔积。但在数据集T和U上调用时,返回一个(T,U)对的数据集,所有元素交互进行笛卡尔积。 |
注意事项
通常只在需将结果返回到驱动程序所在节点以供本地处理时,才调用 collect 函数。
注意, collect 函数一般仅在的确需要将整个结果集返回驱动程序并进行后续处理时才有必要调用。如果在一个非常大的数据集上调用该函数,可能耗尽驱动程序的可用内存,进而导致程序崩溃。
高负荷的处理应尽可能地在整个集群上进行,从而避免驱动程序成为系统瓶颈。然而在不少情况下,将结果收集到驱动程序的确是有必要的。很多机器学习算法的迭代过程便属于这类情况。
[RDD支持的转换和执行操作的完整列表以及更为详细的例子
参见《Spark编程指南》(http://spark.apache.org/docs/latest/programming-guide.html#rdd operations)
以及Spark API(Scala)文档(http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD)。]
rddFromTextFile.cache
调用一个RDD的 cache 函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因此,首次调用 cache 函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的I/O操作,加快计算。多数情况下,这会取得数倍的速度提升。
Spark的另一个核心功能是能创建两种特殊类型的变量:广播变量和累加器。
广播变量(broadcast variable)为只读变量,它由运行 SparkContext 的驱动程序创建后发送给会参与计算的节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如机器学习,
这非常有用。Spark下创建广播变量只需在 SparkContext 上调用一个方法即可:
val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))
终端的输出表明,广播变量存储在内存中。
广播变量也可以被非驱动程序所在的节点(即工作节点)访问,访问的方法是调用该变量的value 方法:
sc.parallelize(List("1", "2", "3")).map(x => broadcastAList.value ++ x).collect
RDD分布 各节点map操作 合并到主节点
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
/**
* 用Scala编写的一个简单的Spark应用
*/
object ScalaApp {
在主函数里,我们要初始化所需的 SparkContext 对象,并且用它通过 textFile 函数来访问CSV数据文件。之后对每一行原始字符串以逗号为分隔符进行分割,提取出相应的用户名、产品和价格信息,从而完成对原始文本的映射:
def main(args: Array[String]) {
val sc = new SparkContext("local[2]", "First Spark App")
// 将CSV格式的原始数据转化为(user,product,price)格式的记录集
val data = sc.textFile("data/UserPurchaseHistory.csv")
.map(line => line.split(","))
.map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1),
purchaseRecord(2)))
现在,我们有了一个RDD,其每条记录都由 (user, product, price) 三个字段构成。我们可以对商店计算如下指标:购买总次数 客户总个数 总收入1.4 最畅销的产品
// 求购买次数
val numPurchases = data.count()
// 求有多少个不同客户购买过商品
val uniqueUsers = data.map{ case (user, product, price) => user }.distinct().count()
// 求和得出总收入
val totalRevenue = data.map{ case (user, product, price) => price.toDouble }.sum()
// 求最畅销的产品是什么
val productsByPopularity = data
.map{ case (user, product, price) => (product, 1) }
.reduceByKey(_ + _)
.collect()
.sortBy(-_._2)
val mostPopular = productsByPopularity(0)
最后那段计算最畅销产品的代码演示了如何进行Map/Reduce模式的计算,该模式随Hadoop而流行。第一步,我们将 (user, product, price) 格式的记录映射为 (product, 1) 格式。然后,我们执行一个 reduceByKey 操作,它会对各个产品的1值进行求和。转换后的RDD包含各个商品的购买次数。有了这个RDD后,我们可以调用 collect 函数,这会将其计算结果以Scala集合的形式返回驱动程序。之后在驱动程序的本地对这些记录按照购买次数进行排序。(注意,在实际处理大量数据时,我们通常通过 sortByKey 这类操作来对其进行并行排序。) 最后,可在终端上打印出计算结果:
println("Total purchases: " + numPurchases)
println("Unique users: " + uniqueUsers)
println("Total revenue: " + totalRevenue)
println("Most popular product: %s with %d purchases".
format(mostPopular._1, mostPopular._2))
}
}
可以在项目的主目录下执行 sbt run 命令来运行这个程序。如果你使用了IDE的话,也可以从Scala IDE直接运行。最终的输出应该与下面的内容相似:
...
[info] Compiling 1 Scala source to ...
[info] Running ScalaApp
...
14/01/30 10:54:40 INFO spark.SparkContext: Job finished: collect at
ScalaApp.scala:25, took 0.045181 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases
Spark的Python API几乎覆盖了所有Scala API所能提供的功能,但的确有些特性,比如SparkStreaming和个别的API方法 , 暂不支持 。具体可参见《 Spark 编 程 指 南 》的 Python 部分 :http://spark.apache.org/docs/latest/programming-guide.html。
"""用Python编写的一个简单Spark应用"""
from pyspark import SparkContext
sc = SparkContext("local[2]", "First Spark App")
# 将CSV格式的原始数据转化为(user,product,price)格式的记录集
data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line:
line.split(",")).map(lambda record: (record[0], record[1], record[2]))
# 求总购买次数
numPurchases = data.count()
# 求有多少不同客户购买过商品
uniqueUsers = data.map(lambda record: record[0]).distinct().count()
# 求和得出总收入
totalRevenue = data.map(lambda record: float(record[2])).sum()
# 求最畅销的产品是什么
products = data.map(lambda record: (record[1], 1.0)).
reduceByKey(lambda a, b: a + b).collect()
mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0]
print("Total purchases: %d" % numPurchases)
print("Unique users: %d" % uniqueUsers)
print("Total revenue: %2.2f" % totalRevenue)
print("Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1]))
>$SPARK_HOME/bin/spark-submit pythonapp.py
添加到path后直接$ spark-submit SparkLDA.py
代码及数据下载[spark机器学习示例代码]
回归模型
from:ref: [Spark quick-start]
[针对Scala、 Java和Python的Spark programming-guide]
[王家林的spark视频公开课]
Machine Learning with Spark*
标签:stream filter nbsp action rda apache pat 字符 1.0
原文地址:http://blog.csdn.net/pipisorry/article/details/52366356