Compare commits

...

4 Commits

6 changed files with 176 additions and 97 deletions

View File

@ -1,12 +1,18 @@
#!/usr/bin/python #!/usr/bin/python
# example client application
import logging import logging
import asyncio import asyncio
import numpy as np
import cv2 as cv
from amqtt.client import MQTTClient from amqtt.client import MQTTClient
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
import time import time
import json import json
import yaml
import base64
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -19,15 +25,27 @@ def get_uptime():
print(uptime_seconds) print(uptime_seconds)
return uptime_seconds return uptime_seconds
def get_frame():
cap = cv.VideoCapture(0)
if not cap.isOpened():
print("Cannot open camera")
exit()
ret, frame = cap.read()
gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
frame = gray
if not ret:
print("Can't receive frame (stream end?). Exiting ...")
print(len(base64.b64encode(frame)))
height, width = frame.shape[:2]
print(height, width)
return base64.b64encode(frame).decode('utf-8')
async def test_coro(): async def test_coro():
C = MQTTClient() C = MQTTClient()
print("START" + str(get_uptime())) print("START" + str(get_uptime()))
await C.connect('mqtt://127.0.0.1:1883/') await C.connect('mqtt://127.0.0.1:1883')
tasks = [ tasks = [
asyncio.ensure_future(C.publish('sensors/pi1/camera', encoder({"data": str(get_frame()), "sensorID": 0, "timestamp": time.time_ns() }), qos=QOS_2)),
#asyncio.ensure_future(C.publish('sensors/pico1/distance', encoder({"data": 23.0, "sensorID": 0, "timestamp": time.time_ns()}))), asyncio.ensure_future(C.publish('sensors/pi1/uptime', encoder({"data": get_uptime(), "sensorID": 0, "timestamp": time.time_ns()}), qos=QOS_2)),
#asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
asyncio.ensure_future(C.publish('sensors/pico1/uptime', encoder({"data": get_uptime(), "sensorID": 0, "timestamp": time.time_ns()}), qos=QOS_2)),
] ]
await asyncio.wait(tasks) await asyncio.wait(tasks)
logger.info("messages published") logger.info("messages published")
@ -37,16 +55,13 @@ async def test_coro():
if __name__ == '__main__': if __name__ == '__main__':
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter) logging.basicConfig(level=logging.DEBUG, format=formatter)
with open('client.yaml', 'r') as fileread: with open('client.yaml', 'r') as fileread:
clientconfig = yaml.safe_load(fileread) clientconfig = yaml.safe_load(fileread)
subdict(clientconfig, None) print(clientconfig)
print(sublist)
x = 0 while True:
while x < 5:
asyncio.get_event_loop().run_until_complete(test_coro()) asyncio.get_event_loop().run_until_complete(test_coro())
x = x + 1
#asyncio.get_event_loop().run_until_complete(test_coro2()) #asyncio.get_event_loop().run_until_complete(test_coro2())

96
main.py
View File

