当前位置: 首页 > news >正文

MySQL存储STM32F407上的HX711数据

注意:在串口调试助手和脚本之间切换时记得拔下单片机再重新插上,否则串口会被堵塞

1.监听脚本

# -*- coding: utf-8 -*-
import serial
import mysql.connector
import re
import time
import signal# ======================
# 配置区域(根据实际情况修改)
# ======================
SERIAL_PORT = 'COM3'
BAUD_RATE = 9600
ENCODING = 'latin-1'  # 适配特殊字符DB_CONFIG = {'host': 'localhost','user': 'root','password': '','database': 'weight_data','raise_on_warnings': True
}# ======================
# 核心功能函数
# ======================def test_db_connection():"""测试数据库连接是否正常"""try:conn = mysql.connector.connect(**DB_CONFIG)conn.close()print("[数据库] ✅ 连接测试成功")except mysql.connector.Error as e:print(f"[数据库] ❌ 连接失败: {e}")exit(1)def create_table(cursor):"""创建数据表(如果不存在)"""try:create_sql = """CREATE TABLE IF NOT EXISTS weight_records (id INT AUTO_INCREMENT PRIMARY KEY,weight FLOAT NOT NULL,record_time DATETIME DEFAULT CURRENT_TIMESTAMP)"""cursor.execute(create_sql)# 检查表中现有记录数cursor.execute("SELECT COUNT(*) FROM weight_records")count = cursor.fetchone()[0]print(f"[数据库] 📝 表结构正常 (现有记录数: {count})")except mysql.connector.Error as e:if e.errno == 1050:  # 表已存在的错误码print("[数据库] 📝 表已存在,继续使用")else:raise  # 其他错误则继续抛出def parse_serial_data(line):"""解析串口数据"""try:# 清理数据,移除空白字符line = line.strip()print(f"[调试] 清理后的数据: '{line}'")# 直接尝试将整行转换为数字if line:weight = float(line)print(f"[解析] ✅ 成功解析重量值: {weight}g")return weightprint(f"[解析] ⚠️ 数据为空")return Noneexcept Exception as e:print(f"[解析] ❌ 解析错误: {e}")print(f"[解析] ⚠️ 问题数据: {repr(line)}")return Nonedef save_to_db(weight):"""无条件保存数据到数据库"""try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("INSERT INTO weight_records (weight) VALUES (%s)", (weight,))conn.commit()print(f"[数据库] 📊 数据已保存: {weight}g")except mysql.connector.Error as e:print(f"[数据库] ❌ 保存失败: {e}")finally:if 'conn' in locals():cursor.close()conn.close()def view_stored_data():"""查看数据库中存储的数据"""try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("SELECT id, weight, record_time FROM weight_records ORDER BY record_time DESC LIMIT 10")records = cursor.fetchall()if not records:print("[数据库] 📊 暂无存储数据")else:print("\n[数据库] 📊 最近10条记录:")print("ID\t重量(g)\t\t记录时间")print("-" * 50)for record in records:print(f"{record[0]}\t{record[1]:.2f}\t\t{record[2]}")except mysql.connector.Error as e:print(f"[数据库] ❌ 查询失败: {e}")finally:if 'conn' in locals():cursor.close()conn.close()def signal_handler(signum, frame):"""处理中止信号"""print("\n[系统] ⏹️ 正在优雅退出...")print("\n当前数据库中的记录:")view_stored_data()exit(0)def main():"""主程序"""signal.signal(signal.SIGINT, signal_handler)try:ser = serial.Serial(port=SERIAL_PORT,baudrate=BAUD_RATE,bytesize=serial.EIGHTBITS,parity=serial.PARITY_NONE,stopbits=serial.STOPBITS_ONE,timeout=1  # 设置更短的超时时间,方便调试)except serial.SerialException as e:print(f"[串口] ❌ 设备未找到: {e}")exit(1)print(f"[系统] 🟢 正在监听 {SERIAL_PORT}...")print("[提示] 按 Ctrl+C 可以查看已存储数据并退出程序")test_db_connection()try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()create_table(cursor)conn.commit()print("[数据库] ✅ 数据表已就绪")except mysql.connector.Error as e:if e.errno != 1050:  # 忽略"表已存在"的错误print(f"[数据库] ❌ 数据库操作失败: {e}")exit(1)finally:if 'conn' in locals():cursor.close()conn.close()data_count = 0buffer = ""try:with ser:print("[串口] 🔄 正在等待数据...")print("[调试] 提示:请确保STM32已连接并运行")while True:if ser.in_waiting > 0:# 读取字节char = ser.read().decode(ENCODING, errors='ignore')print(".", end="", flush=True)  # 显示接收进度if char == '\n':  # 收到换行符时处理数据if buffer:print(f"\n[串口] 📡 收到完整数据: {repr(buffer)}")weight = parse_serial_data(buffer)if weight is not None:if 0 <= weight <= 100000:  # 数据范围检查save_to_db(weight)data_count += 1print(f"[统计] ✅ 已成功存储 {data_count} 条数据")else:print(f"[警告] ⚠️ 数据超出合理范围: {weight}g")buffer = ""  # 清空缓冲区else:buffer += char  # 累积数据time.sleep(0.01)  # 短暂延时,避免CPU占用过高except Exception as e:print(f"\n[系统] ❌ 发生错误: {e}")finally:if ser.is_open:ser.close()print("[系统] 🔌 串口已关闭")if __name__ == "__main__":main()

