Sunday, 22 January 2017

Telemetry (Sense Hat)

I have recently started trying out my Sense Hat module for the Raspberry Pi; it has a number of sensors built in to play with and support for reading and writing data through a Python library. There are a number of libraries out there, but I chose this one as it allowed me to recycle some of my old code.

I mostly just recycled the database creation and writing code for SQLite, and I am storing the data as a blob (JSON string) so that I can continue revising my data objects without having to mess around with my database at all.

I'm planning on calling this and processing the data through a service I'm working on as well.

import logging
import sqlite3
import sys
import time
import json
from sense_hat import SenseHat
from re import findall
from subprocess import check_output
sense = SenseHat()
rounding = 2
logPath = '/home/pi/pyDev/SensorMgr/logs/'
sqlite_file = logPath + 'SensorData.db'
table_name = 'PiData'
FORMAT = '%(asctime)s : %(levelname)s : %(message)s'
logging.basicConfig(format=FORMAT, filename=logPath + 'debug.log', level=logging.DEBUG)
try: # capture sensor data
# system
core_temp = check_output(["vcgencmd", "measure_temp"])
core_temp = findall(b'\d+', core_temp)
core_temp = core_temp[0].decode("utf-8")
# time
gmTime = time.gmtime()
strTime = time.asctime(gmTime)
timeStamp = time.time()
# sense hat
humidity = round(sense.get_humidity(), rounding)
temp = round(sense.get_temperature(), rounding)
pressure = sense.get_pressure()
pressure = round(pressure / 10, rounding) # convert to kPa from millibars
orientation_rad = sense.get_orientation_radians()
# compass
sense.set_imu_config(True, False, False) # gyroscope only
north = round(sense.get_compass(), rounding)
except: # catch *all* exceptions
e = sys.exc_info()
print("{msg}".format(msg=e))
logging.debug(e[0], exc_info=True)
try:
# Open db connection
conn = sqlite3.connect(sqlite_file)
c = conn.cursor()
except: # catch *all* exceptions
e = sys.exc_info()
print("{msg}".format(msg=e))
logging.debug(e[0], exc_info=True)
# create data obj
myList = list()
element = ["CoreTemp", core_temp, "C"]
myList.append(element)
element = ["Humidity", humidity, "%rH"]
myList.append(element)
element = ["Temperature", temp, "C"]
myList.append(element)
element = ["Pressure", pressure, "kPa"]
myList.append(element)
element = ["Orient", orientation_rad, "rad"]
myList.append(element)
element = ["Compass", north, "Deg N"]
myList.append(element)
dataStr = json.dumps(myList)
print(dataStr)
time_col = 'Time'
date_col = 'DateTime'
data_col = 'Data'
try:
# Create output sql strings
sql_insert = "INSERT INTO {tn} ({tc}, {dc}, {dobc}) ".\
format(tn=table_name, tc=time_col, dc=date_col, dobc=data_col)
sql_values = "VALUES ({tc}, '{dc}', '{dobc}')".\
format(tc=timeStamp, dc=strTime, dobc=dataStr)
sql_string = sql_insert + sql_values
print("SqlCmd: " + sql_string)
except: # catch *all* exceptions
e = sys.exc_info()
print("{msg}".format(msg=e))
logging.debug(e[0], exc_info=True)
# execute the script, commit changes and disconnect
try:
c.execute(sql_string)
conn.commit()
conn.close()
except: # catch *all* exceptions
e = sys.exc_info()
print("{msg}".format(msg=e))
logging.debug(e[0], exc_info=True)
# output to console
print("Humidity: %s %%rH" % humidity)
print("Temperature: %s C" % temp)
print("Pressure: %s kPa" % pressure)
print("p: {pitch}, r: {roll}, y: {yaw}".format(**orientation_rad))
print("Core " + core_temp)
print("North: %s" % north)
view raw read_sensors.py hosted with ❤ by GitHub