diff --git a/nafuma/xanes/__init__.py b/nafuma/xanes/__init__.py index a3834e8..af49ee6 100644 --- a/nafuma/xanes/__init__.py +++ b/nafuma/xanes/__init__.py @@ -1 +1 @@ -from . import io, calib, edges \ No newline at end of file +from . import io, calib, plot, edges \ No newline at end of file diff --git a/nafuma/xanes/calib.py b/nafuma/xanes/calib.py index beaeba7..6162d99 100644 --- a/nafuma/xanes/calib.py +++ b/nafuma/xanes/calib.py @@ -462,8 +462,6 @@ def smoothing(data: dict, options={}): # Make plots ... if options['save_plots'] or options['show_plots']: - - edge_pos = estimate_edge_position(data=data, options=options) step_length = data['xanes_data']['ZapEnergy'].iloc[1] - data['xanes_data']['ZapEnergy'].iloc[0] @@ -563,8 +561,8 @@ def estimate_edge_position(data: dict, options={}, index=0): options = aux.update_options(options=options, required_options=required_options, default_options=default_options) #making new dataframe to keep the differentiated data - df_diff = pd.DataFrame(data['xanes_data_original']["ZapEnergy"]) - df_diff[data['path'][index]]=data['xanes_data_original'][data['path'][index]].diff(periods=options['periods']) + df_diff = pd.DataFrame(data['xanes_data']["ZapEnergy"]) + df_diff[data['path'][index]]=data['xanes_data'][data['path'][index]].diff(periods=options['periods']) #shifting column values up so that average differential fits right between the points used in the calculation df_diff[data['path'][index]]=df_diff[data['path'][index]].shift(-int(options['periods']/2)) diff --git a/nafuma/xanes/io.py b/nafuma/xanes/io.py index a842418..e73674e 100644 --- a/nafuma/xanes/io.py +++ b/nafuma/xanes/io.py @@ -81,10 +81,19 @@ def split_scan_data(data: dict, options={}) -> list: for i, scan_data in enumerate(scan_datas): - + + if 'ZapEnergy' not in headers[i]: + if options['log']: + aux.write_log(message=f'... No valid scan data found... ({i+1}/{len(scan_datas)})', options=options) + continue + xanes_df = pd.DataFrame(scan_data).apply(pd.to_numeric) xanes_df.columns = headers[i] + + edge = find_element({'xanes_data_original': xanes_df}) + + if options['log']: aux.write_log(message=f'... Starting data clean-up ({edge}-edge)... ({i+1}/{len(scan_datas)})', options=options) @@ -183,15 +192,111 @@ def split_scan_data(data: dict, options={}) -> list: +def save_data(data: dict, options={}) -> None: + + required_options = ['save_folder', 'overwrite', 'log', 'logfile', 'filename'] + + default_options = { + 'log': False, + 'logfile': f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_save_files.log', + 'save_folder': 'saved_scans', + 'overwrite': False, + 'filename': f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_exported_data.dat', + } + + options = aux.update_options(options=options, required_options=required_options, default_options=default_options) + + + # Check if there is any data to be saved + if not 'xanes_data' in data.keys(): + if options['log']: + aux.write_log(message=f'There is not saved scan data in data. Exiting without saving...', options=options) + + return None + + if not isinstance(data['xanes_data'], pd.DataFrame): + if options['log']: + aux.write_log(message=f'data["xanes_data"] has an invalid format. Exiting without saving...', options=options) + + return None + + + # Make folder(s) if it/they do(es)n't exist + if not os.path.exists(options['save_folder']): + if options['log']: + aux.write_log(message=f'Destination folder does not exist. Creating folder...', options=options) + + os.makedirs(options['save_folder']) + + + + if os.path.exists(os.path.join('save_folder', options['filename'])): + if not options['overwrite']: + if options['log']: + aux.write_log(message=f'File already exists and overwrite disabled. Exiting without saving...', options=options) + return None + + with open(os.path.join(options['save_folder'], options['filename']), 'w') as f: + + if 'e0_diff' in data.keys(): + f.write(f'# Number of header lines: {len(data["path"])+1} \n') + + for i, (path, e0) in enumerate(data['e0_diff'].items()): + f.write(f'# Scan_{i} \t {e0} \n') + + else: + f.write(f'# Number of header lines: {1}') + + + data['xanes_data'].to_csv(f, sep='\t', index=False) + + + #data['xanes_data'].to_csv(os.path.join(options['save_folder'], options['filename']), sep='\t', index=False) + + + +def load_data(path: str) -> dict: + # FIXME Let this function be called by read_data() if some criterium is passed + + data = {} + + + with open(path, 'r') as f: + line = f.readline() + header_lines = int(line.split()[-1]) + + if header_lines > 1: + edge_positions = [] + line = f.readline() + while line[0] == '#': + edge_positions.append(line.split()[-1]) + line = f.readline() + + data['xanes_data'] = pd.read_csv(path, sep='\t', skiprows=header_lines) + data['path'] = data['xanes_data'].columns.to_list() + data['path'].remove('ZapEnergy') + + if header_lines > 1: + data['e0_diff'] = {} + + for path, edge_position in zip(data['path'], edge_positions): + data['e0_diff'][path] = float(edge_position) + + + + return data + + def read_data(data: dict, options={}) -> pd.DataFrame: # FIXME Handle the case when dataseries are not the same size # FIXME Add possibility to extract TIME (for operando runs) and Blower Temp (for variable temperature runs) # FIXME Add possibility to iport transmission data - required_options = ['adjust'] + required_options = ['adjust', 'mode'] default_options = { - 'adjust': 0 + 'adjust': 0, + 'mode': 'fluoresence' } options = aux.update_options(options=options, required_options=required_options, default_options=default_options) @@ -211,11 +316,15 @@ def read_data(data: dict, options={}) -> pd.DataFrame: scan_data = pd.read_csv(filename, skiprows=1) - if not options['active_roi']: - scan_data = scan_data[[determine_active_roi(scan_data)]] - else: - scan_data = scan_data[options['active_roi']] - + if options['mode'] == 'fluoresence': + if not options['active_roi']: + scan_data = scan_data[[determine_active_roi(scan_data)]] + else: + scan_data = scan_data[options['active_roi']] + + elif options['mode'] == 'transmission': + scan_data = scan_data['MonEx'] / scan_data['Ion2'] + xanes_data = pd.concat([xanes_data, scan_data], axis=1) diff --git a/nafuma/xanes/plot.py b/nafuma/xanes/plot.py new file mode 100644 index 0000000..440308c --- /dev/null +++ b/nafuma/xanes/plot.py @@ -0,0 +1,164 @@ +import matplotlib.pyplot as plt +from matplotlib.ticker import (MultipleLocator, FormatStrFormatter,AutoMinorLocator) + +import pandas as pd +import numpy as np +import math +import datetime + +#import ipywidgets as widgets +#from IPython.display import display + +import nafuma.xanes as xas +import nafuma.plotting as btp +import nafuma.auxillary as aux + + +def plot_xanes(data, options={}): + + + # Update options + required_options = ['which_scans', 'xlabel', 'ylabel', 'xunit', 'yunit', 'exclude_scans', 'colours', 'gradient', 'rc_params', 'format_params'] + default_options = { + 'which_scans': 'all', + 'xlabel': 'Energy', 'ylabel': 'Intensity', + 'xunit': 'keV', 'yunit': 'arb. u.', + 'exclude_scans': [], + 'colours': None, + 'gradient': False, + 'rc_params': {}, + 'format_params': {}} + + options = aux.update_options(options=options, required_options=required_options, default_options=default_options) + + + if not 'xanes_data' in data.keys(): + data['xanes_data'] = xas.io.load_data(data=data, options=options) + + # Update list of cycles to correct indices + update_scans_list(data=data, options=options) + + colours = generate_colours(scans=options['which_scans'], options=options) + + # Prepare plot, and read and process data + + fig, ax = btp.prepare_plot(options=options) + + + # Add counter to pick out correct colour + counter = 0 + for i, path in enumerate(data['path']): + if i in options['which_scans']: + data['xanes_data'].plot(x='ZapEnergy', y=path, ax=ax, c=colours[counter]) + counter += 1 + + + fig, ax = btp.adjust_plot(fig=fig, ax=ax, options=options) + + #if options['interactive_session_active']: + + + return fig, ax + + +def pick_out_scans(metadata: dict, timestamp: list): + + # If either start or end are None, set to way back when or way into the future + if not timestamp[0]: + timestamp[0] = datetime.datetime.strptime('1970 01 01 00:00:00', '%Y %m %d %H:%M:%S') + else: + timestamp[0] = datetime.datetime.strptime(timestamp[0], "%d.%b %y %H.%M.%S") + if not timestamp[1]: + timestamp[1] = datetime.datetime.strptime('3000 01 01 00:00:00', '%Y %m %d %H:%M:%S') + else: + timestamp[1] = datetime.datetime.strptime(timestamp[1], "%d.%b %y %H.%M.%S") + + scans = [] + for i, time in enumerate(metadata['time']): + if time >= timestamp[0] and time <= timestamp[1]: + scans.append(i) + + + return scans + + + + + + +def update_scans_list(data, options: dict) -> None: + + if options['which_scans'] == 'all': + options['which_scans'] = [i for i in range(len(data['path']))] + + + elif isinstance(options['which_scans'], list): + + scans =[] + + for scan in options['which_scans']: + if isinstance(scan, int): + scans.append(scan-1) + + elif isinstance(scan, tuple): + interval = [i-1 for i in range(scan[0], scan[1]+1)] + scans.extend(interval) + + + options['which_scans'] = scans + + + # Tuple is used to define an interval - as elements tuples can't be assigned, I convert it to a list here. + elif isinstance(options['which_scans'], tuple): + which_scans = list(options['which_scans']) + + if which_scans[0] <= 0: + which_scans[0] = 1 + + elif which_scans[1] < 0: + which_scans[1] = len(options['which_scans']) + + + options['which_scans'] = [i-1 for i in range(which_scans[0], which_scans[1]+1)] + + + for i, scan in enumerate(options['which_scans']): + if scan in options['exclude_scans']: + del options['which_scans'][i] + + + + +def generate_colours(scans, options): + # FIXME Make this a generalised function and use this instead of this and in the electrochemsitry submodule + + # Assign colours from the options dictionary if it is defined, otherwise use standard colours. + if options['colours']: + colour = options['colours'] + + else: + #colour = (214/255, 143/255, 214/255) # Plum Web (#D68FD6), coolors.co + colour = (90/255, 42/255, 39/255) # Caput Mortuum(#5A2A27), coolors.co + + # If gradient is enabled, find start and end points for each colour + if options['gradient']: + + add = min([(1-x)*0.75 for x in colour]) + + colour_start = colour + colour_end = [x+add for x in colour] + + + # Generate lists of colours + colours = [] + + for scan_number in range(0, len(scans)): + if options['gradient']: + weight_start = (len(scans) - scan_number)/len(scans) + weight_end = scan_number/len(scans) + + colour = [weight_start*start_colour + weight_end*end_colour for start_colour, end_colour in zip(colour_start, colour_end)] + + colours.append(colour) + + return colours \ No newline at end of file