十、有C/C++/Java基础,迅速掌握Python,B站黑马2022版教程笔记(自用)
每篇必看前言:
该笔记适用于有C/C++/Java基础的想要迅速掌握Python语法的人。
该笔记是以B站黑马2022版教程为背景,所作的笔记。
链接数据库:
初始链接:
from pymysql import Connectionconn = Connection(host="localhost",port=3306, # 固定端口user="root",password="123456"
)print(conn.get_server_info()) # 打印mysql的软件版本
conn.close()
创建表:
from pymysql import Connectionconn = Connection(host="localhost",port=3306,user="root",password="123456"
)
conn.select_db("test") # 选择数据库
cursor=conn.cursor() # 获取游标对象
cursor.execute("create table test_pymysql(id int);")
# 创建表test_pymysql ATT:这里的分号可写可不写,但是mysql中写sql语句要加分号
conn.close()
查询表:
from pymysql import Connectionconn = Connection(host="localhost",port=3306,user="root",password="123456"
)
conn.select_db("mybatis")
cursor=conn.cursor() # 获取游标对象
cursor.execute("select * from tb_brand;")
results: tuple = cursor.fetchall() # fetchall拿到查询结果
for r in results:print(r)
conn.close()
PySpark(机器学习):
编程模型:
通过SparkContext对象作为编程入口,读取JSON文件、文本文件、数据库数据等数据,然后转换为RDD类对象,进行数据处理计算,最后完成写出文件、转换为list等数据输出操作。
RDD:弹性分布式数据集
数据输入:
from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "D:/dev/python3.10.4/python.exe"
# 告诉spark我的python解释器在哪# 创建SparkConf类对象cf
cf=SparkConf().setMaster("local[*]").setAppName("test_spark")# 这里相当于链式调用 cf=SparkConf()
# cf.setMaster("local[*]")
# cf.setAppName("test_spark")# 基于cf创建SparkContext类对象sc
sc=SparkContext(conf=cf)
# 打印PySpark的运行版本
print(sc.version)
# 停止sc的运行,即停止PySpark程序
sc.stop()
ATT:这里结果的报红没有关系,是Pyspark的内置板块的问题。
将Python内置数据容器转换为RDD对象
通过sc.parallelize(数据容器对象)得到RDD对象
通过RDD对象.collect()得到RDD对象的内容
ATT:字符串会被拆分成单个字符存入RDD对象。字典仅有key被存入RDD对象。
from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "D:/dev/python3.10.4/python.exe"
# 告诉spark我的python解释器在哪cf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=cf)rdd_list = sc.parallelize([1, 2, 3, 4, 5])
rdd_set = sc.parallelize({1, 2, 3, 4, 5})
rdd_tumple = sc.parallelize((1, 2, 3, 4, 5))
rdd_str = sc.parallelize("1, 2, 3, 4, 5")
rdd_dict = sc.parallelize({"key1": 1, "key2": 2})print(rdd_list.collect())
print(rdd_set.collect())
print(rdd_tumple.collect())
print(rdd_str.collect())
print(rdd_dict.collect())sc.stop()
将文本文件转换为RDD对象:
通过sc.textFile(文本文件地址)得到RDD对象。
通过RDD对象.collect()得到RDD对象的内容
有test.txt:
from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "D:/dev/python3.10.4/python.exe"
# 告诉spark我的python解释器在哪cf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=cf)rdd_text=sc.textFile("D:/重要文件保护/Desktop/test.txt")print(rdd_text.collect())sc.stop()
数据计算(RDD成员方法\算子)
map算子:
将RDD的数据一条条处理(处理逻辑基于map算子中接受的处理函数),返回新的RDD
其处理函数要求传入参数只能有一个,且返回值类型也只有一个。传参和返回值类型可以不一样。即(U)->T
from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "D:/dev/python3.10.4/python.exe"
# 告诉spark我的python解释器在哪cf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=cf)rdd = sc.parallelize([1, 2, 3, 4, 5])def Myfun(x):return 10*x# rdd1 = rdd.map(lambda x: 10*x) # 可以使用lambda表达式
rdd1 = rdd.map(Myfun)print(rdd1.collect())sc.stop()
flatmap算子:
与map算子只有一个区别,就是flatmap解除了嵌套。
嵌套的list:lst =[[1, 2, 3], [4, 5, 6], [7, 8, 9]]
解除嵌套的list:lst=[1, 2, 3, 4, 5, 6, 7, 8, 9]
from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "D:/dev/python3.10.4/python.exe"cf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=cf)rdd = sc.parallelize(["shanshan deisu", "suki daisuki", "no more to say"])
# 如果我想要拿到单个的单词
rdd1 = rdd.map(lambda x: x.split(" "))
rdd2 = rdd.flatMap(lambda x: x.split(" "))print(rdd1.collect())
print(rdd2.collect())sc.stop()
reduceByKey算子:
功能:针对Key-Value型RDD(实际就是二元元组),按照key分组并完成组内Value的聚合,分组逻辑是依据提供的处理函数完成的。
可以处理二元元组。
其处理函数要求传入参数有两个,返回值类型只有一个。
传参和返回值类型必须一样。即(V,V)->V
from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "D:/dev/python3.10.4/python.exe"cf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=cf)
# 标黄的()改成{}或[]都是可以的
rdd= sc.parallelize((("shanshan", 1), ("zhengzheng", 1), ("shanshan", 2), ("zhengzheng", 3)))
# 如果我想要拿到单个的单词
rdd1 = rdd.reduceByKey(lambda x, y: x+y)print(rdd1.collect())sc.stop()
应用场景:单词计数
在同目录下有个hello.txt文件,目的是统计文件内的单词数
from pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "D:/dev/python3.10.4/python.exe"cf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=cf)rdd = sc.textFile("hello.txt")# 拿到单个单词
rdd_words = rdd.flatMap(lambda x: x.split(" "))# 将单个单词变为二元元组,value=1
rdd_cnt = rdd_words.map(lambda x: (x, 1))# 用reduceByKey对二元元组进行聚合
rdd_final =rdd_cnt.reduceByKey(lambda x, y: x+y)print(rdd_final.collect())sc.stop()
filter算子:
功能是根据处理函数过滤掉一些数据
rdd = sc.parallelize([1, 2, 3, 4, 5])# 偶数保留 奇数过滤
rdd_final = rdd.filter(lambda x: (x % 2) == 0)print(rdd_final.collect())
distinct算子:
功能是直接去重,无需传参。
rdd = sc.textFile("hello.txt")
rdd2 = rdd.flatMap(lambda x: x.split(" "))print(rdd2.collect())
print(rdd2.distinct().collect())
sortBy算子:
功能是:依据处理函数对数据进行排序。
语法是:rdd.sortBy(func, ascending=False, numPartitions=1)
其中func表示的是告知rdd按哪个数据进行排序。如lambda x: x[1] 表示按照rdd中的第二列元素进行排序。
ascending=True表示从小到大排序,False表示从大到小排序。
numPartitions表示分区数的意思,目前暂记为1即可。
例:对应用场景:单词计数进行出现次数从大到小的排序。
rdd_final =rdd_cnt.reduceByKey(lambda x, y: x+y)
print(rdd_final.collect())rdd_release = rdd_final.sortBy(lambda x: x[1], ascending = False, numPartitions = 1)
print(rdd_release.collect())
全部算子应用场景:
import jsonfrom pyspark import SparkConf, SparkContext
import os
os.environ["PYSPARK_PYTHON"] = "D:/dev/python3.10.4/python.exe"cf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=cf)rdd = sc.textFile("orders.txt")# 拿到单个json数据
rdd1 = rdd.flatMap(lambda x: x.split("|"))
print("拿到单个json数据")
print(rdd1.collect())# 将单个json数据转换为字典
rdd2 = rdd1.map(lambda x: json.loads(x))
print("\n将单个json数据转换为字典")
print(rdd2.collect())# 将数据设为(城市,销售额)二元组
rdd3 = rdd2.map(lambda x: (x["areaName"], int(x["money"])))
print("\n将数据设为(城市,销售额)二元组")
print(rdd3.collect())# 按城市分组进行聚合
rdd4 = rdd3.reduceByKey(lambda x, y:x+y)
print("\n按城市分组进行聚合")
print(rdd4.collect())# 按销售额从大到小排序
rdd5 = rdd4.sortBy(lambda x: x[1], ascending= False, numPartitions=1)
print("\n按销售额从大到小排序")
print(rdd5.collect())
print("\需求1已完成")# 只看商品类别
rdd6 = rdd2.map(lambda x: x["category"])
print("\n只看商品类别")
print(rdd6.collect())# 去重并打印
print("\n去重并打印")
print(rdd6.distinct().collect())
print("\需求2已完成")# 过滤掉除了北京市的数据
print("\n过滤掉除了北京市的数据")
rdd7 = rdd2.filter(lambda x: x["areaName"]=="北京")
print(rdd7.collect())print("\n只看北京的商品类别")
rdd8 =rdd7.map(lambda x: x["category"])
print(rdd8.collect())print("\n去重并打印")
print(rdd8.distinct().collect())
print("\需求3已完成")sc.stop()
数据输出:
collect算子:
功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。即,返回值是一个list。
reduce算子:
功能:将数据按照处理函数进行聚合。func: (T,T)->T 。即,需要两个相同类型的参数传入,并返回同类型的一个返回值。
rdd=sc.parallelize(range(1, 10))
print(rdd.reduce(lambda x, y: x+y))
take算子:
功能:取RDD的前N个元素,组成一个list并返回。