Source code for roboglia.base.sync

# Copyright (C) 2020  Alex Sonea

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import logging
from .thread import BaseLoop
from .bus import SharedBus
from ..utils import check_key, check_type, check_options, check_not_empty

# Module-level logger, named after this module via ``__name__``; used by
# all the sync classes below for their diagnostics.
logger = logging.getLogger(__name__)


class BaseSync(BaseLoop):
    """Base processing for a sync loop.

    This class is intended to be subclassed to provide specific
    functionality. It only parses the common elements that a sync loop
    would need: the devices (provided by a group) and registers (provided
    by a list). It will check that the provided devices are on the same
    bus and that the provided registers exist in all devices.

    .. note:: Please note that this class does not actually perform any
        sync. Use the subclasses :py:class:`BaseReadSync` or
        :py:class:`BaseWriteSync` that implement read or write syncs.

    ``BaseSync`` inherits the parameters from :py:class:`BaseLoop`. In
    addition it includes the following parameters.

    Parameters
    ----------
    name: str
        The name of the sync

    patience: float
        A duration in seconds that the main thread will wait for the
        background thread to finish setup activities and indicate that it
        is in ``started`` mode.

    frequency: float
        The sync frequency in [Hz]

    warning: float
        Indicates a threshold in range [0..1] indicating when
        warnings should be logged to the logger in case the execution
        frequency is below the target. A 0.8 value indicates the real
        execution is less than 0.8 * target_frequency. The statistic is
        calculated over a period of time specified by the parameter
        `review`.

    throttle: float
        Is a float (< 1.0) that is used by the monitoring of execution
        statistics to adjust the wait time in order to produce the desired
        processing frequency.

    review: float
        The time in [s] to calculate the statistics for the frequency.

    group: set
        The set with the devices used by sync; normally the robot
        constructor replaces the name of the group from YAML file with
        the actual set built earlier in the initialization.

    registers: list of str
        A list of register names (as strings) used by the sync. ``None``
        (the default) is treated as an empty list before validation.

    auto: bool
        If the sync loop should start automatically when the robot
        starts; defaults to ``True``

    Raises
    ------
    KeyError: if mandatory parameters are not found
    ValueError: if the devices are not all connected to a single bus
    """

    def __init__(self, name='BASESYNC', patience=1.0, frequency=None,
                 warning=0.90, throttle=0.1, review=1.0,
                 group=None, registers=None, auto=True):
        super().__init__(name=name, patience=patience, frequency=frequency,
                         warning=warning, throttle=throttle, review=review)
        # devices: must be a non-empty set, all sharing one bus
        check_not_empty(group, 'group', 'sync', self.name, logger)
        check_type(group, set, 'sync', self.name, logger)
        self.__devices = list(group)
        self.__bus = self.process_devices()
        check_type(self.__bus, SharedBus, 'sync', self.name, logger)
        # registers: a fresh list per call instead of a shared mutable
        # default; an omitted value is validated exactly like ``[]``
        if registers is None:
            registers = []
        check_not_empty(registers, 'registers', 'sync', self.name, logger)
        check_type(registers, list, 'sync', self.name, logger)
        self.__reg_names = registers
        check_options(auto, [True, False], 'sync', self.name, logger)
        self.__auto_start = auto
        # filled by process_registers() with one entry per
        # (device, register name) pair
        self.__all_registers = []
        self.process_registers()

    @property
    def auto_start(self):
        """Shows if the sync should be started automatically when
        the robot starts.
        """
        return self.__auto_start

    @property
    def bus(self):
        """The bus this sync works with."""
        return self.__bus

    @property
    def devices(self):
        """The devices used by the sync."""
        return self.__devices

    @property
    def register_names(self):
        """The register names used by the sync."""
        return self.__reg_names

    @property
    def all_registers(self):
        """The register objects used by the sync, one per device and
        register name, as collected by :py:meth:`process_registers`.
        """
        return self.__all_registers

    def process_devices(self):
        """Processes the provided devices.

        The devices are expected as a set in the `init_dict`. This is
        normally performed by the robot class when reading the robot
        definition by replacing the name of the group with the actual
        content of the group.

        This method checks that all devices are assigned to the same bus.

        Returns
        -------
        object
            The single bus instance shared by all the devices.

        Raises
        ------
        ValueError: if the devices are spread over more than one bus or
            if there are no devices at all
        """
        buses = {device.bus for device in self.__devices}
        if len(buses) > 1:
            mess = f'Devices used for sync {self.name} should be ' + \
                   'connected to a single bus.'
            logger.critical(mess)
            raise ValueError(mess)
        elif len(buses) == 0:
            mess = f'You need at least one device for sync {self.name}.'
            logger.critical(mess)
            raise ValueError(mess)
        # there must be only one!
        one_bus = buses.pop()
        return one_bus

    def process_registers(self):
        """Checks that the supplied registers are available in all
        devices and collects the corresponding register objects in
        :py:attr:`all_registers`.
        """
        for device in self.__devices:
            for reg_name in self.register_names:
                check_key(reg_name, device.registers, 'sync',
                          self.name, logger,
                          f'device {device.name} does not have a '
                          f'register {reg_name}')
                # add the object to the list so that we don't need
                # to loop over devices and registers and use getattr()
                # during the atomic() processing
                reg_obj = getattr(device, reg_name)
                self.__all_registers.append(reg_obj)

    def get_register_range(self):
        """Determines the start address of the range of registers and
        the whole length.

        Registers do not need to be in order, but be careful that not
        all communication protocols can support gaps in the bulk read of
        registers.

        Returns
        -------
        int
            The start address of the range
        int
            The length covering all the registers (including gaps)
        bool
            True if the range of registers is contiguous
        """
        # pick the first device; we expect all to have the same registers
        device = self.devices[0]
        registers = [getattr(device, reg_name)
                     for reg_name in self.register_names]
        # The range runs from the lowest address to one past the furthest
        # byte of any register. Computing both ends directly (instead of
        # comparing against 0 / 65536 sentinels) keeps the result correct
        # when the last register sits at address 0 or when an address is
        # 65536 or more. An empty register list yields (0, 0, True).
        start_address = min((reg.address for reg in registers), default=0)
        end_address = max((reg.address + reg.size for reg in registers),
                          default=0)
        length = end_address - start_address
        # contiguous only if the register sizes add up to the span
        reg_length = sum(reg.size for reg in registers)
        return start_address, length, reg_length == length

    def start(self):
        """Checks that the bus is open, then refreshes the registers and
        sets their ``sync`` flag before calling the inherited
        :py:meth:`BaseLoop.start`.

        If the bus is not open an error is logged and the loop is not
        started.
        """
        if not self.bus.is_open:
            logger.error(f'sync {self.name}: attempt to start with a bus '
                         f'not open')
            return
        for reg in self.all_registers:
            if reg.sync:
                logger.warning(f'Register "{reg.name}" of device '
                               f'"{reg.device.name}" is already marked '
                               'for "sync" - it should not happen')
            else:
                # refresh the register before setting it for sync
                reg.read()
                reg.sync = True
                logger.debug(f'Setting register "{reg.name}" of device '
                             f'"{reg.device.name}" sync=True')
        super().start()

    def stop(self):
        """Before calling the inherited method it un-flags the registers
        for syncing.
        """
        for reg in self.all_registers:
            reg.sync = False
        super().stop()
