暂无说说

spark文件数据读写

pyspark jiajun 8个月前 (03-02) 118次浏览 0个评论 扫描二维码

除了可以对本地文件系统进行读写以外,Spark 还支持很多其他常见的文件格式(如文本文件、JSON、SequenceFile 等)和文件系统(如 HDFS、Amazon S3 等)和数据库(如 MySQL、HBase、Hive 等)。这里只介绍文件系统的读写和不同文件格式的读写。

本地文件系统的数据读写

1、读取 spark 安装目录下的 README.md 文件

textFile = sc.textFile("file:///soft/spark/README.md")

上面代码中,sc.textFile()中的这个 textFile 是 sc 的一个方法名称,这个方法用来加载文件数据。这两个 textFile 不是一个东西,不要混淆。实际上,等号前面的是变量名 textFile,完全可以换个变量名称,比如,lines = sc.textFile("file:///soft/spark/README.md")。这里使用相同名称,就是有意强调二者的区别。
注意,要加载本地文件,必须采用“file:///”开头的这种格式。执行上面这条命令以后,并不会马上显示结果,因为,Spark 采用惰性机制,只有遇到“行动”类型的操作,才会从头到尾执行所有操作。所以,下面执行一条“行动”类型的语句,就可以看到结果:

textFile.first()

first()是一个“行动”(Action)类型的操作,会启动真正的计算过程,从文件中加载数据到变量 textFile 中,并取出第一行文本。屏幕上会显示很多反馈信息,这里不再给出,你可以从这些结果信息中,找到 word.txt 文件中的第一行的内容。

正因为 Spark 采用了惰性机制,在执行转换操作的时候,即使我们输入了错误的语句,pyspark也不会马上报错,而是等到执行“行动”类型的语句时启动真正的计算,那个时候“转换”操作语句中的错误就会显示出来,比如:

textFile = sc.textFile("file:///soft/spark/README1111.md")

上面我们使用了一个根本就不存在的 word123.txt,执行上面语句时,pyspark根本不会报错,因为,没有遇到“行动”类型的 first()操作之前,这个加载操作时不会真正执行的。然后,我们执行一个“行动”类型的操作 first(),如下:

textFile.first()

执行上面语句后,会出现 org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/soft/spark/README1111.md 报错。因为,这个 README1111.md 文件根本就不存在。

2、将 textFile 变量中的内容再次写回到另外一个文本文件 readbak.md 中。

textFile = sc.textFile("file:///soft/spark/README.md")
textFile.saveAsTextFile("file:///home/hadoop/readbak.md")

saveAsTextFile()是一个“行动”(Action)类型的操作,所以,马上会执行真正的计算过程,从 README.md 中加载数据到变量 textFile 中,然后,又把 textFile 中的数据写回到 readbak.md 中。现在我们到/home/hadoop 目录看一下,会发现,确实多了一个 readbak.md,但是,和我们预期的不一样,它不是一个文件,而是一个文件夹(readbak.md 作为文件夹名称当然是没有问题的,虽然不符合我们平时的习惯)。查看文件夹 readbak.md 的内容

cd /home/hadoop/readbak.md/
ls
part-00000  part-00001  _SUCCESS

也就是说,该目录下包含两个文件,我们可以使用 cat 命令查看一下 part-00000 文件(注意:part-后面是五个零):

cat part-00000

显示结果,是和 README.md 中的内容一样的。
如果想再次把数据加载在 RDD 中,只要使用 writeback.txt 这个目录即可,如下:

lines = sc.textFile("file:///home/hadoop/readbak.md")
lines.first()

分布式文件系统 HDFS 的数据读写

1、将 spark 安装目录下的 README.md 上传到 hdfs 中

hdfs dfs -put /soft/spark/README.md /user/hadoop/

查看

hdfs dfs -ls .

可以看到,确实多了一个 README.md 文件,使用 cat 命令查看一个 HDFS 中的 README.md 文件的内容,命令如下:

hdfs dfs -cat ./README.md

上面命令执行后,就会看到 HDFS 中 README.md 的内容了。

2、从 HDFS 中加载 README.md 文件,并显示第一行文本内容:

