This commit is contained in:
2025-12-07 13:56:33 +08:00
parent 49f54b04cd
commit c83985cd02
22 changed files with 2732 additions and 0 deletions

329
py/Occupation.py Normal file
View File

@@ -0,0 +1,329 @@
from fontTools.misc.plistlib import end_string
from pymatgen.core import Structure
import spglib
from fractions import Fraction
import random
from pymatgen.core.sites import PeriodicSite, Species,Element,Lattice
import numpy as np
import re
def typejudge(number):
if number in [1, 2]:
return "Triclinic"
elif 3 <= number <= 15:
return "Monoclinic"
elif 16 <= number <= 74:
return "Orthorhombic"
elif 75 <= number <= 142:
return "Tetragonal"
elif 143 <= number <= 167:
return "Trigonal"
elif 168 <= number <= 194:
return "Hexagonal"
elif 195 <= number <= 230:
return "Cubic"
else:
return "Unknown"
def extract_oxi_state(species_string):
"""
从 species_string"Li+:0.645")提取氧化态
Args:
species_string: 包含元素和氧化态的字符串(如 "Li+:0.645""Fe2-:0.5"
Returns:
int: 氧化态数值(如 "+" -> 1, "2-" -> -2默认返回 0 如果解析失败
"""
# 使用正则表达式匹配化学符号和氧化态(如 Li+, Fe2-
match = re.search(r"([A-Za-z]+)([+-]?\d*[+-])", species_string)
if not match:
return 0 # 默认中性
# 提取氧化态部分(如 "+", "2-", "3+"
oxi_str = match.group(2)
# 处理单符号情况(如 "+" -> 1, "-" -> -1
if oxi_str in ("+", "-"):
return 1 if oxi_str == "+" else -1
# 处理多数字情况(如 "2+" -> 2, "3-" -> -3
try:
num = int(oxi_str[:-1]) # 提取数字部分
sign = 1 if oxi_str[-1] == "+" else -1
return num * sign
except (ValueError, IndexError):
return 0 # 解析失败时返回中性
def process_cif_file(cif_file_path,explict_element):
structure = Structure.from_file(cif_file_path)
result_list = []
for index, site in enumerate(structure.sites, start=1):
occupancy = site.species.element_composition.num_atoms
species = site.species.chemical_system
if species in explict_element:
break
if occupancy < 1.0:
entry = next((r for r in result_list if
r["species"] == species and r["occupancy"] == occupancy), None)
if entry:
entry["number"].append(index)
else:
result_list.append({
"species": species,
"number": [index],
"occupancy": occupancy
})
return result_list
def factorize_to_three_factors(n,type_sym=None):
factors = []
# 遍历可能的x值
if type_sym == None:
for x in range(1, n + 1):
if n % x == 0:
remaining_n = n // x
# 遍历可能的y值
for y in range(1, remaining_n + 1):
if remaining_n % y == 0:
z = remaining_n // y
factors.append({'x': x, 'y': y, 'z': z})
if type_sym == "xyz":
for x in range(1, n + 1):
if n % x == 0:
remaining_n = n // x
# 遍历可能的y值
for y in range(1, remaining_n + 1):
if remaining_n % y == 0 and y <= x:
z = remaining_n // y
if z <= y:
factors.append({'x': x, 'y': y, 'z': z})
def sum_score(factor):
x, y, z = factor['x'], factor['y'], factor['z']
return x + y + z
# 按照sum_score从小到大排序
sorted_factors = sorted(factors, key=sum_score)
return sorted_factors
def calculate_supercell_factor(occupancy):
# 将浮点数转换为分数形式
fraction = Fraction(occupancy).limit_denominator()
# 获取分子和分母
numerator = fraction.numerator
denominator = fraction.denominator
return numerator,denominator
def mark_atoms_randomly(factors, atom_number):
"""
根据扩胞因子和占据数量生成随机占据字典
Args:
factors: 扩胞因子字典 {'x': int, 'y': int, 'z': int}
atom_number: 需要占据的副本数量
Returns:
字典 {0: 1或0, 1: 1或0, ..., total_copies-1: 1或0}
"""
x, y, z = factors['x'], factors['y'], factors['z']
total_copies = x * y * z
if atom_number > total_copies:
raise ValueError(f"atom_number ({atom_number}) 不能超过扩胞总数 (x*y*z = {total_copies})")
# 生成所有副本索引 [0, 1, 2, ..., total_copies-1]
atom_dice = list(range(total_copies))
# 随机选择 atom_number 个副本占据
selected_atoms = random.sample(atom_dice, atom_number)
# 创建结果字典 {0: 1或0, 1: 1或0, ...}
result = {atom: 1 if atom in selected_atoms else 0 for atom in atom_dice}
return result
def generate_random_list(total_elements, atom_number):
# 确保 atom_number 不超过 total_elements
if atom_number > total_elements:
raise ValueError("atom_number cannot be greater than the total number of elements (x * y * z)")
# 创建一个全0的列表
result = [0] * total_elements
# 随机选择 atom_number 个位置并将这些位置的值设为1
indices = random.sample(range(total_elements), atom_number)
for index in indices:
result[index] = 1
return result
def merge_structures(struct_copies, factors):
"""
将多个副本结构按三维顺序合并为扩胞后的结构
Args:
struct_copies: 副本结构列表(长度 = x*y*z
factors: 扩胞因子字典 {"x": int, "y": int, "z": int}
Returns:
合并后的扩胞结构
"""
x, y, z = factors["x"], factors["y"], factors["z"]
total_copies = x * y * z
if len(struct_copies) != total_copies:
raise ValueError("副本数量与扩胞因子不匹配")
# 获取原结构的晶格
original_lattice = struct_copies[0].lattice
# 创建扩胞后的新晶格(直接按倍数缩放)
new_lattice_matrix = np.dot(original_lattice.matrix, np.diag([x, y, z]))
new_lattice = Lattice(new_lattice_matrix)
# 初始化合并后的结构
merged_structure = Structure(
lattice=new_lattice,
species=[],
coords=[],
coords_are_cartesian=False
)
# 按三维顺序填充每个副本的原子
for copy_idx in range(total_copies):
# 计算当前副本的分数坐标偏移量
offset = np.array([
copy_idx // (y * z), # x方向偏移
(copy_idx % (y * z)) // z, # y方向偏移
copy_idx % z # z方向偏移
])
# 将当前副本的原子添加到合并结构中(考虑偏移)
for site in struct_copies[copy_idx]:
if site.species: # 跳过空位
merged_structure.append(
species=site.species,
coords=site.frac_coords + offset,
coords_are_cartesian=False,
properties=site.properties
)
return merged_structure
def expand_structure(structure, factors, atom_indices, atom_number):
# 参数检查
x, y, z = factors['x'], factors['y'], factors['z']
total_copies = x * y * z
if not all(1 <= idx <= len(structure.sites) for idx in atom_indices):
raise ValueError("atom_indices包含无效原子索引")
# 生成独立副本
struct_copies = [structure.copy() for _ in range(total_copies)]
atom_dice = list(range(total_copies)) # 所有副本索引 [0,1,2,...]
# 处理每个目标原子
for atom_idx in atom_indices:
original_site = structure.sites[atom_idx - 1]
element = original_site.species.chemical_system
# 生成当前原子的占据字典(如{0:1, 1:0, 2:1,...}
occupancy_dict = mark_atoms_randomly(factors,atom_number)
# 修改每个副本
for copy_idx, occupy in occupancy_dict.items():
# 或者方法2使用remove/insert
struct_copies[copy_idx].remove_sites([atom_idx - 1])
oxi_state = extract_oxi_state(original_site.species_string)
if occupy:
new_site = PeriodicSite(
species=Species(element, oxi_state),
coords=original_site.frac_coords,
lattice=struct_copies[copy_idx].lattice,
to_unit_cell=True,
label=original_site.label
)
struct_copies[copy_idx].sites.insert(atom_idx - 1, new_site)
else:
species_dict = {Species(element, oxi_state): 0.0}
new_site = PeriodicSite(
species = species_dict,
coords=original_site.frac_coords,
lattice=struct_copies[copy_idx].lattice,
to_unit_cell=True,
label=original_site.label
)
struct_copies[copy_idx].sites.insert(atom_idx - 1, new_site)
# 合并副本
expanded_structure = Structure(
lattice=np.dot(structure.lattice.matrix, np.diag([x, y, z])),
species=[],
coords=[],
coords_are_cartesian=False
)
for copy_idx in range(total_copies):
offset = np.array([
copy_idx // (y * z),
(copy_idx % (y * z)) // z,
copy_idx % z
])
for site in struct_copies[copy_idx]:
if site.species: # 只添加非空位
expanded_structure.append(
species=site.species,
coords=site.frac_coords + offset,
coords_are_cartesian=False,
properties=site.properties
)
expanded_structure = merge_structures(struct_copies,factors)
return expanded_structure
def process_occupation(input_file,output_file,explict_element = ["Li"],expect_cifnumber = 10,random_time=1):
struct = Structure.from_file(input_file)
space_group_info = struct.get_space_group_info()
space_group_symbol = space_group_info[0]
all_spacegroup_symbols = [spglib.get_spacegroup_type(i) for i in range(1, 531)]
symbol = all_spacegroup_symbols[0]
for symbol_i in all_spacegroup_symbols:
if space_group_symbol == symbol_i.international_short:
symbol = symbol_i
space_type = typejudge(symbol.number)
print(f"当前空间群符号为{space_group_symbol},序号为{symbol.number},对应的晶体体系为{space_type}")
occupation_list = process_cif_file(cif_file_path=input_file,explict_element=explict_element)
print(occupation_list)
for occupation in occupation_list:
atom_number, target_multiplier=calculate_supercell_factor(occupation["occupancy"])
divides = []
if space_type == "Hexagonal":
print('当前为六方晶系,暂不处理')
if space_type == "Cubic":
print("当前为立方晶体,三个方向同步")
divides = factorize_to_three_factors(target_multiplier,"xyz")
else:
print("为其他晶系,假设三个方向不同")
divides = factorize_to_three_factors(target_multiplier,)
print(divides)
for it in divides:
end_str = f'x{it["x"]}y{it["y"]}z{it["z"]}'
for i in range(random_time):
expand_struct=expand_structure(struct,it,occupation["number"],atom_number)
expand_struct.to_file(output_file)
print(it)
else:
print(f"不存在除{explict_element}以外的共占位原子")
process_occupation("../data/input_pre/ICSD_1234.cif", "haha.cif", explict_element=[], expect_cifnumber=1, random_time=1)

