import yaml
import logging
# from ..utils import check_key
from .thread import StepLoop
from ..base import PVLList
logger = logging.getLogger(__name__)
[docs]class Script(StepLoop):
[docs] def __init__(self, name='SCRIPT', robot=None, times=1, joints=[],
frames={}, sequences={}, scenes={}, script=[], **kwargs):
super().__init__(name=name, times=times, **kwargs)
self.__robot = robot
self.__init_joints(joints)
self.__init_frames(frames)
self.__init_sequences(sequences)
self.__init_scenes(scenes)
self.__init_script(script)
@classmethod
def from_yaml(cls, robot, file_name):
with open(file_name, 'r') as f:
init_dict = yaml.load(f, Loader=yaml.FullLoader)
if len(init_dict) > 1: # pragma: no branch
logger.warning('only the first script will be loaded')
name = list(init_dict)[0]
components = init_dict[name]
return cls(name=name, robot=robot, **components)
def __init_joints(self, joints):
"""Used by __init__ to setup the joints. Incorrect joints will be
marked with ``None`` and will be filtered out when commands are
issued.
"""
for index, joint_name in enumerate(joints):
if joint_name not in self.robot.joints:
logger.warning(f'joint {joint_name} used in script {self.name}'
f' does not exist in robot {self.robot.name} '
'and will be skipped')
joints[index] = None
else:
joints[index] = self.robot.joints[joint_name]
self.__joints = joints
def __init_frames(self, frames):
"""Used by __init__ to setup the frames. Handles full frames (dict
of position, velocity and loads) or simplified frames (list of
positions only)."""
self.__frames = {}
for frame_name, frame_info in list(frames.items()):
if isinstance(frame_info, list):
new_frame = Frame(name=frame_name, positions=frame_info)
elif isinstance(frame_info, dict):
new_frame = Frame(name=frame_name, **frame_info)
else:
raise NotImplementedError
self.__frames[frame_name] = new_frame
def __init_sequences(self, sequences):
"""Used by __init__ to setup the sequences. Frames incorrectly
referenced will the skipped."""
self.__sequences = {}
for seq_name, seq_info in list(sequences.items()):
frames = seq_info.get('frames', [])
if not frames:
logger.warning(f'sequence {seq_name} has no frames defined')
self.__sequences[seq_name] = None
else:
# check the frame names and replace with objects
for index, frame_name in enumerate(frames):
if frame_name not in self.frames:
logger.warning(f'frame {frame_name} used by sequence '
f'{seq_name} does not exits '
'and will be skipped')
frames[index] = None
else:
frames[index] = self.frames[frame_name]
self.__sequences[seq_name] = \
Sequence(name=seq_name, **seq_info)
def __init_scenes(self, scenes):
"""Used by __init__ to setup scenes."""
self.__scenes = {}
for scene_name, scene_info in list(scenes.items()):
sequences = scene_info.get('sequences', None)
if not sequences:
logger.warning(f'Scene "{scene_name}" does not have any '
f'sequences defined; will skip')
self.__scenes[scene_name] = None
else:
# replace sequence names with object references
for index, seq_name in enumerate(sequences):
# for a scene the sequence representation will be a dict
# that includes the sequence reference and the direction
# of play (inverse)
seq = {}
# check for inverse request
if '.reverse' in seq_name:
seq['reverse'] = True
seq_name = seq_name.replace('.reverse', '')
else:
seq['reverse'] = False
# validate the sequence exists
# if not log error and use None
if seq_name not in self.sequences:
logger.warning(f'Sequence "{seq_name}" used by scene '
f'"{scene_name}" does not exist; '
'will skip')
seq['sequence'] = None
else:
seq['sequence'] = self.sequences[seq_name]
# now replace the sequence name with the enhanced reference
sequences[index] = seq
# update the scenes dictionary
self.__scenes[scene_name] = \
Scene(name=scene_name, **scene_info)
def __init_script(self, script):
"""Called by __init__ to setup the script steps."""
for index, scene_name in enumerate(script):
if scene_name not in self.scenes:
logger.warning(f'Scene "{scene_name}" used by script '
f'"{self.name}" does not exist; will skip')
script[index] = None
else:
script[index] = self.scenes[scene_name]
self.__script = script
@property
def robot(self):
return self.__robot
@property
def joints(self):
return self.__joints
@property
def frames(self):
return self.__frames
@property
def sequences(self):
return self.__sequences
@property
def scenes(self):
return self.__scenes
@property
def script(self):
"""Returns the script (the list of scenes to be executed)."""
return self.__script
[docs] def play(self):
"""Inherited from :py:class:`StepLoop`. Iterates over the scenes
and produces the commands."""
logger.debug(f'Script {self.name} playing')
for scene in self.script:
if scene: # pragma: no branch
logger.debug(f'Script {self.name} playing scene {scene.name}')
for frame, duration in scene.play():
yield frame, duration
[docs] def atomic(self, data):
"""Inherited from :py:class:`StepLoop`. Submits the data to the
robot manager only for valid joints."""
# data is a list of tuples with the commands for each joint
assert len(data) == len(self.joints)
# because self.joints could contain None values we cannot use zip
commands = {}
for index, joint in enumerate(self.joints):
if joint:
commands[joint.name] = data[index]
logger.debug(f'Submitting: {commands}')
self.robot.manager.submit(self, commands)
[docs] def teardown(self):
"""Informs the robot manager we are finished."""
for _ in range(5):
if self.robot.manager.stop_submit(self):
logger.info(f'Script {self.name} successfully unsubscribed')
return None
logger.warning(f'Script {self.name} failed to unsubscribe from '
'Joint Manager')
[docs]class Scene():
[docs] def __init__(self, name='SCENE', sequences=[], times=1):
self.__name = name
self.__sequences = sequences
self.__times = times
@property
def name(self):
return self.__name
@property
def sequences(self):
return self.__sequences
@property
def times(self):
return self.__times
def play(self):
for step in range(self.times):
logger.debug(f'Scene "{self.name}" playing iteration {step+1}')
for seq_ext in self.sequences:
sequence = seq_ext['sequence']
reverse = seq_ext['reverse']
rev_text = ' in reverse' if reverse else ''
if not sequence:
logger.debug(f'Scene "{self.name}" playing sequence '
'<None> sequence - skipping')
else:
logger.debug(f'Scene "{self.name}" playing sequence '
f'"{sequence.name}"{rev_text}')
for frame, duration in sequence.play(reverse=reverse):
yield frame, duration
[docs]class Sequence():
"""A Sequence is an ordered list of of frames that have associated
durations in seconds and can be played in a loop a number of times.
Parameters
----------
name: str
The name of the sequence
frames: list of :py:class:`Frame`
The frames contained in the sequence. The order in which the frames
are listed is the order in which they will be played
durations: list of float
The durations in seconds for each frame. If the length of the list
is different than the length of the frames there will be a
critical error logged and the sequence will not be loaded.
times: int
The number of times the sequence should be played. Default is 1.
"""
[docs] def __init__(self, name='SEQUENCE', frames=[], durations=[], times=1):
self.__name = name
if len(frames) != len(durations):
logger.warning(f'Durations for sequence "{name}" different than '
'the number of frames; will skip')
return None
# normal processing
self.__frames = frames
self.__durations = durations
self.__times = times
@property
def name(self):
"""The name of the sequence."""
return self.__name
@property
def frames(self):
"""The list of ``Frame`` in the sequence."""
return self.__frames
@property
def durations(self):
"""The durations associated with each frame."""
return self.__durations
@property
def times(self):
"""The number of times the sequence will be played in a loop."""
return self.__times
[docs] def play(self, reverse=False):
"""Plays the sequence. Produces an iterator over all the frames,
repeating as many ``times`` as requested.
Parameters
----------
reverse: bool
Indicates if the frames should be played in reverse order.
Returns
-------
iterator of tuple (commands, duration)
``commands`` is the list of (pos, vel, load) for each joint
from the frame, and ``duration`` is the specified duration for
the frame.
"""
for step in range(self.times):
logger.debug(f'Sequence "{self.name}" playing iteration {step+1}')
if reverse:
zipped = list(zip(reversed(self.frames), reversed(self.durations)))
else:
zipped = list(zip(self.frames, self.durations))
for frame, duration in zipped:
if frame:
logger.debug(f'Sequence "{self.name}" playing frame '
f'"{frame.name}", duration {duration}')
yield frame.commands, duration
else:
logger.debug(f'Sequence "{self.name}" playing frame '
'<None> frame - skipping')
[docs]class Frame():
"""A ``Frame`` is a single representation of the robots' joints at one
point in time. It is described by a list of positions, the velocities
wanted to get to those positions and the loads. The last two of them
are optional and will be padded with ``nan`` in case they do not cover
all positions listed in the first parameter.
Parameters
----------
name: str
The name of the frame
positions: list of floats
The desired positions for the joints. They are provided in the same
order as the number of joints that are described at the begining
of the :py:class:`Script` where the frame is used. The unit of
measure is the one used for the joints which in turn is dependent
on the settings of the registers used by joints.
velocities: list of floats
The velocities used to move to the desired positions. If they are
empty or not all covered, the constructor will padded with ``nan``
to make it the same size as the positions. You can also use ``nan``
in the list to indicate that a particular joint does not need to
change the velocity (will continue to use the one set previously).
loads: list of floats
The loads used to move to the desired positions. If they are
empty or not all covered, the constructor will padded with ``nan``
to make it the same size as the positions. You can also use ``nan``
in the list to indicate that a particular joint does not need to
change the load (will continue to use the one set previously).
"""
[docs] def __init__(self, name='FRAME', positions=[], velocities=[], loads=[]):
self.__name = name
self.__pvl = PVLList(positions, velocities, loads)
@property
def name(self):
return self.__name
@property
def positions(self):
"""Returns the positions of a frame."""
return self.__pvl.positions
@property
def velocities(self):
"""Returns the (padded) velocities of a frame."""
return self.__pvl.velocities
@property
def loads(self):
"""Returns the (padded) loads of a frame."""
return self.__pvl.loads
@property
def commands(self):
"""Returns a list of tuples (pos, vel, load) for each joint in the
frame.
"""
return self.__pvl