nep框架搭建

This commit is contained in:
2025-12-08 17:48:03 +08:00
parent 0b6537a810
commit 5057d18e98
5 changed files with 232 additions and 48 deletions

View File

@@ -10,10 +10,14 @@ class NEPDriver:
self.logger = logging.getLogger("NEP_Auto")
self.root = Path(".")
# 1. 加载配置
# 1. 加载所有配置
self.config_sys = self._load_yaml("config/system.yaml")
self.config_param = self._load_yaml("config/param.yaml")
# 【新增】加载 machine 配置
self.config_machine = self._load_yaml("config/machine.yaml")
self.logger.info(f"项目名称: {self.config_sys.get('project_name')}")
self.logger.info(f"计算环境: {self.config_machine.get('current_system')}")
# 2. 初始化状态管理器
self.status = StatusManager(self.root / "workspace")

View File

@@ -0,0 +1,77 @@
import os
import shutil
import logging
from pathlib import Path
from nep_auto.utils.runner import CommandRunner
class BaseModule:
    """Common base for the per-iteration workflow modules.

    Collects the configuration held by the driver, builds a ``CommandRunner``
    for the currently selected machine entry, and derives the workspace paths
    of the current iteration. Subclasses must implement ``get_work_dir``,
    ``run`` and ``check_done``.
    """

    def __init__(self, driver, iter_id):
        """
        :param driver: NEPDriver instance that carries all loaded configuration
        :param iter_id: index of the current iteration (int)
        :raises KeyError: if ``current_system`` or the matching ``systems``
            entry is absent from the machine configuration
        """
        self.driver = driver
        self.config_sys = driver.config_sys
        self.config_param = driver.config_param
        # Only the entry selected by 'current_system' is kept.
        self.machine_config = driver.config_machine['systems'][driver.config_machine['current_system']]
        self.iter_id = iter_id
        self.iter_name = f"iter_{iter_id:03d}"
        self.logger = logging.getLogger("NEP_Auto")
        # Command runner bound to the selected machine entry.
        self.runner = CommandRunner(self.machine_config)
        # Path layout: <root>/workspace/iter_XXX/05.output
        self.root = Path(driver.root) / "workspace"
        self.iter_dir = self.root / self.iter_name
        self.output_dir = self.iter_dir / "05.output"  # shared output area

    def get_work_dir(self):
        """Return the working directory of this module (subclass hook)."""
        raise NotImplementedError

    def initialize(self):
        """Create the module working directory and the shared output directory."""
        work_dir = self.get_work_dir()
        if not work_dir.exists():
            work_dir.mkdir(parents=True, exist_ok=True)
            self.logger.debug(f"📁 Created dir: {work_dir}")
        # Make sure the shared output directory exists as well.
        if not self.output_dir.exists():
            self.output_dir.mkdir(parents=True, exist_ok=True)

    def run(self):
        """Entry point of the module logic (subclass hook)."""
        raise NotImplementedError

    def check_done(self):
        """Report whether the task has finished (subclass hook)."""
        raise NotImplementedError

    # --- shared helper methods ---

    def copy_template(self, template_name, target_name=None):
        """Copy a file from the template directory into the working directory.

        The source is ``template/<template_subdir>/<template_name>``, resolved
        against the process working directory; ``template_subdir`` is read
        from the instance and falls back to ``"common"``. A missing template
        only produces a warning.

        :param template_name: file name inside the template sub-directory
        :param target_name: name of the copy; defaults to ``template_name``
        """
        if target_name is None:
            target_name = template_name
        # Subclasses may define self.template_subdir to select their own templates.
        src = Path("template") / getattr(self, "template_subdir", "common") / template_name
        dst = self.get_work_dir() / target_name
        if src.exists():
            shutil.copy(src, dst)
        else:
            self.logger.warning(f"⚠️ Template not found: {src}")

    def link_file(self, src_path, dst_name):
        """Create (or replace) a symlink in the working directory.

        :param src_path: link target; stored as a resolved absolute path
        :param dst_name: name of the link inside the working directory
        """
        src = Path(src_path).resolve()
        dst = self.get_work_dir() / dst_name
        # Path.exists() follows symlinks, so it is False for a dangling link;
        # is_symlink() is checked too so a stale link is removed instead of
        # making os.symlink fail with FileExistsError.
        if dst.is_symlink() or dst.exists():
            dst.unlink()
        os.symlink(src, dst)