lines = sc.textFile("hdfs://hadoop:9000/user/hadoop/README.md")
lines.first()

执行上面语句后,就可以看到 HDFS 文件系统中(不是本地文件系统)的 README.md 的第一行内容了。

需要注意的是,sc.textFile("hdfs://hadoop:9000/user/hadoop/README.md")中,“hdfs://hadoop:9000/”是前面介绍 Hadoop 安装内容时确定下来的端口地址 9000。实际上,也可以省略不写,如下三条语句都是等价的:

lines = sc.textFile("hdfs://hadoop:9000/user/hadoop/README.md")
lines = sc.textFile("/user/hadoop/README.md")
lines = sc.textFile("README.md")

把 lines 的内容写回到 HDFS 文件系统中(写到 hadoop 用户目录下):

lines.saveAsTextFile("writeback.txt")

执行上面命令后,文本内容会被写入到 HDFS 文件系统的“/user/hadoop/writeback.txt”目录下,可以通过下面命令查看

hdfs dfs -ls .

执行上述命令后,在执行结果中,可以看到有个 writeback.txt 目录,下面我们查看该目录下有什么文件:

hdfs dfs -ls ./writeback.txt
Found 3 items
-rw-r--r--   1 hadoop supergroup          0 2019-03-02 17:29 writeback.txt/_SUCCESS
-rw-r--r--   1 hadoop supergroup       2030 2019-03-02 17:29 writeback.txt/part-00000
-rw-r--r--   1 hadoop supergroup       1922 2019-03-02 17:29 writeback.txt/part-00001

输出 part-00000 文件的内容(注意:part-00000 里面有五个零):

hdfs dfs -cat ./writeback.txt/part-00000

执行结果中,就可以看到和 README.md 文件中一样的文本内容。

当需要再次把 writeback.txt 中的内容加载到 RDD 中时,只需要加载 writeback.txt 目录即可,不需要使用 part-00000 文件,如下所示:

textFile = sc.textFile("hdfs://hadoop:9000/user/hadoop/writeback.txt")

不同文件格式的读写

1、文本文件

把本地文件系统中的文本文件加载到 RDD 中的语句如下:

rdd = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")

当我们给 textFile()函数传递一个“包含完整路径的文件名”时,就会把这个文件加载到 RDD 中。如果我们给 textFile()函数传递的不是文件名,而是一个目录,则该目录下的所有文件内容都会被读取到 RDD 中。

把 RDD 中的数据保存到文本文件,可以采用如下语句:

rdd.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/outputFile")

正像上面我们已经介绍的那样,我们在 saveAsTextFile()函数的参数中给出的是目录,不是文件名,RDD 中的数据会被保存到给定的目录下。

2、JSON

JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式。
Spark 提供了一个 JSON 样例数据文件,存放在“/soft/spark/examples/src/main/resources/people.json”中。people.json 文件的内容如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

读取 people.json 文件

jsonStr = sc.textFile("file:///soft/spark/examples/src/main/resources/people.json")
jsonStr.foreach(print)
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

从上面执行结果可以看出,people.json 文件加载到 RDD 中以后,在 RDD 中存在三个字符串。我们下面要做的事情,就是把这三个 JSON 格式的字符串解析出来,比如说,第一个字符串{“name”:”Michael”},经过解析后,解析得到 key 是”name”,value 是”Michael”。

import json
jsonStrs = sc.textFile("file:///soft/spark/examples/src/main/resources/people.json")
result = jsonStrs.map(lambda s : json.loads(s))
result.collect()

运行后出现下面结果

[{'name': 'Michael'},
 {'name': 'Andy', 'age': 30},
 {'name': 'Justin', 'age': 19}]

在 python 脚本中运行上面内容

vi jsontest.py

from pyspark import SparkContext
import json
sc = SparkContext('local','JSONAPP')
inputFile =  "file:///soft/spark/examples/src/main/resources/people.json"
jsonStrs = sc.textFile(inputFile)
result = jsonStrs.map(lambda s : json.loads(s))
result.foreach(print) 

保存后运行 python3 jsontest.py

喜欢 (1)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址