Spark RDD创建☆

1 从集合(内存)中创建RDD

从集合中创建RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD

val rdd1 = sparkContext.parallelize( List(1,2,3,4))  
val rdd2 = sparkContext.makeRDD( List(1,2,3,4))  

2 从外部存储(文件)创建RDD

textFile

val fileRDD: RDD[String] = sparkContext.textFile("input")

3 从其他RDD创建

4 直接创建RDD(new)

RDD 转换算子

RDD 根据数据处理方式的不同将算子整体上分为两种:

  • value
  • key-value

Value数据类型的Transformation算子

1)、map

map是对RDD中的每个元素都执行一个指定函数来产生一个新的RDD

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换

val dataRDD1: RDD[Int] = dataRDD.map(  

num => {  

num * 2  

}  

)  

2) 、mapPartitions

mapPartitions是map的一个变种。map的输入函数应用于RDD中的每个元素,而mapPartitions的输入函数应用于每个分区,也就是把每个分区中的内容作为整体来处理的

val dataRDD1: RDD[Int] = dataRDD.mapPartitions( datas => {  
datas.filter(_==2)  
}  
)

3)、mapPartitionsWithIndex

在处理时同时可以获取当前分区索引

val dataRDD1 = dataRDD.mapPartitionsWithIndex( (index, datas) => {  
datas.map(index, _)  
} 

4)、flatMap

与map类似,将原RDD中的每个元素通过函数f转换为新的元素,并将这些元素放入一个集合,也称之为扁平映射

5)、glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

val dataRDD = sparkContext.makeRDD(List( 1,2,3,4),1)  
val dataRDD1:RDD[Array[Int]] = dataRDD.glom() 

6)、groupBy

将数据根据指定的规则进行分组, 分区默认不变

val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1) 
val dataRDD1 = dataRDD.groupBy(  
_%2  
) 

7) 、filter

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃(数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,可能会出现数据倾斜)

val dataRDD = sparkContext.makeRDD(List( 1,2,3,4  ),1)  
val dataRDD1 = dataRDD.filter(_%2 == 0)  

8)、sample

根据指定的规则从数据集中抽取数据

def sample( withReplacement: Boolean, fraction: Double,  
seed: Long = Utils.random.nextLong): RDD[T]  

函数示例:
val dataRDD = sparkContext.makeRDD(List( 1,2,3,4),1)  
// 抽取数据不放回(伯努利算法)  
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。  
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要  
// 第一个参数:抽取的数据是否放回,false:不放回  
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;  
// 第三个参数:随机数种子  
val dataRDD1 = dataRDD.sample(false, 0.5)  
// 抽取数据放回(泊松算法)  
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回  
// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数  
// 第三个参数:随机数种子  
val dataRDD2 = dataRDD.sample(true, 2) 

9)、distinct

将数据集中重复的数据去重

10)、coalesce

用于减小分区

存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少分区的个数,减小任务调度成本

val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2),6)  
val dataRDD1 = dataRDD.coalesce(2)

11)、repartition

该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。可以实现增加分区和减小分区。

val dataRDD = sparkContext.makeRDD(List( 1,2,3,4,1,2),2)  
val dataRDD1 = dataRDD.repartition(4) 

12)、sortBy

该操作用于排序数据

默认为升序排列。排序后新产生的 RDD 的分区数与原RDD 的分区数一致。中间存在 shuffle 的过程

局部排序,非全局排序

双Value数据类型的Transformation算子

1)、intersection

对源RDD 和参数RDD 求交集后返回一个新的RDD

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4)) 
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6)) 
val dataRDD = dataRDD1.intersection(dataRDD2)

2)、union

对源RDD 和参数RDD 求并集后返回一个新的RDD

类似于sql union all 操作

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4)) 
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6)) 
val dataRDD = dataRDD1.union(dataRDD2)  

3)、subtract

以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4)) 
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6)) 
val dataRDD = dataRDD1.subtract(dataRDD2)  

类似于sql not in  操作

4)、zip

