SparkCore Operators

刘超 · 1 year ago

Contents

Category: transform — functions grouped by the data dependency they create

OneToOneDependency:
  map, mapValues, filter, filterByRange, flatMap, flatMapValues, sample, sampleByKey,
  glom, zipWithIndex/zipWithUniqueId, mapPartitions, mapPartitionsWithIndex

RangeDependency:
  union (in the general case)

ManyToOneDependency:
  union (in special cases), coalesce(shuffle=false), zip, zipPartitions

ManyToManyDependency:
  cartesian

Single ShuffleDependency:
  partitionBy, groupByKey, reduceByKey, aggregateByKey, combineByKey, foldByKey,
  sortByKey, coalesce(shuffle=true), repartition, repartitionAndSortWithinPartitions,
  sortBy, distinct

Multiple ShuffleDependencies:
  cogroup/groupWith, join, intersection, subtract, subtractByKey, mapWith, withColumn,
  makeRDD, reduceByKey
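
The dependency class created by a given transformation can be checked directly in the spark-shell: every RDD exposes a dependencies field listing its Dependency objects. A minimal sketch (sample data chosen arbitrarily for illustration):

scala> val pairs = sc.parallelize(Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c')),3)
scala> // a narrow transformation such as map() creates a OneToOneDependency
scala> pairs.map(r => r._1 + "_" + r._2).dependencies
scala> // a shuffle transformation such as reduceByKey() creates a ShuffleDependency
scala> pairs.reduceByKey((x, y) => x).dependencies
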
Category: action — action() operations post-process the computation results and, at the same time, submit the computation job; they are typically used after a chain of transformation() operations. RDD operations go by all kinds of names, so how do we tell whether an operation is an action() or a transformation()? The answer is to look at the return value: a transformation() generally returns an RDD, while an action() generally returns a value, a data structure (such as a Map), or nothing at all (e.g., writing to disk). The common action() operations are summarized below, along with how each produces its final result (a short sketch follows the list).
reduce(func)
collect()
collectAsMap()
count()
take(n)
first()
take()
takeOrdered()
top()
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile()
saveAsHadoopFile()
foreach(func)
countByKey()
countByValue()
foreach()
foreachPartition()
fold()
reduce()
aggregate()
reduceByKeyLocally()
max()
min()
isEmpty()
lookup()
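
The return-value rule described above can be seen directly in the spark-shell: a transformation such as map() returns another RDD and computes nothing yet, while an action such as reduce() or count() submits the job and returns an ordinary value. A minimal sketch (sample data chosen arbitrarily):

scala> val nums = sc.parallelize(List(1,2,3,4),2)
scala> // transformation: returns a new RDD, no job is submitted yet
scala> val doubled = nums.map(x => x * 2)      // RDD[Int]
scala> // actions: submit the job and return ordinary values
scala> doubled.reduce((a, b) => a + b)         // Int, here 20
scala> doubled.count()                         // Long, here 4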

 


1、map
  a、Description: apply func to each record in rdd1 and emit a new record
  b、Usage: rdd2 = rdd1.map(func)
  c、Example:

scala> // The data source is a <K,V> array divided into 3 partitions
scala> val inputRDD = sc.parallelize(Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c'),(4,'d'),(2,'e'),(3,'f'),(2,'g'),(1,'h')),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> // For each record r, e.g. r = (1,'a'), take its key via r._1, append an underscore, then append its value via r._2
scala> val resultRDD = inputRDD.map(r => r._1 + "_" + r._2)
resultRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26
scala> // Print the records contained in the resulting RDD
scala> resultRDD.foreach(println)
3_f
2_g
1_h
3_c
4_d
2_e
1_a
2_b

  d、Processing flow diagram:

 

2、filter
  a、Description: apply func to each record in rdd1; if the result is true, the record is kept, and all kept records form the new rdd2
  b、Usage: rdd2 = rdd1.filter(func)
  c、Example:

scala> // Keep the records in rdd1 whose key is even
scala> val inputRDD = sc.parallelize(Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c'),(4,'d'),(2,'e'),(3,'f'),(2,'g'),(1,'h')),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val resultRDD = inputRDD.filter(r => r._1 % 2 == 0)
resultRDD: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[6] at filter at <console>:26
scala> resultRDD.foreach(println)
(2,b)
(4,d)
(2,e)
(2,g)

  d、Processing flow diagram:

 

3、filterByRange

  a、Description: filter the records in rdd1, keeping only those whose key lies in [lower, upper]
  b、Usage: rdd2 = rdd1.filterByRange(lower, upper)
  c、Example:

scala> // Keep the records in rdd1 whose key is in [2,4]
scala> val inputRDD = sc.parallelize(Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c'),(4,'d'),(2,'e'),(3,'f'),(2,'g'),(1,'h')),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val resultRDD = inputRDD.filterByRange(2,4)
resultRDD: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[8] at filterByRange at <console>:26
scala> resultRDD.foreach(println)
(3,c)
(2,b)
(3,f)
(4,d)
(2,g)
(2,e)

  d、Processing flow diagram:

 

4、flatMap
  a、Description: apply func to each element of rdd1 (e.g., a list) to produce new elements, then combine all the new elements into rdd2. For example, if a partition of rdd1 contains the two elements List(1,2) and List(3,4) and func adds 1 to every element of a list, the corresponding partition of rdd2 contains (2,3,4,5); a short sketch of this case follows the example below
  b、Usage: rdd2 = rdd1.flatMap(func)
  c、Example:

scala> // The data source is a string array divided into 3 partitions
scala> val inputRDD = sc.parallelize(Array[String]("how do you do","are you ok","thanks","bye bye","I'm ok"),3)
inputRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> // Use flatMap() to split each string into words
scala> val resultRDD = inputRDD.flatMap(x => x.split(" "))
resultRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at flatMap at <console>:26
scala> resultRDD.foreach(println)
how
do
you
do
bye
bye
are
you
ok
thanks
I'm
ok

  d、Processing flow diagram:
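
The List example described in (a) above can be reproduced with a short sketch (sample data taken from that description):

scala> val rdd1 = sc.parallelize(Seq(List(1,2), List(3,4)), 1)
scala> // func adds 1 to each element of a list; flatMap() then flattens the resulting lists
scala> val rdd2 = rdd1.flatMap(list => list.map(x => x + 1))
scala> rdd2.collect()   // Array(2, 3, 4, 5)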

 

5、flatMapValues
  a、Description: same as flatMap(), except that func is applied only to the Value of each <K,V> record in rdd1
  b、Usage: rdd2 = rdd1.flatMapValues(func)
  c、Example:

scala> val inputRDD = sc.parallelize(Array[(Int,String)]((1,"how do you do"),(2,"are you ok"),(4,"thanks"),(5,"bye bye"),(2,"I'm ok")),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> val resultRDD = inputRDD.flatMapValues(x => x.split(" "))
resultRDD: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[12] at flatMapValues at <console>:26
scala> resultRDD.foreach(println)
(1,how)
(1,do)
(1,you)
(1,do)
(2,are)
(2,you)
(2,ok)
(4,thanks)
(5,bye)
(5,bye)
(2,I'm)
(2,ok)

  d、Processing flow diagram:
 

6、mapPartitions
  Syntax: mapPartitions(func)
  Description: apply func to each partition of rdd1 and output a new set of data; the difference from map was covered earlier when discussing how the data in an RDD is computed
  Example (compute the sum of the odd numbers and the sum of the even numbers in each partition):

// The data source is a list divided into 3 partitions
scala> val inputRDD = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
inputRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> val resultRDD = inputRDD.mapPartitions(iter => {
     |   var result = List[Int]()
     |   var odd = 0
     |   var even = 0
     |   
     |   while(iter.hasNext){
     |     val value = iter.next()
     |     if(value%2==0)
     |       even += value // accumulate the even numbers
     |     else
     |       odd += value // accumulate the odd numbers
     |   }
     |   result = result :+ odd :+ even // append the odd sum and the even sum to the result list
     |   result.iterator // return the result list as an iterator
     | })
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at mapPartitions at <console>:25
scala> resultRDD.foreach(println)
16
5
4
2
10
8

  Logical processing flow: