暂无说说

RDD编程

pyspark jiajun 6个月前 (03-02) 73次浏览 0个评论 扫描二维码

RDD 概念

RDD(Resilient Distributed Dataset)弹性分布式数据集,是 Spark 中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD 具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD 允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

RDD 的属性

(1)一组分片(Partition),即数据集的基本组成单位。对于 RDD 来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的 CPU Core 的数目。

(2)一个计算每个分区的函数。Spark 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现 compute 函数以达到这个目的。compute 函数会对迭代器进行复合,不需要保存每次计算的结果。

(3)RDD 之间的依赖关系。RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。

(4)一个 Partitioner,即 RDD 的分片函数。当前 Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。只有对于于 key-value 的 RDD,才会有 Partitioner,非 key-value 的 RDD 的 Parititioner 的值是 None。Partitioner 函数不但决定了 RDD 本身的分片数量,也决定了 parent RDD Shuffle 输出时的分片数量。

(5)一个列表,存储存取每个 Partition 的优先位置(preferred location)。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

RDD 创建

1、RDD 可以通过三种方式创建:
第一种:读取一个外部数据集。比如,从本地文件加载数据集,或者从 HDFS 文件系统、HBase、Cassandra、Amazon S3 等外部数据源中加载数据集。Spark 可以支持文本文件、SequenceFile 文件(Hadoop 提供的 SequenceFile 是一个由二进制序列化过的 key/value 的字节流组成的文本存储文件)和其他符合 Hadoop InputFormat 格式的文件。

第二种:调用 SparkContext 的 parallelize 方法,在 Driver 中一个已经存在的集合(数组)上创建。

第三种:RDD 可以通过其他的 RDD 转换而来

从文件系统中加载数据创建 RDD

Spark 采用 textFile()方法来从文件系统中加载数据创建 RDD,该方法把文件的 URI 作为参数,这个 URI 可以是本地文件系统的地址,或者是分布式文件系统 HDFS 的地址,或者是 Amazon S3 的地址等等。

#从本地文件系统读取数据
lines = sc.textFile("file:///home/hadoop/sample_data.csv")
lines.count()

如果使用了本地文件系统的路径,必须要保证在所有的 worker 节点上,也都能够采用相同的路径访问到该文件,比如,可以把该文件拷贝到每个 worker 节点上,或者也可以使用网络挂载共享文件系统。

#上传文件到 hdfs
hdfs dfs -put word.txt /user/hadoop
#从 hdfs 上读取文件,以下三条命令等价
lines = sc.textFile("hdfs://hadoop:9000/user/hadoop/word.txt")
lines = sc.textFile("/user/hadoop/word.txt")
lines = sc.textFile("word.txt")

在使用 Spark 读取文件时,需要说明以下几点:

(1)textFile()方法的输入参数,可以是文件名,也可以是目录,也可以是压缩文件等。比如,textFile(“/my/directory”), textFile(“/my/directory/.txt”), and textFile(“/my/directory/.gz”).
(2)textFile()方法也可以接受第 2 个输入参数(可选),用来指定分区的数目。默认情况下,Spark 会为 HDFS 的每个 block 创建一个分区(HDFS 中每个 block 默认是 128MB)。你也可以提供一个比 block 数量更大的值作为分区数目,但是,你不能提供一个小于 block 数量的值作为分区数目。

通过并行集合(数组)创建 RDD

可以调用 SparkContext 的 parallelize 方法,在 Driver 中一个已经存在的集合(数组)上创建,如:

nums = range(1,5)
rdd = sc.parallelize(nums)
rdd.sum()

RDD 操作

RDD 被创建好以后,在后续使用过程中一般会发生两种操作:

转换(Transformation): 基于现有的数据集创建一个新的数据集。

行动(Action):在数据集上进行运算,返回计算值。

1、转换(Transformation)操作

对于 RDD 而言,每一次转换操作都会产生不同的 RDD,供给下一个“转换”使用。转换得到的 RDD 是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
下面列出一些常见的转换操作(Transformation API):

filter(func):筛选出满足函数 func 的元素,并返回一个新的数据集
map(func):将每个元素传递到函数 func 中,并将结果返回为一个新的数据集
flatMap(func):与 map()相似,但每个输入元素都可以映射到 0 或多个输出结果
groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个 key 传递到函数 func 中进行聚合。

