def get_user_input():
    """Interactively collect the analysis parameters from the console.

    Prompts, in order, for the database path (re-asking until the path
    exists), the target cation, the target anions, the anion mode and the
    number of worker threads. Empty answers select the defaults shown in
    the prompts.

    Returns:
        dict: ``database_path`` (str), ``target_cation`` (str),
        ``target_anions`` (set of str, never containing an empty symbol),
        ``anion_mode`` ("single", "mixed" or "all") and ``n_jobs``
        (int, always >= 1).
    """
    default_anions = {'O', 'S', 'Cl', 'Br'}
    default_n_jobs = 4

    print("\n" + "=" * 70)
    print(" 高通量筛选与扩胞项目 - 数据库分析工具")
    print("=" * 70)

    # 1. Database path: keep asking until it exists on disk.
    while True:
        db_path = input("\n请输入数据库路径: ").strip()
        if os.path.exists(db_path):
            break
        print(f"❌ 路径不存在: {db_path}")

    # 2. Target cation.
    cation = input("请输入目标阳离子 [默认: Li]: ").strip() or "Li"

    # 3. Target anions. Empty tokens (from "O,,S" or a trailing comma) are
    #    dropped so that '' never ends up as an anion symbol.
    anion_input = input("请输入目标阴离子 (用逗号分隔) [默认: O,S,Cl,Br]: ").strip()
    anions = {token.strip() for token in anion_input.split(',')}
    anions.discard('')
    if not anions:
        anions = default_anions

    # 4. Anion mode; anything unrecognised falls back to "all".
    print("\n阴离子模式选择:")
    print(" 1. 仅单一阴离子化合物")
    print(" 2. 仅复合阴离子化合物")
    print(" 3. 全部 (默认)")
    mode_choice = input("请选择 [1/2/3]: ").strip()

    mode_map = {'1': 'single', '2': 'mixed', '3': 'all', '': 'all'}
    anion_mode = mode_map.get(mode_choice, 'all')

    # 5. Worker count. str.isdigit() accepts strings int() rejects (e.g. "²")
    #    and lets "0" through, which a thread pool cannot use, so parse with
    #    int() and require a positive value.
    n_jobs_input = input("并行线程数 [默认: 4]: ").strip()
    try:
        n_jobs = int(n_jobs_input)
    except ValueError:
        n_jobs = default_n_jobs
    if n_jobs < 1:
        n_jobs = default_n_jobs

    return {
        'database_path': db_path,
        'target_cation': cation,
        'target_anions': anions,
        'anion_mode': anion_mode,
        'n_jobs': n_jobs
    }
def main():
    """Run one interactive analysis session.

    Collects the parameters, analyzes the database, prints the report,
    optionally exports it to CSV, and finally asks whether to continue with
    preprocessing (which is not implemented yet).
    """
    divider = "-" * 70

    params = get_user_input()

    print("\n" + divider)
    print("开始分析数据库...")
    print(divider)

    # Hand exactly the constructor arguments over to the analyzer.
    analyzer_kwargs = {
        key: params[key]
        for key in ('database_path', 'target_cation', 'target_anions',
                    'anion_mode', 'n_jobs')
    }
    report = DatabaseAnalyzer(**analyzer_kwargs).analyze(show_progress=True)

    ReportGenerator.print_report(report, detailed=True)

    # Optional CSV export.
    wants_export = input("\n是否导出详细结果到CSV? [y/N]: ").strip().lower() == 'y'
    if wants_export:
        destination = input("输出文件路径 [默认: analysis_report.csv]: ").strip()
        if not destination:
            destination = "analysis_report.csv"
        ReportGenerator.export_to_csv(report, destination)

    # Optional continuation into preprocessing.
    print("\n" + divider)
    answer = input("是否继续进行预处理? [y/N]: ").strip().lower()
    if answer == 'y':
        print("预处理功能将在下一阶段实现...")
        # TODO: call the preprocessing module

    print("\n分析完成!")


if __name__ == "__main__":
    main()
