import shutil import logging from pathlib import Path from .base_module import BaseModule class PreheatModule(BaseModule): def __init__(self, driver, iter_id): super().__init__(driver, iter_id) self.template_subdir = "00_md" def get_work_dir(self): return self.iter_dir / "00.md" / "preheat" def initialize(self): super().initialize() # 创建目录 work_dir = self.get_work_dir() # 1. 准备 run.in (从配置读取模板名) template_name = self.config_param['params']['preheat'].get('template_file', 'run.in') self.copy_template(template_name, "run.in") # 2. 准备 nep.in (GPUMD 运行必需,虽然内容可能很简单) self.copy_template("nep.in") # 3. 准备 nep.txt (势函数) self._prepare_potential() # 4. 准备 model.xyz (结构) self._prepare_structure() def _prepare_potential(self): """准备势函数文件 nep.txt""" dst = self.get_work_dir() / "nep.txt" if self.iter_id == 1: # 第一轮:使用 system.yaml 里定义的初始势 init_pot = Path(self.config_sys['system']['initial_potential']) if not init_pot.exists(): raise FileNotFoundError(f"Initial potential not found: {init_pot}") shutil.copy(init_pot, dst) self.logger.info(f" -> Copied initial potential: {init_pot.name}") else: # 后续轮次:使用上一轮训练结果 prev_iter = f"iter_{self.iter_id - 1:03d}" prev_train_dir = self.root / prev_iter / "03.train" src = prev_train_dir / "nep.txt" if not src.exists(): raise FileNotFoundError(f"Previous potential not found: {src}") shutil.copy(src, dst) self.logger.info(f" -> Copied potential from {prev_iter}") def _prepare_structure(self): """准备 model.xyz""" work_dir = self.get_work_dir() # 目前逻辑:Preheat 总是从初始结构开始(或者你可以改为从上一轮的 dump 中取) # 这里演示从 VASP 文件转换 vasp_path = Path(self.config_sys['system']['initial_structure']) if not vasp_path.exists(): raise FileNotFoundError(f"Structure file not found: {vasp_path}") # 复制到工作目录 local_vasp = work_dir / vasp_path.name shutil.copy(vasp_path, local_vasp) # 调用 gpumdkit.sh -addlabel 进行转换 # 命令格式: gpumdkit.sh -addlabel file.vasp Li Y Cl elements = " ".join(self.config_sys['system']['elements']) self.logger.info(" -> Converting VASP to model.xyz...") # 使用 runner 调用 gpumdkit (必须在 machine.yaml 里定义了 'gpumdkit') # 注意:gpumdkit.sh 可能不输出 model.xyz 而是输出 file.xyz,需要确认 # 假设输出为 model.xyz cmd_args = f"-addlabel {local_vasp.name} {elements}" self.runner.run("gpumdkit", cwd=work_dir, extra_args=cmd_args) # 检查是否生成成功 if not (work_dir / "model.xyz").exists(): # 有时候 gpumdkit 生成的文件名可能是 LiYCl.xyz,需要重命名为 model.xyz # 这里做一个容错检查 expected_name = local_vasp.stem + ".xyz" # e.g., LiYCl.xyz if (work_dir / expected_name).exists(): shutil.move(work_dir / expected_name, work_dir / "model.xyz") else: raise RuntimeError("Failed to generate model.xyz from gpumdkit") def run(self): """执行 GPUMD""" work_dir = self.get_work_dir() # 检查是否已经跑完 (简单的锁文件机制) if (work_dir / "thermo.out").exists(): self.logger.info(f" -> Pre-check: thermo.out exists, skipping preheat.") # 这里可以加更复杂的检查,比如步数是否足够 return self.logger.info(f"🔥 Running Preheat in {self.iter_name}") self.initialize() # 调用 GPUMD # GPUMD 没有参数,直接运行 self.runner.run("gpumd", cwd=work_dir) self.logger.info(" -> Preheat finished.") def check_done(self): # 简单检查 thermo.out 是否存在且非空 f = self.get_work_dir() / "thermo.out" return f.exists() and f.stat().st_size > 0