增加扩胞逻辑

This commit is contained in:
2025-12-14 17:57:42 +08:00
parent 2378a3f2a2
commit 9b36aa10ff
4 changed files with 310 additions and 153 deletions

View File

@@ -13,6 +13,8 @@ from .worker import analyze_single_file
from ..core.scheduler import ParallelScheduler, ResourceConfig
# 在 DatabaseReport 类中添加缺失的字段
@dataclass
class DatabaseReport:
"""数据库分析报告"""
@@ -42,7 +44,8 @@ class DatabaseReport:
with_oxidation_states: int = 0
without_oxidation_states: int = 0
needs_expansion_count: int = 0
cation_partial_occupancy_count: int = 0
cation_with_vacancy_count: int = 0 # Li与空位共占位新增
cation_with_other_cation_count: int = 0 # Li与其他阳离子共占位新增
anion_partial_occupancy_count: int = 0
binary_compound_count: int = 0
has_water_count: int = 0
@@ -57,26 +60,16 @@ class DatabaseReport:
all_structures: List[StructureInfo] = field(default_factory=list)
skip_reasons_summary: Dict[str, int] = field(default_factory=dict)
def to_dict(self) -> dict:
"""转换为可序列化的字典"""
d = asdict(self)
d['target_anions'] = list(self.target_anions)
d['all_structures'] = [asdict(s) for s in self.all_structures]
return d
def save(self, path: str):
"""保存报告"""
with open(path, 'w', encoding='utf-8') as f:
json.dump(self.to_dict(), f, indent=2, ensure_ascii=False)
@classmethod
def load(cls, path: str) -> 'DatabaseReport':
"""加载报告"""
with open(path, 'r', encoding='utf-8') as f:
d = json.load(f)
d['target_anions'] = set(d['target_anions'])
d['all_structures'] = [StructureInfo(**s) for s in d['all_structures']]
return cls(**d)
# 扩胞相关统计(新增)
expansion_stats: Dict[str, int] = field(default_factory=lambda: {
'no_expansion_needed': 0,
'expansion_factor_2': 0,
'expansion_factor_3': 0,
'expansion_factor_4_8': 0,
'expansion_factor_large': 0,
'cannot_expand': 0,
})
expansion_factor_distribution: Dict[int, int] = field(default_factory=dict)
class DatabaseAnalyzer:
@@ -220,76 +213,104 @@ class DatabaseAnalyzer:
script_path = os.path.join(output_dir, "submit.sh")
return self.scheduler.submit_slurm_job(script, script_path)
def _compute_statistics(self, report: DatabaseReport):
"""计算统计数据"""
for info in report.all_structures:
if info.is_valid:
report.valid_files += 1
else:
report.invalid_files += 1
continue
# 更新 _compute_statistics 方法
if not info.contains_target_cation:
continue
def _compute_statistics(self, report: DatabaseReport):
"""计算统计数据(含扩胞分析)"""
report.cation_containing_count += 1
for info in report.all_structures:
if info.is_valid:
report.valid_files += 1
else:
report.invalid_files += 1
continue
for anion in info.anion_types:
report.anion_distribution[anion] = \
report.anion_distribution.get(anion, 0) + 1
if not info.contains_target_cation:
continue
if info.anion_mode == "single":
report.single_anion_count += 1
elif info.anion_mode == "mixed":
report.mixed_anion_count += 1
report.cation_containing_count += 1
# 根据阴离子模式过滤
if self.anion_mode == "single" and info.anion_mode != "single":
continue
if self.anion_mode == "mixed" and info.anion_mode != "mixed":
continue
if info.anion_mode == "none":
continue
for anion in info.anion_types:
report.anion_distribution[anion] = \
report.anion_distribution.get(anion, 0) + 1
# 各项统计
if info.has_oxidation_states:
report.with_oxidation_states += 1
else:
report.without_oxidation_states += 1
if info.anion_mode == "single":
report.single_anion_count += 1
elif info.anion_mode == "mixed":
report.mixed_anion_count += 1
# 根据阴离子模式过滤
if self.anion_mode == "single" and info.anion_mode != "single":
continue
if self.anion_mode == "mixed" and info.anion_mode != "mixed":
continue
if info.anion_mode == "none":
continue
# 各项统计
if info.has_oxidation_states:
report.with_oxidation_states += 1
else:
report.without_oxidation_states += 1
# Li共占位统计修改
if info.cation_with_vacancy:
report.cation_with_vacancy_count += 1
if info.cation_with_other_cation:
report.cation_with_other_cation_count += 1
if info.anion_has_partial_occupancy:
report.anion_partial_occupancy_count += 1
if info.is_binary_compound:
report.binary_compound_count += 1
if info.has_water_molecule:
report.has_water_count += 1
if info.has_radioactive_elements:
report.has_radioactive_count += 1
# 可处理性
if info.can_process:
if info.needs_expansion:
report.needs_expansion_count += 1
if info.cation_has_partial_occupancy:
report.cation_partial_occupancy_count += 1
if info.anion_has_partial_occupancy:
report.anion_partial_occupancy_count += 1
if info.is_binary_compound:
report.binary_compound_count += 1
if info.has_water_molecule:
report.has_water_count += 1
if info.has_radioactive_elements:
report.has_radioactive_count += 1
# 可处理性
if info.can_process:
if info.needs_expansion:
report.needs_preprocessing += 1
else:
report.directly_processable += 1
report.needs_preprocessing += 1
else:
report.cannot_process += 1
if info.skip_reason:
for reason in info.skip_reason.split("; "):
report.skip_reasons_summary[reason] = \
report.skip_reasons_summary.get(reason, 0) + 1
report.directly_processable += 1
else:
report.cannot_process += 1
if info.skip_reason:
for reason in info.skip_reason.split("; "):
report.skip_reasons_summary[reason] = \
report.skip_reasons_summary.get(reason, 0) + 1
# 计算比例
if report.valid_files > 0:
report.cation_containing_ratio = \
report.cation_containing_count / report.valid_files
# 扩胞统计(新增)
exp_info = info.expansion_info
factor = exp_info.expansion_factor
if report.cation_containing_count > 0:
for anion, count in report.anion_distribution.items():
report.anion_ratios[anion] = \
count / report.cation_containing_count
if not exp_info.needs_expansion:
report.expansion_stats['no_expansion_needed'] += 1
elif not exp_info.can_expand:
report.expansion_stats['cannot_expand'] += 1
elif factor == 2:
report.expansion_stats['expansion_factor_2'] += 1
elif factor == 3:
report.expansion_stats['expansion_factor_3'] += 1
elif 4 <= factor <= 8:
report.expansion_stats['expansion_factor_4_8'] += 1
else:
report.expansion_stats['expansion_factor_large'] += 1
# 详细分布
if exp_info.needs_expansion and exp_info.can_expand:
report.expansion_factor_distribution[factor] = \
report.expansion_factor_distribution.get(factor, 0) + 1
report.needs_expansion_count += 1
# 计算比例
if report.valid_files > 0:
report.cation_containing_ratio = \
report.cation_containing_count / report.valid_files
if report.cation_containing_count > 0:
for anion, count in report.anion_distribution.items():
report.anion_ratios[anion] = \
count / report.cation_containing_count