@dataclass
class DatabaseReport:
    """Aggregated result of analyzing a CIF database.

    All counters start at zero and all containers start empty; they are
    filled in by ``DatabaseAnalyzer``.
    """

    # Basic statistics
    database_path: str = ""
    total_files: int = 0
    valid_files: int = 0
    invalid_files: int = 0

    # Target element settings
    target_cation: str = ""
    target_anions: Set[str] = field(default_factory=set)
    anion_mode: str = ""  # "single", "mixed", "all"

    # Structures containing the target cation
    cation_containing_count: int = 0
    cation_containing_ratio: float = 0.0

    # Anion distribution (among structures containing the target cation)
    anion_distribution: Dict[str, int] = field(default_factory=dict)
    anion_ratios: Dict[str, float] = field(default_factory=dict)
    single_anion_count: int = 0
    mixed_anion_count: int = 0

    # Data-quality statistics
    with_oxidation_states: int = 0
    without_oxidation_states: int = 0

    needs_expansion_count: int = 0  # structures that need supercell expansion
    cation_partial_occupancy_count: int = 0  # target cation on a shared/partial site
    anion_partial_occupancy_count: int = 0  # anion on a shared/partial site

    binary_compound_count: int = 0
    has_water_count: int = 0
    has_radioactive_count: int = 0

    # Processability statistics
    directly_processable: int = 0
    needs_preprocessing: int = 0
    cannot_process: int = 0

    # Details
    all_structures: List[StructureInfo] = field(default_factory=list)
    skip_reasons_summary: Dict[str, int] = field(default_factory=dict)
