Add new functions and fix certain bugs

This commit is contained in:
rasmusvt 2022-06-30 17:08:14 +02:00
parent 8702cdfa00
commit 1757445f89

View file

@ -116,6 +116,11 @@ def pre_edge_fit(data: dict, options={}) -> pd.DataFrame:
data['pre_edge_params'][filename] = params data['pre_edge_params'][filename] = params
if options['log']:
aux.write_log(message=f'Pre edge fitted between {options["pre_edge_limits"][0]} and {options["pre_edge_limits"][1]} with polynomial of order {options["pre_edge_polyorder"]} with parmameters {params}.', options=options)
if options['pre_edge_masks']:
aux.write_log(message=f'Excluded regions: {options["pre_edge_masks"]}', options=options)
#making a list, y_pre,so the background will be applied to all ZapEnergy-values #making a list, y_pre,so the background will be applied to all ZapEnergy-values
background=fit_function(pre_edge_fit_data["ZapEnergy"]) background=fit_function(pre_edge_fit_data["ZapEnergy"])
@ -171,7 +176,7 @@ def pre_edge_fit_interactive(data: dict, options: dict) -> None:
w = widgets.interactive( w = widgets.interactive(
btp.ipywidgets_update, func=widgets.fixed(pre_edge_fit), data=widgets.fixed(data), options=widgets.fixed(options), btp.ipywidgets_update, func=widgets.fixed(pre_edge_fit), data=widgets.fixed(data), options=widgets.fixed(options),
pre_edge_limits=widgets.FloatRangeSlider(value=[options['pre_edge_limits'][0], options['pre_edge_limits'][1]], min=data['xanes_data_original']['ZapEnergy'].min(), max=data['xanes_data_original']['ZapEnergy'].max(), step=0.001), pre_edge_limits=widgets.FloatRangeSlider(value=[options['pre_edge_limits'][0], options['pre_edge_limits'][1]], min=data['xanes_data_original']['ZapEnergy'].min(), max=data['xanes_data_original']['ZapEnergy'].max(), step=0.0001),
pre_edge_store_data=widgets.Checkbox(value=options['pre_edge_store_data']) pre_edge_store_data=widgets.Checkbox(value=options['pre_edge_store_data'])
) )
@ -267,10 +272,13 @@ def post_edge_fit(data: dict, options={}):
edge_position = estimate_edge_position(data, options, index=0) edge_position = estimate_edge_position(data, options, index=0)
options['post_edge_limits'][0] = edge_position + post_edge_limit_offset options['post_edge_limits'][0] = edge_position + post_edge_limit_offset
if not options['post_edge_limits'][1]: if not options['post_edge_limits'][1]:
options['post_edge_limits'][1] = data['xanes_data_original']['ZapEnergy'].max() options['post_edge_limits'][1] = data['xanes_data_original']['ZapEnergy'].max()
if options['post_edge_limits'][0] > options['post_edge_limits'][1]:
options['post_edge_limits'][0] = options['post_edge_limits'][1] - 0.1
# Start inteactive session with ipywidgets. Disables options['interactive'] in order for the interactive loop to not start another interactive session # Start inteactive session with ipywidgets. Disables options['interactive'] in order for the interactive loop to not start another interactive session
if options['interactive']: if options['interactive']:
options['interactive'] = False options['interactive'] = False
@ -288,6 +296,8 @@ def post_edge_fit(data: dict, options={}):
post_edge_data = post_edge_data.dropna() #Removing all indexes without any value, as some of the data sets misses the few last data points and fucks up the fit post_edge_data = post_edge_data.dropna() #Removing all indexes without any value, as some of the data sets misses the few last data points and fucks up the fit
# Making a new dataframe, with only the ZapEnergies as the first column -> will be filled to include the background data # Making a new dataframe, with only the ZapEnergies as the first column -> will be filled to include the background data
post_edge_fit_data = pd.DataFrame(data['xanes_data_original']["ZapEnergy"]) post_edge_fit_data = pd.DataFrame(data['xanes_data_original']["ZapEnergy"])
@ -302,7 +312,9 @@ def post_edge_fit(data: dict, options={}):
fit_function = np.poly1d(params) fit_function = np.poly1d(params)
if options['log']: if options['log']:
aux.write_log(message=f'Post edge fitted with parameters: {params}') aux.write_log(message=f'Post edge fitted between {options["post_edge_limits"][0]} and {options["post_edge_limits"][1]} with polynomial of order {options["post_edge_polyorder"]} with parmameters {params}.', options=options)
if options['post_edge_masks']:
aux.write_log(message=f'Excluded regions: {options["post_edge_masks"]}', options=options)
data['post_edge_params'][filename] = params data['post_edge_params'][filename] = params
@ -364,7 +376,7 @@ def post_edge_fit_interactive(data: dict, options: dict) -> None:
w = widgets.interactive( w = widgets.interactive(
btp.ipywidgets_update, func=widgets.fixed(post_edge_fit), data=widgets.fixed(data), options=widgets.fixed(options), btp.ipywidgets_update, func=widgets.fixed(post_edge_fit), data=widgets.fixed(data), options=widgets.fixed(options),
post_edge_limits=widgets.FloatRangeSlider(value=[options['post_edge_limits'][0], options['post_edge_limits'][1]], min=data['xanes_data_original']['ZapEnergy'].min(), max=data['xanes_data_original']['ZapEnergy'].max(), step=0.001), post_edge_limits=widgets.FloatRangeSlider(value=[options['post_edge_limits'][0], options['post_edge_limits'][1]], min=data['xanes_data_original']['ZapEnergy'].min(), max=data['xanes_data_original']['ZapEnergy'].max(), step=0.0001),
post_edge_store_data=widgets.Checkbox(value=options['post_edge_store_data']) post_edge_store_data=widgets.Checkbox(value=options['post_edge_store_data'])
) )
@ -400,7 +412,7 @@ def smoothing(data: dict, options={}):
df_smooth_default = pd.DataFrame(data['xanes_data']['ZapEnergy']) df_smooth_default = pd.DataFrame(data['xanes_data']['ZapEnergy'])
if options['log']: if options['log']:
aux.write_log(message='Starting smoothing procedure.') aux.write_log(message='Starting smoothing procedure.', options=options)
# Run in interactive mode if enabled # Run in interactive mode if enabled
@ -498,6 +510,10 @@ def smoothing_interactive(data: dict, options: dict) -> None:
display(w) display(w)
def backup(data):
data['xanes_data_backup'] = data['xanes_data'].copy()
def restore_from_backup(data): def restore_from_backup(data):
''' Restores DataFrame from data['xanes_data_backup'] to data['xanes_data']. This can be useful e.g. when smoothing and you want to re-do the smoothing with different parameters. ''' Restores DataFrame from data['xanes_data_backup'] to data['xanes_data']. This can be useful e.g. when smoothing and you want to re-do the smoothing with different parameters.
@ -505,7 +521,7 @@ def restore_from_backup(data):
If there is no DataFrame stored in data['xanes_data_backup'], this function does nothing. ''' If there is no DataFrame stored in data['xanes_data_backup'], this function does nothing. '''
if 'xanes_data_bakcup' in data.keys(): if 'xanes_data_bakcup' in data.keys():
data['xanes_data'] = data['xanes_data_backup'] data['xanes_data'] = data['xanes_data_backup'].copy()
def find_nearest(array, value): def find_nearest(array, value):
@ -523,7 +539,7 @@ def estimate_edge_position(data: dict, options={}, index=0):
default_options = { default_options = {
'log': False, # Toggles logging on/off 'log': False, # Toggles logging on/off
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_edge_position_estimation.log', # Sets path to log-file 'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_edge_position_estimation.log', # Sets path to log-file
'periods': 6, #Periods needs to be an even number for the shifting of values to work properly 'periods': 2, #Periods needs to be an even number for the shifting of values to work properly
} }
options = aux.update_options(options=options, required_options=required_options, default_options=default_options) options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
@ -533,13 +549,25 @@ def estimate_edge_position(data: dict, options={}, index=0):
#shifting column values up so that average differential fits right between the points used in the calculation #shifting column values up so that average differential fits right between the points used in the calculation
df_diff[data['path'][index]]=df_diff[data['path'][index]].shift(-int(options['periods']/2)) df_diff[data['path'][index]]=df_diff[data['path'][index]].shift(-int(options['periods']/2))
if 'pre_edge_masks' in options.keys():
for mask in options['pre_edge_masks']:
df_diff[data['path'][index]].loc[(df_diff['ZapEnergy'] > mask[0]) & (df_diff['ZapEnergy'] < mask[1])] = 0
if 'post_edge_masks' in options.keys():
for mask in options['post_edge_masks']:
df_diff[data['path'][index]].loc[(df_diff['ZapEnergy'] > mask[0]) & (df_diff['ZapEnergy'] < mask[1])] = 0
df_diff_max = df_diff[data['path'][index]].dropna().max() df_diff_max = df_diff[data['path'][index]].dropna().max()
estimated_edge_shift =df_diff.loc[df_diff[data['path'][index]] == df_diff_max,'ZapEnergy'].values[0]
estimated_edge_pos = df_diff.loc[df_diff[data['path'][index]] == df_diff_max,'ZapEnergy'].values[0]
if options['log']: if options['log']:
aux.write_log(message=f'Estimated edge shift is: {estimated_edge_shift} keV', options=options) aux.write_log(message=f'Estimated edge position is: {estimated_edge_pos} keV', options=options)
return estimated_edge_shift return estimated_edge_pos
def determine_edge_position(data: dict, options={}): def determine_edge_position(data: dict, options={}):
''' Determines the edge position by 1) first differential maximum and/or 2) second differential zero-point. Calculates differential and/or double differential by diff.periods and double_diff.periods respectively. ''' Determines the edge position by 1) first differential maximum and/or 2) second differential zero-point. Calculates differential and/or double differential by diff.periods and double_diff.periods respectively.
@ -550,7 +578,7 @@ def determine_edge_position(data: dict, options={}):
Requires that XANES-data is already loaded in data['xanes_data']. This allows the user to choose when to determine the edge position - whether before or after normalisation, flattening etc.''' Requires that XANES-data is already loaded in data['xanes_data']. This allows the user to choose when to determine the edge position - whether before or after normalisation, flattening etc.'''
required_options = ['save_values', 'log', 'logfile', 'show_plots', 'save_plots', 'save_folder', 'diff', 'diff.polyorder', 'diff.periods', 'double_diff', 'double_diff.polyorder', 'double_diff.periods', 'points_around_edge'] required_options = ['save_values', 'log', 'logfile', 'show_plots', 'save_plots', 'save_folder', 'diff', 'diff.polyorder', 'diff.periods', 'double_diff', 'double_diff.polyorder', 'double_diff.periods', 'points_around_edge', 'save_diff_data']
default_options = { default_options = {
'save_values': True, # Whether the edge positions should be stored in a dictionary within the main data dictionary. 'save_values': True, # Whether the edge positions should be stored in a dictionary within the main data dictionary.
'log': False, # Toggles logging on/off 'log': False, # Toggles logging on/off
@ -564,7 +592,8 @@ def determine_edge_position(data: dict, options={}):
'double_diff': False, # Toggles calculation of the edge position based on double differential data 'double_diff': False, # Toggles calculation of the edge position based on double differential data
'double_diff.polyorder': 1, # Sets the order of the polynomial to fit edge region of the double differential to 'double_diff.polyorder': 1, # Sets the order of the polynomial to fit edge region of the double differential to
'double_diff.periods': 2, # Sets the number of data points between which the second order difference should be calculated. Needs to be even for subsequent shifting of data to function. 'double_diff.periods': 2, # Sets the number of data points between which the second order difference should be calculated. Needs to be even for subsequent shifting of data to function.
'points_around_edge': 5 # The length of the region to find points to fit to a function 'points_around_edge': 1, # The length of the region to find points to fit to a function
'save_diff_data': False
} }
options = aux.update_options(options=options, required_options=required_options, default_options=default_options) options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
@ -608,6 +637,9 @@ def determine_edge_position(data: dict, options={}):
fit_region = (options['points_around_edge']+1)*(data['xanes_data']['ZapEnergy'].iloc[1] - data['xanes_data']['ZapEnergy'].iloc[0]) fit_region = (options['points_around_edge']+1)*(data['xanes_data']['ZapEnergy'].iloc[1] - data['xanes_data']['ZapEnergy'].iloc[0])
if fit_region < 0:
fit_region = (options['points_around_edge']+1)*(data['xanes_data']['ZapEnergy'].iloc[10] - data['xanes_data']['ZapEnergy'].iloc[9])
#========================== Fitting the first order derivative ========== #========================== Fitting the first order derivative ==========
@ -666,7 +698,7 @@ def determine_edge_position(data: dict, options={}):
aux.write_log(message=f"Edge position estimated by the double differential zero-point is {str(round(edge_pos_double_diff,5))} keV", options=options) aux.write_log(message=f"Edge position estimated by the double differential zero-point is {str(round(edge_pos_double_diff,5))} keV", options=options)
if options['diff']: if options['diff']:
aux.write_log(message=f"Difference between edge position estimated from differential maximum and double differential zero-point is {(edge_pos_diff-edge_pos_double_diff)*1000} eV.") aux.write_log(message=f"Difference between edge position estimated from differential maximum and double differential zero-point is {(edge_pos_diff-edge_pos_double_diff)*1000} eV.", options=options)
if options['save_values']: if options['save_values']:
data['e0_double_diff'][filename] = edge_pos_double_diff data['e0_double_diff'][filename] = edge_pos_double_diff
@ -773,6 +805,11 @@ def determine_edge_position(data: dict, options={}):
if not options['double_diff']: if not options['double_diff']:
edge_pos_double_diff = None edge_pos_double_diff = None
if options['save_diff_data']:
data['diff_data'] = df_diff if options['diff'] else None
data['double_diff_data'] = df_double_diff if options['double_diff'] else None
return edge_pos_diff, edge_pos_double_diff return edge_pos_diff, edge_pos_double_diff
@ -789,6 +826,20 @@ def determine_edge_position_interactive(data: dict, options: dict) -> None:
display(w) display(w)
def determine_edge_shift(data: dict, options: dict, edge_pos: float) -> None:
if 'edge' not in data.keys():
data['edge'] = find_element(data)
reference_energy = xas.edges.K['keV'].loc[xas.edges.K['Atom'] == data['edge']].values[0]
edge_shift = reference_energy - edge_pos
if options['log']:
aux.write_log(message=f'Edge shift vs. reference value for {data["edge"]} is {edge_shift*1000} eV', options=options)
return edge_shift
def normalise(data: dict, options={}): def normalise(data: dict, options={}):
''' Normalises the data so that the difference between the fitted pre- and post-edge functions is 1 at the edge position. ''' Normalises the data so that the difference between the fitted pre- and post-edge functions is 1 at the edge position.
@ -806,6 +857,7 @@ def normalise(data: dict, options={}):
normalised_df = pd.DataFrame(data['xanes_data']['ZapEnergy']) normalised_df = pd.DataFrame(data['xanes_data']['ZapEnergy'])
data['normalisation_constants'] = {} data['normalisation_constants'] = {}
if options['normalisation_store_data']: if options['normalisation_store_data']:
pre_edge_fit_data_norm = pd.DataFrame(data['pre_edge_fit_data']['ZapEnergy']) pre_edge_fit_data_norm = pd.DataFrame(data['pre_edge_fit_data']['ZapEnergy'])
post_edge_fit_data_norm = pd.DataFrame(data['post_edge_fit_data']['ZapEnergy']) post_edge_fit_data_norm = pd.DataFrame(data['post_edge_fit_data']['ZapEnergy'])