标签:direct 读取文件 .text path def dfs turn list 遍历
由于业务需要,需要将大量文件按照目录分类的方式存储在HDFS上,这时从HDFS上读取文件就需要使用 sparkcontext.wholeTextFiles(),
众所周知,sc.textFiles(path) 能将path 里的所有文件内容读出,以文件中的每一行作为一条记录的方式:
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u‘Hello world!‘]
文件的每一行 相当于 列表 的一个元素,因此可以在每个partition中用for i in data的形式遍历处理数据。
而使用 sc.wholeTextFiles()时,
Read a directory of text files from HDFS, a local file system
(available on all nodes), or any Hadoop-supported file system
URI. Each file is read as a single record and returned in a
key-value pair, where the key is the path of each file, the
value is the content of each file.
...
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[(u‘.../1.txt‘, u‘1‘), (u‘.../2.txt‘, u‘2‘)]
"""
如上面所示,返回的是[(key, val), (key, val)...]的形式,其中key是文件路径,val是文件内容,这里我们要注意的重点是:
‘‘‘Each file is read as a single record‘‘‘ 这句话,
每个文件作为一个记录!这说明这里的 val 将不再是 list 的方式为你将文件每行拆成一个 list的元素,
而是将整个文本的内容以字符串的形式读进来,也就是说val = ‘...line1...\n...line2...\n‘
这时需要你自己去拆分每行!而如果你还是用for i in val的形式来便利 val那么i得到的将是每个字符
------------以下是亲自实验的截图---
dataRdd = sc.wholeTextFiles(‘hdfs://192.168.7.217:9000/clean/TD_FIRSTPAGE/part-r-*‘)
from cp_analysis.clean.DataClean import printData, printDataPartition
dataRdd.foreach(printData)
dataRdd.foreachPartition(printDataPartition)
def printData(x):
print x[0]
for line in x[1].split(‘\n‘):
print line
def printDataPartition(data):
for x in data:
print x[0]
print ‘-----------------------------------------‘
print x[1]
下面是使用 for ele in X[1]:
print ele
输出的截图:
Spark:sc.textFiles() 与 sc.wholeTextFiles() 的区别
标签:direct 读取文件 .text path def dfs turn list 遍历
原文地址:https://www.cnblogs.com/ivan1026/p/9047726.html