Spark 教程

Spark SQL

Spark 笔记

Spark MLlib

original icon
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.knowledgedict.com/tutorial/spark-rdd.html

Spark RDD


Spark 编程模型是弹性分布式数据集(Resilient Distributed Dataset,RDD),它是 MapReduce 模型的扩展和延伸,但它解决了 MapReduce 的缺陷,在并行计算阶段高效地进行数据共享。

运用高效的数据共享概念和类似于 MapReduce 的操作方式,使得并行计算能够高效地进行,并可以在特定的系统中得到关键的优化。

RDD 简介

相比以前集群容错处理模型,如 MapReduce、Dryad,它们将计算转换为一个有向无环图(DAG)的任务集合。这使在这些模型中能够有效地恢复 DAG 中故障和慢节点执行的任务,但是这些模型中除了文件系统外没有提供其他的存储方式,这就导致了在网络上进行频繁的数据复制而造成 IO 压力。由于 RDD 提供一种基于粗粒度变换(如 map、filter 和 join 等)的接口,该接口会将相同的操作应用到多个数据集上,这就使得它们可以记录创建数据集的“血统”(Lineage),而不需要存储真正的数据,从而达到高效的容错性。当某个 RDD 分区丢失的时候,RDD 记录有足够的信息来重新计算,而且只需要计算该分区,这样丢失的数据可以很快地恢复,不需要昂贵的复制代价。

基于 RDD 机制实现了多类模型计算,包括多个现有的集群编程模式。在这些模型中,RDD 不仅在性能方面达到了之前的系统水平,也带来了现有系统所缺少的新特性,如容错性、慢节点执行和弹性资源分配等。这些模型包括以下几方面的内容。

  1. 迭代计算:目前最常见的工作方式,比如应用于图处理、数值优化以及机器学习中的算法。RDD 可以支持各类型模型,包括 Pregel、MapReduce、GraphLab 和 PowerGraph 模型。
  2. 交互式 SQL 查询:在 MapReduce 集群中大部分需求是执行 SQL 查询,而 MapReduce 相对并行数据库在交互式查询有很大的不足。而 Spark 的 RDD 不仅拥有很多常见数据库引擎的特性,达到可观的性能,而且在 Spark SQL 中提供完善的容错机制,能够在短查询和长查询中很好地处理故障和慢节点。
  3. MapReduceRDD:通过提供 MapReduce 的超集,能够高效地执行 MapReduce 程序,同样也可以如 DryadLINQ 这样常见的 DAG 数据流的应用。
  4. 流式数据处理:流式数据处理已经在数据库和系统领域进行了很长时间的研究,但是大规模流式数据处理仍是一项挑战。当前的模型没有解决在大规模集群中频繁出现的慢节点的问题,同时对故障解决办法有限,需要大量的复制或浪费很长的恢复时间。为了恢复一个丢失的节点,当前的系统需要保存每一个操作的两个副本,或通过一系列耗费大量开销的串行处理对故障点之前的数据进行重新处理。

    Spark 提出了离散数据流(D-Streams)来解决这样的问题,D-Streams 把流式计算的执行当作一系列短而确定的批量计算的序列,并将状态保存在 RDD 中。D-Streams 根据相关 RDD 的依赖关系图进行并行化恢复,可以达到快速故障恢复,避免了数据复制。另外通过推测执行来支持对 Straggler 迁移执行,例如,对慢任务运行经过推测的备份副本。尽管 D-Streams 将计算转换为许多不相关联的作业来运行而增加延迟,但这种延迟在 D-Streams 集群处理中只耗费次秒级时间。

    RDD 还能够支持一些现有系统不能表示的新应用。例如,许多数据流应用程序还需要加入历史数据的信息;通过使用 RDD 可以在同一程序中同时使用批处理和流式处理,这样来实现所有模型中数据共享和容错恢复,同样地,流式应用的操作者常常需要在数据流的状态上执行及时查询。一般来说,每一个批处理应用常常需要整合多个处理类型,比如,一个应用可能需要使用 SQL 提取数据,在数据集上训练一个机器学习模型,然后对这个模型进行查询。由于计算的大部分时间花在系统之间共享数据的分布式文件系统 IO 的开销上,因此使用当前多个系统组合而成的工作流效率非常低下。而使用一个基于 RDD 机制的系统,这些计算可以在同一个引擎中紧接着执行,不需要额外的 IO 操作,处理效率大大提高。

