Files
solidstate-tools/mcp/system_tools.py
2025-10-15 16:19:41 +08:00

407 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# system_tools.py
import posixpath
import stat as pystat
from shlex import shlex
from typing import Any
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
import asyncssh
from mcp.server.fastmcp import FastMCP, Context
from mcp.server.session import ServerSession
# 在 system_tools.py 顶部添加
def shell_quote(arg: str) -> str:
"""
安全地把字符串作为单个 shell 参数:
- 外层用单引号包裹
- 内部的单引号 ' 替换为 '\'' 序列
适用于远端 Linux shell 命令拼接
"""
return "'" + arg.replace("'", "'\"'\"'") + "'"
# 复用你的配置与数据类
from lifespan import (
SharedAppContext,
REMOTE_HOST,
REMOTE_USER,
PRIVATE_KEY_PATH,
INITIAL_WORKING_DIRECTORY,
)
# —— 1) 定义 FastMCP 的生命周期,在启动时建立 SSH 连接,关闭时断开 ——
@asynccontextmanager
async def system_lifespan(_server: FastMCP) -> AsyncIterator[SharedAppContext]:
"""
FastMCP 生命周期:建立并注入共享的 SSH 连接与沙箱根路径。
说明:这是 MCP 自己的生命周期,工具里通过 ctx.request_context.lifespan_context 访问。
"""
conn: asyncssh.SSHClientConnection | None = None
try:
# 建立 SSH 连接
conn = await asyncssh.connect(
REMOTE_HOST,
username=REMOTE_USER,
client_keys=[PRIVATE_KEY_PATH],
)
# 将类型安全的共享上下文注入 MCP 生命周期
yield SharedAppContext(ssh_connection=conn, sandbox_path=INITIAL_WORKING_DIRECTORY)
finally:
# 关闭连接
if conn:
conn.close()
await conn.wait_closed()
def create_system_mcp() -> FastMCP:
"""创建一个包含系统操作工具的 MCP 实例。"""
system_mcp = FastMCP(
name="System Tools",
instructions="用于在远程服务器上进行基本文件和目录操作的工具集。",
streamable_http_path="/",
lifespan=system_lifespan, # 关键:把生命周期传给 FastMCP [1]
)
def _safe_join(sandbox_root: str, relative_path: str) -> str:
"""
将用户提供的相对路径映射到沙箱根目录内的规范化绝对路径。
- 统一使用 POSIX 语义(远端 Linux)
- 禁止使用以 '/' 开头的绝对路径
- 禁止 '..' 越界,确保最终路径仍在沙箱内
"""
rel = (relative_path or ".").strip()
# 禁止绝对路径,转为相对
if rel.startswith("/"):
rel = rel.lstrip("/")
# 规范化拼接
combined = posixpath.normpath(posixpath.join(sandbox_root, rel))
# 统一尾部斜杠处理,避免边界判断遗漏
root_norm = sandbox_root.rstrip("/")
# 确保仍在沙箱内
if combined != root_norm and not combined.startswith(root_norm + "/"):
raise ValueError("路径越界:仅允许访问沙箱目录内部")
# 禁止路径中出现 '..'(进一步加固)
parts = [p for p in combined.split("/") if p]
if ".." in parts:
raise ValueError("非法路径:不允许使用 '..' 跨目录")
return combined
# —— 重构后的各工具:统一用类型安全的 ctx 与 app_ctx 访问共享资源 ——
@system_mcp.tool()
async def list_files(ctx: Context[ServerSession, SharedAppContext], path: str = ".") -> list[dict[str, Any]]:
"""
列出远程沙箱目录中的文件和子目录(结构化输出)。
Returns:
list[dict]: [{name, path, is_dir, size, permissions, mtime}]
"""
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
sandbox_root = app_ctx.sandbox_path
target = _safe_join(sandbox_root, path)
await ctx.info(f"列出目录:{target}")
items: list[dict[str, Any]] = []
async with conn.start_sftp_client() as sftp:
names = await sftp.listdir(target) # list[str]
for name in names:
item_abs = posixpath.join(target, name)
attrs = None
try:
attrs = await sftp.stat(item_abs)
except Exception:
pass
perms = int(attrs.permissions or 0) if attrs else 0
is_dir = bool(pystat.S_ISDIR(perms))
size = int(attrs.size or 0) if attrs and attrs.size is not None else 0
mtime = int(attrs.mtime or 0) if attrs and attrs.mtime is not None else 0
items.append({
"name": name,
"path": posixpath.join(path.rstrip("/"), name) if path != "/" else name,
"is_dir": is_dir,
"size": size,
"permissions": perms,
"mtime": mtime,
})
await ctx.debug(f"目录项数量:{len(items)}")
return items
except FileNotFoundError:
msg = f"目录不存在或不可访问:{path}"
await ctx.error(msg)
return [{"error": msg}]
except Exception as e:
msg = f"list_files 失败:{e}"
await ctx.error(msg)
return [{"error": msg}]
@system_mcp.tool()
async def read_file(ctx: Context[ServerSession, SharedAppContext], file_path: str, encoding: str = "utf-8") -> str:
"""
读取远程沙箱内指定文件内容。
"""
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
sandbox_root = app_ctx.sandbox_path
target = _safe_join(sandbox_root, file_path)
await ctx.info(f"读取文件:{target}")
async with conn.start_sftp_client() as sftp:
async with sftp.open(target, "rb") as f:
data = await f.read()
try:
content = data.decode(encoding)
except Exception:
content = data.decode(encoding, errors="replace")
return content
except FileNotFoundError:
msg = f"读取失败:文件不存在 '{file_path}'"
await ctx.error(msg)
return msg
except Exception as e:
msg = f"read_file 失败:{e}"
await ctx.error(msg)
return msg
@system_mcp.tool()
async def write_file(
ctx: Context[ServerSession, SharedAppContext],
file_path: str,
content: str,
encoding: str = "utf-8",
create_parents: bool = True,
) -> dict[str, Any]:
"""
写入远程沙箱文件(默认按需创建父目录)。
"""
try:
app_ctx = ctx.request_context.lifespan_context # 类型化的 SharedAppContext
conn = app_ctx.ssh_connection
sandbox_root = app_ctx.sandbox_path
target = _safe_join(sandbox_root, file_path)
await ctx.info(f"写入文件:{target}")
if create_parents:
parent = posixpath.dirname(target)
if parent and parent != sandbox_root:
await conn.run(f"mkdir -p {shell_quote(parent)}", check=True)
data = content.encode(encoding)
async with conn.start_sftp_client() as sftp:
async with sftp.open(target, "wb") as f:
await f.write(data)
await ctx.debug(f"写入完成:{len(data)} 字节")
return {"path": file_path, "bytes_written": len(data)}
except Exception as e:
msg = f"write_file 失败:{e}"
await ctx.error(msg)
return {"error": msg}
@system_mcp.tool()
async def make_dir(ctx: Context[ServerSession, SharedAppContext], path: str, parents: bool = True) -> str:
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
sandbox_root = app_ctx.sandbox_path
target = _safe_join(sandbox_root, path)
if parents:
# 单行命令mkdir -p
await conn.run(f"mkdir -p {shell_quote(target)}", check=True)
else:
async with conn.start_sftp_client() as sftp:
await sftp.mkdir(target)
await ctx.info(f"目录已创建: {target}")
return f"目录已创建: {path}"
except Exception as e:
await ctx.error(f"创建目录失败: {e}")
return f"错误: {e}"
@system_mcp.tool()
async def delete_file(ctx: Context[ServerSession, SharedAppContext], file_path: str) -> str:
"""
删除文件(非目录)。
"""
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
sandbox_root = app_ctx.sandbox_path
target = _safe_join(sandbox_root, file_path)
async with conn.start_sftp_client() as sftp:
await sftp.remove(target)
await ctx.info(f"文件已删除: {target}")
return f"文件已删除: {file_path}"
except FileNotFoundError:
msg = f"删除失败:文件不存在 '{file_path}'"
await ctx.error(msg)
return msg
except Exception as e:
await ctx.error(f"删除文件失败: {e}")
return f"错误: {e}"
@system_mcp.tool()
async def delete_dir(
ctx: Context[ServerSession, SharedAppContext],
dir_path: str,
recursive: bool = False,
) -> str:
"""
删除目录。
"""
try:
app_ctx = ctx.request_context.lifespan_context # 类型化的 SharedAppContext
conn = app_ctx.ssh_connection
sandbox_root = app_ctx.sandbox_path
target = _safe_join(sandbox_root, dir_path)
if recursive:
await conn.run(f"rm -rf {shell_quote(target)}", check=True)
else:
async with conn.start_sftp_client() as sftp:
await sftp.rmdir(target)
await ctx.info(f"目录已删除: {target}")
return f"目录已删除: {dir_path}"
except FileNotFoundError:
msg = f"删除失败:目录不存在 '{dir_path}' 或非空"
await ctx.error(msg)
return msg
except Exception as e:
await ctx.error(f"删除目录失败: {e}")
return f"错误: {e}"
@system_mcp.tool()
async def move_path(ctx: Context[ServerSession, SharedAppContext], src: str, dst: str,
overwrite: bool = True) -> str:
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
sandbox_root = app_ctx.sandbox_path
src_abs = _safe_join(sandbox_root, src)
dst_abs = _safe_join(sandbox_root, dst)
flag = "-f" if overwrite else ""
# 单行命令mv
cmd = f"mv {flag} {shell_quote(src_abs)} {shell_quote(dst_abs)}".strip()
await conn.run(cmd, check=True)
await ctx.info(f"已移动: {src_abs} -> {dst_abs}")
return f"已移动: {src} -> {dst}"
except FileNotFoundError:
msg = f"移动失败:源不存在 '{src}'"
await ctx.error(msg)
return msg
except Exception as e:
await ctx.error(f"移动失败: {e}")
return f"错误: {e}"
@system_mcp.tool()
async def copy_path(
ctx: Context[ServerSession, SharedAppContext],
src: str,
dst: str,
recursive: bool = True,
overwrite: bool = True,
) -> str:
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
sandbox_root = app_ctx.sandbox_path
src_abs = _safe_join(sandbox_root, src)
dst_abs = _safe_join(sandbox_root, dst)
flags = []
if recursive:
flags.append("-r")
if overwrite:
flags.append("-f")
# 单行命令cp
cmd = " ".join(["cp"] + flags + [shell_quote(src_abs), shell_quote(dst_abs)])
await conn.run(cmd, check=True)
await ctx.info(f"已复制: {src_abs} -> {dst_abs}")
return f"已复制: {src} -> {dst}"
except FileNotFoundError:
msg = f"复制失败:源不存在 '{src}'"
await ctx.error(msg)
return msg
except Exception as e:
await ctx.error(f"复制失败: {e}")
return f"错误: {e}"
@system_mcp.tool()
async def exists(ctx: Context[ServerSession, SharedAppContext], path: str) -> bool:
"""
判断路径(文件/目录)是否存在。
"""
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
sandbox_root = app_ctx.sandbox_path
target = _safe_join(sandbox_root, path)
async with conn.start_sftp_client() as sftp:
await sftp.stat(target)
return True
except FileNotFoundError:
return False
except Exception as e:
await ctx.error(f"exists 检查失败: {e}")
return False
@system_mcp.tool()
async def stat_path(ctx: Context[ServerSession, SharedAppContext], path: str) -> dict:
"""
查看远程路径属性(结构化输出)。
"""
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
sandbox_root = app_ctx.sandbox_path
target = _safe_join(sandbox_root, path)
async with conn.start_sftp_client() as sftp:
attrs = await sftp.stat(target)
perms = attrs.permissions or 0
return {
"path": target,
"size": int(attrs.size or 0),
"is_dir": bool(pystat.S_ISDIR(perms)),
"permissions": perms, # 九进制权限位,例:0o755
"mtime": int(attrs.mtime or 0), # 秒
}
except FileNotFoundError:
msg = f"路径不存在: {path}"
await ctx.error(msg)
return {"error": msg}
except Exception as e:
await ctx.error(f"stat 失败: {e}")
return {"error": str(e)}
return system_mcp