#!/usr/bin/python
"""Bridge MQTT sensor messages into Waggle.

The script starts an embedded amqtt broker, subscribes a local MQTT client to
every sensor topic derived from ``subscribe.yaml`` and republishes each
received JSON message through the Waggle plugin API under
``network.bridge.sensor.<sensorType>``.
"""
import asyncio
import base64
import json
import logging
import os
import time
#import paho.mqtt.client as paho

import cv2 as cv
import numpy as np
import yaml
from amqtt.broker import Broker
from amqtt.client import MQTTClient, ClientException
from amqtt.mqtt.constants import QOS_1, QOS_2
from waggle.plugin import Plugin

logger = logging.getLogger(__name__)

# Registries filled by subdict() before the client starts.
sublist = []      # (topic, qos) pairs handed to MQTTClient.subscribe
sensorlist = []   # (sensor topic, auto-assigned sensor id)
devicelist = []   # (device topic prefix ending in "/", auto-assigned device id)
sensorcount = 0   # next auto-assigned sensor id
devicecount = 0   # next auto-assigned device id
showData = True  # debug print

# ID reported when neither the payload nor the registries provide one.
UNKNOWN_ID = 99999


async def broker_coro(brokerconfig):
    """Create the embedded MQTT broker from *brokerconfig* and start it."""
    broker = Broker(config=brokerconfig)
    await broker.start()


def _resolve_id(payload, key, assigned_count, table, lookup_topic):
    """Return the ID carried in *payload*, else the one registered for *lookup_topic*.

    An explicit ID is only honoured when it is greater than *assigned_count*,
    so it cannot collide with an auto-assigned ID. When the topic is not
    registered either, ``UNKNOWN_ID`` is returned.
    """
    if key in payload and payload[key] > assigned_count:
        return payload[key]
    # dict() keeps the last entry for a duplicated topic, like a full scan would.
    return dict(table).get(lookup_topic, UNKNOWN_ID)


def _dump_camera_frame(encoded, path='./output.jpg'):
    """Write a base64-encoded camera frame to *path* for testing.

    The payload is assumed to be a compressed image (e.g. JPEG) -- TODO
    confirm against the publishing device. Failures are logged and never
    propagate, so a bad frame does not prevent the data from being published.
    """
    print(len(encoded))
    try:
        buffer = np.frombuffer(base64.b64decode(encoded), np.uint8)
        # The buffer holds encoded image bytes, not pixels: decode it first.
        image = cv.imdecode(buffer, cv.IMREAD_COLOR) if buffer.size else None
    except (TypeError, ValueError, cv.error) as err:
        logger.error("Could not decode camera frame: %s", err)
        return
    if image is None:
        logger.error("Camera payload is not a decodable image; not writing %s", path)
        return
    cv.imwrite(path, image)  # write to file for testing


def _handle_message(packet):
    """Decode one MQTT publish packet and forward its data to Waggle.

    Raises:
        KeyError: the payload has no ``data`` field.
        TypeError: the payload is not a JSON object, or an ID has a
            non-comparable type.
        ValueError: the payload is not valid UTF-8 JSON.
    """
    topic = packet.variable_header.topic_name
    payload = decoder(packet.payload.data)
    if not isinstance(payload, dict):
        raise TypeError("payload on %s is not a JSON object" % topic)

    topic_parts = topic.split("/")
    sensor_type = topic_parts[-1]
    # Device topic = sensor topic with the last field (the sensor) removed.
    device_topic = "/".join(topic_parts[:-1]) + "/"
    if showData:
        print(device_topic)

    sensor_id = _resolve_id(payload, "sensorID", sensorcount, sensorlist, topic)
    device_id = _resolve_id(payload, "deviceID", devicecount, devicelist, device_topic)
    timestamp = payload["timestamp"] if "timestamp" in payload else time.time_ns()
    data = payload["data"]

    if sensor_type == "camera":  # trigger image loading
        _dump_camera_frame(data)

    publishData(str(data), str(sensor_id), str(device_id), str(sensor_type), timestamp)


