当前位置: > > > Spark - RDD使用详解1(核心属性、创建方式、分区)

Spark - RDD使用详解1(核心属性、创建方式、分区)

   RDDResilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。它在代码中是一个抽象类,代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

一、五大核心属性

1,分区列表(getPartitions)

    对于 RDD 来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建 RDD 时指定 RDD 的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的 CPU Core 的数目。
protected def getPartitions : Array[Partition]

2,分区计算函数(compute)

    SparkRDD 的计算是以分片为单位的,每个 RDD 都会实现 compute 函数以达到这个目的。compute 函数会对迭代器进行复合,不需要保存每次计算的结果。
def compute(split : Partition, context : TaskContext) : Iterator[T]

3,RDD 之间的依赖关系(getDependencies)

    RDD 的每次转换都会生成一个新的 RDD,所以 RDD 之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对 RDD 的所有分区进行重新计算。
protected def getDependencies : Seq[Dependency[_]] = deps

4,分区器(partitioner)(可选)

    当数据为 key-value 类型数据时,可以通过设定分区器自定义数据的分区。partitioner 函数不但决定了 RDD 本身的分片数量, 也决定了 parent RDD Shuffle 输出时的分片数量。
提示:当前 Spark 中实现了两种类型的分片函数,一个是基于哈希的 HashPartitioner,另外一个是基于范围的 RangePartitioner
@transient val partitioner: Option[Partitioner] = None

5,首选位置(getPreferredLocations)(可选)

    计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算。对于一个 HDFS 文件来说,这个列表保存的就是每个 Partition 所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

二、RDD 的创建方式

1,从集合(内存)中创建 RDD

(1)Spark 主要提供了两个方法:parallelizemakeRDD 来从集合中创建 RDD
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc : SparkContext = new SparkContext(sparkConf)

// 使用 parallelize 方法创建 RDD
val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1.collect().foreach(println)

// 使用 makeRDD 方法创建 RDD
val rdd2 = sc.makeRDD(List(5,6,7,8))
rdd2.collect().foreach(println)

//关闭 Spark
sc.stop()

(2)其实从底层代码实现来讲,makeRDD 方法内部调用的就是 parallelize 方法,因此这两个方法效果没有区别:

2,从外部存储(文件)创建 RDD

(1)我们可以从外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集(如 HDFSHBase 等)。比如我们在项目根目录创建一个 datas 文件夹,然后里面存放一个 txt 文件;

(2)下面代码通过这个 txt 文件创建相应的 RDD
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc : SparkContext = new SparkContext(sparkConf)

// 使用文件创建 RDD
val fileRDD: RDD[String] = sc.textFile("datas/word.txt")
fileRDD.collect().foreach(println)

//关闭 Spark
sc.stop()

(3)下面代码则从本地的磁盘文件上读取数据:
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc : SparkContext = new SparkContext(sparkConf)

// 使用文件创建 RDD
val fileRDD: RDD[String] = sc.textFile("D:\\temp\\word.txt")
fileRDD.collect().foreach(println)

//关闭 Spark
sc.stop()

(4)下面代码则从 HDFS 上读取数据:
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc : SparkContext = new SparkContext(sparkConf)

// 使用文件创建 RDD
val fileRDD: RDD[String] = sc.textFile("hdfs://node1:9000/hello.txt")
fileRDD.collect().foreach(println)

//关闭 Spark
sc.stop()

3,从其他 RDD 创建

主要是通过一个 RDD 运算完后,再产生新的 RDD
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc : SparkContext = new SparkContext(sparkConf)

// 创建RDD(通过集合)
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
// 对数据进行转换
val rdd2: RDD[Int] = rdd1.map(_+1)
// 打印结果
rdd2.collect().foreach(println)

//关闭 Spark
sc.stop()

4,直接创建 RDD

即使用 new 的方式直接构造 RDD,这种方式一般由 Spark 框架自身使用。日常我们开发时不会用到。

三、RDD 分区和并行度

1,基本概念

  • 分区Partition)是将数据集划分为较小的片段,以便在集群上并行处理。每个分区都是数据集的一个子集,Spark 将这些分区分配给不同的执行器(executors)并行处理。
  • 任务数Task Number)是指作业中的任务数量,每个任务对应于一个分区上的操作。Spark 根据分区的数量和任务数来决定任务的并行度。
  • 并行度Parallelism)是指同时执行任务的能力,也可以理解为同时处理多个分区的能力。在 Spark 中,并行度通常与任务数或分区的数量相关联。Spark 根据分区的数量将任务分配给执行器,并尽可能使得每个执行器处理一个或多个任务。如果任务数多于分区数量,可能会有一些执行器处理多个任务。相反,如果任务数少于分区数量,可能会有一些执行器处于空闲状态。

2,设置集合数据源 RDD 分区数

(1)下面样例我们创建 RDD 时指定了分区数为 3,并且为能看到分区效果,我们还使用了saveAsTextFile() 方法将处理的数据保存成分区文件:
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc : SparkContext = new SparkContext(sparkConf)

// 创建RDD(指定分区数为3个)
val rdd = sc.makeRDD(List(1,2,3,4,5), 3)
// 将处理的数据保存成分区文件
rdd.saveAsTextFile("output")

//关闭 Spark
sc.stop()

(2)运行后在项目根目录会生成一个 output 文件夹,里面有 3 个分区文件:

(3)三个分区文件内容分别如下:
#part-00000 文件里面的内容:
1

#part-00001 文件里面的内容:
2
3

#part-00002 文件里面的内容:
4
5

(4)为何出现这种分区结果,可以查看 Spark 数据分区规则的核心代码,具体如下:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
 (0 until numSlices).iterator.map { i =>
   val start = ((i * length) / numSlices).toInt
   val end = (((i + 1) * length) / numSlices).toInt
   (start, end)
  }

3,设置文件数据源 RDD 分区数

(1)下面样例我们创建 RDD 时指定了分区数为 3,并且为能看到分区效果,我们还使用了saveAsTextFile() 方法将处理的数据保存成分区文件:
// 创建 Spark 运行配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")
// 创建 Spark 上下文环境对象(连接对象)
val sc : SparkContext = new SparkContext(sparkConf)

// 创建RDD(指定分区数为3个)
val rdd = sc.textFile("datas/word.txt", 3)
// 将处理的数据保存成分区文件
rdd.saveAsTextFile("output")

//关闭 Spark
sc.stop()

(2)我们原始的 word.txt 文件内容如下:
hangge.com
航歌
china
1
2
3
4

(3)生成的三个分区文件内容分别如下:
#part-00000 文件里面的内容:
hangge.com

#part-00001 文件里面的内容:
航歌
china

#part-00002 文件里面的内容:
1
2
3
4

(4)出现这种分区结果,是因为读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数据读取的规则有些差异,具体可以查看如下 Spark 数据分区规则的核心代码:
public InputSplit[] getSplits(JobConf job, int numSplits)
 throws IOException {
   long totalSize = 0; // compute total size
   for (FileStatus file: files) { // check we have valid files
     if (file.isDirectory()) {
      throw new IOException("Not a file: "+ file.getPath());
     }
     totalSize += file.getLen();
   }
   long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
   long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
   FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
 
   ...
 
   for (FileStatus file: files) {
 
   ...
 
   if (isSplitable(fs, path)) {
     long blockSize = file.getBlockSize();
     long splitSize = computeSplitSize(goalSize, minSize, blockSize);
     ...
   }
   protected long computeSplitSize(long goalSize, long minSize,
   long blockSize) {
     return Math.max(minSize, Math.min(goalSize, blockSize));
   }
评论0