Merge pull request #14 from rasmusthog/rasmus_electrochemistry

Merge new electrochemistry functionality into main
This commit is contained in:
Rasmus Vester Thøgersen 2022-08-22 08:45:11 +00:00 committed by GitHub
commit eb2666b85d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 584 additions and 129 deletions

View file

@ -1,4 +1,3 @@
from email.policy import default
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
@ -7,6 +6,12 @@ import os
import nafuma.auxillary as aux
from sympy import re
# FIXME This is not good practice, but a temporary fix as I don't have time to understand what causes the SettingWithCopyWarning.
# Read this: https://www.dataquest.io/blog/settingwithcopywarning/
pd.set_option('mode.chained_assignment', None)
def read_data(data, options={}):
if data['kind'] == 'neware':
@ -50,10 +55,9 @@ def read_neware(path, options={}):
else:
df = pd.read_csv(csv_details)
elif path.split('.')[-1] == 'csv':
elif path.endswith('csv'):
df = pd.read_csv(path)
return df
@ -66,7 +70,9 @@ def read_batsmall(path):
Output:
df: pandas DataFrame containing the data as-is, but without additional NaN-columns.'''
df = pd.read_csv(path, skiprows=2, sep='\t')
# FIXME Now it is hardcoded that the decimal is a comma. It should do a check, as datasets can vary depending on the system settings of the machine that does the data conversion
df = pd.read_csv(path, skiprows=2, sep='\t', decimal=',')
df = df.loc[:, ~df.columns.str.contains('^Unnamed')]
return df
@ -129,9 +135,11 @@ def process_batsmall_data(df, options=None):
df = unit_conversion(df=df, options=options)
if options['splice_cycles']:
df = splice_cycles(df=df, options=options)
# Replace NaN with empty string in the Comment-column and then remove all steps where the program changes - this is due to inconsistent values for current
df[["comment"]] = df[["comment"]].fillna(value={'comment': ''})
df = df[df["comment"].str.contains("program")==False]
@ -207,6 +215,43 @@ def splice_cycles(df, options: dict) -> pd.DataFrame:
add = df['specific_capacity'].iloc[i-1]
df['specific_capacity'].iloc[i:last_chg] = df['specific_capacity'].iloc[i:last_chg] + add
if options['kind'] == 'neware':
if options['summary']:
for i in range(df['cycle'].max()):
sub_df = df.loc[df['cycle'] == i+1].copy()
if sub_df['status'].loc[sub_df['status'] == 'CC Chg'].count() > 1:
indices = sub_df.index[sub_df['status'] == 'CC Chg']
add_columns = ['capacity', 'specific_capacity', 'ions', 'energy', 'cycle_time']
for column in add_columns:
if column in df.columns:
df[column].iloc[indices[-1]] = df[column].iloc[indices[-1]] + df[column].iloc[indices[0]]
df.drop(index=indices[0], inplace=True)
df.reset_index(inplace=True, drop=True)
else:
for i in range(df['cycle'].max()):
sub_df = df.loc[df['cycle'] == i+1].copy()
sub_chg_df = sub_df.loc[sub_df['status'] == 'CC Chg'].copy()
steps_indices = sub_chg_df['steps'].unique()
if len(steps_indices) > 1:
add_columns = ['capacity', 'specific_capacity', 'ions', 'energy', 'cycle_time']
for column in add_columns:
if column in df.columns:
# Extract the maximum value from the first of the two cycles by accessing the column value of the highest index of the first cycle
add = df[column].iloc[df.loc[df['steps'] == steps_indices[0]].index.max()]
df[column].loc[df['steps'] == steps_indices[1]] += add
return df
@ -223,70 +268,116 @@ def process_neware_data(df, options={}):
active_materiale_weight: weight of the active material (in mg) used in the cell.
molecular_weight: the molar mass (in g mol^-1) of the active material, to calculate the number of ions extracted. Assumes one electron per Li+/Na+-ion """
required_options = ['units', 'active_material_weight', 'molecular_weight', 'reverse_discharge', 'splice_cycles']
required_options = ['units', 'active_material_weight', 'molecular_weight', 'reverse_discharge', 'splice_cycles', 'increment_cycles_from', 'delete_datapoints']
default_options = {
'units': None,
'active_material_weight': None,
'molecular_weight': None,
'reverse_discharge': False,
'splice_cycles': None}
'splice_cycles': None,
'increment_cycles_from': None,# index
'delete_datapoints': None, # list of indices
}
aux.update_options(options=options, required_options=required_options, default_options=default_options)
options['kind'] = 'neware'
# Complete set of new units and get the units used in the dataset, and convert values in the DataFrame from old to new.
set_units(options=options) # sets options['units']
options['old_units'] = get_old_units(df=df, options=options)
if not options['summary']:
# Complete set of new units and get the units used in the dataset, and convert values in the DataFrame from old to new.
set_units(options=options) # sets options['units']
options['old_units'] = get_old_units(df=df, options=options)
df = add_columns(df=df, options=options) # adds columns to the DataFrame if active material weight and/or molecular weight has been passed in options
df = add_columns(df=df, options=options) # adds columns to the DataFrame if active material weight and/or molecular weight has been passed in options
df = unit_conversion(df=df, options=options) # converts all units from the old units to the desired units
df = unit_conversion(df=df, options=options) # converts all units from the old units to the desired units
if options['increment_cycles_from']:
df['cycle'].iloc[options['increment_cycles_from']:] += 1
if options['delete_datapoints']:
for datapoint in options['delete_datapoints']:
df.drop(index=datapoint, inplace=True)
if options['splice_cycles']:
df = splice_cycles(df=df, options=options)
# Creates masks for charge and discharge curves
chg_mask = df['status'] == 'CC Chg'
dchg_mask = df['status'] == 'CC DChg'
# Creates masks for charge and discharge curves
chg_mask = df['status'] == 'CC Chg'
dchg_mask = df['status'] == 'CC DChg'
# Initiate cycles list
cycles = []
# Loop through all the cycling steps, change the current and capacities in the
for i in range(df["cycle"].max()):
sub_df = df.loc[df['cycle'] == i+1].copy()
#sub_df.loc[dchg_mask, 'current'] *= -1
#sub_df.loc[dchg_mask, 'capacity'] *= -1
chg_df = sub_df.loc[chg_mask]
dchg_df = sub_df.loc[dchg_mask]
# Continue to next iteration if the charge and discharge DataFrames are empty (i.e. no current)
if chg_df.empty and dchg_df.empty:
continue
# Reverses the discharge curve if specified
if options['reverse_discharge']:
max_capacity = dchg_df['capacity'].max()
dchg_df['capacity'] = np.abs(dchg_df['capacity'] - max_capacity)
if 'specific_capacity' in df.columns:
max_capacity = dchg_df['specific_capacity'].max()
dchg_df['specific_capacity'] = np.abs(dchg_df['specific_capacity'] - max_capacity)
if 'ions' in df.columns:
max_capacity = dchg_df['ions'].max()
dchg_df['ions'] = np.abs(dchg_df['ions'] - max_capacity)
cycles.append((chg_df, dchg_df))
# Initiate cycles list
cycles = []
return cycles
# Loop through all the cycling steps, change the current and capacities in the
for i in range(df["cycle"].max()):
sub_df = df.loc[df['cycle'] == i+1].copy()
chg_df = sub_df.loc[chg_mask]
dchg_df = sub_df.loc[dchg_mask]
# Continue to next iteration if the charge and discharge DataFrames are empty (i.e. no current)
if chg_df.empty and dchg_df.empty:
continue
# Reverses the discharge curve if specified
if options['reverse_discharge']:
max_capacity = dchg_df['capacity'].max()
dchg_df['capacity'] = np.abs(dchg_df['capacity'] - max_capacity)
if 'specific_capacity' in df.columns:
max_capacity = dchg_df['specific_capacity'].max()
dchg_df['specific_capacity'] = np.abs(dchg_df['specific_capacity'] - max_capacity)
if 'ions' in df.columns:
max_capacity = dchg_df['ions'].max()
dchg_df['ions'] = np.abs(dchg_df['ions'] - max_capacity)
cycles.append((chg_df, dchg_df))
return cycles
elif options['summary']:
set_units(options=options)
options['old_units'] = get_old_units(df=df, options=options)
df = add_columns(df=df, options=options)
df = unit_conversion(df=df, options=options)
if options['splice_cycles']:
df = splice_cycles(df=df, options=options)
chg_df = df.loc[df['status'] == 'CC Chg']
chg_df.reset_index(inplace=True)
dchg_df = df.loc[df['status'] == 'CC DChg']
dchg_df.reset_index(inplace=True)
# Construct new DataFrame
new_df = pd.DataFrame(chg_df["cycle"])
new_df.insert(1,'charge_capacity',chg_df['capacity'])
new_df.insert(1,'charge_specific_capacity',chg_df['specific_capacity'])
new_df.insert(1,'discharge_capacity',dchg_df['capacity'])
new_df.insert(1,'discharge_specific_capacity',dchg_df['specific_capacity'])
new_df.insert(1,'charge_energy',chg_df['energy'])
new_df.insert(1,'charge_specific_energy',chg_df['specific_energy'])
new_df.insert(1,'discharge_energy',dchg_df['energy'])
new_df.insert(1,'discharge_specific_energy',dchg_df['specific_energy'])
new_df = calculate_efficiency(df=new_df, options=options)
return new_df
def process_biologic_data(df, options=None):
@ -301,11 +392,22 @@ def process_biologic_data(df, options=None):
'splice_cycles': None}
# Check if the DataFrame contains GC or CV data.
# FIXME This might not be a very rigorous method of checking. E.g. Rest has mode == 3, so if loading a short GC with many Rest-datapoints, the mean will be 2 and it will be treated as CV. For now manual override is sufficient
if not 'mode' in options.keys():
options['mode'] = 'GC' if int(df['mode'].mean()) == 1 else 'CV'
aux.update_options(options=options, required_options=required_options, default_options=default_options)
options['kind'] = 'biologic'
# Pick out necessary columns
df = df[['Ns changes', 'Ns', 'time/s', 'Ewe/V', 'Energy charge/W.h', 'Energy discharge/W.h', '<I>/mA', 'Capacity/mA.h', 'cycle number']].copy()
headers = [
'Ns changes', 'Ns', 'time/s', 'Ewe/V', 'Energy charge/W.h', 'Energy discharge/W.h', '<I>/mA', 'Capacity/mA.h', 'cycle number' ] if options['mode'] == 'GC' else [
'ox/red', 'time/s', 'control/V', 'Ewe/V', '<I>/mA', 'cycle number', '(Q-Qo)/C', 'P/W'
]
df = df[headers].copy()
# Complete set of new units and get the units used in the dataset, and convert values in the DataFrame from old to new.
set_units(options)
@ -315,10 +417,15 @@ def process_biologic_data(df, options=None):
df = unit_conversion(df=df, options=options)
# Creates masks for charge and discharge curves
chg_mask = (df['status'] == 1) & (df['status_change'] != 1)
dchg_mask = (df['status'] == 2) & (df['status_change'] != 1)
# Creates masks for charge and discharge curves
if options['mode'] == 'GC':
chg_mask = (df['status'] == 1) & (df['status_change'] != 1)
dchg_mask = (df['status'] == 2) & (df['status_change'] != 1)
elif options['mode'] == 'CV':
chg_mask = (df['status'] == 1) # oxidation
dchg_mask = (df['status'] == 0) # reduction
# Initiate cycles list
cycles = []
@ -338,7 +445,7 @@ def process_biologic_data(df, options=None):
if chg_df.empty and dchg_df.empty:
continue
if options['reverse_discharge']:
if options['mode'] == 'GC' and options['reverse_discharge']:
max_capacity = dchg_df['capacity'].max()
dchg_df['capacity'] = np.abs(dchg_df['capacity'] - max_capacity)
@ -350,8 +457,12 @@ def process_biologic_data(df, options=None):
max_capacity = dchg_df['ions'].max()
dchg_df['ions'] = np.abs(dchg_df['ions'] - max_capacity)
cycles.append((chg_df, dchg_df))
if options['mode'] == 'CV':
chg_df = chg_df.sort_values(by='voltage').reset_index(drop=True)
dchg_df = dchg_df.sort_values(by='voltage', ascending=False).reset_index(drop=True)
cycles.append((chg_df, dchg_df))
return cycles
@ -360,8 +471,13 @@ def process_biologic_data(df, options=None):
def add_columns(df, options):
if options['kind'] == 'neware':
if options['summary']:
df[f'Energy({options["old_units"]["energy"]})'] = np.abs(df[f'Net discharge energy({options["old_units"]["energy"]})'])
if options['active_material_weight']:
df["SpecificCapacity({}/mg)".format(options['old_units']["capacity"])] = df["Capacity({})".format(options['old_units']['capacity'])] / (options['active_material_weight'])
df[f"SpecificCapacity({options['old_units']['capacity']}/mg)"] = df["Capacity({})".format(options['old_units']['capacity'])] / (options['active_material_weight'])
df[f"SpecificEnergy({options['old_units']['energy']}/mg)"] = df["Energy({})".format(options['old_units']['energy'])] / (options['active_material_weight'])
if options['molecular_weight']:
faradays_constant = 96485.3365 # [F] = C mol^-1 = As mol^-1
@ -389,6 +505,25 @@ def add_columns(df, options):
return df
def calculate_efficiency(df: pd.DataFrame, options: dict) -> pd.DataFrame:
default_options = {
'reference_index': 0
}
options = aux.update_options(options=options, required_options=default_options.keys(), default_options=default_options)
df['charge_capacity_fade'] = (df['charge_capacity'] / df['charge_capacity'].iloc[options['reference_index']])*100
df['discharge_capacity_fade'] = (df['discharge_capacity'] / df['discharge_capacity'].iloc[options['reference_index']])*100
df['coulombic_efficiency'] = (df['discharge_capacity'] / df['charge_capacity'])*100
df['energy_efficiency'] = (df['discharge_energy'] / df['charge_energy'])*100
return df
def unit_conversion(df, options):
from . import unit_tables
@ -403,47 +538,131 @@ def unit_conversion(df, options):
if options['kind'] == 'neware':
df['Current({})'.format(options['old_units']['current'])] = df['Current({})'.format(options['old_units']['current'])] * unit_tables.current()[options['old_units']['current']].loc[options['units']['current']]
df['Voltage({})'.format(options['old_units']['voltage'])] = df['Voltage({})'.format(options['old_units']['voltage'])] * unit_tables.voltage()[options['old_units']['voltage']].loc[options['units']['voltage']]
df['Capacity({})'.format(options['old_units']['capacity'])] = df['Capacity({})'.format(options['old_units']['capacity'])] * unit_tables.capacity()[options['old_units']['capacity']].loc[options['units']['capacity']]
df['Energy({})'.format(options['old_units']['energy'])] = df['Energy({})'.format(options['old_units']['energy'])] * unit_tables.energy()[options['old_units']['energy']].loc[options['units']['energy']]
df['CycleTime({})'.format(options['units']['time'])] = df.apply(lambda row : convert_time_string(row['Relative Time(h:min:s.ms)'], unit=options['units']['time']), axis=1)
df['RunTime({})'.format(options['units']['time'])] = df.apply(lambda row : convert_datetime_string(row['Real Time(h:min:s.ms)'], reference=df['Real Time(h:min:s.ms)'].iloc[0], unit=options['units']['time']), axis=1)
columns = ['status', 'jump', 'cycle', 'steps', 'current', 'voltage', 'capacity', 'energy']
if 'SpecificCapacity({}/mg)'.format(options['old_units']['capacity']) in df.columns:
df['SpecificCapacity({}/mg)'.format(options['old_units']['capacity'])] = df['SpecificCapacity({}/mg)'.format(options['old_units']['capacity'])] * unit_tables.capacity()[options['old_units']['capacity']].loc[options['units']['capacity']] / unit_tables.mass()['mg'].loc[options['units']["mass"]]
columns.append('specific_capacity')
record_number = 'Data serial number' if 'Data serial number' in df.columns else 'Record number'
relative_time = 'Relative Time(h:min:s.ms)' if 'Relative Time(h:min:s.ms)' in df.columns else 'Relative Time'
continuous_time = 'Continuous Time(h:min:s.ms)' if 'Continuous Time(h:min:s.ms)' in df.columns else 'Continuous Time'
real_time = 'Real Time(h:min:s.ms)' if 'Real Time(h:min:s.ms)' in df.columns else 'Real Time'
if options['summary']:
df[f'Energy({options["old_units"]["energy"]})'] = df[f'Energy({options["old_units"]["energy"]})'] * unit_tables.energy()[options['old_units']['energy']].loc[options['units']['energy']]
df[f'Starting current({options["old_units"]["current"]})'] = df[f'Starting current({options["old_units"]["current"]})'] * unit_tables.current()[options['old_units']['current']].loc[options['units']['current']]
df[f'Start Volt({options["old_units"]["voltage"]})'] = df[f'Start Volt({options["old_units"]["voltage"]})'] * unit_tables.voltage()[options['old_units']['voltage']].loc[options['units']['voltage']]
df[f'Capacity({options["old_units"]["capacity"]})'] = df[f'Capacity({options["old_units"]["capacity"]})'] * unit_tables.capacity()[options['old_units']['capacity']].loc[options['units']['capacity']]
df[f'Energy({options["old_units"]["energy"]})'] = df[f'Energy({options["old_units"]["energy"]})'] * unit_tables.energy()[options['old_units']['energy']].loc[options['units']['energy']]
df[f'CycleTime({options["units"]["time"]})'] = df.apply(lambda row : convert_time_string(row[relative_time], unit=options['units']['time']), axis=1)
df[f'RunTime({options["units"]["time"]})'] = df.apply(lambda row : convert_datetime_string(row[real_time], reference=df[real_time].iloc[0], ref_time=df[f'CycleTime({options["units"]["time"]})'].iloc[0],unit=options['units']['time']), axis=1)
droplist = [
'Chnl',
'Original step',
f'End Volt({options["old_units"]["voltage"]})',
f'Termination current({options["old_units"]["current"]})',
relative_time,
real_time,
continuous_time,
f'Net discharge capacity({options["old_units"]["capacity"]})',
f'Chg Cap({options["old_units"]["capacity"]})',
f'DChg Cap({options["old_units"]["capacity"]})',
f'Net discharge energy({options["old_units"]["energy"]})',
f'Chg Eng({options["old_units"]["energy"]})',
f'DChg Eng({options["old_units"]["energy"]})'
]
# Drop all undesireable columns
for drop in droplist:
if drop in df.columns:
df.drop(drop, axis=1, inplace=True)
columns = ['cycle', 'steps', 'status', 'voltage', 'current', 'capacity', 'energy']
# Add column labels for specific capacity and ions if they exist
if 'SpecificCapacity({}/mg)'.format(options['old_units']['capacity']) in df.columns:
df['SpecificCapacity({}/mg)'.format(options['old_units']['capacity'])] = df['SpecificCapacity({}/mg)'.format(options['old_units']['capacity'])] * unit_tables.capacity()[options['old_units']['capacity']].loc[options['units']['capacity']] / unit_tables.mass()['mg'].loc[options['units']["mass"]]
columns.append('specific_capacity')
if f'SpecificEnergy({options["old_units"]["energy"]}/mg)' in df.columns:
df[f'SpecificEnergy({options["old_units"]["energy"]}/mg)'] = df[f'SpecificEnergy({options["old_units"]["energy"]}/mg)'] * unit_tables.energy()[options['old_units']['energy']].loc[options['units']['energy']] / unit_tables.mass()['mg'].loc[options['units']["mass"]]
columns.append('specific_energy')
if 'IonsExtracted' in df.columns:
columns.append('ions')
# Append energy column label here as it was the last column to be generated
columns.append('cycle_time')
columns.append('runtime')
# Apply new column labels
df.columns = columns
columns.append('cycle_time')
columns.append('time')
else:
df['Current({})'.format(options['old_units']['current'])] = df['Current({})'.format(options['old_units']['current'])] * unit_tables.current()[options['old_units']['current']].loc[options['units']['current']]
df['Voltage({})'.format(options['old_units']['voltage'])] = df['Voltage({})'.format(options['old_units']['voltage'])] * unit_tables.voltage()[options['old_units']['voltage']].loc[options['units']['voltage']]
df['Capacity({})'.format(options['old_units']['capacity'])] = df['Capacity({})'.format(options['old_units']['capacity'])] * unit_tables.capacity()[options['old_units']['capacity']].loc[options['units']['capacity']]
df['Energy({})'.format(options['old_units']['energy'])] = df['Energy({})'.format(options['old_units']['energy'])] * unit_tables.energy()[options['old_units']['energy']].loc[options['units']['energy']]
df['CycleTime({})'.format(options['units']['time'])] = df.apply(lambda row : convert_time_string(row[relative_time], unit=options['units']['time']), axis=1)
df['RunTime({})'.format(options['units']['time'])] = df.apply(lambda row : convert_datetime_string(row[real_time], reference=df[real_time].iloc[0], ref_time=df[f'CycleTime({options["units"]["time"]})'].iloc[0], unit=options['units']['time']), axis=1)
columns = ['status', 'jump', 'cycle', 'steps', 'current', 'voltage', 'capacity', 'energy']
if 'SpecificCapacity({}/mg)'.format(options['old_units']['capacity']) in df.columns:
df['SpecificCapacity({}/mg)'.format(options['old_units']['capacity'])] = df['SpecificCapacity({}/mg)'.format(options['old_units']['capacity'])] * unit_tables.capacity()[options['old_units']['capacity']].loc[options['units']['capacity']] / unit_tables.mass()['mg'].loc[options['units']["mass"]]
columns.append('specific_capacity')
if f'SpecificEnergy({options["old_units"]["energy"]}/mg)' in df.columns:
df[f'SpecificEnergy({options["old_units"]["energy"]}/mg)'] = df[f'SpecificEnergy({options["old_units"]["energy"]}/mg)'] * unit_tables.energy()[options['old_units']['energy']].loc[options['units']['energy']] / unit_tables.mass()['mg'].loc[options['units']["mass"]]
columns.append('specific_energy')
df.drop(['Record number', 'Relative Time(h:min:s.ms)', 'Real Time(h:min:s.ms)'], axis=1, inplace=True)
if 'IonsExtracted' in df.columns:
columns.append('ions')
df.columns = columns
columns.append('cycle_time')
columns.append('time')
droplist = [record_number, relative_time, real_time]
for drop in droplist:
if drop in df.columns:
df.drop(drop, axis=1, inplace=True)
df.columns = columns
if options['kind'] == 'biologic':
df['time/{}'.format(options['old_units']['time'])] = df["time/{}".format(options['old_units']["time"])] * unit_tables.time()[options['old_units']["time"]].loc[options['units']['time']]
df["Ewe/{}".format(options['old_units']["voltage"])] = df["Ewe/{}".format(options['old_units']["voltage"])] * unit_tables.voltage()[options['old_units']["voltage"]].loc[options['units']['voltage']]
df["<I>/{}".format(options['old_units']["current"])] = df["<I>/{}".format(options['old_units']["current"])] * unit_tables.current()[options['old_units']["current"]].loc[options['units']['current']]
for column in df.columns:
if 'time' in column:
df['time/{}'.format(options['old_units']['time'])] = df["time/{}".format(options['old_units']["time"])] * unit_tables.time()[options['old_units']["time"]].loc[options['units']['time']]
capacity = options['old_units']['capacity'].split('h')[0] + '.h'
df["Capacity/{}".format(capacity)] = df["Capacity/{}".format(capacity)] * (unit_tables.capacity()[options['old_units']["capacity"]].loc[options['units']["capacity"]])
if 'Ewe' in column:
df["Ewe/{}".format(options['old_units']["voltage"])] = df["Ewe/{}".format(options['old_units']["voltage"])] * unit_tables.voltage()[options['old_units']["voltage"]].loc[options['units']['voltage']]
columns = ['status_change', 'status', 'time', 'voltage', 'energy_charge', 'energy_discharge', 'current', 'capacity', 'cycle']
if '<I>' in column:
df["<I>/{}".format(options['old_units']["current"])] = df["<I>/{}".format(options['old_units']["current"])] * unit_tables.current()[options['old_units']["current"]].loc[options['units']['current']]
if 'SpecificCapacity({}/mg)'.format(options['old_units']['capacity']) in df.columns:
df['SpecificCapacity({}/mg)'.format(options['old_units']['capacity'])] = df['SpecificCapacity({}/mg)'.format(options['old_units']['capacity'])] * unit_tables.capacity()[options['old_units']['capacity']].loc[options['units']['capacity']] / unit_tables.mass()['mg'].loc[options['units']["mass"]]
columns.append('specific_capacity')
if 'Capacity' in column:
capacity = options['old_units']['capacity'].split('h')[0] + '.h'
df["Capacity/{}".format(capacity)] = df["Capacity/{}".format(capacity)] * (unit_tables.capacity()[options['old_units']["capacity"]].loc[options['units']["capacity"]])
if 'IonsExtracted' in df.columns:
columns.append('ions')
columns = [
'status_change', 'status', 'time', 'voltage', 'energy_charge', 'energy_discharge', 'current', 'capacity', 'cycle'] if options['mode'] == 'GC' else [ # GC headers
'status', 'time', 'control_voltage', 'voltage', 'current', 'cycle', 'charge', 'power' # CV headers
]
if options['mode'] == 'GC':
if 'SpecificCapacity({}/mg)'.format(options['old_units']['capacity']) in df.columns:
df['SpecificCapacity({}/mg)'.format(options['old_units']['capacity'])] = df['SpecificCapacity({}/mg)'.format(options['old_units']['capacity'])] * unit_tables.capacity()[options['old_units']['capacity']].loc[options['units']['capacity']] / unit_tables.mass()['mg'].loc[options['units']["mass"]]
columns.append('specific_capacity')
if 'IonsExtracted' in df.columns:
columns.append('ions')
df.columns = columns
@ -479,22 +698,34 @@ def get_old_units(df: pd.DataFrame, options: dict) -> dict:
if options['kind'] == 'batsmall':
time = df.columns[0].split()[-1].strip('[]')
voltage = df.columns[1].split()[-1].strip('[]')
current = df.columns[2].split()[-1].strip('[]')
capacity, mass = df.columns[4].split()[-1].strip('[]').split('/')
old_units = {'time': time, 'current': current, 'voltage': voltage, 'capacity': capacity, 'mass': mass}
old_units = {}
for column in df.columns:
if 'TT [' in column:
old_units['time'] = column.split()[-1].strip('[]')
elif 'U [' in column:
old_units['voltage'] = column.split()[-1].strip('[]')
elif 'I [' in column:
old_units['current'] = column.split()[-1].strip('[]')
elif 'C [' in column:
old_units['capacity'], old_units['mass'] = column.split()[-1].strip('[]').split('/')
# time = df.columns[0].split()[-1].strip('[]')
# voltage = df.columns[1].split()[-1].strip('[]')
# current = df.columns[2].split()[-1].strip('[]')
# capacity, mass = df.columns[4].split()[-1].strip('[]').split('/')
# old_units = {'time': time, 'current': current, 'voltage': voltage, 'capacity': capacity, 'mass': mass}
if options['kind']=='neware':
for column in df.columns:
if 'Voltage' in column:
if 'Voltage' in column or 'Start Volt' in column:
voltage = column.split('(')[-1].strip(')')
elif 'Current' in column:
elif 'Current' in column or 'Starting current' in column:
current = column.split('(')[-1].strip(')')
elif 'Capacity' in column:
capacity = column.split('(')[-1].strip(')')
elif 'Energy' in column:
elif 'Energy' in column or 'Eng' in column:
energy = column.split('(')[-1].strip(')')
old_units = {'voltage': voltage, 'current': current, 'capacity': capacity, 'energy': energy}
@ -502,19 +733,18 @@ def get_old_units(df: pd.DataFrame, options: dict) -> dict:
if options['kind'] == 'biologic':
old_units = {}
for column in df.columns:
if 'time' in column:
time = column.split('/')[-1]
old_units['time'] = column.split('/')[-1]
elif 'Ewe' in column:
voltage = column.split('/')[-1]
old_units['voltage'] = column.split('/')[-1]
elif 'Capacity' in column:
capacity = column.split('/')[-1].replace('.', '')
old_units['capacity'] = column.split('/')[-1].replace('.', '')
elif 'Energy' in column:
energy = column.split('/')[-1].replace('.', '')
old_units['energy'] = column.split('/')[-1].replace('.', '')
elif '<I>' in column:
current = column.split('/')[-1]
old_units = {'voltage': voltage, 'current': current, 'capacity': capacity, 'energy': energy, 'time': time}
old_units['current'] = column.split('/')[-1]
return old_units
@ -532,7 +762,7 @@ def convert_time_string(time_string, unit='ms'):
def convert_datetime_string(datetime_string, reference, unit='s'):
def convert_datetime_string(datetime_string, reference, ref_time, unit='s'):
''' Convert time string from Neware-data with the format yyy-mm-dd hh:mm:ss to any given unit'''
from datetime import datetime
@ -555,7 +785,7 @@ def convert_datetime_string(datetime_string, reference, unit='s'):
factors = {'ms': 1000, 's': 1, 'min': 1/(60), 'h': 1/(60*60)}
time = s * factors[unit]
time = s * factors[unit] + ref_time
return time

View file

@ -1,9 +1,13 @@
from pickle import MARK
import matplotlib.pyplot as plt
from matplotlib.ticker import (MultipleLocator, FormatStrFormatter,AutoMinorLocator)
import pandas as pd
import numpy as np
import math
import os
import shutil
from PIL import Image
import ipywidgets as widgets
from IPython.display import display
@ -19,10 +23,15 @@ def plot_gc(data, options=None):
# Update options
required_options = ['x_vals', 'y_vals', 'which_cycles', 'charge', 'discharge', 'colours', 'differentiate_charge_discharge', 'gradient', 'interactive', 'interactive_session_active', 'rc_params', 'format_params']
required_options = ['force_reload', 'x_vals', 'y_vals', 'which_cycles', 'limit', 'exclude_cycles', 'show_plot', 'summary', 'charge', 'discharge', 'colours', 'differentiate_charge_discharge', 'gradient', 'interactive', 'interactive_session_active', 'rc_params', 'format_params', 'save_gif', 'save_path', 'fps']
default_options = {
'force_reload': False,
'x_vals': 'capacity', 'y_vals': 'voltage',
'which_cycles': 'all',
'limit': None, # Limit line to be drawn
'exclude_cycles': [],
'show_plot': True,
'summary': False,
'charge': True, 'discharge': True,
'colours': None,
'differentiate_charge_discharge': True,
@ -30,20 +39,22 @@ def plot_gc(data, options=None):
'interactive': False,
'interactive_session_active': False,
'rc_params': {},
'format_params': {}}
'format_params': {},
'save_gif': False,
'save_path': 'animation.gif',
'fps': 1
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
if not 'cycles' in data.keys():
# Read data if not already loaded
if not 'cycles' in data.keys() or options['force_reload']:
data['cycles'] = ec.io.read_data(data=data, options=options)
# Update list of cycles to correct indices
update_cycles_list(cycles=data['cycles'], options=options)
colours = generate_colours(cycles=data['cycles'], options=options)
# Update list of cycles to correct indices
update_cycles_list(data=data, options=options)
if options['interactive']:
options['interactive'], options['interactive_session_active'] = False, True
@ -51,30 +62,121 @@ def plot_gc(data, options=None):
return
# Prepare plot, and read and process data
fig, ax = btp.prepare_plot(options=options)
colours = generate_colours(cycles=data['cycles'], options=options)
if not options['summary']:
for i, cycle in enumerate(data['cycles']):
if i in options['which_cycles']:
if options['show_plot']:
# Prepare plot
fig, ax = btp.prepare_plot(options=options)
for i, cycle in enumerate(data['cycles']):
if i in options['which_cycles']:
if options['charge']:
cycle[0].plot(x=options['x_vals'], y=options['y_vals'], ax=ax, c=colours[i][0])
if options['discharge']:
cycle[1].plot(x=options['x_vals'], y=options['y_vals'], ax=ax, c=colours[i][1])
if options['interactive_session_active']:
update_labels(options, force=True)
else:
update_labels(options)
if options['save_gif'] and not options['interactive_session_active']:
if not os.path.isdir('tmp'):
os.makedirs('tmp')
# Scale image to make GIF smaller
options['format_params']['width'] = 7.5
options['format_params']['height'] = 3
options['format_params']['dpi'] = 200
for i, cycle in enumerate(data['cycles']):
if i in options['which_cycles']:
giffig, gifax = btp.prepare_plot(options=options)
if options['charge']:
cycle[0].plot(x=options['x_vals'], y=options['y_vals'], ax=gifax, c=colours[i][0])
if options['discharge']:
cycle[1].plot(x=options['x_vals'], y=options['y_vals'], ax=gifax, c=colours[i][1])
gifax.text(x=gifax.get_xlim()[1]*0.8, y=3, s=f'{i+1}')
update_labels(options)
giffig, gifax = btp.adjust_plot(fig=giffig, ax=gifax, options=options)
plt.savefig(os.path.join('tmp', str(i+1).zfill(4)+'.png'))
plt.close()
img_paths = [os.path.join('tmp', path) for path in os.listdir('tmp') if path.endswith('png')]
frames = []
for path in img_paths:
frame = Image.open(path)
frames.append(frame)
frames[0].save(options['save_path'], format='GIF', append_images=frames[1:], save_all=True, duration=(1/options['fps'])*1000, loop=0)
shutil.rmtree('tmp')
elif options['summary'] and options['show_plot']:
# Prepare plot
fig, ax = btp.prepare_plot(options=options)
mask = []
for i in range(data['cycles'].shape[0]):
if i+1 in options['which_cycles']:
mask.append(True)
else:
mask.append(False)
# Drop the last row if it is midway through a charge in order to avoid mismatch of length of mask and dataset.
if len(mask) > data['cycles'].shape[0]:
del mask[-1]
data['cycles'].drop(data['cycles'].tail(1).index, inplace=True)
# FIXME To begin, the default is that y-values correspond to x-values. This should probably be implemented in more logical and consistent manner in the future.
if options['x_vals'] in ['coulombic_efficiency', 'energy_efficiency']:
data['cycles'].loc[mask].plot(x='cycle', y=options['x_vals'], ax=ax, color=colours[0][0], kind='scatter', marker="$\u25EF$", s=plt.rcParams['lines.markersize'])
if options['limit']:
ax.axhline(y=options['limit'], ls='--', c='black')
else:
if options['charge']:
cycle[0].plot(x=options['x_vals'], y=options['y_vals'], ax=ax, c=colours[i][0])
yval = 'charge_' + options['x_vals']
data['cycles'].loc[mask].plot(x='cycle', y=yval, ax=ax, color=colours[0][0], kind='scatter', marker="$\u25EF$", s=plt.rcParams['lines.markersize'])
if options['discharge']:
cycle[1].plot(x=options['x_vals'], y=options['y_vals'], ax=ax, c=colours[i][1])
yval = 'discharge_' + options['x_vals']
data['cycles'].loc[mask].plot(x='cycle', y=yval, ax=ax, color=colours[0][1], kind='scatter', marker="$\u25EF$", s=plt.rcParams['lines.markersize'])
if options['interactive_session_active']:
update_labels(options, force=True)
if options['limit']:
ax.axhline(y=options['limit'], ls='--', c='black')
if options['interactive_session_active']:
update_labels(options, force=True)
else:
update_labels(options)
if options['show_plot']:
fig, ax = btp.adjust_plot(fig=fig, ax=ax, options=options)
return data['cycles'], fig, ax
else:
update_labels(options)
fig, ax = btp.adjust_plot(fig=fig, ax=ax, options=options)
#if options['interactive_session_active']:
return data['cycles'], fig, ax
return data['cycles'], None, None
def plot_gc_interactive(data, options):
@ -90,6 +192,107 @@ def plot_gc_interactive(data, options):
display(w)
def plot_cv(data, options):
# Update options
required_options = ['force_reload', 'x_vals', 'y_vals', 'which_cycles', 'limit', 'exclude_cycles', 'show_plot', 'charge', 'discharge', 'colours', 'differentiate_charge_discharge', 'gradient', 'interactive', 'interactive_session_active', 'rc_params', 'format_params', 'save_gif', 'save_path', 'fps']
default_options = {
'force_reload': False,
'x_vals': 'voltage', 'y_vals': 'current',
'which_cycles': 'all',
'limit': None, # Limit line to be drawn
'exclude_cycles': [],
'show_plot': True,
'charge': True, 'discharge': True,
'colours': None,
'differentiate_charge_discharge': True,
'gradient': False,
'interactive': False,
'interactive_session_active': False,
'rc_params': {},
'format_params': {},
'save_gif': False,
'save_path': 'animation.gif',
'fps': 1
}
options = aux.update_options(options=options, required_options=required_options, default_options=default_options)
# Read data if not already loaded
if not 'cycles' in data.keys() or options['force_reload']:
data['cycles'] = ec.io.read_data(data=data, options=options)
# Update list of cycles to correct indices
update_cycles_list(data=data, options=options)
colours = generate_colours(cycles=data['cycles'], options=options)
if options['show_plot']:
# Prepare plot
fig, ax = btp.prepare_plot(options=options)
for i, cycle in enumerate(data['cycles']):
if i in options['which_cycles']:
if options['charge']:
cycle[0].plot(x=options['x_vals'], y=options['y_vals'], ax=ax, c=colours[i][0])
if options['discharge']:
cycle[1].plot(x=options['x_vals'], y=options['y_vals'], ax=ax, c=colours[i][1])
update_labels(options)
if options['save_gif'] and not options['interactive_session_active']:
if not os.path.isdir('tmp'):
os.makedirs('tmp')
# Scale image to make GIF smaller
options['format_params']['width'] = 7.5
options['format_params']['height'] = 3
options['format_params']['dpi'] = 200
for i, cycle in enumerate(data['cycles']):
if i in options['which_cycles']:
giffig, gifax = btp.prepare_plot(options=options)
if options['charge']:
cycle[0].plot(x=options['x_vals'], y=options['y_vals'], ax=gifax, c=colours[i][0])
if options['discharge']:
cycle[1].plot(x=options['x_vals'], y=options['y_vals'], ax=gifax, c=colours[i][1])
gifax.text(x=gifax.get_xlim()[1]*0.8, y=3, s=f'{i+1}')
update_labels(options)
giffig, gifax = btp.adjust_plot(fig=giffig, ax=gifax, options=options)
plt.savefig(os.path.join('tmp', str(i+1).zfill(4)+'.png'))
plt.close()
img_paths = [os.path.join('tmp', path) for path in os.listdir('tmp') if path.endswith('png')]
frames = []
for path in img_paths:
frame = Image.open(path)
frames.append(frame)
frames[0].save(options['save_path'], format='GIF', append_images=frames[1:], save_all=True, duration=(1/options['fps'])*1000, loop=0)
shutil.rmtree('tmp')
if options['show_plot']:
fig, ax = btp.adjust_plot(fig=fig, ax=ax, options=options)
return data['cycles'], fig, ax
else:
return data['cycles'], None, None
def update_labels(options, force=False):
if 'xlabel' not in options.keys() or force:
@ -118,30 +321,47 @@ def update_labels(options, force=False):
def update_cycles_list(cycles, options: dict) -> None:
def update_cycles_list(data, options: dict) -> None:
if options['which_cycles'] == 'all':
options['which_cycles'] = [i for i in range(len(cycles))]
options['which_cycles'] = [i for i in range(len(data['cycles']))]
elif type(options['which_cycles']) == list:
options['which_cycles'] = [i-1 for i in options['which_cycles']]
elif isinstance(options['which_cycles'], list):
cycles =[]
for cycle in options['which_cycles']:
if isinstance(cycle, int):
cycles.append(cycle-1)
elif isinstance(cycle, tuple):
interval = [i-1 for i in range(cycle[0], cycle[1]+1)]
cycles.extend(interval)
options['which_cycles'] = cycles
# Tuple is used to define an interval - as elements tuples can't be assigned, I convert it to a list here.
elif type(options['which_cycles']) == tuple:
elif isinstance(options['which_cycles'], tuple):
which_cycles = list(options['which_cycles'])
if which_cycles[0] <= 0:
which_cycles[0] = 1
elif which_cycles[1] < 0:
which_cycles[1] = len(cycles)
which_cycles[1] = len(options['which_cycles'])
options['which_cycles'] = [i-1 for i in range(which_cycles[0], which_cycles[1]+1)]
for i, cycle in enumerate(options['which_cycles']):
if cycle in options['exclude_cycles']:
del options['which_cycles'][i]
def prettify_gc_plot(fig, ax, options=None):

View file

@ -182,8 +182,13 @@ def adjust_plot(fig, ax, options):
# Hide x- and y-ticks:
if options['hide_y_ticks']:
ax.tick_params(axis='y', direction='in', which='both', left=False, right=False)
else:
ax.tick_params(axis='y', direction='in', which='both', left=True, right=True)
if options['hide_x_ticks']:
ax.tick_params(axis='x', direction='in', which='both', bottom=False, top=False)
else:
ax.tick_params(axis='x', direction='in', which='both', bottom=True, top=True)