commit 6f59fa0b0a2c84b8326f3cb8f237ffa5372e571a Author: Knut Ahlers Date: Sat Jul 11 18:20:49 2020 +0200 Initial version diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..85c55eb --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +.env +.venv diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3c1dad2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +config.yml +.env +.venv diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4e5df4d --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM python:alpine + +COPY requirements.txt /src/requirements.txt +WORKDIR /src + +RUN set -ex \ + && pip install -r /src/requirements.txt + +VOLUME ["/config"] +ENV CONFIG_PATH=/config/config.yml + +COPY . /src +CMD ["/usr/local/bin/python3", "main.py"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..745b174 --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +default: + +clean: + rm -rf .venv + +freeze: testenv + ./.venv/bin/pip freeze >requirements.txt + +testenv: clean + python -m venv .venv + ./.venv/bin/pip install -r requirements.txt diff --git a/config_sample.yml b/config_sample.yml new file mode 100644 index 0000000..f21acfb --- /dev/null +++ b/config_sample.yml @@ -0,0 +1,21 @@ +--- + +influx_db: 'mqtt_map' + +subscriptions: + + tele/kettle/SENSOR: + - metric: power + tags: + device: kettle + transform: !lambda "x: json.loads(x)['ENERGY']['Power']" + - metric: current + tags: + device: kettle + transform: !lambda "x: json.loads(x)['ENERGY']['Current']" + - metric: voltage + tags: + device: kettle + transform: !lambda "x: json.loads(x)['ENERGY']['Voltage']" + +... diff --git a/influx.py b/influx.py new file mode 100644 index 0000000..8d081ab --- /dev/null +++ b/influx.py @@ -0,0 +1,33 @@ +from influxdb import InfluxDBClient +import vault + + +class Influx(): + + def __init__(self, database): + cfg = vault.read_data('secret/mqtt2influx/influxdb') + + self.database = database + self.client = InfluxDBClient(cfg['host'], cfg['port'], + cfg['user'], cfg['pass'], self.database) + + def submit(self, body): + """ + submit("mydatabase", [ + { + "measurement": "cpu_load_short", + "tags": { + "host": "server01", + "region": "us-west" + }, + "time": "2009-11-10T23:00:00Z", + "fields": { + "Float_value": 0.64, + "Int_value": 3, + "String_value": "Text", + "Bool_value": True + } + } + ]) + """ + self.client.write_points(body, database=self.database) diff --git a/main.py b/main.py new file mode 100644 index 0000000..a9a28bd --- /dev/null +++ b/main.py @@ -0,0 +1,104 @@ +import logging +import os +import paho.mqtt.client as mqtt +import yaml + +from yaml_lambda import * +from influx import Influx +import vault + + +class MQTT2InfluxDB(): + + def __init__(self, config_path='config.yml'): + with open(config_path) as cfg_file: + self.config = yaml.safe_load(cfg_file) + + self.influx = Influx(self.obj_get( + self.config, 'influx_db', 'mqtt2influxdb')) + + def obj_get(self, obj, key, default=None): + if key in obj: + return obj[key] + return default + + def on_connect(self, client, userdata, flags, rc): + topics = [(topic, 1) for topic in self.config['subscriptions'].keys()] + result, _ = client.subscribe(topics) + + if result != mqtt.MQTT_ERR_SUCCESS: + raise(Exception('MQTT subscribe failed: {}'.format(result))) + + logging.info('MQTT connected and subscribed') + + def on_message(self, client, userdata, msg): + points = [] + + for processor in self.obj_get(self.config['subscriptions'], msg.topic, []): + tffn = self.obj_get(processor, 'transform', + YAMLLambda('x: float(x)')) + value = tffn.run(msg.payload) + + points.append({ + 'measurement': processor['metric'], + 'tags': self.obj_get(processor, 'tags', {}), + 'fields': { + 'value': value, + }, + }) + + logging.debug( + 'MQTT Message received: topic={topic} metric={metric} value={value}'.format( + topic=msg.topic, + metric=processor['metric'], + value=value, + )) + + if len(points) > 0: + self.influx.submit(points) + + def run(self): + client = mqtt.Client() + client.enable_logger() + client.on_connect = self.on_connect + client.on_message = self.on_message + + mqtt_config = vault.read_data('secret/mqtt2influx/mqtt') + + client.username_pw_set( + self.obj_get(mqtt_config, 'user', self.obj_get( + os.environ, 'MQTT_USER')), + self.obj_get(mqtt_config, 'pass', self.obj_get( + os.environ, 'MQTT_PASS')), + ) + + logging.debug('Connecting to MQTT broker...') + + client.connect( + self.obj_get(mqtt_config, 'host', self.obj_get( + os.environ, 'MQTT_HOST')), + port=self.obj_get(mqtt_config, 'port', self.obj_get( + os.environ, 'MQTT_PORT', 1883)), + keepalive=10, + ) + + client.loop_forever() + + +if __name__ == '__main__': + loglevel = logging.INFO + if 'LOG_LEVEL' in os.environ and os.environ['LOG_LEVEL'] == 'DEBUG': + loglevel = logging.DEBUG + + configpath = 'config.yml' + if 'CONFIG_PATH' in os.environ: + configpath = os.environ['CONFIG_PATH'] + + logging.basicConfig( + datefmt='%m/%d/%Y %I:%M:%S %p', + format='[%(asctime)s][%(levelname)s] %(message)s', + level=loglevel, + ) + + inst = MQTT2InfluxDB(configpath) + inst.run() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..745a862 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +paho-mqtt +PyYAML +hvac +influxdb diff --git a/vault.py b/vault.py new file mode 100644 index 0000000..70c4e39 --- /dev/null +++ b/vault.py @@ -0,0 +1,19 @@ +import hvac +import os + +if not 'VAULT_ADDR' in os.environ or not 'VAULT_ROLE_ID' in os.environ: + raise Exception('VAULT_ADDR or VAULT_ROLE_ID are missing') + +vault = hvac.Client(os.environ['VAULT_ADDR']) +auth = vault.auth_approle(os.environ['VAULT_ROLE_ID']) +if 'auth' in auth and 'client_token' in auth['auth']: + vault.token = auth['auth']['client_token'] +else: + raise Exception('Authorization to Vault failed!') + + +def read_data(key): + resp = vault.read(key) + if 'data' not in resp: + raise Exception('Unable to read configuration') + return resp['data'] diff --git a/yaml_lambda.py b/yaml_lambda.py new file mode 100644 index 0000000..42eacf7 --- /dev/null +++ b/yaml_lambda.py @@ -0,0 +1,24 @@ +import json +import yaml + + +class YAMLLambda(yaml.YAMLObject): + yaml_tag = '!lambda' + + def __init__(self, lambda_code): + self.code = lambda_code + + def run(self, in_value): + return eval('lambda {}'.format(self.code))(in_value) + + @classmethod + def from_yaml(cls, loader, node): + return YAMLLambda(node.value) + + @classmethod + def to_yaml(cls, dumper, data): + return dumper.represent_scalar(cls.yaml_tag, data.code) + + +yaml.SafeLoader.add_constructor('!lambda', YAMLLambda.from_yaml) +yaml.SafeDumper.add_multi_representer(YAMLLambda, YAMLLambda.to_yaml)