#!/usr/bin/python
"""Bridge MQTT sensor messages into Waggle plugin measurements.

When run as a script this module:

1. starts an amqtt ``Broker`` configured from ``broker.yaml``;
2. builds a list of topic subscriptions from ``subscribe.yaml``;
3. connects a local MQTT client to the broker, and republishes every
   received JSON message through ``waggle.plugin.Plugin.publish`` under the
   name ``network.bridge.sensor.<sensorType>``.
"""
import asyncio
import json
import logging
import os
import time

import yaml
from amqtt.broker import Broker
from amqtt.client import MQTTClient, ClientException
from amqtt.mqtt.constants import QOS_1, QOS_2
from waggle.plugin import Plugin

logger = logging.getLogger(__name__)

# (topic, QoS) pairs filled in by subdict() and consumed by uptime_coro().
sublist = []
sensortypes = ["microphone", "camera", "humidity"]
# When True, publishData() also prints the timestamp and the data itself.
showData = True


async def broker_coro(brokerconfig):
    """Create an amqtt broker from *brokerconfig* and start it."""
    broker = Broker(config=brokerconfig)
    await broker.start()


async def uptime_coro():
    """Receive messages for every topic in ``sublist`` and republish them.

    Each payload is decoded with ``decoder`` and must be a JSON object with
    the keys ``"data"``, ``"sensorID"`` and ``"timestamp"``.  The sensor type
    passed to ``publishData`` is the last ``/``-separated segment of the
    topic name.  Messages that cannot be decoded or lack a required key are
    logged and skipped so a single bad payload does not stop the bridge.

    A ``ClientException`` raised while receiving is logged and ends the
    coroutine.
    """
    client = MQTTClient()
    await client.connect('mqtt://127.0.0.1:1883/')
    await client.subscribe(sublist)
    try:
        count = 0
        while True:
            count = count + 1
            message = await client.deliver_message()
            packet = message.publish_packet
            topic = packet.variable_header.topic_name
            try:
                dictout = decoder(packet.payload.data)
                data = dictout["data"]
                sensor_id = dictout["sensorID"]
                timestamp = dictout["timestamp"]
            except (ValueError, KeyError, TypeError) as err:
                # ValueError covers both invalid UTF-8 and invalid JSON;
                # TypeError covers JSON that is not an object.
                logger.error("Skipping malformed message on %s: %r", topic, err)
                continue
            print("%d: %s => %s" % (count, topic, dictout))
            publishData(data, sensor_id, topic.split("/")[-1], timestamp)
    except ClientException as ce:
        logger.error("Client exception: %s", ce)


def publishData(data, sensorID, sensorType, dataTimestamp):
    """Publish one measurement through a Waggle ``Plugin``.

    The measurement is published as ``network.bridge.sensor.<sensorType>``
    with ``sensorID`` and ``sensorType`` (both converted to ``str``) as
    metadata and *dataTimestamp* as its timestamp.  A short description of
    what is being published is printed first; it includes the timestamp and
    data only when ``showData`` is ``True``.
    """
    name = "network.bridge.sensor." + sensorType
    with Plugin() as plugin:
        if showData is True:
            print("publishing " + name + " with metadata:",
                  {"sensorID": sensorID, "sensorType": sensorType},
                  "and timestamp:", dataTimestamp, "and data:", data)
        else:
            print("publishing " + name + " with metadata",
                  {"sensorID": sensorID, "sensorType": sensorType})
        plugin.publish(name, data,
                       meta={"sensorID": str(sensorID),
                             "sensorType": str(sensorType)},
                       timestamp=dataTimestamp)


def subdict(d, header):
    """Flatten a nested topic mapping into ``sublist`` subscriptions.

    *d* maps topic segments either to further mappings (recursed into) or to
    an iterable of final segments.  For every final segment the tuple
    ``(<prefix>/<key>/<segment>, QOS_2)`` is appended to ``sublist``.

    *header* is the topic prefix accumulated so far, ending in ``/``; pass
    ``None`` (or ``""``) for the top level.
    """
    # Treat a missing prefix as empty so that top-level keys mapping straight
    # to a list do not fail on ``None + str``.
    prefix = "" if header is None else header
    for k, v in d.items():
        if isinstance(v, dict):
            subdict(v, prefix + k + "/")
        else:
            for sensor in v:
                sublist.append((prefix + k + "/" + sensor, QOS_2))


def decoder(data):
    """Print the raw payload, then parse it as UTF-8 encoded JSON."""
    print(data)
    return json.loads(data.decode("utf-8"))


if __name__ == "__main__":
    # mqtt broker config
    with open('broker.yaml', 'r') as fileread:
        brokerconfig = yaml.safe_load(fileread)

    # clients config
    with open('subscribe.yaml', 'r') as fileread:
        clientconfig = yaml.safe_load(fileread)

    subdict(clientconfig, None)
    print(sublist)

    formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
    logging.basicConfig(level=logging.ERROR, format=formatter)

    # Create the loop explicitly: calling asyncio.get_event_loop() without a
    # running loop is deprecated on current Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(broker_coro(brokerconfig))
    loop.run_until_complete(uptime_coro())
    loop.run_forever()
    # Only reached if the event loop is stopped.
    publishData(1, "0", sensortypes[0], time.time_ns())