当前位置: 首页 > news >正文

虫洞数观系列二 | Python+MySQL高效封装:为pandas数据分析铺路

目录

系列文章

1. 引言

2. 常规写法mysql

3. 封装设计接口mysql

3.1dbname.py文件

3.1.1. 导入和基类定义

3.1.2. 具体表定义类

3.1.3. 表定义整合函数

3.1.4. 全局字典和测试代码

3.2mysql_dao文件

3.2.1. 模块导入与配置

3.2.2. 数据库连接池初始化

3.2.3. CommonSQL 类功能

3.3db文件使用

4总结


系列文章

虫洞数观系列总览 | 技术全景:豆瓣电影TOP250数据采集→分析→可视化完整指南

虫洞数观系列一 | 豆瓣电影TOP250数据采集与MySQL存储实战

虫洞数观系列三 | 数据分析全链路实践:Pandas清洗统计 + Navicat可视化呈现 

1. 引言

在上一篇文章中,我们完成了豆瓣TOP250电影数据的爬取,存储字段包括:

  • 基础信息(中英文片名、详情页链接)

  • 制作信息(导演、主演、年份、国家、类型)

  • 评分数据(分数、评分人数、经典评语)

这些数据已存入MySQL数据库doubantop250movie表中。

本文核心目标

  1. 用Python封装MySQL的CRUD(增删改查)操作类

  2. 建立高效的数据存取管道

  3. 为后续的Pandas透视分析(如:

    • 按年份/国家的评分分布

    • 类型与评分的关联性

    • 导演/演员的作品统计等)奠定基础

通过标准化数据库操作接口,后续数据分析时只需关注业务逻辑,无需重复编写SQL语句。

2. 常规写法mysql

可以参考之前的文章

知识周汇 | MySQL增删改查与Python连接

对以下的数据表格实现增删改查,

# coding=utf-8import mysql.connector.pooling
import pandas as pd# 本地数据库
__config = {"host": "localhost","port": 3306,"user": "root","password": "faw-vw.1901","database": "douban"
}try:pool = mysql.connector.pooling.MySQLConnectionPool(**__config,pool_size=10)
except Exception as e:print(e)class DoubanDao():# 增def add_infro_from_douban(self):sql = "REPLACE INTO top250movie (feature) VALUES (2);"print(sql)try:con = pool.get_connection()cursor = con.cursor()cursor.execute(sql)con.commit()except Exception as e:if "con" in dir():con.rollback()finally:if "con" in dir():con.close()# 删def del_infro_from_douban(self):sql = "DELETE FROM top250movie WHERE feature ='TOP0001';"print(sql)try:con = pool.get_connection()cursor = con.cursor()cursor.execute(sql)con.commit()except Exception as e:if "con" in dir():con.rollback()finally:if "con" in dir():con.close()# 改def update_infro_from_douban(self):sql = "UPDATE top250movie SET movie_ch = '" + str("你好") + "' WHERE feature = '" + "TOP0002" + "';"# print(sql)try:con = pool.get_connection()cursor = con.cursor()cursor.execute(sql)print(sql)con.commit()except Exception as e:print(e)if "con" in dir():con.rollback()finally:if "con" in dir():con.close()# 查def select_infro_from_douban(self):sql = "SELECT update_date ,movie_ch,movie_en,movie_url FROM top250movie;"print(sql)try:con = pool.get_connection()cursor = con.cursor()cursor.execute(sql)result = cursor.fetchall()return resultexcept Exception as e:if "con" in dir():con.rollback()finally:if "con" in dir():con.close()

这种实现方式存在代码冗余问题,当数据库表数量增加时,需要为每个表单独编写定制化逻辑,显著增加了开发维护成本。

3. 封装接口设计mysql

这边设计3个py文件,dbname.py - 表定义模块,mysql_dao.py - 数据访问对象(DAO),main.py - 主程序。

架构设计图

+-------------------+    +-------------------+    +-------------------+
|    dbname.py      |    |   mysql_dao.py    |    |     main.py       |
| 表结构定义模块     |<--->| 数据访问层        |<--->| 业务逻辑层        |
+-------------------+    +-------------------+    +-------------------+|v+-------------------+|     MySQL 数据库   |+-------------------+

3.1dbname.py文件

该文件主要是想表达数据表的列名和中文名字对应关系

