Files
solidstate-tools/mcp/softBV_remake.py
koko c0b2ec5983 sofvBV_mcp重构v2
Embedding copy
2025-10-22 23:59:23 +08:00

1659 lines
60 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# softbv_mcp_refactored.py
#
# 这是 softBV 工具集的重构版本。
#
# - 核心框架: 保持了原有的 SSH 生命周期管理和远程环境激活机制 [6]。
# - 重构起点: 暂时移除了旧的 softbv_info, softbv_md, softbv_gen_cube 函数。
# - 新增工具: 包含一个新的、功能单一的 `softbv_print_cell` 工具,用于执行 `softBV.x --print-cell` [6]。
#
# 依赖:
# pip install "mcp[cli]" asyncssh pydantic
#
# 用法:
# 1. 直接运行进行测试: `python softbv_mcp_refactored.py`
# 2. 或集成到 Starlette (main.py):
# from softbv_mcp_refactored import create_softbv_mcp
# softbv_mcp = create_softbv_mcp()
# Mount("/softbv-new", app=softbv_mcp.streamable_http_app())
import os
import posixpath
import asyncio
from dataclasses import dataclass
from typing import Any
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import asyncssh
from pydantic import BaseModel, Field
from mcp.server.fastmcp import FastMCP, Context
from mcp.server.session import ServerSession
import re # 确保导入 re 模块
# ========= 1. 配置与常量 =========
# 这些配置与您之前的脚本保持一致,可以从环境变量或硬编码中获取。
# 从 lifespan.py 或其他配置文件导入,如果它们存在的话
try:
from lifespan import REMOTE_HOST, REMOTE_USER, PRIVATE_KEY_PATH
except ImportError:
# 如果找不到,则使用默认值
REMOTE_HOST = os.getenv("REMOTE_HOST", "202.121.182.208")
REMOTE_USER = os.getenv("REMOTE_USER", "koko125")
PRIVATE_KEY_PATH = os.getenv("PRIVATE_KEY_PATH", "D:/tool/tool/id_rsa.txt")
# softBV 环境相关的固定路径
SOFTBV_PROFILE = os.getenv("SOFTBV_PROFILE", "/cluster/home/koko125/script/softBV.sh")
SOFTBV_BIN = os.getenv("SOFTBV_BIN", "/cluster/home/koko125/tool/softBV-GUI_linux/bin/softBV.x")
DEFAULT_WORKDIR = os.getenv("DEFAULT_WORKDIR", f"/cluster/home/{REMOTE_USER}/sandbox")
class PPTUnitaryItem(BaseModel):
"""PROPERTY_UNITARY 表中的一行数据"""
type: str
an: int = Field(description="Atomic Number")
softness: float
rc: float = Field(description="Cutoff Radius")
q: float = Field(description="Charge")
class PPTBinaryItem(BaseModel):
"""PROPERTY_BINARY 表中的一行数据"""
cation: str
anion: str
D_0: float
R_min: float
b: float
R_0: float
R_cutoff: float
class PPTResult(BaseModel):
"""'--print-ppt' 命令的完整结构化解析结果"""
screening_factor: float | None = Field(description="最终计算出的 screening factor")
unitary_properties: list[PPTUnitaryItem] = Field(description="一元属性列表")
binary_properties: list[PPTBinaryItem] = Field(description="二元属性列表")
raw_output: str = Field(description="命令的完整原始标准输出")
class GenCubeDefaultArgs(BaseModel):
"""用于'--gen-cube'的默认参数模式"""
input_cif: str = Field(description="远程 CIF 文件路径")
type: str = Field(description="导电离子类型 (例如 'Li')")
os: int = Field(description="导电离子氧化态 (例如 1)")
class GenCubeFullArgs(BaseModel):
"""用于'--gen-cube'的全参数模式"""
input_cif: str = Field(description="远程 CIF 文件路径")
type: str = Field(description="导电离子类型")
os: int = Field(description="导电离子氧化态")
sf: float = Field(description="自定义 screening factor (非正值则自动计算)")
resolution: float = Field(description="体素分辨率")
ignore_conducting_ion: bool = Field(description="是否在计算中忽略导电离子")
periodic: bool = Field(description="是否使用周期性边界条件")
output_name: str | None = Field(default=None, description="输出文件的前缀 (可选)")
class GenCubeResult(BaseModel):
"""'--gen-cube' 命令的执行结果"""
command_used: str
working_directory: str
exit_status: int
log_file: str = Field(description="远程服务器上的完整日志文件路径")
output_head: str = Field(description="日志文件的前40行用于快速预览")
output_tail: str = Field(description="日志文件的后40行用于查看最终结果")
new_files: list[str] = Field(description="执行后新生成的文件列表")
elapsed_seconds: int = Field(description="任务总耗时(秒)")
class BVSumItem(BaseModel):
"""每个离子的键价和 (Bond Valence Sum)"""
name: str = Field(description="离子名称")
type: str = Field(description="离子类型")
occ: float = Field(description="占有率 (occupancy)")
bv_sum: float = Field(description="键价和 (bond valence sum)")
class BVPairItem(BaseModel):
"""离子对之间的键价"""
name1: str
type1: str
occ1: float
name2: str
type2: str
occ2: float
bv: float = Field(description="键价 (bond valence)")
class CalBVResult(BaseModel):
"""
'--cal-bv' 命令的优化版结构化解析结果。
为了避免返回内容过大,只返回关键摘要和数据预览。
"""
global_instability_index: float | None = Field(None, description="全局不稳定性指数 (GII)")
suggested_stability: str | None = Field(None, description="建议的稳定性 (例如 'stable')")
# 摘要信息
total_bv_sums: int = Field(0, description="键价和 (BVS) 条目总数")
total_bv_pairs: int = Field(0, description="键价对 (BV pairs) 条目总数")
# 数据预览 (例如前 20 条)
bv_sums_preview: list[BVSumItem] = Field(description="键价和列表的预览")
bv_pairs_preview: list[BVPairItem] = Field(description="键价对列表的预览")
# 原始输出仍然保留,以备 AI 需要时自行解析
raw_output_head: str = Field(description="命令原始标准输出的前 50 行")
raw_output_tail: str = Field(description="命令原始标准输出的后 50 行")
class CubeAtomInfo(BaseModel):
"""从 .cube 文件中解析出的单个原子信息"""
index: int
name: str | None = None
type: str
os: int = Field(description="Oxidation State")
x_f: tuple[float, float, float] = Field(description="Fractional coordinates")
occ: float = Field(description="Occupancy")
class CubeCellParams(BaseModel):
"""晶格常数"""
a: float
b: float
c: float
class CubeCellAngles(BaseModel):
"""晶格角度"""
alpha: float
beta: float
gamma: float
class PrintCubeResult(BaseModel):
"""'--print-cube' 命令的完整结构化解析结果"""
name: str = Field(description="Cube 文件名")
cell_vector_matrix: list[list[float]] = Field(description="晶胞矢量矩阵")
cell_parameters: CubeCellParams
cell_angles: CubeCellAngles
total_atoms: int
atoms: list[CubeAtomInfo] = Field(description="原子信息列表")
cube_size: tuple[int, int, int]
cube_cell_vector: list[list[float]]
cube_voxel_vector: list[list[float]] = Field(description="Voxel vector in Angstroms")
raw_output: str = Field(description="命令的完整原始标准输出")
class CalGIIResult(BaseModel):
"""'--cal-gii' 命令的结构化解析结果"""
global_instability_index: float | None = Field(None, description="全局不稳定性指数 (GII)")
suggested_stability: str | None = Field(None, description="建议的稳定性 (例如 'stable')")
raw_output: str = Field(description="命令的完整原始标准输出")
class CoordinationItem(BaseModel):
"""'--cal-cn' 输出表格中的一行"""
index: int
shell: int
coordinated_type: str
r: float = Field(description="距离 (Å)")
occ: float = Field(description="占有率")
n: float = Field(description="累计配位数")
rds: float
class CalCNResult(BaseModel):
"""'--cal-cn' 命令的结构化解析结果"""
center_atom: str | None = Field(None, description="中心原子名称")
coordination_number: float | None = Field(None, description="总配位数")
coordination_details: list[CoordinationItem] = Field(description="配位详细信息列表")
raw_output: str = Field(description="命令的完整原始标准输出")
class CalSymResult(BaseModel):
"""'--cal-sym' 命令的结构化解析结果"""
determined_space_group: str | None = Field(None, description="最终确定的空间群")
fits_to_symmetries: list[str] = Field(description="计算过程中拟合到的所有对称性列表")
raw_output: str = Field(description="命令的完整原始标准输出")
class CalTotEnResult(BaseModel):
"""'--cal-tot-en' 命令的结构化解析结果"""
total_energy_eV: float | None = Field(None, description="计算出的总能量 (eV)")
screening_factor_used: float | None = Field(None, description="计算中使用的 screening factor (sf)")
raw_output: str = Field(description="命令的完整原始标准输出")
class AnalyzePathwayArgs(BaseModel):
"""'--gh' (pathway analysis) 命令的输入参数"""
input_cif: str = Field(description="用于标记位点的远程 CIF 文件路径")
input_cube: str = Field(description="包含势能数据的远程 Cube 文件路径")
type: str = Field(description="导电离子类型 (例如 'Li')")
os: int = Field(description="导电离子氧化态 (例如 1)")
barrier_max: float | None = Field(None, description="最大势垒,低于此值的路径才会被显示")
periodic: bool = Field(True, description="是否将 cube 文件视为周期性的")
class PercolationThresholds(BaseModel):
"""'--gh' 命令输出中解析的渗流阈值"""
e_total: float | None = Field(None, alias="e", description="总网络的阈值")
e_1D: float | None = Field(None, description="一维传导网络的阈值")
e_2D: float | None = Field(None, description="二维传导网络的阈值")
e_3D: float | None = Field(None, description="三维传导网络的阈值")
e_loop: float | None = Field(None, description="循环路径的阈值")
class AnalyzePathwayResult(BaseModel):
"""'--gh' 命令的完整结构化解析结果"""
command_used: str
working_directory: str
exit_status: int
thresholds: PercolationThresholds | None = Field(None, description="计算出的渗流阈值")
new_files: list[str] = Field(description="执行后新生成的文件列表 (例如 *.gh.cif)")
raw_output: str = Field(description="命令的完整原始标准输出")
class MDDefaultArgs(BaseModel):
"""用于 '--md' 的默认参数模式"""
input_cif: str = Field(description="远程 CIF 文件路径")
type: str = Field(description="导电离子类型 (例如 'Li')")
os: int = Field(description="导电离子氧化态 (例如 1)")
class MDFullArgs(BaseModel):
"""用于 '--md' 的全参数模式"""
input_cif: str = Field(description="远程 CIF 文件路径")
type: str = Field(description="导电离子类型")
os: int = Field(description="导电离子氧化态")
sf: float = Field(description="自定义 screening factor")
temperature: float = Field(description="温度 (K)")
t_end: float = Field(description="生产时间 (ps)")
t_equil: float = Field(description="平衡时间 (ps)")
dt: float = Field(description="时间步长 (ps)")
t_log: float = Field(description="采样间隔 (ps)")
class MDResultProperties(BaseModel):
"""'--md' 输出中解析出的最终物理属性"""
displacement: float | None = None
diffusivity: float | None = None
mobility: float | None = None
conductivity: float | None = None
class MDResult(BaseModel):
"""'--md' 命令的完整结构化执行结果"""
command_used: str
working_directory: str
exit_status: int
final_properties: MDResultProperties | None = Field(None, description="计算出的最终电导率等属性")
log_file: str = Field(description="远程服务器上的完整日志文件路径")
output_head: str = Field(description="日志文件的前40行")
output_tail: str = Field(description="日志文件的后40行包含最终结果")
new_files: list[str] = Field(description="执行后新生成的文件列表")
elapsed_seconds: int = Field(description="任务总耗时(秒)")
class KMCDefaultArgs(BaseModel):
"""用于 '--kmc' 的默认参数模式"""
input_cif: str = Field(description="远程 CIF 文件路径")
input_cube: str = Field(description="远程 Cube 文件路径")
type: str = Field(description="导电离子类型 (例如 'Li')")
os: int = Field(description="导电离子氧化态 (例如 1)")
class KMCFullArgs(BaseModel):
"""用于 '--kmc' 的全参数模式"""
input_cif: str = Field(description="远程 CIF 文件路径")
input_cube: str = Field(description="远程 Cube 文件路径")
type: str = Field(description="导电离子类型")
os: int = Field(description="导电离子氧化态")
supercell: tuple[int, int, int] | None = Field(None, description="超胞维度 (s0, s1, s2)")
sf: float | None = Field(None, description="自定义 screening factor")
temperature: float | None = Field(None, description="温度 (K)")
t_limit: float | None = Field(None, description="时间限制 (ps)")
step_limit: int | None = Field(None, description="步数限制")
step_log: int | None = Field(None, description="日志记录步数间隔")
cutoff: float | None = Field(None, description="库仑排斥截断半径")
class KMCResultVector(BaseModel):
"""表示一个标量和一个矢量分量 (total, (x, y, z))"""
total: float
vector: tuple[float, float, float]
class KMCSiteOccupancy(BaseModel):
"""KMC 结果中的单个位点占据信息"""
site_name: str
occupancy: float
multiplicity: int
class KMCResultProperties(BaseModel):
"""'--kmc' 输出中解析出的最终物理属性"""
temperature: float | None = None
displacement: KMCResultVector | None = None
diffusivity: KMCResultVector | None = None
mobility: KMCResultVector | None = None
conductivity: KMCResultVector | None = None
site_occupancy_summary: list[KMCSiteOccupancy] = Field(default_factory=list)
class KMCResult(BaseModel):
"""'--kmc' 命令的完整结构化执行结果"""
command_used: str
working_directory: str
exit_status: int
final_properties: KMCResultProperties | None = Field(None, description="计算出的最终 KMC 属性")
log_file: str = Field(description="远程服务器上的完整日志文件路径")
output_head: str = Field(description="日志文件的前部内容")
output_tail: str = Field(description="日志文件的尾部内容,包含最终结果")
new_files: list[str] = Field(description="执行后新生成的文件列表")
elapsed_seconds: int = Field(description="任务总耗时(秒)")
# ========= 2. 辅助函数 =========
def shell_quote(arg: str) -> str:
"""安全地将字符串作为单个 shell 参数 (POSIX)。"""
return "'" + str(arg).replace("'", "'\"'\"'") + "'"
async def run_in_softbv_env(
conn: asyncssh.SSHClientConnection,
profile_path: str,
cmd: str,
cwd: str | None = None,
check: bool = True,
) -> asyncssh.SSHCompletedProcess:
"""
在远端 bash 会话中激活 softBV 环境并执行命令。
"""
parts = []
if cwd:
parts.append(f"cd {shell_quote(cwd)}")
# 激活环境脚本
parts.append(f"source {shell_quote(profile_path)}")
# 执行主命令
parts.append(cmd)
composite_cmd = "; ".join(parts)
# 使用 bash -lc 确保登录环境和命令可以被正确解析
full_command = f"bash -lc {shell_quote(composite_cmd)}"
return await conn.run(full_command, check=check)
def _parse_ppt_output(raw_text: str) -> PPTResult:
"""
'--print-ppt' 的原始 stdout 解析为结构化的 PPTResult 模型。
"""
lines = raw_text.splitlines()
screening_factor = None
unitary_items = []
binary_items = []
# 查找 screening factor
for line in reversed(lines):
if "screening factor =" in line:
try:
screening_factor = float(line.split("=")[-1].strip())
break
except (ValueError, IndexError):
continue
# 解析表格
in_unitary = False
in_binary = False
for line in lines:
if "PPT: | type | an |" in line:
in_unitary = True
in_binary = False
continue
if "PPT: | cation | anion |" in line:
in_unitary = False
in_binary = True
continue
if "PPT:--" in line or "PPT:==" in line:
continue
parts = [p.strip() for p in line.split("|") if p.strip()]
if in_unitary and len(parts) == 5:
try:
unitary_items.append(PPTUnitaryItem(
type=parts[0],
an=int(parts[1]),
softness=float(parts[2]),
rc=float(parts[3]),
q=float(parts[4])
))
except (ValueError, IndexError):
continue
elif in_binary and len(parts) == 7:
try:
binary_items.append(PPTBinaryItem(
cation=parts[0],
anion=parts[1],
D_0=float(parts[2]),
R_min=float(parts[3]),
b=float(parts[4]),
R_0=float(parts[5]),
R_cutoff=float(parts[6])
))
except (ValueError, IndexError):
continue
return PPTResult(
screening_factor=screening_factor,
unitary_properties=unitary_items,
binary_properties=binary_items,
raw_output=raw_text
)
async def _run_in_softbv_env_stream(
conn: asyncssh.SSHClientConnection,
profile_path: str,
cmd: str,
cwd: str | None = None,
) -> asyncssh.SSHClientProcess:
"""以非阻塞方式,在激活环境中启动一个命令,并返回进程句柄。"""
parts = []
if cwd:
parts.append(f"cd {shell_quote(cwd)}")
# 将 source 的输出重定向,避免污染主命令的日志
parts.append(f"source {shell_quote(profile_path)} >/dev/null 2>&1 || true")
parts.append(cmd)
composite_cmd = "; ".join(parts)
full_command = f"bash -lc {shell_quote(composite_cmd)}"
proc = await conn.create_process(full_command)
return proc
async def _listdir_safe(conn: asyncssh.SSHClientConnection, path: str) -> list[str]:
"""安全地列出目录内容,失败时返回空列表。"""
try:
async with conn.start_sftp_client() as sftp:
return await sftp.listdir(path)
except Exception:
return []
async def _stat_size_safe(conn: asyncssh.SSHClientConnection, path: str) -> int | None:
"""安全地获取文件大小,失败时返回 None。"""
try:
async with conn.start_sftp_client() as sftp:
attrs = await sftp.stat(path)
return int(attrs.size or 0)
except Exception:
return None
def _parse_cal_bv_output(raw_text: str) -> CalBVResult:
"""
'--cal-bv' 的原始 stdout 解析为优化的、包含摘要和预览的 CalBVResult 模型。
"""
lines = raw_text.splitlines()
gii = None
stability = None
bv_sums = []
bv_pairs = []
# ...(GII 和表格解析逻辑保持不变)...
in_sum_section = False
in_pair_section = False
for line in lines:
if line.startswith("GII: Global instability index ="):
try:
gii = float(line.split("=")[-1].strip())
except (ValueError, IndexError):
pass
elif line.startswith("GII: suggested stability:"):
try:
stability = line.split(":", 1)[-1].strip().rstrip('.')
except IndexError:
pass
if line.startswith("BV: name type occ"):
in_sum_section = True
in_pair_section = False
continue
if line.startswith("BV: nam1 typ1 occ1"):
in_sum_section = False
in_pair_section = True
continue
if line.startswith("BV:="):
continue
if not line.startswith("BV:"):
in_sum_section = False
in_pair_section = False
if in_sum_section:
match = re.match(r"BV:\s+(\S+)\s+(\S+)\s+([\d.]+)\s+([-\d.]+)", line)
if match:
try:
bv_sums.append(BVSumItem(
name=match.group(1), type=match.group(2),
occ=float(match.group(3)), bv_sum=float(match.group(4))
))
except (ValueError, IndexError):
continue
elif in_pair_section:
try:
parts = line[3:].strip().split()
if len(parts) == 7:
bv_pairs.append(BVPairItem(
name1=parts[0], type1=parts[1], occ1=float(parts[2]),
name2=parts[3], type2=parts[4], occ2=float(parts[5]),
bv=float(parts[6])
))
except (ValueError, IndexError):
continue
# --- 关键修改:返回摘要和预览,而不是完整列表 ---
return CalBVResult(
global_instability_index=gii,
suggested_stability=stability,
total_bv_sums=len(bv_sums),
total_bv_pairs=len(bv_pairs),
bv_sums_preview=bv_sums[:20], # 只返回前 20 条作为预览
bv_pairs_preview=bv_pairs[:20], # 只返回前 20 条作为预览
raw_output_head="\n".join(lines[:50]),
raw_output_tail="\n".join(lines[-50:])
)
def _parse_print_cube_output(raw_text: str) -> PrintCubeResult:
"""
'--print-cube' 的原始 stdout 解析为结构化的 PrintCubeResult 模型。
"""
lines = raw_text.splitlines()
# Helper to parse vector blocks
def parse_vector_block(start_line_idx: int) -> list[list[float]]:
matrix = []
for i in range(3):
line = lines[start_line_idx + i]
parts = [float(p) for p in line.strip().split()]
matrix.append(parts)
return matrix
# Find key lines and parse data
name = re.search(r"CELL: name: (.*)", raw_text).group(1).strip()
total_atoms = int(re.search(r"CELL: total atom: (\d+)", raw_text).group(1))
params_line = re.search(r"CELL: parameters:\s+([\d.\s]+)", raw_text).group(1)
a, b, c = map(float, params_line.split())
cell_params = CubeCellParams(a=a, b=b, c=c)
angles_line = re.search(r"CELL: angles:\s+([\d.\s]+)", raw_text).group(1)
alpha, beta, gamma = map(float, angles_line.split())
cell_angles = CubeCellAngles(alpha=alpha, beta=beta, gamma=gamma)
cube_size_line = re.search(r"CUBE: cube size = (\d+) \* (\d+) \* (\d+)", raw_text).group(1, 2, 3)
cube_size = tuple(map(int, cube_size_line))
atoms = []
cell_matrix, cube_cell_vec, cube_voxel_vec = [], [], []
for i, line in enumerate(lines):
if "CELL: vector matrix:" in line:
cell_matrix = parse_vector_block(i + 1)
elif "CUBE: cell vector (A):" in line:
cube_cell_vec = parse_vector_block(i + 1)
elif "CUBE: votex vector (A):" in line: # Note: 'votex' is a typo in the original output
cube_voxel_vec = parse_vector_block(i + 1)
elif "ATOM:" in line:
atom_match = re.search(
r"\[\s*(\d+)\]ATOM: name=\s*([^;]*);type=([^;]*);os=\s*(-?\d+);x_f=\(([^)]*)\);occ=([\d.]+)", line)
if atom_match:
idx, atom_name, atom_type, os_val, xf_str, occ_val = atom_match.groups()
xf_coords = tuple(map(float, xf_str.split(',')))
atoms.append(CubeAtomInfo(
index=int(idx),
name=atom_name.strip() or None,
type=atom_type.strip(),
os=int(os_val),
x_f=xf_coords,
occ=float(occ_val)
))
return PrintCubeResult(
name=name,
cell_vector_matrix=cell_matrix,
cell_parameters=cell_params,
cell_angles=cell_angles,
total_atoms=total_atoms,
atoms=atoms,
cube_size=cube_size,
cube_cell_vector=cube_cell_vec,
cube_voxel_vector=cube_voxel_vec,
raw_output=raw_text
)
def _parse_cal_gii_output(raw_text: str) -> CalGIIResult:
"""解析 '--cal-gii' 的输出。"""
gii, stability = None, None
for line in raw_text.splitlines():
if line.startswith("GII: Global instability index ="):
try:
gii = float(line.split("=")[-1].strip())
except (ValueError, IndexError):
pass
elif line.startswith("GII: suggested stability:"):
try:
stability = line.split(":", 1)[-1].strip().rstrip('.')
except IndexError:
pass
return CalGIIResult(global_instability_index=gii, suggested_stability=stability, raw_output=raw_text)
def _parse_cal_cn_output(raw_text: str) -> CalCNResult:
"""解析 '--cal-cn' 的输出。"""
center_atom, cn = None, None
details = []
in_table = False
for line in raw_text.splitlines():
if line.startswith("CN: Center atom:"):
center_atom = line.split(":", 1)[-1].strip()
elif line.startswith("CN: Coordination Number:"):
try:
cn = float(line.split(":", 1)[-1].strip())
except (ValueError, IndexError):
pass
elif "CN: Index Shell" in line:
in_table = True
continue
if in_table and line.startswith("CN:"):
try:
parts = line[3:].strip().split()
if len(parts) == 7:
details.append(CoordinationItem(
index=int(parts[0]), shell=int(parts[1]), coordinated_type=parts[2],
r=float(parts[3]), occ=float(parts[4]), n=float(parts[5]), rds=float(parts[6])
))
except (ValueError, IndexError):
pass
return CalCNResult(center_atom=center_atom, coordination_number=cn, coordination_details=details,
raw_output=raw_text)
def _parse_cal_sym_output(raw_text: str) -> CalSymResult:
"""解析 '--cal-sym' 的输出。"""
determined_sg = None
fits = []
for line in raw_text.splitlines():
if line.startswith("Message: structure fits to symmetry"):
fits.append(line.split("symmetry")[-1].strip().rstrip('.'))
elif line.startswith("Message: determined space group:"):
determined_sg = line.split(":", 1)[-1].strip()
return CalSymResult(determined_space_group=determined_sg, fits_to_symmetries=fits, raw_output=raw_text)
def _parse_cal_tot_en_output(raw_text: str) -> CalTotEnResult:
"""解析 '--cal-tot-en' 的输出。"""
total_energy, sf = None, None
for line in raw_text.splitlines():
if "total energy =" in line:
try:
# 提取能量值
energy_str = line.split("=")[-1].strip().split()[0]
total_energy = float(energy_str)
except (ValueError, IndexError):
pass
elif line.startswith("Message: sf="):
try:
# 提取使用的 sf 值
sf = float(line.split("=")[-1].strip())
except (ValueError, IndexError):
pass
return CalTotEnResult(
total_energy_eV=total_energy,
screening_factor_used=sf,
raw_output=raw_text
)
def _parse_gh_output(raw_text: str) -> PercolationThresholds | None:
"""'--gh' 的 stdout 中解析 SOFTBV_GRAPH_NETWORK 行。"""
thresholds = {}
# 正则表达式用于匹配 "e_type = value" 格式
pattern = re.compile(r"SOFTBV_GRAPH_NETWORK: (e(?:_1D|_2D|_3D|_loop)?)\s*=\s*([-\d.]+)")
for line in raw_text.splitlines():
match = pattern.search(line)
if match:
key = match.group(1)
value = float(match.group(2))
# 为了与 Pydantic 模型中的 alias 对应, 'e' 保持不变
if key == "e":
thresholds["e"] = value
else:
thresholds[key] = value
if not thresholds:
return None
return PercolationThresholds.model_validate(thresholds)
def _parse_md_output(raw_text: str) -> MDResultProperties:
"""'--md' 的 stdout 中解析最终的物理属性。"""
properties = {}
# 正则表达式匹配 "MD: key = value" 格式
pattern = re.compile(r"MD: (displacement|diffusivity|mobility|conductivity)\s*=\s*([-\d.eE]+)")
for line in raw_text.splitlines():
match = pattern.search(line)
if match:
key = match.group(1)
value = float(match.group(2))
properties[key] = value
return MDResultProperties.model_validate(properties)
# 在 softbv_mcp_refactored.py 的辅助函数区域添加
def _parse_kmc_output(raw_text: str) -> KMCResultProperties:
"""'--kmc' 的 stdout 中解析最终的物理属性和位点占据信息。"""
properties = {"site_occupancy_summary": []}
in_occupancy_summary = False
# 匹配 "key = total, (x, y, z)" 或 "key = total" 的行
vector_pattern = re.compile(
r"KMC: (temperature|displacement|diffusivity|mobility|conductivity)\s*=\s*([-\d.eE]+)(?:,\s*\(([-\d.eE]+),\s*([-\d.eE]+),\s*([-\d.eE]+)\))?"
)
# 匹配位点占据信息的行
occupancy_pattern = re.compile(
r"KMC:\s*\[\s*([^\]]+)\]\s+([-\d.]+)\s+\(multiplicity\s*=\s*(\d+)\)"
)
for line in raw_text.splitlines():
if "KMC: summary of site-averaged occupancy:" in line:
in_occupancy_summary = True
continue
vec_match = vector_pattern.search(line)
if vec_match:
key, total_str, x_str, y_str, z_str = vec_match.groups()
total = float(total_str)
if key == 'temperature':
properties[key] = total
elif x_str is not None:
properties[key] = {
"total": total,
"vector": (float(x_str), float(y_str), float(z_str))
}
continue
if in_occupancy_summary:
occ_match = occupancy_pattern.search(line)
if occ_match:
site, occ, mult = occ_match.groups()
properties["site_occupancy_summary"].append({
"site_name": site.strip(),
"occupancy": float(occ),
"multiplicity": int(mult)
})
return KMCResultProperties.model_validate(properties)
# ========= 3. 生命周期管理 =========
@dataclass
class SoftBVContext:
"""在服务器生命周期内共享的上下文对象。"""
ssh_connection: asyncssh.SSHClientConnection
workdir: str
profile: str
bin_path: str
@asynccontextmanager
async def softbv_lifespan(_server: FastMCP) -> AsyncIterator[SoftBVContext]:
"""
FastMCP 生命周期管理器:在服务器启动时建立 SSH 连接,在关闭时安全断开。
这个函数创建的 SoftBVContext 将被注入到所有工具中 [1, 6]。
"""
conn: asyncssh.SSHClientConnection | None = None
try:
await asyncio.sleep(0) # 确保事件循环已启动
print(f"正在连接到 {REMOTE_USER}@{REMOTE_HOST}...")
conn = await asyncssh.connect(
REMOTE_HOST,
username=REMOTE_USER,
client_keys=[PRIVATE_KEY_PATH],
known_hosts=None,
connect_timeout=15,
)
print("SSH 连接成功!")
# `yield` 将上下文提供给 MCP 服务器
yield SoftBVContext(
ssh_connection=conn,
workdir=DEFAULT_WORKDIR,
profile=SOFTBV_PROFILE,
bin_path=SOFTBV_BIN,
)
finally:
# 服务器关闭时,清理资源
if conn:
conn.close()
await conn.wait_closed()
print("SSH 连接已关闭。")
# ========= 4. MCP 服务器实例 =========
mcp = FastMCP(
name="softBV Tools (Refactored)",
instructions="用于执行 softBV 相关计算的工具集(重构版)。",
lifespan=softbv_lifespan,
streamable_http_path="/", # 方便挂载到 Starlette [1]
)
# ========= 5. 工具定义 =========
#
# 原有的 softbv_info, softbv_md, softbv_gen_cube 函数已移除,
# 等待您完成重构后再添加。
#
@mcp.tool()
async def softbv_print_cell(
cif_path: str,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> dict[str, Any]:
"""
执行 'softBV.x --print-cell' 命令,打印并返回晶胞信息。
Args:
cif_path (str): 远程服务器上的 CIF 文件路径(可以是相对或绝对路径)。
cwd (str | None): 远程工作目录。如果未提供,则使用默认沙箱目录。
ctx: MCP 上下文,由框架自动注入。
Returns:
dict: 一个包含命令执行结果的字典,包括退出码、标准输出和标准错误。
"""
if ctx is None:
raise ValueError("Context is required for this operation.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
# 如果 cif_path 不是绝对路径,则基于工作目录构建完整路径
input_abs_path = cif_path
if not input_abs_path.startswith("/"):
input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path))
# 构造命令
cmd = f"{shell_quote(app_ctx.bin_path)} --print-cell {shell_quote(input_abs_path)}"
await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})")
# 在激活的环境中运行命令,不要求必须成功 (check=False)
proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False)
if proc.exit_status != 0:
await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}")
# 返回结构化输出,便于 AI 或客户端解析 [1, 6]
return {
"command_used": cmd,
"working_directory": workdir,
"exit_status": proc.exit_status,
"stdout": proc.stdout,
"stderr": proc.stderr,
}
@mcp.tool()
async def softbv_print_ppt(
cif_path: str,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> PPTResult:
"""
执行 'softBV.x --print-ppt',计算并返回配对势参数 (Pair-Potential Terms)。
Args:
cif_path (str): 远程服务器上的 CIF 文件路径(可以是相对或绝对路径)。
cwd (str | None): 远程工作目录。如果未提供,则使用默认沙箱目录。
ctx: MCP 上下文,由框架自动注入。
Returns:
PPTResult: 一个包含解析后的配对势参数和原始输出的结构化对象。
"""
if ctx is None:
raise ValueError("Context is required for this operation.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
# 构建路径和命令
input_abs_path = cif_path
if not input_abs_path.startswith("/"):
input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path))
cmd = f"{shell_quote(app_ctx.bin_path)} --print-ppt {shell_quote(input_abs_path)}"
await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})")
# 执行命令
proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False)
if proc.exit_status != 0:
# 即使失败,也尝试解析可能的错误信息
await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}")
# 在失败时返回一个包含原始输出的空结果AI 可以从中读取错误
return PPTResult(
screening_factor=None,
unitary_properties=[],
binary_properties=[],
raw_output=f"Exit Status: {proc.exit_status}\n\nSTDOUT:\n{proc.stdout}\n\nSTDERR:\n{proc.stderr}"
)
# 解析成功的输出并返回结构化数据
parsed_result = _parse_ppt_output(proc.stdout)
return parsed_result
# 在 softbv_mcp_refactored.py 的工具定义区域添加
@mcp.tool(name="softbv_gen_cube")
async def softbv_gen_cube(
args: GenCubeDefaultArgs | GenCubeFullArgs,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> GenCubeResult:
"""
执行 'softBV.x --gen-cube' 以计算势函数。这是一个耗时任务。
支持两种调用模式:
1. 默认模式 (GenCubeDefaultArgs): 仅提供 CIF、离子类型和氧化态其他参数使用程序默认值。
2. 全参数模式 (GenCubeFullArgs): 提供所有可配置参数。
该工具会定期报告心跳以防超时,并在任务完成后返回详细的结构化结果。
"""
if ctx is None:
raise ValueError("Context is required for this operation.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
# --- 1. 构造命令 ---
input_abs_path = args.input_cif
if not input_abs_path.startswith("/"):
input_abs_path = posixpath.normpath(posixpath.join(workdir, args.input_cif))
cmd_parts = [
shell_quote(app_ctx.bin_path),
shell_quote("--gen-cube"),
shell_quote(input_abs_path),
shell_quote(args.type),
shell_quote(str(args.os)),
]
# 根据传入的参数类型,决定是否添加额外参数
if isinstance(args, GenCubeFullArgs):
cmd_parts.extend([
shell_quote(str(args.sf)),
shell_quote(str(args.resolution)),
])
# 处理布尔标志
if args.ignore_conducting_ion:
cmd_parts.append(shell_quote("t")) # 't' for true
else:
cmd_parts.append(shell_quote("f")) # 'f' for false
if args.periodic:
cmd_parts.append(shell_quote("t"))
else:
cmd_parts.append(shell_quote("f"))
if args.output_name:
cmd_parts.append(shell_quote(args.output_name))
# --- 2. 准备长时任务 ---
import time
log_name = f"softbv_gencube_{int(time.time())}.log"
log_path = posixpath.join(workdir, log_name)
# 将标准输出和错误都重定向到日志文件
final_cmd = " ".join(cmd_parts) + f" > {shell_quote(log_path)} 2>&1"
await ctx.info(f"启动长任务: {final_cmd} (工作目录: {workdir})")
files_before = await _listdir_safe(conn, workdir)
proc = await _run_in_softbv_env_stream(conn, app_ctx.profile, cmd=final_cmd, cwd=workdir)
start_time = time.monotonic()
# --- 3. 心跳循环,防止超时 ---
try:
while proc.exit_status is None:
# 每10秒报告一次进度这本身就是心跳 [1, 6]
await asyncio.sleep(10)
elapsed = int(time.monotonic() - start_time)
log_size = await _stat_size_safe(conn, log_path)
await ctx.report_progress(
progress=min(elapsed / (30 * 60), 0.99), # 以30分钟为基准的近似进度
message=f"计算进行中... 已用时 {elapsed} 秒, 日志大小: {log_size or 0} 字节。"
)
finally:
# 确保进程结束
await proc.wait()
# --- 4. 收集并返回结果 ---
elapsed_seconds = int(time.monotonic() - start_time)
files_after = await _listdir_safe(conn, workdir)
new_files = sorted(set(files_after) - set(files_before))
# 读取日志文件的头和尾
log_head, log_tail = "", ""
try:
async with conn.start_sftp_client() as sftp:
async with sftp.open(log_path, "r", encoding="utf-8", errors="replace") as f:
lines = await f.readlines()
log_head = "".join(lines[:40])
log_tail = "".join(lines[-40:])
except Exception as e:
await ctx.warning(f"读取日志文件 '{log_path}' 失败: {e}")
result = GenCubeResult(
command_used=final_cmd,
working_directory=workdir,
exit_status=proc.exit_status,
log_file=log_path,
output_head=log_head,
output_tail=log_tail,
new_files=new_files,
elapsed_seconds=elapsed_seconds,
)
if result.exit_status == 0:
await ctx.info(f"gen-cube 任务成功完成, 耗时 {elapsed_seconds} 秒。")
else:
await ctx.error(f"gen-cube 任务失败, 退出码: {result.exit_status}。请检查日志: {log_path}")
return result
# 在 softbv_mcp_refactored.py 中,紧跟在 softbv_gen_cube 之后添加
# 在 softbv_mcp_refactored.py 中,找到并替换 softbv_calculate_bv 函数
@mcp.tool()
async def softbv_calculate_bv(
cif_path: str,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> CalBVResult:
"""
执行 'softBV.x --cal-bv',计算键价和。
返回一个包含关键摘要和数据预览的结构化对象,以避免内容过长。
"""
if ctx is None:
raise ValueError("Context is required for this operation.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
input_abs_path = cif_path
if not input_abs_path.startswith("/"):
input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path))
cmd = f"{shell_quote(app_ctx.bin_path)} --cal-bv {shell_quote(input_abs_path)}"
await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})")
proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False)
if proc.exit_status != 0:
await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}")
# --- 关键修改:失败时返回与新模型匹配的空结构 ---
raw_error = f"Exit Status: {proc.exit_status}\n\nSTDOUT:\n{proc.stdout}\n\nSTDERR:\n{proc.stderr}"
return CalBVResult(
bv_sums_preview=[],
bv_pairs_preview=[],
raw_output_head=raw_error[:1000], # 截断以防错误信息也过长
raw_output_tail=""
)
# 解析成功的输出并返回包含摘要和预览的结构化数据
parsed_result = _parse_cal_bv_output(proc.stdout)
return parsed_result
@mcp.tool()
async def softbv_print_cube(
cube_path: str,
periodic: bool = True,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> PrintCubeResult:
"""
执行 'softBV.x --print-cube',读取并解析一个 .cube 文件。
Args:
cube_path (str): 远程服务器上的 .cube 文件路径。
periodic (bool): 是否将 cube 视为周期性的。默认为 True。
cwd (str | None): 远程工作目录。
ctx: MCP 上下文,由框架自动注入。
Returns:
PrintCubeResult: 一个包含从 .cube 文件中解析出的详细信息的结构化对象。
"""
if ctx is None:
raise ValueError("Context is required for this operation.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
# 构建路径和命令
input_abs_path = cube_path
if not input_abs_path.startswith("/"):
input_abs_path = posixpath.normpath(posixpath.join(workdir, cube_path))
cmd_parts = [
shell_quote(app_ctx.bin_path),
shell_quote("--print-cube"),
shell_quote(input_abs_path)
]
# 添加可选的 periodic 标志
periodic_flag = "t" if periodic else "f"
cmd_parts.append(shell_quote(periodic_flag))
cmd = " ".join(cmd_parts)
await ctx.info(f"执行命令: {cmd} (工作目录: {workdir})")
# 执行命令
proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False)
if proc.exit_status != 0 or not proc.stdout:
await ctx.error(f"命令执行失败,退出码: {proc.exit_status}")
raise ValueError(f"Failed to print cube file. Exit Status: {proc.exit_status}\nSTDERR: {proc.stderr}")
try:
# 解析成功的输出并返回结构化数据
parsed_result = _parse_print_cube_output(proc.stdout)
return parsed_result
except Exception as e:
await ctx.error(f"解析 '--print-cube' 输出失败: {e}\n原始输出:\n{proc.stdout}")
raise ValueError(f"Failed to parse output: {e}")
# 在 softbv_mcp_refactored.py 的工具区域添加这三个函数
@mcp.tool()
async def softbv_calculate_gii(
cif_path: str,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> CalGIIResult:
"""
执行 'softBV.x --cal-gii',计算并返回全局不稳定性指数 (GII)。
"""
if ctx is None: raise ValueError("Context is required.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
input_abs_path = cif_path
if not input_abs_path.startswith("/"):
input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path))
cmd = f"{shell_quote(app_ctx.bin_path)} --cal-gii {shell_quote(input_abs_path)}"
await ctx.info(f"Executing: {cmd}")
proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False)
if proc.exit_status != 0:
await ctx.warning(f"Command failed with exit code {proc.exit_status}")
return CalGIIResult(raw_output=f"Exit Status: {proc.exit_status}\n\nSTDERR:\n{proc.stderr}")
return _parse_cal_gii_output(proc.stdout)
@mcp.tool()
async def softbv_calculate_cn(
cif_path: str,
atom_name: str,
rds_cutoff: float | None = None,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> CalCNResult:
"""
执行 'softBV.x --cal-cn',计算指定原子的配位数。
"""
if ctx is None: raise ValueError("Context is required.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
input_abs_path = cif_path
if not input_abs_path.startswith("/"):
input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path))
cmd_parts = [
shell_quote(app_ctx.bin_path),
shell_quote("--cal-cn"),
shell_quote(input_abs_path),
shell_quote(atom_name)
]
if rds_cutoff is not None:
cmd_parts.append(shell_quote(str(rds_cutoff)))
cmd = " ".join(cmd_parts)
await ctx.info(f"Executing: {cmd}")
proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False)
if proc.exit_status != 0:
await ctx.warning(f"Command failed with exit code {proc.exit_status}")
return CalCNResult(raw_output=f"Exit Status: {proc.exit_status}\n\nSTDERR:\n{proc.stderr}",
coordination_details=[])
return _parse_cal_cn_output(proc.stdout)
@mcp.tool()
async def softbv_calculate_sym(
cif_path: str,
tolerance: float | None = None,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> CalSymResult:
"""
执行 'softBV.x --cal-sym',找出结构的空间群。
"""
if ctx is None: raise ValueError("Context is required.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
input_abs_path = cif_path
if not input_abs_path.startswith("/"):
input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path))
cmd_parts = [
shell_quote(app_ctx.bin_path),
shell_quote("--cal-sym"),
shell_quote(input_abs_path)
]
if tolerance is not None:
cmd_parts.append(shell_quote(str(tolerance)))
cmd = " ".join(cmd_parts)
await ctx.info(f"Executing: {cmd}")
proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False)
if proc.exit_status != 0:
await ctx.warning(f"Command failed with exit code {proc.exit_status}")
return CalSymResult(raw_output=f"Exit Status: {proc.exit_status}\n\nSTDERR:\n{proc.stderr}",
fits_to_symmetries=[])
return _parse_cal_sym_output(proc.stdout)
@mcp.tool()
async def softbv_calculate_total_energy(
cif_path: str,
sf: float | None = None,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> CalTotEnResult:
"""
执行 'softBV.x --cal-tot-en',计算并返回总能量。
Args:
cif_path (str): 远程 CIF 文件路径。
sf (float | None): 可选的 screening factor。
cwd (str | None): 远程工作目录。
ctx: MCP 上下文,由框架自动注入。
Returns:
CalTotEnResult: 包含总能量和所用 sf 值的结构化结果。
"""
if ctx is None:
raise ValueError("Context is required for this operation.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
# 构建路径和命令
input_abs_path = cif_path
if not input_abs_path.startswith("/"):
input_abs_path = posixpath.normpath(posixpath.join(workdir, cif_path))
cmd_parts = [
shell_quote(app_ctx.bin_path),
shell_quote("--cal-tot-en"),
shell_quote(input_abs_path)
]
if sf is not None:
cmd_parts.append(shell_quote(str(sf)))
cmd = " ".join(cmd_parts)
await ctx.info(f"执行命令: {cmd}")
# 执行命令
proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False)
if proc.exit_status != 0:
await ctx.warning(f"命令执行失败,退出码: {proc.exit_status}")
return CalTotEnResult(
raw_output=f"Exit Status: {proc.exit_status}\n\nSTDOUT:\n{proc.stdout}\n\nSTDERR:\n{proc.stderr}"
)
# 解析并返回结果
return _parse_cal_tot_en_output(proc.stdout)
@mcp.tool()
async def softbv_analyze_pathway(
args: AnalyzePathwayArgs,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> AnalyzePathwayResult:
"""
执行 'softBV.x --gh',分析传导路径并返回渗流阈值。
Args:
args (AnalyzePathwayArgs): 包含所有输入参数的结构化对象。
cwd (str | None): 远程工作目录。
ctx: MCP 上下文,由框架自动注入。
Returns:
AnalyzePathwayResult: 包含渗流阈值、新生成文件列表和原始输出的结构化结果。
"""
if ctx is None:
raise ValueError("Context is required for this operation.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
# --- 1. 构建命令 ---
def get_abs(path: str) -> str:
return path if path.startswith("/") else posixpath.normpath(posixpath.join(workdir, path))
cmd_parts = [
shell_quote(app_ctx.bin_path),
shell_quote("--gh"),
shell_quote(get_abs(args.input_cif)),
shell_quote(get_abs(args.input_cube)),
shell_quote(args.type),
shell_quote(str(args.os)),
]
if args.barrier_max is not None:
cmd_parts.append(shell_quote(str(args.barrier_max)))
periodic_flag = "t" if args.periodic else "f"
cmd_parts.append(shell_quote(periodic_flag))
cmd = " ".join(cmd_parts)
await ctx.info(f"执行路径分析: {cmd}")
# --- 2. 执行并收集结果 ---
files_before = await _listdir_safe(conn, workdir)
proc = await run_in_softbv_env(conn, app_ctx.profile, cmd=cmd, cwd=workdir, check=False)
files_after = await _listdir_safe(conn, workdir)
new_files = sorted(set(files_after) - set(files_before))
if proc.exit_status != 0:
await ctx.warning(f"'--gh' 命令执行失败,退出码: {proc.exit_status}")
return AnalyzePathwayResult(
command_used=cmd,
working_directory=workdir,
exit_status=proc.exit_status,
thresholds=None,
new_files=new_files,
raw_output=f"Exit Status: {proc.exit_status}\n\nSTDOUT:\n{proc.stdout}\n\nSTDERR:\n{proc.stderr}"
)
# --- 3. 解析并返回结构化结果 ---
parsed_thresholds = _parse_gh_output(proc.stdout)
if not parsed_thresholds:
await ctx.info("在输出中未找到渗流阈值信息。")
return AnalyzePathwayResult(
command_used=cmd,
working_directory=workdir,
exit_status=proc.exit_status,
thresholds=parsed_thresholds,
new_files=new_files,
raw_output=proc.stdout
)
# 在 softbv_mcp_refactored.py 的工具区域添加这个函数
@mcp.tool(name="softbv_run_md")
async def softbv_run_md(
args: MDDefaultArgs | MDFullArgs,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> MDResult:
"""
执行 'softBV.x --md' 进行分子动力学计算。这是一个耗时任务。
支持两种调用模式:
1. 默认模式 (MDDefaultArgs): 仅提供 CIF、离子类型和氧化态。
2. 全参数模式 (MDFullArgs): 提供所有 MD 参数。
该工具会定期报告心跳,并在任务完成后返回包含电导率等关键信息的结构化结果。
"""
if ctx is None:
raise ValueError("Context is required for this operation.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
# --- 1. 构造命令 ---
input_abs_path = args.input_cif
if not input_abs_path.startswith("/"):
input_abs_path = posixpath.normpath(posixpath.join(workdir, args.input_cif))
cmd_parts = [
shell_quote(app_ctx.bin_path),
shell_quote("--md"),
shell_quote(input_abs_path),
shell_quote(args.type),
shell_quote(str(args.os)),
]
if isinstance(args, MDFullArgs):
cmd_parts.extend([
shell_quote(str(args.sf)),
shell_quote(str(args.temperature)),
shell_quote(str(args.t_end)),
shell_quote(str(args.t_equil)),
shell_quote(str(args.dt)),
shell_quote(str(args.t_log)),
])
# --- 2. 准备长时任务 ---
import time
log_name = f"softbv_md_{int(time.time())}.log"
log_path = posixpath.join(workdir, log_name)
final_cmd = " ".join(cmd_parts) + f" > {shell_quote(log_path)} 2>&1"
await ctx.info(f"启动 MD 长任务: {final_cmd} (工作目录: {workdir})")
files_before = await _listdir_safe(conn, workdir)
proc = await _run_in_softbv_env_stream(conn, app_ctx.profile, cmd=final_cmd, cwd=workdir)
start_time = time.monotonic()
# --- 3. 心跳循环,防止超时 ---
try:
while proc.exit_status is None:
await asyncio.sleep(10) # 每 10 秒报告一次
elapsed = int(time.monotonic() - start_time)
log_size = await _stat_size_safe(conn, log_path)
# 使用 ctx.report_progress 作为心跳机制 [1]
await ctx.report_progress(
progress=min(elapsed / (30 * 60), 0.99), # 以30分钟为基准的近似进度
message=f"MD 计算进行中... 已用时 {elapsed} 秒, 日志大小: {log_size or 0} 字节。"
)
finally:
await proc.wait() # 确保进程结束
# --- 4. 收集并返回结果 ---
elapsed_seconds = int(time.monotonic() - start_time)
files_after = await _listdir_safe(conn, workdir)
new_files = sorted(set(files_after) - set(files_before))
# 读取日志文件的头和尾
log_head, log_tail, full_log = "", "", ""
try:
async with conn.start_sftp_client() as sftp:
async with sftp.open(log_path, "r", encoding="utf-8", errors="replace") as f:
lines = await f.readlines()
full_log = "".join(lines)
log_head = "".join(lines[:40])
log_tail = "".join(lines[-40:])
except Exception as e:
await ctx.warning(f"读取日志文件 '{log_path}' 失败: {e}")
# 从日志中解析最终属性
final_properties = _parse_md_output(full_log)
result = MDResult(
command_used=final_cmd,
working_directory=workdir,
exit_status=proc.exit_status,
final_properties=final_properties,
log_file=log_path,
output_head=log_head,
output_tail=log_tail,
new_files=new_files,
elapsed_seconds=elapsed_seconds,
)
if result.exit_status == 0:
await ctx.info(f"MD 任务成功完成, 耗时 {elapsed_seconds} 秒。")
else:
await ctx.error(f"MD 任务失败, 退出码: {result.exit_status}。请检查日志: {log_path}")
return result
# 在 softbv_mcp_refactored.py 的工具区域添加这个函数
@mcp.tool(name="softbv_run_kmc")
async def softbv_run_kmc(
args: KMCDefaultArgs | KMCFullArgs,
cwd: str | None = None,
ctx: Context[ServerSession, SoftBVContext] | None = None,
) -> KMCResult:
"""
执行 'softBV.x --kmc' 进行动力学蒙特卡洛计算。这是一个耗时任务。
支持两种调用模式: 默认模式和全参数模式。
"""
if ctx is None:
raise ValueError("Context is required for this operation.")
app_ctx = ctx.request_context.lifespan_context
conn = app_ctx.ssh_connection
workdir = cwd or app_ctx.workdir
# --- 1. 构造命令 ---
def get_abs(path: str) -> str:
return path if path.startswith("/") else posixpath.normpath(posixpath.join(workdir, path))
cmd_parts = [
shell_quote(app_ctx.bin_path),
shell_quote("--kmc"),
shell_quote(get_abs(args.input_cif)),
shell_quote(get_abs(args.input_cube)),
shell_quote(args.type),
shell_quote(str(args.os)),
]
if isinstance(args, KMCFullArgs):
if args.supercell:
cmd_parts.extend(map(lambda x: shell_quote(str(x)), args.supercell))
# 使用一个循环来处理所有可选的浮点/整型参数
for param in ['sf', 'temperature', 't_limit', 'step_limit', 'step_log', 'cutoff']:
value = getattr(args, param, None)
if value is not None:
cmd_parts.append(shell_quote(str(value)))
# --- 2. 准备长时任务 (与 gen-cube 和 md 相同) ---
import time
log_name = f"softbv_kmc_{int(time.time())}.log"
log_path = posixpath.join(workdir, log_name)
final_cmd = " ".join(cmd_parts) + f" > {shell_quote(log_path)} 2>&1"
await ctx.info(f"启动 KMC 长任务: {final_cmd} (工作目录: {workdir})")
files_before = await _listdir_safe(conn, workdir)
proc = await _run_in_softbv_env_stream(conn, app_ctx.profile, cmd=final_cmd, cwd=workdir)
start_time = time.monotonic()
# --- 3. 心跳循环 (与 gen-cube 和 md 相同) ---
try:
while proc.exit_status is None:
await asyncio.sleep(10)
elapsed = int(time.monotonic() - start_time)
log_size = await _stat_size_safe(conn, log_path)
await ctx.report_progress(
progress=min(elapsed / (30 * 60), 0.99),
message=f"KMC 计算进行中... 已用时 {elapsed} 秒, 日志大小: {log_size or 0} 字节。"
)
finally:
await proc.wait()
# --- 4. 收集并返回结果 ---
elapsed_seconds = int(time.monotonic() - start_time)
files_after = await _listdir_safe(conn, workdir)
new_files = sorted(set(files_after) - set(files_before))
log_head, log_tail, full_log = "", "", ""
try:
async with conn.start_sftp_client() as sftp:
async with sftp.open(log_path, "r", encoding="utf-8", errors="replace") as f:
lines = await f.readlines()
full_log = "".join(lines)
log_head = "".join(lines[:40])
log_tail = "".join(lines[-40:])
except Exception as e:
await ctx.warning(f"读取日志文件 '{log_path}' 失败: {e}")
# 从完整日志中解析最终属性
final_properties = _parse_kmc_output(full_log)
result = KMCResult(
command_used=final_cmd,
working_directory=workdir,
exit_status=proc.exit_status,
final_properties=final_properties,
log_file=log_path,
output_head=log_head,
output_tail=log_tail,
new_files=new_files,
elapsed_seconds=elapsed_seconds,
)
if result.exit_status == 0:
await ctx.info(f"KMC 任务成功完成, 耗时 {elapsed_seconds} 秒。")
else:
await ctx.error(f"KMC 任务失败, 退出码: {result.exit_status}。请检查日志: {log_path}")
return result
# ========= 6. 工厂函数与主程序入口 =========
def create_softbv_mcp() -> FastMCP:
"""供外部(如 Starlette)导入的工厂函数。"""
return mcp
if __name__ == "__main__":
"""
如果直接运行此文件,则启动一个独立的 MCP 服务器。
"""
print("正在以独立模式启动 softBV MCP 服务器...")
mcp.run(transport="streamable-http")
# 启动后,您可以使用 `mcp dev` 或其他客户端工具连接到 http://localhost:8000