diff --git a/.idea/misc.xml b/.idea/misc.xml index 419adf3..06ede0d 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,5 +3,5 @@ - + \ No newline at end of file diff --git a/.idea/solidstate-tools.iml b/.idea/solidstate-tools.iml index 8073316..909438d 100644 --- a/.idea/solidstate-tools.iml +++ b/.idea/solidstate-tools.iml @@ -2,7 +2,7 @@ - + \ No newline at end of file diff --git a/mcp/softBV_reramke.py b/mcp/softBV_reramke.py new file mode 100644 index 0000000..902dfbb --- /dev/null +++ b/mcp/softBV_reramke.py @@ -0,0 +1,1160 @@ +# softbv_mcp_refactored.py +# +# 这是 softBV 工具集的重构版本。 +# +# - 核心框架: 保持了原有的 SSH 生命周期管理和远程环境激活机制 [6]。 +# - 重构起点: 暂时移除了旧的 softbv_info, softbv_md, softbv_gen_cube 函数。 +# - 新增工具: 包含一个新的、功能单一的 `softbv_print_cell` 工具,用于执行 `softBV.x --print-cell` [6]。 +# +# 依赖: +# pip install "mcp[cli]" asyncssh pydantic +# +# 用法: +# 1. 直接运行进行测试: `python softbv_mcp_refactored.py` +# 2. 或集成到 Starlette (main.py): +# from softbv_mcp_refactored import create_softbv_mcp +# softbv_mcp = create_softbv_mcp() +# Mount("/softbv-new", app=softbv_mcp.streamable_http_app()) + +import os +import posixpath +import asyncio +from dataclasses import dataclass +from typing import Any +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager + +import asyncssh +from pydantic import BaseModel, Field + +from mcp.server.fastmcp import FastMCP, Context +from mcp.server.session import ServerSession + +import re # 确保导入 re 模块 + +# ========= 1. 配置与常量 ========= +# 这些配置与您之前的脚本保持一致,可以从环境变量或硬编码中获取。 + +# 从 lifespan.py 或其他配置文件导入,如果它们存在的话 +try: + from lifespan import REMOTE_HOST, REMOTE_USER, PRIVATE_KEY_PATH +except ImportError: + # 如果找不到,则使用默认值 + REMOTE_HOST = os.getenv("REMOTE_HOST", "202.121.182.208") + REMOTE_USER = os.getenv("REMOTE_USER", "koko125") + PRIVATE_KEY_PATH = os.getenv("PRIVATE_KEY_PATH", "D:/tool/tool/id_rsa.txt") + +# softBV 环境相关的固定路径 +SOFTBV_PROFILE = os.getenv("SOFTBV_PROFILE", "/cluster/home/koko125/script/softBV.sh") +SOFTBV_BIN = os.getenv("SOFTBV_BIN", "/cluster/home/koko125/tool/softBV-GUI_linux/bin/softBV.x") +DEFAULT_WORKDIR = os.getenv("DEFAULT_WORKDIR", f"/cluster/home/{REMOTE_USER}/sandbox") + +class PPTUnitaryItem(BaseModel): + """PROPERTY_UNITARY 表中的一行数据""" + type: str + an: int = Field(description="Atomic Number") + softness: float + rc: float = Field(description="Cutoff Radius") + q: float = Field(description="Charge") + +class PPTBinaryItem(BaseModel): + """PROPERTY_BINARY 表中的一行数据""" + cation: str + anion: str + D_0: float + R_min: float + b: float + R_0: float + R_cutoff: float + +class PPTResult(BaseModel): + """'--print-ppt' 命令的完整结构化解析结果""" + screening_factor: float | None = Field(description="最终计算出的 screening factor") + unitary_properties: list[PPTUnitaryItem] = Field(description="一元属性列表") + binary_properties: list[PPTBinaryItem] = Field(description="二元属性列表") + raw_output: str = Field(description="命令的完整原始标准输出") + + +class GenCubeDefaultArgs(BaseModel): + """用于'--gen-cube'的默认参数模式""" + input_cif: str = Field(description="远程 CIF 文件路径") + type: str = Field(description="导电离子类型 (例如 'Li')") + os: int = Field(description="导电离子氧化态 (例如 1)") + +class GenCubeFullArgs(BaseModel): + """用于'--gen-cube'的全参数模式""" + input_cif: str = Field(description="远程 CIF 文件路径") + type: str = Field(description="导电离子类型") + os: int = Field(description="导电离子氧化态") + sf: float = Field(description="自定义 screening factor (非正值则自动计算)") + resolution: float = Field(description="体素分辨率") + ignore_conducting_ion: bool = Field(description="是否在计算中忽略导电离子") + periodic: bool = Field(description="是否使用周期性边界条件") + output_name: str | None = Field(default=None, description="输出文件的前缀 (可选)") + +class GenCubeResult(BaseModel): + """'--gen-cube' 命令的执行结果""" + command_used: str + working_directory: str + exit_status: int + log_file: str = Field(description="远程服务器上的完整日志文件路径") + output_head: str = Field(description="日志文件的前40行,用于快速预览") + output_tail: str = Field(description="日志文件的后40行,用于查看最终结果") + new_files: list[str] = Field(description="执行后新生成的文件列表") + elapsed_seconds: int = Field(description="任务总耗时(秒)") + + +class BVSumItem(BaseModel): + """每个离子的键价和 (Bond Valence Sum)""" + name: str = Field(description="离子名称") + type: str = Field(description="离子类型") + occ: float = Field(description="占有率 (occupancy)") + bv_sum: float = Field(description="键价和 (bond valence sum)") + +class BVPairItem(BaseModel): + """离子对之间的键价""" + name1: str + type1: str + occ1: float + name2: str + type2: str + occ2: float + bv: float = Field(description="键价 (bond valence)") + +class CalBVResult(BaseModel): + """'--cal-bv' 命令的完整结构化解析结果""" + global_instability_index: float | None = Field(None, description="全局不稳定性指数 (GII)") + suggested_stability: str | None = Field(None, description="建议的稳定性 (例如 'stable')") + bv_sums: list[BVSumItem] = Field(description="每个离子的键价和列表") + bv_pairs: list[BVPairItem] = Field(description="离子对之间的键价列表") + raw_output: str = Field(description="命令的完整原始标准输出") + + +class CubeAtomInfo(BaseModel): + """从 .cube 文件中解析出的单个原子信息""" + index: int + name: str | None = None + type: str + os: int = Field(description="Oxidation State") + x_f: tuple[float, float, float] = Field(description="Fractional coordinates") + occ: float = Field(description="Occupancy") + +class CubeCellParams(BaseModel): + """晶格常数""" + a: float + b: float + c: float + +class CubeCellAngles(BaseModel): + """晶格角度""" + alpha: float + beta: float + gamma: float + +class PrintCubeResult(BaseModel): + """'--print-cube' 命令的完整结构化解析结果""" + name: str = Field(description="Cube 文件名") + cell_vector_matrix: list[list[float]] = Field(description="晶胞矢量矩阵") + cell_parameters: CubeCellParams + cell_angles: CubeCellAngles + total_atoms: int + atoms: list[CubeAtomInfo] = Field(description="原子信息列表") + cube_size: tuple[int, int, int] + cube_cell_vector: list[list[float]] + cube_voxel_vector: list[list[float]] = Field(description="Voxel vector in Angstroms") + raw_output: str = Field(description="命令的完整原始标准输出") + +class CalGIIResult(BaseModel): + """'--cal-gii' 命令的结构化解析结果""" + global_instability_index: float | None = Field(None, description="全局不稳定性指数 (GII)") + suggested_stability: str | None = Field(None, description="建议的稳定性 (例如 'stable')") + raw_output: str = Field(description="命令的完整原始标准输出") + +class CoordinationItem(BaseModel): + """'--cal-cn' 输出表格中的一行""" + index: int + shell: int + coordinated_type: str + r: float = Field(description="距离 (Å)") + occ: float = Field(description="占有率") + n: float = Field(description="累计配位数") + rds: float + +class CalCNResult(BaseModel): + """'--cal-cn' 命令的结构化解析结果""" + center_atom: str | None = Field(None, description="中心原子名称") + coordination_number: float | None = Field(None, description="总配位数") + coordination_details: list[CoordinationItem] = Field(description="配位详细信息列表") + raw_output: str = Field(description="命令的完整原始标准输出") + +class CalSymResult(BaseModel): + """'--cal-sym' 命令的结构化解析结果""" + determined_space_group: str | None = Field(None, description="最终确定的空间群") + fits_to_symmetries: list[str] = Field(description="计算过程中拟合到的所有对称性列表") + raw_output: str = Field(description="命令的完整原始标准输出") + +class CalTotEnResult(BaseModel): + """'--cal-tot-en' 命令的结构化解析结果""" + total_energy_eV: float | None = Field(None, description="计算出的总能量 (eV)") + screening_factor_used: float | None = Field(None, description="计算中使用的 screening factor (sf)") + raw_output: str = Field(description="命令的完整原始标准输出") +# ========= 2. 辅助函数 ========= + +def shell_quote(arg: str) -> str: + """安全地将字符串作为单个 shell 参数 (POSIX)。""" + return "'" + str(arg).replace("'", "'\"'\"'") + "'" + + +async def run_in_softbv_env( + conn: asyncssh.SSHClientConnection, + profile_path: str, + cmd: str, + cwd: str | None = None, + check: bool = True, +) -> asyncssh.SSHCompletedProcess: + """ + 在远端 bash 会话中激活 softBV 环境并执行命令。 + """ + parts = [] + if cwd: + parts.append(f"cd {shell_quote(cwd)}") + # 激活环境脚本 + parts.append(f"source {shell_quote(profile_path)}") + # 执行主命令 + parts.append(cmd) + + composite_cmd = "; ".join(parts) + # 使用 bash -lc 确保登录环境和命令可以被正确解析 + full_command = f"bash -lc {shell_quote(composite_cmd)}" + + return await conn.run(full_command, check=check) + + + +def _parse_ppt_output(raw_text: str) -> PPTResult: + """ + 将 '--print-ppt' 的原始 stdout 解析为结构化的 PPTResult 模型。 + """ + lines = raw_text.splitlines() + + screening_factor = None + unitary_items = [] + binary_items = [] + + # 查找 screening factor + for line in reversed(lines): + if "screening factor =" in line: + try: + screening_factor = float(line.split("=")[-1].strip()) + break + except (ValueError, IndexError): + continue + + # 解析表格 + in_unitary = False + in_binary = False + for line in lines: + if "PPT: | type | an |" in line: + in_unitary = True + in_binary = False + continue + if "PPT: | cation | anion |" in line: + in_unitary = False + in_binary = True + continue + if "PPT:--" in line or "PPT:==" in line: + continue + + parts = [p.strip() for p in line.split("|") if p.strip()] + + if in_unitary and len(parts) == 5: + try: + unitary_items.append(PPTUnitaryItem( + type=parts[0], + an=int(parts[1]), + softness=float(parts[2]), + rc=float(parts[3]), + q=float(parts[4]) + )) + except (ValueError, IndexError): + continue + elif in_binary and len(parts) == 7: + try: + binary_items.append(PPTBinaryItem( + cation=parts[0], + anion=parts[1], + D_0=float(parts[2]), + R_min=float(parts[3]), + b=float(parts[4]), + R_0=float(parts[5]), + R_cutoff=float(parts[6]) + )) + except (ValueError, IndexError): + continue + + return PPTResult( + screening_factor=screening_factor, + unitary_properties=unitary_items, + binary_properties=binary_items, + raw_output=raw_text + ) + + +async def _run_in_softbv_env_stream( + conn: asyncssh.SSHClientConnection, + profile_path: str, + cmd: str, + cwd: str | None = None, +) -> asyncssh.SSHClientProcess: + """以非阻塞方式,在激活环境中启动一个命令,并返回进程句柄。""" + parts = [] + if cwd: + parts.append(f"cd {shell_quote(cwd)}") + # 将 source 的输出重定向,避免污染主命令的日志 + parts.append(f"source {shell_quote(profile_path)} >/dev/null 2>&1 || true") + parts.append(cmd) + + composite_cmd = "; ".join(parts) + full_command = f"bash -lc {shell_quote(composite_cmd)}" + + proc = await conn.create_process(full_command) + return proc + + +async def _listdir_safe(conn: asyncssh.SSHClientConnection, path: str) -> list[str]: + """安全地列出目录内容,失败时返回空列表。""" + try: + async with conn.start_sftp_client() as sftp: + return await sftp.listdir(path) + except Exception: + return [] + + +async def _stat_size_safe(conn: asyncssh.SSHClientConnection, path: str) -> int | None: + """安全地获取文件大小,失败时返回 None。""" + try: + async with conn.start_sftp_client() as sftp: + attrs = await sftp.stat(path) + return int(attrs.size or 0) + except Exception: + return None + + +def _parse_cal_bv_output(raw_text: str) -> CalBVResult: + """ + 将 '--cal-bv' 的原始 stdout 解析为结构化的 CalBVResult 模型。 + """ + lines = raw_text.splitlines() + + gii = None + stability = None + bv_sums = [] + bv_pairs = [] + + in_sum_section = False + in_pair_section = False + + # 解析 GII 和稳定性 + for line in lines: + if line.startswith("GII: Global instability index ="): + try: + gii = float(line.split("=")[-1].strip()) + except (ValueError, IndexError): + pass + elif line.startswith("GII: suggested stability:"): + try: + stability = line.split(":", 1)[-1].strip() + except IndexError: + pass + + # 解析表格 + for line in lines: + if line.startswith("BV: name type occ"): + in_sum_section = True + in_pair_section = False + continue + if line.startswith("BV: nam1 typ1 occ1"): + in_sum_section = False + in_pair_section = True + continue + if line.startswith("BV:="): + continue + + if not line.startswith("BV:"): + in_sum_section = False + in_pair_section = False + + # 使用正则表达式进行更稳健的解析,以处理可变宽度的列 + if in_sum_section: + match = re.match(r"BV:\s+(\S+)\s+(\S+)\s+([\d.]+)\s+([-\d.]+)", line) + if match: + try: + bv_sums.append(BVSumItem( + name=match.group(1), + type=match.group(2), + occ=float(match.group(3)), + bv_sum=float(match.group(4)) + )) + except (ValueError, IndexError): + continue + + elif in_pair_section: + # 这是一个更复杂的行,使用 shlex 更安全地分割 + try: + # 移除 "BV:" 前缀并分割 + parts = line[3:].strip().split() + if len(parts) == 7: + bv_pairs.append(BVPairItem( + name1=parts[0], + type1=parts[1], + occ1=float(parts[2]), + name2=parts[3], + type2=parts[4], + occ2=float(parts[5]), + bv=float(parts[6]) + )) + except (ValueError, IndexError): + continue + + return CalBVResult( + global_instability_index=gii, + suggested_stability=stability, + bv_sums=bv_sums, + bv_pairs=bv_pairs, + raw_output=raw_text + ) + + +def _parse_print_cube_output(raw_text: str) -> PrintCubeResult: + """ + 将 '--print-cube' 的原始 stdout 解析为结构化的 PrintCubeResult 模型。 + """ + lines = raw_text.splitlines() + + # Helper to parse vector blocks + def parse_vector_block(start_line_idx: int) -> list[list[float]]: + matrix = [] + for i in range(3): + line = lines[start_line_idx + i] + parts = [float(p) for p in line.strip().split()] + matrix.append(parts) + return matrix + + # Find key lines and parse data + name = re.search(r"CELL: name: (.*)", raw_text).group(1).strip() + total_atoms = int(re.search(r"CELL: total atom: (\d+)", raw_text).group(1)) + + params_line = re.search(r"CELL: parameters:\s+([\d.\s]+)", raw_text).group(1) + a, b, c = map(float, params_line.split()) + cell_params = CubeCellParams(a=a, b=b, c=c) + + angles_line = re.search(r"CELL: angles:\s+([\d.\s]+)", raw_text).group(1) + alpha, beta, gamma = map(float, angles_line.split()) + cell_angles = CubeCellAngles(alpha=alpha, beta=beta, gamma=gamma) + + cube_size_line = re.search(r"CUBE: cube size = (\d+) \* (\d+) \* (\d+)", raw_text).group(1, 2, 3) + cube_size = tuple(map(int, cube_size_line)) + + atoms = [] + cell_matrix, cube_cell_vec, cube_voxel_vec = [], [], [] + + for i, line in enumerate(lines): + if "CELL: vector matrix:" in line: + cell_matrix = parse_vector_block(i + 1) + elif "CUBE: cell vector (A):" in line: + cube_cell_vec = parse_vector_block(i + 1) + elif "CUBE: votex vector (A):" in line: # Note: 'votex' is a typo in the original output + cube_voxel_vec = parse_vector_block(i + 1) + elif "ATOM:" in line: + atom_match = re.search( + r"\[\s*(\d+)\]ATOM: name=\s*([^;]*);type=([^;]*);os=\s*(-?\d+);x_f=\(([^)]*)\);occ=([\d.]+)", line) + if atom_match: + idx, atom_name, atom_type, os_val, xf_str, occ_val = atom_match.groups() + xf_coords = tuple(map(float, xf_str.split(','))) + atoms.append(CubeAtomInfo( + index=int(idx), + name=atom_name.strip() or None, + type=atom_type.strip(), + os=int(os_val), + x_f=xf_coords, + occ=float(occ_val) + )) + + return PrintCubeResult( + name=name, + cell_vector_matrix=cell_matrix, + cell_parameters=cell_params, + cell_angles=cell_angles, + total_atoms=total_atoms, + atoms=atoms, + cube_size=cube_size, + cube_cell_vector=cube_cell_vec, + cube_voxel_vector=cube_voxel_vec, + raw_output=raw_text + ) + + +def _parse_cal_gii_output(raw_text: str) -> CalGIIResult: + """解析 '--cal-gii' 的输出。""" + gii, stability = None, None + for line in raw_text.splitlines(): + if line.startswith("GII: Global instability index ="): + try: + gii = float(line.split("=")[-1].strip()) + except (ValueError, IndexError): + pass + elif line.startswith("GII: suggested stability:"): + try: + stability = line.split(":", 1)[-1].strip().rstrip('.') + except IndexError: + pass + return CalGIIResult(global_instability_index=gii, suggested_stability=stability, raw_output=raw_text) + + +def _parse_cal_cn_output(raw_text: str) -> CalCNResult: + """解析 '--cal-cn' 的输出。""" + center_atom, cn = None, None + details = [] + in_table = False + for line in raw_text.splitlines(): + if line.startswith("CN: Center atom:"): + center_atom = line.split(":", 1)[-1].strip() + elif line.startswith("CN: Coordination Number:"): + try: + cn = float(line.split(":", 1)[-1].strip()) + except (ValueError, IndexError): + pass + elif "CN: Index Shell" in line: + in_table = True + continue + + if in_table and line.startswith("CN:"): + try: + parts = line[3:].strip().split() + if len(parts) == 7: + details.append(CoordinationItem( + index=int(parts[0]), shell=int(parts[1]), coordinated_type=parts[2], + r=float(parts[3]), occ=float(parts[4]), n=float(parts[5]), rds=float(parts[6]) + )) + except (ValueError, IndexError): + pass + + return CalCNResult(center_atom=center_atom, coordination_number=cn, coordination_details=details, + raw_output=raw_text) + + +def _parse_cal_sym_output(raw_text: str) -> CalSymResult: + """解析 '--cal-sym' 的输出。""" + determined_sg = None + fits = [] + for line in raw_text.splitlines(): + if line.startswith("Message: structure fits to symmetry"): + fits.append(line.split("symmetry")[-1].strip().rstrip('.')) + elif line.startswith("Message: determined space group:"): + determined_sg = line.split(":", 1)[-1].strip() + + return CalSymResult(determined_space_group=determined_sg, fits_to_symmetries=fits, raw_output=raw_text) + +def _parse_cal_tot_en_output(raw_text: str) -> CalTotEnResult: + """解析 '--cal-tot-en' 的输出。""" + total_energy, sf = None, None + for line in raw_text.splitlines(): + if "total energy =" in line: + try: + # 提取能量值 + energy_str = line.split("=")[-1].strip().split()[0] + total_energy = float(energy_str) + except (ValueError, IndexError): + pass + elif line.startswith("Message: sf="): + try: + # 提取使用的 sf 值 + sf = float(line.split("=")[-1].strip()) + except (ValueError, IndexError): + pass + return CalTotEnResult( + total_energy_eV=total_energy, + screening_factor_used=sf, + raw_output=raw_text + ) + + +# ========= 3. 生命周期管理 ========= + +@dataclass +class SoftBVContext: + """在服务器生命周期内共享的上下文对象。""" + ssh_connection: asyncssh.SSHClientConnection + workdir: str + profile: str + bin_path: str + + +@asynccontextmanager +async def softbv_lifespan(_server: FastMCP) -> AsyncIterator[SoftBVContext]: + """ + FastMCP 生命周期管理器:在服务器启动时建立 SSH 连接,在关闭时安全断开。 + 这个函数创建的 SoftBVContext 将被注入到所有工具中 [1, 6]。 + """ + conn: asyncssh.SSHClientConnection | None = None + try: + await asyncio.sleep(0) # 确保事件循环已启动 + print(f"正在连接到 {REMOTE_USER}@{REMOTE_HOST}...") + conn = await asyncssh.connect( + REMOTE_HOST, + username=REMOTE_USER, + client_keys=[PRIVATE_KEY_PATH], + known_hosts=None, + connect_timeout=15, + ) + print("SSH 连接成功!") + # `yield` 将上下文提供给 MCP 服务器 + yield SoftBVContext( + ssh_connection=conn, + workdir=DEFAULT_WORKDIR, + profile=SOFTBV_PROFILE, + bin_path=SOFTBV_BIN, + ) + finally: + # 服务器关闭时,清理资源 + if conn: + conn.close() + await conn.wait_closed() + print("SSH 连接已关闭。") + + +# ========= 4. MCP 服务器实例 ========= + +mcp = FastMCP( + name="softBV Tools (Refactored)", + instructions="用于执行 softBV 相关计算的工具集(重构版)。", + lifespan=softbv_lifespan, + streamable_http_path="/", # 方便挂载到 Starlette [1] +) + + +# ========= 5. 工具定义 ========= +# +# 原有的 softbv_info, softbv_md, softbv_gen_cube 函数已移除, +# 等待您完成重构后再添加。 +# + +@mcp.tool() +async def softbv_print_cell( + cif_path: str, + cwd: str | None = None, + ctx: Context[ServerSession, SoftBVContext] | None = None, +) -> dict[str, Any]: + """ + 执行 'softBV.x --print-cell' 命令,打印并返回晶胞信息。 + + Args: + cif_path (str): 远程服务器上的 CIF 文件路径(可以是相对或绝对路径)。 + cwd (str | None): 远程工作目录。如果未提供,则使用默认沙箱目录。 + ctx: MCP 上下文,由框架自动注入。 + + Returns: + dict: 一个包含命令执行结果的字典,包括退出码、标准输出和标准错误。 + """ + if ctx is None: + raise ValueError("Context is required for this operation.") + + app_ctx = ctx.request_context.lifespan_context + conn = app_ctx.ssh_connection + workdir = cwd or app_ctx.workdir + + # 如果 cif_path 不是绝对路径,则基于工作目录构建完整路径 + input_abs_path = cif_path + if not input_abs_path.startswith("/"): + input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) + + # 构造命令 + cmd = f"{shell_quote(app_ctx.bin_path)} --print-cell {shell_quote(input_abs_path)}" + + await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})") + + # 在激活的环境中运行命令,不要求必须成功 (check=False) + proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) + + if proc.exit_status != 0: + await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}") + + # 返回结构化输出,便于 AI 或客户端解析 [1, 6] + return { + "command_used": cmd, + "working_directory": workdir, + "exit_status": proc.exit_status, + "stdout": proc.stdout, + "stderr": proc.stderr, + } + + +@mcp.tool() +async def softbv_print_ppt( + cif_path: str, + cwd: str | None = None, + ctx: Context[ServerSession, SoftBVContext] | None = None, +) -> PPTResult: + """ + 执行 'softBV.x --print-ppt',计算并返回配对势参数 (Pair-Potential Terms)。 + + Args: + cif_path (str): 远程服务器上的 CIF 文件路径(可以是相对或绝对路径)。 + cwd (str | None): 远程工作目录。如果未提供,则使用默认沙箱目录。 + ctx: MCP 上下文,由框架自动注入。 + + Returns: + PPTResult: 一个包含解析后的配对势参数和原始输出的结构化对象。 + """ + if ctx is None: + raise ValueError("Context is required for this operation.") + + app_ctx = ctx.request_context.lifespan_context + conn = app_ctx.ssh_connection + workdir = cwd or app_ctx.workdir + + # 构建路径和命令 + input_abs_path = cif_path + if not input_abs_path.startswith("/"): + input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) + + cmd = f"{shell_quote(app_ctx.bin_path)} --print-ppt {shell_quote(input_abs_path)}" + + await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})") + + # 执行命令 + proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) + + if proc.exit_status != 0: + # 即使失败,也尝试解析可能的错误信息 + await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}") + # 在失败时返回一个包含原始输出的空结果,AI 可以从中读取错误 + return PPTResult( + screening_factor=None, + unitary_properties=[], + binary_properties=[], + raw_output=f"Exit Status: {proc.exit_status}\n\nSTDOUT:\n{proc.stdout}\n\nSTDERR:\n{proc.stderr}" + ) + + # 解析成功的输出并返回结构化数据 + parsed_result = _parse_ppt_output(proc.stdout) + return parsed_result + + +# 在 softbv_mcp_refactored.py 的工具定义区域添加 + +@mcp.tool(name="softbv_gen_cube") +async def softbv_gen_cube( + args: GenCubeDefaultArgs | GenCubeFullArgs, + cwd: str | None = None, + ctx: Context[ServerSession, SoftBVContext] | None = None, +) -> GenCubeResult: + """ + 执行 'softBV.x --gen-cube' 以计算势函数。这是一个耗时任务。 + + 支持两种调用模式: + 1. 默认模式 (GenCubeDefaultArgs): 仅提供 CIF、离子类型和氧化态,其他参数使用程序默认值。 + 2. 全参数模式 (GenCubeFullArgs): 提供所有可配置参数。 + + 该工具会定期报告心跳以防超时,并在任务完成后返回详细的结构化结果。 + """ + if ctx is None: + raise ValueError("Context is required for this operation.") + + app_ctx = ctx.request_context.lifespan_context + conn = app_ctx.ssh_connection + workdir = cwd or app_ctx.workdir + + # --- 1. 构造命令 --- + input_abs_path = args.input_cif + if not input_abs_path.startswith("/"): + input_abs_path = posixpath.normpath(posixpath.join(workdir, args.input_cif)) + + cmd_parts = [ + shell_quote(app_ctx.bin_path), + shell_quote("--gen-cube"), + shell_quote(input_abs_path), + shell_quote(args.type), + shell_quote(str(args.os)), + ] + + # 根据传入的参数类型,决定是否添加额外参数 + if isinstance(args, GenCubeFullArgs): + cmd_parts.extend([ + shell_quote(str(args.sf)), + shell_quote(str(args.resolution)), + ]) + # 处理布尔标志 + if args.ignore_conducting_ion: + cmd_parts.append(shell_quote("t")) # 't' for true + else: + cmd_parts.append(shell_quote("f")) # 'f' for false + if args.periodic: + cmd_parts.append(shell_quote("t")) + else: + cmd_parts.append(shell_quote("f")) + + if args.output_name: + cmd_parts.append(shell_quote(args.output_name)) + + # --- 2. 准备长时任务 --- + import time + log_name = f"softbv_gencube_{int(time.time())}.log" + log_path = posixpath.join(workdir, log_name) + + # 将标准输出和错误都重定向到日志文件 + final_cmd = " ".join(cmd_parts) + f" > {shell_quote(log_path)} 2>&1" + + await ctx.info(f"启动长任务: {final_cmd} (工作目录: {workdir})") + + files_before = await _listdir_safe(conn, workdir) + proc = await _run_in_softbv_env_stream(conn, app_ctx.profile, cmd=final_cmd, cwd=workdir) + start_time = time.monotonic() + + # --- 3. 心跳循环,防止超时 --- + try: + while proc.exit_status is None: + # 每10秒报告一次进度,这本身就是心跳 [1, 6] + await asyncio.sleep(10) + + elapsed = int(time.monotonic() - start_time) + log_size = await _stat_size_safe(conn, log_path) + + await ctx.report_progress( + progress=min(elapsed / (30 * 60), 0.99), # 以30分钟为基准的近似进度 + message=f"计算进行中... 已用时 {elapsed} 秒, 日志大小: {log_size or 0} 字节。" + ) + finally: + # 确保进程结束 + await proc.wait() + + # --- 4. 收集并返回结果 --- + elapsed_seconds = int(time.monotonic() - start_time) + files_after = await _listdir_safe(conn, workdir) + new_files = sorted(set(files_after) - set(files_before)) + + # 读取日志文件的头和尾 + log_head, log_tail = "", "" + try: + async with conn.start_sftp_client() as sftp: + async with sftp.open(log_path, "r", encoding="utf-8", errors="replace") as f: + lines = await f.readlines() + log_head = "".join(lines[:40]) + log_tail = "".join(lines[-40:]) + except Exception as e: + await ctx.warning(f"读取日志文件 '{log_path}' 失败: {e}") + + result = GenCubeResult( + command_used=final_cmd, + working_directory=workdir, + exit_status=proc.exit_status, + log_file=log_path, + output_head=log_head, + output_tail=log_tail, + new_files=new_files, + elapsed_seconds=elapsed_seconds, + ) + + if result.exit_status == 0: + await ctx.info(f"gen-cube 任务成功完成, 耗时 {elapsed_seconds} 秒。") + else: + await ctx.error(f"gen-cube 任务失败, 退出码: {result.exit_status}。请检查日志: {log_path}") + + return result + + +# 在 softbv_mcp_refactored.py 中,紧跟在 softbv_gen_cube 之后添加 + +@mcp.tool() +async def softbv_calculate_bv( + cif_path: str, + cwd: str | None = None, + ctx: Context[ServerSession, SoftBVContext] | None = None, +) -> CalBVResult: + """ + 执行 'softBV.x --cal-bv',计算并返回键价和 (Bond Valence Sums)。 + + Args: + cif_path (str): 远程服务器上的 CIF 文件路径(可以是相对或绝对路径)。 + cwd (str | None): 远程工作目录。如果未提供,则使用默认沙箱目录。 + ctx: MCP 上下文,由框架自动注入。 + + Returns: + CalBVResult: 一个包含解析后的键价和、全局不稳定性指数以及原始输出的结构化对象。 + """ + if ctx is None: + raise ValueError("Context is required for this operation.") + + app_ctx = ctx.request_context.lifespan_context + conn = app_ctx.ssh_connection + workdir = cwd or app_ctx.workdir + + # 构建路径和命令 + input_abs_path = cif_path + if not input_abs_path.startswith("/"): + input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) + + cmd = f"{shell_quote(app_ctx.bin_path)} --cal-bv {shell_quote(input_abs_path)}" + + await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})") + + # 执行命令 + proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) + + if proc.exit_status != 0: + await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}") + # 即使失败,也返回一个包含错误信息的结构 + return CalBVResult( + global_instability_index=None, + suggested_stability=None, + bv_sums=[], + bv_pairs=[], + raw_output=f"Exit Status: {proc.exit_status}\n\nSTDOUT:\n{proc.stdout}\n\nSTDERR:\n{proc.stderr}" + ) + + # 解析成功的输出并返回结构化数据 + parsed_result = _parse_cal_bv_output(proc.stdout) + return parsed_result + + +# 在 softbv_mcp_refactored.py 中,紧跟在 softbv_calculate_bv 之后添加 + +@mcp.tool() +async def softbv_print_cube( + cube_path: str, + periodic: bool = True, + cwd: str | None = None, + ctx: Context[ServerSession, SoftBVContext] | None = None, +) -> PrintCubeResult: + """ + 执行 'softBV.x --print-cube',读取并解析一个 .cube 文件。 + + Args: + cube_path (str): 远程服务器上的 .cube 文件路径。 + periodic (bool): 是否将 cube 视为周期性的。默认为 True。 + cwd (str | None): 远程工作目录。 + ctx: MCP 上下文,由框架自动注入。 + + Returns: + PrintCubeResult: 一个包含从 .cube 文件中解析出的详细信息的结构化对象。 + """ + if ctx is None: + raise ValueError("Context is required for this operation.") + + app_ctx = ctx.request_context.lifespan_context + conn = app_ctx.ssh_connection + workdir = cwd or app_ctx.workdir + + # 构建路径和命令 + input_abs_path = cube_path + if not input_abs_path.startswith("/"): + input_abs_path = posixpath.normpath(posixpath.join(workdir, cube_path)) + + cmd_parts = [ + shell_quote(app_ctx.bin_path), + shell_quote("--print-cube"), + shell_quote(input_abs_path) + ] + # 添加可选的 periodic 标志 + periodic_flag = "t" if periodic else "f" + cmd_parts.append(shell_quote(periodic_flag)) + + cmd = " ".join(cmd_parts) + + await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})") + + # 执行命令 + proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) + + if proc.exit_status != 0 or not proc.stdout: + await ctx.error(f"命令执行失败,退出码: {proc.exit_status}") + raise ValueError(f"Failed to print cube file. Exit Status: {proc.exit_status}\nSTDERR: {proc.stderr}") + + try: + # 解析成功的输出并返回结构化数据 + parsed_result = _parse_print_cube_output(proc.stdout) + return parsed_result + except Exception as e: + await ctx.error(f"解析 '--print-cube' 输出失败: {e}\n原始输出:\n{proc.stdout}") + raise ValueError(f"Failed to parse output: {e}") + + +# 在 softbv_mcp_refactored.py 的工具区域添加这三个函数 + +@mcp.tool() +async def softbv_calculate_gii( + cif_path: str, + cwd: str | None = None, + ctx: Context[ServerSession, SoftBVContext] | None = None, +) -> CalGIIResult: + """ + 执行 'softBV.x --cal-gii',计算并返回全局不稳定性指数 (GII)。 + """ + if ctx is None: raise ValueError("Context is required.") + app_ctx = ctx.request_context.lifespan_context + conn = app_ctx.ssh_connection + workdir = cwd or app_ctx.workdir + + input_abs_path = cif_path + if not input_abs_path.startswith("/"): + input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) + + cmd = f"{shell_quote(app_ctx.bin_path)} --cal-gii {shell_quote(input_abs_path)}" + await ctx.info(f"Executing: {cmd}") + proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) + + if proc.exit_status != 0: + await ctx.warning(f"Command failed with exit code {proc.exit_status}") + return CalGIIResult(raw_output=f"Exit Status: {proc.exit_status}\n\nSTDERR:\n{proc.stderr}") + + return _parse_cal_gii_output(proc.stdout) + + +@mcp.tool() +async def softbv_calculate_cn( + cif_path: str, + atom_name: str, + rds_cutoff: float | None = None, + cwd: str | None = None, + ctx: Context[ServerSession, SoftBVContext] | None = None, +) -> CalCNResult: + """ + 执行 'softBV.x --cal-cn',计算指定原子的配位数。 + """ + if ctx is None: raise ValueError("Context is required.") + app_ctx = ctx.request_context.lifespan_context + conn = app_ctx.ssh_connection + workdir = cwd or app_ctx.workdir + + input_abs_path = cif_path + if not input_abs_path.startswith("/"): + input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) + + cmd_parts = [ + shell_quote(app_ctx.bin_path), + shell_quote("--cal-cn"), + shell_quote(input_abs_path), + shell_quote(atom_name) + ] + if rds_cutoff is not None: + cmd_parts.append(shell_quote(str(rds_cutoff))) + cmd = " ".join(cmd_parts) + + await ctx.info(f"Executing: {cmd}") + proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) + + if proc.exit_status != 0: + await ctx.warning(f"Command failed with exit code {proc.exit_status}") + return CalCNResult(raw_output=f"Exit Status: {proc.exit_status}\n\nSTDERR:\n{proc.stderr}", + coordination_details=[]) + + return _parse_cal_cn_output(proc.stdout) + + +@mcp.tool() +async def softbv_calculate_sym( + cif_path: str, + tolerance: float | None = None, + cwd: str | None = None, + ctx: Context[ServerSession, SoftBVContext] | None = None, +) -> CalSymResult: + """ + 执行 'softBV.x --cal-sym',找出结构的空间群。 + """ + if ctx is None: raise ValueError("Context is required.") + app_ctx = ctx.request_context.lifespan_context + conn = app_ctx.ssh_connection + workdir = cwd or app_ctx.workdir + + input_abs_path = cif_path + if not input_abs_path.startswith("/"): + input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) + + cmd_parts = [ + shell_quote(app_ctx.bin_path), + shell_quote("--cal-sym"), + shell_quote(input_abs_path) + ] + if tolerance is not None: + cmd_parts.append(shell_quote(str(tolerance))) + cmd = " ".join(cmd_parts) + + await ctx.info(f"Executing: {cmd}") + proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) + + if proc.exit_status != 0: + await ctx.warning(f"Command failed with exit code {proc.exit_status}") + return CalSymResult(raw_output=f"Exit Status: {proc.exit_status}\n\nSTDERR:\n{proc.stderr}", + fits_to_symmetries=[]) + + return _parse_cal_sym_output(proc.stdout) + + +@mcp.tool() +async def softbv_calculate_total_energy( + cif_path: str, + sf: float | None = None, + cwd: str | None = None, + ctx: Context[ServerSession, SoftBVContext] | None = None, +) -> CalTotEnResult: + """ + 执行 'softBV.x --cal-tot-en',计算并返回总能量。 + + Args: + cif_path (str): 远程 CIF 文件路径。 + sf (float | None): 可选的 screening factor。 + cwd (str | None): 远程工作目录。 + ctx: MCP 上下文,由框架自动注入。 + + Returns: + CalTotEnResult: 包含总能量和所用 sf 值的结构化结果。 + """ + if ctx is None: + raise ValueError("Context is required for this operation.") + + app_ctx = ctx.request_context.lifespan_context + conn = app_ctx.ssh_connection + workdir = cwd or app_ctx.workdir + + # 构建路径和命令 + input_abs_path = cif_path + if not input_abs_path.startswith("/"): + input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path)) + + cmd_parts = [ + shell_quote(app_ctx.bin_path), + shell_quote("--cal-tot-en"), + shell_quote(input_abs_path) + ] + if sf is not None: + cmd_parts.append(shell_quote(str(sf))) + cmd = " ".join(cmd_parts) + + await ctx.info(f"执行命令: {cmd}") + + # 执行命令 + proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False) + + if proc.exit_status != 0: + await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}") + return CalTotEnResult( + raw_output=f"Exit Status: {proc.exit_status}\n\nSTDOUT:\n{proc.stdout}\n\nSTDERR:\n{proc.stderr}" + ) + + # 解析并返回结果 + return _parse_cal_tot_en_output(proc.stdout) + +# ========= 6. 工厂函数与主程序入口 ========= + +def create_softbv_mcp() -> FastMCP: + """供外部(如 Starlette)导入的工厂函数。""" + return mcp + + +if __name__ == "__main__": + """ + 如果直接运行此文件,则启动一个独立的 MCP 服务器。 + """ + print("正在以独立模式启动 softBV MCP 服务器...") + mcp.run(transport="streamable-http") + # 启动后,您可以使用 `mcp dev` 或其他客户端工具连接到 http://localhost:8000 \ No newline at end of file