Populate a Postgres table with information from HOTSLogs
#!/usr/bin/env python
"""
Build a heroes of the storm database
"""
import csv
import logging
import time
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, Boolean, create_engine
from sqlalchemy.orm import sessionmaker
logging.basicConfig(level=logging.DEBUG)
engine = create_engine("postgresql://brooks:@localhost/hots")
Session = sessionmaker(bind=engine)
Base = declarative_base()
OBJECTS_PER_COMMIT = 100_000


class Character(Base):
    __tablename__ = "character"

    id = Column(Integer, primary_key=True)
    replay_id = Column(Integer, nullable=False)
    is_winner = Column(Boolean, nullable=False)
    team_level = Column(Integer, nullable=False)
    exp_contribution = Column(Integer, nullable=False)

    def __str__(self):
        return (
            "<Character: Replay={}, Winner={}, Lvl={}, Exp={}>"
            .format(self.replay_id, self.is_winner, self.team_level,
                    self.exp_contribution)
        )

    def __repr__(self):
        return str(self)

    @staticmethod
    def from_dict(d):
        """Return a new Character built from a HOTSLogs CSV row dict."""
        c = Character()
        c.replay_id = int(d["ReplayID"])
        c.is_winner = bool(int(d["Is Winner"]))
        c.team_level = int(d["In Game Level"])
        c.exp_contribution = int(d["Experience Contribution"])
        return c

    @staticmethod
    def as_dict(d):
        """Map a HOTSLogs CSV row dict to a dict keyed by Character columns.

        Everything csv.DictReader yields is a string, so cast the numeric
        fields explicitly before they hit the database.
        """
        return {
            "replay_id": int(d["ReplayID"]),
            "is_winner": bool(int(d["Is Winner"])),
            "team_level": int(d["In Game Level"]),
            "exp_contribution": int(d["Experience Contribution"]),
        }
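
# For reference, one ReplayCharacters.csv row is expected to look roughly like
# this (hypothetical values; the real export has many more columns, all of
# which this script ignores):
#     {"ReplayID": "10000", "Is Winner": "1", "In Game Level": "20",
#      "Experience Contribution": "15000", ...}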


def bulk_insert(dictionaries):
    """Bulk insert the given character dictionaries into the database.

    We only care about a small subset of columns, so we ignore the rest.
    """
    if not dictionaries:  # the final batch may be empty
        return
    # A traditional add_all() and commit() takes ~1.2s per 1,000 inserts,
    # which is much too slow for ~12 million rows:
    # session = Session()
    # session.add_all([Character.from_dict(d) for d in dictionaries])
    # session.commit()
    #
    # Perform a Core bulk insert instead: https://stackoverflow.com/a/34344200
    engine.execute(
        Character.__table__.insert(),
        [Character.as_dict(d) for d in dictionaries],
    )
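
# Note: Engine.execute() is SQLAlchemy 1.x API and was removed in 2.0. On
# newer versions the equivalent would be (an untested sketch of the same
# bulk insert, nothing else changed):
#     with engine.begin() as conn:
#         conn.execute(Character.__table__.insert(),
#                      [Character.as_dict(d) for d in dictionaries])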


def main():
    logger = logging.getLogger(__name__)
    # Base.metadata.drop_all(engine)  # TODO: only use this when testing
    Base.metadata.create_all(engine)

    filename = "/home/brooks/data/hots/ReplayCharacters.csv"
    with open(filename) as file:
        logger.debug(f"Successfully opened {filename}")
        reader = csv.DictReader(file)
        lines_to_insert = []
        start = time.time()
        per_commit_start = time.time()
        for i, line in enumerate(reader):
            lines_to_insert.append(line)
            if (i + 1) % OBJECTS_PER_COMMIT == 0:
                bulk_insert(lines_to_insert)
                lines_to_insert = []
                end = time.time()
                logger.debug(
                    f"Inserted {OBJECTS_PER_COMMIT} objs in "
                    f"{end - per_commit_start:.3f}s "
                    f"({end - start:.3f}s total for {i + 1} inserts)."
                )
                per_commit_start = time.time()
        # Flush whatever is left over after the last full batch.
        bulk_insert(lines_to_insert)


if __name__ == "__main__":
    main()
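
# Example sanity check after a run (a sketch, not part of the import itself;
# run it in a REPL with this module imported):
#     from sqlalchemy import func
#     session = Session()
#     total = session.query(func.count(Character.id)).scalar()
#     wins = (session.query(func.count(Character.id))
#             .filter(Character.is_winner).scalar())
#     print(f"{wins} of {total} rows are wins")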