Add pre-edge feature fitting function

This commit is contained in:
rasmusthog 2022-10-09 18:40:11 +02:00
parent 9e46504ac9
commit 576ce0301b

View file

@ -1,4 +1,5 @@
from logging import raiseExceptions
from signal import default_int_handler
from jinja2 import TemplateRuntimeError
import pandas as pd
import numpy as np
@ -14,6 +15,7 @@ from datetime import datetime
import ipywidgets as widgets
from IPython.display import display
import warnings
##Better to make a new function that loops through the files, and performing the split_xanes_scan on
@ -445,6 +447,7 @@ def smoothing(data: dict, options={}):
# FIXME Add other types of filters
for i, filename in enumerate(data['path']):
if options['smooth_algorithm'] == 'savgol':
if options['log']:
aux.write_log(message=f'Smoothing {filename} with algorithm: {options["smooth_algorithm"]} ({i+1}/{len(data["path"])})', options=options)
@ -1012,5 +1015,282 @@ def flatten(data:dict, options={}):
return flattened_df, fit_function_diff
def extract_partial_range(data: dict, options={}):
default_options = {
'extract_range': None,
}
options = aux.update_options(options=options, required_options=default_options.keys(), default_options=default_options)
if not options['extract_range']:
warnings.warn('You did not specify a range - do so with the keyword "extract_range" in the options dictionary. Returning data without modification')
return data
partial_data = data['xanes_data'].loc[(data['xanes_data']['ZapEnergy'] > options['extract_range'][0]) & (data['xanes_data']['ZapEnergy'] < options['extract_range'][1])]
return partial_data
def fit_pre_edge_feautre(data: dict, options={}) -> pd.DataFrame:
from scipy.interpolate import UnivariateSpline
from scipy.optimize import curve_fit
from scipy.stats import norm
default_options = {
'remove_background': True,
'background_model': 'exponential',
'fit_model': 'gaussian',
'extract_range': None,
'extract_range_increments': [0, 0],
'background_limits': [None, None],
'background_limits_increments': [0, 0],
'background_polyorder': 2,
'log': False,
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_peak_fit.log',
'show_plots': False,
'save_plots': False,
'save_folder': './',
'ylim': None,
'xlim': None,
'interactive': False,
'interactive_session_active': False
}
options = aux.update_options(options=options, required_options=default_options.keys(), default_options=default_options)
if not options['extract_range']:
warnings.warn('You did not specify a range - do so with the keyword "extract_range" in the options dictionary. No modification is done.')
return None, None, None
if options['log']:
aux.write_log(message='Starting fit of pre edge feature', options=options)
centroids = []
errors = []
for i, filename in enumerate(data['path']):
options['extract_range'][0] += options['extract_range_increments'][0]
options['extract_range'][1] += options['extract_range_increments'][1]
partial_data = extract_partial_range(data, options)
removed_background_df = pd.DataFrame(partial_data["ZapEnergy"])
background_df = pd.DataFrame(partial_data["ZapEnergy"])
if options['remove_background']:
if not options['background_limits'][0]:
options['background_limits'][0] = partial_data[1].max() - 0.003
if not options['background_limits'][1]:
options['background_limits'][1] = partial_data[1].max() + 0.003
if i > 0:
options['background_limits'][0][0] += options['background_limits_increments'][0]
options['background_limits'][0][1] += options['background_limits_increments'][0]
options['background_limits'][1][0] += options['background_limits_increments'][1]
options['background_limits'][1][1] += options['background_limits_increments'][1]
peak_background = partial_data.copy()
#peak_background.loc[(peak_background['ZapEnergy'] > options['background_limits'][0]) & (peak_background['ZapEnergy'] < options['background_limits'][1])] = np.nan
peak_background.loc[(peak_background['ZapEnergy'] < options['background_limits'][0][0]) |
((peak_background['ZapEnergy'] > options['background_limits'][0][1]) &
(peak_background['ZapEnergy'] < options['background_limits'][1][0])) |
(peak_background['ZapEnergy'] > options['background_limits'][1][1])] = np.nan
peak_background = peak_background.dropna()
# FIXME Originally tried with spline and polynomials, but they worked very poorly. This is as best as it gets at this moment, but alternatives should be considered.
if options['background_model'] == 'exponential':
def linear(x, a, b):
return a*x + b
# Fit linear curve to the logarithm of the background
popt, pcov = curve_fit(linear, peak_background['ZapEnergy'], np.log(peak_background[filename]))
#fit_function = np.poly1d(popt)
# Restore exponential nature of background
background = np.exp(linear(background_df['ZapEnergy'], *popt))
background_df.insert(1,filename,background)
removed_background_df.insert(1, filename, partial_data[filename]-background_df[filename])
removed_background_df = removed_background_df.loc[(removed_background_df['ZapEnergy'] > options['background_limits'][0][1]) &
(removed_background_df['ZapEnergy'] < options['background_limits'][1][0])
]
# Fit Gaussian
# FIXME Should have options for Lorentzian and Pseudo-Voigt here as well.
if options['fit_model'] == 'gaussian':
# FIXME Understand what this deprecation warning means and what changes should be made to make it future proof
warnings.filterwarnings(action='ignore', category=np.VisibleDeprecationWarning)
sigma_init = removed_background_df['ZapEnergy'].loc[removed_background_df[filename] == removed_background_df[filename].max()]
popt, pcov = curve_fit( gauss,
removed_background_df['ZapEnergy'],
removed_background_df[filename],
p0=[0.0005, sigma_init, 0.001],
bounds=[
(0, sigma_init-0.002, -np.inf),
(0.5, sigma_init+0.002, np.inf)
]
)
centroids.append(popt)
errors.append(pcov)
if options['show_plots'] or options['save_plots']:
fig, axes = plt.subplots(figsize=(10,5), ncols=2)
# Background removal
partial_data.plot(x='ZapEnergy', y=filename, ax=axes[0], color='black', label='Original data', kind='scatter')
background_df.plot(x='ZapEnergy', y=filename, ax=axes[0], color='black', ls='--', label='Fitted background')
removed_background_df.plot(x='ZapEnergy', y=filename, ax=axes[0], color='red', label='Background subtracted', kind='scatter')
axes[0].axvline(x=options['background_limits'][0][0], color='black', ls='--')
axes[0].axvline(x=options['background_limits'][0][1], color='black', ls='--')
axes[0].axvline(x=options['background_limits'][1][0], color='black', ls='--')
axes[0].axvline(x=options['background_limits'][1][1], color='black', ls='--')
axes[0].set_title(f'{os.path.basename(filename)} - Background removal', size=20)
axes[0].set_ylabel('Normalised x$\mu$(E)', size=20)
axes[0].set_xlabel('Energy (keV)', size=20)
peak_background.plot(x='ZapEnergy', y=filename, ax=axes[0], color='green', kind='scatter')
if options['xlim']:
axes[0].set_xlim(options['xlim'])
if options['ylim']:
axes[0].set_ylim(options['ylim'])
# Fitted curve
y_fit = gauss(removed_background_df['ZapEnergy'], *popt)
removed_background_df.plot(x='ZapEnergy', y=filename, ax=axes[1], color='black', label='Background subtracted', kind='scatter')
axes[1].plot(removed_background_df['ZapEnergy'], y_fit, color='red', label=f'Fit data ({options["fit_model"]})')
axes[1].set_title(f'{os.path.basename(filename)} - Pre-edge feature fit', size=20)
axes[1].set_ylabel('Normalised x$\mu$(E)', size=20)
axes[1].set_xlabel('Energy (keV)', size=20)
if options['xlim']:
axes[1].set_xlim(options['xlim'])
# Save plots if toggled
if options['save_plots']:
if not os.path.isdir(options['save_folder']):
os.makedirs(options['save_folder'])
dst = os.path.join(options['save_folder'], os.path.basename(filename)) + '_pre_edge_feature_fit.png'
plt.savefig(dst, transparent=False)
# Close plots if show_plots not toggled
if not options['show_plots']:
plt.close()
return centroids, errors, removed_background_df
def gauss(x, A, mu, sigma):
return (A/(sigma*np.sqrt(np.pi)))*np.exp(-(x-mu)**2/(2*sigma**2))
def save_data(data, options={}):
default_options = {
'save_data': 'xanes_data',
'save_folder': '.'
}
options = aux.update_options(options=options, required_options=default_options.keys(), default_options=default_options)
filenames = [filename for filename in data[options["save_data"]].columns if not 'ZapEnergy' in filename]
for filename in filenames:
options['save_filename'] = os.path.basename(filename).split('.')[0] + '_exported.dat'
save_path = os.path.join(options['save_folder'], options['save_filename'])
if not os.path.isdir(options['save_folder']):
os.makedirs(options['save_folder'])
to_export = data[options['save_data']][['ZapEnergy', filename]]
to_export.columns = ['E', 'I']
to_export.to_csv(save_path)
def save_centroids(data: dict, options={}):
default_options = {
'save_path': 'centroids.dat',
'overwrite': False,
'append': False,
}
options = aux.update_options(options=options, default_options=default_options)
if options['overwrite']:
mode = 'w'
elif options['append']:
mode = 'a'
else:
mode = False
if os.path.exists(options['save_path']) and not options['overwrite']:
with open(options['save_path'], 'r') as f:
reference = float(f.readline().split()[1])
else:
reference = data['centroid_fit'][0][1]*1000
if not os.path.exists(os.path.dirname(options['save_path'])):
os.makedirs(os.path.dirname(options['save_path']))
if mode:
with open(options['save_path'], mode) as f:
for path, fit, error in zip(data['path'], data['centroid_fit'], data['centroid_fit_errors']):
A = fit[0]
mu = fit[1]
sigma = fit[2]
mu_adj = (fit[1]*1000)-reference
stddevs = np.sqrt(np.diag(error))
#f.write(f'{path} \t {fit[1]*1000} \t {(fit[1]-reference)*1000} \t {fit[0]} \t {fit[2]} \n')
f.write('{: <40} \t {: <25} \t {: <25} \t {: <25} \t {: <25} \t {: <25} \t {: <25} \t {: <25}\n'.format(path, mu*1000, mu_adj, A, sigma, stddevs[1]*1000, stddevs[0], stddevs[2]))
def read_centroids(path):
df = pd.read_csv(path, delim_whitespace=True, header=None)
df.columns = ['scan', 'mu', 'mu_adj', 'A', 'sigma', 'mu_err', 'A_err', 'sigma_err']
return df