nep框架重构

This commit is contained in:
2025-12-09 01:15:38 +08:00
parent 19a6924a41
commit 91bdb0dab1
30 changed files with 7930 additions and 1001 deletions

View File

@@ -1,58 +1,21 @@
# config/machine.yaml machine_name: "Local_Test_Env"
root_dir: "." # <--- 请修改这里为你的实际路径
# 当前使用的计算系统配置名 # 脚本库位置
current_system: "interactive_gpu" script_dir: "config/scripts"
systems: executors:
# --- 配置 1: 交互式 GPU 环境 (当前使用) --- # 1. 简单的本地命令 (如 NEP 训练)
# 场景: 你已经用 srun/tmux 申请到了资源,直接运行命令即可 nep_local:
interactive_gpu: type: "local"
type: "local" # local 表示直接运行 subprocess不提交 sbatch cmd: "nep"
# 路径配置 # 2. 复杂的本地脚本 (如 GPUMD)
gpumdkit_root: "/cluster/home/koko125/tool/GPUMDkit" gpumd:
type: "local"
cmd: "gpumd" # 对应 config/scripts/gpumd.sh
tools: # 3. Slurm 提交测试 (VASP CPU)
# 1. GPUMD 配置 vasp_cpu:
gpumd: type: "local"
command: "gpumd" cmd: "mpirun -np 1 vasp_std"
# 运行前需要 source 的环境脚本
env_setup: ""
gpu_id: 0
# 2. NEP 配置 (同上)
nep:
command: "nep"
env_setup: ""
gpu_id: 0
gpumdkit:
# 假设是 GPU 版本,可能不需要 mpirun 或者只需要少量核
command: "gpumdkit.sh"
env_setup: ""
# 即使是 local 模式,有时也需要指定并行度
n_procs: 1
# 3. VASP (GPU 版) 配置
vasp:
# 假设是 GPU 版本,可能不需要 mpirun 或者只需要少量核
command: "mpirun -np 1 vasp_std"
env_setup: ""
# 即使是 local 模式,有时也需要指定并行度
n_procs: 1
# --- 配置 2: VASP CPU 集群模式 (预留,未来使用) ---
# 场景: 需要生成 submit.slurm 并 sbatch 提交
slurm_cpu_cluster:
type: "slurm"
gpumdkit_root: "/cluster/home/koko125/tool/GPUMDkit"
tools:
vasp:
command: "mpirun -np 4 vasp_std"
env_setup: "module load vasp/6.3-cpu"
# Slurm 头部参数
slurm_header:
partition: "cpu_long"
ntasks_per_node: 64
time: "24:00:00"

View File

@@ -1,44 +1,55 @@
# config/param.yaml # param.yaml
# --- 1. 流程控制 --- project: "LiYCl_Auto"
stages_def:
p: "preheat"
m: "md"
s: "select"
d: "scf"
t: "train"
pr: "predict"
o: "output"
# 默认流程 # 1. 初始文件定义 (对应 data/ 目录)
default_workflow: ["p", "m", "s", "d", "t", "pr"] files:
poscar: "LiYCl.vasp"
potcar: "POTCAR"
initial_pot: "nep89.txt" # 第一轮 MD 用的势函数
# 自定义调度 # 2. 迭代流程控制
schedule: iterations:
1: ["p", "m", "s", "d", "t", "o"] # --- 第一轮 ---
- id: 0
steps:
# Step 1: MD (预热 + 采样)
# 逻辑:会把 nep.txt (来自 initial_pot) 和 model.xyz 准备好
- name: "00.md"
sub_tasks:
# 你提到可能有预热,也可能有加工,这里支持串行执行
- template_sub: "preheat" # 使用 template/00.md/preheat/run.in
- template_sub: "production" # 使用 template/00.md/production/run.in
executor: "gpumd" # 对应 machine.yaml
# --- 2. 容错与通知 --- # Step 2: 筛选
control: - name: "01.select"
max_retries: 3 method: "distance"
check_interval: 60 params: [0.01, 60, 120]
notification: # Step 3: SCF (VASP)
enable_log: true # 逻辑cp template/02.scf/INCAR; check KPOINTS; cp data/POTCAR
log_file: "./logs/sys_runtime.log" - name: "02.scf"
enable_hook: true executor: "vasp_std" # 对应 machine.yaml (可能调用 vasp_std.sh)
hook_script: "python ./hooks/send_alert.py"
alert_events: ["fail", "finish"]
# --- 3. 各模块具体的物理/算法参数 --- # Step 4: 训练
params: # 逻辑cp template/03.train/nep.in
preheat: - name: "03.train"
template_file: "run_ramp.in" executor: "nep_local"
select: # --- 第二轮 ---
target_min: 60 - id: 1
target_max: 120 steps:
init_threshold: 0.01 - name: "00.md"
sub_tasks:
- template_sub: "production" # 第二轮可能只需要 sampling
# 注意:这一轮的 nep.txt 会自动指向 iter_00/03.train/nep.txt
scf: - name: "01.select"
# 比如指定用 machine.yaml 里的哪个 tool 配置 method: "distance"
tool_key: "vasp" params: [0.012, 60, 120]
- name: "02.scf"
executor: "vasp_std"
- name: "03.train"

View File

@@ -1,16 +0,0 @@
# config/system.yaml
project_name: "LiYCl_Transport_v1"
# 物理体系定义
system:
elements: ["Li", "Y", "Cl"]
# 初始结构 (VASP格式)
initial_structure: "./initial_data/LiYCl.vasp"
# 初始势函数 (第一轮 preheat 使用)
# 如果是第一轮,使用此通用势;后续轮次自动使用上一轮训练结果
initial_potential: "./initial_data/nep89.txt"
# 晶格常数或扩胞设置 (可选,视具体模块逻辑而定)
supercell: [1, 1, 1]

58
data/POSCAR Normal file
View File

@@ -0,0 +1,58 @@
Li20 Ge2 P4 S24
1.0
8.5899509999999992 0.0000000000000000 0.0000000000000000
-0.0386059999999990 8.8794570000000004 0.0000000000000000
-0.1455200000000040 -0.4498699999999980 12.9663439999999994
Li Ge P S
20 2 4 24
direct
0.9887309999999990 0.5170910000000000 0.9517369999999991 Li+
0.9644940000000000 0.5171220000000000 0.4263230000000000 Li+
0.5105600000000000 0.9648409999999999 0.5489520000000000 Li+
0.4963619999999990 0.9963219999999990 0.0506500000000000 Li+
0.2598129999999990 0.2893130000000000 0.1575810000000000 Li+
0.7437819999999991 0.7471070000000000 0.2819570000000000 Li+
0.2159610000000000 0.8178780000000000 0.7535529999999990 Li+
0.2296369999999990 0.7569920000000000 0.2519280000000000 Li+
0.7540580000000000 0.2201050000000000 0.3515260000000000 Li+
0.4620470000000000 0.5160800000000001 0.7507739999999991 Li+
0.2296480000000000 0.2281340000000000 0.3660700000000000 Li+
0.7342460000000000 0.2563749999999990 0.8572679999999990 Li+
0.2450540000000000 0.7611289999999999 0.0057880000000000 Li+
0.7414010000000000 0.2628560000000000 0.1314030000000000 Li+
0.7641690000000000 0.7282580000000000 0.6894540000000000 Li+
0.2214960000000000 0.2588350000000000 0.9194279999999990 Li+
0.7481260000000000 0.7611089999999990 0.0084029999999990 Li+
0.2441960000000000 0.7196549999999990 0.4849360000000000 Li+
0.7834100000000001 0.2145880000000000 0.6254649999999990 Li+
0.2411910000000000 0.2118050000000000 0.6496820000000000 Li+
0.9968669999999999 0.4879730000000000 0.1831110000000000 Ge4+
0.5088400000000001 0.9907500000000001 0.8156200000000000 Ge4+
0.0083590000000000 0.4831760000000000 0.6844359999999990 P4+
0.9926389999999990 0.9650139999999990 0.5018440000000000 P5+
0.4924000000000000 0.5156300000000000 0.9930840000000000 P5+
0.4878150000000000 0.0091810000000000 0.3105009999999990 P5+
0.5174609999999999 0.2009880000000000 0.7214800000000000 S-
0.9886940000000000 0.1764359999999990 0.4438430000000000 S2-
0.0046020000000000 0.7939309999999991 0.3896550000000000 S2-
0.2956610000000000 0.5308579999999991 0.9012580000000000 S2-
0.6842570000000000 0.5303240000000000 0.8996850000000000 S2-
0.4955310000000000 0.6948430000000000 0.0971559999999990 S2-
0.4869330000000000 0.3101589999999990 0.0593990000000000 S2-
0.1957789999999990 0.9476460000000000 0.5883450000000000 S2-
0.8016430000000000 0.9458259999999999 0.5899210000000000 S2-
0.0068860000000000 0.2956850000000000 0.0654530000000000 S2-
0.0007660000000000 0.7092330000000000 0.1038340000000000 S2-
0.2123620000000000 0.4863000000000000 0.6010390000000000 S2-
0.8150440000000000 0.4910100000000001 0.5895600000000001 S2-
0.4848800000000000 0.8005220000000000 0.3751230000000000 S2-
0.4957740000000000 0.1787590000000000 0.4224449999999990 S2-
0.2848890000000000 0.9890740000000000 0.9054629999999990 S2-
0.7137549999999990 0.9990030000000000 0.9215460000000000 S2-
0.9930950000000000 0.6631359999999999 0.7876190000000000 S2-
0.0030850000000000 0.2815040000000000 0.7604740000000000 S2-
0.7792519999999999 0.4661789999999990 0.2732200000000000 S2-
0.2011889999999990 0.4785850000000000 0.2915830000000000 S2-
0.4924739999999990 0.7926540000000000 0.7030090000000000 S2-
0.6858759999999990 0.0159360000000000 0.2214040000000000 S2-
0.2922079999999990 0.0311870000000000 0.2192110000000000 S2-

7301
data/POTCAR Normal file

File diff suppressed because it is too large Load Diff

45
main.py
View File

@@ -1,32 +1,33 @@
# main.py
import os
import sys import sys
import time from src.utils import setup_logger
import traceback from src.workflow import Workflow
from nep_auto.driver import NEPDriver
from nep_auto.utils.logger import setup_logger
def main(): def main():
# 1. 初始化全局日志 root_dir = os.getcwd()
logger = setup_logger("logs/sys_runtime.log")
logger.info("========================================")
logger.info("🚀 NEP Automation Framework Starting...")
logger.info("========================================")
# 1. 初始化日志
# 既然 workspace 还没创建先放到根目录Workflow 初始化后再放到 workspace 也可以
# 这里简单起见放在根目录
setup_logger(root_dir)
# 2. 检查基本文件是否存在
required_dirs = ['config', 'data', 'template']
for d in required_dirs:
if not os.path.exists(os.path.join(root_dir, d)):
print(f"Error: Missing directory '{d}'. Please check file structure.")
sys.exit(1)
# 3. 启动工作流
try: try:
# 2. 初始化驱动器 (加载配置,恢复状态) app = Workflow(root_dir)
driver = NEPDriver() app.run()
# 3. 启动主循环
driver.run()
except KeyboardInterrupt:
logger.warning("⚠️ 用户手动中断程序 (KeyboardInterrupt)")
sys.exit(0)
except Exception as e: except Exception as e:
logger.error(f"❌ 程序发生严重崩溃: {str(e)}") import traceback
logger.error(traceback.format_exc()) traceback.print_exc()
# 这里可以加入发送崩溃通知的逻辑 print(f"Critical Error: {e}")
sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,41 +0,0 @@
import yaml
import time
import logging
from pathlib import Path
from nep_auto.status_manager import StatusManager
class NEPDriver:
def __init__(self):
self.logger = logging.getLogger("NEP_Auto")
self.root = Path(".")
# 1. 加载所有配置
self.config_sys = self._load_yaml("config/system.yaml")
self.config_param = self._load_yaml("config/param.yaml")
# 【新增】加载 machine 配置
self.config_machine = self._load_yaml("config/machine.yaml")
self.logger.info(f"项目名称: {self.config_sys.get('project_name')}")
self.logger.info(f"计算环境: {self.config_machine.get('current_system')}")
# 2. 初始化状态管理器
self.status = StatusManager(self.root / "workspace")
def _load_yaml(self, path):
if not Path(path).exists():
raise FileNotFoundError(f"配置文件缺失: {path}")
with open(path, 'r') as f:
return yaml.safe_load(f)
def run(self):
"""主循环"""
self.logger.info("✅ 驱动器初始化完成,准备进入主循环...")
# 获取当前轮次
current_iter = self.status.get_current_iter()
self.logger.info(f"当前进度: iter_{current_iter:03d}")
# 暂时只打印一次就退出,用于测试环境
self.logger.info("测试阶段:环境检查通过。等待模块代码实现...")
# while True: ... (后续我们将在这里实现调度逻辑)

View File

@@ -1,77 +0,0 @@
import os
import shutil
import logging
from pathlib import Path
from nep_auto.utils.runner import CommandRunner
class BaseModule:
def __init__(self, driver, iter_id):
"""
:param driver: NEPDriver 实例,包含所有配置
:param iter_id: 当前轮次 (int)
"""
self.driver = driver
self.config_sys = driver.config_sys
self.config_param = driver.config_param
self.machine_config = driver.config_machine['systems'][driver.config_machine['current_system']]
self.iter_id = iter_id
self.iter_name = f"iter_{iter_id:03d}"
self.logger = logging.getLogger("NEP_Auto")
# 初始化运行器
self.runner = CommandRunner(self.machine_config)
# 定义路径
self.root = Path(driver.root) / "workspace"
self.iter_dir = self.root / self.iter_name
self.output_dir = self.iter_dir / "05.output" # 公共输出区
def get_work_dir(self):
"""需由子类实现:返回当前模块的具体工作目录"""
raise NotImplementedError
def initialize(self):
"""通用初始化:创建目录,复制通用文件"""
work_dir = self.get_work_dir()
if not work_dir.exists():
work_dir.mkdir(parents=True, exist_ok=True)
self.logger.debug(f"📁 Created dir: {work_dir}")
# 确保公共输出目录存在
if not self.output_dir.exists():
self.output_dir.mkdir(parents=True, exist_ok=True)
def run(self):
"""核心逻辑入口,子类必须实现"""
raise NotImplementedError
def check_done(self):
"""检查任务是否完成,子类必须实现"""
raise NotImplementedError
# --- 通用工具方法 ---
def copy_template(self, template_name, target_name=None):
"""从 template 目录复制文件"""
if target_name is None:
target_name = template_name
# 根据模块类型寻找模板目录 (需要在子类定义 self.template_subdir)
src = Path("template") / getattr(self, "template_subdir", "common") / template_name
dst = self.get_work_dir() / target_name
if src.exists():
shutil.copy(src, dst)
# self.logger.debug(f"📄 Copied {template_name} -> {dst}")
else:
self.logger.warning(f"⚠️ Template not found: {src}")
def link_file(self, src_path, dst_name):
"""创建软链接"""
src = Path(src_path).resolve()
dst = self.get_work_dir() / dst_name
if dst.exists():
dst.unlink()
os.symlink(src, dst)

