import shutil import subprocess import glob from pathlib import Path from nep_auto.modules.base_module import BaseModule class SCFModule(BaseModule): def __init__(self, driver, iter_id): super().__init__(driver, iter_id) self.template_subdir = "02_scf" self.work_dir = self.iter_dir / "02.scf" self.select_dir = self.iter_dir / "01.select" def get_work_dir(self): return self.work_dir def run(self): self.logger.info(f"⚛️ [SCF] Starting DFT Calculation Iter {self.iter_id}...") self.initialize() # ---------------------------------------- # 1. 准备数据: selected.xyz -> 301 切分 # ---------------------------------------- src_xyz = self.select_dir / "selected.xyz" if not src_xyz.exists(): raise FileNotFoundError("selected.xyz missing from select module") shutil.copy(src_xyz, self.work_dir / "selected.xyz") # 调用 gpumdkit.sh (301 -> prefix) # Prefix 使用 "task" 或者 "job",生成 job_1, job_2... prefix = "task" input_str = f"301\n{prefix}\n" gpumdkit_cmd = self.machine_config['tools']['gpumdkit']['command'] self.logger.info(" -> Splitting structures using gpumdkit...") try: subprocess.run( gpumdkit_cmd, input=input_str, cwd=self.work_dir, shell=True, executable="/bin/bash", stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True ) except subprocess.CalledProcessError as e: self.logger.error(f"gpumdkit splitting failed: {e.stderr}") raise # ---------------------------------------- # 2. 准备 DFT 输入文件 (fp 文件夹) # ---------------------------------------- # gpumdkit 会生成一个 fp 文件夹,我们需要把模版放进去 fp_dir = self.work_dir / "fp" if not fp_dir.exists(): # 某些版本的脚本可能不自动创建 fp,手动建一个保险 fp_dir.mkdir(exist_ok=True) self.logger.info(" -> preparing INCAR/KPOINTS/POTCAR...") # 从 template/02_scf 复制到 02.scf/fp self.copy_template("INCAR", target_name=None) shutil.copy(self.work_dir / "INCAR", fp_dir / "INCAR") self.copy_template("KPOINTS", target_name=None) shutil.copy(self.work_dir / "KPOINTS", fp_dir / "KPOINTS") self.copy_template("POTCAR", target_name=None) shutil.copy(self.work_dir / "POTCAR", fp_dir / "POTCAR") # ---------------------------------------- # 3. 分发文件并提交任务 # ---------------------------------------- # 找到所有生成的文件夹 (task_1, task_2...) task_dirs = sorted(list(self.work_dir.glob(f"{prefix}_*"))) if not task_dirs: raise RuntimeError(f"No {prefix}_* folders generated!") self.logger.info(f" -> Found {len(task_dirs)} tasks. Distributing input files...") # 将 fp 里的文件分发到每个 task 文件夹 (替代 presub.sh 的功能) common_files = ["INCAR", "KPOINTS", "POTCAR"] for t_dir in task_dirs: if not t_dir.is_dir(): continue for f in common_files: shutil.copy(fp_dir / f, t_dir / f) # 提交计算 self.logger.info(" -> Running VASP jobs...") success_count = 0 # 这里的并行策略取决于 machine.yaml # 如果是 Interactive GPU,我们通常是串行跑,或者一次跑 N 个 # 这里先简单实现串行跑 for t_dir in task_dirs: self.logger.info(f" -> Running {t_dir.name}...") try: # 调用 machine.yaml 里的 vasp 工具 self.runner.run("vasp", cwd=t_dir) if (t_dir / "OUTCAR").exists(): # 简单判据 success_count += 1 except Exception as e: self.logger.error(f"Job {t_dir.name} failed: {e}") self.logger.info(f" -> Finished. Success: {success_count}/{len(task_dirs)}") # ---------------------------------------- # 4. 收集结果 (OUTCARs -> NEP-dataset.xyz) # ---------------------------------------- # 使用 gpumdkit 104 功能: Format Conversion -> OUTCAR to xyz (需提供路径) # 或者 108? 根据你的描述是 gpumdkit.sh -out2xyz . self.logger.info(" -> Converting OUTCARs to NEP-dataset.xyz...") # 方式 A: 命令行参数调用 (如果你确认支持) # cmd = f"{gpumdkit_cmd} -out2xyz ." # 方式 B: 交互式调用 (104/108) - 这里假设 -out2xyz 可用,这是最方便的 # 如果不支持,我们需要知道交互式的代码。根据你的描述 7: "-out2xyz ." try: # 尝试直接调用 -out2xyz subprocess.run( f"{gpumdkit_cmd} -out2xyz .", cwd=self.work_dir, shell=True, executable="/bin/bash", check=True ) # gpumdkit 通常生成 model.xyz 或 out.xyz,我们需要重命名为 NEP-dataset.xyz # 假设生成的是 model.xyz potential_outputs = ["model.xyz", "movie.xyz", "out.xyz"] found = False for f in potential_outputs: if (self.work_dir / f).exists(): shutil.move(self.work_dir / f, self.work_dir / "NEP-dataset.xyz") found = True break if not found and not (self.work_dir / "NEP-dataset.xyz").exists(): # 如果没找到,可能已经在子文件夹里? pass except subprocess.CalledProcessError: self.logger.warning("gpumdkit -out2xyz failed, falling back to ASE...") # Fallback: 使用 ASE 收集 (更稳健) from ase.io import read, write all_atoms = [] for t_dir in task_dirs: try: all_atoms.append(read(t_dir / "OUTCAR", format="vasp-outcar")) except: pass if all_atoms: write(self.work_dir / "NEP-dataset.xyz", all_atoms, format="extxyz") self.check_done() def check_done(self): if (self.work_dir / "NEP-dataset.xyz").exists(): return True raise RuntimeError("SCF failed: NEP-dataset.xyz not generated")