#!/usr/bin/python from waggle.plugin import Plugin import time #import paho.mqtt.client as paho import logging import asyncio import os from amqtt.broker import Broker import yaml from amqtt.client import MQTTClient, ClientException from amqtt.mqtt.constants import QOS_1, QOS_2 sensortypes = ["microphone", "camera", "humidity"] showData = True async def broker_coro(brokerconfig): broker = Broker(config=brokerconfig) await broker.start() async def uptime_coro(): C = MQTTClient() await C.connect('mqtt://127.0.0.1:1883/') # Subscribe to '$SYS/broker/uptime' with QOS=1 # Subscribe to '$SYS/broker/load/#' with QOS=2 await C.subscribe([ ('a/b', QOS_1), ]) try: for i in range(1, 100): message = await C.deliver_message() packet = message.publish_packet print("%d: %s => %s" % (i, packet.variable_header.topic_name, str(packet.payload.data))) await C.unsubscribe(['a/b']) await C.disconnect() except ClientException as ce: logger.error("Client exception: %s" % ce) def publishData(data, sensorID, sensorType, dataTimestamp): with Plugin() as plugin: if showData is True: print("publishing network.bridge.sensor." + sensorType + "with metadata:", {"sensorID": sensorID, "sensorType": sensorType}, "and timestamp:", dataTimestamp, "and data:", data) else: print("publishing network.bridge.sensor." + sensorType + "with metadata", {"sensorID": sensorID, "sensorType": sensorType}) plugin.publish("network.bridge.sensor." + sensorType, data, meta={"sensorID": sensorID, "sensorType": sensorType}, timestamp=dataTimestamp) #time.sleep(1) if __name__ == "__main__": # testing with open('broker.yaml', 'r') as fileread: brokerconfig = yaml.safe_load(fileread) formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) asyncio.get_event_loop().run_until_complete(broker_coro(brokerconfig)) asyncio.get_event_loop().run_until_complete(uptime_coro()) asyncio.get_event_loop().run_forever() publishData(1, "0", sensortypes[0], time.time_ns())