From 576ce0301b949739a8942517ed3822a25bf3f7a7 Mon Sep 17 00:00:00 2001 From: rasmusthog Date: Sun, 9 Oct 2022 18:40:11 +0200 Subject: [PATCH] Add pre-edge feature fitting function --- nafuma/xanes/calib.py | 280 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 280 insertions(+) diff --git a/nafuma/xanes/calib.py b/nafuma/xanes/calib.py index 6162d99..0e0ff33 100644 --- a/nafuma/xanes/calib.py +++ b/nafuma/xanes/calib.py @@ -1,4 +1,5 @@ from logging import raiseExceptions +from signal import default_int_handler from jinja2 import TemplateRuntimeError import pandas as pd import numpy as np @@ -14,6 +15,7 @@ from datetime import datetime import ipywidgets as widgets from IPython.display import display +import warnings ##Better to make a new function that loops through the files, and performing the split_xanes_scan on @@ -445,6 +447,7 @@ def smoothing(data: dict, options={}): # FIXME Add other types of filters for i, filename in enumerate(data['path']): + if options['smooth_algorithm'] == 'savgol': if options['log']: aux.write_log(message=f'Smoothing {filename} with algorithm: {options["smooth_algorithm"]} ({i+1}/{len(data["path"])})', options=options) @@ -1012,5 +1015,282 @@ def flatten(data:dict, options={}): return flattened_df, fit_function_diff +def extract_partial_range(data: dict, options={}): + + default_options = { + 'extract_range': None, + } + + options = aux.update_options(options=options, required_options=default_options.keys(), default_options=default_options) + + if not options['extract_range']: + warnings.warn('You did not specify a range - do so with the keyword "extract_range" in the options dictionary. Returning data without modification') + return data + + + partial_data = data['xanes_data'].loc[(data['xanes_data']['ZapEnergy'] > options['extract_range'][0]) & (data['xanes_data']['ZapEnergy'] < options['extract_range'][1])] + + return partial_data + + + + +def fit_pre_edge_feautre(data: dict, options={}) -> pd.DataFrame: + + from scipy.interpolate import UnivariateSpline + from scipy.optimize import curve_fit + from scipy.stats import norm + + default_options = { + 'remove_background': True, + 'background_model': 'exponential', + 'fit_model': 'gaussian', + 'extract_range': None, + 'extract_range_increments': [0, 0], + 'background_limits': [None, None], + 'background_limits_increments': [0, 0], + 'background_polyorder': 2, + 'log': False, + 'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_peak_fit.log', + 'show_plots': False, + 'save_plots': False, + 'save_folder': './', + 'ylim': None, + 'xlim': None, + 'interactive': False, + 'interactive_session_active': False + } + + options = aux.update_options(options=options, required_options=default_options.keys(), default_options=default_options) + + + if not options['extract_range']: + warnings.warn('You did not specify a range - do so with the keyword "extract_range" in the options dictionary. No modification is done.') + return None, None, None + + if options['log']: + aux.write_log(message='Starting fit of pre edge feature', options=options) + + + centroids = [] + errors = [] + for i, filename in enumerate(data['path']): + + options['extract_range'][0] += options['extract_range_increments'][0] + options['extract_range'][1] += options['extract_range_increments'][1] + + partial_data = extract_partial_range(data, options) + + removed_background_df = pd.DataFrame(partial_data["ZapEnergy"]) + background_df = pd.DataFrame(partial_data["ZapEnergy"]) + + if options['remove_background']: + if not options['background_limits'][0]: + options['background_limits'][0] = partial_data[1].max() - 0.003 + if not options['background_limits'][1]: + options['background_limits'][1] = partial_data[1].max() + 0.003 + + if i > 0: + options['background_limits'][0][0] += options['background_limits_increments'][0] + options['background_limits'][0][1] += options['background_limits_increments'][0] + options['background_limits'][1][0] += options['background_limits_increments'][1] + options['background_limits'][1][1] += options['background_limits_increments'][1] + + peak_background = partial_data.copy() + + #peak_background.loc[(peak_background['ZapEnergy'] > options['background_limits'][0]) & (peak_background['ZapEnergy'] < options['background_limits'][1])] = np.nan + peak_background.loc[(peak_background['ZapEnergy'] < options['background_limits'][0][0]) | + ((peak_background['ZapEnergy'] > options['background_limits'][0][1]) & + (peak_background['ZapEnergy'] < options['background_limits'][1][0])) | + (peak_background['ZapEnergy'] > options['background_limits'][1][1])] = np.nan + peak_background = peak_background.dropna() + + + # FIXME Originally tried with spline and polynomials, but they worked very poorly. This is as best as it gets at this moment, but alternatives should be considered. + if options['background_model'] == 'exponential': + def linear(x, a, b): + return a*x + b + + # Fit linear curve to the logarithm of the background + popt, pcov = curve_fit(linear, peak_background['ZapEnergy'], np.log(peak_background[filename])) + #fit_function = np.poly1d(popt) + + # Restore exponential nature of background + background = np.exp(linear(background_df['ZapEnergy'], *popt)) + background_df.insert(1,filename,background) + + removed_background_df.insert(1, filename, partial_data[filename]-background_df[filename]) + removed_background_df = removed_background_df.loc[(removed_background_df['ZapEnergy'] > options['background_limits'][0][1]) & + (removed_background_df['ZapEnergy'] < options['background_limits'][1][0]) + ] + + + + + # Fit Gaussian + # FIXME Should have options for Lorentzian and Pseudo-Voigt here as well. + if options['fit_model'] == 'gaussian': + # FIXME Understand what this deprecation warning means and what changes should be made to make it future proof + warnings.filterwarnings(action='ignore', category=np.VisibleDeprecationWarning) + + + sigma_init = removed_background_df['ZapEnergy'].loc[removed_background_df[filename] == removed_background_df[filename].max()] + popt, pcov = curve_fit( gauss, + removed_background_df['ZapEnergy'], + removed_background_df[filename], + p0=[0.0005, sigma_init, 0.001], + bounds=[ + (0, sigma_init-0.002, -np.inf), + (0.5, sigma_init+0.002, np.inf) + ] + ) + + centroids.append(popt) + errors.append(pcov) + + + if options['show_plots'] or options['save_plots']: + + fig, axes = plt.subplots(figsize=(10,5), ncols=2) + + + # Background removal + partial_data.plot(x='ZapEnergy', y=filename, ax=axes[0], color='black', label='Original data', kind='scatter') + background_df.plot(x='ZapEnergy', y=filename, ax=axes[0], color='black', ls='--', label='Fitted background') + removed_background_df.plot(x='ZapEnergy', y=filename, ax=axes[0], color='red', label='Background subtracted', kind='scatter') + axes[0].axvline(x=options['background_limits'][0][0], color='black', ls='--') + axes[0].axvline(x=options['background_limits'][0][1], color='black', ls='--') + axes[0].axvline(x=options['background_limits'][1][0], color='black', ls='--') + axes[0].axvline(x=options['background_limits'][1][1], color='black', ls='--') + axes[0].set_title(f'{os.path.basename(filename)} - Background removal', size=20) + axes[0].set_ylabel('Normalised x$\mu$(E)', size=20) + axes[0].set_xlabel('Energy (keV)', size=20) + peak_background.plot(x='ZapEnergy', y=filename, ax=axes[0], color='green', kind='scatter') + if options['xlim']: + axes[0].set_xlim(options['xlim']) + if options['ylim']: + axes[0].set_ylim(options['ylim']) + + # Fitted curve + + y_fit = gauss(removed_background_df['ZapEnergy'], *popt) + + removed_background_df.plot(x='ZapEnergy', y=filename, ax=axes[1], color='black', label='Background subtracted', kind='scatter') + axes[1].plot(removed_background_df['ZapEnergy'], y_fit, color='red', label=f'Fit data ({options["fit_model"]})') + axes[1].set_title(f'{os.path.basename(filename)} - Pre-edge feature fit', size=20) + axes[1].set_ylabel('Normalised x$\mu$(E)', size=20) + axes[1].set_xlabel('Energy (keV)', size=20) + + if options['xlim']: + axes[1].set_xlim(options['xlim']) + + + # Save plots if toggled + if options['save_plots']: + if not os.path.isdir(options['save_folder']): + os.makedirs(options['save_folder']) + + dst = os.path.join(options['save_folder'], os.path.basename(filename)) + '_pre_edge_feature_fit.png' + + plt.savefig(dst, transparent=False) + + + # Close plots if show_plots not toggled + if not options['show_plots']: + plt.close() + + return centroids, errors, removed_background_df + + + + +def gauss(x, A, mu, sigma): + return (A/(sigma*np.sqrt(np.pi)))*np.exp(-(x-mu)**2/(2*sigma**2)) + + + +def save_data(data, options={}): + + default_options = { + 'save_data': 'xanes_data', + 'save_folder': '.' + } + + options = aux.update_options(options=options, required_options=default_options.keys(), default_options=default_options) + + + filenames = [filename for filename in data[options["save_data"]].columns if not 'ZapEnergy' in filename] + + for filename in filenames: + + options['save_filename'] = os.path.basename(filename).split('.')[0] + '_exported.dat' + + save_path = os.path.join(options['save_folder'], options['save_filename']) + + if not os.path.isdir(options['save_folder']): + os.makedirs(options['save_folder']) + + to_export = data[options['save_data']][['ZapEnergy', filename]] + to_export.columns = ['E', 'I'] + to_export.to_csv(save_path) + + + + + + +def save_centroids(data: dict, options={}): + + default_options = { + 'save_path': 'centroids.dat', + 'overwrite': False, + 'append': False, + } + + options = aux.update_options(options=options, default_options=default_options) + + if options['overwrite']: + mode = 'w' + elif options['append']: + mode = 'a' + else: + mode = False + + if os.path.exists(options['save_path']) and not options['overwrite']: + with open(options['save_path'], 'r') as f: + reference = float(f.readline().split()[1]) + + else: + reference = data['centroid_fit'][0][1]*1000 + + + if not os.path.exists(os.path.dirname(options['save_path'])): + os.makedirs(os.path.dirname(options['save_path'])) + + + + if mode: + with open(options['save_path'], mode) as f: + for path, fit, error in zip(data['path'], data['centroid_fit'], data['centroid_fit_errors']): + + A = fit[0] + mu = fit[1] + sigma = fit[2] + mu_adj = (fit[1]*1000)-reference + + stddevs = np.sqrt(np.diag(error)) + + #f.write(f'{path} \t {fit[1]*1000} \t {(fit[1]-reference)*1000} \t {fit[0]} \t {fit[2]} \n') + f.write('{: <40} \t {: <25} \t {: <25} \t {: <25} \t {: <25} \t {: <25} \t {: <25} \t {: <25}\n'.format(path, mu*1000, mu_adj, A, sigma, stddevs[1]*1000, stddevs[0], stddevs[2])) + + + +def read_centroids(path): + + df = pd.read_csv(path, delim_whitespace=True, header=None) + + df.columns = ['scan', 'mu', 'mu_adj', 'A', 'sigma', 'mu_err', 'A_err', 'sigma_err'] + + return df