Saprk:数据插入的优化(forachPartition)
在spark中处理数据,将处理好的数据保存到mysql中,如果直接处理RDD数据,将其循环使得每一条数据都能插入到数据库中,如果数据量不大的情况下,可以使用。但是针对大数据,处理的数据是海量的,所以每次循环一条数据都要创建新的数据库连接,就会非常耗时,如果把数据库的连接放在外面,这样又造成了算子内外变量的问题,所以我们用foreachPartition来优化,这是每个分区建立一次数据库连接,然后再在每个分区内迭代器循环,这样极大减少了连接次数,提高了性能
代码如下:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDDimport java.sql.{Connection, DriverManager, PreparedStatement}object Demo20ToMysql {def main(args: Array[String]): Unit = {//1、创建spark的执行环境val conf = new SparkConf()//设置运行模式conf.setMaster("local")conf.setAppName("wc")val sc = new SparkContext(conf)//2、读取数据//RDD:弹性的分布式数据集(相当于List)val linesRDD: RDD[String] = sc.textFile("data/lines.txt")//一行转换多行val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(","))val kvRD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))//统计单词的数量val countRDD: RDD[(String, Int)] = kvRD.reduceByKey((x, y) => x + y)val start: Long = System.currentTimeMillis()/* //1 创建数据库连接//数据库连接不能在网络中传输val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456")val end: Long = System.currentTimeMillis()println(end - start)//保存到数据库中,使用JDBCcountRDD.foreach {case (word, count) =>//2 编写sql插入数据val stat: PreparedStatement = con.prepareStatement("insert into word_count values(?,?)")stat.setString(1, word)stat.setInt(2, count)stat.execute()}con.close()*///foreachPartition: 训练分区countRDD.foreachPartition(iter => {//1 创建数据库连接//每一个分区创建一个数据库连接val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456")val end: Long = System.currentTimeMillis()println(end - start)//在分区内循环iter.foreach {case (word, count) =>//2 编写sql插入数据val stat: PreparedStatement = con.prepareStatement("insert into word_count values(?,?)")stat.setString(1, word)stat.setInt(2, count)stat.execute()}con.close()})}
}