Skip to content

Instantly share code, notes, and snippets.

@lambdamusic
Created March 7, 2019 15:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lambdamusic/029c88483922604c18f84e5f164e09a6 to your computer and use it in GitHub Desktop.
Save lambdamusic/029c88483922604c18f84e5f164e09a6 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
iterate a bucket in s3 containing JSON log files
get each file, massage json a little and uploade to elasticsearc
"""
import json
import os
import click
from datetime import datetime, date, timedelta
from elasticsearch import Elasticsearch
from smart_open import smart_open, s3_iter_bucket
import boto
import logging
logging.getLogger("smart_open").setLevel(logging.WARNING)
logging.getLogger("elasticsearch").setLevel(logging.WARNING)
logging.basicConfig(format='%(asctime)s %(message)s', datefmt='03/07/2019 01:28:39 pm', level=logging.DEBUG, filename='logrun_dsl_data_loader.log',)
DEFAULT_HOST = "http://127.0.0.1:9200/"
DEFAULT_INDEX = "dsl_logs_test"
def pretty_json(dict):
return json.dumps(dict, indent=4, sort_keys=True)
class Document(object):
"""Abstraction of an ES document"""
def __init__(self, pk=None, doc_type=None, index_name=DEFAULT_INDEX, host_name=DEFAULT_HOST):
super(Document, self).__init__()
self.raw_data = None
self.doc_type = doc_type or "doc"
self.id = pk
self.index_name = index_name
self.host_name = host_name
self.ES = Elasticsearch(hosts=self.host_name)
if self.id:
self.add()
def get(self, pk=None):
"Retrieve a doc from the index based on its ID"
if pk:
self.id = pk
elif not self.id:
print("Specify an ID")
return
self.raw_data = self.ES.get(index=self.index_name, id=self.id)
def get_val(self, prop_name):
"Get values for a loaded document, based on a specific property. Lazy and friendly."
if self.raw_data:
try:
return rdf_data[guess_prop_name]
except:
pass
print("Property not found")
else:
print("No document is loaded")
return None
def add(self, json_doc, doc_type=None, _id=""):
"Add a JSON document to the ES Index"
if not doc_type:
doc_type = self.doc_type
res = self.ES.index(index=self.index_name, doc_type=doc_type, id=_id, body=json_doc)
if False: # doing this on many inserts will blow up the terminal
print(pretty_json(res))
def delete(self, id):
res = self.ES.delete(index=index_name, id=id)
print(pretty_json(res))
def delete_index(self, index_name=None):
if not index_name:
index_name = self.index_name
res = self.ES.indices.delete(index=index_name)
print(pretty_json(res))
def print_raw_data(self):
"Returns full JSON document as stored in ES"
print(pretty_json(self.raw_data))
def parse_and_index_data(file_contents, index_name, host_name=None, source_path=None, test_mode=False):
"""
take the bytes contents of an S3 file, split it into lines, then decode it so that the JSON can be parsed and
data sent to Elastisearch
Each file is a sequence of log data, organized into 2 lines: the access token/timestamp, and the proper logs data.
Example source_path for s3:
# root_bucket/2018/11/15/17/20181115T173328.706Z_aea4ce56f807ce3fbb669898439ee4b9.json
"""
if not host_name:
host_name = DEFAULT_HOST
doc = Document(index_name=index_name, host_name=host_name)
counter = 0
for line in file_contents.splitlines():
if type(line) == bytes:
j = json.loads(line.decode())
else:
j = json.loads(line) # this is only when reading form local file for testing..
if "token" in j:
# timestamp line
ml_timestamp = j['received']
t = datetime.fromtimestamp(ml_timestamp//1000.0)
else:
# data line => get all of it
counter += 1
j['timestamp'] = t
# print(j)
if source_path:
_id = source_path.split("/")[-1].rstrip(".json") + "-%d" % counter
else:
_id = ""
if not test_mode:
# click.secho("... adding data from file " + str(source_path), fg="green")
logging.info("\n... scanning row[%d] of file " % counter + str(source_path))
doc.add(j, _id=_id)
def iterate_s3_logs(index_name, host_name="", start_date="", end_date=""):
"""
Go through all log files in S3, start from a selected location
Location is given by the year/month/day folder structure, in ascending date order.
Example source_path for s3:
# root_bucket/2018/11/15/17/20181115T173328.706Z_aea4ce56f807ce3fbb669898439ee4b9.json
<start_date> : used to determine what subfolders in S3 to start from eg 2018-01-01
<end_date> : used to determine what subfolders in S3 to end eg 2018-01-02
NOTE the end_date folder is not processed, that's where the iteration stops.
"""
logging.info("\n========\nScript <iterate_s3_logs> Started Running \n=========\n")
# default = yesterday, today
d1, delta = _date_range_from_strings(start_date, end_date)
# this has to be the top level bucket
bucket = boto.connect_s3().get_bucket('com-uberresearch-sematext-logs')
logging.info("\n connected to s3 \n")
for i in range(delta.days): # this goes up to d2-1, so excludes the end_date !
step = d1 + timedelta(i)
bucket_prefix = step.strftime("sematext_24b2ef8d/%Y/%m/%d/") # must end with slash
click.secho("..trying to connect to s3 folder: %s" % bucket_prefix, fg="blue")
logging.info("\n..trying to connect to s3 folder: %s" % bucket_prefix)
# iterate only through one dir at a time
for key, content in s3_iter_bucket(bucket, prefix=bucket_prefix, workers=16):
click.secho(">>>>> File: " + key + str(len(content)), fg="green")
parse_and_index_data(content, index_name, host_name, key)
def _date_range_from_strings(start_date, end_date):
"""
From two string-dates formatted as YYYY-MM-DD
Return a delta date_object which can be iterated on
<start_date> : used to determine what subfolders in S3 to start from eg 2018-01-01
=> default: YESTERDAY
<end_date> : used to determine what subfolders in S3 to end eg 2018-01-02
=> default: TODAY
"""
# the following is redundant, added just as an example to simulate receiving a date-string
if not start_date:
yesterday = date.today() - timedelta(1)
start_date = yesterday.strftime("%Y-%m-%d")
if not end_date:
end_date = date.today().strftime("%Y-%m-%d")
click.secho("START_DATE = %s //// END_DATE = %s " % (start_date, end_date), fg="red")
logging.info("\n======= START_DATE = %s //// END_DATE = %s " % (start_date, end_date))
d1 = datetime.strptime(start_date, '%Y-%m-%d').date()
d2 = datetime.strptime(end_date, '%Y-%m-%d').date()
return (d1, d2 - d1)
@click.command()
@click.option('--start_date', help='Start date eg 2018-11-15 (default=yesterday) ')
@click.option('--delete', is_flag=True, help='Delete selected index (view source)')
def search_test(start_date, test_local, delete):
if delete:
#
# DELETE INDEX
#
HOST, INDEX = DEFAULT_HOST, "logs_from_s3" # manually update for deleting!
d = Document(index_name=INDEX)
if click.confirm('Do you want to delete <%s/%s>?' % (HOST, INDEX)):
d.delete_index()
click.echo('Goodbye')
else:
#
# LOAD FROM S3
#
HOST, INDEX = DEFAULT_HOST, "logs_from_s3"
iterate_s3_logs(INDEX, HOST, start_date)
if __name__ == '__main__':
search_test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment