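This source code is part of the Python magnetic field sensor and detector project. The listing below is the main logger script: a small state machine that reads the QMC5883 magnetometer, stores the readings in an SQLite database, and drives the push button and the indicator LED (the flashing patterns themselves are provided by the separate Blinker class).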
########################################
#
# Python Magnetic Field Logger and Detector
# by Sebastian Folz
# Introduced on Pi And More 11
#
########################################

# NOTE: the time module is used only for the delay of the main loop.
# Datasets are stamped with a running counter instead of wall-clock time,
# because there is no valid time without WiFi.
import time
import sqlite3 as lite
import statistics
# import functions for system calls
from subprocess import call
import sys
from enum import Enum
# fix copy bug with switched arguments
from shutil import copyfile

import RPi.GPIO as GPIO

from qmc5883 import QMC5883
# import pushbutton class
from pushbutton import pushbutton, Push
# import blink class
from Blinker import Blink1, Blink2
# Import own neural net class
from MLP import MLNN

TASTER = 38        # handed to pushbutton.configure(); presumably the button pin
LED = 40           # board pin (GPIO.BOARD numbering) of the indicator LED
LONG_CYCLES = 10   # 1.5 seconds
COMMIT = 10        # number of inserted datasets between two commits
BINSIZE = 10       # number of raw samples condensed into one dataset

# Length of the sliding window handed to the neural net.
NN_WINDOW = 80
# Normalization applied to the x value before it is fed to the net.
# NOTE(review): presumably derived from the observed sensor range - confirm.
X_OFFSET = 5950
X_SCALE = 32768

DB_PATH = '/home/pi/Dokumente/db/geo.db'
DB_BACKUP_PATH = '/home/pi/Dokumente/db/geo_backup.db'


# Defined states for state machine
class State(Enum):
    LOGGING = 0
    ACTION = 1
    SHUT_DOWN = 2
    ALERT = 3


class Logger:
    """State machine that logs magnetometer data and reacts to the button.

    Each call of operate() runs one step of the current state. Raw sensor
    readings are collected in bins of BINSIZE samples; the per-axis median
    of every bin is written to the table mag_field_data together with a
    running timestamp counter, the run number and the current event marker.
    """

    def __init__(self):
        """Set up button, LED, sensor and database; exit(1) on SQLite errors."""
        self.timestamp = 0
        self.state = State.LOGGING
        self.step = 0
        self.event = ""
        self.blinker = Blink1()

        # data array (sliding window of normalized x values for the net)
        self.darr = []

        # Neural Net
        self.nn = MLNN()

        # datasets inserted since the last commit
        self.datasets = 0

        # button initialization
        self.button = pushbutton()
        self.button.configure(TASTER)
        self.button.setLongCycle(LONG_CYCLES)

        # LED initialization
        GPIO.setmode(GPIO.BOARD)
        GPIO.setup(LED, GPIO.OUT)
        GPIO.output(LED, GPIO.LOW)

        # sensor arrays (raw samples of the current bin)
        self.xx = []
        self.yy = []
        self.zz = []

        self.con = None

        self.compass = QMC5883()
        self.compass.setSamplingRate(QMC5883.CONFIG_200HZ)

        self.run = 0

        try:
            self.con = lite.connect(DB_PATH)
            self.con.isolation_level = 'DEFERRED'
            self.cur = self.con.cursor()

            self.cur.execute('SELECT SQLITE_VERSION()')
            data = self.cur.fetchone()
            print("SQLite version: %s" % data[0])

            # Continue with the run number after the highest one stored.
            self.cur.execute('SELECT MAX(run)+1 FROM mag_field_data')
            data = self.cur.fetchone()
            # MAX() yields NULL on an empty table: start counting at 0.
            self.run = 0 if data[0] is None else data[0]
            print("Run number: {}".format(self.run))
        except lite.Error as e:
            print("SQLite error %s:" % e.args[0])
            sys.exit(1)

    def check_arrival(self, dx):
        """Check for arrival home.

        Appends the normalized x value to the sliding window and, once the
        window holds NN_WINDOW values, asks the neural net. A positive
        answer switches the state machine to State.ALERT.
        """
        # Append normalized value to data array
        self.darr.append((dx + X_OFFSET) / X_SCALE)

        if len(self.darr) >= NN_WINDOW:
            # Explicit comparison kept on purpose: the return type of
            # forward() is not visible here, so plain truthiness could
            # accept values the original comparison rejected.
            if self.nn.forward(self.darr) == True:  # noqa: E712
                self.blinker = Blink1()
                self.prepare()
                self.state = State.ALERT
                print("Detected home at (Run: Timestam) {} :, {}".format(self.run, self.timestamp))
            # Drop the oldest value so the window keeps its length.
            self.darr.pop(0)

    def log(self):
        """Read one raw sample; store a dataset once the bin is full.

        When BINSIZE samples have been collected, the per-axis medians are
        checked for an arrival and inserted into mag_field_data. The
        transaction is committed after every COMMIT inserted datasets.
        """
        # read sensor data (the fourth value is not used)
        x, y, z, _ = self.compass.readRaw()

        self.xx.append(x)
        self.yy.append(y)
        self.zz.append(z)

        if len(self.xx) != BINSIZE:
            return

        # HINT: cannot convert into degrees or radians, because the maxima
        # and minima of the QMC5883 are unknown, but it is not full 16 bit
        # range!
        dx = round(statistics.median(self.xx), 1)
        dy = round(statistics.median(self.yy), 1)
        dz = round(statistics.median(self.zz), 1)

        self.xx = []
        self.yy = []
        self.zz = []

        self.timestamp += 1

        # Check for arrival
        self.check_arrival(dx)

        # Bound parameters instead of string formatting: no quoting issues
        # with the event text and no SQL built from data.
        self.cur.execute(
            "INSERT INTO mag_field_data (timestamp, x, y, z, run, event) "
            "VALUES (?, ?, ?, ?, ?, ?)",
            (self.timestamp, dx, dy, dz, self.run, self.event),
        )

        # NOTE(review): the commit bookkeeping is assumed to count inserted
        # datasets (inside the BINSIZE branch) - confirm against the
        # original indentation.
        self.datasets += 1
        if self.datasets == COMMIT:
            self.con.commit()
            self.datasets = 0

    def prepare(self):
        """Reset timing before state switch (blinker, LED and step)."""
        self.blinker.reset()
        GPIO.output(LED, GPIO.LOW)
        self.step = 0

    def _blink_led(self):
        """Advance the blink pattern one step and set the LED accordingly.

        Returns the blinker's end-of-sequence flag.
        """
        GPIO.output(LED, GPIO.HIGH if self.blinker.blink() else GPIO.LOW)
        return self.blinker.EOS()

    def logging_mode(self):
        """Implementation of the Logging Mode state.

        A short push marks the following datasets with event "h" and enters
        State.ACTION, a long push enters State.SHUT_DOWN; otherwise one
        sample is logged.
        """
        pushmode = self.button.isPushed()

        # first, check for state switch
        if pushmode == Push.SHORT_DOWN:
            self.event = "h"
            self.blinker = Blink2()
            self.prepare()
            self.state = State.ACTION
            print("Entering EVENT 1 state")
            return

        if pushmode == Push.LONG_DOWN:
            self.blinker = Blink1()
            self.prepare()
            self.state = State.SHUT_DOWN
            print("Entering LONG DOWN state")
            return

        self.log()

    def action_mode(self):
        """Implementation of the Action Mode state (no-op).

        operate() dispatches State.ACTION to pause_mode(), so this method
        is not called from within this file.
        """
        return

    def alert_mode(self):
        """Implementation of the Alert Mode state.

        Keeps logging while the LED blinks; any button push clears the
        event marker and returns to State.LOGGING.
        """
        # first, check for state switch
        if self.button.isPushed() != Push.NONE:
            self.event = ""
            self.prepare()
            self.state = State.LOGGING
            print("Fast Return")
            return

        self.log()

        # restart with the second pattern once the sequence has completed
        if self._blink_led():
            GPIO.output(LED, GPIO.LOW)
            self.blinker = Blink2()

    def pause_mode(self):
        """Implementation of the Pause Mode state (used for State.ACTION).

        Blinks the LED and keeps logging with the current event marker; any
        button push clears the marker and returns to State.LOGGING.
        """
        # first, check for state switch
        if self.button.isPushed() != Push.NONE:
            self.event = ""
            self.prepare()
            self.state = State.LOGGING
            print("Fast Return")
            return

        # restart the pattern once the sequence has completed
        if self._blink_led():
            GPIO.output(LED, GPIO.LOW)
            self.blinker = Blink2()

        # NOTE(review): assumed to run on every step, as in alert_mode() -
        # confirm against the original indentation.
        self.log()

    def shut_down_mode(self):
        """Implementation of the Shut Down Mode state.

        Blinks the LED; a button push during the sequence aborts and
        returns to State.LOGGING. When the sequence has completed, the
        database is closed and backed up and the system is halted.
        """
        print("Entering SHUT DOWN mode")

        # first, check for state switch
        if self.button.isPushed() != Push.NONE:
            print("Fast Return")
            self.prepare()
            self.state = State.LOGGING
            return

        # stop program just if blink sequence has completed
        if self._blink_led():
            # important: close database connection
            self.close()

            # make a backup of GEO db ;)
            # A shell call of cp seemed to switch its arguments - DO NOT
            # USE it; copyfile() is used instead.
            copyfile(DB_PATH, DB_BACKUP_PATH)
            GPIO.output(LED, GPIO.LOW)

            # Argument list instead of a shell string: no shell involved.
            call(["sudo", "shutdown", "-h", "now"])
            sys.exit(1)

        self.step += 1

    def operate(self):
        """Central Operation procedure: run one step of the current state."""
        if self.state == State.LOGGING:
            self.logging_mode()
        elif self.state == State.ACTION:
            self.pause_mode()
        elif self.state == State.ALERT:
            self.alert_mode()
        else:
            self.shut_down_mode()

    def close(self):
        """Commit pending datasets and close the database connection.

        Safe to call more than once: it is invoked from shut_down_mode()
        and again from the cleanup of the main loop.
        """
        if not self.con:
            return
        try:
            # Without this commit the datasets inserted since the last
            # regular commit would be rolled back on close.
            self.con.commit()
        finally:
            self.con.close()
            self.con = None


log = Logger()

try:
    while True:
        log.operate()
        # Sleep for a 100 ms
        time.sleep(0.1)
finally:
    print("SQLite: Closing connection")
    log.close()
Kommentare
Kommentar veröffentlichen