重构预处理制作

This commit is contained in:
2025-12-14 14:34:26 +08:00
parent da26e0c619
commit 6eeb40d222
19 changed files with 712 additions and 0 deletions

0
config/settings.yaml Normal file
View File

View File

103
main.py Normal file
View File

@@ -0,0 +1,103 @@
"""
高通量筛选与扩胞项目 - 主入口
交互式命令行界面
"""
import os
import sys
# 添加 src 到路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from analysis.database_analyzer import DatabaseAnalyzer
from analysis.report_generator import ReportGenerator
def get_user_input():
"""获取用户输入"""
print("\n" + "=" * 70)
print(" 高通量筛选与扩胞项目 - 数据库分析工具")
print("=" * 70)
# 1. 获取数据库路径
while True:
db_path = input("\n请输入数据库路径: ").strip()
if os.path.exists(db_path):
break
print(f"❌ 路径不存在: {db_path}")
# 2. 获取目标阳离子
cation = input("请输入目标阳离子 [默认: Li]: ").strip() or "Li"
# 3. 获取目标阴离子
anion_input = input("请输入目标阴离子 (用逗号分隔) [默认: O,S,Cl,Br]: ").strip()
if anion_input:
anions = set(a.strip() for a in anion_input.split(','))
else:
anions = {'O', 'S', 'Cl', 'Br'}
# 4. 选择阴离子模式
print("\n阴离子模式选择:")
print(" 1. 仅单一阴离子化合物")
print(" 2. 仅复合阴离子化合物")
print(" 3. 全部 (默认)")
mode_choice = input("请选择 [1/2/3]: ").strip()
mode_map = {'1': 'single', '2': 'mixed', '3': 'all', '': 'all'}
anion_mode = mode_map.get(mode_choice, 'all')
# 5. 并行数
n_jobs_input = input("并行线程数 [默认: 4]: ").strip()
n_jobs = int(n_jobs_input) if n_jobs_input.isdigit() else 4
return {
'database_path': db_path,
'target_cation': cation,
'target_anions': anions,
'anion_mode': anion_mode,
'n_jobs': n_jobs
}
def main():
"""主函数"""
# 获取用户输入
params = get_user_input()
print("\n" + "-" * 70)
print("开始分析数据库...")
print("-" * 70)
# 创建分析器
analyzer = DatabaseAnalyzer(
database_path=params['database_path'],
target_cation=params['target_cation'],
target_anions=params['target_anions'],
anion_mode=params['anion_mode'],
n_jobs=params['n_jobs']
)
# 执行分析
report = analyzer.analyze(show_progress=True)
# 打印报告
ReportGenerator.print_report(report, detailed=True)
# 询问是否导出
export = input("\n是否导出详细结果到CSV? [y/N]: ").strip().lower()
if export == 'y':
output_path = input("输出文件路径 [默认: analysis_report.csv]: ").strip()
output_path = output_path or "analysis_report.csv"
ReportGenerator.export_to_csv(report, output_path)
# 询问是否继续处理
print("\n" + "-" * 70)
proceed = input("是否继续进行预处理? [y/N]: ").strip().lower()
if proceed == 'y':
print("预处理功能将在下一阶段实现...")
# TODO: 调用预处理模块
print("\n分析完成!")
if __name__ == "__main__":
main()

0
src/__init__.py Normal file
View File

0
src/analysis/__init__.py Normal file
View File

View File

@@ -0,0 +1,246 @@
"""
数据库分析器分析整个CIF数据库的构成和质量
"""
import os
from dataclasses import dataclass, field
from typing import Dict, List, Set, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
from .structure_inspector import StructureInspector, StructureInfo
@dataclass
class DatabaseReport:
"""数据库分析报告"""
# 基础统计
database_path: str = ""
total_files: int = 0
valid_files: int = 0
invalid_files: int = 0
# 目标元素统计
target_cation: str = ""
target_anions: Set[str] = field(default_factory=set)
anion_mode: str = "" # "single", "mixed", "all"
# 含目标阳离子的统计
cation_containing_count: int = 0
cation_containing_ratio: float = 0.0
# 阴离子分布 (在含目标阳离子的化合物中)
anion_distribution: Dict[str, int] = field(default_factory=dict)
anion_ratios: Dict[str, float] = field(default_factory=dict)
single_anion_count: int = 0
mixed_anion_count: int = 0
# 数据质量统计
with_oxidation_states: int = 0
without_oxidation_states: int = 0
needs_expansion_count: int = 0 # 需要扩胞的数量
cation_partial_occupancy_count: int = 0 # 阳离子共占位
anion_partial_occupancy_count: int = 0 # 阴离子共占位
binary_compound_count: int = 0
has_water_count: int = 0
has_radioactive_count: int = 0
# 可处理性统计
directly_processable: int = 0
needs_preprocessing: int = 0
cannot_process: int = 0
# 详细信息
all_structures: List[StructureInfo] = field(default_factory=list)
skip_reasons_summary: Dict[str, int] = field(default_factory=dict)
class DatabaseAnalyzer:
"""数据库分析器"""
def __init__(
self,
database_path: str,
target_cation: str = "Li",
target_anions: Set[str] = None,
anion_mode: str = "all", # "single", "mixed", "all"
n_jobs: int = 4
):
"""
初始化分析器
Args:
database_path: 数据库路径
target_cation: 目标阳离子
target_anions: 目标阴离子集合
anion_mode: 阴离子模式 ("single"=仅单一, "mixed"=仅复合, "all"=全部)
n_jobs: 并行数
"""
self.database_path = database_path
self.target_cation = target_cation
self.target_anions = target_anions or {'O', 'S', 'Cl', 'Br'}
self.anion_mode = anion_mode
self.n_jobs = n_jobs
self.inspector = StructureInspector(
target_cation=target_cation,
target_anions=self.target_anions
)
def analyze(self, show_progress: bool = True) -> DatabaseReport:
"""
分析数据库
Args:
show_progress: 是否显示进度条
Returns:
DatabaseReport: 分析报告
"""
report = DatabaseReport(
database_path=self.database_path,
target_cation=self.target_cation,
target_anions=self.target_anions,
anion_mode=self.anion_mode
)
# 获取所有CIF文件
cif_files = self._get_cif_files()
report.total_files = len(cif_files)
if report.total_files == 0:
print(f"警告: 在 {self.database_path} 中未找到CIF文件")
return report
# 并行分析所有文件
results = self._analyze_files(cif_files, show_progress)
report.all_structures = results
# 统计结果
self._compute_statistics(report)
return report
def _get_cif_files(self) -> List[str]:
"""获取所有CIF文件路径"""
cif_files = []
if os.path.isfile(self.database_path):
if self.database_path.endswith('.cif'):
cif_files.append(self.database_path)
else:
for root, dirs, files in os.walk(self.database_path):
for f in files:
if f.endswith('.cif'):
cif_files.append(os.path.join(root, f))
return cif_files
def _analyze_files(
self,
cif_files: List[str],
show_progress: bool
) -> List[StructureInfo]:
"""并行分析文件"""
results = []
if self.n_jobs == 1:
# 单线程
iterator = tqdm(cif_files, desc="分析CIF文件") if show_progress else cif_files
for f in iterator:
results.append(self.inspector.inspect(f))
else:
# 多线程
with ThreadPoolExecutor(max_workers=self.n_jobs) as executor:
futures = {executor.submit(self.inspector.inspect, f): f for f in cif_files}
iterator = tqdm(as_completed(futures), total=len(futures), desc="分析CIF文件") \
if show_progress else as_completed(futures)
for future in iterator:
try:
results.append(future.result())
except Exception as e:
print(f"分析失败: {e}")
return results
def _compute_statistics(self, report: DatabaseReport):
"""计算统计数据"""
for info in report.all_structures:
# 有效性统计
if info.is_valid:
report.valid_files += 1
else:
report.invalid_files += 1
continue
# 含目标阳离子统计
if not info.contains_target_cation:
continue
report.cation_containing_count += 1
# 阴离子分布
for anion in info.anion_types:
report.anion_distribution[anion] = report.anion_distribution.get(anion, 0) + 1
if info.anion_mode == "single":
report.single_anion_count += 1
elif info.anion_mode == "mixed":
report.mixed_anion_count += 1
# 根据阴离子模式过滤
if self.anion_mode == "single" and info.anion_mode != "single":
continue
if self.anion_mode == "mixed" and info.anion_mode != "mixed":
continue
if info.anion_mode == "none":
continue
# 氧化态统计
if info.has_oxidation_states:
report.with_oxidation_states += 1
else:
report.without_oxidation_states += 1
# 共占位统计
if info.needs_expansion:
report.needs_expansion_count += 1
if info.cation_has_partial_occupancy:
report.cation_partial_occupancy_count += 1
if info.anion_has_partial_occupancy:
report.anion_partial_occupancy_count += 1
# 其他问题统计
if info.is_binary_compound:
report.binary_compound_count += 1
if info.has_water_molecule:
report.has_water_count += 1
if info.has_radioactive_elements:
report.has_radioactive_count += 1
# 可处理性统计
if info.can_process:
if info.needs_expansion:
report.needs_preprocessing += 1
else:
report.directly_processable += 1
else:
report.cannot_process += 1
# 统计跳过原因
if info.skip_reason:
for reason in info.skip_reason.split("; "):
report.skip_reasons_summary[reason] = \
report.skip_reasons_summary.get(reason, 0) + 1
# 计算比例
if report.valid_files > 0:
report.cation_containing_ratio = report.cation_containing_count / report.valid_files
if report.cation_containing_count > 0:
for anion, count in report.anion_distribution.items():
report.anion_ratios[anion] = count / report.cation_containing_count

View File

@@ -0,0 +1,140 @@
"""
报告生成器:生成格式化的分析报告
"""
from typing import Optional
from .database_analyzer import DatabaseReport
class ReportGenerator:
"""报告生成器"""
@staticmethod
def print_report(report: DatabaseReport, detailed: bool = False):
"""打印分析报告"""
print("\n" + "=" * 70)
print(" 数据库分析报告")
print("=" * 70)
# 基础信息
print(f"\n📁 数据库路径: {report.database_path}")
print(f"🎯 目标阳离子: {report.target_cation}")
print(f"🎯 目标阴离子: {', '.join(sorted(report.target_anions))}")
print(f"🎯 阴离子模式: {report.anion_mode}")
# 基础统计
print("\n" + "-" * 70)
print("【1. 基础统计】")
print("-" * 70)
print(f" 总 CIF 文件数: {report.total_files}")
print(f" 有效文件数: {report.valid_files}")
print(f" 无效文件数: {report.invalid_files}")
print(f"{report.target_cation} 化合物数: {report.cation_containing_count}")
print(f"{report.target_cation} 化合物占比: {report.cation_containing_ratio:.1%}")
# 阴离子分布
print("\n" + "-" * 70)
print(f"【2. 阴离子分布】(在含 {report.target_cation} 的化合物中)")
print("-" * 70)
if report.anion_distribution:
for anion in sorted(report.anion_distribution.keys()):
count = report.anion_distribution[anion]
ratio = report.anion_ratios.get(anion, 0)
bar = "" * int(ratio * 30)
print(f" {anion:5s}: {count:6d} ({ratio:6.1%}) {bar}")
print(f"\n 单一阴离子化合物: {report.single_anion_count}")
print(f" 复合阴离子化合物: {report.mixed_anion_count}")
# 数据质量
print("\n" + "-" * 70)
print("【3. 数据质量检查】")
print("-" * 70)
total_target = report.cation_containing_count
if total_target > 0:
print(f" 含化合价信息: {report.with_oxidation_states:6d} "
f"({report.with_oxidation_states / total_target:.1%})")
print(f" 缺化合价信息: {report.without_oxidation_states:6d} "
f"({report.without_oxidation_states / total_target:.1%})")
print()
print(f" 需扩胞处理: {report.needs_expansion_count:6d} "
f"({report.needs_expansion_count / total_target:.1%})")
print(f" {report.target_cation}共占位(不可处理): {report.cation_partial_occupancy_count:6d} "
f"({report.cation_partial_occupancy_count / total_target:.1%})")
print(f" 阴离子共占位: {report.anion_partial_occupancy_count:6d} "
f"({report.anion_partial_occupancy_count / total_target:.1%})")
print()
print(f" 二元化合物: {report.binary_compound_count:6d}")
print(f" 含水分子: {report.has_water_count:6d}")
print(f" 含放射性元素: {report.has_radioactive_count:6d}")
# 可处理性评估
print("\n" + "-" * 70)
print("【4. 可处理性评估】")
print("-" * 70)
total_processable = report.directly_processable + report.needs_preprocessing
print(f" ✅ 可直接处理: {report.directly_processable:6d}")
print(f" ⚠️ 需预处理(扩胞): {report.needs_preprocessing:6d}")
print(f" ❌ 无法处理: {report.cannot_process:6d}")
print(f" ─────────────────────────────")
print(f" 📊 可处理总数: {total_processable:6d}")
# 跳过原因汇总
if report.skip_reasons_summary and detailed:
print("\n" + "-" * 70)
print("【5. 无法处理的原因统计】")
print("-" * 70)
sorted_reasons = sorted(
report.skip_reasons_summary.items(),
key=lambda x: x[1],
reverse=True
)
for reason, count in sorted_reasons:
print(f" {reason:30s}: {count:6d}")
print("\n" + "=" * 70)
@staticmethod
def export_to_csv(report: DatabaseReport, output_path: str):
"""导出详细结果到CSV"""
import csv
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 写入表头
headers = [
'file_name', 'is_valid', 'contains_target_cation',
'anion_types', 'anion_mode', 'has_oxidation_states',
'has_partial_occupancy', 'cation_partial_occupancy',
'anion_partial_occupancy', 'needs_expansion',
'is_binary', 'has_water', 'has_radioactive',
'can_process', 'skip_reason'
]
writer.writerow(headers)
# 写入数据
for info in report.all_structures:
row = [
info.file_name,
info.is_valid,
info.contains_target_cation,
'+'.join(sorted(info.anion_types)) if info.anion_types else '',
info.anion_mode,
info.has_oxidation_states,
info.has_partial_occupancy,
info.cation_has_partial_occupancy,
info.anion_has_partial_occupancy,
info.needs_expansion,
info.is_binary_compound,
info.has_water_molecule,
info.has_radioactive_elements,
info.can_process,
info.skip_reason
]
writer.writerow(row)
print(f"详细结果已导出到: {output_path}")

View File

@@ -0,0 +1,223 @@
"""
结构检查器对单个CIF文件进行深度分析
"""
from dataclasses import dataclass, field
from typing import Set, Dict, List, Optional, Tuple
from pymatgen.core import Structure
from pymatgen.core.periodic_table import Element, Specie
@dataclass
class StructureInfo:
"""单个结构的分析结果"""
file_path: str
file_name: str
# 基础信息
is_valid: bool = False
error_message: str = ""
# 元素组成
elements: Set[str] = field(default_factory=set)
num_sites: int = 0
# 阳离子/阴离子信息
contains_target_cation: bool = False
anion_types: Set[str] = field(default_factory=set) # 找到的目标阴离子
anion_mode: str = "" # "single", "mixed", "none"
# 数据质量标记
has_oxidation_states: bool = False
has_partial_occupancy: bool = False # 是否有共占位
cation_has_partial_occupancy: bool = False # 目标阳离子是否共占位
anion_has_partial_occupancy: bool = False # 阴离子是否共占位
has_water_molecule: bool = False
has_radioactive_elements: bool = False
is_binary_compound: bool = False
# 可处理性
needs_expansion: bool = False # 需要扩胞
can_process: bool = False # 可以直接处理
skip_reason: str = "" # 跳过原因
class StructureInspector:
"""结构检查器"""
# 预定义的阴离子集合
VALID_ANIONS = {'O', 'S', 'Cl', 'Br'}
# 放射性元素
RADIOACTIVE_ELEMENTS = {
'U', 'Th', 'Pu', 'Ra', 'Rn', 'Po', 'Np', 'Am',
'Cm', 'Bk', 'Cf', 'Es', 'Fm', 'Md', 'No', 'Lr'
}
def __init__(self, target_cation: str = "Li", target_anions: Set[str] = None):
"""
初始化检查器
Args:
target_cation: 目标阳离子 (如 "Li", "Na")
target_anions: 目标阴离子集合 (如 {"O", "S"})
"""
self.target_cation = target_cation
self.target_anions = target_anions or self.VALID_ANIONS
def inspect(self, file_path: str) -> StructureInfo:
"""
分析单个CIF文件
Args:
file_path: CIF文件路径
Returns:
StructureInfo: 分析结果
"""
import os
info = StructureInfo(
file_path=file_path,
file_name=os.path.basename(file_path)
)
# 尝试读取结构
try:
structure = Structure.from_file(file_path)
info.is_valid = True
except Exception as e:
info.error_message = str(e)
return info
# 基础信息
info.elements = {str(el) for el in structure.composition.elements}
info.num_sites = structure.num_sites
# 检查是否为二元化合物
info.is_binary_compound = len(structure.types_of_specie) == 2
# 检查是否含有目标阳离子
info.contains_target_cation = self.target_cation in info.elements
# 检查阴离子类型
info.anion_types = info.elements.intersection(self.target_anions)
if len(info.anion_types) == 0:
info.anion_mode = "none"
elif len(info.anion_types) == 1:
info.anion_mode = "single"
else:
info.anion_mode = "mixed"
# 检查氧化态
info.has_oxidation_states = self._check_oxidation_states(structure)
# 检查共占位
self._check_partial_occupancy(structure, info)
# 检查水分子
info.has_water_molecule = self._check_water_molecule(structure)
# 检查放射性元素
info.has_radioactive_elements = bool(
info.elements.intersection(self.RADIOACTIVE_ELEMENTS)
)
# 判断是否需要扩胞
info.needs_expansion = info.has_partial_occupancy and not info.cation_has_partial_occupancy
# 判断可处理性
self._evaluate_processability(info)
return info
def _check_oxidation_states(self, structure: Structure) -> bool:
"""检查结构是否包含氧化态信息"""
try:
for site in structure.sites:
for specie in site.species.keys():
if isinstance(specie, Specie):
return True
return False
except:
return False
def _check_partial_occupancy(self, structure: Structure, info: StructureInfo):
"""检查共占位情况"""
try:
for site in structure.sites:
if len(site.species) > 1:
info.has_partial_occupancy = True
# 检查是否涉及目标阳离子
species_symbols = [str(sp.symbol) if hasattr(sp, 'symbol') else str(sp)
for sp in site.species.keys()]
if self.target_cation in species_symbols:
info.cation_has_partial_occupancy = True
# 检查是否涉及阴离子
if any(sym in self.target_anions for sym in species_symbols):
info.anion_has_partial_occupancy = True
# 检查单一物种的部分占据
for specie, occupancy in site.species.items():
if occupancy < 1.0:
info.has_partial_occupancy = True
symbol = str(specie.symbol) if hasattr(specie, 'symbol') else str(specie)
if symbol == self.target_cation:
info.cation_has_partial_occupancy = True
if symbol in self.target_anions:
info.anion_has_partial_occupancy = True
except Exception as e:
pass
def _check_water_molecule(self, structure: Structure) -> bool:
"""检查是否含有水分子"""
try:
oxygen_sites = [site for site in structure.sites
if 'O' in str(site.species)]
hydrogen_sites = [site for site in structure.sites
if 'H' in str(site.species)]
for o_site in oxygen_sites:
nearby_h = [h for h in hydrogen_sites
if o_site.distance(h) < 1.2]
if len(nearby_h) >= 2:
return True
return False
except:
return False
def _evaluate_processability(self, info: StructureInfo):
"""评估可处理性"""
skip_reasons = []
if not info.is_valid:
skip_reasons.append("无法解析CIF文件")
if not info.contains_target_cation:
skip_reasons.append(f"不含{self.target_cation}")
if info.anion_mode == "none":
skip_reasons.append("不含目标阴离子")
if info.is_binary_compound:
skip_reasons.append("二元化合物")
if info.has_radioactive_elements:
skip_reasons.append("含放射性元素")
if info.cation_has_partial_occupancy:
skip_reasons.append(f"{self.target_cation}存在共占位")
if info.anion_has_partial_occupancy:
skip_reasons.append("阴离子存在共占位")
if info.has_water_molecule:
skip_reasons.append("含水分子")
if skip_reasons:
info.can_process = False
info.skip_reason = "; ".join(skip_reasons)
else:
info.can_process = True

0
src/core/__init__.py Normal file
View File

0
src/core/controller.py Normal file
View File

View File

View File

View File

View File

View File

0
src/utils/__init__.py Normal file
View File

0
src/utils/io.py Normal file
View File

0
src/utils/logger.py Normal file
View File

0
src/utils/structure.py Normal file
View File