Files
solidstate-tools/mcp/server.py
2025-10-13 10:08:59 +08:00

221 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncssh
import os
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass
from mcp.server.fastmcp import FastMCP, Context
from mcp.server.session import ServerSession
# --- 1. 使用您提供的参数 ---
REMOTE_HOST = '202.121.182.208'
REMOTE_USER = 'koko125'
PRIVATE_KEY_PATH = 'D:/tool/tool/id_rsa.txt' # Windows 路径建议使用 /
INITIAL_WORKING_DIRECTORY = '/cluster/home/koko125/sandbox'
# --- 2. 生命周期管理部分 (高级功能) ---
@dataclass
class AppContext:
"""用于在服务器生命周期内持有共享资源的上下文对象"""
ssh_connection: asyncssh.SSHClientConnection
current_path: str
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
"""在服务器启动时建立SSH连接在关闭时断开。"""
print("服务器启动中正在建立SSH连接...")
conn = None
try:
conn = await asyncssh.connect(
REMOTE_HOST,
username=REMOTE_USER,
client_keys=[PRIVATE_KEY_PATH]
)
print("SSH 连接成功!")
# <<< 在登录时执行 source bashrc 命令 >>>
# 注意bashrc 的效果(如环境变量)只会对这一个 conn.run() 调用生效。
# 对于非交互式shell更好的做法是把需要执行的命令串联起来。
# 但如果只是为了加载路径等,可以在后续命令中体现。
print("正在加载 .bashrc...")
# 为了让 bashrc 生效,我们需要在一个交互式 shell 中执行命令
await conn.run('source /cluster/home/koko125/.bashrc', check=True)
print(".bashrc 加载完成。")
print(f"初始工作目录设置为: {INITIAL_WORKING_DIRECTORY}")
yield AppContext(ssh_connection=conn, current_path=INITIAL_WORKING_DIRECTORY)
finally:
if conn:
conn.close()
await conn.wait_closed()
print("SSH 连接已关闭。")
# --- 3. 创建 MCP 服务器实例,并应用生命周期管理 ---
mcp = FastMCP("远程服务器工具", lifespan=app_lifespan,port=8000)
# --- 4. 保留的调试工具 (不安全) ---
@mcp.tool()
async def execute_remote_command(command: str, ctx: Context) -> str:
"""
【调试用】在远程服务器上执行一个任意的shell命令并返回其输出。
警告:此工具有安全风险,请勿在生产环境中使用。
"""
await ctx.info(f"准备在 {REMOTE_HOST} 上执行调试命令: '{command}'")
try:
# 每次都创建一个新连接,以确保环境隔离
async with asyncssh.connect(REMOTE_HOST, username=REMOTE_USER, client_keys=[PRIVATE_KEY_PATH]) as conn:
# 在执行命令前先 source bashrc
full_command = f'source /cluster/home/koko125/.bashrc && {command}'
result = await conn.run(full_command, check=True)
output = result.stdout.strip()
await ctx.info("调试命令成功执行。")
return output
except asyncssh.ProcessError as e:
error_message = f"命令执行失败: {e.stderr.strip()}"
await ctx.error(error_message)
return error_message
except Exception as e:
error_message = f"发生未知错误: {str(e)}"
await ctx.error(error_message)
return error_message
# --- 5. 新增的规范、安全的工具 ---
@mcp.tool()
async def list_files_in_sandbox(ctx: Context[ServerSession, AppContext], path: str = ".") -> str:
"""
【安全】列出远程服务器安全沙箱路径下的文件和目录。
Args:
path: 要查看的相对路径,默认为当前沙箱目录。
"""
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
# 防止目录穿越攻击
if ".." in path.split('/'):
return "错误: 不允许使用 '..' 访问上级目录。"
# 安全地拼接路径
target_path = os.path.join(app_ctx.current_path, path)
await ctx.info(f"正在列出安全路径: {target_path}")
# 为了让 .bashrc 的设置(如别名)生效,最好也加上 source
command_to_run = f'source /cluster/home/koko125/.bashrc && ls -l {conn.escape(target_path)}'
result = await conn.run(command_to_run, check=True)
return result.stdout.strip()
except Exception as e:
await ctx.error(f"执行 ls 命令失败: {e}")
return f"错误: {e}"
@mcp.tool()
async def create_directory_in_sandbox(ctx: Context[ServerSession, AppContext], directory_name: str) -> str:
"""
【安全】在远程服务器的安全沙箱内创建一个新目录。
Args:
directory_name: 要创建的目录的名称。
"""
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
# 安全检查:确保目录名不包含斜杠或 "..",以防止创建深层目录或进行目录穿越
if '/' in directory_name or ".." in directory_name:
return "错误: 目录名无效。不能包含 '/''..'"
target_path = os.path.join(app_ctx.current_path, directory_name)
await ctx.info(f"正在创建远程目录: {target_path}")
command_to_run = f'source /cluster/home/koko125/.bashrc && mkdir {conn.escape(target_path)}'
await conn.run(command_to_run, check=True)
return f"目录 '{directory_name}' 在沙箱中成功创建。"
except asyncssh.ProcessError as e:
error_message = f"创建目录失败: {e.stderr.strip()}"
await ctx.error(error_message)
return error_message
except Exception as e:
await ctx.error(f"创建目录时发生未知错误: {e}")
return f"错误: {e}"
@mcp.tool()
async def read_file_in_sandbox(ctx: Context[ServerSession, AppContext], file_path: str) -> str:
"""
【安全】读取远程服务器安全沙箱内指定文件的内容。
Args:
file_path: 相对于沙箱目录的文件路径。
"""
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
# 安全检查:防止目录穿越
if ".." in file_path.split('/'):
return "错误: 不允许使用 '..' 访问上级目录。"
target_path = os.path.join(app_ctx.current_path, file_path)
await ctx.info(f"正在读取远程文件: {target_path}")
async with conn.start_sftp_client() as sftp:
async with sftp.open(target_path, 'r') as f:
content = await f.read()
return content
except FileNotFoundError:
error_message = f"读取文件失败: 文件 '{file_path}' 不存在。"
await ctx.error(error_message)
return error_message
except Exception as e:
await ctx.error(f"读取文件时发生未知错误: {e}")
return f"错误: {e}"
@mcp.tool()
async def write_file_in_sandbox(ctx: Context[ServerSession, AppContext], file_path: str, content: str) -> str:
"""
【安全】向远程服务器安全沙箱内的文件写入内容。如果文件已存在,则会覆盖它。
Args:
file_path: 相对于沙箱目录的文件路径。
content: 要写入文件的文本内容。
"""
try:
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
normalized_file_path = file_path.replace("\\", "/")
# 安全检查
if ".." in file_path.split('/'):
return "错误: 不允许使用 '..' 访问上级目录。"
target_path = os.path.join(app_ctx.current_path, normalized_file_path)
await ctx.info(f"正在向远程文件写入: {target_path}")
async with conn.start_sftp_client() as sftp:
async with sftp.open(target_path, 'w') as f:
await f.write(content)
return f"内容已成功写入到沙箱文件 '{file_path}'"
except Exception as e:
await ctx.error(f"写入文件时发生未知错误: {e}")
return f"错误: {e}"
# --- 运行服务器 ---
if __name__ == "__main__":
mcp.run(transport="streamable-http")