44
py/call_analyze.py Normal file
View File

@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
import argparse
import subprocess
def run_analysis_with_subprocess(cif_file, input_file, output_file, filters=None):
# 如果没有传递 filters则使用默认值
if filters is None:
filters = ["Ordered", "PropOxi", "VoroPerco", "Coulomb", "VoroBV", "VoroInfo", "MergeSite"]
# 构建命令行参数
command = ['python', '../tool/analyze_voronoi_nodes.py', cif_file, '-i', input_file, '-o', output_file, '-f'] + filters
# 调用 subprocess 执行命令
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 捕获标准输出和标准错误
stdout, stderr = process.communicate()
# Python 2.7 需要解码 stdout 和 stderr因为是 str 类型)
stdout = stdout.decode('utf-8') if isinstance(stdout, str) else stdout
stderr = stderr.decode('utf-8') if isinstance(stderr, str) else stderr
# 打印输出内容或记录到文件
print(stdout)
if stderr:
print(stderr)
if __name__ == "__main__":
# 设置命令行参数解析器
parser = argparse.ArgumentParser(description='Run Voronoi analysis using analyze.py script.')
parser.add_argument('cif_file', type=str, help='CIF file to analyze')
parser.add_argument('-i', '--input_file', type=str, help='Input YAML file', required=True)
parser.add_argument('-o', '--output_file', type=str, help='Output file to save the results', required=True)
parser.add_argument('-f', '--filters', nargs='+',
default=["Ordered", "PropOxi", "VoroPerco", "Coulomb", "VoroBV", "VoroInfo", "MergeSite"],
help='List of filters to apply (default is all filters)')
# 解析命令行参数
args = parser.parse_args()
# 调用分析函数
run_analysis_with_subprocess(args.cif_file, args.input_file, args.output_file, args.filters)

363
py/crystal_2.py Normal file
View File

