Spark RDD编程指南

前言

在高层次上,每个 Spark 应用程序都包含一个驱动程序,该驱动程序运行用户的主要功能并在集群上执行各种并行操作。 Spark 提供的主要抽象是弹性分布式数据集 (RDD),它是跨集群节点分区的元素集合,可以并行操作。 RDD 是通过从 Hadoop 文件系统(或任何其他 Hadoop 支持的文件系统)中的文件或驱动程序中现有的 Scala 集合开始并对其进行转换来创建的。 用户还可以要求 Spark 将 RDD 持久化到内存中,以便在并行操作中有效地重用它。 最后,RDD 会自动从节点故障中恢复。

Spark 中的第二个抽象是可以在并行操作中使用的共享变量。 默认情况下,当 Spark 在不同节点上并行运行一个函数作为一组任务时,它会将函数中使用的每个变量的副本发送到每个任务。 有时,需要在任务之间或在任务和驱动程序之间共享变量。 Spark 支持两种类型的共享变量:广播变量,可用于在所有节点的内存中缓存一个值,以及累加器,它们是仅“添加”到的变量,例如计数器和总和。

本指南以 Spark 支持的每种语言显示了这些功能中的每一个。 如果您启动 Spark 的交互式 shell,则可以使用 – Scala shell 的 bin/spark-shell 或 Python 的 bin/pyspark。

与Spark建立连接

Spark 3.2.1 的构建和分发默认与 Scala 2.12 一起使用。 (Spark 也可以与其他版本的 Scala 一起使用。)要在 Scala 中编写应用程序,您需要使用兼容的 Scala 版本(例如 2.12.X)。

要编写 Spark 应用程序,您需要在 Spark 上添加 Maven 依赖项。 Spark 可通过 Maven Central 在以下位置获得:

groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.2.1

此外,如果您希望访问 HDFS 集群,您需要为您的 HDFS 版本添加对 hadoop-client 的依赖。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最后,您需要将一些 Spark 类导入您的程序。 添加以下行:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(在 Spark 1.3.0 之前,您需要显式导入 org.apache.spark.SparkContext._ 以启用必要的隐式转换。)

初始化Spark

Spark 程序必须做的第一件事是创建一个 SparkContext 对象,它告诉 Spark 如何访问集群。 要创建 SparkContext,您首先需要构建一个包含应用程序信息的 SparkConf 对象。

每个 JVM 应该只有一个 SparkContext 处于活动状态。 在创建新的 SparkContext 之前,您必须 stop() 活动的 SparkContext。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

appName 参数是您的应用程序在集群 UI 上显示的名称。 master 是 Spark、Mesos 或 YARN 集群 URL,或者是在本地模式下运行的特殊“local”字符串。 实际上,在集群上运行时,您不会希望在程序中对 master 进行硬编码,而是使用 spark-submit 启动应用程序并在那里接收它。 但是,对于本地测试和单元测试,您可以传递“local”以在进程内运行 Spark。

使用Shell

在 Spark shell 中,已在名为 sc 的变量中为您创建了一个特殊的解释器感知 SparkContext。 制作你自己的 SparkContext 是行不通的。 您可以使用 –master 参数设置上下文连接到哪个 master,并且可以通过将逗号分隔的列表传递给 –jars 参数来将 JAR 添加到类路径。 您还可以通过向 –packages 参数提供逗号分隔的 Maven 坐标列表来将依赖项(例如 Spark 包)添加到 shell 会话。 任何可能存在依赖项的其他存储库(例如 Sonatype)都可以传递给 –repositories 参数。 例如,要在四个内核上运行 bin/spark-shell,请使用:

$ ./bin/spark-shell --master local[4]

或者,要将 code.jar 添加到其类路径中,请使用:

$ ./bin/spark-shell --master local[4] --jars code.jar

要使用 Maven 坐标包含依赖项:

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

如需完整的选项列表,请运行 spark-shell –help。 在后台,spark-shell 调用更通用的 spark-submit 脚本。

弹性分布式数据集 (RDD)

