RDD优化:缓存和checkpoint机制、数据共享(广播变量、累加器)、RDD的依赖关系、shuffle过程、并行度说明
文章目录
- 1. 缓存和checkpoint机制
- 1.1 缓存使用
- 1.2 checkpoint
- 1.3 缓存和checkpoint的区别
- 2. 数据共享
- 2.1 广播变量
- 2.2 累加器
- 3. RDD依赖关系
- 4.shuffle过程
- 4.1 shuffle介绍
- 4.2 spark计算要尽量避免shuffle
- 5. 并行度
1. 缓存和checkpoint机制
缓存和checkpoint
也叫作rdd的持久化
,将rdd的数据存储在指定位置
。
作用:
计算容错
提升计算速度
1.1 缓存使用
缓存是将数据存储在内存或者磁盘上,缓存的特点
是计算结束时,缓存自动清空
。
- 缓存级别
- 指定缓存的数据位置
- 默认是缓存到内存上
- 使用
- persist使用该方法
- cache内部调用persist
- 手动释放 unpersist
from pyspark import SparkContext
from pyspark.storagelevel import StorageLevelsc = SparkContext()
rdd = sc.parallelize(['a', 'b','a','c'])# rdd数据进行转化
rdd_kv = rdd.map(lambda x: (x,1))
#rdd_kv数据进行缓存
rdd_kv.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)
#使用action算子触发
rdd_kv.collect()# 分组处理
rdd_group = rdd_kv.groupByKey()#求和计算
rdd_reduce = rdd_group.reduceByKey(lambda x,y:x+y)
# 查看计算结果
res = rdd_reduce.collect()
print(res)
1.2 checkpoint
checkpoint也是将中间rdd数据存储起来,但是存储的位置实时分布式存储系统,可以永久保存,程序结束不会释放
。
如果需要删除就在HDFS上删除对应的目录文件。
#checkpoint使用
from pyspark import SparkContext
sc = SparkContext()#使用sc对象指定checkpoint存储位置
sc.setCheckpointDir('hdfs://node1:8020/checkpoint')rdd = sc.parallelize(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k'])#rdd数据进行转化
rdd_kv = rdd.map(lambda x: (x, 1))#rdd 数据进行checkpoint
rdd_kv.checkpoint()
#需要使用action算子触发checkpoint
print(rdd_kv.glom().collect())#分组处理
rdd_group = rdd_kv.groupByKey()#求和计算
rdd_reduce = rdd_group.reduceByKey(lambda x, y: x + y)#查看计算结果
res = rdd_kv.collect()
print(res)res1 = rdd_group.collect()
print(res1)res2 = rdd_reduce.collect()
print(res2)
1.3 缓存和checkpoint的区别
- 生命周期
- 缓存数据,程序计算结束后自动删除
- checkpoint 程序结束,数据依然保留在HDFS
- 存储位置
- 缓存 优先存储在内存上,也可以选存储在本地磁盘,是在计算任务所在的内存和磁盘上。
- checkpoint存储在HDFS上
- 依赖关系
- 缓存数据后,会
保留
rdd之间依赖关系,缓存临时存储,数据可能会丢失,需要保留依赖,当缓存丢失后可以按照依赖重新计算。 - checkpoint,数据存储后会
断开
依赖,数据保存在HDFS,HDFS三副本机制可以保证数据不丢失,所以没有比较保留依赖关系。
注意:缓存和checkpoint可以作为rdd优化的方案,提升计算速度
,一般对经常要使用的rdd
进行缓存和checkpoint,对计算比较复杂的rdd
进行缓存或checkpoint。
- 缓存数据后,会
2. 数据共享
2.1 广播变量
如果要在分布式计算里面分发大的变量数据,这个都会由driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task都会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的资源,如果将这个变量声明为广播变量,那么每个executor都会拥有一份,这个executor启动的task会共享这个变量,节省了通信的成本和服务器的资源。
减少task线程对应变量的定义,节省内存空间
。
#广播变量
from pyspark import SparkContextsc= SparkContext()a_obj = sc.broadcast(10)
#生成rdd
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9])#转化计算
rdd2 = rdd.map(lambda x:x+a_obj.value)#查看数据
res = rdd2.collect()
print(res)
2.2 累加器
避免资源抢占造成错误
#累加器的使用
from pyspark import SparkContextsc = SparkContext()
#将变量值添加到累加器中
acc_obj = sc.accumulator(0)#rdd操作
rdd = sc.parallelize([1,2,3])#使用累加器进行数据累加
rdd2 = rdd.map(lambda x:acc_obj.add(x))#查看结果
res = rdd.collect()
print(res)res1 = rdd2.collect()
print(res1)#查看累加器的结果
print(acc_obj.value)
查看结果:
3. RDD依赖关系
- 窄依赖
- 每个父RDD的一个partition最多被子RDD的一个partition所使用。
- map
- flatMap
- filter
- 每个父RDD的一个partition最多被子RDD的一个partition所使用。
- 宽依赖
- 一个父RDD的partition会被多个子RDD的partition所使用
- groupByKey
- reduceByKey
- sortByKey
- 在宽依赖中rdd之间会发生数据交换,这个交换的过程称为rdd的shuffle
- 只要是宽依赖必然发生shuffle
- 在宽依赖进行数据交换时,只有等待所有分区交换完成后,才能进行后续的计算,非常影响计算速度。
- 一个父RDD的partition会被多个子RDD的partition所使用
那么如何判断是宽依赖还是窄依赖呢?
#判断宽窄依赖
from pyspark import SparkContext
sc = SparkContext()rdd = sc.parallelize([1,2,3,4])
rdd_kv = sc.parallelize([('a',1), ('b',2), ('c',3)])#算子演示
rdd2 = rdd.map(lambda x:x+1)
rdd3 = rdd_kv.groupByKey()#查看结果
# res = rdd2.collect()
# print(res)res = rdd3.collect()
print(res)
DAG 管理维护rdd之间依赖关系,保证代码的执行顺序,
DAG会根据依赖关系划分stage,每个stage都是一个独立的计算步骤,当发生宽依赖时,会单独拆分一个计算步骤(stage),进行相关数据计算,可以保证每个单独的stage可以并行执行
在发生宽依赖进行shuffle时,会独立的方法执行shuffle计算
拆分计算步骤的本质是为了保证数据计算的并行执行
查看spark的计算过程,通过DAG判断算子是宽依赖还是窄依赖
拆分了计算stage是宽依赖,没有拆分是窄依赖
启动spark的历史日志
start-history-server.sh
4.shuffle过程
mapreduce的shuffle作用: 将map计算后的数据传递给redue使用。
mapreduce的shuffle过程: 分区(将相同key的数据放在一个分区,采用hash),排序,合并(规约)。
将map计算的数据传递给reduce
spark中也有shuffle
当执行宽依赖的算子就会进行shuffle
将rdd的数据传递给下一个rdd,进行数据交换
无论是spark还是mr,shuffle的本质是传递交换数据。
4.1 shuffle介绍
- spark的shuffle的两个部分
- shuffle write 写
- shuffle read 读
- 会进行文件的读写,影响spark的计算速度
- spark的shuffle方法类
- 是spark封装好的处理shuffle的方法
- hashshuffle类
- 进行的是hash计算
- spark1.2版本之前主要使用,之后引入了sortshuffle
- spark2.0之后,删除了hashshuffle,从2.0版本开始使用sortshuffle类
- 优化的hashshuffle和未优化的ashshuffle
- sortshuffle类
- 排序方式将相同key值数据放在一起
- sortshuffle类使用时,有两个方法实现shuffle
- bypass模式版本和普通模式版本
- bypass模式版本不会排序,会进行hash操作
- 普通模式版本会排序进行shuffle
- 可以通过配置指定按照那种模式执行 根据task数量决定 默认 task数量小于等于200 采用bypass,task数量超过200个则使用普通模式的方法进行shuffle
- 一个分区对应一个task,所以task数量由分区数决定
普通模式和bypass模式的主要区别在于如何将相同key值的数据放在一起
- 排序 普通模式采用的策略
- 哈希取余 bypass模式采用的策略
4.2 spark计算要尽量避免shuffle
# 优化计算,减少shuffle
from pyspark import SparkContext
sc = SparkContext()rdd = sc.parallelize([('男',20),('男',23),('女',20),('女',22)])
#求不同性别的年龄和
#reduceByKey 是宽依赖算子
rdd2 = rdd.reduceByKey(lambda x,y:x+y)# 避免shuffle,需要将宽依赖算子计算的过程换成窄依赖
boy = sc.accumulator(0)
girl = sc.accumulator(0)
def func(x):if x[0]=='男':boy.add(x[1])else :girl.add(x[1])return None
rdd3 = rdd.map(func)rdd3.collect()
print(boy.value)
print(girl.value)
5. 并行度
-
资源并行度
-
task在指定任务能够使用到的cpu核心数量
-
多任务 多个进程或多个线程执行任务
- 两种方式
- 并行 多个任务同时执行
- 并发 任务交替执行
- 和cpu的核心数有关
- 例如
- cpu核心是4核 有两个线程任务 两个线程任务可以 并行执行
- cpu核心是4核 有八个线程任务 并发执行
- 两种方式
-
spark中cpu核心数据设置
- –num-executors=2 设置executors数量 和服务器数量保持一致
- –executor-cores=2 设置每个executors中的cpu核心数 每个服务器中cpu核心数一致
spark-submit --master yarn --num-executors=3 --executor-cores=2
最大支持的task并行数量是 num-executors* executor-cores =6
需要按照服务器实际的cpu核心数指定 lscpu
-
-
数据并行度
- 就是task数量,task由分区数决定
- 为了保证task能充分利用cpu资源,实现并行计算,需要设置分区数应该和资源并行度一致
- 在实际公司中就要根据公司资源并行度进行设置分区数
- 有的场景下公司会要求数据并行度大于资源并行度
资源并行度,
按照yarn安装的服务器数量指定excutor数量 3
核心数量按照yarn中的nodemanager中的核心数指定 2
数据并行度指定官方建议 数据并行度的task数量和资源并行度数量一致