Spark学习笔记之之RDD编程
1. RDD简介
RDD,全称Resilient Distributed Datasets(弹性分布式数据集),是Spark最为核心的概念,是Spark对数据的抽象。RDD是分布式的元素集合,每个RDD只支持读操作,且每个RDD都被分为多个分区存储到集群的不同节点上。除此之外,RDD还允许用户显示的指定数据存储到内存和磁盘中。
对RDD的操作,从类型上也比较简单,包括:创建RDD、转化已有的RDD以及在已有RDD的基础上进行求值。
现在理解这些概念可能比较抽象。简言之,我们可以把RDD看成是spark对数据的抽象,例如:可以为集群上的一个文件README.md创建一个RDD对象lines
,lines
是spark的一个实例对象,我们通过操作lines
就可以对README.md的数据进行操作。
上个小栗子。
例1:
scala> val lines = sc.textFile("README.md")
lines: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:27
在上面的代码实例中,lines
就是spark内部的一个RDD实例,之后就可以对该变量进行操作。
在spark中,对RDD的操作分为两类:转换操作(transform)和行动操作(action)。
- 转换操作:通过一个RDD,生成一个新的RDD
- 行动操作:对RDD计算出一个结果,将结果返回驱动程序或者写入外部存储设备(例如hdfs)
例2 (承接例1):
scala> val pythonLines = lines.filter(line => line.contains("Python"))
pythonLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:29
scala> pythonLines.first()
res1: String = high-level APIs in Scala, Java, Python, and R, and an optimized engine that
在例2中,filter()
是一个转换操作,转换已有的RDD实例lines
,生成新的RDD实例pythonLines
,在此过程中,spark并没有进行实际的计算,称为惰性计算,只有第一次在一个行动操作时,才会计算。该函数接受一个匿名函数作为参数,完成数据的过滤。
first()
是一个行动操作,整个代码的结果就是:获取README.md文件中第一行包含Python子串的文本。
优点: 在定义RDD时,spark并没有将数据立即加入内存,而是了解完成整个操作链之后,只获取所需的数据即可。上述例子中,spark在找到第一行包含Python子串的文本后就停止读取数据了。
默认情况下,spark每次运行进行行动操作时,都会对RDD进行重新计算。如果想在多个行动操作中,重用一个RDD,可以将一个RDD使用persist()
缓存下来,在第一次对RDD计算之后,spark会将数据以分区的方式保存到集群的多台机器中。
spark程序的整体工作方式为:
- 从外部数据创建一个RDD
- 转换已有的RDD,形成新的RDD
- 调用
persist()
函数,对需要被重用的RDD进行持久化 - 使用行动操作进行计算
2. RDD创建
2.1. parallelize()
parallelize()
是创建RDD最简单的方法,通过程序中已有的数据集合来创建RDD。
例3:
scala> val list = List("meituan","dianping")
list: List[String] = List(meituan, dianping)
scala> val lines = sc.parallelize(list)
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:29
2.2. textFile()
textFile()
用于从一个文本文件中读入数据,并创建一个存储字符串的RDD。
首先创建一个文本文件用于演示下面的例子,log.txt
:
This is the first error.
This is the second error.
THis is the first warning.
THis is the second warning.
THis is nothing.
THis is the first error and warning.
例4:
scala> val inputRDD = sc.textFile("log.txt")
inputRDD: org.apache.spark.rdd.RDD[String] = log.txt MapPartitionsRDD[1] at textFile at <console>:27
例4中,使用textFile()
函数创建了inputRDD,该RDD存储字符串。
3. RDD操作
RDD操作包括:转换操作和行动操作,可以通过一个函数的返回值判断操作的类型。
3.1. 转换操作
3.1.1. filter()
继续使用例4中的log.txt
来演示。
例5:
scala> val inputRDD = sc.textFile("log.txt")
inputRDD: org.apache.spark.rdd.RDD[String] = log.txt MapPartitionsRDD[1] at textFile at <console>:27
scala> val errorLines = inputRDD.filter(line => line.contains("error"))
errorLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:29
scala> errorLines.count()
res1: Long = 3
scala> errorLines.foreach(line => println(line))
This is the first error.
This is the second error.
THis is the first error and warning.
例5中,使用filter()
函数过滤包含”error”的行,使用count()
函数计算过滤后RDD的元素个数,并用foreach()
函数将其元素输出到控制台。
3.1.2. union()
union()
是另一个转换函数,可以合并两个RDD,合并后有重复元素
例6(承接例5):
scala> val warningLines = inputRDD.filter(line => line.contains("warning"))
warningLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:29
scala> warningLines.foreach(line => (println(line)))
THis is the first warning.
THis is the second warning.
THis is the first error and warning.
scala> val badLines = errorLines.union(warningLines)
badLines: org.apache.spark.rdd.RDD[String] = UnionRDD[6] at union at <console>:33
scala> badLines.foreach(line => println(line))
This is the first error.
This is the second error.
THis is the first error and warning.
THis is the first warning.
THis is the second warning.
THis is the first error and warning.
例6中又创建另一个包含warning
字符串的warningRDD
, 利用union()
函数获取包含error
或者包含warning
的字符串ihe,从结果可以看出,有后有重复元素。
最后说明一点:Spark使用谱系图记录RDD之间的依赖关系。
3.2. 行动操作
行动操作会将结果返回驱动程序,或者写入外部存储设备。注意:每调用一次新的行动操作,RDD都会重新计算,除非将中间结果持久化。
3.2.1. count()
参见例5 。
3.2.2. first()
参见例2 。
3.2.3. take()
take()
函数返回RDD中的部分元素。
例7:
scala> val inputRDD = sc.textFile("log.txt")
inputRDD: org.apache.spark.rdd.RDD[String] = log.txt MapPartitionsRDD[1] at textFile at <console>:27
scala> val errorLines = inputRDD.filter(line => line.contains("error"))
errorLines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:29
scala> errorLines.count()
res0: Long = 3
scala> errorLines.take(1).foreach(line=>println(line))
This is the first error.
3.2.4. collect()
与take()
函数不同,collect()
函数会获取整个RDD数据,所以只有当单台机器的内存可以装入时调用该函数。
例8:
scala> errorLines.collect()
res7: Array[String] = Array(This is the first error., This is the second error., THis is the first error and warning.)
collect()
函数返回包含所有RDD元素的数组!!!
3.3. 惰性求值
惰性求值是指转化操作并没有立即执行,类似sc.textFile()
这样的读操作也是惰性的。实际上,我们应该把RDD看成通过转化操作构建出来的、记录如何进行计算的指令集。
3.4. 向Spark传递参数
向Spark传递参数时,要避免意外的将参数所在的对象都进行传递。
例9:
class SearchFunctions(val query: String){
def isMatch(s: String): boolean = {
s.contains(query);
}
def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
// 问题:“isMatch”表示this.isMatch(),因此要传递整个对象
rdd.map(isMatch());
}
def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
// 问题:“query”表示this.query,因此要传递整个对象
rdd.map(x => x.split(query));
}
def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
// 安全:只把我们需要的字段拿出来放入局部变量中
val query2 = this.query
rdd.map(x => x.split(query2));
}
}
4. 常见的RDD转化操作
4.1. map()
接收一个函数作为参数,将该函数应用于RDD中的每个元素,将函数的返回值作为一个新的RDD。
4.2. filter()
接收一个返回boolean类型的函数作为参数,对RDD进行过滤。
例11,求一个集合中每个正数的平方。
scala> val data = sc.parallelize(List(-3, -2, -1, 0, 1, 2, 3))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val squared = data.filter(x => x > 0).map(x => x * x)
squared: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:29
scala> println(squared.collect().mkString(","))
1,4,9
4.3. flatMap()
flatMap()
接收一个函数作为参数,该函数将每个元素转为一个列表
,最终flatMap()
并不是返回由上述列表作为元素组成的RDD,而是返回一个包含每个列表所有元素的RDD
例子12:
scala> val lines = sc.parallelize(List("hello world", "hello spark"))
lines: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val tem = lines.flatMap(x => x.split(" "))
tem: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at flatMap at <console>:29
scala> tem.foreach(x => println(x))
16/05/23 22:09:27 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
hello
world
hello
spark
4.4.1. 伪集合操作之distinct()
distinct()
函数可以去除RDD中的重复元素。但该操作需要在全网范围内进行混洗,操作开销较大
4.4.2. 伪集合操作之union()
参考3.1.2.
, 注意包含重复元素
4.4.3. 伪集合操作之intersection()
intersection()
函数用于求两个RDD中都有的元素。注意:该函数会去掉重复的元素,单个RDD中的重复元素也会删除,因此需要数据混洗
4.4.4. 伪集合操作之substract()
substract()
用于求位于第一个RDD中而不位于第二个RDD中的元素。注意:该函数会混洗、去重
4.4.4. 伪集合操作之cartesian()
cartesian()
函数返回所有可能的(a, b)对,其中,a来自第一个RDD,b来自第二个RDD。
例12, 伪集合操作:
scala> val data1 = sc.parallelize(List(1, 2, 3, 3))
data1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val data2 = sc.parallelize(List(2, 2, 3, 4, 4, 5))
data2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:27
scala> data1.map(x => x * x).foreach(x => println(x))
16/05/24 22:43:53 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
1
4
9
9
scala> data1.filter(x => x <= 2).foreach(x=>println(x))
1
2
scala> data1.flatMap(x=>x.to(3)).foreach(x=>println(x))
1
2
3
2
3
3
3
scala> data1.distinct().foreach(x=>println(x)) // 去重
1
3
2
scala> data1.sample(false, 0.5).foreach(x=>println(x)) // 采样
1
3
scala> data1.union(data2).foreach(x=>println(x)) //保留了两个RDD所有的元素
1
2
3
3
2
2
3
4
4
5
scala> data1.intersection(data2).foreach(x=>println(x)) // 交集,去重
3
2
scala> data1.substract(data2).foreach(x=>println(x)) // 差集,去重
1
scala> data1.cartesian(data2).foreach(x=>println(x))
(1,2)
(1,2)
(1,3)
(1,4)
(1,4)
(1,5)
(2,2)
(2,2)
(2,3)
(2,4)
(2,4)
(2,5)
(3,2)
(3,2)
(3,3)
(3,4)
(3,4)
(3,5)
(3,2)
(3,2)
(3,3)
(3,4)
(3,4)
(3,5)
5. 常见的RDD行动操作
5.1. reduce() 函数
reduce()
接收一个函数作为参数,该函数以两个相同类型的元素作为输入,并返回一个同类型的元素,用于求和等。
例13:
scala> val data = sc.parallelize(List(1, 2, 3, 4))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val sum = data.reduce((x, y) => x + y)
16/05/31 22:29:52 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
sum: Int = 10
5.2. fold() 函数
fold()
接收一个与reduce()
接收的函数签名相同的函数,再加上一个“初始值”来作为每个分区第一次调用的结果。
例子14:
scala> val data = sc.parallelize(List(1, 2, 3, 4))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val sum = data.fold(0)((x, y)=> x + y) // fold()函数需要一个初始值,该初始值对于每个分区都有效
sum: Int = 10
5.3. aggregate() 函数
利用aggregate()
函数计算平均值
例15,求均值:
scala> val data = sc.parallelize(List(1, 2, 3, 4))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
scala> val result = data.aggregate((0, 0))( // (0, 0)为初始值,第一个0代表每个分区累加的和,第二个0代表每个分区累加的个数
(acc, value) => (acc._1 + value, acc._2 + 1), // acc为一个二元组,第一个元素代表每个分区累加的和,第二个0元素代表每个分区累加的个数;value代表每个RDD中的元素
(acc1, acc2)=>(acc1._1 + acc2._1, acc1._2 + acc2._2) // 该函数汇总所有分区的结果
)
16/05/31 22:43:38 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
result: (Int, Int) = (10,4)
scala> val avg = result._1 / result._2;
avg: Int = 2
scala> val avg = result._1 / result._2.toDouble
avg: Double = 2.5