Skip to content

Instantly share code, notes, and snippets.

@metmajer
Created February 26, 2015 15:10
Show Gist options
  • Star 26 You must be signed in to star a gist
  • Fork 11 You must be signed in to fork a gist
  • Save metmajer/7da2f9599c2d75d80ee5 to your computer and use it in GitHub Desktop.
Save metmajer/7da2f9599c2d75d80ee5 to your computer and use it in GitHub Desktop.
Ansible Logging To Elasticsearch

Ansible Logging To Elasticsearch

Turns Ansible log outputs into plain JSON strings and sends them to an Elasticsearch cluster.

Place the script in your playbook's plugins/callbacks/ directory.

[defaults]
callback_plugins = plugins/callbacks/
# adapted from https://github.com/petems/ansible-json
import os
import time
import datetime
import httplib2
import json
import uuid
from datetime import datetime
from httplib2 import Http
from json import JSONEncoder
class CallbackModule(object):
@staticmethod
def convert_string_to_timestamp_ms(value, format):
dt = datetime.strptime(value, format)
return int(time.mktime(dt.timetuple()) * 1000 + round(dt.microsecond / 1000))
@staticmethod
def convert_ansible_datetime_string_to_timestamp_ms(value):
# Example input: '2015-02-23 13:48:21.316040'
return CallbackModule.convert_string_to_timestamp_ms(value, '%Y-%m-%d %H:%M:%S.%f')
@staticmethod
def convert_ansible_time_string_to_timestamp_ms(value):
# Example input: '0:00:00.004156'
return CallbackModule.convert_string_to_timestamp_ms('1970-01-01 0' + value, '%Y-%m-%d %H:%M:%S.%f')
def __init__(self):
self.uuid = uuid.uuid1()
self.http = Http()
self.http_headers = {'Content-type': 'application/json'}
self.http_method = 'PUT'
self.http_uri = 'http://localhost:9200/ansible-deployments-' + time.strftime('%Y') + '/playbook-run-' + str(self.uuid)
def json_create_message(self, host, status, res):
if type(res) == type(dict()):
if 'verbose_override' not in res:
for i in ['start', 'end']:
if i in res:
res[i] = self.convert_ansible_datetime_string_to_timestamp_ms(res[i])
for i in ['delta']:
if i in res:
res[i] = self.convert_ansible_time_string_to_timestamp_ms(res[i])
res.update({unicode('host'): unicode(host)})
res.update({unicode('status'): unicode(status)})
if 'start' in res:
timestamp = res['start']
else:
timestamp = int(time.time() * 1000)
res.update({'timestamp': timestamp})
return (str(timestamp), JSONEncoder().encode(res))
return (None, None)
def json_post_message(self, id, msg):
response, content = self.http.request(self.http_uri + '/' + id, self.http_method, msg, self.http_headers)
print('Data available at: ' + self.http_uri + '/' + id + '?pretty=1')
def json_post_result(self, host, status, res):
id, msg = self.json_create_message(host, status, res)
if id != None:
self.json_post_message(id, msg)
def on_any(self, *args, **kwargs):
pass
def runner_on_failed(self, host, res, ignore_errors=False):
self.json_post_result(host, 'failed', res)
def runner_on_ok(self, host, res):
self.json_post_result(host, 'ok', res)
def runner_on_error(self, host, msg):
json_post_result(host, 'error', {unicode('msg'): msg})
pass
def runner_on_skipped(self, host, item=None):
pass
def runner_on_unreachable(self, host, res):
self.json_post_result(host, 'unreachable', res)
def runner_on_no_hosts(self):
pass
def runner_on_async_poll(self, host, res, jid, clock):
self.json_post_result(host, 'async_poll', res)
def runner_on_async_ok(self, host, res, jid):
self.json_post_result(host, 'async_ok', res)
def runner_on_async_failed(self, host, res, jid):
self.json_post_result(host, 'async_failed', res)
def playbook_on_start(self):
pass
def playbook_on_notify(self, host, handler):
pass
def playbook_on_no_hosts_matched(self):
pass
def playbook_on_no_hosts_remaining(self):
pass
def playbook_on_task_start(self, name, is_conditional):
pass
def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
pass
def playbook_on_setup(self):
pass
def playbook_on_import_for_host(self, host, imported_file):
pass
def playbook_on_not_import_for_host(self, host, missing_file):
pass
def playbook_on_play_start(self, pattern):
pass
def playbook_on_stats(self, stats):
pass
@missnebun
Copy link

Hi there,

I am trying to use this call back script but I get this error:

[WARNING]: Failure when attempting to use callback plugin (</usr/share/ansible_plugins/callback_plugins/ansible_log_to_elasticsearch.CallbackModule object at
0x1ffca50>): runner_on_ok() takes exactly 3 arguments (2 given)

[WARNING]: Failure when attempting to use callback plugin (</usr/share/ansible_plugins/callback_plugins/inventory.CallbackModule object at 0x20197d0>):
runner_on_ok() takes exactly 3 arguments (2 given)

@athreyavc1
Copy link

Hi,

I faced the same issue,

I changed the

class CallbackModule(object): TO class CallbackModule(CallbackBase):

After that the error disappeared

ansible-playbook  -i hosts main.yml

PLAY [Checking for the file and creating it if not present] ********************

TASK [setup] *******************************************************************
ok: [NEO-LABLINUXSCRIPT]
Data available at: http://192.168.1.19:9200/ansible-deployments-2016/playbook-run-f3e7b2e2-e54e-11e5-92e3-005056b04d43/1457456306127?pretty=1

TASK [filetester : Check if a file exists] *************************************
ok: [NEO-LABLINUXSCRIPT]
Data available at: http://192.168.1.19:9200/ansible-deployments-2016/playbook-run-f3e7b2e2-e54e-11e5-92e3-005056b04d43/1457456308776?pretty=1

TASK [filetester : Display messages] *******************************************
skipping: [NEO-LABLINUXSCRIPT]

TASK [filetester : Copying files] **********************************************
changed: [NEO-LABLINUXSCRIPT]
Data available at: http://192.168.1.19:9200/ansible-deployments-2016/playbook-run-f3e7b2e2-e54e-11e5-92e3-005056b04d43/1457456310722?pretty=1

PLAY RECAP *********************************************************************
NEO-LABLINUXSCRIPT         : ok=3    changed=1    unreachable=0    failed=0

Hope it helps

@colbygk
Copy link

colbygk commented Mar 16, 2016

Might help to note that you'll need to import it:

from ansible.plugins.callback import CallbackBase

@xxKeoxx
Copy link

xxKeoxx commented Oct 1, 2020

I am confused by this. Does this feed data in to elastic using logstash or an ingest node?

@rsteenwyk
Copy link

I am confused by this. Does this feed data in to elastic using logstash or an ingest node?

No, this is making a call to a Elastic index directly.

@anuragkhuntia
Copy link

Working Code to avoid confusion

# adapted from https://github.com/petems/ansible-json
import os
import time
import datetime
import httplib2
import json
import uuid

from datetime import datetime
from httplib2 import Http
from json import JSONEncoder

class CallbackModule(CallbackBase):

  @staticmethod
  def convert_string_to_timestamp_ms(value, format):
    dt = datetime.strptime(value, format)
    return int(time.mktime(dt.timetuple()) * 1000 + round(dt.microsecond / 1000))

  @staticmethod
  def convert_ansible_datetime_string_to_timestamp_ms(value):
    # Example input: '2015-02-23 13:48:21.316040'
    return CallbackModule.convert_string_to_timestamp_ms(value, '%Y-%m-%d %H:%M:%S.%f')
  
  @staticmethod
  def convert_ansible_time_string_to_timestamp_ms(value):
    # Example input: '0:00:00.004156'
    return CallbackModule.convert_string_to_timestamp_ms('1970-01-01 0' + value, '%Y-%m-%d %H:%M:%S.%f')

  def __init__(self):
    self.uuid = uuid.uuid1()

    self.http = Http()
    self.http_headers = {'Content-type': 'application/json'}
    self.http_method = 'PUT'
    self.http_uri = 'http://localhost:9200/ansible-deployments-' + time.strftime('%Y') + '/playbook-run-' + str(self.uuid)

  def json_create_message(self, host, status, res):
    if type(res) == type(dict()):
      if 'verbose_override' not in res:

        for i in ['start', 'end']:
          if i in res:
            res[i] = self.convert_ansible_datetime_string_to_timestamp_ms(res[i])

        for i in ['delta']:
          if i in res:
            res[i] = self.convert_ansible_time_string_to_timestamp_ms(res[i])

        res.update({unicode('host'): unicode(host)})
        res.update({unicode('status'): unicode(status)})

        if 'start' in res:
          timestamp = res['start']
        else:
          timestamp = int(time.time() * 1000)

        res.update({'timestamp': timestamp})

        return (str(timestamp), JSONEncoder().encode(res))

    return (None, None)

  def json_post_message(self, id, msg):
    response, content = self.http.request(self.http_uri + '/' + id, self.http_method, msg, self.http_headers)
    print('Data available at: ' + self.http_uri + '/' + id + '?pretty=1')

  def json_post_result(self, host, status, res):
    id, msg = self.json_create_message(host, status, res)
    if id != None:
      self.json_post_message(id, msg)

  def on_any(self, *args, **kwargs):
    pass

  def runner_on_failed(self, host, res, ignore_errors=False):
    self.json_post_result(host, 'failed', res)

  def runner_on_ok(self, host, res):
    self.json_post_result(host, 'ok', res)

  def runner_on_error(self, host, msg):
    json_post_result(host, 'error', {unicode('msg'): msg})
    pass

  def runner_on_skipped(self, host, item=None):
    pass

  def runner_on_unreachable(self, host, res):
    self.json_post_result(host, 'unreachable', res)

  def runner_on_no_hosts(self):
    pass

  def runner_on_async_poll(self, host, res, jid, clock):
    self.json_post_result(host, 'async_poll', res)

  def runner_on_async_ok(self, host, res, jid):
    self.json_post_result(host, 'async_ok', res)

  def runner_on_async_failed(self, host, res, jid):
    self.json_post_result(host, 'async_failed', res)

  def playbook_on_start(self):
    pass

  def playbook_on_notify(self, host, handler):
    pass

  def playbook_on_no_hosts_matched(self):
    pass

  def playbook_on_no_hosts_remaining(self):
    pass

  def playbook_on_task_start(self, name, is_conditional):
    pass

  def playbook_on_vars_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None):
    pass

  def playbook_on_setup(self):
    pass

  def playbook_on_import_for_host(self, host, imported_file):
    pass

  def playbook_on_not_import_for_host(self, host, missing_file):
    pass

  def playbook_on_play_start(self, pattern):
    pass

  def playbook_on_stats(self, stats):
    pass

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment