Skip to content

Instantly share code, notes, and snippets.

@randerzander
Created October 11, 2017 19:27
Show Gist options
  • Save randerzander/b7ef272123cbfcb4b5422abc620bbb92 to your computer and use it in GitHub Desktop.
Save randerzander/b7ef272123cbfcb4b5422abc620bbb92 to your computer and use it in GitHub Desktop.
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
import os, sys, imp, traceback, time
from urlparse import urlparse
# Directory prefix for the parser modules. process() builds a module path as
# parser_path + <parser name> + '.py', so the value must end with a '/'.
parser_path = '/home/dev/projects/scripts/parsers/'
class PyStreamCallback(StreamCallback):
    """StreamCallback that writes an already-computed payload to the output.

    The payload is supplied at construction time; nothing is read from the
    input stream when the callback runs.
    """

    def __init__(self, result):
        # Payload that process() hands to outstream.write().
        self.result = result

    def process(self, instream, outstream):
        """Write the stored payload to outstream (instream is ignored)."""
        payload = self.result
        outstream.write(payload)
def fail(flowfile, err):
    """Route flowfile to REL_FAILURE with err stored in 'parse.error'.

    flowfile -- the flowfile to mark as failed
    err      -- text stored as the 'parse.error' attribute
    """
    # putAttribute returns the updated flowfile; transfer that one.
    tagged = session.putAttribute(flowfile, 'parse.error', err)
    session.transfer(tagged, REL_FAILURE)
def _parser_name(flowfile):
    """Return the parser name derived from the flowfile's 'source.url'.

    The name is the second-to-last dot-separated label of the URL hostname
    (e.g. 'example' for 'www.example.com').

    Raises ValueError when the attribute is missing/empty or the hostname
    does not have at least two labels.
    """
    source_url = flowfile.getAttribute('source.url')
    if not source_url:
        raise ValueError("flowfile has no 'source.url' attribute")
    hostname = urlparse(source_url).hostname
    labels = hostname.split('.') if hostname else []
    if len(labels) < 2 or not labels[-2]:
        # %r keeps the message ASCII-safe even for non-ASCII URLs.
        raise ValueError('cannot derive a parser name from %r' % (source_url,))
    return labels[-2]


def _load_parser(name):
    """Return parser module `name`, (re)loading it when missing or stale.

    The module is loaded from parser_path + name + '.py' and stamped with a
    `loaded_at` time so later calls can compare it to the file's mtime.
    Any error from the filesystem or from executing the module propagates.
    """
    path = parser_path + name + '.py'
    cached = sys.modules.get(name)
    # A module registered under this name that was not loaded here (for
    # example a stdlib module sharing the name) has no loaded_at stamp;
    # treat it as stale instead of raising AttributeError.
    if cached is None or os.path.getmtime(path) > getattr(cached, 'loaded_at', 0):
        loaded = imp.load_source(name, path)
        loaded.loaded_at = int(time.time())
    return sys.modules[name]


def _read_content(flowfile, parse_module):
    """Read the flowfile content as the parser module requests.

    Returns a byte array when the module declares format == 'binary'
    (case-insensitive), otherwise the content decoded as UTF-8 text.
    """
    as_binary = getattr(parse_module, 'format', '').lower() == 'binary'
    instream = session.read(flowfile)
    try:
        if as_binary:
            return IOUtils.toByteArray(instream)
        return IOUtils.toString(instream, StandardCharsets.UTF_8)
    finally:
        # Close the stream even when the read raises so it is not leaked.
        instream.close()


def process(flowfile):
    """Parse one flowfile with the parser module selected by its source URL.

    Steps:
      1. Derive the parser name from the 'source.url' attribute.
      2. Load (or reload, if the file changed) the parser module.
      3. Read the content and call the module's parse(data), where data
         holds 'content' plus any attributes the module lists in
         `attributes`.
      4. Copy every returned key except 'content' onto the flowfile as an
         attribute, then write 'content' back: one child flowfile per entry
         when it is a list (the parent is removed), otherwise in place.

    Failures in steps 1, 2 and 3-4 are routed through fail() with a
    'Resolving Parser: ', 'Loading Module: ' or 'Parsing: ' prefix.
    """
    try:
        parser = _parser_name(flowfile)
    except ValueError:
        fail(flowfile, 'Resolving Parser: ' + traceback.format_exc())
        return

    # NOTE(review): the bare excepts below are kept from the original;
    # presumably they are deliberate so that Java exceptions raised under
    # Jython are caught as well -- confirm before narrowing them.
    try:
        parse_module = _load_parser(parser)
    except:
        fail(flowfile, 'Loading Module: ' + traceback.format_exc())
        return

    data = {'content': _read_content(flowfile, parse_module)}

    # Attempt to parse
    try:
        for attribute in getattr(parse_module, 'attributes', ()):
            data[attribute] = flowfile.getAttribute(attribute)
        result = parse_module.parse(data)

        # Copy returned fields into flowfile attributes. `flowfile` is
        # rebound on every call so the except handler below always hands
        # the most recent reference to fail().
        for attr, value in result.items():
            if attr != 'content':
                flowfile = session.putAttribute(flowfile, attr, value)

        content = result['content']
        if isinstance(content, list):
            # Emit one new flowfile per entry, then drop the parent.
            for entry in content:
                child_flowfile = session.create(flowfile)
                child_flowfile = session.write(child_flowfile, PyStreamCallback(entry))
                session.transfer(child_flowfile, REL_SUCCESS)
            session.remove(flowfile)
        else:
            flowfile = session.write(flowfile, PyStreamCallback(content))
            session.transfer(flowfile, REL_SUCCESS)
    except:
        fail(flowfile, 'Parsing: ' + traceback.format_exc())
# Execution starts here: take one flowfile from the session and process it.
# Nothing is done when the session has no flowfile to hand out.
flowfile = session.get()
if flowfile is not None:
    process(flowfile)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment