Created
February 8, 2021 23:06
-
-
Save pcarleton/2c7d754495caf86ccc51cc4e119bf404 to your computer and use it in GitHub Desktop.
Pulling Amazon orders data from CSVs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python
# Imports CSVs for "Items" and "Orders and Shipments" as downloaded
# from: https://www.amazon.com/gp/b2b/reports/
#
# The output of this script is a single JSON file combining the data from both
# files for easier management.
#
#
# It assumes you're dealing with one batch (i.e. 1 Items, 1 Orders) file located
# in a folder. It will copy the files to the documents directory and optionally
# delete the original files.
#
# File format gotchas:
# * There can be multiple lines in Orders for the same order ID if they shipped separately
# * TODO: This may or may not end up as a single transaction on the CC statement
import csv | |
import json | |
import sys | |
import os | |
import re | |
import datetime | |
import shutil | |
# Where the browser drops the exported Amazon report CSVs.
DEFAULT_DOWNLOAD_DIRECTORY = '~/Downloads'
# Directory containing this script; the paths below are resolved relative to it.
FILE_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
# Destination for the combined JSON output produced by main().
DEFAULT_INCOMING_DIRECTORY = os.path.join(FILE_DIRECTORY, '..', '..', 'incoming')
# Long-term archive for the raw CSVs (presumably a ledger/documents tree — confirm layout).
DEFAULT_ARCHIVE_DIRECTORY = os.path.join(FILE_DIRECTORY, '..', '..', 'documents', 'Liabilities', 'Chase', 'Amazon')
# strptime format of the "Order Date" CSV column (e.g. 02/08/21).
ORDER_DATE_FORMAT = '%m/%d/%y'
# strptime format of the dates embedded in report filenames (e.g. 08-Feb-2021).
FILE_DATE_FORMAT = '%d-%b-%Y'
# Dealing with file contents
def make_orders_dict(item_file_name=None, orders_file_name=None):
    """Combine an Amazon "Orders" CSV and an "Items" CSV into one dict.

    Keyed by Order ID. Each value holds the raw shipment rows, the raw
    item rows, and an 'order_details' summary (Order ID, Order Date, and
    one 'Total Charged' entry per shipment line).

    Items whose Order ID is missing from the orders file are printed
    and skipped rather than raising.
    """
    orders = {}
    with open(orders_file_name) as f:
        for row in csv.DictReader(f):
            # One order can span multiple lines (one per shipment), so
            # accumulate into any existing entry instead of overwriting.
            data = orders.setdefault(
                row['Order ID'],
                {'shipments': [], 'items': [], 'order_details': {}})
            details = data['order_details']
            details['Order ID'] = row['Order ID']
            details['Order Date'] = row['Order Date']
            # One total per shipment line; these may or may not appear as
            # separate charges on the card statement.
            details.setdefault('totals', []).append(row['Total Charged'])
            data['shipments'].append(row)
    with open(item_file_name) as f:
        for row in csv.DictReader(f):
            if row['Order ID'] not in orders:
                print(f"Couldn't find {row['Order ID']} in orders, line:")
                print(f"\t{row}")
                continue
            orders[row['Order ID']]['items'].append(row)
    return orders
def get_cols(filename):
    """Return the set of column names from the CSV's header row.

    Uses csv.reader so quoted headers are handled correctly and the
    trailing newline is stripped — the previous readline().split(',')
    left '\n' attached to the final column name, which made membership
    tests against the last header column fail.
    """
    with open(filename, 'r', newline='') as f:
        return set(next(csv.reader(f), []))
def orders_or_items(filename):
    """Classify a report CSV as 'items' or 'orders' from its header.

    Only the "Items" report carries an 'Item Subtotal' column; anything
    else is treated as an "Orders and Shipments" report.
    """
    return 'items' if 'Item Subtotal' in get_cols(filename) else 'orders'
