Merge pull request #9 from rasmusthog/halvor_xanes

Halvor xanes
This commit is contained in:
Rasmus Vester Thøgersen 2022-07-07 14:58:08 +02:00 committed by GitHub
commit c479575c85
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 1361 additions and 245 deletions

View file

@ -1,5 +1,6 @@
import json
import numpy as np
import os
def update_options(options, required_options, default_options):
''' Takes a dictionary of options along with a list of required options and dictionary of default options, and sets all keyval-pairs of options that is not already defined to the default values'''
@ -11,11 +12,26 @@ def update_options(options, required_options, default_options):
return options
def save_options(options, path):
def save_options(options, path, ignore=None):
''' Saves any options dictionary to a JSON-file in the specified path'''
options_copy = options.copy()
if ignore:
if not isinstance(ignore, list):
ignore = [ignore]
for i in ignore:
options_copy[i] = 'Removed'
if not os.path.isdir(os.path.dirname(path)):
if os.path.dirname(path):
os.makedirs(os.path.dirname(path))
with open(path, 'w') as f:
json.dump(options,f)
json.dump(options_copy, f, skipkeys=True, indent=4)
def load_options(path):
@ -53,3 +69,46 @@ def floor(a, roundto=1):
a = np.floor(a*fac) / fac
return a
def write_log(message, options={}):
from datetime import datetime
required_options = ['logfile']
default_options = {
'logfile': f'{datetime.now().strftime("%Y-%m-%d-%H-%M-%S.log")}'
}
options = update_options(options=options, required_options=required_options, default_options=default_options)
if not os.path.isdir(os.path.dirname(options['logfile'])):
os.makedirs(os.path.dirname(options['logfile']))
now = datetime.now().strftime('%Y/%m/%d %H:%M:%S')
message = f'[{now}] {message} \n'
with open(options['logfile'], 'a') as f:
f.write(message)
#Function that "collects" all the files in a folder, only accepting .dat-files from xanes-measurements
def get_filenames(path, ext, filter=''):
''' Collects all filenames from specified path with a specificed extension
Input:
path: path to find all filenames (relative or absolute)
ext: extension (including ".")'''
filenames = [os.path.join(path, filename) for filename in os.listdir(path) if os.path.isfile(os.path.join(path, filename)) and filename.endswith(ext) and filter in filename]
return filenames
def move_list_element_last(filenames,string):
for i,file in enumerate(filenames):
if string in file:
del filenames[i]
filenames.append(file)
return filenames

View file

@ -1 +1 @@
from . import io, calib
from . import io, calib, edges

File diff suppressed because it is too large Load diff

30
nafuma/xanes/edges.py Normal file
View file

@ -0,0 +1,30 @@
import pandas as pd
import numpy as np
from scipy.constants import c, h
# From 2019 redefinition of SI base units: https://en.wikipedia.org/wiki/2019_redefinition_of_the_SI_base_units
keV_per_J = (1 / 1.602176634e-19) / 1000
# kXu values taken from International Tables for Crystallography Volume , Kulwer Academic Publishers - Dordrect / Boston / London (1992)
K = { 'Z': [ 1, 2,
3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18,
19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36,
37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48],
'Atom': [ 'H', 'He',
'Li', 'Be', 'B', 'C', 'N', 'O', 'F', 'Ne',
'Na', 'Mg', 'Al', 'Si', 'P', 'S', 'Cl', 'Ar',
'K', 'Ca', 'Sc', 'Ti', 'V', 'Cr', 'Mn', 'Fe', 'Co', 'Ni', 'Cu', 'Zn', 'Ga', 'Ge', 'As', 'Se', 'Br', 'Kr',
'Rb', 'Sr', 'Y', 'Zr', 'Nb', 'Mo', 'Tc', 'Ru', 'Rh', 'Pd', 'Ag', 'Cd'],
'kXu': [ np.nan, np.nan,
226.5, np.nan, np.nan, 43.68, 30.99, 23.32, np.nan, np.nan,
np.nan, 9.5117, 7.9511, 6.7446, 5.7866, 5.0182, 4.3969, 3.8707,
3.43645, 3.07016, 2.7573, 2.49730, 2.26902, 2.07012, 1.89636, 1.74334, 1.60811, 1.48802, 1.38043, 1.2833, 1.19567, 1.11652, 1.04497, 0.97978, 0.91995, 0.86547,
0.81549, 0.76969, 0.72762, 0.68877, 0.65291, 0.61977, 0.5891, 0.56047, 0.53378, 0.50915, 0.48582, 0.46409]}
K = pd.DataFrame(K)
K['keV'] = np.round(h*c/(K['kXu']*10**-10) * keV_per_J, 3)
# FIXME If needed, add energies for L-edges as well.

