Skip to content

Instantly share code, notes, and snippets.

@mitchellrj
Last active September 2, 2023 08:53
Show Gist options
  • Save mitchellrj/86722a2120d66abefd7d6eecc390b4fc to your computer and use it in GitHub Desktop.
Save mitchellrj/86722a2120d66abefd7d6eecc390b4fc to your computer and use it in GitHub Desktop.
Basic Prometheus exporter and REST API for a single sensor from the LeChacal RPICT hat.
#!env python3
#
# Usage: rpict_exporter.py DEVICE DEVICE_TYPE [HOST] [PORT] [SENSOR NAMES...]
# Example usage: rpict_exporter.py /dev/ttyAMA0 RPICT3T1 0.0.0.0 9999
#
# MIT License
#
# Copyright (c) 2021 Richard Mitchell
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice (including the next paragraph)
# shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
# CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
# OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import enum
import json
import logging
import signal
import sys
import threading
import time
import flask
import serial
global reader
reader = None
logger = logging.getLogger()
logging.getLogger("werkzeug").disabled = True
logger.setLevel(logging.INFO)
class DeviceType(enum.Enum):
RPICT3V1 = 'RPICT3V1'
RPICT3T1 = 'RPICT3T1'
RPICT4T4 = 'RPICT4T4'
RPICT7V1 = 'RPICT7V1'
RPICT4V3 = 'RPICT4V3'
RPICT8 = 'RPICT8'
RPIZCT4V3T2 = 'RPIZCT4V3T2'
RPIZ_CT3V1 = 'RPIZ_CT3V1'
RPIZ_CT3T1 = 'RPIZ_CT3T1'
DEVICE_TYPE_LABELS = {
DeviceType.RPICT3V1: ([
{
'__name__': 'power_w',
},
] * 3) + ([
{
'__name__': 'irms_ma',
}
] * 3) + [
{
'__name__': 'vrms_v',
}
],
DeviceType.RPICT3T1: ([
{
'__name__': 'estimated_power_w',
},
] * 3) + [
{
'__name__': 'temperature_c',
}
],
DeviceType.RPICT4T4: ([
{
'__name__': 'estimated_power_w',
},
] * 4) + ([
{
'__name__': 'temperature_c',
}
] * 4),
DeviceType.RPICT7V1: ([
{
'__name__': 'power_w',
},
] * 7) + ([
{
'__name__': 'irms_ma',
}
] * 7) + [
{
'__name__': 'vrms_v',
}
],
DeviceType.RPICT4V3: ([
{
'__name__': 'vrms_v',
},
] * 4) + ([
{
'__name__': 'power_w',
}
] * 4) + ([
{
'__name__': 'irms_ma',
}
] * 4) + ([
{
'__name__': 'power_factor',
}
] * 4),
DeviceType.RPICT8: ([
{
'__name__': 'estimated_power_w',
},
] * 8),
DeviceType.RPIZCT4V3T2: ([
{
'__name__': 'vrms_v',
},
] * 4) + ([
{
'__name__': 'power_w',
}
] * 4) + ([
{
'__name__': 'power_factor',
}
] * 4) + ([
{
'__name__': 'irms_ma',
}
] * 4) + [
{
'__name__': 'temperature_c',
'sensor_type': 'RTD_thermocouple',
}
] + [ # special case where all remaining values are of this type is accounted for below
{
'__name__': 'temperature_c',
'sensor_type': 'DS18B20',
}
],
}
DEVICE_TYPE_LABELS[DeviceType.RPIZ_CT3V1] = DEVICE_TYPE_LABELS[DeviceType.RPICT3V1]
DEVICE_TYPE_LABELS[DeviceType.RPIZ_CT3T1] = DEVICE_TYPE_LABELS[DeviceType.RPICT3T1]
DEFAULT_DEVICE = '/dev/ttyAMA0'
DEFAULT_DEVICE_TYPE = DeviceType.RPICT3V1
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 9999
DEFAULT_TIMEOUT = 5
class Reader(threading.Thread):
def __init__(self, device, device_type, sensor_names, timeout):
threading.Thread.__init__(self)
self.device = device
self.device_type = device_type
self.sensor_names = sensor_names
self.timeout = timeout
self.labels = []
self.node_id = None
self.last_updated = None
self.values = []
self._exit = False
self._super_signal_handlers = {
signal.SIGINT: signal.signal(signal.SIGINT, self.stop),
signal.SIGTERM: signal.signal(signal.SIGTERM, self.stop),
signal.SIGHUP: signal.signal(signal.SIGHUP, self.stop),
}
def run(self):
self._exit = False
self.ser = serial.Serial(self.device, 38400, timeout=self.timeout)
while self.ser.is_open:
if self._exit:
logger.info('Reader thread stop requested')
break
line = self.ser.readline()
if not line:
logger.debug('Missed readline')
continue
parts = line.decode('ascii').strip().split()
self.node_id = parts[0]
self.values = list(map(float, parts[1:]))
# only set after values decoded
self.last_updated = time.time()
else:
logger.info('Serial port closed unexpectedly')
self.ser.close()
logger.info('Reader thread stopped')
def stop(self, s=None, f=None):
if s is not None:
logger.info('Received {}, reader thread stopping'.format(s))
self._exit = True
self.ser.cancel_read()
if s is not None:
if self._super_signal_handlers[s] is not None:
self._super_signal_handlers[s](s, f)
self.ser.close()
def prometheus_metrics(self):
if self.node_id is None:
return None
device_labels = DEVICE_TYPE_LABELS[self.device_type]
last_updated = self.last_updated
for i, value in enumerate(self.values):
if i > len(device_labels):
labels = dict(device_labels[-1])
else:
labels = dict(device_labels[i])
if i < len(self.sensor_names) and self.sensor_names[i]:
labels['name'] = self.sensor_names[i]
else:
labels['name'] = 'sensor_{}'.format(i + 1)
labels['node_id'] = reader.node_id
metric_name = labels.pop('__name__')
yield '{metric_name}{{{labels}}} {value}'.format(
metric_name=metric_name,
value=value,
labels=','.join(
'{}="{}"'.format(k, repr(v)[1:-1])
for k, v in labels.items()
)
)
yield 'last_updated{{node_id="{}"}} {}'.format(self.node_id, last_updated)
def json(self):
if self.node_id is None:
return '{}'
return json.dumps({
'last_updated': self.last_updated,
'node_id': self.node_id,
'sensors': self.values
})
app = flask.Flask(__name__)
@app.route('/shutdown')
def shutdown():
reader.stop()
func = flask.request.environ.get('werkzeug.server.shutdown', lambda: None)
func()
return 'Shutting down...'
@app.route('/metrics')
def prometheus():
body = list(reader.prometheus_metrics())
try:
response = flask.make_response('\n'.join(body))
except Exception:
response = flask.make_response('')
response.status_code = 503
response.headers['Content-Type'] = 'text/plain'
return response
@app.route('/')
def rest():
try:
response = flask.make_response(reader.json())
except Exception as e:
logger.exception(e)
response = flask.make_response(json.dumps({}))
response.status_code = 503
response.headers['Content-Type'] = 'application/json'
return response
if __name__ == '__main__':
device = DEFAULT_DEVICE
device_type = DEFAULT_DEVICE_TYPE
host = DEFAULT_HOST
port = DEFAULT_PORT
if len(sys.argv) > 1:
device = sys.argv[1]
if len(sys.argv) > 2:
device_type = DeviceType(sys.argv[2])
if len(sys.argv) > 3:
host = sys.argv[3]
if len(sys.argv) > 4:
port = sys.argv[4]
sensor_names = sys.argv[5:]
reader = Reader(device, device_type, sensor_names, timeout=DEFAULT_TIMEOUT)
reader.start()
app.run(host=host, port=port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment