Spark - RDD使用详解5(行动算子)
七、行动算子
1,reduce
(1)该函数用于聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
def reduce(f: (T, T) => T): T
(2)下面是一个 reduce 函数使用样例:
object Hello { def main(args: Array[String]): Unit = { // 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc: SparkContext = new SparkContext(sparkConf) // 创建RDD val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) // 对数据集中的数据进行规约操作 val sum: Int = rdd.reduce(_ + _) // 打印结果 println(sum) //关闭 Spark sc.stop() } }
2,collect
(1)该函数用于将分布式数据集中的所有元素收集到驱动程序中,并以数组的形式返回。
def collect(): Array[T]
(2)下面是一个 collect 函数使用样例:
// 创建RDD val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) // 收集数据到 Driver val array: Array[Int] = rdd.collect() // 打印结果 array.foreach(println)
3,count
(1)该函数用于计算数据集中元素的数量,它返回 RDD 中元素的个数
def count(): Long
(2)下面是一个 count 函数使用样例:
// 创建RDD val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) // 统计 RDD 中元素的数量 val count: Long = rdd.count() // 打印结果 println(count)
4,first
(1)该函数用于获取数据集中的第一个元素,它返回 RDD 中的第一个元素。
def first(): T
(2)下面是一个 first 函数使用样例:
// 创建RDD val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) // 获取RDD中第一额元素 val first: Int = rdd.first() // 打印结果 println(first)
5,take
(1)该函数用于从数据集中获取指定数量的元素,它返回一个由 RDD 的前 n 个元素组成的数组:
def take(num: Int): Array[T]
(2)下面是一个 take 函数使用样例:
// 创建RDD val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) // 获取RDD前2个元素 val result: Array[Int] = rdd.take(2) // 打印结果 println(result.mkString(","))
6,takeOrdered
(1)该函数返回该 RDD 排序后的前 n 个元素组成的数组。
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
(2)下面是一个 takeOrdered 函数使用样例:
// 创建RDD val rdd: RDD[Int] = sc.makeRDD(List(5, 3, 1, 4, 2)) // 获取RDD排序后前2个元素 val result: Array[Int] = rdd.takeOrdered(2) // 打印结果 println(result.mkString(","))
7,aggregate
(1)该函数用于对数据集中的元素进行聚合操作,并返回一个聚合结果。它接受两个参数:初始值(零值)和一个聚合函数。
- 首先分区的数据通过初始值和分区内的数据进行聚合
- 然后再和初始值进行分区间的数据聚合
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
(2)下面是一个 takeOrdered 函数使用样例:
// 创建RDD(指定分区数为3个) // 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10) val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3) // 对RDD数据进行聚合 val sum: Int = rdd.aggregate(1000)(_ + _, _ + _) // 打印结果 println(sum)
8,fold
(1)该函数用于对数据集中的元素进行聚合操作,并返回一个聚合结果。它是 aggregate 的简化版操作(分区内和分区间的聚合函数相同)
def fold(zeroValue: T)(op: (T, T) => T): T
(2)下面是一个 fold 函数使用样例:
// 创建RDD(指定分区数为3个) // 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10) val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3) // 对RDD数据进行聚合 val sum: Int = rdd.fold(1000)(_ + _) // 打印结果 println(sum)
9,countByKey
(1)该函数用于计算键值对 RDD 中每个键出现的次数,并返回一个键值对集合,其中键是唯一的,值是键出现的频率。
def countByKey(): Map[K, Long]
(2)下面是一个 countByKey 函数使用样例:
// 创建RDD val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5))) // 计算 RDD 中每个键的出现次数 val result: collection.Map[String, Long] = rdd.countByKey() // 打印结果 println(result)
10,foreach
(1)该函数可以分布式遍历 RDD 中的每一个元素,调用指定函数:
def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }
(2)下面样例使用 foreach 方法进行分布式打印元素:
// 创建RDD(指定分区数为3个) // 三个分区内容分别是(1,2,3)(4,5,6)(7,8,9,10) val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10), 3) // 分布式打印 rdd.foreach(println)
11,saveAsTextFile、saveAsObjectFile、saveAsSequenceFile
(1)这些函数用于将数据保存到不同格式的文件中:
- saveAsTextFile:该函数用于将数据集保存为文本文件,每个元素作为文件中的一行。它接受一个文件路径作为参数,并将数据集的内容保存到指定的路径中。
- saveAsObjectFile:该函数用于将数据集保存为序列化对象文件。它接受一个文件路径作为参数,并将数据集的内容保存为序列化对象。
- saveAsSequenceFile:该函数用于将键值对类型的 RDD 保存为 Hadoop 序列文件(SequenceFile)格式。每个键值对都作为 SequenceFile 中的一个记录。它接受一个文件路径和可选的压缩编解码器作为参数。
def saveAsTextFile(path: String): Unit def saveAsObjectFile(path: String): Unit def saveAsSequenceFile( path: String, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
(2)下面样例使用 saveAsTextFile 函数将 RDD 数据保存到文本文件中(文件数由分区数决定):
// 创建RDD(两个分区) val rdd = sc.makeRDD(List("hello", "world", "hangge.com"), 2) // 保存成Text文件(存放在output文件夹下) rdd.saveAsTextFile("output")