Refactor split_scans

This commit is contained in:
rasmusvt 2022-06-20 16:08:36 +02:00
parent 9c6a7d5991
commit 7214746af1

View file

@ -2,94 +2,120 @@ import pandas as pd
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import os import os
import numpy as np import numpy as np
import nafuma.auxillary as aux import nafuma.auxillary as aux
from nafuma.xanes.calib import find_element
def split_xanes_scan(root, destination=None, replace=False): def split_scan_data(data: dict, options={}):
required_options = ['save', 'save_folder', 'replace', 'add_rois']
default_options = {
'save': False,
'save_folder': '.',
'replace': False,
'add_rois': False
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
#root is the path to the beamtime-folder #root is the path to the beamtime-folder
#destination should be the path to the processed data #destination should be the path to the processed data
#insert a for-loop to go through all the folders.dat-files in the folder root\xanes\raw #insert a for-loop to go through all the folders.dat-files in the folder root\xanes\raw
# FIXME Only adding this variable to pass the Linting-tests - will refactor this later # FIXME Only adding this variable to pass the Linting-tests - will refactor this later
filename = 'dummy'
if not isinstance(data['path'], list):
data['path'] = [data['path']]
all_scans = []
with open(filename, 'r') as f: for filename in data['path']:
lines = f.readlines()
with open(filename, 'r') as f:
lines = f.readlines()
scan_datas, scan_data = [], []
headers, header = [], ''
read_data = False
datas = [] for line in lines:
data = [] # Header line starts with #L - reads headers, and toggles data read-in on
headers = [] if line[0:2] == "#L":
header = '' header, read_data = line[2:].split(), True
start = False continue
for line in lines: # First line after data started with #C - stops data read-in
if line[0:2] == "#L": elif line[0:2] == "#C":
start = True read_data = False
header = line[2:].split()
continue
elif line[0:2] == "#C":
start = False
if data:
datas.append(data)
data = []
if header: if scan_data:
headers.append(header) scan_datas.append(scan_data); scan_data = []
header = ''
if header:
headers.append(header); header = ''
# Ignore line if read-in not toggled
if read_data == False:
continue
# Read in data if it is
else:
scan_data.append(line.split())
edges = {'Mn': [], 'Fe': [], 'Co': [], 'Ni': []}
if start == False:
continue
else:
data.append(line.split())
edges = {'Mn': [6.0, 6.1, 6.2, 6.3, 6.4, 6.5], 'Fe': [6.8, 6.9, 7.0, 7.1, 7.2], 'Co': [7.6, 7.7, 7.8, 7.9], 'Ni': [8.1, 8.2, 8.3, 8.4, 8.5]}
edge_count = {'Mn': 0, 'Fe': 0, 'Co': 0, 'Ni': 0}
for ind, data in enumerate(datas):
df = pd.DataFrame(data)
df.columns = headers[ind]
edge_start = np.round((float(df["ZapEnergy"].min())), 1)
for edge, energies in edges.items():
if edge_start in energies:
edge_actual = edge
edge_count[edge] += 1
for i, scan_data in enumerate(scan_datas):
xanes_df = pd.DataFrame(scan_data).apply(pd.to_numeric)
xanes_df.columns = headers[i]
if not ('xmap_roi00' in headers[i]) and (not 'xmap_roi01' in headers[i]):
continue
edge = find_element({'xanes_data_original': xanes_df})
edges[edge].append(xanes_df)
filename = filename.split('/')[-1] if options['add']:
count = str(edge_count[edge_actual]).zfill(4)
added_edges = {'Mn': [], 'Fe': [], 'Co': [], 'Ni': []}
for edge, scans in edges.items():
if scans:
xanes_df = scans[0]
for i, scan in enumerate(scans):
# Save if i > 0:
if destination:
cwd = os.getcwd()
if not os.path.isdir(destination): if 'xmap_roi00' in xanes_df.columns:
os.mkdir(destination) xanes_df['xmap_roi00'] += scan['xmap_roi00']
if 'xmap_roi01' in xanes_df.columns:
os.chdir(destination) xanes_df['xmap_roi01'] += scan['xmap_roi01']
df.to_csv('{}_{}_{}.dat'.format(filename.split('.')[0], edge_actual, count)) added_edges[edge].append(xanes_df)
os.chdir(cwd) edges = added_edges
else: if options['save']:
df.to_csv('{}_{}_{}.dat'.format(filename.split('.')[0], edge_actual, count)) if not os.path.isdir(options['save_folder']):
os.makedirs(options['save_folder'])
filename = os.path.basename(filename).split('.')[0]
for edge, scans in edges.items():
for i, scan in enumerate(scans):
count = '' if options['add'] else '_'+str(i).zfill(4)
path = os.path.join(options['save_folder'], f'{filename}_{edge}{count}.dat')
scan.to_csv(path)
all_scans.append(edges)
return all_scans
@ -117,6 +143,7 @@ def read_data(data: dict, options={}) -> pd.DataFrame:
columns.append(filename) columns.append(filename)
scan_data = pd.read_csv(filename) scan_data = pd.read_csv(filename)
scan_data = scan_data[[determine_active_roi(scan_data)]] scan_data = scan_data[[determine_active_roi(scan_data)]]
xanes_data = pd.concat([xanes_data, scan_data], axis=1) xanes_data = pd.concat([xanes_data, scan_data], axis=1)