# File-viewer metadata captured with the paste (not code): 162 lines, 5.9 KiB, Python
# Standard library
import math
import os
import shutil

# Third-party
import pandas as pd

# ================= Configuration =================

# Screening thresholds per anion type. check_requirements() compares a row's
# "Percolation Diameter (A)" against "perc", its "Minimum of d" against
# "min_d", and its "Maximum Node Length (A)" against "node".
THRESHOLDS = {
    "O": {"perc": 0.50, "min_d": 3.0, "node": 2.2},
    "S": {"perc": 0.55, "min_d": 3.0, "node": 2.2},
    "Cl": {"perc": 0.45, "min_d": 3.0, "node": 2.0},
    "Br": {"perc": 0.45, "min_d": 3.0, "node": 2.0},
}

# Path configuration
CSV_ROOT_DIR = "../output"  # root directory that contains the CSV files
DATA_SOURCE_DIR = "../data/after_step1"  # root directory of the original CIF files
TARGET_DIR = "../data/after_screening"  # destination for files that pass screening

# =================================================


def check_requirements(row, anion_type):
    """Return whether one CSV row satisfies the thresholds for *anion_type*.

    Args:
        row: Mapping-like record (for example a pandas Series) indexed by the
            column names "Percolation Diameter (A)", "Minimum of d" and
            "Maximum Node Length (A)".
        anion_type: Key into THRESHOLDS, such as "O" or "S".

    Returns:
        True only when the percolation diameter is greater than the "perc"
        threshold, the minimum d is less than the "min_d" threshold and the
        maximum node length is greater than the "node" threshold. False when
        the anion type has no thresholds or when any of the three values is
        NaN or cannot be converted to float.

    Note:
        A missing column is not handled here; the error raised by the row
        lookup propagates to the caller.
    """
    config = THRESHOLDS.get(anion_type)
    if not config:
        return False

    # Only the conversions can raise ValueError/TypeError, so the try block
    # is limited to them.
    try:
        perc = float(row["Percolation Diameter (A)"])
        min_d = float(row["Minimum of d"])
        node = float(row["Maximum Node Length (A)"])
    except (ValueError, TypeError):
        return False

    # Empty CSV cells arrive as NaN; such rows never pass.
    if math.isnan(perc) or math.isnan(min_d) or math.isnan(node):
        return False

    return (
        perc > config["perc"]
        and min_d < config["min_d"]
        and node > config["node"]
    )


def create_result_file(group_name, anion_name, material_id):
    """Copy the CIF file of one material into the screening target directory.

    Source and destination share the same sub-directory layout:

    * single anion (group_name == anion_name):
      DATA_SOURCE_DIR/<group>/<id>/<id>.cif -> TARGET_DIR/<group>/<id>.cif
    * mixed anions (e.g. group "S+O", anion "S"):
      DATA_SOURCE_DIR/<group>/<anion>/<id>/<id>.cif
      -> TARGET_DIR/<group>/<anion>/<id>.cif

    The file is copied rather than symlinked so the result does not depend
    on the source tree. To create symlinks instead, replace the shutil.copy
    call with os.symlink(abs_source_path, target_file_path).

    Args:
        group_name: First directory level below the CSV root.
        anion_name: Anion the CSV belongs to; equals group_name for
            single-anion groups.
        material_id: Material identifier as a string; it is both the source
            directory name and the CIF file stem.

    Returns:
        None. A missing source file or a failed copy is reported with print()
        and otherwise ignored.
    """
    # Mixed-anion data has one extra directory level named after the anion.
    if group_name == anion_name:
        sub_parts = (group_name,)
    else:
        sub_parts = (group_name, anion_name)

    cif_name = f"{material_id}.cif"
    abs_source_path = os.path.abspath(
        os.path.join(DATA_SOURCE_DIR, *sub_parts, material_id, cif_name)
    )
    if not os.path.exists(abs_source_path):
        print(f"源文件不存在: {abs_source_path}")
        return

    target_subdir = os.path.join(TARGET_DIR, *sub_parts)
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(target_subdir, exist_ok=True)
    target_file_path = os.path.join(target_subdir, cif_name)

    try:
        # lexists() is also true for a dangling symlink (for instance one left
        # by an earlier symlink-based run); exists() would miss it and
        # shutil.copy would then write through the link.
        if os.path.lexists(target_file_path):
            os.remove(target_file_path)
        shutil.copy(abs_source_path, target_file_path)
    except OSError as e:
        print(f"创建文件失败 {material_id}: {e}")


def process_all_csvs():
    """Screen every CSV below CSV_ROOT_DIR and save the CIF files that pass.

    The directory of each CSV, relative to CSV_ROOT_DIR, determines what is
    screened:

    * one level  (output/S/...)   -> group "S",   anion "S"
    * two levels (output/S+O/S/...) -> group "S+O", anion "S"

    CSV files located directly in CSV_ROOT_DIR are ignored, and CSVs whose
    anion has no entry in THRESHOLDS are skipped with a message. For every
    remaining row that passes check_requirements(), create_result_file() is
    called with the row's "Filename" value as the material id.

    Returns:
        None. Progress and a per-CSV summary are printed.
    """
    if not os.path.exists(CSV_ROOT_DIR):
        print(f"CSV 目录不存在: {CSV_ROOT_DIR}")
        return

    print("开始执行 Step 2-4 联合筛选...")

    for root, _dirs, files in os.walk(CSV_ROOT_DIR):
        rel_root = os.path.relpath(root, CSV_ROOT_DIR)
        if rel_root == os.curdir:
            # relpath() yields "." for the root itself, so CSVs placed
            # directly in the root carry no group/anion information.
            continue

        path_parts = rel_root.split(os.sep)
        group_name = path_parts[0]
        # A single level means the group directory is also the anion.
        anion_name = path_parts[1] if len(path_parts) >= 2 else group_name

        for csv_name in files:
            if not csv_name.endswith(".csv"):
                continue

            if anion_name not in THRESHOLDS:
                print(f"跳过不支持的阴离子类型: {anion_name}")
                continue

            print(f"正在处理: Group={group_name}, Anion={anion_name} ({csv_name})")

            # Read Filename as text so numeric ids are not parsed as floats.
            csv_path = os.path.join(root, csv_name)
            df = pd.read_csv(csv_path, dtype={'Filename': str})

            pass_count = 0
            total_count = len(df)

            for _index, row in df.iterrows():
                if not check_requirements(row, anion_name):
                    continue

                raw_id = row['Filename']
                if pd.isna(raw_id):
                    # Without a filename there is no CIF file to look up.
                    continue

                # Drop only a trailing ".0" (id already written as a float
                # in the CSV); a ".0" elsewhere in the id must stay intact.
                material_id = str(raw_id)
                if material_id.endswith('.0'):
                    material_id = material_id[:-2]

                create_result_file(group_name, anion_name, material_id)
                pass_count += 1

            print(f" - 完成: {pass_count}/{total_count} 个材料通过筛选并保存至 {TARGET_DIR}。")


# Run the screening only when executed as a script, not when imported.
if __name__ == "__main__":
    process_all_csvs()