View File

@@ -1,113 +0,0 @@
import shutil
import logging
from pathlib import Path
from .base_module import BaseModule
class PreheatModule(BaseModule):
def __init__(self, driver, iter_id):
super().__init__(driver, iter_id)
self.template_subdir = "00_md"
def get_work_dir(self):
return self.iter_dir / "00.md" / "preheat"
def initialize(self):
super().initialize() # 创建目录
work_dir = self.get_work_dir()
# 1. 准备 run.in (从配置读取模板名)
template_name = self.config_param['params']['preheat'].get('template_file', 'run.in')
self.copy_template(template_name, "run.in")
# 2. 准备 nep.in (GPUMD 运行必需,虽然内容可能很简单)
self.copy_template("nep.in")
# 3. 准备 nep.txt (势函数)
self._prepare_potential()
# 4. 准备 model.xyz (结构)
self._prepare_structure()
def _prepare_potential(self):
"""准备势函数文件 nep.txt"""
dst = self.get_work_dir() / "nep.txt"
if self.iter_id == 1:
# 第一轮:使用 system.yaml 里定义的初始势
init_pot = Path(self.config_sys['system']['initial_potential'])
if not init_pot.exists():
raise FileNotFoundError(f"Initial potential not found: {init_pot}")
shutil.copy(init_pot, dst)
self.logger.info(f" -> Copied initial potential: {init_pot.name}")
else:
# 后续轮次:使用上一轮训练结果
prev_iter = f"iter_{self.iter_id - 1:03d}"
prev_train_dir = self.root / prev_iter / "03.train"
src = prev_train_dir / "nep.txt"
if not src.exists():
raise FileNotFoundError(f"Previous potential not found: {src}")
shutil.copy(src, dst)
self.logger.info(f" -> Copied potential from {prev_iter}")
def _prepare_structure(self):
"""准备 model.xyz"""
work_dir = self.get_work_dir()
# 目前逻辑Preheat 总是从初始结构开始(或者你可以改为从上一轮的 dump 中取)
# 这里演示从 VASP 文件转换
vasp_path = Path(self.config_sys['system']['initial_structure'])
if not vasp_path.exists():
raise FileNotFoundError(f"Structure file not found: {vasp_path}")
# 复制到工作目录
local_vasp = work_dir / vasp_path.name
shutil.copy(vasp_path, local_vasp)
# 调用 gpumdkit.sh -addlabel 进行转换
# 命令格式: gpumdkit.sh -addlabel file.vasp Li Y Cl
elements = " ".join(self.config_sys['system']['elements'])
self.logger.info(" -> Converting VASP to model.xyz...")
# 使用 runner 调用 gpumdkit (必须在 machine.yaml 里定义了 'gpumdkit')
# 注意gpumdkit.sh 可能不输出 model.xyz 而是输出 file.xyz需要确认
# 假设输出为 model.xyz
cmd_args = f"-addlabel {local_vasp.name} {elements}"
self.runner.run("gpumdkit", cwd=work_dir, extra_args=cmd_args)
# 检查是否生成成功
if not (work_dir / "model.xyz").exists():
# 有时候 gpumdkit 生成的文件名可能是 LiYCl.xyz需要重命名为 model.xyz
# 这里做一个容错检查
expected_name = local_vasp.stem + ".xyz" # e.g., LiYCl.xyz
if (work_dir / expected_name).exists():
shutil.move(work_dir / expected_name, work_dir / "model.xyz")
else:
raise RuntimeError("Failed to generate model.xyz from gpumdkit")
def run(self):
"""执行 GPUMD"""
work_dir = self.get_work_dir()
# 检查是否已经跑完 (简单的锁文件机制)
if (work_dir / "thermo.out").exists():
self.logger.info(f" -> Pre-check: thermo.out exists, skipping preheat.")
# 这里可以加更复杂的检查,比如步数是否足够
return
self.logger.info(f"🔥 Running Preheat in {self.iter_name}")
self.initialize()
# 调用 GPUMD
# GPUMD 没有参数,直接运行
self.runner.run("gpumd", cwd=work_dir)
self.logger.info(" -> Preheat finished.")
def check_done(self):
# 简单检查 thermo.out 是否存在且非空
f = self.get_work_dir() / "thermo.out"
return f.exists() and f.stat().st_size > 0

View File

@@ -1,116 +0,0 @@
import shutil
import glob
from pathlib import Path
from nep_auto.modules.base_module import BaseModule
class MDModule(BaseModule):
def __init__(self, driver, iter_id):
super().__init__(driver, iter_id)
self.template_subdir = "00_md"
# 预热目录 (输入源)
self.preheat_dir = self.iter_dir / "00.md" / "preheat"
# MD 目录 (工作区)
self.work_dir = self.iter_dir / "00.md" / "md"
def get_work_dir(self):
return self.work_dir
def run(self):
self.logger.info(f"🌪️ [MD] Starting Sampling Phase Iter {self.iter_id}...")
self.initialize()
# ----------------------------------------
# 1. 从预热轨迹中采样 (dump.xyz -> sampled_structures.xyz)
# ----------------------------------------
preheat_dump = self.preheat_dir / "dump.xyz"
if not preheat_dump.exists():
raise FileNotFoundError(f"Preheat dump not found: {preheat_dump}")
# 调用 sample_structures.py
# 假设参数: input_file method number
kit_root = self.driver.config_param['env']['gpumdkit_root']
script = f"{kit_root}/Scripts/sample_structures/sample_structures.py"
# 复制 dump 到当前目录以便处理
local_dump = self.work_dir / "preheat_dump.xyz"
shutil.copy(preheat_dump, local_dump)
self.logger.info(" -> Sampling structures from preheat trajectory...")
# 按照你的描述: sample_structures.py dump.xyz uniform 4
# 这里 "4" 可以放到 param.yaml 里配置,暂时写死或读取默认
self.runner.run(
"python_script", # 这里可以用 local runner 直接跑 python
cwd=self.work_dir,
extra_args=f"{script} preheat_dump.xyz uniform 4"
)
# 产物通常叫 sampled_structures.xyz我们需要把它作为后续 MD 的起始结构
# 但注意GPUMD MD 通常读取 model.xyz 或者 restart。
# 如果你的 run.in 里写的是 load_xyz sampled_structures.xyz那就没问题。
# 如果不是,通常做法是把 sampled_structures.xyz 切分成多个文件夹。
# --- 修正逻辑:根据你的描述 "生成 sample_1-4 文件夹" ---
# 我们遍历 template/00_md/md_run_*.in
tpl_path = Path("template") / self.template_subdir
run_templates = sorted(list(tpl_path.glob("md_run_*.in")))
if not run_templates:
self.logger.warning(f"⚠️ No 'md_run_*.in' found in {tpl_path}, looking for 'run.in'...")
run_templates = list(tpl_path.glob("run.in"))
sub_tasks = []
nep_source = self.preheat_dir / "nep.txt" # 沿用预热阶段的势函数
for idx, tpl in enumerate(run_templates, start=1):
task_name = f"sample_{idx}"
task_dir = self.work_dir / task_name
task_dir.mkdir(exist_ok=True)
sub_tasks.append(task_dir)
# 1. 复制 run.in
shutil.copy(tpl, task_dir / "run.in")
# 2. 复制 nep.txt
shutil.copy(nep_source, task_dir / "nep.txt")
# 3. 复制结构 (假设所有 sample 都从预热的最后一帧或 sampled_structures 开始)
# 这里简化处理:复制 model.xyz (初始结构) 或者 使用 preheat 的最后状态
# 根据你的流程,通常需要把 sampled_structures.xyz 里的某一帧放进去
# 或者 GPUMD 支持直接读取 exyz。
# 这里我们假设 run.in 里配置好了读取方式,我们只负责给文件。
if (self.preheat_dir / "model.xyz").exists():
shutil.copy(self.preheat_dir / "model.xyz", task_dir / "model.xyz")
# ----------------------------------------
# 2. 执行所有 Sample 任务
# ----------------------------------------
self.logger.info(f" -> Submitting {len(sub_tasks)} MD tasks...")
for task_dir in sub_tasks:
self.logger.info(f" -> Running {task_dir.name}...")
self.runner.run("gpumd", cwd=task_dir)
# ----------------------------------------
# 3. 合并结果
# ----------------------------------------
self.logger.info(" -> Merging dump files...")
# cat sample_*/dump.xyz >> dump.xyz
# 使用 python 实现 cat 以跨平台安全
target_dump = self.work_dir / "dump.xyz"
with open(target_dump, 'wb') as outfile:
for task_dir in sub_tasks:
src = task_dir / "dump.xyz"
if src.exists():
with open(src, 'rb') as infile:
shutil.copyfileobj(infile, outfile)
else:
self.logger.warning(f"⚠️ {task_dir.name} generated no dump.xyz")
self.check_done()
def check_done(self):
if (self.work_dir / "dump.xyz").exists():
self.logger.info("✅ MD Sampling finished.")
return True
raise RuntimeError("MD failed: dump.xyz not created.")