View File

@@ -59,12 +59,10 @@ class ReportGenerator:
print(f" 缺化合价信息: {report.without_oxidation_states:6d} "
f"({report.without_oxidation_states / total_target:.1%})")
print()
print(f" 需扩胞处理: {report.needs_expansion_count:6d} "
f"({report.needs_expansion_count / total_target:.1%})")
print(f" {report.target_cation}共占位(不可处理): {report.cation_partial_occupancy_count:6d} "
f"({report.cation_partial_occupancy_count / total_target:.1%})")
print(f" 阴离子共占位: {report.anion_partial_occupancy_count:6d} "
f"({report.anion_partial_occupancy_count / total_target:.1%})")
print(f" {report.target_cation}与空位共占位(无需处理): {report.cation_with_vacancy_count:6d}")
print(f" {report.target_cation}与阳离子共占位(需扩胞): {report.cation_with_other_cation_count:6d}")
print(f" 阴离子共占位: {report.anion_partial_occupancy_count:6d}")
print(f" 需扩胞处理(总计): {report.needs_expansion_count:6d}")
print()
print(f" 二元化合物: {report.binary_compound_count:6d}")
print(f" 含水分子: {report.has_water_count:6d}")
@@ -83,7 +81,7 @@ class ReportGenerator:
print(f" 📊 可处理总数: {total_processable:6d}")
# 跳过原因汇总
if report.skip_reasons_summary and detailed:
if report.skip_reasons_summary:
print("\n" + "-" * 70)
print("【5. 无法处理的原因统计】")
print("-" * 70)
@@ -93,21 +91,16 @@ class ReportGenerator:
reverse=True
)
for reason, count in sorted_reasons:
print(f" {reason:30s}: {count:6d}")
print(f" {reason:35s}: {count:6d}")
print("\n" + "=" * 70)
# 扩胞分析(新增)
# 扩胞分析
print("\n" + "-" * 70)
print("5. 扩胞需求分析】")
print("6. 扩胞需求分析】")
print("-" * 70)
exp = report.expansion_stats
total_processable = report.directly_processable + report.needs_preprocessing
if total_processable > 0:
print(f" 无需扩胞: {exp['no_expansion_needed']:6d} "
f"({exp['no_expansion_needed'] / total_processable:.1%})")
print(f" 无需扩胞: {exp['no_expansion_needed']:6d}")
print(f" 扩胞因子=2: {exp['expansion_factor_2']:6d}")
print(f" 扩胞因子=3: {exp['expansion_factor_3']:6d}")
print(f" 扩胞因子=4~8: {exp['expansion_factor_4_8']:6d}")
@@ -121,6 +114,8 @@ class ReportGenerator:
count = report.expansion_factor_distribution[factor]
bar = "" * min(count, 30)
print(f" {factor:3d}x: {count:5d} {bar}")
print("\n" + "=" * 70)
@staticmethod
def export_to_csv(report: DatabaseReport, output_path: str):
"""导出详细结果到CSV"""

