221 lines
8.1 KiB
Python
221 lines
8.1 KiB
Python
import asyncssh
|
||
import os
|
||
from contextlib import asynccontextmanager
|
||
from collections.abc import AsyncIterator
|
||
from dataclasses import dataclass
|
||
|
||
from mcp.server.fastmcp import FastMCP, Context
|
||
from mcp.server.session import ServerSession
|
||
|
||
# --- 1. 使用您提供的参数 ---
|
||
REMOTE_HOST = '202.121.182.208'
|
||
REMOTE_USER = 'koko125'
|
||
PRIVATE_KEY_PATH = 'D:/tool/tool/id_rsa.txt' # Windows 路径建议使用 /
|
||
INITIAL_WORKING_DIRECTORY = '/cluster/home/koko125/sandbox'
|
||
|
||
|
||
# --- 2. 生命周期管理部分 (高级功能) ---
|
||
|
||
@dataclass
|
||
class AppContext:
|
||
"""用于在服务器生命周期内持有共享资源的上下文对象"""
|
||
ssh_connection: asyncssh.SSHClientConnection
|
||
current_path: str
|
||
|
||
|
||
@asynccontextmanager
|
||
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
|
||
"""在服务器启动时建立SSH连接,在关闭时断开。"""
|
||
print("服务器启动中,正在建立SSH连接...")
|
||
conn = None
|
||
try:
|
||
conn = await asyncssh.connect(
|
||
REMOTE_HOST,
|
||
username=REMOTE_USER,
|
||
client_keys=[PRIVATE_KEY_PATH]
|
||
)
|
||
print("SSH 连接成功!")
|
||
|
||
# <<< 在登录时执行 source bashrc 命令 >>>
|
||
# 注意:bashrc 的效果(如环境变量)只会对这一个 conn.run() 调用生效。
|
||
# 对于非交互式shell,更好的做法是把需要执行的命令串联起来。
|
||
# 但如果只是为了加载路径等,可以在后续命令中体现。
|
||
print("正在加载 .bashrc...")
|
||
# 为了让 bashrc 生效,我们需要在一个交互式 shell 中执行命令
|
||
await conn.run('source /cluster/home/koko125/.bashrc', check=True)
|
||
print(".bashrc 加载完成。")
|
||
|
||
print(f"初始工作目录设置为: {INITIAL_WORKING_DIRECTORY}")
|
||
yield AppContext(ssh_connection=conn, current_path=INITIAL_WORKING_DIRECTORY)
|
||
finally:
|
||
if conn:
|
||
conn.close()
|
||
await conn.wait_closed()
|
||
print("SSH 连接已关闭。")
|
||
|
||
|
||
# --- 3. 创建 MCP 服务器实例,并应用生命周期管理 ---
|
||
mcp = FastMCP("远程服务器工具", lifespan=app_lifespan,port=8000)
|
||
|
||
|
||
# --- 4. 保留的调试工具 (不安全) ---
|
||
|
||
@mcp.tool()
|
||
async def execute_remote_command(command: str, ctx: Context) -> str:
|
||
"""
|
||
【调试用】在远程服务器上执行一个任意的shell命令并返回其输出。
|
||
警告:此工具有安全风险,请勿在生产环境中使用。
|
||
"""
|
||
await ctx.info(f"准备在 {REMOTE_HOST} 上执行调试命令: '{command}'")
|
||
try:
|
||
# 每次都创建一个新连接,以确保环境隔离
|
||
async with asyncssh.connect(REMOTE_HOST, username=REMOTE_USER, client_keys=[PRIVATE_KEY_PATH]) as conn:
|
||
# 在执行命令前先 source bashrc
|
||
full_command = f'source /cluster/home/koko125/.bashrc && {command}'
|
||
result = await conn.run(full_command, check=True)
|
||
output = result.stdout.strip()
|
||
await ctx.info("调试命令成功执行。")
|
||
return output
|
||
except asyncssh.ProcessError as e:
|
||
error_message = f"命令执行失败: {e.stderr.strip()}"
|
||
await ctx.error(error_message)
|
||
return error_message
|
||
except Exception as e:
|
||
error_message = f"发生未知错误: {str(e)}"
|
||
await ctx.error(error_message)
|
||
return error_message
|
||
|
||
|
||
# --- 5. 新增的规范、安全的工具 ---
|
||
|
||
@mcp.tool()
|
||
async def list_files_in_sandbox(ctx: Context[ServerSession, AppContext], path: str = ".") -> str:
|
||
"""
|
||
【安全】列出远程服务器安全沙箱路径下的文件和目录。
|
||
|
||
Args:
|
||
path: 要查看的相对路径,默认为当前沙箱目录。
|
||
"""
|
||
try:
|
||
app_ctx = ctx.request_context.lifespan_context
|
||
conn = app_ctx.ssh_connection
|
||
|
||
# 防止目录穿越攻击
|
||
if ".." in path.split('/'):
|
||
return "错误: 不允许使用 '..' 访问上级目录。"
|
||
|
||
# 安全地拼接路径
|
||
target_path = os.path.join(app_ctx.current_path, path)
|
||
|
||
await ctx.info(f"正在列出安全路径: {target_path}")
|
||
|
||
# 为了让 .bashrc 的设置(如别名)生效,最好也加上 source
|
||
command_to_run = f'source /cluster/home/koko125/.bashrc && ls -l {conn.escape(target_path)}'
|
||
result = await conn.run(command_to_run, check=True)
|
||
return result.stdout.strip()
|
||
except Exception as e:
|
||
await ctx.error(f"执行 ls 命令失败: {e}")
|
||
return f"错误: {e}"
|
||
|
||
|
||
@mcp.tool()
|
||
async def create_directory_in_sandbox(ctx: Context[ServerSession, AppContext], directory_name: str) -> str:
|
||
"""
|
||
【安全】在远程服务器的安全沙箱内创建一个新目录。
|
||
|
||
Args:
|
||
directory_name: 要创建的目录的名称。
|
||
"""
|
||
try:
|
||
app_ctx = ctx.request_context.lifespan_context
|
||
conn = app_ctx.ssh_connection
|
||
|
||
# 安全检查:确保目录名不包含斜杠或 "..",以防止创建深层目录或进行目录穿越
|
||
if '/' in directory_name or ".." in directory_name:
|
||
return "错误: 目录名无效。不能包含 '/' 或 '..'。"
|
||
|
||
target_path = os.path.join(app_ctx.current_path, directory_name)
|
||
|
||
await ctx.info(f"正在创建远程目录: {target_path}")
|
||
|
||
command_to_run = f'source /cluster/home/koko125/.bashrc && mkdir {conn.escape(target_path)}'
|
||
await conn.run(command_to_run, check=True)
|
||
|
||
return f"目录 '{directory_name}' 在沙箱中成功创建。"
|
||
except asyncssh.ProcessError as e:
|
||
error_message = f"创建目录失败: {e.stderr.strip()}"
|
||
await ctx.error(error_message)
|
||
return error_message
|
||
except Exception as e:
|
||
await ctx.error(f"创建目录时发生未知错误: {e}")
|
||
return f"错误: {e}"
|
||
|
||
|
||
@mcp.tool()
|
||
async def read_file_in_sandbox(ctx: Context[ServerSession, AppContext], file_path: str) -> str:
|
||
"""
|
||
【安全】读取远程服务器安全沙箱内指定文件的内容。
|
||
|
||
Args:
|
||
file_path: 相对于沙箱目录的文件路径。
|
||
"""
|
||
try:
|
||
app_ctx = ctx.request_context.lifespan_context
|
||
conn = app_ctx.ssh_connection
|
||
|
||
# 安全检查:防止目录穿越
|
||
if ".." in file_path.split('/'):
|
||
return "错误: 不允许使用 '..' 访问上级目录。"
|
||
|
||
target_path = os.path.join(app_ctx.current_path, file_path)
|
||
|
||
await ctx.info(f"正在读取远程文件: {target_path}")
|
||
|
||
async with conn.start_sftp_client() as sftp:
|
||
async with sftp.open(target_path, 'r') as f:
|
||
content = await f.read()
|
||
return content
|
||
|
||
except FileNotFoundError:
|
||
error_message = f"读取文件失败: 文件 '{file_path}' 不存在。"
|
||
await ctx.error(error_message)
|
||
return error_message
|
||
except Exception as e:
|
||
await ctx.error(f"读取文件时发生未知错误: {e}")
|
||
return f"错误: {e}"
|
||
|
||
|
||
@mcp.tool()
|
||
async def write_file_in_sandbox(ctx: Context[ServerSession, AppContext], file_path: str, content: str) -> str:
|
||
"""
|
||
【安全】向远程服务器安全沙箱内的文件写入内容。如果文件已存在,则会覆盖它。
|
||
|
||
Args:
|
||
file_path: 相对于沙箱目录的文件路径。
|
||
content: 要写入文件的文本内容。
|
||
"""
|
||
try:
|
||
app_ctx = ctx.request_context.lifespan_context
|
||
conn = app_ctx.ssh_connection
|
||
normalized_file_path = file_path.replace("\\", "/")
|
||
# 安全检查
|
||
if ".." in file_path.split('/'):
|
||
return "错误: 不允许使用 '..' 访问上级目录。"
|
||
|
||
target_path = os.path.join(app_ctx.current_path, normalized_file_path)
|
||
|
||
await ctx.info(f"正在向远程文件写入: {target_path}")
|
||
|
||
async with conn.start_sftp_client() as sftp:
|
||
async with sftp.open(target_path, 'w') as f:
|
||
await f.write(content)
|
||
|
||
return f"内容已成功写入到沙箱文件 '{file_path}'。"
|
||
except Exception as e:
|
||
await ctx.error(f"写入文件时发生未知错误: {e}")
|
||
return f"错误: {e}"
|
||
|
||
|
||
# --- 运行服务器 ---
|
||
if __name__ == "__main__":
|
||
mcp.run(transport="streamable-http") |