golang如何实现sse
Server-Sent Events (SSE) 的实现主要是通过逐步读取 HTTP 响应体并以事件流的形式将数据推送到客户端。下面我会详细解释代码中如何实现 SSE。
SSE (Server-Sent Events) 简介
SSE 是一种在客户端与服务器之间建立单向持久连接的技术,允许服务器在有新数据时主动推送到客户端。与传统的 HTTP 请求-响应模型不同,SSE 使用一个持久连接来推送数据,而不需要客户端反复发起请求。
代码分析
1.设置响应头部:
服务器首先通过设置响应头,指定内容类型为 text/event-stream,并且禁用缓存、保持连接活跃:
c.Writer.Header().Set("Content-Type", "text/event-stream")c.Writer.Header().Set("Cache-Control", "no-cache")c.Writer.Header().Set("Connection", "keep-alive")
2.Content-Type: text/event-stream 告诉浏览器这是一个 SSE 流。
3.Cache-Control: no-cache 禁止缓存,确保每次客户端接收到的是最新的事件数据。
4.Connection: keep-alive 确保 HTTP 连接在数据传输期间保持活跃。
5.发起 HTTP 请求:
SendChatRequestPlus 函数创建并发送了一个 HTTP POST 请求到 https://api.lingyiwanwu.com/v1/chat/completions,该请求的请求体是传入的 body 参数。请求的目的可能是请求一个聊天机器人服务的回答。
req, err := http.NewRequest("POST", "https://api.lingyiwanwu.com/v1/chat/completions", bytes.NewBuffer(jsonData))
其中 jsonData 是将 body 序列化后的 JSON 数据。
6.处理响应:
请求发送后,代码等待来自服务端的响应。服务端返回的数据是逐行传送的,并且每行都以 data: 开头,这是 SSE 的标准格式。
scanner := bufio.NewScanner(resp.Body)for scanner.Scan() {line := scanner.Text()if strings.HasPrefix(line, "data: ") {jsonData := strings.TrimPrefix(line, "data: ")// 进一步处理 jsonData}}
7.使用 bufio.Scanner 来逐行读取响应体,每一行可能包含一个 SSE 数据事件。
8.每一行数据以 data: 开头,后面是实际的 JSON 数据。
例如,如果响应体是这样:
data: {"message": "Hello, world!"}data: {"message": "How are you?"}
这就意味着服务端通过 SSE 向客户端发送了两个消息。
9.解码 JSON 数据并格式化:
读取到每行 data: 后面的 JSON 数据后,将其解码为 responseBody,这是一个 map[string]interface{} 类型。
if err := json.Unmarshal([]byte(jsonData), &responseBody); err != nil {c.String(http.StatusInternalServerError, "error decoding JSON: %s", err)return}
然后,将解码后的 responseBody 再次序列化为 JSON 格式,并通过 SSE 格式发送到客户端。
10.发送数据到客户端:
将格式化后的 JSON 数据通过 fmt.Fprintf 按 SSE 格式发送到客户端。注意,数据是以 data: <json_data>\n\n 这种格式发送的。
eventData, err := json.Marshal(responseBody)if err != nil {c.String(http.StatusInternalServerError, "error marshalling response: %s", err)return}fmt.Fprintf(c.Writer, "data: %s\n\n", eventData)c.Writer.Flush() // 刷新响应流
11.data: %s\n\n 表示发送一个 SSE 消息,其中 data 是消息的内容。
12.c.Writer.Flush() 强制将响应刷新到客户端,确保消息实时传送。
13.控制发送频率:
在代码中,time.Sleep(1 * time.Second) 用来控制每次发送的频率。这里设置的是每秒钟发送一次事件。如果需要,可以调整这个时间间隔。
time.Sleep(1 * time.Second)
14.处理错误:
在整个过程中,代码会处理可能出现的各种错误,例如 JSON 编码/解码错误、HTTP 请求错误、响应读取错误等。如果发生错误,服务器会返回相应的错误信息,并停止处理。
总结
在这段代码中,SSE 的实现核心是通过将服务器的响应逐行读取,并格式化为符合 SSE 协议的格式 (data: <json_data>\n\n) 推送给客户端。这种方法确保了服务器能够持续向客户端推送数据,而无需客户端持续发送请求。通过设置正确的响应头和适当的处理每一行 data: 数据,最终实现了 Server-Sent Events 的功能。
改进建议
15.错误处理:如果服务端响应的状态码不为 200 OK,可以在响应头中返回更详细的错误信息,而不仅仅是一个通用的错误消息。
16.连接管理:考虑到 SSE 的长连接特性,可能需要处理连接超时或客户端断开连接的情况。