diff --git a/nep_auto/modules/m2_select.py b/nep_auto/modules/m2_select.py
index 419e1ff..707e0bc 100644
--- a/nep_auto/modules/m2_select.py
+++ b/nep_auto/modules/m2_select.py
@@ -1,5 +1,5 @@
 import shutil
-import re
+import subprocess
 from pathlib import Path
 from nep_auto.modules.base_module import BaseModule
 
@@ -14,14 +14,16 @@ class SelectModule(BaseModule):
         return self.work_dir
 
     def get_frame_count(self, xyz_file):
-        """Count the frames in an xyz file (naively by counting 'Lattice' via grep, or with ASE)"""
+        """Count the frames in an xyz file (via grep -c 'Lattice')"""
         if not xyz_file.exists():
             return 0
-        # Naive approach: read the whole file and count "Lattice" occurrences (ExtXYZ format)
         try:
-            with open(xyz_file, 'r') as f:
-                content = f.read()
-                return content.count("Lattice=")
+            # grep -c is faster and avoids loading a huge file into Python memory
+            result = subprocess.run(
+                f"grep -c 'Lattice' '{xyz_file}'",
+                shell=True, stdout=subprocess.PIPE, text=True
+            )
+            return int(result.stdout.strip())
-        except:
+        except (ValueError, OSError):
             return 0
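The grep call above assumes a POSIX shell and a locally readable path. A shell-free alternative that streams the file instead of shelling out (a sketch, assuming each ExtXYZ frame carries exactly one Lattice= tag on its comment line) could look like:

    def count_frames_streaming(xyz_file):
        # Stream line by line so a multi-GB dump never sits in memory at once.
        count = 0
        with open(xyz_file, 'r') as f:
            for line in f:
                if "Lattice=" in line:
                    count += 1
        return count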
"gpumdkit.sh" + while attempt < max_attempts: - self.logger.info(f" -> Attempt {attempt + 1}: Threshold = {threshold}") + self.logger.info(f" -> Attempt {attempt + 1}: Threshold = {threshold:.5f}") - # 构造命令: python script dump.xyz train.xyz nep.txt [options] - # 注意:如果你的脚本不支持命令行传参阈值,需要修改脚本或用 sed 修改 - # 假设脚本已经被修改支持 --distance {threshold},或者我们用一种 hack 方式 - # 既然原流程是交互式的,这里强烈建议你修改 neptrain_select_structs.py - # 让它支持命令行参数:parser.add_argument('--distance', ...) + # 构造输入流字符串 + # 对应你的流程: 203 -> file names -> 1 (distance mode) -> threshold + input_str = f"203\ndump.xyz train.xyz nep.txt\n1\n{threshold}\n" - cmd_args = f"{script} dump.xyz train.xyz nep.txt --distance {threshold} --auto_confirm" + # 构造完整命令: echo -e "..." | gpumdkit.sh + # 注意:python 的 input 参数直接传给 stdin,不需要用 echo | try: - self.runner.run("python_script", cwd=self.work_dir, extra_args=cmd_args) - except Exception as e: - self.logger.warning(f"Select script warning: {e}") + self.logger.debug(f" Input string: {repr(input_str)}") - # 检查结果 + process = subprocess.run( + gpumdkit_cmd, + input=input_str, + cwd=self.work_dir, + shell=True, + executable="/bin/bash", + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True + ) + + # 记录输出以便 debug + # self.logger.debug(process.stdout) + + if process.returncode != 0: + self.logger.error(f"gpumdkit execution failed: {process.stderr}") + raise RuntimeError("gpumdkit failed") + + except Exception as e: + self.logger.error(f"Execution error: {e}") + raise + + # 检查 selected.xyz selected_file = self.work_dir / "selected.xyz" count = self.get_frame_count(selected_file) self.logger.info(f" -> Selected {count} structures.") if target_min <= count <= target_max: - self.logger.info("✅ Selection criteria met!") + self.logger.info(f"✅ Selection success! ({count} frames)") break elif count < target_min: - self.logger.info(" -> Too few, lowering threshold (-0.01)...") - threshold = threshold - 0.01 + self.logger.info(" -> Too few, lowering threshold (x0.8)...") + threshold *= 0.8 else: - self.logger.info(" -> Too many, raising threshold (+0.01)...") - threshold = threshold + 0.01 + self.logger.info(" -> Too many, raising threshold (x1.2)...") + threshold *= 1.2 + + # 稍微清理一下生成的中间文件,防止下次干扰? + # selected.xyz 会被下次覆盖,所以不删也行。 attempt += 1 if attempt >= max_attempts: - self.logger.warning("⚠️ Max attempts reached in selection. Proceeding with current best.") + self.logger.warning("⚠️ Max attempts reached. Proceeding with current best.") self.check_done() diff --git a/nep_auto/modules/m3_scf.py b/nep_auto/modules/m3_scf.py index 0f73a85..4154d3b 100644 --- a/nep_auto/modules/m3_scf.py +++ b/nep_auto/modules/m3_scf.py @@ -1,6 +1,7 @@ import shutil +import subprocess +import glob from pathlib import Path -from ase.io import read, write from nep_auto.modules.base_module import BaseModule @@ -18,70 +19,145 @@ class SCFModule(BaseModule): self.logger.info(f"⚛️ [SCF] Starting DFT Calculation Iter {self.iter_id}...") self.initialize() - # 1. 读取 selected.xyz - selected_xyz = self.select_dir / "selected.xyz" - if not selected_xyz.exists(): - raise FileNotFoundError("selected.xyz missing") + # ---------------------------------------- + # 1. 
diff --git a/nep_auto/modules/m3_scf.py b/nep_auto/modules/m3_scf.py
index 0f73a85..4154d3b 100644
--- a/nep_auto/modules/m3_scf.py
+++ b/nep_auto/modules/m3_scf.py
@@ -1,6 +1,7 @@
 import shutil
+import subprocess
+import glob
 from pathlib import Path
-from ase.io import read, write
 from nep_auto.modules.base_module import BaseModule
 
@@ -18,70 +19,145 @@ class SCFModule(BaseModule):
         self.logger.info(f"⚛️ [SCF] Starting DFT Calculation Iter {self.iter_id}...")
         self.initialize()
 
-        # 1. Read selected.xyz
-        selected_xyz = self.select_dir / "selected.xyz"
-        if not selected_xyz.exists():
-            raise FileNotFoundError("selected.xyz missing")
+        # ----------------------------------------
+        # 1. Prepare data: selected.xyz -> split via menu 301
+        # ----------------------------------------
+        src_xyz = self.select_dir / "selected.xyz"
+        if not src_xyz.exists():
+            raise FileNotFoundError("selected.xyz missing from select module")
 
-        self.logger.info(" -> Reading structures using ASE...")
-        atoms_list = read(selected_xyz, index=':')
-        self.logger.info(f" -> Found {len(atoms_list)} structures.")
+        shutil.copy(src_xyz, self.work_dir / "selected.xyz")
 
-        # 2. Prepare the task folders
-        task_dirs = []
-        for i, atoms in enumerate(atoms_list):
-            task_name = f"task.{i:03d}"
-            task_dir = self.work_dir / task_name
-            task_dir.mkdir(exist_ok=True)
-            task_dirs.append(task_dir)
+        # Call gpumdkit.sh (301 -> prefix)
+        # A prefix of "task" (or "job") yields task_1, task_2, ...
+        prefix = "task"
+        input_str = f"301\n{prefix}\n"
 
-            # Write POSCAR
-            write(task_dir / "POSCAR", atoms, format='vasp')
+        gpumdkit_cmd = self.machine_config['tools']['gpumdkit']['command']
 
-            # Copy the INCAR, KPOINTS, POTCAR templates
-            self.copy_template("INCAR", target_name=None)  # copy into self.work_dir
-            shutil.copy(self.work_dir / "INCAR", task_dir / "INCAR")  # then distribute
-            self.copy_template("KPOINTS", target_name=None)
-            shutil.copy(self.work_dir / "KPOINTS", task_dir / "KPOINTS")
-            self.copy_template("POTCAR", target_name=None)
-            shutil.copy(self.work_dir / "POTCAR", task_dir / "POTCAR")
+        self.logger.info(" -> Splitting structures using gpumdkit...")
+        try:
+            subprocess.run(
+                gpumdkit_cmd,
+                input=input_str,
+                cwd=self.work_dir,
+                shell=True,
+                executable="/bin/bash",
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                text=True,
+                check=True
+            )
+        except subprocess.CalledProcessError as e:
+            self.logger.error(f"gpumdkit splitting failed: {e.stderr}")
+            raise
 
-        # 3. Submit the jobs
-        # Local mode and slurm mode diverge here.
-        # Since this currently runs on an interactive GPU, assume serial or simple parallel execution.
+        # ----------------------------------------
+        # 2. Prepare the DFT input files (fp folder)
+        # ----------------------------------------
+        # gpumdkit creates an fp folder; the templates go in there
+        fp_dir = self.work_dir / "fp"
+        if not fp_dir.exists():
+            # Some versions of the script may not create fp automatically; make it to be safe
+            fp_dir.mkdir(exist_ok=True)
+
+        self.logger.info(" -> Preparing INCAR/KPOINTS/POTCAR...")
+        # Copy from template/02_scf into 02.scf/fp
+        self.copy_template("INCAR", target_name=None)
+        shutil.copy(self.work_dir / "INCAR", fp_dir / "INCAR")
+
+        self.copy_template("KPOINTS", target_name=None)
+        shutil.copy(self.work_dir / "KPOINTS", fp_dir / "KPOINTS")
+
+        self.copy_template("POTCAR", target_name=None)
+        shutil.copy(self.work_dir / "POTCAR", fp_dir / "POTCAR")
+
+        # ----------------------------------------
+        # 3. Distribute the files and submit the jobs
+        # ----------------------------------------
+        # Find all generated folders (task_1, task_2, ...)
+        task_dirs = sorted(self.work_dir.glob(f"{prefix}_*"))
+        if not task_dirs:
+            raise RuntimeError(f"No {prefix}_* folders generated!")
+
+        self.logger.info(f" -> Found {len(task_dirs)} tasks. Distributing input files...")
+        # Distribute the fp files into each task folder (replaces the presub.sh step)
+        common_files = ["INCAR", "KPOINTS", "POTCAR"]
+        for t_dir in task_dirs:
+            if not t_dir.is_dir():
+                continue
+            for f in common_files:
+                shutil.copy(fp_dir / f, t_dir / f)
+
+        # Submit the calculations
         self.logger.info(" -> Running VASP jobs...")
-        success_count = 0
-        for task_dir in task_dirs:
-            self.logger.info(f" -> Running {task_dir.name}...")
-            try:
-                # Call the vasp tool defined in machine.yaml
-                # Note: with many tasks this is better written as a multi-process pool
-                self.runner.run("vasp", cwd=task_dir)
-                # Quick check
-                if (task_dir / "OUTCAR").exists():
+        # The parallel strategy here depends on machine.yaml.
+        # On an interactive GPU we usually run serially, or N jobs at a time;
+        # keep it serial for now (see the concurrency sketch after this diff).
+        success_count = 0
+        for t_dir in task_dirs:
+            self.logger.info(f" -> Running {t_dir.name}...")
+            try:
+                # Call the vasp tool from machine.yaml
+                self.runner.run("vasp", cwd=t_dir)
+                if (t_dir / "OUTCAR").exists():  # crude success check
                     success_count += 1
             except Exception as e:
-                self.logger.error(f"Task {task_dir.name} failed: {e}")
+                self.logger.error(f"Job {t_dir.name} failed: {e}")
 
         self.logger.info(f" -> Finished. Success: {success_count}/{len(task_dirs)}")
 
-        # 4. Collect the data (OUTCAR -> NEP-dataset.xyz)
-        self.logger.info(" -> Collecting data...")
-        valid_atoms = []
-        for task_dir in task_dirs:
-            try:
-                # Read the OUTCAR
-                atoms = read(task_dir / "OUTCAR", format='vasp-outcar')
-                valid_atoms.append(atoms)
-            except:
-                pass
-
-        if valid_atoms:
-            write(self.work_dir / "NEP-dataset.xyz", valid_atoms, format='extxyz')
-        else:
-            raise RuntimeError("No valid OUTCARs found!")
+        # ----------------------------------------
+        # 4. Collect the results (OUTCARs -> NEP-dataset.xyz)
+        # ----------------------------------------
+        # Use gpumdkit's format conversion. Per the manual workflow this is
+        # `gpumdkit.sh -out2xyz .`; if that flag turns out to be unsupported,
+        # the interactive menu entry (104/108) would be needed instead.
+        self.logger.info(" -> Converting OUTCARs to NEP-dataset.xyz...")
+
+        try:
+            # Try the direct -out2xyz call first
+            subprocess.run(
+                f"{gpumdkit_cmd} -out2xyz .",
+                cwd=self.work_dir,
+                shell=True,
+                executable="/bin/bash",
+                check=True
+            )
+
+            # gpumdkit typically writes model.xyz or out.xyz; rename it to NEP-dataset.xyz
+            potential_outputs = ["model.xyz", "movie.xyz", "out.xyz"]
+            found = False
+            for f in potential_outputs:
+                if (self.work_dir / f).exists():
+                    shutil.move(self.work_dir / f, self.work_dir / "NEP-dataset.xyz")
+                    found = True
+                    break
+
+            if not found and not (self.work_dir / "NEP-dataset.xyz").exists():
+                # Nothing at the top level; the output may have landed in a subfolder
+                self.logger.warning("Converted xyz not found in work_dir; check subfolders.")
+
+        except subprocess.CalledProcessError:
+            self.logger.warning("gpumdkit -out2xyz failed, falling back to ASE...")
+            # Fallback: collect with ASE (more robust)
+            from ase.io import read, write
+            all_atoms = []
+            for t_dir in task_dirs:
+                try:
+                    all_atoms.append(read(t_dir / "OUTCAR", format="vasp-outcar"))
+                except Exception:
+                    pass
+            if all_atoms:
+                write(self.work_dir / "NEP-dataset.xyz", all_atoms, format="extxyz")
+            else:
+                raise RuntimeError("No valid OUTCARs found!")
 
         self.check_done()
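The serial loop above is the safe default on a single interactive GPU. If machine.yaml ever exposes several execution slots, a bounded pool keeps "run N at a time" simple; a sketch under that assumption (run_vasp_jobs and max_workers are hypothetical names, and threads suffice because runner.run blocks on an external process):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def run_vasp_jobs(runner, logger, task_dirs, max_workers=2):
        # Run at most max_workers VASP tasks concurrently; hypothetical helper.
        def _run(t_dir):
            runner.run("vasp", cwd=t_dir)  # same call the module makes serially
            return t_dir, (t_dir / "OUTCAR").exists()

        success = 0
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {pool.submit(_run, d): d for d in task_dirs if d.is_dir()}
            for fut in as_completed(futures):
                try:
                    _, ok = fut.result()
                    success += int(ok)
                except Exception as e:
                    logger.error(f"Job {futures[fut].name} failed: {e}")
        return success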
diff --git a/nep_auto/modules/m4_train.py b/nep_auto/modules/m4_train.py
index 5f930d2..890b87e 100644
--- a/nep_auto/modules/m4_train.py
+++ b/nep_auto/modules/m4_train.py
@@ -1,4 +1,5 @@
 import shutil
+from pathlib import Path
 from nep_auto.modules.base_module import BaseModule
 
@@ -15,42 +16,53 @@ class TrainModule(BaseModule):
         self.logger.info(f"🧠 [Train] Starting Training Iter {self.iter_id}...")
         self.initialize()
 
-        # 1. Prepare train.xyz
-        # Logic: current train.xyz = previous train.xyz + this iteration's scf/NEP-dataset.xyz
-        current_train_xyz = self.work_dir / "train.xyz"
+        # ----------------------------------------
+        # 1. Prepare train.xyz (merge)
+        # ----------------------------------------
+        # Target file
+        current_train = self.work_dir / "train.xyz"
 
-        # Open the output file
-        with open(current_train_xyz, 'wb') as outfile:
-            # A. Write the previous iteration's data (or the initial data)
-            if self.iter_id == 1:
-                # First iteration: use the initial training set if it exists, else only this round's SCF data
-                # Assume iter_000 is virtual here, or look directly in 00.data
-                init_data = self.root / "00.data" / "train.xyz"  # placeholder
-                pass
-            else:
-                prev_train = self.root / f"iter_{self.iter_id - 1:03d}" / "03.train" / "train.xyz"
-                if prev_train.exists():
-                    with open(prev_train, 'rb') as infile:
-                        shutil.copyfileobj(infile, outfile)
+        # Source 1: the previous iteration's train.xyz (first iteration: the initial data)
+        sources = []
+        if self.iter_id == 1:
+            init_data = self.root / "00.data" / "train.xyz"
+            if init_data.exists():
+                sources.append(init_data)
+        else:
+            prev_train = self.root / f"iter_{self.iter_id - 1:03d}" / "03.train" / "train.xyz"
+            if prev_train.exists():
+                sources.append(prev_train)
 
-            # B. Write this iteration's new data
-            new_data = self.iter_dir / "02.scf" / "NEP-dataset.xyz"
-            if new_data.exists():
-                with open(new_data, 'rb') as infile:
+        # Source 2: the SCF data computed this iteration
+        new_data = self.iter_dir / "02.scf" / "NEP-dataset.xyz"
+        if new_data.exists():
+            sources.append(new_data)
+        else:
+            raise FileNotFoundError("New training data (NEP-dataset.xyz) missing!")
+
+        # Perform the merge (ExtXYZ files concatenate cleanly at the byte level)
+        self.logger.info(f" -> Merging {len(sources)} datasets into train.xyz...")
+        with open(current_train, 'wb') as outfile:
+            for src in sources:
+                with open(src, 'rb') as infile:
                     shutil.copyfileobj(infile, outfile)
-            else:
-                raise FileNotFoundError("New training data (NEP-dataset.xyz) missing!")
 
+        # ----------------------------------------
         # 2. Prepare nep.in
+        # ----------------------------------------
         self.copy_template("nep.in")
 
-        # 3. Run the training
+        # ----------------------------------------
+        # 3. Run the training (the nep tool from machine.yaml)
+        # ----------------------------------------
         self.logger.info(" -> Running NEP training...")
         self.runner.run("nep", cwd=self.work_dir)
 
         self.check_done()
 
     def check_done(self):
+        # Check that nep.txt was generated.
+        # Usually one would also check that loss.out has converged,
+        # or that virials.out etc. were produced.
         if (self.work_dir / "nep.txt").exists():
             self.logger.info("✅ Training finished.")
             return True
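check_done above only tests that nep.txt exists. A stricter check could also peek at the tail of loss.out; a sketch, assuming loss.out holds whitespace-separated numeric columns with the total loss in column 2 (the exact layout depends on the NEP version, so treat the column index as an assumption):

    from pathlib import Path

    def training_looks_converged(work_dir, max_total_loss=1.0):
        # Read the last non-empty line of loss.out and test the total loss.
        loss_file = Path(work_dir) / "loss.out"
        if not loss_file.exists():
            return False
        last = None
        with open(loss_file) as f:
            for line in f:
                if line.strip():
                    last = line.split()
        if last is None:
            return False
        try:
            return float(last[1]) < max_total_loss
        except (IndexError, ValueError):
            return False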