View File

@@ -1,148 +0,0 @@
import shutil
import subprocess
from pathlib import Path
from nep_auto.modules.base_module import BaseModule
class SelectModule(BaseModule):
def __init__(self, driver, iter_id):
super().__init__(driver, iter_id)
self.work_dir = self.iter_dir / "01.select"
self.md_dir = self.iter_dir / "00.md" / "md"
def get_work_dir(self):
return self.work_dir
def get_frame_count(self, xyz_file):
"""读取 xyz 文件帧数 (通过 grep 'Lattice' 计数)"""
if not xyz_file.exists():
return 0
try:
# 使用 grep -c 更快,避免 python 读取大文件内存溢出
result = subprocess.run(
f"grep -c 'Lattice' {xyz_file}",
shell=True, stdout=subprocess.PIPE, text=True
)
return int(result.stdout.strip())
except:
return 0
def run(self):
self.logger.info(f"🔍 [Select] Starting Active Learning Selection Iter {self.iter_id}...")
self.initialize()
# ----------------------------------------
# 1. 准备必要文件
# ----------------------------------------
# A. 待筛选数据 (从 MD 结果拿)
src_dump = self.md_dir / "dump.xyz"
if not src_dump.exists():
raise FileNotFoundError(f"MD dump missing: {src_dump}")
shutil.copy(src_dump, self.work_dir / "dump.xyz")
# B. 势函数 (从 MD 结果拿)
shutil.copy(self.md_dir / "nep.txt", self.work_dir / "nep.txt")
# C. 历史训练集 (用于对比)
# 逻辑:如果是第一轮,我们需要一个初始的 train.xyz (即使是空的或者是 model.xyz)
# gpumdkit 需要这个文件存在
target_train_xyz = self.work_dir / "train.xyz"
if self.iter_id == 1:
# 尝试从 data 目录拿初始训练集,如果没有,可以用 model.xyz 充数
init_train = self.root / "00.data" / "train.xyz"
if init_train.exists():
shutil.copy(init_train, target_train_xyz)
else:
# 如果实在没有,把初始结构当做 train.xyz避免脚本报错
self.logger.warning("No initial train.xyz found, using model.xyz as placeholder.")
shutil.copy(self.md_dir / "model.xyz", target_train_xyz)
else:
# 使用上一轮累积的训练集
prev_train = self.root / f"iter_{self.iter_id - 1:03d}" / "03.train" / "train.xyz"
if prev_train.exists():
shutil.copy(prev_train, target_train_xyz)
else:
raise FileNotFoundError(f"Previous train.xyz missing: {prev_train}")
# ----------------------------------------
# 2. 循环筛选 (调整阈值)
# ----------------------------------------
cfg = self.config_param['params']['select']
target_min = cfg.get('target_min', 60)
target_max = cfg.get('target_max', 120)
threshold = cfg.get('init_threshold', 0.01)
max_attempts = 10
attempt = 0
# gpumdkit 命令 (假设 machine.yaml 里配好了 tool 叫 'gpumdkit')
# 如果是 local 模式runner.run 实际上是执行 command。
# 但这里我们需要特殊的 input piperunner 的通用接口可能不够用。
# 既然我们明确是 local 环境且用 pipe直接用 subprocess 最稳。
gpumdkit_cmd = self.machine_config['tools']['gpumdkit']['command'] # e.g. "gpumdkit.sh"
while attempt < max_attempts:
self.logger.info(f" -> Attempt {attempt + 1}: Threshold = {threshold:.5f}")
# 构造输入流字符串
# 对应你的流程: 203 -> file names -> 1 (distance mode) -> threshold
input_str = f"203\ndump.xyz train.xyz nep.txt\n1\n{threshold}\n"
# 构造完整命令: echo -e "..." | gpumdkit.sh
# 注意python 的 input 参数直接传给 stdin不需要用 echo |
try:
self.logger.debug(f" Input string: {repr(input_str)}")
process = subprocess.run(
gpumdkit_cmd,
input=input_str,
cwd=self.work_dir,
shell=True,
executable="/bin/bash",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
# 记录输出以便 debug
# self.logger.debug(process.stdout)
if process.returncode != 0:
self.logger.error(f"gpumdkit execution failed: {process.stderr}")
raise RuntimeError("gpumdkit failed")
except Exception as e:
self.logger.error(f"Execution error: {e}")
raise
# 检查 selected.xyz
selected_file = self.work_dir / "selected.xyz"
count = self.get_frame_count(selected_file)
self.logger.info(f" -> Selected {count} structures.")
if target_min <= count <= target_max:
self.logger.info(f"✅ Selection success! ({count} frames)")
break
elif count < target_min:
self.logger.info(" -> Too few, lowering threshold (x0.8)...")
threshold *= 0.8
else:
self.logger.info(" -> Too many, raising threshold (x1.2)...")
threshold *= 1.2
# 稍微清理一下生成的中间文件,防止下次干扰?
# selected.xyz 会被下次覆盖,所以不删也行。
attempt += 1
if attempt >= max_attempts:
self.logger.warning("⚠️ Max attempts reached. Proceeding with current best.")
self.check_done()
def check_done(self):
if (self.work_dir / "selected.xyz").exists():
return True
raise RuntimeError("Selection failed: selected.xyz not found")

View File

@@ -1,167 +0,0 @@
import shutil
import subprocess
import glob
from pathlib import Path
from nep_auto.modules.base_module import BaseModule
class SCFModule(BaseModule):
def __init__(self, driver, iter_id):
super().__init__(driver, iter_id)
self.template_subdir = "02_scf"
self.work_dir = self.iter_dir / "02.scf"
self.select_dir = self.iter_dir / "01.select"
def get_work_dir(self):
return self.work_dir
def run(self):
self.logger.info(f"⚛️ [SCF] Starting DFT Calculation Iter {self.iter_id}...")
self.initialize()
# ----------------------------------------
# 1. 准备数据: selected.xyz -> 301 切分
# ----------------------------------------
src_xyz = self.select_dir / "selected.xyz"
if not src_xyz.exists():
raise FileNotFoundError("selected.xyz missing from select module")
shutil.copy(src_xyz, self.work_dir / "selected.xyz")
# 调用 gpumdkit.sh (301 -> prefix)
# Prefix 使用 "task" 或者 "job",生成 job_1, job_2...
prefix = "task"
input_str = f"301\n{prefix}\n"
gpumdkit_cmd = self.machine_config['tools']['gpumdkit']['command']
self.logger.info(" -> Splitting structures using gpumdkit...")
try:
subprocess.run(
gpumdkit_cmd,
input=input_str,
cwd=self.work_dir,
shell=True,
executable="/bin/bash",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
check=True
)
except subprocess.CalledProcessError as e:
self.logger.error(f"gpumdkit splitting failed: {e.stderr}")
raise
# ----------------------------------------
# 2. 准备 DFT 输入文件 (fp 文件夹)
# ----------------------------------------
# gpumdkit 会生成一个 fp 文件夹,我们需要把模版放进去
fp_dir = self.work_dir / "fp"
if not fp_dir.exists():
# 某些版本的脚本可能不自动创建 fp手动建一个保险
fp_dir.mkdir(exist_ok=True)
self.logger.info(" -> preparing INCAR/KPOINTS/POTCAR...")
# 从 template/02_scf 复制到 02.scf/fp
self.copy_template("INCAR", target_name=None)
shutil.copy(self.work_dir / "INCAR", fp_dir / "INCAR")
self.copy_template("KPOINTS", target_name=None)
shutil.copy(self.work_dir / "KPOINTS", fp_dir / "KPOINTS")
self.copy_template("POTCAR", target_name=None)
shutil.copy(self.work_dir / "POTCAR", fp_dir / "POTCAR")
# ----------------------------------------
# 3. 分发文件并提交任务
# ----------------------------------------
# 找到所有生成的文件夹 (task_1, task_2...)
task_dirs = sorted(list(self.work_dir.glob(f"{prefix}_*")))
if not task_dirs:
raise RuntimeError(f"No {prefix}_* folders generated!")
self.logger.info(f" -> Found {len(task_dirs)} tasks. Distributing input files...")
# 将 fp 里的文件分发到每个 task 文件夹 (替代 presub.sh 的功能)
common_files = ["INCAR", "KPOINTS", "POTCAR"]
for t_dir in task_dirs:
if not t_dir.is_dir(): continue
for f in common_files:
shutil.copy(fp_dir / f, t_dir / f)
# 提交计算
self.logger.info(" -> Running VASP jobs...")
success_count = 0
# 这里的并行策略取决于 machine.yaml
# 如果是 Interactive GPU我们通常是串行跑或者一次跑 N 个
# 这里先简单实现串行跑
for t_dir in task_dirs:
self.logger.info(f" -> Running {t_dir.name}...")
try:
# 调用 machine.yaml 里的 vasp 工具
self.runner.run("vasp", cwd=t_dir)
if (t_dir / "OUTCAR").exists(): # 简单判据
success_count += 1
except Exception as e:
self.logger.error(f"Job {t_dir.name} failed: {e}")
self.logger.info(f" -> Finished. Success: {success_count}/{len(task_dirs)}")
# ----------------------------------------
# 4. 收集结果 (OUTCARs -> NEP-dataset.xyz)
# ----------------------------------------
# 使用 gpumdkit 104 功能: Format Conversion -> OUTCAR to xyz (需提供路径)
# 或者 108? 根据你的描述是 gpumdkit.sh -out2xyz .
self.logger.info(" -> Converting OUTCARs to NEP-dataset.xyz...")
# 方式 A: 命令行参数调用 (如果你确认支持)
# cmd = f"{gpumdkit_cmd} -out2xyz ."
# 方式 B: 交互式调用 (104/108) - 这里假设 -out2xyz 可用,这是最方便的
# 如果不支持,我们需要知道交互式的代码。根据你的描述 7: "-out2xyz ."
try:
# 尝试直接调用 -out2xyz
subprocess.run(
f"{gpumdkit_cmd} -out2xyz .",
cwd=self.work_dir,
shell=True,
executable="/bin/bash",
check=True
)
# gpumdkit 通常生成 model.xyz 或 out.xyz我们需要重命名为 NEP-dataset.xyz
# 假设生成的是 model.xyz
potential_outputs = ["model.xyz", "movie.xyz", "out.xyz"]
found = False
for f in potential_outputs:
if (self.work_dir / f).exists():
shutil.move(self.work_dir / f, self.work_dir / "NEP-dataset.xyz")
found = True
break
if not found and not (self.work_dir / "NEP-dataset.xyz").exists():
# 如果没找到,可能已经在子文件夹里?
pass
except subprocess.CalledProcessError:
self.logger.warning("gpumdkit -out2xyz failed, falling back to ASE...")
# Fallback: 使用 ASE 收集 (更稳健)
from ase.io import read, write
all_atoms = []
for t_dir in task_dirs:
try:
all_atoms.append(read(t_dir / "OUTCAR", format="vasp-outcar"))
except:
pass
if all_atoms:
write(self.work_dir / "NEP-dataset.xyz", all_atoms, format="extxyz")
self.check_done()
def check_done(self):
if (self.work_dir / "NEP-dataset.xyz").exists():
return True
raise RuntimeError("SCF failed: NEP-dataset.xyz not generated")

View File

@@ -1,69 +0,0 @@
import shutil
from pathlib import Path
from nep_auto.modules.base_module import BaseModule
class TrainModule(BaseModule):
def __init__(self, driver, iter_id):
super().__init__(driver, iter_id)
self.template_subdir = "03_train"
self.work_dir = self.iter_dir / "03.train"
def get_work_dir(self):
return self.work_dir
def run(self):
self.logger.info(f"🧠 [Train] Starting Training Iter {self.iter_id}...")
self.initialize()
# ----------------------------------------
# 1. 准备 train.xyz (合并)
# ----------------------------------------
# 目标文件
current_train = self.work_dir / "train.xyz"
# 来源 1: 上一轮的 train.xyz (如果是第一轮,找初始数据)
sources = []
if self.iter_id == 1:
init_data = self.root / "00.data" / "train.xyz"
if init_data.exists():
sources.append(init_data)
else:
prev_train = self.root / f"iter_{self.iter_id - 1:03d}" / "03.train" / "train.xyz"
if prev_train.exists():
sources.append(prev_train)
# 来源 2: 本轮新算的 SCF 数据
new_data = self.iter_dir / "02.scf" / "NEP-dataset.xyz"
if new_data.exists():
sources.append(new_data)
else:
raise FileNotFoundError("New training data (NEP-dataset.xyz) missing!")
# 执行合并
self.logger.info(f" -> Merging {len(sources)} datasets into train.xyz...")
with open(current_train, 'wb') as outfile:
for src in sources:
with open(src, 'rb') as infile:
shutil.copyfileobj(infile, outfile)
# ----------------------------------------
# 2. 准备 nep.in
# ----------------------------------------
self.copy_template("nep.in")
# ----------------------------------------
# 3. 运行训练 (调用 machine.yaml 里的 nep)
# ----------------------------------------
self.logger.info(" -> Running NEP training...")
self.runner.run("nep", cwd=self.work_dir)
self.check_done()
def check_done(self):
# 检查是否生成了 nep.txt
# 通常还会检查 loss.out 是否收敛,或者生成了 virials.out 等
if (self.work_dir / "nep.txt").exists():
self.logger.info("✅ Training finished.")
return True
raise RuntimeError("Training failed: nep.txt not generated")

View File

@@ -1,27 +0,0 @@
import json
import os
from pathlib import Path
class StatusManager:
def __init__(self, workspace_path):
self.workspace = Path(workspace_path)
self.status_file = self.workspace / "status.json"
if not self.workspace.exists():
self.workspace.mkdir(parents=True)
# 如果没有状态文件,创建一个初始的
if not self.status_file.exists():
self._save_status({"current_iter": 1, "stages": {}})
def _save_status(self, data):
with open(self.status_file, 'w') as f:
json.dump(data, f, indent=4)
def get_current_iter(self):
if self.status_file.exists():
with open(self.status_file, 'r') as f:
data = json.load(f)
return data.get("current_iter", 1)
return 1

View File

@@ -1,33 +0,0 @@
import logging
import os
import sys
def setup_logger(log_file="logs/runtime.log"):
# 确保日志目录存在
os.makedirs(os.path.dirname(log_file), exist_ok=True)
logger = logging.getLogger("NEP_Auto")
logger.setLevel(logging.INFO)
# 避免重复添加 handler
if logger.handlers:
return logger
# 格式
formatter = logging.Formatter(
'[%(asctime)s] [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# 文件输出
fh = logging.FileHandler(log_file, mode='a', encoding='utf-8')
fh.setFormatter(formatter)
logger.addHandler(fh)
# 屏幕输出
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(formatter)
logger.addHandler(ch)
return logger

View File

@@ -1,74 +0,0 @@
import subprocess
import os
import time
import logging
class CommandRunner:
def __init__(self, machine_config):
"""
:param machine_config: config/machine.yaml 中 'systems' -> 'current_system' 对应的内容
"""
self.config = machine_config
self.logger = logging.getLogger("NEP_Auto")
self.mode = self.config.get("type", "local") # local 或 slurm
def run(self, tool_name, cwd=".", wait=True, extra_args=""):
"""
核心运行方法
:param tool_name: machine.yaml 中 tools 下的键名 (如 'gpumd', 'vasp')
:param cwd: 执行命令的工作目录
:param wait: 是否等待命令结束 (True: 阻塞, False: 后台运行)
:param extra_args: 附加在命令后的参数
"""
# 1. 获取工具配置
tool_conf = self.config.get("tools", {}).get(tool_name)
if not tool_conf:
self.logger.error(f"❌ 找不到工具配置: {tool_name}")
raise ValueError(f"Tool {tool_name} not defined in machine.yaml")
cmd = tool_conf.get("command")
env_setup = tool_conf.get("env_setup", "")
# 2. 组装命令 (Local 模式)
if self.mode == "local":
full_cmd = f"{cmd} {extra_args}"
# 如果有环境加载脚本,用 && 连接
if env_setup:
full_cmd = f"{env_setup} && {full_cmd}"
self.logger.info(f"⚙️ [Local] Executing: {full_cmd}")
self.logger.info(f" 📂 Workdir: {cwd}")
try:
# 使用 bash 执行以支持 source 命令
process = subprocess.Popen(
full_cmd,
shell=True,
cwd=cwd,
executable="/bin/bash",
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
if wait:
stdout, stderr = process.communicate()
if process.returncode != 0:
self.logger.error(f"❌ Execution failed (Code {process.returncode})")
self.logger.error(f"Stderr: {stderr}")
raise RuntimeError(f"Command failed: {full_cmd}")
return True
else:
return process # 返回进程对象供监控
except Exception as e:
self.logger.error(f"❌ Runner Error: {str(e)}")
raise
# 3. Slurm 模式 (预留接口,暂未实现具体逻辑)
elif self.mode == "slurm":
self.logger.warning("⚠️ Slurm mode not fully implemented yet.")
# 这里未来会生成 sbatch 脚本并提交
return False

129
src/machine.py Normal file
View File

@@ -0,0 +1,129 @@
# src/machine.py
import os
import subprocess
import time
import logging
import shutil
class MachineManager:
def __init__(self, machine_config_path):
from src.utils import load_yaml
self.config = load_yaml(machine_config_path)
self.root_dir = self.config.get('root_dir', os.getcwd())
self.script_dir = os.path.join(self.root_dir, self.config.get('script_dir', 'config/scripts'))
logging.info(f"MachineManager initialized. Script dir: {self.script_dir}")
def execute(self, executor_name, work_dir):
"""
统一执行入口
:param executor_name: machine.yaml 中定义的 key (如 gpumd, vasp_cpu)
:param work_dir: 任务执行的工作目录
"""
if executor_name not in self.config['executors']:
logging.error(f"Executor '{executor_name}' not defined in machine.yaml")
return False
exec_conf = self.config['executors'][executor_name]
exec_type = exec_conf.get('type', 'local')
# 确保工作目录存在
os.makedirs(work_dir, exist_ok=True)
logging.info(f"--- Task: {executor_name} | Type: {exec_type} ---")
logging.info(f"Working Dir: {work_dir}")
if exec_type == 'local':
return self._run_local(exec_conf, work_dir)
elif exec_type == 'slurm':
return self._submit_slurm(exec_conf, work_dir, executor_name)
else:
logging.error(f"Unknown execution type: {exec_type}")
return False
def _run_local(self, conf, work_dir):
"""本地直接执行"""
# 1. 优先看有没有 script 脚本文件
if 'script' in conf:
script_name = conf['script']
src_script = os.path.join(self.script_dir, script_name)
if not os.path.exists(src_script):
logging.error(f"Script not found: {src_script}")
return False
# 运行脚本: bash /path/to/script.sh
cmd = f"bash {src_script}"
# 2. 如果没有脚本,看有没有 cmd 直接命令
elif 'cmd' in conf:
cmd = conf['cmd']
else:
logging.error("No 'script' or 'cmd' defined for local executor.")
return False
try:
# 切换到工作目录执行
logging.info(f"Executing Local Command: {cmd}")
subprocess.check_call(cmd, shell=True, cwd=work_dir)
logging.info("Local execution success.")
return True
except subprocess.CalledProcessError as e:
logging.error(f"Execution failed with error code {e.returncode}")
return False
def _submit_slurm(self, conf, work_dir, job_name):
"""生成 Slurm 脚本并提交 (模拟)"""
script_name = conf.get('script')
src_script = os.path.join(self.script_dir, script_name)
if not os.path.exists(src_script):
logging.error(f"Script not found: {src_script}")
return False
# 1. 读取用户自定义脚本内容
with open(src_script, 'r') as f:
user_script_content = f.read()
# 2. 生成提交脚本 (.sub)
sub_file = os.path.join(work_dir, "submit.sub")
with open(sub_file, 'w') as f:
f.write("#!/bin/bash\n")
f.write(f"#SBATCH --job-name={job_name}\n")
# 根据 yaml 自动填入 SBATCH 参数
if 'partition' in conf: f.write(f"#SBATCH --partition={conf['partition']}\n")
if 'nodes' in conf: f.write(f"#SBATCH --nodes={conf['nodes']}\n")
if 'ntasks' in conf: f.write(f"#SBATCH --ntasks={conf['ntasks']}\n")
if 'time' in conf: f.write(f"#SBATCH --time={conf['time']}\n")
if 'gpus' in conf: f.write(f"#SBATCH --gres=gpu:{conf['gpus']}\n")
f.write("\n")
f.write("cd $SLURM_SUBMIT_DIR\n")
f.write("\n")
f.write("# --- User Script Content ---\n")
f.write(user_script_content)
logging.info(f"Generated submission script: {sub_file}")
# 3. 提交任务
# 注意:这里我们做个判断,如果是在非 Slurm 环境测试,就不真正提交,只生成文件
# 如果你想真正提交,把下面的 True 改为 False
TEST_MODE = True
if TEST_MODE:
logging.info("[TEST_MODE] Simulated 'sbatch submit.sub'. Check the .sub file.")
return True
else:
try:
# 提交并获取 Job ID
res = subprocess.check_output(f"sbatch {sub_file}", shell=True, cwd=work_dir)
job_id = res.decode().strip().split()[-1] # 通常输出是 Submitted batch job 123456
logging.info(f"Job submitted. ID: {job_id}")
# TODO: 这里需要加入 wait_for_job(job_id) 的逻辑,我们下一阶段实现
return True
except subprocess.CalledProcessError as e:
logging.error(f"Submission failed: {e}")
return False

177
src/steps.py Normal file
View File

@@ -0,0 +1,177 @@
# src/steps.py
import os
import shutil
import time
import logging
import subprocess
class BaseStep:
def __init__(self, name, work_dir, machine_manager, config):
self.name = name
self.work_dir = work_dir
self.machine = machine_manager
self.config = config
os.makedirs(self.work_dir, exist_ok=True)
self.logger = logging.getLogger()
def copy_file(self, src, dst_name=None):
"""辅助函数:安全复制文件"""
if not os.path.exists(src):
self.logger.error(f"[{self.name}] Source file missing: {src}")
return False
dst_name = dst_name if dst_name else os.path.basename(src)
dst_path = os.path.join(self.work_dir, dst_name)
shutil.copy(src, dst_path)
return dst_path
class MDStep(BaseStep):
"""
对应 00.md: 负责预热/采样
"""
def run(self, prev_nep_path, template_path):
self.logger.info(f"=== Running Step: {self.name} (MD) ===")
# 1. 准备 nep.txt (来自上一轮或初始数据)
if not prev_nep_path:
self.logger.error("No nep.txt provided for MD.")
return False
self.copy_file(prev_nep_path, "nep.txt")
# 2. 准备 model.xyz (如果是第一轮这里假设外部已经放好了或者由init步生成)
# 为了简化,我们假设上一级流程已经把 model.xyz 准备在 work_dir 或者由上一轮传递
# 这里我们假设 model.xyz 必须存在于 work_dir (可以通过 init 步骤拷入)
if not os.path.exists(os.path.join(self.work_dir, "model.xyz")):
self.logger.warning(f"[{self.name}] model.xyz not found in {self.work_dir}. Make sure Init step ran.")
# 3. 准备 run.in (从 template 复制)
run_in_src = os.path.join(template_path, "run.in")
self.copy_file(run_in_src, "run.in")
# 4. 调用 Machine 执行 GPUMD
# 注意:这里我们调用 machine.yaml 里定义的 'gpumd' 执行器
success = self.machine.execute("gpumd", self.work_dir)
if success and os.path.exists(os.path.join(self.work_dir, "dump.xyz")):
self.logger.info(f"[{self.name}] MD finished. dump.xyz generated.")
return True
else:
self.logger.error(f"[{self.name}] MD failed or dump.xyz missing.")
return False
class SelectStep(BaseStep):
"""
对应 01.select: 智能筛选
"""
def run(self, dump_path, train_path, nep_path, method="distance", params=[0.01, 60, 120]):
self.logger.info(f"=== Running Step: {self.name} (Smart Selection) ===")
# 准备文件
self.copy_file(dump_path, "dump.xyz")
self.copy_file(train_path, "train.xyz")
self.copy_file(nep_path, "nep.txt")
target_min, target_max = params[1], params[2]
threshold = params[0]
step_size = 0.001 # 每次调整的步长
# 你的流程里是用 gpumdkit.sh 做筛选 (option 203)
# 这里的命令构造需要非常小心,模拟你的 echo输入
# 假设 gpumdkit.sh 在 PATH 中,或者通过 machine config 获取路径
# 由于我们现在是 local 调试,假设你依然依赖 gpumdkit.sh
# 但既然我们写 Python建议未来把筛选逻辑计算距离直接写成 Python 代码。
# 这里暂时模拟调用逻辑:
for i in range(10): # 最多尝试10次
self.logger.info(f"Selection attempt {i + 1}: Threshold={threshold:.4f}")
# 构造输入字符串: 203 -> file names -> 1 (distance) -> threshold
# 注意:这里假设 gpumdkit.sh 能接受这种输入
# 为了调试方便,这里我们暂时只打日志,不真的调 gpumdkit (因为它需要真实的数据文件)
# 在真实运行中,这里应该调用:
# input_str = f"203\ndump.xyz train.xyz nep.txt\n1\n{threshold}\n"
# subprocess.run("gpumdkit.sh", input=input_str, cwd=self.work_dir...)
# --- 模拟代码 Start ---
# 假设生成了一个假的 selected.xyz
with open(os.path.join(self.work_dir, "selected.xyz"), 'w') as f:
# 模拟根据阈值,阈值越小选的越多
mock_count = int(100 / (threshold * 100))
f.write(f"Mock selected {mock_count} frames")
selected_count = mock_count
self.logger.info(f"Found {selected_count} structures (Mock).")
# --- 模拟代码 End ---
if target_min <= selected_count <= target_max:
self.logger.info(f"Selection Success! Final count: {selected_count}")
return True
elif selected_count < target_min:
self.logger.info("Too few. Decreasing threshold.")
threshold -= step_size
if threshold < 0: threshold = 0.001
else:
self.logger.info("Too many. Increasing threshold.")
threshold += step_size
self.logger.warning("Selection failed to converge. Using last result.")
return True # 暂时允许继续
class SCFStep(BaseStep):
"""
对应 02.scf: VASP 计算
"""
def run(self, template_path, potcar_path):
self.logger.info(f"=== Running Step: {self.name} (SCF/VASP) ===")
# 1. 复制 POTCAR
self.copy_file(potcar_path, "POTCAR")
# 2. 复制 INCAR
incar_src = os.path.join(template_path, "INCAR")
if not self.copy_file(incar_src, "INCAR"):
return False # INCAR 必须有
# 3. 复制 KPOINTS (可选)
kpoints_src = os.path.join(template_path, "KPOINTS")
if os.path.exists(kpoints_src):
self.copy_file(kpoints_src, "KPOINTS")
# 4. 执行 VASP
# 注意:这里通常需要把 selected.xyz 拆分成多个文件夹
# 在 Local 简单测试中,我们假设 selected.xyz 已经被拆分成了 POSCAR
# 或者我们只跑一个单点能测试。
# 既然是框架开发,这里我们调用 machine.yaml 里的 'vasp_cpu'
success = self.machine.execute("vasp_cpu", self.work_dir)
return success
class TrainStep(BaseStep):
"""
对应 03.train: NEP 训练
"""
def run(self, template_path, new_train_data_path):
self.logger.info(f"=== Running Step: {self.name} (Train) ===")
# 1. 准备 nep.in
self.copy_file(os.path.join(template_path, "nep.in"), "nep.in")
# 2. 准备 train.xyz (这里假设我们把所有数据 cat 到了这里)
if new_train_data_path and os.path.exists(new_train_data_path):
self.copy_file(new_train_data_path, "train.xyz")
else:
# 如果没有新数据,只是测试,创建一个空的
with open(os.path.join(self.work_dir, "train.xyz"), 'w') as f:
f.write("Mock training data")
# 3. 运行 NEP
return self.machine.execute("nep_local", self.work_dir)

47
src/utils.py Normal file
View File

@@ -0,0 +1,47 @@
# src/utils.py
import yaml
import logging
import os
import sys
def load_yaml(path):
"""加载 YAML 配置文件"""
if not os.path.exists(path):
logging.error(f"Config file not found: {path}")
sys.exit(1)
with open(path, 'r') as f:
return yaml.safe_load(f)
def setup_logger(work_dir, log_file="autonep.log"):
"""配置日志:同时输出到文件和控制台"""
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 清楚之前的 handler 防止重复
if logger.hasHandlers():
logger.handlers.clear()
# 文件 Handler
file_handler = logging.FileHandler(os.path.join(work_dir, log_file))
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
logger.addHandler(file_handler)
# 控制台 Handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('%(message)s')) # 控制台只看消息,简洁点
logger.addHandler(console_handler)
return logger
class Notifier:
"""(预留) 通知模块"""
def __init__(self, url=None):
self.url = url
def send(self, title, msg, priority=5):
# 暂时只打印日志,不实际发送
logging.info(f"[[Notification]] {title}: {msg}")

131
src/workflow.py Normal file
View File

@@ -0,0 +1,131 @@
# src/workflow.py
import os
import shutil
import logging
from src.utils import load_yaml
from src.machine import MachineManager
from src.steps import MDStep, SelectStep, SCFStep, TrainStep
class Workflow:
def __init__(self, root_dir):
self.root_dir = root_dir
# 1. 加载配置
self.param = load_yaml(os.path.join(root_dir, "config/param.yaml"))
# 2. 初始化机器管理器
self.machine = MachineManager(os.path.join(root_dir, "config/machine.yaml"))
# 3. 初始化路径变量
self.workspace = os.path.join(root_dir, "workspace")
self.data_dir = os.path.join(root_dir, "data")
self.template_dir = os.path.join(root_dir, "template")
self.logger = logging.getLogger()
# 状态追踪变量
self.current_nep_pot = os.path.join(self.data_dir, self.param['files']['initial_pot'])
# 假设第一轮之前的 train set 也是空的或者由用户提供,这里先指向一个基础文件
self.current_train_set = os.path.join(self.workspace, "accumulated_train.xyz")
def run(self):
self.logger.info(f"Workflow Started: {self.param['project']}")
# 遍历每一轮迭代
for iteration in self.param['iterations']:
iter_id = iteration['id']
iter_name = f"iter_{iter_id:02d}"
iter_path = os.path.join(self.workspace, iter_name)
self.logger.info(f"\n >>> Starting Iteration: {iter_id} <<<")
os.makedirs(iter_path, exist_ok=True)
# --- 执行该轮定义的各个 Step ---
for step_conf in iteration['steps']:
step_name = step_conf['name']
# ==========================
# Step: 00.md
# ==========================
if step_name == "00.md":
step_dir = os.path.join(iter_path, "00.md")
# 只有第一轮且有init需求时才进行 POSCAR -> model.xyz 转化
# 这里为了 Local 测试,我们简单处理:直接把 POSCAR 拷过去当 model.xyz (仅作演示)
# 实际上你应该调用 gpumdkit 转化
if iter_id == 0:
os.makedirs(step_dir, exist_ok=True)
shutil.copy(os.path.join(self.data_dir, self.param['files']['poscar']),
os.path.join(step_dir, "model.xyz"))
# 遍历子任务 (preheat, production...)
for sub in step_conf.get('sub_tasks', []):
template_sub_name = sub['template_sub']
sub_work_dir = os.path.join(step_dir, template_sub_name)
template_path = os.path.join(self.template_dir, "00.md", template_sub_name)
# 实例化并运行
md_task = MDStep(f"MD-{template_sub_name}", sub_work_dir, self.machine, self.config)
# 关键:要把上一级准备好的 model.xyz 拷进来
if iter_id == 0:
shutil.copy(os.path.join(step_dir, "model.xyz"), os.path.join(sub_work_dir, "model.xyz"))
# 如果是后续轮次,应该用上一轮选好的结构,这里暂略,先跑通第一轮
md_task.run(self.current_nep_pot, template_path)
# 记录最后生成的 dump.xyz 路径,供下一步使用
self.last_dump_path = os.path.join(sub_work_dir, "dump.xyz")
# ==========================
# Step: 01.select
# ==========================
elif step_name == "01.select":
step_dir = os.path.join(iter_path, "01.select")
select_task = SelectStep("Select", step_dir, self.machine, self.config)
# 使用上一步产生的 dump 和 当前的训练集/势函数
select_task.run(
dump_path=getattr(self, 'last_dump_path', None),
train_path=self.current_train_set,
nep_path=self.current_nep_pot,
method=step_conf.get('method'),
params=step_conf.get('params')
)
# ==========================
# Step: 02.scf
# ==========================
elif step_name == "02.scf":
step_dir = os.path.join(iter_path, "02.scf")
scf_task = SCFStep("SCF", step_dir, self.machine, self.config)
template_path = os.path.join(self.template_dir, "02.scf")
potcar_path = os.path.join(self.data_dir, self.param['files']['potcar'])
scf_task.run(template_path, potcar_path)
# 假装产生了一些新数据
self.new_data_chunk = os.path.join(step_dir, "scf_results.xyz")
# ==========================
# Step: 03.train
# ==========================
elif step_name == "03.train":
step_dir = os.path.join(iter_path, "03.train")
train_task = TrainStep("Train", step_dir, self.machine, self.config)
template_path = os.path.join(self.template_dir, "03.train")
# 实际逻辑应该是把 self.new_data_chunk 合并到 total_train.xyz
# 这里直接传入
train_task.run(template_path, getattr(self, 'new_data_chunk', None))
# 更新当前势函数路径,供下一轮使用
self.current_nep_pot = os.path.join(step_dir, "nep.txt")
self.logger.info("Workflow Finished Successfully.")
@property
def config(self):
return self.param # 简单透传

View File

@@ -1,8 +0,0 @@
potential ./nep.txt
velocity 100
ensemble npt_mttk temp 100 400 aniso 0 0
run 100000
ensemble npt_mttk temp 400 1200 aniso 0 0
dump_thermo 10
dump_exyz 10000
run 100000

0
template/02.scf/INCAR Normal file
View File

0
template/02.scf/KPOINTS Normal file
View File

0
template/03.train/nep.in Normal file
View File