Spark 围绕弹性分布式数据集 (RDD) 的概念展开,RDD 是可以并行操作的元素的容错集合。 有两种方法可以创建 RDD:并行化驱动程序中的现有集合,或引用外部存储系统中的数据集,例如共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源。

并行数据集合

通过在驱动程序(Scala Seq)中的现有集合上调用 SparkContext 的 parallelize 方法来创建并行化集合。 复制集合的元素以形成可以并行操作的分布式数据集。 例如,这里是如何创建一个包含数字 1 到 5 的并行化集合:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

创建后,分布式数据集 (distData) 可以并行操作。 例如,我们可以调用 distData.reduce((a, b) => a + b) 来将数组的元素相加。 我们稍后将描述对分布式数据集的操作。

并行集合的一个重要参数是将数据集切割成的分区数量。 Spark 将为集群的每个分区运行一个任务。 通常,您希望集群中的每个 CPU 有 2-4 个分区。 通常,Spark 会尝试根据您的集群自动设置分区数。 但是,您也可以通过将其作为第二个参数传递来手动设置它以进行并行化(例如 sc.parallelize(data, 10))。 注意:代码中的某些地方使用术语切片(分区的同义词)来保持向后兼容性。

外部数据集

Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3 等。Spark 支持文本文件、SequenceFiles 和任何其他 Hadoop 输入格式。

