当前位置: 首页 > news >正文

SparkSql输出数据的方式

一、普通文件输出方式

 方式一:给定输出数据源的类型和地址

df.write.format("json").save(path)
df.write.format("csv").save(path)
df.write.format("parquet").save(path)

方式二:直接调用对应数据源类型的方法

df.write.json(path)
df.write.csv(path)
df.write.parquet(path)
append: 追加模式,当数据存在时,继续追加
overwrite: 覆写模式,当数据存在时,覆写以前数据,存储当前最新数据;
error/errorifexists: 如果目标存在就报错,默认的模式
ignore: 忽略,数据存在时不做任何操作

代码编写模板: 

df.write.mode(saveMode="append").format("csv").save(path)

代码演示普通的文件输出格式: 

import osfrom pyspark.sql import SparkSessionif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()df = spark.read.json("../../datas/person.json")# 获取年龄最大的人的名字df.createOrReplaceTempView("persons")rsDf = spark.sql("""select name,age from persons where age = (select max(age) from persons)""")# 将结果打印到控制台#rsDf.write.format("console").save()#rsDf.write.json("../../datas/result",mode="overwrite")#rsDf.write.mode(saveMode='overwrite').format("json").save("../../datas/result")#rsDf.write.mode(saveMode='overwrite').format("csv").save("../../datas/result1")#rsDf.write.mode(saveMode='overwrite').format("parquet").save("../../datas/result2")#rsDf.write.mode(saveMode='append').format("csv").save("../../datas/result1")# text 保存路径为hdfs 直接报错,不支持#rsDf.write.mode(saveMode='overwrite').text("hdfs://bigdata01:9820/result")#rsDf.write.orc("hdfs://bigdata01:9820/result",mode="overwrite")rsDf.write.parquet("hdfs://bigdata01:9820/result", mode="overwrite")spark.stop()

二、保存到数据库中

代码演示:

import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSessionif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'D:\Download\Java\JDK'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:\\bigdata\hadoop-3.3.1\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'spark = SparkSession.builder.master('local[*]').appName('').config("spark.sql.shuffle.partitions", 2).getOrCreate()df5 = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv")\.toDF('eid','ename','salary','sal','dept_id')df5.createOrReplaceTempView('emp')rsDf = spark.sql("select * from emp")rsDf.write.format("jdbc") \.option("driver", "com.mysql.cj.jdbc.Driver") \.option("url", "jdbc:mysql://bigdata01:3306/mysql") \.option("user", "root") \.option("password", "123456") \.option("dbtable", "emp1") \.save(mode="overwrite")spark.stop()# 使用完后,记得关闭

三、保存到hive中 

代码演示: 

import os
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSessionif __name__ == '__main__':# 配置环境os.environ['JAVA_HOME'] = 'D:\Download\Java\JDK'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:\\bigdata\hadoop-3.3.1\hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'os.environ['HADOOP_USER_NAME'] = 'root'spark = SparkSession \.builder \.appName("HiveAPP") \.master("local[2]") \.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \.config('hive.metastore.uris', 'thrift://bigdata01:9083') \.config("spark.sql.shuffle.partitions", 2) \.enableHiveSupport() \.getOrCreate()df5 = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv") \.toDF('eid', 'ename', 'salary', 'sal', 'dept_id')df5.createOrReplaceTempView('emp')rsDf = spark.sql("select * from emp")rsDf.write.saveAsTable("spark.emp")spark.stop()# 使用完后,记得关闭

 

 

 

 

 

 


http://www.mrgr.cn/news/67112.html

相关文章:

  • .NET使用SqlSugar实现单列批量更新的几种实现和对比
  • 如何搭建obsidian学术体系——微课详解
  • InsCode线上IDE推荐及使用指南
  • 第五次作业
  • ssd作为hdd缓存加速方案
  • Java学习路线:JUL日志系统(一)日志框架介绍
  • 代码要走的路:编程“三部曲”
  • 基于Multisim光控夜灯LED电路(含仿真和报告)
  • 基于STM32的八位数码管显示Proteus仿真设计
  • ubuntu中安装matplotcpp绘图
  • web端div带地图导出png图片功能
  • [LitCTF 2023]ez_XOR
  • 第十九课 Vue组件中的方法
  • 驱动-----dht11温湿度传感器
  • 《XGBoost算法的原理推导》12-7损失函数经验损失项二阶泰勒展开式 公式解析
  • Python数据可视化seaborn
  • pyspark基础准备
  • 鸿蒙Next如何接入微信支付
  • 扩散模型的数学原理(基于分数)
  • 开源的flash浏览器 CelfFlashBrowser
  • 一招教你查看最真实的Facebook广告转化
  • 【你也能从零基础学会网站开发】 SQL Server结构化查询语言数据操作应用--DML篇 浅谈SQL JOIN多表查询之FULL JOIN 全连接查询
  • VBA06-组件
  • ThreadLocal从入门到精通
  • RPM Fusion 软件仓库简介
  • Java第十一天(实训学习整理资料(十)Java IO流)