# Copyright (C) 2020 Alex Sonea
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import threading
import time
import logging
from ..utils import check_type, check_not_empty
logger = logging.getLogger(__name__)
[docs]class BaseThread():
"""Implements a class that wraps a processing logic that is executed
in a separate thread with the ability to pause / resume or fully stop
the task.
The main processing should be implemented in the `run` method where the
subclass should make sure that it checks periodically the status
(`paused` or `stopped`) and behave appropriately. The `run` can
be flanked by the `setup` and `teardown` methods where subclasses can
implement logic needed before the main processing is started or finished.
This becomes very handy for loops that normally prepare the work, then
run for an indefinite time, and later are closed when the owner signals.
Parameters
----------
name: str
The name of the thread.
patience: float
A duration in seconds that the main thread will wait for the
background thread to finish setup activities and indicate that it
is in ``started`` mode.
"""
[docs] def __init__(self, name='THREAD', patience=1.0):
# name should have been checked by the robot
self.__name = name
check_not_empty(patience, 'patience', 'thread', self.name, logger)
check_type(patience, (float, int), 'thread', self.name, logger)
self.__patience = patience
self.__started = threading.Event()
self.__paused = threading.Event()
self.__crashed = False
self.__thread = None
@property
def name(self):
"""Returns the name of the thread."""
return self.__name
[docs] def setup(self):
"""Thread preparation before running. Subclasses should override"""
pass
[docs] def run(self):
""" Run method of the thread.
.. note: In order to be stoppable (resp. pausable), this method has
to check the running property - as often as possible to improve
responsiveness - and terminate when :meth:`should_stop` (resp.
:py:meth:`should_pause`) becomes True.
For instance::
while <some condition for work>:
if not self.paused:
do_atom_work()
if self.stopped:
break
...
"""
raise NotImplementedError
[docs] def teardown(self):
"""Thread cleanup. Subclasses should override."""
pass
@property
def started(self):
"""Indicates if the thread was started."""
return self.__started.is_set()
@property
def stopped(self):
"""Indicates if the thread was stopped."""
return not self.__started.is_set()
@property
def running(self):
"""Indicates if the thread is running."""
return self.__started.is_set() and not self.__paused.is_set()
@property
def paused(self):
"""Indicates the thread was paused."""
return self.__started.is_set() and self.__paused.is_set()
def _wrapped_target(self):
"""Wraps the execution of the task between the setup() and
teardown() and sets / resets the events."""
try:
self.setup()
self.__started.set()
self.__paused.clear()
self.run()
self.__started.clear()
self.teardown()
except Exception:
self.__crashed = True
self.__started.clear()
self.__paused.clear()
raise
[docs] def start(self, wait=True):
"""Starts the task in it's own thread."""
logger.info(f'Start requested for "{self.name}"')
if self.running:
logger.info(f'"{self.name}" already running. Stopping first.')
self.stop()
self.__thread = threading.Thread(target=self._wrapped_target)
self.__thread.daemon = True
self.__thread.name = self.name
logger.info(f'"{self.name}" starting')
self.__thread.start()
if wait and (threading.current_thread() != self.__thread):
if not self.__started.wait(self.__patience):
if self.__crashed:
mess = f'Setup failed, for "{self.__thread.name}".'
else:
mess = f'Setup took longer than {self.__patience}s ' + \
f'for "{self.__thread.name}"'
self.__thread.join()
logger.critical(mess)
raise RuntimeError(mess)
logger.info(f'"{self.name}" successfully started')
[docs] def stop(self, wait=True):
"""Sends the stopping signal to the thread. By default waits for
the thread to finish."""
logger.info(f'Stop requested for "{self.name}"')
if self.started:
self.__started.clear()
self.__paused.clear()
logger.info(f'"{self.name}" stopping')
if wait and (threading.current_thread() != self.__thread):
while self.__thread.is_alive():
self.__started.clear()
self.__thread.join(timeout=1.0)
logger.info(f'"{self.name}" successfully stopped')
else:
logger.info(f'"{self.name}" is not running; nothing to do')
[docs] def pause(self):
"""Requests the thread to pause."""
logger.info(f'Pause requested for "{self.name}"')
if self.running:
self.__paused.set()
logger.info(f'"{self.name}" paused')
logger.info(f'"{self.name}" is not running; nothing to do')
[docs] def resume(self):
"""Requests the thread to resume."""
logger.info(f'Resume requested for "{self.name}"')
if self.paused:
self.__paused.clear()
logger.info(f'"{self.name}" resumed')
logger.info(f'"{self.name}" is not paused; nothing to do')
[docs]class BaseLoop(BaseThread):
"""This is a thread that executes in a separate thread, scheduling
a certain atomic work (encapsulated in the `atomic` method) periodically
as prescribed by the `frequency` parameter. The `run` method takes care
of checking the flags for `paused` and `stopped` so there is no need
to do this in the `atomic` method.
Parameters
----------
name: str
The name of the loop
patience: float
A duration in seconds that the main thread will wait for the
background thread to finish setup activities and indicate that it
is in ``started`` mode.
frequency: float
The loop frequency in [Hz]
warning: float
Indicates a threshold in range [0..1] indicating when
warnings should be logged to the logger in case the execution
frequency is bellow the target. A 0.8 value indicates the real
execution is less than 0.8 * target_frequency. The statistic is
calculated over a period of time specified by the parameter `review`.
throttle: float
Is a float (< 1.0) that is used by the monitoring of
execution statistics to adjust the wait time in order to produce
the desired processing frequency.
review: float
The time in [s] to calculate the statistics for the frequency.
Raises
------
KeyError and ValueError if provided data in the initialization
dictionary are incorrect or missing.
"""
[docs] def __init__(self, name='BASELOOP', patience=1.0, frequency=None,
warning=0.90, throttle=0.1, review=1.0):
super().__init__(name=name, patience=patience)
check_not_empty(frequency, 'frequency', 'loop', self.name, logger)
check_type(frequency, float, 'loop', self.name, logger)
self.__frequency = frequency
self.__actual_frequency = frequency
self.__period = 1.0 / self.__frequency
check_not_empty(warning, 'warning', 'loop', self.name, logger)
check_type(warning, float, 'loop', self.name, logger)
self.__warning = warning
check_not_empty(throttle, 'throttle', 'loop', self.name, logger)
check_type(throttle, float, 'loop', self.name, logger)
self.__throttle = throttle
check_not_empty(review, 'review', 'loop', self.name, logger)
check_type(review, float, 'loop', self.name, logger)
self.__review = review
# to keep statistics
self.__exec_counts = 0
self.__last_count_reset = None
# keeps score of processing and errors
self.__errors = 0
self.__processed = 0
self.__err_stat = (0, 0, 0)
@property
def frequency(self):
"""Loop frequency."""
return self.__frequency
@property
def actual_frequency(self):
"""Returns the actual running frequency that is calculated
by statistics."""
return self.__actual_frequency
@property
def period(self):
"""Loop period = 1 / frequency."""
return self.__period
@property
def warning(self):
"""Control the warning level for the warning message, the **setter**
is smart: if the value is larger than 2 it will assume it is a
percentage and divied it by 100 and ignore if the number is higher
than 110.
The over 100 is available for testing purposes.
"""
return self.__warning
@property
def review(self):
"""Indicates the amount of time in seconds before the thread will
review the actual frequency against the target and take action."""
return self.__review
@warning.setter
def warning(self, value):
if value < 2.0:
self.__warning = value
elif value <= 110:
self.__warning = value / 100.0
@property
def errors(self):
"""Returns the number of errors logged by the statistics."""
return self.__errors
@property
def processed(self):
"""Returns the number of items processed in the current statistics.
The items processed depends on the loop and might be different
from the number of loops executed. For example the one execution
loop might include several communication packets."""
return self.__processed
[docs] def inc_errors(self):
"""Used by subclasses to increment the number of errors."""
self.__errors += 1
[docs] def inc_processed(self):
"""Used by subclasses to increment the number of processed items."""
self.__processed += 1
@property
def error_stat(self):
"""Returns the error statistics as a tuple: (error in %, total errors,
total items)."""
return self.__err_stat
[docs] def run(self):
exec_counts = 0
last_count_reset = time.time()
# adjust = 0.0 # fine adjust the rate
while not self.stopped:
if self.paused:
# paused; reset the statistics
exec_counts = 0
self.__errors = 0
self.__processed = 0
last_count_reset = time.time()
time.sleep(self.period)
else:
start_time = time.time()
self.atomic()
end_time = time.time()
wait_time = self.__period - (end_time - start_time)
if wait_time > 0:
time.sleep(wait_time)
# else:
# logger.debug(f'Loop "{self.name}" took longer to run '
# f'{end_time - start_time:.5f} than '
# f'loop period {self.period:.5f}; check')
# statistics:
exec_counts += 1
if exec_counts >= self.__frequency * self.__review:
exec_time = time.time() - last_count_reset
# actual_freq = exec_counts / exec_time
self.__actual_frequency = exec_counts / exec_time
# rate = actual_freq / self.__frequency
# diff = self.__period - exec_time / exec_counts
# # fine tune the frequency
# adjust += diff * self.__throttle
# if adjust < - self.__period:
# adjust = - self.__period
# if actual_freq < (self.__frequency * self.__warning):
# logger.debug(
# f'Loop "{self.name}" running under '
# f'warning threshold at {actual_freq:.2f}[Hz] '
# f'({rate*100:.0f}%)')
# reset counters
exec_counts = 0
if self.__processed > 0:
rate = self.__errors / self.__processed * 100.0
else:
rate = 0.0
self.__err_stat = (rate, self.__errors, self.__processed)
self.__errors = 0
self.__processed = 0
last_count_reset = time.time()
[docs] def atomic(self):
"""This method implements the periodic task that needs to be
executed. It does not need to check `paused` or `stopped` as the
`run` method does this already and the subclasses should make sure
that the implementation completes quickly and does not raise any
exceptions.
"""
raise NotImplementedError