val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
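The constructor arguments are (useDisk, useMemory, deserialized) plus an optional replication factor. As a quick illustration (a sketch of my own, not from the original post, assuming a Spark 0.x-era import path and an existing SparkContext named sc), a storage level is applied to an RDD via persist():

import spark.storage.StorageLevel

// Any RDD will do; the HDFS path is a placeholder.
val lines = sc.textFile("hdfs://...")

// Keep deserialized records in memory and spill to disk when they do not fit;
// the *_2 levels additionally replicate every partition on two nodes.
lines.persist(StorageLevel.MEMORY_AND_DISK)

// cache() is simply shorthand for persist(StorageLevel.MEMORY_ONLY).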
val file = spark.textFile("hdfs://...")
Here the file variable is the resulting RDD (in fact a HadoopRDD instance); the core code that produces it is as follows:
// SparkContext creates the RDD from a file/directory plus an optional number of splits.
// This looks a lot like Hadoop MapReduce: it needs an InputFormat and the Key/Value types.
// In fact Spark reuses Hadoop's InputFormat and Writable types.
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
    .map(pair => pair._2.toString)
}

// A HadoopRDD is created from the Hadoop configuration, the InputFormat and so on.
new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
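For illustration (a hypothetical call of my own, assuming an existing SparkContext sc), the caller can override the default split count; each input split becomes one partition of the resulting HadoopRDD:

// Request at least 8 input splits via the minSplits parameter; the path is a placeholder.
val logs = sc.textFile("hdfs://...", 8)
logs.count()  // an action forces the splits to actually be read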
// Obtain a RecordReader from the InputFormat, based on the Hadoop configuration and the split,
// and use it to read the data.
reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
val key: K = reader.createKey()
val value: V = reader.createValue()

// Read the data with Hadoop MapReduce's RecordReader; each key/value pair is returned as a tuple.
override def getNext() = {
  try {
    finished = !reader.next(key, value)
  } catch {
    case eof: EOFException => finished = true
  }
  (key, value)
}
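To show where getNext() fits, here is a simplified sketch of my own (not Spark's actual NextIterator class) of the iterator wrapper such a method typically plugs into:

// A minimal iterator that keeps calling getNext() until the underlying reader reports end-of-data.
abstract class SimpleRecordIterator[T] extends Iterator[T] {
  protected var finished = false
  private var gotNext = false
  private var nextValue: T = _

  // Subclasses read one record and set `finished` to true when no more data is available.
  protected def getNext(): T

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      gotNext = true
    }
    !finished
  }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("end of stream")
    gotNext = false
    nextValue
  }
}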
val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

val rdd_A = sc.textFile("hdfs://.....")
val rdd_B = rdd_A.flatMap(line => line.split("\\s+")).map(word => (word, 1))

val rdd_C = sc.textFile("hdfs://.....")
val rdd_D = rdd_C.map(line => (line.substring(10), 1))
val rdd_E = rdd_D.reduceByKey((a, b) => a + b)

val rdd_F = rdd_B.join(rdd_E)
rdd_F.saveAsSequenceFile("hdfs://....")
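As a quick sanity check (my own addition; the HDFS paths above remain placeholders), a few joined records can be pulled back to the driver before writing the full result out:

// Each element of rdd_F is (key, (value from rdd_B, value from rdd_E)).
rdd_F.take(10).foreach(println)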
val sc = new SparkContext(master, appName, [sparkHome], [jars])
val textFile = sc.textFile("hdfs://.....")
textFile.map(....).filter(.....).....
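To make the pattern concrete, here is a hypothetical filled-in version (the master URL, path and transformations are placeholders of my own, not from the original):

// Count the long lines of a text file: map each line to its length, keep the long ones.
val sc = new SparkContext("local", "LineLengthExample")
val textFile = sc.textFile("hdfs://...")
val longLines = textFile.map(line => line.length).filter(len => len > 80).count()
println("long lines: " + longLines)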
JavaSparkContext sc = new JavaSparkContext(...);
JavaRDD<String> lines = sc.textFile("hdfs://...");
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String s) {
    return Arrays.asList(s.split(" "));
  }
});
from pyspark import SparkContext

sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg'])
words = sc.textFile("/usr/share/dict/words")
words.filter(lambda w: w.startswith("spar")).take(5)
http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz)
SCALA_HOME=/home/spark/scala-2.9.3
JAVA_HOME=/home/spark/jdk1.6.0_45
SPARK_MASTER_IP=spark1
SPARK_MASTER_PORT=30111
SPARK_MASTER_WEBUI_PORT=30118
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=4g
SPARK_WORKER_PORT=30333
SPARK_WORKER_WEBUI_PORT=30119
SPARK_WORKER_INSTANCES=1
$SPARK_HOME/start-all.sh
git clone git://github.com/mesos/spark
cd spark
git checkout -b yarn --track origin/yarn
$SPARK_HOME/sbt/sbt
> package
> assembly
SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar \
./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \
--class spark.examples.SparkPi --args yarn-standalone
Run $SPARK_HOME/spark-shell to enter the shell. Inside spark-shell a SparkContext has already been created for you, with the instance name sc, so it can be used directly. One more thing to note: in Standalone mode Spark defaults to a FIFO scheduler rather than fair scheduling, and since spark-shell is itself a Spark application that stays running on the cluster, other Spark applications can only queue behind it; in other words, only one spark-shell can run at a time.
scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data")
textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3

scala> textFile.count() // Number of items in this RDD
res0: Long = 21374

scala> textFile.first() // First item in this RDD
res1: String = # Spark
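Continuing in the same shell (a hypothetical follow-up of my own, not part of the original transcript), transformations can be chained on textFile and nothing is evaluated until an action runs:

scala> val sparkLines = textFile.filter(line => line.contains("Spark")) // new RDD, nothing read yet
scala> sparkLines.count() // action: triggers the actual computation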
import spark.SparkContext
import SparkContext._

object WordCount {
  def main(args: Array[String]) {
    if (args.length == 0) {
      println("usage is org.test.WordCount ")
    }
    println("the args: ")
    args.foreach(println)

    val hdfsPath = "hdfs://hadoop1:8020"

    // create the SparkContext; args(0) is the ApplicationMaster address passed in by YARN
    val sc = new SparkContext(args(0), "WordCount",
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))

    val textFile = sc.textFile(hdfsPath + args(1))
    val result = textFile.flatMap(line => line.split("\\s+"))
      .map(word => (word, 1)).reduceByKey(_ + _)

    result.saveAsTextFile(hdfsPath + args(2))
  }
}
Original article: http://blog.csdn.net/fanyun_01/article/details/50921847