当前位置: 首页 > news >正文

Spark八股

Spark八股

  • Spark
    • Spark1_Spark core
        • spark了解吗?核心组件是什么?
        • rdd是什么?rdd的特点
        • 1) 为什么要有 RDD?
        • 2) RDD 是什么?
        • RDD算子
          • 1.Transformation转换操作:返回一个新的 RDD
          • 2.Action动作操作:返回值不是 RDD(无返回值或返回其他的)
        • RDD 的持久化/缓存
        • RDD 容错机制 Checkpoint
        • 宽依赖和窄依赖
        • 为什么要设计宽窄依赖
        • DAG 的生成和划分 Stage
        • 为什么要划分 Stage? --并行计算
        • RDD 累加器和广播变量
      • RDD 累加器和广播变量
      • 1. **累加器(Accumulators)**
        • 核心概念:
        • 特点:
        • 使用场景:
        • 示例:
        • 记忆技巧:
      • 2. **广播变量(Broadcast Variables)**
        • 核心概念:
        • 特点:
        • 使用场景:
        • 示例:
        • 记忆技巧:
      • 对比总结
      • 记忆口诀
        • 1) 累加器
        • 2) 广播变量
    • Spark2_Spark SQL
        • Hive 和 SparkSQL
        • Spark SQL 数据抽象
          • 1) DataFrame
          • 2) DataSet
          • 3) RDD、DataFrame、DataSet 的区别
      • RDD、DataFrame、DataSet 的区别与关系
      • 1. **RDD(Resilient Distributed Dataset)**
        • 核心概念:
        • 特点:
        • 使用场景:
        • 示例:
        • 记忆技巧:
      • 2. **DataFrame**
        • 核心概念:
        • 特点:
        • 使用场景:
        • 示例:
        • 记忆技巧:
      • 3. **DataSet**
        • 核心概念:
        • 特点:
        • 使用场景:
        • 示例:
        • Spark SQL 应用
          • 1) 创建 DataFrame/DataSet
          • 2) 两种查询风格:DSL 和 SQL
          • 3) Spark SQL 完成 WordCount
          • 4) Spark SQL 多数据源交互
    • Spark3_Spark Streaming
        • 1. 整体流程
        • 2. 数据抽象
        • 3. DStream 相关操作
        • 4. Spark Streaming 完成实时需求
          • 1) WordCount
          • 2) updateStateByKey
          • 3) reduceByKeyAndWindow
      • Structured Streaming
      • Spark 的 Structured Streaming 和 Streaming 的区别
      • 1. **Streaming(DStream)**
        • 核心概念:
        • 特点:
        • 使用场景:
        • 示例:
        • 记忆技巧:
      • 2. **Structured Streaming**
        • 核心概念:
        • 特点:
        • 使用场景:
        • 示例:
        • 记忆技巧:
      • 对比总结
      • 记忆口诀
        • 1. API
        • 2. 核心思想
        • 3. 应用场景
        • 4. Structured Streaming 实战
    • Spark4_Spark的两种核心 Shuffle
        • Spark Shuffle
      • Spark Shuffle 的两种实现方式
        • 1. **基于 Hash 的 Shuffle**
        • 核心思想:
        • 实现过程:
        • 特点:
        • 使用场景:
        • 记忆技巧:
        • 2. **基于 Sort 的 Shuffle**
        • 核心思想:
        • 实现过程:
        • 特点:
        • 使用场景:
        • 记忆技巧:
        • 对比总结
        • 记忆口诀
        • 为什么 Spark 最终还是放弃了 HashShuffle ,使用了 Sorted-Based Shuffle?
        • 一、Hash Shuffle 解析
        • Hash Shuffle 解析
        • 1. **未优化的 Hash Shuffle**
          • 核心思想:
          • 实现过程:
          • 问题:
          • 记忆技巧:
        • 2. **优化后的 Hash Shuffle**
          • 核心思想:
          • 实现过程:
          • 优化效果:
          • 记忆技巧:
          • Hash Shuffle 的优缺点
          • 优点:
          • 缺点:
          • 对比总结
          • 记忆口诀
        • 二、SortShuffle 解析
          • 1. **普通运行机制**
            • 核心思想:
            • 实现过程:
            • 优点:
            • 记忆技巧:
          • 2. **bypass 运行机制**
            • 核心思想:
            • 触发条件:
            • 实现过程:
            • 优点:
            • 记忆技巧:
          • 3. **Tungsten Sort 运行机制**
            • 核心思想:
            • 触发条件:
            • 实现过程:
            • 优点:
            • 记忆技巧:
          • 对比总结
          • 记忆口诀
    • Spark5_Spark 底层执行原理 DAG+任务执行流程
        • Spark运行流程 !!!
        • 1. 从代码角度看 DAG 图的构建
        • 2. 将 DAG 划分为 Stage 核心算法
        • 3. 将 DAG 划分为 Stage 剖析
        • spark执行流程中 各个 单位的含义
        • spark中哪些算子是宽依赖+为什么
        • 4. 提交 Stages
        • 5. 监控 Job、Task、Executor
        • 6. 获取任务执行结果
        • 7. 任务调度总体诠释
        • Spark 运行架构特点
    • Spark 6_Spark 数据倾斜 !!!
        • 1. 预聚合原始数据
          • 1.1. 避免shuffle过程
          • 1.2. 增大key粒度(考虑扩大key的聚合粒度(减少维度),减小数据倾斜可能性,增大每个task的数据量)
        • 2. 预处理导致倾斜的key (groupByKey、reduceByKey这类算子造成的数据倾斜)
          • 2.1. 过滤
          • 2.2. 使用随机key
          • 2.3. sample采样对倾斜key单独进行join (一个key)
        • 3. 提高reduce并行度
          • 3.1. reduce端并行度的设置
          • 3.2. reduce端并行度设置存在的缺陷
        • 4. 使用map join
          • 4. 1. 核心思路:
          • 4. 2. 不适用场景分析:
    • spark7_Spark性能优化_RDD算子调优 !!!
        • 1. RDD复用
        • 2. 尽早filter
        • 3. 读取大量小文件-用wholeTextFiles
        • 4. mapPartition和foreachPartition
        • 5. filter+coalesce/repartition(减少分区)
        • 6. 并行度设置
        • 7. repartition/coalesce调节并行度
        • 8. reduceByKey本地预聚合
        • 9. 使用持久化+checkpoint
        • 10. 使用广播变量
        • 11. 使用Kryo序列化
    • Spqrk8_Spark性能优化_Shuffle调优 !!!
        • 1. map和reduce端缓冲区大小
        • 2. reduce端重试次数和等待时间间隔
        • 3. bypass机制开启阈值
    • Spark9_Spark故障排除 !!!
        • 1. 避免OOM-out of memory
        • 2. 避免GC导致的shuffle文件拉取失败
        • 3. YARN-CLIENT模式导致的网卡流量激增问题
        • 4. YARN-CLUSTER模式的JVM栈内存溢出无法执行问题
        • 5. 避免SparkSQL JVM栈内存溢出
    • Spark10_Spark大厂面试真题 !!!
        • 1. 通常来说,Spark与MapReduce相比,Spark运行效率更高。请说明效率更高来源于Spark内置的哪些机制?
        • 2. hadoop和spark使用场景?
        • 3. spark如何保证宕机迅速恢复?
        • 4. hadoop和spark的相同点和不同点?
        • 5. RDD持久化原理?
        • 6. checkpoint检查点机制?
        • 7. checkpoint和持久化机制的区别?
        • 8. RDD机制理解吗?
        • 9. Spark streaming以及基本工作原理?
        • 10. DStream以及基本工作原理?
        • 11. spark有哪些组件?
        • 12. spark工作机制?
        • 13. 说下宽依赖和窄依赖
        • 14. Spark主备切换机制原理知道吗?
        • 15. spark解决了hadoop的哪些问题?
        • 16. 数据倾斜的产生和解决办法?
        • 17. 你用sparksql处理的时候, 处理过程中用的dataframe还是直接写的sql?为什么?
        • 18. RDD中reduceBykey与groupByKey哪个性能好,为什么
        • 19. Spark master HA主从切换过程不会影响到集群已有作业的运行,为什么
        • 20. spark master使用zookeeper进行ha,有哪些源数据保存到Zookeeper里面

Spark

Spark1_Spark core

spark了解吗?核心组件是什么?

sprak Core
spark ML
spark 图计算
spark SQL
spark streaming

rdd是什么?rdd的特点
1) 为什么要有 RDD?

在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘中,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。
但是,之前的 MapReduce 框架采用非循环式的数据流模型,把中间结果写入到 HDFS 中,带来了大量的数据复制、磁盘 IO 和序列化开销。且这些框架只能支持一些特定的计算模式(map/reduce),并没有提供一种通用的数据抽象。
RDD 提供了一个抽象的数据模型,让我们不必担心底层数据的分布式特性,只需将具体的应用逻辑表达为一系列转换操作(函数),不同 RDD 之间的转换操作之间还可以形成依赖关系,进而实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 IO 和序列化开销,并且还提供了更多的 API(map/reduec/filter/groupBy…)。

2) RDD 是什么?

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。 单词拆解:
-Resilient :它是弹性的,RDD 里面的中的数据可以保存在内存中或者磁盘里面;
-Distributed :它里面的元素是分布式存储的,可以用于分布式计算;
-Dataset: 它是一个集合,可以存放很多元素。

RDD 是一个数据集的表示,不仅表示了数据集,还表示了这个数据集从哪来,如何计算,主要属性包括:
1.分区列表
2.计算函数
3.依赖关系
4.分区函数(默认是 hash)
5.最佳位置
分区列表、分区函数、最佳位置,这三个属性其实说的就是数据集在哪,在哪计算更合适,如何分区;
计算函数、依赖关系,这两个属性其实说的是数据集怎么来的。

RDD算子
1.Transformation转换操作:返回一个新的 RDD

在这里插入图片描述
在这里插入图片描述

2.Action动作操作:返回值不是 RDD(无返回值或返回其他的)

在这里插入图片描述
统计操作:
在这里插入图片描述

RDD 的持久化/缓存

在实际开发中某些 RDD 的计算或转换可能会比较耗费时间,如果这些 RDD 后续还会频繁的被使用到,那么可以将这些 RDD 进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
持久化/缓存 API 详解
-ersist 方法和 cache 方法
RDD 通过 persist 或 cache 方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
通过查看 RDD 的源码发现 cache 最终也是调用了 persist 无参方法(默认存储只存在内存中):

  • 存储级别
    默认的存储级别都是仅在内存存储一份,Spark 的存储级别还有好多种,存储级别在 object StorageLevel 中定义的。
    在这里插入图片描述
    总结:
    1.RDD 持久化/缓存的目的是为了提高后续操作的速度
    2.缓存的级别有很多,默认只存在内存中,开发中使用 memory_and_disk
    3.只有执行 action 操作的时候才会真正将 RDD 数据进行持久化/缓存
    4.实际开发中如果某一个 RDD 后续会被频繁的使用,可以将该 RDD 进行持久化/缓存
RDD 容错机制 Checkpoint

-持久化的局限:
持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。
-问题解决:
Checkpoint 的产生就是为了更加可靠的数据持久化,在 Checkpoint 的时候一般把数据放在在 HDFS 上,这就天然的借助了 HDFS 天生的高容错、高可靠来实现数据最大程度上的安全,实现了 RDD 的容错和高可用。
用法:

SparkContext.setCheckpointDir("目录") //HDFS的目录
RDD.checkpoint

-总结:
-开发中如何保证数据的安全性性及读取效率: 可以对频繁使用且重要的数据,先做缓存/持久化,再做 checkpint 操作。
-持久化和 Checkpoint 的区别:
1.位置: Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存–实验中) Checkpoint 可以保存数据到 HDFS 这类可靠的存储上。
2.生命周期: Cache 和 Persist 的 RDD 会在程序结束后会被清除或者手动调用 unpersist 方法 Checkpoint 的 RDD 在程序结束后依然存在,不会被删除。

宽依赖和窄依赖

在这里插入图片描述
宽窄依赖
-如何区分宽窄依赖:
窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖;
宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)。

为什么要设计宽窄依赖

1.对于窄依赖:
窄依赖的多个分区可以并行计算;
窄依赖的一个分区的数据如果丢失只需要重新计算对应的分区的数据就可以了。
2.对于宽依赖:
划分 Stage(阶段)的依据:对于宽依赖,必须等到上一阶段计算完成才能计算下一阶段。

16.介绍自己做的比较好的项目
自己的项目,long long time

DAG 的生成和划分 Stage
  1. DAG 介绍
    -DAG 是什么:
    DAG(Directed Acyclic Graph 有向无环图)指的是数据转换执行的过程,有方向,无闭环(其实就是 RDD 执行的流程);
    原始的 RDD 通过一系列的转换操作就形成了 DAG 有向无环图,任务执行时,可以按照 DAG 的描述,执行真正的计算(数据被操作的一个过程)。
    -DAG 的边界
    开始:通过 SparkContext 创建的 RDD;
    结束:触发 Action,一旦触发 Action 就形成了一个完整的 DAG。
  2. DAG 划分 Stage
    在这里插入图片描述
    DAG划分Stage
    一个 Spark 程序可以有多个 DAG(有几个 Action,就有几个 DAG,上图最后只有一个 Action(图中未表现),那么就是一个 DAG)。
    一个 DAG 可以有多个 Stage(根据宽依赖/shuffle 进行划分)。
    同一个 Stage 可以有多个 Task 并行执行(task 数=分区数,如上图,Stage1 中有三个分区 P1、P2、P3,对应的也有三个 Task)。
    可以看到这个 DAG 中只 reduceByKey 操作是一个宽依赖,Spark 内核会以此为边界将其前后划分成不同的 Stage。
    同时我们可以注意到,在图中 Stage1 中,从 textFile 到 flatMap 到 map 都是窄依赖,这几步操作可以形成一个流水线操作,通过 flatMap 操作生成的 partition 可以不用等待整个 RDD 计算结束,而是继续进行 map 操作,这样大大提高了计算的效率。
为什么要划分 Stage? --并行计算

一个复杂的业务逻辑如果有 shuffle,那么就意味着前面阶段产生结果后,才能执行下一个阶段,即下一个阶段的计算要依赖上一个阶段的数据。那么我们按照 shuffle 进行划分(也就是按照宽依赖就行划分),就可以将一个 DAG 划分成多个 Stage/阶段,在同一个 Stage 中,会有多个算子操作,可以形成一个 pipeline 流水线,流水线内的多个平行的分区可以并行执行。

-如何划分 DAG 的 stage?
对于窄依赖,partition 的转换处理在 stage 中完成计算,不划分(将窄依赖尽量放在在同一个 stage 中,可以实现流水线计算)。
对于宽依赖,由于有 shuffle 的存在,只能在父 RDD 处理完成后,才能开始接下来的计算,也就是说需要要划分 stage。

总结:
Spark 会根据 shuffle/宽依赖使用回溯算法来对 DAG 进行 Stage 划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的 RDD 加入到当前的 stage/阶段中

RDD 累加器和广播变量

在默认情况下,当 Spark 在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本
但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。
为了满足这种需求,Spark 提供了两种类型的变量:
1.累加器 accumulators:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)。
2.广播变量 broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。

RDD 累加器和广播变量

