From 8a67202247c9ad05f2639f64c94496ad87a46ce5 Mon Sep 17 00:00:00 2001 From: Cole Deck Date: Tue, 21 Mar 2023 15:53:45 -0500 Subject: [PATCH] Add MQTT initial idea --- broker.yaml | 16 ++++++++++++++++ client.py | 43 +++++++++++++++++++++++++++++++++++++++++++ main.py | 47 +++++++++++++++++++++++++++++++++++++++++++++-- requirements.txt | 4 +++- subscribe.yaml | 3 +++ 5 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 broker.yaml create mode 100644 client.py create mode 100644 subscribe.yaml diff --git a/broker.yaml b/broker.yaml new file mode 100644 index 0000000..532c287 --- /dev/null +++ b/broker.yaml @@ -0,0 +1,16 @@ +listeners: + default: + max-connections: 50000 + type: tcp + tcp: + bind: 0.0.0.0:1883 + ws: + bind: 0.0.0.0:8080 + type: ws +timeout-disconnect-delay: 2 +auth: + plugins: ['auth.anonymous'] #List of plugins to activate for authentication among all registered plugins + allow-anonymous: true + #password-file: /some/passwd_file +topic-check: + enabled: false # Set to False if topic filtering is not needed \ No newline at end of file diff --git a/client.py b/client.py new file mode 100644 index 0000000..d80477f --- /dev/null +++ b/client.py @@ -0,0 +1,43 @@ +#!/usr/bin/python + +import logging +import asyncio + +from amqtt.client import MQTTClient +from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 + +logger = logging.getLogger(__name__) + +async def test_coro(): + C = MQTTClient() + await C.connect('mqtt://test.mosquitto.org/') + tasks = [ + asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_0')), + asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)), + asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)), + ] + await asyncio.wait(tasks) + logger.info("messages published") + await C.disconnect() + + +async def test_coro2(): + try: + C = MQTTClient() + ret = await C.connect('mqtt://127.0.0.1:1883/') + message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0) + message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1) + message = await C.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2) + #print(message) + logger.info("messages published") + await C.disconnect() + except ConnectException as ce: + logger.error("Connection failed: %s" % ce) + asyncio.get_event_loop().stop() + + +if __name__ == '__main__': + formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" + logging.basicConfig(level=logging.DEBUG, format=formatter) + asyncio.get_event_loop().run_until_complete(test_coro()) + asyncio.get_event_loop().run_until_complete(test_coro2()) \ No newline at end of file diff --git a/main.py b/main.py index 5c2686b..9d125a3 100644 --- a/main.py +++ b/main.py @@ -2,10 +2,43 @@ from waggle.plugin import Plugin import time +#import paho.mqtt.client as paho +import logging +import asyncio +import os +from amqtt.broker import Broker +import yaml +from amqtt.client import MQTTClient, ClientException +from amqtt.mqtt.constants import QOS_1, QOS_2 + + sensortypes = ["microphone", "camera", "humidity"] showData = True +async def broker_coro(brokerconfig): + broker = Broker(config=brokerconfig) + await broker.start() + +async def uptime_coro(): + C = MQTTClient() + await C.connect('mqtt://127.0.0.1:1883/') + # Subscribe to '$SYS/broker/uptime' with QOS=1 + # Subscribe to '$SYS/broker/load/#' with QOS=2 + await C.subscribe([ + ('a/b', QOS_1), + ]) + try: + for i in range(1, 100): + message = await C.deliver_message() + packet = message.publish_packet + print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data))) + await C.unsubscribe(['a/b']) + await C.disconnect() + except ClientException as ce: + logger.error("Client exception: %s" % ce) + + def publishData(data, sensorID, sensorType, dataTimestamp): with Plugin() as plugin: if showData is True: @@ -14,8 +47,18 @@ def publishData(data, sensorID, sensorType, dataTimestamp): print("publishing network.bridge.sensor." + sensorType + "with metadata", {"sensorID": sensorID, "sensorType": sensorType}) plugin.publish("network.bridge.sensor." + sensorType, data, meta={"sensorID": sensorID, "sensorType": sensorType}, timestamp=dataTimestamp) - time.sleep(1) + #time.sleep(1) if __name__ == "__main__": # testing - publishData(1, "0", sensortypes[0], time.time_ns()) \ No newline at end of file + with open('broker.yaml', 'r') as fileread: + brokerconfig = yaml.safe_load(fileread) + + formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" + logging.basicConfig(level=logging.INFO, format=formatter) + asyncio.get_event_loop().run_until_complete(broker_coro(brokerconfig)) + asyncio.get_event_loop().run_until_complete(uptime_coro()) + asyncio.get_event_loop().run_forever() + + publishData(1, "0", sensortypes[0], time.time_ns()) + diff --git a/requirements.txt b/requirements.txt index 821be3e..f376015 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ -pywaggle[all] \ No newline at end of file +pywaggle[all] +amqtt==0.11.0b1 +pyyaml \ No newline at end of file diff --git a/subscribe.yaml b/subscribe.yaml new file mode 100644 index 0000000..a888d24 --- /dev/null +++ b/subscribe.yaml @@ -0,0 +1,3 @@ +sensors: + pico1: + - "distance" \ No newline at end of file