可以使用 SparkContext 的 textFile 方法创建文本文件 RDD。 此方法获取文件的 URI(机器上的本地路径,或 hdfs://、s3a:// 等 URI)并将其作为行集合读取。 这是一个示例调用:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

创建后,可以通过数据集操作对 distFile 进行操作。 例如,我们可以使用 map 和 reduce 操作将所有行的大小相加,如下所示:distFile.map(s => s.length).reduce((a, b) => a + b)。

使用 Spark 读取文件的一些注意事项:

  • 如果使用本地文件系统上的路径,则该文件也必须可在工作节点上的同一路径上访问。 将文件复制到所有工作节点或使用网络安装的共享文件系统。
  • Spark 的所有基于文件的输入法,包括 textFile,都支持在目录、压缩文件和通配符上运行。 例如,您可以使用 textFile(“/my/directory”)、textFile(“/my/directory/.txt”) 和 textFile(“/my/directory/.gz”)。 当读取多个文件时,分区的顺序取决于文件从文件系统返回的顺序。 例如,它可能会也可能不会按照路径对文件的字典顺序进行排序。 在一个分区中,元素根据它们在底层文件中的顺序进行排序。
  • textFile 方法还采用可选的第二个参数来控制文件的分区数。 默认情况下,Spark 为文件的每个块创建一个分区(在 HDFS 中,块默认为 128MB),但您也可以通过传递更大的值来请求更大数量的分区。 请注意,您的分区不能少于块。

除了文本文件,Spark 的 Scala API 还支持其他几种数据格式:

  • SparkContext.wholeTextFiles 允许您读取包含多个小文本文件的目录,并将每个文件作为(文件名,内容)对返回。 这与 textFile 形成对比,后者将在每个文件中每行返回一条记录。 分区由数据局部性决定,在某些情况下,可能会导致分区太少。 对于这些情况,wholeTextFiles 提供了一个可选的第二个参数来控制最小的分区数。
  • 对于 SequenceFiles,使用 SparkContext 的 sequenceFile[K, V] 方法,其中 K 和 V 是文件中键和值的类型。 这些应该是 Hadoop 的 Writable 接口的子类,例如 IntWritable 和 Text。 此外,Spark 允许您为一些常见的 Writables 指定原生类型; 例如,sequenceFile[Int, String] 将自动读取 IntWritables 和 Texts。
  • 对于其他 Hadoop InputFormats,您可以使用 SparkContext.hadoopRDD 方法,该方法接受任意 JobConf 和输入格式类、键类和值类。 设置这些参数的方式与使用输入源的 Hadoop 作业相同。 您还可以将 SparkContext.newAPIHadoopRDD 用于基于“新” MapReduce API (org.apache.hadoop.mapreduce) 的 InputFormats。
  • RDD.saveAsObjectFile 和 SparkContext.objectFile 支持以由序列化 Java 对象组成的简单格式保存 RDD。 虽然这不如 Avro 等专用格式高效,但它提供了一种简单的方法来保存任何 RDD。

RDD操作

RDD 支持两种类型的操作:转换(从现有数据集创建新数据集)和操作(在对数据集运行计算后将值返回给驱动程序)。 例如,map 是一种转换,它通过一个函数传递每个数据集元素并返回一个表示结果的新 RDD。 另一方面,reduce 是一个操作,它使用某个函数聚合 RDD 的所有元素并将最终结果返回给驱动程序(尽管也有一个并行的 reduceByKey,它返回一个分布式数据集)。

Spark 中的所有转换都是惰性的,因为它们不会立即计算结果。 相反,他们只记得应用于某些基础数据集(例如文件)的转换。 仅当操作需要将结果返回给驱动程序时才计算转换。 这种设计使 Spark 能够更高效地运行。 例如,我们可以意识到通过 map 创建的数据集将在 reduce 中使用,并且仅将 reduce 的结果返回给驱动程序,而不是更大的映射数据集。

默认情况下,每个转换后的 RDD 可能会在您每次对其运行操作时重新计算。 但是,您也可以使用持久(或缓存)方法将 RDD 持久化在内存中,在这种情况下,Spark 会将元素保留在集群上,以便下次查询时更快地访问它。 还支持在磁盘上持久化 RDD,或跨多个节点复制。

基础

为了说明 RDD 的基础知识,考虑下面的简单程序:

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

第一行定义了来自外部文件的基本 RDD。 此数据集未加载到内存中或以其他方式执行:行只是指向文件的指针。 第二行将 lineLengths 定义为map转换的结果。 同样,由于懒惰,不会立即计算 lineLengths。 最后,我们运行reduce,这是一个动作。 此时,Spark 将计算分解为在不同机器上运行的任务,每台机器都运行它的映射部分和本地归约,只将其答案返回给驱动程序。

如果我们以后还想再次使用 lineLengths,我们可以添加:

lineLengths.persist()

在 reduce 之前,这将导致 lineLengths 在第一次计算后保存在内存中。

给Spark传入函数

Spark 的 API 在很大程度上依赖于在驱动程序中传递函数来在集群上运行。 有两种推荐的方法来做到这一点:

  • 匿名函数语法,可用于短代码。
  • 全局单例对象中的静态方法。 例如可以定义对象MyFunctions,然后传入MyFunctions.func1,如下:
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

请注意,虽然也可以在类实例中传递对方法的引用(与单例对象相反),但这需要将包含该类的对象与方法一起发送。 例如,考虑:

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

在这里,如果我们创建一个新的 MyClass 实例并在其上调用 doStuff,则其中的映射引用了该 MyClass 实例的 func1 方法,因此需要将整个对象发送到集群。 它类似于编写 rdd.map(x => this.func1(x))。

以类似的方式,访问外部对象的字段将引用整个对象:

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

相当于写了rdd.map(x => this.field + x),它引用了所有这些。 为避免此问题,最简单的方法是将字段复制到局部变量中,而不是从外部访问它:

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

理解闭包

关于 Spark 的难点之一是在跨集群执行代码时了解变量和方法的范围和生命周期。 修改其范围之外的变量的 RDD 操作可能是一个常见的混淆源。 在下面的示例中,我们将查看使用 foreach() 来增加计数器的代码,但其他操作也会出现类似的问题。

示例

考虑下面简单的 RDD 元素总和,根据执行是否在同一个 JVM 中发生,它的行为可能会有所不同。 一个常见的例子是在本地模式下运行 Spark (–master = local[n]) 与将 Spark 应用程序部署到集群(例如通过 spark-submit 到 YARN):

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

本地和集群模式

上述代码的行为未定义,可能无法按预期工作。 为了执行作业,Spark 将 RDD 操作的处理分解为任务,每个任务都由一个 executor 执行。 在执行之前,Spark 会计算任务的闭包。 闭包是那些必须对执行程序可见的变量和方法,以便在 RDD 上执行其计算(在本例中为 foreach())。 这个闭包被序列化并发送给每个执行器。

发送给每个执行程序的闭包中的变量现在是副本,因此,当在 foreach 函数中引用计数器时,它不再是驱动程序节点上的计数器。 驱动程序节点的内存中仍有一个计数器,但执行程序不再可见! 执行者只能看到来自序列化闭包的副本。 因此,counter 的最终值仍然为零,因为对 counter 的所有操作都引用了序列化闭包中的值。

在本地模式下,在某些情况下,foreach 函数实际上将在与驱动程序相同的 JVM 中执行,并将引用相同的原始计数器,并且可能会实际更新它。

为了确保在这些场景中定义明确的行为,应该使用累加器。 Spark 中的累加器专门用于提供一种机制,用于在集群中的工作节点之间拆分执行时安全地更新变量。 本指南的累加器部分更详细地讨论了这些。

一般来说,闭包——像循环或本地定义的方法这样的结构,不应该被用来改变一些全局状态。 Spark 不定义或保证从闭包外部引用的对象的突变行为。 一些这样做的代码可能在本地模式下工作,但这只是偶然,这样的代码在分布式模式下不会像预期的那样运行。 如果需要一些全局聚合,请改用累加器。

打印 RDD 的元素

另一个常见的习惯用法是尝试使用 rdd.foreach(println) 或 rdd.map(println) 打印出 RDD 的元素。 在一台机器上,这将生成预期的输出并打印所有 RDD 的元素。 但是,在集群模式下,执行程序调用的标准输出的输出现在写入执行程序的标准输出,而不是驱动程序上的标准输出,因此驱动程序上的标准输出不会显示这些! 要打印驱动程序上的所有元素,可以使用 collect() 方法首先将 RDD 带到驱动程序节点:rdd.collect().foreach(println)。 但是,这可能会导致驱动程序耗尽内存,因为 collect() 将整个 RDD 获取到单个机器; 如果只需要打印 RDD 的几个元素,更安全的方法是使用 take():rdd.take(100).foreach(println)。

使用键值对

虽然大多数 Spark 操作适用于包含任何类型对象的 RDD,但少数特殊操作仅适用于键值对的 RDD。 最常见的是分布式“shuffle”操作,例如通过键对元素进行分组或聚合。

在 Scala 中,这些操作在包含 Tuple2 对象的 RDD 上自动可用(语言中的内置元组,通过简单地编写 (a, b) 创建)。 PairRDDFunctions 类中提供了键值对操作,该类自动包装元组的 RDD。

例如,以下代码对键值对使用 reduceByKey 操作来计算文件中每行文本出现的次数:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

例如,我们还可以使用 counts.sortByKey() 按字母顺序对进行排序,最后使用 counts.collect() 将它们作为对象数组返回驱动程序。

注意:当使用自定义对象作为键值对操作中的键时,您必须确保自定义的 equals() 方法伴随着匹配的 hashCode() 方法。 有关完整详细信息,请参阅 Object.hashCode() 文档中概述

转换(Transformations)

下表列出了 Spark 支持的一些常见转换。 有关详细信息,请参阅 RDD API 文档(Scala、Java、Python、R)和配对 RDD 函数文档(Scala、Java)。

TransformationMeaning
map(func)Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func)Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func)Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func)Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator => Iterator when running on an RDD of type T.
mapPartitionsWithIndex(func)Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator) => Iterator when running on an RDD of type T.
sample(withReplacement, fraction, seed)Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset)Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset)Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numPartitions]))Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numPartitions])When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
reduceByKey(func, [numPartitions])When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numPartitions])When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numPartitions])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numPartitions])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable, Iterable)) tuples. This operation is also called groupWith.
cartesian(otherDataset)When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars])Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions)Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions)Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner)Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

