# vasp_mcp.py import os import posixpath import warnings from dataclasses import dataclass from typing import Any, AsyncIterator from contextlib import asynccontextmanager import asyncssh from pymatgen.core import Structure from io import StringIO from mcp.server.fastmcp import FastMCP, Context from mcp.server.session import ServerSession from pymatgen.io.cif import CifParser # --- VASP 特定配置 --- REMOTE_HOST = os.getenv("REMOTE_HOST", '202.121.182.208') REMOTE_USER = os.getenv("REMOTE_USER", 'koko125') PRIVATE_KEY_PATH = os.getenv("PRIVATE_KEY_PATH", 'D:/tool/tool/id_rsa.txt') # VASP 赝势库和沙箱路径 POTCAR_BASE_PATH = "/cluster/home/koko125/tool/potcar_mcp" DEFAULT_SANDBOX = f"/cluster/home/{REMOTE_USER}/sandbox" VASP_ENV_SCRIPT = "/cluster/home/koko125/intel/oneapi/setvars.sh" VASP_MPI_RUN_CMD = "mpirun -np 4 /cluster/home/koko125/vasp/bin_cpu/vasp_std" def shell_quote(arg: str) -> str: """安全地引用 shell 参数""" return "'" + str(arg).replace("'", "'\"'\"'") + "'" # --- 定义共享上下文 --- @dataclass class VaspContext: ssh_connection: asyncssh.SSHClientConnection potcar_base: str sandbox_path: str env_script: str mpi_run_cmd: str # <-- 添加这一行 # --- 定义生命周期管理器 --- @asynccontextmanager async def vasp_lifespan(_server: FastMCP) -> AsyncIterator[VaspContext]: """建立 SSH 连接并注入 VASP 上下文。""" conn: asyncssh.SSHClientConnection | None = None try: conn = await asyncssh.connect( REMOTE_HOST, username=REMOTE_USER, client_keys=[PRIVATE_KEY_PATH], known_hosts=None ) yield VaspContext( ssh_connection=conn, potcar_base=POTCAR_BASE_PATH, sandbox_path=DEFAULT_SANDBOX, env_script=VASP_ENV_SCRIPT, mpi_run_cmd=VASP_MPI_RUN_CMD ) finally: if conn: conn.close() await conn.wait_closed() # --- VASP MCP 工厂函数 --- def create_vasp_mcp() -> FastMCP: """创建包含 VASP 辅助工具的 MCP 实例。""" mcp = FastMCP( name="VASP POTCAR Tools", instructions="用于查询和准备 VASP POTCAR 文件的专用工具集。", lifespan=vasp_lifespan, streamable_http_path="/", ) # 沿用 system_tools.py 中的安全路径拼接函数,确保目标路径安全 def _safe_join_sandbox(sandbox_root: str, relative_path: str) -> str: rel = (relative_path or ".").strip().lstrip("/") combined = posixpath.normpath(posixpath.join(sandbox_root, rel)) root_norm = sandbox_root.rstrip("/") if combined != root_norm and not combined.startswith(root_norm + "/"): raise ValueError("路径越界:目标路径必须在沙箱目录内") if ".." in combined.split("/"): raise ValueError("非法路径:不允许使用 '..'") return combined # --- 工具 1: 列出可用的赝势库类型 --- @mcp.tool() async def list_potcar_types(ctx: Context[ServerSession, VaspContext]) -> list[str]: """ 列出中央赝势库中所有可用的赝势类型 (例如 'PBE_potpaw', 'PAW_GGA_PBE')。 """ app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection try: # 使用 ls -F, 目录会带上 '/' 后缀 result = await conn.run(f"ls -F {shell_quote(app_ctx.potcar_base)}", check=True) potcar_types = [ name.strip('/') for name in result.stdout.strip().split() if name.endswith('/') ] await ctx.info(f"发现可用赝势库: {potcar_types}") return potcar_types except Exception as e: await ctx.error(f"列出 POTCAR 类型失败: {e}") return [] # --- 工具 2 (新): 查询指定赝势库中的可用元素 --- @mcp.tool() async def query_potcar_elements(ctx: Context[ServerSession, VaspContext], potcar_type: str) -> list[str]: """ 查询指定类型的赝势库中包含哪些元素的赝势。 """ app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection # 安全地构建源目录路径 source_dir = posixpath.join(app_ctx.potcar_base, potcar_type) if ".." in potcar_type or "/" in potcar_type: await ctx.error(f"非法的赝势库类型: {potcar_type}") return [] await ctx.info(f"查询目录 '{source_dir}' 中的可用元素...") try: result = await conn.run(f"ls -F {shell_quote(source_dir)}", check=True) elements = [ name.strip('/') for name in result.stdout.strip().split() if name.endswith('/') ] return elements except asyncssh.ProcessError as e: msg = f"查询元素失败: 赝势库 '{potcar_type}' 可能不存在。Stderr: {e.stderr}" await ctx.error(msg) return [] except Exception as e: await ctx.error(f"查询 POTCAR 元素时发生未知错误: {e}") return [] # --- 工具 3 (新): 从中央库安全地复制 POTCAR 文件到沙箱 --- @mcp.tool() async def copy_potcar_file( ctx: Context[ServerSession, VaspContext], potcar_type: str, element: str, destination_path: str ) -> dict[str, str]: """ 从中央赝势库安全地复制一个指定元素的 POTCAR 文件到用户沙箱中的目标路径。 例如, 将 'PBE_potpaw' 库中的 'Si' 赝势复制到 'sio2_relax/POTCAR_Si'。 """ app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection try: # 1. 安全地构建源文件路径 (只允许访问 potcar_base 下的子目录) if ".." in potcar_type or "/" in potcar_type or ".." in element or "/" in element: raise ValueError("非法的赝势库类型或元素名称。") source_file = posixpath.join(app_ctx.potcar_base, potcar_type, element, "POTCAR") # 2. 安全地构建目标文件路径 (必须在沙箱内) dest_file_abs = _safe_join_sandbox(app_ctx.sandbox_path, destination_path) # 3. 执行 cp 命令 cmd = f"cp {shell_quote(source_file)} {shell_quote(dest_file_abs)}" await ctx.info(f"执行安全复制: cp {source_file} -> {dest_file_abs}") await conn.run(cmd, check=True) return {"status": "success", "source": source_file, "destination": destination_path} except asyncssh.ProcessError as e: msg = f"复制 POTCAR 失败。请检查 potcar_type 和 element 是否正确。Stderr: {e.stderr}" await ctx.error(msg) return {"status": "error", "message": msg} except ValueError as e: await ctx.error(f"路径验证失败: {e}") return {"status": "error", "message": str(e)} except Exception as e: await ctx.error(f"复制 POTCAR 时发生未知错误: {e}") return {"status": "error", "message": str(e)} # (可选) 我们仍然可以保留 cif_to_poscar 工具,因为它对于确定元素顺序非常有用 @mcp.tool() def cif_to_poscar(cif_content: str, sort_structure: bool = True) -> dict[str, str]: """将 CIF 文件内容转换为 VASP 的 POSCAR 文件内容,并提供有序的元素列表。""" # ... (此工具代码与之前版本相同) try: structure = Structure.from_str(cif_content, fmt="cif") if sort_structure: structure = structure.get_sorted_structure() elements = [site.specie.symbol for site in structure] unique_elements = sorted(set(elements), key=elements.index) poscar_string_io = StringIO() structure.to(fmt="poscar", file_obj=poscar_string_io) poscar_content = poscar_string_io.getvalue() return { "status": "success", "poscar_content": poscar_content, "elements": " ".join(unique_elements) } except Exception as e: return {"status": "error", "message": f"CIF 转换失败: {e}"} @mcp.tool() def cif_to_poscar(cif_content: str,ctx: Context[ServerSession, VaspContext], sort_structure: bool = True) -> dict[str, str]: """ 将 CIF 文件内容稳健地转换为 VASP 的 POSCAR 文件内容。 该工具会尝试多种解析策略来处理格式不规范的CIF文件。 如果成功,返回 POSCAR 内容和用于生成 POTCAR 的有序元素列表。 如果失败,返回详细的错误信息。 Args: cif_content (str): 包含晶体结构的 CIF 文件完整内容。 sort_structure (bool): 是否对原子进行排序以匹配 Pymatgen 的 POTCAR 约定。默认为 True。 """ structure = None last_exception = None # 忽略 Pymatgen 可能产生的警告,避免污染输出 with warnings.catch_warnings(): warnings.simplefilter("ignore") # --- 策略 1: 标准解析 --- try: ctx.debug("尝试策略 1: 标准 CIF 解析...") structure = Structure.from_str(cif_content, fmt="cif") ctx.info("策略 1 成功: 标准解析完成。") except Exception as e: ctx.warning(f"策略 1 失败: {e}") last_exception = e # --- 策略 2: 宽松解析 (忽略化合价检查) --- if structure is None: try: ctx.debug("尝试策略 2: 宽松解析 (不检查化合价)...") # 使用底层的 CifParser 并禁用化合价检查 parser = CifParser.from_string(cif_content, check_valence=False) structure = parser.get_structures(primitive=True)[0] ctx.info("策略 2 成功: 宽松解析完成。") except Exception as e: ctx.warning(f"策略 2 失败: {e}") last_exception = e # --- 策略 3: 使用原始坐标,不进行对称性处理 --- if structure is None: try: ctx.debug("尝试策略 3: 使用原始坐标 (primitive=False)...") parser = CifParser.from_string(cif_content) # 获取文件中的原始结构,而不是计算出的原胞 structure = parser.get_structures(primitive=False)[0] ctx.info("策略 3 成功: 已使用原始坐标。") except Exception as e: ctx.warning(f"策略 3 失败: {e}") last_exception = e # --- 如果所有策略都失败 --- if structure is None: error_message = ( "无法从提供的 CIF 内容中解析出晶体结构。所有解析策略均已失败。\n" f"最后遇到的错误是: {last_exception}\n" "建议: 请检查 CIF 文件格式是否严重损坏。AI 可以尝试重新生成 CIF,或直接请求用户提供 POSCAR 文件内容。" ) ctx.error(error_message) return {"status": "error", "message": error_message} # --- 成功后处理 --- try: # 排序结构以确保元素顺序与 pymatgen 的 POTCAR 生成逻辑一致 if sort_structure: structure = structure.get_sorted_structure() # 从结构中获取有序的元素列表 elements = [site.specie.symbol for site in structure.composition.elements] # 生成 POSCAR 内容 poscar_string_io = StringIO() # 使用 vasp5 格式,确保元素行存在 structure.to(fmt="poscar", file_obj=poscar_string_io) poscar_content = poscar_string_io.getvalue() return { "status": "success", "poscar_content": poscar_content, "elements": " ".join(elements) # 以空格分隔的字符串形式提供有序元素 } except Exception as e: # 这种情况很少见,但可能在结构后处理时发生 final_error = f"结构解析成功,但在生成POSCAR时出错: {e}" ctx.error(final_error) return {"status": "error", "message": final_error} @mcp.tool() async def test_vasp_run( ctx: Context[ServerSession, VaspContext], job_directory: str ) -> dict[str, Any]: """ 在指定目录中启动 VASP 的“精简模式”(--lite)以进行快速测试。 此模式会完整解析所有输入文件(INCAR, POSCAR等)并检查参数, 但不会开始实际的离子或电子步计算,通常在数秒内完成。 Args: job_directory (str): 包含所有 VASP 输入文件的远程沙箱子目录。 """ app_ctx = ctx.request_context.lifespan_context conn = app_ctx.ssh_connection try: # 安全地构建工作目录的绝对路径 workdir_abs = _safe_join_sandbox(app_ctx.sandbox_path, job_directory) await ctx.info(f"在 '{workdir_abs}' 中开始 VASP 输入文件验证 (精简模式)...") # 关键:在 VASP 执行命令后附加 --lite 标志 # 注意: mpirun [options] [args] # 我们需要将 --lite 附加到 vasp_std 后面 # 假设 app_ctx.mpi_run_cmd 是 "mpirun -np 4 .../vasp_std" command_with_lite = f"{app_ctx.mpi_run_cmd} --lite" # 构建完整的 shell 命令,以激活环境并执行 # 这里的 f-string 已被修正,以避免多行和嵌套问题 inner_command = ( f"cd {shell_quote(workdir_abs)}; " f"source {shell_quote(app_ctx.env_script)}; " f"{command_with_lite}" ) full_shell_command = f"bash -lc {shell_quote(inner_command)}" # 使用简单的 conn.run 等待命令完成。check=False 因为我们想自己处理非零退出 proc = await conn.run(full_shell_command, check=False) # 分析结果 stdout = proc.stdout or "" stderr = proc.stderr or "" # VASP 成功完成初始化并正常退出的标志通常是这条信息 success_indicator = "General timing and accounting informations for this job" if proc.exit_status == 0 and success_indicator in stdout: test_passed = True conclusion = "测试成功:VASP 输入文件有效,所有参数均被正确解析。" await ctx.info(conclusion) else: test_passed = False conclusion = "测试失败:VASP 报告了错误或未能正常完成初始化。请检查下面的 stderr 输出。" await ctx.warning(conclusion) return { "status": "completed", "test_passed": test_passed, "conclusion": conclusion, "exit_status": proc.exit_status, "stdout_preview": "\n".join(stdout.splitlines()[-20:]), # 只看最后20行,避免刷屏 "stderr": stderr } except Exception as e: msg = f"执行 test_vasp_run 工具时发生意外错误: {e}" await ctx.error(msg) return {"status": "error", "message": str(e)} # 确保这个 _safe_join_sandbox 辅助函数存在于 create_vasp_mcp 函数内部或可被访问 return mcp