from typing import Dictclass TableDefinition:"""表定义基类"""@staticmethoddef _create_table_dict(table_name: str, columns: Dict[str, str]) -> Dict[str, Dict[str, str]]:"""创建表字典结构"""return {table_name: columns}class DouBan(TableDefinition):"""分析表定义"""@staticmethoddef top250movie() -> Dict[str, Dict[str, str]]:"""top250电影"""columns = {'update_date': '更新日期','feature': '特征值','movie_ch': '电影中文名','movie_en': '电影英文名','movie_url': '电影详情页链接','director': '导演','star': '主演','start_year': '上映年份','country': '国籍','type': '类型','rating': '评分','num_ratings': '评分人数','comment': '评语',}return DouBan._create_table_dict('top250movie', columns)# 其他表定义类...def get_dbname_dict() -> Dict[str, Dict[str, str]]:"""获取所有表定义的字典"""db_dict = {}# 合并所有表定义db_dict.update(DouBan.top250movie())# 添加其他表...return db_dict# 全局表定义字典
dbname_dic = get_dbname_dict()
for dbname in dbname_dic:print('>>>>>>>>>>>>>>>>>')print(dbname)print(dbname_dic[dbname])

打印的结果:

3.1.1. 导入和基类定义

from typing import Dictclass TableDefinition:"""表定义基类"""@staticmethoddef _create_table_dict(table_name: str, columns: Dict[str, str]) -> Dict[str, Dict[str, str]]:"""创建表字典结构"""return {table_name: columns}
  • typing模块导入Dict,用于类型注解。

  • 定义了一个基类TableDefinition,包含一个静态方法_create_table_dict,用于创建表结构的字典表示。该方法接收表名和列定义字典,返回一个嵌套字典,外层键是表名,内层是列名到列描述的映射。

3.1.2. 具体表定义类

class DouBan(TableDefinition):"""分析表定义"""@staticmethoddef top250movie() -> Dict[str, Dict[str, str]]:"""top250电影"""columns = {'update_date': '更新日期','feature': '特征值','movie_ch': '电影中文名','movie_en': '电影英文名','movie_url': '电影详情页链接','director': '导演','star': '主演','start_year': '上映年份','country': '国籍','type': '类型','rating': '评分','num_ratings': '评分人数','comment': '评语',}return DouBan._create_table_dict('top250movie', columns)
  • DouBan继承自TableDefinition,表示豆瓣相关的表定义。

  • 定义了一个静态方法top250movie,返回豆瓣Top250电影的表结构:

    • 包含13个字段,如更新日期、电影中英文名、导演、评分等。

    • 每个字段都有英文名和中文描述。

    • 使用基类的_create_table_dict方法生成最终的字典结构。

3.1.3. 表定义整合函数

def get_dbname_dict() -> Dict[str, Dict[str, str]]:"""获取所有表定义的字典"""db_dict = {}# 合并所有表定义db_dict.update(DouBan.top250movie())# 添加其他表...return db_dict
  • 该函数整合所有表定义,返回一个统一的字典。

  • 目前只添加了DouBan.top250movie(),但注释表明可以添加其他表定义。

3.1.4. 全局字典和测试代码

# 全局表定义字典
dbname_dic = get_dbname_dict()
for dbname in dbname_dic:print('>>>>>>>>>>>>>>>>>')print(dbname)print(dbname_dic[dbname])
  • 生成全局表定义字典dbname_dic

  • 遍历并打印每个表名及其列定义,用于测试和验证。

3.2mysql_dao文件

# coding=utf-8
import pandas as pd
from tqdm import tqdm
import mysql.connector.pooling
from db.dbname import dbname_dic# Database configuration
__config = {"host": "localhost","port": 3306,"user": "root","password": "faw-vw.1901","database": "douban"
}# Initialize connection pool
try:pool = mysql.connector.pooling.MySQLConnectionPool(**__config,pool_size=10)
except Exception as e:print(f"Error initializing connection pool: {e}")class CommonSQL:def __init__(self):self.pool = pooldef execute_sql_no_return(self, sql):"""Execute SQL without return value."""try:con = self.pool.get_connection()cursor = con.cursor()cursor.execute(sql)con.commit()self._print_success(sql)except Exception as e:if "con" in locals():con.rollback()self._print_failure(sql)print(f"Error: {e}")finally:if "con" in locals():con.close()def executemany_sql_no_return(self, sql, value_list):"""Execute many SQL statements without return value."""try:con = self.pool.get_connection()cursor = con.cursor()cursor.executemany(sql, value_list)con.commit()self._print_success(sql)except Exception as e:if "con" in locals():con.rollback()self._print_failure(sql)print(f"Error: {e}")finally:if "con" in locals():con.close()def execute_sql_return_value(self, dbname):"""Execute SQL and return values as a DataFrame."""try:con = self.pool.get_connection()cursor = con.cursor()sql = f"SELECT * FROM {dbname};"cursor.execute(sql)rows = cursor.fetchall()columns = [desc[0] for desc in cursor.description]df = pd.DataFrame(rows, columns=columns)print(df)# 将df的英文列名更换为中文列名print(dbname)print(dbname_dic)print(dbname_dic[dbname])if dbname in dbname_dic:dbname_columns_dic = dbname_dic[dbname]print(dbname_columns_dic)for each_column in list(df.columns):if each_column in dbname_columns_dic:df.rename(columns={each_column: dbname_columns_dic[each_column]}, inplace=True)return dfexcept Exception as e:if "con" in locals():con.rollback()print(f"Error: {e}")finally:if "con" in locals():con.close()def bulk_update_infor_in_db(self, df, PRIMARY_KEY, update_cols, dbname):"""Bulk update database with DataFrame."""sql = self._create_update_sql(dbname, update_cols, PRIMARY_KEY)self._bulk_operation(df, sql, update_cols, PRIMARY_KEY, dbname, "update")def bulk_insert_infor_in_db(self, df, insert_cols, dbname):"""Bulk insert into database with DataFrame."""sql = self._create_insert_sql(dbname, insert_cols)self._bulk_operation(df, sql, insert_cols, None, dbname, "insert")def bulk_replace_infor_in_db(self, df, insert_cols, dbname):"""Bulk replace into database with DataFrame."""sql = self._create_replace_sql(dbname, insert_cols)self._bulk_operation(df, sql, insert_cols, None, dbname, "replace")def clear_db_table(self, dbname):"""Clear database table."""sql = f"TRUNCATE TABLE {dbname}"self.execute_sql_no_return(sql)def _create_update_sql(self, dbname, update_cols, PRIMARY_KEY):set_parts = ", ".join([f"{col} = %s" for col in update_cols])sql = f"UPDATE {dbname} SET {set_parts} WHERE {PRIMARY_KEY[0]} = %s;"return sqldef _create_insert_sql(self, dbname, insert_cols):columns = ", ".join(insert_cols)placeholders = ", ".join(["%s"] * len(insert_cols))sql = f"INSERT INTO {dbname} ({columns}) VALUES ({placeholders});"return sqldef _create_replace_sql(self, dbname, insert_cols):columns = ", ".join(insert_cols)placeholders = ", ".join(["%s"] * len(insert_cols))sql = f"REPLACE INTO {dbname} ({columns}) VALUES ({placeholders});"return sqldef _bulk_operation(self, df, sql, cols, PRIMARY_KEY, dbname, operation):"""Helper method to perform bulk operations."""df_copy = df.copy()i_max0 = df_copy.shape[0]num = i_max0 // 5000for j in range(num + 1):value_list = []start = j * 5000end = min((j + 1) * 5000, i_max0)for i in tqdm(range(start, end), desc=f"Batch {operation}"):row = df_copy.iloc[i]values = [str(row[cols[col]]) for col in cols]if PRIMARY_KEY:values.append(str(row[PRIMARY_KEY[1]]))value_list.append(tuple(values))self.executemany_sql_no_return(sql, value_list)print(f"Database {dbname} {operation}d {end - start} rows!!!")def _print_success(self, sql):"""Print success message."""operation = "insert" if "INSERT" in sql else "update" if "UPDATE" in sql else "execute"print(f"Successfully {operation} {sql}")def _print_failure(self, sql):"""Print failure message."""operation = "insert" if "INSERT" in sql else "update" if "UPDATE" in sql else "execute"print(f"Failed {operation} {sql}")

3.2.1. 模块导入与配置

