当前位置: 首页 > news >正文

MapReduce处理数据流程

(一)Shuffle

MapReduce中的Shuffle过程指的是在Map方法执行后、Reduce方法执行前对数据进行分区排序的阶段

(二)处理流程

1. 首先MapReduce会将处理的数据集划分成多个split,split划分是逻辑上进行划分,而非物理上的切分,每个split默认与Block块大小相同,每个split由1个map task进行处理


2. map task以为单位读取split中的数据,将数据转换成K,V格式数据,调用一次map方法执行处理逻辑。Map Task处理完的数据首先写入到默认100M的环形缓冲区,当环形缓冲区中的空间被使用到80%时数据会发生溢写。

溢写的数据会经过分区、快速排序形成小文件数据。(根据Key计算出本条数据应该写出的分区号,最终在内部得到(K,V,P)格式数据 写入到当前map task 所在的物理节点磁盘,便于后续reduce task的处理)


3. 为了避免每条数据都产生一次IO,根据split大小不同,可能会发生多次溢写磁盘过程


4. 每次溢写磁盘时会对数据进行二次排序:按照数据(K,V,P)中的P(分区)进行排序并在每个P(分区)中按照K进行排序,这样能保证相同的分区数据放在一起并能保证每个分区内的数据按照key有序。


5. 最终多次溢写的磁盘文件(多个小文件) 数据会根据归并排序算法合并成一个完整的磁盘文件,此刻,该磁盘文件特点是分区有序且分区内部数据按照key有序


6. Reduce端每个Reduce task会从每个map task所在的节点上拷贝落地的磁盘文件对应的分区数据,对于每个Reduce task来说,从各个节点上拉取到多个分区数据后,每个分区内的数据按照key分组有序,但是总体来看这些分区文件中key数据不是全局有序状态(分区数据内部有序,外部无序)。


7. 每个Reduce task需要再通过一次归并排序,将拷贝过来的所有同一分区数据进行merge,这样每个分区内的数据变成分区内按照key有序状态,然后通过Reduce task处理将结果写出。

(三)HASH分区算法

MapReduce处理数据过程中,map端将数据转换成K,V格式数据并写入对应的分区,根据key进行hashcode取值然后与Reduce Task个数取模得到该条数据写出的分区号。

public class HashPartitioner<K, V> extends Partitioner<K, V> {/** Use {@link Object#hashCode()} to partition. */public int getPartition(K key, V value, int numReduceTasks) {return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}
  • hashCode值可能是负数,为了保证key的hashCode非负,所以使用key.hashCode() & Integer.MAX_VALUE 按位与操作
  •  Map端写入的分区数默认与Reduce task个数相等

(四)压缩

在MapReduce中,压缩是一项常见的优化技术,用于减少数据在存储和传输过程中所占用的空间。通过对输入、中间和输出数据进行压缩,可以有效降低存储成本、减少网络传输开销。


•  压缩比率对比: bzip2 > gzip > snappy > lzo > lz4,bzip2压缩比可以达到8:1;gzip压缩比可以达到5比1;lzo可以达到3:1。
• 压缩性能对比:lz4 > lzo > snappy > gzip>bzip2 ,lzo压缩速度可达约50M/s,解压速度可达约70M/s;gzip速度约为20M/s,解压速度约为60M/s;bzip2压缩速度约为2.5M/s,解压速度约为9.5M/s。

注:

Reduce Task 个数没有固定计算方式,可以根据处理的总数据量大小来大约估算。简单业务中每个reduce task处理512M数据,复杂业务中每个reduce task 处理1G数据。

HDFS中数据以Block进行存储,Map阶段读取数据文件时,首先会对文件进行Split分片,Split切片默认与一个block大小相等,block是物理切分,split是逻辑切分,也就是说split大小是通过offset范围来决定每个split大小,而非真正的文件切分。Split可以人为调整大小,如果要调整split的大小可以通过调节mapreduce.input.fileinputformat.split.minsize或者mapreduce.input.fileinputformat.split.maxsize参数,假设想要调节Split大小为100M,那么就设置mapreduce.input.fileinputformat.split.maxsize为100M即可,如果要调节Split为200M,那么就设置mapreduce.input.fileinputformat.split.minsize为200M即可

数据块(Block)的大小通过参数dfs.blocksize设置


http://www.mrgr.cn/news/94671.html

相关文章:

  • 【论文阅读】Adversarial Patch Attacks on Monocular Depth Estimation Networks
  • CT重建笔记(四)——三维重建
  • 数据结构-----初始数据结构、及GDB调试
  • 安装配置Anaconda,配置VSCode
  • 西咸新区 能金区调研
  • 设计模式(行为型)-策略模式
  • 基于 Verilog 的时序设计:从理论到实践的深度探索
  • 【AWS入门】AWS云计算简介
  • 快速导出接口设计表——基于DOMParser的Swagger接口详情半自动化提取方法
  • TCP 三次握手四次挥手过程详解
  • 基于物品的协同过滤(itemCF)
  • 编程题-第k个语法符号(中等)
  • RK3568 android11 基于PN7160的NXP NFC移植
  • 基于cat1的贵重物品的状态和位置小型监控系统特色解析
  • hot100算法刷题:二叉树的层序遍历
  • 解决 openeuler 系统 docker 下载慢,docker 镜像加速
  • 线性回归原理推导与应用(五):波士顿房价预测实战
  • Flux 文生图技术解析与部署实践
  • C++初阶——类和对象(三) 构造函数、析构函数
  • Prosys OPC UA Gateway:实现 OPC Classic 与 OPC UA 无缝连接