Source code for roboglia.dynamixel.sync

# Copyright (C) 2020  Alex Sonea

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import logging
from dynamixel_sdk import GroupSyncWrite, GroupSyncRead
from dynamixel_sdk import GroupBulkWrite, GroupBulkRead

from ..base import BaseSync

logger = logging.getLogger(__name__)


[docs]class DynamixelSyncWriteLoop(BaseSync): """Implements SyncWrite as specified in the frequency parameter. The devices are provided in the `group` parameter and the registers in the `registers` as a list of register names. It will update from `int_value` of each register for every device. Will raise exceptions if the SyncWrite cannot be setup or fails to execute. """
[docs] def setup(self): """This allocates the ``GroupSyncWrite``. It needs to be here and not in the constructor as this is part of the wrapped execution that is produced by :py:class:`BaseThread` class. """ # determines the addresses and lengths for each SyncWrite # allocates the GroupSyncWrite objects for each one self.__start_address, self.__length, cont = self.get_register_range() if not cont: mess = f'SyncWrite {self.name} requires registers to be contiguous' logger.error(mess) raise RuntimeError(mess) self.gsw = GroupSyncWrite(self.bus.port_handler, self.bus.packet_handler, self.__start_address, self.__length)
[docs] def atomic(self): """Executes a SyncWrite.""" # add params to sync write for device in self.devices: data = [0] * self.__length for reg_name in self.register_names: register = getattr(device, reg_name) pos = register.address - self.__start_address data[pos: pos + register.size] = device.register_low_endian( register.int_value, register.size) # addParam result = self.gsw.addParam(device.dev_id, data) if not result: # pragma: no cover logger.error(f'failed to setup SyncWrite for loop ' f'{self.name} for device {device.name}') # execute write if self.bus.can_use(): result = self.gsw.txPacket() self.bus.stop_using() # !! as soon as possible error = self.gsw.ph.getTxRxResult(result) self.inc_processed() logger.debug(f'[sync write {self.name}], result: {error}') if result != 0: logger.error(f'failed to execute SyncWrite {self.name}: ' f'cerr={error}') self.inc_errors() else: logger.error(f'sync {self.name} ' f'failed to acquire bus {self.bus.name}') # cleanup self.gsw.clearParam()
[docs]class DynamixelSyncReadLoop(BaseSync): """Implements SyncRead as specified in the frequency parameter. The devices are provided in the `group` parameter and the registers in the `registers` as a list of register names. It will update the `int_value` of each register in every device with the result of the call. Will raise exceptions if the SyncRead cannot be setup or fails to execute. Only works with Protocol 2.0. """
[docs] def __init__(self, **kwargs): super().__init__(**kwargs) if self.bus.protocol != 2.0: mess = 'SyncRead only supported for Dynamixel Protocol 2.0.' logger.critical(mess) raise ValueError(mess)
[docs] def setup(self): """Prepares to start the loop.""" self.__start_address, self.__length, _ = self.get_register_range() self.gsr = GroupSyncRead(self.bus.port_handler, self.bus.packet_handler, self.__start_address, self.__length) for device in self.devices: result = self.gsr.addParam(device.dev_id) if result is not True: # pragma: no cover logger.error(f'Failed to setup SyncRead for loop ' f'{self.name} for device {device.name}')
[docs] def atomic(self): """Executes a SyncRead.""" # acquire the bus if not self.bus.can_use(): logger.error(f'Sync {self.name} ' f'failed to acquire bus {self.bus.name}') return # execute read result = self.gsr.txRxPacket() self.bus.stop_using() # !! as soon as possible if result != 0: error = self.bus.packet_handler.getTxRxResult(result) logger.error(f'SyncRead {self.name}, cerr={error}') # return # retrieve data for device in self.devices: for reg_name in self.register_names: self.inc_processed() register = getattr(device, reg_name) if not self.gsr.isAvailable(device.dev_id, register.address, register.size): logger.error(f'Failed to retrieve data in SyncRead ' f'{self.name} for device {device.name} ' f'and register {register.name}') self.inc_errors() else: register.int_value = self.gsr.getData( device.dev_id, register.address, register.size)
[docs]class DynamixelBulkWriteLoop(BaseSync): """Implements BulkWrite as specified in the frequency parameter. The devices are provided in the `group` parameter and the registers in the `registers` as a list of register names. The registers do not need to be sequential. It will update from `int_value` of each register for every device. Will raise exceptions if the BulkWrite cannot be setup or fails to execute. Only works with Protocol 2.0. """
[docs] def __init__(self, **kwargs): super().__init__(**kwargs) if self.bus.protocol != 2.0: mess = 'BulkWrite only supported for Dynamixel Protocol 2.0.' logger.critical(mess) raise ValueError(mess)
[docs] def setup(self): """This allocates the ``GroupBulkWrite``. It needs to be here and not in the constructor as this is part of the wrapped execution that is produced by :py:class:`BaseThread` class. """ self.__start_address, self.__length, cont = self.get_register_range() if not cont: mess = f'BulkWrite {self.name} requires registers to be contiguous' logger.error(mess) raise RuntimeError(mess) self.gbw = GroupBulkWrite(self.bus.port_handler, self.bus.packet_handler)
[docs] def atomic(self): """Executes a BulkWrite.""" for device in self.devices: data = [0] * self.__length for reg_name in self.register_names: register = getattr(device, reg_name) pos = register.address - self.__start_address data[pos: pos + register.size] = device.register_low_endian( register.int_value, register.size) # addParam result = self.gbw.addParam(device.dev_id, self.__start_address, self.__length, data) if not result: # pragma: no cover logger.error(f'Failed to setup BulkWrite for loop ' f'{self.name} for device {device.name}') # execute write if self.bus.can_use(): result = self.gbw.txPacket() self.bus.stop_using() # !! as soon as possible error = self.gbw.ph.getTxRxResult(result) logger.debug(f'[bulk write {self.name}], result: {error}') if result != 0: logger.error(f'Failed to execute BulkWrite {self.name}: ' f'cerr={error}') else: logger.error(f'Sync {self.name} ' f'failed to acquire bus {self.bus.name}') # cleanup self.gbw.clearParam()
[docs]class DynamixelBulkReadLoop(BaseSync): """Implements BulkRead as specified in the frequency parameter. The devices are provided in the `group` parameter and the registers in the `registers` as a list of register names. The registers do not need to be sequential. It will update the `int_value` of each register in every device with the result of the call. Will raise exceptions if the BulkRead cannot be setup or fails to execute. With Protocol 1.0 officially works only with MX devices. """
[docs] def setup(self): """Prepares to start the loop.""" self.__start_address, self.__length, _ = self.get_register_range() self.gbr = GroupBulkRead(self.bus.port_handler, self.bus.packet_handler) for device in self.devices: result = self.gbr.addParam(device.dev_id, self.__start_address, self.__length) if result is not True: # pragma: no cover logger.error(f'Failed to setup BulkRead for loop ' f'{self.name} for device {device.name}')
[docs] def atomic(self): """Executes a BulkRead.""" # execute read if not self.bus.can_use(): logger.error(f'Sync {self.name} ' f'failed to acquire bus {self.bus.name}') else: result = self.gbr.txRxPacket() self.bus.stop_using() # !! as soon as possible if result != 0: error = self.gbr.ph.getTxRxResult(result) logger.error(f'BulkRead {self.name}, cerr={error}') else: # retrieve data for device in self.devices: for reg_name in self.register_names: register = getattr(device, reg_name) if not self.gbr.isAvailable(device.dev_id, register.address, register.size): logger.error(f'Failed to retrieve data in ' f'BulkRead {self.name} for ' f'device {device.name} and ' f'register {register.name}') else: register.int_value = self.gbr.getData( device.dev_id, register.address, register.size)
class DynamixelRangeReadLoop(BaseSync): """Implements Read for a list of registers as specified in the frequency parameter. This method is provided as an alternative for AX devices that do not support BulkRead or SyncRead and for which reading registers with BaseReadSync would be extremely inefficient. With this method we still have to send / receive a communication packet for each device, but we would get all the registers in one go. The devices are provided in the `group` parameter and the registers in the `registers` as a list of register names. The registers do not need to be sequential. It will update the `int_value` of each register in every device with the result of the call. Will raise exceptions if the BulkRead cannot be setup or fails to execute. """ def setup(self): """Prepares to start the loop.""" self.start_address, self.length, _ = self.get_register_range() def atomic(self): """Executes a RangeRead for all devices.""" # execute read if not self.bus.can_use(): logger.error(f'Sync "{self.name}" ' f'failed to acquire bus "{self.bus.name}"') return for device in self.devices: # call the function try: res, cerr, derr = self.bus.packet_handler.readTxRx( self.bus.port_handler, device.dev_id, self.start_address, self.length) except Exception as e: logger.error(f'Exception raised while reading bus ' f'"{self.name}" device "{device.name}"') logger.error(str(e)) continue # success call - log DEBUG logger.debug(f'[RangeRead] dev={device.dev_id} ' f'{res} (cerr={cerr}, derr={derr})') # process result if cerr != 0: # communication error err_desc = self.bus.packet_handler.getTxRxResult(cerr) logger.error(f'[RangeRead "{self.name}"] ' f'device "{device.name}", cerr={err_desc}') continue if derr != 0: # device error err_desc = self.bus.packet_handler.getRxPacketError(derr) logger.warning(f'Device "{device.name}" responded with a ' f'return error: {err_desc}') # process results for reg_name in self.register_names: reg = getattr(device, reg_name) pos = reg.address - self.start_address if reg.size == 1: value = res[pos] elif reg.size == 2: value = res[pos] + res[pos + 1] * 256 elif reg.size == 4: value = res[pos] + res[pos + 1] * 256 + \ res[pos + 2] * 65536 + \ res[pos + 3] * 16777216 else: raise NotImplementedError reg.int_value = value self.bus.stop_using() # !! as soon as possible