# coding=utf-8
import pandas as pd
from tqdm import tqdm
import mysql.connector.pooling
from db.dbname import dbname_dic
  • pandas:用于将查询结果转换为 DataFrame。

  • tqdm:显示批量操作的进度条。

  • mysql.connector.pooling:MySQL 连接池,提高数据库连接效率。

  • dbname_dic:从自定义模块导入表名和字段名的映射字典(如 {'update_date': '更新日期'})。

3.2.2. 数据库连接池初始化

__config = {"host": "localhost","port": 3306,"user": "root","password": "faw-vw.1901","database": "douban"
}pool = mysql.connector.pooling.MySQLConnectionPool(**__config,pool_size=10
)
  • 使用连接池管理数据库连接,默认大小为 10,避免频繁创建/销毁连接。

3.2.3. CommonSQL 类功能

初始化方法
def __init__(self):self.pool = pool
  • 直接使用全局连接池 pool

基础 SQL 操作方法
execute_sql_no_return
def execute_sql_no_return(self, sql):"""执行无返回值的 SQL(如 INSERT/UPDATE/DELETE)"""try:con = self.pool.get_connection()cursor = con.cursor()cursor.execute(sql)con.commit()self._print_success(sql)  # 打印成功日志except Exception as e:con.rollback()  # 回滚事务self._print_failure(sql)  # 打印失败日志finally:con.close()  # 释放连接
  • 用于执行不需要返回结果的 SQL(如 DML 语句)。

executemany_sql_no_return
def executemany_sql_no_return(self, sql, value_list):"""批量执行无返回值的 SQL"""try:con = self.pool.get_connection()cursor = con.cursor()cursor.executemany(sql, value_list)  # 批量执行con.commit()except Exception as e:con.rollback()finally:con.close()
  • 高效批量插入/更新数据(如 INSERT INTO ... VALUES (%s, %s))。

execute_sql_return_value
def execute_sql_return_value(self, dbname):"""执行查询并返回 DataFrame(自动转换列名为中文)"""sql = f"SELECT * FROM {dbname};"cursor.execute(sql)rows = cursor.fetchall()columns = [desc[0] for desc in cursor.description]  # 获取列名df = pd.DataFrame(rows, columns=columns)# 将英文列名替换为中文(通过 dbname_dic 映射)if dbname in dbname_dic:df.rename(columns=dbname_dic[dbname], inplace=True)return df
  • 查询结果转换为 DataFrame,并自动替换列名为中文(如 movie_ch → 电影中文名)。

批量操作方法
bulk_insert_infor_in_db / bulk_update_infor_in_db / bulk_replace_infor_in_db
def bulk_insert_infor_in_db(self, df, insert_cols, dbname):sql = self._create_insert_sql(dbname, insert_cols)  # 生成 INSERT SQLself._bulk_operation(df, sql, insert_cols, None, dbname, "insert")def _create_insert_sql(self, dbname, insert_cols):"""生成 INSERT 语句模板,如: INSERT INTO table (col1, col2) VALUES (%s, %s)"""columns = ", ".join(insert_cols)placeholders = ", ".join(["%s"] * len(insert_cols))return f"INSERT INTO {dbname} ({columns}) VALUES ({placeholders});"
  • 将 DataFrame 数据分批次(每批 5000 行)插入数据库,通过 tqdm 显示进度。

_bulk_operation(核心辅助方法)
def _bulk_operation(self, df, sql, cols, PRIMARY_KEY, dbname, operation):"""批量操作(插入/更新/替换)的通用逻辑"""for j in range(num_batches):value_list = []for i in tqdm(range(start, end), desc=f"Batch {operation}"):row = df.iloc[i]values = [str(row[col]) for col in cols]  # 提取数据if PRIMARY_KEY:  # 如果是更新操作,追加主键值values.append(str(row[PRIMARY_KEY[1]]))value_list.append(tuple(values))self.executemany_sql_no_return(sql, value_list)  # 批量执行
  • 支持分批次处理大数据量,避免内存溢出。

其他工具方法
  • clear_db_table:清空表(TRUNCATE TABLE)。

  • _print_success / _print_failure:格式化打印操作日志。

3.3db文件使用

以下完整演示了"查询→备份→清空→重新插入→更新"的数据处理流程