# Parsing file names
def identify(filename):
    """Match filenames that begin with a report date like '08-Feb-2021'.

    Returns the re.Match object (truthy) or None. Generalized from the
    original 202x-only year pattern to any 20xx year so reports exported
    after 2029 are still recognized; all previously-matching names still
    match.
    """
    return re.match(
        r'^[0-9]{2}-(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)-20[0-9]{2}',
        filename)
def parse_range(filename):
    """Extract the [start, end] datetimes encoded in a report filename.

    Expects basenames shaped like 'DD-Mon-YYYY_<word>_DD-Mon-YYYY...';
    a browser duplicate suffix (' (1)') and the file extension are
    ignored before splitting.
    """
    base = os.path.basename(filename)
    # Keep everything before the first space (dupe marker) and dot (extension).
    stem = base.split(' ')[0].split('.')[0]
    parts = stem.split('_')
    # parts[1] is the joining word (e.g. 'to'); the dates sit at 0 and 2.
    start, end = parts[0], parts[2]
    return [datetime.datetime.strptime(s, FILE_DATE_FORMAT) for s in (start, end)]
def get_meta(filename):
    """Build archive metadata for one downloaded report CSV.

    Returns a dict with the original path, the parsed [start, end] date
    range, the detected file type ('orders' or 'items'), and the
    normalized filename/path the file should be archived under.
    """
    start, end = parse_range(filename)
    kind = orders_or_items(filename)
    archived_name = '{}_{}_{}.csv'.format(
        kind, start.date().isoformat(), end.date().isoformat())
    return {
        'filename': filename,
        'daterange': [start, end],
        'file_type': kind,
        'new_filename': archived_name,
        'new_path': os.path.join(DEFAULT_ARCHIVE_DIRECTORY, archived_name),
    }
def get_matches(directory=DEFAULT_DOWNLOAD_DIRECTORY):
    """Return full paths of Amazon report CSVs found in *directory*.

    Expands '~' and keeps only entries whose names match identify().
    """
    root = os.path.expanduser(directory)
    matched = []
    for entry in os.listdir(root):
        if identify(entry):
            matched.append(os.path.join(root, entry))
    return matched
def main():
    """Find the downloaded report pair, merge them into JSON, archive the CSVs.

    Exits with status 1 unless exactly two matching files (one Orders,
    one Items) are found in the download directory.
    """
    # Find the downloaded files.
    matches = get_matches()
    # We want an "Orders" file for the totals, and an "Items" file for the
    # product descriptions.
    if len(matches) != 2:
        print('Found {} matches, wanted 2, exiting'.format(len(matches)))
        sys.exit(1)
    print("Found 2 matches:\n{}\n".format("\n".join([os.path.basename(m) for m in matches])))
    # metadata tells us file type and new file names
    meta = [get_meta(f) for f in matches]
    ftypes = {m['file_type']: m for m in meta}
    # Parse the contents into something useful
    orders = make_orders_dict(
        item_file_name=ftypes['items']['filename'],
        orders_file_name=ftypes['orders']['filename'])
    # Save the result to a file for later ingesting
    combined_file_name = "amazon-orders-combined_{start}_{end}.json".format(
        start=meta[0]['daterange'][0].date().isoformat(),
        end=meta[0]['daterange'][1].date().isoformat()
    )
    destination = os.path.join(DEFAULT_INCOMING_DIRECTORY, combined_file_name)
    # Robustness: create the target directories if they don't exist yet,
    # so a fresh checkout doesn't crash on open()/copyfile().
    os.makedirs(DEFAULT_INCOMING_DIRECTORY, exist_ok=True)
    os.makedirs(DEFAULT_ARCHIVE_DIRECTORY, exist_ok=True)
    with open(destination, 'w') as o:
        print("Writing combined file to {}\n".format(destination))
        print(json.dumps(orders, indent=2), file=o)
    # Move the original CSV's into the archive, optionally deleting the old files
    delete = False
    for m in meta:
        print("Copying {} to {}".format(m['filename'], m['new_path']))
        shutil.copyfile(m['filename'], m['new_path'])
        if delete:
            print("Deleting {}".format(m['filename']))
            os.remove(m['filename'])
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment