"""Subscribe to an MQTT topic and persist agent summary messages to SQLite."""

import sqlite3
import json
import os
import paho.mqtt.client as mqtt
import logging

# Runtime configuration, overridable through environment variables.
DB_FILE = os.getenv('DB_FILE', 'agent-summary.db3')
MQTT_HOST = os.getenv('MQTT_HOST', 'iris.dgtlu.net')
MQTT_TOPIC = os.getenv('MQTT_TOPIC', 'agent/summary')
MQTT_USER = os.getenv('MQTT_USER', 'agent')
MQTT_PASS = os.getenv('MQTT_PASS', 'agent')

logger = logging.getLogger(__name__)

# Any non-empty DEBUG value switches the logger to debug verbosity.
debug = os.getenv("DEBUG", "")
if debug:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


def db_init():
    """Open the SQLite database at DB_FILE and ensure AgentSummary exists.

    Returns:
        The open ``sqlite3.Connection``; the caller is responsible for
        closing it.
    """
    conn = sqlite3.connect(DB_FILE)
    conn.execute('''CREATE TABLE IF NOT EXISTS AgentSummary (
        camera_id int,
        dt datetime default current_timestamp,
        file char(500),
        model char(100),
        summary text,
        primary key (dt, file)
    )''')
    conn.commit()
    return conn


def db_save_input(conn: sqlite3.Connection, json_string):
    """Parse one JSON message and insert it as a row of AgentSummary.

    The message must provide ``id``, ``file``, ``data.model`` and
    ``data.choices[0].message.content``; the content is stripped of
    surrounding whitespace before being stored as the summary.

    Args:
        conn: Open connection to the database created by ``db_init``.
        json_string: JSON document (``str``, ``bytes`` or ``bytearray``).

    Raises:
        ValueError: If ``json_string`` is not valid JSON.
        KeyError, IndexError, TypeError: If an expected field is missing
            or has an unexpected shape.
        sqlite3.Error: If the insert fails, e.g. on a duplicate
            ``(dt, file)`` primary key.
    """
    message = json.loads(json_string)
    row = (
        message["id"],
        message["file"],
        message["data"]["model"],
        message["data"]["choices"][0]["message"]["content"].strip(),
    )
    # The connection context manager commits on success and rolls back if
    # the insert raises, so a failed insert never leaves a transaction open.
    with conn:
        conn.execute(
            'INSERT INTO AgentSummary (camera_id, file, model, summary) VALUES (?, ?, ?, ?)',
            row,
        )


def mqtt_init() -> mqtt.Client:
    """Create an MQTT client and start connecting to MQTT_HOST.

    The client uses callback API version 2, authenticates with
    MQTT_USER / MQTT_PASS and logs the outcome of each connection attempt.

    Returns:
        The client, with ``connect`` already called. The network loop is
        not started here.
    """
    def on_connect(mqt_client, userdata, flags, rc, properties):
        if rc == 0:
            logger.info("Connected to MQTT Broker!")
        else:
            # With callback API version 2 `rc` is a ReasonCode object, not an
            # int, so it is formatted with %s (%d would fail inside logging).
            logger.error("Failed to connect, return code %s\n", rc)

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.username_pw_set(username=MQTT_USER, password=MQTT_PASS)
    client.connect(host=MQTT_HOST, keepalive=60)
    return client


def mqtt_subscribe(client: mqtt.Client, db_conn: sqlite3.Connection):
    """Subscribe ``client`` to MQTT_TOPIC and store each message in ``db_conn``.

    The subscription is issued from the connect callback so that it is
    renewed whenever the client (re)connects. Any ``on_connect`` handler
    already installed on the client is still invoked first. A message that
    cannot be decoded, parsed or stored is logged and skipped instead of
    terminating the network loop.

    Args:
        client: MQTT client using the version 2 callback signatures.
        db_conn: Open connection passed to ``db_save_input``.
    """
    def on_message(mqt_client, userdata, msg):
        try:
            payload = msg.payload.decode()
            logger.debug("Received `%s` from `%s` topic", payload, msg.topic)
            db_save_input(db_conn, payload)
        except (ValueError, KeyError, IndexError, TypeError, AttributeError,
                sqlite3.Error):
            # ValueError also covers UnicodeDecodeError and JSONDecodeError.
            logger.exception("Discarding message from `%s` topic", msg.topic)

    previous_on_connect = client.on_connect

    def on_connect(mqt_client, userdata, flags, rc, properties):
        if previous_on_connect is not None:
            previous_on_connect(mqt_client, userdata, flags, rc, properties)
        if rc == 0:
            mqt_client.subscribe(MQTT_TOPIC)

    client.on_message = on_message
    client.on_connect = on_connect
    # If the connection is already established, on_connect will not fire
    # until the next reconnect, so subscribe right away.
    if client.is_connected():
        client.subscribe(MQTT_TOPIC)


def main():
    """Initialise the database and MQTT client, then process messages forever."""
    db_conn = db_init()
    try:
        mqtt_client = mqtt_init()
        mqtt_subscribe(mqtt_client, db_conn)
        mqtt_client.loop_forever()
    finally:
        # Close the database even when setup or the loop raises (e.g. Ctrl-C).
        db_conn.close()


if __name__ == '__main__':
    main()