在 Spark 中,当任务在集群的不同节点上并行执行时,默认情况下,每个任务都会获取到函数中涉及的变量的一个独立副本。然而,在某些场景下,我们需要在任务之间共享变量,或者在任务和驱动程序之间共享变量。为了满足这些需求,Spark 提供了两种特殊的变量类型:累加器(Accumulators)广播变量(Broadcast Variables)


1. 累加器(Accumulators)

核心概念:

累加器是一种只能“累加”的变量,通常用于在分布式任务中收集信息执行全局聚合操作(如计数、求和等)。累加器的值可以在任务中更新,但只能在驱动程序中读取。

特点:

任务端(Task):任务可以对累加器进行累加操作,但无法读取累加器的值
驱动程序端(Driver):驱动程序可以读取累加器的最终值,但无法修改累加器的值

使用场景:

• 统计任务中某些事件的发生次数(如错误日志的数量)。
• 计算全局的聚合值(如所有任务中某个值的总和)。

示例:
from pyspark import SparkContextsc = SparkContext("local", "Accumulator Example")# 创建一个累加器,初始值为0
accum = sc.accumulator(0)# 定义一个RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])# 在任务中对累加器进行累加
def add_to_accum(x):accum.add(x)rdd.foreach(add_to_accum)# 在驱动程序中读取累加器的值
print("累加器的值:", accum.value)  # 输出:15
记忆技巧:

• 累加器 = “全局计数器”,只能在任务中累加,在驱动程序中读取。
• 适合做分布式统计全局聚合


2. 广播变量(Broadcast Variables)

核心概念:

广播变量是一种只读变量,它允许将一个变量的值高效地分发到集群的所有节点,而不是在每个任务中复制一份。广播变量的值在每个节点上缓存,供所有任务共享。

特点:

只读:任务可以读取广播变量的值,但无法修改
高效:广播变量使用高效的广播算法,避免重复传输数据,减少网络开销。
节省内存:避免了在每个任务中复制变量的开销。

使用场景:

• 共享大型只读数据集(如字典、配置参数等)。
• 在任务中频繁访问的静态数据。

示例:
from pyspark import SparkContextsc = SparkContext("local", "Broadcast Example")# 创建一个广播变量
broadcast_var = sc.broadcast([1, 2, 3, 4, 5])# 定义一个RDD
rdd = sc.parallelize([1, 2, 3])# 在任务中使用广播变量
def use_broadcast(x):return x in broadcast_var.valueresult = rdd.filter(use_broadcast).collect()# 输出结果
print("过滤后的结果:", result)  # 输出:[1, 2, 3]
记忆技巧:

• 广播变量 = “全局只读变量”,高效分发到所有节点。
• 适合共享大型只读数据,避免重复传输和内存浪费。


对比总结

特性累加器(Accumulators)广播变量(Broadcast Variables)
用途全局聚合(如计数、求和)共享大型只读数据
可写性任务可写,驱动程序可读只读
传输方式任务结果汇总到驱动程序数据广播到所有节点
适用场景统计、聚合共享配置、字典等静态数据

记忆口诀

累加器“累加统计,全局聚合”
广播变量“只读共享,高效分发”

通过理解它们的用途和特点,可以轻松记住累加器和广播变量的区别和应用场景!

1) 累加器
  1. 不使用累加器
var counter = 0
val data = Seq(1, 2, 3)
data.foreach(x => counter += x)
println("Counter value: "+ counter)

运行结果:

Counter value: 6

如果我们将 data 转换成 RDD,再来重新计算:

var counter = 0
val data = Seq(1, 2, 3)
var rdd = sc.parallelize(data)
rdd.foreach(x => counter += x)
println("Counter value: "+ counter)

运行结果:

Counter value: 0
  1. 使用累加器
    通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。这时使用累加器就可以实现我们想要的效果:
    val xx: Accumulator[Int] = sc.accumulator(0)
  2. 代码示例:
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}object AccumulatorTest {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//使用scala集合完成累加var counter1: Int = 0;var data = Seq(1,2,3)data.foreach(x => counter1 += x )println(counter1)//6println("+++++++++++++++++++++++++")//使用RDD进行累加var counter2: Int = 0;val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3]dataRDD.foreach(x => counter2 += x)println(counter2)//0//注意:上面的RDD操作运行结果是0//因为foreach中的函数是传递给Worker中的Executor执行,用到了counter2变量//而counter2变量在Driver端定义的,在传递给Executor的时候,各个Executor都有了一份counter2//最后各个Executor将各自个x加到自己的counter2上面了,和Driver端的counter2没有关系//那这个问题得解决啊!不能因为使用了Spark连累加都做不了了啊!//如果解决?---使用累加器val counter3: Accumulator[Int] = sc.accumulator(0)dataRDD.foreach(x => counter3 += x)println(counter3)//6}
}
2) 广播变量
  1. 不使用广播变量
  2. 使用广播变量
  3. 代码示例:
    关键词:sc.broadcast()
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object BroadcastVariablesTest {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//不使用广播变量val kvFruit: RDD[(Int, String)] = sc.parallelize(List((1,"apple"),(2,"orange"),(3,"banana"),(4,"grape")))val fruitMap: collection.Map[Int, String] =kvFruit.collectAsMap//scala.collection.Map[Int,String] = Map(2 -> orange, 4 -> grape, 1 -> apple, 3 -> banana)val fruitIds: RDD[Int] = sc.parallelize(List(2,4,1,3))//根据水果编号取水果名称val fruitNames: RDD[String] = fruitIds.map(x=>fruitMap(x))fruitNames.foreach(println)//注意:以上代码看似一点问题没有,但是考虑到数据量如果较大,且Task数较多,//那么会导致,被各个Task共用到的fruitMap会被多次传输//应该要减少fruitMap的传输,一台机器上一个,被该台机器中的Task共用即可//如何做到?---使用广播变量//注意:广播变量的值不能被修改,如需修改可以将数据存到外部数据源,如MySQL、Redisprintln("=====================")val BroadcastFruitMap: Broadcast[collection.Map[Int, String]] = sc.broadcast(fruitMap)val fruitNames2: RDD[String] = fruitIds.map(x=>BroadcastFruitMap.value(x))fruitNames2.foreach(println)}
}

Spark2_Spark SQL

Hive 和 SparkSQL

Hive 是将 SQL 转为 MapReduce。
SparkSQL 可以理解成是将 SQL 解析成:“RDD + 优化” 再执行。
在这里插入图片描述

Spark SQL 数据抽象
1) DataFrame

-什么是 DataFrame
DataFrame 的前身是 SchemaRDD,从 Spark 1.3.0 开始 SchemaRDD 更名为 DataFrame。并不再直接继承自 RDD,而是自己实现了 RDD 的绝大多数功能。
DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格,带有 Schema 元信息(可以理解为数据库的列名和类型)。
-总结:
DataFrame 就是一个分布式的表;
DataFrame = RDD - 泛型 + SQL 的操作 + 优化。

2) DataSet

-DataSet:
DataSet 是在 Spark1.6 中添加的新的接口。
与 RDD 相比,保存了更多的描述信息,概念上等同于关系型数据库中的二维表。
与 DataFrame 相比,保存了类型信息,是强类型的,提供了编译时类型检查。
调用 Dataset 的方法先会生成逻辑计划,然后被 spark 的优化器进行优化,最终生成物理计划,然后提交到集群中运行!
DataSet 包含了 DataFrame 的功能。
Spark2.0 中两者统一,DataFrame 表示为 DataSet[Row],即 DataSet 的子集。
DataFrame 其实就是 Dateset[Row]:

3) RDD、DataFrame、DataSet 的区别

1.结构图解:
-RDD[Person]:
以 Person 为类型参数,但不了解 其内部结构。
-DataFrame:
提供了详细的结构信息 schema 列的名称和类型。这样看起来就像一张表了。
-DataSet[Person]
不光有 schema 信息,还有类型信息。

2.数据图解:
-假设 RDD 中的两行数据长这样:

  • RDD[Person]:
    -那么 DataFrame 中的数据长这样:
    DataFrame = RDD[Person] - 泛型 + Schema + SQL 操作 + 优化:
    -那么 Dataset 中的数据长这样:
    Dataset[Person] = DataFrame + 泛型:
    -Dataset 也可能长这样:Dataset[Row]:
    即 DataFrame = DataSet[Row]:
    总结
    DataFrame = RDD - 泛型 + Schema + SQL + 优化
    DataSet = DataFrame + 泛型
    DataSet = RDD + Schema + SQL + 优化

RDD、DataFrame、DataSet 的区别与关系

在 Spark 中,RDDDataFrameDataSet 是三种核心的数据抽象,它们各自有不同的特点和适用场景。下面通过简单的类比和总结,帮助你快速理解它们的区别和关系。


1. RDD(Resilient Distributed Dataset)

核心概念:

RDD 是 Spark 中最基本的数据抽象,代表一个不可变的分布式数据集合。它提供了底层的 API,支持函数式编程和灵活的转换操作。

特点:

泛型:RDD 可以存储任意类型的对象(如 RDD[Person]),但 Spark 并不知道这些对象的内部结构。
低层次 API:需要手动编写代码来实现复杂的操作,性能优化依赖开发者。
无 Schema:RDD 没有元信息(列名、类型等),Spark 无法对其中的数据进行优化。

使用场景:

• 需要完全控制数据的处理流程时。
• 处理非结构化或半结构化数据时。

示例:
# 创建一个 RDD
rdd = sc.parallelize([("Alice", 25), ("Bob", 30)])# 对 RDD 进行操作
result = rdd.map(lambda x: (x[0], x[1] + 1)).collect()
记忆技巧:

• RDD = “原始数据集合”,灵活但无优化,适合底层操作。


2. DataFrame

核心概念:

DataFrame 是 Spark 1.3.0 引入的数据抽象,它基于 RDD,但添加了 Schema 信息(列名和类型),类似于关系型数据库中的二维表。

特点:

有 Schema:DataFrame 知道数据的结构(列名和类型),可以进行优化。
SQL 支持:可以通过 SQL 查询数据,适合结构化数据处理。
无泛型:DataFrame 中的数据是 Row 类型,没有编译时类型检查。

使用场景:

• 处理结构化数据时(如 CSV、JSON、数据库表)。
• 需要 SQL 查询或性能优化时。

示例:
from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()# 创建一个 DataFrame
data = [("Alice", 25), ("Bob", 30)]
df = spark.createDataFrame(data, ["Name", "Age"])# 使用 SQL 查询
df.createOrReplaceTempView("people")
result = spark.sql("SELECT Name, Age FROM people WHERE Age > 25").collect()
记忆技巧:

• DataFrame = “带 Schema 的表”,支持 SQL 和优化,适合结构化数据。


3. DataSet

核心概念:

DataSet 是 Spark 1.6 引入的数据抽象,它结合了 RDD 和 DataFrame 的优点,既有 Schema 信息,又支持泛型(编译时类型检查)。

特点:

有 Schema 和泛型:DataSet 知道数据的结构和类型,支持编译时类型检查。
高性能:DataSet 使用了 Catalyst 优化器和 Tungsten 执行引擎,性能更高。
统一 API:从 Spark 2.0 开始,DataFrame 是 DataSet[Row] 的别名。

使用场景:

• 需要强类型支持和编译时检查时。
• 处理复杂的结构化数据时。

示例:
import org.apache.spark.sql.SparkSession// 创建 SparkSession
val spark = SparkSession.builder.appName("DataSet Example").getOrCreate()// 定义一个 case class
case class Person(name: String, age: Int)// 创建一个 DataSet
val data = Seq(Person("Alice", 25), Person("Bob", 30))
val ds = spark.createDataset(data)// 对 DataSet 进行操作
val result = ds.filter(_.age > 25).collect()

记忆技巧:
• DataSet = “带 Schema 和泛型的高级表”,支持编译时检查和优化。


三者的关系与区别

  1. 结构图解
    RDD[Person]:知道存储的是 Person 类型,但不知道 Person 的内部结构。
    DataFrame:知道数据的 Schema(列名和类型),但数据是 Row 类型,无泛型。
    DataSet[Person]:知道数据的 Schema 和类型,支持编译时类型检查。

  2. 数据图解
    RDD:原始数据集合,无 Schema 和优化。
    DataFrame:RDD + Schema + SQL + 优化。
    DataSet:DataFrame + 泛型,或 RDD + Schema + SQL + 优化。

  3. 总结关系
    DataFrame = RDD - 泛型 + Schema + SQL + 优化
    DataSet = DataFrame + 泛型
    DataSet = RDD + Schema + SQL + 优化

  4. 统一关系
    从 Spark 2.0 开始:
    DataFrame = DataSet[Row]
    DataSet 是 DataFrame 的超集


记忆口诀
RDD“原始数据集合,灵活但无优化”
DataFrame“带 Schema 的表,支持 SQL 和优化”
DataSet“高级表,带 Schema 和泛型,支持编译时检查”

通过理解它们的核心特点和关系,可以轻松记住 RDD、DataFrame 和 DataSet 的区别和适用场景!

Spark SQL 应用

-在 spark2.0 版本之前
SQLContext 是创建 DataFrame 和执行 SQL 的入口。
HiveContext 通过 hive sql 语句操作 hive 表数据,兼容 hive 操作,hiveContext 继承自 SQLContext。

-在 spark2.0 之后
这些都统一于 SparkSession,SparkSession 封装了 SqlContext 及 HiveContext;
实现了 SQLContext 及 HiveContext 所有功能;
通过 SparkSession 还可以获取到 SparkConetxt。

1) 创建 DataFrame/DataSet

-读取文本文件:
1.在本地创建一个文件,有 id、name、age 三列,用空格分隔,然后上传到 hdfs 上。

vim /root/person.txt
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40

2.打开 spark-shell

spark/bin/spark-shell

创建 RDD

val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" ")) //RDD[Array[String]]

3.定义 case class(相当于表的 schema)

case class Person(id:Int, name:String, age:Int)

4.将 RDD 和 case class 关联 val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //RDD[Person]
5.将 RDD 转换成 DataFrame

val personDF = personRDD.toDF //DataFrame

6.查看数据和 schema

personDF.show
+---+--------+---+
| id|    name|age|
+---+--------+---+
|  1|zhangsan| 20|
|  2|    lisi| 29|
|  3|  wangwu| 25|
|  4| zhaoliu| 30|
|  5|  tianqi| 35|
|  6|    kobe| 40|
+---+--------+---+
personDF.printSchema

7.注册表

personDF.createOrReplaceTempView("t_person")

8.执行 SQL

spark.sql("select id,name from t_person where id > 3").show

9.也可以通过 SparkSession 构建 DataFrame

val dataFrame=spark.read.text("hdfs://node1:8020/person.txt")

dataFrame.show //注意:直接读取的文本文件没有完整schema信息
dataFrame.printSchema
-读取 json 文件:

val jsonDF= spark.read.json("file:///resources/people.json")

接下来就可以使用 DataFrame 的函数操作
jsonDF.show
注意:直接读取 json 文件有 schema 信息,因为 json 文件本身含有 Schema 信息,SparkSQL 可以自动解析。
-读取 parquet 文件:

val parquetDF=spark.read.parquet("file:///resources/users.parquet")

接下来就可以使用 DataFrame 的函数操作

parquetDF.show

注意:直接读取 parquet 文件有 schema 信息,因为 parquet 文件中保存了列的信息。

2) 两种查询风格:DSL 和 SQL

-准备工作:
先读取文件并转换为 DataFrame 或 DataSet:

val lineRDD= sc.textFile("hdfs://node1:8020/person.txt").map(_.split(" "))
case class Person(id:Int, name:String, age:Int)
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
val personDF = personRDD.toDF
personDF.show
//val personDS = personRDD.toDS
//personDS.show

-DSL 风格:
SparkSQL 提供了一个领域特定语言(DSL)以方便操作结构化数据
1.查看 name 字段的数据

personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show

2.查看 name 和 age 字段数据

personDF.select("name", "age").show

3.查询所有的 name 和 age,并将 age+1

personDF.select(personDF.col("name"), personDF.col("age") + 1).show
personDF.select(personDF("name"), personDF("age") + 1).show
personDF.select(col("name"), col("age") + 1).show
personDF.select("name","age").show
//personDF.select("name", "age"+1).show
personDF.select($"name",$"age",$"age"+1).show

4.过滤 age 大于等于 25 的,使用 filter 方法过滤

personDF.filter(col("age") >= 25).show
personDF.filter($"age" >25).show

5.统计年龄大于 30 的人数

personDF.filter(col("age")>30).count()
personDF.filter($"age" >30).count()

6.按年龄进行分组并统计相同年龄的人数

personDF.groupBy("age").count().show
  • SQL 风格:
    DataFrame 的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用 spark.sql() 来执行 SQL 查询,结果将作为一个 DataFrame 返回。
    如果想使用 SQL 风格的语法,需要将 DataFrame 注册成表,采用如下的方式:
personDF.createOrReplaceTempView("t_person")
spark.sql("select * from t_person").show

1.显示表的描述信息

spark.sql("desc t_person").show

2.查询年龄最大的前两名

spark.sql("select * from t_person order by age desc limit 2").show

3.查询年龄大于 30 的人的信息

spark.sql("select * from t_person where age > 30 ").show

4.使用 SQL 风格完成 DSL 中的需求

spark.sql("select name, age + 1 from t_person").show
spark.sql("select name, age from t_person where age > 25").show
spark.sql("select count(age) from t_person where age > 30").show
spark.sql("select age, count(age) from t_person group by age").show

-总结:
1.DataFrame 和 DataSet 都可以通过 RDD 来进行创建;
2.也可以通过读取普通文本创建–注意:直接读取没有完整的约束,需要通过 RDD+Schema;
3.通过 josn/parquet 会有完整的约束;
4.不管是 DataFrame 还是 DataSet 都可以注册成表,之后就可以使用 SQL 进行查询了! 也可以使用 DSL!

3) Spark SQL 完成 WordCount

-SQL 风格:

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object WordCount {def main(args: Array[String]): Unit = {//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.读取文件val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")//fileDF.show()//fileDS.show()//3.对每一行按照空格进行切分并压平//fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是Stringimport spark.implicits._val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String//wordDS.show()/*+-----+|value|+-----+|hello||   me||hello||  you|...*///4.对上面的数据进行WordCountwordDS.createOrReplaceTempView("t_word")val sql ="""|select value ,count(value) as count|from t_word|group by value|order by count desc""".stripMarginspark.sql(sql).show()sc.stop()spark.stop()}
}

-DSL 风格:

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object WordCount2 {def main(args: Array[String]): Unit = {//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.读取文件val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")//fileDF.show()//fileDS.show()//3.对每一行按照空格进行切分并压平//fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是Stringimport spark.implicits._val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String//wordDS.show()/*+-----+|value|+-----+|hello||   me||hello||  you|...*///4.对上面的数据进行WordCountwordDS.groupBy("value").count().orderBy($"count".desc).show()sc.stop()spark.stop()}
}
4) Spark SQL 多数据源交互

-读数据:
读取 json 文件:

spark.read.json("D:\\data\\output\\json").show()

读取 csv 文件:

spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()

读取 parquet 文件:

spark.read.parquet("D:\\data\\output\\parquet").show()

读取 mysql 表:

val prop = new Properties()prop.setProperty("user","root")prop.setProperty("password","root")
spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()
  • 写数据:
    写入 json 文件:
personDF.write.json("D:\\data\\output\\json")

写入 csv 文件:

personDF.write.csv("D:\\data\\output\\csv")

写入 parquet 文件:

personDF.write.parquet("D:\\data\\output\\parquet")

写入 mysql 表:

val prop = new Properties()prop.setProperty("user","root")prop.setProperty("password","root")
personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)

Spark3_Spark Streaming

Spark Streaming 是一个基于 Spark Core 之上的实时计算框架,可以从很多数据源消费数据并对数据进行实时的处理,具有高吞吐量和容错能力强等特点。
在这里插入图片描述
Spark Streaming 的特点:
1.易用
可以像编写离线批处理一样去编写流式程序,支持 java/scala/python 语言。
2.容错
SparkStreaming 在没有额外代码和配置的情况下可以恢复丢失的工作。
3.易整合到 Spark 体系
流式处理与批处理和交互式查询相结合。

1. 整体流程

Spark Streaming 中,会有一个接收器组件 Receiver,作为一个长期运行的 task 跑在一个 Executor 上。Receiver 接收外部的数据流形成 input DStream。
DStream 会被按照时间间隔划分成一批一批的 RDD,当批处理间隔缩短到秒级时,便可以用于处理实时数据流。时间间隔的大小可以由参数指定,一般设在 500 毫秒到几秒之间
对 DStream 进行操作就是对 RDD 进行操作,计算处理的结果可以传给外部系统。
Spark Streaming 的工作流程像下面的图所示一样,接受到实时数据后,给数据分批次,然后传给 Spark Engine 处理最后生成该批次的结果。

2. 数据抽象

Spark Streaming 的基础抽象是 DStream(Discretized Stream,离散化数据流,连续不断的数据流),代表持续性的数据流和经过各种 Spark 算子操作后的结果数据流。
可以从以下多个角度深入理解 DStream:
1.DStream 本质上就是一系列时间上连续的 RDD
在这里插入图片描述
2.对 DStream 的数据的进行操作也是按照 RDD 为单位来进行的
在这里插入图片描述
3.容错性,底层 RDD 之间存在依赖关系,DStream 直接也有依赖关系,RDD 具有容错性,那么 DStream 也具有容错性
4.准实时性/近实时性
Spark Streaming 将流式计算分解成多个 Spark Job,对于每一时间段数据的处理都会经过 Spark DAG 图分解以及 Spark 的任务集的调度过程。
对于目前版本的 Spark Streaming 而言,其最小的 Batch Size 的选取在 0.5~5 秒钟之间。
所以 Spark Streaming 能够满足流式准实时计算场景,对实时性要求非常高的如高频实时交易场景则不太适合。
-总结
简单来说 DStream 就是对 RDD 的封装,你对 DStream 进行操作,就是对 RDD 进行操作。
对于 DataFrame/DataSet/DStream 来说本质上都可以理解成 RDD。

3. DStream 相关操作

DStream 上的操作与 RDD 的类似,分为以下两种:
1.Transformations(转换)
2.Output Operations(输出)/Action

  1. Transformations
    以下是常见 Transformation—都是无状态转换:即每个批次的处理不依赖于之前批次的数据:
    在这里插入图片描述
    除此之外还有一类特殊的 Transformations—有状态转换:当前批次的处理需要使用之前批次的数据或者中间结果。
    有状态转换包括基于追踪状态变化的转换(updateStateByKey)和滑动窗口的转换:
    1.UpdateStateByKey(func)
    2.Window Operations 窗口操作

  2. Output/Action
    Output Operations 可以将 DStream 的数据输出到外部的数据库或文件系统。
    当某个 Output Operations 被调用时,spark streaming 程序才会开始真正的计算过程(与 RDD 的 Action 类似)。
    在这里插入图片描述

4. Spark Streaming 完成实时需求
1) WordCount

-首先在 linux 服务器上安装 nc 工具
nc 是 netcat 的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据 yum install -y nc
-启动一个服务端并开放 9999 端口,等一下往这个端口发数据
nc -lk 9999
-发送数据
-接收数据,代码示例:

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}object WordCount {def main(args: Array[String]): Unit = {//1.创建StreamingContext//spark.master should be set as local[n], n > 1val conf = new SparkConf().setAppName("wc").setMaster("local[*]")val sc = new SparkContext(conf)sc.setLogLevel("WARN")val ssc = new StreamingContext(sc,Seconds(5))//5表示5秒中对数据进行切分形成一个RDD//2.监听Socket接收数据//ReceiverInputDStream就是接收到的所有的数据组成的RDD,封装成了DStream,接下来对DStream进行操作就是对RDD进行操作val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)//3.操作数据val wordDStream: DStream[String] = dataDStream.flatMap(_.split(" "))val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_,1))val wordAndCount: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)wordAndCount.print()ssc.start()//开启ssc.awaitTermination()//等待停止}
}
2) updateStateByKey

-问题:
在上面的那个案例中存在这样一个问题:
每个批次的单词次数都被正确的统计出来,但是结果不能累加!
如果需要累加需要使用 updateStateByKey(func)来更新状态。
代码示例:

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object WordCount2 {def main(args: Array[String]): Unit = {//1.创建StreamingContext//spark.master should be set as local[n], n > 1val conf = new SparkConf().setAppName("wc").setMaster("local[*]")val sc = new SparkContext(conf)sc.setLogLevel("WARN")val ssc = new StreamingContext(sc,Seconds(5))//5表示5秒中对数据进行切分形成一个RDD//requirement failed: ....Please set it by StreamingContext.checkpoint().//注意:我们在下面使用到了updateStateByKey对当前数据和历史数据进行累加//那么历史数据存在哪?我们需要给他设置一个checkpoint目录ssc.checkpoint("./wc")//开发中HDFS//2.监听Socket接收数据//ReceiverInputDStream就是接收到的所有的数据组成的RDD,封装成了DStream,接下来对DStream进行操作就是对RDD进行操作val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)//3.操作数据val wordDStream: DStream[String] = dataDStream.flatMap(_.split(" "))val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_,1))//val wordAndCount: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_+_)//====================使用updateStateByKey对当前数据和历史数据进行累加====================val wordAndCount: DStream[(String, Int)] =wordAndOneDStream.updateStateByKey(updateFunc)wordAndCount.print()ssc.start()//开启ssc.awaitTermination()//等待优雅停止}//currentValues:当前批次的value值,如:1,1,1 (以测试数据中的hadoop为例)//historyValue:之前累计的历史值,第一次没有值是0,第二次是3//目标是把当前数据+历史数据返回作为新的结果(下次的历史数据)def updateFunc(currentValues:Seq[Int], historyValue:Option[Int] ):Option[Int] ={val result: Int = currentValues.sum + historyValue.getOrElse(0)Some(result)}
}
3) reduceByKeyAndWindow

使用上面的代码已经能够完成对所有历史数据的聚合,但是实际中可能会有一些需求,需要对指定时间范围的数据进行统计。
比如:
百度/微博的热搜排行榜 统计最近 24 小时的热搜词,每隔 5 分钟更新一次,所以面对这样的需求我们需要使用窗口操作 Window Operations。
图解:
我们先提出一个问题:统计经过某红绿灯的汽车数量之和?
假设在一个红绿灯处,我们每隔 15 秒统计一次通过此红绿灯的汽车数量,如下图:
在这里插入图片描述

可以把汽车的经过看成一个流,无穷的流,不断有汽车经过此红绿灯,因此无法统计总共的汽车数量。但是,我们可以换一种思路,每隔 15 秒,我们都将与上一次的结果进行 sum 操作(滑动聚合, 但是这个结果似乎还是无法回答我们的问题,根本原因在于流是无界的,我们不能限制流,但可以在有一个有界的范围内处理无界的流数据。
在这里插入图片描述

因此,我们需要换一个问题的提法:每分钟经过某红绿灯的汽车数量之和?
这个问题,就相当于一个定义了一个 Window(窗口),window 的界限是 1 分钟,且每分钟内的数据互不干扰,因此也可以称为翻滚(不重合)窗口,如下图:
第一分钟的数量为 8,第二分钟是 22,第三分钟是 27。。。这样,1 个小时内会有 60 个 window。
再考虑一种情况,每 30 秒统计一次过去 1 分钟的汽车数量之和:
在这里插入图片描述
此时,window 出现了重合。这样,1 个小时内会有 120 个 window。
滑动窗口转换操作的计算过程如下图所示:
在这里插入图片描述

我们可以事先设定一个滑动窗口的长度(也就是窗口的持续时间),并且设定滑动窗口的时间间隔(每隔多长时间执行一次计算),
比如设置滑动窗口的长度(也就是窗口的持续时间)为 24H,设置滑动窗口的时间间隔(每隔多长时间执行一次计算)为 1H
那么意思就是:每隔 1H 计算最近 24H 的数据
在这里插入图片描述

代码示例:

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}object WordCount3 {def main(args: Array[String]): Unit = {//1.创建StreamingContext//spark.master should be set as local[n], n > 1val conf = new SparkConf().setAppName("wc").setMaster("local[*]")val sc = new SparkContext(conf)sc.setLogLevel("WARN")val ssc = new StreamingContext(sc,Seconds(5))//5表示5秒中对数据进行切分形成一个RDD//2.监听Socket接收数据//ReceiverInputDStream就是接收到的所有的数据组成的RDD,封装成了DStream,接下来对DStream进行操作就是对RDD进行操作val dataDStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01",9999)//3.操作数据val wordDStream: DStream[String] = dataDStream.flatMap(_.split(" "))val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_,1))//4.使用窗口函数进行WordCount计数//reduceFunc: (V, V) => V,集合函数//windowDuration: Duration,窗口长度/宽度//slideDuration: Duration,窗口滑动间隔//注意:windowDuration和slideDuration必须是batchDuration的倍数//windowDuration=slideDuration:数据不会丢失也不会重复计算==开发中会使用//windowDuration>slideDuration:数据会重复计算==开发中会使用//windowDuration<slideDuration:数据会丢失//下面的代码表示://windowDuration=10//slideDuration=5//那么执行结果就是每隔5s计算最近10s的数据//比如开发中让你统计最近1小时的数据,每隔1分钟计算一次,那么参数该如何设置?//windowDuration=Minutes(60)//slideDuration=Minutes(1)val wordAndCount: DStream[(String, Int)] = wordAndOneDStream.reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5))wordAndCount.print()ssc.start()//开启ssc.awaitTermination()//等待优雅停止}
}

Structured Streaming

Spark 的 Structured Streaming 和 Streaming 的区别

在 Spark 中,StreamingStructured Streaming 是两种处理流数据的框架,它们的设计理念、API 和功能有很大的不同。下面通过对比帮助你理解它们的区别。


1. Streaming(DStream)

核心概念:

DStream(Discretized Stream) 是 Spark Streaming 的核心抽象,代表一个连续的流数据,但在内部被划分为一系列小批量的 RDD。
微批处理:Spark Streaming 将流数据切分为一系列小批次(如每 1 秒一个批次),然后对每个批次进行处理。

特点:

基于 RDD:DStream 是 RDD 的序列,因此继承了 RDD 的灵活性和底层 API。
无 Schema:DStream 中的数据是无结构的,需要手动解析和处理。
处理模型:微批处理(Micro-batching),有一定的延迟。
容错性:基于 RDD 的容错机制,数据丢失时可以恢复。

使用场景:

• 需要低层次 API 控制流处理时。
• 处理非结构化或半结构化数据时。

示例:
from pyspark.streaming import StreamingContext# 创建 StreamingContext
ssc = StreamingContext(sc, batchDuration=1)# 创建一个 DStream
lines = ssc.socketTextStream("localhost", 9999)# 对 DStream 进行处理
words = lines.flatMap(lambda line: line.split(" "))
word_counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)# 输出结果
word_counts.pprint()# 启动流处理
ssc.start()
ssc.awaitTermination()
记忆技巧:

• Streaming = “基于 RDD 的流处理”,灵活但无 Schema,适合底层操作。


2. Structured Streaming

核心概念:

Structured Streaming 是 Spark 2.0 引入的流处理框架,基于 Spark SQL 引擎,提供了一种声明式的流处理 API
无限表模型:将流数据视为一张无限增长的表,支持基于 SQL 的操作。

特点:

基于 DataFrame/DataSet:使用 DataFrame/DataSet API,支持结构化数据。
有 Schema:数据带有 Schema 信息,可以自动解析和优化。
处理模型:支持微批处理和连续处理(Continuous Processing),延迟更低。
容错性:基于 Spark SQL 的容错机制,保证端到端的一致性。
事件时间:支持基于事件时间的窗口操作。

使用场景:

• 处理结构化数据时(如 JSON、CSV、数据库表)。
• 需要 SQL 查询或声明式 API 时。

示例:
from pyspark.sql import SparkSession# 创建 SparkSession
spark = SparkSession.builder.appName("StructuredStreaming Example").getOrCreate()# 创建一个流 DataFrame
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()# 对流数据进行处理
words = lines.selectExpr("explode(split(value, ' ')) as word")
word_counts = words.groupBy("word").count()# 输出结果
query = word_counts.writeStream.outputMode("complete").format("console").start()# 启动流处理
query.awaitTermination()
记忆技巧:

• Structured Streaming = “基于 SQL 的流处理”,支持结构化数据和声明式 API。


对比总结

特性Streaming(DStream)Structured Streaming
核心抽象DStream(RDD 的序列)DataFrame/DataSet(无限表)
数据处理模型微批处理(Micro-batching)微批处理和连续处理(Continuous Processing)
API低层次 API(基于 RDD)高层次 API(基于 SQL 和 DataFrame)
Schema无 Schema,数据无结构有 Schema,数据结构化
延迟较高(微批处理)较低(支持连续处理)
容错性基于 RDD 的容错机制基于 Spark SQL 的容错机制
事件时间支持不支持支持
适用场景非结构化或半结构化数据,需要低层次 API结构化数据,需要 SQL 和声明式 API

记忆口诀

Streaming“基于 RDD 的流处理,灵活但无 Schema”
Structured Streaming“基于 SQL 的流处理,结构化且高效”

通过理解它们的核心特点和适用场景,可以轻松记住 Streaming 和 Structured Streaming 的区别!

在 2.0 之前,Spark Streaming 作为核心 API 的扩展,针对实时数据流,提供了一套可扩展、高吞吐、可容错的流式计算模型。Spark Streaming 会接收实时数据源的数据,并切分成很多小的 batches,然后被 Spark Engine 执行,产出同样由很多小的 batchs 组成的结果流。本质上,这是一种 micro-batch(微批处理)的方式处理,用批的思想去处理流数据.这种设计让Spark Streaming 面对复杂的流式处理场景时捉襟见肘。
spark streaming 这种构建在微批处理上的流计算引擎,比较突出的问题就是处理延时较高(无法优化到秒以下的数量级),以及无法支持基于 event_time 的时间窗口做聚合逻辑。
spark 在 2.0 版本中发布了新的流计算的 API,Structured Streaming/结构化流。
Structured Streaming 是一个基于 Spark SQL 引擎的可扩展、容错的流处理引擎。统一了流、批的编程模型,你可以使用静态数据批处理一样的方式来编写流式计算操作。并且支持基于 event_time 的时间窗口的处理逻辑。
随着数据不断地到达,Spark 引擎会以一种增量的方式来执行这些操作,并且持续更新结算结果。可以使用 Scala、Java、Python 或 R 中的 DataSet/DataFrame API 来表示流聚合、事件时间窗口、流到批连接等。此外,Structured Streaming 会通过 checkpoint 和预写日志等机制来实现 Exactly-Once 语义。
简单来说,对于开发人员来说,根本不用去考虑是流式计算,还是批处理,只要使用同样的方式来编写计算操作即可,Structured Streaming 提供了快速、可扩展、容错、端到端的一次性流处理,而用户无需考虑更多细节。
默认情况下,结构化流式查询使用微批处理引擎进行处理,该引擎将数据流作为一系列小批处理作业进行处理,从而实现端到端的延迟,最短可达 100 毫秒,并且完全可以保证一次容错。自 Spark 2.3 以来,引入了一种新的低延迟处理模式,称为连续处理,它可以在至少一次保证的情况下实现低至 1 毫秒的端到端延迟。也就是类似于 Flink 那样的实时流,而不是小批量处理。实际开发可以根据应用程序要求选择处理模式,但是连续处理在使用的时候仍然有很多限制,目前大部分情况还是应该采用小批量模式。

1. API

Spark Streaming 时代 -DStream-RDD
Spark Streaming 采用的数据抽象是 DStream,而本质上就是时间上连续的 RDD,对数据流的操作就是针对 RDD 的操作。

Structured Streaming 时代 - DataSet/DataFrame -RDD
Structured Streaming 是 Spark2.0 新增的可扩展和高容错性的实时计算框架,它构建于 Spark SQL 引擎,把流式计算也统一到 DataFrame/Dataset 里去了。
Structured Streaming 相比于 Spark Streaming 的进步就类似于 Dataset 相比于 RDD 的进步。

2. 核心思想

Structured Streaming 最核心的思想就是将实时到达的数据看作是一个不断追加的 unbound table 无界表,到达流的每个数据项(RDD)就像是表中的一个新行被附加到无边界的表中.这样用户就可以用静态结构化数据的批处理查询方式进行流计算,如可以使用 SQL 对到来的每一行数据进行实时查询处理。

3. 应用场景

Structured Streaming 将数据源映射为类似于关系数据库中的表,然后将经过计算得到的结果映射为另一张表,完全以结构化的方式去操作流式数据,这种编程模型非常有利于处理分析结构化的实时数据;

4. Structured Streaming 实战
  1. 读取 Socket 数据
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object WordCount {def main(args: Array[String]): Unit = {//1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSetval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.接收数据val dataDF: DataFrame = spark.readStream.option("host", "node01").option("port", 9999).format("socket").load()//3.处理数据import spark.implicits._val dataDS: Dataset[String] = dataDF.as[String]val wordDS: Dataset[String] = dataDS.flatMap(_.split(" "))val result: Dataset[Row] = wordDS.groupBy("value").count().sort($"count".desc)//result.show()//Queries with streaming sources must be executed with writeStream.start();result.writeStream.format("console")//往控制台写.outputMode("complete")//每次将所有的数据写出.trigger(Trigger.ProcessingTime(0))//触发时间间隔,0表示尽可能的快//.option("checkpointLocation","./ckp")//设置checkpoint目录,socket不支持数据恢复,所以第二次启动会报错,需要注掉.start()//开启.awaitTermination()//等待停止}
}
  1. 读取目录下文本数据
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
/*** {"name":"json","age":23,"hobby":"running"}* {"name":"charles","age":32,"hobby":"basketball"}* {"name":"tom","age":28,"hobby":"football"}* {"name":"lili","age":24,"hobby":"running"}* {"name":"bob","age":20,"hobby":"swimming"}* 统计年龄小于25岁的人群的爱好排行榜*/
object WordCount2 {def main(args: Array[String]): Unit = {//1.创建SparkSession,因为StructuredStreaming的数据模型也是DataFrame/DataSetval spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")val Schema: StructType = new StructType().add("name","string").add("age","integer").add("hobby","string")//2.接收数据import spark.implicits._// Schema must be specified when creating a streaming source DataFrame.val dataDF: DataFrame = spark.readStream.schema(Schema).json("D:\\data\\spark\\data")//3.处理数据val result: Dataset[Row] = dataDF.filter($"age" < 25).groupBy("hobby").count().sort($"count".desc)//4.输出结果result.writeStream.format("console").outputMode("complete").trigger(Trigger.ProcessingTime(0)).start().awaitTermination()}
}
  1. 计算操作
    获得到 Source 之后的基本数据处理方式和之前学习的 DataFrame、DataSet 一致,不再赘述。
    官网示例代码:
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data
// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)         // using typed APIs
// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                 // using untyped API
// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
  1. 输出
    计算结果可以选择输出到多种设备并进行如下设定:
    1.output mode:以哪种方式将 result table 的数据写入 sink,即是全部输出 complete 还是只输出新增数据;
    2.format/output sink 的一些细节:数据格式、位置等。如 console;
    3.query name:指定查询的标识。类似 tempview 的名字;
    4.trigger interval:触发间隔,如果不指定,默认会尽可能快速地处理数据;
    5.checkpointLocation:一般是 hdfs 上的目录。注意:Socket 不支持数据恢复,如果设置了,第二次启动会报错,Kafka 支持。
    output mode:
    在这里插入图片描述

每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。
这里有三种输出模型:
1.Append mode:默认模式,新增的行才输出,每次更新结果集时,只将新添加到结果集的结果行输出到接收器。仅支持那些添加到结果表中的行永远不会更改的查询。因此,此模式保证每行仅输出一次。例如,仅查询 select,where,map,flatMap,filter,join 等会支持追加模式。不支持聚合
2.Complete mode:所有内容都输出,每次触发后,整个结果表将输出到接收器。聚合查询支持此功能。仅适用于包含聚合操作的查询。
3.Update mode:更新的行才输出,每次更新结果集时,仅将被更新的结果行输出到接收器(自 Spark 2.1.1 起可用),不支持排序
output sink:
在这里插入图片描述

-说明:
File sink: 输出存储到一个目录中。支持 parquet 文件,以及 append 模式。

writeStream.format("parquet")        // can be "orc", "json", "csv", etc..option("path", "path/to/destination/dir").start()

Kafka sink:将输出存储到 Kafka 中的一个或多个 topics 中。

writeStream.format("kafka").option("kafka.bootstrap.servers", "host1:port1,host2:port2").option("topic", "updates").start()

Foreach sink:对输出中的记录运行任意计算

writeStream.foreach(...).start()

Console sink:将输出打印到控制台

writeStream.format("console").start()

Spark4_Spark的两种核心 Shuffle

在这里插入图片描述
在这里插入图片描述

在 MapReduce 框架中, Shuffle 阶段是连接 Map 与 Reduce 之间的桥梁, Map 阶段通过 Shuffle 过程将数据输出到 Reduce 阶段中。由于 Shuffle 涉及磁盘的读写和网络 I/O,因此 Shuffle 性能的高低直接影响整个程序的性能。 Spark 也有 Map 阶段和 Reduce 阶段,因此也会出现 Shuffle 。

Spark Shuffle

Spark Shuffle 的两种实现方式

在 Spark 中,Shuffle 是连接不同阶段(如 Map 和 Reduce)的关键过程,负责将数据重新分发和排序。Spark 提供了两种 Shuffle 实现方式:基于 Hash 的 Shuffle基于 Sort 的 Shuffle。下面通过简单易懂的方式讲解它们的区别和特点。


1. 基于 Hash 的 Shuffle
核心思想:

• 使用 哈希函数 将数据分配到不同的分区(Partition)中。
• 每个分区对应一个 Reduce 任务。

实现过程:
  1. Map 任务:Map 任务生成键值对(key-value pairs)。
  2. 哈希分区:根据 key 的哈希值将数据分配到不同的分区中。
  3. 写入文件:每个分区会生成一个单独的文件,存储在磁盘上。
  4. Reduce 任务:Reduce 任务从磁盘上读取属于自己的分区文件。
特点:

简单高效:哈希分区实现简单,适合小规模数据。
文件数量多:每个 Map 任务会为每个分区生成一个文件,文件数量为 Map 任务数 × Reduce 任务数
• 例如:100 个 Map 任务和 100 个 Reduce 任务会生成 10,000 个文件。
性能问题
• 文件数量过多会导致 磁盘 I/O 压力大
• 小文件过多会影响性能。

使用场景:

• 适合小规模数据或 Reduce 任务数较少的场景。

记忆技巧:

• 基于 Hash 的 Shuffle = “简单哈希分发,文件多性能差”


2. 基于 Sort 的 Shuffle
核心思想:

• 在 Map 任务端对数据进行 排序,然后将排序后的数据写入一个 大文件
• 每个 Map 任务只会生成一个文件,文件中包含所有分区的数据。

实现过程:
  1. Map 任务:Map 任务生成键值对(key-value pairs)。
  2. 排序和分区:对数据按照 key 进行排序,并分配到不同的分区中。
  3. 写入文件:将排序后的数据写入一个 大文件,同时生成一个索引文件,记录每个分区的偏移量。
  4. Reduce 任务:Reduce 任务根据索引文件从大文件中读取属于自己的数据。
特点:

文件数量少:每个 Map 任务只生成一个文件,文件数量为 Map 任务数
• 例如:100 个 Map 任务会生成 100 个文件。
性能优化
• 减少磁盘 I/O 压力。
• 适合大规模数据。
排序开销:排序操作会增加一定的计算开销。

使用场景:

• 适合大规模数据或 Reduce 任务数较多的场景。

记忆技巧:

• 基于 Sort 的 Shuffle = “排序合并文件,性能优适合大数据”


对比总结
特性基于 Hash 的 Shuffle基于 Sort 的 Shuffle
核心思想哈希分区排序后合并
文件数量文件多(Map 任务数 × Reduce 任务数)文件少(Map 任务数)
性能文件多,性能差文件少,性能优
排序开销无排序有排序开销
适用场景小规模数据大规模数据

记忆口诀

基于 Hash 的 Shuffle“简单哈希分发,文件多性能差”
基于 Sort 的 Shuffle“排序合并文件,性能优适合大数据”

一张图了解下 Spark Shuffle 的迭代历史:
在这里插入图片描述

Spark Shuffle 迭代历史

为什么 Spark 最终还是放弃了 HashShuffle ,使用了 Sorted-Based Shuffle?

使用 HashShuffle 的 Spark 在 Shuffle 时产生大量的文件。当数据量越来越多时,产生的文件量是不可控的,这严重制约了 Spark 的性能及扩展能力,所以 Spark 必须要解决这个问题,减少 Mapper 端 ShuffleWriter 产生的文件数量,这样便可以让 Spark 从几百台集群的规模瞬间变成可以支持几千台,甚至几万台集群的规模。
但使用 Sorted-Based Shuffle 就完美了吗,答案是否定的,Sorted-Based Shuffle 也有缺点,其缺点反而是它排序的特性,它强制要求数据在 Mapper 端必须先进行排序,所以导致它排序的速度有点慢。好在出现了 Tim-Sort Shuffle ,它对排序算法进行了改进,优化了排序的速度。Tim-Sort Shuffle 已经并入了 Sorted-Based Shuffle,Spark 的引擎会自动识别程序需要的是 Sorted-Based Shuffle,还是 Tim-Sort Shuffle。

一、Hash Shuffle 解析

以下的讨论都假设每个 Executor 有 1 个 cpu core。

Hash Shuffle 解析

在 Spark 中,Hash Shuffle 是一种用于将数据重新分发(Shuffle)的机制,它将 Map 任务的输出数据按照 key 的哈希值分发到 Reduce 任务中。Hash Shuffle 分为 未优化的 Hash Shuffle优化后的 Hash Shuffle。下面通过简单易懂的方式讲解它们的工作原理和优缺点。


1. 未优化的 Hash Shuffle

在这里插入图片描述
未优化的HashShuffleManager工作原理

核心思想:

• 每个 Map 任务为下游的每个 Reduce 任务生成一个独立的磁盘文件。
• 文件数量 = Map 任务数 × Reduce 任务数。

实现过程:
  1. Shuffle Write
    • Map 任务将数据按 key 的哈希值分区,写入内存缓冲区。
    • 当缓冲区满时,数据溢写到磁盘文件。
    • 每个 Reduce 任务对应一个独立的文件。

  2. Shuffle Read
    • Reduce 任务从所有 Map 任务的输出文件中拉取属于自己的数据。
    • 数据拉取后,进行聚合操作。

问题:

文件数量过多:如果 Map 任务数为 M,Reduce 任务数为 R,则生成的文件总数为 M × R。
• 例如:100 个 Map 任务和 100 个 Reduce 任务会生成 10,000 个文件。
性能瓶颈
• 大量小文件对文件系统造成压力。
• 磁盘 I/O 和内存开销大。

记忆技巧:

• 未优化的 Hash Shuffle = “文件爆炸,性能差”


2. 优化后的 Hash Shuffle

在这里插入图片描述
优化后的HashShuffleManager工作原理

核心思想:

• 引入 Shuffle File Group 机制,允许多个 Map 任务复用同一批磁盘文件。
• 文件数量 = CPU Core 数 × Reduce 任务数。

实现过程:
  1. Shuffle Write
    • 每个 CPU Core 执行一批 Map 任务,为这批任务创建一个 Shuffle File Group。
    • 每个 Shuffle File Group 包含与 Reduce 任务数相同的文件。
    • 下一批 Map 任务复用已有的 Shuffle File Group,避免创建新文件。

  2. Shuffle Read
    • Reduce 任务从 Shuffle File Group 中拉取属于自己的数据。

优化效果:

文件数量大幅减少
• 例如:100 个 Map 任务和 100 个 Reduce 任务,如果 CPU Core 数为 10,则文件总数从 10,000 减少到 1,000。
性能提升
• 减少磁盘 I/O 和内存开销。

记忆技巧:

• 优化后的 Hash Shuffle = “文件复用,性能提升”


Hash Shuffle 的优缺点
优点:

无排序开销:基于哈希分区,避免了排序操作。
内存占用少:不需要额外的排序内存。

缺点:

文件数量多:未优化时,文件数量为 M × R,对文件系统造成压力。
磁盘 I/O 开销大:大量小文件的随机读写导致性能瓶颈。
内存压力:数据写入时的缓存空间需求较大。


对比总结
特性未优化的 Hash Shuffle优化后的 Hash Shuffle
文件数量M × R(文件爆炸)CPU Core 数 × R(文件复用)
性能差(大量小文件,磁盘 I/O 压力大)较好(文件数量减少,性能提升)
排序开销无排序无排序
适用场景小规模数据中等规模数据

记忆口诀

未优化的 Hash Shuffle“文件爆炸,性能差”
优化后的 Hash Shuffle“文件复用,性能提升”

基于 Hash 的 Shuffle 机制的优缺点
优点:
-可以省略不必要的排序开销。
-避免了排序所需的内存开销。
缺点:
-生产的文件过多,会对文件系统造成压力。
-大量小文件的随机读写带来一定的磁盘开销。
-数据块写入时所需的缓存空间也会随之增加,对内存造成压力。

二、SortShuffle 解析

SortShuffleManager 的运行机制主要分成三种:
1.普通运行机制;
2.bypass 运行机制,当 shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为 200),就会启用 bypass 机制;
3.Tungsten Sort 运行机制,开启此运行机制需设置配置项 spark.shuffle.manager=tungsten-sort。开启此项配置也不能保证就一定采用此运行机制(后面会解释)。

在 Spark 中,Sort Shuffle 是一种优化后的 Shuffle 机制,它通过排序和文件合并减少了文件数量,从而提升了性能。Sort Shuffle 分为三种运行机制:普通运行机制bypass 运行机制Tungsten Sort 运行机制。下面通过简单易懂的方式讲解它们的工作原理和特点。


1. 普通运行机制

在这里插入图片描述
普通运行机制的SortShuffleManager工作原理

核心思想:

内存排序 + 磁盘合并:数据先写入内存数据结构(如 Map 或 Array),排序后溢写到磁盘,最后合并成一个文件。

实现过程:
  1. Shuffle Write
    • 数据写入内存数据结构(聚合类算子用 Map,普通算子用 Array)。
    • 当内存数据达到阈值时,排序并溢写到磁盘,生成多个临时文件。
    • 最后将所有临时文件合并成一个文件,并生成索引文件。

  2. Shuffle Read
    • Reduce 任务根据索引文件从合并后的文件中读取属于自己的数据。

优点:

文件数量少:每个 Map 任务只生成一个文件,文件数量为 Map 任务数
性能优化:减少磁盘 I/O 和内存开销。

记忆技巧:

• 普通运行机制 = “排序合并,文件少性能优”


2. bypass 运行机制

在这里插入图片描述
bypass运行机制的SortShuffleManager工作原理

核心思想:

回退到 Hash Shuffle:当 Reduce 任务数较少时,直接使用 Hash Shuffle 机制,避免排序开销。

触发条件:

• Reduce 任务数 ≤ spark.shuffle.sort.bypassMergeThreshold(默认 200)。
• 不是聚合类算子(如 reduceByKey)。

实现过程:
  1. Shuffle Write
    • 每个 Map 任务为每个 Reduce 任务生成一个临时文件。
    • 最后将所有临时文件合并成一个文件,并生成索引文件。

  2. Shuffle Read
    • Reduce 任务从合并后的文件中读取属于自己的数据。

优点:

避免排序开销:适合 Reduce 任务数较少的场景。

记忆技巧:

• bypass 运行机制 = “回退 Hash,避免排序”


3. Tungsten Sort 运行机制
核心思想:

基于 Tungsten 优化:使用高效的内存管理和序列化机制,进一步提升性能。

触发条件:

• Shuffle 依赖中不带聚合操作或排序要求。
• 使用支持重定位的序列化器(如 KryoSerializer)。
• 分区数 < 16777216。
• 单条记录长度 ≤ 128 MB。

实现过程:
  1. Shuffle Write
    • 数据直接写入 Tungsten 内存页,避免 Java 对象开销。
    • 排序后写入磁盘,生成一个文件。

  2. Shuffle Read
    • Reduce 任务从文件中读取属于自己的数据。

优点:

性能更高:减少内存开销和 GC 压力。

记忆技巧:

• Tungsten Sort = “内存优化,性能极致”


对比总结
特性普通运行机制bypass 运行机制Tungsten Sort 运行机制
核心思想排序 + 合并文件回退到 Hash Shuffle基于 Tungsten 优化
文件数量文件少(Map 任务数)文件多(Map 任务数 × Reduce 任务数)文件少(Map 任务数)
排序开销有排序无排序有排序(优化后)
适用场景通用场景Reduce 任务数较少大规模数据,无聚合或排序需求

记忆口诀

普通运行机制“排序合并,文件少性能优”
bypass 运行机制“回退 Hash,避免排序”
Tungsten Sort“内存优化,性能极致”

Spark5_Spark 底层执行原理 DAG+任务执行流程

Spark 运行流程
在这里插入图片描述

Spark运行流程 !!!

具体运行流程如下:
1.SparkContext 向资源管理器注册并向资源管理器申请运行 Executor
2.资源管理器分配 Executor,然后资源管理器启动 Executor
3.Executor 发送心跳至资源管理器
4.SparkContext 构建 DAG 有向无环图
5.将 DAG 分解成 Stage(TaskSet)
6.把 Stage 发送给 TaskScheduler
7.Executor 向 SparkContext 申请 Task
8.TaskScheduler 将 Task 发送给 Executor 运行
9.同时 SparkContext 将应用程序代码发放给 Executor
10.Task 在 Executor 上运行,运行完毕释放所有资源

1. 从代码角度看 DAG 图的构建
Val lines1 = sc.textFile(inputPath1).map(...).map(...)Val lines2 = sc.textFile(inputPath2).map(...)Val lines3 = sc.textFile(inputPath3)Val dtinone1 = lines2.union(lines3)Val dtinone = lines1.join(dtinone1)dtinone.saveAsTextFile(...)dtinone.filter(...).foreach(...)

上述代码的 DAG 图如下所示:
在这里插入图片描述

构建DAG图
Spark 内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是如上图所示的 DAG。
Spark 的计算发生在 RDD 的 Action 操作,而对 Action 之前的所有 Transformation,Spark 只是记录下 RDD 生成的轨迹,而不会触发真正的计算。

2. 将 DAG 划分为 Stage 核心算法

一个 Application 可以有多个 job 多个 Stage:
Spark Application 中可以因为不同的 Action 触发众多的 job,一个 Application 中可以有很多的 job,
每个 job 是由一个或者多个 Stage 构成的,后面的 Stage 依赖于前面的 Stage,也就是说只有前面依赖的 Stage 计算完毕后,后面的 Stage 才会运行。

划分依据:
Stage 划分的依据就是宽依赖,像 reduceByKey,groupByKey 等算子,会导致宽依赖的产生
回顾下宽窄依赖的划分原则:
窄依赖:父 RDD 的一个分区只会被子 RDD 的一个分区依赖。即一对一或者多对一的关系,可理解为独生子女。 常见的窄依赖有:map、filter、union、mapPartitions、mapValues、join(父 RDD 是 hash-partitioned)等。
宽依赖:父 RDD 的一个分区会被子 RDD 的多个分区依赖(涉及到 shuffle)。即一对多的关系,可理解为超生。 常见的宽依赖有 groupByKey、partitionBy、reduceByKey、join(父 RDD 不是 hash-partitioned)等。

核心算法:回溯算法
从后往前回溯/反向解析,遇到窄依赖加入本 Stage,遇见宽依赖进行 Stage 切分。
Spark 内核会从触发 Action 操作的那个 RDD 开始从后往前推首先会为最后一个 RDD 创建一个 Stage,然后继续倒推如果发现对某个 RDD 是宽依赖,那么就会将宽依赖的那个 RDD 创建一个新的 Stage,那个 RDD 就是新的 Stage 的最后一个 RDD。 然后依次类推,继续倒推,根据窄依赖或者宽依赖进行 Stage 的划分,直到所有的 RDD 全部遍历完成为止

3. 将 DAG 划分为 Stage 剖析

在这里插入图片描述

DAG划分Stage
一个 Spark 程序可以有多个 DAG(有几个 Action,就有几个 DAG,上图最后只有一个 Action(图中未表现),那么就是一个 DAG)。
一个 DAG 可以有多个 Stage(根据宽依赖/shuffle 进行划分)。
同一个 Stage 可以有多个 Task 并行执行(task 数=分区数,如上图,Stage1 中有三个分区 P1、P2、P3,对应的也有三个 Task)。
可以看到这个 DAG 中只 reduceByKey 操作是一个宽依赖,Spark 内核会以此为边界将其前后划分成不同的 Stage。
同时我们可以注意到,在图中 Stage1 中,从 textFile 到 flatMap 到 map 都是窄依赖,这几步操作可以形成一个流水线操作,通过 flatMap 操作生成的 partition 可以不用等待整个 RDD 计算结束,而是继续进行 map 操作,这样大大提高了计算的效率。

spark执行流程中 各个 单位的含义

Action 是触发 Spark 计算的触发器,例如 collect()、saveAsTextFile()、count() 等。每次调用 Action 操作时,Spark 会构建一个新的 DAG 并提交执行。一个 DAG 可以被划分为多个 Stage(阶段)。Stage 是 DAG 中的一个子图,表示一组可以并行执行的任务。

Stage 的划分主要基于宽依赖(Wide Dependency)和 Shuffle 操作。宽依赖是指父 RDD 的分区被多个子 RDD 分区依赖,例如 groupByKey()、reduceByKey()

Task 是并行执行的,每个 Task 负责处理一个分区的数据。Spark 会将 Task 分配到不同的节点上并行计算,从而提高计算效率。

Pipeline:流水线操作在 Stage 内部,窄依赖操作可以形成流水线(Pipeline)。例如,在你提到的 textFile -> flatMap -> map 的流程中:
流水线的优势:由于这些操作是窄依赖,它们可以在同一个 Stage 中流水线式地执行。这意味着数据不需要在不同阶段之间进行 Shuffle,从而减少了数据的序列化和网络传输开销。
效率提升:流水线操作允许数据在内存中直接传递,而不需要等待整个 RDD 计算完成。例如,flatMap 操作生成的分区可以直接传递给 map 操作,大大提高了计算效率。

spark中哪些算子是宽依赖+为什么
  1. groupByKey
    原因:将具有相同键的元素分组到一个组中,需要将所有相同键的数据聚集到同一分区,因此必须进行 Shuffle。
  2. reduceByKey
    原因:对相同键的值进行聚合操作,需要将所有相同键的数据合并,因此需要 Shuffle。
  3. sortByKey
    原因:根据键对数据进行排序,需要将数据重新分区并排序,因此会触发 Shuffle。
  4. join
    原因:将两个 RDD 按键进行连接,需要将相同键的数据聚集到同一分区。如果两个 RDD 的分区方式不同,或者没有预分区,则会触发 Shuffle。
  5. leftOuterJoin、rightOuterJoin、fullOuterJoin
    原因:这些操作类似于 join,但涉及外连接,同样需要将相同键的数据聚集到同一分区,因此会触发 Shuffle。
  6. cogroup
    原因:将多个 RDD 按键分组,需要将所有相同键的数据聚集到同一分区,因此会触发 Shuffle。
  7. partitionBy
    原因:根据指定的分区器重新分区数据,需要将数据重新分布到不同的分区,因此会触发 Shuffle。
  8. distinct
    原因:去除 RDD 中的重复元素,需要对数据进行全局去重,因此会触发 Shuffle。
  9. repartition 和 coalesce
    原因:这些操作会改变 RDD 的分区数量或分区方式,需要对数据重新分区,因此会触发 Shuffle。
  10. aggregateByKey 和 combineByKey
    原因:这些操作对相同键的值进行聚合,需要将所有相同键的数据聚集到同一分区,因此会触发 Shuffle。
  11. broadcastJoin(非 hash-partitioned)
    原因:如果在执行 join 之前没有对数据进行预分区(如 groupByKey 或 partitionBy),则会触发 Shuffle。
4. 提交 Stages

调度阶段的提交,最终会被转换成一个任务集的提交,DAGScheduler 通过 TaskScheduler 接口提交任务集,这个任务集最终会触发 TaskScheduler 构建一个 TaskSetManager 的实例来管理这个任务集的生命周期,对于 DAGScheduler 来说,提交调度阶段的工作到此就完成了。
而 TaskScheduler 的具体实现则会在得到计算资源的时候,进一步通过 TaskSetManager 调度具体的任务到对应的 Executor 节点上进行运算。
在这里插入图片描述

5. 监控 Job、Task、Executor

1.DAGScheduler 监控 Job 与 Task:
要保证相互依赖的作业调度阶段能够得到顺利的调度执行,DAGScheduler 需要监控当前作业调度阶段乃至任务的完成情况。
这通过对外暴露一系列的回调函数来实现的,对于 TaskScheduler 来说,这些回调函数主要包括任务的开始结束失败、任务集的失败,DAGScheduler 根据这些任务的生命周期信息进一步维护作业和调度阶段的状态信息。
2.DAGScheduler 监控 Executor 的生命状态:
TaskScheduler 通过回调函数通知 DAGScheduler 具体的 Executor 的生命状态,如果某一个 Executor 崩溃了,则对应的调度阶段任务集的 ShuffleMapTask 的输出结果也将标志为不可用,这将导致对应任务集状态的变更,进而重新执行相关计算任务,以获取丢失的相关数据。

6. 获取任务执行结果

1.结果 DAGScheduler:
一个具体的任务在 Executor 中执行完毕后,其结果需要以某种形式返回给 DAGScheduler,根据任务类型的不同,任务结果的返回方式也不同。
2.两种结果,中间结果与最终结果:
对于 FinalStage 所对应的任务,返回给 DAGScheduler 的是运算结果本身。
而对于中间调度阶段对应的任务 ShuffleMapTask,返回给 DAGScheduler 的是一个 MapStatus 里的相关存储信息,而非结果本身,这些存储位置信息将作为下一个调度阶段的任务获取输入数据的依据。
3.两种类型,DirectTaskResult 与 IndirectTaskResult:
根据任务结果大小的不同,ResultTask 返回的结果又分为两类:
如果结果足够小,则直接放在 DirectTaskResult 对象内中。
如果超过特定尺寸则在 Executor 端会将 DirectTaskResult 先序列化,再把序列化的结果作为一个数据块存放在 BlockManager 中,然后将 BlockManager 返回的 BlockID 放在 IndirectTaskResult 对象中返回给 TaskScheduler,TaskScheduler 进而调用 TaskResultGetter 将 IndirectTaskResult 中的 BlockID 取出并通过 BlockManager 最终取得对应的 DirectTaskResult。

7. 任务调度总体诠释

一张图说明任务总体调度:
在这里插入图片描述
任务总体调度

Spark 运行架构特点
  1. Executor 进程专属
    每个 Application 获取专属的 Executor 进程,该进程在 Application 期间一直驻留,并以多线程方式运行 Tasks。
    Spark Application 不能跨应用程序共享数据,除非将数据写入到外部存储系统。如图所示:
    在这里插入图片描述

Executor进程专属
2. 支持多种资源管理器
Spark 与资源管理器无关,只要能够获取 Executor 进程,并能保持相互通信就可以了。
Spark 支持资源管理器包含: Standalone、On Mesos、On YARN、Or On EC2。如图所示:
在这里插入图片描述

支持多种资源管理器
3. Job 提交就近原则
提交 SparkContext 的 Client 应该靠近 Worker 节点(运行 Executor 的节点),最好是在同一个 Rack(机架)里,因为 Spark Application 运行过程中 SparkContext 和 Executor 之间有大量的信息交换;
如果想在远程集群中运行,最好使用 RPC 将 SparkContext 提交给集群,不要远离 Worker 运行 SparkContext。
如图所示:
在这里插入图片描述

Job提交就近原则
4. 移动程序而非移动数据的原则执行
移动程序而非移动数据的原则执行,Task 采用了数据本地性和推测执行的优化机制。
关键方法:taskIdToLocations、getPreferedLocations。
如图所示:
在这里插入图片描述

数据本地性

Spark 6_Spark 数据倾斜 !!!

就是数据分到各个区的数量不太均匀,可以自定义分区器,想怎么分就怎么分。

Spark中的数据倾斜问题主要指:
shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题。

例如,reduced端一共要处理100万条数据,第一个和第二个task分别被分配到了1万条数据,计算5分钟内完成,第三个task分配到了98万数据,此时第三个task可能需要10个小时完成,这使得整个Spark作业需要10个小时才能运行完成,这就是数据倾斜所带来的后果。
注意,要区分开数据倾斜与数据过量这两种情况,数据倾斜是指少数task被分配了绝大多数的数据,因此少数task运行缓慢;数据过量是指所有task被分配的数据量都很大,相差不多,所有task都运行缓慢。

数据倾斜的表现:
1.Spark作业的大部分task都执行迅速,只有有限的几个task执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢;
2.Spark作业的大部分task都执行迅速,但是有的task在运行过程中会突然报出OOM,反复执行几次都在某一个task报出OOM错误,此时可能出现了数据倾斜,作业无法正常运行。 定位数据倾斜问题:
3.查阅代码中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根据代码逻辑判断此处是否会出现数据倾斜;
4.查看Spark作业的log文件,log文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个stage,对应的shuffle算子是哪一个;

1. 预聚合原始数据
1.1. 避免shuffle过程

绝大多数情况下,Spark作业的数据来源都是Hive表,这些Hive表基本都是经过ETL之后的昨天的数据。 为了避免数据倾斜,我们可以考虑避免shuffle过程,如果避免了shuffle过程,那么从根本上就消除了发生数据倾斜问题的可能。

如果Spark作业的数据来源于Hive表,那么可以先在Hive表中对数据进行聚合,例如按照key进行分组,将同一key对应的所有value用一种特殊的格式拼接到一个字符串里去,这样,一个key就只有一条数据了;之后,对一个key的所有value进行处理时,只需要进行map操作即可,无需再进行任何的shuffle操作。通过上述方式就避免了执行shuffle操作,也就不可能会发生任何的数据倾斜问题。

对于Hive表中数据的操作,不一定是拼接成一个字符串,也可以是直接对key的每一条数据进行累计计算。 要区分开,处理的数据量大和数据倾斜的区别。

1.2. 增大key粒度(考虑扩大key的聚合粒度(减少维度),减小数据倾斜可能性,增大每个task的数据量)

如果没有办法对每个key聚合出来一条数据,在特定场景下,可以考虑扩大key的聚合粒度(减少维度)

例如,目前有10万条用户数据,当前key的粒度是(省,城市,区,日期),现在我们考虑扩大粒度,将key的粒度扩大为(省,城市,日期),这样的话,key的数量会减少,key之间的数据量差异也有可能会减少,由此可以减轻数据倾斜的现象和问题。(此方法只针对特定类型的数据有效,当应用场景不适宜时,会加重数据倾斜)

2. 预处理导致倾斜的key (groupByKey、reduceByKey这类算子造成的数据倾斜)
2.1. 过滤

如果在Spark作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的key进行过滤,滤除可能导致数据倾斜的key对应的数据,这样,在Spark作业中就不会发生数据倾斜了。

2.2. 使用随机key

当使用了类似于groupByKey、reduceByKey这样的算子时,可以考虑使用随机key实现双重聚合,如下图所示:
在这里插入图片描述

随机key实现双重聚合
首先,通过map算子给每个数据的key添加随机数前缀,对key进行打散,将原先一样的key变成不一样的key,然后进行第一次聚合,这样就可以让原本被一个task处理的数据分散到多个task上去做局部聚合;随后,去除掉每个key的前缀,再次进行聚合。
此方法对于由groupByKey、reduceByKey这类算子造成的数据倾斜有比较好的效果,仅仅适用于聚合类的shuffle操作,适用范围相对较窄
如果是join类的shuffle操作,还得用其他的解决方案。
此方法也是前几种方案没有比较好的效果时要尝试的解决方案。

2.3. sample采样对倾斜key单独进行join (一个key)

在Spark中,如果某个RDD只有一个key,那么在shuffle过程中会默认将此key对应的数据打散,由不同的reduce端task进行处理。

所以当由单个key导致数据倾斜时,可有将发生数据倾斜的key单独提取出来,组成一个RDD,然后用这个原本会导致倾斜的key组成的RDD和其他RDD单独join,此时,根据Spark的运行机制,此RDD中的数据会在shuffle阶段被分散到多个task中去进行join操作

倾斜key单独join的流程如下图所示:
在这里插入图片描述
倾斜key单独join流程
适用场景分析:
对于RDD中的数据,可以将其转换为一个中间表,或者是直接使用countByKey()的方式,看一下这个RDD中各个key对应的数据量,
此时如果你发现整个RDD就一个key的数据量特别多,那么就可以考虑使用这种方法。

当数据量非常大时,可以考虑使用sample采样获取10%的数据,然后分析这10%的数据中哪个key可能会导致数据倾斜,然后将这个key对应的数据单独提取出来。

不适用场景分析:
如果一个RDD中导致数据倾斜的key很多,那么此方案不适用。

3. 提高reduce并行度

当方案一和方案二对于数据倾斜的处理没有很好的效果时,可以考虑提高shuffle过程中的reduce端并行度,reduce端并行度的提高就增加了reduce端task的数量,那么每个task分配到的数据量就会相应减少,由此缓解数据倾斜问题

3.1. reduce端并行度的设置

大部分的shuffle算子中,都可以传入一个并行度的设置参数,比如reduceByKey(500),这个参数会决定shuffle过程中reduce端的并行度,在进行shuffle操作的时候,就会对应着创建指定数量的reduce task。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小

增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。

举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了。

3.2. reduce端并行度设置存在的缺陷

提高reduce端并行度并没有从根本上改变数据倾斜的本质和问题(方案一和方案二从根本上避免了数据倾斜的发生),只是尽可能地去缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题,适用于有较多key对应的数据量都比较大的情况。

该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。
所以这种方案只能说是在发现数据倾斜时尝试使用的一种手段,尝试去用最简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

在理想情况下,reduce端并行度提升后,会在一定程度上减轻数据倾斜的问题,甚至基本消除数据倾斜;
但是,在一些情况下,只会让原来由于数据倾斜而运行缓慢的task运行速度稍有提升,或者避免了某些task的OOM问题,
但是,仍然运行缓慢,此时,要及时放弃方案三,开始尝试后面的方案。

4. 使用map join

正常情况下,join操作都会执行shuffle过程,并且执行的是reduce join,也就是先将所有相同的key和对应的value汇聚到一个reduce task中,然后再进行join。普通join的过程如下图所示:
在这里插入图片描述

普通join过程
普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。
注意:RDD是并不能直接进行广播的,只能将RDD内部的数据通过collect拉取到Driver内存然后再进行广播。

4. 1. 核心思路:

不使用join算子进行连接操作,而使用broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。
将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个broadcast变量;
接着对另外一个RDD执行map类算子,在算子函数内,从broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。

根据上述思路,根本不会发生shuffle操作,从根本上杜绝了join操作可能导致的数据倾斜问题。
当join操作有数据倾斜问题并且其中一个RDD的数据量较小时,可以优先考虑这种方式,效果非常好。

map join的过程如下图所示:
在这里插入图片描述
map join过程

4. 2. 不适用场景分析:

由于Spark的广播变量是在每个Executor中保存一个副本,如果两个RDD数据量都比较大,那么如果将一个数据量比较大的RDD做成广播变量,那么很有可能会造成内存溢出。

spark7_Spark性能优化_RDD算子调优 !!!

1. RDD复用

在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算,如下图所示:
在这里插入图片描述

RDD的重复计算
对上图中的RDD计算架构进行修改,得到如下图所示的优化结果:
在这里插入图片描述

RDD架构优化

2. 尽早filter

获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。
本文首发于公众号:五分钟学大数据,欢迎围观!回复【书籍】即可获得上百本大数据书籍

3. 读取大量小文件-用wholeTextFiles

当我们将一个文本文件读取为 RDD 时,输入的每一行都会成为RDD的一个元素。
也可以将多个完整的文本文件一次性读取为一个pairRDD,其中键是文件名,值是文件内容。

val input:RDD[String] = sc.textFile("dir/*.log") 

如果传递目录,则将目录下的所有文件读取作为RDD。文件路径支持通配符。
但是这样对于大量的小文件读取效率并不高,应该使用 wholeTextFiles 返回值为RDD[(String, String)],其中Key是文件的名称,Value是文件的内容。

def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)])