async def uptime_coro():
    """Subscribe to all registered sensor topics and forward messages forever.

    A message that cannot be processed is logged and skipped, so one bad
    payload does not stop the bridge. The coroutine only returns when the
    MQTT client raises ``ClientException``.
    """
    client = MQTTClient()
    await client.connect('mqtt://127.0.0.1:1883/')  # connect to my own broker
    await client.subscribe(sublist)  # subscribe to list of sensor topics
    try:
        while True:
            message = await client.deliver_message()
            try:
                _handle_message(message.publish_packet)
            except (KeyError, TypeError, ValueError) as err:
                logger.error("Dropping message that could not be processed: %r",
                             err, exc_info=True)
    except ClientException as ce:
        logger.error("Client exception: %s", ce)


def publishData(data, sensorID, deviceID, sensorType, dataTimestamp):
    """Publish one measurement to Waggle as ``network.bridge.sensor.<sensorType>``.

    Args:
        data: value to publish.
        sensorID: sensor identifier, sent as string metadata.
        deviceID: device identifier, sent as string metadata.
        sensorType: last field of the MQTT topic; becomes the topic suffix.
        dataTimestamp: timestamp passed through to ``plugin.publish``.
    """
    topic = "network.bridge.sensor." + sensorType
    meta = {"sensorID": str(sensorID), "deviceID": str(deviceID), "sensorType": str(sensorType)}
    # NOTE(review): a Plugin is opened per message; consider sharing one if
    # the message rate makes this a bottleneck.
    with Plugin() as plugin:
        if showData is True:
            print("publishing " + topic + " with metadata:",
                  {"sensorID": sensorID, "deviceID": deviceID, "sensorType": sensorType},
                  "and timestamp:", dataTimestamp, "and data:", data)
        else:
            print("publishing " + topic + " with metadata",
                  {"sensorID": sensorID, "deviceID": deviceID, "sensorType": sensorType})
        # send data to waggle:
        plugin.publish(topic, data, meta=meta, timestamp=dataTimestamp)


def subdict(d, header):
    """Coordinate MQTT topic names with Waggle sensor and device IDs.

    Walks the nested mapping *d*. Every nesting level adds a ``key/`` segment
    to the topic; a non-mapping value is the list of sensor names of the
    device identified by the path so far. Each sensor topic is appended to
    ``sublist`` (with QOS_2) and ``sensorlist``; each device with at least
    one sensor is appended once to ``devicelist``.

    Args:
        d: mapping loaded from the subscription config.
        header: topic prefix accumulated so far, or None at the top level.
    """
    global sensorcount
    global devicecount
    prefix = "" if header is None else header
    for key, value in d.items():
        device_topic = f"{prefix}{key}/"
        if isinstance(value, dict):
            subdict(value, device_topic)  # recurse further
            continue

        if value is None:
            sensors = []            # empty YAML entry: device without sensors
        elif isinstance(value, str):
            sensors = [value]       # single sensor given as a scalar
        else:
            sensors = list(value)

        for sensor in sensors:
            sensor_topic = f"{device_topic}{sensor}"
            sublist.append((sensor_topic, QOS_2))
            sensorlist.append((sensor_topic, sensorcount))
            sensorcount += 1

        already_known = any(path == device_topic for path, _ in devicelist)
        if sensors and not already_known:
            devicelist.append((device_topic, devicecount))
            devicecount += 1


def decoder(data):
    """Print the raw MQTT payload and return it parsed as UTF-8 JSON."""
    print(data)
    return json.loads(data.decode("utf-8"))


def main():
    """Load both YAML configs, register the topics and run broker and client."""
    # mqtt broker config
    with open('broker.yaml', 'r') as fileread:
        brokerconfig = yaml.safe_load(fileread)
    # clients config
    with open('subscribe.yaml', 'r') as fileread:
        clientconfig = yaml.safe_load(fileread)

    # An empty subscribe.yaml loads as None; treat it as "no topics".
    subdict(clientconfig or {}, None)
    print("sublist:", sublist)
    print("sensors:", sensorlist)
    print("devices:", devicelist)

    formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
    logging.basicConfig(level=logging.ERROR, format=formatter)

    # Create the loop explicitly: asyncio.get_event_loop() without a running
    # loop is deprecated.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(broker_coro(brokerconfig))
    loop.run_until_complete(uptime_coro())
    # Keep the broker serving even after the client coroutine has returned.
    loop.run_forever()
    #publishData(1, "0", sensortypes[0], time.time_ns())


if __name__ == "__main__":
    main()