Spark - RDD使用详解1(核心属性、创建方式、分区)
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。它在代码中是一个抽象类,代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
(2)其实从底层代码实现来讲,makeRDD 方法内部调用的就是 parallelize 方法,因此这两个方法效果没有区别:
(3)下面代码则从本地的磁盘文件上读取数据:
(4)下面代码则从 HDFS 上读取数据:
(2)运行后在项目根目录会生成一个 output 文件夹,里面有 3 个分区文件:
(4)为何出现这种分区结果,可以查看 Spark 数据分区规则的核心代码,具体如下:
(2)我们原始的 word.txt 文件内容如下:
(3)生成的三个分区文件内容分别如下:
(4)出现这种分区结果,是因为读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体可以查看如下 Spark 数据分区规则的核心代码:
一、五大核心属性
1,分区列表(getPartitions)
对于 RDD 来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的 CPU Core 的数目。
protected def getPartitions : Array[Partition]
2,分区计算函数(compute)
Spark 中 RDD 的计算是以分片为单位的,每个 RDD 都会实现 compute 函数以达到这个目的。compute 函数会对迭代器进行复合,不需要保存每次计算的结果。
def compute(split : Partition, context : TaskContext) : Iterator[T]
3,RDD 之间的依赖关系(getDependencies)
RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。
protected def getDependencies : Seq[Dependency[_]] = deps
4,分区器(partitioner)(可选)
当数据为 key-value 类型数据时,可以通过设定分区器自定义数据的分区。partitioner 函数不但决定了 RDD 本身的分片数量, 也决定了 parent RDD Shuffle 输出时的分片数量。
提示:当前 Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner。
@transient val partitioner: Option[Partitioner] = None
5,首选位置(getPreferredLocations)(可选)
计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
二、RDD 的创建方式
1,从集合(内存)中创建 RDD
(1)Spark 主要提供了两个方法:parallelize 和 makeRDD 来从集合中创建 RDD。
// 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc : SparkContext = new SparkContext(sparkConf) // 使用 parallelize 方法创建 RDD val rdd1 = sc.parallelize(List(1,2,3,4)) rdd1.collect().foreach(println) // 使用 makeRDD 方法创建 RDD val rdd2 = sc.makeRDD(List(5,6,7,8)) rdd2.collect().foreach(println) //关闭 Spark sc.stop()
(2)其实从底层代码实现来讲,makeRDD 方法内部调用的就是 parallelize 方法,因此这两个方法效果没有区别:
2,从外部存储(文件)创建 RDD
(1)我们可以从外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集(如 HDFS、HBase 等)。比如我们在项目根目录创建一个 datas 文件夹,然后里面存放一个 txt 文件;
(2)下面代码通过这个 txt 文件创建相应的 RDD:
// 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc : SparkContext = new SparkContext(sparkConf) // 使用文件创建 RDD val fileRDD: RDD[String] = sc.textFile("datas/word.txt") fileRDD.collect().foreach(println) //关闭 Spark sc.stop()
(3)下面代码则从本地的磁盘文件上读取数据:
// 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc : SparkContext = new SparkContext(sparkConf) // 使用文件创建 RDD val fileRDD: RDD[String] = sc.textFile("D:\\temp\\word.txt") fileRDD.collect().foreach(println) //关闭 Spark sc.stop()
(4)下面代码则从 HDFS 上读取数据:
// 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc : SparkContext = new SparkContext(sparkConf) // 使用文件创建 RDD val fileRDD: RDD[String] = sc.textFile("hdfs://node1:9000/hello.txt") fileRDD.collect().foreach(println) //关闭 Spark sc.stop()
3,从其他 RDD 创建
主要是通过一个 RDD 运算完后,再产生新的 RDD。
// 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc : SparkContext = new SparkContext(sparkConf) // 创建RDD(通过集合) val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) // 对数据进行转换 val rdd2: RDD[Int] = rdd1.map(_+1) // 打印结果 rdd2.collect().foreach(println) //关闭 Spark sc.stop()
4,直接创建 RDD
即使用 new 的方式直接构造 RDD,这种方式一般由 Spark 框架自身使用。日常我们开发时不会用到。三、RDD 分区和并行度
1,基本概念
- 分区(Partition)是将数据集划分为较小的片段,以便在集群上并行处理。每个分区都是数据集的一个子集,Spark 将这些分区分配给不同的执行器(executors)并行处理。
- 任务数(Task Number)是指作业中的任务数量,每个任务对应于一个分区上的操作。Spark 根据分区的数量和任务数来决定任务的并行度。
- 并行度(Parallelism)是指同时执行任务的能力,也可以理解为同时处理多个分区的能力。在 Spark 中,并行度通常与任务数或分区的数量相关联。Spark 根据分区的数量将任务分配给执行器,并尽可能使得每个执行器处理一个或多个任务。如果任务数多于分区数量,可能会有一些执行器处理多个任务。相反,如果任务数少于分区数量,可能会有一些执行器处于空闲状态。
2,设置集合数据源 RDD 分区数
(1)下面样例我们创建 RDD 时指定了分区数为 3,并且为能看到分区效果,我们还使用了saveAsTextFile() 方法将处理的数据保存成分区文件:
// 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc : SparkContext = new SparkContext(sparkConf) // 创建RDD(指定分区数为3个) val rdd = sc.makeRDD(List(1,2,3,4,5), 3) // 将处理的数据保存成分区文件 rdd.saveAsTextFile("output") //关闭 Spark sc.stop()
(2)运行后在项目根目录会生成一个 output 文件夹,里面有 3 个分区文件:
(3)三个分区文件内容分别如下:
#part-00000 文件里面的内容: 1 #part-00001 文件里面的内容: 2 3 #part-00002 文件里面的内容: 4 5
(4)为何出现这种分区结果,可以查看 Spark 数据分区规则的核心代码,具体如下:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) }
3,设置文件数据源 RDD 分区数
(1)下面样例我们创建 RDD 时指定了分区数为 3,并且为能看到分区效果,我们还使用了saveAsTextFile() 方法将处理的数据保存成分区文件:
// 创建 Spark 运行配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello") // 创建 Spark 上下文环境对象(连接对象) val sc : SparkContext = new SparkContext(sparkConf) // 创建RDD(指定分区数为3个) val rdd = sc.textFile("datas/word.txt", 3) // 将处理的数据保存成分区文件 rdd.saveAsTextFile("output") //关闭 Spark sc.stop()
(2)我们原始的 word.txt 文件内容如下:
hangge.com 航歌 china 1 2 3 4
(3)生成的三个分区文件内容分别如下:
#part-00000 文件里面的内容: hangge.com #part-00001 文件里面的内容: 航歌 china #part-00002 文件里面的内容: 1 2 3 4
(4)出现这种分区结果,是因为读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体可以查看如下 Spark 数据分区规则的核心代码:
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); } long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); ... for (FileStatus file: files) { ... if (isSplitable(fs, path)) { long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(goalSize, minSize, blockSize); ... } protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); }