当前位置: 首页 > news >正文

【Spark | Spark-Core篇】转换算子Transformation

1. Value类型

1.1 map映射

参数f是一个函数可以写作匿名子类,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。即,这个新RDD中的每一个元素都是原来RDD中每一个元素依次应用f函数而得到的。

object wc_example {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)// 从文件中创建RDDval rdd = sc.makeRDD(List(1, 2, 4, 1, 2, 4, 12 , 3))val maprdd = rdd.map((_, 1)).reduceByKey(_+_)maprdd.collect().foreach(println)sc.stop()}
}

借用wordcount例子,map算子的作用就是将rdd的每一个数据项做了一个处理,返回了一个元组。

元素的个数不变,元素发生了变化。

1.2 mapPartition

将待处理的数据以分区为单位发往到计算机节点进行计算,这里的处理是指可以进行任意的处理,哪怕是过滤。

object Spark_02_RDD_mapPartition {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)rdd.mapPartitions(_.filter(_ % 2 == 0)).collect().foreach(println)sc.stop()}
}

map和mapPartition的区别

数据处理角度

map算子是分区内一个数据一个数据的操作,类似于串行操作,而mapPartition算子是以分区为单位进行批处理。

功能的角度

map算子主要目的是将数据源中的数据进行转换和改变,但是不会减少或增多数据。mapPartition算子需要传递一个迭代器(一个分区里的数据),返回一个迭代器,没有要求元素的个数保持不变。

性能的角度

map算子因为类似于串行操作,所以性能比较低,而mapPartition算子类似于批处理,所以性能较高。但是mapPartition算子会长时间占用内存,那么会导致内存可能不够用,出现内存溢出的错误。

1.3 mapPartitionWithIndex

源码:

def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false):

输入:元组(分区索引,分区的迭代器)=> 迭代器

将处理的数据以分区为单位发送到计算节点上进行处理,这里的处理是指可以进行任意的处理。在处理的同时获得分区的索引,可以通过模式匹配来选择处理哪个分区的数据。

object Spark_02_RDD_mapPartition {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)rdd.mapPartitionsWithIndex((index, iter)=>{if(index == 0){iter.filter(_%2==0)}else{Nil.iterator}}).collect().foreach(println)sc.stop()}
}

1.4 flatMap扁平化(flatten + map)

1)功能说明

与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。

区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。

2)需求说明:创建一个集合,集合里面存储的还是子集合,把所有子集合中数据取出放入到一个大的集合中。

object wc_example1 {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)// 从文件中创建RDDval rdd = sc.textFile("data/1.txt", 2)// data/1.txt文件内容:
//    hello spark
//    hello java
//    hello scala// flatMap函数先进行map操作,对读取的每行数据split=>得到字符串数组// 然后flatten函数扁平化=>得到字符串rdd.flatMap(_.split(" ")).collect().foreach(println)
//    hello
//    spark
//    hello
//    java
//    hello
//    scalasc.stop()}
}

1.5 glom

源码:

def glom(): RDD[Array[T]] = withScope {new MapPartitionsRDD[Array[T], T](this, (_, _, iter) => Iterator(iter.toArray))}

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变。

1.6 groupBy()分组

源码:

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {groupBy[K](f, defaultPartitioner(this))}

根据key分组,结果返回元组(key,迭代器)

函数说明:

将数据根据指定的规则进行分组。分区默认不变。但是数据会被打乱重新组合,我们将这样的操作称为shuffle。极限情况下,数据可能被分在同一个分区。

一个组的数据在一个分区,但并不是说一个分区只有一个组。

object Spark_02_RDD_groupBy {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)// 从文件中创建RDDval rdd = sc.textFile("data/1.txt", 2)// data/1.txt文件内容:
//    hello spark
//    hello java
//    hello scala
//    rdd.flatMap(_.split(" ")).map((_, 1)).groupByKey().map(t => {
//        (t._1, t._2.size)
//      })
//      .collect().foreach(println)rdd.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).collect().foreach(println)//    (scala,CompactBuffer((scala,1)))
//    (hello,CompactBuffer((hello,1), (hello,1), (hello,1)))
//    (java,CompactBuffer((java,1)))
//    (spark,CompactBuffer((spark,1)))sc.stop()}
}

1.7 filter过滤

源码:

 def filter(f: T => Boolean): RDD[T] = withScope {val cleanF = sc.clean(f)new MapPartitionsRDD[T, T](this,(_, _, iter) => iter.filter(cleanF),preservesPartitioning = true)}

函数说明:

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。