行为(Actions)

下表列出了 Spark 支持的一些常见操作。 请参阅 RDD API 文档(Scala、Java、Python、R)

并配对 RDD 函数 doc (Scala, Java) 以获取详细信息。

ActionMeaning
reduce(func)Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect()Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count()Return the number of elements in the dataset.
first()Return the first element of the dataset (similar to take(1)).
take(n)Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed])Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering])Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path)Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path)(Java and Scala)Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path)(Java and Scala)Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey()Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func)Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

Spark RDD API 还公开了一些操作的异步版本,例如 foreachAsync for foreach,它会立即将 FutureAction 返回给调用者,而不是在操作完成时阻塞。 这可用于管理或等待操作的异步执行。

Shuffle操作

Spark 中的某些操作会触发一个称为 shuffle 的事件。 shuffle 是 Spark 用于重新分配数据的机制,以便跨分区以不同方式分组。 这通常涉及跨执行器和机器复制数据,使 shuffle 成为一项复杂且昂贵的操作。

背景

要了解在 shuffle 期间发生了什么,我们可以考虑 reduceByKey 操作的示例。 reduceByKey 操作生成一个新的 RDD,其中单个键的所有值组合成一个元组 – 键以及针对与该键关联的所有值执行 reduce 函数的结果。 挑战在于,并非单个键的所有值都必须驻留在同一分区甚至同一台机器上,但它们必须位于同一位置才能计算结果。