class BaseReadSync(BaseSync):
    """A SyncLoop that performs a naive read of the registers by
    sequentially calling the ``read`` on each of them.

    It wraps the processing between the bus' ``can_use()`` and
    ``stop_using()`` methods and uses ``naked_read`` instead of the
    ``read`` method.
    """

    def atomic(self):
        """Implements the read of the registers.

        This is a naive implementation that will simply loop over all
        the registers of all the devices and read them one by one from
        the bus. A register whose read returns ``None`` keeps its
        previous value and a warning is logged. If the bus cannot be
        acquired an error is logged and nothing is read.
        """
        if not self.bus.can_use():
            logger.error(f'Failed to acquire bus "{self.bus.name}"')
            return
        try:
            for reg in self.all_registers:
                value = self.bus.naked_read(reg)
                logger.debug(f'Read {value} for device "{reg.device.name}" '
                             f'register "{reg.name}"')
                if value is not None:
                    reg.int_value = value
                else:
                    logger.warning(f'Sync "{self.name}": failed to read '
                                   f'register "{reg.name}" '
                                   f'of device "{reg.device.name}"')
        finally:
            # always hand the bus back, even if a read raised, so that
            # other users of the bus are not locked out
            self.bus.stop_using()
class BaseWriteSync(BaseSync):
    """A SyncLoop that performs a naive write of the registers by
    sequentially calling the ``write`` on each of them.

    It wraps the processing between the bus' ``can_use()`` and
    ``stop_using()`` methods and uses ``naked_write`` instead of the
    ``write`` method.
    """

    def atomic(self):
        """Implements the writing of the registers.

        This is a naive implementation that will simply loop over all
        the registers of all the devices and write their current
        ``int_value`` to the bus one by one. If the bus cannot be
        acquired an error is logged and nothing is written.
        """
        if not self.bus.can_use():
            logger.error(f'Failed to acquire bus "{self.bus.name}"')
            return
        try:
            for reg in self.all_registers:
                self.bus.naked_write(reg, reg.int_value)
                logger.debug(f'Wrote {reg.int_value} for device '
                             f'"{reg.device.name}" register "{reg.name}"')
        finally:
            # always hand the bus back, even if a write raised, so that
            # other users of the bus are not locked out
            self.bus.stop_using()