当前位置: 首页 > news >正文

WebSocket 实现指南

WebSocket 实现指南

目录

1. 依赖安装

1.1 安装必要的包

# 安装 gorilla/websocket
go get github.com/gorilla/websocket# 安装 gin 框架
go get github.com/gin-gonic/gin

1.2 更新 go.mod

require (github.com/gin-gonic/gin v1.9.1github.com/gorilla/websocket v1.5.3
)

1.3 配置文件

1.3.1Config.go
package configimport ("os""gopkg.in/yaml.v3"
)type Config struct {Server    ServerConfig    `yaml:"server"`MySQL     MySQLConfig     `yaml:"mysql"`Redis     RedisConfig     `yaml:"redis"`JWT       JWTConfig       `yaml:"jwt"`WebSocket WebSocketConfig `yaml:"websocket"`
}type ServerConfig struct {Port int    `yaml:"port"`Mode string `yaml:"mode"`
}type MySQLConfig struct {Host     string `yaml:"host"`Port     int    `yaml:"port"`User     string `yaml:"user"`Password string `yaml:"password"`DBName   string `yaml:"dbname"`
}type RedisConfig struct {Host     string `yaml:"host"`Port     int    `yaml:"port"`Password string `yaml:"password"`DB       int    `yaml:"db"`
}type JWTConfig struct {Secret string `yaml:"secret"`Expire int    `yaml:"expire"` // token过期时间(小时)
}type WebSocketConfig struct {ReadBufferSize  int   `yaml:"read_buffer_size"`WriteBufferSize int   `yaml:"write_buffer_size"`MaxMessageSize  int64 `yaml:"max_message_size"`WriteWait       int   `yaml:"write_wait"`PongWait        int   `yaml:"pong_wait"`PingPeriod      int   `yaml:"ping_period"`MaxConnections  int   `yaml:"max_connections"`
}var GlobalConfig Configfunc Init() error {// 确保日志目录存在os.MkdirAll("logs", 0755)file, err := os.ReadFile("config/config.yaml")if err != nil {return err}return yaml.Unmarshal(file, &GlobalConfig)
}
1.3.2 config.yaml
server:port: 8080mode: debug  # debug or releasewebsocket:read_buffer_size: 1024write_buffer_size: 1024max_message_size: 512write_wait: 10     # secondspong_wait: 60      # secondsping_period: 54    # secondsmax_connections: 5000mysql:host: localhostport: 3306user: rootpassword: "123456"dbname: my_kingdomredis:host: localhostport: 6379password: "123456"db: 0jwt:secret: "XueZhimin"expire: 24  # hours

2. 代码实现

2.1 WebSocket管理器 (manager.go)

