Source code for roboglia.base.bus

# Copyright (C) 2020  Alex Sonea

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import logging
import threading

from ..utils import check_type, check_options, check_not_empty


logger = logging.getLogger(__name__)


[docs]class BaseBus(): """A base abstract class for handling an arbitrary bus. You will normally subclass ``BaseBus`` and define particular functionality specific to the bus by implementing the methods of the ``BaseBus``. This class only stores the name of the bus and the access to the physical object. Your subclass can add additional attributes and methods to deal with the particularities of the real bus represented. Parameters ---------- name: str The name of the bus robot: BaseRobot A reference to the robot using the bus port: any An identification for the physical bus access. Some busses have string description like ``/dev/ttySC0`` while others could be just integers (like in the case of I2C or SPI buses) auto: Bool If ``True`` the bus will be opened when the robot is started by calling :py:meth:`BaseRobot.start`. If ``False`` the bus will be left closed during robot initialization and needs to be opened by the programmer. Raises: KeyError: if ``port`` not supplied """
[docs] def __init__(self, name='BUS', robot=None, port='', auto=True): # already checked by robot # check_not_empty(robot, 'robot', 'bus', name, logger) check_not_empty(port, 'port', 'bus', name, logger) self.__name = name self.__robot = robot self.__port = port self.__auto_open = auto check_options(self.__auto_open, [True, False], 'bus', self.__name, logger)
@property def name(self): """(read-only) the bus name.""" return self.__name @property def robot(self): """The robot that owns the bus.""" return self.__robot @property def port(self): """(read-only) the bus port.""" return self.__port @property def auto_open(self): """Indicates if the bus should be opened by the robot when initializing.""" return self.__auto_open
[docs] def open(self): """Opens the actual physical bus. Must be overridden by the subclass. """ raise NotImplementedError
[docs] def close(self): """Closes the actual physical bus. Must be overridden by the subclass, but the implementation in the subclass should first check for the return from this method before actually closing the bus as dependent object on this bus might be affected:: def close(self): if super().close() ... do the close activities # optional; the handling in the ``BaseBus.close()`` will # issue error message to log else: logger.<level>('message') """ if self.robot: for sync in list(self.robot.syncs.values()): # we need to compare by names and not by object ids because # sync.bus == self will not work: # sync.bus could be a SharedBus and # self will be the base bus (ex. FileBus or Dynamixel Bus) if sync.bus.name == self.name and sync.started: logger.error(f'Attempted to close bus {self.name} that is ' 'used by running syncs') return False return True
[docs] def __repr__(self): """Returns a representation of a BaseBus that includes the name of the class, the port and the status (open or closed).""" return f'<{self.__class__.__name__} port={self.port} ' + \ f'open={self.is_open}>'
@property def is_open(self): """Returns `True` or `False` if the bus is open. Must be overridden by the subclass. """ raise NotImplementedError
[docs] def read(self, reg): """Reads one register information from the bus. Must be overridden. Parameters ---------- reg: BaseRegister or subclass The register object that needs to be read. Keep in mind that the register object also contains a reference to the device in the ``device`` attribute and it is up to the subclass to determine the way the information must be processed before providing it to the caller. Returns ------- int Typically it would return an ``int`` that will have to be handled by the caller. """ raise NotImplementedError
[docs] def write(self, reg, val): """Writes one register information from the bus. Must be overridden. Parameters ---------- reg: BaseRegister or subclass The register object that needs to be written. Keep in mind that the register object also contains a reference to the device in the ``device`` attribute and it is up to the subclass to determine the way the information must be processed before providing it actual device. val: int The value needed to the written to the device. """ raise NotImplementedError
[docs]class FileBus(BaseBus): """A bus that writes to a file with cache provided for testing purposes. Writes by this class are send to a file stream and also buffered in a local memory. Reads use this buffer to return values or use the default values from the register defintion. Same parameters as :py:class:`BaseBus`. """
[docs] def __init__(self, name='FILEBUS', robot=None, port='', auto=True): super().__init__(name=name, robot=robot, port=port, auto=auto) self.__fp = None self.__last = {} logger.debug(f'FileBus "{self.name}" initialized')
[docs] def open(self): """Opens the file associated with the ``FileBus``.""" if self.is_open: logger.warning(f'bus {self.name} already open') else: self.__fp = open(self.port, 'w') logger.debug(f'FileBus {self.name} opened')
[docs] def close(self): """Closes the file associated with the ``FileBus``.""" if self.is_open: if super().close(): self.__fp.close() logger.debug(f'FileBus {self.name} closed')
@property def is_open(self): """Returns ``True`` is the file is opened.""" return False if not self.__fp else not self.__fp.closed
[docs] def write(self, reg, value): """Updates the values in the FileBus. The method will update the buffer with the value provided then will log the write on the file. A flush() is performed in case you want to inspect the content of the file while the robot is running. File writing errors are intercepted and logged but no Exception is raised. Parameters ---------- reg: BaseRegister or subclass The register object that needs to be written. Keep in mind that the register object also contains a reference to the device in the ``device`` attribute and it is up to the subclass to determine the way the information must be processed before providing it actual device. value: int The value needed to the written to the device. """ if not self.is_open: logger.error(f'attempt to write to closed bus {self.name}') else: self.__last[(reg.device.dev_id, reg.address)] = value text = f'written {value} in register "{reg.name}"" ' + \ f'({reg.address}) of device "{reg.device.name}" ' + \ f'({reg.device.dev_id})' try: self.__fp.write(text + '\n') self.__fp.flush() except Exception: # pragma: no cover logger.error(f'error executing write and flush to file ' f'for bus: {self.name}') logger.debug(f'FileBus "{self.name}" {text}')
[docs] def read(self, reg): """Reads the value from the buffer of ``FileBus`` and logs it. The method intercepts the ``raise`` errors from writing to the physical file and converts them to errors in the log file so that the rest of the program can continue uninterrupted. The method will try to read from the buffer the value. If there is no value in the buffer it will be defaulted from the register's default value. The method will log the read to the file and return the value. Parameters ---------- reg: BaseRegister or subclass The register object that needs to be read. Keep in mind that the register object also contains a reference to the device in the ``device`` attribute and it is up to the subclass to determine the way the information must be processed before providing it to the caller. Returns ------- int Typically it would return an ``int`` that will have to be handled by the caller. """ if not self.is_open: logger.error(f'attempt to read from closed bus {self.name}') return None # normal processing if (reg.device.dev_id, reg.address) not in self.__last: self.__last[(reg.device.dev_id, reg.address)] = reg.default val = self.__last[(reg.device.dev_id, reg.address)] text = f'read {val} from register "{reg.name}" ({reg.address}) ' +\ f'of device "{reg.device.name}" ({reg.device.dev_id})' try: self.__fp.write(text+'\n') self.__fp.flush() except Exception: # pragma: no cover logger.error(f'error executing write and flush to file ' f'for bus: {self.name}') logger.debug(f'FileBus "{self.name}" {text}') return val
[docs] def __str__(self): """The string representation of the ``FileBus`` is a dump of the internal buffer. """ result = '' for (dev_id, reg_address), value in list(self.__last.items()): result += f'Device {dev_id}, Register ID {reg_address}: ' + \ f'VALUE {value}\n' return result
[docs]class SharedBus(): """Implements a bus that provides a locking mechanism for the access to the underlying hardware, aimed specifically for use in multi-threaded environments where multiple jobs could compete for access to one single bus. .. note:: This class implements ``__getattr__`` so that any calls to an instance of this class that are not already implemented bellow will be passed to the internal instance of ``BusClass`` that was created at instantiation. This way you can access all the attributes and methods of the ``BusClass`` instance transparently, as long as they are not already overridden by this class. Parameters ---------- BusClass: BaseBus subclass The class that will be wrapped by the ``SharedBus`` timeout: float A timeout for acquiring the lock that controls the access to the bus **kwargs: keyword arguments that are passed to the BusClass for instantiation """
[docs] def __init__(self, BusClass, timeout=0.5, **kwargs): self.__main_bus = BusClass(**kwargs) self.__timeout = timeout check_type(self.__timeout, float, 'bus', self.__main_bus.name, logger) if self.__timeout > 0.5: logger.warning(f'timeout {self.__timeout} for shareable ' f'{self.__main_bus.name} might be excessive.') self.__lock = threading.Lock()
@property def lock(self): return self.__lock @property def timeout(self): """Returns the timeout for requesting access to lock.""" return self.__timeout
[docs] def can_use(self): """Tries to acquire the resource on behalf of the caller. This method should be called every time a user of the bus wants to perform an operation. If the result is ``False`` the user does not have exclusive use of the bus and the actions are not guaranteed. .. warning:: It is the responsibility of the user to call :py:meth:`~SharedBus.stop_using` as soon as possible after preforming the intended work with the bus if this method grants it access. Failing to do so will result in the bus being blocked by this user and prohibiting other users to access it. Returns ------- bool ``True`` if managed to acquire the resource, ``False`` if not. It is the responsibility of the caller to decide what to do in case there is a ``False`` return including logging or Raising. """ return self.__lock.acquire(timeout=self.__timeout)
[docs] def stop_using(self): """Releases the resource.""" self.__lock.release()
[docs] def naked_read(self, reg): """Calls the main bus read without invoking the lock. This is intended for those users that plan to use a series of read operations and they do not want to lock and release the bus every time, as this adds some overhead. Since the original bus' ``read`` method is overridden (see below), any calls to ``read`` from a user will result in using the wrapped version defined in this class. Therefore in the scenario that the user wants to execute a series of quick reads the ``naked_read`` can be used as long as the user wraps the calls correctly for obtaining exclusive access:: if bus.can_use(): val1 = bus.naked_read(reg1) val2 = bus.naked_read(reg2) val3 = bus.naked_read(reg3) ... bus.stop_using() else: logger.warning('some warning') Parameters ---------- reg: BaseRegister or subclass The register object that needs to be read. Keep in mind that the register object also contains a reference to the device in the ``device`` attribute and it is up to the subclass to determine the way the information must be processed before providing it to the caller. Returns ------- int Typically it would return an ``int`` that will have to be handled by the caller. """ return self.__main_bus.read(reg)
[docs] def naked_write(self, reg, value): """Calls the main bus write without invoking the lock. This is intended for those users that plan to use a series of write operations and they do not want to lock and release the bus every time, as this adds some overhead. Since the original bus' ``write`` method is overridden (see below), any calls to ``write`` from a user will result in using the wrapped version defined in this class. Therefore in the scenario that the user wants to execute a series of quick writes the ``naked_write`` can be used as long as the user wraps the calls correctly for obtaining exclusive access:: if bus.can_use(): val1 = bus.naked_write(reg1, val1) val2 = bus.naked_write(reg2, val2) val3 = bus.naked_write(reg3, val3) ... bus.stop_using() else: logger.warning('some warning') Parameters ---------- reg: BaseRegister or subclass The register object that needs to be read. Keep in mind that the register object also contains a reference to the device in the ``device`` attribute and it is up to the subclass to determine the way the information must be processed before providing it to the caller. value: int The value needed to the written to the device. """ self.__main_bus.write(reg, value)
[docs] def read(self, reg): """Overrides the main bus' :py:meth:`~roboglia.base.BaseBus.read` method and performs a **safe** read by wrapping the read call in a request to acquire the bus. If the method is not able to acquire the bus in time (times out) it will log an error and return ``None``. Parameters ---------- reg: BaseRegister or subclass The register object that needs to be read. Keep in mind that the register object also contains a reference to the device in the ``device`` attribute and it is up to the subclass to determine the way the information must be processed before providing it to the caller. Returns ------- int: The value read for this register or ``None`` is the call failed to secure with bus within the ``timeout``. """ if self.can_use(): value = self.__main_bus.read(reg) self.stop_using() return value # couldn't acquire logger.error(f'failed to acquire bus {self.__main_bus.name}') return None
[docs] def write(self, reg, value): """Overrides the main bus' `~roboglia.base.BaseBus.write` method and performs a **safe** write by wrapping the main bus write call in a request to acquire the bus. If the method is not able to acquire the bus in time (times out) it will log an error. Parameters ---------- reg: BaseRegister or subclass The register object that needs to be read. Keep in mind that the register object also contains a reference to the device in the ``device`` attribute and it is up to the subclass to determine the way the information must be processed before providing it to the caller. value: int The value to be written to the device. """ if self.can_use(): self.__main_bus.write(reg, value) self.stop_using() else: logger.error(f'failed to acquire bus {self.__main_bus.name}')
[docs] def __repr__(self): """Invokes the main bus representation but changes the class name with the "Shared" class name to show a more accurate picture of the object.""" ans = self.__main_bus.__repr__() ans = ans.replace(self.__main_bus.__class__.__name__, self.__class__.__name__) return ans
[docs] def __getattr__(self, name): """Forwards all unanswered calls to the main bus instance.""" return getattr(self.__main_bus, name)
[docs]class SharedFileBus(SharedBus): """This is a :py:class:`FileBus` class that was wrapped for access to a shared resource. All :py:class:`FileBus` methods and attributes are accessible transparently but please be aware that the methods ``read`` and ``write`` are now **safe**, wrapped around calls to :py:meth:`SharedBus.can_use` and :py:meth:`SharedBus.stop_using`. Additionally the two new access methods :py:meth:`~SharedBus.naked_read` and :py:meth:`~SharedBus.naked_write` are available. .. note:: You should always use a ``SharedFileBus`` class if you plan to use sync loops that run in separate threads and they will have access to the same bus. ``SharedFileBus`` inherits all the paramters from :py:class:`FileBus` as well as the ones from the meta-class :py:class:`SharedBus`. Please refer to these for a detail documentation of the parameters. """
[docs] def __init__(self, **kwargs): super().__init__(FileBus, **kwargs)
[docs] def __str__(self): return FileBus.__str__(self)