View File

@@ -0,0 +1,74 @@
import subprocess
import os
import time
import logging
class CommandRunner:
    """Execute the external tools declared for one machine entry."""

    def __init__(self, machine_config):
        """
        :param machine_config: the entry of config/machine.yaml found under
            'systems' -> 'current_system'
        """
        self.config = machine_config
        self.logger = logging.getLogger("NEP_Auto")
        self.mode = self.config.get("type", "local")  # "local" or "slurm"

    def run(self, tool_name, cwd=".", wait=True, extra_args=""):
        """Run one configured tool.

        :param tool_name: key under 'tools' in the machine entry
            (e.g. 'gpumd', 'vasp')
        :param cwd: working directory in which the command is executed
        :param wait: True blocks until the command ends; False returns
            immediately with the process object
        :param extra_args: text appended after the configured command
        :return: True after a successful blocking local run; the
            ``subprocess.Popen`` object when ``wait`` is False; False in
            slurm mode, which is not implemented yet
        :raises ValueError: if the tool is not configured, has no 'command',
            or the machine type is neither 'local' nor 'slurm'
        :raises RuntimeError: if a blocking local run exits with a non-zero code
        """
        # 1. Look up the tool configuration.
        tool_conf = self.config.get("tools", {}).get(tool_name)
        if not tool_conf:
            self.logger.error(f"❌ 找不到工具配置: {tool_name}")
            raise ValueError(f"Tool {tool_name} not defined in machine.yaml")

        # 2. Local mode.
        if self.mode == "local":
            return self._run_local(tool_name, tool_conf, cwd, wait, extra_args)

        # 3. Slurm mode (placeholder; no submission logic yet).
        if self.mode == "slurm":
            self.logger.warning("⚠️ Slurm mode not fully implemented yet.")
            # An sbatch script is meant to be generated and submitted here.
            return False

        # An unknown type used to fall through and return None silently.
        self.logger.error(f"❌ Unsupported machine type: {self.mode}")
        raise ValueError(f"Unsupported machine type: {self.mode}")

    def _run_local(self, tool_name, tool_conf, cwd, wait, extra_args):
        """Assemble the shell command for a tool and run it through bash."""
        cmd = tool_conf.get("command")
        if not cmd:
            # Without this check the literal text "None" would be executed.
            self.logger.error(f"❌ Tool {tool_name} has no 'command' configured")
            raise ValueError(f"Tool {tool_name} has no 'command' in machine.yaml")
        env_setup = tool_conf.get("env_setup", "")

        full_cmd = f"{cmd} {extra_args}"
        # Chain the environment setup with && so a failed setup aborts the run.
        if env_setup:
            full_cmd = f"{env_setup} && {full_cmd}"

        self.logger.info(f"⚙️ [Local] Executing: {full_cmd}")
        self.logger.info(f" 📂 Workdir: {cwd}")

        # The try covers only process creation, so a failed command is not
        # logged a second time as a runner error.
        try:
            # bash is used so that `source` works in env_setup.
            process = subprocess.Popen(
                full_cmd,
                shell=True,
                cwd=cwd,
                executable="/bin/bash",
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
            )
        except Exception as e:
            self.logger.error(f"❌ Runner Error: {str(e)}")
            raise

        if not wait:
            # NOTE: stdout/stderr are pipes; the caller has to drain them
            # (e.g. via communicate()) or a chatty child can stall once the
            # pipe buffer is full.
            return process  # handed back so the caller can monitor it

        _stdout, stderr = process.communicate()
        if process.returncode != 0:
            self.logger.error(f"❌ Execution failed (Code {process.returncode})")
            self.logger.error(f"Stderr: {stderr}")
            raise RuntimeError(f"Command failed: {full_cmd}")
        return True