| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- import sqlite3
- import json
- import os
- import paho.mqtt.client as mqtt
- import logging
# Runtime configuration. Each value can be overridden through the
# environment variable of the same name; the second argument is the fallback.
DB_FILE = os.getenv('DB_FILE', 'agent-summary.db3')
MQTT_HOST = os.getenv('MQTT_HOST', 'iris.dgtlu.net')
MQTT_TOPIC = os.getenv('MQTT_TOPIC', 'agent/summary')
MQTT_USER = os.getenv('MQTT_USER', 'agent')
MQTT_PASS = os.getenv('MQTT_PASS', 'agent')

# Module-level logger. The logging module exposes getLogger(), not get(), and
# the logger is named after the module (__name__), not the literal "__name__".
logger = logging.getLogger(__name__)

# Any non-empty DEBUG value (even "0" or "false") switches on debug output.
debug = os.getenv("DEBUG", "")
if debug:
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())
def db_init():
    """Open the SQLite database at DB_FILE and ensure the AgentSummary table exists.

    Returns:
        The open sqlite3 connection; the caller is responsible for closing it.
    """
    create_table_sql = '''CREATE TABLE IF NOT EXISTS AgentSummary (
        camera_id int,
        dt datetime default current_timestamp,
        file char(500),
        model char(100),
        summary text,
        primary key (dt, file)
    )'''
    connection = sqlite3.connect(DB_FILE)
    connection.execute(create_table_sql)
    connection.commit()
    return connection
def db_save_input(conn: sqlite3.Connection, json_string):
    """Parse one JSON message and insert it as a row into AgentSummary.

    The message must provide "id", "file", "data.model" and
    "data.choices[0].message.content"; the content is stripped of
    surrounding whitespace before it is stored as the summary.

    Args:
        conn: Open SQLite connection holding the AgentSummary table.
        json_string: JSON document (str or bytes) describing one summary.

    Raises:
        json.JSONDecodeError: If json_string is not valid JSON.
        KeyError, IndexError: If an expected field is missing.
        sqlite3.Error: If the insert fails.
    """
    record = json.loads(json_string)
    cursor = conn.cursor()
    camera_id = record["id"]
    file_name = record["file"]
    model = record["data"]["model"]
    summary = record["data"]["choices"][0]["message"]["content"].strip()
    cursor.execute(
        'INSERT INTO AgentSummary (camera_id, file, model, summary) VALUES (?, ?, ?, ?)',
        (camera_id, file_name, model, summary),
    )
    conn.commit()
def mqtt_init() -> mqtt.Client:
    """Create an MQTT client, attach the connect callback and connect to MQTT_HOST.

    Credentials come from MQTT_USER / MQTT_PASS.

    Returns:
        The configured client. connect() has been called, but no network loop
        is started here; the caller has to run one (e.g. loop_forever()).
    """
    def on_connect(mqt_client, userdata, flags, rc, properties):
        # With CallbackAPIVersion.VERSION2 rc is a ReasonCode object rather
        # than an int: it compares equal to 0 on success, but it has to be
        # formatted with %s (a %d format fails and the message is lost).
        if rc == 0:
            logger.info("Connected to MQTT Broker!")
        else:
            logger.error("Failed to connect, return code %s\n", rc)

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.username_pw_set(username=MQTT_USER, password=MQTT_PASS)
    client.connect(host=MQTT_HOST, keepalive=60)
    return client
def mqtt_subscribe(client: mqtt.Client, db_conn: sqlite3.Connection):
    """Subscribe the client to MQTT_TOPIC and store every received message.

    Installs an on_message callback that hands each payload to
    db_save_input(), and wraps the client's on_connect callback so the
    subscription is renewed after every successful (re)connect.

    Args:
        client: MQTT client to subscribe; its on_message and on_connect
            callbacks are replaced (an existing on_connect is still called).
        db_conn: Open SQLite connection the messages are written to.
    """
    def on_message(mqt_client, userdata, msg):
        # An exception escaping this callback propagates out of the network
        # loop and stops the service, so one bad message (invalid JSON,
        # missing field, duplicate primary key) is logged and skipped instead.
        try:
            payload = msg.payload.decode()
            logger.debug("Received `%s` from `%s` topic", payload, msg.topic)
            db_save_input(db_conn, payload)
        except (ValueError, LookupError, TypeError, AttributeError, sqlite3.Error):
            logger.exception("Failed to store message from `%s` topic", msg.topic)

    previous_on_connect = client.on_connect

    def on_connect(mqt_client, userdata, flags, rc, *extra):
        # *extra absorbs the trailing `properties` argument passed by the
        # VERSION2 callback API, so the wrapper also fits the older signature.
        if previous_on_connect is not None:
            previous_on_connect(mqt_client, userdata, flags, rc, *extra)
        # Subscriptions do not survive a reconnect with a clean session, so
        # subscribe on every successful connect rather than only once.
        if rc == 0:
            mqt_client.subscribe(MQTT_TOPIC)

    client.on_message = on_message
    client.on_connect = on_connect
    # If the connection is already established, on_connect will not fire
    # until the next reconnect, so subscribe right away in that case.
    if client.is_connected():
        client.subscribe(MQTT_TOPIC)
def main():
    """Run the service: open the database, connect to MQTT and process messages.

    Blocks in the MQTT network loop until it returns or raises (for example
    on Ctrl-C).
    """
    db_conn = db_init()
    # try/finally guarantees the database connection is closed even when the
    # MQTT setup fails or the blocking loop is interrupted by an exception.
    try:
        mqtt_client = mqtt_init()
        mqtt_subscribe(mqtt_client, db_conn)
        mqtt_client.loop_forever()
    finally:
        db_conn.close()
# Start the service only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|