更多请参考:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext

2、行动(Action)操作

行动操作是真正触发计算的地方。Spark 程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
下面列出一些常见的行动操作(Action API):
 count() 返回数据集中的元素个数
collect() 以数组的形式返回数据集中的所有元素
first() 返回数据集中的第一个元素
take(n) 以数组的形式返回数据集中的前 n 个元素
reduce(func) 通过函数 func(输入两个参数并返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每个元素传递到函数 func 中运行

更多请参考:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD

惰性机制说明

这里给出一段简单的代码来解释 Spark 的惰性机制。

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s : len(s))
totalLength = lineLengths.reduce( lambda a, b : a + b)

第一行首先从外部文件 data.txt 中构建得到一个 RDD,名称为 lines,但是,由于 textFile()方法只是一个转换操作,因此,这行代码执行后,不会立即把 data.txt 文件加载到内存中,这时的 lines 只是一个指向这个文件的指针。
第二行代码用来计算每行的长度(即每行包含多少个单词),同样,由于 map()方法只是一个转换操作,这行代码执行后,不会立即计算每行的长度。
第三行代码的 reduce()方法是一个“动作”类型的操作,这时,就会触发真正的计算。这时,Spark 会把计算分解成多个任务在不同的机器上执行,每台机器运行位于属于它自己的 map 和 reduce,最后把结果返回给 Driver Program。

持久化

在 Spark 中,RDD 采用惰性求值的机制,每次遇到 action 操作,都会从头开始执行计算。如果整个 Spark 程序中只有一次 action 操作,这当然不会有什么问题。但是,在一些情形下,我们需要多次调用不同的 action 操作,这就意味着,每次调用 action 操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。
比如,下面就是多次计算同一个 RDD 的例子:

list = ["Hadoop","Spark","Hive"]
rdd = sc.parallelize(list)
print(rdd.count())   #action 操作,触发一次真正从头到尾的计算
print(','.join(rdd.collect()))   #action 操作,触发一次真正从头到尾的计算

上面代码执行过程中,前后共触发了两次从头到尾的计算。
实际上,可以通过持久化(缓存)机制避免这种重复计算的开销。可以使用 persist()方法对一个 RDD 标记为持久化,之所以说“标记为持久化”,是因为出现 persist()语句的地方,并不会马上计算生成 RDD 并把它持久化,而是要等到遇到第一个 action 操作触发真正计算以后,才会把计算结果进行持久化,持久化后的 RDD 将会被保留在计算节点的内存中被后面的 action 操作重复使用。
persist()的圆括号中包含的是持久化级别参数,比如,persist(MEMORY_ONLY)表示将 RDD 作为反序列化的对象存储于 JVM 中,如果内存不足,就要按照 LRU 原则替换缓存中的内容。persist(MEMORY_AND_DISK)表示将 RDD 作为反序列化的对象存储在 JVM 中,如果内存不足,超出的分区将会被存放在硬盘上。一般而言,使用 cache()方法时,会调用 persist(MEMORY_ONLY)。
例子如下:

list = ["Hadoop","Spark","Hive"]
rdd = sc.parallelize(list)
rdd.cache()    #会调用 persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存 rdd,这是 rdd 还没有被计算生成
print(rdd.count())   #第一次 action 操作,触发一次真正从头到尾的计算,这时才会执行上面的 rdd.cache(),把这个 rdd 放到缓存中
print(','.join(rdd.collect()))   #第二次 action 操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的 rdd

最后,可以使用 unpersist()方法手动地把持久化的 RDD 从缓存中移除。

分区

RDD 是弹性分布式数据集,通常 RDD 很大,会被分成很多个分区,分别保存在不同的节点上。RDD 分区的一个分区原则是使得分区的个数尽量等于集群中的 CPU 核心(core)数目。
对于不同的 Spark 部署模式而言(本地模式、Standalone 模式、YARN 模式、Mesos 模式),都可以通过设置 spark.default.parallelism 这个参数的值,来配置默认的分区数目,一般而言:
本地模式:默认为本地机器的 CPU 数目,若设置了 local[N],则默认为 N;
Apache Mesos:默认的分区数为 8;
Standalone 或 YARN:在“集群中所有 CPU 核心数目总和”和“2”二者中取较大值作为默认值;

