使用WebSocket和服务器建立双向通信-封装-demo
封装WebSocket
代码实现了一个 WebSocket 心跳连接的功能,用于保持 WebSocket 连接的稳定性。
该代码定义了一个
WebsocketHB
类,通过构造函数的参数来配置 WebSocket 连接的相关参数,包括连接地址、心跳包发送间隔、最长未接收消息的间隔、重连间隔、最大重连次数等。在
createWebSocket
方法中,通过创建 WebSocket 实例,并设置相应的事件处理函数,包括onclose
、onerror
、onopen
和onmessage
。这些事件处理函数分别在 WebSocket 连接关闭、连接错误、连接打开和接收到消息时被调用。其中,
heartBeat
方法用于发送心跳包,定时发送指定的心跳消息内容。readyReconnect
方法用于准备进行重连,当连接关闭或发生错误时调用该方法。reconnect
方法实现了具体的重连逻辑,根据重连次数和重连限制来控制重连的次数和间隔。clearAllTimer
方法用于清除所有定时器,包括心跳包定时器、接收消息定时器和重连定时器。destroyed
方法用于手动关闭连接并销毁 WebSocket 实例。使用时,可以实例化
WebsocketHB
类,并传入相应的配置参数。然后,根据需要调用相应的方法,如destroyed
方法手动关闭连接,或根据事件的触发情况自动进行重连。这段代码适用于需要保持 WebSocket 连接的稳定性,以及自动重连的场景,例如实时通讯、实时数据推送等。
class WebsocketHB {constructor({url, // 连接客户端地址pingTimeout = 8000, // 发送心跳包间隔,默认 8000 毫秒pongTimeout = 15000, // 最长未接收消息的间隔,默认 15000 毫秒reconnectTimeout = 4000, // 每次重连间隔reconnectLimit = 15, // 最大重连次数pingMsg = 'ping' // 心跳包的消息内容}) {// 初始化配置this.url = url;this.pingTimeout = pingTimeout;this.pongTimeout = pongTimeout;this.reconnectTimeout = reconnectTimeout;this.reconnectLimit = reconnectLimit;this.pingMsg = pingMsg;// 实例变量this.ws = null;this.pingTimer = null; // 心跳包定时器this.pongTimer = null; // 接收消息定时器this.reconnectTimer = null; // 重连定时器this.reconnectCount = 0; // 当前的重连次数this.lockReconnect = false; // 锁定重连this.lockReconnectTask = false; // 锁定重连任务队列this.createWebSocket();}createWebSocket() {// ...this.ws = new WebSocket(this.url);this.ws.onclose = () => {this.onclose();this.readyReconnect();};this.ws.onerror = () => {this.onerror();this.readyReconnect();};this.ws.onopen = () => {this.onopen();this.clearAllTimer();this.heartBeat();this.reconnectCount = 0;// 解锁,可以重连this.lockReconnect = false;};this.ws.onmessage = (event) => {this.onmessage(event);// 超时定时器clearTimeout(this.pongTimer);this.pongTimer = setTimeout(() => {this.readyReconnect();}, this.pongTimeout);};}// 发送心跳包heartBeat() {this.pingTimer = setTimeout(() => {if (this.ws) {if (this.pingMsg) {this.ws.send(this.pingMsg);} else {this.ws.ping();}this.heartBeat();} else {this.readyReconnect();}}, this.pingTimeout);}// 准备重连readyReconnect() {// 避免循环重连,当一个重连任务进行时,不进行重连if (this.lockReconnectTask) return;this.lockReconnectTask = true;this.clearAllTimer();this.reconnect();}// 重连reconnect() {if (this.forbidReconnect) return;if (this.lockReconnect) return;if (this.reconnectCount > this.reconnectLimit) return;// 加锁,禁止重连this.lockReconnect = true;this.reconnectCount += 1;this.createWebSocket();this.reconnectTimer = setTimeout(() => {// 解锁,可以重连this.lockReconnect = false;this.reconnect();}, this.reconnectTimeout);}// 清空所有定时器clearAllTimer() {clearTimeout(this.pingTimer);clearTimeout(this.pongTimer);clearTimeout(this.reconnectTimer);}// 销毁 wsdestroyed() {// 如果手动关闭连接,不再重连this.forbidReconnect = true;this.clearAllTimer();this.ws && this.ws.close();}
}export default WebsocketHB;
使用
import WebsocketHB from '@/utils/websocketHB';
socket() {const ws = new WebsocketHB({url: this.loginData.wsSubscribe});
//当 WebSocket 连接成功建立后,可以根据需要发送订阅请求或执行其他操作。在这段被注释的代码中,使用 ws.ws.send 方法发送一个 JSON 格式的订阅请求。ws.onopen = () => {console.log('connect success');// ws.ws.send(JSON.stringify({// method: 'SUBSCRIBE',// params: ['123456'],// id: 111// }))};
//处理接收到的消息ws.onmessage = (e) => {let data = JSON.parse(e.data);console.log(data, 888);if (typeof data == 'string') {data = JSON.parse(data);}if(data.ch && chHandle[data.ch]){chHandle[data.ch](data)}};
//处理连接错误ws.onerror = () => {console.log('connect onerror');};
//处理连接关闭ws.onclose = () => {console.log('connect onclose');};},
处理返回数据
去匹配后端推送的字段key,将匹配上的更新数据
const chHandle = {'MIX_POSITION' : (msg) => {const {data: {BINANCE, BITGET, OKX}} = msg;const list = [...BINANCE, ...BITGET, ...OKX];// store.commit('trade/setCurrentPositions', list)},'MIX_ORDER': (msg) => {const {data} = msg;// store.commit('trade/setOrderChangeData', data)}}
完善
class WebsocketHB {constructor({url, // 连接客户端地址pingTimeout = 8000, // 发送心跳包间隔,默认 8000 毫秒pongTimeout = 15000, // 最长未接收消息的间隔,默认 15000 毫秒reconnectTimeout = 4000, // 每次重连间隔reconnectLimit = 15, // 最大重连次数pingMsg = 'ping' // 心跳包的消息内容}) {// 初始化配置this.url = url;this.pingTimeout = pingTimeout;this.pongTimeout = pongTimeout;this.reconnectTimeout = reconnectTimeout;this.reconnectLimit = reconnectLimit;this.pingMsg = pingMsg;// 实例变量this.ws = null;this.pingTimer = null; // 心跳包定时器this.pongTimer = null; // 接收消息定时器this.reconnectTimer = null; // 重连定时器this.reconnectCount = 0; // 当前的重连次数this.lockReconnect = false; // 锁定重连this.lockReconnectTask = false; // 锁定重连任务队列this.createWebSocket();}createWebSocket() {this.ws = new WebSocket(this.url);this.ws.onclose = () => {this.onclose();this.readyReconnect();};this.ws.onerror = () => {this.onerror();this.readyReconnect();};this.ws.onopen = () => {this.onopen();this.clearAllTimer();this.heartBeat();this.reconnectCount = 0;// 解锁,可以重连this.lockReconnect = false;};this.ws.onmessage = (event) => {this.onmessage(event);clearTimeout(this.pongTimer);// 超时定时器this.pongTimer = setTimeout(() => {this.readyReconnect();}, this.pongTimeout);};}// heartBeat() {// this.pingTimer = setTimeout(() => {// if (this.ws.readyState === WebSocket.OPEN) {// this.ws.send(this.pingMsg);// this.heartBeat();// }// }, this.pingTimeout);// }// 发送心跳包heartBeat() {this.pingTimer = setTimeout(() => {if (this.ws) {if (this.pingMsg) {this.ws.send(this.pingMsg);} else {this.ws.ping();}this.heartBeat();} else {this.readyReconnect();}}, this.pingTimeout);}// 准备重连readyReconnect() {// 避免循环重连,当一个重连任务进行时,不进行重连if (this.lockReconnectTask) return;this.lockReconnectTask = true;this.clearAllTimer();this.reconnect();}// 重连reconnect() {if (this.forbidReconnect) return;if (this.lockReconnect) return;if (this.reconnectCount > this.reconnectLimit) return;// 加锁,禁止重连this.lockReconnect = true;this.reconnectCount += 1;this.createWebSocket();this.reconnectTimer = setTimeout(() => {// 解锁,禁止重连this.lockReconnect = false;this.reconnect();}, this.reconnectTimeout);}// 清空所有定时器clearAllTimer() {clearTimeout(this.pingTimer);clearTimeout(this.pongTimer);clearTimeout(this.reconnectTimer);}// 销毁 wsdestroyed() {// 如果手动关闭连接,不再重连this.forbidReconnect = true;this.clearAllTimer();this.ws && this.ws.close();}// 添加事件监听机制addEventListener(event, callback) {this[event] = callback;}// 触发自定义事件triggerEvent(event, data) {if (typeof this[event] === 'function') {this[event](data);}}
}export default WebsocketHB;
改进和完善的主要点包括:
在
heartBeat
方法中,增加了对 WebSocket 连接状态的检查,确保只在连接处于打开状态时发送心跳包。添加了
addEventListener
方法,用于注册自定义事件的监听器。添加了
triggerEvent
方法,用于触发注册的自定义事件。这些改进和完善的建议可以提高代码的可读性、可维护性和可扩展性。