当前位置: 首页 > news >正文

Saprk:数据插入的优化(forachPartition)

在spark中处理数据,将处理好的数据保存到mysql中,如果直接处理RDD数据,将其循环使得每一条数据都能插入到数据库中,如果数据量不大的情况下,可以使用。但是针对大数据,处理的数据是海量的,所以每次循环一条数据都要创建新的数据库连接,就会非常耗时,如果把数据库的连接放在外面,这样又造成了算子内外变量的问题,所以我们用foreachPartition来优化,这是每个分区建立一次数据库连接,然后再在每个分区内迭代器循环,这样极大减少了连接次数,提高了性能

代码如下:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDDimport java.sql.{Connection, DriverManager, PreparedStatement}object Demo20ToMysql {def main(args: Array[String]): Unit = {//1、创建spark的执行环境val conf = new SparkConf()//设置运行模式conf.setMaster("local")conf.setAppName("wc")val sc = new SparkContext(conf)//2、读取数据//RDD:弹性的分布式数据集(相当于List)val linesRDD: RDD[String] = sc.textFile("data/lines.txt")//一行转换多行val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(","))val kvRD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))//统计单词的数量val countRDD: RDD[(String, Int)] = kvRD.reduceByKey((x, y) => x + y)val start: Long = System.currentTimeMillis()/*   //1 创建数据库连接//数据库连接不能在网络中传输val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456")val end: Long = System.currentTimeMillis()println(end - start)//保存到数据库中,使用JDBCcountRDD.foreach {case (word, count) =>//2 编写sql插入数据val stat: PreparedStatement = con.prepareStatement("insert into word_count values(?,?)")stat.setString(1, word)stat.setInt(2, count)stat.execute()}con.close()*///foreachPartition: 训练分区countRDD.foreachPartition(iter => {//1 创建数据库连接//每一个分区创建一个数据库连接val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata31", "root", "123456")val end: Long = System.currentTimeMillis()println(end - start)//在分区内循环iter.foreach {case (word, count) =>//2 编写sql插入数据val stat: PreparedStatement = con.prepareStatement("insert into word_count values(?,?)")stat.setString(1, word)stat.setInt(2, count)stat.execute()}con.close()})}
}


http://www.mrgr.cn/news/55554.html

相关文章:

  • Linux运维篇-误操作已经做了pv的磁盘导致pv异常
  • 求刚体移动后的转换矩阵
  • Verilator——最简单、最细节上手教程
  • 若依前后端分离版,部署到服务器CentOS7.5
  • RHCE——时间服务器
  • Web Storage:数据储存机制
  • 电能表预付费系统-标准传输规范(STS)(15)
  • Hadoop---HDFS(2)
  • A Graph-Transformer for Whole SlideImage Classification文献笔记
  • arm_acle.h找不到
  • 基于递推式最小二乘法的PMSM参数辨识MATLAB仿真模型
  • 六、栈————相关概念详解
  • ChatGPT4o、o1 谁才是最佳大模型?
  • DDD话语批评之一:评“状态和事件本质相同”[全文]
  • 稀疏表示的图像修复、图像退化、白噪声
  • Linux conda activate报错:CondaError: Run ‘conda init‘ before ‘conda activate‘
  • 算法|牛客网华为机试1-10C++
  • 拥抱云开发的未来:腾讯云数据库、云模板与AI智能化的应用场景探索
  • 大数据新视界 --大数据大厂之区块链技术:为大数据安全保驾护航
  • GEE引擎传奇UI界面修改教程
  • 【C Language】 运算符:按位运算符;逻辑运算符;关系运算符;条件运算符
  • 光伏工程造价单自动生成
  • CEEMDAN +组合预测模型(CNN-Transformer + ARIMA)
  • Markdown 简单实用的单词本格式编辑
  • Canal数据同步
  • 变换器交流模型建模方法