wholeTextFiles读取小文件:

val filesRDD: RDD[(String, String)] =
sc.wholeTextFiles("D:\\data\\files", minPartitions = 3)
val linesRDD: RDD[String] = filesRDD.flatMap(_._2.split("\\r\\n"))
val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
wordsRDD.map((_, 1)).reduceByKey(_ + _).collect().foreach(printl
4. mapPartition和foreachPartition

-mapPartitions
map(…) 表示每一个元素
mapPartitions(
…) 表示每个分区的数据组成的迭代器
普通的map算子对RDD中的每一个元素进行操作,而mapPartitions算子对RDD中每一个分区进行操作。
如果是普通的map算子,假设一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每个元素进行操作。
在这里插入图片描述
map 算子

如果是mapPartition算子,由于一个task处理一个RDD的partition,那么一个task只会执行一次function,function一次接收所有的partition数据,效率比较高。
在这里插入图片描述
mapPartition 算子

比如,当要把RDD中的所有数据通过JDBC写入数据,如果使用map算子,那么需要对RDD中的每一个元素都创建一个数据库连接,这样对资源的消耗很大,如果使用mapPartitions算子,那么针对一个分区的数据,只需要建立一个数据库连接。
mapPartitions算子也存在一些缺点:对于普通的map操作,一次处理一条数据,如果在处理了2000条数据后内存不足,那么可以将已经处理完的2000条数据从内存中垃圾回收掉;但是如果使用mapPartitions算子,但数据量非常大时,function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出。
因此,mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提升效果还是不错的。(当数据量很大的时候,一旦使用mapPartitions算子,就会直接OOM)
在项目中,应该首先估算一下RDD的数据量、每个partition的数据量,以及分配给每个Executor的内存资源,如果资源允许,可以考虑使用mapPartitions算子代替map。

-foreachPartition
rrd.foreache(…) 表示每一个元素
rrd.forPartitions(
…) 表示每个分区的数据组成的迭代器
在生产环境中,通常使用foreachPartition算子来完成数据库的写入,通过foreachPartition算子的特性,可以优化写数据库的性能。
如果使用foreach算子完成数据库的操作,由于foreach算子是遍历RDD的每条数据,因此,每条数据都会建立一个数据库连接,这是对资源的极大浪费,因此,对于写数据库操作,我们应当使用foreachPartition算子。
与mapPartitions算子非常相似,foreachPartition是将RDD的每个分区作为遍历对象,一次处理一个分区的数据,也就是说,如果涉及数据库的相关操作,一个分区的数据只需要创建一次数据库连接,如下图所示:
在这里插入图片描述
foreachPartition 算子
使用了foreachPartition 算子后,可以获得以下的性能提升:
1.对于我们写的function函数,一次处理一整个分区的数据;
2.对于一个分区内的数据,创建唯一的数据库连接;
3.只需要向数据库发送一次SQL语句和多组参数;
在生产环境中,全部都会使用foreachPartition算子完成数据库操作。foreachPartition算子存在一个问题,与mapPartitions算子类似,如果一个分区的数据量特别大,可能会造成OOM,即内存溢出。

5. filter+coalesce/repartition(减少分区)

在Spark任务中我们经常会使用filter算子完成RDD中数据的过滤,在任务初始阶段,从各个分区中加载到的数据量是相近的,但是一旦进过filter过滤后,每个分区的数据量有可能会存在较大差异,如下图所示:
在这里插入图片描述

分区数据过滤结果

根据上图我们可以发现两个问题:
1.每个partition的数据量变小了,如果还按照之前与partition相等的task个数去处理当前数据,有点浪费task的计算资源;
2.每个partition的数据量不一样,会导致后面的每个task处理每个partition数据的时候,每个task要处理的数据量不同,这很有可能导致数据
倾斜问题。
如上图所示,第二个分区的数据过滤后只剩100条,而第三个分区的数据过滤后剩下800条,在相同的处理逻辑下,第二个分区对应的task处理的数据量与第三个分区对应的task处理的数据量差距达到了8倍,这也会导致运行速度可能存在数倍的差距,这也就是数据倾斜问题。

针对上述的两个问题,我们分别进行分析:
1.针对第一个问题,既然分区的数据量变小了,我们希望可以对分区数据进行重新分配,比如将原来4个分区的数据转化到2个分区中,这样只需要用后面的两个task进行处理即可,避免了资源的浪费。
2.针对第二个问题,解决方法和第一个问题的解决方法非常相似,对分区数据重新分配,让每个partition中的数据量差不多,这就避免了数据倾斜问题。
那么具体应该如何实现上面的解决思路?我们需要coalesce算子。
repartition与coalesce都可以用来进行重分区,其中repartition只是coalesce接口中shuffle为true的简易实现,coalesce默认情况下不进行shuffle,但是可以通过参数进行设置。

假设我们希望将原本的分区个数A通过重新分区变为B,那么有以下几种情况:
1.A > B(多数分区合并为少数分区)
A与B相差值不大
此时使用coalesce即可,无需shuffle过程。
A与B相差值很大
此时可以使用coalesce并且不启用shuffle过程,但是会导致合并过程性能低下,所以推荐设置coalesce的第二个参数为true,即启动shuffle过程。
2.A < B(少数分区分解为多数分区)
此时使用repartition即可,如果使用coalesce需要将shuffle设置为true,否则coalesce无效。
我们可以在filter操作之后,使用coalesce算子针对每个partition的数据量各不相同的情况,压缩partition的数量,而且让每个partition的数据量尽量均匀紧凑,以便于后面的task进行计算操作,在某种程度上能够在一定程度上提升性能。
注意:local模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量。

6. 并行度设置

Spark作业中的并行度指各个stage的task的数量。
如果并行度设置不合理而导致并行度过低,会导致资源的极大浪费,例如,20个Executor,每个Executor分配3个CPU core,而Spark作业有40个task,这样每个Executor分配到的task个数是2个,这就使得每个Executor有一个CPU core空闲,导致资源的浪费。
理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度。
Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3倍。之所以没有推荐task数量与CPU core总数相等,是因为task的执行时间不同,有的task执行速度快而有的task执行速度慢,如果task数量与CPU core总数相等,那么执行快的task执行完成后,会出现CPU core空闲的情况。如果task数量设置为CPU core总数的2~3倍,那么一个task执行完毕后,CPU core会立刻执行下一个task,降低了资源的浪费,同时提升了Spark作业运行的效率。
Spark作业并行度的设置如下:

val conf = new SparkConf().set("spark.default.parallelism", "500")

原则:让 cpu 的 Core(cpu 核心数) 充分利用起来, 如有100个 Core,那么并行度可以设置为200~300。

7. repartition/coalesce调节并行度

我们知道 Spark 中有并行度的调节策略,但是,并行度的设置对于Spark SQL是不生效的,用户设置的并行度只对于Spark SQL以外的所有Spark的stage生效。
Spark SQL的并行度不允许用户自己指定,Spark SQL自己会默认根据hive表对应的HDFS文件的split个数自动设置Spark SQL所在的那个stage的并行度,用户自己通 spark.default.parallelism 参数指定的并行度,只会在没Spark SQL的stage中生效。
由于Spark SQL所在stage的并行度无法手动设置,如果数据量较大,并且此stage中后续的transformation操作有着复杂的业务逻辑,而Spark SQL自动设置的task数量很少,这就意味着每个task要处理为数不少的数据量,然后还要执行非常复杂的处理逻辑,这就可能表现为第一个有Spark SQL的stage速度很慢,而后续的没有Spark SQL的stage运行速度非常快。
为了解决Spark SQL无法设置并行度和task数量的问题,我们可以使用repartition算子。
repartition 算子使用前后对比图如下:
在这里插入图片描述

repartition 算子使用前后对比图
Spark SQL这一步的并行度和task数量肯定是没有办法去改变了,但是,对于Spark SQL查询出来的RDD,立即使用repartition算子,去重新进行分区,这样可以重新分区为多个partition,从repartition之后的RDD操作,由于不再涉及Spark SQL,因此stage的并行度就会等于你手动设置的值,这样就避免了Spark SQL所在的stage只能用少量的task去处理大量数据并执行复杂的算法逻辑。使用repartition算子的前后对比如上图所示。

8. reduceByKey本地预聚合

reduceByKey相较于普通的shuffle操作一个显著的特点就是会进行map端的本地聚合,map端会先对本地的数据进行combine操作,然后将数据写入给下个stage的每个task创建的文件中,也就是在map端,对每一个key对应的value,执行reduceByKey算子函数。
reduceByKey算子的执行过程如下图所示:
在这里插入图片描述

reduceByKey 算子执行过程
使用reduceByKey对性能的提升如下:
1.本地聚合后,在map端的数据量变少,减少了磁盘IO,也减少了对磁盘空间的占用;
2.本地聚合后,下一个stage拉取的数据量变少,减少了网络传输的数据量;
3.本地聚合后,在reduce端进行数据缓存的内存占用减少;
4.本地聚合后,在reduce端进行聚合的数据量减少。
基于reduceByKey的本地聚合特征,我们应该考虑使用reduceByKey代替其他的shuffle算子,例如groupByKey。
groupByKey与reduceByKey的运行原理如下图1和图2所示:
在这里插入图片描述

图1:groupByKey原理
在这里插入图片描述

图2:reduceByKey原理
根据上图可知,groupByKey不会进行map端的聚合,而是将所有map端的数据shuffle到reduce端,然后在reduce端进行数据的聚合操作。由于reduceByKey有map端聚合的特性,使得网络传输的数据量减小,因此效率要明显高于groupByKey。

9. 使用持久化+checkpoint

Spark持久化在大部分情况下是没有问题的,但是有时数据可能会丢失,如果数据一旦丢失,就需要对丢失的数据重新进行计算,计算完后再缓存和使用,为了避免数据的丢失,可以选择对这个RDD进行checkpoint,也就是将数据持久化一份到容错的文件系统上(比如HDFS)。
一个RDD缓存并checkpoint后,如果一旦发现缓存丢失,就会优先查看checkpoint数据存不存在,如果有,就会使用checkpoint数据,而不用重新计算。也即是说,checkpoint可以视为cache的保障机制,如果cache失败,就使用checkpoint的数据。
使用checkpoint的优点在于提高了Spark作业的可靠性,一旦缓存出现问题,不需要重新计算数据,缺点在于,checkpoint时需要将数据写入HDFS等文件系统,对性能的消耗较大。
持久化设置如下:

sc.setCheckpointDir(‘HDFS’)
rdd.cache/persist(memory_and_disk)
rdd.checkpoint
10. 使用广播变量

默认情况下,task中的算子中如果使用了外部的变量,每个task都会获取一份变量的复本,这就造成了内存的极大消耗。一方面,如果后续对RDD进行持久化,可能就无法将RDD数据存入内存,只能写入磁盘,磁盘IO将会严重消耗性能;另一方面,task在创建对象的时候,也许会发现堆内存无法存放新创建的对象,这就会导致频繁的GC,GC会导致工作线程停止,进而导致Spark暂停工作一段时间,严重影响Spark性能。
假设当前任务配置了20个Executor,指定500个task,有一个20M的变量被所有task共用,此时会在500个task中产生500个副本,耗费集群10G的内存,如果使用了广播变量, 那么每个Executor保存一个副本,一共消耗400M内存,内存消耗减少了5倍。

广播变量在每个Executor保存一个副本,此Executor的所有task共用此广播变量,这让变量产生的副本数量大大减少。
在初始阶段,广播变量只在Driver中有一份副本。task在运行的时候,想要使用广播变量中的数据,此时首先会在自己本地的Executor对应的BlockManager中尝试获取变量,如果本地没有,BlockManager就会从Driver或者其他节点的BlockManager上远程拉取变量的复本,并由本地的BlockManager进行管理;之后此Executor的所有task都会直接从本地的BlockManager中获取变量。
对于多个Task可能会共用的数据可以广播到每个Executor上:

val 广播变量名= sc.broadcast(会被各个Task用到的变量,即需要广播的变量)广播变量名.value//获取广播变量
11. 使用Kryo序列化

默认情况下,Spark使用Java的序列化机制。Java的序列化机制使用方便,不需要额外的配置,在算子中使用的变量实现Serializable接口即可,但是,Java序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。
Spark官方宣称Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便,但从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo序列化方式了。
Kryo序列化注册方式的代码如下:

public class MyKryoRegistrator implements KryoRegistrator{@Overridepublic void registerClasses(Kryo kryo){kryo.register(StartupReportLogs.class);}
}

配置Kryo序列化方式的代码如下:

//创建SparkConf对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用Kryo序列化库
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");  
//在Kryo序列化库中注册自定义的类集合
conf.set("spark.kryo.registrator", "bigdata.com.MyKryoRegistrator"); 

Spqrk8_Spark性能优化_Shuffle调优 !!!

1. map和reduce端缓冲区大小

在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中的情况,使得性能非常低下,通过调节map端缓冲的大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能。
map端缓冲的默认配置是32KB,如果每个task处理640KB的数据,那么会发生640/32 = 20次溢写,如果每个task处理64000KB的数据,即会发生64000/32=2000次溢写,这对于性能的影响是非常严重的。
map端缓冲的配置方法:

val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")

Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB。该参数的设置方法如下:
reduce端数据拉取缓冲区配置:

val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96")
2. reduce端重试次数和等待时间间隔

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3,该参数的设置方法如下:
reduce端拉取数据重试次数配置:

val conf = new SparkConf().set("spark.shuffle.io.maxRetries", "6")

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如60s),以增加shuffle操作的稳定性。
reduce端拉取数据等待间隔可以通过spark.shuffle.io.retryWait参数进行设置,默认值为5s,该参数的设置方法如下:
reduce端拉取数据等待间隔配置:

