
Apache Spark 1.4: Reading Files from a Hadoop 2.6 Filesystem

Posted: 2015-07-12 23:08:03


scala> val file = sc.textFile("hdfs://9.125.73.217:9000/user/hadoop/logs")

scala> val count = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

scala> count.collect()

The three commands above are the classic Spark wordcount; this post walks through them to verify that Spark 1.4 can read (and write) files on an HDFS filesystem.

1. Start the Spark shell

Note that the binary used here is the Hadoop 2.4 prebuilt (spark-1.4.0-bin-hadoop2.4); HDFS clients within the Hadoop 2.x line are wire-compatible across minor versions, which is why it can still talk to the Hadoop 2.6 NameNode below.

/root/spark-1.4.0-bin-hadoop2.4/bin/spark-shell

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/12 21:32:05 INFO SecurityManager: Changing view acls to: root
15/07/12 21:32:05 INFO SecurityManager: Changing modify acls to: root
15/07/12 21:32:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/07/12 21:32:05 INFO HttpServer: Starting HTTP Server
15/07/12 21:32:05 INFO Utils: Successfully started service 'HTTP class server' on port 50452.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0
      /_/

Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_45)
Type in expressions to have them evaluated.
Type :help for more information.
15/07/12 21:32:09 INFO SparkContext: Running Spark version 1.4.0
15/07/12 21:32:10 INFO SecurityManager: Changing view acls to: root
15/07/12 21:32:10 INFO SecurityManager: Changing modify acls to: root
15/07/12 21:32:10 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/07/12 21:32:10 INFO Slf4jLogger: Slf4jLogger started
15/07/12 21:32:10 INFO Remoting: Starting remoting
15/07/12 21:32:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@9.125.73.217:35775]
15/07/12 21:32:10 INFO Utils: Successfully started service 'sparkDriver' on port 35775.
15/07/12 21:32:10 INFO SparkEnv: Registering MapOutputTracker
15/07/12 21:32:10 INFO SparkEnv: Registering BlockManagerMaster
15/07/12 21:32:10 INFO DiskBlockManager: Created local directory at /tmp/spark-6bd4dc00-8a04-4b62-8f16-76f4beeba918/blockmgr-b0db297e-f183-4ca5-8cb5-7ee943df509d
15/07/12 21:32:10 INFO MemoryStore: MemoryStore started with capacity 265.4 MB
15/07/12 21:32:10 INFO HttpFileServer: HTTP File server directory is /tmp/spark-6bd4dc00-8a04-4b62-8f16-76f4beeba918/httpd-b22e2de4-9618-4bba-b25a-a8c1fd28826d
15/07/12 21:32:10 INFO HttpServer: Starting HTTP Server
15/07/12 21:32:10 INFO Utils: Successfully started service 'HTTP file server' on port 55255.
15/07/12 21:32:10 INFO SparkEnv: Registering OutputCommitCoordinator
15/07/12 21:32:11 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/07/12 21:32:11 INFO SparkUI: Started SparkUI at http://9.125.73.217:4040
15/07/12 21:32:11 INFO Executor: Starting executor ID driver on host localhost
15/07/12 21:32:11 INFO Executor: Using REPL class URI: http://9.125.73.217:50452
15/07/12 21:32:11 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 60268.
15/07/12 21:32:11 INFO NettyBlockTransferService: Server created on 60268
15/07/12 21:32:11 INFO BlockManagerMaster: Trying to register BlockManager
15/07/12 21:32:11 INFO BlockManagerMasterEndpoint: Registering block manager localhost:60268 with 265.4 MB RAM, BlockManagerId(driver, localhost, 60268)
15/07/12 21:32:11 INFO BlockManagerMaster: Registered BlockManager
15/07/12 21:32:11 INFO SparkILoop: Created spark context..
Spark context available as sc.
15/07/12 21:32:12 INFO HiveContext: Initializing execution hive, version 0.13.1
15/07/12 21:32:12 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
15/07/12 21:32:12 INFO ObjectStore: ObjectStore, initialize called
15/07/12 21:32:13 INFO Persistence: Property datanucleus.cache.level2 unknown - will be ignored
15/07/12 21:32:13 INFO Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored
15/07/12 21:32:13 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/07/12 21:32:13 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
15/07/12 21:32:14 INFO ObjectStore: Setting MetaStore object pin classes with hive.metastore.cache.pinobjtypes="Table,StorageDescriptor,SerDeInfo,Partition,Database,Type,FieldSchema,Order"
15/07/12 21:32:15 INFO MetaStoreDirectSql: MySQL check failed, assuming we are not on mysql: Lexical error at line 1, column 5.  Encountered: "@" (64), after : "".
15/07/12 21:32:15 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
15/07/12 21:32:15 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
15/07/12 21:32:17 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table.
15/07/12 21:32:17 INFO Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table.
15/07/12 21:32:17 INFO ObjectStore: Initialized ObjectStore
15/07/12 21:32:17 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 0.13.1aa
15/07/12 21:32:17 INFO HiveMetaStore: Added admin role in metastore
15/07/12 21:32:17 INFO HiveMetaStore: Added public role in metastore
15/07/12 21:32:17 INFO HiveMetaStore: No user is added in admin role, since config is empty
15/07/12 21:32:18 INFO SessionState: No Tez session required at this point. hive.execution.engine=mr.
15/07/12 21:32:18 INFO SparkILoop: Created sql context (with Hive support)..
SQL context available as sqlContext.

scala>
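
Before touching HDFS, a quick sanity check that the context came up is harmless; both calls exist on SparkContext in 1.4, and with no --master flag the shell runs locally:

scala> sc.version   // should print 1.4.0
scala> sc.master    // local[*] when launched without a --master flag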

 

2. Read a file on HDFS

scala> val file = sc.textFile("hdfs://9.125.73.217:9000/hbase/hbase.version")
15/07/12 21:34:50 INFO MemoryStore: ensureFreeSpace(80368) called with curMem=0, maxMem=278302556
15/07/12 21:34:50 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 78.5 KB, free 265.3 MB)
15/07/12 21:34:50 INFO MemoryStore: ensureFreeSpace(17237) called with curMem=80368, maxMem=278302556
15/07/12 21:34:50 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.8 KB, free 265.3 MB)
15/07/12 21:34:50 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:60268 (size: 16.8 KB, free: 265.4 MB)
15/07/12 21:34:50 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
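
textFile is lazy: the line above only records the RDD's lineage, so a bad path would not fail until an action runs. A minimal check, assuming the file exists and is readable, is to force one:

scala> file.count()   // number of lines actually read from HDFS
scala> file.first()   // the first line, proving the read works end to end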

 

3. Count the words

scala> val count = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
15/07/12 21:38:43 INFO FileInputFormat: Total input paths to process : 1
count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:23
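
The chain reads more naturally split into named steps; this is an equivalent sketch of the same pipeline, not part of the original session:

scala> val words  = file.flatMap(line => line.split(" "))   // one record per whitespace-separated token
scala> val pairs  = words.map(word => (word, 1))            // pair every token with a count of 1
scala> val counts = pairs.reduceByKey(_ + _)                // sum the counts per distinct token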

scala>

scala> count.collect()
15/07/12 21:39:25 INFO SparkContext: Starting job: collect at <console>:26
15/07/12 21:39:25 INFO DAGScheduler: Registering RDD 7 (map at <console>:23)
15/07/12 21:39:25 INFO DAGScheduler: Got job 0 (collect at <console>:26) with 3 output partitions (allowLocal=false)
15/07/12 21:39:25 INFO DAGScheduler: Final stage: ResultStage 1(collect at <console>:26)
15/07/12 21:39:25 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
15/07/12 21:39:25 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
15/07/12 21:39:25 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[7] at map at <console>:23), which has no missing parents
15/07/12 21:39:25 INFO MemoryStore: ensureFreeSpace(4128) called with curMem=297554, maxMem=278302556
15/07/12 21:39:25 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.0 KB, free 265.1 MB)
15/07/12 21:39:25 INFO MemoryStore: ensureFreeSpace(2305) called with curMem=301682, maxMem=278302556
15/07/12 21:39:25 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.3 KB, free 265.1 MB)
15/07/12 21:39:25 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:60268 (size: 2.3 KB, free: 265.4 MB)
15/07/12 21:39:25 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:874
15/07/12 21:39:25 INFO DAGScheduler: Submitting 3 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[7] at map at <console>:23)
15/07/12 21:39:25 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
15/07/12 21:39:25 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 1406 bytes)
15/07/12 21:39:25 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, ANY, 1406 bytes)
15/07/12 21:39:25 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/07/12 21:39:25 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/07/12 21:39:25 INFO HadoopRDD: Input split: hdfs://9.125.73.217:9000/hbase/hbase.version:0+3
15/07/12 21:39:25 INFO HadoopRDD: Input split: hdfs://9.125.73.217:9000/hbase/hbase.version:3+3
15/07/12 21:39:25 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/07/12 21:39:25 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/07/12 21:39:25 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/07/12 21:39:25 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/07/12 21:39:25 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/07/12 21:39:25 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2003 bytes result sent to driver
15/07/12 21:39:25 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2003 bytes result sent to driver
15/07/12 21:39:25 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, localhost, ANY, 1406 bytes)
15/07/12 21:39:25 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
15/07/12 21:39:25 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 162 ms on localhost (1/3)
15/07/12 21:39:25 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 179 ms on localhost (2/3)
15/07/12 21:39:25 INFO HadoopRDD: Input split: hdfs://9.125.73.217:9000/hbase/hbase.version:6+1
15/07/12 21:39:25 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 2003 bytes result sent to driver
15/07/12 21:39:25 INFO DAGScheduler: ShuffleMapStage 0 (map at <console>:23) finished in 0.205 s
15/07/12 21:39:25 INFO DAGScheduler: looking for newly runnable stages
15/07/12 21:39:25 INFO DAGScheduler: running: Set()
15/07/12 21:39:25 INFO DAGScheduler: waiting: Set(ResultStage 1)
15/07/12 21:39:25 INFO DAGScheduler: failed: Set()
15/07/12 21:39:25 INFO DAGScheduler: Missing parents for ResultStage 1: List()
15/07/12 21:39:25 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[8] at reduceByKey at <console>:23), which is now runnable
15/07/12 21:39:25 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 25 ms on localhost (3/3)
15/07/12 21:39:25 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/07/12 21:39:25 INFO MemoryStore: ensureFreeSpace(2288) called with curMem=303987, maxMem=278302556
15/07/12 21:39:25 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.2 KB, free 265.1 MB)
15/07/12 21:39:25 INFO MemoryStore: ensureFreeSpace(1377) called with curMem=306275, maxMem=278302556
15/07/12 21:39:25 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 1377.0 B, free 265.1 MB)
15/07/12 21:39:25 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:60268 (size: 1377.0 B, free: 265.4 MB)
15/07/12 21:39:25 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:874
15/07/12 21:39:25 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 1 (ShuffledRDD[8] at reduceByKey at <console>:23)
15/07/12 21:39:25 INFO TaskSchedulerImpl: Adding task set 1.0 with 3 tasks
15/07/12 21:39:25 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, localhost, PROCESS_LOCAL, 1165 bytes)
15/07/12 21:39:25 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 4, localhost, PROCESS_LOCAL, 1165 bytes)
15/07/12 21:39:25 INFO Executor: Running task 0.0 in stage 1.0 (TID 3)
15/07/12 21:39:25 INFO Executor: Running task 1.0 in stage 1.0 (TID 4)
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 3 blocks
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 3 blocks
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 7 ms
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
15/07/12 21:39:25 INFO Executor: Finished task 1.0 in stage 1.0 (TID 4). 1031 bytes result sent to driver
15/07/12 21:39:25 INFO Executor: Finished task 0.0 in stage 1.0 (TID 3). 1029 bytes result sent to driver
15/07/12 21:39:25 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 5, localhost, PROCESS_LOCAL, 1165 bytes)
15/07/12 21:39:25 INFO Executor: Running task 2.0 in stage 1.0 (TID 5)
15/07/12 21:39:25 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 47 ms on localhost (1/3)
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 3 blocks
15/07/12 21:39:25 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/07/12 21:39:25 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 4) in 46 ms on localhost (2/3)
15/07/12 21:39:25 INFO Executor: Finished task 2.0 in stage 1.0 (TID 5). 882 bytes result sent to driver
15/07/12 21:39:25 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 5) in 6 ms on localhost (3/3)
15/07/12 21:39:25 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/07/12 21:39:25 INFO DAGScheduler: ResultStage 1 (collect at <console>:26) finished in 0.043 s
15/07/12 21:39:25 INFO DAGScheduler: Job 0 finished: collect at <console>:26, took 0.352074 s
res1: Array[(String, Int)] = Array((?8,1), (PBUF,1))
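
The tokens look like garbage because /hbase/hbase.version is not a text file: HBase stores its version as a small protobuf blob, and PBUF is that blob's magic header. Pointing the same pipeline at a real text file gives sensible counts; the path below is the log directory from the summary at the top and is assumed to exist:

scala> val logs = sc.textFile("hdfs://9.125.73.217:9000/user/hadoop/logs")
scala> logs.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).take(10)   // sample of (word, count) pairs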

scala>
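
Reading is only half of what the intro set out to verify. Writing back is one more action; a minimal sketch, assuming the output directory does not already exist (saveAsTextFile refuses to overwrite):

scala> count.saveAsTextFile("hdfs://9.125.73.217:9000/user/hadoop/wordcount-out")   // hypothetical output path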

Original post: http://www.cnblogs.com/fm365/p/4641600.html