将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的Key 为第 1 个 RDD中的元素,Value 为第 2 个 RDD 中的相同位置的元素

val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))  
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))   
val dataRDD = dataRDD1.zip(dataRDD2)

5)、cartesian

笛卡尔操作,对输入RDD内的所有元素计算笛卡尔积

val x = sc.parallelize(List(1,2,3,4,5))
val y = sc.parallelize(List(6,7,8,9,10))   
x.cartesian(y).collect 

Key-Value数据类型的Transformation算子

处理数据类型为Key-Value的Transmation算子,大致可以分为三类:

  • 输入输出分区1对1
  • 聚集操作
  • 连接

输入输出分区1对1

1)、mapValues

原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect

聚集操作

对一个RDD聚集

1)、partitionBy

将数据按照指定Partitioner 重新进行分区。Spark 默认的分区器是HashPartitioner

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)  
import org.apache.spark.HashPartitioner  
val rdd2: RDD[(Int, String)] = rdd.partitionBy(new HashPartitioner(2))

2)、reduceByKey

可以将数据按照相同的Key 对Value 进行聚合

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) 
val dataRDD2 = dataRDD1.reduceByKey(_+_)  
val dataRDD3 = dataRDD1.reduceByKey(_+_, 2)

同groupBy

3)、groupByKey

可以将数据按照相同的Key 对Value 进行聚合

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))  
val dataRDD2 = dataRDD1.groupByKey() 
val dataRDD3 = dataRDD1.groupByKey(2)  
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2))  

reduceByKey 和 groupByKey 的区别

reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能

reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey

4)、aggregateByKey

将数据根据不同的规则进行分区内计算和分区间计算

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))  
val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_)
// aggregateByKey 算子是函数柯里化,存在两个参数列表  
// 1. 第一个参数列表中的参数表示初始值  
// 2. 第二个参数列表中含有两个参数  
//  2.1 第一个参数表示分区内的计算规则  
//  2.2 第二个参数表示分区间的计算规则  

5)、foldByKey

当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为foldByKey

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) 
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)  

6)、combineByKey

最通用的对key-value 型 rdd 进行聚集操作的聚集函数

类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致

reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别

reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
foldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
aggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
combineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同

7)、sortByKey

在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3))) 
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)

对两个RDD聚集

1)、cogroup

在类型为(K,V)和(K,W)的RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的迭代器

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))  
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3))) 
val value: RDD[(String, (Iterable[Int], Iterable[Int]))] = dataRDD1.cogroup(dataRDD2)

使用场景:根据key进行内连接,value合并为元组,比如我们想获取某个用户购买过的所有物品列表

连接

1)、join

本质是对两个含有KV对元素的RDD进行coGroup算子协同划分,再通过flatMapValues将合并的数据分散

在类型为(K,V)和(K,W)的RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的(K,(V,W))的RDD

val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))  
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6))) 
rdd.join(rdd1).collect().foreach(println) 

2)、leftOuterJoin

相当于在join基础上判断一侧的RDD是否为空,如果为空,则填充空,如果有数据,则将数据进行连接计算,然后返回结果

类似于 SQL 语句的左外连接

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))  
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))  
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)

RDD 行动算子

行动算子的分类

Action算子—无输出

Action算子—HDFS

Action算子—Scala集合及数据类型

1)、reduce

聚集RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))  
  
// 聚合数据  
val reduceResult: Int = rdd.reduce(_+_)

2)、collect

collect:将RDD分散存储的元素转换为单机上的Scala数组并返回,类似于toArray功能

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))、  
// 收 集 数 据 到 Driver   
rdd.collect().foreach(println)

collectAsMap:与collect类似,将元素类型为key-value对的RDD,转换为Scala Map并返回,保存元素的KV结构

3)、count

返回RDD 中元素的个数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))  
  
// 返回 RDD 中元素的个数  
val countResult: Long = rdd.count() 

4)、first

返回RDD 中的第一个元素

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))  
  
// 返回 RDD 中元素的个数  
val firstResult: Int = rdd.first() println(firstResult) 

5)、take

