#!/usr/bin/python from waggle.plugin import Plugin import time #import paho.mqtt.client as paho import logging import asyncio import os from amqtt.broker import Broker import yaml from amqtt.client import MQTTClient, ClientException from amqtt.mqtt.constants import QOS_1, QOS_2 import json import base64 import numpy as np import cv2 as cv sublist = [] sensortypes = ["microphone", "camera", "humidity"] showData = True async def broker_coro(brokerconfig): broker = Broker(config=brokerconfig) await broker.start() async def uptime_coro(): C = MQTTClient() await C.connect('mqtt://127.0.0.1:1883/') # Subscribe to '$SYS/broker/uptime' with QOS=1 # Subscribe to '$SYS/broker/load/#' with QOS=2 #await C.subscribe([ # ('a/b', QOS_1), # ]) await C.subscribe(sublist) try: count = 0 while True: count = count + 1 message = await C.deliver_message() packet = message.publish_packet dictout = decoder(packet.payload.data) print("%d: %s => %s" % (count, packet.variable_header.topic_name, dictout)) if packet.variable_header.topic_name.split("/")[-1] == "camera": # trigger image loading print(len(dictout["data"])) img = np.frombuffer(base64.b64decode(dictout["data"]), np.uint8) #height, width = img.shape[:2] #print(height, width) #cv.imshow('frame', img) cv.imwrite('./output.jpg', img) #if cv.waitKey(1) == ord('q'): # break else: publishData(str(dictout["data"]), str(dictout["sensorID"]), str(packet.variable_header.topic_name.split("/")[-1]), dictout["timestamp"]) #await C.unsubscribe(['a/b']) #await C.disconnect() except ClientException as ce: logger.error("Client exception: %s" % ce) def publishData(data, sensorID, sensorType, dataTimestamp): with Plugin() as plugin: if showData is True: print("publishing network.bridge.sensor." + sensorType + " with metadata:", {"sensorID": sensorID, "sensorType": sensorType}, "and timestamp:", dataTimestamp, "and data:", data) else: print("publishing network.bridge.sensor." + sensorType + "with metadata", {"sensorID": sensorID, "sensorType": sensorType}) plugin.publish("network.bridge.sensor." + sensorType, data, meta={"sensorID": str(sensorID), "sensorType": str(sensorType)}, timestamp=dataTimestamp) #time.sleep(1) print("DONE") def subdict(d, header): # coordinate mqtt topic names and waggle topics / sensor names for k,v in d.items(): if isinstance(v, dict): if header is not None: subdict(v, header + k + "/") # recurse further else: subdict(v, k + "/") else: for sensor in v: sublist.append((header + k + "/" + sensor, QOS_2)) #print (header + k + "/" + sensor) def decoder(data): print (data) return json.loads(data.decode("utf-8")) if __name__ == "__main__": # mqtt broker config with open('broker.yaml', 'r') as fileread: brokerconfig = yaml.safe_load(fileread) # clients config with open('subscribe.yaml', 'r') as fileread: clientconfig = yaml.safe_load(fileread) subdict(clientconfig, None) print(sublist) formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" logging.basicConfig(level=logging.ERROR, format=formatter) asyncio.get_event_loop().run_until_complete(broker_coro(brokerconfig)) asyncio.get_event_loop().run_until_complete(uptime_coro()) asyncio.get_event_loop().run_forever() #publishData(1, "0", sensortypes[0], time.time_ns())