# softbv_mcp_refactored.py # # 这是 softBV 工具集的重构版本。 # # - 核心框架: 保持了原有的 SSH 生命周期管理和远程环境激活机制 [6]。 # - 重构起点: 暂时移除了旧的 softbv_info, softbv_md, softbv_gen_cube 函数。 # - 新增工具: 包含一个新的、功能单一的 `softbv_print_cell` 工具,用于执行 `softBV.x --print-cell` [6]。 # # 依赖: # pip install "mcp[cli]" asyncssh pydantic # # 用法: # 1. 直接运行进行测试: `python softbv_mcp_refactored.py` # 2. 或集成到 Starlette (main.py): # from softbv_mcp_refactored import create_softbv_mcp # softbv_mcp = create_softbv_mcp() # Mount("/softbv-new", app=softbv_mcp.streamable_http_app()) import os import posixpath import asyncio from dataclasses import dataclass from typing import Any from collections.abc import AsyncIterator from contextlib import asynccontextmanager import asyncssh from pydantic import BaseModel, Field from mcp.server.fastmcp import FastMCP, Context from mcp.server.session import ServerSession import re # 确保导入 re 模块 # ========= 1. 配置与常量 ========= # 这些配置与您之前的脚本保持一致,可以从环境变量或硬编码中获取。 # 从 lifespan.py 或其他配置文件导入,如果它们存在的话 try: from lifespan import REMOTE_HOST, REMOTE_USER, PRIVATE_KEY_PATH except ImportError: # 如果找不到,则使用默认值 REMOTE_HOST = os.getenv("REMOTE_HOST", "202.121.182.208") REMOTE_USER = os.getenv("REMOTE_USER", "koko125") PRIVATE_KEY_PATH = os.getenv("PRIVATE_KEY_PATH", "D:/tool/tool/id_rsa.txt") # softBV 环境相关的固定路径 SOFTBV_PROFILE = os.getenv("SOFTBV_PROFILE", "/cluster/home/koko125/script/softBV.sh") SOFTBV_BIN = os.getenv("SOFTBV_BIN", "/cluster/home/koko125/tool/softBV-GUI_linux/bin/softBV.x") DEFAULT_WORKDIR = os.getenv("DEFAULT_WORKDIR", f"/cluster/home/{REMOTE_USER}/sandbox") class PPTUnitaryItem(BaseModel): """PROPERTY_UNITARY 表中的一行数据""" type: str an: int = Field(description="Atomic Number") softness: float rc: float = Field(description="Cutoff Radius") q: float = Field(description="Charge") class PPTBinaryItem(BaseModel): """PROPERTY_BINARY 表中的一行数据""" cation: str anion: str D_0: float R_min: float b: float R_0: float R_cutoff: float class PPTResult(BaseModel): """'--print-ppt' 命令的完整结构化解析结果""" screening_factor: float | None = Field(description="最终计算出的 screening factor") unitary_properties: list[PPTUnitaryItem] = Field(description="一元属性列表") binary_properties: list[PPTBinaryItem] = Field(description="二元属性列表") raw_output: str = Field(description="命令的完整原始标准输出") class GenCubeDefaultArgs(BaseModel): """用于'--gen-cube'的默认参数模式""" input_cif: str = Field(description="远程 CIF 文件路径") type: str = Field(description="导电离子类型 (例如 'Li')") os: int = Field(description="导电离子氧化态 (例如 1)") class GenCubeFullArgs(BaseModel): """用于'--gen-cube'的全参数模式""" input_cif: str = Field(description="远程 CIF 文件路径") type: str = Field(description="导电离子类型") os: int = Field(description="导电离子氧化态") sf: float = Field(description="自定义 screening factor (非正值则自动计算)") resolution: float = Field(description="体素分辨率") ignore_conducting_ion: bool = Field(description="是否在计算中忽略导电离子") periodic: bool = Field(description="是否使用周期性边界条件") output_name: str | None = Field(default=None, description="输出文件的前缀 (可选)") class GenCubeResult(BaseModel): """'--gen-cube' 命令的执行结果""" command_used: str working_directory: str exit_status: int log_file: str = Field(description="远程服务器上的完整日志文件路径") output_head: str = Field(description="日志文件的前40行,用于快速预览") output_tail: str = Field(description="日志文件的后40行,用于查看最终结果") new_files: list[str] = Field(description="执行后新生成的文件列表") elapsed_seconds: int = Field(description="任务总耗时(秒)") class BVSumItem(BaseModel): """每个离子的键价和 (Bond Valence Sum)""" name: str = Field(description="离子名称") type: str = Field(description="离子类型") occ: float = Field(description="占有率 (occupancy)") bv_sum: float = Field(description="键价和 (bond valence sum)") class BVPairItem(BaseModel): """离子对之间的键价""" name1: str type1: str occ1: float name2: str type2: str occ2: float bv: float = Field(description="键价 (bond valence)") class CalBVResult(BaseModel): """'--cal-bv' 命令的完整结构化解析结果""" global_instability_index: float | None = Field(None, description="全局不稳定性指数 (GII)") suggested_stability: str | None = Field(None, description="建议的稳定性 (例如 'stable')") bv_sums: list[BVSumItem] = Field(description="每个离子的键价和列表") bv_pairs: list[BVPairItem] = Field(description="离子对之间的键价列表") raw_output: str = Field(description="命令的完整原始标准输出") class CubeAtomInfo(BaseModel): """从 .cube 文件中解析出的单个原子信息""" index: int name: str | None = None type: str os: int = Field(description="Oxidation State") x_f: tuple[float, float, float] = Field(description="Fractional coordinates") occ: float = Field(description="Occupancy") class CubeCellParams(BaseModel): """晶格常数""" a: float b: float c: float class CubeCellAngles(BaseModel): """晶格角度""" alpha: float beta: float gamma: float class PrintCubeResult(BaseModel): """'--print-cube' 命令的完整结构化解析结果""" name: str = Field(description="Cube 文件名") cell_vector_matrix: list[list[float]] = Field(description="晶胞矢量矩阵") cell_parameters: CubeCellParams cell_angles: CubeCellAngles total_atoms: int atoms: list[CubeAtomInfo] = Field(description="原子信息列表") cube_size: tuple[int, int, int] cube_cell_vector: list[list[float]] cube_voxel_vector: list[list[float]] = Field(description="Voxel vector in Angstroms") raw_output: str = Field(description="命令的完整原始标准输出") class CalGIIResult(BaseModel): """'--cal-gii' 命令的结构化解析结果""" global_instability_index: float | None = Field(None, description="全局不稳定性指数 (GII)") suggested_stability: str | None = Field(None, description="建议的稳定性 (例如 'stable')") raw_output: str = Field(description="命令的完整原始标准输出") class CoordinationItem(BaseModel): """'--cal-cn' 输出表格中的一行""" index: int shell: int coordinated_type: str r: float = Field(description="距离 (Å)") occ: float = Field(description="占有率") n: float = Field(description="累计配位数") rds: float class CalCNResult(BaseModel): """'--cal-cn' 命令的结构化解析结果""" center_atom: str | None = Field(None, description="中心原子名称") coordination_number: float | None = Field(None, description="总配位数") coordination_details: list[CoordinationItem] = Field(description="配位详细信息列表") raw_output: str = Field(description="命令的完整原始标准输出") class CalSymResult(BaseModel): """'--cal-sym' 命令的结构化解析结果""" determined_space_group: str | None = Field(None, description="最终确定的空间群") fits_to_symmetries: list[str] = Field(description="计算过程中拟合到的所有对称性列表") raw_output: str = Field(description="命令的完整原始标准输出") class CalTotEnResult(BaseModel): """'--cal-tot-en' 命令的结构化解析结果""" total_energy_eV: float | None = Field(None, description="计算出的总能量 (eV)") screening_factor_used: float | None = Field(None, description="计算中使用的 screening factor (sf)") raw_output: str = Field(description="命令的完整原始标准输出") # ========= 2. 辅助函数 ========= def shell_quote(arg: str) -> str: """安全地将字符串作为单个 shell 参数 (POSIX)。""" return "'" + str(arg).replace("'", "'\"'\"'") + "'" async def run_in_softbv_env( conn: asyncssh.SSHClientConnection, profile_path: str, cmd: str, cwd: str | None = None, check: bool = True, ) -> asyncssh.SSHCompletedProcess: """ 在远端 bash 会话中激活 softBV 环境并执行命令。 """ parts = [] if cwd: parts.append(f"cd {shell_quote(cwd)}") # 激活环境脚本 parts.append(f"source {shell_quote(profile_path)}") # 执行主命令 parts.append(cmd) composite_cmd = "; ".join(parts) # 使用 bash -lc 确保登录环境和命令可以被正确解析 full_command = f"bash -lc {shell_quote(composite_cmd)}" return await conn.run(full_command, check=check) def _parse_ppt_output(raw_text: str) -> PPTResult: """ 将 '--print-ppt' 的原始 stdout 解析为结构化的 PPTResult 模型。 """ lines = raw_text.splitlines() screening_factor = None unitary_items = [] binary_items = [] # 查找 screening factor for line in reversed(lines): if "screening factor =" in line: try: screening_factor = float(line.split("=")[-1].strip()) break except (ValueError, IndexError): continue # 解析表格 in_unitary = False in_binary = False for line in lines: if "PPT: | type | an |" in line: in_unitary = True in_binary = False continue if "PPT: | cation | anion |" in line: in_unitary = False in_binary = True continue if "PPT:--" in line or "PPT:==" in line: continue parts = [p.strip() for p in line.split("|") if p.strip()] if in_unitary and len(parts) == 5: try: unitary_items.append(PPTUnitaryItem( type=parts[0], an=int(parts[1]), softness=float(parts[2]), rc=float(parts[3]), q=float(parts[4]) )) except (ValueError, IndexError): continue elif in_binary and len(parts) == 7: try: binary_items.append(PPTBinaryItem( cation=parts[0], anion=parts[1], D_0=float(parts[2]), R_min=float(parts[3]), b=float(parts[4]), R_0=float(parts[5]), R_cutoff=float(parts[6]) )) except (ValueError, IndexError): continue return PPTResult( screening_factor=screening_factor, unitary_properties=unitary_items, binary_properties=binary_items, raw_output=raw_text ) async def _run_in_softbv_env_stream( conn: asyncssh.SSHClientConnection, profile_path: str, cmd: str, cwd: str | None = None, ) -> asyncssh.SSHClientProcess: """以非阻塞方式,在激活环境中启动一个命令,并返回进程句柄。""" parts = [] if cwd: parts.append(f"cd {shell_quote(cwd)}") # 将 source 的输出重定向,避免污染主命令的日志 parts.append(f"source {shell_quote(profile_path)} >/dev/null 2>&1 || true") parts.append(cmd) composite_cmd = "; ".join(parts) full_command = f"bash -lc {shell_quote(composite_cmd)}" proc = await conn.create_process(full_command) return proc async def _listdir_safe(conn: asyncssh.SSHClientConnection, path: str) -> list[str]: """安全地列出目录内容,失败时返回空列表。""" try: async with conn.start_sftp_client() as sftp: return await sftp.listdir(path) except Exception: return [] async def _stat_size_safe(conn: asyncssh.SSHClientConnection, path: str) -> int | None: """安全地获取文件大小,失败时返回 None。""" try: async with conn.start_sftp_client() as sftp: attrs = await sftp.stat(path) return int(attrs.size or 0) except Exception: return None def _parse_cal_bv_output(raw_text: str) -> CalBVResult: """ 将 '--cal-bv' 的原始 stdout 解析为结构化的 CalBVResult 模型。 """ lines = raw_text.splitlines() gii = None stability = None bv_sums = [] bv_pairs = [] in_sum_section = False in_pair_section = False # 解析 GII 和稳定性 for line in lines: if line.startswith("GII: Global instability index ="): try: gii = float(line.split("=")[-1].strip()) except (ValueError, IndexError): pass elif line.startswith("GII: suggested stability:"): try: stability = line.split(":", 1)[-1].strip() except IndexError: pass # 解析表格 for line in lines: if line.startswith("BV: name type occ"): in_sum_section = True in_pair_section = False continue if line.startswith("BV: nam1 typ1 occ1"): in_sum_section = False in_pair_section = True continue if line.startswith("BV:="): continue if not line.startswith("BV:"): in_sum_section = False in_pair_section = False # 使用正则表达式进行更稳健的解析,以处理可变宽度的列 if in_sum_section: match = re.match(r"BV:\s+(\S+)\s+(\S+)\s+([\d.]+)\s+([-\d.]+)", line) if match: try: bv_sums.append(BVSumItem( name=match.group(1), type=match.group(2), occ=float(match.group(3)), bv_sum=float(match.group(4)) )) except (ValueError, IndexError): continue elif in_pair_section: # 这是一个更复杂的行,使用 shlex 更安全地分割 try: # 移除 "BV:" 前缀并分割 parts = line[3:].strip().split() if len(parts) == 7: bv_pairs.append(BVPairItem( name1=parts[0], type1=parts[1], occ1=float(parts[2]), name2=parts[3], type2=parts[4], occ2=float(parts[5]), bv=float(parts[6]) )) except (ValueError, IndexError): continue return CalBVResult( global_instability_index=gii, suggested_stability=stability, bv_sums=bv_sums, bv_pairs=bv_pairs, raw_output=raw_text ) def _parse_print_cube_output(raw_text: str) -> PrintCubeResult: """ 将 '--print-cube' 的原始 stdout 解析为结构化的 PrintCubeResult 模型。 """ lines = raw_text.splitlines() # Helper to parse vector blocks def parse_vector_block(start_line_idx: int) -> list[list[float]]: matrix = [] for i in range(3): line = lines[start_line_idx + i] parts = [float(p) for p in line.strip().split()] matrix.append(parts) return matrix # Find key lines and parse data name = re.search(r"CELL: name: (.*)", raw_text).group(1).strip() total_atoms = int(re.search(r"CELL: total atom: (\d+)", raw_text).group(1)) params_line = re.search(r"CELL: parameters:\s+([\d.\s]+)", raw_text).group(1) a, b, c = map(float, params_line.split()) cell_params = CubeCellParams(a=a, b=b, c=c) angles_line = re.search(r"CELL: angles:\s+([\d.\s]+)", raw_text).group(1) alpha, beta, gamma = map(float, angles_line.split()) cell_angles = CubeCellAngles(alpha=alpha, beta=beta, gamma=gamma) cube_size_line = re.search(r"CUBE: cube size = (\d+) \* (\d+) \* (\d+)", raw_text).group(1, 2, 3) cube_size = tuple(map(int, cube_size_line)) atoms = [] cell_matrix, cube_cell_vec, cube_voxel_vec = [], [], [] for i, line in enumerate(lines): if "CELL: vector matrix:" in line: cell_matrix = parse_vector_block(i + 1) elif "CUBE: cell vector (A):" in line: cube_cell_vec = parse_vector_block(i + 1) elif "CUBE: votex vector (A):" in line: # Note: 'votex' is a typo in the original output cube_voxel_vec = parse_vector_block(i + 1) elif "ATOM:" in line: atom_match = re.search( r"\[\s*(\d+)\]ATOM: name=\s*([^;]*);type=([^;]*);os=\s*(-?\d+);x_f=\(([^)]*)\);occ=([\d.]+)", line) if atom_match: idx, atom_name, atom_type, os_val, xf_str, occ_val = atom_match.groups() xf_coords = tuple(map(float, xf_str.split(','))) atoms.append(CubeAtomInfo( index=int(idx), name=atom_name.strip() or None, type=atom_type.strip(), os=int(os_val), x_f=xf_coords, occ=float(occ_val) )) return PrintCubeResult( name=name, cell_vector_matrix=cell_matrix, cell_parameters=cell_params, cell_angles=cell_angles, total_atoms=total_atoms, atoms=atoms, cube_size=cube_size, cube_cell_vector=cube_cell_vec, cube_voxel_vector=cube_voxel_vec, raw_output=raw_text ) def _parse_cal_gii_output(raw_text: str) -> CalGIIResult: """解析 '--cal-gii' 的输出。""" gii, stability = None, None for line in raw_text.splitlines(): if line.startswith("GII: Global instability index ="): try: gii = float(line.split("=")[-1].strip()) except (ValueError, IndexError): pass elif line.startswith("GII: suggested stability:"): try: stability = line.split(":", 1)[-1].strip().rstrip('.') except IndexError: pass return CalGIIResult(global_instability_index=gii, suggested_stability=stability, raw_output=raw_text) def _parse_cal_cn_output(raw_text: str) -> CalCNResult: """解析 '--cal-cn' 的输出。""" center_atom, cn = None, None details = [] in_table = False for line in raw_text.splitlines(): if line.startswith("CN: Center atom:"): center_atom = line.split(":", 1)[-1].strip() elif line.startswith("CN: Coordination Number:"): try: cn = float(line.split(":", 1)[-1].strip()) except (ValueError, IndexError): pass elif "CN: Index Shell" in line: in_table = True continue if in_table and line.startswith("CN:"): try: parts = line[3:].strip().split() if len(parts) == 7: details.append(CoordinationItem( index=int(parts[0]), shell=int(parts[1]), coordinated_type=parts[2], r=float(parts[3]), occ=float(parts[4]), n=float(parts[5]), rds=float(parts[6]) )) except (ValueError, IndexError): pass return CalCNResult(center_atom=center_atom, coordination_number=cn, coordination_details=details, raw_output=raw_text) def _parse_cal_sym_output(raw_text: str) -> CalSymResult: """解析 '--cal-sym' 的输出。""" determined_sg = None fits = [] for line in raw_text.splitlines(): if line.startswith("Message: structure fits to symmetry"): fits.append(line.split("symmetry")[-1].strip().rstrip('.')) elif line.startswith("Message: determined space group:"): determined_sg = line.split(":", 1)[-1].strip() return CalSymResult(determined_space_group=determined_sg, fits_to_symmetries=fits, raw_output=raw_text) def _parse_cal_tot_en_output(raw_text: str) -> CalTotEnResult: """解析 '--cal-tot-en' 的输出。""" total_energy, sf = None, None for line in raw_text.splitlines(): if "total energy =" in line: try: # 提取能量值 energy_str = line.split("=")[-1].strip().split()[0] total_energy = float(energy_str) except (ValueError, IndexError): pass elif line.startswith("Message: sf="): try: # 提取使用的 sf 值 sf = float(line.split("=")[-1].strip()) except (ValueError, IndexError): pass return CalTotEnResult( total_energy_eV=total_energy, screening_factor_used=sf, raw_output=raw_text ) # ========= 3. 生命周期管理 ========= @dataclass class SoftBVContext: """在服务器生命周期内共享的上下文对象。""" ssh_connection: asyncssh.SSHClientConnection workdir: str profile: str bin_path: str @asynccontextmanager async def softbv_lifespan(_server: FastMCP) -> AsyncIterator[SoftBVContext]: """ FastMCP 生命周期管理器:在服务器启动时建立 SSH 连接,在关闭时安全断开。 这个函数创建的 SoftBVContext 将被注入到所有工具中 [1, 6]。 """ conn: asyncssh.SSHClientConnection | None = None try: await asyncio.sleep(0) # 确保事件循环已启动 print(f"正在连接到 {REMOTE_USER}@{REMOTE_HOST}...") conn = await asyncssh.connect( REMOTE_HOST, username=REMOTE_USER, client_keys=[PRIVATE_KEY_PATH], known_hosts=None, connect_timeout=15, ) print("SSH 连接成功!") # `yield` 将上下文提供给 MCP 服务器 yield SoftBVContext( ssh_connection=conn, workdir=DEFAULT_WORKDIR, profile=SOFTBV_PROFILE, bin_path=SOFTBV_BIN, ) finally: # 服务器关闭时,清理资源 if conn: conn.close() await conn.wait_closed() print("SSH 连接已关闭。") # ========= 4. MCP 服务器实例 ========= mcp = FastMCP( name="softBV Tools (Refactored)", instructions="用于执行 softBV 相关计算的工具集(重构版)。", lifespan=softbv_lifespan, streamable_http_path="/", # 方便挂载到 Starlette [1] ) # ========= 5. 工具定义 ========= # # 原有的 softbv_info, softbv_md, softbv_gen_cube 函数已移除, # 等待您完成重构后再添加。 # @mcp.tool() async def softbv_print_cell( cif_path: str, cwd: str | None = None, ctx: Context[ServerSession, SoftBVContext] | None = None, ) -> dict[str, Any]: """ 执行 'softBV.x --print-cell' 命令,打印并返回晶胞信息。 Args: cif_path (str): 远程服务器上的 CIF 文件路径(可以是相对或绝对路径)。 cwd (str | None): 远程工作目录。如果未提供,则使用默认沙箱目录。 ctx: MCP 上下文,由框架自动注入。 Returns: dict: 一个包含命令执行结果的字典,包括退出码、标准输出和标准错误。 """ if ctx is None: raise ValueError("Context is required for this operation.") app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection workdir = cwd or app_ctx.workdir # 如果 cif_path 不是绝对路径,则基于工作目录构建完整路径 input_abs_path = cif_path if not input_abs_path.startswith("/"): input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) # 构造命令 cmd = f"{shell_quote(app_ctx.bin_path)} --print-cell {shell_quote(input_abs_path)}" await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})") # 在激活的环境中运行命令,不要求必须成功 (check=False) proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) if proc.exit_status != 0: await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}") # 返回结构化输出,便于 AI 或客户端解析 [1, 6] return { "command_used": cmd, "working_directory": workdir, "exit_status": proc.exit_status, "stdout": proc.stdout, "stderr": proc.stderr, } @mcp.tool() async def softbv_print_ppt( cif_path: str, cwd: str | None = None, ctx: Context[ServerSession, SoftBVContext] | None = None, ) -> PPTResult: """ 执行 'softBV.x --print-ppt',计算并返回配对势参数 (Pair-Potential Terms)。 Args: cif_path (str): 远程服务器上的 CIF 文件路径(可以是相对或绝对路径)。 cwd (str | None): 远程工作目录。如果未提供,则使用默认沙箱目录。 ctx: MCP 上下文,由框架自动注入。 Returns: PPTResult: 一个包含解析后的配对势参数和原始输出的结构化对象。 """ if ctx is None: raise ValueError("Context is required for this operation.") app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection workdir = cwd or app_ctx.workdir # 构建路径和命令 input_abs_path = cif_path if not input_abs_path.startswith("/"): input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) cmd = f"{shell_quote(app_ctx.bin_path)} --print-ppt {shell_quote(input_abs_path)}" await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})") # 执行命令 proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) if proc.exit_status != 0: # 即使失败,也尝试解析可能的错误信息 await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}") # 在失败时返回一个包含原始输出的空结果,AI 可以从中读取错误 return PPTResult( screening_factor=None, unitary_properties=[], binary_properties=[], raw_output=f"Exit Status: {proc.exit_status}\n\nSTDOUT:\n{proc.stdout}\n\nSTDERR:\n{proc.stderr}" ) # 解析成功的输出并返回结构化数据 parsed_result = _parse_ppt_output(proc.stdout) return parsed_result # 在 softbv_mcp_refactored.py 的工具定义区域添加 @mcp.tool(name="softbv_gen_cube") async def softbv_gen_cube( args: GenCubeDefaultArgs | GenCubeFullArgs, cwd: str | None = None, ctx: Context[ServerSession, SoftBVContext] | None = None, ) -> GenCubeResult: """ 执行 'softBV.x --gen-cube' 以计算势函数。这是一个耗时任务。 支持两种调用模式: 1. 默认模式 (GenCubeDefaultArgs): 仅提供 CIF、离子类型和氧化态,其他参数使用程序默认值。 2. 全参数模式 (GenCubeFullArgs): 提供所有可配置参数。 该工具会定期报告心跳以防超时,并在任务完成后返回详细的结构化结果。 """ if ctx is None: raise ValueError("Context is required for this operation.") app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection workdir = cwd or app_ctx.workdir # --- 1. 构造命令 --- input_abs_path = args.input_cif if not input_abs_path.startswith("/"): input_abs_path = posixpath.normpath(posixpath.join(workdir, args.input_cif)) cmd_parts = [ shell_quote(app_ctx.bin_path), shell_quote("--gen-cube"), shell_quote(input_abs_path), shell_quote(args.type), shell_quote(str(args.os)), ] # 根据传入的参数类型,决定是否添加额外参数 if isinstance(args, GenCubeFullArgs): cmd_parts.extend([ shell_quote(str(args.sf)), shell_quote(str(args.resolution)), ]) # 处理布尔标志 if args.ignore_conducting_ion: cmd_parts.append(shell_quote("t")) # 't' for true else: cmd_parts.append(shell_quote("f")) # 'f' for false if args.periodic: cmd_parts.append(shell_quote("t")) else: cmd_parts.append(shell_quote("f")) if args.output_name: cmd_parts.append(shell_quote(args.output_name)) # --- 2. 准备长时任务 --- import time log_name = f"softbv_gencube_{int(time.time())}.log" log_path = posixpath.join(workdir, log_name) # 将标准输出和错误都重定向到日志文件 final_cmd = " ".join(cmd_parts) + f" > {shell_quote(log_path)} 2>&1" await ctx.info(f"启动长任务: {final_cmd} (工作目录: {workdir})") files_before = await _listdir_safe(conn, workdir) proc = await _run_in_softbv_env_stream(conn, app_ctx.profile, cmd=final_cmd, cwd=workdir) start_time = time.monotonic() # --- 3. 心跳循环,防止超时 --- try: while proc.exit_status is None: # 每10秒报告一次进度,这本身就是心跳 [1, 6] await asyncio.sleep(10) elapsed = int(time.monotonic() - start_time) log_size = await _stat_size_safe(conn, log_path) await ctx.report_progress( progress=min(elapsed / (30 * 60), 0.99), # 以30分钟为基准的近似进度 message=f"计算进行中... 已用时 {elapsed} 秒, 日志大小: {log_size or 0} 字节。" ) finally: # 确保进程结束 await proc.wait() # --- 4. 收集并返回结果 --- elapsed_seconds = int(time.monotonic() - start_time) files_after = await _listdir_safe(conn, workdir) new_files = sorted(set(files_after) - set(files_before)) # 读取日志文件的头和尾 log_head, log_tail = "", "" try: async with conn.start_sftp_client() as sftp: async with sftp.open(log_path, "r", encoding="utf-8", errors="replace") as f: lines = await f.readlines() log_head = "".join(lines[:40]) log_tail = "".join(lines[-40:]) except Exception as e: await ctx.warning(f"读取日志文件 '{log_path}' 失败: {e}") result = GenCubeResult( command_used=final_cmd, working_directory=workdir, exit_status=proc.exit_status, log_file=log_path, output_head=log_head, output_tail=log_tail, new_files=new_files, elapsed_seconds=elapsed_seconds, ) if result.exit_status == 0: await ctx.info(f"gen-cube 任务成功完成, 耗时 {elapsed_seconds} 秒。") else: await ctx.error(f"gen-cube 任务失败, 退出码: {result.exit_status}。请检查日志: {log_path}") return result # 在 softbv_mcp_refactored.py 中,紧跟在 softbv_gen_cube 之后添加 @mcp.tool() async def softbv_calculate_bv( cif_path: str, cwd: str | None = None, ctx: Context[ServerSession, SoftBVContext] | None = None, ) -> CalBVResult: """ 执行 'softBV.x --cal-bv',计算并返回键价和 (Bond Valence Sums)。 Args: cif_path (str): 远程服务器上的 CIF 文件路径(可以是相对或绝对路径)。 cwd (str | None): 远程工作目录。如果未提供,则使用默认沙箱目录。 ctx: MCP 上下文,由框架自动注入。 Returns: CalBVResult: 一个包含解析后的键价和、全局不稳定性指数以及原始输出的结构化对象。 """ if ctx is None: raise ValueError("Context is required for this operation.") app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection workdir = cwd or app_ctx.workdir # 构建路径和命令 input_abs_path = cif_path if not input_abs_path.startswith("/"): input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) cmd = f"{shell_quote(app_ctx.bin_path)} --cal-bv {shell_quote(input_abs_path)}" await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})") # 执行命令 proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) if proc.exit_status != 0: await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}") # 即使失败,也返回一个包含错误信息的结构 return CalBVResult( global_instability_index=None, suggested_stability=None, bv_sums=[], bv_pairs=[], raw_output=f"Exit Status: {proc.exit_status}\n\nSTDOUT:\n{proc.stdout}\n\nSTDERR:\n{proc.stderr}" ) # 解析成功的输出并返回结构化数据 parsed_result = _parse_cal_bv_output(proc.stdout) return parsed_result # 在 softbv_mcp_refactored.py 中,紧跟在 softbv_calculate_bv 之后添加 @mcp.tool() async def softbv_print_cube( cube_path: str, periodic: bool = True, cwd: str | None = None, ctx: Context[ServerSession, SoftBVContext] | None = None, ) -> PrintCubeResult: """ 执行 'softBV.x --print-cube',读取并解析一个 .cube 文件。 Args: cube_path (str): 远程服务器上的 .cube 文件路径。 periodic (bool): 是否将 cube 视为周期性的。默认为 True。 cwd (str | None): 远程工作目录。 ctx: MCP 上下文,由框架自动注入。 Returns: PrintCubeResult: 一个包含从 .cube 文件中解析出的详细信息的结构化对象。 """ if ctx is None: raise ValueError("Context is required for this operation.") app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection workdir = cwd or app_ctx.workdir # 构建路径和命令 input_abs_path = cube_path if not input_abs_path.startswith("/"): input_abs_path = posixpath.normpath(posixpath.join(workdir, cube_path)) cmd_parts = [ shell_quote(app_ctx.bin_path), shell_quote("--print-cube"), shell_quote(input_abs_path) ] # 添加可选的 periodic 标志 periodic_flag = "t" if periodic else "f" cmd_parts.append(shell_quote(periodic_flag)) cmd = " ".join(cmd_parts) await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})") # 执行命令 proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) if proc.exit_status != 0 or not proc.stdout: await ctx.error(f"命令执行失败,退出码: {proc.exit_status}") raise ValueError(f"Failed to print cube file. Exit Status: {proc.exit_status}\nSTDERR: {proc.stderr}") try: # 解析成功的输出并返回结构化数据 parsed_result = _parse_print_cube_output(proc.stdout) return parsed_result except Exception as e: await ctx.error(f"解析 '--print-cube' 输出失败: {e}\n原始输出:\n{proc.stdout}") raise ValueError(f"Failed to parse output: {e}") # 在 softbv_mcp_refactored.py 的工具区域添加这三个函数 @mcp.tool() async def softbv_calculate_gii( cif_path: str, cwd: str | None = None, ctx: Context[ServerSession, SoftBVContext] | None = None, ) -> CalGIIResult: """ 执行 'softBV.x --cal-gii',计算并返回全局不稳定性指数 (GII)。 """ if ctx is None: raise ValueError("Context is required.") app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection workdir = cwd or app_ctx.workdir input_abs_path = cif_path if not input_abs_path.startswith("/"): input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) cmd = f"{shell_quote(app_ctx.bin_path)} --cal-gii {shell_quote(input_abs_path)}" await ctx.info(f"Executing: {cmd}") proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) if proc.exit_status != 0: await ctx.warning(f"Command failed with exit code {proc.exit_status}") return CalGIIResult(raw_output=f"Exit Status: {proc.exit_status}\n\nSTDERR:\n{proc.stderr}") return _parse_cal_gii_output(proc.stdout) @mcp.tool() async def softbv_calculate_cn( cif_path: str, atom_name: str, rds_cutoff: float | None = None, cwd: str | None = None, ctx: Context[ServerSession, SoftBVContext] | None = None, ) -> CalCNResult: """ 执行 'softBV.x --cal-cn',计算指定原子的配位数。 """ if ctx is None: raise ValueError("Context is required.") app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection workdir = cwd or app_ctx.workdir input_abs_path = cif_path if not input_abs_path.startswith("/"): input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) cmd_parts = [ shell_quote(app_ctx.bin_path), shell_quote("--cal-cn"), shell_quote(input_abs_path), shell_quote(atom_name) ] if rds_cutoff is not None: cmd_parts.append(shell_quote(str(rds_cutoff))) cmd = " ".join(cmd_parts) await ctx.info(f"Executing: {cmd}") proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) if proc.exit_status != 0: await ctx.warning(f"Command failed with exit code {proc.exit_status}") return CalCNResult(raw_output=f"Exit Status: {proc.exit_status}\n\nSTDERR:\n{proc.stderr}", coordination_details=[]) return _parse_cal_cn_output(proc.stdout) @mcp.tool() async def softbv_calculate_sym( cif_path: str, tolerance: float | None = None, cwd: str | None = None, ctx: Context[ServerSession, SoftBVContext] | None = None, ) -> CalSymResult: """ 执行 'softBV.x --cal-sym',找出结构的空间群。 """ if ctx is None: raise ValueError("Context is required.") app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection workdir = cwd or app_ctx.workdir input_abs_path = cif_path if not input_abs_path.startswith("/"): input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) cmd_parts = [ shell_quote(app_ctx.bin_path), shell_quote("--cal-sym"), shell_quote(input_abs_path) ] if tolerance is not None: cmd_parts.append(shell_quote(str(tolerance))) cmd = " ".join(cmd_parts) await ctx.info(f"Executing: {cmd}") proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) if proc.exit_status != 0: await ctx.warning(f"Command failed with exit code {proc.exit_status}") return CalSymResult(raw_output=f"Exit Status: {proc.exit_status}\n\nSTDERR:\n{proc.stderr}", fits_to_symmetries=[]) return _parse_cal_sym_output(proc.stdout) @mcp.tool() async def softbv_calculate_total_energy( cif_path: str, sf: float | None = None, cwd: str | None = None, ctx: Context[ServerSession, SoftBVContext] | None = None, ) -> CalTotEnResult: """ 执行 'softBV.x --cal-tot-en',计算并返回总能量。 Args: cif_path (str): 远程 CIF 文件路径。 sf (float | None): 可选的 screening factor。 cwd (str | None): 远程工作目录。 ctx: MCP 上下文,由框架自动注入。 Returns: CalTotEnResult: 包含总能量和所用 sf 值的结构化结果。 """ if ctx is None: raise ValueError("Context is required for this operation.") app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection workdir = cwd or app_ctx.workdir # 构建路径和命令 input_abs_path = cif_path if not input_abs_path.startswith("/"): input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) cmd_parts = [ shell_quote(app_ctx.bin_path), shell_quote("--cal-tot-en"), shell_quote(input_abs_path) ] if sf is not None: cmd_parts.append(shell_quote(str(sf))) cmd = " ".join(cmd_parts) await ctx.info(f"执行命令: {cmd}") # 执行命令 proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) if proc.exit_status != 0: await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}") return CalTotEnResult( raw_output=f"Exit Status: {proc.exit_status}\n\nSTDOUT:\n{proc.stdout}\n\nSTDERR:\n{proc.stderr}" ) # 解析并返回结果 return _parse_cal_tot_en_output(proc.stdout) # ========= 6. 工厂函数与主程序入口 ========= def create_softbv_mcp() -> FastMCP: """供外部(如 Starlette)导入的工厂函数。""" return mcp if __name__ == "__main__": """ 如果直接运行此文件,则启动一个独立的 MCP 服务器。 """ print("正在以独立模式启动 softBV MCP 服务器...") mcp.run(transport="streamable-http") # 启动后,您可以使用 `mcp dev` 或其他客户端工具连接到 http://localhost:8000