RDD 类型

Spark 编程中开发者需要编写一个驱动程序(Driver Program)来连接到工作进程(Worker)。驱动程序定义一个或多个 RDD 以及相关行动操作,驱动程序同时记录 RDD 的继承关系,即“血统”。而工作进程(Worker)是一直运行的进程,它将经过一系列操作后的 RDD 分区数据保存在内存中。

spark 架构图

Spark 中的操作大致可以分为四类操作,分别为创建操作、转换操作、控制操作和行为操作。

  1. 创建操作(Creation Operation):用于 RDD 创建工作。RDD 创建只有两种方法,一种是来自于内存集合和外部存储系统,另一种是通过转换操作生成的 RDD。
  2. 转换操作(Transformation Operation):将 RDD 通过一定的操作变换成新的 RDD,比如 HadoopRDD 可以使用 map 操作变换为 MappedRDD,RDD 的转换操作是惰性操作,它只是定义了一个新的 RDDs,并没有立即执行。
  3. 控制操作(Control Operation):进行 RDD 持久化,可以让 RDD 按不同的存储策略保存在磁盘或者内存中,比如 cache 接口默认将 RDD 缓存在内存中。
  4. 行动操作(Action Operation):能够触发 Spark 运行的操作,例如,对 RDD 进行 collect 就是行动操作。Spark 中行动操作分为两类,一类的操作结果变成 Scala 集合或者变量,另一类将 RDD 保存到外部文件系统或者数据库中。

RDD 的实现

作业调度

当对 RDD 执行转换操作时,调度器会根据 RDD 的“血统”来构建由若干调度阶段(Stage)组成的有向无环图(DAG),每个调度阶段包含尽可能多的连续窄依赖转换。调度器按照有向无环图顺序进行计算,并最终得到目标 RDD。

调度器向各节点分配任务采用延时调度机制并根据数据存储位置(数据本地性)来确定。若一个任务需要处理的某个分区刚好存储在某个节点的内存中,则该任务会分配给该节点;如果在内存中不包含该分区,调度器会找到包含该 RDD 的较佳位置,并把任务分配给所在节点。

对应宽依赖的操作,在 Spark 将中间结果物化到父分区的节点上,这和 MapReduce 物化 map 的输出类似,可以简化数据的故障恢复过程。如图下图所示,实线圆角方框标识的是 RDD。阴影背景的矩形是分区,若已存于内存中,则用黑色背景标识。RDD 上一个行动操作的执行将会以宽依赖为分区来构建各个调度阶段,对各调度阶段内部的窄依赖则前后连接构成流水线。在本例中,Stage 1 的输出已经存在内存中,所以直接执行 Stage 2,然后执行 Stage 3。

spark 作业调度

对于执行失败的任务,只要它对应调度阶段父类信息仍然可用,该任务会分散到其他节点重新执行。如果某些调度阶段不可用(例如,因为 Shuffle 在 map 节点输出丢失了),则重新提交相应的任务,并以并行方式计算丢失的分区。在作业中如果某个任务执行缓慢(即 Straggler),系统则会在其他节点上执行该任务的副本。该方法与 MapReduce 推测执行做法类似,并取最先得到的结果作为最终的结果。

解析器集成

与 Ruby 和 Python 类似,Scala 提供了一个交互式 Shell(解析器),借助内存数据带来的低延迟特性,可以让用户通过解析器对大数据进行交互式查询。

Spark 解析器将用户输入的多行命令解析为相应 Java 对象的。