当数据进行筛选过滤后,分区不变,但是分区内的数据可能会不均衡。生产环境下,可能会出现数据倾斜。

object Spark_04_RDD_filter {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)rdd.filter(_%2==0).collect().foreach(println)
//    2
//    4
//    6sc.stop()}
}

1.8 sample

源码:

def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T]

函数说明:

根据指定的规则从数据集中抽取数据。

object Spark_05_RDD_sample {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)// 第一个参数表示:抽取数据后是否将数据返回true(放回),false(丢弃)// 第二个参数表示:数据源中每条数据被抽取的概率。// 如果抽到不放回的场合:数据源中每条数据被抽取的概率,基准值的概念// 如果抽取放回的场合,表示数据源中的每条数据被抽取的可能次数。// 第三个参数表示:抽取数据时随机算法的种子// 如果不传递第三个参数,那么使用的是当前系统时间。rdd.sample(false, 0.4, 1).collect().foreach(println)
//    1
//    2
//    4sc.stop()}
}

1.9 distinct

源码:

def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T]

函数说明:

将数据集中重复的数据去重。

object Spark_04_RDD_distinct {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 5, 1, 2, 7, 1, 2), 2)rdd.distinct().collect().foreach(println)
//    2
//    1
//    7
//    5sc.stop()}
}

1.10 coalesce

源码:

def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

函数说明:

根据数据量缩减分区,用于大数据集过滤后,提高小数据集执行效率。

当spark程序中,存在过多的小任务时,可以通过coalesce方法缩减合并分区,减少分区的个数,减小任务调度成本。

coalesce方法的第一个参数为要修改成的分区数。可以减少分区,也可以增大分区。但增大分区有另一个算子。

object Spark_06_RDD_coalesce {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8,9), 3)// 将会产生三个分区:【1,2 ,3】, 【4, 5,6 】, 【7, 8, 9】rdd.coalesce(2, true).saveAsTextFile("datas/")// 缩减分区,如果将默认第二个参数,将会产生2个分区:【1,2,3】,【4,5,6,7,8,9】// 原先在一个分区里的数据,在缩减分区或增大分区以后,还在同一个分区=>并没有shuffle// 第二个参数是是否shuffle:如果设置为true,则shuffle:[1, 3, 5, 7, 9], [2, 4, 6, 8]sc.stop()}
}

注意:如果coalesce算子可以扩大分区,但如果不进行shuffle操作,是没有意义的。

所以如果想要实现扩大分区的效果,需要使用shuffle。

spark提供了一个简化的操作。

缩减分区:coalesce:如果想要数据均衡,可以采用shuffle。

扩大分区:repartition:底层代码调用的是coalesce,而且肯定会采用shuffle。

1. 11 repartition(扩大分区)

源码:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)}

repartition方法底层调用的是coalesce方法,而且第二个参数为true=>即一定会采用shuffle。

object Spark_06_RDD_repartition {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8,9), 3)rdd.repartition(3).saveAsTextFile("datas")// 分区数据的分配:由于采用shuffle,所以原一个分区内的数据可能会去往不同的分区// [3, 5, 9], [1, 6, 7], [2, 4, 8]sc.stop()}
}

1. 12 sortBy

源码:

def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {this.keyBy[K](f).sortByKey(ascending, numPartitions).values}

根据函数返回的映射值排序。结果显示的还是原数据。

sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序:第二个参数ascending=true。第二个参数可以改变排序的最终顺序。

sortBy默认情况下,不会改变分区,但中间会存在shuffle操作。源码产生了shuffleRDD对象:

new ShuffledRDD[K, V, V](self, part).setKeyOrdering(if (ascending) ordering else ordering.reverse)
object Spark_07_RDD_sortBy {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd = sc.makeRDD(List(4, 2, 1, 6, 7, 9, 8, 3, 5), 3)rdd.sortBy(_*2).saveAsTextFile("datas")// sortBy根据返回的映射值排序:比如以映射值_*2排序// [1, 2, 3], [4, 5, 6], [7, 8, 9]sc.stop()}
}

2. 双Value类型

2.1 intersection(交集)

源码:

def intersection(other: RDD[T]): RDD[T] = withScope {this.map(v => (v, null)).cogroup(other.map(v => (v, null))).filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }.keys}

函数说明:

对源RDD和参数RDD求交集后返回一个新的RDD

object Spark_08_RDD_intersection {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)val rdd2 = sc.makeRDD(List(3, 4, 5, 7, 9, 10), 3)val rdd = rdd1.intersection(rdd2)rdd.collect().foreach(println)sc.stop()}
}

