Add read brml functionality

This commit is contained in:
rasmusvt 2021-10-27 15:33:56 +02:00
parent 5c977cc387
commit acdc9399eb

View file

@ -2,6 +2,7 @@ import fabio, pyFAI
import pandas as pd import pandas as pd
import numpy as np import numpy as np
import os import os
import shutil
import zipfile import zipfile
import io import io
@ -147,312 +148,77 @@ def view_integrator(calibrant):
print("\nIntegrator: \n", ai) print("\nIntegrator: \n", ai)
def brml_reader(file_name):
"""
From: https://github.com/aboulle/DxTools/blob/master/data_reader.py
Extracts brml into a temporary unzip file, and parses all xml files.
For every intensity value, saves all relevant motor coordinates and sensor values.
All values are save in temporary SPEC-style tmp file.
"""
#*****************************************************************************************************
# Unzip xml files
#*****************************************************************************************************
extract_path = os.path.join(os.getcwd(),"unzip")
if sys.platform == "win32":
print("Detected platform: Windows")
os.system("RMDIR "+ extract_path +" /s /q")
elif sys.platform == "darwin":
print("Detected platform: MacOS")
os.system("rm -rf "+ extract_path)
elif sys.platform == "linux" or sys.platform == "linux2":
print("Detected platform: Linux")
os.system("rm -rf "+ extract_path)
#Extract all RawData*.xml files and InstructionContainer the brml to temporary unzip file
with zipfile.ZipFile(file_name,"r") as brml: def read_brml(path, options=None):
required_options = ['extract_folder']
default_options = {
'extract_folder': 'temp',
'save_folder': None
}
if not options:
options = default_options
else:
for option in required_options:
if option not in options.keys():
options[option] = default_options[option]
if not os.path.isdir(options['extract_folder']):
os.mkdir(options['extract_folder'])
# Extract the RawData0.xml file from the brml-file
with zipfile.ZipFile(path, 'r') as brml:
for info in brml.infolist(): for info in brml.infolist():
if ("RawData" in info.filename) or ("InstructionContainer" in info.filename): if "RawData" in info.filename:
#if ("RawData" in info.filename): brml.extract(info.filename, temp)
brml.extract(info.filename, extract_path)
#*****************************************************************************************************
# For time counting, the number of days is initialized to 0.
# Compatibility fixes with D8 advance and older Discover: offsets, chi and tx, ty are initialized to 0
#*****************************************************************************************************
# Initialize the number of days to 0
n_day = 0.
# Initialize all offsets to 0
off_tth = off_om = off_phi = off_chi = off_tx = off_ty = 0
# Set Chi, tx and ty to 0 for D8 advance (July 2017 Julia Stroh)
chi = tx = ty = "0"
#*****************************************************************************************************
#Modification June 2017 (Duc Dinh)
#In some RawData.xml files, wavelength and static motors are missing.
#Find wl and static motors in MeasurementContainer.xml
#*****************************************************************************************************
data_path = os.path.join(extract_path, "*0","InstructionContainer.xml")
for file in sorted(glob.glob(data_path)):
tree = ET.parse(file)
root = tree.getroot()
for chain in root.findall("./ComparisonMethod/HrxrdAlignmentData"):
wl = chain.find("WaveLength").attrib["Value"]
for chain in root.findall("./ComparisonMethod/HrxrdAlignmentData/Data"):
if chain.get("LogicName") == "TwoTheta":
tth = chain.find("TheoreticalPosition").attrib["Value"]
off_tth = chain.find("PositionOffset").attrib["Value"]
tth = str(float(tth)-float(off_tth))
if chain.get("LogicName") == "Theta":
om = chain.find("TheoreticalPosition").attrib["Value"]
off_om = chain.find("PositionOffset").attrib["Value"]
om = str(float(om)-float(off_om))
if chain.get("LogicName") == "Chi": # Parse the RawData0.xml file
chi = chain.find("TheoreticalPosition").attrib["Value"] path = os.path.join(options['extract_folder'], 'RawData0.xml')
off_chi = chain.find("PositionOffset").attrib["Value"]
chi = str(float(chi)-float(off_chi))
if chain.get("LogicName") == "Phi": tree = ET.parse(path)
phi = chain.find("TheoreticalPosition").attrib["Value"] root = tree.getroot()
off_phi = chain.find("PositionOffset").attrib["Value"]
phi = str(float(phi)-float(off_phi))
if chain.get("LogicName") == "X": shutil.rmtree(options['extract_folder'])
tx = chain.find("TheoreticalPosition").attrib["Value"]
off_tx = chain.find("PositionOffset").attrib["Value"]
tx = str(float(tx)-float(off_tx))
if chain.get("LogicName") == "Y": diffractogram = []
ty = chain.find("TheoreticalPosition").attrib["Value"]
off_ty = chain.find("PositionOffset").attrib["Value"]
ty = str(float(ty)-float(off_ty))
os.remove(file)
#Create ouput file for chain in root.findall('./DataRoutes/DataRoute'):
outfile = open("tmp", "w", encoding='utf8') # Create output data file if chain.get('Description') == 'Originally measured data.':
outfile.write("#temperature khi phi x y theta offset 2theta scanning motor intensity time\n") for data in chain.findall('Datum'):
#***************************************************************************************************** data = data.text.split(',')
# Finds scan type, wl, scanning motors and fixed motors values in RawData*.xml twotheta, intensity = float(data[2]), float(data[3])
#*****************************************************************************************************
data_path = os.path.join(extract_path, "*0","*.xml") #reading files in Experiment0 folder if twotheta > 0:
for file in sorted(glob.glob(data_path), key=file_nb): diffractogram.append({'2th': twotheta, 'I': intensity})
new_file = 0
check_temperature = 0
check_1Dmode = 0
#parsing XML file
tree = ET.parse(file)
root = tree.getroot()
#obtain scan type
for chain in (root.findall("./DataRoutes/DataRoute/ScanInformation") or root.findall("./ScanInformation")):
scan_type = chain.get("VisibleName")
if ("PSD" in scan_type) or ("Psd" in scan_type):
scan_type = "PSDFIXED"
if ("Coupled" in scan_type) or ("coupled" in scan_type) or ("2Theta-Omega" in scan_type):
scan_type = "COUPLED"
if ("Rocking" in scan_type) or ("rocking" in scan_type):
scan_type = "THETA"
# Check if temperature is recorded diffractogram = pd.DataFrame(diffractogram)
for chain in (root.findall("./DataRoutes/DataRoute/DataViews/RawDataView/Recording") or root.findall("./DataViews/RawDataView/Recording")):
if "Temperature" in chain.get("LogicName"):
check_temperature = 1
#Find wl in RawData.xml
for chain in root.findall("./FixedInformation/Instrument/PrimaryTracks/TrackInfoData/MountedOptics/InfoData/Tube/WaveLengthAlpha1"):
wl = chain.get("Value")
# Find the fast-scanning axis
for chain in (root.findall("./DataRoutes/DataRoute/ScanInformation/ScanAxes/ScanAxisInfo") or root.findall("./ScanInformation/ScanAxes/ScanAxisInfo")):
if new_file == 0:
if chain.get("AxisName") == "TwoTheta": #Added offset correction / June 2017. Only relevant if offset in InstructionContainer. 0 otherwise.
off_scan = float(off_tth)
elif chain.get("AxisName") == "Theta":
off_scan = float(off_om)
else:
off_scan = 0
step = chain.find("Increment").text
start = chain.find("Start").text
stop = chain.find("Stop").text
ref = chain.find("Reference").text
start = str(float(ref)+float(start)-off_scan) #Added offset correction / June 2017.
#start = str(float(ref)+float(start))
new_file += 1
# Find scanning motors if options['save_folder']:
for chain in (root.findall("./DataRoutes/DataRoute/ScanInformation/ScanAxes/ScanAxisInfo") or root.findall("./ScanInformation/ScanAxes/ScanAxisInfo")): if not os.path.isdir(options['save_folder']):
if chain.get("AxisName") == "TwoTheta": os.makedirs(options['save_folder'])
tth = chain.find("Start").text
ref = chain.find("Reference").text
tth = str(float(ref)+float(tth)-float(off_tth)) #Added offset correction / June 2017.
#tth = str(float(ref)+float(tth))
if chain.get("AxisName") == "Theta": diffractogram.to_csv(options['save_folder'])
om = chain.find("Start").text
ref = chain.find("Reference").text
om = str(float(ref)+float(om)-float(off_om)) #Added offset correction / June 2017.
#om = str(float(ref)+float(om))
if chain.get("AxisName") == "Chi":
chi = chain.find("Start").text
ref = chain.find("Reference").text
chi = str(float(ref)+float(chi)-float(off_chi)) #Added offset correction / June 2017.
#chi = str(float(ref)+float(chi))
if chain.get("AxisName") == "Phi":
phi = chain.find("Start").text
ref = chain.find("Reference").text
phi = str(float(ref)+float(phi)-float(off_phi)) #Added offset correction / June 2017.
#phi = str(float(ref)+float(phi))
if chain.get("AxisName") == "X": return diffractogram
tx = chain.find("Start").text
ref = chain.find("Reference").text
tx = str(float(ref)+float(tx)-float(off_tx)) #Added offset correction / June 2017.
#tx = str(float(ref)+float(tx))
if chain.get("AxisName") == "Y":
ty = chain.find("Start").text
ref = chain.find("Reference").text
ty = str(float(ref)+float(ty)-float(off_ty)) #Added offset correction / June 2017.
#ty = str(float(ref)+float(ty))
# Find static motors
for chain in root.findall("./FixedInformation/Drives/InfoData"):
if chain.get("LogicName") == "TwoTheta":
tth = chain.find("Position").attrib["Value"]
tth = str(float(tth)-float(off_tth)) #Added offset correction / June 2017.
if chain.get("LogicName") == "Theta":
om = chain.find("Position").attrib["Value"]
om = str(float(om)-float(off_om)) #Added offset correction / June 2017.
if chain.get("LogicName") == "Chi":
chi = chain.find("Position").attrib["Value"]
chi = str(float(chi)-float(off_chi)) #Added offset correction / June 2017.
if chain.get("LogicName") == "Phi":
phi = chain.find("Position").attrib["Value"]
phi = str(float(phi)-float(off_phi)) #Added offset correction / June 2017.
if chain.get("LogicName") == "X":
tx = chain.find("Position").attrib["Value"]
tx = str(float(tx)-float(off_tx)) #Added offset correction / June 2017.
if chain.get("LogicName") == "Y":
ty = chain.find("Position").attrib["Value"]
ty = str(float(ty)-float(off_ty)) #Added offset correction / June 2017.
offset = str(float(om) - float(tth)/2.)
#*****************************************************************************************************
# This section computes scanning time, scanning angular range and scanning speed
# in order to convert 2th values to time values (July 2017, Julia Stroh)
#*****************************************************************************************************
for chain in (root.findall("./TimeStampStarted")):
d_start = ((chain.text).split("T")[0]).split("-")[2]
t_start = ((chain.text).split("T")[1]).split("+")[0]
h_start, min_start, sec_start = t_start.split(":")
t_start = float(h_start)*3600 + float(min_start)*60 + float(sec_start)
if file_nb(file)==0:
abs_start = t_start
for chain in (root.findall("./TimeStampFinished")):
d_stop = ((chain.text).split("T")[0]).split("-")[2]
t_stop = ((chain.text).split("T")[1]).split("+")[0]
h_stop, min_stop, sec_stop = t_stop.split(":")
t_stop = float(h_stop)*3600 + float(min_stop)*60 + float(sec_stop)
# Check if detector is in 1D mode
for chain in (root.findall("./DataRoutes/DataRoute/ScanInformation") or root.findall("./ScanInformation")):
if chain.find("TimePerStep").text != chain.find("TimePerStepEffective").text:
check_1Dmode = 1
# Check if day changed between start and stop and correct accordingly
if d_stop != d_start:
t_stop += 24*3600.
total_scan_time = t_stop - t_start
#scanning range
dth_scan = float(stop)-float(start)
#psd range
dth_psd = 0
for chain in root.findall("./FixedInformation/Detectors/InfoData/AngularOpening"):
dth_psd = chain.get("Value")
total_dth = float(dth_psd)*check_1Dmode+float(dth_scan)
scan_speed = total_dth / total_scan_time
#*****************************************************************************************************
# Finds intensity values. If temperature is recorded, also fin temperature values.
# The intensity data is formatted differently in PSDfixed mode and when temperature is recorded
#*****************************************************************************************************
if "PSDFIXED" in scan_type:
if check_temperature == 0:
for chain in (root.findall("./DataRoutes/DataRoute") or root.findall("./")):
intensity = (chain.find("Datum").text).split(',')
for chain in (root.findall("./DataRoutes/DataRoute/DataViews/RawDataView/Recording") or root.findall("./DataViews/RawDataView/Recording")):
if chain.get("LogicName") == "Counter1D":
n_channels = int(chain.find("Size/X").text)
line_count = 0
int_shift = len(intensity) - n_channels
for i in range(n_channels): #the intensity values are shifted to the right by int_shift
if i == 0:
scanning = float(start)
else:
scanning += float(step)
line_count += 1
t_2th = (t_start+n_day*24*3600 - abs_start)+((float(dth_psd)*check_1Dmode + scanning - float(start)) / scan_speed)
outfile.write("25" + " " + (chi) + " " + (phi)
+ " " + (tx) + " " + (ty) + " " + (om)
+ " " + (offset) + " " + (tth) + " " + str(scanning)
+ " " + intensity[i+int_shift] +" " + str(t_2th) +'\n')
else:
return implementation_warning, 0, 0
#if "COUPLED" in scan_type:
# to do check in brml that all scans (except psd fixed) share the same data structure (wrt temperature)
else:
if check_temperature == 0:
line_count = 0
for chain in (root.findall("./DataRoutes/DataRoute/Datum") or root.findall("./Datum")):
if line_count == 0:
scanning = float(start)
else:
scanning += float(step)
line_count += 1
intensity = (chain.text).split(',')[-1]
#compute time corresponding to scanning angle (July 2017)
t_2th = (t_start+n_day*24*3600 - abs_start)+((float(dth_psd)*check_1Dmode + scanning - float(start)) / scan_speed)
outfile.write("25" + " " + (chi) + " " + (phi)
+ " " + (tx) + " " + (ty) + " " + (om)
+ " " + (offset) + " " + (tth) + " " + str(round(scanning, 4))
+ " " + intensity + " " + str(t_2th) +'\n')
else:
line_count = 0
for chain in (root.findall("./DataRoutes/DataRoute/Datum") or root.findall("./Datum")):
if line_count == 0:
scanning = float(start)
else:
scanning += float(step)
line_count += 1
t_2th = (t_start+n_day*24*3600 - abs_start)+((float(dth_psd)*check_1Dmode + scanning - float(start)) / scan_speed)
intensity = (chain.text).split(',')[-2]
temperature = (chain.text).split(',')[-1]
outfile.write(temperature + " " + (chi) + " " + (phi)
+ " " + (tx) + " " + (ty) + " " + (om)
+ " " + (offset) + " " + (tth) + " " + str(round(scanning, 4))
+ " " + intensity + " " + str(t_2th) +'\n')
if d_stop != d_start:
n_day+=1
outfile.close()
if sys.platform == "win32":
os.system("RMDIR "+ extract_path +" /s /q")
elif sys.platform == "darwin":
os.system("rm -rf "+ extract_path)
elif sys.platform == "linux" or sys.platform == "linux2":
os.system("rm -rf "+ extract_path)
return scan_type, line_count, wl