class DatabaseAnalyzer:
    """Analyze every CIF file under a path and aggregate the results.

    Each file is inspected with a ``StructureInspector``; the per-file
    results are then folded into a ``DatabaseReport``.
    """

    def __init__(
        self,
        database_path: str,
        target_cation: str = "Li",
        target_anions: Optional[Set[str]] = None,
        anion_mode: str = "all",  # "single", "mixed", "all"
        n_jobs: int = 4
    ):
        """Initialise the analyzer.

        Args:
            database_path: A single ``.cif`` file or a directory that is
                searched recursively.
            target_cation: Target cation symbol.
            target_anions: Target anion symbols; ``None`` or an empty set
                selects ``{'O', 'S', 'Cl', 'Br'}``.
            anion_mode: "single" = single-anion compounds only,
                "mixed" = mixed-anion compounds only, "all" = both.
            n_jobs: Number of worker threads; values below 1 are treated
                as 1 when files are analyzed.
        """
        self.database_path = database_path
        self.target_cation = target_cation
        self.target_anions = target_anions or {'O', 'S', 'Cl', 'Br'}
        self.anion_mode = anion_mode
        self.n_jobs = n_jobs

        self.inspector = StructureInspector(
            target_cation=target_cation,
            target_anions=self.target_anions
        )

    def analyze(self, show_progress: bool = True) -> DatabaseReport:
        """Analyze the database.

        Args:
            show_progress: Whether to show a progress bar.

        Returns:
            DatabaseReport: The filled-in report. If no CIF file is found a
            warning is printed and the report only carries the settings.
        """
        report = DatabaseReport(
            database_path=self.database_path,
            target_cation=self.target_cation,
            target_anions=self.target_anions,
            anion_mode=self.anion_mode
        )

        cif_files = self._get_cif_files()
        report.total_files = len(cif_files)

        if report.total_files == 0:
            print(f"警告: 在 {self.database_path} 中未找到CIF文件")
            return report

        report.all_structures = self._analyze_files(cif_files, show_progress)
        self._compute_statistics(report)

        return report

    def _get_cif_files(self) -> List[str]:
        """Return the CIF files to analyze, sorted by path.

        The extension check is case-insensitive so ``.CIF`` files are not
        silently ignored, and the result is sorted so that runs do not
        depend on directory enumeration order.
        """
        def is_cif(name: str) -> bool:
            return name.lower().endswith('.cif')

        if os.path.isfile(self.database_path):
            return [self.database_path] if is_cif(self.database_path) else []

        cif_files = []
        for root, _dirs, files in os.walk(self.database_path):
            for name in files:
                if is_cif(name):
                    cif_files.append(os.path.join(root, name))
        cif_files.sort()
        return cif_files

    def _inspect_safely(self, file_path: str) -> StructureInfo:
        """Inspect one file; turn an unexpected failure into an invalid entry.

        Keeping one entry per file guarantees that
        ``valid_files + invalid_files == total_files`` in the report.
        """
        try:
            return self.inspector.inspect(file_path)
        except Exception as e:  # boundary: one bad file must not abort the scan
            print(f"分析失败: {e}")
            return StructureInfo(
                file_path=file_path,
                file_name=os.path.basename(file_path),
                is_valid=False,
                error_message=str(e)
            )

    def _analyze_files(
        self,
        cif_files: List[str],
        show_progress: bool
    ) -> List[StructureInfo]:
        """Inspect all files, in parallel when ``n_jobs`` > 1.

        Args:
            cif_files: Paths to inspect.
            show_progress: Whether to wrap the work in a tqdm progress bar.

        Returns:
            One result per input path, in the same order as ``cif_files``.
        """
        if not cif_files:
            return []

        # ThreadPoolExecutor rejects max_workers <= 0, so clamp here.
        workers = max(1, self.n_jobs)

        if workers == 1:
            # Single-threaded
            iterator = tqdm(cif_files, desc="分析CIF文件") if show_progress else cif_files
            return [self._inspect_safely(path) for path in iterator]

        # Multi-threaded. Executor.map yields results in submission order,
        # which keeps the output deterministic.
        with ThreadPoolExecutor(max_workers=workers) as executor:
            results = executor.map(self._inspect_safely, cif_files)
            if show_progress:
                results = tqdm(results, total=len(cif_files), desc="分析CIF文件")
            return list(results)

    def _compute_statistics(self, report: DatabaseReport):
        """Fold ``report.all_structures`` into the report's counters.

        Validity, cation and anion-distribution counters cover every
        structure. The quality and processability counters only cover
        structures that contain the target cation, contain at least one
        target anion and match the analyzer's anion mode.
        """
        # Only "single" and "mixed" restrict the selection.
        required_mode = self.anion_mode if self.anion_mode in ("single", "mixed") else None

        for info in report.all_structures:
            # Validity
            if not info.is_valid:
                report.invalid_files += 1
                continue
            report.valid_files += 1

            # Target cation
            if not info.contains_target_cation:
                continue
            report.cation_containing_count += 1

            # Anion distribution
            for anion in info.anion_types:
                report.anion_distribution[anion] = report.anion_distribution.get(anion, 0) + 1

            if info.anion_mode == "single":
                report.single_anion_count += 1
            elif info.anion_mode == "mixed":
                report.mixed_anion_count += 1

            # Filter by anion mode
            if info.anion_mode == "none":
                continue
            if required_mode is not None and info.anion_mode != required_mode:
                continue

            # Oxidation states
            if info.has_oxidation_states:
                report.with_oxidation_states += 1
            else:
                report.without_oxidation_states += 1

            # Partial / shared occupancy
            if info.needs_expansion:
                report.needs_expansion_count += 1
            if info.cation_has_partial_occupancy:
                report.cation_partial_occupancy_count += 1
            if info.anion_has_partial_occupancy:
                report.anion_partial_occupancy_count += 1

            # Other flags
            if info.is_binary_compound:
                report.binary_compound_count += 1
            if info.has_water_molecule:
                report.has_water_count += 1
            if info.has_radioactive_elements:
                report.has_radioactive_count += 1

            # Processability
            if info.can_process:
                if info.needs_expansion:
                    report.needs_preprocessing += 1
                else:
                    report.directly_processable += 1
            else:
                report.cannot_process += 1
                # skip_reason is a "; "-joined list of reasons
                if info.skip_reason:
                    for reason in info.skip_reason.split("; "):
                        report.skip_reasons_summary[reason] = \
                            report.skip_reasons_summary.get(reason, 0) + 1

        # Ratios
        if report.valid_files > 0:
            report.cation_containing_ratio = report.cation_containing_count / report.valid_files

        if report.cation_containing_count > 0:
            for anion, count in report.anion_distribution.items():
                report.anion_ratios[anion] = count / report.cation_containing_count