2.2 union(并集)

源码:

def union(other: RDD[T]): RDD[T] = withScope {sc.union(this, other)}def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {val nonEmptyRdds = rdds.filter(!_.partitions.isEmpty)val partitioners = nonEmptyRdds.flatMap(_.partitioner).toSetif (nonEmptyRdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {new PartitionerAwareUnionRDD(this, nonEmptyRdds)} else {new UnionRDD(this, nonEmptyRdds)}}

函数说明:

对源RDD和参数RDD求并集。

object Spark_08_RDD_union_Transformation {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)val rdd2 = sc.makeRDD(List(3, 4, 5, 7, 9, 10), 3)val rdd = rdd1.union(rdd2)rdd.collect().foreach(println)// [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]sc.stop()}
}

2.3 subtarct(差集)

源码:

def subtract(other: RDD[T]): RDD[T] = withScope {subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))}def subtract(other: RDD[T],p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {if (partitioner == Some(p)) {// Our partitioner knows how to handle T (which, since we have a partitioner, is// really (K, V)) so make a new Partitioner that will de-tuple our fake tuplesval p2 = new Partitioner() {override def numPartitions: Int = p.numPartitionsoverride def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)}// Unfortunately, since we're making a new p2, we'll get ShuffleDependencies// anyway, and when calling .keys, will not have a partitioner set, even though// the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be// partitioned by the right/real keys (e.g. p).this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys} else {this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys}}

对源RDD和参数RDD求差集

object Spark_08_RDD_subtract_Transformation {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd1 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)val rdd2 = sc.makeRDD(List(3, 4, 5, 7, 9, 10), 3)val rdd = rdd1.subtract(rdd2)rdd.collect().foreach(println)// 数据存在于rdd1,但不存在于rdd2// [1, 2, 6]sc.stop()}
}

2.4 zip(拉链)

intersection和union和subtract都要求RDD的数据类型一致,而zip并不要求。

源码:

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>new Iterator[(T, U)] {def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {case (true, true) => truecase (false, false) => falsecase _ => throw new SparkException("Can only zip RDDs with " +"same number of elements in each partition")}def next(): (T, U) = (thisIter.next(), otherIter.next())}}}

在源码也可以看到:RDD[(T, U)],所以并不要求两个RDD的数据类型一致。

将源RDD和参数RDD的数据一一拉链起来。

object Spark_08_RDD_zip_Transformation {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wc")val sc = new SparkContext(sparkConf)val rdd1 = sc.makeRDD(List("A", "B", "C", "D", "E", "F"), 3)val rdd2 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)val rdd = rdd1.zip(rdd2)// 将两个RDD的数据拉链起来// 但有两个要求,分区个数numSlice相同,数据的个数一致rdd.collect().foreach(println)// (A, 1), (B, 2), (C, 3), (D, 4), (E, 5), (F, 6)sc.stop()}
}


http://www.mrgr.cn/news/54033.html

相关文章:

  • 关于懒汉饿汉模式下的线程安全问题
  • Vue脚手架学习 vue脚手架配置代理、插槽、Vuex使用、路由、ElementUi插件库的使用
  • 从零学习大模型(一)-----GPT3(上)
  • C#中Task.ContinueWith如何使用
  • Pytorch常用函数汇总【持续更新】
  • Linux运维篇-误操作已经做了pv的磁盘导致pv异常
  • Win32图片库CxImage在vs2022下的编译和使用
  • Python基础入门
  • Linux的基础指令
  • SHELL脚本之数组介绍
  • 压缩SQL Server 2014 数据库日志文件
  • 韩江荣获2024年诺贝尔文学奖:深度解读《植物妻子》《少年来了》和《素食者》
  • 【CTF刷题9】2024.10.19
  • 《重置MobaXterm密码并连接Linux虚拟机的完整操作指南》
  • Java--集合框架
  • 【OD】【E卷】【真题】【100分】补种未成活胡杨(PythonJavajavaScriptC++C)
  • 【C++】string类(2)
  • 选择、冒泡和插入排序及其优化版本课件
  • 揭秘A/B测试:如何用Z统计量和t统计量揭示成功背后的统计学奥秘
  • Linux——K8S平台的权限规划
  • 飞控开发软件有哪些?技术详解
  • 大模型照亮人工智能医疗助手的发展之路
  • CSP-J2023年复赛
  • 那些年 我们说走就走
  • 数学归纳法——第一数学归纳法、第二数学归纳法步骤和示例
  • RHCE--ntp客户端,时间服务器服务端