Photography                                               Submit an EntrySubmit an Entry

 

Monthly Themes   | Monthly Poll   |  Back to homepage

 

We finished our device to monitor the opening of windows. Our device allows to monitor up to 4 windows simultaneously. From its 4-digit 7-segment display, it allows you to monitor the reference markers of each of the four windows. And periodically the status of the four windows is published in the Ubidots cloud, allowing remote monitoring from the Venttracker dashboard.

 

Project Blogs
Window opening monitor with ArUco - Installing OpenCV on the Raspberry PI 4B
Window Opening Monitor with ArUco - Tracking window movements
Window Opening Monitor with ArUco - Multi-window driver 4x7 segment display
Window Opening Monitor with ArUco - Final device

 

Cloud connected window opening monitor

 

Venttracker Window Monitor with OpenCV - 4x7 Display and Dashboard

 

Four windows fully detected. Two closed (0% opening) and two open (30% and 4%)

 

How to build the 4x7 display, BOM and schematic: Window Opening Monitor with ArUco - Multi-window driver 4x7 segment display

 

Venttracker Window Monitor with OpenCV - 4x7 Display, Dashboard Testing Solution

Video demo

 

 

 

Connecting the device to the cloud

Ubidots

 

We'll be using Ubidots services with Ubidots free plan for STEM. This plan has some limits

  • First 3 devices free
  • Variables: Up to 10 variables per device.
  • Data Ingestion: 4,000 dots per day across all of your devices.
  • Data Extraction: 500,000 dots per day across all of your account
  • Data Rate: 1 request per second, across all of your devices.
  • Data Retention: 1 month.

 

We are using a 5 second refresh rate for the demos which exceeds the daily free dots cuota.

 

 

 

Creating our virtual paper windows for testing

 

Print and cut:

Venttracker Window Monitor with OpenCV - Paper windows mockups for testing

 

Window ArUco Markers IDs:

 

WindowTop LeftTop RightBottom RightBottom LeftWindow Tracker
WINDOW 112340
WINDOW 21112131410
WINDOW 32122232420
WINDOW 43132333430

 

Using the paper window mock-ups to simulate 4 sliding windows

 

The different window parts are attached to the refrigerator door by magnets.

 

Venttracker Window Monitor with OpenCV - Paper Window Mock-ups

Raspberry Pi used for stop motion.

 

Python code

 

Class Diagram

 

 

You can inject in the WindowDetectector your own custom Display, Cloud Publisher or Percentage calculator object as needed.

 

Github repository

 

https://github.com/javagoza/venttracker/tree/main/wom/python

 

Ubidots Publisher Python Class code

 

Python class for publishing the data to the cloud:

Usage:

    cloudPublisher = UbidotsPublisher()

    cloudPublisher.publish(35, 48, 0, 23)

 

or with your own credentials

  cloudPublisher = UbidotsPublisher(myCredentials)

 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""wom_ubidots_publisher.py: Sends location and Windows state to Ubidots Cloud
"""
__author__ = "Enrique Albertos"

# Venttracker WOM Windows Opening Monitor creation on Ubidots
import time
import requests
import json
import random

class UbidotsCredentials() :
    TOKEN = "BBFF-UjO2Hr5GCz8mx2WA9M1g0WAwUNU73N"  # Put your TOKEN here
    DEVICE_LABEL = "Venttracker_WOM01"  # Put your device label here
    
    def __init__(self, token = TOKEN, deviceLabel = DEVICE_LABEL):
        self.__token = token
        self.__deviceLabel = deviceLabel
        
    def getToken(self) :
        return self.__token
    
    def getDeviceLabel(self) :
        return self.__deviceLabel

class UbidotsPublisher() :
    
    __WINDOWS_NUMBER_LABEL = "windows_no"  # Number of windows detected
    __UBIDOTS_URL = "http://industrial.api.ubidots.com"
    __UBIDOTS_SERVICE_ADDRESS = "{}/api/v1.6/devices/{}"
    __GEOLOCATE_URL = 'https://extreme-ip-lookup.com/json/'
    __HTTP_400_BAD_REQUEST = 400
    __POSITION_LABEL = "position"  # Number of windows detected
    __WOM_SERIAL_LABEL = "serial_id"
    __W01_OPENING_PCT_LABEL = "w01_opening_pct"  # % opening window 1
    __W02_OPENING_PCT_LABEL = "w02_opening_pct" # % opening window 2
    __W03_OPENING_PCT_LABEL = "w03_opening_pct"  # % opening window 3
    __W04_OPENING_PCT_LABEL = "w04_opening_pct"  # % opening window 4
    __W05_OPENING_PCT_LABEL = "w05_opening_pct"  # % opening window 5
    __INFO_ATTEMPING_TO_SEND_DATA = "[INFO] Attemping to send data."
    __INFO_FINISHED = "[INFO] finished."
    __INFO_PAYLOAD = "[INFO] {}."
    __INFO_REQUEST_UPDATED = "[INFO] request made properly, your device is updated."
    __ERROR_FIVE_ATTEMPTS = "[ERROR] Could not send data after 5 attempts, please check your token credentials and internet connection."
    __REQUEST_ATTEMPTS = 5
    __WINDOWS_NUMBER = 4
    __WINDOWS_LABELS = [__W01_OPENING_PCT_LABEL, __W02_OPENING_PCT_LABEL, \
                      __W03_OPENING_PCT_LABEL, __W04_OPENING_PCT_LABEL]
    
    def __init__(self, credentials = UbidotsCredentials(), debug = False):
        self.__credentials = credentials
        self.__location = self.__geolocate()    
        self.__serial = self.__get_serial()
        self.__debug = debug # True activate debug print
        

    # get latitude and longitude from IP
    def __geolocate(self) :
        url = UbidotsPublisher.__GEOLOCATE_URL
        r = requests.get(url)
        data = json.loads(r.content.decode())
        return {'lat' : data['lat'],'lng': data['lon']}

    # get raspberry pi seriial as unique identifier
    def __get_serial(self):
        # Extract serial from cpuinfo file
        cpuserial = "0000000000000000"
        try:
            f = open('/proc/cpuinfo','r')
            for line in f:
                if line[0:6]=='Serial':
                    cpuserial = line[10:26]
            f.close()
        except:
            cpuserial = "ERROR000000000"
        return cpuserial

    # build payload dictionary
    def __build_payload(self, windowData, location, serial):
        
        payload = {UbidotsPublisher.__POSITION_LABEL:
             {"value": "1",
              "context": {"lat": location['lat'],
                          "lng": location['lng'],
                          UbidotsPublisher.__WOM_SERIAL_LABEL : serial }},        
              UbidotsPublisher.__WINDOWS_NUMBER_LABEL: UbidotsPublisher.__WINDOWS_NUMBER}  
        
        for i in range(0, UbidotsPublisher.__WINDOWS_NUMBER): 
            payload.update({self.__WINDOWS_LABELS[i]: windowData[i]})
            
        payload.update(
            {UbidotsPublisher.__POSITION_LABEL:
             {"value": "1",
              "context": {"lat": location['lat'],
                          "lng": location['lng'],
                          UbidotsPublisher.__WOM_SERIAL_LABEL : serial }}})

        if self.__debug:
            print(UbidotsPublisher.__INFO_PAYLOAD.format(payload))

        return payload

    # post request to Ubidots
    def __post_request(self, payload):
        # Creates the headers for the HTTP requests
        url = UbidotsPublisher.__UBIDOTS_URL
        url = UbidotsPublisher.__UBIDOTS_SERVICE_ADDRESS.format(url, self.__credentials.getDeviceLabel())
        headers = {"X-Auth-Token": self.__credentials.getToken(), "Content-Type": "application/json"}

        # Makes the HTTP requests
        status = UbidotsPublisher.__HTTP_400_BAD_REQUEST
        attempts = 0
        while status >= UbidotsPublisher.__HTTP_400_BAD_REQUEST and attempts <= UbidotsPublisher.__REQUEST_ATTEMPTS:
            req = requests.post(url=url, headers=headers, json=payload)
            status = req.status_code
            attempts += 1
            time.sleep(1)

        # Processes results
        if self.__debug :
            print(req.status_code, req.json())
        if status >= UbidotsPublisher.__HTTP_400_BAD_REQUEST:
            if self.__debug :
                print(UbidotsPublisher.__ERROR_FIVE_ATTEMPTS)
            return False

        if self.__debug :
            print(UbidotsPublisher.__INFO_REQUEST_UPDATED)
        return True

    def publish(self, windowData) :
        payload = self.__build_payload(windowData, self.__location, self.__serial)

        if self.__debug :
            print(UbidotsPublisher.__INFO_ATTEMPING_TO_SEND_DATA)
        self.__post_request(payload)
        if self.__debug :
            print(UbidotsPublisher.__INFO_FINISHED)
        

 

 

Payload with geolocation and windows state

 

{
    'position': {
        'value': '1',
        'context': {
            'lat': '40.41902',
            'lng': '-2.92256',
            'serial_id': '10000000055eba52'
        }
    },
    'windows_no': 4,
    'w01_opening_pct': 87,
    'w02_opening_pct': 26,
    'w03_opening_pct': 71,
    'w04_opening_pct': 18
}

 

 

Ubidots Response. Request made properly and device updated

 

200
 {
    'position': [{
            'status_code': 201
        }
    ],
    'w01_opening_pct': [{
            'status_code': 201
        }
    ],
    'w02_opening_pct': [{
            'status_code': 201
        }
    ],
    'w03_opening_pct': [{
            'status_code': 201
        }
    ],
    'w04_opening_pct': [{
            'status_code': 201
        }
    ],
    'windows_no': [{
            'status_code': 201
        }
    ]
}

 

Window Opening Percentage Calculator Class Python Code

 

This class computes the opening percentage of a window given the reference markers and ids and the window tracker marker.

Usage:

 

    calculator = PercentageCalculator()

    percentage = calculator.calculate( arrayOfMarkers)  # top left, top right, bottom right, bottom left, window tracker

 

The percentage calculator takes the reference markers of the window in coordinates as gabbed from the original image, creates a transformation matrix to correct perspective and then calculates the position of the tracker marker in percentage.

 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""wom_percentage_calculator.py:
    calculates the opening percentage after computing a perspective transformation
"""
__author__ = "Enrique Albertos"

