Source code for roboglia.base.robot

# Copyright (C) 2020  Alex Sonea

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import yaml
import logging
import threading
import statistics
import time

from ..utils import get_registered_class, check_key, check_type, check_options
from .thread import BaseLoop
from .joint import Joint, PVL, PVLList

logger = logging.getLogger(__name__)


[docs]class BaseRobot(): """A complete representation of a robot. A robot has at minimum one ``Bus`` and one ``Device``. You can create a robot programatically by calling the constructor and providing all the parameters required or use an initialization dictionary or a YAML file. The last option is the preferred one considering the volume of information usually needed to describe a robot. For initializing a robot from a dictionary definition use :py:meth:`~BaseRobot.from_dict` class method. For instantiating from a YAML file use :py:meth:`~BaseRobot.from_yaml` class method. Parameters ---------- name: str the name of the robot; will default to **ROBOT** buses: dict a dictionary with buses definitions; the components of the buses are defined by the attributes of the particular class of the bus inits: dict a dictionary of register initialization; should have the following form:: inits: init_template_1: register_1: value register_2: None # this indicates 'read initialization' init_template_2: register_3: value register_4: value see also the :py:class:`BaseDevice` where the details of the initialization process are described devices: dict a dictionary with the device definitions; the components of devices are defined by the attributes of the particular class of device joints: dict a dictionary with the joint definitions; the components of the joints are defined by the attributes of the particular class of joint sensors: dict a dictionary with the sensors defintion; the components of the sensor are defined by the attributes of the particular class of sensor groups: dict a dictionary with the group definitions; the groups end up unwind in the robot as sets (eliminates duplication) and they are defined by the following components (keys in the dictionary defintion): ``devices`` a list of device names in no particular order, ``joints`` a list of joint names in no particular order, ``sensors`` a list of sensors in no particular order and ``groups`` a list of sub-groups that were previously defined and will be included in the current group. Technically it is possible to mix and match the components of a group (for instance create groups that contain devices, sensors, and joints). syncs: dict a dictionary with sync loops definitions; the components of syncs are defined by the attributes of the particular class of sync. """
[docs] def __init__(self, name='ROBOT', buses={}, inits={}, devices={}, joints={}, sensors={}, groups={}, syncs={}, manager={}): logger.info('***** Initializing robot *************') self.__name = name # if not buses: # message = 'you need at least one bus for the robot' # logger.critical(message) # raise ValueError(message) self.__init_buses(buses) check_type(inits, dict, 'robot', name, logger) self.__inits = inits # if not devices: # message = 'you need at least one device for the robot' # logger.critical(message) # raise ValueError(message) self.__init_devices(devices) self.__init_joints(joints) self.__init_sensors(sensors) self.__init_groups(groups) self.__init_syncs(syncs) self.__init_manager(manager) logger.info('***** Initialization complete ********')
[docs] @classmethod def from_yaml(cls, file_name): """Initializes the robot from a YAML file. It will attempt to read the file and parse it with ``yaml`` library (PyYaml) and then passes it to the :py:meth:`~BaseRobot.from_dict` class method to do further initialization. Parameters ---------- file_name: str The name of the YAML file with the robot definition Raises ------ FileNotFoundError in case the file is not available """ logger.info(f'Creating robot from YAML file {file_name}') with open(file_name, 'r') as f: init_dict = yaml.load(f, Loader=yaml.FullLoader) if len(init_dict) > 1: logger.warning('Only the first robot will be considered.') name = list(init_dict)[0] components = init_dict[name] return BaseRobot(name=name, **components)
def __init_buses(self, buses): """Called by ``__init__`` to parse and instantiate buses.""" self.__buses = {} if buses is None: return logger.info('Settting up buses...') for bus_name, bus_info in list(buses.items()): # add the name in the dict bus_info['name'] = bus_name # add the robot as the parent of the bus bus_info['robot'] = self check_key('class', bus_info, 'bus', bus_name, logger) bus_class = get_registered_class(bus_info['class']) del bus_info['class'] new_bus = bus_class(**bus_info) self.__buses[bus_name] = new_bus logger.info(f'Bus "{bus_name}" added')
[docs] def add_bus(self, bus_obj): """Adds an already instantiated Bus object to the robot. Raises an error in the log if a bus with the same name is already registered and does not register it. Parameters ---------- bus_obj: BaseBus or subclass The bus to be added """ if bus_obj.name not in self.__buses: self.__buses[bus_obj.name] = bus_obj else: logger.error(f'Bus {bus_obj.name} already registered ' 'with the robot')
def __init_devices(self, devices): """Called by ``__init__`` to parse and instantiate devices.""" self.__devices = {} self.__dev_by_id = {} if devices is None: return logger.info('Setting up devices...') for dev_name, dev_info in list(devices.items()): # add the name in the dev_info dev_info['name'] = dev_name check_key('bus', dev_info, 'device', dev_name, logger) check_key(dev_info['bus'], self.buses, 'device', dev_name, logger, f'bus {dev_info["bus"]} does not exist') check_key('class', dev_info, 'device', dev_name, logger) # convert bus names to bus objects bus_name = dev_info['bus'] dev_bus = self.buses[bus_name] dev_info['bus'] = dev_bus # convert init names to objects list_of_inits = dev_info.get('inits', []) check_type(list_of_inits, list, 'device', dev_name, logger) for index, init_name in enumerate(list_of_inits): check_key(init_name, self.inits, 'device', dev_name, logger) list_of_inits[index] = self.inits[init_name] dev_class = get_registered_class(dev_info['class']) new_dev = dev_class(**dev_info) self.__devices[dev_name] = new_dev self.__dev_by_id[dev_info['dev_id']] = new_dev logger.info(f'Device "{dev_name}" added')
[docs] def add_device(self, dev_obj): """Adds an already instantiated Device object to the robot. Raises an error in the log if a device with the same name is already registered and does not register it. Parameters ---------- dev_obj: BaseDevice or subclass The device to be added """ if dev_obj.name not in self.__devices: self.__devices[dev_obj.name] = dev_obj self.__dev_by_id[dev_obj.dev_id] = dev_obj else: logger.error(f'Device {dev_obj.name} already registered ' 'with the robot')
def __init_joints(self, joints): """Called by ``__init__`` to parse and instantiate joints.""" self.__joints = {} logger.info('Setting up joints...') for joint_name, joint_info in list(joints.items()): # add the name in the joint_info joint_info['name'] = joint_name check_key('device', joint_info, 'joint', joint_name, logger) check_key(joint_info['device'], self.devices, 'joint', joint_name, logger, f'device {joint_info["device"]} does not exist') check_key('class', joint_info, 'joint', joint_name, logger) # convert device reference from name to object dev_name = joint_info['device'] device = self.devices[dev_name] joint_info['device'] = device joint_class = get_registered_class(joint_info['class']) new_joint = joint_class(**joint_info) self.__joints[joint_name] = new_joint logger.info(f'Joint "{joint_name}" added') def __init_sensors(self, sensors): """Called by ``__init__`` to parse and instantiate sensors.""" self.__sensors = {} logger.info('Setting up sensors...') for sensor_name, sensor_info in list(sensors.items()): # add the name in the joint_info sensor_info['name'] = sensor_name check_key('device', sensor_info, 'sensor', sensor_name, logger) check_key(sensor_info['device'], self.devices, 'senor', sensor_name, logger, f'device {sensor_info["device"]} does not exist') check_key('class', sensor_info, 'sensor', sensor_name, logger) # convert device reference from name to object dev_name = sensor_info['device'] device = self.devices[dev_name] sensor_info['device'] = device sensor_class = get_registered_class(sensor_info['class']) new_sensor = sensor_class(**sensor_info) self.__sensors[sensor_name] = new_sensor logger.info(f'Sensor "{sensor_name}" added') def __init_groups(self, groups): """Called by ``__init__`` to parse and instantiate groups.""" self.__groups = {} logger.info('Setting up groups...') for grp_name, grp_info in list(groups.items()): new_grp = set() # groups of devices for dev_name in grp_info.get('devices', []): check_key(dev_name, self.devices, 'group', grp_name, logger, f'device {dev_name} does not exist') new_grp.add(self.devices[dev_name]) # groups of joints for joint_name in grp_info.get('joints', []): check_key(joint_name, self.joints, 'group', grp_name, logger, f'joint {joint_name} does not exist') new_grp.add(self.joints[joint_name]) # groups of groups for sub_grp_name in grp_info.get('groups', []): check_key(sub_grp_name, self.groups, 'group', grp_name, logger, f'group {sub_grp_name} does not exist') new_grp.update(self.groups[sub_grp_name]) self.__groups[grp_name] = new_grp logger.info(f'Group "{grp_name}" added') def __init_syncs(self, syncs): """Called by ``__init__`` to parse and instantiate syncs.""" self.__syncs = {} logger.info('Setting up syncs...') for sync_name, sync_info in list(syncs.items()): sync_info['name'] = sync_name check_key('group', sync_info, 'sync', sync_name, logger) check_key(sync_info['group'], self.groups, 'sync', sync_name, logger, f'group {sync_info["group"]} does not exist') check_key('class', sync_info, 'sync', sync_name, logger) # convert group references group_name = sync_info['group'] sync_info['group'] = self.groups[group_name] sync_class = get_registered_class(sync_info['class']) del sync_info['class'] new_sync = sync_class(**sync_info) self.__syncs[sync_name] = new_sync logger.info(f'Sync "{sync_name}" added') def __init_manager(self, manager): """Called by ``__init__`` to parse and instantiate the robot manager.""" # process joints and replace names with objects logger.info('Setting up manager...') joints = manager.get('joints', []) for index, joint_name in enumerate(joints): check_key(joint_name, self.joints, 'manager', self.name, logger) joints[index] = self.joints[joint_name] group_name = manager.get('group', '') if group_name: check_key(group_name, self.groups, 'manager', self.name, logger) group = self.groups[group_name] for joint in group: check_type(joint, Joint, 'manager', self.name, logger) else: group = set() if 'joints' in manager: del manager['joints'] if 'group' in manager: del manager['group'] name = manager.get('name', self.name+'-manager') self.__manager = JointManager(name=name, joints=joints, group=group, **manager) logger.info(f'Manager "{self.manager.name}" added') @property def name(self): """(read-only) The name of the robot.""" return self.__name @property def buses(self): """(read-only) The buses of the robot as a dict.""" return self.__buses @property def inits(self): """The initialization templates defined for the robot.""" return self.__inits @property def devices(self): """(read-only) The devices of the robot as a dict.""" return self.__devices
[docs] def device_by_id(self, dev_id): """Returns a device by it's ID. Parameters ---------- dev_id: int the ID or device to be returned Returns ------- BaseRegister the register with that ID in the device. If no register with that ID exists, returns ``None``. """ return self.__dev_by_id.get(dev_id, None)
@property def joints(self): """(read-only) The joints of the robot as a dict.""" return self.__joints @property def sensors(self): """The sensors of the robot as a dict.""" return self.__sensors @property def groups(self): """(read-only) The groups of the robot as a dict.""" return self.__groups @property def syncs(self): """(read-only) The syncs of the robot as a dict.""" return self.__syncs @property def manager(self): """The RobotManager of the robot.""" return self.__manager
[docs] def start(self): """Starts the robot operation. It will: * call the :py:meth:`~BaseBus.open` method on all buses except the ones that have ``auto`` set to ``False`` * call the :py:meth:`~BaseDevice.open` method on all devices except the ones that have ``auto`` set to ``False`` * call the :py:meth:`~BaseSync.start` method on all syncs except the ones that have ``auto`` set to ``False`` """ logger.info('***** Starting robot *****************') # buses logger.info('Opening buses...') for bus in list(self.buses.values()): if bus.auto_open: logger.info(f'Opening bus: "{bus.name}"') bus.open() else: logger.info(f'Opening bus: "{bus.name}" - skipped') # devices logger.info('Opening devices...') for device in list(self.devices.values()): logger.info(f'Opening device: "{device.name}"') # TODO: should there be an Auto attribute for devices? device.open() # joint manager; this will also start the joints logger.info('Starting joint manager...') self.manager.start() # syncs # we start syncs latest to make sure that the joint manager # has properly initialized the joints before starting to replicate # the internal devices logger.info('Starting syncs...') for sync in list(self.syncs.values()): if sync.auto_start: logger.info(f'Starting sync: "{sync.name}"') sync.start() else: logger.info(f'Starting sync: "{sync.name}" - skipped') # finished logger.info('***** Robot started ******************')
[docs] def stop(self): """Stops the robot operation. It will: * call the :py:meth:`~BaseSync.stop` method on all syncs * call the :py:meth:`~BaseDevice.close` method on all devices * call the :py:meth:`~BaseBus.close` method on all buses """ logger.info('***** Stopping robot *****************') logger.info('Stopping joint manager...') self.manager.stop() logger.info('Stopping syncs...') for sync in list(self.syncs.values()): logger.info(f'Stopping sync: "{sync.name}"') sync.stop() logger.info('Closing devices...') for device in list(self.devices.values()): logger.info(f'Closing device: "{device.name}"') device.close() logger.info('Closing buses...') for bus in list(self.buses.values()): logger.info(f'Closing bus: "{bus.name}"') bus.close() logger.info('***** Robot stopped ******************')
[docs]class JointManager(BaseLoop): """Implements the management of the joints by alowing multiple movement streams to submit position commands to the robot. The ``JointManager`` inherits the constructor paramters from :py:class:`BaseLoop`. Please refer to that class for mote details. In addition the class introduces the following additional paramters: Parameters ---------- joints: list of :py:class:roboglia.Base.`Joint` or subclass The list of joints that the manager is having under control. Alternatively you can use the parameter ``group`` (see below) group: set of :py:class:roboglia.Base.`Joint` or subclass A group of joints that was defined earlier with a ``group`` statement in the robot definition file. function: str The function used to produce the blended command for the joints. If specific functions for position (``p_function``), velocity ( ``v_function``) or load (``ld_function``) are not supplied, then this function is used. Allowed values are 'mean', 'median', 'min', 'max'. p_function: str A specific function to be used for aggregating the position values. Allowed values are 'mean', 'median', 'min', 'max'. v_function: str A specific function to be used for aggregating the velocity values. Allowed values are 'mean', 'median', 'min', 'max'. ld_function: str A specific function to be used for aggregating the load values. Allowed values are 'mean', 'median', 'min', 'max'. timeout: float Is a time in seconds an accessor will wait before issuing a timeout when trying to submit data to the manager or the manager preparing the data for the joints. """
[docs] def __init__(self, name='JointManager', frequency=100.0, joints=[], group=None, function='mean', p_function=None, v_function=None, ld_function=None, timeout=0.5, **kwargs): super().__init__(name=name, frequency=frequency, **kwargs) temp_joints = [] if joints: temp_joints.extend(joints) if group: temp_joints.extend(group) # eliminate duplicates self.__joints = list(set(temp_joints)) if len(self.__joints) == 0: logger.warning('Joint manager does not have any joints ' 'attached to it') check_options(function, ['mean', 'median', 'min', 'max'], 'JointManager', name, logger) # aggregate functions func = self.__check_function(function, 'default') self.__p_func = self.__check_function(p_function, 'p_function', func) self.__v_func = self.__check_function(v_function, 'v_function', func) self.__ld_func = self.__check_function(ld_function, 'ld_function', func) # processing queues self.__submissions = {} self.__adjustments = {} self.__streams = {} self.__lock = threading.Lock()
def __check_function(self, func_name, context, default=statistics.mean): """Checks the function provided and returns a reference to it. Supported functions: ``mean``, ``median``, ``min`` and ``max``. Parameters ---------- func_name: str A name of a function to be checked and retrieved. Supported values: ``mean``, ``median``, ``min`` and ``max``. default: function A function that will be used to default to in case the supplied one is not supported. Returns ------- func: If the function is one of the supported ones, it returns a reference to it, otherwise returns ``default`` function. """ supported = { 'mean': statistics.mean, 'median': statistics.median, 'min': min, 'max': max } if func_name in supported: return supported[func_name] logger.info(f'Function "{func_name}" for {context} not supported. ' f'Using {default}') return default @property def joints(self): return self.__joints @property def p_func(self): """Aggregate function for positions.""" return self.__p_func @property def v_func(self): """Aggregate function for positions.""" return self.__v_func @property def ld_func(self): """Aggregate function for positions.""" return self.__ld_func
[docs] def submit(self, stream, commands, adjustments=False): """Used by a stream of commands to notify the Joint Manager they joint commands they want. Parameters ---------- stream: BaseThread or subclass The stream providing the data. It is used to keep the request separate and be able to merge later. commands: dict A dictionary with the commands requests in the format:: {joint_name: (values)} Where ``values`` is a tuple with the command for that joint. It is acceptable to send partial commands to a joint, for instance you can send only (100,) meaning position 100 to a JointPVL. Submitting more information to a joint will have no effect, for instance (100, 20, 40) (position, velocity, load) to a Joint will only use the position part of the request. adjustments: bool Indicates that the values are to be treated as adjustments to the other requests instead of absolute requests. This is convenient for streams that request postion correction like an accelerometer based balance control. Internally the JointManger keeps the commands separate between the absolute and the adjustments ones and calculates separate averages then adjusts the absolute results with the ones from the adjustments to produce the final numbers. Returns ------- bool: ``True`` if the operation was successful. False if there was an error (most likely the lock was not acquired). Caller needs to review this and decide if they should retry to send data. """ if not self.__lock.acquire(timeout=self.period): logger.warning(f'failed to acquire manager for ' f'stream {stream.name}') return False # add the new stream if stream.name not in self.__streams: self.__streams[stream.name] = stream # record adjustments request if adjustments: self.__adjustments[stream.name] = commands # record submission request else: self.__submissions[stream.name] = commands self.__lock.release() return True
[docs] def stop_submit(self, stream, adjustments=False): """Notifies the ``JointManager`` that the stream has finished sending data and as a result the data in the ``JointManager`` cache should be removed. .. warning:: If the stream does not call this method when it finished with a routine the last submission will remain in the cache and will continue to be averaged with the other requests, creating problems. Don't forget to call this method when your move finishes! Parameters ---------- stream: BaseThread or subclass The name of the move sending the data adjustments: bool Indicates the move submitted to the adjustment stream. Returns ------- bool: ``True`` if the operation was successful. False if there was an error (most likely the lock was not acquired). Caller needs to review this and decide if they should retry to send data. In the case of this method it is advisable to try resending the request, otherwise stale data will stay in the cache. """ if not self.__lock.acquire(timeout=self.period): logger.warning(f'failed to acquire manager for ' f'stream {stream.name}') return False # delete the stream if stream.name in self.__streams: # pragma: no branch del self.__streams[stream.name] # remove any adjustment requests if adjustments: if stream.name in self.__adjustments: # pragma: no branch del self.__adjustments[stream.name] # remove any submission requests else: if stream.name in self.__submissions: # pragma: no branch del self.__submissions[stream.name] self.__lock.release() return True
[docs] def start(self): """Starts the JointManager. Before calling the :py:meth:`BaseThread.start` it activates the joints if they indicate they have the ``auto`` flag set. """ for joint in self.joints: if joint.auto_activate and not joint.active: logger.info(f'Activating joint: "{joint.name}"') joint.active = True else: logger.info(f'Activating joint: "{joint.name}" - skipped') super().start()
[docs] def stop(self): """Stops the JointManager. After calling the :py:meth:`BaseThread.stop` it deactivates the joints if they indicate they have the ``auto`` flag set. """ # stop the streams logger.info('Stopping streams...') start = time.time() duration = 0 while self.__streams and duration < 2.0: stream = list(self.__streams.values())[0] if stream.running: stream.stop() duration = time.time() - start # while stream.running: # time.sleep(0.1) super().stop() for joint in self.joints: if joint.auto_activate and joint.active: logger.info(f'Deactivating joint: "{joint.name}"') joint.active = False else: logger.info(f'Deactivating joint: "{joint.name}" - skipped')
[docs] def atomic(self): if not self.__lock.acquire(timeout=self.period): logger.warning('failed to acquire lock for atomic processing') else: for joint in self.joints: comm = self.__process_request(joint, self.__submissions) adj = self.__process_request(joint, self.__adjustments) value = comm + adj if not value == PVL(): # pragma: no branch logger.debug(f'Setting joint {joint.name}: value={value}') joint.value = value self.__lock.release()
def __process_request(self, joint, requests): """Processes a list of requests and returns the processed command for that joint. The processed command applies an aggregation function (default ``mean``) to the command parameters. Parameters ---------- joint: Joint or subclass The joint being processed requests: dict A dictionary that contains all the requests submitted by streams. They are normally either the :py:class:`JointManager`'s ``submissions`` or ``adjustments``, the two buffers with requests for joint positions. The dictionary has as key the submitter's name and the data is another dict of {joint : (pos, vel, load)} records. Returns ------- PVLList: A list of PVL items selected from the requests. If there are no commands for that joint it returns a list with """ req = PVLList() for request in list(requests.values()): values = request.get(joint.name, None) if not values: continue else: req.append(pvl=values) if len(req) == 0: return PVL() # will be with ``nan``` if len(req) == 1: return req.items[0] return req.process(p_func=self.p_func, v_func=self.v_func, ld_func=self.ld_func)