Compare commits

...

6 Commits

Author SHA1 Message Date
a9bc921d7c update sensorID and deviceID in API call, override invaled IDs 2023-05-05 16:49:04 -05:00
4b454d9d6f update sensor and device ID code 2023-05-05 16:22:22 -05:00
231d8c9bc7 update main.py 2023-05-02 15:41:21 -05:00
f7dc3929b9 update client, build docker successfully 2023-05-02 15:41:05 -05:00
28bc5835c2 latest client 2023-05-02 13:39:24 -05:00
e80702261c latest 2023-05-02 13:38:59 -05:00
8 changed files with 187 additions and 97 deletions

View File

@ -1,12 +1,18 @@
#!/usr/bin/python #!/usr/bin/python
# example client application
import logging import logging
import asyncio import asyncio
import numpy as np
import cv2 as cv
from amqtt.client import MQTTClient from amqtt.client import MQTTClient
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2 from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
import time import time
import json import json
import yaml
import base64
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -19,15 +25,27 @@ def get_uptime():
print(uptime_seconds) print(uptime_seconds)
return uptime_seconds return uptime_seconds
def get_frame():
cap = cv.VideoCapture(0)
if not cap.isOpened():
print("Cannot open camera")
exit()
ret, frame = cap.read()
gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
frame = gray
if not ret:
print("Can't receive frame (stream end?). Exiting ...")
print(len(base64.b64encode(frame)))
height, width = frame.shape[:2]
print(height, width)
return base64.b64encode(frame).decode('utf-8')
async def test_coro(): async def test_coro():
C = MQTTClient() C = MQTTClient()
print("START" + str(get_uptime())) print("START" + str(get_uptime()))
await C.connect('mqtt://127.0.0.1:1883/') await C.connect('mqtt://127.0.0.1:1883')
tasks = [ tasks = [
asyncio.ensure_future(C.publish('sensors/pi1/camera', encoder({"data": str(get_frame()), "sensorID": 0, "timestamp": time.time_ns() }), qos=QOS_2)),
#asyncio.ensure_future(C.publish('sensors/pico1/distance', encoder({"data": 23.0, "sensorID": 0, "timestamp": time.time_ns()}))), asyncio.ensure_future(C.publish('sensors/pi1/uptime', encoder({"data": get_uptime(), "sensorID": 0, "timestamp": time.time_ns()}), qos=QOS_2)),
#asyncio.ensure_future(C.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1)),
asyncio.ensure_future(C.publish('sensors/pico1/uptime', encoder({"data": get_uptime(), "sensorID": 0, "timestamp": time.time_ns()}), qos=QOS_2)),
] ]
await asyncio.wait(tasks) await asyncio.wait(tasks)
logger.info("messages published") logger.info("messages published")
@ -37,11 +55,13 @@ async def test_coro():
if __name__ == '__main__': if __name__ == '__main__':
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter) logging.basicConfig(level=logging.DEBUG, format=formatter)
x = 0 with open('client.yaml', 'r') as fileread:
while x < 5: clientconfig = yaml.safe_load(fileread)
print(clientconfig)
while True:
asyncio.get_event_loop().run_until_complete(test_coro()) asyncio.get_event_loop().run_until_complete(test_coro())
x = x + 1
#asyncio.get_event_loop().run_until_complete(test_coro2()) #asyncio.get_event_loop().run_until_complete(test_coro2())

5
client.yaml Normal file
View File

@ -0,0 +1,5 @@
client:
name: "pi1"
sensors:
camera:
path: /dev/v4l/by-id/

View File

@ -8,9 +8,8 @@
; Please visit documentation for the other options and examples ; Please visit documentation for the other options and examples
; https://docs.platformio.org/page/projectconf.html ; https://docs.platformio.org/page/projectconf.html
[env:pico] [env:esp32]
platform = https://github.com/maxgerhardt/platform-raspberrypi.git platform = espressif32
board = rpipicow board = esp32doit-devkit-v1
framework = arduino framework = arduino
upload_port = /run/media/amelia/RPI-RP2/ lib_deps = knolleary/PubSubClient@^2.8
board_build.core = earlephilhower