import cv2
import numpy as np

class PercentageCalculator:    
    
    def __orderPoints(self, pts):
        # initialzie a list of coordinates that will be ordered
        # such that the first entry in the list is the top-left,
        # the second entry is the top-right, the third is the
        # bottom-right, and the fourth is the bottom-left
        rect = np.zeros((4, 2), dtype = "float32")
        # the top-left point will have the smallest sum, whereas
        # the bottom-right point will have the largest sum
        s = pts.sum(axis = 1)
        rect[0] = pts[np.argmin(s)]
        rect[2] = pts[np.argmax(s)]
        # now, compute the difference between the points, the
        # top-right point will have the smallest difference,
        # whereas the bottom-left will have the largest difference
        diff = np.diff(pts, axis = 1)
        rect[1] = pts[np.argmin(diff)]
        rect[3] = pts[np.argmax(diff)]
        # return the ordered coordinates
        return rect   
    
    def __getOpeningPercentage(self, trackerPoint, pts):
        try: 
            # obtain a consistent order of the points and unpack them
            # individually
            rect = self.__orderPoints(pts)
            (tl, tr, br, bl) = rect
            # compute the width of the new image, which will be the
            # maximum distance between bottom-right and bottom-left
            # x-coordiates or the top-right and top-left x-coordinates
            widthA = np.sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2))
            widthB = np.sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2))
            maxWidth = max(int(widthA), int(widthB))
            if int(maxWidth) == 0 :
                return 0
            # compute the height of the new image, which will be the
            # maximum distance between the top-right and bottom-right
            # y-coordinates or the top-left and bottom-left y-coordinates
            heightA = np.sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2))
            heightB = np.sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2))
            maxHeight = max(int(heightA), int(heightB))
            # the set of destination points to obtain a "birds eye view",
            dst = np.array([[0, 0], [maxWidth - 1, 0],
                [maxWidth - 1, maxHeight - 1], [0, maxHeight - 1]], dtype = "float32")
            # compute the perspective transform matrix
            M = cv2.getPerspectiveTransform(rect, dst)
            # transform the tracker point
            trackerTransform = np.matmul(M, np.array([[trackerPoint[0]],[trackerPoint[1]], [1]]))
            return int((trackerTransform[0] / trackerTransform[2]) / maxWidth * 100)
        except :
            return 0

        
    def calculate(self, mcorners) :
        # calculates the opening percentage given an ordered list of corners and a tracker
        # top left, top right, bottom right, bottom left, window tracker
        return self.__getOpeningPercentage(
            mcorners[4][0], # tracker point  
            np.array([
                mcorners[0][0], # ref. rec top left corner,
                mcorners[1][1], # ref. rec top right corner
                mcorners[2][2], # ref. rec bottom right corner
                mcorners[3][3]  # ref. rec bottom left corner
                ]))


 

 

Display4x7 Class Python Code

 

This class drives a 4x7 segment display using an SN74HC595 shift register clocked by spi clock  and 4 digital lines to switch digits. Works in its own thread

See: Window Opening Monitor with ArUco - Multi-window driver 4x7 segment display

 

Usage:

          display = Display4x7()

          display.start()

          display.displayWindowCorners([[True, True, True, True, True], [False, False, False, False, False], [False, False, False, False, False], [False, False, False, False, False]])

 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""wom_display_4x7_spi.py: 4 digits x7  segment display
   drives a 4x7 segment display using an SN74HC595 shift register clocked by spi clock
   and 4 digital lines to switch digits. Works in its own thread
"""
__author__ = "Enrique Albertos"
__license__ = "GPL"

import RPi.GPIO as GPIO
import sys
import time
import threading
from threading import Thread
import spidev
import atexit

class Display4x7(threading.Thread):
    
    # PIN definitions GPIO.BCM
    # Connect to 74HC595 8-bit serial-in, parallel-out shift 
    __bus  = 0  # MOSI GPIO 10 (PIN 21) - 74HC595 pin 14 DS
              # SCLK GPIO 11 - 74HC595 pin 11 SHCP
    __device = 0
    __spiSpeedDefault = 3900000
    __latchPinDefault = 25 # GPIO 8 (CEO) 74HC595 pin 12 STCP


    # HS42056 1K-32 digit selection
    __digit0PinDefault   = 14 # 7-Segment pin D4
    __digit1PinDefault   = 15 # 7-Segment pin D3
    __digit2PinDefault   = 18 # 7-Segment pin D2
    __digit3PinDefault   = 23 # 7-Segment pin D1

    MARKERS = ( 0x03,  # Top Left
                0x05,  # Top Right
                0x50,  # Bottom Right
                0x18,  # Bottom Left
                0x80,  # Center
                0x00   # blank
                )

    HEX_DIGITS = (0x5F, # = 0  
                 0x44,  # = 1  
                 0x9D,  # = 2  
                 0xD5,  # = 3
                 0xC6,  # = 4
                 0xD3,  # = 5
                 0xDB,  # = 6
                 0x45,  # = 7
                 0xDF,  # = 8
                 0xC7,  # = 9
                 0xCF,  # = A
                 0xDA,  # = b
                 0x1B,  # = C
                 0xDC,  # = d
                 0x9B,  # = E
                 0x8B,  # = F
                 0x00   # blank
                 )    
    
    def __init__(self, initialContent = (0,0,0,0), bus=0, device=0, digit0 = __digit0PinDefault, digit1 = __digit1PinDefault, digit2 = __digit2PinDefault, digit3 = __digit3PinDefault,  latchPin = __latchPinDefault, speedHz = __spiSpeedDefault):
        self.__displayContent = initialContent
        self.__latchPin = latchPin
        self.__digit3 = digit3
        self.__digit2 = digit2
        self.__digit1 = digit1
        self.__digit0 = digit0
        self.__shifRegisterPins = (latchPin)
        self.__controlDigitsPins = ( digit3, digit2, digit1, digit0 )
        self.__lock = threading.Lock()
        self.__bus = bus
        self.__device = device
        self.__speedHz = speedHz
        atexit.register(self.cleanup)
        self.__setup()
        threading.Thread.__init__(self)          
    
    def __initPinsAsOutputs(self, pins) :
        for pin in pins:
            GPIO.setup(pin, GPIO.OUT, initial = GPIO.LOW)
            
    def __lowPins(self, pins) :
        for pin in pins:
            GPIO.output(pin, GPIO.LOW)           
            
    def __setup(self):        
        GPIO.setwarnings(False)
        GPIO.setmode(GPIO.BCM)
        # init display control digits pins
        self.__initPinsAsOutputs(self.__controlDigitsPins)        
        # init serial shit register pins
        GPIO.setup(self.__latchPin, GPIO.OUT, initial = GPIO.LOW)
        
        self.__spiDisplay= spidev.SpiDev()
        self.__spiDisplay.open(self.__bus,self.__device)
        self.__spiDisplay.max_speed_hz = self.__speedHz
        self.__spiDisplay.mode = 0
        self.__spiDisplay.bits_per_word = 8
        self.__spiDisplay.no_cs = True        
        
    def __shiftout(self, byte):        
        GPIO.output(self.__latchPin, 1)
        time.sleep(0.00000005)
        GPIO.output(self.__latchPin, 0)
        self.__spiDisplay.xfer([byte])
        GPIO.output(self.__latchPin, 1)
        time.sleep(0.00000005)
        GPIO.output(self.__latchPin, 0)
        
    def run(self):
        # overrides thread run
        while True:
            i=0
            for pin in self.__controlDigitsPins:
                self.__lowPins(self.__controlDigitsPins)    
                with self.__lock:
                    self.__shiftout(self.__displayContent[i])
                GPIO.output(pin, GPIO.HIGH)
                time.sleep(0.00000001)
                i=i+1
               
    def display(self, displayContent = (0,0,0,0)) :
        with self.__lock:
            self.__displayContent = displayContent
    
    def displayInt(self, number = 0) :
        self.display((self.HEX_DIGITS[(number // 1000)%10], self.HEX_DIGITS[(number // 100)%10],self.HEX_DIGITS[(number // 10)%10],self.HEX_DIGITS[number %10]))

    def displayWindowCorners(self, iterable) :
        content = [0,0,0,0]
        digit=0
        for element in iterable:
            for i in range(5) :
                if element[i]:
                    content[digit] |= Display4x7.MARKERS[i]
            digit = digit + 1
        self.display(content)
    
    def __enter__(self) :
        return self
    
    def __exit__(self, exc_type, exc_value, traceback) :
        self.cleanUp()
        
        
    def cleanup() :
        self.__dislay.closeSPI(self.spiDevice)
        GPIO.cleanup()
    


        



 

 

WindowDetector Class. Python Code

 

Window detector. Detects up to 4 windows marked with 5 ArUco markers each

Results are sent to a 4x7 Led Display and published to Ubidots Cloud

 

Usage:

        windowDetector = WindowDetector()

        windowDetector.start()

 

Video images are captured in its own thread.

Last viewed marker positons are buffered.

Condition of marker detected uses a low pass filter using a deque that stores the 40 last states and compute as an or over the last 40 values.

This prevents the positions of the markers from being lost when one of the markers is momentarily covered over.

 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""wom_window_detector.py: Window detector. Detects up to 4 windows
   marked with 5 ArUco markers each
   Results are sent to a 4x7 Led Display
"""
__author__ = "Enrique Albertos"
__license__ = "GPL"

