暂无说说

键值对RDD

pyspark jiajun 8个月前 (03-02) 115次浏览 0个评论 扫描二维码

虽然 RDD 中可以包含任何类型的对象,但是“键值对”是一种比较常见的 RDD 元素类型,分组和聚合操作中经常会用到。
Spark 操作中经常会用到“键值对 RDD”(Pair RDD),用于完成聚合计算。普通 RDD 里面存储的数据类型是 Int、String 等,而“键值对 RDD”里面存储的数据类型是“键值对”。

键值对 RDD 的创建

第一种创建方式:从文件中加载

可以采用多种方式创建键值对 RDD,其中一种主要方式是使用 map()函数来实现,如下:

lines=sc.textFile("file:///soft/spark/README.md")
pairRDD = lines.flatMap(lambda line : line.split(" ")).map(lambda word : (word,1))
pairRDD.collect()

第二种创建方式:通过并行集合(列表)创建 RDD

list = ["Hadoop","Spark","Hive","Spark"]
rdd = sc.parallelize(list)
pairRDD = rdd.map(lambda word : (word,1))
pairRDD.collect()

常用的键值对转换操作

常用的键值对转换操作包括 reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等,下面我们通过实例来介绍。

reduceByKey(func)

reduceByKey(func)的功能是,使用 func 函数合并具有相同键的值。比如,reduceByKey((a,b) => a+b),有四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),对具有相同 key 的键值对进行合并后的结果就是:(“spark”,3)、(“hadoop”,8)。可以看出,(a,b) => a+b 这个 Lamda 表达式中,a 和 b 都是指 value,比如,对于两个具有相同 key 的键值对(“spark”,1)、(“spark”,2),a 就是 1,b 就是 2。

list = ["Hadoop","Spark","Hive","Spark"]
rdd = sc.parallelize(list)
pairRDD = rdd.map(lambda word : (word,1))
pairRDD.reduceByKey(lambda a,b : a+b).collect()

groupByKey()

groupByKey()的功能是,对具有相同键的值进行分组。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5),采用 groupByKey()后得到的结果是:(“spark”,(1,2))和(“hadoop”,(3,5))。

list = ["Hadoop","Spark","Hive","Spark"]
rdd = sc.parallelize(list)
pairRDD = rdd.map(lambda word : (word,1))
pairRDD.groupByKey().collect()

keys()

keys()只会把键值对 RDD 中的 key 返回形成一个新的 RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的 RDD,采用 keys()后得到的结果是一个 RDD[Int],内容是{“spark”,”spark”,”hadoop”,”hadoop”}。

list = ["Hadoop","Spark","Hive","Spark"]
rdd = sc.parallelize(list)
pairRDD = rdd.map(lambda word : (word,1))
pairRDD.keys().collect()pairRDD.keys().collect()

values()

values()只会把键值对 RDD 中的 value 返回形成一个新的 RDD。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的 RDD,采用 values()后得到的结果是一个 RDD[Int],内容是{1,2,3,5}。

list = ["Hadoop","Spark","Hive","Spark"]
rdd = sc.parallelize(list)
pairRDD = rdd.map(lambda word : (word,1))
pairRDD.values().collect()

sortByKey()

sortByKey()的功能是返回一个根据键排序的 RDD。

list = ["Hadoop","Spark","Hive","Spark"]
rdd = sc.parallelize(list)
pairRDD = rdd.map(lambda word : (word,1))
pairRDD.sortByKey().collect()

mapValues(func)

我们经常会遇到一种情形,我们只想对键值对 RDD 的 value 部分进行处理,而不是同时对 key 和 value 进行处理。对于这种情形,Spark 提供了 mapValues(func),它的功能是,对键值对 RDD 中的每个 value 都应用一个函数,但是,key 不会发生变化。比如,对四个键值对(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)构成的 pairRDD,如果执行 pairRDD.mapValues(lambda x : x+1),就会得到一个新的键值对 RDD,它包含下面四个键值对(“spark”,2)、(“spark”,3)、(“hadoop”,4)和(“hadoop”,6)。

list = ["Hadoop","Spark","Hive","Spark"]
rdd = sc.parallelize(list)
pairRDD = rdd.map(lambda word : (word,1))
pairRDD.mapValues(lambda x : x+1).collect()pairRDD.mapValues(lambda x : x+1).collect()

join

join(连接)操作是键值对常用的操作。“连接”(join)这个概念来自于关系数据库领域,因此,join 的类型也和关系数据库中的 join 一样,包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。最常用的情形是内连接,所以,join 就表示内连接。
对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的 key 才会被输出,最终得到一个(K,(V1,V2))类型的数据集。

比如,pairRDD1 是一个键值对集合{(“spark”,1)、(“spark”,2)、(“hadoop”,3)和(“hadoop”,5)},pairRDD2 是一个键值对集合{(“spark”,”fast”)},那么,pairRDD1.join(pairRDD2)的结果就是一个新的 RDD,这个新的 RDD 是键值对集合{(“spark”,1,”fast”),(“spark”,2,”fast”)}。

pairRDD1 = sc.parallelize([('spark',1),('spark',2),('hadoop',3),('hadoop',5)])
pairRDD2 = sc.parallelize([('spark','fast')])
pairRDD1.join(pairRDD2).collect()

综合实例

1、给定一组键值对(“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6),键值对的 key 表示图书名称,value 表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
很显然,对于上面的题目,结果是很显然的,(“spark”,4),(“hadoop”,5)。

下面,我们在pyspark中演示代码执行过程:

rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])
rdd.mapValues(lambda x:(x,1)).reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).mapValues(lambda x:x[0]/x[1]).collect()

 

喜欢 (1)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址