@@ -0,0 +1,363 @@
from pymatgen.core import Structure
from pymatgen.core.periodic_table import Element, Specie
from pymatgen.analysis.local_env import CrystalNN
from pymatgen.analysis.structure_matcher import StructureMatcher
from pymatgen.io.cif import CifParser
import numpy as np
class crystal:
def __init__(self, file_path, element_positive='Na', mixed_anions=None):
# self.parse = CifParser(file_path)
# self.structure = self.parse.get_structures()[0]
if mixed_anions is None:
mixed_anions = {frozenset({'S', 'O'}), frozenset({'Cl', 'Br'}),frozenset({'Cl', 'O'}),frozenset({'Cl', 'Br'}),frozenset({'S', 'Cl'})}
self.structure = Structure.from_file(file_path)
self.file_path = file_path
self.element_positive = element_positive
self.check_all = False
self.check_basic_result = False
self.check_high_cn_and_face_sharing_result = False
self.check_percolation_radius_result = False
self.check_practical_result = False
self.anion = ""
self.anions = ""
self.mixed_anions = mixed_anions
#self.initialize()
def initialize(self):
print("e")
# self.check_basic_result=self.check_basic()
# self.check_high_cn_and_face_sharing_result = self.check_high_cn_and_face_sharing()
# self.check_percolation_radius_result = self.check_percolation_radius()
# self.check_all = self.check_basic_result and self.check_high_cn_and_face_sharing_result and self.check_percolation_radius_result
# print(f"{self.file_path}done")
def check_practical(self):
structure = self.structure
# 检查是否为Li-X-O,Li-P-S
excluded_X_elements = {'S', 'I', 'Si', 'C', 'P', 'Al', 'Ge', 'Se', 'B', 'Cl'}
chemical_system_set = structure.chemical_system_set
try:
if len(chemical_system_set) == 3:
if "Li" in chemical_system_set and "O" in chemical_system_set:
for element in excluded_X_elements:
if element in chemical_system_set:
return False
if "Li" in chemical_system_set and "P" in chemical_system_set and "S" in chemical_system_set:
return False
except Exception as e:
print(f"Error during Li-X-O check: {e}")
return False
# 排除过渡金属元素
excluded_transition_metals = {'Fe', 'Mn', 'Ni', 'Ti', 'Mo', 'V', 'Co'}
try:
if "Li" in chemical_system_set and "O" in chemical_system_set:
for element in excluded_transition_metals:
if element in chemical_system_set:
return False
except Exception as e:
print(f"Error during transition metal check: {e}")
return False
# 检查是否包含N, Re, Ho, Hf, Ru, Eu, Lu
excluded_elements = {'N', 'Re', 'Ho', 'Hf', 'Ru', 'Lu'}
try:
for element in excluded_elements:
if element in chemical_system_set:
return False
except Exception as e:
print(f"Error during excluded elements check: {e}")
return False
# 检查是否共享位点
try:
for site in structure.sites:
if 'Li' in site.species_string and len(site.species) > 1:
return False
except Exception as e:
print(f"Error during site sharing check: {e}")
return False
self.check_practical_result = True
return True
def check_basic(self):
structure = self.structure
#判断是否为二元化合物
if len(structure.types_of_specie) == 2:
return False
#判断阴离子是否为多种阴离子
# anions = {'O', 'S', 'Se', 'Te', 'F', 'Cl', 'Br', 'I'}
anions = {'O', 'S','Br','Cl'}
try:
for site in self.structure.sites:
try:
#if site.specie.symbol in anions:
if site.species.chemical_system in anions:
self.anion = site.specie.symbol
break
except AttributeError as e:
a=1
try:
if site.species.chemical_system in anions:
self.anion = site.specie.symbol
break
except AttributeError as e:
print(e)
if self.anion in anions:
a=1
else:
if not self.mixed_anions:
print("不是所选阴离子化合物")
return False
except Exception as e:
print(e)
return False
#这里添加对多种阴离子的支持
try:
# 创建一个集合来收集所有发现的阴离子
found_anions = set()
# 遍历structure以收集所有阴离子
for site in self.structure.sites:
try:
if site.species.chemical_system in anions:
found_anions.add(site.specie.symbol)
except AttributeError:
try:
if site.specie.symbol in anions:
found_anions.add(site.specie.symbol)
except AttributeError:
continue
# 检查找到的阴离子情况
if len(found_anions) == 0:
print("未找到任何预定义的阴离子")
return False
elif len(found_anions) == 1:
# 只有一种阴离子
self.anion = list(found_anions)[0]
print(f"发现单一阴离子: {self.anion}")
else:
# 有多种阴离子,检查是否匹配预定义的混合阴离子组合
found_anions_frozen = frozenset(found_anions)
if found_anions_frozen in self.mixed_anions:
self.anions = found_anions
self.anion = "+".join(sorted(found_anions)) # 例如: "Cl+S"
print(f"发现匹配的混合阴离子组合: {self.anion}")
else:
# 如果找到的阴离子组合不在预定义列表中
print(f"发现的阴离子组合 {found_anions} 不在预定义的混合阴离子列表中")
return False
except Exception as e:
print(f"处理阴离子时出错: {e}")
return False
#这里还要调试
# try:
# # 初始化总电荷
# total_charge = 0
#
# # 检查是否所有元素都有氧化态
# for site in structure:
# try:
# oxi_state = site.species.charge # 检查是否有氧化态
# total_charge += oxi_state # 累加氧化态
# except AttributeError:
# print(f"元素 {site.specie.symbol} 缺少氧化态定义")
# return False
# # 检查是否电荷平衡
# if total_charge == 0:
# print("所有元素的价态之和为 0结构电荷平衡")
# else:
# print(f"所有元素的价态之和为 {total_charge},结构不平衡")
# return False
# except Exception as e:
# print(f"发生错误: {e}")
# return False
#判断原子个数
try:
if not self.mixed_anions:
if structure.num_sites>300:
return False
else:
if structure.num_sites>900:
return False
except Exception:
print("原子个数判断失败")
return False
#判断有几个阴离子
# anions = {'O', 'S', 'Se', 'Te', 'F', 'Cl', 'Br', 'I'}
# try:
# anion_elements = {site.species.chemical_system for site in structure if site.species.chemical_system in anions}
# if len(anion_elements) > 1:
# return False
# except Exception:
# print("阴离子个数判断失败")
# return False
#判断是否有放射性元素
radioactive_elements = {'U', 'Th', 'Pu', 'Ra', 'Rn', 'Po', 'Np', 'Am', 'Cm', 'Bk', 'Cf', 'Es', 'Fm', 'Md', 'No',
'Lr'}
try:
# 遍历结构中的元素
for site in structure:
if site.species.chemical_system in radioactive_elements:
return False # 存在放射性元素
except Exception:
print("放射性元素判断失败")
return False
#判断是否存在共占位
try:
for site in structure.sites:
if self.element_positive in [specie.symbol for specie in site.species.keys()] and len(site.species) > 1:
return False
except Exception:
print("共占位判断失败")
return False
#判读是否有无序或部分占位的阴离子
try:
for site in structure.sites:
for specie, occupancy in site.species.items():
if specie.symbol in anions and occupancy < 1:
return False
except Exception:
print("无序或部分占位的阴离子判断失败")
return False
#判断是否有水分子
try:
oxygen_sites = [site for site in structure.sites if site.species.chemical_system == "O"]
hydrogen_sites = [site for site in structure.sites if site.species.chemical_system == "H"]
for o_site in oxygen_sites:
nearby_hydrogens = [h_site for h_site in hydrogen_sites if o_site.distance(h_site) < 1.2]
if len(nearby_hydrogens) == 2:
return False
except Exception:
print("水分子判断失败")
return False
#接下来判断是否有标准信息
try:
for site in structure.sites:
for specie in site.species.keys():
element = Element(specie.symbol)
if not element.common_oxidation_states:
return False
try:
_ = Specie(element.symbol, max(element.common_oxidation_states)).ionic_radius
except:
return False
except Exception:
print("标准信息判断失败")
return False
#暂时不判断是否为电中性
#可能需要通过ovito等库来做判断
#存在一些文件不提供各元素的电负性
#判断电中性是否存在
self.check_basic_result = True
return True
def check_high_cn_and_face_sharing(self,cut_distance = 3.1):
structure = self.structure
#基于固角权重的计算
nn_finder = CrystalNN()
#遍历结构中的所有Na位点检查配位数
#是所有Na位点都需要还是只检查高配位数的位点
high_cn_ep_sites = []
try:
for i,site in enumerate(structure):
if site.specie == Element(self.element_positive):
cn = nn_finder.get_cn(structure,i)
if cn>=5:
high_cn_ep_sites.append(i)
if len(high_cn_ep_sites)==0:
return False
except Exception:
print("高配位Na离子判断失败")
return False
#检查共面
try:
for i in high_cn_ep_sites:
neighbors = nn_finder.get_nn_info(structure,i)
x_neighbors = []
x_neighbors = [
neighbor["site_index"]
for neighbor in neighbors
if structure[neighbor["site_index"]].specie.symbol == self.element_positive
and neighbor["site"].distance(structure[i]) <= cut_distance
]
if not self._check_face_sharing(i, x_neighbors):
print(f"Na site {i} does not share a face with other high-CN Na sites.")
return False
print("All high-CN Na sites are face-sharing.")
except Exception:
print("共面判断失败")
return True
def _check_face_sharing(self,site_index,neighbor_indices):
# 获取当前 Na 位点的坐标
site_coords = self.structure[site_index].coords
# 遍历邻居
for neighbor_index in neighbor_indices:
# 获取邻居的坐标
neighbor_coords = self.structure[neighbor_index].coords
# 获取两个原子之间共享的面(使用简单的距离或角度计算)
# 假设共享面的法向量计算可以从 Voronoi 构造
shared_face_normal = self._calculate_face_normal(site_coords, neighbor_coords)
# 判断是否共面(如果法向量的绝对值接近 0可以认为共面
if shared_face_normal is not None:
return True
return False
def _calculate_face_normal(self, coords1, coords2):
# 示例计算:用两个原子之间的向量生成法向量
vector = coords2 - coords1
norm = np.linalg.norm(vector)
# 如果向量接近零,返回 None
if norm < 1e-6:
return None
# 正则化向量作为法向量
return vector / norm
def check_percolation_radius(self):
return True
def group_structures_by_framework(structures):
matcher = StructureMatcher()
grouped_structures = []
for structure in structures:
matched = False
for group in grouped_structures:
if matcher.fit(structure, group[0]): # 比较结构是否匹配
group.append(structure)
matched = True
break
if not matched:
grouped_structures.append([structure])
return grouped_structures

499
py/expansion.py Normal file
View File

