Source code for roboglia.base.thread

# Copyright (C) 2020  Alex Sonea

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import threading
import time
import logging

from ..utils import check_type, check_not_empty

logger = logging.getLogger(__name__)


[docs]class BaseThread(): """Implements a class that wraps a processing logic that is executed in a separate thread with the ability to pause / resume or fully stop the task. The main processing should be implemented in the `run` method where the subclass should make sure that it checks periodically the status (`paused` or `stopped`) and behave appropriately. The `run` can be flanked by the `setup` and `teardown` methods where subclasses can implement logic needed before the main processing is started or finished. This becomes very handy for loops that normally prepare the work, then run for an indefinite time, and later are closed when the owner signals. Parameters ---------- name: str The name of the thread. patience: float A duration in seconds that the main thread will wait for the background thread to finish setup activities and indicate that it is in ``started`` mode. """
[docs] def __init__(self, name='THREAD', patience=1.0): # name should have been checked by the robot self.__name = name check_not_empty(patience, 'patience', 'thread', self.name, logger) check_type(patience, (float, int), 'thread', self.name, logger) self.__patience = patience self.__started = threading.Event() self.__paused = threading.Event() self.__crashed = False self.__thread = None
@property def name(self): """Returns the name of the thread.""" return self.__name
[docs] def setup(self): """Thread preparation before running. Subclasses should override""" pass
[docs] def run(self): """ Run method of the thread. .. note: In order to be stoppable (resp. pausable), this method has to check the running property - as often as possible to improve responsiveness - and terminate when :meth:`should_stop` (resp. :py:meth:`should_pause`) becomes True. For instance:: while <some condition for work>: if not self.paused: do_atom_work() if self.stopped: break ... """ raise NotImplementedError
[docs] def teardown(self): """Thread cleanup. Subclasses should override.""" pass
@property def started(self): """Indicates if the thread was started.""" return self.__started.is_set() @property def stopped(self): """Indicates if the thread was stopped.""" return not self.__started.is_set() @property def running(self): """Indicates if the thread is running.""" return self.__started.is_set() and not self.__paused.is_set() @property def paused(self): """Indicates the thread was paused.""" return self.__started.is_set() and self.__paused.is_set() def _wrapped_target(self): """Wraps the execution of the task between the setup() and teardown() and sets / resets the events.""" try: self.setup() self.__started.set() self.__paused.clear() self.run() self.__started.clear() self.teardown() except Exception: self.__crashed = True self.__started.clear() self.__paused.clear() raise
[docs] def start(self, wait=True): """Starts the task in it's own thread.""" logger.info(f'Start requested for "{self.name}"') if self.running: logger.info(f'"{self.name}" already running. Stopping first.') self.stop() self.__thread = threading.Thread(target=self._wrapped_target) self.__thread.daemon = True self.__thread.name = self.name logger.info(f'"{self.name}" starting') self.__thread.start() if wait and (threading.current_thread() != self.__thread): if not self.__started.wait(self.__patience): if self.__crashed: mess = f'Setup failed, for "{self.__thread.name}".' else: mess = f'Setup took longer than {self.__patience}s ' + \ f'for "{self.__thread.name}"' self.__thread.join() logger.critical(mess) raise RuntimeError(mess) logger.info(f'"{self.name}" successfully started')
[docs] def stop(self, wait=True): """Sends the stopping signal to the thread. By default waits for the thread to finish.""" logger.info(f'Stop requested for "{self.name}"') if self.started: self.__started.clear() self.__paused.clear() logger.info(f'"{self.name}" stopping') if wait and (threading.current_thread() != self.__thread): while self.__thread.is_alive(): self.__started.clear() self.__thread.join(timeout=1.0) logger.info(f'"{self.name}" successfully stopped') else: logger.info(f'"{self.name}" is not running; nothing to do')
[docs] def pause(self): """Requests the thread to pause.""" logger.info(f'Pause requested for "{self.name}"') if self.running: self.__paused.set() logger.info(f'"{self.name}" paused') logger.info(f'"{self.name}" is not running; nothing to do')
[docs] def resume(self): """Requests the thread to resume.""" logger.info(f'Resume requested for "{self.name}"') if self.paused: self.__paused.clear() logger.info(f'"{self.name}" resumed') logger.info(f'"{self.name}" is not paused; nothing to do')
[docs]class BaseLoop(BaseThread): """This is a thread that executes in a separate thread, scheduling a certain atomic work (encapsulated in the `atomic` method) periodically as prescribed by the `frequency` parameter. The `run` method takes care of checking the flags for `paused` and `stopped` so there is no need to do this in the `atomic` method. Parameters ---------- name: str The name of the loop patience: float A duration in seconds that the main thread will wait for the background thread to finish setup activities and indicate that it is in ``started`` mode. frequency: float The loop frequency in [Hz] warning: float Indicates a threshold in range [0..1] indicating when warnings should be logged to the logger in case the execution frequency is bellow the target. A 0.8 value indicates the real execution is less than 0.8 * target_frequency. The statistic is calculated over a period of time specified by the parameter `review`. throttle: float Is a float (< 1.0) that is used by the monitoring of execution statistics to adjust the wait time in order to produce the desired processing frequency. review: float The time in [s] to calculate the statistics for the frequency. Raises ------ KeyError and ValueError if provided data in the initialization dictionary are incorrect or missing. """
[docs] def __init__(self, name='BASELOOP', patience=1.0, frequency=None, warning=0.90, throttle=0.1, review=1.0): super().__init__(name=name, patience=patience) check_not_empty(frequency, 'frequency', 'loop', self.name, logger) check_type(frequency, float, 'loop', self.name, logger) self.__frequency = frequency self.__actual_frequency = frequency self.__period = 1.0 / self.__frequency check_not_empty(warning, 'warning', 'loop', self.name, logger) check_type(warning, float, 'loop', self.name, logger) self.__warning = warning check_not_empty(throttle, 'throttle', 'loop', self.name, logger) check_type(throttle, float, 'loop', self.name, logger) self.__throttle = throttle check_not_empty(review, 'review', 'loop', self.name, logger) check_type(review, float, 'loop', self.name, logger) self.__review = review # to keep statistics self.__exec_counts = 0 self.__last_count_reset = None # keeps score of processing and errors self.__errors = 0 self.__processed = 0 self.__err_stat = (0, 0, 0)
@property def frequency(self): """Loop frequency.""" return self.__frequency @property def actual_frequency(self): """Returns the actual running frequency that is calculated by statistics.""" return self.__actual_frequency @property def period(self): """Loop period = 1 / frequency.""" return self.__period @property def warning(self): """Control the warning level for the warning message, the **setter** is smart: if the value is larger than 2 it will assume it is a percentage and divied it by 100 and ignore if the number is higher than 110. The over 100 is available for testing purposes. """ return self.__warning @property def review(self): """Indicates the amount of time in seconds before the thread will review the actual frequency against the target and take action.""" return self.__review @warning.setter def warning(self, value): if value < 2.0: self.__warning = value elif value <= 110: self.__warning = value / 100.0 @property def errors(self): """Returns the number of errors logged by the statistics.""" return self.__errors @property def processed(self): """Returns the number of items processed in the current statistics. The items processed depends on the loop and might be different from the number of loops executed. For example the one execution loop might include several communication packets.""" return self.__processed
[docs] def inc_errors(self): """Used by subclasses to increment the number of errors.""" self.__errors += 1
[docs] def inc_processed(self): """Used by subclasses to increment the number of processed items.""" self.__processed += 1
@property def error_stat(self): """Returns the error statistics as a tuple: (error in %, total errors, total items).""" return self.__err_stat
[docs] def run(self): exec_counts = 0 last_count_reset = time.time() # adjust = 0.0 # fine adjust the rate while not self.stopped: if self.paused: # paused; reset the statistics exec_counts = 0 self.__errors = 0 self.__processed = 0 last_count_reset = time.time() time.sleep(self.period) else: start_time = time.time() self.atomic() end_time = time.time() wait_time = self.__period - (end_time - start_time) if wait_time > 0: time.sleep(wait_time) # else: # logger.debug(f'Loop "{self.name}" took longer to run ' # f'{end_time - start_time:.5f} than ' # f'loop period {self.period:.5f}; check') # statistics: exec_counts += 1 if exec_counts >= self.__frequency * self.__review: exec_time = time.time() - last_count_reset # actual_freq = exec_counts / exec_time self.__actual_frequency = exec_counts / exec_time # rate = actual_freq / self.__frequency # diff = self.__period - exec_time / exec_counts # # fine tune the frequency # adjust += diff * self.__throttle # if adjust < - self.__period: # adjust = - self.__period # if actual_freq < (self.__frequency * self.__warning): # logger.debug( # f'Loop "{self.name}" running under ' # f'warning threshold at {actual_freq:.2f}[Hz] ' # f'({rate*100:.0f}%)') # reset counters exec_counts = 0 if self.__processed > 0: rate = self.__errors / self.__processed * 100.0 else: rate = 0.0 self.__err_stat = (rate, self.__errors, self.__processed) self.__errors = 0 self.__processed = 0 last_count_reset = time.time()
[docs] def atomic(self): """This method implements the periodic task that needs to be executed. It does not need to check `paused` or `stopped` as the `run` method does this already and the subclasses should make sure that the implementation completes quickly and does not raise any exceptions. """ raise NotImplementedError