#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Iterate a bucket in S3 containing JSON log files:
get each file, massage the JSON a little and upload it to Elasticsearch.
"""
import json
import click
from datetime import datetime, date, timedelta
from elasticsearch import Elasticsearch
from smart_open import s3_iter_bucket
import boto
import logging

logging.getLogger("smart_open").setLevel(logging.WARNING)
logging.getLogger("elasticsearch").setLevel(logging.WARNING)
logging.basicConfig(format='%(asctime)s %(message)s',
                    datefmt='%m/%d/%Y %I:%M:%S %p',
                    level=logging.DEBUG,
                    filename='logrun_dsl_data_loader.log')

DEFAULT_HOST = "http://127.0.0.1:9200/"
DEFAULT_INDEX = "dsl_logs_test"


def pretty_json(data):
    "Return a dict as a pretty-printed, key-sorted JSON string."
    return json.dumps(data, indent=4, sort_keys=True)
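
# Example (illustrative): pretty_json({"b": 2, "a": 1}) returns the keys
# sorted alphabetically, indented by 4 spaces:
#   {
#       "a": 1,
#       "b": 2
#   }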


class Document(object):
    """Abstraction of an ES document"""

    def __init__(self, pk=None, doc_type=None, index_name=DEFAULT_INDEX, host_name=DEFAULT_HOST):
        super(Document, self).__init__()
        self.raw_data = None
        self.doc_type = doc_type or "doc"
        self.id = pk
        self.index_name = index_name
        self.host_name = host_name
        self.ES = Elasticsearch(hosts=self.host_name)
        if self.id:
            # a primary key was passed in: load the document straight away
            self.get()

    def get(self, pk=None):
        "Retrieve a doc from the index based on its ID"
        if pk:
            self.id = pk
        elif not self.id:
            print("Specify an ID")
            return
        self.raw_data = self.ES.get(index=self.index_name, id=self.id)

    def get_val(self, prop_name):
        "Get values for a loaded document, based on a specific property. Lazy and friendly."
        if self.raw_data:
            try:
                # the actual document lives under the "_source" key of the ES response
                return self.raw_data["_source"][prop_name]
            except KeyError:
                print("Property not found")
        else:
            print("No document is loaded")
        return None

    def add(self, json_doc, doc_type=None, _id="", verbose=False):
        "Add a JSON document to the ES Index"
        if not doc_type:
            doc_type = self.doc_type
        res = self.ES.index(index=self.index_name, doc_type=doc_type, id=_id, body=json_doc)
        if verbose:  # printing the response on many inserts will blow up the terminal
            print(pretty_json(res))

    def delete(self, id):
        "Delete a doc from the index based on its ID"
        res = self.ES.delete(index=self.index_name, id=id)
        print(pretty_json(res))

    def delete_index(self, index_name=None):
        "Drop a whole index (defaults to this instance's index)"
        if not index_name:
            index_name = self.index_name
        res = self.ES.indices.delete(index=index_name)
        print(pretty_json(res))

    def print_raw_data(self):
        "Print the full JSON document as stored in ES"
        print(pretty_json(self.raw_data))
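
# Minimal usage sketch for the Document class (illustrative only: the
# document ID below is made up):
#
#   doc = Document(pk="20181115T173328-1", index_name=DEFAULT_INDEX)
#   doc.print_raw_data()              # full ES response as pretty JSON
#   value = doc.get_val("timestamp")  # one field from the _source dict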


def parse_and_index_data(file_contents, index_name, host_name=None, source_path=None, test_mode=False):
    """
    Take the bytes contents of an S3 file, split it into lines, then decode it so that the
    JSON can be parsed and the data sent to Elasticsearch.
    Each file is a sequence of log data, organized into pairs of lines: the access
    token/timestamp, and the proper logs data.
    Example source_path for s3:
    # root_bucket/2018/11/15/17/20181115T173328.706Z_aea4ce56f807ce3fbb669898439ee4b9.json
    """
    if not host_name:
        host_name = DEFAULT_HOST
    doc = Document(index_name=index_name, host_name=host_name)
    counter = 0
    t = None  # timestamp carried over from the most recent token line
    for line in file_contents.splitlines():
        if type(line) == bytes:
            j = json.loads(line.decode())
        else:
            j = json.loads(line)  # this happens only when reading from a local file for testing
        if "token" in j:
            # timestamp line: "received" holds epoch milliseconds
            ml_timestamp = j['received']
            t = datetime.fromtimestamp(ml_timestamp // 1000.0)
        else:
            # data line => take all of it
            counter += 1
            j['timestamp'] = t
            if source_path:
                # build an ID from the file name plus a row counter; slice off the
                # ".json" suffix (str.rstrip would strip characters, not a suffix)
                file_name = source_path.split("/")[-1]
                if file_name.endswith(".json"):
                    file_name = file_name[:-len(".json")]
                _id = "%s-%d" % (file_name, counter)
            else:
                _id = ""
            if not test_mode:
                logging.info("\n... scanning row[%d] of file %s" % (counter, source_path))
                doc.add(j, _id=_id)
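
# Illustrative sketch of the two-line log format this parser expects. Only the
# "token" and "received" keys come from the code above; the data-line fields
# are made up for the example:
#
#   {"token": "abc123", "received": 1542303208706}
#   {"severity": "info", "message": "..."}
#
# The "received" epoch-milliseconds value from the first line becomes the
# 'timestamp' field attached to the data line(s) that follow it.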


def iterate_s3_logs(index_name, host_name="", start_date="", end_date=""):
    """
    Go through all log files in S3, starting from a selected location.
    The location is given by the year/month/day folder structure, in ascending date order.
    Example source_path for s3:
    # root_bucket/2018/11/15/17/20181115T173328.706Z_aea4ce56f807ce3fbb669898439ee4b9.json
    <start_date> : used to determine what subfolders in S3 to start from eg 2018-01-01
    <end_date> : used to determine what subfolders in S3 to end eg 2018-01-02
    NOTE: the end_date folder is not processed; that's where the iteration stops.
    """
    logging.info("\n========\nScript <iterate_s3_logs> Started Running \n=========\n")
    # defaults = yesterday, today
    d1, delta = _date_range_from_strings(start_date, end_date)
    # this has to be the top level bucket
    bucket = boto.connect_s3().get_bucket('com-uberresearch-sematext-logs')
    logging.info("\n connected to s3 \n")
    for i in range(delta.days):  # this goes up to d2-1, so excludes the end_date!
        step = d1 + timedelta(i)
        bucket_prefix = step.strftime("sematext_24b2ef8d/%Y/%m/%d/")  # must end with a slash
        click.secho("..trying to connect to s3 folder: %s" % bucket_prefix, fg="blue")
        logging.info("\n..trying to connect to s3 folder: %s" % bucket_prefix)
        # iterate only through one dir at a time
        for key, content in s3_iter_bucket(bucket, prefix=bucket_prefix, workers=16):
            click.secho(">>>>> File: %s (%d bytes)" % (key, len(content)), fg="green")
            parse_and_index_data(content, index_name, host_name, key)


def _date_range_from_strings(start_date, end_date):
    """
    From two string-dates formatted as YYYY-MM-DD,
    return a (start_date, delta) tuple whose delta can be iterated on.
    <start_date> : used to determine what subfolders in S3 to start from eg 2018-01-01
        => default: YESTERDAY
    <end_date> : used to determine what subfolders in S3 to end eg 2018-01-02
        => default: TODAY
    """
    # the following is redundant, added just as an example to simulate receiving a date-string
    if not start_date:
        yesterday = date.today() - timedelta(1)
        start_date = yesterday.strftime("%Y-%m-%d")
    if not end_date:
        end_date = date.today().strftime("%Y-%m-%d")
    click.secho("START_DATE = %s //// END_DATE = %s" % (start_date, end_date), fg="red")
    logging.info("\n======= START_DATE = %s //// END_DATE = %s" % (start_date, end_date))
    d1 = datetime.strptime(start_date, '%Y-%m-%d').date()
    d2 = datetime.strptime(end_date, '%Y-%m-%d').date()
    return (d1, d2 - d1)
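
# Illustrative example of the return value:
#
#   _date_range_from_strings("2018-01-01", "2018-01-04")
#   # => (datetime.date(2018, 1, 1), datetime.timedelta(3))
#
# so range(delta.days) yields 0, 1, 2: the iteration covers Jan 1-3 and
# excludes the end_date folder, as noted in iterate_s3_logs.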


@click.command()
@click.option('--start_date', help='Start date eg 2018-11-15 (default=yesterday)')
@click.option('--delete', is_flag=True, help='Delete selected index (view source)')
def search_test(start_date, delete):
    if delete:
        #
        # DELETE INDEX
        #
        HOST, INDEX = DEFAULT_HOST, "logs_from_s3"  # manually update for deleting!
        d = Document(index_name=INDEX, host_name=HOST)
        if click.confirm('Do you want to delete <%s/%s>?' % (HOST, INDEX)):
            d.delete_index()
            click.echo('Goodbye')
    else:
        #
        # LOAD FROM S3
        #
        HOST, INDEX = DEFAULT_HOST, "logs_from_s3"
        iterate_s3_logs(INDEX, HOST, start_date)


if __name__ == '__main__':
    search_test()
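
# Example invocations (assuming the script is saved as dsl_data_loader.py,
# a name inferred from the log filename configured above):
#
#   python dsl_data_loader.py --start_date 2018-11-15   # load from that date up to today
#   python dsl_data_loader.py                           # defaults: yesterday to today
#   python dsl_data_loader.py --delete                  # drop the "logs_from_s3" index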