class ReportGenerator:
    """Render a ``DatabaseReport`` as console text or as a CSV file."""

    @staticmethod
    def print_report(report: DatabaseReport, detailed: bool = False):
        """Print the analysis report to stdout.

        Args:
            report: The report to print.
            detailed: When true, and skip reasons were recorded, also print
                the per-reason breakdown (section 5).
        """
        # NOTE(review): the source was a collapsed diff without indentation;
        # the extents of the `if` blocks below are reconstructed from the
        # logic and should be confirmed against the original file.
        heavy_rule = "=" * 70
        light_rule = "-" * 70
        cation = report.target_cation

        def section(title):
            # Blank line, rule, title, rule.
            print("\n" + light_rule)
            print(title)
            print(light_rule)

        print("\n" + heavy_rule)
        print(" 数据库分析报告")
        print(heavy_rule)

        # Settings
        print(f"\n📁 数据库路径: {report.database_path}")
        print(f"🎯 目标阳离子: {cation}")
        print(f"🎯 目标阴离子: {', '.join(sorted(report.target_anions))}")
        print(f"🎯 阴离子模式: {report.anion_mode}")

        # 1. Basic statistics
        section("【1. 基础统计】")
        print(f" 总 CIF 文件数: {report.total_files}")
        print(f" 有效文件数: {report.valid_files}")
        print(f" 无效文件数: {report.invalid_files}")
        print(f" 含 {cation} 化合物数: {report.cation_containing_count}")
        print(f" 含 {cation} 化合物占比: {report.cation_containing_ratio:.1%}")

        # 2. Anion distribution, one bar per anion (30 cells = 100%)
        section(f"【2. 阴离子分布】(在含 {cation} 的化合物中)")
        for anion, count in sorted(report.anion_distribution.items()):
            ratio = report.anion_ratios.get(anion, 0)
            bar = "█" * int(ratio * 30)
            print(f" {anion:5s}: {count:6d} ({ratio:6.1%}) {bar}")

        print(f"\n 单一阴离子化合物: {report.single_anion_count}")
        print(f" 复合阴离子化合物: {report.mixed_anion_count}")

        # 3. Data quality; percentages are relative to the number of
        #    structures containing the target cation.
        section("【3. 数据质量检查】")
        total_target = report.cation_containing_count
        if total_target > 0:
            def share(count):
                return count / total_target

            print(f" 含化合价信息: {report.with_oxidation_states:6d} "
                  f"({share(report.with_oxidation_states):.1%})")
            print(f" 缺化合价信息: {report.without_oxidation_states:6d} "
                  f"({share(report.without_oxidation_states):.1%})")
            print()
            print(f" 需扩胞处理: {report.needs_expansion_count:6d} "
                  f"({share(report.needs_expansion_count):.1%})")
            print(f" {cation}共占位(不可处理): {report.cation_partial_occupancy_count:6d} "
                  f"({share(report.cation_partial_occupancy_count):.1%})")
            print(f" 阴离子共占位: {report.anion_partial_occupancy_count:6d} "
                  f"({share(report.anion_partial_occupancy_count):.1%})")
            print()
            print(f" 二元化合物: {report.binary_compound_count:6d}")
            print(f" 含水分子: {report.has_water_count:6d}")
            print(f" 含放射性元素: {report.has_radioactive_count:6d}")

        # 4. Processability
        section("【4. 可处理性评估】")
        total_processable = report.directly_processable + report.needs_preprocessing
        print(f" ✅ 可直接处理: {report.directly_processable:6d}")
        print(f" ⚠️ 需预处理(扩胞): {report.needs_preprocessing:6d}")
        print(f" ❌ 无法处理: {report.cannot_process:6d}")
        print(" ─────────────────────────────")
        print(f" 📊 可处理总数: {total_processable:6d}")

        # 5. Skip reasons, most frequent first
        if detailed and report.skip_reasons_summary:
            section("【5. 无法处理的原因统计】")
            by_frequency = sorted(
                report.skip_reasons_summary.items(),
                key=lambda pair: pair[1],
                reverse=True
            )
            for reason, count in by_frequency:
                print(f" {reason:30s}: {count:6d}")

        print("\n" + heavy_rule)

    @staticmethod
    def export_to_csv(report: DatabaseReport, output_path: str):
        """Write one CSV row per analyzed structure to ``output_path``.

        The file is written as UTF-8 with a header row; a confirmation line
        is printed afterwards.
        """
        import csv

        header = [
            'file_name', 'is_valid', 'contains_target_cation',
            'anion_types', 'anion_mode', 'has_oxidation_states',
            'has_partial_occupancy', 'cation_partial_occupancy',
            'anion_partial_occupancy', 'needs_expansion',
            'is_binary', 'has_water', 'has_radioactive',
            'can_process', 'skip_reason'
        ]

        def as_row(info):
            # Anion symbols are joined with '+' in sorted order.
            anions = '+'.join(sorted(info.anion_types)) if info.anion_types else ''
            return [
                info.file_name,
                info.is_valid,
                info.contains_target_cation,
                anions,
                info.anion_mode,
                info.has_oxidation_states,
                info.has_partial_occupancy,
                info.cation_has_partial_occupancy,
                info.anion_has_partial_occupancy,
                info.needs_expansion,
                info.is_binary_compound,
                info.has_water_molecule,
                info.has_radioactive_elements,
                info.can_process,
                info.skip_reason
            ]

        with open(output_path, 'w', newline='', encoding='utf-8') as handle:
            writer = csv.writer(handle)
            writer.writerow(header)
            writer.writerows(as_row(info) for info in report.all_structures)

        print(f"详细结果已导出到: {output_path}")
