DeepSeek本地部署+自主开发对话Web应用
文章目录
- 引言
- 前端部分
- 核心页面DeepSeek.vue
- MyModal.vue
- 后端部分
- WebSocketConfig 配置类
- AbstractDeepSeekTool
- DeepSeekWebSocketHandler
- 数据库设计
- 总结
引言
最近DeepSeep横空出世,在全球内掀起一股热潮,到处都是满血大模型接入的应用,但这些有的是要钱的,有的虽然免费但是也会卡顿,作为一名软件开发人员,当然要想办法了,这不笔者就自主开发了一个web对话页面,不是简单的对话响应哦,还能支持保留历史记录的,可以新建对话框,请往下看(如果对您有帮助记得关注我,点个赞!)。
前端Gitee代码地址:https://gitee.com/buxingzhe/deepseek-chat.git
后端Gitee代码地址:https://gitee.com/buxingzhe/deep-seek-project.git
前端部分
先来个效果图
安装依赖
npm install
运行命令
npm run serve
打包构建
npm run build
功能特点
-
AI流式响应,无卡顿。
-
支持对话消息文本的持久化,除了临时会话,其他会话框的标题均可修改,会话框支持删除。
-
可以随意新建对话框,支持保存历史对话消息,有一个默认临时会话框,不保存会话历史消息,刷新页面后消息丢失。
-
消息区分思考推理部分和正式回答部分,思考推理部分为浅灰色,正式回答为黑色文本。
-
使用了MarkDown渲染html,格式更加美观。
-
消息生成过程中可随时停止。
-
支持心跳检测,后端服务离线后重启时,前端自动重新建立连接webSocket会话。
核心页面DeepSeek.vue
代码稍微有点复杂了,毕竟要实现的东西也不少,什么保存消息上下文记录啊,定时心跳检测啊,文本渲染啊等等,样式部分我这省略了,具体可从gitee拉取源码。
<template><div id="app"><div class="chat-container"><!-- 会话列表 --><div class="session-list"><divv-for="(session, index) in sessions":key="index"class="session-item":class="{ active: activeSessionIndex === index }"@click="selectSession(index)"><span class="session-status" :class="{ online: session.isConnected, offline:!session.isConnected }"></span><span class="session-status-text">{{ session.isConnected ? '在线' : '离线' }}</span><span class="session-name">{{ session.title }}</span><!-- 三个小点按钮 --><div class="session-actions-trigger" @click.stop="toggleActionsMenu(index)"><span>...</span></div><!-- 编辑和删除操作菜单 --><divv-if="session.showActionsMenu"class="session-actions-menu"@click.stop><button @click="openEditModal(index)">编辑</button><button @click="openDeleteModal(index)">删除</button></div></div><button @click="createNewSession" class="new-session-button" :disabled="hasUnconnectedSession">新建会话</button></div><!-- 聊天内容 --><div class="chat-content"><div class="chatBox"><!-- 错误提示 --><div v-if="errorMessage" class="error-message">{{ errorMessage }}</div><!-- 聊天消息显示区域 --><div class="chat-messages"><div v-for="(message, index) in currentMessages" :key="index" class="message"><!-- 用户消息 --><div v-if="message.sender === 'user'" class="user-message-container"><article class="message-content user-message">{{ message.text }}</article></div><!-- 机器人消息 --><div v-else class="bot-message-container"><article class="message-content bot-message" v-html="renderMarkdown(message.text)"></article></div></div></div><!-- 输入框和发送按钮 --><div class="chat-input"><textareav-model="inputMessage"placeholder="请输入你的问题..."@keyup="handleKeyup"rows="6":disabled="!isConnected"/><button @click="handleButtonClick" :disabled="!isConnected">{{ currentSession.isGenerating ? '停止生成' : '发送' }}</button></div></div></div></div><!-- 编辑模态框 --><MyModal :visible="isEditModalVisible" title="编辑会话标题" @close="closeEditModal"><input v-model="editTitle" placeholder="请输入新的会话标题" /><template #footer><button @click="closeEditModal">取消</button><button @click="confirmEdit">确定</button></template></MyModal><!-- 删除模态框 --><MyModal :visible="isDeleteModalVisible" title="确认删除" @close="closeDeleteModal"><p>确定要删除该会话吗?</p><template #footer><button @click="closeDeleteModal">取消</button><button @click="confirmDelete">确定</button></template></MyModal></div>
</template><script setup>
import {computed, onMounted, onUnmounted, reactive, ref} from 'vue';
import MyModal from './MyModal.vue';
import {marked} from 'marked';
import DOMPurify from 'dompurify';
import axios from 'axios';// 配置 marked(保持不变)
marked.setOptions({gfm: true,breaks: true,highlight: function (code) {return code;}
});// 渲染 Markdown 内容(保持不变)
const renderMarkdown = (content) => {console.log("渲染前内容",content);content = content.replace(/<think>/g, '<div><span class="deepThink">');if (!content.includes('</think>')) {content = content.concat('</span></div>');}if (content.includes('</think>')) {content = content.replace(/<\/span><\/div>/g, '');content = content.replace(/<\/think>/g, '</span></div>');}const html = marked(content);const sanitizedHtml = DOMPurify.sanitize(html);const tempDiv = document.createElement('div');tempDiv.innerHTML = sanitizedHtml.toString();const deepThinkElements = tempDiv.querySelectorAll('.deepThink');deepThinkElements.forEach((element) => {if (element.textContent.trim() === '') {element.textContent = '暂无推理过程';}});console.log("渲染后内容",tempDiv.innerHTML);return tempDiv.innerHTML;
};// 存储所有会话(改为空数组)
const sessions = ref([]);// 当前激活的会话索引
const activeSessionIndex = ref(0);// 新增:统一错误处理函数
const handleNetworkError = (error, session) => {let errorMsg = '服务暂时不可用,请稍后重试';if (!navigator.onLine) {errorMsg = '网络连接已断开,请检查网络设置';} else if (error.message === 'Network Error') {errorMsg = '无法连接到服务器,请确认后端服务已启动';} else if (error.response?.status >= 500) {errorMsg = '服务器内部错误,请联系管理员';}session.errorMessage = errorMsg;setTimeout(() => {session.errorMessage = ''; // 5秒后自动清除错误}, 5000);
};// 保存用户消息到数据库
const saveUserMessageToDatabase = async (talkInfoId, message,messageType) => {try {await axios.post('/api/deepSeek/saveTalkInfoMessage', {talkInfoId,message,messageType});} catch (error) {console.error('保存用户消息到数据库失败:', error);const errorSession = sessions.value[activeSessionIndex.value] || {};handleNetworkError(error, errorSession);}
};// 保存机器人消息到数据库
const saveBotMessageToDatabase = async (talkInfoId, message,messageType) => {try {await axios.post('/api/deepSeek/saveTalkInfoMessage', {talkInfoId,message,messageType});} catch (error) {console.error('保存机器人消息到数据库失败:', error);const errorSession = sessions.value[activeSessionIndex.value] || {};handleNetworkError(error, errorSession);}
};// 获取单个会话框的历史消息列表
const getMessageListByTalkInfoId = async (talkInfoId) => {try {const response = await axios.get('/api/deepSeek/getMessageListByTalkInfoId', {params: { talkInfoId }});// 检查 response.data.data 是否存在且为数组if (Array.isArray(response.data?.data)) {//构建数据const result = response.data.data.map(item => {return {sender: item.messageType,text: item.message};});console.log('构建数据成功+++:', result)return result;}return [];} catch (error) {console.error('获取会话消息列表失败:', error);const errorSession = sessions.value[activeSessionIndex.value] || {};handleNetworkError(error, errorSession);return [];}
};//页面刷新时初始化会话列表
const fetchSessions = async () => {try {const response = await axios.get('/api/deepSeek/talkInfoList');const data = response.data?.data || [];// 创建临时会话const tempSession = reactive({talkInfoId: 'temp-' + Date.now(),title: '临时会话',messages: [{ sender: 'bot', text: '欢迎使用临时对话框!有什么可以帮助你的?' }],inputMessage: '',errorMessage: '',isConnected: false,isGenerating: false,socket: null,reconnectInterval: null,retryCount: 0, // 新增重试次数currentBotMessage: '' // 用于存储当前机器人流式响应消息});// 初始化会话列表,将临时会话添加到列表开头sessions.value = [tempSession, ...data.map(item =>reactive({talkInfoId: item.id,title: item.title || `会话 ${sessions.value.length + 1}`,messages: [{ sender: 'bot', text: '欢迎使用!有什么可以帮助你的?' }],inputMessage: '',errorMessage: '',isConnected: false,isGenerating: false,socket: null,reconnectInterval: null,retryCount: 0, // 新增重试次数currentBotMessage: '' // 用于存储当前机器人流式响应消息}))];// 为每个会话获取对话消息for (const session of sessions.value) {const talkInfoId = session?.talkInfoId;const result = talkInfoId.toString().startsWith('temp-')if (!result){const messages = await getMessageListByTalkInfoId(session?.talkInfoId);console.log("获取的消息为:",messages);session.messages = messages && messages.length > 0 ? messages :[{ sender: 'bot', text: '欢迎使用!有什么可以帮助你的?' }];}}// 若查询到会话框列表,开始逐一连接if (data.length > 0) {sessions.value.forEach(session => {reconnectWebSocket(session);});} else {// 查不到则直接连接临时会话框reconnectWebSocket(tempSession);}} catch (error) {console.error('获取会话列表失败:', error);// 创建临时会话const tempSession = reactive({talkInfoId: 'temp-' + Date.now(),title: '临时会话',messages: [{ sender: 'bot', text: '欢迎使用!' }],inputMessage: '',errorMessage: '',isConnected: false,isGenerating: false,socket: null,reconnectInterval: null,retryCount: 0, // 新增重试次数currentBotMessage: '' // 用于存储当前机器人流式响应消息});sessions.value = [tempSession];activeSessionIndex.value = 0;handleNetworkError(error, tempSession);// 直接连接临时会话框reconnectWebSocket(tempSession);}
};// 计算属性(保持不变)
const currentMessages = computed(() => sessions.value[activeSessionIndex.value]?.messages || []);
const inputMessage = computed({get: () => sessions.value[activeSessionIndex.value]?.inputMessage || '',set: (value) => (sessions.value[activeSessionIndex.value].inputMessage = value)
});
const errorMessage = computed({get: () => sessions.value[activeSessionIndex.value]?.errorMessage || '',set: (value) => (sessions.value[activeSessionIndex.value].errorMessage = value)
});
const isConnected = computed({get: () => sessions.value[activeSessionIndex.value]?.isConnected || false,set: (value) => (sessions.value[activeSessionIndex.value].isConnected = value)
});
const currentSession = computed(() => sessions.value[activeSessionIndex.value] || {});// 发送消息函数
const sendMessage = () => {if (!inputMessage.value) return;const messageData = JSON.stringify({talkInfoId: currentSession.value?.talkInfoId,content: inputMessage.value});currentMessages.value.push({ sender: 'user', text: inputMessage.value });currentSession.value.socket.send(messageData);// 保存用户消息到数据库if (!currentSession.value.talkInfoId.toString().startsWith('temp-')){saveUserMessageToDatabase(currentSession.value.talkInfoId, inputMessage.value,'user');}inputMessage.value = '';currentSession.value.isGenerating = true;
};// 停止生成函数
const stopGenerating = () => {const messageData = JSON.stringify({talkInfoId: currentSession.value?.talkInfoId,content: 'stopSending'});currentSession.value.socket.send(messageData);currentSession.value.isGenerating = false;
};// 发送按钮点击处理
const handleButtonClick = () => {currentSession.value.isGenerating ? stopGenerating() : sendMessage();
};// 键盘事件处理
const handleKeyup = (event) => {if (event.key === 'Enter' &&!event.shiftKey) sendMessage();
};// WebSocket 连接管理(修改)
let fetchSessionsDebounceTimer = null;
const reconnectWebSocket = (session) => {if (session.reconnectInterval) clearInterval(session.reconnectInterval);const initialDelay = 3000; // 初始延迟时间const maxDelay = 60000; // 最大延迟时间const backoffFactor = 2; // 退避因子const attemptReconnect = () => {if (!session.socket || session.socket.readyState === WebSocket.CLOSED) {session.socket = new WebSocket(`ws://localhost:8085/websocket?talkInfoId=${session.talkInfoId}`);session.socket.onopen = () => {session.errorMessage = '';session.isConnected = true;session.retryCount = 0; // 连接成功,重置重试次数clearInterval(session.reconnectInterval);};session.socket.onmessage = (event) => {const targetSession = sessions.value.find(s => s?.socket === session.socket);if (!targetSession) return;if (event.data.includes('[END_OF_MESSAGE_GENERATE]')) {targetSession.isGenerating = false;// 保存机器人消息到数据库if (!targetSession?.talkInfoId.toString().startsWith('temp-')){saveBotMessageToDatabase(targetSession?.talkInfoId, targetSession.currentBotMessage,'bot');}targetSession.currentBotMessage = '';return;}targetSession.currentBotMessage += event.data;const lastMessage = targetSession?.messages[targetSession?.messages.length - 1];lastMessage?.sender === 'bot'? lastMessage.text += event.data: targetSession?.messages.push({ sender: 'bot', text: event.data });};session.socket.onerror = (error) => {console.error('连接错误:', error);handleNetworkError(error, session);session.isConnected = false;session.retryCount++;const nextDelay = Math.min(initialDelay * Math.pow(backoffFactor, session.retryCount), maxDelay);session.reconnectInterval = setTimeout(attemptReconnect, nextDelay);};session.socket.onclose = () => {handleNetworkError(new Error('连接意外关闭'), session);session.isConnected = false;session.retryCount++;const nextDelay = Math.min(initialDelay * Math.pow(backoffFactor, session.retryCount), maxDelay);session.reconnectInterval = setTimeout(attemptReconnect, nextDelay);// 尝试刷新会话列表,添加防抖机制if (fetchSessionsDebounceTimer) {clearTimeout(fetchSessionsDebounceTimer);}fetchSessionsDebounceTimer = setTimeout(() => {fetchSessions();}, 3000); // 3 秒防抖};}};attemptReconnect();
};// 创建新会话
const createNewSession = async () => {if (hasUnconnectedSession.value) return;try {// 新建新会话的同时保存到后台const response = await axios.post('/api/deepSeek/saveTalkInfo', {title: `历史会话 ${sessions.value.length}`});const newSession = reactive({talkInfoId: response.data.data,title: `历史会话 ${sessions.value.length}`,messages: [{ sender: 'bot', text: '欢迎使用!有什么可以帮助你的?' }],inputMessage: '',errorMessage: '',isConnected: false,isGenerating: false,socket: null,reconnectInterval: null,retryCount: 0, // 新增重试次数currentBotMessage: '' // 用于存储当前机器人流式响应消息});sessions.value.push(newSession);activeSessionIndex.value = sessions.value.length - 1;reconnectWebSocket(newSession);} catch (error) {console.error('创建会话失败:', error);const errorSession = sessions.value[activeSessionIndex.value] || {};handleNetworkError(error, errorSession);// 确保至少存在一个会话if (sessions.value.length === 0) {const tempSession = reactive({talkInfoId: 'temp-' + Date.now(),title: '临时会话',messages: [{ sender: 'bot', text: '欢迎使用!' }],inputMessage: '',errorMessage: '',isConnected: false,isGenerating: false,socket: null,reconnectInterval: null,retryCount: 0, // 新增重试次数currentBotMessage: '' // 用于存储当前机器人流式响应消息});sessions.value = [tempSession];activeSessionIndex.value = 0;}}
};// 切换操作菜单显示隐藏
const toggleActionsMenu = (index) => {sessions.value.forEach((session, i) => {session.showActionsMenu = i === index && !session.showActionsMenu;});
};
//============================编辑和删除会话框相关操作逻辑=========================================================================
const isEditModalVisible = ref(false);
const isDeleteModalVisible = ref(false);
const editTitle = ref('');
const deleteTitle = ref('');
let currentEditIndex = -1;
let currentDeleteIndex = -1;const openEditModal = (index) => {currentEditIndex = index;editTitle.value = sessions.value[index].title;isEditModalVisible.value = true;
};const closeEditModal = () => {isEditModalVisible.value = false;
};const confirmEdit = async () => {const session = sessions.value[currentEditIndex];if (editTitle.value && editTitle.value!== session.title) {try {await axios.post('/api/deepSeek/updateTalkInfo', {id: session.talkInfoId,title: editTitle.value});session.title = editTitle.value;} catch (error) {console.error('编辑会话失败:', error);handleNetworkError(error, session);}}closeEditModal();
};const openDeleteModal = (index) => {currentDeleteIndex = index;deleteTitle.value = sessions.value[index].title;isDeleteModalVisible.value = true;
};const closeDeleteModal = () => {isDeleteModalVisible.value = false;
};const confirmDelete = async () => {const session = sessions.value[currentDeleteIndex];if (session.talkInfoId.toString().startsWith('temp-')) {console.log('临时会话不能删除');closeDeleteModal();return;}try {await axios.post('/api/deepSeek/deleteTalkInfo', {id: session.talkInfoId});sessions.value.splice(currentDeleteIndex, 1);if (activeSessionIndex.value === currentDeleteIndex) {activeSessionIndex.value = Math.max(0, activeSessionIndex.value - 1);}} catch (error) {console.error('删除会话失败:', error);handleNetworkError(error, session);}closeDeleteModal();
};
//======================================================================================================================
// 选择会话
const selectSession = (index) => {activeSessionIndex.value = index;isConnected.value = sessions.value[index].isConnected;
};// 计算属性:判断是否有未连接的会话
const hasUnconnectedSession = computed(() => {return sessions.value.some(session =>!session?.isConnected);
});// 生命周期
let serviceCheckInterval;
onMounted(() => {fetchSessions(); // 初始化时获取会话列表// 定期检查后端服务可用性serviceCheckInterval = setInterval(() => {axios.get('/api/deepSeek/heartBeatCheck').then(() => {// 服务可用,对未连接的会话进行重试sessions.value.forEach(session => {if (!session?.isConnected) {reconnectWebSocket(session);}});}).catch(error => {console.error('后端服务不可用:', error);sessions.value.forEach(session => {session.isConnected = false;if (session?.socket) {session.socket.close();}});});}, 10000); // 每10秒检查一次
});onUnmounted(() => {sessions.value.forEach((session) => {if (session?.socket) session.socket.close();if (session?.reconnectInterval) clearInterval(session.reconnectInterval);});clearInterval(serviceCheckInterval);
});
</script><style scoped>
...此处省略
</style>
MyModal.vue
<template><div v-if="visible" class="modal-overlay" @click.self="close"><transition name="modal-fade"><div class="modal"><div class="modal-header"><h3>{{ title }}</h3><button @click="close">×</button></div><div class="modal-body"><slot></slot></div><div class="modal-footer"><slot name="footer"></slot></div></div></transition></div>
</template><script setup>
import { watch } from 'vue';// eslint-disable-next-line no-undef
const props = defineProps({visible: {type: Boolean,required: true},title: {type: String,required: true}
});// eslint-disable-next-line no-undef
const emit = defineEmits(['close']);const close = () => {emit('close');
};watch(() => props.visible, (newVal) => {if (!newVal) {close();}
});
</script><style scoped>
.modal-overlay {position: fixed;top: 0;left: 0;width: 100%;height: 100%;background-color: rgba(0, 0, 0, 0.5);display: flex;justify-content: center;align-items: center;z-index: 1000;animation: fadeIn 0.3s ease-in-out;
}.modal {background: white;border-radius: 12px;box-shadow: 0 8px 30px rgba(0, 0, 0, 0.2);width: 500px;max-width: 90%;transform-origin: center;animation: scaleUp 0.3s ease-in-out;
}.modal-header {display: flex;justify-content: space-between;align-items: center;padding: 16px 24px;border-bottom: 1px solid #e9ecef;
}.modal-header h3 {margin: 0;font-size: 1.25rem;font-weight: 600;color: #212529;
}.modal-header button {background: none;border: none;font-size: 1.5rem;line-height: 1;color: #6c757d;cursor: pointer;transition: color 0.2s;padding: 4px;
}.modal-header button:hover {color: #dc3545;
}.modal-body {padding: 20px 24px;color: #495057;line-height: 1.6;max-height: 70vh;overflow-y: auto;
}.modal-footer {display: flex;justify-content: flex-end;gap: 12px;padding: 16px 24px;border-top: 1px solid #e9ecef;background: #f8f9fa;border-radius: 0 0 12px 12px;
}@keyframes fadeIn {from { opacity: 0; }to { opacity: 1; }
}@keyframes scaleUp {from { transform: scale(0.95); }to { transform: scale(1); }
}@media (max-width: 480px) {.modal {width: 95%;margin: 10px;}.modal-header,.modal-body,.modal-footer {padding: 12px 16px;}
}
</style>
后端部分
项目结构图如下
WebSocketConfig 配置类
package com.deepseek.project.websocket;import jakarta.annotation.Resource;
import org.apache.catalina.connector.Connector;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import org.springframework.web.socket.server.standard.ServletServerContainerFactoryBean;/*** @author hulei* websocket配置类*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {@Resourceprivate ApplicationContext applicationContext;@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}@Beanpublic ServletWebServerFactory servletContainer() {TomcatServletWebServerFactory tomcat = new TomcatServletWebServerFactory();tomcat.addAdditionalTomcatConnectors(createWebSocketConnector());return tomcat;}/*** 设置最大消息大小*/@Beanpublic ServletServerContainerFactoryBean createWebSocketContainer() {ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();// 在此处设置bufferSizecontainer.setMaxTextMessageBufferSize(512000);container.setMaxBinaryMessageBufferSize(512000);container.setMaxSessionIdleTimeout(15 * 60000L);return container;}@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(deepSeekWebSocketHandler(), "/websocket").setAllowedOrigins("*");}public DeepSeekWebSocketHandler deepSeekWebSocketHandler() {// 从spring容器中获取bean,如过直接new的话就不是spring管理的bean了,DeepSeekWebSocketHandler 内部使用依赖注入其他的类则会为nullreturn applicationContext.getBean(DeepSeekWebSocketHandler.class);}private Connector createWebSocketConnector() {Connector connector = new Connector("org.apache.coyote.http11.Http11NioProtocol");connector.setScheme("ws");// WebSocket 服务的端口,多通道公用connector.setPort(8085);return connector;}
}
这段代码是WebSocket配置类,主要用于配置和初始化WebSocket服务。功能包括:
- 启用WebSocket并注册处理器。
- 配置Tomcat服务器以支持WebSocket连接。
- 设置WebSocket的最大消息大小和会话超时时间。
- 创建WebSocket连接器并指定端口。
控制流程图如下:
AbstractDeepSeekTool
package com.deepseek.project.tool;import com.deepseek.project.constant.Constants;
import com.deepseek.project.service.ITalkInfoService;
import com.google.gson.Gson;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** author: hulei* 抽象的DeepSeek工具类,为什么要抽象呢,因为笔者既想使用本地部署的deepSeek,又想使用线上官方提供的API,所以抽象了DeepSeek工具类,*/
@Slf4j
@Data
public abstract class AbstractDeepSeekTool {private static final int MAX_CONCURRENT = 20;private static final int BATCH_SIZE = 10;public static OkHttpClient client;private static ExecutorService httpExecutor;public static final Gson gson = new Gson();/*** deepSeek会话历史记录*/public final List<Map<String, String>> conversationHistory = Collections.synchronizedList(new ArrayList<>());/*** 是否停止发送标志,由前端传值控制*/public volatile boolean stopSending = false;/*** DeepSeek流式响应消息缓存列表,攒到一定数量时,批量保存到数据库*/private final List<Map<String, String>> deepSeekMessageCache = new ArrayList<>();/*** ITalkInfoService*/private final ITalkInfoService italkInfoService;public AbstractDeepSeekTool(ITalkInfoService italkInfoService, List<Map<String, String>> conversationHistory) {this.italkInfoService = italkInfoService;this.conversationHistory.addAll(conversationHistory);init();}public void init() {httpExecutor = Executors.newCachedThreadPool(r -> {Thread t = new Thread(r);t.setDaemon(true);return t;});client = new OkHttpClient.Builder().dispatcher(new Dispatcher(httpExecutor)).connectionPool(new ConnectionPool(50, 5, java.util.concurrent.TimeUnit.MINUTES)).connectTimeout(30, java.util.concurrent.TimeUnit.SECONDS).readTimeout(60, java.util.concurrent.TimeUnit.SECONDS).writeTimeout(60, java.util.concurrent.TimeUnit.SECONDS).build();}public void processMessage(String userMessage, WebSocketSession session, String talkInfoId) {synchronized (conversationHistory) {Map<String, String> userMessageMap = Map.of("role", "user", "content", userMessage);conversationHistory.add(userMessageMap);if (!talkInfoId.startsWith(Constants.TEMP_TALK_INFO_ID_PREFIX)) {addMessageToCache(userMessageMap, talkInfoId);}}requestDeepSeekMessage(session, talkInfoId);}/*** 构建请求头抽象方法,交给子类实现*/protected abstract Request buildRequest();/*** 请求DeepSeek深度搜索消息, 交给子类实现*/public abstract void requestDeepSeekMessage(WebSocketSession session, String talkInfoId);/*** 添加消息到DeepSeek流式响应消息缓存列表中*/public void addMessageToCache(Map<String, String> deepSeekMessageMap, String talkInfoId) {deepSeekMessageCache.add(deepSeekMessageMap);if (deepSeekMessageCache.size() >= BATCH_SIZE) {saveCachedDeepSeekMessages(talkInfoId);}}/*** 保存DeepSeek流式响应消息缓存列表中的消息到数据库*/public void saveCachedDeepSeekMessages(String talkInfoId) {if (!deepSeekMessageCache.isEmpty()) {try {italkInfoService.saveTalkInfoDeepSeekHistory(Integer.parseInt(talkInfoId), deepSeekMessageCache);log.info("批量消息保存到数据库成功,数量: {}", deepSeekMessageCache.size());deepSeekMessageCache.clear();} catch (Exception e) {log.error("批量消息保存到数据库失败,数量: {}", deepSeekMessageCache.size(), e);}}}/*** 发送响应结束标记信息给前端, 前端会根据这个标记来判断是否继续接收消息,改变按钮状态从停止生成改为待发送状态*/public void sendEndMarker(WebSocketSession session) {try {session.sendMessage(new TextMessage("[END_OF_MESSAGE_GENERATE]"));} catch (IOException e) {log.error("发送结束标记失败", e);}}/*** 处理异常*/public void handleError(WebSocketSession session, Exception e) {log.error("请求处理异常", e);try {session.sendMessage(new TextMessage("系统错误: " + e.getMessage()));sendEndMarker(session);} catch (IOException ex) {log.error("发送错误信息失败", ex);}}
}
这段代码定义了一个抽象类 AbstractDeepSeekTool,用于处理与 DeepSeek 的交互。主要功能包括初始化 HTTP 客户端和线程池、处理用户消息、请求 DeepSeek 消息、缓存和批量保存消息到数据库、发送结束标记给前端以及处理异常。
- 初始化:设置 HTTP 客户端和线程池。
- 处理用户消息:将用户消息添加到会话历史记录并缓存。
- 请求 DeepSeek 消息:由子类实现具体逻辑。
- 缓存和批量保存:将消息缓存并在达到批量大小时保存到数据库。
- 发送结束标记:通知前端消息生成结束。
- 处理异常:捕获并处理异常,发送错误信息给前端。
控制流图
DeepSeekWebSocketHandler
package com.deepseek.project.websocket;import com.deepseek.project.constant.Constants;
import com.deepseek.project.model.TalkInfoDeepSeekHistory;
import com.deepseek.project.service.ITalkInfoService;
import com.deepseek.project.tool.AbstractDeepSeekTool;
import com.deepseek.project.tool.DeepSeekLocalTool;
import com.deepseek.project.tool.DeepSeekOnlineTool;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** author: 胡磊* WebSocket处理器,用于处理与客户端的WebSocket通信*/
@Slf4j
@Component
public class DeepSeekWebSocketHandler extends TextWebSocketHandler {// 使用talkInfoId作为键的会话映射private final Map<String, AbstractDeepSeekTool> talkSessionMap = new ConcurrentHashMap<>();private static final Gson gson = new Gson();private static final ObjectMapper objectMapper = new ObjectMapper();@Resourceprivate ITalkInfoService italkInfoService;@Value("${deepseek.tool.type:local}")private String deepSeekToolType;@Overridepublic void afterConnectionEstablished(@NotNull WebSocketSession session) {String talkInfoId = extractTalkInfoId(session);if (talkInfoId == null) {closeWithError(session);return;}// 创建或获取已有会话工具talkSessionMap.computeIfAbsent(talkInfoId, id -> {log.info("构建新会话工具,talkInfoId: {}", talkInfoId);List<Map<String, String>> conversationHistory = new ArrayList<>();if (!talkInfoId.startsWith(Constants.TEMP_TALK_INFO_ID_PREFIX)) {//根据会话框id查询会话框历史记录List<TalkInfoDeepSeekHistory> historyList = italkInfoService.getDeepSeekHistoryListByTalkInfoId(Integer.parseInt(talkInfoId));for (TalkInfoDeepSeekHistory history : historyList) {Type type = new TypeToken<Map<String, String>>() {}.getType();Map<String, String> messageMap = gson.fromJson(history.getContent(), type);conversationHistory.add(messageMap);}}return createDeepSeekTool(conversationHistory);});}/*** 创建AbstractDeepSeekTool具体类的实例,根据配置的toolType参数来选择创建哪种类型的工具*/private AbstractDeepSeekTool createDeepSeekTool(List<Map<String, String>> conversationHistory) {if ("online".equalsIgnoreCase(deepSeekToolType)) {return new DeepSeekOnlineTool(italkInfoService, conversationHistory);} else {return new DeepSeekLocalTool(italkInfoService, conversationHistory);}}/*** 从WebSocketSession中提取talkInfoId参数值* 此方法主要用于从WebSocket连接的URI中提取出talkInfoId参数值,该参数值用于标识对话信息* 如果URI为空,或者没有找到talkInfoId参数,则返回null** @param session WebSocketSession对象,包含客户端与服务器之间的WebSocket连接信息* @return String 返回提取出的talkInfoId参数值,如果没有找到则返回null*/private String extractTalkInfoId(WebSocketSession session) {try {URI uri = session.getUri();if (uri == null) return null;return Arrays.stream(uri.getQuery().split("&")).filter(param -> param.startsWith("talkInfoId=")).findFirst().map(param -> param.split("=")[1]).orElse(null);} catch (Exception e) {log.error("解析talkInfoId失败", e);return null;}}/*** 处理WebSocket消息*/@Overrideprotected void handleTextMessage(@NotNull WebSocketSession session, @NotNull TextMessage message) {try {log.info("收到消息:{}", message.getPayload());JsonNode json = objectMapper.readTree(message.getPayload());String talkInfoId = json.get("talkInfoId").asText();String content = json.get("content").asText();AbstractDeepSeekTool tool = talkSessionMap.get(talkInfoId);if (tool == null) {log.warn("找不到对应的会话工具,talkInfoId: {}", talkInfoId);sendErrorMessage(session, "无效 session");return;}if ("stopSending".equalsIgnoreCase(content)) {handleStopCommand(tool, session);} else {handleNormalMessage(tool, session, content, talkInfoId);}} catch (IOException e) {log.error("消息解析失败", e);sendErrorMessage(session, "Invalid message format");}}/*** 处理前端点击发送的停止生成命令*/private void handleStopCommand(AbstractDeepSeekTool tool, WebSocketSession session) {tool.setStopSending(true);try {session.sendMessage(new TextMessage("[END_OF_MESSAGE_GENERATE]"));} catch (IOException e) {log.error("停止命令响应失败", e);}}/*** 处理普通消息*/private void handleNormalMessage(AbstractDeepSeekTool tool, WebSocketSession session, String content, String talkInfoId) {tool.setStopSending(false);try {tool.processMessage(content, session, talkInfoId);} catch (Exception e) {log.error("消息处理异常", e);sendErrorMessage(session, "消息处理发生异常");}}@Overridepublic void afterConnectionClosed(@NotNull WebSocketSession session, @NotNull CloseStatus status) {String talkInfoId = extractTalkInfoId(session);if (talkInfoId != null) {// 根据业务需求决定是否立即清理资源// 如果是持久化会话可以保留,临时会话则移除,因为每次刷新页面临时会话都会新建,原来的就没用了,需要移除if (talkInfoId.startsWith(Constants.TEMP_TALK_INFO_ID_PREFIX)) {talkSessionMap.remove(talkInfoId);}log.info("会话关闭,talkInfoId: {}", talkInfoId);}}/*** 发送错误消息*/private void sendErrorMessage(WebSocketSession session, String error) {try {session.sendMessage(new TextMessage(objectMapper.createObjectNode().put("type", "error").put("message", error).toString()));} catch (IOException e) {log.error("错误信息发送失败", e);}}private void closeWithError(WebSocketSession session) {try {session.close(new CloseStatus(CloseStatus.BAD_DATA.getCode(), "缺失talkInfoId会话框ID参数"));} catch (IOException e) {log.error("关闭连接失败", e);}}
}
这段代码实现了一个WebSocket处理器,用于处理与客户端的WebSocket通信。主要功能包括:
- 建立连接时提取talkInfoId并创建或获取会话工具。
- 处理接收到的消息,区分普通消息和停止命令。
- 关闭连接时清理资源。
控制流图
数据库设计
之所以要设计几张表,是因为要实现保存上下文会话历史消息记录的需要,这里笔者使用的是mysql,其实每次的问答都会生成大量的文本消息和大量的DeepSeek的历史消息JSON数据,使用MongoDB或者Elasticsearch作为持久化工具对于查询性能更好,但笔者这里没有折腾了,读者朋友可以自己决定使用什么存储工具。
一共三张表:talk_info 、talk_info_deepseek_history、talk_info_messages
- talk_info
这个是对话框列表,就是新建会话的会话框会在这里新增数据
create table talk_info
(id int auto_incrementprimary key,title varchar(32) not null comment '对话标题',create_time datetime not null
)comment '对话框表';
- talk_info_deepseek_history
这个是deepSeek记录上下文请求和响应历史消息的表
create table talk_info_deepseek_history
(id int auto_increment comment '主键'primary key,talk_info_id int not null comment '对话框id',content text collate utf8mb4_unicode_ci null,create_time datetime null
)comment '对话框的deepseek历史对话记录';
- talk_info_messages
这个是干嘛的呢,也是记录上下文对话文本消息的表,只不过是记录单纯的文本消息,用于前端对话框展示的,而deepSeek则要求按照一定的json格式组装,这样才能被其加载读取历史会话记录,所以分开存储了。
create table talk_info_messages
(id int auto_incrementprimary key,talk_info_id int not null comment '对话框id',message_type varchar(10) null comment '消息类型:user,bot',message text collate utf8mb4_unicode_ci null,create_time datetime not null
)comment '对话历史消息列表';
总结
以上就是笔者开发deepSeek对话web应用的整个过程了,笔者的技术能力有限,尤其是前端部分,很多样式的调整,其实是借助了大模型帮我调整的,不足之处请大家多多指教!