@ -11,11 +11,18 @@ import yaml
from amqtt.client import MQTTClient, ClientException from amqtt.client import MQTTClient, ClientException
from amqtt.mqtt.constants import QOS_1, QOS_2 from amqtt.mqtt.constants import QOS_1, QOS_2
import json import json
import base64
import numpy as np
import cv2 as cv
sublist = [] sublist = []
sensorlist = []
devicelist = []
sensorcount = 0
devicecount = 0
sensortypes = ["microphone", "camera", "humidity"] showData = True # debug print
showData = True
async def broker_coro(brokerconfig): async def broker_coro(brokerconfig):
broker = Broker(config=brokerconfig) broker = Broker(config=brokerconfig)
@ -23,50 +30,75 @@ async def broker_coro(brokerconfig):
async def uptime_coro(): async def uptime_coro():
C = MQTTClient() C = MQTTClient()
await C.connect('mqtt://127.0.0.1:1883/') await C.connect('mqtt://127.0.0.1:1883/') # connect to my own broker
# Subscribe to '$SYS/broker/uptime' with QOS=1 await C.subscribe(sublist) # subscribe to list of sensor topics
# Subscribe to '$SYS/broker/load/#' with QOS=2
#await C.subscribe([
# ('a/b', QOS_1),
# ])
await C.subscribe(sublist)
try: try:
count = 0
while True: while True:
count = count + 1
message = await C.deliver_message() message = await C.deliver_message()
packet = message.publish_packet packet = message.publish_packet
dictout = decoder(packet.payload.data) dictout = decoder(packet.payload.data)
print("%d: %s => %s" % (count, packet.variable_header.topic_name, dictout)) sensorID = 99999 # in case no sensor ID is found somehow
publishData(str(dictout["data"]), str(dictout["sensorID"]), str(packet.variable_header.topic_name.split("/")[-1]), dictout["timestamp"]) if 'sensorID' in dictout.keys() and dictout["sensorID"] > sensorcount:
sensorID = dictout["sensorID"]
else:
for sensor in sensorlist:
if sensor[0] == packet.variable_header.topic_name:
sensorID = sensor[1]
deviceID = 99999 # in case no device ID is found somehow
if 'deviceID' in dictout.keys() and dictout["deviceID"] > devicecount:
deviceID = dictout["deviceID"]
else:
for device in devicelist:
print('/'.join(packet.variable_header.topic_name.split("/")[0:-1]) + "/")
if device[0] == '/'.join(packet.variable_header.topic_name.split("/")[0:-1]) + "/": # remove the last field (sensor)
deviceID = device[1]
timestamp = time.time_ns()
if 'timestamp' in dictout.keys():
timestamp = dictout["timestamp"]
#print("%s => %s" % (packet.variable_header.topic_name, dictout))
if packet.variable_header.topic_name.split("/")[-1] == "camera": # trigger image loading
print(len(dictout["data"]))
img = np.frombuffer(base64.b64decode(dictout["data"]), np.uint8)
cv.imwrite('./output.jpg', img) # write to file for testing
publishData(str(dictout["data"]), str(sensorID), str(deviceID), str(packet.variable_header.topic_name.split("/")[-1]), timestamp)
#await C.unsubscribe(['a/b']) #await C.unsubscribe(['a/b'])
#await C.disconnect() #await C.disconnect()
except ClientException as ce: except ClientException as ce:
logger.error("Client exception: %s" % ce) logger.error("Client exception: %s" % ce)
def publishData(data, sensorID, sensorType, dataTimestamp): def publishData(data, sensorID, deviceID, sensorType, dataTimestamp):
with Plugin() as plugin: with Plugin() as plugin:
if showData is True: if showData is True:
print("publishing network.bridge.sensor." + sensorType + " with metadata:", {"sensorID": sensorID, "sensorType": sensorType}, "and timestamp:", dataTimestamp, "and data:", data) print("publishing network.bridge.sensor." + sensorType + " with metadata:", {"sensorID": sensorID, "deviceID": deviceID, "sensorType": sensorType}, "and timestamp:", dataTimestamp, "and data:", data)
else: else:
print("publishing network.bridge.sensor." + sensorType + "with metadata", {"sensorID": sensorID, "sensorType": sensorType}) print("publishing network.bridge.sensor." + sensorType + "with metadata", {"sensorID": sensorID, "deviceID": deviceID, "sensorType": sensorType})
# send data to waggle:
plugin.publish("network.bridge.sensor." + sensorType, data, meta={"sensorID": str(sensorID), "sensorType": str(sensorType)}, timestamp=dataTimestamp) plugin.publish("network.bridge.sensor." + sensorType, data, meta={"sensorID": str(sensorID), "deviceID": str(deviceID), "sensorType": str(sensorType)}, timestamp=dataTimestamp)
#time.sleep(1)
print("DONE")
def subdict(d, header): def subdict(d, header):
for k,v in d.items(): # coordinate mqtt topic names and waggle topics / sensor names
if isinstance(v, dict): global sensorcount
if header is not None: global devicecount
subdict(v, header + k + "/") for k,v in d.items():
else: if isinstance(v, dict):
subdict(v, k + "/") if header is not None:
else: subdict(v, header + k + "/") # recurse further
for sensor in v: else:
sublist.append((header + k + "/" + sensor, QOS_2)) subdict(v, k + "/")
#print (header + k + "/" + sensor) else:
for sensor in v:
sublist.append((header + k + "/" + sensor, QOS_2))
sensorlist.append((header + k + "/" + sensor, sensorcount))
sensorcount = sensorcount + 1
if (header + k + "/", devicecount - 1) not in devicelist:
devicelist.append((header + k + "/", devicecount))
devicecount = devicecount + 1
#print (((header + k + "/" + sensor, QOS_2), sensorcount))
def decoder(data): def decoder(data):
print (data) print (data)
@ -81,7 +113,9 @@ if __name__ == "__main__":
with open('subscribe.yaml', 'r') as fileread: with open('subscribe.yaml', 'r') as fileread:
clientconfig = yaml.safe_load(fileread) clientconfig = yaml.safe_load(fileread)
subdict(clientconfig, None) subdict(clientconfig, None)
print(sublist) print("sublist:", sublist)
print("sensors:", sensorlist)
print("devices:", devicelist)
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.ERROR, format=formatter) logging.basicConfig(level=logging.ERROR, format=formatter)

View File

@ -1,4 +1,3 @@
pywaggle[all] pywaggle[all]
amqtt==0.11.0b1 amqtt==0.10.0a3
pyyaml pyyaml
alive-progress

14
run.sh
View File

@ -1,9 +1,9 @@
#!/bin/bash #!/bin/bash
#export PYWAGGLE_LOG_DIR=test-run export PYWAGGLE_LOG_DIR=test-run
python main.py & python main.py
sleep 1 #sleep 1
python client.py #2&>/dev/null > /dev/null #python client.py #2&>/dev/null > /dev/null
sleep 5 #sleep 5
jobs -p #jobs -p
kill $(jobs -p) #kill $(jobs -p)

View File

@ -2,10 +2,13 @@ sensors:
pico1: pico1:
- "distance" - "distance"
- "uptime" - "uptime"
pico2:
cameras:
- "1"
- "2"
pi1: pi1:
- "humidity" - "camera"
- "camera" - "uptime"
subnet2:
pi2:
- "microphone"
- "uptime"
pico1:
- "soil"
- "uptime"

File diff suppressed because one or more lines are too long