在 Spark 中,数据通常不会跨分区分布在特定操作的必要位置。 在计算过程中,单个任务将在单个分区上操作 – 因此,为了组织单个 reduceByKey 减少任务执行的所有数据,Spark 需要执行 all-to-all 操作。 它必须从所有分区中读取以找到所有键的所有值,然后将跨分区的值汇总以计算每个键的最终结果 – 这称为 shuffle。

尽管新shuffled数据的每个分区中的元素集合是确定性的,分区本身的顺序也是确定性的,但这些元素的顺序不是。 如果人们希望在 shuffle 之后获得可预测的有序数据,那么可以使用:

  • 使用 mapPartitions 对每个分区进行排序,例如 .sorted
  • repartitionAndSortWithinPartitions 在重新分区的同时有效地对分区进行排序
  • sortBy 创建一个全局排序的 RDD

可能导致 shuffle 的操作包括 repartition 操作,如 repartition 和 coalesce,’ByKey 操作(计数除外),如 groupByKey 和 reduceByKey,以及 join 操作,如 cogroup 和 join。

性能影响

Shuffle 是一项昂贵的操作,因为它涉及磁盘 I/O、数据序列化和网络 I/O。 为了组织 shuffle 的数据,Spark 生成了一组任务——映射任务来组织数据,一组 reduce 任务来聚合它。 这个命名法来自 MapReduce,与 Spark 的 map 和 reduce 操作没有直接关系。

在内部,各个map任务的结果会保存在内存中,直到无法容纳为止。 然后,这些根据目标分区排序并写入单个文件。 在reduce方面,任务读取相关的排序块。

在内部,各个地图任务的结果会保存在内存中,直到无法容纳为止。 然后,这些根据目标分区排序并写入单个文件。 在减少方面,任务读取相关的排序块。

某些 shuffle 操作可能会消耗大量堆内存,因为它们使用内存中的数据结构在传输之前或之后组织记录。 具体来说,reduceByKey 和 aggregateByKey 在 map 端创建这些结构,而 ‘ByKey 操作在 reduce 端生成这些结构。 当数据不适合内存时,Spark 会将这些表溢出到磁盘,从而产生额外的磁盘 I/O 开销并增加垃圾收集。

Shuffle 行为可以通过调整各种配置参数来调整。 请参阅 Spark 配置指南中的“随机播放行为”部分。

RDD持久化

Spark 中最重要的功能之一是跨操作将数据集持久化(或缓存)在内存中。 当你持久化一个 RDD 时,每个节点都会将它计算的任何分区存储在内存中,并在对该数据集(或从它派生的数据集)的其他操作中重用它们。 这使得未来的操作更快(通常快 10 倍以上)。 缓存是迭代算法和快速交互使用的关键工具。

你可以使用persist() 或cache() 方法将RDD 标记为持久化。 第一次在动作中计算时,它将保存在节点的内存中。 Spark 的缓存是容错的——如果 RDD 的任何分区丢失,它将使用最初创建它的转换自动重新计算。

