We continue with our project to monitor the state of window opening, which will allow us to review our natural ventilation strategies in the classrooms. In this project we are using a Raspberry Pi 4B and an old USB webcam to monitor the status of the windows by tracking ArUco markers.

In the previous blog we prepared the environment: installing the operating system, setting up a VNC server to work remotely with the Raspberry Pi in headless mode, and compiling the OpenCV libraries for Python.

In this blog we process video in real time and obtain the window opening percentage by doing a perspective transformation, using 4 ArUco markers as a reference.

 

Project Blogs
Window opening monitor with ArUco - Installing OpenCV on the Raspberry PI 4B
Window Opening Monitor with ArUco - Tracking window movements
Window Opening Monitor with ArUco - Multi-window driver 4x7 segment display
Window Opening Monitor with ArUco - Final device

Tracking window movements

Increasing frame rate with continuous processing

In our first attempts at continuous image processing with OpenCV we did not obtain good results: we barely got 1.5 frames per second. That is enough for our purpose of monitoring windows without displaying video in real time, but it is too poor for a cool demo. So let's spend some time improving the visualization and making a nicer demo.

Tracking sliding window movements with ArUco markers

At first we thought it was due to the PyPlot library, but after analyzing the problem further we saw that it came from capturing the image from our webcam in OpenCV using the cv2.VideoCapture function in the main thread.

Capturing and decoding the image in the main thread is not a good idea.

 

First step: Video streaming threaded library

 

cv2.VideoCapture problem

VideoCapture.read() is a blocking operation: it has to grab and decode frames from the webcam, which is time consuming.

Since we are calling it from our main thread, every frame blocks the whole loop; moving the video capture to another thread will increase the frame rate.

Fortunately, others have already run into the same problem and solved it.

We will use the imutils.video library and its VideoStream class:

https://github.com/jrosebr1/imutils/blob/master/imutils/video/videostream.py
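For context, the trick behind VideoStream is simply to keep a background thread grabbing and decoding frames so that read() returns the latest one immediately. A minimal sketch of the idea (simplified; the real imutils class also supports the PiCamera):

import threading
import cv2

class ThreadedCapture:
    # keep grabbing frames in a background thread so the main
    # thread always gets the most recent frame without blocking
    def __init__(self, src=0):
        self.cap = cv2.VideoCapture(src)
        self.grabbed, self.frame = self.cap.read()
        self.stopped = False

    def start(self):
        threading.Thread(target=self._update, daemon=True).start()
        return self

    def _update(self):
        while not self.stopped:
            self.grabbed, self.frame = self.cap.read()

    def read(self):
        # returns immediately with the latest decoded frame
        return self.frame

    def stop(self):
        self.stopped = True
        self.cap.release()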

 

Second step: Simpler codes

So far we have been using ArUco 6x6 codes, but we don't need that many codes, and size does matter here.

Image using ArUco 6x6 codes and PyPlot without reordering the colours.

OpenCV uses BGR as its default colour order for images, matplotlib uses RGB.
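If you do want to display an OpenCV frame with matplotlib, converting the channel order first fixes the odd colours (window.jpg here is just a placeholder for any saved BGR frame):

import cv2
import matplotlib.pyplot as plt

frame = cv2.imread("window.jpg")              # any BGR image, e.g. a saved webcam frame
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # reorder the channels for matplotlib
plt.imshow(rgb)
plt.show()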

 

ArUco 6x6 codes

 

 

Testing with simpler codes to increase processing speed.

We are going to use simpler codes: instead of 6x6 ArUco codes we will use 4x4 ArUco codes.

 

ArUco 4x4 codes

These are ids 0 to 49 from DICT_4X4_50
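For reference, the markers themselves can be generated with the same aruco module we use for detection (this uses the same legacy OpenCV aruco API as the code below):

import cv2

arucoDict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_4X4_50)
# render marker id 0 as a 200x200 pixel image and save it to disk
marker = cv2.aruco.drawMarker(arucoDict, 0, 200)
cv2.imwrite("aruco_4x4_id0.png", marker)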

 

 

Third step: reducing image size

Our goal is not image quality but being able to identify the ArUco codes quickly, even in the worst conditions: low lighting or backlighting.

Therefore we will reduce the image to an acceptable size to be able to process the ArUco codes with confidence.

 

frame = imutils.resize(frame, width=640)
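imutils.resize only needs the target width and keeps the aspect ratio; a rough equivalent with plain OpenCV (assuming frame is the captured image) would be:

(h, w) = frame.shape[:2]
ratio = 640.0 / w
frame = cv2.resize(frame, (640, int(h * ratio)), interpolation=cv2.INTER_AREA)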

 

 

Applying all the improvements

 

Python code

 

from imutils.video import VideoStream
import argparse
import imutils
import time
import cv2


def toInt(myTuple):
    # cast the (x, y) coordinates to int so OpenCV drawing functions accept them
    return tuple(map(int, myTuple))


arucoDict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_4X4_50)
arucoParams = cv2.aruco.DetectorParameters_create()
vs = VideoStream(src=0).start()
# allow the camera sensor to warm up
time.sleep(2.0)
# start time for fps information
start = time.time()
n = 0
# loop over the frames from the video stream
while True:
    # grab the frame from the threaded video stream and resize it
    # to have a maximum width of 640 pixels
    frame = vs.read()
    frame = imutils.resize(frame, width=640)
    # detect ArUco markers in the input frame
    (corners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)
    # verify *at least* one ArUco marker was detected
    if len(corners) > 0:
        # flatten the ArUco IDs list
        ids = ids.flatten()
        # loop over the detected ArUCo corners
        for (markerCorner, markerID) in zip(corners, ids):
            # extract the marker corners (which are always returned
            # in top-left, top-right, bottom-right, and bottom-left
            # order)
            # use a new name to avoid shadowing the detectMarkers output
            markerCorners = markerCorner.reshape((4, 2))
            (topLeft, topRight, bottomRight, bottomLeft) = markerCorners
            # convert each of the (x, y)-coordinate pairs to integers           
            tr = toInt(topRight)
            br = toInt(bottomRight)
            bl = toInt(bottomLeft)
            tl = toInt(topLeft)
            
            # draw the bounding box of the ArUCo detection
            cv2.line(frame, tl, tr, (0, 255, 0), 2)
            cv2.line(frame, tr, br, (0, 255, 0), 2)
            cv2.line(frame, br, bl, (0, 255, 0), 2)
            cv2.line(frame, bl, tl, (0, 255, 0), 2)
            # compute and draw the center (x, y)-coordinates of the
            # ArUco marker
            cX = int((tl[0] + br[0]) / 2.0)
            cY = int((tl[1] + br[1]) / 2.0)
            cv2.circle(frame, (cX, cY), 4, (0, 0, 255), -1)
            # draw the ArUco marker ID on the frame
            cv2.putText(frame, str(markerID), (tl[0], tl[1] - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
    # show the output frame
    cv2.imshow("Frame", frame)
    key = cv2.waitKey(1) & 0xFF
    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break
    n = n + 1
    end = time.time()
    if end > start:
        print("fps: {}".format(int(n / (end - start))))
    # reset fps info every 5 seconds
    if (end - start > 5):
        n = 0
        start = time.time()


# do a bit of cleanup
cv2.destroyAllWindows()
vs.stop()

 

Processing real time video

Here are the results, resizing the image to 640 pixels wide and using 4x4 ArUco codes.

 

 

 

Planar homography

We want to detect the position of the moving parts of the windows using ArUco markers.

Depending on the alignment of the camera with the windows, the images of the window are skewed and in the wrong perspective.

 

We will define the following convention:

  • ID 0: moving part being monitored
  • ID 1: top left reference corner
  • ID 2: top right reference corner
  • ID 3: bottom right reference corner
  • ID 4: bottom left reference corner

 

Venttracker Window Sensor With ArUco Markers

 

 

ArUco Detect Markers

The cv2.aruco.detectMarkers method returns three values:

  1. corners: A list containing the (x, y)-coordinates of the corners of our detected ArUco markers
  2. ids: The ArUco IDs of the detected markers
  3. rejected: A list of potential markers that were found but ultimately rejected because their inner code could not be parsed
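A quick way to inspect what comes back (note that ids is None when nothing is detected, which is why the scripts guard with a length check before using the results):

(corners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)
# each entry of corners has shape (1, 4, 2): one (x, y) pair per marker corner
if ids is not None:
    for (markerCorner, markerID) in zip(corners, ids.flatten()):
        (tl, tr, br, bl) = markerCorner.reshape((4, 2))
        print(markerID, tl, tr, br, bl)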

 

We will use a planar homography to correct the perspective, so that with the transform matrix we can calculate the opening percentage.

 

transform = cv2.getPerspectiveTransform(rect, dst)

warp = cv2.warpPerspective(orig, transform, (maxWidth, maxHeight))
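With the transform matrix in hand, the opening percentage follows from where the centre of marker 0 (the moving part) lands in the corrected image. A sketch of the idea, with hypothetical names rather than the final code:

import numpy as np
import cv2

def opening_percentage(marker0_center, transform, maxWidth):
    # map the centre of marker 0 through the homography
    pt = np.array([[marker0_center]], dtype="float32")  # shape (1, 1, 2)
    warped_pt = cv2.perspectiveTransform(pt, transform)[0][0]
    # express its x position as a percentage of the corrected width
    return 100.0 * warped_pt[0] / maxWidth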

 

 

 

Python code

Disclaimer: the code is for demonstration purposes only; it does not handle exceptions or markers other than the ones we have defined in the convention. In the next blog we will present a more configurable version with better exception handling.

 

from imutils.video import VideoStream
import imutils
import time
import cv2
import numpy as np

def order_points(pts):
    # initialize a list of coordinates that will be ordered
    # such that the first entry in the list is the top-left,
    # the second entry is the top-right, the third is the
    # bottom-right, and the fourth is the bottom-left
    rect = np.zeros((4, 2), dtype = "float32")
    # the top-left point will have the smallest sum, whereas
    # the bottom-right point will have the largest sum
    s = pts.sum(axis = 1)
    rect[0] = pts[np.argmin(s)]
    rect[2] = pts[np.argmax(s)]
    # now, compute the difference between the points, the
    # top-right point will have the smallest difference,
    # whereas the bottom-left will have the largest difference
    diff = np.diff(pts, axis = 1)
    rect[1] = pts[np.argmin(diff)]
    rect[3] = pts[np.argmax(diff)]
    # return the ordered coordinates
    return rect

def four_point_transform(image, pts):
    # obtain a consistent order of the points and unpack them
    # individually
    rect = order_points(pts)
    (tl, tr, br, bl) = rect
    # compute the width of the new image, which will be the
    # maximum distance between bottom-right and bottom-left
    # x-coordinates or the top-right and top-left x-coordinates
    widthA = np.sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2))
    widthB = np.sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2))
    maxWidth = max(int(widthA), int(widthB))
    # compute the height of the new image, which will be the
    # maximum distance between the top-right and bottom-right
    # y-coordinates or the top-left and bottom-left y-coordinates
    heightA = np.sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2))
    heightB = np.sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2))
    maxHeight = max(int(heightA), int(heightB))
    # now that we have the dimensions of the new image, construct
    # the set of destination points to obtain a "birds eye view",
    # (i.e. top-down view) of the image, again specifying points
    # in the top-left, top-right, bottom-right, and bottom-left
    # order
    dst = np.array([
        [0, 0],
        [maxWidth - 1, 0],
        [maxWidth - 1, maxHeight - 1],
        [0, maxHeight - 1]], dtype = "float32")
    # compute the perspective transform matrix and then apply it
    M = cv2.getPerspectiveTransform(rect, dst)
    warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))
    # return the warped image
    return warped

arucoDict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_4X4_50)
arucoParams = cv2.aruco.DetectorParameters_create()

vs = VideoStream(src=0).start()
time.sleep(2.0)
start = time.time()
n = 0

# loop over the frames from the video stream
while True:
    # grab the frame from the threaded video stream and resize it
    # to have a maximum width of 600 pixels
    frame = vs.read()
    frame = imutils.resize(frame, width=600)
    # detect ArUco markers in the input frame
    (mcorners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)
    # verify *at least* one ArUco marker was detected
    if len(mcorners) > 0:
        # flatten the ArUco IDs list
        ids = ids.flatten()
        # loop over the detected ArUCo corners
        for (markerCorner, markerID) in zip(mcorners, ids):
            # extract the marker corners (which are always returned
            # in top-left, top-right, bottom-right, and bottom-left
            # order)
            corners = markerCorner.reshape((4, 2))
            (topLeft, topRight, bottomRight, bottomLeft) = corners
            # convert each of the (x, y)-coordinate pairs to integers
            topRight = (int(topRight[0]), int(topRight[1]))
            bottomRight = (int(bottomRight[0]), int(bottomRight[1]))
            bottomLeft = (int(bottomLeft[0]), int(bottomLeft[1]))
            topLeft = (int(topLeft[0]), int(topLeft[1]))
            
            # draw the bounding box of the ArUCo detection
            cv2.line(frame, topLeft, topRight, (0, 255, 0), 2)
            cv2.line(frame, topRight, bottomRight, (0, 255, 0), 2)
            cv2.line(frame, bottomRight, bottomLeft, (0, 255, 0), 2)
            cv2.line(frame, bottomLeft, topLeft, (0, 255, 0), 2)
            # compute and draw the center (x, y)-coordinates of the
            # ArUco marker
            cX = int((topLeft[0] + bottomRight[0]) / 2.0)
            cY = int((topLeft[1] + bottomRight[1]) / 2.0)
            cv2.circle(frame, (cX, cY), 4, (0, 0, 255), -1)
            # draw the ArUco marker ID on the frame
            cv2.putText(frame, str(markerID), (topLeft[0], topLeft[1] - 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        # when all five markers (moving part plus the 4 reference corners)
        # are visible, locate the index of each reference marker in ids
        if len(mcorners) > 4:
            print(ids)
            id1 = int(np.where(ids == 1)[0][0])
            id2 = int(np.where(ids == 2)[0][0])
            id3 = int(np.where(ids == 3)[0][0])
            id4 = int(np.where(ids == 4)[0][0])

            # take the outer corner of each reference marker as a corner
            # of the region to rectify (corners come in tl, tr, br, bl order)
            btl = mcorners[id1][0][0]
            btr = mcorners[id2][0][1]
            bbr = mcorners[id3][0][2]
            bbl = mcorners[id4][0][3]
            
            pts = np.array([btl, btr, bbr, bbl])
            # apply the four point tranform to obtain a "birds eye view" of the image
            warped = four_point_transform(frame, pts)
            cv2.imshow("Warped", warped)

    cv2.imshow("Frame", frame)
    # show the output frame

    key = cv2.waitKey(1) & 0xFF
    # if the `q` key was pressed, break from the loop
    if key == ord("q"):
        break
    n = n + 1
    end = time.time()
    if end > start:
        print("fps: {}".format(n / (end - start)))
# do a bit of cleanup
cv2.destroyAllWindows()
vs.stop()

 

 

Next steps

 

  • View the opening percentage on a small display.
  • Monitor the status of the windows through cloud services.
  • Define the convention for the simultaneous monitoring of several windows.
  • Try other camera models.

 


 

 

 
