From b19352382b4fe92a0b3164ac98b117dc8b3f9a7b Mon Sep 17 00:00:00 2001
From: koko <1429659362@qq.com>
Date: Mon, 13 Oct 2025 15:37:31 +0800
Subject: [PATCH] mcp-python
---
.idea/misc.xml | 2 +-
.idea/solidstate-tools.iml | 2 +-
mcp/main.py | 15 +-
mcp/system_tools.py | 438 +++++++++++++++++++++++++++++++------
mcp/uvicorn/main.py | 10 +
5 files changed, 387 insertions(+), 80 deletions(-)
create mode 100644 mcp/uvicorn/main.py
diff --git a/.idea/misc.xml b/.idea/misc.xml
index 06ede0d..419adf3 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -3,5 +3,5 @@
-
+
\ No newline at end of file
diff --git a/.idea/solidstate-tools.iml b/.idea/solidstate-tools.iml
index 909438d..8073316 100644
--- a/.idea/solidstate-tools.iml
+++ b/.idea/solidstate-tools.iml
@@ -2,7 +2,7 @@
-
+
\ No newline at end of file
diff --git a/mcp/main.py b/mcp/main.py
index 16e67e7..7321ad2 100644
--- a/mcp/main.py
+++ b/mcp/main.py
@@ -8,21 +8,24 @@ from system_tools import create_system_mcp # 如果暂时不用可先不挂
# 先创建 MCP 实例
test_mcp = create_test_mcp()
-# system_mcp = create_system_mcp()
+system_mcp = create_system_mcp()
# 关键:在 Starlette 的 lifespan 中启动 MCP 的 session manager
@contextlib.asynccontextmanager
async def lifespan(app: Starlette):
async with contextlib.AsyncExitStack() as stack:
- await stack.enter_async_context(test_mcp.session_manager.run())
- # await stack.enter_async_context(system_mcp.session_manager.run())
+ # await stack.enter_async_context(test_mcp.session_manager.run())
+ await stack.enter_async_context(system_mcp.session_manager.run())
yield # 服务器运行期间
# 退出时自动清理
app = Starlette(
lifespan=lifespan,
routes=[
- Mount("/test", app=test_mcp.streamable_http_app()),
- # Mount("/system", app=system_mcp.streamable_http_app()),
+ # Mount("/test", app=test_mcp.streamable_http_app()),
+ Mount("/system", app=system_mcp.streamable_http_app()),
],
-)
\ No newline at end of file
+)
+
+# 启动代码为uvicorn main:app --host 0.0.0.0 --port 8000
+# url为http://localhost:8000/system
\ No newline at end of file
diff --git a/mcp/system_tools.py b/mcp/system_tools.py
index d453d4c..bad854a 100644
--- a/mcp/system_tools.py
+++ b/mcp/system_tools.py
@@ -1,10 +1,13 @@
# system_tools.py
import os
+import posixpath
+import stat as pystat
+from typing import Any
+
import asyncssh
from mcp.server.fastmcp import FastMCP, Context
from mcp.server.session import ServerSession
-from lifespan import SharedAppContext # 导入共享上下文的类型定义
-
+from lifespan import SharedAppContext # 保持你的现有导入
def create_system_mcp() -> FastMCP:
"""创建一个包含系统操作工具的 MCP 实例。"""
@@ -16,99 +19,390 @@ def create_system_mcp() -> FastMCP:
streamable_http_path="/"
)
+ def _safe_join(sandbox_root: str, relative_path: str) -> str:
+ """
+ 将用户提供的相对路径映射到沙箱根目录内的规范化绝对路径。
+ - 统一使用 POSIX 语义(远端 Linux)
+ - 禁止使用以 '/' 开头的绝对路径
+ - 禁止 '..' 越界,确保最终路径仍在沙箱内
+ """
+ rel = (relative_path or ".").strip()
+
+ # 禁止绝对路径,转为相对
+ if rel.startswith("/"):
+ rel = rel.lstrip("/")
+
+ # 规范化拼接
+ combined = posixpath.normpath(posixpath.join(sandbox_root, rel))
+
+ # 统一尾部斜杠处理,避免边界判断遗漏
+ root_norm = sandbox_root.rstrip("/")
+
+ # 确保仍在沙箱内
+ if combined != root_norm and not combined.startswith(root_norm + "/"):
+ raise ValueError("路径越界:仅允许访问沙箱目录内部")
+
+ # 禁止路径中出现 '..'(进一步加固)
+ parts = [p for p in combined.split("/") if p]
+ if ".." in parts:
+ raise ValueError("非法路径:不允许使用 '..' 跨目录")
+
+ return combined
+
+ # —— 重构后的三个工具:list_files / read_file / write_file ——
@system_mcp.tool()
- async def list_files(ctx: Context[ServerSession, dict], path: str = ".") -> str:
+ async def list_files(ctx: Context[ServerSession, dict], path: str = ".") -> list[dict[str, Any]]:
"""
- 列出远程服务器安全沙箱路径下的文件和目录。
- Args:
- path: 要查看的相对路径,默认为当前沙箱目录。
- """
- try:
- # 从上下文中获取共享的 SSH 连接和沙箱路径 [1]
- shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"]
-
- conn = shared_ctx.ssh_connection
- sandbox_path = shared_ctx.sandbox_path
-
- # 安全检查:防止目录穿越攻击
- if ".." in path.split('/'):
- return "错误: 不允许使用 '..' 访问上级目录。"
-
- # 安全地拼接路径
- target_path = os.path.join(sandbox_path, path)
- await ctx.info(f"正在列出安全路径: {target_path}")
-
- # 使用 conn.run() 执行 ls -l 命令
- command_to_run = f'ls -la {conn.escape(target_path)}'
- result = await conn.run(command_to_run, check=True)
- return result.stdout.strip()
- except asyncssh.ProcessError as e:
- error_message = f"命令执行失败: {e.stderr.strip()}"
- await ctx.error(error_message)
- return error_message
- except Exception as e:
- error_message = f"执行 ls 命令时发生未知错误: {e}"
- await ctx.error(error_message)
- return error_message
-
- @system_mcp.tool()
- async def read_file(ctx: Context[ServerSession, dict], file_path: str) -> str:
- """
- 读取远程服务器安全沙箱内指定文件的内容。
- Args:
- file_path: 相对于沙箱目录的文件路径。
+ 列出远程沙箱目录中的文件和子目录(结构化输出)。
+ Returns:
+ list[dict]: [{name, path, is_dir, size, permissions, mtime}]
"""
try:
shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"]
-
conn = shared_ctx.ssh_connection
- sandbox_path = shared_ctx.sandbox_path
- # 安全检查:防止目录穿越
- if ".." in file_path.split('/'):
- return "错误: 不允许使用 '..' 访问上级目录。"
+ sandbox_root = shared_ctx.sandbox_path
- target_path = os.path.join(sandbox_path, file_path)
- await ctx.info(f"正在读取远程文件: {target_path}")
+ target = _safe_join(sandbox_root, path)
+ await ctx.info(f"列出目录:{target}")
+ items: list[dict[str, Any]] = []
async with conn.start_sftp_client() as sftp:
- async with sftp.open(target_path, 'r') as f:
- content = await f.read()
- return content
+ # 先列出文件名,再逐个 stat 获取属性
+ names = await sftp.listdir(target) # list[str]
+ for name in names:
+ item_abs = posixpath.join(target, name)
+ attrs = None
+ try:
+ attrs = await sftp.stat(item_abs)
+ except Exception:
+ pass
+
+ perms = int(attrs.permissions or 0) if attrs else 0
+ is_dir = bool(pystat.S_ISDIR(perms))
+ size = int(attrs.size or 0) if attrs and attrs.size is not None else 0
+ mtime = int(attrs.mtime or 0) if attrs and attrs.mtime is not None else 0
+
+ items.append({
+ "name": name,
+ # 返回相对路径,便于后续工具继续在沙箱内操作
+ "path": posixpath.join(path.rstrip("/"), name) if path != "/" else name,
+ "is_dir": is_dir,
+ "size": size,
+ "permissions": perms, # 九进制权限位,例如 0o755(以整型传递)
+ "mtime": mtime, # 秒
+ })
+
+ await ctx.debug(f"目录项数量:{len(items)}")
+ return items
+
except FileNotFoundError:
- error_message = f"读取文件失败: 文件 '{file_path}' 不存在。"
- await ctx.error(error_message)
- return error_message
+ msg = f"目录不存在或不可访问:{path}"
+ await ctx.error(msg)
+ return [{"error": msg}]
except Exception as e:
- await ctx.error(f"读取文件时发生未知错误: {e}")
- return f"错误: {e}"
+ msg = f"list_files 失败:{e}"
+ await ctx.error(msg)
+ return [{"error": msg}]
@system_mcp.tool()
- async def write_file(ctx: Context[ServerSession, dict], file_path: str, content: str) -> str:
+ async def read_file(ctx: Context[ServerSession, dict], file_path: str, encoding: str = "utf-8") -> str:
"""
- 向远程服务器安全沙箱内的文件写入内容。如果文件已存在,则会覆盖它。
+ 读取远程沙箱内指定文件内容。
Args:
- file_path: 相对于沙箱目录的文件路径。
- content: 要写入文件的文本内容。
+ file_path: 相对于沙箱根的文件路径
+ encoding: 文本编码(默认 utf-8)
+ Returns:
+ 文件文本内容(字符串)
"""
try:
shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"]
-
conn = shared_ctx.ssh_connection
- sandbox_path = shared_ctx.sandbox_path
+ sandbox_root = shared_ctx.sandbox_path
- # 安全检查
- if ".." in file_path.split('/'):
- return "错误: 不允许使用 '..' 访问上级目录。"
-
- target_path = os.path.join(sandbox_path, file_path)
- await ctx.info(f"正在向远程文件写入: {target_path}")
+ target = _safe_join(sandbox_root, file_path)
+ await ctx.info(f"读取文件:{target}")
async with conn.start_sftp_client() as sftp:
- async with sftp.open(target_path, 'w') as f:
- await f.write(content)
- return f"内容已成功写入到沙箱文件 '{file_path}'。"
+ # 以二进制读取,按 encoding 解码为文本
+ async with sftp.open(target, "rb") as f:
+ data = await f.read()
+ try:
+ content = data.decode(encoding)
+ except Exception:
+ # 如果解码失败,回退为原始字节的 repr
+ content = data.decode(encoding, errors="replace")
+
+ return content
+
+ except FileNotFoundError:
+ msg = f"读取失败:文件不存在 '{file_path}'"
+ await ctx.error(msg)
+ return msg
except Exception as e:
- await ctx.error(f"写入文件时发生未知错误: {e}")
+ msg = f"read_file 失败:{e}"
+ await ctx.error(msg)
+ return msg
+
+ @system_mcp.tool()
+ async def write_file(
+ ctx: Context[ServerSession, dict],
+ file_path: str,
+ content: str,
+ encoding: str = "utf-8",
+ create_parents: bool = True,
+ ) -> dict[str, Any]:
+ """
+ 写入远程沙箱文件(默认按需创建父目录)。
+ Args:
+ file_path: 相对于沙箱根的文件路径
+ content: 要写入的文本内容
+ encoding: 写入编码(默认 utf-8)
+ create_parents: True 则自动创建父目录
+ Returns:
+ dict: { path, bytes_written }
+ """
+ try:
+ shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"]
+ conn = shared_ctx.ssh_connection
+ sandbox_root = shared_ctx.sandbox_path
+
+ target = _safe_join(sandbox_root, file_path)
+ await ctx.info(f"写入文件:{target}")
+
+ # 可选:创建父目录
+ if create_parents:
+ parent = posixpath.dirname(target)
+ if parent and parent != sandbox_root:
+ # mkdir -p,更快且不易出错
+ await conn.run(f"mkdir -p {conn.escape(parent)}", check=True)
+
+ data = content.encode(encoding)
+
+ async with conn.start_sftp_client() as sftp:
+ async with sftp.open(target, "wb") as f:
+ await f.write(data)
+
+ await ctx.debug(f"写入完成:{len(data)} 字节")
+ return {"path": file_path, "bytes_written": len(data)}
+
+ except Exception as e:
+ msg = f"write_file 失败:{e}"
+ await ctx.error(msg)
+ return {"error": msg}
+
+
+
+
+ @system_mcp.tool()
+ async def make_dir(ctx: Context[ServerSession, dict], path: str, parents: bool = True) -> str:
+ """
+ 创建目录(默认支持递归创建父级 -p)。
+ Args:
+ path: 相对于沙箱根的目录路径
+ parents: True 表示使用 `mkdir -p`,False 则只创建最后一级
+ """
+ try:
+ shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"]
+ conn = shared_ctx.ssh_connection
+ sandbox_root = shared_ctx.sandbox_path
+
+ target = _safe_join(sandbox_root, path)
+ if parents:
+ await conn.run(f"mkdir -p {conn.escape(target)}", check=True)
+ else:
+ async with conn.start_sftp_client() as sftp:
+ await sftp.mkdir(target)
+
+ await ctx.info(f"目录已创建: {target}")
+ return f"目录已创建: {path}"
+ except Exception as e:
+ await ctx.error(f"创建目录失败: {e}")
return f"错误: {e}"
+ @system_mcp.tool()
+ async def delete_file(ctx: Context[ServerSession, dict], file_path: str) -> str:
+ """
+ 删除文件(非目录)。
+ Args:
+ file_path: 相对于沙箱根的文件路径
+ """
+ try:
+ shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"]
+ conn = shared_ctx.ssh_connection
+ sandbox_root = shared_ctx.sandbox_path
+
+ target = _safe_join(sandbox_root, file_path)
+ async with conn.start_sftp_client() as sftp:
+ await sftp.remove(target)
+
+ await ctx.info(f"文件已删除: {target}")
+ return f"文件已删除: {file_path}"
+ except FileNotFoundError:
+ msg = f"删除失败:文件不存在 '{file_path}'"
+ await ctx.error(msg)
+ return msg
+ except Exception as e:
+ await ctx.error(f"删除文件失败: {e}")
+ return f"错误: {e}"
+
+ @system_mcp.tool()
+ async def delete_dir(ctx: Context[ServerSession, dict], dir_path: str, recursive: bool = False) -> str:
+ """
+ 删除目录。
+ Args:
+ dir_path: 相对于沙箱根的目录路径
+ recursive: True 则递归删除(rm -rf),False 仅删除空目录(rmdir)
+ """
+ try:
+ shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"]
+ conn = shared_ctx.ssh_connection
+ sandbox_root = shared_ctx.sandbox_path
+
+ target = _safe_join(sandbox_root, dir_path)
+
+ if recursive:
+ await conn.run(f"rm -rf {conn.escape(target)}", check=True)
+ else:
+ async with conn.start_sftp_client() as sftp:
+ await sftp.rmdir(target)
+
+ await ctx.info(f"目录已删除: {target}")
+ return f"目录已删除: {dir_path}"
+ except FileNotFoundError:
+ msg = f"删除失败:目录不存在 '{dir_path}' 或非空"
+ await ctx.error(msg)
+ return msg
+ except Exception as e:
+ await ctx.error(f"删除目录失败: {e}")
+ return f"错误: {e}"
+
+ @system_mcp.tool()
+ async def move_path(ctx: Context[ServerSession, dict], src: str, dst: str, overwrite: bool = True) -> str:
+ """
+ 移动/重命名文件或目录。
+ Args:
+ src: 源路径(相对于沙箱根)
+ dst: 目标路径(相对于沙箱根)
+ overwrite: True 使用 mv -f 覆盖同名目标
+ """
+ try:
+ shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"]
+ conn = shared_ctx.ssh_connection
+ sandbox_root = shared_ctx.sandbox_path
+
+ src_abs = _safe_join(sandbox_root, src)
+ dst_abs = _safe_join(sandbox_root, dst)
+
+ flags = "-f" if overwrite else ""
+ await conn.run(f"mv {flags} {conn.escape(src_abs)} {conn.escape(dst_abs)}", check=True)
+
+ await ctx.info(f"已移动: {src_abs} -> {dst_abs}")
+ return f"已移动: {src} -> {dst}"
+ except FileNotFoundError:
+ msg = f"移动失败:源不存在 '{src}'"
+ await ctx.error(msg)
+ return msg
+ except Exception as e:
+ await ctx.error(f"移动失败: {e}")
+ return f"错误: {e}"
+
+ @system_mcp.tool()
+ async def copy_path(
+ ctx: Context[ServerSession, dict],
+ src: str,
+ dst: str,
+ recursive: bool = True,
+ overwrite: bool = True,
+ ) -> str:
+ """
+ 复制文件或目录。
+ Args:
+ src: 源路径(相对于沙箱根)
+ dst: 目标路径(相对于沙箱根)
+ recursive: True 则使用 -r 递归复制目录
+ overwrite: True 使用 -f 覆盖同名目标
+ """
+ try:
+ shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"]
+ conn = shared_ctx.ssh_connection
+ sandbox_root = shared_ctx.sandbox_path
+
+ src_abs = _safe_join(sandbox_root, src)
+ dst_abs = _safe_join(sandbox_root, dst)
+
+ flags = []
+ if recursive:
+ flags.append("-r")
+ if overwrite:
+ flags.append("-f")
+
+ await conn.run(f"cp {' '.join(flags)} {conn.escape(src_abs)} {conn.escape(dst_abs)}", check=True)
+
+ await ctx.info(f"已复制: {src_abs} -> {dst_abs}")
+ return f"已复制: {src} -> {dst}"
+ except FileNotFoundError:
+ msg = f"复制失败:源不存在 '{src}'"
+ await ctx.error(msg)
+ return msg
+ except Exception as e:
+ await ctx.error(f"复制失败: {e}")
+ return f"错误: {e}"
+
+ @system_mcp.tool()
+ async def exists(ctx: Context[ServerSession, dict], path: str) -> bool:
+ """
+ 判断路径(文件/目录)是否存在。
+ Args:
+ path: 相对于沙箱根的路径
+ Returns:
+ True/False
+ """
+ try:
+ shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"]
+ conn = shared_ctx.ssh_connection
+ sandbox_root = shared_ctx.sandbox_path
+
+ target = _safe_join(sandbox_root, path)
+ async with conn.start_sftp_client() as sftp:
+ await sftp.stat(target)
+ return True
+ except FileNotFoundError:
+ return False
+ except Exception as e:
+ await ctx.error(f"exists 检查失败: {e}")
+ return False
+
+ @system_mcp.tool()
+ async def stat_path(ctx: Context[ServerSession, dict], path: str) -> dict:
+ """
+ 查看远程路径属性(结构化输出)。
+ Args:
+ path: 相对于沙箱根的路径
+ Returns:
+ dict: { path, size, is_dir, permissions, mtime }
+ """
+ try:
+ shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"]
+ conn = shared_ctx.ssh_connection
+ sandbox_root = shared_ctx.sandbox_path
+
+ target = _safe_join(sandbox_root, path)
+ async with conn.start_sftp_client() as sftp:
+ attrs = await sftp.stat(target)
+
+ perms = attrs.permissions or 0
+ return {
+ "path": target,
+ "size": int(attrs.size or 0),
+ "is_dir": bool(pystat.S_ISDIR(perms)),
+ "permissions": perms, # 九进制权限位,例:0o755
+ "mtime": int(attrs.mtime or 0), # 秒
+ }
+ except FileNotFoundError:
+ msg = f"路径不存在: {path}"
+ await ctx.error(msg)
+ return {"error": msg}
+ except Exception as e:
+ await ctx.error(f"stat 失败: {e}")
+ return {"error": str(e)}
return system_mcp
diff --git a/mcp/uvicorn/main.py b/mcp/uvicorn/main.py
new file mode 100644
index 0000000..bd0bab3
--- /dev/null
+++ b/mcp/uvicorn/main.py
@@ -0,0 +1,10 @@
+
+# main.py
+
+from fastapi import FastAPI
+
+app = FastAPI()
+
+@app.get("/")
+async def read_root():
+ return {"message": "Hello, World!"}
\ No newline at end of file