此外,每个持久化的 RDD 都可以使用不同的存储级别进行存储,例如,允许您将数据集持久化到磁盘上,将其持久化在内存中,但作为序列化的 Java 对象(以节省空间),跨节点复制它。 这些级别是通过将 StorageLevel 对象(Scala、Java、Python)传递给 persist() 来设置的。 cache() 方法是使用默认存储级别的简写,即 StorageLevel.MEMORY_ONLY(将反序列化的对象存储在内存中)。 完整的存储级别是:

Storage LevelMeaning
MEMORY_ONLYStore RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. This is the default level.
MEMORY_AND_DISKStore RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed.
MEMORY_ONLY_SER
(Java and Scala)
Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER
(Java and Scala)
Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed.
DISK_ONLYStore the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental)Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

注意:在 Python 中,存储的对象将始终使用 Pickle 库进行序列化,因此您是否选择序列化级别并不重要。 Python 中可用的存储级别包括 MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY、DISK_ONLY_2 和 DISK_ONLY_3。

Spark 还会在 shuffle 操作中自动持久化一些中间数据(例如 reduceByKey),即使没有用户调用persist。 这样做是为了避免在 shuffle 期间节点发生故障时重新计算整个输入。 如果他们打算重用它,我们仍然建议用户在生成的 RDD 上调用persist。

选择什么样的存储级别

Spark 的存储级别旨在在内存使用和 CPU 效率之间提供不同的权衡。 我们建议通过以下过程来选择一个:

  • 如果您的 RDD 很适合默认存储级别 (MEMORY_ONLY),请保持这种状态。 这是 CPU 效率最高的选项,允许 RDD 上的操作尽可能快地运行。
  • 如果没有,请尝试使用 MEMORY_ONLY_SER 并选择快速序列化库以使对象更节省空间,但访问速度仍然相当快。 (Java 和 Scala)
  • 除非计算数据集的函数很昂贵,或者它们过滤了大量数据,否则不要溢出到磁盘。 否则,重新计算分区可能与从磁盘读取分区速度一样。
  • 如果您想要快速故障恢复(例如,如果使用 Spark 为来自 Web 应用程序的请求提供服务),请使用复制的存储级别。 所有存储级别都通过重新计算丢失的数据来提供完全的容错能力,但是复制的存储级别允许您继续在 RDD 上运行任务,而无需等待重新计算丢失的分区。

删除数据

Spark 自动监控每个节点上的缓存使用情况,并以最近最少使用 (LRU) 的方式丢弃旧数据分区。 如果您想手动删除 RDD 而不是等待它从缓存中掉出来,请使用 RDD.unpersist() 方法。 请注意,此方法默认情况下不会阻止。 要阻塞直到资源被释放,调用此方法时指定blocking=true。

共享变量

通常,当传递给 Spark 操作(例如 map 或 reduce)的函数在远程集群节点上执行时,它会处理函数中使用的所有变量的单独副本。 这些变量被复制到每台机器上,并且对远程机器上的变量的更新不会传播回驱动程序。 支持跨任务的通用读写共享变量效率低下。 然而,Spark 确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。

广播变量

广播变量允许程序员在每台机器上缓存一个只读变量,而不是随任务一起发送它的副本。 例如,它们可用于以有效的方式为每个节点提供大型输入数据集的副本。 Spark 还尝试使用高效的广播算法来分发广播变量,以降低通信成本。

Spark 动作通过一组阶段执行,由分布式“shuffle”操作分隔。 Spark 自动广播每个阶段内任务所需的公共数据。 以这种方式广播的数据以序列化形式缓存,并在运行每个任务之前进行反序列化。 这意味着显式创建广播变量仅在跨多个阶段的任务需要相同数据或以反序列化形式缓存数据很重要时才有用。

通过调用 SparkContext.broadcast(v) 从变量 v 创建广播变量。 广播变量是 v 的一个包装器,它的值可以通过调用 value 方法来访问。 下面的代码显示了这一点:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

