import os
import shutil
import subprocess
import re
import numpy as np
import time
import datetime
import warnings
import json
import pandas as pd

import nafuma.auxillary as aux


def make_initial_inp(data: dict, options={}):

    # required_options = ['filename', 'overwrite', 'include', 'save_results', 'save_dir', 'instrument', 'topas_options', 'background', 'capillary', 'th2_offset', 'fit_peak_width', 'simple_axial_model', 'TCHZ_Peak_Type', 'start', 'finish', 'exclude', 'magnetic_atoms', 'radiation', 'magnetic_space_group', 'interval']

    default_options = {
        'filename': 'start.inp',
        'overwrite': False,
        'save_results': False,
        'save_dir': 'results/',
        'instrument': None,
        'radiation': 'synchrotron',
        'magnetic_space_group': None,
        'magnetic_atoms': [],
        'include': [],  # Any files that should be included with #include
        'topas_options': {},
        'background': 7,
        'capillary': False,
        'th2_offset': False,
        'fit_peak_width': False,
        'simple_axial_model': False,
        'TCHZ_Peak_Type': False,
        'interval': [None, None],  # Start and finish values that TOPAS should refine on. Overrides 'start' and 'finish'
        'start': None,             # Start value only. Overridden by 'interval' if this is set.
        'finish': None,            # Finish value only. Overridden by 'interval' if this is set.
        'exclude': [],             # Excluded regions. List of lists.
        'manual_background': False # Upload a background file
    }

    options = aux.update_options(options=options, default_options=default_options)
    options['topas_options'] = update_topas_options(options=options)

    divider = '\n\'------------------------------------------------------------------------------------------------------------------------------------------\n'

    if not os.path.exists(options['filename']) or options['overwrite']:
        with open(options['filename'], 'w') as fout:

            write_headers(fout=fout, options=options)

            fout.write(divider)
            fout.write('\'START OF INP - XXXX')
            fout.write(divider)

            fout.write(divider)
            fout.write('r_wp 0 r_exp 0 r_p 0 r_wp_dash 0 r_p_dash 0 r_exp_dash 0 weighted_Durbin_Watson 0 gof 0')
            fout.write(divider)

            write_xdd(fout=fout, data=data, options=options)

            fout.write(divider)
            fout.write('\'PARAMETER DEFINITIONS')
            fout.write(divider)

            for i, _ in enumerate(data['str']):
                fout.write('\n\n')
                write_params(fout=fout, data=data, options=options, index=i)

            fout.write(divider)
            fout.write('\'STR DEFINITIONS')
            fout.write(divider)

            for i, _ in enumerate(data['str']):
                fout.write('\n\n')
                write_str(fout=fout, data=data, options=options, index=i)

            if 'peak' in data.keys():
                fout.write(divider)
                fout.write('\'PEAK PHASES')
                fout.write(divider)
                # NOTE: write_peaks() reuses the last index from the str loop above
                write_peaks(fout=fout, data=data, options=options, index=i)

    # Prepend the '#define statements to the freshly written .INP-file
    with open(options['filename'], 'r') as fin:
        lines = []
        for statement in data['define_statements']:
            lines.append(f'\'#define {statement}\n')
        lines.append('\n')
        lines += fin.readlines()

    with open(options['filename'], 'w') as fout:
        for line in lines:
            fout.write(line)
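
# A minimal usage sketch of make_initial_inp() (the data layout is inferred from the
# keys the function indexes above; the filenames and statements are illustrative, not
# a documented API):
#
#   data = {
#       'str': ['LiMn2O4.cif'],                # one entry per structure to refine
#       'define_statements': ['REFINE_CELL'],  # prepended as "'#define ..." lines
#   }
#   make_initial_inp(data, options={'filename': 'start.inp', 'overwrite': True})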

def make_general_background_in_q(data, options):
    ''' Writes a .txt-file of Q-values based on 2th-values that have been picked out
    manually from a data set. The idea is that the same background points can be used
    for several samples measured at several wavelengths. The input is a list of
    2th-values found by manually inspecting a data set.'''

    default_options = {
        'save_dir': 'background',
        'filename': 'test'
    }

    options = aux.update_options(options=options, default_options=default_options)

    # Convert each manually picked 2th-value (degrees) to momentum transfer Q
    x_background_points_q = []
    for x in data['2th_values']:
        q = np.abs((4 * np.pi / data['wavelength']) * np.sin(x / 2 * np.pi / 180))
        x_background_points_q.append(q)

    background_points = pd.DataFrame(x_background_points_q)

    # FIXME The original hardcoded an absolute user-specific path here; build the
    # destination from options['save_dir'] instead so the function is portable
    background_filename = os.path.join(options['save_dir'], options['filename'] + '_background_points_in_q.txt')
    background_points.to_csv(background_filename, index=None, header=None)
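
# The conversion above is the standard momentum-transfer relation Q = 4*pi*sin(theta)/lambda,
# with theta = 2th/2 converted to radians. A quick worked example (numbers are
# illustrative only):
#
#   >>> wavelength = 0.7093          # Mo K-alpha1, in Angstrom
#   >>> twotheta = 20.0              # degrees
#   >>> 4*np.pi/wavelength * np.sin(np.radians(twotheta/2))
#   3.076...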

def make_manual_background(data, options):
    # FIXME generalize this so it works properly
    # FIXME fix so that plotting is optional
    import nafuma.xrd as xrd
    import matplotlib.pyplot as plt

    default_options = {
        'plot_background': True,
        'interval_length': 0.05,  # Region fitted around each background point; a bigger interval length means a bigger region per point
        'save_dir': 'background'
    }

    if 'noheaders' in data['path'][0]:
        filename = os.path.basename(data['path'][0]).split('_noheaders.')[0]
    else:
        filename = os.path.basename(data['path'][0]).split('.')[0]

    options = aux.update_options(options=options, default_options=default_options)

    # data['background_in_q'] is a .txt-file with one column of x-values that are good
    # starting points for background determination, as they should not include any peaks
    # for that specific material. Import the pre-made background points in Q into a dataframe.
    df_backgroundpoints_q = pd.read_csv(data['background_in_q'], names=['background_q'])

    diffractogram, wavelength = xrd.io.read_xy(data=data)

    # Transfer the Q-range background points to 2th for this wavelength
    x_background_points = []
    for q in df_backgroundpoints_q['background_q']:
        twotheta = 2 * 180 / np.pi * np.arcsin(q * wavelength / (4 * np.pi))
        x_background_points.append(twotheta)

    intervallength = options['interval_length']

    background = pd.DataFrame()
    for i, x in enumerate(x_background_points):
        # Pick out the data points within +/- intervallength of each background point.
        # The upper bound of this mask was truncated in the source and is reconstructed
        # from context.
        test = diffractogram.loc[(diffractogram['2th'] > x - intervallength) & (diffractogram['2th'] < x + intervallength)]
        # (The remainder of this function is missing from the source.)


# NOTE: The def-line of the following function was lost in the source; the name and
# signature are reconstructed from the docstring and the body (which uses template,
# xdd, num and options['template']).
def make_entry(template: str, xdd: str, num: int, options: dict) -> str:
    ''' Takes a template and creates an entry with xdd as path to file and with number num.'''

    temp_xdd = find_xdd_in_inp(options['template'])
    num_str = f'{num}'.zfill(4)

    # Replace diffractogram path and the XXXX placeholder with the zero-padded number
    s = template.replace(temp_xdd, xdd).replace('XXXX', num_str)
    s = s.replace(os.path.basename(temp_xdd).split('.')[0], os.path.basename(xdd).split('.')[0])

    return s


def find_xdd_in_inp(path: str) -> str:
    ''' Finds the path to the .xy / .xye scan in a given .INP-file. Assumes only one
    occurrence of xdd and will return the last one no matter what, but emits a
    UserWarning if more than one xdd is found.'''

    with open(path, 'r') as f:
        lines = f.readlines()

    xdds = 0
    for line in lines:
        if 'xdd' in line:
            xdd = line.split()[-1].strip('"')
            xdds += 1

    if xdds > 1:
        warnings.warn(f'More than one path was found in {path}. Returning last occurrence - please make sure this is what you want!')

    return xdd
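
# A minimal sketch of find_xdd_in_inp() in use. The INP fragment is an assumption
# about a typical TOPAS file; the function returns the last whitespace-separated
# token of the (last) xdd line, with surrounding quotes stripped:
#
#   start.inp contains:   xdd "data/scan_0001.xye"
#
#   >>> find_xdd_in_inp('start.inp')
#   'data/scan_0001.xye'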

def refine(data: dict, options={}):
    ''' Calls TOPAS from options['topas_path'], which should point to tc.exe in the
    TOPAS folder. If not explicitly passed, will try to use what is in topas.conf.'''

    required_options = ['topas_path', 'topas_log', 'topas_logfile', 'log', 'logfile', 'overwrite']

    default_options = {
        'topas_path': None,
        'topas_log': False,
        'topas_logfile': f'{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}_topas.out',
        'log': False,
        'logfile': f'{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}_generate_big_inp.log',
        'overwrite': False
    }

    options = aux.update_options(options=options, required_options=required_options, default_options=default_options)

    if not options['topas_path']:
        import nafuma.xrd as xrd

        # Read the TOPAS base folder from topas.conf in the nafuma/xrd directory
        with open(os.path.join(os.path.dirname(xrd.__file__), 'topas.conf'), 'r') as f:
            topas_base = f.read().strip()

        options['topas_path'] = os.path.join(topas_base, 'tc.exe')

    # Check to see if the executable exists
    if not os.path.exists(options['topas_path']):
        raise Exception('TOPAS executable not found! Please explicitly pass path to tc.exe directly in options["topas_path"] or change base folder in topas.conf to the correct one.')

    # Create folders if they don't exist
    # FIXME Since the big INP files now have the same filename for all iterations, we need
    # to adjust the code to only get unique values from the get_paths function
    # FIXME get_headers() is also not working now. Needs to be adjusted to the new way of
    # writing the Out-parameters
    paths = get_paths(data['inp'])
    paths = aux.get_unique(paths)

    for path in paths:
        headers = get_headers(data['inp'], path)

        dirname = os.path.dirname(path)
        if dirname and not os.path.isdir(dirname):
            os.makedirs(dirname)

        if not os.path.exists(path) or options['overwrite']:
            with open(path, 'w') as results:
                for header in headers:
                    results.write(header + '\t')
                results.write('\n')
        else:
            raise Exception(f'Results file already exists: {path}')

    # Create shell command
    command = ' '.join([options['topas_path'], data['inp']])

    # Append output redirection if logging is enabled
    if options['topas_log']:
        command = ' '.join([command, f'>{options["topas_logfile"]}'])

        if os.path.dirname(options['topas_logfile']) and not os.path.isdir(os.path.dirname(options['topas_logfile'])):
            os.makedirs(os.path.dirname(options['topas_logfile']))

    subprocess.call(command, shell=True)


def read_results(path, options={}):

    default_options = {
        'errors': True
    }

    options = aux.update_options(options=options, default_options=default_options)

    results = pd.read_csv(path, delim_whitespace=True, index_col=0, header=None)

    if options['errors']:
        # 24 fixed columns (values interleaved with errors), then 10 columns per atom
        atoms = int((results.shape[1] - 24) / 10)
        headers = ['r_wp', 'r_exp', 'r_p', 'r_p_dash', 'r_exp_dash', 'gof',
                   'vol', 'vol_err', 'mass', 'mass_err', 'wp', 'wp_err',
                   'a', 'a_err', 'b', 'b_err', 'c', 'c_err',
                   'alpha', 'alpha_err', 'beta', 'beta_err', 'gamma', 'gamma_err']
    else:
        # 15 fixed columns, then 5 columns per atom
        atoms = int((results.shape[1] - 15) / 5)
        headers = ['r_wp', 'r_exp', 'r_p', 'r_p_dash', 'r_exp_dash', 'gof',
                   'vol', 'mass', 'wp',
                   'a', 'b', 'c', 'alpha', 'beta', 'gamma']

    labels = ['x', 'y', 'z', 'occ', 'beq']
    for i in range(atoms):
        for label in labels:
            headers.append(f'atom_{i+1}_{label}')
            if options['errors']:
                headers.append(f'atom_{i+1}_{label}_err')

    results.columns = headers

    return results


def fix_magnetic_cif(path):

    with open(path, 'r') as f:
        lines = f.readlines()

    for i, line in enumerate(lines):
        if '_magnetic_space_group_symop_operation_timereversal' in line:
            # Last line before the symmetry operations: prepend a running index to
            # each operation until a blank line is reached
            j = 1
            line = lines[i + j]
            while len(line) > 1:
                lines[i + j] = f'{j} ' + line
                j += 1
                line = lines[i + j]

    with open(path, 'w') as f:
        for line in lines:
            f.write(line)


def get_magnetic_moment(path):

    with open(path, 'r') as f:
        lines = f.readlines()

    magnetic_moments = {}

    for i, line in enumerate(lines):
        if '_magnetic_atom_site_moment_crystalaxis_mz' in line:
            # Last line before the magnetic moments
            j = 1
            line = lines[i + j]

            # Stop at the first blank line, mirroring fix_magnetic_cif above
            while len(line) > 1:
                magnetic_moments[line.split()[0]] = (float(line.split()[1]), float(line.split()[2]), float(line.split()[3]))
                j += 1
                if i + j < len(lines):
                    line = lines[i + j]
                else:
                    break

    return magnetic_moments


def get_magnetic_moment_magnitude(magmom):

    magnitudes = {}
    for site in magmom.keys():
        magnitudes[site] = np.sqrt(magmom[site][0]**2 + magmom[site][1]**2 + magmom[site][2]**2)

    return magnitudes


def strip_labels_from_inp(path, save_path=None):

    if not save_path:
        save_path = path

    with open(path, 'r') as fin:
        lines = fin.readlines()

    for i, line in enumerate(lines):
        lines[i] = line.replace('_XXXX', '')

    with open(save_path, 'w') as fout:
        for line in lines:
            fout.write(line)
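
# Usage sketch for the magnetic-CIF helpers above (the path and site names are
# hypothetical):
#
#   fix_magnetic_cif('LaMnO3_mag.cif')                  # number the symmetry operations in place
#   magmom = get_magnetic_moment('LaMnO3_mag.cif')      # {'Mn1': (mx, my, mz), ...}
#   magnitudes = get_magnetic_moment_magnitude(magmom)  # {'Mn1': |m|, ...}

if __name__ == '__main__':
    # Minimal self-test of the pure-math helper: a (3, 0, 4) moment has magnitude 5
    assert get_magnetic_moment_magnitude({'Mn1': (3.0, 0.0, 4.0)})['Mn1'] == 5.0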