暂无说说

DataFrame的创建

pyspark jiajun 8个月前 (03-03) 122次浏览 0个评论 扫描二维码

从 Spark2.0 以上版本开始,Spark 使用全新的 SparkSession 接口替代 Spark1.6 中的 SQLContext 及 HiveContext 接口来实现其对数据加载、转换、处理等功能。SparkSession 实现了 SQLContext 及 HiveContext 所有功能。

SparkSession 支持从不同的数据源加载数据,并把数据转换成 DataFrame,并且支持把 DataFrame 转换成 SQLContext 自身中的表,然后使用 SQL 语句来操作数据。SparkSession 亦提供了 HiveQL 以及其他依赖于 Hive 的功能的支持。

下面我们就介绍如何使用 SparkSession 来创建 DataFrame。

测试数据

Spark 已经为我们提供了几个样例数据,就保存在 spark 安装目录下的“examples/src/main/resources/”子目录下,这个目录下有两个样例数据 people.json 和 people.txt。
people.json 文件的内容如下:

{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

people.txt 文件的内容如下:

Michael, 29
Andy, 30
Justin, 19

读取数据

下面介绍如何从 people.json 跟 people.txt 文件中读取数据成 DataFrame 并显示数据。

1、从 people.json 读取数据

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.json("file:///soft/spark/examples/src/main/resources/people.json")
df.show()

输出结果:

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

2、从 people.txt 中读取数据

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("file:///soft/spark/examples/src/main/resources/people.txt")
df.show()

输出结果

+-------+---+
|    _c0|_c1|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

常用的 DataFrame 操作

现在,我们可以执行一些常用的 DataFrame 操作。

// 打印模式信息
>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 
// 选择多列
>>> df.select(df.name,df.age + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
 
// 条件过滤
>>> df.filter(df.age > 20 ).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
 
// 分组聚合
>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
 
// 排序
>>> df.sort(df.age.desc()).show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+
 
//多列排序
>>> df.sort(df.age.desc(), df.name.asc()).show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+
 
//对列进行重命名
>>> df.select(df.name.alias("username"),df.age).show()
+--------+----+
|username| age|
+--------+----+
| Michael|null|
|    Andy|  30|
|  Justin|  19|
+--------+----+

更多操作,请参考:https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

喜欢 (0)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址