创建广播变量后,应在集群上运行的任何函数中使用它而不是值 v,以便 v 不会多次传送到节点。 此外,对象 v 在广播后不应被修改,以确保所有节点都获得相同的广播变量值(例如,如果变量稍后被运送到新节点)。

要释放广播变量复制到执行程序的资源,请调用 .unpersist()。 如果之后再次使用广播,则会重新广播。 要永久释放广播变量使用的所有资源,请调用 .destroy()。 之后不能使用广播变量。 请注意,这些方法默认情况下不会阻塞。 要阻塞直到资源被释放,调用它们时指定blocking=true。

累加器

累加器是仅通过关联和交换操作“添加”到的变量,因此可以有效地并行支持。 它们可用于实现计数器(如在 MapReduce 中)或求和。 Spark 原生支持数值类型的累加器,程序员可以添加对新类型的支持。

作为用户,您可以创建命名或未命名的累加器。 如下图所示,一个命名的累加器(在此实例中为计数器)将显示在修改该累加器的阶段的 Web UI 中。 Spark 在“Tasks”表中显示由任务修改的每个累加器的值。

image.png

在 UI 中跟踪累加器有助于了解运行阶段的进度(注意:Python 尚不支持此功能)。

可以通过调用 SparkContext.longAccumulator() 或 SparkContext.doubleAccumulator() 来创建数值累加器,分别累加 Long 或 Double 类型的值。 然后可以使用 add 方法将在集群上运行的任务添加到其中。 但是,他们无法读取其值。 只有驱动程序可以使用其 value 方法读取累加器的值。

下面的代码显示了一个累加器,用于将数组的元素相加:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

虽然这段代码使用了对 Long 类型累加器的内置支持,但程序员也可以通过继承 AccumulatorV2 来创建自己的类型。 AccumulatorV2 抽象类有几个必须重写的方法:reset 用于将累加器重置为零,add 用于将另一个值添加到累加器中,merge 用于将另一个相同类型的累加器合并到这个累加器中。 其他必须重写的方法包含在 API 文档中。 例如,假设我们有一个代表数学向量的 MyVector 类,我们可以这样写:

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

请注意,当程序员定义自己的 AccumulatorV2 类型时,生成的类型可能与添加的元素的类型不同。

对于仅在操作内部执行的累加器更新,Spark 保证每个任务对累加器的更新只会应用一次,即重新启动的任务不会更新值。 在转换中,用户应注意,如果重新执行任务或作业阶段,每个任务的更新可能会应用多次。

累加器不会改变 Spark 的惰性求值模型。 如果它们在对 RDD 的操作中被更新,则它们的值仅在该 RDD 被计算为操作的一部分时才会更新。 因此,当在 map() 等惰性转换中进行累加器更新时,不能保证执行累加器更新。 下面的代码片段演示了这个属性:

val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.

将应用提交到集群

应用程序提交指南描述了如何将应用程序提交到集群。 简而言之,一旦您将应用程序打包成 JAR(对于 Java/Scala)或一组 .py 或 .zip 文件(对于 Python),bin/spark-submit 脚本可以让您将其提交到任何受支持的集群管理器。

从Java或Scala启动Spark任务

org.apache.spark.launcher 包提供了使用简单 Java API 将 Spark 作业作为子进程启动的类。

单元测试

Spark 对任何流行的单元测试框架的单元测试都很友好。 只需在您的测试中创建一个 SparkContext 并将主 URL 设置为本地,运行您的操作,然后调用 SparkContext.stop() 将其拆除。 确保在 finally 块或测试框架的 tearDown 方法中停止上下文,因为 Spark 不支持在同一程序中同时运行两个上下文。

5 1 投票
文章评分

本文转载自spark RDD,原文链接:https://spark.apache.org/docs/latest/rdd-programming-guide.html。

(0)
上一篇 2022-04-15 14:59
下一篇 2022-04-17 02:51

相关推荐

订阅评论
提醒
guest

0 评论
内联反馈
查看所有评论
0
希望看到您的想法,请您发表评论x