Source code for roboglia.move.moves

import yaml
import logging

# from ..utils import check_key
from .thread import StepLoop
from ..base import PVLList

logger = logging.getLogger(__name__)


[docs]class Script(StepLoop): """A Script is the top level structure used for defining prescribed motion for a robot. Parameters ---------- name: str The name of the script patience: float A duration in seconds that the main thread will wait for the background thread to finish setup activities and indicate that it is in ``started`` mode. times: int How many times the loop should be played. If a negative number is given (ex. -1) the loop will play to infinite robot: BaseRobot or subclass The robot that will be performing the script defaults: dict A dictionary with default behavior. Supported elements for the moment: - "duration" (specifies the duration of a sequence transition, if no explicit one is provided) - others to come... times: int The number of times the script steps will be executed when :py:meth:`play` will be invoked. Default is 1. joints: list of Joint or subclasses An ordered list of joints that are used by the script. The ``frame`` definitions later uses this order when describing the states. frames: dict of :py:class:`Frame` The Frame definitions used by the script. See the information for this class for more details. sequences: dict of :py:class:`Sequence` The Sequence defintions that are used by the script. See the information for this class for more details. scenes: dict of :py:class:`Scene` The Scene defintions used by the Script. See the information for this class for more details. script: list of :py:class:`Scene` An ordered list of Scenes that represent the complete Script. When the script is played the scenes are run in the order provided and, if the ``times`` parameter is different than 1, it will repeat the execution in a loop. """
[docs] def __init__(self, name='SCRIPT', patience=1.0, times=1, robot=None, defaults={}, joints=[], frames={}, sequences={}, scenes={}, script=[]): super().__init__(name=name, patience=patience, times=times) self.__robot = robot self.__defaults = defaults self.__init_joints(joints) self.__init_frames(frames) self.__init_sequences(sequences) self.__init_scenes(scenes) self.__init_script(script)
[docs] @classmethod def from_yaml(cls, robot, file_name): """Reads the script defintion from a YAML file.""" with open(file_name, 'r') as f: init_dict = yaml.load(f, Loader=yaml.FullLoader) if len(init_dict) > 1: # pragma: no branch logger.warning('only the first script will be loaded') name = list(init_dict)[0] components = init_dict[name] return cls(name=name, robot=robot, **components)
def __init_joints(self, joints): """Used by __init__ to setup the joints. Incorrect joints will be marked with ``None`` and will be filtered out when commands are issued. """ for index, joint_name in enumerate(joints): rm_joints = [joint.name for joint in self.robot.manager.joints] if joint_name not in rm_joints: logger.warning(f'Joint {joint_name} used in script {self.name}' ' is not managed by the robot manager ' f'{self.robot.manager.name} ' 'and will be skipped') joints[index] = None else: joints[index] = self.robot.joints[joint_name] self.__joints = joints def __init_frames(self, frames): """Used by __init__ to setup the frames. Handles full frames (dict of position, velocity and loads) or simplified frames (list of positions only).""" self.__frames = {} for frame_name, frame_info in list(frames.items()): if isinstance(frame_info, list): new_frame = Frame(name=frame_name, positions=frame_info) elif isinstance(frame_info, dict): new_frame = Frame(name=frame_name, **frame_info) else: raise NotImplementedError self.__frames[frame_name] = new_frame def __init_sequences(self, sequences): """Used by __init__ to setup the sequences. Frames incorrectly referenced will the skipped.""" self.__sequences = {} for seq_name, seq_info in list(sequences.items()): frames = seq_info.get('frames', []) if not frames: logger.warning(f'sequence {seq_name} has no frames defined') self.__sequences[seq_name] = None else: # check the frame names and replace with objects for index, frame_name in enumerate(frames): if frame_name not in self.frames: logger.warning(f'frame {frame_name} used by sequence ' f'{seq_name} does not exits ' 'and will be skipped') frames[index] = None else: frames[index] = self.frames[frame_name] self.__sequences[seq_name] = \ Sequence(name=seq_name, **seq_info) def __init_scenes(self, scenes): """Used by __init__ to setup scenes.""" self.__scenes = {} for scene_name, scene_info in list(scenes.items()): sequences = scene_info.get('sequences', None) if not sequences: logger.warning(f'Scene "{scene_name}" does not have any ' f'sequences defined; will skip') self.__scenes[scene_name] = None else: # replace sequence names with object references for index, seq_name in enumerate(sequences): # for a scene the sequence representation will be a dict # that includes the sequence reference and the direction # of play (inverse) seq = {} # check for inverse request if '.reverse' in seq_name: seq['reverse'] = True seq_name = seq_name.replace('.reverse', '') else: seq['reverse'] = False # validate the sequence exists # if not log error and use None if seq_name not in self.sequences: logger.warning(f'Sequence "{seq_name}" used by scene ' f'"{scene_name}" does not exist; ' 'will skip') seq['sequence'] = None else: seq['sequence'] = self.sequences[seq_name] # now replace the sequence name with the enhanced reference sequences[index] = seq # update the scenes dictionary self.__scenes[scene_name] = \ Scene(name=scene_name, **scene_info) def __init_script(self, script): """Called by __init__ to setup the script steps.""" for index, scene_name in enumerate(script): if scene_name not in self.scenes: logger.warning(f'Scene "{scene_name}" used by script ' f'"{self.name}" does not exist; will skip') script[index] = None else: script[index] = self.scenes[scene_name] self.__script = script @property def robot(self): """The robot associated with the Script.""" return self.__robot @property def defaults(self): """Default values for Script.""" return self.__defaults @property def joints(self): """The joints used by the Script.""" return self.__joints @property def frames(self): """The dictionary of Frames used by the Script.""" return self.__frames @property def sequences(self): """The dictionary of Sequences used by the Script.""" return self.__sequences @property def scenes(self): """The dictionary of Scenes used by the Script.""" return self.__scenes @property def script(self): """Returns the script (the list of scenes to be executed).""" return self.__script
[docs] def play(self): """Inherited from :py:class:`StepLoop`. Iterates over the scenes and produces the commands.""" logger.debug(f'Script {self.name} playing') for scene in self.script: if scene: # pragma: no branch logger.debug(f'Script {self.name} playing scene {scene.name}') for frame, duration in scene.play(): yield frame, duration
[docs] def atomic(self, data): """Inherited from :py:class:`StepLoop`. Submits the data to the robot manager only for valid joints.""" # data is a list of tuples with the commands for each joint assert len(data) == len(self.joints) # because self.joints could contain None values we cannot use zip commands = {} for index, joint in enumerate(self.joints): if joint: commands[joint.name] = data[index] logger.debug(f'Submitting: {commands}') self.robot.manager.submit(self, commands)
[docs] def teardown(self): """Informs the robot manager we are finished.""" for _ in range(5): if self.robot.manager.stop_submit(self): logger.info(f'Script {self.name} successfully unsubscribed') return None logger.warning(f'Script {self.name} failed to unsubscribe from ' 'Joint Manager')
[docs]class Scene(): """A Scene is a collection of :py:class:`Sequence` presented in an ordered list. Parameters ---------- name: str The name of the Scene sequences: list of :py:class:`Sequence` The Sequences that make the Scene. times: int A repeat counter for playing the list of Sequences when the :py:meth:`play` is invoked. """
[docs] def __init__(self, name='SCENE', sequences=[], times=1): self.__name = name self.__sequences = sequences self.__times = times
@property def name(self): """The name of the Scene.""" return self.__name @property def sequences(self): """The list of Sequences in the Scene.""" return self.__sequences @property def times(self): """The repetition counter for the Scene.""" return self.__times
[docs] def play(self): """Performs a Scene. Inherited from :py:class:`StepLoop`. Iterates over the Sequences and produces the commands.""" for step in range(self.times): logger.debug(f'Scene "{self.name}" playing iteration {step+1}') for seq_ext in self.sequences: sequence = seq_ext['sequence'] reverse = seq_ext['reverse'] rev_text = ' in reverse' if reverse else '' if not sequence: logger.debug(f'Scene "{self.name}" playing sequence ' '<None> sequence - skipping') else: logger.debug(f'Scene "{self.name}" playing sequence ' f'"{sequence.name}"{rev_text}') for frame, duration in sequence.play(reverse=reverse): yield frame, duration
[docs]class Sequence(): """A Sequence is an ordered list of of frames that have associated durations in seconds and can be played in a loop a number of times. Parameters ---------- name: str The name of the sequence frames: list of :py:class:`Frame` The frames contained in the sequence. The order in which the frames are listed is the order in which they will be played durations: list of float The durations in seconds for each frame. If the length of the list is different than the length of the frames there will be a critical error logged and the sequence will not be loaded. times: int The number of times the sequence should be played. Default is 1. """
[docs] def __init__(self, name='SEQUENCE', frames=[], durations=[], times=1): self.__name = name if len(frames) != len(durations): logger.warning(f'Durations for sequence "{name}" different than ' 'the number of frames; will skip') return None # normal processing self.__frames = frames self.__durations = durations self.__times = times
@property def name(self): """The name of the sequence.""" return self.__name @property def frames(self): """The list of ``Frame`` in the sequence.""" return self.__frames @property def durations(self): """The durations associated with each frame.""" return self.__durations @property def times(self): """The number of times the sequence will be played in a loop.""" return self.__times
[docs] def play(self, reverse=False): """Plays the sequence. Produces an iterator over all the frames, repeating as many ``times`` as requested. Parameters ---------- reverse: bool Indicates if the frames should be played in reverse order. Returns ------- iterator of tuple (commands, duration) ``commands`` is the list of (pos, vel, load) for each joint from the frame, and ``duration`` is the specified duration for the frame. """ for step in range(self.times): logger.debug(f'Sequence "{self.name}" playing iteration {step+1}') if reverse: zipped = list(zip(reversed(self.frames), reversed(self.durations))) else: zipped = list(zip(self.frames, self.durations)) for frame, duration in zipped: if frame: logger.debug(f'Sequence "{self.name}" playing frame ' f'"{frame.name}", duration {duration}') yield frame.commands, duration else: logger.debug(f'Sequence "{self.name}" playing frame ' '<None> frame - skipping')
[docs]class Frame(): """A ``Frame`` is a single representation of the robots' joints at one point in time. It is described by a list of positions, the velocities wanted to get to those positions and the loads. The last two of them are optional and will be padded with ``nan`` in case they do not cover all positions listed in the first parameter. Parameters ---------- name: str The name of the frame positions: list of floats The desired positions for the joints. They are provided in the same order as the number of joints that are described at the begining of the :py:class:`Script` where the frame is used. The unit of measure is the one used for the joints which in turn is dependent on the settings of the registers used by joints. velocities: list of floats The velocities used to move to the desired positions. If they are empty or not all covered, the constructor will padded with ``nan`` to make it the same size as the positions. You can also use ``nan`` in the list to indicate that a particular joint does not need to change the velocity (will continue to use the one set previously). loads: list of floats The loads used to move to the desired positions. If they are empty or not all covered, the constructor will padded with ``nan`` to make it the same size as the positions. You can also use ``nan`` in the list to indicate that a particular joint does not need to change the load (will continue to use the one set previously). """
[docs] def __init__(self, name='FRAME', positions=[], velocities=[], loads=[]): self.__name = name self.__pvl = PVLList(positions, velocities, loads)
@property def name(self): return self.__name @property def positions(self): """Returns the positions of a frame.""" return self.__pvl.positions @property def velocities(self): """Returns the (padded) velocities of a frame.""" return self.__pvl.velocities @property def loads(self): """Returns the (padded) loads of a frame.""" return self.__pvl.loads @property def commands(self): """Returns a list of tuples (pos, vel, load) for each joint in the frame. """ return self.__pvl