1、map
a、Description: apply func to every record in rdd1 and output one new record for each input record
b、Usage: rdd2 = rdd1.map(func)
c、Example:
scala> // The data source is an array of (K, V) records split into 3 partitions
scala> val inputRDD = sc.parallelize(Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c'),(4,'d'),(2,'e'),(3,'f'),(2,'g'),(1,'h')),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> // For each record r, e.g. r = (1,'a'), take the key with r._1, append an underscore, then append the value taken with r._2
scala> val resultRDD = inputRDD.map(r => r._1 + "_" + r._2)
resultRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at map at <console>:26
scala> // Print the records in the resulting RDD (the order depends on task scheduling; see the note after this section)
scala> resultRDD.foreach(println)
3_f
2_g
1_h
3_c
4_d
2_e
1_a
2_b
d、Processing flow diagram:
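Note: foreach(println) runs inside the executor tasks, so the printed order above simply reflects which partition's task finished first; it is not deterministic. A minimal sketch (assuming the same inputRDD as above) that collects the mapped records back to the driver so they can be printed in a stable order:
// Collect the mapped records to the driver, then sort and print them locally
val mapped = inputRDD.map(r => r._1 + "_" + r._2).collect()   // Array[String]
mapped.sorted.foreach(println)                                // 1_a, 1_h, 2_b, 2_e, ...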
2、filter
a、Description: apply func to every record in rdd1; if the result is true, keep the record. All kept records form the new rdd2
b、Usage: rdd2 = rdd1.filter(func)
c、Example:
scala> // Keep the records in inputRDD whose key is even
scala> val inputRDD = sc.parallelize(Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c'),(4,'d'),(2,'e'),(3,'f'),(2,'g'),(1,'h')),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> val resultRDD = inputRDD.filter(r => r._1 % 2 == 0)
resultRDD: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[6] at filter at <console>:26
scala> resultRDD.foreach(println)
(2,b)
(4,d)
(2,e)
(2,g)
d、Processing flow diagram:
filterByRange
a、Description: filter the records of rdd1 by key, keeping only those whose key falls in [lower, upper] (a sketch of the range-partitioned case follows this subsection)
b、Usage: rdd2 = rdd1.filterByRange(lower, upper)
c、Example:
scala> // Keep the records in inputRDD whose key is in [2, 4]
scala> val inputRDD = sc.parallelize(Array[(Int,Char)]((1,'a'),(2,'b'),(3,'c'),(4,'d'),(2,'e'),(3,'f'),(2,'g'),(1,'h')),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val resultRDD = inputRDD.filterByRange(2,4)
resultRDD: org.apache.spark.rdd.RDD[(Int, Char)] = MapPartitionsRDD[8] at filterByRange at <console>:26
scala> resultRDD.foreach(println)
(3,c)
(2,b)
(3,f)
(4,d)
(2,g)
(2,e)
d、Processing flow diagram:
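Note: filterByRange comes from OrderedRDDFunctions, so it requires an ordering on the key. When the RDD is range-partitioned (for example, after sortByKey), partitions that cannot contain keys in [lower, upper] are skipped entirely; otherwise every partition is scanned and filtered. A minimal sketch of the range-partitioned case, assuming the same inputRDD as above:
// sortByKey installs a RangePartitioner, so filterByRange can prune whole partitions
val sortedRDD = inputRDD.sortByKey()
val rangedRDD = sortedRDD.filterByRange(2, 4)   // only partitions overlapping [2, 4] are read
rangedRDD.collect().foreach(println)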
3、flatMap
a、Description: apply func to every element of rdd1 (e.g. a list) to produce new elements, then combine all the new elements into rdd2. For example, if a partition of rdd1 contains the two elements List(1, 2) and List(3, 4), and func adds 1 to every item of a list, that partition of rdd2 contains (2, 3, 4, 5); see the sketch after this section
b、Usage: rdd2 = rdd1.flatMap(func)
c、Example:
scala> // The data source is an array of strings split into 3 partitions
scala> val inputRDD = sc.parallelize(Array[String]("how do you do","are you ok","thanks","bye bye","I'm ok"),3)
inputRDD: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> // Use flatMap() to split each string into a set of words
scala> val resultRDD = inputRDD.flatMap(x => x.split(" "))
resultRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at flatMap at <console>:26
scala> resultRDD.foreach(println)
how
do
you
do
bye
bye
are
you
ok
thanks
I'm
ok
d、Processing flow diagram:
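A minimal sketch of the list example from the description above, where each element of the RDD is itself a list and func adds 1 to every item:
// flatMap applies func to each element and concatenates the resulting collections
val listRDD = sc.parallelize(List(List(1, 2), List(3, 4)), 1)
val plusOneRDD = listRDD.flatMap(lst => lst.map(_ + 1))   // contains 2, 3, 4, 5
plusOneRDD.collect().foreach(println)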
4、flatMapValues
a、Description: same as flatMap(), but func is applied only to the Value of each <K, V> record in rdd1, while the Key is kept unchanged (see the comparison sketch after this section)
b、Usage: rdd2 = rdd1.flatMapValues(func)
c、Example:
scala> val inputRDD = sc.parallelize(Array[(Int,String)]((1,"how do you do"),(2,"are you ok"),(4,"thanks"),(5,"bye bye"),(2,"I'm ok")),3)
inputRDD: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[11] at parallelize at <console>:24
scala> val resultRDD = inputRDD.flatMapValues(x => x.split(" "))
resultRDD: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[12] at flatMapValues at <console>:26
scala> resultRDD.foreach(println)
(1,how)
(1,do)
(1,you)
(1,do)
(2,are)
(2,you)
(2,ok)
(4,thanks)
(5,bye)
(5,bye)
(2,I'm)
(2,ok)
d、Processing flow diagram:
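For comparison, building the same result with flatMap() means carrying the key along by hand, whereas flatMapValues() keeps each key automatically (and also preserves the RDD's partitioner). A minimal sketch, assuming the same inputRDD as above:
// Hand-written equivalent with flatMap: the key must be re-attached to every word
val manualRDD = inputRDD.flatMap { case (k, v) => v.split(" ").map(word => (k, word)) }
// flatMapValues only touches the value, so the key (and any partitioner) stays untouched
val valuesRDD = inputRDD.flatMapValues(v => v.split(" "))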
5、mapPartitions
a、Description: apply func to each partition of rdd1 as a whole and output a new set of data for that partition; the difference from map() was covered earlier when discussing how the data in an RDD is computed (a sketch with partition indices follows this section)
b、Usage: rdd2 = rdd1.mapPartitions(func)
c、Example (compute the sum of the odd numbers and the sum of the even numbers in each partition):
// The data source is a list split into 3 partitions
scala> val inputRDD = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
inputRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> val resultRDD = inputRDD.mapPartitions(iter => {
| var result = List[Int]()
| var odd = 0
| var even = 0
|
| while(iter.hasNext){
| val value = iter.next()
| if(value%2==0)
| even += value // accumulate the sum of even numbers
| else
| odd += value // accumulate the sum of odd numbers
| }
| result = result :+ odd :+ even // append both sums to the result list
| result.iterator // return the result list as an iterator
| })
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at mapPartitions at <console>:25
scala> resultRDD.foreach(println)
16
5
4
2
10
8
d、Processing flow diagram:
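The six numbers above are the odd and even sums of the three partitions, printed in whatever order the tasks finish. A minimal sketch using mapPartitionsWithIndex (assuming the same inputRDD) that tags each sum with the index of the partition it came from:
// Same per-partition computation, but each result is labelled with its partition index
val taggedRDD = inputRDD.mapPartitionsWithIndex { (idx, iter) =>
  val (even, odd) = iter.toList.partition(_ % 2 == 0)
  Iterator((idx, "odd sum", odd.sum), (idx, "even sum", even.sum))
}
taggedRDD.collect().foreach(println)   // e.g. (0,odd sum,4), (0,even sum,2), ...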