暂无说说

把RDD转换成DataFrame

pyspark jiajun 8个月前 (03-03) 133次浏览 0个评论 扫描二维码

Spark 官网提供了两种方法来实现从 RDD 转换得到 DataFrame,第一种方法是,利用反射来推断包含特定类型对象的 RDD 的 schema,适用对已知数据结构的 RDD 转换;第二种方法是,使用编程接口,构造一个 schema 并将其应用在已知的 RDD 上。

利用反射机制推断 RDD 模式

在利用反射机制推断 RDD 模式时,会用到 toDF()方法

下面是在pyspark中执行命令以及反馈的信息:

from pyspark.sql.types import Row
def f(x):
    rel = {}
    rel['name'] = x[0]
    rel['age'] = x[1]
    return rel

peopleDF = sc.textFile("file:///soft/spark/examples/src/main/resources/people.txt").map(lambda line:line.split(",")).map(lambda x:Row(**f(x))).toDF()
peopleDF.show()
peopleDF.createOrReplaceTempView("people") #必须注册为临时表才能供下面的查询使用
personsDF = spark.sql("select * from people")
personsDF.rdd.map(lambda t : "Name:"+t[0]+","+"Age:"+t[1]).collect()

使用编程方式定义 RDD 模式

使用 createDataFrame(rdd, schema)编程方式定义 RDD 模式。

from pyspark.sql.types import Row
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
# 读取文件,生成 RDD
peopleRDD = sc.textFile("file:///soft/spark/examples/src/main/resources/people.txt")
# 定义一个模式字符串
schemaString = "name age"
# 根据模式字符串生成模式
fields = list(map( lambda fieldName : StructField(fieldName, StringType(), nullable = True), schemaString.split(" ")))
schema = StructType(fields)
# 从上面信息可以看出,schema 描述了模式信息,模式中包含 name 和 age 两个字段
rowRDD = peopleRDD.map(lambda line : line.split(',')).map(lambda attributes : Row(attributes[0], attributes[1]))
peopleDF = spark.createDataFrame(rowRDD, schema)
# 必须注册为临时表才能供下面查询使用
peopleDF.createOrReplaceTempView("people")
results = spark.sql("SELECT * FROM people")
results.rdd.map( lambda attributes : "name: " + attributes[0]+","+"age:"+attributes[1]).foreach(print)

在上面的代码中,peopleRDD.map(lambda line : line.split(‘,’))作用是对 people 这个 RDD 中的每一行元素都进行解析。比如,people 这个 RDD 的第一行是:

Michael, 29

这行内容经过 peopleRDD.map(lambda line : line.split(‘,’)).操作后,就得到一个集合{Michael,29}。后面经过 map(lambda attributes : Row(attributes[0], attributes[1]))操作时,这时的 p 就是这个集合{Michael,29},这时 p[0]就是 Micheael,p[1]就是 29,map(lambda attributes : Row(attributes[0], attributes[1]))就会生成一个 Row 对象,这个对象里面包含了两个字段的值,这个 Row 对象就构成了 rowRDD 中的其中一个元素。因为 people 有 3 行文本,所以,最终,rowRDD 中会包含 3 个元素,每个元素都是 org.apache.spark.sql.Row 类型。实际上,Row 对象只是对基本数据类型(比如整型或字符串)的数组的封装,本质就是一个定长的字段数组。
peopleDF = spark.createDataFrame(rowRDD, schema),这条语句就相当于建立了 rowRDD 数据集和模式之间的对应关系,从而我们就知道对于 rowRDD 的每行记录,第一个字段的名称是 schema 中的“name”,第二个字段的名称是 schema 中的“age”。

把 RDD 保存成文件

这里介绍如何把 RDD 保存成文本文件,后面还会介绍其他格式的保存。

1、第 1 种保存方法

#读取文件
peopleDF = spark.read.format("json").load("file:///soft/spark/examples/src/main/resources/people.json")
#保存文件
peopleDF.select("name", "age").write.format("csv").save("file:///home/hadoop/newpeople.csv")

可以看出,这里使用 select(“name”, “age”)确定要把哪些列进行保存,然后调用 write.format(“csv”).save ()保存成 csv 文件。在后面小节中,我们还会介绍其他保存方式。
另外,write.format()支持输出 json,parquet, jdbc, orc, libsvm, csv, text 等格式文件,如果要输出文本文件,可以采用 write.format(“text”),但是,需要注意,只有 select()中只存在一个列时,才允许保存成文本文件,如果存在两个列,比如 select(“name”, “age”),就不能保存成文本文件。

上述过程执行结束后,在 Shell 命令提示符下查看新生成的 newpeople.csv:

cd /home/hadoop/
ls

可以看到/home/hadoop 这个目录下面有个 newpeople.csv 文件夹(注意,不是文件),这个文件夹中包含下面两个文件:

part-00000-eedec405-f649-494d-b58c-643199238ad0-c000.csv  
_SUCCESS

不用理会 _SUCCESS 这个文件,只要看一下 part-00000-eedec405-f649-494d-b58c-643199238ad0-c000.csv 这个文件,可以用 vim 编辑器打开这个文件查看它的内容,该文件内容如下:

Michael,""
Andy,30
Justin,19

如果我们要再次把 newpeople.csv 中的数据加载到 RDD 中,可以直接使用 newpeople.csv 目录名称,而不需要使用 part-00000-eedec405-f649-494d-b58c-643199238ad0-c000.csv 文件,如下:

textFile = sc.textFile("file:///home/hadoop/newpeople.csv")
textFile.foreach(print)

2、第 2 种保存方法

#读取文件
peopleDF = spark.read.format("json").load("file:///soft/spark/examples/src/main/resources/people.json")
#保存文件
peopleDF.rdd.saveAsTextFile("file:///home/hadoop/newpeople.txt")

可以看出,我们是把 DataFrame 转换成 RDD,然后调用 saveAsTextFile()保存成文本文件。在后面小节中,我们还会介绍其他保存方式。

上述过程执行结束后,在 Shell 命令提示符下查看新生成的 newpeople.txt:

cd /home/hadoop/
ls

可以看到/home/hadoop/这个目录下面有个 newpeople.txt 文件夹(注意,不是文件),这个文件夹中包含下面两个文件:

part-00000  
_SUCCESS

不用理会 _SUCCESS 这个文件,只要看一下 part-00000 这个文件,可以用 vim 编辑器打开这个文件查看它的内容,该文件内容如下:

Row(age=None, name='Michael')
Row(age=30, name='Andy')
Row(age=19, name='Justin')

如果我们要再次把 newpeople.txt 中的数据加载到 RDD 中,可以直接使用 newpeople.txt 目录名称,而不需要使用 part-00000 文件,如下:

textFile = sc.textFile("file:///home/hadoop/newpeople.txt")
textFile.foreach(print)

 

喜欢 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址