@dehowell
Last active July 23, 2020 21:32
Script for correcting Jamie's data files
#!/usr/local/bin/python3
import csv
import itertools
import os
import os.path
import statistics
import sys
import traceback

# Subdirectory (created next to each input file) where corrected output files are written.
OUTPUT_PATH = 'Transformed Data Files'


def read_rpt(filename):
    """Read a whitespace-delimited RPT file and yield each data row as a dict keyed by the header columns."""
    with open(filename) as f:
        header = f.readline()
        keys = header.split()
        for line in f:
            vals = line.split()
            row = dict(zip(keys, vals))
            yield row
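
# A sketch of the RPT layout this script assumes: one whitespace-delimited header
# row, then whitespace-delimited data rows, with '.' marking a missing value. The
# column names match the header written back out below; the values are invented
# for illustration.
#
#     Time Ev     HR  RSA TidVol  ResPer    SCL   SCR  Syst  Dias Rate
#        0  1  72.10  6.2   0.48    3.90  12.01  0.02   118    76   72
#       30  1      .  6.0   0.51    4.10  12.20  0.04   120    78   70
#
# read_rpt yields each data row as a dict of strings, e.g.
# {'Time': '0', 'Ev': '1', 'HR': '72.10', ...}.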


def find_event(records, event_name):
    """Return the name, earliest time, and latest time of the records tagged with event_name."""
    times = [r['Time'] for r in records if r['Ev'] == event_name]
    times = [int(t) for t in times]
    return {
        'event_name': event_name,
        'start': min(times),
        'end': max(times)
    }


def split_event(record, event):
    """Relabel a record of the given event with an 'a' or 'b' suffix around the split point.

    The split point is three minutes before the event's end; records at or before
    it get the 'a' suffix, later ones get 'b'. Records of other events pass through unchanged.
    """
    split_point = event['end'] - 3 * 60
    if record['Ev'] == event['event_name']:
        before_split = int(record['Time']) <= split_point
        new_event_name = record['Ev'] + ('a' if before_split else 'b')
        new_record = {**record}
        new_record['Ev'] = new_event_name
        return new_record
    else:
        return record
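
# Example of the splitting rule (times invented for illustration): if event '1'
# spans Time 0 through 600, the split point is 600 - 3 * 60 = 420, so records of
# event '1' with Time <= 420 become '1a' and the final three minutes become '1b'.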


def aggregate_by_event(records):
    """Yield one summary row per consecutive run of records sharing the same event label.

    Each summary carries the earliest time in the run and the mean of every
    numeric column. '.' marks a missing value and is excluded from the mean;
    if a column has no usable values in the run, '.' is reported instead.
    """
    for event_name, group in itertools.groupby(records, key=lambda r: r['Ev']):
        group = list(group)

        def mean_of(column):
            not_missing = [float(r[column]) for r in group if r[column] != '.']
            if len(not_missing) > 0:
                return statistics.mean(not_missing)
            else:
                return '.'

        group_summary = {
            'Ev': event_name,
            # Compare times numerically so that, e.g., '90' is not treated as later than '120'.
            'Time': min((r['Time'] for r in group), key=int)
        }
        for col in ['HR', 'RSA', 'TidVol', 'ResPer', 'SCL', 'SCR', 'Syst', 'Dias', 'Rate']:
            group_summary[col] = mean_of(col)
        yield group_summary
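
# Note on grouping (hypothetical labels): itertools.groupby only merges
# consecutive rows, so an Ev column reading 1a, 1a, 1b, 2 yields three summary
# rows, one per run, each with that run's earliest Time and per-column means.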


def process_input_file(input_file):
    """Split event '1' in a single RPT file and write corrected RPT and summary files."""
    records = list(read_rpt(input_file))
    event1 = find_event(records, '1')
    corrected = [split_event(r, event1) for r in records]
    basename = os.path.basename(input_file)
    output_path = os.path.join(os.path.dirname(input_file), OUTPUT_PATH)
    os.makedirs(output_path, exist_ok=True)

    # Write the corrected RPT file, preserving the original column layout.
    output_rpt_file = os.path.join(output_path, basename)
    with open(output_rpt_file, 'w') as f:
        print("Transforming corrected RPT files for {} at {}".format(input_file, output_rpt_file))
        f.write('Time Ev HR RSA TidVol ResPer SCL SCR Syst Dias Rate\n')
        for row in corrected:
            line = '{Time} {Ev:>2} {HR:>6} {RSA:>4} {TidVol:>7} {ResPer:>7} {SCL:>6} {SCR:>6} {Syst:>5} {Dias:>5} {Rate:>4}\n'.format(**row)
            f.write(line)

    # Write the per-event summary as a CSV with a .SUM extension.
    output_sum_file = os.path.join(output_path, os.path.splitext(basename)[0] + '.SUM')
    with open(output_sum_file, 'w') as f:
        print("Computing SUMMARY files for {} at {}".format(input_file, output_sum_file))
        writer = csv.DictWriter(f, fieldnames=['Ev', 'Time', 'HR', 'RSA', 'TidVol', 'ResPer', 'SCL', 'SCR', 'Syst', 'Dias', 'Rate'])
        writer.writeheader()
        for row in aggregate_by_event(corrected):
            writer.writerow(row)


def main(input_files):
    for input_file in input_files:
        try:
            process_input_file(input_file)
        except Exception:
            # Report the failure but keep going with the remaining files.
            print('Error processing file: {}'.format(input_file))
            traceback.print_exc()


if __name__ == '__main__':
    main(sys.argv[1:])
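
Usage note: the script takes one or more RPT file paths on the command line and writes both outputs into a 'Transformed Data Files' subdirectory next to each input. The snippet below is a minimal sketch of calling it programmatically instead; the module name correct_rpt and the file name subject_01.RPT are hypothetical, assuming the script above has been saved as correct_rpt.py.

    from correct_rpt import process_input_file

    # Hypothetical input file with the expected whitespace-delimited columns.
    process_input_file('subject_01.RPT')
    # Writes 'Transformed Data Files/subject_01.RPT' (event 1 relabelled as 1a/1b)
    # and 'Transformed Data Files/subject_01.SUM' (per-event means as CSV).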