2.查询脚本 

import mysql.connector
import signal
import sys# ======================
# 配置区域(根据实际情况修改)
# ======================DB_CONFIG = {'host': 'localhost','user': 'root','password': '12345wjh','database': 'weight_data','raise_on_warnings': True
}def ensure_table_exists():"""确保数据表存在并返回当前记录数"""conn = Nonetry:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()# 检查表是否存在cursor.execute("SHOW TABLES LIKE 'weight_records'")table_exists = cursor.fetchone() is not Noneif not table_exists:# 创建表create_table_sql = """CREATE TABLE weight_records (id INT AUTO_INCREMENT PRIMARY KEY,weight FLOAT NOT NULL,record_time DATETIME DEFAULT CURRENT_TIMESTAMP)"""cursor.execute(create_table_sql)conn.commit()print("[数据库] ✅ 数据表创建成功")# 检查表中现有记录数cursor.execute("SELECT COUNT(*) FROM weight_records")count = cursor.fetchone()[0]print(f"[数据库] ✅ 表结构正常 (现有记录数: {count})")return countexcept mysql.connector.Error as e:error_msg = str(e)if "Access denied" in error_msg:print(f"[数据库] ❌ 数据库访问被拒绝,请检查用户名和密码")elif "Connection refused" in error_msg:print(f"[数据库] ❌ 无法连接到数据库服务器,请确保MySQL服务已启动")elif "Unknown database" in error_msg:print(f"[数据库] ❌ 数据库'{DB_CONFIG['database']}'不存在")else:print(f"[数据库] ❌ 数据库操作失败: {error_msg}")sys.exit(1)finally:if conn:try:conn.cursor().close()conn.close()except:passdef view_stored_data():"""查看数据库中存储的数据"""try:conn = mysql.connector.connect(**DB_CONFIG)cursor = conn.cursor()cursor.execute("SELECT id, weight, record_time FROM weight_records ORDER BY record_time DESC LIMIT 10")records = cursor.fetchall()if not records:print("\n[数据库] 📊 暂无存储数据")else:print("\n[数据库] 📊 最近10条记录:")print("ID\t重量(g)\t\t记录时间")print("-" * 50)for record in records:print(f"{record[0]}\t{record[1]:.2f}\t\t{record[2]}")except mysql.connector.Error as e:print(f"[数据库] ❌ 查询失败: {e}")finally:if 'conn' in locals():cursor.close()conn.close()def signal_handler(signum, frame):"""处理中止信号"""print("\n[系统] ⏹️ 正在退出程序...")sys.exit(0)def main():# 设置信号处理器signal.signal(signal.SIGINT, signal_handler)print("[系统] 🟢 数据库监控程序已启动")record_count = ensure_table_exists()print("[提示] 按 Ctrl+C 可以退出程序")while True:print("\n请选择操作:")print("1. 查看最近10条记录")print("2. 退出程序")choice = input("请输入选项数字: ")if choice == "1":view_stored_data()elif choice == "2":print("[系统] ⏹️ 正在退出程序...")breakelse:print("[错误] ❌ 无效的选项,请重试")if __name__ == "__main__":main()


http://www.mrgr.cn/news/100275.html

相关文章:

  • 路由与OSPF学习
  • 【AI】生产规模的向量数据库 Pinecone 使用指南
  • 【prometheus+Grafana篇】从零开始:Linux 7.6 上二进制安装 Prometheus、Grafana 和 Node Exporter
  • 文件IO(Java)
  • 【2025计算机网络-面试常问】http和https区别是什么,http的内容有哪些,https用的是对称加密还是非对称加密,流程是怎么样的
  • XSS跨站--原理和分类
  • 第七讲 | list的使用及其模拟实现
  • java—12 kafka
  • 【专题三】二分查找(2)
  • UOJ 228 基础数据结构练习题 Solution
  • 第一部分:git基本操作
  • 皖维 大病救助办理手续说明
  • IOT项目——DIY 气象站
  • 【深度强化学习 DRL 快速实践】逆向强化学习算法 (IRL)
  • Shell 脚本入门:从零开始写自动化脚本
  • QT窗口相关控件及其属性
  • 再来1章linux 系列-11 系统的延迟任务及定时任务 ,名单,at ,crontab,mail;/etc/at.allow,/etc/at.deny
  • 使用spring boot vue 上传mp4转码为dash并播放
  • 【深度强化学习 DRL 快速实践】深度确定性策略梯度算法 (DDPG)
  • 【深度强化学习 DRL 快速实践】近端策略优化 (PPO)