import asyncssh import os from contextlib import asynccontextmanager from collections.abc import AsyncIterator from dataclasses import dataclass from mcp.server.fastmcp import FastMCP, Context from mcp.server.session import ServerSession # --- 1. 使用您提供的参数 --- REMOTE_HOST = '202.121.182.208' REMOTE_USER = 'koko125' PRIVATE_KEY_PATH = 'D:/tool/tool/id_rsa.txt' # Windows 路径建议使用 / INITIAL_WORKING_DIRECTORY = '/cluster/home/koko125/sandbox' # --- 2. 生命周期管理部分 (高级功能) --- @dataclass class AppContext: """用于在服务器生命周期内持有共享资源的上下文对象""" ssh_connection: asyncssh.SSHClientConnection current_path: str @asynccontextmanager async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]: """在服务器启动时建立SSH连接,在关闭时断开。""" print("服务器启动中,正在建立SSH连接...") conn = None try: conn = await asyncssh.connect( REMOTE_HOST, username=REMOTE_USER, client_keys=[PRIVATE_KEY_PATH] ) print("SSH 连接成功!") # <<< 在登录时执行 source bashrc 命令 >>> # 注意:bashrc 的效果(如环境变量)只会对这一个 conn.run() 调用生效。 # 对于非交互式shell,更好的做法是把需要执行的命令串联起来。 # 但如果只是为了加载路径等,可以在后续命令中体现。 print("正在加载 .bashrc...") # 为了让 bashrc 生效,我们需要在一个交互式 shell 中执行命令 await conn.run('source /cluster/home/koko125/.bashrc', check=True) print(".bashrc 加载完成。") print(f"初始工作目录设置为: {INITIAL_WORKING_DIRECTORY}") yield AppContext(ssh_connection=conn, current_path=INITIAL_WORKING_DIRECTORY) finally: if conn: conn.close() await conn.wait_closed() print("SSH 连接已关闭。") # --- 3. 创建 MCP 服务器实例,并应用生命周期管理 --- mcp = FastMCP("远程服务器工具", lifespan=app_lifespan) # --- 4. 保留的调试工具 (不安全) --- @mcp.tool() async def execute_remote_command(command: str, ctx: Context) -> str: """ 【调试用】在远程服务器上执行一个任意的shell命令并返回其输出。 警告:此工具有安全风险,请勿在生产环境中使用。 """ await ctx.info(f"准备在 {REMOTE_HOST} 上执行调试命令: '{command}'") try: # 每次都创建一个新连接,以确保环境隔离 async with asyncssh.connect(REMOTE_HOST, username=REMOTE_USER, client_keys=[PRIVATE_KEY_PATH]) as conn: # 在执行命令前先 source bashrc full_command = f'source /cluster/home/koko125/.bashrc && {command}' result = await conn.run(full_command, check=True) output = result.stdout.strip() await ctx.info("调试命令成功执行。") return output except asyncssh.ProcessError as e: error_message = f"命令执行失败: {e.stderr.strip()}" await ctx.error(error_message) return error_message except Exception as e: error_message = f"发生未知错误: {str(e)}" await ctx.error(error_message) return error_message # --- 5. 新增的规范、安全的工具 --- @mcp.tool() async def list_files_in_sandbox(ctx: Context[ServerSession, AppContext], path: str = ".") -> str: """ 【安全】列出远程服务器安全沙箱路径下的文件和目录。 Args: path: 要查看的相对路径,默认为当前沙箱目录。 """ try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection # 防止目录穿越攻击 if ".." in path.split('/'): return "错误: 不允许使用 '..' 访问上级目录。" # 安全地拼接路径 target_path = os.path.join(app_ctx.current_path, path) await ctx.info(f"正在列出安全路径: {target_path}") # 为了让 .bashrc 的设置(如别名)生效,最好也加上 source command_to_run = f'source /cluster/home/koko125/.bashrc && ls -l {conn.escape(target_path)}' result = await conn.run(command_to_run, check=True) return result.stdout.strip() except Exception as e: await ctx.error(f"执行 ls 命令失败: {e}") return f"错误: {e}" @mcp.tool() async def create_directory_in_sandbox(ctx: Context[ServerSession, AppContext], directory_name: str) -> str: """ 【安全】在远程服务器的安全沙箱内创建一个新目录。 Args: directory_name: 要创建的目录的名称。 """ try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection # 安全检查:确保目录名不包含斜杠或 "..",以防止创建深层目录或进行目录穿越 if '/' in directory_name or ".." in directory_name: return "错误: 目录名无效。不能包含 '/' 或 '..'。" target_path = os.path.join(app_ctx.current_path, directory_name) await ctx.info(f"正在创建远程目录: {target_path}") command_to_run = f'source /cluster/home/koko125/.bashrc && mkdir {conn.escape(target_path)}' await conn.run(command_to_run, check=True) return f"目录 '{directory_name}' 在沙箱中成功创建。" except asyncssh.ProcessError as e: error_message = f"创建目录失败: {e.stderr.strip()}" await ctx.error(error_message) return error_message except Exception as e: await ctx.error(f"创建目录时发生未知错误: {e}") return f"错误: {e}" @mcp.tool() async def read_file_in_sandbox(ctx: Context[ServerSession, AppContext], file_path: str) -> str: """ 【安全】读取远程服务器安全沙箱内指定文件的内容。 Args: file_path: 相对于沙箱目录的文件路径。 """ try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection # 安全检查:防止目录穿越 if ".." in file_path.split('/'): return "错误: 不允许使用 '..' 访问上级目录。" target_path = os.path.join(app_ctx.current_path, file_path) await ctx.info(f"正在读取远程文件: {target_path}") async with conn.start_sftp_client() as sftp: async with sftp.open(target_path, 'r') as f: content = await f.read() return content except FileNotFoundError: error_message = f"读取文件失败: 文件 '{file_path}' 不存在。" await ctx.error(error_message) return error_message except Exception as e: await ctx.error(f"读取文件时发生未知错误: {e}") return f"错误: {e}" @mcp.tool() async def write_file_in_sandbox(ctx: Context[ServerSession, AppContext], file_path: str, content: str) -> str: """ 【安全】向远程服务器安全沙箱内的文件写入内容。如果文件已存在,则会覆盖它。 Args: file_path: 相对于沙箱目录的文件路径。 content: 要写入文件的文本内容。 """ try: app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection normalized_file_path = file_path.replace("\\", "/") # 安全检查 if ".." in file_path.split('/'): return "错误: 不允许使用 '..' 访问上级目录。" target_path = os.path.join(app_ctx.current_path, normalized_file_path) await ctx.info(f"正在向远程文件写入: {target_path}") async with conn.start_sftp_client() as sftp: async with sftp.open(target_path, 'w') as f: await f.write(content) return f"内容已成功写入到沙箱文件 '{file_path}'。" except Exception as e: await ctx.error(f"写入文件时发生未知错误: {e}") return f"错误: {e}" # --- 运行服务器 --- if __name__ == "__main__": mcp.run(transport="streamable-http")