Source code for roboglia.base.device

# Copyright (C) 2020  Alex Sonea

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import os
import logging

from ..utils import get_registered_class, check_not_empty, \
                    check_type, check_key

from ..utils import load_yaml_with_include

from .bus import BaseBus, SharedBus

logger = logging.getLogger(__name__)


class BaseDevice():
    """A base virtual class for all devices.

    A ``BaseDevice`` is a surrogate representation of an actual device,
    characterized by a number of internal registers that can be read or
    written to by the means of a communication bus.
    Any device is based on a ``model`` that identifies the ``.yml`` file
    describing the structure of the device (the registers).

    .. note::

        Device definition files are searched in a path provided by the
        method :py:meth:`get_model_path` unless a specific path is
        provided in the `path` parameter. This way, if no `path` is
        provided the specific device class can use different locations
        to place the files. For instance `BaseDevice` will provide the
        location ``roboglia/base/devices/``, ``DynamixelDevice`` will
        provide ``roboglia/dynamixel/devices/``, ``I2CDevice`` will
        provide ``roboglia/i2c/devices/``, etc. If you want to use a
        device that does not exist in ``roboglia`` and for which you have
        created a YAML file you can indicate the directory where the file
        is located with the `path` parameter and the name of the file in
        the `model` parameter.

    .. warning::

        If you plan to use ``auto`` in the device or have initializations
        in `inits` parameter you have to make sure that the associated
        bus is also marked with ``auto: True``, otherwise the reads and
        writes during the opening of the device will fail with
        ``attempt to read(write) from(to) a closed bus``.

    Parameters
    ----------
    name: str
        The name of the device

    bus: BaseBus or subclass
        The bus object where the device is attached to

    dev_id: int
        The device ID on the bus. Typically it is an ``int`` but some
        buses may use a different identifier. The processing should
        still work fine.

    model: str
        A string used to identify the device description. Please see the
        note above regarding the position of the device description
        files.

    path: str
        A path to the model file in case you want to use custom defined
        devices that are not available in the ``roboglia`` repository.
        Please see the note above regarding the position of the device
        description files.

    inits: list
        A list of init templates to be applied to the device's registers
        when the :py:meth:`~open` method is called, where template names
        were defined earlier in the robot definition in the ``inits``
        section. When :py:meth:`~open` runs, each entry is used as a
        dictionary mapping register names to values. Please note the
        initialization values should be provided in the **external**
        format of the register as they will be used as::

            register.value = dict_value

        As no syncs are currently implemented this will automatically
        trigger a ``write`` call to store that value in the device.
        Defaults to ``None`` which is treated as "no initialization".

    Raises
    ------
    KeyError
        if mandatory parameters are not found or unexpected values
        are used (ex. for boolean)
    """

    cache = {}
    """A cache of device models that is updated when a new model is
    encountered and reused when the same model is requested during
    device creation.
    """

    def __init__(self, name='DEVICE', bus=None, dev_id=None, model=None,
                 path=None, inits=None, **kwargs):
        # these are already checked by robot
        self.__name = name
        check_not_empty(bus, 'bus', 'device', name, logger)
        check_type(bus, [SharedBus, BaseBus], 'device', name, logger)
        self.__bus = bus
        check_not_empty(dev_id, 'dev_id', 'device', name, logger)
        self.__dev_id = dev_id
        check_not_empty(model, 'model', 'device', name, logger)
        check_type(model, str, 'device', name, logger)
        self.__model = model
        # registers
        if not path:
            path = self.get_model_path()
        model_file = os.path.join(path, model + '.yml')
        if model_file not in BaseDevice.cache:
            model_ini = load_yaml_with_include(model_file)
            BaseDevice.cache[model_file] = model_ini
        else:
            model_ini = BaseDevice.cache[model_file]
        self.__registers = {}
        self.__reg_by_addr = {}
        clones = []
        for reg_name, cached_info in model_ini['registers'].items():
            # work on a shallow copy: the description lives in the shared
            # class-level cache and must not accumulate per-device data
            # (name, owning device, clone target) from earlier instances
            reg_info = dict(cached_info)
            reg_info['name'] = reg_name
            reg_info['device'] = self
            if reg_info.get('clone', False):
                # we register clones at the end after we have the main
                # registers so that we can refer to them
                clones.append(reg_info)
            else:
                new_register = self.__add_register(reg_info)
                self.__reg_by_addr[reg_info['address']] = new_register
        # now the clones
        for reg_info in clones:
            # check that the register address is covered by a main register
            check_key('address', reg_info, 'register',
                      reg_info['name'], logger)
            check_key(reg_info['address'], self.__reg_by_addr,
                      'register', reg_info['name'], logger,
                      f'no main register with address {reg_info["address"]} '
                      'defined')
            # replace the boolean flag with the main register being cloned
            reg_info['clone'] = self.register_by_address(reg_info['address'])
            self.__add_register(reg_info)
        # a fresh list per instance instead of a shared mutable default
        self.__inits = [] if inits is None else inits

    def __add_register(self, reg_info):
        """Instantiates one register from its description and records it
        in the device, both in the registers dictionary and as an
        attribute of the device named after the register.

        Parameters
        ----------
        reg_info: dict
            The register description; must contain ``name`` and may
            contain ``class`` to select the register class (otherwise
            :py:meth:`default_register` is used). The whole dictionary
            is passed as keyword arguments to the register class.

        Returns
        -------
        object
            The newly created register.
        """
        reg_class_name = reg_info.get('class', self.default_register())
        reg_class = get_registered_class(reg_class_name)
        new_register = reg_class(**reg_info)
        reg_name = reg_info['name']
        # we add it as an attribute of the device too
        self.__dict__[reg_name] = new_register
        self.__registers[reg_name] = new_register
        return new_register

    @property
    def name(self):
        """Device name.

        Returns
        -------
        str:
            The name of the device
        """
        return self.__name

    @property
    def registers(self):
        """Device registers as dict.

        Returns
        -------
        dict:
            The dictionary of registers with the register name as key.
        """
        return self.__registers

    def register_by_address(self, address):
        """Returns the register identified by the given address. If the
        address is not available in the device it will return ``None``.

        Parameters
        ----------
        address:
            The address of the register, as used in the ``address`` entry
            of the device definition.

        Returns
        -------
        register object or ``None``:
            The (main) register at `address` or ``None`` if no register
            with that address exists. Clone registers are not indexed
            by address.
        """
        return self.__reg_by_addr.get(address, None)

    @property
    def dev_id(self):
        """The device number.

        Returns
        -------
        int:
            The device number
        """
        return self.__dev_id

    @property
    def bus(self):
        """The bus where the device is connected to.

        Returns
        -------
        BaseBus or SharedBus or subclass:
            The bus object using this device.
        """
        return self.__bus

    def get_model_path(self):
        """Builds the path to the device description documents.

        By default it will return the path to the ``devices`` directory
        located next to this module (``roboglia/base/devices/``).

        Returns
        -------
        str
            A full document path.
        """
        return os.path.join(os.path.dirname(__file__), 'devices')

    def default_register(self):
        """Default register for the device in case is not explicitly
        provided in the device definition file.

        Subclasses of ``BaseDevice`` can override the method to derive
        their own class.

        ``BaseDevice`` suggests as default register
        :py:class:`BaseRegister`.

        Returns
        -------
        str
            The name under which the default register class is
            registered.
        """
        return 'BaseRegister'

    def read_register(self, register):
        """Implements the read of a register using the associated bus.
        More complex devices should overwrite the method to provide
        specific functionality.

        ``BaseDevice`` simply calls the bus's ``read`` function and
        returns the value received.
        """
        return self.bus.read(register)

    def write_register(self, register, value):
        """Implements the write of a register using the associated bus.
        More complex devices should overwrite the method to provide
        specific functionality.

        ``BaseDevice`` simply calls the bus's ``write`` function; nothing
        is returned.
        """
        self.bus.write(register, value)

    def open(self):
        """Performs initialization of the device by applying the templates
        provided in the ``inits`` parameter, in order.

        For each ``register name: value`` pair of a template: an unknown
        register name is skipped with a warning, a ``None`` value
        triggers a read of the register and any other value is assigned
        to the register's ``value``.
        """
        if not self.__inits:
            return
        logger.info(f'Initializing device "{self.name}"')
        for init in self.__inits:
            for reg_name, value in init.items():
                if reg_name not in self.registers:
                    logger.warning(f'Register "{reg_name}" does not exist in '
                                   f'device "{self.name}"; '
                                   'skipping initialization')
                    continue
                register = self.registers[reg_name]
                if value is None:
                    register.read()
                    logger.debug(f'Register "{reg_name}" read')
                else:
                    register.value = value
                    logger.debug(
                        f'Register "{reg_name}" updated to {value} '
                        f'(int_value: {register.int_value})')

    def close(self):
        """Perform device closure. ``BaseDevice`` implementation does
        nothing."""
        pass

    def __str__(self):
        """Returns a header line with the device name, ID and bus name
        followed by one tab-indented line per register."""
        header = (f'Device: {self.name}, ID: {self.dev_id} '
                  f'on bus: {self.bus.name}:\n')
        return header + ''.join(f'\t{reg}\n'
                                for reg in self.registers.values())