98
main.py
View File

@ -11,11 +11,18 @@ import yaml
from amqtt.client import MQTTClient, ClientException from amqtt.client import MQTTClient, ClientException
from amqtt.mqtt.constants import QOS_1, QOS_2 from amqtt.mqtt.constants import QOS_1, QOS_2
import json import json
import base64
import numpy as np
import cv2 as cv
sublist = [] sublist = []
sensorlist = []
devicelist = []
sensorcount = 0
devicecount = 0
sensortypes = ["microphone", "camera", "humidity"] showData = True # debug print
showData = True
async def broker_coro(brokerconfig): async def broker_coro(brokerconfig):
broker = Broker(config=brokerconfig) broker = Broker(config=brokerconfig)
@ -23,50 +30,75 @@ async def broker_coro(brokerconfig):
async def uptime_coro(): async def uptime_coro():
C = MQTTClient() C = MQTTClient()
await C.connect('mqtt://127.0.0.1:1883/') await C.connect('mqtt://127.0.0.1:1883/') # connect to my own broker
# Subscribe to '$SYS/broker/uptime' with QOS=1 await C.subscribe(sublist) # subscribe to list of sensor topics
# Subscribe to '$SYS/broker/load/#' with QOS=2
#await C.subscribe([
# ('a/b', QOS_1),
# ])
await C.subscribe(sublist)
try: try:
count = 0
while True: while True:
count = count + 1
message = await C.deliver_message() message = await C.deliver_message()
packet = message.publish_packet packet = message.publish_packet
dictout = decoder(packet.payload.data) dictout = decoder(packet.payload.data)
print("%d: %s => %s" % (count, packet.variable_header.topic_name, dictout)) sensorID = 99999 # in case no sensor ID is found somehow
publishData(str(dictout["data"]), str(dictout["sensorID"]), str(packet.variable_header.topic_name.split("/")[-1]), dictout["timestamp"]) if 'sensorID' in dictout.keys() and dictout["sensorID"] > sensorcount:
sensorID = dictout["sensorID"]
else:
for sensor in sensorlist:
if sensor[0] == packet.variable_header.topic_name:
sensorID = sensor[1]
deviceID = 99999 # in case no device ID is found somehow
if 'deviceID' in dictout.keys() and dictout["deviceID"] > devicecount:
deviceID = dictout["deviceID"]
else:
for device in devicelist:
print('/'.join(packet.variable_header.topic_name.split("/")[0:-1]) + "/")
if device[0] == '/'.join(packet.variable_header.topic_name.split("/")[0:-1]) + "/": # remove the last field (sensor)
deviceID = device[1]
timestamp = time.time_ns()
if 'timestamp' in dictout.keys():
timestamp = dictout["timestamp"]
#print("%s => %s" % (packet.variable_header.topic_name, dictout))
if packet.variable_header.topic_name.split("/")[-1] == "camera": # trigger image loading
print(len(dictout["data"]))
img = np.frombuffer(base64.b64decode(dictout["data"]), np.uint8)
cv.imwrite('./output.jpg', img) # write to file for testing
publishData(str(dictout["data"]), str(sensorID), str(deviceID), str(packet.variable_header.topic_name.split("/")[-1]), timestamp)
#await C.unsubscribe(['a/b']) #await C.unsubscribe(['a/b'])
#await C.disconnect() #await C.disconnect()
except ClientException as ce: except ClientException as ce:
logger.error("Client exception: %s" % ce) logger.error("Client exception: %s" % ce)
def publishData(data, sensorID, sensorType, dataTimestamp): def publishData(data, sensorID, deviceID, sensorType, dataTimestamp):
with Plugin() as plugin: with Plugin() as plugin:
if showData is True: if showData is True:
print("publishing network.bridge.sensor." + sensorType + " with metadata:", {"sensorID": sensorID, "sensorType": sensorType}, "and timestamp:", dataTimestamp, "and data:", data) print("publishing network.bridge.sensor." + sensorType + " with metadata:", {"sensorID": sensorID, "deviceID": deviceID, "sensorType": sensorType}, "and timestamp:", dataTimestamp, "and data:", data)
else: else:
print("publishing network.bridge.sensor." + sensorType + "with metadata", {"sensorID": sensorID, "sensorType": sensorType}) print("publishing network.bridge.sensor." + sensorType + "with metadata", {"sensorID": sensorID, "deviceID": deviceID, "sensorType": sensorType})
# send data to waggle:
plugin.publish("network.bridge.sensor." + sensorType, data, meta={"sensorID": str(sensorID), "sensorType": str(sensorType)}, timestamp=dataTimestamp) plugin.publish("network.bridge.sensor." + sensorType, data, meta={"sensorID": str(sensorID), "deviceID": str(deviceID), "sensorType": str(sensorType)}, timestamp=dataTimestamp)
#time.sleep(1)
print("DONE")
def subdict(d, header): def subdict(d, header):
for k,v in d.items(): # coordinate mqtt topic names and waggle topics / sensor names
if isinstance(v, dict): global sensorcount
if header is not None: global devicecount
subdict(v, header + k + "/") for k,v in d.items():
else: if isinstance(v, dict):
subdict(v, k + "/") if header is not None:
else: subdict(v, header + k + "/") # recurse further
for sensor in v: else:
sublist.append((header + k + "/" + sensor, QOS_2)) subdict(v, k + "/")
#print (header + k + "/" + sensor) else:
for sensor in v:
sublist.append((header + k + "/" + sensor, QOS_2))
sensorlist.append((header + k + "/" + sensor, sensorcount))
sensorcount = sensorcount + 1
if (header + k + "/", devicecount - 1) not in devicelist:
devicelist.append((header + k + "/", devicecount))
devicecount = devicecount + 1
#print (((header + k + "/" + sensor, QOS_2), sensorcount))
def decoder(data): def decoder(data):
print (data) print (data)
@ -81,7 +113,9 @@ if __name__ == "__main__":
with open('subscribe.yaml', 'r') as fileread: with open('subscribe.yaml', 'r') as fileread:
clientconfig = yaml.safe_load(fileread) clientconfig = yaml.safe_load(fileread)
subdict(clientconfig, None) subdict(clientconfig, None)
print(sublist) print("sublist:", sublist)
print("sensors:", sensorlist)
print("devices:", devicelist)
formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s" formatter = "[%(asctime)s] :: %(levelname)s :: %(name)s :: %(message)s"
logging.basicConfig(level=logging.ERROR, format=formatter) logging.basicConfig(level=logging.ERROR, format=formatter)
@ -89,5 +123,5 @@ if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(uptime_coro()) asyncio.get_event_loop().run_until_complete(uptime_coro())
asyncio.get_event_loop().run_forever() asyncio.get_event_loop().run_forever()
publishData(1, "0", sensortypes[0], time.time_ns()) #publishData(1, "0", sensortypes[0], time.time_ns())

View File

@ -1,3 +1,3 @@
pywaggle[all] pywaggle[all]
amqtt==0.11.0b1 amqtt==0.10.0a3
pyyaml pyyaml

14
run.sh
View File

@ -1,9 +1,9 @@
#!/bin/bash #!/bin/bash
#export PYWAGGLE_LOG_DIR=test-run export PYWAGGLE_LOG_DIR=test-run
python main.py & python main.py
sleep 1 #sleep 1
python client.py #2&>/dev/null > /dev/null #python client.py #2&>/dev/null > /dev/null
sleep 5 #sleep 5
jobs -p #jobs -p
kill $(jobs -p) #kill $(jobs -p)

View File

@ -2,9 +2,13 @@ sensors:
pico1: pico1:
- "distance" - "distance"
- "uptime" - "uptime"
pico2:
cameras:
- "1"
- "2"
pi1: pi1:
- "humidity" - "camera"
- "uptime"
subnet2:
pi2:
- "microphone"
- "uptime"
pico1:
- "soil"
- "uptime"

File diff suppressed because one or more lines are too long