#!/usr/bin/python
"""Publish a camera frame and the system uptime to an MQTT broker, forever.

Each cycle connects to the broker, publishes one grayscale camera frame
(base64 text) and the current uptime as JSON payloads, then disconnects.
"""
import asyncio
import base64
import json
import logging
import time

import cv2 as cv
import numpy as np
import yaml
from amqtt.client import MQTTClient
from amqtt.mqtt.constants import QOS_0, QOS_1, QOS_2

logger = logging.getLogger(__name__)

# Broker used when the caller of test_coro() does not supply one.
BROKER_URI = 'mqtt://100.79.112.65:1883'


def encoder(inputdict):
    """Serialize *inputdict* to JSON and return the result as UTF-8 bytes."""
    return json.dumps(inputdict).encode('utf-8')


def get_uptime():
    """Read, print and return the first field of /proc/uptime as a float."""
    with open('/proc/uptime', 'r') as f:
        uptime_seconds = float(f.readline().split()[0])
    print(uptime_seconds)
    return uptime_seconds


def get_frame():
    """Capture one frame from camera 0 and return it as base64 text.

    The frame is converted from BGR to grayscale and the raw pixel buffer
    is base64-encoded. The encoded length and the frame's height/width are
    printed as diagnostics.

    Returns:
        str: base64 encoding of the grayscale frame buffer.

    Raises:
        SystemExit: if the camera cannot be opened or no frame is returned.
    """
    cap = cv.VideoCapture(0)
    try:
        if not cap.isOpened():
            print("Cannot open camera")
            raise SystemExit(1)
        ret, frame = cap.read()
    finally:
        # Always give the device back; this function runs once per cycle.
        cap.release()

    # Check the read result before touching the frame: on failure it is
    # not a usable image and cvtColor would raise.
    if not ret:
        print("Can't receive frame (stream end?). Exiting ...")
        raise SystemExit(1)

    gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
    encoded = base64.b64encode(gray)  # encode once, reuse for print + return
    print(len(encoded))
    height, width = gray.shape[:2]
    print(height, width)
    return encoded.decode('utf-8')


async def test_coro(broker_uri=BROKER_URI):
    """Run one connect / publish / disconnect cycle against the broker.

    Publishes the camera frame to ``sensors/pi1/camera`` and the uptime to
    ``sensors/pi1/uptime`` with QoS 2. Publish failures are logged rather
    than raised; the client is disconnected even if an error occurs.

    Args:
        broker_uri: MQTT URI to connect to; defaults to ``BROKER_URI``.
    """
    C = MQTTClient()
    print("START" + str(get_uptime()))
    await C.connect(broker_uri)
    try:
        camera_payload = encoder({
            "data": get_frame(),
            "sensorID": 0,
            "timestamp": time.time_ns(),
        })
        uptime_payload = encoder({
            "data": get_uptime(),
            "sensorID": 0,
            "timestamp": time.time_ns(),
        })
        # return_exceptions=True keeps this best-effort (one failed publish
        # does not abort the other) while still exposing the errors.
        results = await asyncio.gather(
            C.publish('sensors/pi1/camera', camera_payload, qos=QOS_2),
            C.publish('sensors/pi1/uptime', uptime_payload, qos=QOS_2),
            return_exceptions=True,
        )
        failures = [r for r in results if isinstance(r, BaseException)]
        for failure in failures:
            logger.error("publish failed: %r", failure)
        if not failures:
            logger.info("messages published")
    finally:
        await C.disconnect()
    print("END" + str(get_uptime()))


async def _publish_forever():
    """Repeat test_coro() back-to-back on a single event loop."""
    while True:
        await test_coro()


if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.DEBUG, format=formatter)
    with open('client.yaml', 'r') as fileread:
        clientconfig = yaml.safe_load(fileread)
    # NOTE(review): clientconfig is loaded and printed but not otherwise used.
    print(clientconfig)
    asyncio.run(_publish_forever())