# 导入必要的库
import pandas as pd  # 用于数据处理和分析
from db.mysql_dao import CommonSQL  # 自定义的MySQL数据库操作类def main():"""主函数,执行数据库CRUD操作流程"""# ==================== 数据查询模块 ====================# 从'top250movie'表查询数据并返回DataFramedf = CommonSQL().execute_sql_return_value('top250movie')print(df)  # 打印原始数据print(df.columns)  # 打印列名df.to_excel('原始数据.xlsx')  # 导出到Excel备份# ==================== 数据清理模块 ====================# 清空'top250movie'表中的所有数据CommonSQL().clear_db_table('top250movie')# ==================== 数据插入模块 ====================# 从Excel重新加载数据df = pd.read_excel('原始数据.xlsx')# 定义数据库字段与DataFrame列的映射关系insert_cols = {'update_date': '更新日期',  # 数据库字段: DataFrame列名'feature': '特征值','movie_ch': '电影中文名','movie_en': '电影英文名','movie_url': '电影详情页链接','director': '导演','star': '主演','start_year': '上映年份','country': '国籍','type': '类型','rating': '评分','num_ratings': '评分人数','comment': '评语',}# 执行批量插入操作(两种方式)CommonSQL().bulk_insert_infor_in_db(df, insert_cols=insert_cols, dbname='top250movie')CommonSQL().bulk_replace_infor_in_db(df, insert_cols=insert_cols, dbname='top250movie')# ==================== 数据更新模块 ====================# 定义主键和需要更新的字段映射PRIMARY_KEY = ['feature', '特征值']  # 主键字段update_cols = {'movie_ch': '电影中文名',  # 数据库字段: DataFrame列名'movie_en': '电影英文名','movie_url': '电影详情页链接','director': '导演','star': '主演','start_year': '上映年份','country': '国籍','type': '类型','rating': '评分','num_ratings': '评分人数','comment': '评语',}# 执行批量更新操作CommonSQL().bulk_update_infor_in_db(df, PRIMARY_KEY, update_cols, 'top250movie')# 程序入口
if __name__ == '__main__':main()

4总结

作为一名长期从事数据处理与分析的专业人士,我在实际工作中总结出了一套成熟的MySQL-DataFrame交互方案。该方案有效解决了数据分析过程中常见的"数据搬运"效率瓶颈问题,显著提升了工作效能。

✅ 智能双向无缝转换
• 实现DataFrame与数据库表的自动化映射
• 免除繁琐的SQL查询编写及结果解析过程
• 全面适配各类数据分析场景的特殊需求

⚡ 高性能批处理机制
• 采用智能分块处理技术(5000行/批)
• 基于executemany预编译实现高效数据操作
• 显著降低I/O开销,提升数据处理效率

应用价值:
• 节省90%以上的数据转换时间
• 专注于核心数据分析逻辑开发
• 充分利用DataFrame的强大分析功能

让数据真正流动起来,释放分析潜能!


http://www.mrgr.cn/news/96590.html

相关文章:

  • 分布式计算Ray框架面试题及参考答案
  • Mac Apple silicon如何指定运行amd64架构的ubuntu Docker?
  • 一个判断A股交易状态的python脚本
  • USB有驱ID卡读卡器C#小程序开发
  • 哈希表 - 两数之和(Map) - JS
  • 【Kubernetes】CentOS 7 安装 Kubernetes 1.30.1
  • HCIA-数据通信datacom认证
  • Qt使用QGraphicsView绘制线路图————附带详细实现代码
  • 【零基础入门unity游戏开发——2D篇】SpriteRenderer精灵渲染器组件
  • UGNX二次开发——截图功能
  • 蓝桥杯专项复习——二分
  • 将 PyTorch Model 用可视化方法浏览 torchview,onxx, netron, summary | 撰写论文 paper
  • PDF解析黑科技:从OCR-Free到多模态大模型的进化之旅
  • DeepSeek 助力 Vue3 开发:打造丝滑的表格(Table)之添加行拖拽排序功能示例14,TableView16_14 拖拽自动保存示例
  • 《异常检测——从经典算法到深度学习》30. 在线服务系统中重复故障的可操作和可解释的故障定位
  • 基于PX4和Ardupilot固件下自定义MAVLink消息测试(QGroundControl和Mission Planner)
  • SQL注入之盲注技术详解
  • DataPlatter:利用最少成本数据提升机器人操控的泛化能力
  • 大模型时代的基础架构 读书笔记
  • Android设计模式之代理模式