@@ -0,0 +1,499 @@
from distutils.dir_util import remove_tree
from pymatgen.core import Structure, Lattice,Species,PeriodicSite
import numpy as np
from collections import defaultdict
import math
import spglib
from functools import reduce
from fractions import Fraction
import random
import re
import os
def mark_atoms_randomly(numerator,denominator):
"""
根据扩胞因子和占据数量生成随机占据字典
Args:
factors: 扩胞因子字典 {'x': int, 'y': int, 'z': int}
atom_number: 需要占据的副本数量
Returns:
字典 {0: 1或0, 1: 1或0, ..., total_copies-1: 1或0}
"""
if numerator > denominator:
raise ValueError(f"atom_number ({numerator}) 不能超过扩胞总数 (x*y*z = {denominator})")
# 生成所有副本索引 [0, 1, 2, ..., total_copies-1]
atom_dice = list(range(denominator))
# 随机选择 atom_number 个副本占据
selected_atoms = random.sample(atom_dice, numerator)
# 创建结果字典 {0: 1或0, 1: 1或0, ...}
result = {atom: 1 if atom in selected_atoms else 0 for atom in atom_dice}
return result
def extract_oxi_state(species_str,element):
"""
从物种字符串中提取指定元素的氧化态
参数:
species_str: 物种字符串,如 "Li+:0.689, Sc3+:0.311"
element: 要提取的元素符号,如 "Sc"
返回:
int: 氧化态数值(如 Sc3+ → 3Sc- → -1Sc3- → -3
如果未找到或没有氧化态则返回 0
"""
# 分割字符串获取各个物种部分
species_parts = [part.strip() for part in species_str.split(",") if part.strip()]
for part in species_parts:
# 提取元素和电荷部分(冒号前的内容)
element_with_charge = part.split(":")[0].strip()
# 检查是否匹配目标元素
if element in element_with_charge:
# 提取电荷部分
charge_part = element_with_charge[len(element):]
# 处理无数字情况(如"Sc+"
if not any(c.isdigit() for c in charge_part):
if "+" in charge_part:
return 1
elif "-" in charge_part:
return -1
else:
return 0
# 处理有数字情况(如"Sc3+"
sign = 1
if "-" in charge_part:
sign = -1
# 提取数字部分
digits = ""
for c in charge_part:
if c.isdigit():
digits += c
if digits: # 确保有提取到数字
return sign * int(digits)
return 0 # 默认返回0
def factorize_to_three_factors(n,type_sym=None,keep_module=None):
factors = []
# 遍历可能的x值
if type_sym == None:
for x in range(1, n + 1):
if n % x == 0:
remaining_n = n // x
# 遍历可能的y值
for y in range(1, remaining_n + 1):
if remaining_n % y == 0:
z = remaining_n // y
factors.append({'x': x, 'y': y, 'z': z})
if type_sym == "xyz":
for x in range(1, n + 1):
if n % x == 0:
remaining_n = n // x
# 遍历可能的y值
for y in range(1, remaining_n + 1):
if remaining_n % y == 0 and y <= x:
z = remaining_n // y
if z <= y:
factors.append({'x': x, 'y': y, 'z': z})
if keep_module=='random':
import random
# 创建一个因子列表的副本,并随机打乱顺序
shuffled_factors = factors.copy()
random.shuffle(shuffled_factors)
return shuffled_factors
else:
def sort_key(item):
"""返回一个用于排序的元组"""
return (item['x'] + item['y'] + item['z'], item['z'], item['y'], item['x'])
# 使用 sorted() 函数(返回一个新的排序后的列表,不改变原列表)
sorted_factor = sorted(factors, key=sort_key)
return sorted_factor
def typejudge(number):
if number in [1, 2]:
return "Triclinic"
elif 3 <= number <= 15:
return "Monoclinic"
elif 16 <= number <= 74:
return "Orthorhombic"
elif 75 <= number <= 142:
return "Tetragonal"
elif 143 <= number <= 167:
return "Trigonal"
elif 168 <= number <= 194:
return "Hexagonal"
elif 195 <= number <= 230:
return "Cubic"
else:
return "Unknown"
def strategy_divide(struct,total,keep_module=None):
space_group_info = struct.get_space_group_info()
space_group_symbol = space_group_info[0]
all_spacegroup_symbols = [spglib.get_spacegroup_type(i) for i in range(1, 531)]
symbol = all_spacegroup_symbols[0]
for symbol_i in all_spacegroup_symbols:
if space_group_symbol == symbol_i.international_short:
symbol = symbol_i
space_type = typejudge(symbol.number)
print(f"当前空间群符号为{space_group_symbol},序号为{symbol.number},对应的晶体体系为{space_type}")
divides = []
if space_type == "Hexagonal":
print('当前为六方晶系,暂不处理')
if space_type == "Cubic":
print("当前为立方晶体,三个方向同步")
divides = factorize_to_three_factors(total, "xyz",keep_module=keep_module)
else:
print("为其他晶系,假设三个方向不同")
divides = factorize_to_three_factors(total,keep_module=keep_module)
return divides
def get_first_non_explicit_element(species_str, explict_element= ["Li","Li+"]):
"""
从物种字符串中获取第一个不在explict_element中的元素符号
参数:
species_str: 物种字符串,如 "Li+:0.689, Sc3+:0.311"
explict_element: 需要排除的元素列表,如 ["Li"]
返回:
str: 第一个符合条件的元素符号,如 "Sc"
如果没有找到则返回空字符串 ""
"""
if not species_str.strip():
return ""
# 分割字符串获取各个物种部分
species_parts = [part.strip() for part in species_str.split(",") if part.strip()]
for part in species_parts:
# 提取元素符号(去掉电荷和占据数部分)
element_with_charge = part.split(":")[0].strip()
# 提取纯元素符号(去掉数字和特殊符号)
pure_element = ''.join([c for c in element_with_charge if c.isalpha()])
if pure_element not in explict_element:
return pure_element
return ""
def calculate_expansion_factor(Occupation_list,calculate_type='high'):
"""
计算Occupation_list的扩大倍数支持不同精度模式
参数:
Occupation_list: List[Dict], 每个字典包含:
{
"occupation": float,
"atom_serial": List[int],
"numerator": None,
"denominator": None
}
calculate_type: str, 计算精度模式 ('high', 'normal', 'low')
- high: 精确分数(默认)
- normal: 分母≤100的最接近分数
- low: 分母≤10的最接近分数
返回:
int: 扩大倍数(所有分母的最小公倍数)
List[Dict]: 更新后的Occupation_list包含分子和分母
"""
if not Occupation_list:
return 1, []
# Step 1: 根据精度要求计算分数
for entry in Occupation_list:
occu = entry["occupation"]
if calculate_type == 'high':
# 高精度模式 - 使用精确分数
fraction = Fraction(occu).limit_denominator()
elif calculate_type == 'normal':
# 普通精度 - 分母≤100
fraction = Fraction(occu).limit_denominator(100)
elif calculate_type == 'low':
# 低精度 - 分母≤10
fraction = Fraction(occu).limit_denominator(10)
elif calculate_type == 'very low':
# 低精度 - 分母≤10
fraction = Fraction(occu).limit_denominator(5)
else:
raise ValueError("calculate_type必须是'high', 'normal''low'")
entry["numerator"] = fraction.numerator
entry["denominator"] = fraction.denominator
# Step 2: 计算所有分母的最小公倍数
denominators = [entry["denominator"] for entry in Occupation_list]
lcm = reduce(lambda a, b: a * b // math.gcd(a, b), denominators, 1)
# Step 3: 统一分母
for entry in Occupation_list:
denominator = entry["denominator"]
entry["numerator"] = entry["numerator"] * (lcm // denominator)
entry["denominator"] = lcm
return lcm, Occupation_list
def get_occu(s_str,explict_element):
'''
这里暂时不考虑无化合价的情况
Args:
s_str:
Returns:
'''
if not s_str.strip():
return {}
pattern = r'([A-Za-z0-9+-]+):([0-9.]+)'
matches = re.findall(pattern, s_str)
result = {}
for species, occu in matches:
try:
if species not in explict_element:
return occu
except ValueError:
continue # 忽略无效数字
return 1
def process_cif_file(struct, explict_element=["Li", "Li+"]):
"""
统计结构中各原子的occupation情况忽略occupation=1.0的原子)并分类
参数:
struct: Structure对象 (从CIF文件读取)
返回:
List[Dict]: Occupation_list每个字典格式为:
{
"occupation": list, # 占据值不为1.0
"atom_serial": List[int], # 原子序号列表
"numerator": None, # 预留分子
"denominator": None # 预留分母
"split":list[string]#对应的值
}
"""
if not isinstance(struct, Structure):
raise TypeError("输入必须为pymatgen的Structure对象")
occupation_dict = defaultdict(list)
# 用于记录每个occupation对应的元素列表
split_dict = {}
for i, site in enumerate(struct):
# 获取当前原子的occupation默认为1.0
occu = get_occu(site.species_string, explict_element)
# 忽略occupation=1.0的原子
if occu != 1.0:
if site.species.chemical_system not in explict_element:
occupation_dict[occu].append(i + 1) # 原子序号从1开始计数
# 提取元素名称列表
elements = []
if ':' in site.species_string:
# 格式如 'S:0.494, Cl:0.506' 或 'S2-:0.494, Cl-:0.506'
parts = site.species_string.split(',')
for part in parts:
# 提取冒号前的部分并去除前后空格
element_with_valence = part.strip().split(':')[0].strip()
# 从带有价态的元素符号中提取纯元素符号(只保留元素符号部分)
# 元素符号通常是一个大写字母,可能后跟一个小写字母
import re
element_match = re.match(r'([A-Z][a-z]?)', element_with_valence)
if element_match:
element = element_match.group(1)
elements.append(element)
else:
# 只有一个元素,也需要处理可能的价态
import re
element_match = re.match(r'([A-Z][a-z]?)', site.species_string)
if element_match:
elements = [element_match.group(1)]
# 存储该occupation对应的元素列表
split_dict[occu] = elements
# 转换为要求的输出格式
Occupation_list = [
{
"occupation": occu,
"atom_serial": serials,
"numerator": None,
"denominator": None,
"split": split_dict.get(occu, []) # 添加split字段
}
for occu, serials in occupation_dict.items()
]
return Occupation_list
def merge_structures(structure_list, merge_dict):
"""
按指定方向合并多个结构
参数:
structure_list: List[Structure], 待合并的结构列表(所有结构必须具有相同的晶格)
merge_dict: Dict[str, int], 指定各方向的合并次数(如 {"x":1, "y":1, "z":2}
返回:
Structure: 合并后的新结构
"""
if not structure_list:
raise ValueError("结构列表不能为空")
# 检查所有结构是否具有相同的晶格
ref_lattice = structure_list[0].lattice
for s in structure_list[1:]:
if not np.allclose(s.lattice.matrix, ref_lattice.matrix):
raise ValueError("所有结构的晶格必须相同")
# 计算总合并次数
total_merge = merge_dict.get("x", 1) * merge_dict.get("y", 1) * merge_dict.get("z", 1)
if len(structure_list) != total_merge:
raise ValueError(f"结构数量({len(structure_list)})与合并次数({total_merge})不匹配")
# 获取参考结构的晶格参数
a, b, c = ref_lattice.abc
alpha, beta, gamma = ref_lattice.angles
# 计算新晶格尺寸
new_a = a * merge_dict.get("x", 1)
new_b = b * merge_dict.get("y", 1)
new_c = c * merge_dict.get("z", 1)
new_lattice = Lattice.from_parameters(new_a, new_b, new_c, alpha, beta, gamma)
# 合并所有原子
all_sites = []
for i, structure in enumerate(structure_list):
# 计算当前结构在合并后的偏移量
x_offset = (i // (merge_dict.get("y", 1) * merge_dict.get("z", 1))) % merge_dict.get("x", 1)
y_offset = (i // merge_dict.get("z", 1)) % merge_dict.get("y", 1)
z_offset = i % merge_dict.get("z", 1)
# 对每个原子应用偏移
for site in structure:
coords = site.frac_coords.copy()
coords[0] = (coords[0] + x_offset) / merge_dict.get("x", 1)
coords[1] = (coords[1] + y_offset) / merge_dict.get("y", 1)
coords[2] = (coords[2] + z_offset) / merge_dict.get("z", 1)
all_sites.append({"species": site.species, "coords": coords})
# 创建新结构
return Structure(new_lattice, [site["species"] for site in all_sites], [site["coords"] for site in all_sites])
def generate_structure_list(base_structure,occupation_list,explict_element=["Li","Li+"]):
if not occupation_list:
return [base_structure.copy()]
lcm = occupation_list[0]["denominator"]
structure_list = [base_structure.copy() for _ in range(lcm)]
for entry in occupation_list:
numerator = entry["numerator"]
denominator = entry["denominator"]
atom_indices = entry["atom_serial"] # 注意原子序号从1开始
for atom_idx in atom_indices:
occupancy_dict = mark_atoms_randomly(numerator=numerator,denominator=denominator)
original_site = base_structure.sites[atom_idx - 1]
element = get_first_non_explicit_element(original_site.species_string,explict_element)
for copy_idx ,occupy in occupancy_dict.items():
structure_list[copy_idx].remove_sites([atom_idx-1])
oxi_state= extract_oxi_state(original_site.species_string,element)
if len(entry["split"])==1:
if occupy:
new_site = PeriodicSite(
species=Species(element, oxi_state),
coords=original_site.frac_coords,
lattice=structure_list[copy_idx].lattice,
to_unit_cell=True,
label=original_site.label
)
structure_list[copy_idx].sites.insert(atom_idx - 1, new_site)
else:
species_dict = {Species("Li", 1.0):0.0}
new_site = PeriodicSite(
species = species_dict,
coords=original_site.frac_coords,
lattice=structure_list[copy_idx].lattice,
to_unit_cell=True,
label=original_site.label
)
structure_list[copy_idx].sites.insert(atom_idx - 1, new_site)
else:
if occupy:
new_site = PeriodicSite(
species=Species(element, oxi_state),
coords=original_site.frac_coords,
lattice=structure_list[copy_idx].lattice,
to_unit_cell=True,
label=original_site.label
)
structure_list[copy_idx].sites.insert(atom_idx - 1, new_site)
else:
new_site = PeriodicSite(
species=Species(entry['split'][1], oxi_state),
coords=original_site.frac_coords,
lattice=structure_list[copy_idx].lattice,
to_unit_cell=True,
label=original_site.label
)
structure_list[copy_idx].sites.insert(atom_idx - 1, new_site)
return structure_list
def expansion(input_file,output_folder,keep_number,calculate_type='high',keep_module=None):
structure_origin = Structure.from_file(input_file)
lmp,oc_list = calculate_expansion_factor(process_cif_file(structure_origin),calculate_type=calculate_type)
strategy = strategy_divide(structure_origin,lmp,keep_module)
st_list = generate_structure_list(structure_origin,oc_list)
# 获取基础文件名(不含路径和扩展名)
base_name = os.path.splitext(os.path.basename(input_file))[0]
mergeds=[]
names=[]
if len(strategy)< keep_number:
keep_number = len(strategy)
for index in range(keep_number):
merged = merge_structures(st_list, strategy[index])
suffix = "x{}y{}z{}".format(
strategy[index]["x"],
strategy[index]["y"],
strategy[index]["z"]
)
output_filename=''
if keep_module=='classify':
print(f"{base_name}采用扩展方式为{suffix}")
output_filename=f"{base_name}.cif"
elif keep_module=='random':
print(f"{base_name}采用扩展方式为{suffix}")
output_filename=f"{base_name}-{suffix}.cif"
else:
output_filename = f"{base_name}-{suffix}.cif"
output_path = os.path.join(output_folder, output_filename)
merged.to(filename=output_path, fmt="cif")
print(f"Saved: {output_path}")
if keep_module=='classify':
return merged
if keep_module=='random':
mergeds.append(merged)
names.append(output_filename)
return mergeds,names
if __name__ == "__main__":
#expansion("../data/tmp/36.cif","../data/tmp",1,calculate_type='low')
expansion("../data/input_ClBr_set/36.cif", "../data/tmp", 3, calculate_type='low',keep_module='random')
#expansion("../data/input/1234.cif", "../data/input/output", 1, calculate_type='low',keep_module='classify')
# s1 = Structure.from_file("../data/input_pre/mp-6783.cif")
# s2 = Structure.from_file("../data/input_pre/ICSD_1234.cif")
# print(process_cif_file(s2))
# lmp,oc_list=calculate_expansion_factor(process_cif_file(s2))
# print(oc_list)
# strategy = strategy_divide(s2,lmp)
# print(strategy)
# st_list=generate_structure_list(s2,oc_list)
# merged = merge_structures(st_list,strategy[0])
# # merged = merge_structures([s1, s2], {"x": 1, "y": 1, "z": 2})
# merged.to("merged.cif", "cif") # 保存合并后的结构

156
py/make_sh.py Normal file
View File

@@ -0,0 +1,156 @@
import os
def creat_sh(input_folder, anion, sh_file_path='analyze.sh'):
"""
创建shell脚本只处理两类CIF文件
1. 纯数字命名的CIF文件 (例如: 123.cif)
2. 数字-坐标格式的CIF文件 (例如: 123-x1y2z3.cif)
参数:
input_folder: 输入文件夹路径
anion: 阴离子类型
sh_file_path: 生成的shell脚本路径
"""
# 文件夹路径
folder_path = input_folder
import re
# 定义两种文件名模式的正则表达式
pattern1 = re.compile(r'^\d+\.cif$') # 纯数字.cif
pattern2 = re.compile(r'^\d+-x\d+y\d+z\d+\.cif$') # 数字-x数字y数字z数字.cif
# 打开SH脚本文件用于写入
with open(sh_file_path, 'w') as sh_file:
# 写入脚本头部
sh_file.write('#!/bin/bash\n')
# 遍历文件夹中的所有文件
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
# 只处理文件(不处理文件夹)
if os.path.isfile(file_path):
# 检查文件名是否匹配两种模式之一
if pattern1.match(filename) or pattern2.match(filename):
# 生成对应的命令
command = f"python ../../../tool/analyze_voronoi_nodes.py {filename} -i ../../../tool/{anion}.yaml > {filename}.txt\n"
# 将命令写入SH脚本文件
sh_file.write(command)
print(f"SH脚本已生成{sh_file_path}")
import os
def create_sh_recursive(base_folder, tool_path="tool", relative_depth=2):
"""
递归遍历文件夹,为每个包含.cif文件的文件夹生成analyze.sh脚本
并在基础文件夹下创建一个sh_all.sh来执行所有脚本。
参数:
base_folder: 起始文件夹路径
tool_path: 工具目录的基本路径
relative_depth: 基础相对深度,用于计算正确的相对路径
"""
# 用于收集所有生成的analyze.sh脚本的相对路径
analyze_sh_paths = []
base_folder_name = os.path.basename(base_folder)
def process_folder(folder_path, current_depth=0):
print(f"处理文件夹: {folder_path}")
# 获取当前文件夹名称
folder_name = os.path.basename(folder_path)
# 检查当前文件夹是否包含.cif文件
has_cif_files = any(
f.endswith('.cif') for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f)))
# 如果当前文件夹包含.cif文件生成脚本
if has_cif_files:
# 计算正确的工具路径(根据深度增加../
dots = "../" * (relative_depth + current_depth)
tool_relative_path = f"{dots}{tool_path}"
# 确定anion参数使用文件夹名
anion = folder_name
# 生成脚本文件路径
sh_file_path = os.path.join(folder_path, "analyze.sh")
# 创建脚本
with open(sh_file_path, 'w') as sh_file:
sh_file.write('#!/bin/bash\n')
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
if os.path.isfile(file_path) and filename.endswith('.cif'):
command = f"python {tool_relative_path}/analyze_voronoi_nodes.py {filename} -i {tool_relative_path}/{anion}.yaml > {filename}.txt\n"
sh_file.write(command)
# 将此脚本添加到收集器中
# 计算相对于基础文件夹的路径
rel_path = os.path.relpath(folder_path, base_folder)
analyze_sh_paths.append(rel_path)
print(f"生成脚本: {sh_file_path} (工具路径: {tool_relative_path})")
# 获取子文件夹列表
subdirs = [d for d in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, d))]
# 处理复合阴离子文件夹的特殊情况
if "+" in folder_name:
elements = folder_name.split("+")
for element in elements:
element_dir = os.path.join(folder_path, element)
# 如果对应元素的子文件夹不存在,创建它
if not os.path.exists(element_dir):
os.makedirs(element_dir)
print(f"创建子文件夹: {element_dir}")
# 确保这个子文件夹被包含在递归处理列表中
if element not in subdirs:
subdirs.append(element)
# 递归处理所有子文件夹
for subdir in subdirs:
subdir_path = os.path.join(folder_path, subdir)
process_folder(subdir_path, current_depth + 1)
# 开始递归处理
process_folder(base_folder)
# 创建sh_all.sh脚本
sh_all_path = os.path.join(base_folder, "sh_all.sh")
with open(sh_all_path, 'w') as sh_all:
sh_all.write('#!/bin/bash\n\n')
sh_all.write(f'# process all analyze.sh\n\n')
# 记录初始目录
sh_all.write('# remember current dir\n')
sh_all.write('INITIAL_DIR=$(pwd)\n\n')
# 为每个包含analyze.sh的目录添加命令
for path in analyze_sh_paths:
sh_all.write(f'echo "process {path}/analyze.sh"\n')
sh_all.write(f'cd "{path}"\n')
sh_all.write('bash analyze.sh\n')
sh_all.write(f'cd "$INITIAL_DIR"\n\n')
# 添加完成消息
sh_all.write('echo "done!"\n')
# 修改权限使脚本可执行
os.chmod(sh_all_path, 0o755)
print(f"生成总执行脚本: {sh_all_path}")
print("所有脚本生成完成!")
# 示例调用
# create_sh_recursive("../data/after_step1")
if __name__ == '__main__':
# creat_sh("../data/after_step1/O","O","../data/after_step1/O/analyze.sh")
# creat_sh("../data/after_step1/S","S","../data/after_step1/S/analyze.sh")
# creat_sh("../data/after_step1/Cl","Cl","../data/after_step1/Cl/analyze.sh")
# creat_sh("../data/after_step1/Br","Br","../data/after_step1/Br/analyze.sh")
create_sh_recursive("../data/after_step1")

