微信机器人接入聊天模块
在完成之前的微信机器人后,我想能不能加入智能聊天的方式,使其成为一个真正的机器人:
https://blog.csdn.net/qq_43199509/article/details/143515780
我全部的微信机器人代码在github上:
https://github.com/CaLlMeErIC/WechatBot
用的是closeai的接口,这个是不需要翻墙或者科学上网的国内的聊天接口,可以通过我的推广链接进去,有额外福利:
https://referer.shadowai.xyz/r/4710
这里我额外编写了一个聊天模块:
"""
使用openapi接口进行聊天的模块
"""
import os
from openai import OpenAIclass FunctionModule:"""FunctionModule类。"""_instance = None# 命令标识,用于标注什么样的命令开头会调用这个功能模块# 比如用户发送 "@机器人 聊天" 或 "@机器人 闲聊" 会触发这个模块_command_sign = ["聊天"]_reply_string = None# 模块激活状态is_active = True # 设置为 True,表示模块被激活def __new__(cls):"""单例实现。"""if cls._instance is None:cls._instance = super(FunctionModule, cls).__new__(cls)return cls._instancedef __init__(self):"""初始化方法。"""if not hasattr(self, '_initialized'):self._initialized = True# 初始化操作print("初始化 FunctionModule 实例")# 在这里初始化 API 设置self.api_key = 'sk-8AZopg1uvf41Rthisisnottruekkey55EjUfhimGyb8HLjc2' # 请替换为您的实际 API 密钥self.api_base = 'https://api.openai-proxy.org/v1' # API 基础 URL,包含 /v1 后缀self.model = 'gpt-3.5-turbo' # 使用的模型名称self.max_windows = 5 # 每个用户的上下文消息最多保留 5 条self.max_users = 100 # 最多保存 100 个用户的会话self.user_contexts = {} # 用于保存用户的上下文self.client = OpenAI(# sdk,包括langchain,都需要这个/v1的后缀base_url=self.api_base,api_key=self.api_key,)def get_command_sign(self):"""返回当前模块的命令标识"""return self._command_signdef process_messages(self, sender_nickname, content, directly=False):"""根据发消息人的昵称和消息内容,与 FunctionModule 进行对话。Args:sender_nickname (str): 发送者的昵称。content (str): 消息内容。directly : 判断是否是直接发送消息聊天还是通过命令调用聊天Returns:str: 回复消息。"""# 去掉命令标识,获取实际用户输入user_message = contentif not directly:# 如果是通过命令调用聊天的话for sign in self._command_sign:if content.startswith(sign):user_message = content[len(sign):].strip()break# 如果用户没有输入内容,则提示用户输入if not user_message:self._reply_string = "请在命令后添加您想说的话,例如:'聊天 你好!'"return self.get_reply()# 管理用户上下文# 检查用户是否在上下文字典中if sender_nickname not in self.user_contexts:# 如果用户数量超过最大值,则删除最早的用户if len(self.user_contexts) >= self.max_users:first_user = next(iter(self.user_contexts))del self.user_contexts[first_user]# 初始化该用户的上下文列表self.user_contexts[sender_nickname] = []# 获取该用户的上下文user_context = self.user_contexts[sender_nickname]# 添加新消息到上下文user_context.append({'role': 'user', 'content': user_message})# 只保留最近的 max_windows * 2 条消息(因为包括 AI 的回复)if len(user_context) > self.max_windows * 2:user_context = user_context[-self.max_windows * 2:]# 更新上下文self.user_contexts[sender_nickname] = user_context# 调用 ChatCompletion API 进行对话try:response = self.client.chat.completions.create(model=self.model,messages=user_context)# 提取 AI 回复的内容ai_reply = response.choices[0].message.content# 将 AI 的回复也加入上下文user_context.append({'role': 'assistant', 'content': ai_reply})# 同样只保留最近的 max_windows * 2 条消息if len(user_context) > self.max_windows * 2:user_context = user_context[-self.max_windows * 2:]# 更新上下文self.user_contexts[sender_nickname] = user_contextself._reply_string = ai_replyexcept Exception as e:# 处理异常self._reply_string = f"抱歉,{sender_nickname},发生错误:{str(e)}"return self.get_reply()@staticmethoddef get_simple_description():"""返回简单的功能描述"""return "与 ChatGPT 进行聊天互动"@staticmethoddef get_detail_description():"""返回详细的功能描述"""return ("【聊天功能说明】\n""您可以通过发送“聊天”或“闲聊”作为开头,再加上您的消息,与 ChatGPT 进行对话。\n""例如:“聊天 今天的天气怎么样?”\n""ChatGPT 将会回复您的问题。")def get_reply(self):"""返回最终的回复内容"""return self._reply_stringdef close(self):"""如果需要,关闭与 OpenAI 的连接(在这个例子中不需要)。"""passif __name__ == "__main__":chat_module = FunctionModule()# 模拟多个用户的对话users = ["Alice", "Bob", "Charlie", "David", "Eve"]messages = ["聊天 你好,今天天气怎么样?","闲聊 给我讲个笑话吧!","聊天 你知道OpenAI吗?","闲聊 讲个故事吧。","聊天 谈谈人工智能的发展。"]for sender, message in zip(users, messages):reply = chat_module.process_messages(sender, message)print(f"{sender}: {message}")print(f"ChatGPT: {reply}")# 测试超过用户上限的情况for i in range(101):sender = f"User_{i}"message = "聊天 测试用户上限。"reply = chat_module.process_messages(sender, message)print(f"{sender}: {message}")print(f"ChatGPT: {reply}")print(f"当前用户数量:{len(chat_module.user_contexts)}") # 应该不超过 100
然后之前的代码主程序部分也需要修改下,如果没有对应的触发命令就直接调用智能聊天模块:
"""
启动微信机器人并导入不同的功能模块
"""
import traceback
import threading
import queue
import logging
import itchat
from itchat.content import TEXT
from utils.scan_module import get_command_module_dict# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(threadName)s %(message)s')class WeChatBot:"""微信机器人"""def __init__(self):# 功能模块映射,根据消息前缀映射到对应的模块名self.module_mapping = get_command_module_dict()# 定义消息队列self.message_queue = queue.Queue()self.num_worker_threads = 5 # 工作线程数# 初始化发送者锁字典和锁self.sender_locks = {}self.sender_locks_lock = threading.Lock()self.handle_private_message = Noneself.handle_group_message = None# 初始化 itchatitchat.auto_login(hotReload=False)# 注册消息处理函数self.register_handlers()# 启动消息处理线程self.start_worker_threads()def register_handlers(self):"""注册消息处理器"""# 由于装饰器的使用,我们需要将函数定义在这里,并使用 self 作为参数@itchat.msg_register(TEXT, isFriendChat=True)def handle_private_message(msg):# 将消息放入队列,不直接处理self.message_queue.put(('private', msg))@itchat.msg_register(TEXT, isGroupChat=True)def handle_group_message(msg):# 将消息放入队列,不直接处理if msg['IsAt']:self.message_queue.put(('group', msg))# 将函数绑定到实例self.handle_private_message = handle_private_messageself.handle_group_message = handle_group_messagedef start_worker_threads(self):"""启动工作线程"""for i in range(self.num_worker_threads):thread_pool = threading.Thread(target=self.message_worker, name=f'Worker-{i + 1}')thread_pool.daemon = Truethread_pool.start()def message_worker(self):"""通过多线程和队列处理信息,使用锁确保一时间同一人只能占用一个线程"""while True:try:msg_type, msg_data = self.message_queue.get(timeout=1)sender_id = msg_data['FromUserName']# 获取或创建发送者的锁with self.sender_locks_lock:if sender_id not in self.sender_locks:self.sender_locks[sender_id] = threading.Lock()sender_lock = self.sender_locks[sender_id]# 使用发送者的锁,确保同一时间只有一个线程处理该发送者的消息with sender_lock:if msg_type == 'private':self.handle_private_message_worker(msg_data)elif msg_type == 'group':self.handle_group_message_worker(msg_data)self.message_queue.task_done()except queue.Empty:continueexcept Exception as exception:logging.error("消息处理时发生异常:%s", exception)def handle_private_message_worker(self, msg):"""处理私聊消息"""sender = msg['User']nickname = sender['NickName']content = msg['Text']logging.info("私聊消息 - 来自 %s:%s", nickname, content)reply = self.generate_reply(nickname, content)itchat.send(reply, toUserName=sender['UserName'])def handle_group_message_worker(self, msg):"""处理群消息"""if msg['IsAt']:group_name = msg['User']['NickName']sender_nickname = msg['ActualNickName']actual_content = msg['Content']# 获取自己的昵称my_nickname = itchat.search_friends()['NickName']# 去除@信息,提取实际内容content = actual_content.replace(f'@{my_nickname}', '').strip()logging.info("群聊消息 - %s 中 @%s 说:%s", group_name, sender_nickname, content)reply_content = self.generate_reply(sender_nickname, content)reply = f"@{sender_nickname} {reply_content}"itchat.send(reply, toUserName=msg['FromUserName'])def generate_reply(self, nickname, content):"""调用不同的功能模块,处理消息生成回复"""try:command_sign = content.split(" ")[0]if command_sign in self.module_mapping:module_instance = self.module_mapping[command_sign]reply = module_instance.process_messages(nickname, content)else:# 如果没有特殊命令,就直接调用聊天模块module_instance = self.module_mapping["聊天"]reply = module_instance.process_messages(nickname, content, directly=True)return replyexcept Exception as exception:print(traceback.format_exc())logging.error("处理模块时发生异常:%s", exception)return '抱歉,出现了一些错误。'@staticmethoddef run():"""开始运行机器人"""while True:try:itchat.run(blockThread=True)except KeyboardInterrupt:# 如果用户手动中断,退出循环logging.info("微信机器人已停止")breakexcept Exception as exception:logging.error("主循环发生异常:%s", exception)if 'request' in str(exception) or 'Logout' in str(exception) or 'login' in str(exception).lower():try:itchat.auto_login(hotReload=True)logging.info("重新登录成功")except Exception as login_exception:logging.error("重新登录失败:%s", login_exception)print("无法重新登录,程序即将退出")break # 退出循环,不再尝试重新登录else:logging.error("无法识别的异常,未能重新登录:%s", exception)print("遇到无法识别的异常,程序即将退出")break # 退出循环,不再尝试if __name__ == '__main__':bot = WeChatBot()bot.run()