# system_tools.py import posixpath import stat as pystat from shlex import shlex from typing import Any from contextlib import asynccontextmanager from collections.abc import AsyncIterator import asyncssh from mcp.server.fastmcp import FastMCP, Context from mcp.server.session import ServerSession # 在 system_tools.py 顶部添加 def shell_quote(arg: str) -> str: """ 安全地把字符串作为单个 shell 参数: - 外层用单引号包裹 - 内部的单引号 ' 替换为 '\'' 序列 适用于远端 Linux shell 命令拼接 """ return "'" + arg.replace("'", "'\"'\"'") + "'" # 复用你的配置与数据类 from lifespan import ( SharedAppContext, REMOTE_HOST, REMOTE_USER, PRIVATE_KEY_PATH, INITIAL_WORKING_DIRECTORY, ) # —— 1) 定义 FastMCP 的生命周期,在启动时建立 SSH 连接,关闭时断开 —— @asynccontextmanager async def system_lifespan(_server: FastMCP) -> AsyncIterator[SharedAppContext]: """ FastMCP 生命周期:建立并注入共享的 SSH 连接与沙箱根路径。 说明:这是 MCP 自己的生命周期,工具里通过 ctx.request_context.lifespan_context 访问。 """ conn: asyncssh.SSHClientConnection | None = None try: # 建立 SSH 连接 conn = await asyncssh.connect( REMOTE_HOST, username=REMOTE_USER, client_keys=[PRIVATE_KEY_PATH], ) # 将类型安全的共享上下文注入 MCP 生命周期 yield SharedAppContext(ssh_connection=conn, sandbox_path=INITIAL_WORKING_DIRECTORY) finally: # 关闭连接 if conn: conn.close() await conn.wait_closed() def create_system_mcp() -> FastMCP: """创建一个包含系统操作工具的 MCP 实例。""" system_mcp = FastMCP( name="System Tools", instructions="用于在远程服务器上进行基本文件和目录操作的工具集。", streamable_http_path="/", lifespan=system_lifespan, # 关键:把生命周期传给 FastMCP [1] ) def _safe_join(sandbox_root: str, relative_path: str) -> str: """ 将用户提供的相对路径映射到沙箱根目录内的规范化绝对路径。 - 统一使用 POSIX 语义(远端 Linux) - 禁止使用以 '/' 开头的绝对路径 - 禁止 '..' 越界,确保最终路径仍在沙箱内 """ rel = (relative_path or ".").strip() # 禁止绝对路径,转为相对 if rel.startswith("/"): rel = rel.lstrip("/") # 规范化拼接 combined = posixpath.normpath(posixpath.join(sandbox_root, rel)) # 统一尾部斜杠处理,避免边界判断遗漏 root_norm = sandbox_root.rstrip("/") # 确保仍在沙箱内 if combined != root_norm and not combined.startswith(root_norm + "/"): raise ValueError("路径越界:仅允许访问沙箱目录内部") # 禁止路径中出现 '..'(进一步加固) parts = [p for p in combined.split("/") if p] if ".." in parts: raise ValueError("非法路径:不允许使用 '..' 跨目录") return combined # —— 重构后的各工具:统一用类型安全的 ctx 与 app_ctx 访问共享资源 —— @system_mcp.tool() async def list_files(ctx: Context[ServerSession, SharedAppContext], path: str = ".") -> list[dict[str, Any]]: """ 列出远程沙箱目录中的文件和子目录(结构化输出)。 Returns: list[dict]: [{name, path, is_dir, size, permissions, mtime}] """ try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection sandbox_root = app_ctx.sandbox_path target = _safe_join(sandbox_root, path) await ctx.info(f"列出目录:{target}") items: list[dict[str, Any]] = [] async with conn.start_sftp_client() as sftp: names = await sftp.listdir(target) # list[str] for name in names: item_abs = posixpath.join(target, name) attrs = None try: attrs = await sftp.stat(item_abs) except Exception: pass perms = int(attrs.permissions or 0) if attrs else 0 is_dir = bool(pystat.S_ISDIR(perms)) size = int(attrs.size or 0) if attrs and attrs.size is not None else 0 mtime = int(attrs.mtime or 0) if attrs and attrs.mtime is not None else 0 items.append({ "name": name, "path": posixpath.join(path.rstrip("/"), name) if path != "/" else name, "is_dir": is_dir, "size": size, "permissions": perms, "mtime": mtime, }) await ctx.debug(f"目录项数量:{len(items)}") return items except FileNotFoundError: msg = f"目录不存在或不可访问:{path}" await ctx.error(msg) return [{"error": msg}] except Exception as e: msg = f"list_files 失败:{e}" await ctx.error(msg) return [{"error": msg}] @system_mcp.tool() async def read_file(ctx: Context[ServerSession, SharedAppContext], file_path: str, encoding: str = "utf-8") -> str: """ 读取远程沙箱内指定文件内容。 """ try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection sandbox_root = app_ctx.sandbox_path target = _safe_join(sandbox_root, file_path) await ctx.info(f"读取文件:{target}") async with conn.start_sftp_client() as sftp: async with sftp.open(target, "rb") as f: data = await f.read() try: content = data.decode(encoding) except Exception: content = data.decode(encoding, errors="replace") return content except FileNotFoundError: msg = f"读取失败:文件不存在 '{file_path}'" await ctx.error(msg) return msg except Exception as e: msg = f"read_file 失败:{e}" await ctx.error(msg) return msg @system_mcp.tool() async def write_file( ctx: Context[ServerSession, dict], file_path: str, content: str, encoding: str = "utf-8", create_parents: bool = True, ) -> dict[str, Any]: """ 写入远程沙箱文件(默认按需创建父目录)。 """ try: shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"] conn = shared_ctx.ssh_connection sandbox_root = shared_ctx.sandbox_path target = _safe_join(sandbox_root, file_path) await ctx.info(f"写入文件:{target}") if create_parents: parent = posixpath.dirname(target) if parent and parent != sandbox_root: await conn.run(f"mkdir -p {shell_quote(parent)}", check=True) data = content.encode(encoding) async with conn.start_sftp_client() as sftp: async with sftp.open(target, "wb") as f: await f.write(data) await ctx.debug(f"写入完成:{len(data)} 字节") return {"path": file_path, "bytes_written": len(data)} except Exception as e: msg = f"write_file 失败:{e}" await ctx.error(msg) return {"error": msg} @system_mcp.tool() async def make_dir(ctx: Context[ServerSession, SharedAppContext], path: str, parents: bool = True) -> str: try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection sandbox_root = app_ctx.sandbox_path target = _safe_join(sandbox_root, path) if parents: # 单行命令:mkdir -p await conn.run(f"mkdir -p {shell_quote(target)}", check=True) else: async with conn.start_sftp_client() as sftp: await sftp.mkdir(target) await ctx.info(f"目录已创建: {target}") return f"目录已创建: {path}" except Exception as e: await ctx.error(f"创建目录失败: {e}") return f"错误: {e}" @system_mcp.tool() async def delete_file(ctx: Context[ServerSession, SharedAppContext], file_path: str) -> str: """ 删除文件(非目录)。 """ try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection sandbox_root = app_ctx.sandbox_path target = _safe_join(sandbox_root, file_path) async with conn.start_sftp_client() as sftp: await sftp.remove(target) await ctx.info(f"文件已删除: {target}") return f"文件已删除: {file_path}" except FileNotFoundError: msg = f"删除失败:文件不存在 '{file_path}'" await ctx.error(msg) return msg except Exception as e: await ctx.error(f"删除文件失败: {e}") return f"错误: {e}" @system_mcp.tool() async def delete_dir(ctx: Context[ServerSession, dict], dir_path: str, recursive: bool = False) -> str: """ 删除目录。 """ try: shared_ctx: SharedAppContext = ctx.request_context.lifespan_context["shared_context"] conn = shared_ctx.ssh_connection sandbox_root = shared_ctx.sandbox_path target = _safe_join(sandbox_root, dir_path) if recursive: await conn.run(f"rm -rf {shell_quote(target)}", check=True) else: async with conn.start_sftp_client() as sftp: await sftp.rmdir(target) await ctx.info(f"目录已删除: {target}") return f"目录已删除: {dir_path}" except FileNotFoundError: msg = f"删除失败:目录不存在 '{dir_path}' 或非空" await ctx.error(msg) return msg except Exception as e: await ctx.error(f"删除目录失败: {e}") return f"错误: {e}" @system_mcp.tool() async def move_path(ctx: Context[ServerSession, SharedAppContext], src: str, dst: str, overwrite: bool = True) -> str: try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection sandbox_root = app_ctx.sandbox_path src_abs = _safe_join(sandbox_root, src) dst_abs = _safe_join(sandbox_root, dst) flag = "-f" if overwrite else "" # 单行命令:mv cmd = f"mv {flag} {shell_quote(src_abs)} {shell_quote(dst_abs)}".strip() await conn.run(cmd, check=True) await ctx.info(f"已移动: {src_abs} -> {dst_abs}") return f"已移动: {src} -> {dst}" except FileNotFoundError: msg = f"移动失败:源不存在 '{src}'" await ctx.error(msg) return msg except Exception as e: await ctx.error(f"移动失败: {e}") return f"错误: {e}" @system_mcp.tool() async def copy_path( ctx: Context[ServerSession, SharedAppContext], src: str, dst: str, recursive: bool = True, overwrite: bool = True, ) -> str: try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection sandbox_root = app_ctx.sandbox_path src_abs = _safe_join(sandbox_root, src) dst_abs = _safe_join(sandbox_root, dst) flags = [] if recursive: flags.append("-r") if overwrite: flags.append("-f") # 单行命令:cp cmd = " ".join(["cp"] + flags + [shell_quote(src_abs), shell_quote(dst_abs)]) await conn.run(cmd, check=True) await ctx.info(f"已复制: {src_abs} -> {dst_abs}") return f"已复制: {src} -> {dst}" except FileNotFoundError: msg = f"复制失败:源不存在 '{src}'" await ctx.error(msg) return msg except Exception as e: await ctx.error(f"复制失败: {e}") return f"错误: {e}" @system_mcp.tool() async def exists(ctx: Context[ServerSession, SharedAppContext], path: str) -> bool: """ 判断路径(文件/目录)是否存在。 """ try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection sandbox_root = app_ctx.sandbox_path target = _safe_join(sandbox_root, path) async with conn.start_sftp_client() as sftp: await sftp.stat(target) return True except FileNotFoundError: return False except Exception as e: await ctx.error(f"exists 检查失败: {e}") return False @system_mcp.tool() async def stat_path(ctx: Context[ServerSession, SharedAppContext], path: str) -> dict: """ 查看远程路径属性(结构化输出)。 """ try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection sandbox_root = app_ctx.sandbox_path target = _safe_join(sandbox_root, path) async with conn.start_sftp_client() as sftp: attrs = await sftp.stat(target) perms = attrs.permissions or 0 return { "path": target, "size": int(attrs.size or 0), "is_dir": bool(pystat.S_ISDIR(perms)), "permissions": perms, # 九进制权限位,例:0o755 "mtime": int(attrs.mtime or 0), # 秒 } except FileNotFoundError: msg = f"路径不存在: {path}" await ctx.error(msg) return {"error": msg} except Exception as e: await ctx.error(f"stat 失败: {e}") return {"error": str(e)} return system_mcp