View file

@ -2,147 +2,307 @@ import pandas as pd
import matplotlib.pyplot as plt
import os
import numpy as np
import nafuma.auxillary as aux
from nafuma.xanes.calib import find_element
import datetime
def split_xanes_scan(filename, destination=None, replace=False):
#root is the path to the beamtime-folder
#destination should be the path to the processed data
def split_scan_data(data: dict, options={}) -> list:
''' Splits a XANES-file from BM31 into different files depending on the edge. Has the option to add intensities of all scans of same edge into the same file.
As of now only picks out xmap_rois (fluoresence mode) and for Mn, Fe, Co and Ni K-edges.'''
#insert a for-loop to go through all the folders.dat-files in the folder root\xanes\raw
required_options = ['log', 'logfile', 'save', 'save_folder', 'replace', 'active_roi', 'add_rois', 'return']
default_options = {
'log': False,
'logfile': f'{datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")}_split_edges.log',
'save': False, # whether to save the files or not
'save_folder': '.', # root folder of where to save the files
'replace': False, # whether to replace the files if they already exist
'active_roi': None,
'add_rois': False, # Whether to add the rois of individual scans of the same edge together
'return': True
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
if not isinstance(data['path'], list):
data['path'] = [data['path']]
all_scans = []
if options['log']:
aux.write_log(message='Starting file splitting...', options=options)
for filename in data['path']:
if options['log']:
aux.write_log(message=f'Reading {filename}...', options=options)
with open(filename, 'r') as f:
lines = f.readlines()
datas = []
data = []
headers = []
header = ''
start = False
timestamps = []
scan_datas, scan_data = [], []
headers, header = [], ''
read_data = False
for line in lines:
if line[0:2] == "#L":
start = True
header = line[2:].split()
for i, line in enumerate(lines):
# Header line starts with #L - reads headers, and toggles data read-in on
if 'zapline mono' in line:
timestamps.append(lines[i+1].strip('#D'))
elif line[0:2] == "#L":
header, read_data = line[2:].split(), True
if options['log']:
aux.write_log(message='... Found scan data. Starting read-in...', options=options)
continue
elif line[0:2] == "#C":
start = False
# First line after data started with #C - stops data read-in
elif line[0:2] == "#C" or line[0:2] == '#S':
read_data = False
if data:
datas.append(data)
data = []
if scan_data:
scan_datas.append(scan_data); scan_data = []
if header:
headers.append(header)
header = ''
headers.append(header); header = ''
if start == False:
# Ignore line if read-in not toggled
if read_data == False:
continue
# Read in data if it is
else:
data.append(line.split())
scan_data.append(line.split())
edges = {'Mn': [], 'Fe': [], 'Co': [], 'Ni': []}
for i, scan_data in enumerate(scan_datas):
xanes_df = pd.DataFrame(scan_data).apply(pd.to_numeric)
xanes_df.columns = headers[i]
edge = find_element({'xanes_data_original': xanes_df})
if options['log']:
aux.write_log(message=f'... Starting data clean-up ({edge}-edge)... ({i+1}/{len(scan_datas)})', options=options)
if not ('xmap_roi00' in headers[i]) and (not 'xmap_roi01' in headers[i]):
if options['log']:
aux.write_log(message='... ... Did not find fluoresence data. Skipping...', options=options)
continue
edges = {'Mn': [6.0, 6.1, 6.2, 6.3, 6.4, 6.5], 'Fe': [6.8, 6.9, 7.0, 7.1, 7.2], 'Co': [7.6, 7.7, 7.8, 7.9], 'Ni': [8.1, 8.2, 8.3, 8.4, 8.5]}
edge_count = {'Mn': 0, 'Fe': 0, 'Co': 0, 'Ni': 0}
edges[edge].append(xanes_df)
for ind, data in enumerate(datas):
df = pd.DataFrame(data)
df.columns = headers[ind]
if options['add_rois']:
edge_start = np.round((float(df["ZapEnergy"].min())), 1)
if options['log']:
aux.write_log(message=f'... Addition of rois enabled. Starting addition...', options=options)
for edge, energies in edges.items():
if edge_start in energies:
edge_actual = edge
edge_count[edge] += 1
added_edges = {'Mn': [], 'Fe': [], 'Co': [], 'Ni': []}
for edge, scans in edges.items():
if options['log']:
aux.write_log(message=f'... ... Adding rois of the {edge}-edge...', options=options)
if scans:
xanes_df = scans[0]
for i, scan in enumerate(scans):
if i > 0:
if options['log']:
aux.write_log(message=f'... ... ... Adding {i+1}/{len(scans)}', options=options)
if 'xmap_roi00' in xanes_df.columns:
xanes_df['xmap_roi00'] += scan['xmap_roi00']
if 'xmap_roi01' in xanes_df.columns:
xanes_df['xmap_roi01'] += scan['xmap_roi01']
added_edges[edge].append(xanes_df)
edges = added_edges
if options['save']:
#FIXME If there is something wrong with the input file, the file will not be saved but log-file still sais it is saved. Goes from "Saving data to ..." to "All done!" no matter if it fals or not.
if options['log']:
aux.write_log(message=f'... Saving data to {options["save_folder"]}', options=options)
if not os.path.isdir(options['save_folder']):
if options['log']:
aux.write_log(message=f'... ... {options["save_folder"]} does not exist. Creating folder.', options=options)
os.makedirs(options['save_folder'])
filename = os.path.basename(filename).split('.')[0]
filename = filename.split('/')[-1]
count = str(edge_count[edge_actual]).zfill(4)
for edge, scans in edges.items():
for i, scan in enumerate(scans):
count = '' if options['add_rois'] else '_'+str(i).zfill(4)
path = os.path.join(options['save_folder'], f'{filename}_{edge}{count}.dat')
if not os.path.isfile(path):
with open(path, 'w', newline = '\n') as f:
f.write(f'# Time: {timestamps[i]}')
scan.to_csv(f)
if options['log']:
aux.write_log(message=f'... ... Scan saved to {path}', options=options)
elif options['replace'] and os.path.isfile(path):
with open(path, 'w', newline = '\n') as f:
scan.to_csv(f)
if options['log']:
aux.write_log(message=f'... ... File already exists. Overwriting to {path}', options=options)
elif not options['replace'] and os.path.isfile(path):
if options['log']:
aux.write_log(message=f'... ... File already exists. Skipping...', options=options)
all_scans.append(edges)
if options['log']:
aux.write_log(message=f'All done!', options=options)
# Save
if destination:
cwd = os.getcwd()
if not os.path.isdir(destination):
os.mkdir(destination)
os.chdir(destination)
df.to_csv('{}_{}_{}.dat'.format(filename.split('.')[0], edge_actual, count))
os.chdir(cwd)
if options['return']:
return all_scans
else:
df.to_csv('{}_{}_{}.dat'.format(filename.split('.')[0], edge_actual, count))
#Function that "collects" all the files in a folder, only accepting .dat-files from xanes-measurements
def get_filenames(path):
cwd = os.getcwd()
# Change into path provided
os.chdir(path)
filenames = [os.path.join(path, filename) for filename in os.listdir() if os.path.isfile(filename) and filename[-4:] == '.dat'] #changed
return
# Change directory back to where you ran the script from
os.chdir(cwd)
def read_data(data: dict, options={}) -> pd.DataFrame:
return filenames
def put_in_dataframe(path):
filenames = get_filenames(path)
# FIXME Handle the case when dataseries are not the same size
# FIXME Add possibility to extract TIME (for operando runs) and Blower Temp (for variable temperature runs)
# FIXME Add possibility to iport transmission data
required_options = ['adjust']
default_options = {
'adjust': 0
}
#making the column names to be used in the dataframe, making sure the first column is the ZapEnergy
column_names = ["ZapEnergy"]
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
for i in range(len(filenames)):
column_names.append(filenames[i])
columns = ['ZapEnergy']
#Taking the first file in the folder and extracting ZapEnergies and intensity from that (only need the intensity from the rest)
first = pd.read_csv(filenames[0], skiprows=0)
if not isinstance(data['path'], list):
data['path'] = [data['path']]
#Making a data frame with the correct columns, and will fill inn data afterwards
df = pd.DataFrame(columns = column_names)
#First putting in the 2theta-values
df["ZapEnergy"]=first["ZapEnergy"]
# Initialise DataFrame with only ZapEnergy-column
xanes_data = pd.read_csv(data['path'][0], skiprows=1)[['ZapEnergy']]
xanes_data['ZapEnergy'] += options['adjust']
#filling in the intensities from all files into the corresponding column in the dataframe
for i in range(len(filenames)):
df2 = pd.read_csv(filenames[i])
df2 = df2.drop(['Mon','Det1','Det2','Det3','Det4','Det5', 'Det6','Ion1'], axis=1) #, axis=1)
df2 = df2.drop(['MonEx','Ion2','Htime','MusstEnc1','MusstEnc3','MusstEnc4', 'TwoTheta', 'ZCryo'], axis=1)
df2 = df2.drop(['ZBlower1', 'ZBlower2', 'ZSrcur'], axis=1)#, axis=19) #removing the sigma at this point
############## THIS PART PICKS OUT WHICH ROI IS OF INTEREST, BUT MUST BE FIXED IF LOOKING AT THREE EDGES (roi00,roi01,roi02) #####################
if 'xmap_roi01' in df2.columns:
for filename in data['path']:
columns.append(filename)
scan_data = pd.read_csv(filename, skiprows=1)
if not options['active_roi']:
scan_data = scan_data[[determine_active_roi(scan_data)]]
else:
scan_data = scan_data[options['active_roi']]
xanes_data = pd.concat([xanes_data, scan_data], axis=1)
xanes_data.columns = columns
return xanes_data
def read_metadata(data: dict, options={}) -> dict:
required_options = ['get_temperature', 'get_timestamp', 'adjust_time']
default_options = {
'get_temperature': True,
'get_timestamp': True,
'adjust_time': False
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
temperatures = []
timestamps = []
for filename in data['path']:
scan_data = pd.read_csv(filename, skiprows=1)
if options['get_temperature']:
temperatures.append(scan_data['ZBlower2'].mean())
if options['get_timestamp']:
with open(filename, 'r') as f:
time = f.readline().strip('# Time: ')
time = datetime.datetime.strptime(time, "%a %b %d %H:%M:%S %Y ")
if options['adjust_time']:
time_elapsed = scan_data['Htime'].iloc[-1] - scan_data['Htime'].iloc[0]
time += datetime.timedelta(microseconds=time_elapsed)/2
timestamps.append(time)
metadata = {'time': timestamps, 'temperature': temperatures}
return metadata
def determine_active_roi(scan_data):
# FIXME For Co-edge, this gave a wrong scan
#Trying to pick the roi with the highest difference between maximum and minimum intensity --> biggest edge shift
if max(df2["xmap_roi00"])-min(df2["xmap_roi00"])>max(df2["xmap_roi01"])-min(df2["xmap_roi01"]):
df[filenames[i]]=df2["xmap_roi00"] #forMn
# if max(scan_data["xmap_roi00"])-min(scan_data["xmap_roi00"])>max(scan_data["xmap_roi01"])-min(scan_data["xmap_roi01"]):
# active_roi = 'xmap_roi00'
# else:
# active_roi = 'xmap_roi01'
if not ('xmap_roi00' in scan_data.columns) or not ('xmap_roi01' in scan_data.columns):
if 'xmap_roi00' in scan_data.columns:
active_roi = 'xmap_roi00'
elif 'xmap_roi01' in scan_data.columns:
active_roi = 'xmap_roi01'
elif (scan_data['xmap_roi00'].iloc[0:100].mean() < scan_data['xmap_roi00'].iloc[-100:].mean()) and (scan_data['xmap_roi01'].iloc[0:100].mean() < scan_data['xmap_roi01'].iloc[-100:].mean()):
if (scan_data['xmap_roi00'].iloc[:int(scan_data.shape[0]/2)].max() - scan_data['xmap_roi00'].iloc[0])/scan_data['xmap_roi00'].max() > (scan_data['xmap_roi01'].iloc[:int(scan_data.shape[0]/2)].max() - scan_data['xmap_roi01'].iloc[0])/scan_data['xmap_roi01'].max():
active_roi = 'xmap_roi00'
else:
df[filenames[i]]=df2["xmap_roi01"] #forNi
active_roi = 'xmap_roi01'
elif scan_data['xmap_roi00'].iloc[0:100].mean() < scan_data['xmap_roi00'].iloc[-100:].mean():
active_roi = 'xmap_roi00'
elif scan_data['xmap_roi01'].iloc[0:100].mean() < scan_data['xmap_roi01'].iloc[-100:].mean():
active_roi = 'xmap_roi01'
else:
df[filenames[i]]=df2["xmap_roi00"]
###############################################################################################
active_roi = None
i=i+1
return active_roi
#print(df)
#If I want to make a csv-file of the raw data. Decided that was not necessary:
#df.to_csv('static-Mn-edge.csv') #writing it to a csv, first row is datapoint (index), second column is 2theta, and from there the scans starts
return df