package websocketimport ("encoding/json""fmt""mykingdom/config""net/http""sync""github.com/gin-gonic/gin""github.com/gorilla/websocket"
)// Manager WebSocket管理器
type Manager struct {clients   sync.Map    // 所有客户端连接broadcast chan []byte // 广播消息通道config    *config.WebSocketConfigmessages  chan Message // 新增:消息处理通道
}// Message 定义消息结构
type Message struct {Type     string      `json:"type"`Data     interface{} `json:"data"`ClientID string      `json:"-"` // 发送者的连接ID
}// 配置websocket upgrader
var upgrader = websocket.Upgrader{ReadBufferSize:  1024,WriteBufferSize: 1024,CheckOrigin: func(r *http.Request) bool {return true // 允许所有跨域请求},
}// NewManager 创建WebSocket管理器
func NewManager(config *config.WebSocketConfig) *Manager {return &Manager{broadcast: make(chan []byte),messages:  make(chan Message, 256), // 新增:初始化消息通道config:    config,}
}// HandleWebSocket 处理WebSocket连接
func (m *Manager) HandleWebSocket() gin.HandlerFunc {return func(c *gin.Context) {// 检查连接数限制count := 0m.clients.Range(func(_, _ interface{}) bool {count++return true})if count >= m.config.MaxConnections {c.JSON(http.StatusServiceUnavailable, gin.H{"message": "达到最大连接数限制",})return}conn, done := Upgrade(c)if done {return}// 创建新的客户端连接client := &Client{conn:    conn,manager: m,send:    make(chan []byte, 256),}// 存储客户端连接m.clients.Store(client.conn.RemoteAddr().String(), client)// 启动读写协程go client.readPump()go client.writePump()}
}// Upgrade 升级HTTP连接为WebSocket连接
func Upgrade(c *gin.Context) (*websocket.Conn, bool) {conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)if err != nil {return nil, true}return conn, false
}// Broadcast 广播消息给所有客户端
func (m *Manager) Broadcast(message []byte) {m.clients.Range(func(_, value interface{}) bool {if client, ok := value.(*Client); ok {select {case client.send <- message:default:m.clients.Delete(client.conn.RemoteAddr().String())close(client.send)}}return true})
}// SendToClient 发送消息给指定客户端
func (m *Manager) SendToClient(clientAddr string, message []byte) bool {if value, ok := m.clients.Load(clientAddr); ok {if client, ok := value.(*Client); ok {client.send <- messagereturn true}}return false
}// SendMessage 发送任意类型的消息
func (m *Manager) SendMessage(messageType string, data interface{}) error {message := Message{Type: messageType,Data: data,}// 将消息转换为JSONjsonData, err := json.Marshal(message)if err != nil {return fmt.Errorf("marshal message failed: %v", err)}// 广播消息m.Broadcast(jsonData)return nil
}// SendMessageToClient 发送消息给指定客户端
func (m *Manager) SendMessageToClient(clientAddr string, messageType string, data interface{}) error {message := Message{Type: messageType,Data: data,}// 将消息转换为JSONjsonData, err := json.Marshal(message)if err != nil {return fmt.Errorf("marshal message failed: %v", err)}// 发送给指定客户端if !m.SendToClient(clientAddr, jsonData) {return fmt.Errorf("client not found: %s", clientAddr)}return nil
}// GetMessages 获取消息通道,用于读取消息
func (m *Manager) GetMessages() <-chan Message {return m.messages
}// ReadMessage 读取消息
func ReadMessage(conn *websocket.Conn) ([]byte, error) {_, message, err := conn.ReadMessage()if err != nil {return nil, fmt.Errorf("read message failed: %v", err)}return message, nil
}// WriteMessage 发送消息
func WriteMessage(conn *websocket.Conn, message []byte) error {err := conn.WriteMessage(websocket.TextMessage, message)if err != nil {return fmt.Errorf("write message failed: %v", err)}return nil
}// WriteJSON 发送JSON消息
func WriteJSON(conn *websocket.Conn, v interface{}) error {err := conn.WriteJSON(v)if err != nil {return fmt.Errorf("write json failed: %v", err)}return nil
}

2.2 客户端实现 (client.go)

package websocketimport ("encoding/json""log""time""github.com/gorilla/websocket"
)const (writeWait  = 10 * time.SecondpongWait   = 60 * time.SecondpingPeriod = (pongWait * 9) / 10
)// Client WebSocket客户端
type Client struct {conn    *websocket.Connmanager *Managersend    chan []byte
}// readPump 从WebSocket连接读取消息
func (c *Client) readPump() {defer func() {c.manager.clients.Delete(c.conn.RemoteAddr().String())c.conn.Close()}()c.conn.SetReadDeadline(time.Now().Add(pongWait))c.conn.SetPongHandler(func(string) error {c.conn.SetReadDeadline(time.Now().Add(pongWait))return nil})for {_, message, err := c.conn.ReadMessage()if err != nil {if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {log.Printf("error: %v", err)}break}// 尝试解析消息为Message结构var msg Messageif err := json.Unmarshal(message, &msg); err != nil {log.Printf("unmarshal message failed: %v", err)continue}// 设置发送者IDmsg.ClientID = c.conn.RemoteAddr().String()// 将消息发送到消息通道c.manager.messages <- msg// 广播消息给所有客户端c.manager.Broadcast(message)}
}// writePump 向WebSocket连接写入消息
func (c *Client) writePump() {ticker := time.NewTicker(pingPeriod)defer func() {ticker.Stop()c.conn.Close()}()for {select {case message, ok := <-c.send:c.conn.SetWriteDeadline(time.Now().Add(writeWait))if !ok {c.conn.WriteMessage(websocket.CloseMessage, []byte{})return}w, err := c.conn.NextWriter(websocket.TextMessage)if err != nil {return}w.Write(message)if err := w.Close(); err != nil {return}case <-ticker.C:c.conn.SetWriteDeadline(time.Now().Add(writeWait))if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {return}}}
}

3. 配置说明

3.1 WebSocket配置 (config.yaml)

websocket:read_buffer_size: 1024    # 读缓冲区大小write_buffer_size: 1024   # 写缓冲区大小max_connections: 5000     # 最大连接数

3.2 配置结构体 (config.go)

type WebSocketConfig struct {ReadBufferSize  int   `yaml:"read_buffer_size"`WriteBufferSize int   `yaml:"write_buffer_size"`MaxConnections  int   `yaml:"max_connections"`
}

4. 使用示例

4.1 服务端示例

func main() {r := gin.Default()// 创建WebSocket管理器wsManager := websocket.NewManager(&config.GlobalConfig.WebSocket)// WebSocket连接r.GET("/ws", wsManager.HandleWebSocket())// 广播消息r.POST("/broadcast", func(c *gin.Context) {message := c.PostForm("message")wsManager.Broadcast([]byte(message))c.JSON(200, gin.H{"message": "broadcast sent"})})r.Run(":8080")
}

4.2 前端示例

// 连接WebSocket
const ws = new WebSocket('ws://localhost:8080/ws')// 连接成功
ws.onopen = () => {console.log('连接成功')
}// 接收消息
ws.onmessage = (event) => {console.log('收到消息:', event.data)
}// 发送消息
ws.send('Hello, World!')

4.3 Vue组件示例

<template><div><div>连接状态: {{ isConnected ? '已连接' : '未连接' }}</div><input v-model="message" @keyup.enter="sendMessage"><button @click="sendMessage">发送</button></div>
</template><script setup>
import { ref, onMounted, onUnmounted } from 'vue'const ws = ref(null)
const isConnected = ref(false)
const message = ref('')const connect = () => {ws.value = new WebSocket('ws://localhost:8080/ws')ws.value.onopen = () => isConnected.value = truews.value.onclose = () => isConnected.value = falsews.value.onmessage = (event) => {console.log('收到消息:', event.data)}
}const sendMessage = () => {if (isConnected.value && message.value) {ws.value.send(message.value)message.value = ''}
}onMounted(() => connect())
onUnmounted(() => ws.value?.close())
</script>

4.4 消息收发示例

4.4.1 消息结构
// Message 定义消息结构
type Message struct {Type     string      `json:"type"`    // 消息类型Data     interface{} `json:"data"`    // 消息数据ClientID string      `json:"-"`       // 发送者的连接ID
}
4.4.2 发送消息
// 1. 广播消息给所有客户端
err := wsManager.SendMessage("chat", map[string]interface{}{"user": "系统","content": "欢迎新用户加入",
})// 2. 发送消息给指定客户端
err := wsManager.SendMessageToClient(clientAddr, "private", map[string]interface{}{"content": "这是一条私信","time": time.Now(),
})// 3. 发送游戏动作
err := wsManager.SendMessage("game_action", map[string]interface{}{"action": "move","position": map[string]int{"x": 100,"y": 200,},
})
4.4.3 接收和处理消息
// 启动消息处理协程
go func() {// 获取消息通道msgChan := wsManager.GetMessages()// 循环处理消息for msg := range msgChan {// 根据消息类型处理switch msg.Type {case "chat":handleChatMessage(msg)case "game_action":handleGameAction(msg)case "private":handlePrivateMessage(msg)default:log.Printf("未知消息类型: %s", msg.Type)}}
}()// 处理聊天消息
func handleChatMessage(msg websocket.Message) {data, ok := msg.Data.(map[string]interface{})if !ok {return}log.Printf("来自 %s 的聊天消息: %v", msg.ClientID, data["content"])
}// 处理游戏动作
func handleGameAction(msg websocket.Message) {data, ok := msg.Data.(map[string]interface{})if !ok {return}log.Printf("来自 %s 的游戏动作: %v", msg.ClientID, data["action"])
}
4.4.4 前端发送消息示例
// 1. 发送聊天消息
ws.send(JSON.stringify({type: 'chat',data: {content: '大家好!'}
}))// 2. 发送游戏动作
ws.send(JSON.stringify({type: 'game_action',data: {action: 'move',position: {x: 100,y: 200}}
}))
4.4.5 前端接收消息示例
ws.onmessage = (event) => {try {const message = JSON.parse(event.data)// 根据消息类型处理switch (message.type) {case 'chat':handleChat(message.data)breakcase 'game_action':handleGameAction(message.data)breakcase 'private':handlePrivateMessage(message.data)breakdefault:console.log('未知消息类型:', message.type)}} catch (error) {console.error('解析消息失败:', error)}
}// 处理聊天消息
function handleChat(data) {console.log(`${data.user}: ${data.content}`)
}// 处理游戏动作
function handleGameAction(data) {console.log('游戏动作:', data.action)updateGameState(data.position)
}// 处理私信
function handlePrivateMessage(data) {console.log('收到私信:', data.content)
}

4.5 错误处理示例

// 发送消息时的错误处理
if err := wsManager.SendMessage("chat", data); err != nil {log.Printf("发送消息失败: %v", err)
}// 发送私信时的错误处理
if err := wsManager.SendMessageToClient(clientAddr, "private", data); err != nil {if strings.Contains(err.Error(), "client not found") {log.Printf("用户已离线: %s", clientAddr)} else {log.Printf("发送私信失败: %v", err)}
}

4.6 消息类型建议

const (// 系统消息MessageTypeSystem     = "system"      // 系统通知MessageTypeError      = "error"       // 错误消息// 聊天消息MessageTypeChat       = "chat"        // 公共聊天MessageTypePrivate    = "private"     // 私聊消息// 游戏消息MessageTypeGameAction = "game_action" // 游戏动作MessageTypeGameState  = "game_state"  // 游戏状态MessageTypeMatch      = "match"       // 匹配相关
)

5. 框架整合

5.1 与Gin框架整合

// 中间件:验证WebSocket连接
func AuthWebSocket() gin.HandlerFunc {return func(c *gin.Context) {// 验证tokentoken := c.Query("token")if !validateToken(token) {c.AbortWithStatus(http.StatusUnauthorized)return}c.Next()}
}// 使用中间件
r.GET("/ws", AuthWebSocket(), wsManager.HandleWebSocket())

5.2 与Nginx整合

# nginx.conf
http {map $http_upgrade $connection_upgrade {default upgrade;'' close;}upstream websocket {server 127.0.0.1:8080;}server {listen 80;server_name example.com;# WebSocket代理location /ws {proxy_pass http://websocket;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection $connection_upgrade;proxy_set_header Host $host;}# 其他HTTP请求location / {proxy_pass http://127.0.0.1:8080;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;}}
}

6. 部署说明

6.1 服务器要求

  • 支持WebSocket的现代浏览器
  • Go 1.16+
  • Nginx 1.16+(如果使用)

6.2 部署步骤

  1. 编译Go程序
go build -o server cmd/main.go
  1. 配置Nginx(如果使用)
# 复制nginx配置
sudo cp nginx.conf /etc/nginx/conf.d/websocket.conf# 重启Nginx
sudo systemctl restart nginx
  1. 运行服务
./server

6.3 注意事项

  1. 连接管理

    • 定期清理断开的连接
    • 实现重连机制
    • 控制连接数量
  2. 安全性

    • 添加连接认证
    • 限制消息大小
    • 添加消息验证
  3. 性能优化

    • 使用消息队列
    • 控制广播频率
    • 添加负载均衡
  4. 监控

    • 记录连接数
    • 监控消息流量
    • 错误日志记录

http://www.mrgr.cn/news/83039.html

相关文章:

  • 初学stm32 --- ADC模拟/数字转换器工作原理
  • 应急响应——Windows / Linux 排查笔记
  • python 中的 json 库使用
  • 游戏引擎学习第77天
  • homework 2025.01.07 math 6
  • QFiledoalog::getSaveFileName用法有哪些,如何设置默认路径、默认文件名、默认后缀?
  • 排序算法:冒泡排序
  • windows从0开始配置llamafactory微调chatglm3-6b
  • 【C语言】可移植性陷阱与缺陷(八): 随机数的大小
  • UE5 打包要点
  • 【学习笔记】数据结构(十)
  • Halcon在linux及ARM上的安装及c++工程化
  • 豆包ai 生成动态tree 增、删、改以及上移下移 html+jquery
  • React(二)——Admin主页/Orders页面/Category页面
  • 120.Jenkins里的Pipeline Script
  • PyCharm简单调试
  • 左神算法基础巩固--3
  • SpringBootWeb案例-1(day10)
  • jenkins入门13--pipeline
  • 2020 年 12 月青少年软编等考 C 语言五级真题解析
  • moviepy 将mp4视频文件提取音频mp3 - python 实现
  • Linux初识——基本指令
  • Requests-数据解析bs4+xpath
  • 【0385】Postgres内核 OS 磁盘上创建 slot ( 3 - 1 )
  • STM32-笔记38-I2C-oled实验
  • STM32-DMA数据转运