Merge pull request #7 from rasmusthog/rasmus_xanes

Rasmus xanes
nanowhale 2022-06-24 17:00:11 +02:00 committed by GitHub
commit b2b19086a5
2 changed files with 523 additions and 319 deletions

View file

@@ -1,3 +1,5 @@
from logging import raiseExceptions
from jinja2 import TemplateRuntimeError
import pandas as pd
import numpy as np
import os
@@ -41,9 +43,9 @@ def pre_edge_fit(data: dict, options={}) -> pd.DataFrame:
# FIXME Add log-file
required_options = ['pre_edge_start', 'log', 'logfile', 'save_plots', 'save_folder']
default_options = {
'pre_edge_start': None,
'log': False,
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_pre_edge_fit.log',
'save_plots': False,
@@ -60,18 +62,13 @@ def pre_edge_fit(data: dict, options={}) -> pd.DataFrame:
# FIXME Implement with finding accurate edge position
# FIXME Allow specification of start of pre-edge area
# Find the cutoff point at which the edge starts - everything to the LEFT of this point will be used in the pre edge function fit
if not options['pre_edge_start']:
pre_edge_limit_offset = 0.03
data['edge'] = find_element(data)
edge_position = estimate_edge_position(data, options, index=0)
pre_edge_limit = edge_position - pre_edge_limit_offset
# FIXME There should be an option to specify the interval in which to fit the background - now it is taking everything to the left of edge_start parameter, but if there are some artifacts in this area, it should be possible to
# limit the interval
@@ -81,6 +78,8 @@ def pre_edge_fit(data: dict, options={}) -> pd.DataFrame:
# Making a new dataframe, with only the ZapEnergies as the first column -> will be filled to include the background data
pre_edge_fit_data = pd.DataFrame(data['xanes_data_original']["ZapEnergy"])
data['pre_edge_params'] = {}
for i, filename in enumerate(data['path']):
if options['log']:
aux.write_log(message=f'Fitting background on {os.path.basename(filename)} ({i+1} / {len(data["path"])})', options=options)
@@ -89,6 +88,8 @@ def pre_edge_fit(data: dict, options={}) -> pd.DataFrame:
params = np.polyfit(pre_edge_data["ZapEnergy"],pre_edge_data[filename],1)
fit_function = np.poly1d(params)
data['pre_edge_params'][filename] = params
#making a list, y_pre,so the background will be applied to all ZapEnergy-values
background=fit_function(pre_edge_fit_data["ZapEnergy"])
@@ -131,7 +132,7 @@ def pre_edge_subtraction(data: dict, options={}):
required_options = ['log', 'logfile', 'save_plots', 'save_folder']
default_options = {
'log': False,
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_pre_edge_subtraction.log',
'save_plots': False,
'save_folder': './'
}
@@ -165,11 +166,167 @@ def pre_edge_subtraction(data: dict, options={}):
return xanes_data_bkgd_subtracted
def post_edge_fit(data: dict, options={}):
#FIXME should be called "fitting post edge" (normalization is not done here, need edge shift position)
required_options = ['log', 'logfile', 'post_edge_interval', 'save_plots', 'save_folder']
default_options = {
'log': False,
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_post_edge_fit.log',
'post_edge_interval': [None, None],
'save_plots': False, # used further down when the fit plots are written
'save_folder': './'
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
if not options['post_edge_interval'][0]:
post_edge_limit_offset = 0.03
data['edge'] = find_element(data)
edge_position = estimate_edge_position(data, options, index=0)
options['post_edge_interval'][0] = edge_position + post_edge_limit_offset
if not options['post_edge_interval'][1]:
options['post_edge_interval'][1] = data['xanes_data_original']['ZapEnergy'].max()
post_edge_data = data['xanes_data_original'].loc[(data['xanes_data_original']["ZapEnergy"] > options['post_edge_interval'][0]) & (data['xanes_data_original']["ZapEnergy"] < options['post_edge_interval'][1])]
post_edge_data.dropna(inplace=True) #Removing all indexes without any value, as some of the data sets misses the few last data points and fucks up the fit
# Making a new dataframe, with only the ZapEnergies as the first column -> will be filled to include the background data
post_edge_fit_data = pd.DataFrame(data['xanes_data_original']["ZapEnergy"])
data['post_edge_params'] = {}
for i, filename in enumerate(data['path']):
if options['log']:
aux.write_log(message=f'Fitting post edge on {os.path.basename(filename)} ({i+1} / {len(data["path"])})', options=options)
#Fitting a second-order polynomial to the post edge region
params = np.polyfit(post_edge_data["ZapEnergy"], post_edge_data[filename], 2)
fit_function = np.poly1d(params)
data['post_edge_params'][filename] = params
#making a list, y_pre,so the background will be applied to all ZapEnergy-values
background=fit_function(post_edge_fit_data["ZapEnergy"])
#adding a new column in df_background with the y-values of the background
post_edge_fit_data.insert(1,filename,background)
if options['save_plots']:
if not os.path.isdir(options['save_folder']):
os.makedirs(options['save_folder'])
dst = os.path.join(options['save_folder'], os.path.basename(filename)) + '_post_edge_fit.png'
fig, (ax1, ax2) = plt.subplots(1,2,figsize=(10,5))
data['xanes_data_original'].plot(x='ZapEnergy', y=filename, color='black', ax=ax1)
post_edge_fit_data.plot(x='ZapEnergy', y=filename, color='red', ax=ax1)
ax1.axvline(x = max(post_edge_data['ZapEnergy']), ls='--')
ax1.set_title(f'{os.path.basename(filename)} - Full view', size=20)
data['xanes_data_original'].plot(x='ZapEnergy', y=filename, color='black', ax=ax2)
post_edge_fit_data.plot(x='ZapEnergy', y=filename, color='red', ax=ax2)
ax2.axvline(x = max(post_edge_data['ZapEnergy']), ls='--')
ax2.set_xlim([min(post_edge_data['ZapEnergy']), max(post_edge_data['ZapEnergy'])])
ax2.set_ylim([min(post_edge_data[filename]), max(post_edge_data[filename])])
ax2.set_title(f'{os.path.basename(filename)} - Fit region', size=20)
plt.savefig(dst, transparent=False)
plt.close()
return post_edge_fit_data
def smoothing(data: dict, options={}):
# FIXME Add logging
# FIXME Add saving of files
required_options = ['log', 'logfile', 'window_length', 'polyorder', 'save_default', 'save_plots', 'save_folder']
default_options = {
'log': False,
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_smoothing.log',
'save_plots': False,
'save_folder': './',
'window_length': 3,
'polyorder': 2,
'save_default': False
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
df_smooth = pd.DataFrame(data['xanes_data']['ZapEnergy'])
if options['save_default']:
df_smooth_default = pd.DataFrame(data['xanes_data']['ZapEnergy'])
# FIXME Add other types of filters
# FIXME Instead of assigning values directly to the data dictionary, these should be collected into their own DataFrame that the caller can decide what to do with - these variables should
# then be returned
for filename in data['path']:
df_smooth.insert(1, filename, savgol_filter(data['xanes_data'][filename], options['window_length'], options['polyorder']))
if options['save_default']:
df_smooth_default.insert(1, filename, savgol_filter(data['xanes_data'][filename], default_options['window_length'], default_options['polyorder']))
if options['save_plots']:
if not os.path.isdir(options['save_folder']):
os.makedirs(options['save_folder'])
dst = os.path.join(options['save_folder'], os.path.basename(filename)) + '_smooth.png'
edge_pos = estimate_edge_position(data=data, options=options)
intensity_midpoint = df_smooth[filename].iloc[np.where(df_smooth['ZapEnergy'] == find_nearest(df_smooth['ZapEnergy'], edge_pos))].values[0]
if options['save_default']:
fig, (ax1, ax2) = plt.subplots(1,2,figsize=(20,5))
data['xanes_data'].loc[(data['xanes_data']['ZapEnergy'] > edge_pos-0.0015) & (data['xanes_data']['ZapEnergy'] < edge_pos+0.0015)].plot(x='ZapEnergy', y=filename, color='black', ax=ax1, kind='scatter')
df_smooth.loc[(df_smooth['ZapEnergy'] > edge_pos-0.0015) & (df_smooth['ZapEnergy'] < edge_pos+0.0015)].plot(x='ZapEnergy', y=filename, color='red', ax=ax1)
ax1.set_title(f'{os.path.basename(filename)} - Smooth', size=20)
data['xanes_data'].loc[(data['xanes_data']['ZapEnergy'] > edge_pos-0.0015) & (data['xanes_data']['ZapEnergy'] < edge_pos+0.0015)].plot(x='ZapEnergy', y=filename, color='black', ax=ax2, kind='scatter')
df_smooth_default.loc[(df_smooth_default['ZapEnergy'] > edge_pos-0.0015) & (df_smooth_default['ZapEnergy'] < edge_pos+0.0015)].plot(x='ZapEnergy', y=filename, color='red', ax=ax2)
ax2.set_title(f'{os.path.basename(filename)} - Smooth (default values)', size=20)
elif not options['save_default']:
fig, ax = plt.subplots(figsize=(10,5))
data['xanes_data'].loc[(data['xanes_data']['ZapEnergy'] > edge_pos-0.0015) & (data['xanes_data']['ZapEnergy'] < edge_pos+0.0015)].plot(x='ZapEnergy', y=filename, color='black', ax=ax, kind='scatter')
df_smooth.loc[(df_smooth['ZapEnergy'] > edge_pos-0.0015) & (df_smooth['ZapEnergy'] < edge_pos+0.0015)].plot(x='ZapEnergy', y=filename, color='red', ax=ax)
ax.set_xlim([edge_pos-0.0015, edge_pos+0.0015])
ax.set_ylim([intensity_midpoint*0.9, intensity_midpoint*1.1])
ax.set_title(f'{os.path.basename(filename)} - Smooth', size=20)
plt.savefig(dst, transparent=False)
plt.close()
if not options['save_default']:
df_smooth_default = None
return df_smooth, df_smooth_default
def find_nearest(array, value):
#function to find the value closest to "value" in an "array"
array = np.asarray(array)
idx = (np.abs(array - value)).argmin()
return array[idx]
def estimate_edge_position(data: dict, options={}, index=0):
#a dataset is differentiated to find a first estimate of the edge shift to use as starting point.
required_options = ['log','logfile', 'periods']
default_options = {
'log': False,
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_edge_position_estimation.log',
'periods': 2, #Periods needs to be an even number for the shifting of values to work properly
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
@@ -190,256 +347,224 @@ def estimate_edge_position(data: dict, options={}, index=0):
return estimated_edge_shift
def determine_edge_position(data: dict, options={}):
required_options = ['save_values', 'log', 'logfile', 'save_plots', 'save_folder', 'periods', 'diff', 'double_diff', 'fit_region']
default_options = {
'save_values': True,
'log': False,
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_determine_edge_position.log',
'save_plots': False,
'save_folder': './',
'periods': 2, #Periods needs to be an even number for the shifting of values to work properly,
'diff': True,
'double_diff': False,
'fit_region': 0.0005
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
if options['periods'] % 2 == 1:
raise Exception("NB! Periods needs to be an even number for the shifting of values to work properly")
#####
if options['diff']:
df_diff = pd.DataFrame(data['xanes_data']['ZapEnergy'])
if options['double_diff']:
df_double_diff = pd.DataFrame(data['xanes_data']['ZapEnergy'])
if options['save_values']:
data['e0'] = {}
for i, filename in enumerate(data['path']):
estimated_edge_pos = estimate_edge_position(data, options=options, index=i)
#========================== fitting first differential ==========
if options['diff']:
df_diff[filename] = data['xanes_data'][filename].diff(periods=options['periods'])
df_diff[filename]=df_diff[filename].shift(-int(options['periods']/2))
df_diff_edge = df_diff.loc[(df_diff["ZapEnergy"] < estimated_edge_pos+options['fit_region']) & ((df_diff["ZapEnergy"] > estimated_edge_pos-options['fit_region']))]
# Fitting a function to the chosen interval
params = np.polyfit(df_diff_edge["ZapEnergy"], df_diff_edge[filename], 2)
diff_function = np.poly1d(params)
x_diff=np.linspace(df_diff_edge["ZapEnergy"].iloc[0],df_diff_edge["ZapEnergy"].iloc[-1],num=10000)
y_diff=diff_function(x_diff)
df_diff_fit_function = pd.DataFrame(x_diff)
df_diff_fit_function['y_diff'] = y_diff
df_diff_fit_function.columns = ['x_diff', 'y_diff']
# Picks out the x-value where the y-value is at a maximum
edge_pos_diff=x_diff[np.where(y_diff == np.amax(y_diff))][0]
if options['log']:
aux.write_log(message=f"Edge position estimated by the differential maximum is: {str(round(edge_pos_diff,5))}", options=options)
print("Edge shift estimated by the double differential zero-point is "+str(round(edge_shift_doublediff,5))) if options['save_values']:
if options['print'] == True: data['e0'][filename] = edge_pos_diff
ax4.axvline(x=edge_shift_doublediff,color="green")
return df_smooth, filenames, edge_shift_diff
def normalization(data,options={}): if options['double_diff']:
required_options = ['print'] df_double_diff[filename] = data['xanes_data'][filename].diff(periods=options['periods']).diff(periods=options['periods'])
df_double_diff[filename]=df_double_diff[filename].shift(-int(options['periods']))
# Pick out region of interest
df_double_diff_edge = df_double_diff.loc[(df_double_diff["ZapEnergy"] < estimated_edge_pos+options['fit_region']) & ((df_double_diff["ZapEnergy"] > estimated_edge_pos-options['fit_region']))]
# Fitting a function to the chosen interval
params = np.polyfit(df_double_diff_edge["ZapEnergy"], df_double_diff_edge[filename], 2)
double_diff_function = np.poly1d(params)
x_double_diff=np.linspace(df_double_diff_edge["ZapEnergy"].iloc[0], df_double_diff_edge["ZapEnergy"].iloc[-1],num=10000)
y_double_diff=double_diff_function(x_double_diff)
df_double_diff_fit_function = pd.DataFrame(x_double_diff)
df_double_diff_fit_function['y_diff'] = y_double_diff
df_double_diff_fit_function.columns = ['x_diff', 'y_diff']
# Picks out the x-value where the y-value is closest to 0
edge_pos_double_diff=x_double_diff[np.where(y_double_diff == find_nearest(y_double_diff,0))][0]
if options['log']:
aux.write_log(message=f"Edge shift estimated by the double differential zero-point is {str(round(edge_pos_double_diff,5))}", options=options)
if options['save_plots']:
if options['diff'] and options['double_diff']:
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(ncols=2, nrows=2, figsize=(20,20))
df_diff.plot(x='ZapEnergy', y=filename, ax=ax1, kind='scatter')
df_diff_fit_function.plot(x='x_diff', y='y_diff', ax=ax1)
ax1.set_xlim([edge_pos_diff-0.0015, edge_pos_diff+0.0015])
ax1.axvline(x=edge_pos_diff-options['fit_region'], ls='--', c='black')
ax1.axvline(x=edge_pos_diff, ls='--', c='green')
ax1.axvline(x=edge_pos_diff+options['fit_region'], ls='--', c='black')
ax1.set_title('Fit region of differentiated data')
df_diff_edge.plot(x='ZapEnergy', y=filename, ax=ax2, kind='scatter')
df_diff_fit_function.plot(x='x_diff', y='y_diff', ax=ax2)
ax2.axvline(x=edge_pos_diff, ls='--', c='green')
ax2.axvline(x=estimated_edge_pos, ls='--', c='red')
ax2.set_title('Fit of differentiated data')
df_double_diff.plot(x='ZapEnergy', y=filename, ax=ax3, kind='scatter')
df_double_diff_fit_function.plot(x='x_diff', y='y_diff', ax=ax3)
ax3.set_xlim([edge_pos_double_diff-0.0015, edge_pos_double_diff+0.0015])
ax3.axvline(x=edge_pos_double_diff-options['fit_region'], ls='--', c='black')
ax3.axvline(x=edge_pos_double_diff, ls='--', c='green')
ax3.axvline(x=edge_pos_double_diff+options['fit_region'], ls='--', c='black')
df_double_diff_edge.plot(x='ZapEnergy', y=filename, ax=ax4, kind='scatter')
df_double_diff_fit_function.plot(x='x_diff', y='y_diff', ax=ax4)
ax4.axvline(x=edge_pos_double_diff, ls='--', c='green')
ax4.axvline(x=estimated_edge_pos, ls='--', c='red')
elif options['diff']:
fig, (ax1, ax2) = plt.subplots(ncols=2,nrows=1, figsize=(20, 10))
df_diff.plot(x='ZapEnergy', y=filename, ax=ax1, kind='scatter')
ax1.set_xlim([edge_pos_diff-0.5, edge_pos_diff+0.5])
ax1.axvline(x=edge_pos_diff-options['fit_region'], ls='--', c='black')
ax1.axvline(x=edge_pos_diff, ls='--', c='green')
ax1.axvline(x=edge_pos_diff+options['fit_region'], ls='--', c='black')
df_diff_edge.plot(x='ZapEnergy', y=filename, ax=ax2)
ax2.axvline(x=edge_pos_diff, ls='--', c='green')
ax2.axvline(x=estimated_edge_pos, ls='--', c='red')
elif options['double_diff']:
fig, (ax1, ax2) = plt.subplots(ncols=2,nrows=1, figsize=(20, 10))
df_double_diff.plot(x='ZapEnergy', y=filename, ax=ax1, kind='scatter')
ax1.set_xlim([edge_pos_double_diff-0.5, edge_pos_double_diff+0.5])
ax1.axvline(x=edge_pos_double_diff-options['fit_region'], ls='--', c='black')
ax1.axvline(x=edge_pos_double_diff, ls='--', c='green')
ax1.axvline(x=edge_pos_double_diff+options['fit_region'], ls='--', c='black')
df_double_diff_edge.plot(x='ZapEnergy', y=filename, ax=ax2)
ax2.axvline(x=edge_pos_double_diff, ls='--', c='green')
ax2.axvline(x=estimated_edge_pos, ls='--', c='red')
if not options['diff']:
edge_pos_diff = None
if not options['double_diff']:
edge_pos_double_diff = None
return edge_pos_diff, edge_pos_double_diff
def normalise(data: dict, options={}):
required_options = ['log', 'logfile', 'save_values']
default_options = {
'log': False,
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_normalisation.log',
'save_values': True
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
normalised_df = pd.DataFrame(data['xanes_data']['ZapEnergy'])
data['normalisation_constants'] = {}
#Finding the normalisation constant µ_0(E_0), by subtracting the value of the pre-edge-line from the value of the post-edge line at e0
for filename in data['path']:
e0_ind = data['post_edge_fit_data'].loc[data['post_edge_fit_data']['ZapEnergy'] == find_nearest(data['post_edge_fit_data']['ZapEnergy'], data['e0'][filename])].index.values[0]
#norm = data['post_edge_fit_data'][filename].iloc[find_nearest(data['post_edge_fit_data'][filename], data['e0'][filename])]
normalisation_constant = data['post_edge_fit_data'][filename].iloc[e0_ind] - data['pre_edge_fit_data'][filename].iloc[e0_ind]
normalised_df.insert(1, filename, data['xanes_data'][filename] / normalisation_constant)
# Normalise the pre-edge and post-edge fit function data
data['pre_edge_fit_data'][filename] = data['pre_edge_fit_data'][filename] / normalisation_constant
data['post_edge_fit_data'][filename] = data['post_edge_fit_data'][filename] / normalisation_constant
data['normalisation_constants'][filename] = normalisation_constant
if options['save_values']:
data['xanes_data'] = normalised_df
return normalised_df
def flatten(data:dict, options={}):
#only picking out zapenergy-values higher than edge position (edge pos and below remains untouched)
required_options = ['log', 'logfile', 'save_values']
default_options = {
'log': False,
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_flattening.log',
'save_values': True
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
flattened_df = pd.DataFrame(data['xanes_data']['ZapEnergy'])
for filename in data['path']:
fit_function_diff = -data['post_edge_fit_data'][filename] + data['pre_edge_params'][filename][0]
fit_function_diff.loc[flattened_df['ZapEnergy'] <= data['e0'][filename]] = 0
flattened_df[filename] = data['xanes_data'][filename] - fit_function_diff
if options['save_values']:
data['xanes_data'] = flattened_df
return flattened_df, fit_function_diff
#make a new dataframe with flattened values
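
The refactored calibration routines above all operate on a shared data dictionary ('path', 'xanes_data_original', 'xanes_data', 'e0', and so on). A minimal sketch of how a full reduction might be chained, assuming the file above is importable as nafuma.xanes.calib and the I/O helpers in the second changed file below as nafuma.xanes.io (module paths, file names and the exact wiring of return values are assumptions, not confirmed by this diff):

import nafuma.xanes.io as io          # assumed module path for the second changed file
import nafuma.xanes.calib as calib    # assumed module path for the file above

data = {'path': ['Mn_scan_0000.dat', 'Mn_scan_0001.dat']}   # hypothetical split scan files
data['xanes_data_original'] = io.read_data(data)            # one intensity column per scan
data['pre_edge_fit_data'] = calib.pre_edge_fit(data)        # also stores data['pre_edge_params']
data['xanes_data'] = calib.pre_edge_subtraction(data)       # background-subtracted signal
data['post_edge_fit_data'] = calib.post_edge_fit(data)      # also stores data['post_edge_params']
data['xanes_data'], _ = calib.smoothing(data)               # Savitzky-Golay smoothing
calib.determine_edge_position(data)                         # stores e0 per scan in data['e0'] (save_values defaults to True)
calib.normalise(data)                                       # divides by the normalisation constant at e0
calib.flatten(data)                                         # flattens the region above e0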

View file

@@ -3,93 +3,169 @@ import matplotlib.pyplot as plt
import os
import numpy as np
import nafuma.auxillary as aux
from nafuma.xanes.calib import find_element
from datetime import datetime
def split_scan_data(data: dict, options={}) -> list:
''' Splits a XANES-file from BM31 into different files depending on the edge. Has the option to add intensities of all scans of the same edge into the same file.
As of now only picks out xmap_rois (fluorescence mode) and for Mn, Fe, Co and Ni K-edges.'''
required_options = ['log', 'logfile', 'save', 'save_folder', 'replace', 'add_rois']
default_options = {
'log': False,
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_split_edges.log',
'save': False, # whether to save the files or not
'save_folder': '.', # root folder of where to save the files
'replace': False, # whether to replace the files if they already exist
'add_rois': False # Whether to add the rois of individual scans of the same edge together
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
if not isinstance(data['path'], list):
data['path'] = [data['path']]
all_scans = []
if options['log']:
aux.write_log(message='Starting file splitting...', options=options)
for filename in data['path']:
if options['log']:
aux.write_log(message=f'Reading {filename}...', options=options)
with open(filename, 'r') as f:
lines = f.readlines()
scan_datas, scan_data = [], []
headers, header = [], ''
read_data = False
for line in lines:
# Header line starts with #L - reads headers, and toggles data read-in on
if line[0:2] == "#L": if line[0:2] == "#L":
start = True header, read_data = line[2:].split(), True
header = line[2:].split()
if options['log']:
aux.write_log(message='... Found scan data. Starting read-in...', options=options)
continue
# First line after data started with #C - stops data read-in
elif line[0:2] == "#C":
read_data = False
if scan_data:
scan_datas.append(scan_data); scan_data = []
if header:
headers.append(header); header = ''
# Ignore line if read-in not toggled
if read_data == False:
continue
# Read in data if it is
else:
scan_data.append(line.split())
edges = {'Mn': [], 'Fe': [], 'Co': [], 'Ni': []}
for i, scan_data in enumerate(scan_datas):
xanes_df = pd.DataFrame(scan_data).apply(pd.to_numeric)
xanes_df.columns = headers[i]
edge = find_element({'xanes_data_original': xanes_df})
if options['log']:
aux.write_log(message=f'... Starting data clean-up ({edge}-edge)... ({i+1}/{len(scan_datas)})', options=options)
if not ('xmap_roi00' in headers[i]) and (not 'xmap_roi01' in headers[i]):
if options['log']:
aux.write_log(message='... ... Did not find fluorescence data. Skipping...', options=options)
continue
edges[edge].append(xanes_df)
if options['add_rois']:
if options['log']:
aux.write_log(message=f'... Addition of rois enabled. Starting addition...', options=options)
added_edges = {'Mn': [], 'Fe': [], 'Co': [], 'Ni': []}
for edge, scans in edges.items():
if options['log']:
aux.write_log(message=f'... ... Adding rois of the {edge}-edge...', options=options)
if scans:
xanes_df = scans[0]
for i, scan in enumerate(scans):
if i > 0:
if options['log']:
aux.write_log(message=f'... ... ... Adding {i+1}/{len(scans)}', options=options)
if 'xmap_roi00' in xanes_df.columns:
xanes_df['xmap_roi00'] += scan['xmap_roi00']
if 'xmap_roi01' in xanes_df.columns:
xanes_df['xmap_roi01'] += scan['xmap_roi01']
added_edges[edge].append(xanes_df)
edges = added_edges
if options['save']:
if options['log']:
aux.write_log(message=f'... Saving data to {options["save_folder"]}', options=options)
if not os.path.isdir(options['save_folder']):
if options['log']:
aux.write_log(message=f'... ... {options["save_folder"]} does not exist. Creating folder.', options=options)
os.makedirs(options['save_folder'])
filename = os.path.basename(filename).split('.')[0]
for edge, scans in edges.items():
for i, scan in enumerate(scans):
count = '' if options['add_rois'] else '_'+str(i).zfill(4)
path = os.path.join(options['save_folder'], f'{filename}_{edge}{count}.dat')
# Save
if not os.path.isfile(path):
scan.to_csv(path)
if options['log']:
aux.write_log(message=f'... ... Scan saved to {path}', options=options)
elif options['replace'] and os.path.isfile(path):
scan.to_csv(path)
if options['log']:
aux.write_log(message=f'... ... File already exists. Overwriting to {path}', options=options)
elif not options['replace'] and os.path.isfile(path):
if options['log']:
aux.write_log(message=f'... ... File already exists. Skipping...', options=options)
all_scans.append(edges)
if options['log']:
aux.write_log(message=f'All done!', options=options)
return all_scans
@@ -98,9 +174,9 @@ def read_data(data: dict, options={}) -> pd.DataFrame:
# FIXME Handle the case when dataseries are not the same size
required_options = ['adjust']
default_options = {
'adjust': 0
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
@@ -109,6 +185,7 @@ def read_data(data: dict, options={}) -> pd.DataFrame:
# Initialise DataFrame with only ZapEnergy-column
xanes_data = pd.read_csv(data['path'][0])[['ZapEnergy']]
xanes_data['ZapEnergy'] += options['adjust']
if not isinstance(data['path'], list):
data['path'] = [data['path']]
@@ -117,6 +194,7 @@ def read_data(data: dict, options={}) -> pd.DataFrame:
columns.append(filename)
scan_data = pd.read_csv(filename)
scan_data = scan_data[[determine_active_roi(scan_data)]]
xanes_data = pd.concat([xanes_data, scan_data], axis=1)
@@ -130,6 +208,7 @@ def read_data(data: dict, options={}) -> pd.DataFrame:
def determine_active_roi(scan_data):
# FIXME For Co-edge, this gave a wrong scan
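
As a rough usage sketch for the I/O helpers in this file, splitting a raw BM31 file per edge and reading the result back in (the module path nafuma.xanes.io, the file names and the output naming are assumptions; the option keys are the ones defined in split_scan_data and read_data above):

import nafuma.xanes.io as io   # assumed module path for this file

raw = {'path': ['beamtime/xanes/raw/sample_001.dat']}   # hypothetical raw multi-edge file
io.split_scan_data(raw, options={'save': True, 'save_folder': 'split', 'add_rois': True, 'log': True})

data = {'path': ['split/sample_001_Mn.dat']}            # hypothetical split output for the Mn edge
xanes_data = io.read_data(data, options={'adjust': 0})  # one ZapEnergy column plus one intensity column per file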