131
py/pre_process.py Normal file
View File

@@ -0,0 +1,131 @@
import re
import os
from pymatgen.core.structure import Structure
from pymatgen.core.periodic_table import Element
import yaml
from pymatgen.core.periodic_table import Specie
from expansion import expansion
from expansion import process_cif_file
def generate_valence_yaml(output_yaml_path):
"""
Generate a YAML file containing the most common oxidation states for elements.
Parameters:
output_yaml_path (str): Path to save the generated YAML file.
"""
valences = {}
for element in Element:
common_oxidation_states = element.common_oxidation_states
if common_oxidation_states:
# Metals/metalloids: take the maximum oxidation state
# Non-metals: take the minimum (most negative) oxidation state
if element.is_metalloid or element.is_metal:
valences[element.symbol] = max(common_oxidation_states)
else:
valences[element.symbol] = min(common_oxidation_states)
# Save the valences dictionary to a YAML file
with open(output_yaml_path, "w") as file:
yaml.dump(valences, file, default_flow_style=False)
def apply_oxidation_states_to_cif(input_cif_path, valence_yaml_path, output_cif_path,calculate_type='low',output_folder = None):
"""
Modify a CIF file to include oxidation states for each element based on a YAML file,
unless oxidation states are already present in the CIF.
"""
# Load the structure from the CIF file
structure = Structure.from_file(input_cif_path)
oxi = process_cif_file(structure)
# classsify类型
# if oxi:
# structure = expansion(input_cif_path,'../data/input_oxidation',3,calculate_type=calculate_type,keep_module='classify')
# # # 判断是否所有site都已经有oxidation state
# has_oxidation = all(
# all(isinstance(sp, Specie) for sp in site.species.keys())
# for site in structure.sites
# )
# if not has_oxidation:
# # 只有当没有价态时才读取yaml并赋值
# with open(valence_yaml_path, "r") as file:
# valences = yaml.safe_load(file)
# # Apply oxidation states to the structure
# structure.add_oxidation_state_by_element(valences)
#
# # Save the updated structure to a new CIF file
# structure.to(filename=output_cif_path)
structures=[]
names=[]
if oxi:
structures,names = expansion(input_cif_path,'../data/input_oxidation',3,calculate_type=calculate_type,keep_module='random')
# # 判断是否所有site都已经有oxidation state
for structure,name in zip(structures,names):
has_oxidation = all(
all(isinstance(sp, Specie) for sp in site.species.keys())
for site in structure.sites
)
if not has_oxidation:
# 只有当没有价态时才读取yaml并赋值
with open(valence_yaml_path, "r") as file:
valences = yaml.safe_load(file)
# Apply oxidation states to the structure
structure.add_oxidation_state_by_element(valences)
# Save the updated structure to a new CIF file
structure.to(filename=os.path.join(output_folder,name ))
else:
has_oxidation = all(
all(isinstance(sp, Specie) for sp in site.species.keys())
for site in structure.sites
)
if not has_oxidation:
# 只有当没有价态时才读取yaml并赋值
with open(valence_yaml_path, "r") as file:
valences = yaml.safe_load(file)
# Apply oxidation states to the structure
structure.add_oxidation_state_by_element(valences)
# Save the updated structure to a new CIF file
structure.to(filename=output_cif_path)
def data_add_state(input_folder, valence_yaml_path, output_folder,output_occupatition_folder=None,calculate_type='normal'):
if not os.path.exists(input_folder):
print(f"{input_folder} 文件夹不存在")
return
if not os.path.exists(output_folder):
os.makedirs(output_folder)
print(f"目录 {output_folder} 已创建")
for filename in os.listdir(input_folder):
if filename.endswith(".cif"): # 检查文件是否以.cif结尾
file_path = os.path.join(input_folder, filename)
# 提取文件名中的数字部分
match = re.search(r'\d+', filename)
if match:
new_filename = match.group(0) + ".cif" # 提取数字并加上 .cif 后缀
else:
print(f"文件名 {filename} 中未找到数字部分,跳过处理")
continue
# 构造输出文件路径
output_cif_path = os.path.join(output_folder, new_filename)
# 应用氧化态并保存新文件
print(f"正在处理{filename}")
try:
apply_oxidation_states_to_cif(file_path, valence_yaml_path, output_cif_path,calculate_type=calculate_type,output_folder=output_folder)
except Exception as e:
print(f"{filename}出现问题!")
print(f"{filename} 已完成")
if __name__ == "__main__":
# Example usage:
# Step 1: Generate the valence YAML file
valence_yaml = "../tool/valence_states.yaml"
# Step 2: Process CIF files in the input folder
data_add_state("../data/input_pre", valence_yaml, "../data/input","../data/input_oxidation")
# Step 3: Process Occupation
# data_process_Occupatiton("")

