In my previous entry, I showed how the Debug Probe revealed the request and response data flowing to and from the RSL10-Sense BTLE device, along with a Python script I wrote to list the available services and characteristics. In this entry, I'm going to show you how to request sensor data with Python and how I read the Beacon response.

Before we start, it is important to understand how the Requests (cyan) and Responses (purple) are structured - example below:

Custom Service Firmware - Request-Response

 

Custom Service Request

Using the feedback from the Debug Probe and some digging into the RSL10 example firmware, we can see that the requested command (cyan) is split into 3 parts separated by '/' (the ASCII slash character).

 

Token

A single ASCII character between '0' and '~' (0x30 to 0x7E). Its value appears to be arbitrary; the firmware (CS.c) only rejects tokens outside that range.

Custom Service Firmware - Token

Node

Name of the sensor that will provide the data, e.g. AL (Ambient Light), PB (Push Button), EV (Environmental: Temperature, Humidity, Pressure, Air Quality)

Custom Service Firmware - Node

Handler

Defines the property, in other words what kind of measurement is requested from the sensor.

Custom Service Firmware - Handler

For example, for the "EV" (Environmental) node these are some of the options:

  • T: Temperature in Celsius
  • TF: Temperature in Fahrenheit
  • H: Humidity
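To make the three-part layout concrete, here is a minimal sketch of how a request could be assembled. The names `token_cycle` and `build_request` are my own for illustration and are not part of the RSL10 firmware or any library; the token range is the one described above.

```python
import itertools

TOKEN_MIN = ord('0')  # 0x30, lowest token character accepted
TOKEN_MAX = ord('~')  # 0x7E, highest token character accepted


def token_cycle():
    """Yield the tokens '0', '1', ... '~' and then start over."""
    return itertools.cycle(chr(c) for c in range(TOKEN_MIN, TOKEN_MAX + 1))


def build_request(token: str, node: str, handler: str) -> bytes:
    """Join token, node and handler with '/' and encode them for the write."""
    if len(token) != 1 or not TOKEN_MIN <= ord(token) <= TOKEN_MAX:
        raise ValueError("token must be one character between '0' and '~'")
    return f"{token}/{node}/{handler}".encode("utf-8")


tokens = token_cycle()
print(build_request(next(tokens), "EV", "T"))  # b'0/EV/T'
print(build_request(next(tokens), "EV", "H"))  # b'1/EV/H'
```

Cycling through the tokens instead of picking one at random makes it easy to tell consecutive requests apart in the Debug Probe output.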

 

Custom Service Response

Once more, we will take a look at the debug probe feedback (purple color) to see how the response packet "generally" looks:

 

Token

The RSL10 device will respond with the same token that was sent in the request; an ASCII character between '0' and '~' (0x30 to 0x7E)

 

Data Type

When returning numbers, the RSL10 will respond with the following:

  • f: when the value of the sensor is a float (or contains decimals)
  • i: for integer values

Sensor Value

Finally, the value read from the sensor.

Not all sensors return this kind of packet, but it gives a general idea of how most of them respond.
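The response layout can be sketched as a small parser. This is only an illustration under a few assumptions: the name `parse_response` and the `Response` tuple are mine, the sample packet is made up, and accepting a leading minus sign is a guess about how negative readings might be sent.

```python
import re
from typing import NamedTuple, Optional, Union

# token / data type ('f' or 'i') / numeric value
# Assumption: a leading '-' is allowed for negative readings.
RESPONSE_RE = re.compile(r"^([0-~])/([fi])/(-?[0-9]+(?:\.[0-9]+)?)$")


class Response(NamedTuple):
    token: str
    data_type: str
    value: Union[int, float]


def parse_response(raw: bytes) -> Optional[Response]:
    """Return the parsed packet, or None if it does not have the general shape."""
    match = RESPONSE_RE.match(raw.decode("utf-8", errors="replace"))
    if match is None:
        return None
    token, data_type, text = match.groups()
    try:
        value = float(text) if data_type == "f" else int(text)
    except ValueError:  # e.g. type 'i' with a decimal point
        return None
    return Response(token, data_type, value)


# Made-up packet, only to show the shape of the result
print(parse_response(b"1/f/23.45"))
```

Comparing the returned `token` with the one sent in the request is a cheap way to check that a response belongs to the request you just made.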

 

Collecting data from the Beacon with Python

To collect data from the RSL10 Beacon we need to do the following (my previous blog explains how to retrieve these details):

  1. Connect to the BTLE device (the RSL10 Beacon).
  2. Write a request to the characteristic with the WRITE property (described as 'RX_VALUE - Command for BDK'), identified by handle 0x0010.
  3. If the write succeeds, read the Beacon response from the characteristic identified by handle 0x000b (described as 'TX_VALUE - Response from BDK').
  4. Disconnect, or repeat steps 2 and 3.
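Before the full script, here is a condensed sketch of the write and read steps on their own. The function `request_value` and the `FakePeripheral` class are hypothetical helpers so the example runs without hardware; the three methods it calls (`writeCharacteristic`, `waitForNotifications`, `readCharacteristic`) are the ones a bluepy `Peripheral` provides, and the canned reply is made up.

```python
WRITE_HANDLE = 0x0010  # 'RX_VALUE - Command for BDK'
READ_HANDLE = 0x000B   # 'TX_VALUE - Response from BDK'


def request_value(peripheral, request: bytes, timeout: float = 2.0):
    """Write the request, wait for the notification, then read the response."""
    peripheral.writeCharacteristic(WRITE_HANDLE, request, False)
    if not peripheral.waitForNotifications(timeout):
        return None  # the beacon did not answer in time
    return peripheral.readCharacteristic(READ_HANDLE)


class FakePeripheral:
    """Hypothetical stand-in for btle.Peripheral, with a made-up reply."""

    def __init__(self):
        self.last = b""

    def writeCharacteristic(self, handle, value, withResponse=False):
        self.last = value  # remember the request so the token can be echoed

    def waitForNotifications(self, timeout):
        return True

    def readCharacteristic(self, handle):
        token = self.last[:1].decode()
        return f"{token}/f/21.5".encode()


print(request_value(FakePeripheral(), b"0/EV/T"))  # b'0/f/21.5'
```

With real hardware, the fake object would be replaced by `btle.Peripheral(mac, btle.ADDR_TYPE_PUBLIC)` and followed by a call to `disconnect()` when done, as the full script below does.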

 

Source code

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Luis Ortiz - luislab.com
# March 8, 2020


from bluepy import btle
import binascii
import subprocess
import re


import sys


from textcolor import textColor


import time
from datetime import datetime


class console_log:
  def __init__(self, debug = False, ansi = False):
    self.debug = debug
    self.ansi = ansi


  def Sys_Info(self, text:str):
    if self.debug:
      if not self.ansi:
        # Strip ANSI escape sequences when colored output is disabled
        text = re.sub(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]', '', text)
      print (text)


def elapsed_time(elapsed):
  s, ms = divmod(elapsed, 1000)
  m, s = divmod(s, 60)
  h, m = divmod(m, 60)


  time_str = ""
  if (h > 0):
    time_str = "%dh " % h
  if (m > 0):
    time_str += "%dm " % m
  if (s > 0):
    time_str += "%ds " % s
  if (ms > 0):
    time_str += "%dms " % ms


  return time_str.strip()


class beacon:
  def __init__(self, macAddr: str):
    self.mac = macAddr
    self.minOrd = ord('0')
    self.maxOrd = ord('~')
    self.token_limit = (self.maxOrd - self.minOrd) + 1  # CS.c    if (request.token[0] < '0' || request.token[0] > '~')
    self.rToken = 0                                     # Request token
    self.color = textColor()
    self.cl = console_log(True, True)


    color = self.color
    cl    = self.cl


    cl.Sys_Info ("\nConnecting...")


    self.p = btle.Peripheral(macAddr, btle.ADDR_TYPE_PUBLIC)
    cl.Sys_Info ("\nConnected [" + color.green(self.p.addr) + "]")


    for gs in self.p.getServices():
      for gc in gs.getCharacteristics():
        if gc.supportsRead() and gc.uuid.getCommonName() == str(gc.uuid) and 'NOTIFY' in gc.propertiesToString():
          self.rxC = gc
          cl.Sys_Info('__init__ rxC %s' % self.rxC)
        elif gc.uuid.getCommonName() == str(gc.uuid) and 'WRITE' in gc.propertiesToString():
          self.txC = gc
          cl.Sys_Info('__init__ txC %s' % self.txC)
        elif gc.uuid == btle.AssignedNumbers.batteryLevel and 'NOTIFY' in gc.propertiesToString():
          self.battC = gc
          cl.Sys_Info('__init__ battC %s' % self.battC)
      
  def disconnect(self):
    cl = self.cl
    self.p.disconnect()
    cl.Sys_Info("\nDisconnected")


  def getBatteryLevel(self):
    color = self.color
    cl = self.cl


    try:
      bValue = self.battC.read()
      sValue = binascii.b2a_hex(bValue).decode('utf-8')


      cl.Sys_Info(("\nBattery Level: " + color.cyan("0x%s") + \
                   " (" + color.yellow("%d%%") + ")") \
                  % (sValue, int(sValue, 16)))


      return sValue


    except btle.BTLEException as exc:
      cl.Sys_Info(exc)
      return None
    


  def readNode(self, node:str, prop:str):
    color = self.color
    cl = self.cl


    try:
      self.rToken = (self.rToken + 1) % self.token_limit
      cl.Sys_Info("\nrToken %d" % self.rToken)


      sReq = chr(self.rToken + self.minOrd) + '/' + node + '/' + prop
      bReq = sReq.encode('utf-8')


      cl.Sys_Info("Requested %s" % color.cyan('\'' + sReq + '\''))


      startTime = time.time()
      self.p.writeCharacteristic(0x0010, bReq, False)


      success = self.p.waitForNotifications(2)
      endTime = time.time()


      if not success:
        cl.Sys_Info("Request timeout.")
      else:
        cl.Sys_Info("Notification received in %.2f seconds"  % (endTime - startTime))
        bReq = self.rxC.read()
        sReq = bReq.decode('utf-8')
        cl.Sys_Info ("Read sReq %s" % color.pink(sReq))
        # Expected shape: <token>/<data type>/<numeric value>
        result = re.match(r'(.{1}/)(.?)(/)([0-9.]+$)', sReq)
        if result:
          vType = result.group(2)
          value = result.group(4)
          return vType, value
        else:
          cl.Sys_Info ("No \"re\" match")


      return None, None


    except ValueError:
      return None, None
    except btle.BTLEException as exc:
      cl.Sys_Info (exc)
      return None, None


  def getNodeValue(self, node:str, prop:str=''):       # node (sensor), property
    color = self.color
    cl = self.cl


    if node == 'BL':    # Battery Level
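      batteryHex = self.getBatteryLevel()
      if batteryHex is None:   # the battery characteristic could not be read
        return None
      batteryLevel = int(batteryHex, 16)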
      
      return batteryLevel
    elif node == 'EV':  # Environmental sensor (Temp, Humidity, Raw Pressure, Indoor Air Quality) 
      if prop in ('T', 'TF', 'H', 'PP', 'P', 'A'):      # Properties handled for the EV node
        valueType, nodeStrValue = self.readNode(node, prop)


        if nodeStrValue is None:   # readNode() timed out or the response did not match
          return None

        if nodeStrValue.replace('.', '1').isdigit():
          nodeValue = float(nodeStrValue)
          if prop == 'T':
            tempC = nodeValue
            cl.Sys_Info (("valueType: %s, temp: " + color.yellow("%2.1f°C")) \
                         % (color.purple("\'" + valueType + "\'"), tempC))
            return tempC
          elif prop == 'TF':
            tempF = nodeValue
            cl.Sys_Info (("valueType: %s, temp: " + color.yellow("%2.1f°F")) \
                         % (color.purple("\'" + valueType + "\'"), tempF))
            return tempF
          elif prop == 'H':
            humidity = nodeValue
            cl.Sys_Info (("valueType: %s, humidity: " + color.yellow("%2.f%%")) \
                         % (color.purple("\'" + valueType + "\'"), humidity))
            return humidity
          else:
            cl.Sys_Info (("valueType: %s, value: " + color.yellow("%2.2f") + ", property: %s") \
                         % (color.purple("\'" + valueType + "\'"), nodeValue, color.cyan(prop)))
            return nodeValue
        else:
          cl.Sys_Info ("valueType: %s, nodeValue %s" \
                       % (color.purple("\'" + valueType + "\'"), color.purple("\'" + nodeStrValue + "\'")))
          return nodeStrValue
          
    elif node == 'AL':  # Light Sensor  
      if prop == 'L':        # Lux
        valueType, nodeStrValue = self.readNode(node, prop)


        if nodeStrValue is None:   # readNode() timed out or the response did not match
          return None

        if nodeStrValue.replace('.', '1').isdigit():
          lux = float(nodeStrValue)
          cl.Sys_Info (("valueType: %s, illuminance: " + color.yellow("%2.2f lux")) % (color.purple("\'" + valueType + "\'"), lux))
          return lux
        else:
          cl.Sys_Info ("valueType: %s, nodeValue %s" % (color.purple("\'" + valueType + "\'"), color.purple("\'" + nodeStrValue + "\'")))
          return nodeStrValue


class beaconCSV:
  def __init__(self, fileName):
    self.fileName = fileName


    try:
      with open(self.fileName, mode='xt', encoding='utf-8') as f:
        f.write("\"Date\",\"TempC\",\"TempF\",\"Humidity\"\n")
    except FileExistsError:
      pass


  def write(self, tempC, tempF, humidity):
    dt_string = datetime.now().strftime('%Y%m%d %H:%M:%S')
    fileStr = "\"" + dt_string + "\",%2.1f" % tempC + ",%2.1f" % tempF + ",%2.f" % humidity
    cl.Sys_Info(fileStr)


    with open(self.fileName, mode='at', encoding='utf-8') as f:
      f.write(fileStr + "\n")


beacon1 = beacon("60:C0:BF:29:EF:50")
beacon1CSV = beaconCSV('beacon1.csv') 


cl = console_log(True, True)


try:
  while True:
    tempC = beacon1.getNodeValue('EV', 'T')
    humidity = beacon1.getNodeValue('EV', 'H')
    batteryLevel = beacon1.getNodeValue('BL')
    if tempC is not None and humidity is not None:   # skip the row if a request failed
      tempF = tempC * 1.8 + 32
      beacon1CSV.write(tempC, tempF, humidity)
    time.sleep(180)


except KeyboardInterrupt:
  beacon1.disconnect()
  print("Bye.")

 

Below is what the output looks like:

RSL10 Beacon Test LuisLab

The data collected is also stored in a .CSV (example attached)

Temperature vs Humidity
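If you want to work with the collected data afterwards, the CSV can be read back with the standard library. The sketch below assumes the column names and date format written by the script above; `load_readings` is a name I made up, and the sample rows are invented to keep the example self-contained.

```python
import csv
import io
from datetime import datetime


def load_readings(lines):
    """Parse rows in the layout written by beaconCSV.write() into dictionaries."""
    readings = []
    for row in csv.DictReader(lines):
        readings.append({
            "date": datetime.strptime(row["Date"], "%Y%m%d %H:%M:%S"),
            "temp_c": float(row["TempC"]),
            "temp_f": float(row["TempF"]),
            "humidity": float(row["Humidity"]),
        })
    return readings


# Invented sample in the same layout as beacon1.csv
sample = io.StringIO(
    '"Date","TempC","TempF","Humidity"\n'
    '"20200308 10:00:00",21.5,70.7,45\n'
    '"20200308 10:03:00",21.7,71.1,44\n'
)
rows = load_readings(sample)
average = sum(r["temp_c"] for r in rows) / len(rows)
print(f"{len(rows)} readings, average {average:.1f} °C")
```

To read the real file, pass an open file object instead of the sample, for example `open('beacon1.csv', newline='', encoding='utf-8')`.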

 

Blogs in this series

  1. ON Sweet Home - Introduction
  2. ON Sweet Home - Getting to know your RSL10
  3. ON Sweet Home - Collecting the Beacon data
  4. ON Sweet Home - 3D printed parts
  5. ON Sweet Home - GUI is alive and the project is complete!