View File

@@ -64,9 +64,10 @@ class StructureInfo:
is_binary_compound: bool = False
# 共占位详细分析(新增)
cation_has_partial_occupancy: bool = False # 目标阳离子共占位
anion_has_partial_occupancy: bool = False # 阴离子共占位
other_has_partial_occupancy: bool = False # 其他元素共占位(需扩胞)
cation_with_vacancy: bool = False # Li与空位共占位不需处理
cation_with_other_cation: bool = False # Li与其他阳离子共占位需扩胞
anion_has_partial_occupancy: bool = False # 阴离子共占位
other_has_partial_occupancy: bool = False # 其他元素共占位(需扩胞)
expansion_info: ExpansionInfo = field(default_factory=ExpansionInfo)
# 可处理性
@@ -218,14 +219,17 @@ class StructureInspector:
continue
return None
# 在 StructureInspector 类中,替换 _analyze_partial_occupancy 方法
def _analyze_partial_occupancy(self, structure: Structure, info: StructureInfo):
"""
分析共占位情况(核心逻辑
分析共占位情况(修正版
关键规则:
- 目标阳离子(Li)的共占位 → 不可处理
- 离子共占位 → 需要扩胞,但通常不处理
- 其他阳离子共占位 → 需要扩胞处理
- Li与空位共占位 → 不需要处理cation_with_vacancy
- Li与其他阳离子共占位 → 需要扩胞cation_with_other_cation
- 离子共占位 → 通常不处理
- 其他阳离子共占位 → 需要扩胞
"""
occupancy_dict = defaultdict(list) # {occupation: [site_indices]}
occupancy_elements = {} # {occupation: [elements]}
@@ -234,60 +238,118 @@ class StructureInspector:
site_species = site.species
species_string = str(site.species)
# 检查是否有多个物种占据同一位点
if len(site_species) > 1:
info.has_partial_occupancy = True
# 提取各元素及其占据率
species_occu = {} # {element: occupancy}
for sp, occu in site_species.items():
elem = sp.symbol if hasattr(sp, 'symbol') else str(sp)
elem = self._get_element_from_species_string(elem)
if elem:
species_occu[elem] = occu
# 提取各元素符号
elements_at_site = []
for sp in site_species.keys():
elem = sp.symbol if hasattr(sp, 'symbol') else str(sp)
elem = self._get_element_from_species_string(elem)
if elem:
elements_at_site.append(elem)
total_occupancy = sum(species_occu.values())
elements_at_site = list(species_occu.keys())
# 判断是否涉及目标阳离子
if self.target_cation in elements_at_site:
info.cation_has_partial_occupancy = True
# 检查是否有部分占据
has_partial = any(occu < 1.0 for occu in species_occu.values()) or len(species_occu) > 1
if not has_partial:
continue
info.has_partial_occupancy = True
# 判断Li的共占位情况
if self.target_cation in elements_at_site:
li_occu = species_occu.get(self.target_cation, 0)
other_elements = [e for e in elements_at_site if e != self.target_cation]
if not other_elements and li_occu < 1.0:
# Li与空位共占位Li占据率<1但没有其他元素
info.cation_with_vacancy = True
elif other_elements:
# Li与其他元素共占位
other_are_anions = all(e in self.target_anions for e in other_elements)
if other_are_anions:
# Li与阴离子共占位罕见标记为阴离子共占位
info.anion_has_partial_occupancy = True
else:
# Li与其他阳离子共占位 → 需要扩胞
info.cation_with_other_cation = True
# 记录需要扩胞的占据率取非Li元素的占据率
for elem in other_elements:
if elem not in self.target_anions:
occu = species_occu.get(elem, 0)
if occu > 0 and occu < 1.0:
occupancy_dict[occu].append(i)
occupancy_elements[occu] = elements_at_site
else:
# 不涉及Li的位点
# 判断是否涉及阴离子
if any(elem in self.target_anions for elem in elements_at_site):
info.anion_has_partial_occupancy = True
# 判断是否涉及其他元素(需要扩胞处理的情况)
other_elements = [e for e in elements_at_site
if e != self.target_cation and e not in self.target_anions]
if other_elements:
else:
# 其他阳离子的共占位 → 需要扩胞
info.other_has_partial_occupancy = True
# 获取占据率(取非目标阳离子的占据率)
occu = self._get_occupancy_from_species_string(
species_string,
self.target_cation_variants
)
if occu is not None and occu != 1.0:
occupancy_dict[occu].append(i)
occupancy_elements[occu] = elements_at_site
# 检查单一物种的部分占据
for specie, occupancy in site_species.items():
if occupancy < 1.0:
info.has_partial_occupancy = True
elem = specie.symbol if hasattr(specie, 'symbol') else str(specie)
elem = self._get_element_from_species_string(elem)
if elem == self.target_cation:
info.cation_has_partial_occupancy = True
elif elem in self.target_anions:
info.anion_has_partial_occupancy = True
else:
info.other_has_partial_occupancy = True
occupancy_dict[occupancy].append(i)
occupancy_elements[occupancy] = [elem]
# 获取占据率
for elem, occu in species_occu.items():
if occu > 0 and occu < 1.0:
occupancy_dict[occu].append(i)
occupancy_elements[occu] = elements_at_site
break # 只记录一次
# 计算扩胞信息
self._calculate_expansion_info(info, occupancy_dict, occupancy_elements)
def _evaluate_processability(self, info: StructureInfo):
"""评估可处理性(修正版)"""
skip_reasons = []
if not info.is_valid:
skip_reasons.append("无法解析CIF文件")
if not info.contains_target_cation:
skip_reasons.append(f"不含{self.target_cation}")
if info.anion_mode == "none":
skip_reasons.append("不含目标阴离子")
if info.is_binary_compound:
skip_reasons.append("二元化合物")
if info.has_radioactive_elements:
skip_reasons.append("含放射性元素")
# Li与空位共占位 → 不需要处理不加入skip_reasons
# info.cation_with_vacancy 不影响可处理性
# Li与其他阳离子共占位 → 需要扩胞(如果扩胞因子合理则可处理)
if info.cation_with_other_cation:
if info.expansion_info.can_expand:
info.needs_expansion = True
else:
skip_reasons.append(f"{self.target_cation}与其他阳离子共占位且{info.expansion_info.skip_reason}")
# 阴离子共占位 → 不处理
if info.anion_has_partial_occupancy:
skip_reasons.append("阴离子存在共占位")
if info.has_water_molecule:
skip_reasons.append("含水分子")
# 其他阳离子共占位不涉及Li→ 需要扩胞
if info.other_has_partial_occupancy:
if info.expansion_info.can_expand:
info.needs_expansion = True
else:
skip_reasons.append(info.expansion_info.skip_reason)
if skip_reasons:
info.can_process = False
info.skip_reason = "; ".join(skip_reasons)
else:
info.can_process = True
def _calculate_expansion_info(
self,
info: StructureInfo,

View File

@@ -5,7 +5,7 @@
import os
import pickle
from typing import List, Tuple, Optional
from dataclasses import asdict
from dataclasses import asdict, fields
from .structure_inspector import StructureInspector, StructureInfo
@@ -38,11 +38,68 @@ def analyze_single_file(args: Tuple[str, str, set]) -> Optional[StructureInfo]:
)
def structure_info_to_dict(info: StructureInfo) -> dict:
"""
将 StructureInfo 转换为可序列化的字典
处理 set、dataclass 等特殊类型
"""
result = {}
for field in fields(info):
value = getattr(info, field.name)
# 处理 set 类型
if isinstance(value, set):
result[field.name] = list(value)
# 处理嵌套的 dataclass (如 ExpansionInfo)
elif hasattr(value, '__dataclass_fields__'):
result[field.name] = asdict(value)
# 处理 list 中可能包含的 dataclass
elif isinstance(value, list):
result[field.name] = [
asdict(item) if hasattr(item, '__dataclass_fields__') else item
for item in value
]
else:
result[field.name] = value
return result
def dict_to_structure_info(d: dict) -> StructureInfo:
"""
从字典恢复 StructureInfo 对象
"""
from .structure_inspector import ExpansionInfo, OccupancyInfo
# 处理 set 类型字段
if 'elements' in d and isinstance(d['elements'], list):
d['elements'] = set(d['elements'])
if 'anion_types' in d and isinstance(d['anion_types'], list):
d['anion_types'] = set(d['anion_types'])
if 'target_anions' in d and isinstance(d['target_anions'], list):
d['target_anions'] = set(d['target_anions'])
# 处理 ExpansionInfo
if 'expansion_info' in d and isinstance(d['expansion_info'], dict):
exp_dict = d['expansion_info']
# 处理 OccupancyInfo 列表
if 'occupancy_details' in exp_dict:
exp_dict['occupancy_details'] = [
OccupancyInfo(**occ) if isinstance(occ, dict) else occ
for occ in exp_dict['occupancy_details']
]
d['expansion_info'] = ExpansionInfo(**exp_dict)
return StructureInfo(**d)
def batch_analyze(
file_paths: List[str],
target_cation: str,
target_anions: set,
output_file: str = None
file_paths: List[str],
target_cation: str,
target_anions: set,
output_file: str = None
) -> List[StructureInfo]:
"""
批量分析文件用于SLURM子任务
@@ -77,12 +134,34 @@ def batch_analyze(
# 保存结果
if output_file:
serializable_results = [structure_info_to_dict(r) for r in results]
with open(output_file, 'wb') as f:
pickle.dump([asdict(r) for r in results], f)
pickle.dump(serializable_results, f)
return results
def load_results(result_file: str) -> List[StructureInfo]:
"""
从pickle文件加载结果
"""
with open(result_file, 'rb') as f:
data = pickle.load(f)
return [dict_to_structure_info(d) for d in data]
def merge_results(result_files: List[str]) -> List[StructureInfo]:
"""
合并多个结果文件用于汇总SLURM作业数组的输出
"""
all_results = []
for f in result_files:
if os.path.exists(f):
all_results.extend(load_results(f))
return all_results
# 用于SLURM作业数组的命令行入口
if __name__ == "__main__":
import argparse