MySQL存储STM32F407上的HX711数据
注意:在串口调试助手和脚本之间切换时记得拔下单片机再重新插上,否则串口会被堵塞
1.监听脚本
# -*- coding: utf-8 -*-
import serial
import mysql.connector
import re
import time
import signal# ======================
# 配置区域(根据实际情况修改)
# ======================
SERIAL_PORT = 'COM3'
BAUD_RATE = 9600
ENCODING = 'latin-1' # 适配特殊字符DB_CONFIG = {'host': 'localhost','user': 'root','password': '','database': 'weight_data','raise_on_warnings': True
}# ======================
# 核心功能函数
# ======================def test_db_connection():"""测试数据库连接是否正常"""try:conn = mysql.connector.connect(**DB_CONFIG)conn.close()print("[数据库] ✅ 连接测试成功")except mysql.connector.Error as e:print(f"[数据库] ❌ 连接失败: {e}")exit(1)def create_table(cursor):"""创建数据表(如果不存在)"""try:create_sql = """CREATE TABLE IF NOT EXISTS weight_records (id INT AUTO_INCREMENT PRIMARY KEY,weight FLOAT NOT NULL,record_time DATETIME DEFAULT CURRENT_TIMESTAMP)"""cursor.execute(create_sql)# 检查表中现有记录数cursor.execute("SELECT COUNT(*) FROM weight_records")count = cursor.fetchone()[0]print(f"[数据库] 📝 表结构正常 (现有记录数: {count})")except mysql.connector.Error as e:if e.errno == 1050: # 表已存在的错误码print("[数据库] 📝 表已存在,继续使用")else:raise # 其他错误则继续抛出def parse_serial_data(line):"""解析串口数据"""try:# 清理数据,移除空白字符line = line.strip()print(f"[调试] 清理后的数据: '{line}'")# 直接尝试将整行转换为数字if line:weight = float(line)print(f"[解析] ✅ 成功解析重量值: {weight}g")return weightprint(f"[解析] ⚠️ 数据为空")return Noneexcept Exception as e:print(f"[解析] ❌ 解析错误: {e}")print(f"[解析] ⚠️ 问题数据: {repr(line)}")return Nonedef save_to_db(weight):"""无条件保存数据到数据库"""try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("INSERT INTO weight_records (weight) VALUES (%s)", (weight,))conn.commit()print(f"[数据库] 📊 数据已保存: {weight}g")except mysql.connector.Error as e:print(f"[数据库] ❌ 保存失败: {e}")finally:if 'conn' in locals():cursor.close()conn.close()def view_stored_data():"""查看数据库中存储的数据"""try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("SELECT id, weight, record_time FROM weight_records ORDER BY record_time DESC LIMIT 10")records = cursor.fetchall()if not records:print("[数据库] 📊 暂无存储数据")else:print("\n[数据库] 📊 最近10条记录:")print("ID\t重量(g)\t\t记录时间")print("-" * 50)for record in records:print(f"{record[0]}\t{record[1]:.2f}\t\t{record[2]}")except mysql.connector.Error as e:print(f"[数据库] ❌ 查询失败: {e}")finally:if 'conn' in locals():cursor.close()conn.close()def signal_handler(signum, frame):"""处理中止信号"""print("\n[系统] ⏹️ 正在优雅退出...")print("\n当前数据库中的记录:")view_stored_data()exit(0)def main():"""主程序"""signal.signal(signal.SIGINT, signal_handler)try:ser = serial.Serial(port=SERIAL_PORT,baudrate=BAUD_RATE,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,timeout=1 # 设置更短的超时时间,方便调试)except serial.SerialException as e:print(f"[串口] ❌ 设备未找到: {e}")exit(1)print(f"[系统] 🟢 正在监听 {SERIAL_PORT}...")print("[提示] 按 Ctrl+C 可以查看已存储数据并退出程序")test_db_connection()try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()create_table(cursor)conn.commit()print("[数据库] ✅ 数据表已就绪")except mysql.connector.Error as e:if e.errno != 1050: # 忽略"表已存在"的错误print(f"[数据库] ❌ 数据库操作失败: {e}")exit(1)finally:if 'conn' in locals():cursor.close()conn.close()data_count = 0buffer = ""try:with ser:print("[串口] 🔄 正在等待数据...")print("[调试] 提示:请确保STM32已连接并运行")while True:if ser.in_waiting > 0:# 读取字节char = ser.read().decode(ENCODING, errors='ignore')print(".", end="", flush=True) # 显示接收进度if char == '\n': # 收到换行符时处理数据if buffer:print(f"\n[串口] 📡 收到完整数据: {repr(buffer)}")weight = parse_serial_data(buffer)if weight is not None:if 0 <= weight <= 100000: # 数据范围检查save_to_db(weight)data_count += 1print(f"[统计] ✅ 已成功存储 {data_count} 条数据")else:print(f"[警告] ⚠️ 数据超出合理范围: {weight}g")buffer = "" # 清空缓冲区else:buffer += char # 累积数据time.sleep(0.01) # 短暂延时,避免CPU占用过高except Exception as e:print(f"\n[系统] ❌ 发生错误: {e}")finally:if ser.is_open:ser.close()print("[系统] 🔌 串口已关闭")if __name__ == "__main__":main()
2.查询脚本 
import mysql.connector
import signal
import sys# ======================
# 配置区域(根据实际情况修改)
# ======================DB_CONFIG = {'host': 'localhost','user': 'root','password': '12345wjh','database': 'weight_data','raise_on_warnings': True
}def ensure_table_exists():"""确保数据表存在并返回当前记录数"""conn = Nonetry:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()# 检查表是否存在cursor.execute("SHOW TABLES LIKE 'weight_records'")table_exists = cursor.fetchone() is not Noneif not table_exists:# 创建表create_table_sql = """CREATE TABLE weight_records (id INT AUTO_INCREMENT PRIMARY KEY,weight FLOAT NOT NULL,record_time DATETIME DEFAULT CURRENT_TIMESTAMP)"""cursor.execute(create_table_sql)conn.commit()print("[数据库] ✅ 数据表创建成功")# 检查表中现有记录数cursor.execute("SELECT COUNT(*) FROM weight_records")count = cursor.fetchone()[0]print(f"[数据库] ✅ 表结构正常 (现有记录数: {count})")return countexcept mysql.connector.Error as e:error_msg = str(e)if "Access denied" in error_msg:print(f"[数据库] ❌ 数据库访问被拒绝,请检查用户名和密码")elif "Connection refused" in error_msg:print(f"[数据库] ❌ 无法连接到数据库服务器,请确保MySQL服务已启动")elif "Unknown database" in error_msg:print(f"[数据库] ❌ 数据库'{DB_CONFIG['database']}'不存在")else:print(f"[数据库] ❌ 数据库操作失败: {error_msg}")sys.exit(1)finally:if conn:try:conn.cursor().close()conn.close()except:passdef view_stored_data():"""查看数据库中存储的数据"""try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("SELECT id, weight, record_time FROM weight_records ORDER BY record_time DESC LIMIT 10")records = cursor.fetchall()if not records:print("\n[数据库] 📊 暂无存储数据")else:print("\n[数据库] 📊 最近10条记录:")print("ID\t重量(g)\t\t记录时间")print("-" * 50)for record in records:print(f"{record[0]}\t{record[1]:.2f}\t\t{record[2]}")except mysql.connector.Error as e:print(f"[数据库] ❌ 查询失败: {e}")finally:if 'conn' in locals():cursor.close()conn.close()def signal_handler(signum, frame):"""处理中止信号"""print("\n[系统] ⏹️ 正在退出程序...")sys.exit(0)def main():# 设置信号处理器signal.signal(signal.SIGINT, signal_handler)print("[系统] 🟢 数据库监控程序已启动")record_count = ensure_table_exists()print("[提示] 按 Ctrl+C 可以退出程序")while True:print("\n请选择操作:")print("1. 查看最近10条记录")print("2. 退出程序")choice = input("请输入选项数字: ")if choice == "1":view_stored_data()elif choice == "2":print("[系统] ⏹️ 正在退出程序...")breakelse:print("[错误] ❌ 无效的选项,请重试")if __name__ == "__main__":main()