val conf = new SparkConf().set("spark.shuffle.io.retryWait", "60s")
3. bypass机制开启阈值

对于SortShuffleManager,如果shuffle reduce task的数量小于某一阈值则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量,那么此时map-side就不会进行排序了,减少了排序的性能开销,但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
SortShuffleManager排序操作阈值的设置可以通过spark.shuffle.sort.bypassMergeThreshold这一参数进行设置,默认值为200,该参数的设置方法如下:
reduce端拉取数据等待间隔配置:

val conf = new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "400")

Spark9_Spark故障排除 !!!

1. 避免OOM-out of memory

在Shuffle过程,reduce端task并不是等到map端task将其数据全部写入磁盘后再去拉取,而是map端写一点数据,reduce端task就会拉取一小部分数据,然后立即进行后面的聚合、算子函数的使用等操作。

reduce端task能够拉取多少数据,由reduce拉取数据的缓冲区buffer来决定,因为拉取过来的数据都是先放在buffer中,然后再进行后续的处理,buffer的默认大小为48MB。

reduce端task会一边拉取一边计算,不一定每次都会拉满48MB的数据,可能大多数时候拉取一部分数据就处理掉了。

虽然说增大reduce端缓冲区大小可以减少拉取次数,提升Shuffle性能,但是有时map端的数据量非常大,写出的速度非常快,此时reduce端的所有task在拉取的时候,有可能全部达到自己缓冲的最大极限值,即48MB,此时,再加上reduce端执行的聚合函数的代码,可能会创建大量的对象,这可能会导致内存溢出,即OOM。

如果一旦出现reduce端内存溢出的问题,我们可以考虑减小reduce端拉取数据缓冲区的大小,例如减少为12MB。
在实际生产环境中是出现过这种问题的,这是典型的以性能换执行的原理。reduce端拉取数据的缓冲区减小,不容易导致OOM,但是相应的,reudce端的拉取次数增加,造成更多的网络传输开销,造成性能的下降。
注意,要保证任务能够运行,再考虑性能的优化。

2. 避免GC导致的shuffle文件拉取失败

在Spark作业中,有时会出现shuffle file not found的错误,这是非常常见的一个报错,有时出现这种错误以后,选择重新执行一遍,就不再报出这种错误。

出现上述问题可能的原因是Shuffle操作中,后面stage的task想要去上一个stage的task所在的Executor拉取数据,结果对方正在执行GC,执行GC会导致Executor内所有的工作现场全部停止,比如BlockManager、基于netty的网络通信等,这就会导致后面的task拉取数据拉取了半天都没有拉取到,就会报出shuffle file not found的错误,而第二次再次执行就不会再出现这种错误。

可以通过调整reduce端拉取数据重试次数和reduce端拉取数据时间间隔这两个参数来对Shuffle性能进行调整,增大参数值,使得reduce端拉取数据的重试次数增加,并且每次失败后等待的时间间隔加长。
JVM GC导致的shuffle文件拉取失败调整数据重试次数和reduce端拉取数据时间间隔:

val conf = new SparkConf().set("spark.shuffle.io.maxRetries", "6").set("spark.shuffle.io.retryWait", "60s")
3. YARN-CLIENT模式导致的网卡流量激增问题

在YARN-client模式下,Driver启动在本地机器上,而Driver负责所有的任务调度,需要与YARN集群上的多个Executor进行频繁的通信。
假设有100个Executor,1000个task,那么每个Executor分配到10个task,之后,Driver要频繁地跟Executor上运行的1000个task进行通信,通信数据非常多,并且通信品类特别高。这就导致有可能在Spark任务运行过程中,由于频繁大量的网络通讯,本地机器的网卡流量会激增。

注意,YARN-client模式只会在测试环境中使用,而之所以使用YARN-client模式,是由于可以看到详细全面的log信息,通过查看log,可以锁定程序中存在的问题,避免在生产环境下发生故障。
在生产环境下,使用的一定是YARN-cluster模式。在YARN-cluster模式下,就不会造成本地机器网卡流量激增问题,如果YARN-cluster模式下存在网络通信的问题,需要运维团队进行解决。

4. YARN-CLUSTER模式的JVM栈内存溢出无法执行问题

当Spark作业中包含SparkSQL的内容时,可能会碰到YARN-client模式下可以运行,但是YARN-cluster模式下无法提交运行(报出OOM错误)的情况。

YARN-client模式下,Driver是运行在本地机器上的,Spark使用的JVM的PermGen的配置,是本地机器上的spark-class文件,JVM永久代的大小是128MB,这个是没有问题的,但是在YARN-cluster模式下,Driver运行在YARN集群的某个节点上,使用的是没有经过配置的默认设置,PermGen永久代大小为82MB。

