Commit 6cd96adadaa3a799491bea7ecbf45b7858e67516
1 parent
c27f3d786e
Exists in
main
Revert "always try to turn off a channel when stopping blink even if there is no running thread"
This reverts commit c27f3d786e5b6d00b8c300d20f5e25bf1ef655af.
Showing 1 changed file with 1 additions and 1 deletions Inline Diff
ssa/alarm/lightstack.py
View file @
6cd96ad
""" | 1 | 1 | """ | |
lightstack.py | 2 | 2 | lightstack.py | |
Control program for buzzer and 5-light stack. | 3 | 3 | Control program for buzzer and 5-light stack. | |
4 | 4 | |||
LTA | 5 | 5 | LTA | |
""" | 6 | 6 | """ | |
import threading | 7 | 7 | import threading | |
import functools | 8 | 8 | import functools | |
import os | 9 | 9 | import os | |
10 | 10 | |||
TEST_MODE = True if os.environ.get("TEST_GPIO") == "True" else False | 11 | 11 | TEST_MODE = True if os.environ.get("TEST_GPIO") == "True" else False | |
12 | 12 | |||
if TEST_MODE == True: | 13 | 13 | if TEST_MODE == True: | |
import fakeGPIO as GPIO | 14 | 14 | import fakeGPIO as GPIO | |
else: | 15 | 15 | else: | |
import RPi.GPIO as GPIO | 16 | 16 | import RPi.GPIO as GPIO | |
17 | 17 | |||
18 | 18 | |||
class Channel: | 19 | 19 | class Channel: | |
# mappings for channel name (e.g. "Buzzer", "Red", "Green") to pin | 20 | 20 | # mappings for channel name (e.g. "Buzzer", "Red", "Green") to pin | |
# number using the BCM number (e.g. GPIO 17, as opposed to BOARD | 21 | 21 | # number using the BCM number (e.g. GPIO 17, as opposed to BOARD | |
# header numbering) | 22 | 22 | # header numbering) | |
Buzzer = 17 | 23 | 23 | Buzzer = 17 | |
White = 27 | 24 | 24 | White = 27 | |
Blue = 22 | 25 | 25 | Blue = 22 | |
Green = 23 | 26 | 26 | Green = 23 | |
Yellow = 24 | 27 | 27 | Yellow = 24 | |
Red = 25 | 28 | 28 | Red = 25 | |
29 | 29 | |||
30 | 30 | |||
# decorator function | 31 | 31 | # decorator function | |
# A lot of complication just so that we can decorate | 32 | 32 | # A lot of complication just so that we can decorate | |
# other class methods with @_with_lock so that it is | 33 | 33 | # other class methods with @_with_lock so that it is | |
# guarded by self.lock | 34 | 34 | # guarded by self.lock | |
def _with_lock(func): | 35 | 35 | def _with_lock(func): | |
@functools.wraps(func) | 36 | 36 | @functools.wraps(func) | |
def wrapper(self, *args): | 37 | 37 | def wrapper(self, *args): | |
with self.lock: | 38 | 38 | with self.lock: | |
func(self, *args) | 39 | 39 | func(self, *args) | |
40 | 40 | |||
return wrapper | 41 | 41 | return wrapper | |
42 | 42 | |||
43 | 43 | |||
class LightStack: | 44 | 44 | class LightStack: | |
ON = GPIO.HIGH | 45 | 45 | ON = GPIO.HIGH | |
OFF = GPIO.LOW | 46 | 46 | OFF = GPIO.LOW | |
47 | 47 | |||
def __init__(self): | 48 | 48 | def __init__(self): | |
# use BCM pin numbering | 49 | 49 | # use BCM pin numbering | |
GPIO.setmode(GPIO.BCM) | 50 | 50 | GPIO.setmode(GPIO.BCM) | |
# I guess make this threadsafe | 51 | 51 | # I guess make this threadsafe | |
self.lock = threading.RLock() | 52 | 52 | self.lock = threading.RLock() | |
# dicts to store state and threads | 53 | 53 | # dicts to store state and threads | |
self.channel_states = dict() | 54 | 54 | self.channel_states = dict() | |
self.channel_threads = dict() | 55 | 55 | self.channel_threads = dict() | |
# initialize states | 56 | 56 | # initialize states | |
for channel in LightStack.enumerate_channels(): | 57 | 57 | for channel in LightStack.enumerate_channels(): | |
print(f"setup channel {channel}") | 58 | 58 | print(f"setup channel {channel}") | |
GPIO.setup(channel, GPIO.OUT) | 59 | 59 | GPIO.setup(channel, GPIO.OUT) | |
GPIO.output(channel, GPIO.LOW) | 60 | 60 | GPIO.output(channel, GPIO.LOW) | |
self.channel_states[channel] = LightStack.OFF | 61 | 61 | self.channel_states[channel] = LightStack.OFF | |
print("LighStack initialized") | 62 | 62 | print("LighStack initialized") | |
63 | 63 | |||
@_with_lock | 64 | 64 | @_with_lock | |
def on(self, channel): | 65 | 65 | def on(self, channel): | |
GPIO.output(channel, LightStack.ON) | 66 | 66 | GPIO.output(channel, LightStack.ON) | |
self.channel_states[channel] = LightStack.ON | 67 | 67 | self.channel_states[channel] = LightStack.ON | |
68 | 68 | |||
@_with_lock | 69 | 69 | @_with_lock | |
def off(self, channel): | 70 | 70 | def off(self, channel): | |
GPIO.output(channel, LightStack.OFF) | 71 | 71 | GPIO.output(channel, LightStack.OFF) | |
self.channel_states[channel] = LightStack.OFF | 72 | 72 | self.channel_states[channel] = LightStack.OFF | |
73 | 73 | |||
@_with_lock | 74 | 74 | @_with_lock | |
def toggle(self, channel): | 75 | 75 | def toggle(self, channel): | |
if self.channel_states[channel] == LightStack.OFF: | 76 | 76 | if self.channel_states[channel] == LightStack.OFF: | |
self.on(channel) | 77 | 77 | self.on(channel) | |
else: | 78 | 78 | else: | |
self.off(channel) | 79 | 79 | self.off(channel) | |
80 | 80 | |||
@_with_lock | 81 | 81 | @_with_lock | |
def start_blink(self, interval, channel): | 82 | 82 | def start_blink(self, interval, channel): | |
if self.channel_threads.get(channel, None) is None: | 83 | 83 | if self.channel_threads.get(channel, None) is None: | |
new_thread = _Blinker(interval, self, channel) | 84 | 84 | new_thread = _Blinker(interval, self, channel) | |
self.channel_threads[channel] = new_thread | 85 | 85 | self.channel_threads[channel] = new_thread | |
new_thread.start() | 86 | 86 | new_thread.start() | |
87 | 87 | |||
@_with_lock | 88 | 88 | @_with_lock | |
def stop_blink(self, channel): | 89 | 89 | def stop_blink(self, channel): | |
thread = self.channel_threads.get(channel, None) | 90 | 90 | thread = self.channel_threads.get(channel, None) | |
if thread is not None: | 91 | 91 | if thread is not None: | |
thread.cancel() | 92 | 92 | thread.cancel() | |
thread.join() | 93 | 93 | thread.join() | |
self.channel_threads[channel] = None | 94 | 94 | self.channel_threads[channel] = None | |
self.off(channel) | 95 | 95 | self.off(channel) | |
96 | 96 | |||
def get(self, channel): | 97 | 97 | def get(self, channel): | |
return self.channel_states.get(channel, None) | 98 | 98 | return self.channel_states.get(channel, None) | |
99 | 99 | |||
@staticmethod | 100 | 100 | @staticmethod | |
def enumerate_channels(): | 101 | 101 | def enumerate_channels(): | |
for channel_name in dir(Channel): | 102 | 102 | for channel_name in dir(Channel): | |
if channel_name[0] != "_": | 103 | 103 | if channel_name[0] != "_": | |
yield getattr(Channel, channel_name) | 104 | 104 | yield getattr(Channel, channel_name) | |
105 | 105 |