MCP实战派【1】-SSE
Know——MCP【1】
What is MCP and Why need MCP and How start?
这个问题不想阐述了,看官网说明最好~https://modelcontextprotocol.io/introduction
MCP 目前主要有2种传输协议
-
Stdio
-
SSE+HTTP/Streamable HTTP
SSE+HTTP
本篇文章主要讲述的是 SSE+HTTP 的传输协议
具体SSE+HTTP 是怎么运行的,怎么去开发一个这样的服务,会从开发一个 dify openapi 转mcp sse 插件 的角度去分析下怎么去开发
什么是 SSE ?SSE 的全称是 Server-Sent Events ,这个协议主要是解决 浏览器往客户端消息推送的,就像流式处理一样,了解 openai sdk的使用的人知道,我们如果开启 stream 等于 true ,正常我们就是采用的这种方式
其实看了 很多网上的文章,说 MCP,但是没有将清楚 MCP 中SSE 的模式究竟是怎么运行的
具体可以看的画的简陋版本的流程图~
流式的请求 我都使用了虚线,从流程图中 可以看的很清楚,SSE 是从客户端发起和服务端的一个 get长连接,这个流式通道负责是从服务器推送实时的消息给客户端,客户端 发起非 SSE 的请求使用的都是post,但是服务端返回的时候 只返回202,accept, 具体返回的消息是通过 SSE 的的长连接通道返回的
仔细想想这种传输协议设计的挺有意思的,说白了就是做了一个异步编程,客户端请求服务端,但是服务端由于处理消息比较慢等原因,这里可以想象下我们在使用 GPT Chat 的时候,如果不采用流式的,估计一个复制的的请求要等个半天页面才有响应,这样 的设计也是迎合了目前LLM 都是采用 token 流式输出的原因,MCP SSE 为了解决这个问题,就是采用先让客户端建长连接,然后处理客户端请求的时候,直接返回,然后通过 SSE 的连接通道输出流式的消息,不得不说 还是挺棒的设计
当然这样 SSE+HTTP 的模式,也有弊端,
-
首先第一个可以看到,这样的设计对MCP的Service要求挺高的,要维护一个长连接,如果使用多了的话,服务进行分布式部署或者水平扩展的时候,都会遇到一个连接问题
-
第二个就是,SSE 的模式是单向的,只能服务器推送客户端,客户端不能主动通过 SSE 和服务端交互,而已这个单向的设计,
-
还有一个问题就是当服务器不可用的时候,实际上客户端是感知不到的,我就遇到了这个问题,还有就是当服务端回复的时候,是不能做到状态恢复的
当然说了这么多 ,还不如看官方的内容,https://github.com/modelcontextprotocol/modelcontextprotocol/pull/206,3月24号的时候,社区已经提交了Streamable HTTP 的合并,具体的差异,可以进文章看看~
开发MCP SSE 服务
def create_sse_message(event, data):return f"event: {event}\ndata: {json.dumps(data) if isinstance(data, (dict, list)) else data}\n\n"class SSEEndpoint(Endpoint):def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response:"""Invokes the endpoint with the given request."""session_id = str(uuid.uuid4()).replace("-", "")def generate():endpoint = f"messages/?session_id={session_id}"yield create_sse_message("endpoint", endpoint)while True:message = Nonetry:message = self.session.storage.get(session_id)except:passif message is None:time.sleep(0.5)continuemessage = message.decode()self.session.storage.delete(session_id)yield create_sse_message("message", message)return Response(generate(), status=200, content_type="text/event-stream")
上面这段代码,是我 dify 插件中的部分代码,可以看到 当客户端第一次访问 endpont/sse 地址的时候,服务器返回一个 text/event-stream ,意思就是告诉客户端 我这个是一个长连接,我会不断的输出内容给你,第一个返回就是messages/?session_id= 的地址,是告诉客户端,后续的请求,可以使用这个地址
当客户端接收到服务器端的第一个消息的时候,客户端会根据这个新的地址 去进行初始化
采用 post 的请求 endpont/messages 地址,方法是 initialize
def _handle_initialize(self, data: Dict[str, Any]) -> Dict[str, Any]:"""处理 initialize 请求。Args:data (Dict[str, Any]): The request data.Returns:Dict[str, Any]: The response data."""return {"jsonrpc": "2.0","id": data.get("id"),"result": {"protocolVersion": "2024-11-05","capabilities": {"experimental": {},"prompts": {"listChanged": False},"resources": {"subscribe": False, "listChanged": False},"tools": {"listChanged": False},},"serverInfo": {"name": "Dify", "version": "1.3.0"},},}
就想我说的,服务端 去处理 initialize的请求,的返回是通过 SSE 的长链接返回的
后面的 tools/list的请求也是同理
具体要支持多少 methond 可以看下官方的说明:
备注:dify插件 openapi 转 MCP SSE的 代码在这里:
https://github.com/xunberg/dify-plugin-openapi-to-mcp
下个版本去支持Streamable+HTTP