from imutils.video import VideoStream
import imutils
import time
import cv2
import numpy as np
from collections import deque
from _functools import reduce
from wom_display_4x7_spi  import Display4x7
import atexit

class WindowDetector() :
    __WINDOW1_MARKERS = ( 1,  2,  3,  4,  0)
    __WINDOW2_MARKERS = (11, 12, 13, 14, 10)
    __WINDOW3_MARKERS = (21, 22, 23, 24, 20)
    __WINDOW4_MARKERS = (31, 32, 33, 34, 30)
    __WINDOW_MARKERS = (__WINDOW1_MARKERS, __WINDOW2_MARKERS, __WINDOW3_MARKERS, __WINDOW4_MARKERS)
    __NO_MARKER_DETECTED = (False,False,False,False,False) 
    __NO_WINDOW_DETECTED = (__NO_MARKER_DETECTED,__NO_MARKER_DETECTED,__NO_MARKER_DETECTED,__NO_MARKER_DETECTED)
    __BUFFER_LENGTH = 40
    __FRAME_RATE = 4
    __IMAGE_SIZE = 1200
    
    def __init__(self, display = Display4x7()):
        self.display = display
        atexit.register(self.cleanup)
    
    def __movingDetector (self, iterable):
        # iterates the buffer deque and ors the lists of booleans
        return (reduce(lambda x, y: np.bitwise_or(list(x),list(y)), iterable)).tolist()
    
    def __markersInWindow(self, windowMarkers, ids) :
        # creates a tuple of booleans correspondig to the detection of the window markers
        # top left corner, top right corner, bottom right corner, left right corner, moving part
        list = []
        for element in windowMarkers :
            list.append(element in ids)
        return tuple(list)
    
    def __markersIn(self, windowMarkers, ids) :
        # creates a tuple of tuples for the different markers found in window 
        list = []
        for window in windowMarkers :
            list.append(self.__markersInWindow(window, ids))
        return tuple(list)

    def start(self):        
        # starts the detector, grab images and display markers found
        detectorBuffer = deque((), maxlen= WindowDetector.__BUFFER_LENGTH)
        detectorBuffer.append(WindowDetector.__NO_WINDOW_DETECTED)

        self.display.start()
        arucoDict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_4X4_50)
        arucoParams = cv2.aruco.DetectorParameters_create()

        vs = VideoStream(src=0, framerate=WindowDetector.__FRAME_RATE).start()
        # loop over the frames from the video stream
        while True:
            # grab the frame from the threaded video stream and resize it
            frame = vs.read()
            frame = imutils.resize(frame, width=WindowDetector.__IMAGE_SIZE)
            # detect ArUco markers in the input frame
            (mcorners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)
            # verify *at least* one ArUco marker was detected
            if len(mcorners) > 0:
                flatid = ids.flatten();
                if len(detectorBuffer) >= WindowDetector.__BUFFER_LENGTH:
                   detectorBuffer.popleft()                  
                detectorBuffer.append( self.__markersIn(self.__WINDOW_MARKERS, flatid))
            else:
                detectorBuffer.append(self.__NO_WINDOW_DETECTED)                
            self.display.displayWindowCorners(self.__movingDetector(detectorBuffer))
            
    def __enter__(self) :
        return self
    
    def __exit__(self, exc_type, exc_value, traceback) :
        self.cleanUp()
        
        
    def cleanup() :
        GPIO.cleanup()
        cv2.destroyAllWindows()
        vs.stop()


 

Conclusions

We have made a design that allows remote monitoring of non-automated windows in a simple and inexpensive way.

A single device can control multiple windows. Up to 4 additional cameras can be added, which would allow us to control 20 windows with a single device.

Some uses for the device:

  • monitor building energy performance
  • natural ventilation habits monitor and enforcement

 

 

Project Blogs
Window opening monitor with ArUco - Installing OpenCV on the Raspberry PI 4B
Window Opening Monitor with ArUco - Tracking window movements
Window Opening Monitor with ArUco - Multi-window driver 4x7 segment display
Window Opening Monitor with ArUco - Final device