因此,对于 parallelize 而言,如果没有在方法中指定分区数,则默认为 spark.default.parallelism,比如:

array = [1,2,3,4,5]
rdd = sc.parallelize(array,2) #设置两个分区

对于 textFile 而言,如果没有在方法中指定分区数,则默认为 min(defaultParallelism,2),其中,defaultParallelism 对应的就是 spark.default.parallelism。
如果是从 HDFS 中读取文件,则分区数为文件分片数(比如,128MB/片)。

打印元素

在实际编程中,我们经常需要把 RDD 中的元素打印输出到屏幕上(标准输出 stdout),一般会采用语句 rdd.foreach(print)或者 rdd.map(print)。当采用本地模式(local)在单机上执行时,这些语句会打印出一个 RDD 中的所有元素。但是,当采用集群模式执行时,在 worker 节点上执行打印语句是输出到 worker 节点的 stdout 中,而不是输出到任务控制节点 Driver Program 中,因此,任务控制节点 Driver Program 中的 stdout 是不会显示打印语句的这些输出内容的。为了能够把所有 worker 节点上的打印输出信息也显示到 Driver Program 中,可以使用 collect()方法,比如,rdd.collect().foreach(print),但是,由于 collect()方法会把各个 worker 节点上的所有 RDD 元素都抓取到 Driver Program 中,因此,这可能会导致内存溢出。因此,当你只需要打印 RDD 的部分元素时,可以采用语句 rdd.take(100).foreach(print)。

RDD 操作实例

1、统计包含 Spark 单词行数

lines=sc.textFile("file:///soft/spark/README.md")
lines.filter(lambda line : "Spark" in line).count()

上面的代码中,lines 就是一个 RDD。lines.filter()会遍历 lines 中的每行文本,并对每行文本执行括号中的匿名函数,也就是执行 Lamda 表达式:line : “Spark” in line,在执行 Lamda 表达式时,会把当前遍历到的这行文本内容赋值给参数 line,然后,执行处理逻辑”Spark” in line,也就是只有当改行文本包含“Spark”才满足条件,才会被放入到结果集中。最后,等到 lines 集合遍历结束后,就会得到一个结果集,这个结果集中包含了所有包含“Spark”的行。最后,对这个结果集调用 count(),这是一个行动操作,会计算出结果集中的元素个数。

2、要找出文本文件中单行文本所包含的单词数量的最大值

lines=sc.textFile("file:///soft/spark/README.md")
lines.map(lambda line : len(line.split(" "))).reduce(lambda a,b : (a > b and a or b))

上面代码中,lines 是一个 RDD,是 String 类型的 RDD,因为这个 RDD 里面包含了很多行文本。lines.map(),是一个转换操作,之前说过,map(func):将每个元素传递到函数 func 中,并将结果返回为一个新的数据集,所以,lines.map(lambda line : len(line.split(” “)))会把每行文本都传递给匿名函数,也就是传递给 Lamda 表达式 line : len(line.split(" "))中的 line,然后执行处理逻辑 len(line.split(" "))。len(line.split(" "))这个处理逻辑的功能是,对 line 文本内容进行单词切分,得到很多个单词构成的集合,然后,计算出这个集合中的单词的个数。因此,最终 lines.map(lambda line : len(line.split(" ")))转换操作得到的 RDD,是一个整型 RDD,里面每个元素都是整数值(也就是单词的个数)。最后,针对这个 RDD[Int],调用 reduce()行动操作,完成计算。reduce()操作每次接收两个参数,取出较大者留下,然后再继续比较,例如,RDD[Int]中包含了 1,2,3,4,5,那么,执行 reduce 操作时,首先取出 1 和 2,把 a 赋值为 1,把 b 赋值为 2,然后,执行大小判断,保留 2。下一次,让保留下来的 2 赋值给 a,再从 RDD[Int]中取出下一个元素 3,把 3 赋值给 b,然后,对 a 和 b 执行大小判断,保留较大者 3.依此类推。最终,reduce()操作会得到最大值是 5。

 

喜欢 (1)
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址