Files
3d_test/py/server.py
2026-05-18 19:15:44 +08:00

113 lines
3.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import asyncio
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.middleware.cors import CORSMiddleware
from starlette.routing import Route, Mount
from mcp.server import Server
from mcp.server.sse import SseServerTransport
from mcp.types import Tool, TextContent
connected_clients: list[WebSocket] = []
mcp_server = Server("cherry-studio-park-mcp")
# 1. MCP SSE 传输配置
# Cherry Studio 连接 /sse消息路由 /messages/
sse = SseServerTransport("/messages/")
async def mcp_endpoint(request: Request):
# 标准 SSE 握手,连接 MCP Server
async with sse.connect_sse(request.scope, request.receive, request._send) as (read_stream, write_stream):
await mcp_server.run(read_stream, write_stream, mcp_server.create_initialization_options())
# 2. 核心逻辑:发送告警到前端
async def send_alert(floor: str):
lights = random.sample(["f9-sphere1", "f9-sphere2", "f9-sphere3", "f9-sphere4"], k=random.randint(1, 2))
msg = {
"action": "show_alert",
"floor": floor,
"lights": lights
}
# 发送给所有连接的 Vue 客户端
for client in list(connected_clients):
try:
await client.send_json(msg)
except:
pass
return msg
# 3. MCP 工具定义
@mcp_server.list_tools()
async def list_tools():
return [
Tool(
name="query_crowded_floor",
description="查询哪一层出现拥挤情况,返回楼层编号",
inputSchema={"type": "object", "properties": {}, "required": []}
),
Tool(
name="show_alert_on_floor",
description="在三维平台展示告警效果,跳转到指定楼层并显示拥挤情况",
inputSchema={
"type": "object",
"properties": {
"floor": {"type": "string", "description": "楼层编号,如 F9、F8、B1 等"}
},
"required": ["floor"]
}
)
]
@mcp_server.call_tool()
async def call_tool(name: str, arguments: dict):
try:
if name == "query_crowded_floor":
return [TextContent(type="text", text="第九层")]
if name == "show_alert_on_floor":
floor = arguments.get("floor", "F9")
msg = await send_alert(floor)
return [TextContent(type="text", text=f"已在 {floor} 层触发告警,点亮了 {msg['lights']}")]
return [TextContent(type="text", text="未知工具")]
except Exception as e:
return [TextContent(type="text", text=f"执行失败: {str(e)}")]
# 4. FastAPI 路由注册
app = FastAPI(title="园区3D控制MCP服务")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# 注册 SSE 端点 (Cherry Studio 连接此地址)
app.add_route("/sse", mcp_endpoint, methods=["GET"])
# 别名,防止 Cherry Studio 尝试 /mcp/sse
app.add_route("/mcp/sse", mcp_endpoint, methods=["GET"])
# 挂载消息处理路由
app.mount("/messages/", sse.handle_post_message)
app.mount("/mcp/messages/", sse.handle_post_message)
# WebSocket 端点 (Vue 前端连接此地址)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
connected_clients.append(websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
if websocket in connected_clients:
connected_clients.remove(websocket)
# HTTP 测试接口
@app.get("/")
async def root():
return {"message": "MCP SSE 服务运行中", "ws_clients": len(connected_clients)}
@app.get("/trigger_alert/{floor}")
async def http_trigger(floor: str):
"""供网页直接测试触发"""
msg = await send_alert(floor)
return msg
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)