54
py/step1.py Normal file
View File

@@ -0,0 +1,54 @@
from pymatgen.core import Structure
from pymatgen.core.periodic_table import Element, Specie
from pymatgen.io.cif import CifWriter
from crystal_2 import crystal
import crystal_2
import os
import shutil
from pymatgen.io.cif import CifWriter
def read_files_check_basic(folder_path):
file_contents = []
if not os.path.exists(folder_path):
print(f"{folder_path} 文件夹不存在")
return file_contents
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
if os.path.isfile(file_path):
try:
temp = crystal(file_path)
file_contents.append(temp)
except Exception as e:
print(e)
print(f"正在处理{filename}")
temp.check_basic()
if temp.check_basic_result:
if not "+" in temp.anion:
target_folder = os.path.join("../data/after_step1",f"{temp.anion}")
if not os.path.exists(target_folder):
os.makedirs(target_folder)
# 目标文件路径
target_file_path = os.path.join(target_folder, filename)
# 复制文件到目标文件夹
shutil.copy(file_path, target_file_path)
print(f"文件 {filename}通过基本筛选,已复制到 {target_folder}")
else:
anions = temp.anion.split("+")
for anion in anions:
target_folder = os.path.join("../data/after_step1", f"{temp.anion}")
target_folder = os.path.join(target_folder, anion)
if not os.path.exists(target_folder):
os.makedirs(target_folder)
# 目标文件路径
target_file_path = os.path.join(target_folder, filename)
# 复制文件到目标文件夹
shutil.copy(file_path, target_file_path)
print(f"文件 {filename}通过基本筛选,已复制到 {target_folder}")
read_files_check_basic("../data/input")

121
py/step2-5-file_process.py Normal file
View File

@@ -0,0 +1,121 @@
from step2 import process_files as step2_process
from step3 import process_files as step3_process
from step4 import process_files as step4_process
from step5 import read_files_check_partical as step5_process
import os
import shutil
import os
import shutil
def create_empty_directory_structure(source_dir, target_dir):
"""
递归地复制源文件夹的目录结构到目标文件夹,创建空文件夹。
参数:
source_dir (str): 源文件夹路径
target_dir (str): 目标文件夹路径
返回:
int: 成功创建的文件夹数量
异常:
FileNotFoundError: 如果源文件夹不存在
PermissionError: 如果没有权限读取源文件夹或写入目标文件夹
"""
# 计数器
created_count = 0
# 检查源文件夹是否存在
if not os.path.exists(source_dir):
raise FileNotFoundError(f"源文件夹不存在: {source_dir}")
# 确保目标文件夹存在
if not os.path.exists(target_dir):
print(f"目标文件夹不存在,正在创建: {target_dir}")
os.makedirs(target_dir)
created_count += 1
# 递归函数,复制文件夹结构
def copy_structure(src, dst):
nonlocal created_count
try:
# 获取源目录中的所有项目
items = os.listdir(src)
# 遍历所有项目
for item in items:
src_path = os.path.join(src, item)
dst_path = os.path.join(dst, item)
# 如果是目录,递归复制结构
if os.path.isdir(src_path):
if not os.path.exists(dst_path):
os.makedirs(dst_path)
created_count += 1
print(f"创建文件夹: {dst_path}")
copy_structure(src_path, dst_path)
# 对于文件,我们不做任何处理,因为我们只需要文件夹结构
except PermissionError:
print(f"无权限访问目录: {src}")
except Exception as e:
print(f"处理目录 {src} 时出错: {str(e)}")
# 开始递归复制
try:
copy_structure(source_dir, target_dir)
print(f"已成功在 {target_dir} 中创建 {created_count} 个文件夹,复制了完整的目录结构")
return created_count
except Exception as e:
print(f"整体操作失败: {str(e)}")
return created_count
if __name__ == "__main__":
create_empty_directory_structure("../data/after_step1","../data/after_step2")
create_empty_directory_structure("../data/after_step1", "../data/after_step3")
create_empty_directory_structure("../data/after_step1", "../data/after_step4")
create_empty_directory_structure("../data/after_step1", "../data/after_step5")
create_empty_directory_structure("../data/after_step1", "../data/after_step6")
for files in os.listdir("../data/after_step1"):
source_path = os.path.join("../data/after_step1", files)
target_path = os.path.join("../data/after_step2", files)
file = files # 如果需要从文件名提取,替换这一行
print('-------------------')
if "+" in file:
# 第二种情况:多个元素,如"S+O"
elements = file.split("+")
print(f"处理多元素文件 {file},拆分为:{elements}")
for element in elements:
print(element)
source_path_tmp = os.path.join(source_path, element)
target_path_tmp = os.path.join(target_path, element)
print('正在做第二步筛选')
step2_process(source_path_tmp, target_path_tmp, element)
target_path_tmp_2 = os.path.join(f"../data/after_step3/{files}", element)
print('正在做第三步筛选')
step3_process(source_path_tmp, target_path_tmp, target_path_tmp_2,element)
target_path_tmp_3 = os.path.join(f"../data/after_step4/{files}", element)
print('正在做第四步筛选')
step4_process(source_path_tmp, target_path_tmp_2,target_path_tmp_3, element)
target_path_tmp_4 = os.path.join(f"../data/after_step5/{files}", element)
print('正在做第五步筛选')
step5_process(target_path_tmp_3,target_path_tmp_4)
else:
# 第一种情况:单一元素,如"S"
print(f"处理单一元素文件:{file}")
target_path_1 = os.path.join("../data/after_step3", files)
target_path_2 = os.path.join("../data/after_step4", files)
target_path_3 = os.path.join("../data/after_step5", files)
print('正在做第二步筛选')
step2_process(source_path, target_path, file)
print('正在做第三步筛选')
step3_process(source_path, target_path,target_path_1, file)
print('正在做第四步筛选')
step4_process(source_path, target_path_1,target_path_2, file)
print('正在做第五步筛选')
step5_process(target_path_2,target_path_3)

80
py/step2.py Normal file
View File

@@ -0,0 +1,80 @@
import os
import re
import shutil
def process_files(cif_folder, output_folder, anion):
# 确保输出文件夹存在
os.makedirs(output_folder, exist_ok=True)
# 获取 txt 文件夹中的所有 txt 文件
txt_files = [f for f in os.listdir(cif_folder) if f.endswith('.txt')]
# 遍历 txt 文件
for txt_file in txt_files:
txt_path = os.path.join(cif_folder, txt_file)
# 打开并读取 txt 文件内容
with open(txt_path, 'r', encoding='utf-8') as file:
content = file.read()
matches = re.findall(r"Percolation diameter \(A\): (\d+\.\d+)", content)
# 使用正则表达式查找符合条件的内容
if matches:
# 提取文件名(去掉.txt后缀
base_name = os.path.splitext(txt_file)[0]
check = False
if anion == "O":
print(f"{base_name}的perconlation diameter为{matches[0]}A")
if float(matches[0]) > 0.5:
check = True
print(f"符合要求")
else:
print("不符合要求")
elif anion == "S":
print(f"{base_name}的perconlation diameter为{matches[0]}A")
if float(matches[0]) > 0.55:
check = True
print(f"符合要求")
else:
print("不符合要求")
elif anion == "Br":
print(f"{base_name}的perconlation diameter为{matches[0]}A")
if float(matches[0]) > 0.45:
check = True
print("符合要求")
else:
print("不符合要求")
elif anion == "Cl":
print(f"{base_name}的perconlation diameter为{matches[0]}A")
if float(matches[0]) > 0.45:
check = True
print("符合要求")
else:
print("不符合要求")
if check:
# 查找与 txt 文件同名的 cif 文件
cif_path = os.path.join(cif_folder, base_name)
# 如果对应的 cif 文件存在,复制到 output_folder
if os.path.exists(cif_path):
shutil.copy(cif_path, os.path.join(output_folder, base_name))
print(f"Copied {base_name} to {output_folder}")
def work_py(input_folder, output_folder):
if not os.path.exists(output_folder):
print("not exists")
for filename in os.listdir(input_folder):
target_folder = os.path.join(output_folder, filename)
from_folder = os.path.join(input_folder, filename)
process_files(from_folder, target_folder)
if __name__ == "__main__":
# work_py("../data/after_step1","../data/after_step2" )
# process_files("../data/after_step1/O", "../data/after_step2/O", "O")
# process_files("../data/after_step1/S", "../data/after_step2/S", "S")
process_files("../data/after_step1/Cl", "../data/after_step2/Cl", "Br")
process_files("../data/after_step1/Br", "../data/after_step2/Br", "Cl")

72
py/step3.py Normal file
View File