SparkSQL的内部要进行很复杂的SQL的语义解析、语法树转换等等,非常复杂,如果sql语句本身就非常复杂,那么很有可能会导致性能的损耗和内存的占用,特别是对PermGen的占用会比较大。
所以,此时如果PermGen占用好过了82MB,但是又小于128MB,就会出现YARN-client模式下可以运行,YARN-cluster模式下无法运行的情况。
解决上述问题的方法是增加PermGen(永久代)的容量,需要在spark-submit脚本中对相关参数进行设置,设置方法如下:

--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

通过上述方法就设置了Driver永久代的大小,默认为128MB,最大256MB,这样就可以避免上面所说的问题。

5. 避免SparkSQL JVM栈内存溢出

当SparkSQL的sql语句有成百上千的or关键字时,就可能会出现Driver端的JVM栈内存溢出。
JVM栈内存溢出基本上就是由于调用的方法层级过多,产生了大量的,非常深的,超出了JVM栈深度限制的递归。(我们猜测SparkSQL有大量or语句的时候,在解析SQL时,例如转换为语法树或者进行执行计划的生成的时候,对于or的处理是递归,or非常多时,会发生大量的递归)

此时,建议将一条sql语句拆分为多条sql语句来执行,每条sql语句尽量保证100个以内的子句。根据实际的生产环境试验,一条sql语句的or关键字控制在100个以内,通常不会导致JVM栈内存溢出。

Spark10_Spark大厂面试真题 !!!

1. 通常来说,Spark与MapReduce相比,Spark运行效率更高。请说明效率更高来源于Spark内置的哪些机制?

spark是借鉴了Mapreduce,并在其基础上发展起来的,继承了其分布式计算的优点并进行了改进,spark生态更为丰富,功能更为强大,性能更加适用范围广,mapreduce更简单,稳定性好。主要区别:
1.spark把运算的中间数据(shuffle阶段产生的数据)存放在内存,迭代计算效率更高,mapreduce的中间结果需要落地,保存到磁盘;
2.Spark容错性高,它通过弹性分布式数据集RDD来实现高效容错,RDD是一组分布式的存储在 节点内存中的只读性的数据集,这些集合石弹性的,某一部分丢失或者出错,可以通过整个数据集的计算流程的血缘关系来实现重建,mapreduce的容错只能重新计算;
3.Spark更通用,提供了transformation和action这两大类的多功能api,另外还有流式处理sparkstreaming模块、图计算等等,mapreduce只提供了map和reduce两种操作,流计算及其他的模块支持比较缺乏;
4.Spark框架和生态更为复杂,有RDD,血缘lineage、执行时的有向无环图DAG,stage划分等,很多时候spark作业都需要根据不同业务场景的需要进行调优以达到性能要求,mapreduce框架及其生态相对较为简单,对性能的要求也相对较弱,运行较为稳定,适合长期后台运行;
5.Spark计算框架对内存的利用和运行的并行度比mapreduce高,Spark运行容器为executor,内部ThreadPool中线程运行一个Task,mapreduce在线程内部运行container,container容器分类为MapTask和ReduceTask,Spark程序运行并行度更高;
6.Spark对于executor的优化,在JVM虚拟机的基础上对内存弹性利用:storage memory与Execution memory的弹性扩容,使得内存利用效率更高。

2. hadoop和spark使用场景?

Hadoop/MapReduce和Spark最适合的都是做离线型的数据分析,但Hadoop特别适合是单次分析的数据量“很大”的情景,而Spark则适用于数据量不是很大的情景。
1.一般情况下,对于中小互联网和企业级的大数据应用而言,单次分析的数量都不会“很大”,因此可以优先考虑使用Spark。
2.业务通常认为Spark更适用于机器学习之类的“迭代式”应用,80GB的压缩数据(解压后超过200GB),10个节点的集群规模,跑类似“sum+group-by”的应用,MapReduce花了5分钟,而spark只需要2分钟。

3. spark如何保证宕机迅速恢复?

1.适当增加spark standby master
2.编写shell脚本,定期检测master状态,出现宕机后对master进行重启操作

4. hadoop和spark的相同点和不同点?

Hadoop底层使用MapReduce计算架构,只有map和reduce两种操作,表达能力比较欠缺,而且在MR过程中会重复的读写hdfs,造成大量的磁盘io读写操作,所以适合高时延环境下批处理计算的应用;
Spark是基于内存的分布式计算架构,提供更加丰富的数据集操作类型,主要分成转化操作和行动操作,包括map、reduce、filter、flatmap、groupbykey、reducebykey、union和join等,数据分析更加快速,所以适合低时延环境下计算的应用;
spark与hadoop最大的区别在于迭代式计算模型。基于mapreduce框架的Hadoop主要分为map和reduce两个阶段,两个阶段完了就结束了,所以在一个job里面能做的处理很有限;spark计算模型是基于内存的迭代式计算模型,可以分为n个阶段,根据用户编写的RDD算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以spark相较于mapreduce,计算模型更加灵活,可以提供更强大的功能。
但是spark也有劣势,由于spark基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的情况下,可能会出现各种各样的问题,比如OOM内存溢出等情况,导致spark程序可能无法运行起来,而mapreduce虽然运行缓慢,但是至少可以慢慢运行完。

5. RDD持久化原理?

spark非常重要的一个功能特性就是可以将RDD持久化在内存中。
调用cache()和persist()方法即可。cache()和persist()的区别在于,cache()是persist()的一种简化方式,cache()的底层就是调用persist()的无参版本persist(MEMORY_ONLY),将数据持久化到内存中。
如果需要从内存中清除缓存,可以使用unpersist()方法。RDD持久化是可以手动选择不同的策略的。在调用persist()时传入对应的StorageLevel即可。

6. checkpoint检查点机制?

应用场景:当spark应用程序特别复杂,从初始的RDD开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用checkpoint功能。
原因:对于特别复杂的Spark应用,会出现某个反复使用的RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。
Checkpoint首先会调用SparkContext的setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说HDFS;然后对RDD调用checkpoint()方法。之后在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint过的RDD数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。
检查点机制是我们在spark streaming中用来保障容错性的主要机制,它可以使spark streaming阶段性的把应用数据存储到诸如HDFS等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:
1.控制发生失败时需要重算的状态数。Spark streaming可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。
2.提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样spark streaming就可以读取之前运行的程序处理数据的进度,并从那里继续。

7. checkpoint和持久化机制的区别?

最主要的区别在于持久化只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系,依赖关系)是不变的。但是checkpoint执行完之后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,checkpoint之后rdd的lineage就改变了。
持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是checkpoint的数据通常是保存在高可用的文件系统中,比如HDFS中,所以数据丢失可能性比较低

8. RDD机制理解吗?

rdd分布式弹性数据集,简单的理解成一种数据结构,是spark框架上的通用货币。所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,但是都可以进行互相转换。rdd执行过程中会形成dag图,然后形成lineage保证容错性等。从物理的角度来看rdd存储的是block和node之间的映射。
RDD是spark提供的核心抽象,全称为弹性分布式数据集。
RDD在逻辑上是一个hdfs文件,在抽象上是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同结点上,从而让RDD中的数据可以被并行操作(分布式数据集)
比如有个RDD有90W数据,3个partition,则每个分区上有30W数据。RDD通常通过Hadoop上的文件,即HDFS或者HIVE表来创建,还可以通过应用程序中的集合来创建;RDD最重要的特性就是容错性,可以自动从节点失败中恢复过来。即如果某个结点上的RDD partition因为节点故障,导致数据丢失,那么RDD可以通过自己的数据来源重新计算该partition。这一切对使用者都是透明的。
RDD的数据默认存放在内存中,但是当内存资源不足时,spark会自动将RDD数据写入磁盘。比如某结点内存只能处理20W数据,那么这20W数据就会放入内存中计算,剩下10W放到磁盘中。RDD的弹性体现在于RDD上自动进行内存和磁盘之间权衡和切换的机制。

9. Spark streaming以及基本工作原理?

Spark streaming是spark core API的一种扩展,可以用于进行大规模、高吞吐量、容错的实时数据流的处理。
它支持从多种数据源读取数据,比如Kafka、Flume、Twitter和TCP Socket,并且能够使用算子比如map、reduce、join和window等来处理数据,处理后的数据可以保存到文件系统、数据库等存储中。
Spark streaming内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成batch,比如每收集一秒的数据封装成一个batch,然后将每个batch交给spark的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的batch组成的。

10. DStream以及基本工作原理?

DStream是spark streaming提供的一种高级抽象,代表了一个持续不断的数据流。
DStream可以通过输入数据源来创建,比如Kafka、flume等,也可以通过其他DStream的高阶函数来创建,比如map、reduce、join和window等。
DStream内部其实不断产生RDD,每个RDD包含了一个时间段的数据。
Spark streaming一定是有一个输入的DStream接收数据,按照时间划分成一个一个的batch,并转化为一个RDD,RDD的数据是分散在各个子节点的partition中。

11. spark有哪些组件?

1.master:管理集群和节点,不参与计算。
2.worker:计算节点,进程本身不参与计算,和master汇报。
3.Driver:运行程序的main方法,创建spark context对象。
4.spark context:控制整个application的生命周期,包括dagsheduler和task scheduler等组件。
5.client:用户提交程序的入口。

12. spark工作机制?

用户在client端提交作业后,会由Driver运行main方法并创建spark context上下文。执行add算子,形成dag图输入dagscheduler,按照add之间的依赖关系划分stage输入task scheduler。task scheduler会将stage划分为task set分发到各个节点的executor中执行。

13. 说下宽依赖和窄依赖

宽依赖:
本质就是shuffle。父RDD的每一个partition中的数据,都可能会传输一部分到下一个子RDD的每一个partition中,此时会出现父RDD和子RDD的partition之间具有交互错综复杂的关系,这种情况就叫做两个RDD之间是宽依赖。
窄依赖:
父RDD和子RDD的partition之间的对应关系是一对一的。

14. Spark主备切换机制原理知道吗?

Master实际上可以配置两个,Spark原生的standalone模式是支持Master主备切换的。当Active Master节点挂掉以后,我们可以将Standby Master切换为Active Master。
Spark Master主备切换可以基于两种机制,一种是基于文件系统的,一种是基于ZooKeeper的。
基于文件系统的主备切换机制,需要在Active Master挂掉之后手动切换到Standby Master上;
而基于Zookeeper的主备切换机制,可以实现自动切换Master。

15. spark解决了hadoop的哪些问题?

1.MR:抽象层次低,需要使用手工代码来完成程序编写,使用上难以上手;
Spark:Spark采用RDD计算模型,简单容易上手。
2.MR:只提供map和reduce两个操作,表达能力欠缺;
Spark:Spark采用更加丰富的算子模型,包括map、flatmap、groupbykey、reducebykey等;
3.MR:一个job只能包含map和reduce两个阶段,复杂的任务需要包含很多个job,这些job之间的管理以来需要开发者自己进行管理;
Spark:Spark中一个job可以包含多个转换操作,在调度时可以生成多个stage,而且如果多个map操作的分区不变,是可以放在同一个task里面去执行;
4.MR:中间结果存放在hdfs中;
Spark:Spark的中间结果一般存在内存中,只有当内存不够了,才会存入本地磁盘,而不是hdfs;
5.MR:只有等到所有的map task执行完毕后才能执行reduce task;
Spark:Spark中分区相同的转换构成流水线在一个task中执行,分区不同的需要进行shuffle操作,被划分成不同的stage需要等待前面的stage执行完才能执行。
6.MR:只适合batch批处理,时延高,对于交互式处理和实时处理支持不够;
Spark:Spark streaming可以将流拆成时间间隔的batch进行处理,实时计算。

16. 数据倾斜的产生和解决办法?

数据倾斜以为着某一个或者某几个partition的数据特别大,导致这几个partition上的计算需要耗费相当长的时间。
在spark中同一个应用程序划分成多个stage,这些stage之间是串行执行的,而一个stage里面的多个task是可以并行执行,task数目由partition数目决定,如果一个partition的数目特别大,那么导致这个task执行时间很长,导致接下来的stage无法执行,从而导致整个job执行变慢。
避免数据倾斜,一般是要选用合适的key,或者自己定义相关的partitioner,通过加盐或者哈希值来拆分这些key,从而将这些数据分散到不同的partition去执行。
如下算子会导致shuffle操作,是导致数据倾斜可能发生的关键点所在:groupByKey;reduceByKey;aggregaByKey;join;cogroup;

17. 你用sparksql处理的时候, 处理过程中用的dataframe还是直接写的sql?为什么?

这个问题的宗旨是问你spark sql 中dataframe和sql的区别,从执行原理、操作方便程度和自定义程度来分析 这个问题。

18. RDD中reduceBykey与groupByKey哪个性能好,为什么

reduceByKey:reduceByKey会在结果发送至reducer之前会对每个mapper在本地进行merge,有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。
groupByKey:groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。
所以在进行大量数据的reduce操作时候建议使用reduceByKey。不仅可以提高速度,还可以防止使用groupByKey造成的内存溢出问题。

19. Spark master HA主从切换过程不会影响到集群已有作业的运行,为什么

不会的。
因为程序在运行之前,已经申请过资源了,driver和Executors通讯,不需要和master进行通讯的。

20. spark master使用zookeeper进行ha,有哪些源数据保存到Zookeeper里面

spark通过这个参数spark.deploy.zookeeper.dir指定master元数据在zookeeper中保存的位置,包括Worker,Driver和Application以及Executors。standby节点要从zk中,获得元数据信息,恢复集群运行状态,才能对外继续提供服务,作业提交资源申请等,在恢复前是不能接受请求的。
注:Master切换需要注意2点:
1、在Master切换的过程中,所有的已经在运行的程序皆正常运行! 因为Spark Application在运行前就已经通过Cluster Manager获得了计算资源,所以在运行时Job本身的 调度和处理和Master是没有任何关系。
2、在Master的切换过程中唯一的影响是不能提交新的Job:一方面不能够提交新的应用程序给集群, 因为只有Active Master才能接受新的程序的提交请求;另外一方面,已经运行的程序中也不能够因 Action操作触发新的Job的提交请求。


http://www.mrgr.cn/news/93684.html

相关文章:

  • 【基础知识】回头看Maven基础
  • 【Java代码审计 | 第七篇】文件上传漏洞成因及防范
  • 本地部署大语言模型-DeepSeek
  • 【Java代码审计 | 第四篇】SQL注入防范
  • 根据输入汉字生成带拼音的米字格字帖
  • Hive八股
  • SQL经典查询
  • 存量思维和增量思维
  • 项目实战--网页五子棋(对战功能)(9)
  • Scala 中生成一个RDD的方法
  • LeetCodeHot100
  • 【落羽的落羽 C++】C++入门基础:引用,内联,nullptr
  • c语言笔记 一维数组与二维数组
  • 【Tools】Windows下Git 2.48安装教程详解
  • [数据抓取] Python 网络爬虫 - 学习手册
  • 硬件基础(4):(1)AD采集电路设计
  • 使用express创建服务器保存数据到mysql
  • 【Linux】权限相关知识点
  • GPU编程实战指南01:CUDA编程极简手册
  • P6412题解