返回一个由RDD 的前 n 个元素组成的数组

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))  
  
// 返回 RDD 中元素的个数  
val takeResult: Array[Int] = rdd.take(2) 
println(takeResult.mkString(",")) 

6)、takeOrdered

返回该 RDD 排序后的前 n 个元素组成的数组

val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))  
  
// 返回 RDD 中元素的个数  
val result: Array[Int] = rdd.takeOrdered(2) 

7)、aggregate

允许用户对RDD使用两个不同的reduce函数,第一个reduce函数对各个分区内的数据聚集,每个分区得到一个结果。第二个reduce函数对每个分区的结果进行聚集,最终得到一个总的结果。aggregate相当于对RDD内的元素数据归并聚集,且这种聚集是可以并行的。而fold与reduced的聚集是串行的

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)  
  
// 将该 RDD 所有元素相加得到结果  
val result: Int = rdd.aggregate(0)(_ + _, _ + _) val result: Int = rdd.aggregate(10)(_ + _, _ + _) 

8)、fold

与reduce类似,不同的是每次对分区内的value聚集时,分区内初始化的值为zero value

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))  
  
val foldResult: Int = rdd.fold(0)(_+_)

9)、countByKey

统计每种 key 的个数

val rdd: RDD[(Int, String)] = s  "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))  
  
// 统计每种 key 的个数  
val result: collection.Map[Int, Long] = rdd.countByKey() 

10)、save相关算子

将数据保存到不同格式的文件中

saveAsTextFile:函数将RDD保存为文本至HDFS指定目录,每次输出一行

saveAsObjectFile:将RDD分区中每10个元素保存为一个数组并将其序列化,映射为(null, BytesWritable(Y))的元素,以SequenceFile的格式写入HDFS

// 保存成 Text 文件  
rdd.saveAsTextFile("output")  
  
// 序列化成对象保存到文件  
rdd.saveAsObjectFile("output1")  
  
// 保存成 Sequencefile 文件  
rdd.map((_,1)).saveAsSequenceFile("output2")

11)、foreach

是对RDD中的每个元素执行无参数的f函数,返回Unit

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))  
  
// 收集后打印  
rdd.map(num=>num).collect().foreach(println) 
println("****************")  
// 分布式打印  
rdd.foreach(println) 

RDD持久化

RDDCache缓存

RDD 通过Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中

并不是这两个方法被调用时立即缓存,要有行动算子进行触发(懒加载模式)

// 数据缓存。  
wordToOneRdd.cache()  
  
// 可以更改存储级别  
mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2) 

cache 操作会增加血缘关系,不改变原有的血缘关系 

可以更改存储级别

存储级别

object StorageLevel {  
val NONE = new StorageLevel(false, false, false, false)    
val DISK_ONLY = new StorageLevel(true, false, false, false)  
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)   
val MEMORY_ONLY = new StorageLevel(false, true, false, true)  
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)   
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)  
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)   
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)  
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)   
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)  
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)   
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)  

RDDCheckPoint检查点

所谓的检查点其实就是通过将RDD 中间结果写入磁盘

对 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发

// 设置检查点路径  
sc.setCheckpointDir("./checkpoint1")  
  
// 创建一个 RDD,读取指定位置文件:hello atguigu atguigu 
val lineRdd: RDD[String] = sc.textFile("input/1.txt")  
  
// 业务逻辑  
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))  
  
val wordToOneRdd: RDD[(String, Long)] = wordRdd.map { word => {  
(word, System.currentTimeMillis())  
}  
}  
  
// 增加缓存,避免再重新跑一个 job 做 checkpoint wordToOneRdd.cache()  
// 数据检查点:针对 wordToOneRdd 做检查点计算  
wordToOneRdd.checkpoint()  
  
// 触发执行逻辑  
wordToOneRdd.collect().foreach(println)

缓存和检查点区别

Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖
Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存储在HDFS 等容错、高可用的文件系统,可靠性高
建议对checkpoint()的RDD 使用Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存中读取数据即可,否则需要再从头计算一次RDD

作者 admin

张宴银,大数据开发工程师

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注