Scala 解析器处理过程一般为:

  1. 将用户输入的每一行编译成一个类;
  2. 将该类载入到 JVM 中;
  3. 调用该类的某个函数。在该类中包含一个单利对象,对象中包含当前行的变量或函数,在初始化方法中包含处理该行的代码。

Spark 中做了以下两个改变。

  1. 类传输:为了让工作节点能够从各行生成的类中获取到字节码,通过 HTTP 传输。
  2. 代码生成器的改动:通常各种代码生成的单例对象是由类的静态方法来提供的。也就是说,当序列化一个引用上一行定义变量的闭包。

内存管理

Spark 提供了 3 种持久化 RDD 的存储策略:

  1. 未序列化 Java 对象存在内存中。
  2. 序列化的数据存于内存中。
  3. 存储在磁盘中。

第一个选项的性能是最优的,因为可以直接访问在 Java 虚拟机内存里的 RDD 对象,在空间有限的情况下,第二种方式可以让用户采用比 Java 对象更有效的内存组织方式,但代价是降低了性能,第三种策略使用于 RDD 太大的情形,每次重新计算该 RDD 会带来额外的资源开销(如 IO 等)。

对于内存使用 LRU 回收算法来进行管理,当计算得到一个新的 RDD 分区,但没有足够空间来存储时,系统会从最近最少使用的 RDD 回收其一个分区的空间。除非该 RDD 是新分区对应的 RDD,这种情况下 Spark 会将旧的分区继续保留在内存中,防止同一个 RDD 的分区循环调入/调出。这点很关键,因为大部分的操作会在一个 RDD 的所有分区上进行,那么很有可能已经存在内存中的分区将再次被使用。

检查点支持

虽然“血统”可以用于错误后 RDD 的恢复,但是对于很长的“血统”的 RDD 来说,这样的恢复耗时比较长,因此需要通过检查点操作(Checkpoint)保存到外部存储中。

通常情况下,对于包含宽依赖的长“血统”的 RDD 设置检查点操作是非常有用的。在这种情况下,集群中某个节点出现故障时,会使得从各个父 RDD 计算出的数据丢失,造成需要重新计算。相反,对于那些窄依赖的 RDD,对其进行检查点操作就不是有必须。在这种情况下,如果一个节点发生故障,RDD 在该节点中丢失的分区数据可以通过并行的方式从其他节点中重新计算出来,计算成本只是复制 RDD 的很小部分。

Spark 提供为 RDD 设置检查点操作的 API,可以让用户自行决定需要为那些数据设置检查点操作。另外由于 RDD 的只读特性,使得不需要关心数据一致性问题,比常用的共享内存更容易做检查点。

多用户管理

RDD 模型将计算分解为多个相互独立的细粒度任务,这使得它在多用户集群能够支持多种资源共享算法。特别地,每个 RDD 应用可以在执行过程中动态调整访问资源。

  • 在每个应用程序中,Spark 运行多线程同时提交作业,并通过一种等级公平调度器来实现多个作业对集群资源的共享,这种调度器和 Hadoop Fair Scheduler 类似。该算法主要用于创建基于针对相同内存数据的多用户应用,例如:Spark SQL 引擎有一个服务模式支持多用户并行查询。公平调度算法确保短的作业能够在即使长作业占满集群资源的情况下尽早完成。
  • Spark 的公平调度也使用延迟调度,通过轮询每台机器的数据,在保持公平的情况下给予作业高的本地性。Spark 支持多级本地化访问策略(本地化),包括内存、磁盘和机架。
  • 由于任务相互独立,调度器还支持取消作业来为高优先级的作业腾出资源。
  • Spark 中可以使用 Mesos 来实现细粒度的资源共享,这使得 Spark 应用能相互之间或在不同的计算框架之间实现资源的动态共享。
  • Spark 使用 Sparrow 系统扩展支持分布式调度,该调度允许多个 Spark 应用以去中心化的方式在同一集群上排队工作,同时提供数据本地性、低延迟和公平性。