Commit 666db27d6e89b4e98f1d42ddce007b5ac0e5caba
1 parent
7b0ebee254
Exists in
main
add safe_bearing_limit config parameter to suppress temperature warning as long …
…as the hottest bearing is below this limit (e.g. for polar mode when tap water is wamer than air)
Showing 2 changed files with 7 additions and 2 deletions Inline Diff
alarmist_config.ini
View file @
666db27
# alarmist config | 1 | 1 | # alarmist config | |
2 | 2 | |||
[influxdb] | 3 | 3 | [influxdb] | |
host = soars-rpi-1.dynamic.ucsd.edu | 4 | 4 | host = soars-rpi-1.dynamic.ucsd.edu | |
port = 8086 | 5 | 5 | port = 8086 | |
dbname = paddle | 6 | 6 | dbname = paddle | |
7 | 7 | |||
[thresholds] | 8 | 8 | [thresholds] | |
coolant = 2 | 9 | 9 | coolant = 2 | |
on_torque = 2 | 10 | 10 | on_torque = 2 | |
moving_torque = 20 | 11 | 11 | moving_torque = 20 | |
12 | # warn about bearing temperature if they are ambient_temperatre_delta above ambient | |||
ambient_temperature_delta = 0.2 | 12 | 13 | ambient_temperature_delta = 0.2 |
ssa/alarm/alarmist.py
View file @
666db27
""" | 1 | 1 | """ | |
alarmist.py | 2 | 2 | alarmist.py | |
3 | 3 | |||
Periodically checks on SOARS state and processes alarms | 4 | 4 | Periodically checks on SOARS state and processes alarms | |
""" | 5 | 5 | """ | |
6 | 6 | |||
import configparser | 7 | 7 | import configparser | |
import collections | 8 | 8 | import collections | |
import influxdb | 9 | 9 | import influxdb | |
import sys | 10 | 10 | import sys | |
import threading | 11 | 11 | import threading | |
import ssa.alarm.lightstack as lightstack | 12 | 12 | import ssa.alarm.lightstack as lightstack | |
13 | 13 | |||
DEBUG = False | 14 | 14 | DEBUG = False | |
15 | 15 | |||
# bearing coolant is measured by gauge A | 16 | 16 | # bearing coolant is measured by gauge A | |
BEARING_COOLANT_QUERY = """ | 17 | 17 | BEARING_COOLANT_QUERY = """ | |
SELECT last(flow_rate) as flow_rate | 18 | 18 | SELECT last(flow_rate) as flow_rate | |
FROM coolant_flow_rate | 19 | 19 | FROM coolant_flow_rate | |
WHERE meter_id='A' | 20 | 20 | WHERE meter_id='A' | |
""" | 21 | 21 | """ | |
# we have to infer if the paddle is moving based on the motor torque | 22 | 22 | # we have to infer if the paddle is moving based on the motor torque | |
PADDLE_MOVING_QUERY = """ | 23 | 23 | PADDLE_MOVING_QUERY = """ | |
SELECT last(value) as value FROM Torque GROUP BY * | 24 | 24 | SELECT last(value) as value FROM Torque GROUP BY * | |
""" | 25 | 25 | """ | |
# figure out the hottest bearing | 26 | 26 | # figure out the hottest bearing | |
MAX_BEARING_TEMPERATURE_QUERY = """ | 27 | 27 | MAX_BEARING_TEMPERATURE_QUERY = """ | |
SELECT max(temperature) as max_temperature FROM ( | 28 | 28 | SELECT max(temperature) as max_temperature FROM ( | |
SELECT last(temperature) as temperature | 29 | 29 | SELECT last(temperature) as temperature | |
FROM paddle_bearings | 30 | 30 | FROM paddle_bearings | |
WHERE location != 'Ambient' | 31 | 31 | WHERE location != 'Ambient' | |
GROUP BY location | 32 | 32 | GROUP BY location | |
) | 33 | 33 | ) | |
""" | 34 | 34 | """ | |
# grab ambient temperature | 35 | 35 | # grab ambient temperature | |
AMBIENT_TEMPERATURE_QUERY = """ | 36 | 36 | AMBIENT_TEMPERATURE_QUERY = """ | |
SELECT last(temperature) as temperature | 37 | 37 | SELECT last(temperature) as temperature | |
FROM paddle_bearings | 38 | 38 | FROM paddle_bearings | |
WHERE location = 'Ambient' | 39 | 39 | WHERE location = 'Ambient' | |
""" | 40 | 40 | """ | |
41 | 41 | |||
# annoying, but we need to map the measurement name and value key from influxdb | 42 | 42 | # annoying, but we need to map the measurement name and value key from influxdb | |
# onto a key in the soars state object | 43 | 43 | # onto a key in the soars state object | |
MeasurementInfo = collections.namedtuple("MeasurementInfo", "name value_key state_key") | 44 | 44 | MeasurementInfo = collections.namedtuple("MeasurementInfo", "name value_key state_key") | |
45 | 45 | |||
46 | 46 | |||
def dprint(str): | 47 | 47 | def dprint(str): | |
if DEBUG: | 48 | 48 | if DEBUG: | |
print(str) | 49 | 49 | print(str) | |
50 | 50 | |||
51 | 51 | |||
class Alarmist: | 52 | 52 | class Alarmist: | |
def __init__(self): | 53 | 53 | def __init__(self): | |
self.config = configparser.ConfigParser() | 54 | 54 | self.config = configparser.ConfigParser() | |
self.config.read("alarmist_config.ini") | 55 | 55 | self.config.read("alarmist_config.ini") | |
self.state_update_event = threading.Event() | 56 | 56 | self.state_update_event = threading.Event() | |
self.soars_state = ThreadsafeState(self.state_update_event) | 57 | 57 | self.soars_state = ThreadsafeState(self.state_update_event) | |
self.lights = lightstack.LightStack() | 58 | 58 | self.lights = lightstack.LightStack() | |
self.threads = list() | 59 | 59 | self.threads = list() | |
self.threads.append( | 60 | 60 | self.threads.append( | |
PeriodicInfluxdbPoller( | 61 | 61 | PeriodicInfluxdbPoller( | |
interval=2, | 62 | 62 | interval=2, | |
soars_state=self.soars_state, | 63 | 63 | soars_state=self.soars_state, | |
query=BEARING_COOLANT_QUERY, | 64 | 64 | query=BEARING_COOLANT_QUERY, | |
measurment_info=MeasurementInfo( | 65 | 65 | measurment_info=MeasurementInfo( | |
"coolant_flow_rate", "flow_rate", "coolant_flow_rate" | 66 | 66 | "coolant_flow_rate", "flow_rate", "coolant_flow_rate" | |
), | 67 | 67 | ), | |
db_config=self.config["influxdb"], | 68 | 68 | db_config=self.config["influxdb"], | |
) | 69 | 69 | ) | |
) | 70 | 70 | ) | |
self.threads.append( | 71 | 71 | self.threads.append( | |
PeriodicInfluxdbPoller( | 72 | 72 | PeriodicInfluxdbPoller( | |
interval=2, | 73 | 73 | interval=2, | |
soars_state=self.soars_state, | 74 | 74 | soars_state=self.soars_state, | |
query=PADDLE_MOVING_QUERY, | 75 | 75 | query=PADDLE_MOVING_QUERY, | |
measurment_info=MeasurementInfo("Torque", "value", "motor_torque"), | 76 | 76 | measurment_info=MeasurementInfo("Torque", "value", "motor_torque"), | |
db_config=self.config["influxdb"], | 77 | 77 | db_config=self.config["influxdb"], | |
) | 78 | 78 | ) | |
) | 79 | 79 | ) | |
self.threads.append( | 80 | 80 | self.threads.append( | |
PeriodicInfluxdbPoller( | 81 | 81 | PeriodicInfluxdbPoller( | |
interval=5, | 82 | 82 | interval=5, | |
soars_state=self.soars_state, | 83 | 83 | soars_state=self.soars_state, | |
query=MAX_BEARING_TEMPERATURE_QUERY, | 84 | 84 | query=MAX_BEARING_TEMPERATURE_QUERY, | |
measurment_info=MeasurementInfo( | 85 | 85 | measurment_info=MeasurementInfo( | |
"paddle_bearings", "max_temperature", "max_temperature" | 86 | 86 | "paddle_bearings", "max_temperature", "max_temperature" | |
), | 87 | 87 | ), | |
db_config=self.config["influxdb"], | 88 | 88 | db_config=self.config["influxdb"], | |
) | 89 | 89 | ) | |
) | 90 | 90 | ) | |
self.threads.append( | 91 | 91 | self.threads.append( | |
PeriodicInfluxdbPoller( | 92 | 92 | PeriodicInfluxdbPoller( | |
interval=5, | 93 | 93 | interval=5, | |
soars_state=self.soars_state, | 94 | 94 | soars_state=self.soars_state, | |
query=AMBIENT_TEMPERATURE_QUERY, | 95 | 95 | query=AMBIENT_TEMPERATURE_QUERY, | |
measurment_info=MeasurementInfo( | 96 | 96 | measurment_info=MeasurementInfo( | |
"paddle_bearings", "temperature", "ambient_temperature" | 97 | 97 | "paddle_bearings", "temperature", "ambient_temperature" | |
), | 98 | 98 | ), | |
db_config=self.config["influxdb"], | 99 | 99 | db_config=self.config["influxdb"], | |
) | 100 | 100 | ) | |
) | 101 | 101 | ) | |
self.event_watcher_thread = threading.Thread(target=self.check_state) | 102 | 102 | self.event_watcher_thread = threading.Thread(target=self.check_state) | |
self.event_watcher_thread_cancel_flag = threading.Event() | 103 | 103 | self.event_watcher_thread_cancel_flag = threading.Event() | |
104 | 104 | |||
def run(self): | 105 | 105 | def run(self): | |
# start polling threads | 106 | 106 | # start polling threads | |
for thread in self.threads: | 107 | 107 | for thread in self.threads: | |
thread.start() | 108 | 108 | thread.start() | |
# start update event watcher | 109 | 109 | # start update event watcher | |
self.event_watcher_thread.start() | 110 | 110 | self.event_watcher_thread.start() | |
111 | 111 | |||
def cleanup(self): | 112 | 112 | def cleanup(self): | |
# cancel polling loops | 113 | 113 | # cancel polling loops | |
for thread in self.threads: | 114 | 114 | for thread in self.threads: | |
thread.cancel() | 115 | 115 | thread.cancel() | |
# cancel the event watcher thread | 116 | 116 | # cancel the event watcher thread | |
self.event_watcher_thread_cancel_flag.set() | 117 | 117 | self.event_watcher_thread_cancel_flag.set() | |
self.state_update_event.set() | 118 | 118 | self.state_update_event.set() | |
# wait for threads to stop | 119 | 119 | # wait for threads to stop | |
self.join(timeout=1.0) | 120 | 120 | self.join(timeout=1.0) | |
# finally, call cleanup on lights | 121 | 121 | # finally, call cleanup on lights | |
self.lights.cleanup() | 122 | 122 | self.lights.cleanup() | |
123 | 123 | |||
def join(self, timeout=None): | 124 | 124 | def join(self, timeout=None): | |
for thread in self.threads: | 125 | 125 | for thread in self.threads: | |
thread.join(timeout=timeout) | 126 | 126 | thread.join(timeout=timeout) | |
self.event_watcher_thread.join(timeout=timeout) | 127 | 127 | self.event_watcher_thread.join(timeout=timeout) | |
128 | 128 | |||
def check_state(self): | 129 | 129 | def check_state(self): | |
# TODO - create an influxdb client here and log alarm states | 130 | 130 | # TODO - create an influxdb client here and log alarm states | |
131 | 131 | |||
# this method gets run in its own thread, so loop indefinitely, | 132 | 132 | # this method gets run in its own thread, so loop indefinitely, | |
# waking up whenever the state_update_event flag is set | 133 | 133 | # waking up whenever the state_update_event flag is set | |
while self.state_update_event.wait(timeout=None): | 134 | 134 | while self.state_update_event.wait(timeout=None): | |
# exit thread loop if the cancel flag is set | 135 | 135 | # exit thread loop if the cancel flag is set | |
if self.event_watcher_thread_cancel_flag.is_set(): | 136 | 136 | if self.event_watcher_thread_cancel_flag.is_set(): | |
break | 137 | 137 | break | |
# | 138 | 138 | # | |
# update state values that depend on db values | 139 | 139 | # update state values that depend on db values | |
# | 140 | 140 | # | |
# check if paddle is running | 141 | 141 | # check if paddle is running | |
(torque, _) = self.soars_state.get( | 142 | 142 | (torque, _) = self.soars_state.get( | |
"motor_torque", default=(0.0, None), lookback=0 | 143 | 143 | "motor_torque", default=(0.0, None), lookback=0 | |
) | 144 | 144 | ) | |
(previous_torque, _) = self.soars_state.get( | 145 | 145 | (previous_torque, _) = self.soars_state.get( | |
"motor_torque", default=(0.0, None), lookback=1 | 146 | 146 | "motor_torque", default=(0.0, None), lookback=1 | |
) | 147 | 147 | ) | |
(previouser_torque, _) = self.soars_state.get( | 148 | 148 | (previouser_torque, _) = self.soars_state.get( | |
"motor_torque", default=(0.0, None), lookback=2 | 149 | 149 | "motor_torque", default=(0.0, None), lookback=2 | |
) | 150 | 150 | ) | |
paddle_on = torque > float(self.config["thresholds"]["on_torque"]) | 151 | 151 | paddle_on = torque > float(self.config["thresholds"]["on_torque"]) | |
# Check three last points to see if paddle is moving | 152 | 152 | # Check three last points to see if paddle is moving | |
paddle_moving = paddle_on and ( | 153 | 153 | paddle_moving = paddle_on and ( | |
(torque > float(self.config["thresholds"]["moving_torque"])) | 154 | 154 | (torque > float(self.config["thresholds"]["moving_torque"])) | |
or (torque != previous_torque or previous_torque != previouser_torque) | 155 | 155 | or (torque != previous_torque or previous_torque != previouser_torque) | |
) | 156 | 156 | ) | |
# check if coolant is flowing | 157 | 157 | # check if coolant is flowing | |
(coolant_flow_rate, _) = self.soars_state.get( | 158 | 158 | (coolant_flow_rate, _) = self.soars_state.get( | |
"coolant_flow_rate", (0.0, None) | 159 | 159 | "coolant_flow_rate", (0.0, None) | |
) | 160 | 160 | ) | |
coolant_on = coolant_flow_rate > float(self.config["thresholds"]["coolant"]) | 161 | 161 | coolant_on = coolant_flow_rate > float(self.config["thresholds"]["coolant"]) | |
162 | 162 | |||
(ambient, _) = self.soars_state.get("ambient_temperature", (-1, None)) | 163 | 163 | (ambient, _) = self.soars_state.get("ambient_temperature", (-1, None)) | |
(max_bearing, _) = self.soars_state.get("max_temperature", (-1, None)) | 164 | 164 | (max_bearing, _) = self.soars_state.get("max_temperature", (-1, None)) | |
ambient_delta = float( | 165 | 165 | ambient_delta = float( | |
self.config["thresholds"]["ambient_temperature_delta"] | 166 | 166 | self.config["thresholds"]["ambient_temperature_delta"] | |
) | 167 | 167 | ) | |
bearing_temperature_warning = False | 168 | 168 | bearing_temperature_warning = False | |
if ambient > 0 and max_bearing > 0: | 169 | 169 | if ambient > 0 and max_bearing > 0: | |
if max_bearing > ambient + ambient_delta: | 170 | 170 | if (max_bearing > ambient + ambient_delta) and ( | |
171 | max_bearing > float(self.config["thresholds"]["safe_bearing_limit"]) | |||
172 | ): | |||
bearing_temperature_warning = True | 171 | 173 | bearing_temperature_warning = True | |
172 | 174 | |||
dprint( | 173 | 175 | dprint( | |
f"State: paddle_on: {paddle_on}, paddle_moving: {paddle_moving}, coolant_on: {coolant_on}" | 174 | 176 | f"State: paddle_on: {paddle_on}, paddle_moving: {paddle_moving}, coolant_on: {coolant_on}" | |
) | 175 | 177 | ) | |
dprint(f"Ambient temperature: {ambient}C Hottest bearing: {max_bearing}C") | 176 | 178 | dprint(f"Ambient temperature: {ambient}C Hottest bearing: {max_bearing}C") | |
177 | 179 | |||
# | 178 | 180 | # | |
# run checks to throw alarms | 179 | 181 | # run checks to throw alarms | |
# | 180 | 182 | # | |
warning = False | 181 | 183 | warning = False | |
alarm = False | 182 | 184 | alarm = False | |
# is paddle running? (solid white, no problems) | 183 | 185 | # is paddle running? (solid white, no problems) | |
if paddle_on: | 184 | 186 | if paddle_on: | |
self.lights.on(lightstack.Channel.White) | 185 | 187 | self.lights.on(lightstack.Channel.White) | |
else: | 186 | 188 | else: | |
self.lights.off(lightstack.Channel.White) | 187 | 189 | self.lights.off(lightstack.Channel.White) | |
# is paddle moving? (solid green, no problems) | 188 | 190 | # is paddle moving? (solid green, no problems) | |
if paddle_moving: | 189 | 191 | if paddle_moving: | |
self.lights.on(lightstack.Channel.Green) | 190 | 192 | self.lights.on(lightstack.Channel.Green) | |
else: | 191 | 193 | else: | |
self.lights.off(lightstack.Channel.Green) | 192 | 194 | self.lights.off(lightstack.Channel.Green) | |
# is water running? (solid blue, no problems) | 193 | 195 | # is water running? (solid blue, no problems) | |
if coolant_on: | 194 | 196 | if coolant_on: | |
self.lights.on(lightstack.Channel.Blue) | 195 | 197 | self.lights.on(lightstack.Channel.Blue) | |
else: | 196 | 198 | else: | |
self.lights.off(lightstack.Channel.Blue) | 197 | 199 | self.lights.off(lightstack.Channel.Blue) | |
# is paddle moving without water? (alarm state and flash blue) | 198 | 200 | # is paddle moving without water? (alarm state and flash blue) | |
if paddle_moving and not coolant_on: | 199 | 201 | if paddle_moving and not coolant_on: | |
self.lights.start_blink(0.5, lightstack.Channel.Blue) | 200 | 202 | self.lights.start_blink(0.5, lightstack.Channel.Blue) | |
alarm = True | 201 | 203 | alarm = True | |
else: | 202 | 204 | else: | |
self.lights.stop_blink(lightstack.Channel.Blue) | 203 | 205 | self.lights.stop_blink(lightstack.Channel.Blue) | |
# is the hottest bearing above ambient? (warning) | 204 | 206 | # is the hottest bearing above ambient? (warning) | |
if bearing_temperature_warning: | 205 | 207 | if bearing_temperature_warning: | |
warning = True | 206 | 208 | warning = True | |
# TODO - piston temps > 100F? (flash yellow w/ buzzer) | 207 | 209 | # TODO - piston temps > 100F? (flash yellow w/ buzzer) | |
208 | 210 | |||
# did any warnings get thrown? (turn on yellow flashing) | 209 | 211 | # did any warnings get thrown? (turn on yellow flashing) | |
if warning: | 210 | 212 | if warning: | |
self.lights.start_blink(0.5, lightstack.Channel.Yellow) | 211 | 213 | self.lights.start_blink(0.5, lightstack.Channel.Yellow) | |
else: | 212 | 214 | else: | |
self.lights.stop_blink(lightstack.Channel.Yellow) | 213 | 215 | self.lights.stop_blink(lightstack.Channel.Yellow) | |
214 | 216 | |||
# | 215 | 217 | # | |
# did any alarm want to turn on the buzzer? | 216 | 218 | # did any alarm want to turn on the buzzer? | |
# | 217 | 219 | # | |
if alarm: | 218 | 220 | if alarm: | |
self.lights.start_blink(0.3, lightstack.Channel.Red) | 219 | 221 | self.lights.start_blink(0.3, lightstack.Channel.Red) | |
self.lights.start_blink(0.3, lightstack.Channel.Buzzer) | 220 | 222 | self.lights.start_blink(0.3, lightstack.Channel.Buzzer) | |
else: | 221 | 223 | else: | |
self.lights.stop_blink(lightstack.Channel.Red) | 222 | 224 | self.lights.stop_blink(lightstack.Channel.Red) | |
self.lights.off(lightstack.Channel.Red) | 223 | 225 | self.lights.off(lightstack.Channel.Red) | |
self.lights.stop_blink(lightstack.Channel.Buzzer) | 224 | 226 | self.lights.stop_blink(lightstack.Channel.Buzzer) | |
self.lights.off(lightstack.Channel.Buzzer) | 225 | 227 | self.lights.off(lightstack.Channel.Buzzer) | |
226 | 228 | |||
# clear event flag | 227 | 229 | # clear event flag | |
self.state_update_event.clear() | 228 | 230 | self.state_update_event.clear() | |
229 | 231 | |||
230 | 232 | |||
class RingBuffer: | 231 | 233 | class RingBuffer: | |
def __init__(self, size): | 232 | 234 | def __init__(self, size): | |
self.size = size | 233 | 235 | self.size = size | |
self.data = [None] * size | 234 | 236 | self.data = [None] * size | |
self.head = -1 | 235 | 237 | self.head = -1 | |
236 | 238 | |||
def __getitem__(self, index=0): | 237 | 239 | def __getitem__(self, index=0): | |
if index >= self.size: | 238 | 240 | if index >= self.size: | |
raise Exception(f"Index out of bounds (index: {index}, size: {self.size})") | 239 | 241 | raise Exception(f"Index out of bounds (index: {index}, size: {self.size})") | |
internal_index = (self.head - index) % self.size | 240 | 242 | internal_index = (self.head - index) % self.size | |
return self.data[internal_index] | 241 | 243 | return self.data[internal_index] | |
242 | 244 | |||
def get(self, index=0, default=None): | 243 | 245 | def get(self, index=0, default=None): | |
val = self[index] | 244 | 246 | val = self[index] | |
return val if val is not None else default | 245 | 247 | return val if val is not None else default | |
246 | 248 | |||
def push(self, item): | 247 | 249 | def push(self, item): | |
self.head = (self.head + 1) % self.size | 248 | 250 | self.head = (self.head + 1) % self.size | |
self.data[self.head] = item | 249 | 251 | self.data[self.head] = item | |
250 | 252 | |||
251 | 253 | |||
# global state object protected by a lock | 252 | 254 | # global state object protected by a lock | |
# only update values if the new timestamp differs from the most recent item's timestamp | 253 | 255 | # only update values if the new timestamp differs from the most recent item's timestamp | |
# store history of past [lookback] items | 254 | 256 | # store history of past [lookback] items | |
class ThreadsafeState: | 255 | 257 | class ThreadsafeState: | |
def __init__(self, update_event, lookback=5): | 256 | 258 | def __init__(self, update_event, lookback=5): | |
self.lookback = lookback | 257 | 259 | self.lookback = lookback | |
self.state = dict() | 258 | 260 | self.state = dict() | |
self.lock = threading.RLock() | 259 | 261 | self.lock = threading.RLock() | |
self.update_event = update_event | 260 | 262 | self.update_event = update_event | |
261 | 263 | |||
def set(self, key, value, time): | 262 | 264 | def set(self, key, value, time): | |
with self.lock: | 263 | 265 | with self.lock: | |
if self.state.get(key, None) is None: | 264 | 266 | if self.state.get(key, None) is None: | |
self._init_entry(key) | 265 | 267 | self._init_entry(key) | |
history = self.state.get(key) | 266 | 268 | history = self.state.get(key) | |
item = history.get() | 267 | 269 | item = history.get() | |
if item is None: | 268 | 270 | if item is None: | |
history.push((value, time)) | 269 | 271 | history.push((value, time)) | |
self.update_event.set() | 270 | 272 | self.update_event.set() | |
else: | 271 | 273 | else: | |
(_, previous_time) = item | 272 | 274 | (_, previous_time) = item | |
if time != previous_time: | 273 | 275 | if time != previous_time: | |
history.push((value, time)) | 274 | 276 | history.push((value, time)) | |
self.update_event.set() | 275 | 277 | self.update_event.set() | |
dprint(f"ThreadsafeState: set: {key}: {value} ({time})") | 276 | 278 | dprint(f"ThreadsafeState: set: {key}: {value} ({time})") | |
277 | 279 | |||
def _init_entry(self, key): | 278 | 280 | def _init_entry(self, key): | |
with self.lock: | 279 | 281 | with self.lock: | |
self.state[key] = RingBuffer(self.lookback) | 280 | 282 | self.state[key] = RingBuffer(self.lookback) | |
281 | 283 | |||
def get(self, key, default=None, lookback=0): | 282 | 284 | def get(self, key, default=None, lookback=0): | |
with self.lock: | 283 | 285 | with self.lock: | |
history = self.state.get(key, None) | 284 | 286 | history = self.state.get(key, None) | |
if history is None: | 285 | 287 | if history is None: | |
return default | 286 | 288 | return default | |
else: | 287 | 289 | else: | |
return history.get(index=lookback, default=default) | 288 | 290 | return history.get(index=lookback, default=default) | |
289 | 291 | |||
290 | 292 | |||
# extend the Timer thread object to run repeatedly. Subclass this. | 291 | 293 | # extend the Timer thread object to run repeatedly. Subclass this. | |
class RepeatedTimer(threading.Timer): | 292 | 294 | class RepeatedTimer(threading.Timer): | |
def __init__(self, interval): | 293 | 295 | def __init__(self, interval): | |
# Timer constructer expects a callback function, but we don't care | 294 | 296 | # Timer constructer expects a callback function, but we don't care | |
# about that, so just pass None | 295 | 297 | # about that, so just pass None | |
super(RepeatedTimer, self).__init__(interval, None) | 296 | 298 | super(RepeatedTimer, self).__init__(interval, None) | |
297 | 299 | |||
# override Timer's run method to loop on our injest_data function | 298 | 300 | # override Timer's run method to loop on our injest_data function | |
# wait will block while loop for timeout interval | 299 | 301 | # wait will block while loop for timeout interval | |
# calling timer.cancel() sets the finish flag and exits the loop | 300 | 302 | # calling timer.cancel() sets the finish flag and exits the loop | |
def run(self): | 301 | 303 | def run(self): | |
while not self.finished.wait(timeout=self.interval): | 302 | 304 | while not self.finished.wait(timeout=self.interval): | |
self.do_work() | 303 | 305 | self.do_work() | |
304 | 306 | |||
def do_work(self): | 305 | 307 | def do_work(self): | |
raise Exception( | 306 | 308 | raise Exception( | |
"RepeatedTimer baseclass do_work() called. Override this method in a subclass" | 307 | 309 | "RepeatedTimer baseclass do_work() called. Override this method in a subclass" | |
) | 308 | 310 | ) | |
309 | 311 | |||
310 | 312 | |||
# makes a [query] to influxdb every [interval] | 311 | 313 | # makes a [query] to influxdb every [interval] | |
# influxdb returns a resultset object: | 312 | 314 | # influxdb returns a resultset object: | |
# https://influxdb-python.readthedocs.io/en/latest/resultset.html#resultset | 313 | 315 | # https://influxdb-python.readthedocs.io/en/latest/resultset.html#resultset | |
class PeriodicInfluxdbPoller(RepeatedTimer): | 314 | 316 | class PeriodicInfluxdbPoller(RepeatedTimer): | |
def __init__(self, interval, soars_state, query, measurment_info, db_config): | 315 | 317 | def __init__(self, interval, soars_state, query, measurment_info, db_config): | |
super(PeriodicInfluxdbPoller, self).__init__(interval) | 316 | 318 | super(PeriodicInfluxdbPoller, self).__init__(interval) |