PySpark RDD
RDD
A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as map, filter, and persist. In addition:
PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join;
DoubleRDDFunctions contains operations available only on RDDs of Doubles;
SequenceFileRDDFunctions contains operations available on RDDs that can be saved as SequenceFiles.
All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit conversions.
Internally, each RDD is characterized by five main properties:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions. Please refer to the Spark paper for more details on RDD internals.
Creating RDDs
From an existing collection
```python
rdd = sc.parallelize([1, 2, 3])
```
From an external dataset
```python
# local file system (path is illustrative)
rdd = sc.textFile("file:///path/to/data.txt")
```
RDD
Transformations
map
Applies a function to every element: item => item_new.
```python
def addOne(item):
    return item + 1

map_rdd = sc.parallelize([1, 2, 3]).map(addOne)
print(map_rdd.collect())  # [2, 3, 4]
```
flatMap
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
- Example 1
```python
data = [(1, 2, 3), (1, 2), (3,)]
flat_rdd = sc.parallelize(data).flatMap(lambda x: x)
print(flat_rdd.collect())  # [1, 2, 3, 1, 2, 3]
```
This RDD has three elements: [(1,2,3), (1,2), (3,)].
First the function (lambda x: x) is applied to each element:
(1,2,3) —> (1,2,3)
(1,2) —> (1,2)
(3,) —> (3,)
Then the results are flattened: the nested tuples are collapsed into a single flat list:
[(1,2,3), (1,2), (3,)] ===> [1, 2, 3, 1, 2, 3]
- Example 2
```python
data = ['study big data cn', 'study big data', 'big data', 'study']
words_rdd = sc.parallelize(data).flatMap(lambda line: line.split(' '))
print(words_rdd.collect())
```
First the lambda splits each string, e.g. "study big data cn" into the list ['study', 'big', 'data', 'cn']; then flatMap flattens all the resulting lists into one.
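flatMap's map-then-flatten behavior can be reproduced locally with itertools.chain; a minimal sketch (no Spark needed, `flat_map` is an illustrative helper, not a Spark API) covering both examples above:

```python
from itertools import chain

def flat_map(func, data):
    """Apply func to every element, then flatten the results one level."""
    return list(chain.from_iterable(func(x) for x in data))

# Example 1: identity function, tuples flattened into one list
flattened = flat_map(lambda x: x, [(1, 2, 3), (1, 2), (3,)])

# Example 2: split sentences into words, then flatten
words = flat_map(lambda s: s.split(' '),
                 ['study big data cn', 'study big data', 'big data', 'study'])
```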
Next, deduplication, filtering, and sorting. The RDD used:
```python
rdd = sc.parallelize([3, 1, 3, 2])
```
distinct (deduplicate)
```python
distinct_rdd = rdd.distinct()
print(distinct_rdd.collect())  # e.g. [1, 2, 3]; order is not guaranteed
```
sortBy (sort)
- Ascending
```python
sort_rdd = rdd.sortBy(lambda x: x)
print(sort_rdd.collect())  # [1, 2, 3, 3]
```
- Descending
```python
sort_rdd = rdd.sortBy(lambda x: -x)
print(sort_rdd.collect())  # [3, 3, 2, 1]
```
- Sort by element length
```python
data = [(3, 2, 3), (2,), (1, 1)]
len_rdd = sc.parallelize(data).sortBy(lambda x: len(x))
print(len_rdd.collect())  # [(2,), (1, 1), (3, 2, 3)]
```
- Sort by a chosen field
```python
# reuse the data from the previous step; sort by each tuple's first element
dim_rdd = sc.parallelize(data).sortBy(lambda x: x[0])
print(dim_rdd.collect())  # [(1, 1), (2,), (3, 2, 3)]
```
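sortBy(keyfunc) orders elements by the value the key function returns, just like the `key` argument of Python's built-in sorted. A quick local check of the three orderings above (no Spark needed):

```python
data = [(3, 2, 3), (2,), (1, 1)]

by_length = sorted(data, key=len)             # sort by element length
by_first = sorted(data, key=lambda x: x[0])   # sort by the first field
descending = sorted([3, 1, 3, 2], key=lambda x: -x)  # negate the key to reverse
```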
Set operations
Next, intersection, union, subtraction, and Cartesian product on RDDs. The RDDs used:
```python
rdd_1 = sc.parallelize([1, 2, 3])
rdd_2 = sc.parallelize([2, 3, 4])  # rdd_2's values are illustrative
```
intersection
```python
intersection_rdd = rdd_1.intersection(rdd_2)
```
union
```python
union_rdd = rdd_1.union(rdd_2)  # keeps duplicates; chain .distinct() for a true set union
```
subtract
```python
subtract_rdd = rdd_1.subtract(rdd_2)
```
cartesian (Cartesian product)
```python
cartesian_rdd = rdd_1.cartesian(rdd_2)
```
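What these four operators compute can be sketched with plain lists, assuming rdd_1 holds [1, 2, 3] and rdd_2 holds the illustrative [2, 3, 4]:

```python
from itertools import product

a, b = [1, 2, 3], [2, 3, 4]

intersection = [x for x in a if x in b]   # elements in both
union = a + b                             # Spark's union keeps duplicates
subtract = [x for x in a if x not in b]   # elements only in a
cartesian = list(product(a, b))           # every (x, y) pair, 3 * 3 = 9 of them
```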
Relational operations
filter (select / conditional query)
```python
filter_rdd = rdd.filter(lambda x: x != 3)
print(filter_rdd.collect())  # [1, 2]
```
Partition operations
The following operators relate to partitioning. The RDD used:
```python
nums = list(range(1, 20))  # 19 numbers: 1..19
num_rdd = sc.parallelize(nums)
```
glom
Return an RDD created by coalescing all elements within each partition into a list.
coalesce [ˌkoʊəˈles]: to merge, combine
```python
# Two partitions by default here: the experiment ran in YARN mode with two
# containers, each allocated one core.
print(num_rdd.glom().collect())
```
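Locally, glom's effect can be pictured as chunking a list into its partitions. A sketch using Spark's contiguous-slice partitioning scheme (the helper name is illustrative):

```python
def glom_local(data, num_partitions):
    """Split data into num_partitions contiguous chunks,
    mimicking how parallelize slices a local list."""
    n = len(data)
    bounds = [n * i // num_partitions for i in range(num_partitions + 1)]
    return [data[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]

# 19 elements over 2 partitions
partitions = glom_local(list(range(1, 20)), 2)
```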
repartition
Returns a new RDD with the requested number of partitions.
```python
new_num_rdd = num_rdd.repartition(3)
print(new_num_rdd.getNumPartitions())  # 3
```
mapPartitions
The function receives an iterator over one partition's elements and must return an iterable (e.g. a generator).
```python
def fun(part_data):
    # part_data iterates over a single partition; emit one sum per partition
    yield sum(part_data)

print(num_rdd.mapPartitions(fun).collect())
```
mapPartitionsWithIndex
This operator takes a function object.
Parameters: the function receives two arguments (partition index, iterator over the partition's elements).
Return value: an iterable.
For example, use mapPartitionsWithIndex to tag each element with its partition index:
```python
def fun(index, part_iter):
    for item in part_iter:
        yield (index, item)

print(num_rdd.mapPartitionsWithIndex(fun).collect())
```
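The contract above can be modeled locally: enumerate the partitions, hand each (index, iterator) pair to the function, and concatenate the iterables it returns. A sketch over two mock partitions (names are illustrative):

```python
def map_partitions_with_index(partitions, fun):
    """Apply fun(index, iterator) to each partition and concatenate the results."""
    out = []
    for index, part in enumerate(partitions):
        out.extend(fun(index, iter(part)))
    return out

tagged = map_partitions_with_index(
    [[1, 2], [3, 4]],
    lambda i, it: [(i, x) for x in it],  # tag each element with its partition index
)
```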
Actions
isEmpty
```python
rdd = sc.parallelize([1, 2, 3])
print(rdd.isEmpty())  # False
```
collect
Return a list that contains all of the elements in this RDD.
Note: this method should only be used if the resulting list is expected to be small, because all the data is loaded into the driver's memory; a large result simply will not fit.
reduce
Reduces the elements of this RDD using the specified commutative and associative binary operator (e.g. addition or multiplication; subtraction and division do not qualify, since they are neither commutative nor associative). Currently reduces partitions locally.
```python
rdd = sc.parallelize([1, 2, 3])
print(rdd.reduce(lambda a, b: a + b))  # 6
```
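Locally the same fold is functools.reduce. The operator must be commutative and associative because Spark merges per-partition results in an unspecified order; a quick sketch of why subtraction would give partition-dependent answers:

```python
from functools import reduce

total = reduce(lambda a, b: a + b, [1, 2, 3])  # same fold as rdd.reduce

# Subtraction is not associative, so the grouping (i.e. partitioning) matters:
left_grouped = (1 - 2) - 3    # -4
right_grouped = 1 - (2 - 3)   # 2
```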
count
Return the number of elements in this RDD.
```python
rdd = sc.parallelize([2, 1, 3])
print(rdd.count())  # 3
```
foreach
Applies a function to all elements of this RDD. The function runs on the executors, so nothing comes back to the driver.
```python
# add 1 to each element's value (nothing is printed on the driver console)
rdd.foreach(lambda x: x + 1)
```
Fetching elements
take
Take the first num elements of the RDD.
```python
rdd = sc.parallelize([2, 1, 3])
print(rdd.take(2))  # [2, 1]
```
takeOrdered
Get the N elements from an RDD ordered in ascending order or as specified by the optional key function.
```python
# the RDD's elements are [2, 1, 3]
print(rdd.takeOrdered(2))                    # [1, 2]
print(rdd.takeOrdered(2, key=lambda x: -x))  # [3, 2]
```
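takeOrdered(n) behaves like heapq.nsmallest, and top(n) (covered below) like heapq.nlargest; both avoid fully sorting the data. A local sketch:

```python
import heapq

data = [2, 1, 3]
take_ordered = heapq.nsmallest(2, data)  # like rdd.takeOrdered(2)
top = heapq.nlargest(2, data)            # like rdd.top(2)
```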
first
Return the first element in this RDD.
```python
# rdd = [2, 1, 3]
print(rdd.first())  # 2
```
top
Get the top N elements from an RDD, in descending order.
```python
# rdd = [2, 1, 3]
print(rdd.top(2))  # [3, 2]
```
Partition operations
getNumPartitions
Returns the number of partitions in this RDD.
The experiment used three 2-core virtual machines started in Standalone mode; the default number of partitions equals machines × logical CPUs per machine.
```python
# generate 19 numbers
nums_rdd = sc.parallelize(range(1, 20))
print(nums_rdd.getNumPartitions())  # 6 on the 3-machine x 2-core setup above
```
foreachPartition
Applies a function to each partition of this RDD.
```python
# using the data from the previous step
def show(part_iter):
    print(list(part_iter))  # printed in the executor logs, not on the driver

nums_rdd.foreachPartition(show)
```
Persistence
saveAsTextFile
```python
# save the data from the previous step (output path is illustrative)
nums_rdd.saveAsTextFile("hdfs:///output/nums")
```
If an RDD contains only three elements, how many files will it be saved as? One part file is written per partition (plus a _SUCCESS marker), no matter how few elements there are.
```python
three_rdd = sc.parallelize([1, 2, 3])
three_rdd.saveAsTextFile("hdfs:///output/three")
```
Key-value (pair) RDDs
Transformations
keys
```python
rdd = sc.parallelize([(1, "zhangsan"), (2, "lisi"), (1, "wangwu"), (1, "zhaoliu")])
print(rdd.keys().collect())  # [1, 2, 1, 1]
```
values
```python
rdd = sc.parallelize([(1, "zhangsan"), (2, "lisi"), (1, "wangwu"), (1, "zhaoliu")])
print(rdd.values().collect())  # ['zhangsan', 'lisi', 'wangwu', 'zhaoliu']
```
```python
data = [(1, "zhangsan"), (2, "lisi"), (1, "wangwu"), (1, "zhaoliu")]  # values assumed from the recurring example
```
groupByKey
```python
group_rdd = sc.parallelize(data).groupByKey()
print([(k, list(v)) for k, v in group_rdd.collect()])
```
sortByKey
```python
rdd = sc.parallelize([(1, "zhangsan"), (2, "lisi"), (1, "wangwu"), (1, "zhaoliu")])
print(rdd.sortByKey().collect())
```
mapValues
```python
rdd = sc.parallelize([(1, "zhangsan"), (2, "lisi"), (1, "wangwu"), (1, "zhaoliu")])
# the mapped function is illustrative: transform each value, keep the key
print(rdd.mapValues(lambda name: name.upper()).collect())
```
reduceByKey
```python
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.reduceByKey(lambda a, b: a + b).collect())  # e.g. [('a', 2), ('b', 1)]
```
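reduceByKey merges all values that share a key with a binary operator. Its effect can be sketched locally with a dictionary of accumulators (the helper name is illustrative, not a Spark API):

```python
def reduce_by_key(pairs, op):
    """Merge the values for each key using the binary operator op."""
    acc = {}
    for k, v in pairs:
        acc[k] = op(acc[k], v) if k in acc else v
    return sorted(acc.items())  # sorted only to make the result deterministic

counts = reduce_by_key([("a", 1), ("b", 1), ("a", 1)], lambda a, b: a + b)
```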
flatMapValues
```python
rdd = sc.parallelize([("b", [1, 2, 3]), ("a", [2, 3, 4]), ("c", [4, 6, 8])])
print(rdd.flatMapValues(lambda x: x).collect())
# [('b', 1), ('b', 2), ('b', 3), ('a', 2), ('a', 3), ('a', 4), ('c', 4), ('c', 6), ('c', 8)]
```
subtractByKey
```python
rdd1 = sc.parallelize((['a', 3], ['b', 2], ['c', 5], ['b', 3]))
rdd2 = sc.parallelize((['a', 1], ['b', 4]))  # rdd2's values are illustrative
print(rdd1.subtractByKey(rdd2).collect())  # [('c', 5)]
```
combineByKey
combineByKey turns an RDD[(K, V)] into an RDD[(K, C)] using three functions: createCombiner (turn a key's first value into an accumulator), mergeValue (fold a further value into a partition-local accumulator), and mergeCombiners (merge accumulators from different partitions). A classic use is computing per-key (sum, count) pairs for averages:
```python
rdd = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

def create_combiner(v):       # first value seen for a key -> (sum, count)
    return (v, 1)

def merge_value(acc, v):      # fold one more value into (sum, count)
    return (acc[0] + v, acc[1] + 1)

def merge_combiners(a, b):    # merge accumulators from two partitions
    return (a[0] + b[0], a[1] + b[1])

sum_count = rdd.combineByKey(create_combiner, merge_value, merge_combiners)
print(sum_count.collect())    # e.g. [('a', (4, 2)), ('b', (2, 1))]
```
stats
Returns summary statistics of the RDD's numeric elements (count, mean, stdev, max, min) in one pass.
mean
Returns the mean of this RDD's elements.
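The numbers these two actions report can be checked locally with the statistics module; note that Spark's StatCounter reports the population standard deviation (its stdev), which matches statistics.pstdev:

```python
import statistics

data = [1, 2, 3]
mean = sum(data) / len(data)      # what rdd.mean() computes
count = len(data)                 # the count field of rdd.stats()
stdev = statistics.pstdev(data)   # population stdev, as in StatCounter.stdev
```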
SequenceFileRDDFunctions
saveAsSequenceFile
Writes a key-value pair RDD out as a Hadoop SequenceFile; keys and values must be convertible to Hadoop Writable types.
Exercises
Partitioning exercise
Initialize the list list_p = [1,2,3,4,5]
Create an RDD named rdd_p from list_p with 2 partitions
Check the number of partitions of rdd_p
Inspect the elements in each partition of rdd_p
From rdd_p, create an RDD with 3 partitions, named rdd_rp
Inspect the elements in each partition of rdd_rp
```python
list_p = [1, 2, 3, 4, 5]
rdd_p = sc.parallelize(list_p, 2)
print(rdd_p.getNumPartitions())  # 2
print(rdd_p.glom().collect())    # [[1, 2], [3, 4, 5]]
rdd_rp = rdd_p.repartition(3)
print(rdd_rp.glom().collect())
```
Word-count exercise. The file data.txt contains:
```
www www www www www study study study study big big big data data cn
```
```python
rdd = sc.textFile("hdfs:///input/data.txt")
word_counts = (rdd.flatMap(lambda line: line.split(' '))
                  .map(lambda w: (w, 1))
                  .reduceByKey(lambda a, b: a + b))
print(word_counts.collect())
```
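The split-map-reduceByKey pipeline above is exactly a word count, so its result can be verified locally with collections.Counter (no cluster needed):

```python
from collections import Counter

line = "www www www www www study study study study big big big data data cn"
counts = Counter(line.split(' '))  # same tallies the Spark job would produce
```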