diff --git a/main.sh b/main.sh new file mode 100644 index 0000000..5ab2fe8 --- /dev/null +++ b/main.sh @@ -0,0 +1,30 @@ +#!/bin/bash +#修改权限 +chmod -R u+w ../Screen +#启用screen环境 +conda activate screen +# 设置当前目录为 PYTHONPATH +cd py/ +export PYTHONPATH=$(pwd):$PYTHONPATH +#调用预筛选 +python pre_process.py +#调用第一步筛选 +python step1.py + +#为第一步筛出的所有材料制作脚本 +python make_sh.py +#调整环境 +conda deactivate +conda activate zeo +#运行脚本 +cd ../data/after_step1 +source sh_all.sh +rm sh_all.sh +#调整conda回到screen +conda deactivate +conda activate screen + +#启用不同的python文件做分析 +cd ../../py +python step2-5-file_process.py +#python step6.py diff --git a/py/Occupation.py b/py/Occupation.py new file mode 100644 index 0000000..b828c37 --- /dev/null +++ b/py/Occupation.py @@ -0,0 +1,329 @@ +from fontTools.misc.plistlib import end_string +from pymatgen.core import Structure +import spglib +from fractions import Fraction +import random +from pymatgen.core.sites import PeriodicSite, Species,Element,Lattice +import numpy as np +import re +def typejudge(number): + if number in [1, 2]: + return "Triclinic" + elif 3 <= number <= 15: + return "Monoclinic" + elif 16 <= number <= 74: + return "Orthorhombic" + elif 75 <= number <= 142: + return "Tetragonal" + elif 143 <= number <= 167: + return "Trigonal" + elif 168 <= number <= 194: + return "Hexagonal" + elif 195 <= number <= 230: + return "Cubic" + else: + return "Unknown" + + +def extract_oxi_state(species_string): + """ + 从 species_string(如 "Li+:0.645")提取氧化态 + + Args: + species_string: 包含元素和氧化态的字符串(如 "Li+:0.645"、"Fe2-:0.5") + + Returns: + int: 氧化态数值(如 "+" -> 1, "2-" -> -2),默认返回 0 如果解析失败 + """ + # 使用正则表达式匹配化学符号和氧化态(如 Li+, Fe2-) + match = re.search(r"([A-Za-z]+)([+-]?\d*[+-])", species_string) + if not match: + return 0 # 默认中性 + + # 提取氧化态部分(如 "+", "2-", "3+") + oxi_str = match.group(2) + + # 处理单符号情况(如 "+" -> 1, "-" -> -1) + if oxi_str in ("+", "-"): + return 1 if oxi_str == "+" else -1 + + # 处理多数字情况(如 "2+" -> 2, "3-" -> -3) + try: + num = int(oxi_str[:-1]) # 提取数字部分 + sign = 1 if oxi_str[-1] == "+" else -1 + return num * sign + except (ValueError, IndexError): + return 0 # 解析失败时返回中性 + +def process_cif_file(cif_file_path,explict_element): + structure = Structure.from_file(cif_file_path) + result_list = [] + for index, site in enumerate(structure.sites, start=1): + occupancy = site.species.element_composition.num_atoms + species = site.species.chemical_system + if species in explict_element: + break + if occupancy < 1.0: + entry = next((r for r in result_list if + r["species"] == species and r["occupancy"] == occupancy), None) + if entry: + entry["number"].append(index) + else: + result_list.append({ + "species": species, + "number": [index], + "occupancy": occupancy + }) + return result_list + + +def factorize_to_three_factors(n,type_sym=None): + factors = [] + + # 遍历可能的x值 + if type_sym == None: + for x in range(1, n + 1): + if n % x == 0: + remaining_n = n // x + # 遍历可能的y值 + for y in range(1, remaining_n + 1): + if remaining_n % y == 0: + z = remaining_n // y + factors.append({'x': x, 'y': y, 'z': z}) + if type_sym == "xyz": + for x in range(1, n + 1): + if n % x == 0: + remaining_n = n // x + # 遍历可能的y值 + for y in range(1, remaining_n + 1): + if remaining_n % y == 0 and y <= x: + z = remaining_n // y + if z <= y: + factors.append({'x': x, 'y': y, 'z': z}) + + def sum_score(factor): + x, y, z = factor['x'], factor['y'], factor['z'] + return x + y + z + + # 按照sum_score从小到大排序 + sorted_factors = sorted(factors, key=sum_score) + return sorted_factors + + + + + +def calculate_supercell_factor(occupancy): + # 将浮点数转换为分数形式 + fraction = Fraction(occupancy).limit_denominator() + + # 获取分子和分母 + numerator = fraction.numerator + denominator = fraction.denominator + + return numerator,denominator + + +def mark_atoms_randomly(factors, atom_number): + """ + 根据扩胞因子和占据数量生成随机占据字典 + + Args: + factors: 扩胞因子字典 {'x': int, 'y': int, 'z': int} + atom_number: 需要占据的副本数量 + + Returns: + 字典 {0: 1或0, 1: 1或0, ..., total_copies-1: 1或0} + """ + x, y, z = factors['x'], factors['y'], factors['z'] + total_copies = x * y * z + + if atom_number > total_copies: + raise ValueError(f"atom_number ({atom_number}) 不能超过扩胞总数 (x*y*z = {total_copies})") + + # 生成所有副本索引 [0, 1, 2, ..., total_copies-1] + atom_dice = list(range(total_copies)) + + # 随机选择 atom_number 个副本占据 + selected_atoms = random.sample(atom_dice, atom_number) + + # 创建结果字典 {0: 1或0, 1: 1或0, ...} + result = {atom: 1 if atom in selected_atoms else 0 for atom in atom_dice} + + return result + + +def generate_random_list(total_elements, atom_number): + # 确保 atom_number 不超过 total_elements + if atom_number > total_elements: + raise ValueError("atom_number cannot be greater than the total number of elements (x * y * z)") + + # 创建一个全0的列表 + result = [0] * total_elements + + # 随机选择 atom_number 个位置,并将这些位置的值设为1 + indices = random.sample(range(total_elements), atom_number) + for index in indices: + result[index] = 1 + + return result + + +def merge_structures(struct_copies, factors): + """ + 将多个副本结构按三维顺序合并为扩胞后的结构 + + Args: + struct_copies: 副本结构列表(长度 = x*y*z) + factors: 扩胞因子字典 {"x": int, "y": int, "z": int} + + Returns: + 合并后的扩胞结构 + """ + x, y, z = factors["x"], factors["y"], factors["z"] + total_copies = x * y * z + + if len(struct_copies) != total_copies: + raise ValueError("副本数量与扩胞因子不匹配") + + # 获取原结构的晶格 + original_lattice = struct_copies[0].lattice + + # 创建扩胞后的新晶格(直接按倍数缩放) + new_lattice_matrix = np.dot(original_lattice.matrix, np.diag([x, y, z])) + new_lattice = Lattice(new_lattice_matrix) + + # 初始化合并后的结构 + merged_structure = Structure( + lattice=new_lattice, + species=[], + coords=[], + coords_are_cartesian=False + ) + + # 按三维顺序填充每个副本的原子 + for copy_idx in range(total_copies): + # 计算当前副本的分数坐标偏移量 + offset = np.array([ + copy_idx // (y * z), # x方向偏移 + (copy_idx % (y * z)) // z, # y方向偏移 + copy_idx % z # z方向偏移 + ]) + + # 将当前副本的原子添加到合并结构中(考虑偏移) + for site in struct_copies[copy_idx]: + if site.species: # 跳过空位 + merged_structure.append( + species=site.species, + coords=site.frac_coords + offset, + coords_are_cartesian=False, + properties=site.properties + ) + + return merged_structure + +def expand_structure(structure, factors, atom_indices, atom_number): + # 参数检查 + x, y, z = factors['x'], factors['y'], factors['z'] + total_copies = x * y * z + if not all(1 <= idx <= len(structure.sites) for idx in atom_indices): + raise ValueError("atom_indices包含无效原子索引") + + # 生成独立副本 + struct_copies = [structure.copy() for _ in range(total_copies)] + atom_dice = list(range(total_copies)) # 所有副本索引 [0,1,2,...] + + # 处理每个目标原子 + for atom_idx in atom_indices: + original_site = structure.sites[atom_idx - 1] + element = original_site.species.chemical_system + + + # 生成当前原子的占据字典(如{0:1, 1:0, 2:1,...}) + occupancy_dict = mark_atoms_randomly(factors,atom_number) + # 修改每个副本 + for copy_idx, occupy in occupancy_dict.items(): + # 或者方法2:使用remove/insert + struct_copies[copy_idx].remove_sites([atom_idx - 1]) + oxi_state = extract_oxi_state(original_site.species_string) + if occupy: + new_site = PeriodicSite( + species=Species(element, oxi_state), + coords=original_site.frac_coords, + lattice=struct_copies[copy_idx].lattice, + to_unit_cell=True, + label=original_site.label + ) + struct_copies[copy_idx].sites.insert(atom_idx - 1, new_site) + else: + species_dict = {Species(element, oxi_state): 0.0} + new_site = PeriodicSite( + species = species_dict, + coords=original_site.frac_coords, + lattice=struct_copies[copy_idx].lattice, + to_unit_cell=True, + label=original_site.label + ) + struct_copies[copy_idx].sites.insert(atom_idx - 1, new_site) + + # 合并副本 + expanded_structure = Structure( + lattice=np.dot(structure.lattice.matrix, np.diag([x, y, z])), + species=[], + coords=[], + coords_are_cartesian=False + ) + + for copy_idx in range(total_copies): + offset = np.array([ + copy_idx // (y * z), + (copy_idx % (y * z)) // z, + copy_idx % z + ]) + for site in struct_copies[copy_idx]: + if site.species: # 只添加非空位 + expanded_structure.append( + species=site.species, + coords=site.frac_coords + offset, + coords_are_cartesian=False, + properties=site.properties + ) + expanded_structure = merge_structures(struct_copies,factors) + return expanded_structure + + +def process_occupation(input_file,output_file,explict_element = ["Li"],expect_cifnumber = 10,random_time=1): + struct = Structure.from_file(input_file) + space_group_info = struct.get_space_group_info() + space_group_symbol = space_group_info[0] + all_spacegroup_symbols = [spglib.get_spacegroup_type(i) for i in range(1, 531)] + symbol = all_spacegroup_symbols[0] + for symbol_i in all_spacegroup_symbols: + if space_group_symbol == symbol_i.international_short: + symbol = symbol_i + space_type = typejudge(symbol.number) + print(f"当前空间群符号为{space_group_symbol},序号为{symbol.number},对应的晶体体系为{space_type}") + occupation_list = process_cif_file(cif_file_path=input_file,explict_element=explict_element) + print(occupation_list) + for occupation in occupation_list: + atom_number, target_multiplier=calculate_supercell_factor(occupation["occupancy"]) + divides = [] + if space_type == "Hexagonal": + print('当前为六方晶系,暂不处理') + if space_type == "Cubic": + print("当前为立方晶体,三个方向同步") + divides = factorize_to_three_factors(target_multiplier,"xyz") + else: + print("为其他晶系,假设三个方向不同") + divides = factorize_to_three_factors(target_multiplier,) + print(divides) + for it in divides: + end_str = f'x{it["x"]}y{it["y"]}z{it["z"]}' + for i in range(random_time): + expand_struct=expand_structure(struct,it,occupation["number"],atom_number) + expand_struct.to_file(output_file) + print(it) + else: + print(f"不存在除{explict_element}以外的共占位原子") +process_occupation("../data/input_pre/ICSD_1234.cif", "haha.cif", explict_element=[], expect_cifnumber=1, random_time=1) + diff --git a/py/call_analyze.py b/py/call_analyze.py new file mode 100644 index 0000000..bb60eb1 --- /dev/null +++ b/py/call_analyze.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +import argparse +import subprocess + + +def run_analysis_with_subprocess(cif_file, input_file, output_file, filters=None): + # 如果没有传递 filters,则使用默认值 + if filters is None: + filters = ["Ordered", "PropOxi", "VoroPerco", "Coulomb", "VoroBV", "VoroInfo", "MergeSite"] + + # 构建命令行参数 + command = ['python', '../tool/analyze_voronoi_nodes.py', cif_file, '-i', input_file, '-o', output_file, '-f'] + filters + + # 调用 subprocess 执行命令 + process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + # 捕获标准输出和标准错误 + stdout, stderr = process.communicate() + + # Python 2.7 需要解码 stdout 和 stderr(因为是 str 类型) + stdout = stdout.decode('utf-8') if isinstance(stdout, str) else stdout + stderr = stderr.decode('utf-8') if isinstance(stderr, str) else stderr + + # 打印输出内容或记录到文件 + print(stdout) + if stderr: + print(stderr) + + +if __name__ == "__main__": + # 设置命令行参数解析器 + parser = argparse.ArgumentParser(description='Run Voronoi analysis using analyze.py script.') + parser.add_argument('cif_file', type=str, help='CIF file to analyze') + parser.add_argument('-i', '--input_file', type=str, help='Input YAML file', required=True) + parser.add_argument('-o', '--output_file', type=str, help='Output file to save the results', required=True) + parser.add_argument('-f', '--filters', nargs='+', + default=["Ordered", "PropOxi", "VoroPerco", "Coulomb", "VoroBV", "VoroInfo", "MergeSite"], + help='List of filters to apply (default is all filters)') + + # 解析命令行参数 + args = parser.parse_args() + + # 调用分析函数 + run_analysis_with_subprocess(args.cif_file, args.input_file, args.output_file, args.filters) diff --git a/py/crystal_2.py b/py/crystal_2.py new file mode 100644 index 0000000..8860964 --- /dev/null +++ b/py/crystal_2.py @@ -0,0 +1,363 @@ +from pymatgen.core import Structure +from pymatgen.core.periodic_table import Element, Specie +from pymatgen.analysis.local_env import CrystalNN +from pymatgen.analysis.structure_matcher import StructureMatcher +from pymatgen.io.cif import CifParser +import numpy as np +class crystal: + def __init__(self, file_path, element_positive='Na', mixed_anions=None): + # self.parse = CifParser(file_path) + # self.structure = self.parse.get_structures()[0] + if mixed_anions is None: + mixed_anions = {frozenset({'S', 'O'}), frozenset({'Cl', 'Br'}),frozenset({'Cl', 'O'}),frozenset({'Cl', 'Br'}),frozenset({'S', 'Cl'})} + self.structure = Structure.from_file(file_path) + self.file_path = file_path + self.element_positive = element_positive + self.check_all = False + self.check_basic_result = False + self.check_high_cn_and_face_sharing_result = False + self.check_percolation_radius_result = False + self.check_practical_result = False + self.anion = "" + self.anions = "" + self.mixed_anions = mixed_anions + #self.initialize() + + def initialize(self): + print("e") + # self.check_basic_result=self.check_basic() + # self.check_high_cn_and_face_sharing_result = self.check_high_cn_and_face_sharing() + # self.check_percolation_radius_result = self.check_percolation_radius() + # self.check_all = self.check_basic_result and self.check_high_cn_and_face_sharing_result and self.check_percolation_radius_result + # print(f"{self.file_path}done") + + def check_practical(self): + structure = self.structure + + # 检查是否为Li-X-O,Li-P-S + excluded_X_elements = {'S', 'I', 'Si', 'C', 'P', 'Al', 'Ge', 'Se', 'B', 'Cl'} + chemical_system_set = structure.chemical_system_set + + try: + if len(chemical_system_set) == 3: + if "Li" in chemical_system_set and "O" in chemical_system_set: + for element in excluded_X_elements: + if element in chemical_system_set: + return False + if "Li" in chemical_system_set and "P" in chemical_system_set and "S" in chemical_system_set: + return False + except Exception as e: + print(f"Error during Li-X-O check: {e}") + return False + + # 排除过渡金属元素 + excluded_transition_metals = {'Fe', 'Mn', 'Ni', 'Ti', 'Mo', 'V', 'Co'} + try: + if "Li" in chemical_system_set and "O" in chemical_system_set: + for element in excluded_transition_metals: + if element in chemical_system_set: + return False + except Exception as e: + print(f"Error during transition metal check: {e}") + return False + + # 检查是否包含N, Re, Ho, Hf, Ru, Eu, Lu + excluded_elements = {'N', 'Re', 'Ho', 'Hf', 'Ru', 'Lu'} + try: + for element in excluded_elements: + if element in chemical_system_set: + return False + except Exception as e: + print(f"Error during excluded elements check: {e}") + return False + + # 检查是否共享位点 + try: + for site in structure.sites: + if 'Li' in site.species_string and len(site.species) > 1: + return False + except Exception as e: + print(f"Error during site sharing check: {e}") + return False + + self.check_practical_result = True + return True + + def check_basic(self): + structure = self.structure + #判断是否为二元化合物 + if len(structure.types_of_specie) == 2: + return False + #判断阴离子是否为多种阴离子 + # anions = {'O', 'S', 'Se', 'Te', 'F', 'Cl', 'Br', 'I'} + anions = {'O', 'S','Br','Cl'} + try: + for site in self.structure.sites: + try: + #if site.specie.symbol in anions: + if site.species.chemical_system in anions: + self.anion = site.specie.symbol + break + except AttributeError as e: + a=1 + try: + if site.species.chemical_system in anions: + self.anion = site.specie.symbol + break + except AttributeError as e: + print(e) + if self.anion in anions: + a=1 + else: + if not self.mixed_anions: + print("不是所选阴离子化合物") + return False + except Exception as e: + print(e) + return False + #这里添加对多种阴离子的支持 + try: + # 创建一个集合来收集所有发现的阴离子 + found_anions = set() + + # 遍历structure以收集所有阴离子 + for site in self.structure.sites: + try: + if site.species.chemical_system in anions: + found_anions.add(site.specie.symbol) + except AttributeError: + try: + if site.specie.symbol in anions: + found_anions.add(site.specie.symbol) + except AttributeError: + continue + + # 检查找到的阴离子情况 + if len(found_anions) == 0: + print("未找到任何预定义的阴离子") + return False + elif len(found_anions) == 1: + # 只有一种阴离子 + self.anion = list(found_anions)[0] + print(f"发现单一阴离子: {self.anion}") + else: + # 有多种阴离子,检查是否匹配预定义的混合阴离子组合 + found_anions_frozen = frozenset(found_anions) + if found_anions_frozen in self.mixed_anions: + self.anions = found_anions + self.anion = "+".join(sorted(found_anions)) # 例如: "Cl+S" + print(f"发现匹配的混合阴离子组合: {self.anion}") + else: + # 如果找到的阴离子组合不在预定义列表中 + print(f"发现的阴离子组合 {found_anions} 不在预定义的混合阴离子列表中") + return False + except Exception as e: + print(f"处理阴离子时出错: {e}") + return False + + #这里还要调试 + # try: + # # 初始化总电荷 + # total_charge = 0 + # + # # 检查是否所有元素都有氧化态 + # for site in structure: + # try: + # oxi_state = site.species.charge # 检查是否有氧化态 + # total_charge += oxi_state # 累加氧化态 + # except AttributeError: + # print(f"元素 {site.specie.symbol} 缺少氧化态定义") + # return False + # # 检查是否电荷平衡 + # if total_charge == 0: + # print("所有元素的价态之和为 0,结构电荷平衡") + # else: + # print(f"所有元素的价态之和为 {total_charge},结构不平衡") + # return False + # except Exception as e: + # print(f"发生错误: {e}") + # return False + + #判断原子个数 + try: + if not self.mixed_anions: + if structure.num_sites>300: + return False + else: + if structure.num_sites>900: + return False + except Exception: + print("原子个数判断失败") + return False + + #判断有几个阴离子 + # anions = {'O', 'S', 'Se', 'Te', 'F', 'Cl', 'Br', 'I'} + # try: + # anion_elements = {site.species.chemical_system for site in structure if site.species.chemical_system in anions} + # if len(anion_elements) > 1: + # return False + # except Exception: + # print("阴离子个数判断失败") + # return False + + #判断是否有放射性元素 + radioactive_elements = {'U', 'Th', 'Pu', 'Ra', 'Rn', 'Po', 'Np', 'Am', 'Cm', 'Bk', 'Cf', 'Es', 'Fm', 'Md', 'No', + 'Lr'} + + try: + # 遍历结构中的元素 + for site in structure: + if site.species.chemical_system in radioactive_elements: + return False # 存在放射性元素 + except Exception: + print("放射性元素判断失败") + return False + + + #判断是否存在共占位 + try: + for site in structure.sites: + if self.element_positive in [specie.symbol for specie in site.species.keys()] and len(site.species) > 1: + return False + except Exception: + print("共占位判断失败") + return False + + #判读是否有无序或部分占位的阴离子 + try: + for site in structure.sites: + for specie, occupancy in site.species.items(): + if specie.symbol in anions and occupancy < 1: + return False + except Exception: + print("无序或部分占位的阴离子判断失败") + return False + + #判断是否有水分子 + try: + oxygen_sites = [site for site in structure.sites if site.species.chemical_system == "O"] + hydrogen_sites = [site for site in structure.sites if site.species.chemical_system == "H"] + + for o_site in oxygen_sites: + nearby_hydrogens = [h_site for h_site in hydrogen_sites if o_site.distance(h_site) < 1.2] + + if len(nearby_hydrogens) == 2: + return False + except Exception: + print("水分子判断失败") + return False + #接下来判断是否有标准信息 + try: + for site in structure.sites: + for specie in site.species.keys(): + element = Element(specie.symbol) + + if not element.common_oxidation_states: + return False + + try: + _ = Specie(element.symbol, max(element.common_oxidation_states)).ionic_radius + except: + return False + except Exception: + print("标准信息判断失败") + return False + #暂时不判断是否为电中性 + #可能需要通过ovito等库来做判断 + #存在一些文件不提供各元素的电负性 + + #判断电中性是否存在 + + self.check_basic_result = True + return True + + def check_high_cn_and_face_sharing(self,cut_distance = 3.1): + structure = self.structure + #基于固角权重的计算 + nn_finder = CrystalNN() + + #遍历结构中的所有Na位点,检查配位数 + #是所有Na位点都需要还是只检查高配位数的位点? + high_cn_ep_sites = [] + try: + for i,site in enumerate(structure): + if site.specie == Element(self.element_positive): + cn = nn_finder.get_cn(structure,i) + if cn>=5: + high_cn_ep_sites.append(i) + if len(high_cn_ep_sites)==0: + return False + except Exception: + print("高配位Na离子判断失败") + return False + #检查共面 + try: + for i in high_cn_ep_sites: + neighbors = nn_finder.get_nn_info(structure,i) + x_neighbors = [] + x_neighbors = [ + neighbor["site_index"] + for neighbor in neighbors + if structure[neighbor["site_index"]].specie.symbol == self.element_positive + and neighbor["site"].distance(structure[i]) <= cut_distance + ] + + if not self._check_face_sharing(i, x_neighbors): + print(f"Na site {i} does not share a face with other high-CN Na sites.") + return False + + print("All high-CN Na sites are face-sharing.") + except Exception: + print("共面判断失败") + return True + + + def _check_face_sharing(self,site_index,neighbor_indices): + # 获取当前 Na 位点的坐标 + site_coords = self.structure[site_index].coords + + # 遍历邻居 + for neighbor_index in neighbor_indices: + # 获取邻居的坐标 + neighbor_coords = self.structure[neighbor_index].coords + + # 获取两个原子之间共享的面(使用简单的距离或角度计算) + # 假设共享面的法向量计算可以从 Voronoi 构造 + shared_face_normal = self._calculate_face_normal(site_coords, neighbor_coords) + + # 判断是否共面(如果法向量的绝对值接近 0,可以认为共面) + if shared_face_normal is not None: + return True + + return False + def _calculate_face_normal(self, coords1, coords2): + + # 示例计算:用两个原子之间的向量生成法向量 + vector = coords2 - coords1 + norm = np.linalg.norm(vector) + + # 如果向量接近零,返回 None + if norm < 1e-6: + return None + + # 正则化向量作为法向量 + return vector / norm + + def check_percolation_radius(self): + return True +def group_structures_by_framework(structures): + matcher = StructureMatcher() + grouped_structures = [] + + for structure in structures: + matched = False + for group in grouped_structures: + + if matcher.fit(structure, group[0]): # 比较结构是否匹配 + group.append(structure) + matched = True + break + if not matched: + grouped_structures.append([structure]) + + return grouped_structures \ No newline at end of file diff --git a/py/expansion.py b/py/expansion.py new file mode 100644 index 0000000..af0a684 --- /dev/null +++ b/py/expansion.py @@ -0,0 +1,499 @@ +from distutils.dir_util import remove_tree + +from pymatgen.core import Structure, Lattice,Species,PeriodicSite +import numpy as np +from collections import defaultdict +import math +import spglib +from functools import reduce +from fractions import Fraction +import random +import re +import os + +def mark_atoms_randomly(numerator,denominator): + """ + 根据扩胞因子和占据数量生成随机占据字典 + + Args: + factors: 扩胞因子字典 {'x': int, 'y': int, 'z': int} + atom_number: 需要占据的副本数量 + + Returns: + 字典 {0: 1或0, 1: 1或0, ..., total_copies-1: 1或0} + """ + + + if numerator > denominator: + raise ValueError(f"atom_number ({numerator}) 不能超过扩胞总数 (x*y*z = {denominator})") + + # 生成所有副本索引 [0, 1, 2, ..., total_copies-1] + atom_dice = list(range(denominator)) + + # 随机选择 atom_number 个副本占据 + selected_atoms = random.sample(atom_dice, numerator) + + # 创建结果字典 {0: 1或0, 1: 1或0, ...} + result = {atom: 1 if atom in selected_atoms else 0 for atom in atom_dice} + + return result +def extract_oxi_state(species_str,element): + """ + 从物种字符串中提取指定元素的氧化态 + + 参数: + species_str: 物种字符串,如 "Li+:0.689, Sc3+:0.311" + element: 要提取的元素符号,如 "Sc" + + 返回: + int: 氧化态数值(如 Sc3+ → 3,Sc- → -1,Sc3- → -3) + 如果未找到或没有氧化态则返回 0 + """ + # 分割字符串获取各个物种部分 + species_parts = [part.strip() for part in species_str.split(",") if part.strip()] + + for part in species_parts: + # 提取元素和电荷部分(冒号前的内容) + element_with_charge = part.split(":")[0].strip() + + # 检查是否匹配目标元素 + if element in element_with_charge: + # 提取电荷部分 + charge_part = element_with_charge[len(element):] + + # 处理无数字情况(如"Sc+") + if not any(c.isdigit() for c in charge_part): + if "+" in charge_part: + return 1 + elif "-" in charge_part: + return -1 + else: + return 0 + + # 处理有数字情况(如"Sc3+") + sign = 1 + if "-" in charge_part: + sign = -1 + + # 提取数字部分 + digits = "" + for c in charge_part: + if c.isdigit(): + digits += c + + if digits: # 确保有提取到数字 + return sign * int(digits) + + return 0 # 默认返回0 +def factorize_to_three_factors(n,type_sym=None,keep_module=None): + factors = [] + + # 遍历可能的x值 + if type_sym == None: + for x in range(1, n + 1): + if n % x == 0: + remaining_n = n // x + # 遍历可能的y值 + for y in range(1, remaining_n + 1): + if remaining_n % y == 0: + z = remaining_n // y + factors.append({'x': x, 'y': y, 'z': z}) + if type_sym == "xyz": + for x in range(1, n + 1): + if n % x == 0: + remaining_n = n // x + # 遍历可能的y值 + for y in range(1, remaining_n + 1): + if remaining_n % y == 0 and y <= x: + z = remaining_n // y + if z <= y: + factors.append({'x': x, 'y': y, 'z': z}) + if keep_module=='random': + import random + # 创建一个因子列表的副本,并随机打乱顺序 + shuffled_factors = factors.copy() + random.shuffle(shuffled_factors) + return shuffled_factors + else: + def sort_key(item): + """返回一个用于排序的元组""" + return (item['x'] + item['y'] + item['z'], item['z'], item['y'], item['x']) + + # 使用 sorted() 函数(返回一个新的排序后的列表,不改变原列表) + sorted_factor = sorted(factors, key=sort_key) + return sorted_factor +def typejudge(number): + if number in [1, 2]: + return "Triclinic" + elif 3 <= number <= 15: + return "Monoclinic" + elif 16 <= number <= 74: + return "Orthorhombic" + elif 75 <= number <= 142: + return "Tetragonal" + elif 143 <= number <= 167: + return "Trigonal" + elif 168 <= number <= 194: + return "Hexagonal" + elif 195 <= number <= 230: + return "Cubic" + else: + return "Unknown" +def strategy_divide(struct,total,keep_module=None): + space_group_info = struct.get_space_group_info() + space_group_symbol = space_group_info[0] + all_spacegroup_symbols = [spglib.get_spacegroup_type(i) for i in range(1, 531)] + symbol = all_spacegroup_symbols[0] + for symbol_i in all_spacegroup_symbols: + if space_group_symbol == symbol_i.international_short: + symbol = symbol_i + space_type = typejudge(symbol.number) + print(f"当前空间群符号为{space_group_symbol},序号为{symbol.number},对应的晶体体系为{space_type}") + divides = [] + if space_type == "Hexagonal": + print('当前为六方晶系,暂不处理') + if space_type == "Cubic": + print("当前为立方晶体,三个方向同步") + divides = factorize_to_three_factors(total, "xyz",keep_module=keep_module) + else: + print("为其他晶系,假设三个方向不同") + divides = factorize_to_three_factors(total,keep_module=keep_module) + return divides +def get_first_non_explicit_element(species_str, explict_element= ["Li","Li+"]): + """ + 从物种字符串中获取第一个不在explict_element中的元素符号 + + 参数: + species_str: 物种字符串,如 "Li+:0.689, Sc3+:0.311" + explict_element: 需要排除的元素列表,如 ["Li"] + + 返回: + str: 第一个符合条件的元素符号,如 "Sc" + 如果没有找到则返回空字符串 "" + """ + if not species_str.strip(): + return "" + + # 分割字符串获取各个物种部分 + species_parts = [part.strip() for part in species_str.split(",") if part.strip()] + + for part in species_parts: + # 提取元素符号(去掉电荷和占据数部分) + element_with_charge = part.split(":")[0].strip() + # 提取纯元素符号(去掉数字和特殊符号) + pure_element = ''.join([c for c in element_with_charge if c.isalpha()]) + + if pure_element not in explict_element: + return pure_element + + return "" +def calculate_expansion_factor(Occupation_list,calculate_type='high'): + """ + 计算Occupation_list的扩大倍数,支持不同精度模式 + + 参数: + Occupation_list: List[Dict], 每个字典包含: + { + "occupation": float, + "atom_serial": List[int], + "numerator": None, + "denominator": None + } + calculate_type: str, 计算精度模式 ('high', 'normal', 'low') + - high: 精确分数(默认) + - normal: 分母≤100的最接近分数 + - low: 分母≤10的最接近分数 + + 返回: + int: 扩大倍数(所有分母的最小公倍数) + List[Dict]: 更新后的Occupation_list(包含分子和分母) + """ + if not Occupation_list: + return 1, [] + + # Step 1: 根据精度要求计算分数 + for entry in Occupation_list: + occu = entry["occupation"] + + if calculate_type == 'high': + # 高精度模式 - 使用精确分数 + fraction = Fraction(occu).limit_denominator() + elif calculate_type == 'normal': + # 普通精度 - 分母≤100 + fraction = Fraction(occu).limit_denominator(100) + elif calculate_type == 'low': + # 低精度 - 分母≤10 + fraction = Fraction(occu).limit_denominator(10) + elif calculate_type == 'very low': + # 低精度 - 分母≤10 + fraction = Fraction(occu).limit_denominator(5) + else: + raise ValueError("calculate_type必须是'high', 'normal'或'low'") + + entry["numerator"] = fraction.numerator + entry["denominator"] = fraction.denominator + + # Step 2: 计算所有分母的最小公倍数 + denominators = [entry["denominator"] for entry in Occupation_list] + lcm = reduce(lambda a, b: a * b // math.gcd(a, b), denominators, 1) + + # Step 3: 统一分母 + for entry in Occupation_list: + denominator = entry["denominator"] + entry["numerator"] = entry["numerator"] * (lcm // denominator) + entry["denominator"] = lcm + + return lcm, Occupation_list +def get_occu(s_str,explict_element): + ''' + 这里暂时不考虑无化合价的情况 + Args: + s_str: + + Returns: + + ''' + if not s_str.strip(): + return {} + pattern = r'([A-Za-z0-9+-]+):([0-9.]+)' + matches = re.findall(pattern, s_str) + result = {} + for species, occu in matches: + try: + if species not in explict_element: + return occu + except ValueError: + continue # 忽略无效数字 + + return 1 +def process_cif_file(struct, explict_element=["Li", "Li+"]): + """ + 统计结构中各原子的occupation情况(忽略occupation=1.0的原子)并分类 + 参数: + struct: Structure对象 (从CIF文件读取) + 返回: + List[Dict]: Occupation_list,每个字典格式为: + { + "occupation": list, # 占据值(不为1.0) + "atom_serial": List[int], # 原子序号列表 + "numerator": None, # 预留分子 + "denominator": None # 预留分母 + "split":list[string]#对应的值 + } + """ + if not isinstance(struct, Structure): + raise TypeError("输入必须为pymatgen的Structure对象") + + occupation_dict = defaultdict(list) + # 用于记录每个occupation对应的元素列表 + split_dict = {} + for i, site in enumerate(struct): + # 获取当前原子的occupation(默认为1.0) + occu = get_occu(site.species_string, explict_element) + # 忽略occupation=1.0的原子 + if occu != 1.0: + if site.species.chemical_system not in explict_element: + occupation_dict[occu].append(i + 1) # 原子序号从1开始计数 + # 提取元素名称列表 + elements = [] + if ':' in site.species_string: + # 格式如 'S:0.494, Cl:0.506' 或 'S2-:0.494, Cl-:0.506' + parts = site.species_string.split(',') + for part in parts: + # 提取冒号前的部分并去除前后空格 + element_with_valence = part.strip().split(':')[0].strip() + # 从带有价态的元素符号中提取纯元素符号(只保留元素符号部分) + # 元素符号通常是一个大写字母,可能后跟一个小写字母 + import re + element_match = re.match(r'([A-Z][a-z]?)', element_with_valence) + if element_match: + element = element_match.group(1) + elements.append(element) + else: + # 只有一个元素,也需要处理可能的价态 + import re + element_match = re.match(r'([A-Z][a-z]?)', site.species_string) + if element_match: + elements = [element_match.group(1)] + # 存储该occupation对应的元素列表 + split_dict[occu] = elements + + # 转换为要求的输出格式 + Occupation_list = [ + { + "occupation": occu, + "atom_serial": serials, + "numerator": None, + "denominator": None, + "split": split_dict.get(occu, []) # 添加split字段 + } + for occu, serials in occupation_dict.items() + ] + + return Occupation_list +def merge_structures(structure_list, merge_dict): + """ + 按指定方向合并多个结构 + + 参数: + structure_list: List[Structure], 待合并的结构列表(所有结构必须具有相同的晶格) + merge_dict: Dict[str, int], 指定各方向的合并次数(如 {"x":1, "y":1, "z":2}) + + 返回: + Structure: 合并后的新结构 + """ + if not structure_list: + raise ValueError("结构列表不能为空") + + # 检查所有结构是否具有相同的晶格 + ref_lattice = structure_list[0].lattice + for s in structure_list[1:]: + if not np.allclose(s.lattice.matrix, ref_lattice.matrix): + raise ValueError("所有结构的晶格必须相同") + + # 计算总合并次数 + total_merge = merge_dict.get("x", 1) * merge_dict.get("y", 1) * merge_dict.get("z", 1) + if len(structure_list) != total_merge: + raise ValueError(f"结构数量({len(structure_list)})与合并次数({total_merge})不匹配") + + # 获取参考结构的晶格参数 + a, b, c = ref_lattice.abc + alpha, beta, gamma = ref_lattice.angles + + # 计算新晶格尺寸 + new_a = a * merge_dict.get("x", 1) + new_b = b * merge_dict.get("y", 1) + new_c = c * merge_dict.get("z", 1) + new_lattice = Lattice.from_parameters(new_a, new_b, new_c, alpha, beta, gamma) + + # 合并所有原子 + all_sites = [] + for i, structure in enumerate(structure_list): + # 计算当前结构在合并后的偏移量 + x_offset = (i // (merge_dict.get("y", 1) * merge_dict.get("z", 1))) % merge_dict.get("x", 1) + y_offset = (i // merge_dict.get("z", 1)) % merge_dict.get("y", 1) + z_offset = i % merge_dict.get("z", 1) + + # 对每个原子应用偏移 + for site in structure: + coords = site.frac_coords.copy() + coords[0] = (coords[0] + x_offset) / merge_dict.get("x", 1) + coords[1] = (coords[1] + y_offset) / merge_dict.get("y", 1) + coords[2] = (coords[2] + z_offset) / merge_dict.get("z", 1) + all_sites.append({"species": site.species, "coords": coords}) + + # 创建新结构 + return Structure(new_lattice, [site["species"] for site in all_sites], [site["coords"] for site in all_sites]) +def generate_structure_list(base_structure,occupation_list,explict_element=["Li","Li+"]): + if not occupation_list: + return [base_structure.copy()] + lcm = occupation_list[0]["denominator"] + structure_list = [base_structure.copy() for _ in range(lcm)] + for entry in occupation_list: + numerator = entry["numerator"] + denominator = entry["denominator"] + atom_indices = entry["atom_serial"] # 注意:原子序号从1开始 + for atom_idx in atom_indices: + occupancy_dict = mark_atoms_randomly(numerator=numerator,denominator=denominator) + original_site = base_structure.sites[atom_idx - 1] + element = get_first_non_explicit_element(original_site.species_string,explict_element) + for copy_idx ,occupy in occupancy_dict.items(): + structure_list[copy_idx].remove_sites([atom_idx-1]) + oxi_state= extract_oxi_state(original_site.species_string,element) + if len(entry["split"])==1: + if occupy: + new_site = PeriodicSite( + species=Species(element, oxi_state), + coords=original_site.frac_coords, + lattice=structure_list[copy_idx].lattice, + to_unit_cell=True, + label=original_site.label + ) + structure_list[copy_idx].sites.insert(atom_idx - 1, new_site) + else: + species_dict = {Species("Li", 1.0):0.0} + new_site = PeriodicSite( + species = species_dict, + coords=original_site.frac_coords, + lattice=structure_list[copy_idx].lattice, + to_unit_cell=True, + label=original_site.label + ) + structure_list[copy_idx].sites.insert(atom_idx - 1, new_site) + else: + if occupy: + new_site = PeriodicSite( + species=Species(element, oxi_state), + coords=original_site.frac_coords, + lattice=structure_list[copy_idx].lattice, + to_unit_cell=True, + label=original_site.label + ) + structure_list[copy_idx].sites.insert(atom_idx - 1, new_site) + else: + new_site = PeriodicSite( + species=Species(entry['split'][1], oxi_state), + coords=original_site.frac_coords, + lattice=structure_list[copy_idx].lattice, + to_unit_cell=True, + label=original_site.label + ) + structure_list[copy_idx].sites.insert(atom_idx - 1, new_site) + return structure_list +def expansion(input_file,output_folder,keep_number,calculate_type='high',keep_module=None): + structure_origin = Structure.from_file(input_file) + lmp,oc_list = calculate_expansion_factor(process_cif_file(structure_origin),calculate_type=calculate_type) + strategy = strategy_divide(structure_origin,lmp,keep_module) + st_list = generate_structure_list(structure_origin,oc_list) + # 获取基础文件名(不含路径和扩展名) + base_name = os.path.splitext(os.path.basename(input_file))[0] + mergeds=[] + names=[] + if len(strategy)< keep_number: + keep_number = len(strategy) + for index in range(keep_number): + merged = merge_structures(st_list, strategy[index]) + + suffix = "x{}y{}z{}".format( + strategy[index]["x"], + strategy[index]["y"], + strategy[index]["z"] + ) + output_filename='' + if keep_module=='classify': + print(f"{base_name}采用扩展方式为{suffix}") + output_filename=f"{base_name}.cif" + elif keep_module=='random': + print(f"{base_name}采用扩展方式为{suffix}") + output_filename=f"{base_name}-{suffix}.cif" + else: + output_filename = f"{base_name}-{suffix}.cif" + output_path = os.path.join(output_folder, output_filename) + + merged.to(filename=output_path, fmt="cif") + + print(f"Saved: {output_path}") + if keep_module=='classify': + + return merged + if keep_module=='random': + mergeds.append(merged) + names.append(output_filename) + return mergeds,names + + +if __name__ == "__main__": + #expansion("../data/tmp/36.cif","../data/tmp",1,calculate_type='low') + expansion("../data/input_ClBr_set/36.cif", "../data/tmp", 3, calculate_type='low',keep_module='random') + #expansion("../data/input/1234.cif", "../data/input/output", 1, calculate_type='low',keep_module='classify') + # s1 = Structure.from_file("../data/input_pre/mp-6783.cif") + # s2 = Structure.from_file("../data/input_pre/ICSD_1234.cif") + # print(process_cif_file(s2)) + # lmp,oc_list=calculate_expansion_factor(process_cif_file(s2)) + # print(oc_list) + # strategy = strategy_divide(s2,lmp) + # print(strategy) + # st_list=generate_structure_list(s2,oc_list) + # merged = merge_structures(st_list,strategy[0]) + # # merged = merge_structures([s1, s2], {"x": 1, "y": 1, "z": 2}) + # merged.to("merged.cif", "cif") # 保存合并后的结构 diff --git a/py/make_sh.py b/py/make_sh.py new file mode 100644 index 0000000..6226c35 --- /dev/null +++ b/py/make_sh.py @@ -0,0 +1,156 @@ +import os + + +def creat_sh(input_folder, anion, sh_file_path='analyze.sh'): + """ + 创建shell脚本,只处理两类CIF文件: + 1. 纯数字命名的CIF文件 (例如: 123.cif) + 2. 数字-坐标格式的CIF文件 (例如: 123-x1y2z3.cif) + + 参数: + input_folder: 输入文件夹路径 + anion: 阴离子类型 + sh_file_path: 生成的shell脚本路径 + """ + # 文件夹路径 + folder_path = input_folder + + import re + + # 定义两种文件名模式的正则表达式 + pattern1 = re.compile(r'^\d+\.cif$') # 纯数字.cif + pattern2 = re.compile(r'^\d+-x\d+y\d+z\d+\.cif$') # 数字-x数字y数字z数字.cif + + # 打开SH脚本文件用于写入 + with open(sh_file_path, 'w') as sh_file: + # 写入脚本头部 + sh_file.write('#!/bin/bash\n') + + # 遍历文件夹中的所有文件 + for filename in os.listdir(folder_path): + file_path = os.path.join(folder_path, filename) + + # 只处理文件(不处理文件夹) + if os.path.isfile(file_path): + # 检查文件名是否匹配两种模式之一 + if pattern1.match(filename) or pattern2.match(filename): + # 生成对应的命令 + command = f"python ../../../tool/analyze_voronoi_nodes.py {filename} -i ../../../tool/{anion}.yaml > {filename}.txt\n" + # 将命令写入SH脚本文件 + sh_file.write(command) + + print(f"SH脚本已生成:{sh_file_path}") + + +import os + + +def create_sh_recursive(base_folder, tool_path="tool", relative_depth=2): + """ + 递归遍历文件夹,为每个包含.cif文件的文件夹生成analyze.sh脚本, + 并在基础文件夹下创建一个sh_all.sh来执行所有脚本。 + + 参数: + base_folder: 起始文件夹路径 + tool_path: 工具目录的基本路径 + relative_depth: 基础相对深度,用于计算正确的相对路径 + """ + # 用于收集所有生成的analyze.sh脚本的相对路径 + analyze_sh_paths = [] + base_folder_name = os.path.basename(base_folder) + + def process_folder(folder_path, current_depth=0): + print(f"处理文件夹: {folder_path}") + + # 获取当前文件夹名称 + folder_name = os.path.basename(folder_path) + + # 检查当前文件夹是否包含.cif文件 + has_cif_files = any( + f.endswith('.cif') for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))) + + # 如果当前文件夹包含.cif文件,生成脚本 + if has_cif_files: + # 计算正确的工具路径(根据深度增加../) + dots = "../" * (relative_depth + current_depth) + tool_relative_path = f"{dots}{tool_path}" + + # 确定anion参数(使用文件夹名) + anion = folder_name + + # 生成脚本文件路径 + sh_file_path = os.path.join(folder_path, "analyze.sh") + + # 创建脚本 + with open(sh_file_path, 'w') as sh_file: + sh_file.write('#!/bin/bash\n') + for filename in os.listdir(folder_path): + file_path = os.path.join(folder_path, filename) + if os.path.isfile(file_path) and filename.endswith('.cif'): + command = f"python {tool_relative_path}/analyze_voronoi_nodes.py {filename} -i {tool_relative_path}/{anion}.yaml > {filename}.txt\n" + sh_file.write(command) + + # 将此脚本添加到收集器中 + # 计算相对于基础文件夹的路径 + rel_path = os.path.relpath(folder_path, base_folder) + analyze_sh_paths.append(rel_path) + + print(f"生成脚本: {sh_file_path} (工具路径: {tool_relative_path})") + + # 获取子文件夹列表 + subdirs = [d for d in os.listdir(folder_path) if os.path.isdir(os.path.join(folder_path, d))] + + # 处理复合阴离子文件夹的特殊情况 + if "+" in folder_name: + elements = folder_name.split("+") + for element in elements: + element_dir = os.path.join(folder_path, element) + # 如果对应元素的子文件夹不存在,创建它 + if not os.path.exists(element_dir): + os.makedirs(element_dir) + print(f"创建子文件夹: {element_dir}") + # 确保这个子文件夹被包含在递归处理列表中 + if element not in subdirs: + subdirs.append(element) + + # 递归处理所有子文件夹 + for subdir in subdirs: + subdir_path = os.path.join(folder_path, subdir) + process_folder(subdir_path, current_depth + 1) + + # 开始递归处理 + process_folder(base_folder) + + # 创建sh_all.sh脚本 + sh_all_path = os.path.join(base_folder, "sh_all.sh") + with open(sh_all_path, 'w') as sh_all: + sh_all.write('#!/bin/bash\n\n') + sh_all.write(f'# process all analyze.sh\n\n') + + # 记录初始目录 + sh_all.write('# remember current dir\n') + sh_all.write('INITIAL_DIR=$(pwd)\n\n') + + # 为每个包含analyze.sh的目录添加命令 + for path in analyze_sh_paths: + sh_all.write(f'echo "process {path}/analyze.sh"\n') + sh_all.write(f'cd "{path}"\n') + sh_all.write('bash analyze.sh\n') + sh_all.write(f'cd "$INITIAL_DIR"\n\n') + + # 添加完成消息 + sh_all.write('echo "done!"\n') + + # 修改权限使脚本可执行 + os.chmod(sh_all_path, 0o755) + print(f"生成总执行脚本: {sh_all_path}") + print("所有脚本生成完成!") +# 示例调用 +# create_sh_recursive("../data/after_step1") + +if __name__ == '__main__': + # creat_sh("../data/after_step1/O","O","../data/after_step1/O/analyze.sh") + # creat_sh("../data/after_step1/S","S","../data/after_step1/S/analyze.sh") + # creat_sh("../data/after_step1/Cl","Cl","../data/after_step1/Cl/analyze.sh") + # creat_sh("../data/after_step1/Br","Br","../data/after_step1/Br/analyze.sh") + create_sh_recursive("../data/after_step1") \ No newline at end of file diff --git a/py/pre_process.py b/py/pre_process.py new file mode 100644 index 0000000..af3f1d1 --- /dev/null +++ b/py/pre_process.py @@ -0,0 +1,131 @@ +import re +import os +from pymatgen.core.structure import Structure +from pymatgen.core.periodic_table import Element +import yaml +from pymatgen.core.periodic_table import Specie +from expansion import expansion +from expansion import process_cif_file +def generate_valence_yaml(output_yaml_path): + """ + Generate a YAML file containing the most common oxidation states for elements. + + Parameters: + output_yaml_path (str): Path to save the generated YAML file. + """ + valences = {} + for element in Element: + common_oxidation_states = element.common_oxidation_states + if common_oxidation_states: + # Metals/metalloids: take the maximum oxidation state + # Non-metals: take the minimum (most negative) oxidation state + if element.is_metalloid or element.is_metal: + valences[element.symbol] = max(common_oxidation_states) + else: + valences[element.symbol] = min(common_oxidation_states) + + # Save the valences dictionary to a YAML file + with open(output_yaml_path, "w") as file: + yaml.dump(valences, file, default_flow_style=False) + + +def apply_oxidation_states_to_cif(input_cif_path, valence_yaml_path, output_cif_path,calculate_type='low',output_folder = None): + """ + Modify a CIF file to include oxidation states for each element based on a YAML file, + unless oxidation states are already present in the CIF. + """ + # Load the structure from the CIF file + structure = Structure.from_file(input_cif_path) + oxi = process_cif_file(structure) + # classsify类型 + # if oxi: + # structure = expansion(input_cif_path,'../data/input_oxidation',3,calculate_type=calculate_type,keep_module='classify') + # # # 判断是否所有site都已经有oxidation state + # has_oxidation = all( + # all(isinstance(sp, Specie) for sp in site.species.keys()) + # for site in structure.sites + # ) + # if not has_oxidation: + # # 只有当没有价态时才读取yaml并赋值 + # with open(valence_yaml_path, "r") as file: + # valences = yaml.safe_load(file) + # # Apply oxidation states to the structure + # structure.add_oxidation_state_by_element(valences) + # + # # Save the updated structure to a new CIF file + # structure.to(filename=output_cif_path) + structures=[] + names=[] + if oxi: + structures,names = expansion(input_cif_path,'../data/input_oxidation',3,calculate_type=calculate_type,keep_module='random') + # # 判断是否所有site都已经有oxidation state + for structure,name in zip(structures,names): + has_oxidation = all( + all(isinstance(sp, Specie) for sp in site.species.keys()) + for site in structure.sites + ) + if not has_oxidation: + # 只有当没有价态时才读取yaml并赋值 + with open(valence_yaml_path, "r") as file: + valences = yaml.safe_load(file) + # Apply oxidation states to the structure + structure.add_oxidation_state_by_element(valences) + + # Save the updated structure to a new CIF file + structure.to(filename=os.path.join(output_folder,name )) + else: + has_oxidation = all( + all(isinstance(sp, Specie) for sp in site.species.keys()) + for site in structure.sites + ) + if not has_oxidation: + # 只有当没有价态时才读取yaml并赋值 + with open(valence_yaml_path, "r") as file: + valences = yaml.safe_load(file) + # Apply oxidation states to the structure + structure.add_oxidation_state_by_element(valences) + + # Save the updated structure to a new CIF file + structure.to(filename=output_cif_path) +def data_add_state(input_folder, valence_yaml_path, output_folder,output_occupatition_folder=None,calculate_type='normal'): + if not os.path.exists(input_folder): + print(f"{input_folder} 文件夹不存在") + return + if not os.path.exists(output_folder): + os.makedirs(output_folder) + print(f"目录 {output_folder} 已创建") + + for filename in os.listdir(input_folder): + if filename.endswith(".cif"): # 检查文件是否以.cif结尾 + file_path = os.path.join(input_folder, filename) + + # 提取文件名中的数字部分 + match = re.search(r'\d+', filename) + if match: + new_filename = match.group(0) + ".cif" # 提取数字并加上 .cif 后缀 + else: + print(f"文件名 {filename} 中未找到数字部分,跳过处理") + continue + + # 构造输出文件路径 + output_cif_path = os.path.join(output_folder, new_filename) + + # 应用氧化态并保存新文件 + print(f"正在处理{filename}") + try: + apply_oxidation_states_to_cif(file_path, valence_yaml_path, output_cif_path,calculate_type=calculate_type,output_folder=output_folder) + except Exception as e: + print(f"{filename}出现问题!") + print(f"{filename} 已完成") + + + +if __name__ == "__main__": + # Example usage: + # Step 1: Generate the valence YAML file + valence_yaml = "../tool/valence_states.yaml" + + # Step 2: Process CIF files in the input folder + data_add_state("../data/input_pre", valence_yaml, "../data/input","../data/input_oxidation") + # Step 3: Process Occupation + # data_process_Occupatiton("") diff --git a/py/step1.py b/py/step1.py new file mode 100644 index 0000000..32ca068 --- /dev/null +++ b/py/step1.py @@ -0,0 +1,54 @@ +from pymatgen.core import Structure +from pymatgen.core.periodic_table import Element, Specie +from pymatgen.io.cif import CifWriter + +from crystal_2 import crystal +import crystal_2 +import os +import shutil +from pymatgen.io.cif import CifWriter + +def read_files_check_basic(folder_path): + file_contents = [] + + if not os.path.exists(folder_path): + print(f"{folder_path} 文件夹不存在") + return file_contents + + for filename in os.listdir(folder_path): + file_path = os.path.join(folder_path, filename) + + if os.path.isfile(file_path): + try: + temp = crystal(file_path) + file_contents.append(temp) + except Exception as e: + print(e) + print(f"正在处理{filename}") + temp.check_basic() + if temp.check_basic_result: + if not "+" in temp.anion: + target_folder = os.path.join("../data/after_step1",f"{temp.anion}") + if not os.path.exists(target_folder): + os.makedirs(target_folder) + + # 目标文件路径 + target_file_path = os.path.join(target_folder, filename) + + # 复制文件到目标文件夹 + shutil.copy(file_path, target_file_path) + print(f"文件 {filename}通过基本筛选,已复制到 {target_folder}") + else: + anions = temp.anion.split("+") + for anion in anions: + target_folder = os.path.join("../data/after_step1", f"{temp.anion}") + target_folder = os.path.join(target_folder, anion) + if not os.path.exists(target_folder): + os.makedirs(target_folder) + + # 目标文件路径 + target_file_path = os.path.join(target_folder, filename) + # 复制文件到目标文件夹 + shutil.copy(file_path, target_file_path) + print(f"文件 {filename}通过基本筛选,已复制到 {target_folder}") +read_files_check_basic("../data/input") \ No newline at end of file diff --git a/py/step2-5-file_process.py b/py/step2-5-file_process.py new file mode 100644 index 0000000..fe267f4 --- /dev/null +++ b/py/step2-5-file_process.py @@ -0,0 +1,121 @@ +from step2 import process_files as step2_process +from step3 import process_files as step3_process +from step4 import process_files as step4_process +from step5 import read_files_check_partical as step5_process +import os +import shutil + +import os +import shutil + + +def create_empty_directory_structure(source_dir, target_dir): + """ + 递归地复制源文件夹的目录结构到目标文件夹,创建空文件夹。 + + 参数: + source_dir (str): 源文件夹路径 + target_dir (str): 目标文件夹路径 + + 返回: + int: 成功创建的文件夹数量 + + 异常: + FileNotFoundError: 如果源文件夹不存在 + PermissionError: 如果没有权限读取源文件夹或写入目标文件夹 + """ + # 计数器 + created_count = 0 + + # 检查源文件夹是否存在 + if not os.path.exists(source_dir): + raise FileNotFoundError(f"源文件夹不存在: {source_dir}") + + # 确保目标文件夹存在 + if not os.path.exists(target_dir): + print(f"目标文件夹不存在,正在创建: {target_dir}") + os.makedirs(target_dir) + created_count += 1 + + # 递归函数,复制文件夹结构 + def copy_structure(src, dst): + nonlocal created_count + + try: + # 获取源目录中的所有项目 + items = os.listdir(src) + + # 遍历所有项目 + for item in items: + src_path = os.path.join(src, item) + dst_path = os.path.join(dst, item) + + # 如果是目录,递归复制结构 + if os.path.isdir(src_path): + if not os.path.exists(dst_path): + os.makedirs(dst_path) + created_count += 1 + print(f"创建文件夹: {dst_path}") + copy_structure(src_path, dst_path) + # 对于文件,我们不做任何处理,因为我们只需要文件夹结构 + + except PermissionError: + print(f"无权限访问目录: {src}") + except Exception as e: + print(f"处理目录 {src} 时出错: {str(e)}") + + # 开始递归复制 + try: + copy_structure(source_dir, target_dir) + print(f"已成功在 {target_dir} 中创建 {created_count} 个文件夹,复制了完整的目录结构") + return created_count + except Exception as e: + print(f"整体操作失败: {str(e)}") + return created_count +if __name__ == "__main__": + create_empty_directory_structure("../data/after_step1","../data/after_step2") + create_empty_directory_structure("../data/after_step1", "../data/after_step3") + create_empty_directory_structure("../data/after_step1", "../data/after_step4") + create_empty_directory_structure("../data/after_step1", "../data/after_step5") + create_empty_directory_structure("../data/after_step1", "../data/after_step6") + for files in os.listdir("../data/after_step1"): + source_path = os.path.join("../data/after_step1", files) + target_path = os.path.join("../data/after_step2", files) + file = files # 如果需要从文件名提取,替换这一行 + print('-------------------') + if "+" in file: + # 第二种情况:多个元素,如"S+O" + elements = file.split("+") + print(f"处理多元素文件 {file},拆分为:{elements}") + for element in elements: + print(element) + source_path_tmp = os.path.join(source_path, element) + target_path_tmp = os.path.join(target_path, element) + print('正在做第二步筛选') + step2_process(source_path_tmp, target_path_tmp, element) + target_path_tmp_2 = os.path.join(f"../data/after_step3/{files}", element) + print('正在做第三步筛选') + step3_process(source_path_tmp, target_path_tmp, target_path_tmp_2,element) + target_path_tmp_3 = os.path.join(f"../data/after_step4/{files}", element) + print('正在做第四步筛选') + step4_process(source_path_tmp, target_path_tmp_2,target_path_tmp_3, element) + target_path_tmp_4 = os.path.join(f"../data/after_step5/{files}", element) + print('正在做第五步筛选') + step5_process(target_path_tmp_3,target_path_tmp_4) + else: + # 第一种情况:单一元素,如"S" + print(f"处理单一元素文件:{file}") + target_path_1 = os.path.join("../data/after_step3", files) + target_path_2 = os.path.join("../data/after_step4", files) + target_path_3 = os.path.join("../data/after_step5", files) + print('正在做第二步筛选') + step2_process(source_path, target_path, file) + print('正在做第三步筛选') + step3_process(source_path, target_path,target_path_1, file) + print('正在做第四步筛选') + step4_process(source_path, target_path_1,target_path_2, file) + print('正在做第五步筛选') + step5_process(target_path_2,target_path_3) + + + diff --git a/py/step2.py b/py/step2.py new file mode 100644 index 0000000..ad6703e --- /dev/null +++ b/py/step2.py @@ -0,0 +1,80 @@ +import os +import re +import shutil + + +def process_files(cif_folder, output_folder, anion): + + # 确保输出文件夹存在 + os.makedirs(output_folder, exist_ok=True) + + # 获取 txt 文件夹中的所有 txt 文件 + txt_files = [f for f in os.listdir(cif_folder) if f.endswith('.txt')] + + + # 遍历 txt 文件 + for txt_file in txt_files: + txt_path = os.path.join(cif_folder, txt_file) + + # 打开并读取 txt 文件内容 + with open(txt_path, 'r', encoding='utf-8') as file: + content = file.read() + matches = re.findall(r"Percolation diameter \(A\): (\d+\.\d+)", content) + # 使用正则表达式查找符合条件的内容 + if matches: + + # 提取文件名(去掉.txt后缀) + base_name = os.path.splitext(txt_file)[0] + check = False + if anion == "O": + print(f"{base_name}的perconlation diameter为{matches[0]}A") + if float(matches[0]) > 0.5: + check = True + print(f"符合要求") + else: + print("不符合要求") + elif anion == "S": + print(f"{base_name}的perconlation diameter为{matches[0]}A") + if float(matches[0]) > 0.55: + check = True + print(f"符合要求") + else: + print("不符合要求") + elif anion == "Br": + print(f"{base_name}的perconlation diameter为{matches[0]}A") + if float(matches[0]) > 0.45: + check = True + print("符合要求") + else: + print("不符合要求") + elif anion == "Cl": + print(f"{base_name}的perconlation diameter为{matches[0]}A") + if float(matches[0]) > 0.45: + check = True + print("符合要求") + else: + print("不符合要求") + if check: + # 查找与 txt 文件同名的 cif 文件 + cif_path = os.path.join(cif_folder, base_name) + + # 如果对应的 cif 文件存在,复制到 output_folder + if os.path.exists(cif_path): + shutil.copy(cif_path, os.path.join(output_folder, base_name)) + print(f"Copied {base_name} to {output_folder}") + + +def work_py(input_folder, output_folder): + if not os.path.exists(output_folder): + print("not exists") + for filename in os.listdir(input_folder): + target_folder = os.path.join(output_folder, filename) + from_folder = os.path.join(input_folder, filename) + process_files(from_folder, target_folder) + +if __name__ == "__main__": + # work_py("../data/after_step1","../data/after_step2" ) + # process_files("../data/after_step1/O", "../data/after_step2/O", "O") + # process_files("../data/after_step1/S", "../data/after_step2/S", "S") + process_files("../data/after_step1/Cl", "../data/after_step2/Cl", "Br") + process_files("../data/after_step1/Br", "../data/after_step2/Br", "Cl") \ No newline at end of file diff --git a/py/step3.py b/py/step3.py new file mode 100644 index 0000000..f726eb4 --- /dev/null +++ b/py/step3.py @@ -0,0 +1,72 @@ +import os +import re +import shutil + + +def process_files(cif_folder,input_folder,output_folder, anion): + + # 确保输出文件夹存在 + os.makedirs(output_folder, exist_ok=True) + + # 获取 txt 文件夹中的所有 txt 文件 + txt_files = [f for f in os.listdir(cif_folder) if f.endswith('.txt')] + + + # 遍历 txt 文件 + for txt_file in txt_files: + txt_path = os.path.join(cif_folder, txt_file) + + # 打开并读取 txt 文件内容 + with open(txt_path, 'r', encoding='utf-8') as file: + content = file.read() + matches = re.findall(r"the minium of d\s+([\d\.]+)\s*#", content) + # 使用正则表达式查找符合条件的内容 + if matches: + + # 提取文件名(去掉.txt后缀) + base_name = os.path.splitext(txt_file)[0] + check = False + if anion == "O": + print(f"{base_name}的最短距离为{matches[0]}A") + if float(matches[0]) < 3: + check = True + print(f"符合要求") + else: + print("不符合要求") + elif anion == "S": + print(f"{base_name}的最短距离为{matches[0]}A") + if float(matches[0]) < 3: + check = True + print(f"符合要求") + else: + print("不符合要求") + elif anion == "Cl": + print(f"{base_name}的最短距离为{matches[0]}A") + if float(matches[0]) < 3: + check = True + print(f"符合要求") + else: + print("不符合要求") + elif anion == "Br": + print(f"{base_name}的最短距离为{matches[0]}A") + if float(matches[0]) < 3: + check = True + print(f"符合要求") + else: + print("不符合要求") + if check: + # 查找与 txt 文件同名的 cif 文件 + cif_path = os.path.join(input_folder, base_name) + + # 如果对应的 cif 文件存在,复制到 output_folder + if os.path.exists(cif_path): + shutil.copy(cif_path, os.path.join(output_folder, base_name)) + print(f"Copied {base_name} to {output_folder}") + + + +if __name__ == '__main__': + # process_files("../data/after_step1/O","../data/after_step2/O", "../data/after_step3/O", "O") + # process_files("../data/after_step1/S", "../data/after_step2/S","../data/after_step3/S", "S") + process_files("../data/after_step1/Cl", "../data/after_step2/Cl","../data/after_step3/Cl", "Cl") + process_files("../data/after_step1/Br", "../data/after_step2/Br","../data/after_step3/Br", "Br") \ No newline at end of file diff --git a/py/step4.py b/py/step4.py new file mode 100644 index 0000000..62d79bf --- /dev/null +++ b/py/step4.py @@ -0,0 +1,72 @@ +import os +import re +import shutil + + +def process_files(cif_folder,input_folder,output_folder, anion): + + # 确保输出文件夹存在 + os.makedirs(output_folder, exist_ok=True) + + # 获取 txt 文件夹中的所有 txt 文件 + txt_files = [f for f in os.listdir(cif_folder) if f.endswith('.txt')] + + + # 遍历 txt 文件 + for txt_file in txt_files: + txt_path = os.path.join(cif_folder, txt_file) + + # 打开并读取 txt 文件内容 + with open(txt_path, 'r', encoding='utf-8') as file: + content = file.read() + matches = re.findall(r"Maximum node length detected: (\d+\.\d+) A", content) + # 使用正则表达式查找符合条件的内容 + if matches: + + # 提取文件名(去掉.txt后缀) + base_name = os.path.splitext(txt_file)[0] + check = False + if anion == "O": + print(f"{base_name}的扩大锂离子直径为{matches[0]}A") + if float(matches[0]) > 2.2: + check = True + print(f"符合要求") + else: + print("不符合要求") + elif anion == "S": + print(f"{base_name}的扩大锂离子直径为{matches[0]}A") + if float(matches[0]) > 2.2: + check = True + print(f"符合要求") + else: + print("不符合要求") + elif anion == "Cl": + print(f"{base_name}的扩大锂离子直径为{matches[0]}A") + if float(matches[0]) > 2: + check = True + print(f"符合要求") + else: + print("不符合要求") + elif anion == "Br": + print(f"{base_name}的扩大锂离子直径为{matches[0]}A") + if float(matches[0]) > 2: + check = True + print(f"符合要求") + else: + print("不符合要求") + if check: + # 查找与 txt 文件同名的 cif 文件 + cif_path = os.path.join(input_folder, base_name) + + # 如果对应的 cif 文件存在,复制到 output_folder + if os.path.exists(cif_path): + shutil.copy(cif_path, os.path.join(output_folder, base_name)) + print(f"Copied {base_name} to {output_folder}") + + + +if __name__ == "__main__": + # process_files("../data/after_step1/O","../data/after_step3/O", "../data/after_step4/O", "O") + # process_files("../data/after_step1/S", "../data/after_step3/S","../data/after_step4/S", "S") + process_files("../data/after_step1/Cl", "../data/after_step3/Cl","../data/after_step4/Cl", "Cl") + process_files("../data/after_step1/Br", "../data/after_step3/Br","../data/after_step4/Br", "Br") \ No newline at end of file diff --git a/py/step5.py b/py/step5.py new file mode 100644 index 0000000..9040c4c --- /dev/null +++ b/py/step5.py @@ -0,0 +1,57 @@ +from pymatgen.core import Structure +from pymatgen.core.periodic_table import Element, Specie +from pymatgen.io.cif import CifWriter + +from crystal_2 import crystal +import crystal_2 +import os +import shutil +from pymatgen.io.cif import CifWriter + + +def read_files_check_partical(input_folder,output_folder): + file_contents = [] + folder_path = input_folder + if not os.path.exists(folder_path): + print(f"{folder_path} 文件夹不存在") + return file_contents + + for filename in os.listdir(folder_path): + file_path = os.path.join(folder_path, filename) + + if os.path.isfile(file_path): + try: + temp = crystal(file_path) + file_contents.append(temp) + print(f"正在处理{filename}") + temp.check_practical() + if temp.check_practical_result: + target_folder = output_folder + if not os.path.exists(target_folder): + os.makedirs(target_folder) + + # 目标文件路径 + target_file_path = os.path.join(target_folder, filename) + + # 复制文件到目标文件夹 + shutil.copy(file_path, target_file_path) + print(f"文件 {filename}通过实际筛选,已复制到 {target_folder}") + except Exception as e: + print(e) + + + +def work_py(input_folder, output_folder): + if not os.path.exists(output_folder): + print("not exists") + for filename in os.listdir(input_folder): + target_folder = os.path.join(output_folder, filename) + from_folder = os.path.join(input_folder, filename) + read_files_check_partical(from_folder, target_folder) + +if __name__ == "__main__": + # work_py('../data/after_step4','../data/after_step5') + # read_files_check_partical('../data/after_step4/O','../data/after_step5/O') + # read_files_check_partical('../data/after_step4/S','../data/after_step5/S') + read_files_check_partical('../data/after_step4/Cl','../data/after_step5/Cl') + read_files_check_partical('../data/after_step4/Br','../data/after_step5/Br') \ No newline at end of file diff --git a/py/step6.py b/py/step6.py new file mode 100644 index 0000000..90ccdff --- /dev/null +++ b/py/step6.py @@ -0,0 +1,75 @@ +from pymatgen.core import Structure +from pymatgen.analysis.structure_matcher import StructureMatcher +import os +import shutil + +# 定义函数,用于将 CIF 文件进行分类并输出 + +def structure_classify(input_folder, output_folder): + """ + 分类 input_folder 中的 CIF 文件,并根据框架分组后存储到 output_folder 中。 + + :param input_folder: 输入文件夹路径,包含 .cif 文件。 + :param output_folder: 输出文件夹路径,用于存储分类后的 .cif 文件。 + """ + # 检查输入文件夹是否存在 + if not os.path.exists(input_folder): + print(f"输入文件夹 {input_folder} 不存在") + return + + # 创建输出文件夹(如果不存在) + if not os.path.exists(output_folder): + os.makedirs(output_folder) + + # 读取输入文件夹中的所有 .cif 文件 + structures = [] + file_map = {} # 记录文件路径与结构的对应关系 + for filename in os.listdir(input_folder): + if filename.endswith(".cif"): + file_path = os.path.join(input_folder, filename) + try: + structure = Structure.from_file(file_path) + structures.append(structure) + file_map[id(structure)] = filename # 使用结构对象的 id 作为键 + except Exception as e: + print(f"无法读取文件 {filename}:{e}") + + # 检查是否成功加载了任何结构 + if not structures: + print("未找到有效的 CIF 文件") + return + + # 分组结构 + matcher = StructureMatcher() + grouped_structures = [] + + for structure in structures: + matched = False + for group in grouped_structures: + if matcher.fit(structure, group[0]): # 比较结构是否匹配 + group.append(structure) + matched = True + break + if not matched: + grouped_structures.append([structure]) + + # 保存分组后的结构到输出文件夹 + for group_index, group in enumerate(grouped_structures): + group_folder = os.path.join(output_folder, f"group_{group_index}") + os.makedirs(group_folder, exist_ok=True) + + for structure_index, structure in enumerate(group): + output_file = os.path.join(group_folder, f"structure_{structure_index}.cif") + structure.to(filename=output_file) + + # 获取原始文件名并复制到分组文件夹 + original_filename = file_map[id(structure)] # 使用结构对象的 id 获取文件名 + original_file_path = os.path.join(input_folder, original_filename) + shutil.copy(original_file_path, group_folder) + + print(f"处理完成,分类后的结构已保存到 {output_folder}") + +if __name__ == "__main__": + # 示例调用 + structure_classify("../data/after_step5/S", "../data/after_step6/S") + structure_classify("../data/after_step5/O", "../data/after_step6/O") diff --git a/py/utils.py b/py/utils.py new file mode 100644 index 0000000..5521299 --- /dev/null +++ b/py/utils.py @@ -0,0 +1,9 @@ +import os + + +def work_py(input_folder, output_folder): + if not os.path.exists(output_folder): + print("not exists") + for filename in os.listdir(input_folder): + target_folder = os.path.join(output_folder, filename) + from_folder = os.path.join(input_folder, filename) diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..b75d9da --- /dev/null +++ b/readme.md @@ -0,0 +1,68 @@ +# 高通量筛选 + +## 配置需求 + +需要两个conda环境,名字分别为**screen**,**zeo** + +### zeo + +#### 运行库需求 + +``` 2018.12.12 +python == 2 +pymatgen == 2018.12.12 +Numpy = 1.16.6 +os +argparse = 1.4.0 +PrettyTable = 1.01 +monty = 1.0.0 +future = 1.0.0 +``` + +#### zeo++软件需求 + +需要编译后放入python库中 + +### screen + +``` +python == 3.11.4 +pymatgen == 2024.11.13 +``` + +## 使用说明 + +如果配置的conda环境同名,运行**main.sh**即可 + +当数据来源为MP时,需要将数据放在input_pre中 + +如果数据来源为ICSD,仅需将数据放在input中即可 + + + +# 扩胞 +## 以下为每一步的分解 +### Step1 +读取cif文件 +### Step2 +统计Occupation情况,将具有相同Occupation值的记为一类,用Occupation值作为Key创建字典,该字典的一个项为atom_serial,是一个列表,记录相同Ocupation值的原子序号 +将上述字典输入一个列表Occupation_list,字典预留分子与分母两个参数 +需要函数为 +```angular2html +def process_cif_file(struct) + return Occupation_list +``` +### step3 +根据Occupation_list来计算扩大倍数\\ +首先逐一计算每个字典的分子与分母,根据key来计算,例如第一个key值为0.5,此时其对应分子为1,分母为2 +合并没一个字典,探索每一个分数的情况并求出公约数与对应的分子,更新每一个字典的值 +### step4 +根据分子与分母情况,生成structure_list,其中Occupation_list中的元素的number处的和为分子,总共个数为分母 +### step5 +根据材料结构决定对称性,对不同对称性得到不同等效情况 +根据对称性与最终扩胞生成三个方向扩胞列表,其中每个元素是字典,遵循格式为{["x":1,"y":2,"z":1]} +### step5 +根据structure_list与Occupation_list生成新的cif并保存 +### 一些假设 +只考虑两个原子在同一位置上,暂不考虑三个原子以上的情况 +不考虑Li原子的共占位情况,对Li原子不做处理 diff --git a/tool/Br.yaml b/tool/Br.yaml new file mode 100644 index 0000000..d7ab175 --- /dev/null +++ b/tool/Br.yaml @@ -0,0 +1,5 @@ +SPECIE: Li+ +ANION: Br +PERCO_R: 0.45 +NEIGHBOR: 1.8 +LONG: 2.2 \ No newline at end of file diff --git a/tool/Cl.yaml b/tool/Cl.yaml new file mode 100644 index 0000000..43e1b3e --- /dev/null +++ b/tool/Cl.yaml @@ -0,0 +1,5 @@ +SPECIE: Li+ +ANION: Cl +PERCO_R: 0.45 +NEIGHBOR: 1.8 +LONG: 2.2 \ No newline at end of file diff --git a/tool/O.yaml b/tool/O.yaml new file mode 100644 index 0000000..84c82cd --- /dev/null +++ b/tool/O.yaml @@ -0,0 +1,5 @@ +SPECIE: Li+ +ANION: O +PERCO_R: 0.5 +NEIGHBOR: 1.8 +LONG: 2.2 diff --git a/tool/S.yaml b/tool/S.yaml new file mode 100644 index 0000000..1dac13a --- /dev/null +++ b/tool/S.yaml @@ -0,0 +1,5 @@ +SPECIE: Li+ +ANION: S +PERCO_R: 0.55 +NEIGHBOR: 1.8 +LONG: 2.2 diff --git a/tool/analyze_voronoi_nodes.py b/tool/analyze_voronoi_nodes.py new file mode 100644 index 0000000..5f3c202 --- /dev/null +++ b/tool/analyze_voronoi_nodes.py @@ -0,0 +1,454 @@ +# system & I/O dependencies +import os +import sys +import time +import argparse +import pandas as pds +from ruamel.yaml import YAML +from copy import deepcopy + +# pymatgen dependencies +from pymatgen.io.cif import CifParser, CifWriter +from pymatgen.core.periodic_table import Specie, Element +from pymatgen.core.composition import Composition + +# Topological_Analysis dependencies +from Topological_Analysis.filters import * +from Topological_Analysis.PyVMD import cmd_by_radius + +# TAPercolateFilter, TABvFilter, TALongFilter, TAOptimumSiteFilter, TACoulombReplusionFilter, OxidationStateFilter, +# TADenseNeighbor + +__author__ = "Xingfeng He, Yunsheng Liu" +__copyright__ = "Copyright 2019, UMD Mo. Group" +__version__ = "0.2" +__maintainer__ = "Yunsheng Liu" +__email__ = "yliu1240@umd.edu" +__date__ = "Jun 21st, 2019" + +# Preparation +yaml = YAML() + +# customized directory to specify Topological_Analysis is if it's not in PYTHONPATH or other python directory +# base_dir = '/Users/Base/Directory' +# if not base_dir in sys.path: +# sys.path.append(base_dir) + +def Analyze_Voronoi_Nodes(args): + """ + A standard process to apply all filters. Zeo++ finds all possible polyhedrons and corresponding sites while this class + will screen bad sites and merge them. The program currently support CIF input files ONLY; + + Args: + args.cif_file (str): Directory of CIF file + args.input_file (yaml): Directory of input file which specify filter parameters. The input file must be a yaml: + + Mandatory: + 1. SPECIE: a string of target diffusion specie, with oxidation state; + e.g. Li+: Li specie with +1 oxidation state. + + Optional: (Each parameter must be added according to filters specified) + Overall: + 2. ANION: a string of potential anion type in the structure. + This parameter will automatically specify parameters for further analysis: + BV_UP + BV_LW + R_CUT + However, these parameters will be overwritten if they're explicitly assigned. + + e.g. S (sulfur) will have R_CUT: 1.5 A, + If input yaml file has another R_CUT to be 2 A, the final R_CUT will be 2 A. + + Currently support following anions: + | BV_LW | BV_UP | R_CUT(A) | + ------------------------------------------------------ + S (sulfur) | 0.4 | 1.1 | 2.5 | + O (oxygen) | 0.5 | 1.2 | 2.3 | + + VoroPerco: + 3. PERCO_R: the percolation radius for diffusion specie; + VoroBV: + 4. BV_UP: the maximum bond valence of a site which considered to be appropriate; + 5. BV_LW: the minimum bond valence of a site which considered to be appropriate; + Coulomb: + 6. R_CUT: the minimum distance between target specie and nearest ion (either anion or + cation); + VoroLong: + 7. LONG: the criteria to decide whether a node is a long node or not. Unit: A; + MergeSite: + 8. NEIGHBOR: the distance criteria to decide whether 2 sites / nodes are too close to + each other. Unit: A. + 9. LONG + + args.filters (str): strings to specify which filter to use in analysis: + + FILTER: filters applied. Currently support following filters: + Ordered: OrderFrameworkFilter + PropOxi: OxidationStateFilter + VoroPerco: TAPercolateFilter + Coulomb: TACoulombReplusionFilter + VoroBV: TABvFilter + VoroLong: TALongFilter + MergeSite: OptimumSiteFilter + VoroInfo: TALongFilter, but only output the center coordinates and length of each node + + Output: + CIF files after applying each filter. The predicted sites for target specie will be represented as sites with 50% + partial occupancy. + Note that some filters may be fundamental (decide whether they're good CIFs or not) and they may have + no output structures. + e.g. if applying OxidationStateFilter, TAPercolateFilter and TABvFilter, + there will be 2 output CIF files: + 1. CIF with all accessible sites; + 2. CIF with all sites having good bond valence; + OxidationStateFilter has no ouput structure. + """ + import Topological_Analysis + + # built-in radius for different species + va_dir = os.path.dirname(Topological_Analysis.__file__) + radii_yaml_dir = os.path.join(va_dir, 'files/radii.yaml') + with open(radii_yaml_dir, 'r') as f: + radii = yaml.load(f) + f.close() + + # read structure from CIF file + name = args.cif_file[:-4] # the last 4 characters are '.cif' + precif = CifParser(args.cif_file, occupancy_tolerance=2.0) + structure = precif.get_structures(primitive = False)[0].copy() + # for input parameter file + with open(args.input_file, 'r') as f: + input_parameters = yaml.load(f) + f.close() + + # target specie + sp = Specie.from_string(input_parameters['SPECIE']) + + # other possible parameters + if 'ANION' in input_parameters.keys(): + if input_parameters['ANION'].lower() == 's': + bv_range = (0.4, 1.1) + rc = 2.5 + elif input_parameters['ANION'].lower() == 'o': + bv_range = (0.5, 1.2) + rc = 2.3 + else: + print '## Unsupported anion type: {}'.format(input_parameters['ANION']) + bv_range = (0.4, 1.2) + rc = 2.0 + + if 'PERCO_R' in input_parameters.keys(): + pr = input_parameters['PERCO_R'] # percolation radius + else: + pr = None + + try: + # these exist further bond valence limits to overwrite existing ones + tmp = bv_range + if 'BV_UP' in input_parameters.keys(): + bv_range = (bv_range[0], input_parameters['BV_UP']) + if 'BV_LW' in input_parameters.keys(): + bv_range = (input_parameters['BV_LW'], bv_range[1]) + except: + # these's no anion type to assign bond valence range + if ('BV_UP' in input_parameters.keys()) and ('BV_LW' in input_parameters.keys()): + bv_range = (input_parameters['BV_LW'], input_parameters['BV_UP']) + else: + bv_range = None + + if 'R_CUT' in input_parameters.keys(): + rc = input_parameters['R_CUT'] # cut-off distance of coulomb replusion + else: + try: + tmp = rc # to check whether parameter exists, if it doesn't exist, set it to None. + # only necessary for bv_range and r_cut because these 2 may be set by ANION parameter + except: + rc = None + + if 'LONG' in input_parameters.keys(): + long = input_parameters['LONG'] # cut-off distance to decide whether a node is long or not + else: + long = None + if 'NEIGHBOR' in input_parameters.keys(): + nn = input_parameters['NEIGHBOR'] # cut-off distance to decide whether 2 sites are neighbors + else: + nn = None + + # temporary parameters for filters applied + frame_structure = None + org_frame = None + node_structure = None + predicted_structure = None + + for f_index, f in enumerate(args.filters): + print 'Step {}: {}'.format(f_index, f) + + if f.lower() == 'ordered': + # Check whether the framework is ordered or not. + print '# Check framework disordering.' + orderFrame = OrderFrameworkFilter(structure.copy(), radii, sp) + org_structure = orderFrame.virtual_structure.copy() + frame_structure = orderFrame.virtual_framework.copy() + org_frame = orderFrame.framework.copy() + print '# Check finishes.' + + if f.lower() == 'propoxi': + # Check oxidation states in structures. This is necessary for bond valence filter. + print '# Check oxidation states in structure.' + PropOxi = OxidationStateFilter(org_structure.copy()) + if not PropOxi.decorated: + print '## Oxidation state check fails...' + sys.exit() + else: + print '# Check finishes.' + + elif f.lower() == 'voroperco': + # Check whether there's enough space for percolation. + print '# Check Voronoi percolation raduis.' + if pr: + VoroPerco = TAPercolateFilter(org_structure.copy(), radii, sp, pr) + else: + print '## No percolation radius provided...' + sys.exit() + + if not VoroPerco.analysis_results: + print '## Cannot percolate...' + sys.exit() + else: + """ + The Voronoi analysis results include: + Voronoi_accessed_node_structure: A structure with all nodes (with Voronoi radius added to the property + of each node); + Voronoi_structure: A structure containing nodes whose Voronoi radius is greater than a certain value; + Framework: The framework structure with no target diffusion specie; + free_sph_max_dia: Maximum spherical diameter in the structure; + ...... + To see other results, please use 'analysis_keys' attribute of the class. + """ + results = deepcopy(VoroPerco.analysis_results) + print '# Percolation diameter (A): {}'.format(round(results['free_sph_max_dia'], 3)) + output_structure = org_frame.copy() + if results['Voronoi_accessed_node_structure']: + node_structure = results['Voronoi_accessed_node_structure'].copy() + for nodes in node_structure.copy(): + output_structure.append(str(sp), nodes.coords, coords_are_cartesian=True) + CifWriter(output_structure).write_file('{}_all_accessed_node.cif'.format(name)) + print '# Percolation check finishes.' + else: + print '## Errors in Voronoi analysis structure...' + + elif f.lower() == 'coulomb': + print '# Check Coulomb replusion effects.' + if (not frame_structure) or (not node_structure): + print '## No framework and node structure provided for Coulomb Replusion analysis...' + sys.exit() + elif not rc: + print '## No Coulomb replusion cut-off distance provided...' + sys.exit() + else: + if sp.oxi_state < 0: + ion = 'anion' + else: + ion = 'cation' + print '# Processing Coulomb replusion check.' + print '# {} effect detected, minimum distance to {}s is {} A.'.format(ion, ion, round(rc, 3)) + CoulRep = TACoulombReplusionFilter(node_structure.copy(), frame_structure.copy(), prune=ion, min_d_to_ion=rc) + if CoulRep.final_structure: + node_structure = CoulRep.final_structure.copy() + output_structure = org_frame.copy() + for node in node_structure.copy(): + output_structure.append(str(sp), node.coords, coords_are_cartesian=True) + CifWriter(output_structure).write_file('{}_coulomb_filtered.cif'.format(name)) + print '# Coulomb replusion check finishes.' + else: + print '## All available nodes will experience high Coulomb replusion...' + print '## The structure is either unreasonable or the replusion radius cut-off is too large...' + sys.exit() + + elif f.lower() == 'vorobv': + print '# Check bond valence limits.' + if (not frame_structure) or (not node_structure): + print '## No framework and node structure provided for bond valence analysis...' + sys.exit() + elif not bv_range: + print '## No bond valence range provided...' + sys.exit() + else: + print '# Processing bond valence check.' + print '# Bond valence limitation: {} - {}'.format(bv_range[0], bv_range[1]) + + VoroBv = TABvFilter(node_structure.copy(), frame_structure.copy(), bv_range) + if VoroBv.final_structure: + node_structure = VoroBv.final_structure.copy() + output_structure = org_frame.copy() # output cif structure + output_doc = {} # output csv file + variables = ['Cartesian_Coords', 'Voronoi_R', 'Bond_Valence'] + for i in variables: + output_doc[i] = [] + for node in node_structure.copy(): + output_structure.append(str(sp), node.coords, coords_are_cartesian=True) + tmp_coords = [round(n, 4) for n in node.coords] + output_doc['Cartesian_Coords'].append(tmp_coords) + output_doc['Voronoi_R'].append(round(node.properties['voronoi_radius'], 3)) + output_doc['Bond_Valence'].append(round(node.properties['valence_state'], 2)) + + CifWriter(output_structure).write_file('{}_bond_valence_filtered.cif'.format(name)) + df = pds.DataFrame(data=output_doc).sort_values(by=['Voronoi_R']) + df = df.reindex(variables, axis=1) + df.to_csv('{}_bv_info.csv'.format(name)) + + print '# Bond valence check finishes.' + else: + print '## All available nodes are excluded for bad bond valences...' + print '## The structure is either unreasonable or the bond valence range is bad...' + sys.exit() + + elif f.lower() == 'vorolong': + print '# Check long nodes in structure.' + if not node_structure: + print '## No node structure provided for long Voronoi node analysis...' + sys.exit() + elif not long: + print '## No length provided to decide Voronoi node length...' + sys.exit() + else: + print '# Processing Voronoi length check.' + print '# Voronoi length limitation: {} A'.format(round(long, 3)) + VoroLong = TALongFilter(node_structure.copy(), long, use_voro_radii=True) + print '# Maximum node length detected: {} A'.format(round(VoroLong.longest_node_length, 3)) + output_doc = {} + variables = ['Center_Coords', 'Node_Length'] + for i in variables: + output_doc[i] = [] + for i in VoroLong.clusters: + tmp_coords = [round(n, 4) for n in i[0]] + output_doc['Center_Coords'].append(tmp_coords) + output_doc['Node_Length'].append(round(i[1], 4)) + df = pds.DataFrame(data=output_doc).sort_values(by=['Node_Length']) + df = df.reindex(variables, axis=1) + df.to_csv('{}_node_length_info.csv'.format(name)) + print '# Central node information written.' + if VoroLong.has_long_node: + print '# Long node check finishes.' + else: + print '## The structure has no long nodes or node length restriction is bad...' + print '## Please check the node length CSV for more information...' + sys.exit() + + elif f.lower() == 'voroinfo': + print '# Output the center coordinates and length of each node......' + if not node_structure: + print '## No node structure provided for Voronoi information...' + sys.exit() + else: + VoroLong = TALongFilter(node_structure.copy(), 0, use_voro_radii=True) + print '# Maximum node length detected: {} A'.format(round(VoroLong.longest_node_length, 3)) + output_doc = {} + variables = ['Center_Coords', 'Node_Length'] + for i in variables: + output_doc[i] = [] + for i in VoroLong.clusters: + tmp_coords = [round(n, 4) for n in i[0]] + output_doc['Center_Coords'].append(tmp_coords) + output_doc['Node_Length'].append(round(i[1], 4)) + df = pds.DataFrame(data=output_doc).sort_values(by=['Node_Length']) + df = df.reindex(variables, axis=1) + df.to_csv('{}_node_length_info.csv'.format(name)) + print '# Voronoi node information written.' + + elif f.lower() == 'mergesite': + # before we use TAOptimumSiteFilter, we need to have a list of different clusters, + # thus must use TADenseNeighbor and TALongFilter. Also note that all clusters in the list must be. + if not node_structure: + print '## No node structure provided for optimizing sites...' + sys.exit() + if (not nn) or (not long): + print '## No neighbor distance cut-off and long node cut-off provided for site optimization...' + sys.exit() + + voro_dense = TADenseNeighbor(node_structure.copy(), close_criteria=1, + big_node_radius=0, radius_range=[0, 0], use_radii_ratio=True) + voro_long = TALongFilter(node_structure.copy(), 0, use_voro_radii=True) + cluster_list = voro_dense.clustering(node_structure.copy(), 1, True, True) + + long_list = [] + short_list = [] + for i in cluster_list: + if voro_long.get_cluster_length(i, use_voro_radii=True) >= long: + long_list.append(i) + else: + short_list.append(i) + print '# Processing site optimization: nearest neighbor cut-off {} A.'.format(round(nn, 3)) + OpSite = TAOptimumSiteFilter(org_structure.copy(), nn, sp, sort_type='None', use_exp_ordered_site=False) + opt_long_list = [] + opt_short_list = [] + for i in long_list: + tmp_list = OpSite.optimize_cluster(i, nn, sort_type='radius') + for j in tmp_list: + opt_long_list.append(j) + for i in short_list: + tmp_list = OpSite.optimize_cluster(i, nn, sort_type='radius') + for j in tmp_list: + opt_short_list.append(j) + print '# Long node number: {}'.format(len(opt_long_list)) + print '# Short node number: {}'.format(len(opt_short_list)) + new_list = [] + for i in opt_long_list: + new_list.append(i) + for i in opt_short_list: + new_list.append(i) + OpSite.add_cluster(new_list) + + output_structure = OpSite.site_structure.copy() + half_list = [] # it seems 50% occupancy sites are easier to see. You may directly use output_structure otherwise + for i in output_structure: + ppt = deepcopy(i.properties) + new_i = PeriodicSite({str(sp): 0.5}, i.coords, i.lattice, to_unit_cell=False, coords_are_cartesian=True, + properties=ppt) + half_list.append(new_i) + half_structure = Structure.from_sites(half_list) + CifWriter(half_structure).write_file('{}_{}_optimized_sites.cif'.format(name, 'radius')) + # CifWriter(output_structure).write_file('{}_{}_optimized_sites.cif'.format(name, 'radius')) + + # for predicted structure: + tot_num = org_structure.composition[sp] + current_num = OpSite.site_structure.composition.num_atoms + ratio = tot_num / current_num + if ratio > 1: + print '## Prediction error, please be cautious about the predicted results.' + print '## Please also double check whether the input parameters are reasonable...' + ratio = 1 + prediction = org_frame.copy() + for site in OpSite.site_structure.copy(): + prediction.append({str(sp): ratio}, site.coords, coords_are_cartesian=True) + prediction.sort() + predicted_structure = prediction.copy() + print '# Site optimization finishes.' + + else: + print '## Unsupported operation...' + if predicted_structure: + comp = org_structure.composition.reduced_formula + CifWriter(predicted_structure).write_file('{}_{}_predicted.cif'.format(name, comp)) + cmds = cmd_by_radius(half_structure, 0.5) + cmd_file = open('{}_cmd'.format(name), 'w') + cmd_file.write('mol new\n') + for lines in cmds: + cmd_file.write(lines) + cmd_file.close() + +if __name__ == '__main__': + start_time = time.time() + parser = argparse.ArgumentParser(description='Voronoi analysis on structures') + parser.add_argument("cif_file", type=str, help='CIF file directory') + parser.add_argument("-i", "--input_file", type=str, help='Input yaml file to specify different parameters') + parser.add_argument("-f", "--filters", nargs="+", + default=["Ordered", "PropOxi", "VoroPerco", "Coulomb", "VoroBV", "VoroInfo", "MergeSite"], + type=str, help="Default is PropOxi, VoroPerco, Coulomb, VoroBV, VoroInfo, MergeSite" + "Ordered list of filters. Current only support 6 filters." + "Please read README for further information") + # default: "Vorolong" + parser.set_defaults(func=Analyze_Voronoi_Nodes) + args = parser.parse_args() + args.func(args) + print('Total used time: {}'.format(str(time.time() - start_time))) diff --git a/tool/valence_states.yaml b/tool/valence_states.yaml new file mode 100644 index 0000000..2450bd3 --- /dev/null +++ b/tool/valence_states.yaml @@ -0,0 +1,98 @@ +Ac: 3 +Ag: 1 +Al: 3 +Am: 3 +As: 5 +At: -1 +Au: 3 +B: 3 +Ba: 2 +Be: 2 +Bi: 3 +Bk: 3 +Br: -1 +C: -4 +Ca: 2 +Cd: 2 +Ce: 4 +Cf: 3 +Cl: -1 +Cm: 3 +Co: 3 +Cr: 6 +Cs: 1 +Cu: 2 +Dy: 3 +Er: 3 +Es: 3 +Eu: 3 +F: -1 +Fe: 3 +Fm: 3 +Fr: 1 +Ga: 3 +Gd: 3 +Ge: 4 +H: -1 +Hf: 4 +Hg: 2 +Ho: 3 +I: -1 +In: 3 +Ir: 4 +K: 1 +La: 3 +Li: 1 +Lr: 3 +Lu: 3 +Md: 3 +Mg: 2 +Mn: 7 +Mo: 6 +N: -3 +Na: 1 +Nb: 5 +Nd: 3 +Ni: 2 +'No': 3 +Np: 5 +O: -2 +Os: 4 +P: 5 +Pa: 5 +Pb: 4 +Pd: 4 +Pm: 3 +Po: 4 +Pr: 3 +Pt: 4 +Pu: 4 +Ra: 2 +Rb: 1 +Re: 4 +Rh: 3 +Ru: 4 +S: -2 +Sb: 5 +Sc: 3 +Se: -2 +Si: 4 +Sm: 3 +Sn: 4 +Sr: 2 +Ta: 5 +Tb: 3 +Tc: 7 +Te: 6 +Th: 4 +Ti: 4 +Tl: 3 +Tm: 3 +U: 6 +V: 5 +W: 6 +Y: 3 +Yb: 3 +Zn: 2 +Zr: 4 +Xe: 0