I mostly just recycled the database creation and writing code for SQLite, and I am storing the data as a blob (JSON string) so that I can continue revising my data objects without having to mess around with my database at all.
I'm planning on calling this and processing the data through a service I'm working on as well.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import sqlite3 | |
import sys | |
import time | |
import json | |
from sense_hat import SenseHat | |
from re import findall | |
from subprocess import check_output | |
sense = SenseHat() | |
rounding = 2 | |
logPath = '/home/pi/pyDev/SensorMgr/logs/' | |
sqlite_file = logPath + 'SensorData.db' | |
table_name = 'PiData' | |
FORMAT = '%(asctime)s : %(levelname)s : %(message)s' | |
logging.basicConfig(format=FORMAT, filename=logPath + 'debug.log', level=logging.DEBUG) | |
try: # capture sensor data | |
# system | |
core_temp = check_output(["vcgencmd", "measure_temp"]) | |
core_temp = findall(b'\d+', core_temp) | |
core_temp = core_temp[0].decode("utf-8") | |
# time | |
gmTime = time.gmtime() | |
strTime = time.asctime(gmTime) | |
timeStamp = time.time() | |
# sense hat | |
humidity = round(sense.get_humidity(), rounding) | |
temp = round(sense.get_temperature(), rounding) | |
pressure = sense.get_pressure() | |
pressure = round(pressure / 10, rounding) # convert to kPa from millibars | |
orientation_rad = sense.get_orientation_radians() | |
# compass | |
sense.set_imu_config(True, False, False) # gyroscope only | |
north = round(sense.get_compass(), rounding) | |
except: # catch *all* exceptions | |
e = sys.exc_info() | |
print("{msg}".format(msg=e)) | |
logging.debug(e[0], exc_info=True) | |
try: | |
# Open db connection | |
conn = sqlite3.connect(sqlite_file) | |
c = conn.cursor() | |
except: # catch *all* exceptions | |
e = sys.exc_info() | |
print("{msg}".format(msg=e)) | |
logging.debug(e[0], exc_info=True) | |
# create data obj | |
myList = list() | |
element = ["CoreTemp", core_temp, "C"] | |
myList.append(element) | |
element = ["Humidity", humidity, "%rH"] | |
myList.append(element) | |
element = ["Temperature", temp, "C"] | |
myList.append(element) | |
element = ["Pressure", pressure, "kPa"] | |
myList.append(element) | |
element = ["Orient", orientation_rad, "rad"] | |
myList.append(element) | |
element = ["Compass", north, "Deg N"] | |
myList.append(element) | |
dataStr = json.dumps(myList) | |
print(dataStr) | |
time_col = 'Time' | |
date_col = 'DateTime' | |
data_col = 'Data' | |
try: | |
# Create output sql strings | |
sql_insert = "INSERT INTO {tn} ({tc}, {dc}, {dobc}) ".\ | |
format(tn=table_name, tc=time_col, dc=date_col, dobc=data_col) | |
sql_values = "VALUES ({tc}, '{dc}', '{dobc}')".\ | |
format(tc=timeStamp, dc=strTime, dobc=dataStr) | |
sql_string = sql_insert + sql_values | |
print("SqlCmd: " + sql_string) | |
except: # catch *all* exceptions | |
e = sys.exc_info() | |
print("{msg}".format(msg=e)) | |
logging.debug(e[0], exc_info=True) | |
# execute the script, commit changes and disconnect | |
try: | |
c.execute(sql_string) | |
conn.commit() | |
conn.close() | |
except: # catch *all* exceptions | |
e = sys.exc_info() | |
print("{msg}".format(msg=e)) | |
logging.debug(e[0], exc_info=True) | |
# output to console | |
print("Humidity: %s %%rH" % humidity) | |
print("Temperature: %s C" % temp) | |
print("Pressure: %s kPa" % pressure) | |
print("p: {pitch}, r: {roll}, y: {yaw}".format(**orientation_rad)) | |
print("Core " + core_temp) | |
print("North: %s" % north) |