@dataclass
class StructureInfo:
    """Analysis result for a single structure file.

    Only ``file_path`` and ``file_name`` are required; every flag defaults
    to its "not set" value and is filled in by ``StructureInspector``.
    """
    file_path: str
    file_name: str

    # Basic information
    is_valid: bool = False
    error_message: str = ""

    # Elemental composition
    elements: Set[str] = field(default_factory=set)
    num_sites: int = 0

    # Cation / anion information
    contains_target_cation: bool = False
    anion_types: Set[str] = field(default_factory=set)  # target anions found
    anion_mode: str = ""  # "single", "mixed", "none"

    # Data-quality flags
    has_oxidation_states: bool = False
    has_partial_occupancy: bool = False  # any shared or partially occupied site
    cation_has_partial_occupancy: bool = False  # target cation involved
    anion_has_partial_occupancy: bool = False  # a target anion involved
    has_water_molecule: bool = False
    has_radioactive_elements: bool = False
    is_binary_compound: bool = False

    # Processability
    needs_expansion: bool = False  # needs supercell expansion
    can_process: bool = False  # no skip reason applies
    skip_reason: str = ""  # "; "-joined reasons for skipping
class StructureInspector:
    """Inspect a single CIF file and flag everything relevant for screening."""

    # Anion set used when the caller does not supply one.
    VALID_ANIONS = {'O', 'S', 'Cl', 'Br'}

    # Elements treated as radioactive.
    # NOTE(review): Tc, Pm, Ac, Pa, At and Fr are not listed — confirm whether
    # that is intentional.
    RADIOACTIVE_ELEMENTS = {
        'U', 'Th', 'Pu', 'Ra', 'Rn', 'Po', 'Np', 'Am',
        'Cm', 'Bk', 'Cf', 'Es', 'Fm', 'Md', 'No', 'Lr'
    }

    # An oxygen with at least two hydrogens closer than this is reported as
    # water (same unit as ``site.distance``).
    OH_BOND_CUTOFF = 1.2

    def __init__(self, target_cation: str = "Li", target_anions: Optional[Set[str]] = None):
        """Initialise the inspector.

        Args:
            target_cation: Target cation symbol (e.g. "Li", "Na").
            target_anions: Target anion symbols (e.g. {"O", "S"}); ``None``
                or an empty set selects ``VALID_ANIONS``.
        """
        self.target_cation = target_cation
        # Copy so that the instance never aliases the class-level default
        # (or the caller's set).
        self.target_anions = set(target_anions) if target_anions else set(self.VALID_ANIONS)

    @staticmethod
    def _symbol_of(specie) -> str:
        """Return the bare element symbol of an element/species object.

        Objects exposing ``symbol`` (pymatgen elements and species) yield
        that symbol without any oxidation-state decoration; anything else is
        converted with ``str``.
        """
        symbol = getattr(specie, 'symbol', None)
        return str(symbol) if symbol is not None else str(specie)

    def inspect(self, file_path: str) -> StructureInfo:
        """Analyze a single CIF file.

        Args:
            file_path: Path of the CIF file.

        Returns:
            StructureInfo: The analysis result. If the file cannot be read,
            ``is_valid`` stays False and ``error_message`` holds the error.
        """
        import os
        info = StructureInfo(
            file_path=file_path,
            file_name=os.path.basename(file_path)
        )

        try:
            structure = Structure.from_file(file_path)
        except Exception as e:  # any parser failure marks the file invalid
            info.error_message = str(e)
            return info
        info.is_valid = True

        # Use bare symbols: str() of an oxidation-state-decorated species is
        # e.g. "Li+", which would never match the target cation or anions.
        info.elements = {self._symbol_of(el) for el in structure.composition.elements}
        info.num_sites = structure.num_sites

        # Binary = exactly two elements; counting species would treat
        # e.g. Fe2+ and Fe3+ as different constituents.
        info.is_binary_compound = len(info.elements) == 2

        info.contains_target_cation = self.target_cation in info.elements

        info.anion_types = info.elements.intersection(self.target_anions)
        anion_count = len(info.anion_types)
        if anion_count == 0:
            info.anion_mode = "none"
        elif anion_count == 1:
            info.anion_mode = "single"
        else:
            info.anion_mode = "mixed"

        info.has_oxidation_states = self._check_oxidation_states(structure)
        self._check_partial_occupancy(structure, info)
        info.has_water_molecule = self._check_water_molecule(structure)
        info.has_radioactive_elements = bool(
            info.elements.intersection(self.RADIOACTIVE_ELEMENTS)
        )

        # Expansion applies when there is disorder that does not involve the
        # target cation.
        info.needs_expansion = info.has_partial_occupancy and not info.cation_has_partial_occupancy

        self._evaluate_processability(info)

        return info

    def _check_oxidation_states(self, structure: Structure) -> bool:
        """Return True if any species in the structure carries an oxidation state.

        The check looks for a non-None ``oxi_state`` attribute instead of
        ``isinstance(..., Specie)``.
        NOTE(review): in recent pymatgen releases ``Specie`` appears to be a
        backward-compatibility subclass of ``Species``, so an isinstance
        check against it would miss parser-created ``Species`` objects —
        confirm against the installed version.
        """
        try:
            for site in structure.sites:
                for specie in site.species:
                    if getattr(specie, 'oxi_state', None) is not None:
                        return True
            return False
        except Exception:  # best effort: an unreadable structure has no usable states
            return False

    def _check_partial_occupancy(self, structure: Structure, info: StructureInfo):
        """Set the partial-occupancy flags on ``info``.

        A site counts as disordered when it hosts more than one species
        (all of its species are then considered involved) or when its single
        species has an occupancy below 1.0.
        """
        try:
            for site in structure.sites:
                occupancies = {
                    self._symbol_of(specie): occupancy
                    for specie, occupancy in site.species.items()
                }
                if len(site.species) > 1:
                    involved = set(occupancies)
                else:
                    involved = {sym for sym, occ in occupancies.items() if occ < 1.0}

                if not involved:
                    continue

                info.has_partial_occupancy = True
                if self.target_cation in involved:
                    info.cation_has_partial_occupancy = True
                if involved & set(self.target_anions):
                    info.anion_has_partial_occupancy = True
        except Exception as e:
            # Still best effort, but leave a trace instead of silently
            # reporting the structure as fully ordered.
            info.error_message = f"partial occupancy check failed: {e}"

    def _check_water_molecule(self, structure: Structure) -> bool:
        """Return True if some oxygen has at least two hydrogens within the cutoff.

        Sites are selected by exact element symbol; substring matching on
        the species string would also pick up Hf/Hg/Ho/He as "H" and Os as
        "O".
        """
        try:
            oxygen_sites = []
            hydrogen_sites = []
            for site in structure.sites:
                symbols = {self._symbol_of(specie) for specie in site.species}
                if 'O' in symbols:
                    oxygen_sites.append(site)
                if 'H' in symbols:
                    hydrogen_sites.append(site)

            for o_site in oxygen_sites:
                nearby = sum(
                    1 for h_site in hydrogen_sites
                    if o_site.distance(h_site) < self.OH_BOND_CUTOFF
                )
                if nearby >= 2:
                    return True
            return False
        except Exception:  # best effort: treat a failed check as "no water"
            return False

    def _evaluate_processability(self, info: StructureInfo):
        """Set ``info.can_process`` and, when blocked, ``info.skip_reason``.

        ``skip_reason`` is the "; "-joined list of every reason that
        applies; it is left untouched when the structure can be processed.
        """
        checks = [
            (not info.is_valid, "无法解析CIF文件"),
            (not info.contains_target_cation, f"不含{self.target_cation}"),
            (info.anion_mode == "none", "不含目标阴离子"),
            (info.is_binary_compound, "二元化合物"),
            (info.has_radioactive_elements, "含放射性元素"),
            (info.cation_has_partial_occupancy, f"{self.target_cation}存在共占位"),
            (info.anion_has_partial_occupancy, "阴离子存在共占位"),
            (info.has_water_molecule, "含水分子"),
        ]
        skip_reasons = [message for applies, message in checks if applies]

        if skip_reasons:
            info.can_process = False
            info.skip_reason = "; ".join(skip_reasons)
        else:
            info.can_process = True
index 0000000..e69de29 diff --git a/src/utils/io.py b/src/utils/io.py new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/logger.py b/src/utils/logger.py new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/structure.py b/src/utils/structure.py new file mode 100644 index 0000000..e69de29