@@ -0,0 +1,72 @@
import os
import re
import shutil
def process_files(cif_folder,input_folder,output_folder, anion):
# 确保输出文件夹存在
os.makedirs(output_folder, exist_ok=True)
# 获取 txt 文件夹中的所有 txt 文件
txt_files = [f for f in os.listdir(cif_folder) if f.endswith('.txt')]
# 遍历 txt 文件
for txt_file in txt_files:
txt_path = os.path.join(cif_folder, txt_file)
# 打开并读取 txt 文件内容
with open(txt_path, 'r', encoding='utf-8') as file:
content = file.read()
matches = re.findall(r"the minium of d\s+([\d\.]+)\s*#", content)
# 使用正则表达式查找符合条件的内容
if matches:
# 提取文件名(去掉.txt后缀
base_name = os.path.splitext(txt_file)[0]
check = False
if anion == "O":
print(f"{base_name}的最短距离为{matches[0]}A")
if float(matches[0]) < 3:
check = True
print(f"符合要求")
else:
print("不符合要求")
elif anion == "S":
print(f"{base_name}的最短距离为{matches[0]}A")
if float(matches[0]) < 3:
check = True
print(f"符合要求")
else:
print("不符合要求")
elif anion == "Cl":
print(f"{base_name}的最短距离为{matches[0]}A")
if float(matches[0]) < 3:
check = True
print(f"符合要求")
else:
print("不符合要求")
elif anion == "Br":
print(f"{base_name}的最短距离为{matches[0]}A")
if float(matches[0]) < 3:
check = True
print(f"符合要求")
else:
print("不符合要求")
if check:
# 查找与 txt 文件同名的 cif 文件
cif_path = os.path.join(input_folder, base_name)
# 如果对应的 cif 文件存在,复制到 output_folder
if os.path.exists(cif_path):
shutil.copy(cif_path, os.path.join(output_folder, base_name))
print(f"Copied {base_name} to {output_folder}")
if __name__ == '__main__':
# process_files("../data/after_step1/O","../data/after_step2/O", "../data/after_step3/O", "O")
# process_files("../data/after_step1/S", "../data/after_step2/S","../data/after_step3/S", "S")
process_files("../data/after_step1/Cl", "../data/after_step2/Cl","../data/after_step3/Cl", "Cl")
process_files("../data/after_step1/Br", "../data/after_step2/Br","../data/after_step3/Br", "Br")

72
py/step4.py Normal file
View File

@@ -0,0 +1,72 @@
import os
import re
import shutil
def process_files(cif_folder,input_folder,output_folder, anion):
# 确保输出文件夹存在
os.makedirs(output_folder, exist_ok=True)
# 获取 txt 文件夹中的所有 txt 文件
txt_files = [f for f in os.listdir(cif_folder) if f.endswith('.txt')]
# 遍历 txt 文件
for txt_file in txt_files:
txt_path = os.path.join(cif_folder, txt_file)
# 打开并读取 txt 文件内容
with open(txt_path, 'r', encoding='utf-8') as file:
content = file.read()
matches = re.findall(r"Maximum node length detected: (\d+\.\d+) A", content)
# 使用正则表达式查找符合条件的内容
if matches:
# 提取文件名(去掉.txt后缀
base_name = os.path.splitext(txt_file)[0]
check = False
if anion == "O":
print(f"{base_name}的扩大锂离子直径为{matches[0]}A")
if float(matches[0]) > 2.2:
check = True
print(f"符合要求")
else:
print("不符合要求")
elif anion == "S":
print(f"{base_name}的扩大锂离子直径为{matches[0]}A")
if float(matches[0]) > 2.2:
check = True
print(f"符合要求")
else:
print("不符合要求")
elif anion == "Cl":
print(f"{base_name}的扩大锂离子直径为{matches[0]}A")
if float(matches[0]) > 2:
check = True
print(f"符合要求")
else:
print("不符合要求")
elif anion == "Br":
print(f"{base_name}的扩大锂离子直径为{matches[0]}A")
if float(matches[0]) > 2:
check = True
print(f"符合要求")
else:
print("不符合要求")
if check:
# 查找与 txt 文件同名的 cif 文件
cif_path = os.path.join(input_folder, base_name)
# 如果对应的 cif 文件存在,复制到 output_folder
if os.path.exists(cif_path):
shutil.copy(cif_path, os.path.join(output_folder, base_name))
print(f"Copied {base_name} to {output_folder}")
if __name__ == "__main__":
# process_files("../data/after_step1/O","../data/after_step3/O", "../data/after_step4/O", "O")
# process_files("../data/after_step1/S", "../data/after_step3/S","../data/after_step4/S", "S")
process_files("../data/after_step1/Cl", "../data/after_step3/Cl","../data/after_step4/Cl", "Cl")
process_files("../data/after_step1/Br", "../data/after_step3/Br","../data/after_step4/Br", "Br")

57
py/step5.py Normal file
View File

@@ -0,0 +1,57 @@
from pymatgen.core import Structure
from pymatgen.core.periodic_table import Element, Specie
from pymatgen.io.cif import CifWriter
from crystal_2 import crystal
import crystal_2
import os
import shutil
from pymatgen.io.cif import CifWriter
def read_files_check_partical(input_folder,output_folder):
file_contents = []
folder_path = input_folder
if not os.path.exists(folder_path):
print(f"{folder_path} 文件夹不存在")
return file_contents
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
if os.path.isfile(file_path):
try:
temp = crystal(file_path)
file_contents.append(temp)
print(f"正在处理{filename}")
temp.check_practical()
if temp.check_practical_result:
target_folder = output_folder
if not os.path.exists(target_folder):
os.makedirs(target_folder)
# 目标文件路径
target_file_path = os.path.join(target_folder, filename)
# 复制文件到目标文件夹
shutil.copy(file_path, target_file_path)
print(f"文件 {filename}通过实际筛选,已复制到 {target_folder}")
except Exception as e:
print(e)
def work_py(input_folder, output_folder):
if not os.path.exists(output_folder):
print("not exists")
for filename in os.listdir(input_folder):
target_folder = os.path.join(output_folder, filename)
from_folder = os.path.join(input_folder, filename)
read_files_check_partical(from_folder, target_folder)
if __name__ == "__main__":
# work_py('../data/after_step4','../data/after_step5')
# read_files_check_partical('../data/after_step4/O','../data/after_step5/O')
# read_files_check_partical('../data/after_step4/S','../data/after_step5/S')
read_files_check_partical('../data/after_step4/Cl','../data/after_step5/Cl')
read_files_check_partical('../data/after_step4/Br','../data/after_step5/Br')

75
py/step6.py Normal file
View File

@@ -0,0 +1,75 @@
from pymatgen.core import Structure
from pymatgen.analysis.structure_matcher import StructureMatcher
import os
import shutil
# 定义函数,用于将 CIF 文件进行分类并输出
def structure_classify(input_folder, output_folder):
"""
分类 input_folder 中的 CIF 文件,并根据框架分组后存储到 output_folder 中。
:param input_folder: 输入文件夹路径,包含 .cif 文件。
:param output_folder: 输出文件夹路径,用于存储分类后的 .cif 文件。
"""
# 检查输入文件夹是否存在
if not os.path.exists(input_folder):
print(f"输入文件夹 {input_folder} 不存在")
return
# 创建输出文件夹(如果不存在)
if not os.path.exists(output_folder):
os.makedirs(output_folder)
# 读取输入文件夹中的所有 .cif 文件
structures = []
file_map = {} # 记录文件路径与结构的对应关系
for filename in os.listdir(input_folder):
if filename.endswith(".cif"):
file_path = os.path.join(input_folder, filename)
try:
structure = Structure.from_file(file_path)
structures.append(structure)
file_map[id(structure)] = filename # 使用结构对象的 id 作为键
except Exception as e:
print(f"无法读取文件 {filename}{e}")
# 检查是否成功加载了任何结构
if not structures:
print("未找到有效的 CIF 文件")
return
# 分组结构
matcher = StructureMatcher()
grouped_structures = []
for structure in structures:
matched = False
for group in grouped_structures:
if matcher.fit(structure, group[0]): # 比较结构是否匹配
group.append(structure)
matched = True
break
if not matched:
grouped_structures.append([structure])
# 保存分组后的结构到输出文件夹
for group_index, group in enumerate(grouped_structures):
group_folder = os.path.join(output_folder, f"group_{group_index}")
os.makedirs(group_folder, exist_ok=True)
for structure_index, structure in enumerate(group):
output_file = os.path.join(group_folder, f"structure_{structure_index}.cif")
structure.to(filename=output_file)
# 获取原始文件名并复制到分组文件夹
original_filename = file_map[id(structure)] # 使用结构对象的 id 获取文件名
original_file_path = os.path.join(input_folder, original_filename)
shutil.copy(original_file_path, group_folder)
print(f"处理完成,分类后的结构已保存到 {output_folder}")
if __name__ == "__main__":
# 示例调用
structure_classify("../data/after_step5/S", "../data/after_step6/S")
structure_classify("../data/after_step5/O", "../data/after_step6/O")

9
py/utils.py Normal file
View File

@@ -0,0 +1,9 @@
import os
def work_py(input_folder, output_folder):
if not os.path.exists(output_folder):
print("not exists")
for filename in os.listdir(input_folder):
target_folder = os.path.join(output_folder, filename)
from_folder = os.path.join(input_folder, filename)