Source code for roboglia.dynamixel.bus

# Copyright (C) 2020  Alex Sonea

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <>.

import logging
import random

from dynamixel_sdk import PacketHandler, PortHandler
from serial import rs485

from ..base import BaseBus, SharedBus
from ..utils import check_type, check_options, check_not_empty

logger = logging.getLogger(__name__)

[docs]class DynamixelBus(BaseBus): """A communication bus that supports Dynamixel protocol. Uses ``dynamixel_sdk``. .. note:: The parameters listed bellow are only the specific ones introduced by the ``DynamixelBus`` class. Since this is a subclass of :py:class:`~roboglia.base.BaseBus` and the constructor will call the ``super()`` constructor, all the paramters supported by :py:class:`~roboglia.base.BaseBus` are also supported and checked when creating a ``DynamixelBus``. For instance the `name`, `robot` and `port` are validated. Parameters ---------- name: str The name of the bus robot: BaseRobot A reference to the robot using the bus port: any An identification for the physical bus access. Some busses have string description like ``/dev/ttySC0`` while others could be just integers (like in the case of I2C or SPI buses) auto: Bool If ``True`` the bus will be opened when the robot is started by calling :py:meth:`BaseRobot.start`. If ``False`` the bus will be left closed during robot initialization and needs to be opened by the programmer. baudrate: int Communication speed for the bus protocol: float Communication protocol for the bus; must be 1.0 or 2.0 rs485: bool If ``True``, ``DynamixelBus`` will configure the serial port with RS485 support. This might be required for certain interfaces that need this mode in order to control the semi-duplex protocol (one wire) implemented by Dynamixel devices or if you genuinely use RS485 Dynamixel devices. mock: bool Indicates to use mock bus for testing purposes; this will make use of the :py:class:`MockPacketHandler` to simulate the communication on a Dynamixel bus and allow to test the software in CI testing. Raises ------ KeyError: if any of the required keys are missing ValueError: if any of the required data is incorrect """
[docs] def __init__(self, name='DYNAMIXEL', robot=None, port='', auto=True, baudrate=1000000, protocol=2.0, rs485=False, mock=False): super().__init__(name=name, robot=robot, port=port, auto=auto) check_type(baudrate, int, 'bus',, logger) check_not_empty(baudrate, 'baudrate', 'bus',, logger) self.__baudrate = baudrate check_options(protocol, [1.0, 2.0], 'bus',, logger) self.__protocol = protocol check_options(rs485, [True, False], 'bus',, logger) self.__rs485 = rs485 self.__port_handler = None self.__packet_handler = None check_options(mock, [True, False], 'bus',, logger) self.__mock = mock
@property def port_handler(self): """The Dynamixel port handler for this bus.""" return self.__port_handler @property def packet_handler(self): """The Dynamixel packet handler for this bus.""" return self.__packet_handler @property def protocol(self): """Protocol supported by the bus.""" return self.__protocol @property def baudrate(self): """Bus baudrate.""" return self.__baudrate @property def rs485(self): """If the bus uses rs485.""" return self.__rs485
[docs] def open(self): """Allocates the port_handler and the packet_handler. If the attribute ``mock`` was ``True`` when setting up the bus, then uses MockPacketHandler. """ if self.__mock: check_not_empty(self.robot, 'robot', 'bus',, logger) self.__port_handler = 'MockBus' self.__packet_handler = MockPacketHandler(self.protocol, self.robot) else: # pragma: no cover self.__port_handler = PortHandler(self.port) self.__port_handler.openPort() self.__port_handler.setBaudRate(self.baudrate) if self.rs485: self.__port_handler.ser.rs485_mode = rs485.RS485Settings()'Bus "{}" set in rs485 mode') self.__packet_handler = PacketHandler(self.__protocol)'Bus "{}" opened')
[docs] def close(self): """Closes the actual physical bus. Calls the ``super().close()`` to check if there is ok to close the bus and no other objects are using it.""" if self.is_open: if super().close(): # pragma: no branch self.__packet_handler = None if not self.__mock: # pragma: no cover self.__port_handler.closePort() self.__port_handler = None'Bus "{}" closed')
@property def is_open(self): """Returns `True` or `False` if the bus is open. """ return self.__port_handler is not None
[docs] def ping(self, dxl_id): """Performs a Dynamixel ``ping`` of a device. Parameters ---------- dxl_id: int The Dynamixel device number to be pinged. Returns ------- bool ``True`` if the device responded, ``False`` otherwise. """ if not self.is_open: logger.error('Ping invoked with a bus not opened') return False _, cerr, derr =, dxl_id) return True if cerr == 0 and derr == 0 else False
[docs] def scan(self, range=list(range(254))): """Scans the devices on the bus. Parameters ---------- range: range the range of devices to be cheked if they exist on the bus. The method will call :py:meth:`~ping` for each ID in the list. By default the list is [0, 253]. Returns: list of int The list of IDs that have been successfully identified on the bus. If none is found the list will be empty. """ if not self.is_open: logger.error('Scan invoked with a bus not opened') else: return [dxl_id for dxl_id in range if]
[docs] def read(self, reg): """Depending on the size of the register calls the corresponding TxRx function from the packet handler. If the result is ok (communication error and dynamixel error are both 0) then the obtained value is returned. Communication and data errors are logged and no exceptions are raised. Parameters ---------- reg: BaseRegister or subclass The register to be read Returns ------- int: The value read by calling the device. """ if not self.is_open: logger.error(f'Attempt to use closed bus "{}"') else: dev = reg.device # select the function by the size of register if reg.size == 1: function = self.__packet_handler.read1ByteTxRx elif reg.size == 2: function = self.__packet_handler.read2ByteTxRx elif reg.size == 4: function = self.__packet_handler.read4ByteTxRx else: raise NotImplementedError # call the function try: res, cerr, derr = function(self.__port_handler, dev.dev_id, reg.address) except Exception as e: # pragma: no cover logger.error(f'Exception raised while reading bus ' f'"{}" device "{}" register ' f'"{}"') logger.error(str(e)) return None # success call - log DEBUG logger.debug(f'[readXByteTxRx] dev={dev.dev_id} ' f'reg={reg.address}: ' f'{res} (cerr={cerr}, derr={derr})') # process result if cerr != 0: # communication error err_desc = self.__packet_handler.getTxRxResult(cerr) logger.error(f'[bus "{}"] device "{}", ' f'register "{}": {err_desc}') return None if derr != 0: # device error err_desc = self.__packet_handler.getRxPacketError(derr) logger.warning(f'device "{}" responded with a ' f'return error: {err_desc}') return res
[docs] def write(self, reg, value): """Depending on the size of the register calls the corresponding TxRx function from the packet handler. Communication and data errors are logged and no exceptions are raised. Parameters ---------- reg: BaseRegister or subclass The register to write to value: int The value to write to the register. Please note that this is in the internal format of the register and it is the responsibility of the register class to provide conversion between the internal and external format if they are different. """ if not self.is_open: logger.error(f'Attempt to use closed bus "{}"') else: dev = reg.device # select function by register size if reg.size == 1: function = self.__packet_handler.write1ByteTxRx elif reg.size == 2: function = self.__packet_handler.write2ByteTxRx elif reg.size == 4: function = self.__packet_handler.write4ByteTxRx else: raise NotImplementedError # execute the function try: cerr, derr = function(self.__port_handler, dev.dev_id, reg.address, value) except Exception as e: # pragma: no cover logger.error(f'Exception raised while writing bus ' f'"{}" device "{}" register ' f'"{}"') logger.error(str(e)) return None # success call - log DEBUG logger.debug(f'[writeXByteTxRx] dev={dev.dev_id} ' f'reg={reg.address}: ' f'{value} (cerr={cerr}, derr={derr})') # process result if cerr != 0: # communication error err_desc = self.__packet_handler.getTxRxResult(cerr) logger.error(f'[Bus "{}"] device "{}", ' f'register "{}": {err_desc}') else: if derr != 0: # device error err_desc = self.__packet_handler.getRxPacketError(derr) logger.warning(f'Device "{}" responded with a ' f'return error: {err_desc}')
[docs] def __repr__(self): ans = super().__repr__()[:-1] ans += f' prot={self.protocol} baud={self.baudrate} rs485={self.rs485}' return ans+'>'
[docs]class SharedDynamixelBus(SharedBus): """A DynamixelBus that can be used in multithreaded environment. Includes the functionality of a :py:class:`DynamixelBus` in a :py:class:`SharedBus`. The :py:meth:`~write` and :py:meth:`~read` methods are wrapped around in :py:meth:`~can_use` and :py:meth:`~stop_using` to provide the exclusive access. In addition, two methods :py:meth:`~naked_write` and :py:meth:`~naked_read` are provided so that classes that want sequence of read / writes can do that more efficiently without accessing the lock every time. They simply invoke the *unsafe* methods :py:meth:DynamixelBus.`write` and :py:meth:DynamixelBus.`read` from the :py:class:`DynamixelBus` class. .. see also: :py:class:`SharedBus` class. .. warning:: If you are using :py:meth:`~naked_write` and :py:meth:`~naked_read` you **must** ensure that you wrap them in :py:meth:`~can_use` and :py:meth:`~stop_using` in the calling code. """
[docs] def __init__(self, **kwargs): super().__init__(DynamixelBus, **kwargs)
[docs]class MockPacketHandler(): """A class used to simulate the Dynamixel communication without actually using a real bus or devices. Used for testing in the CI environment. The class includes deterministic behavior, for instance it will use the existing values of the device to mock a response, as well as well as stochastic behavior where with a certain probability we generate communication errors in order to be able to test how the code deals with these situations. Also, for read of registers that are read only the class will introduce small random numbers to the numbers already in the registers so to simulate values that change over time (ex. current position). Parameters ---------- protocol: float Dynamixel protocol to use. Should be 1.0 or 2.0 robot: BaseRobot The robot for in order to *bootstrap* information. err: float A value that is used to generate random communication errors so that we can test the parts of the code that deal with this. """
[docs] def __init__(self, protocol, robot, err=0.1): self.__robot = robot self.__err = err self.__protocol = protocol self.__sync_data_length = None
[docs] def getProtocolVersion(self): """Returns the Dynamixel protocol used.""" return self.__protocol
[docs] def getTxRxResult(self, err): """Used to get a string representation of a communication error. Invokes the official function from ``PacketHandler`` in ``dynamixel_sdk``. Parameters ---------- err: int An error code as reported by the communication medium Returns ------- str: A string representation of this error. """ ph = PacketHandler(self.__protocol) return ph.getTxRxResult(err)
[docs] def getRxPacketError(self, err): """Used to get a string representation of a device response error. Invokes the official function from ``PacketHandler`` in ``dynamixel_sdk``. Parameters ---------- err: int An error code as reported by the Dynamixel device Returns ------- str: A string representation of this error. """ ph = PacketHandler(self.__protocol) return ph.getRxPacketError(err)
def __common_writeTxRx(self, ph, dev_id, address, value): if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') return -3001, 0 if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') return 0, 4 # overheat return 0, 0
[docs] def write1ByteTxRx(self, ph, dev_id, address, value): """Mocks a write of 1 byte to a device. In ``err`` percentage time it will raise a communication error. From the remaning cases again an ``err`` percentage will be raised with device error (overheat). The paramters are copied from the ``PacketHadler`` in ``dynamixel_sdk``. You would rarely need to use this. """ return self.__common_writeTxRx(ph, dev_id, address, value)
[docs] def write2ByteTxRx(self, ph, dev_id, address, value): """Same as :py:meth:`write1ByteTxRx` but for 2 Bytes registers.""" return self.__common_writeTxRx(ph, dev_id, address, value)
[docs] def write4ByteTxRx(self, ph, dev_id, address, value): """Same as :py:meth:`write1ByteTxRx` but for 4 Bytes registers.""" return self.__common_writeTxRx(ph, dev_id, address, value)
def __common_readTxRx(self, ph, dev_id, address): if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') return 0, -3001, 0 device = self.__robot.device_by_id(dev_id) reg = device.register_by_address(address) if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') return reg.int_value, 0, 4 # overheat return reg.int_value, 0, 0
[docs] def read1ByteTxRx(self, ph, dev_id, address): """Same as :py:meth:`write1ByteTxRx` but for reading 1 Bytes registers.""" return self.__common_readTxRx(ph, dev_id, address)
[docs] def read2ByteTxRx(self, ph, dev_id, address): """Same as :py:meth:`write1ByteTxRx` but for reading 2 Bytes registers.""" return self.__common_readTxRx(ph, dev_id, address)
[docs] def read4ByteTxRx(self, ph, dev_id, address): """Same as :py:meth:`write1ByteTxRx` but for reading 4 Bytes registers.""" return self.__common_readTxRx(ph, dev_id, address)
[docs] def syncWriteTxOnly(self, port, start_address, data_length, param, param_length): """Mocks a SyncWrite transmit package. We return randomly an error or success.""" if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') return -3001 return 0
[docs] def syncReadTx(self, port, start_address, data_length, param, param_length): """Mocks a SyncWrite transmit package. We return randomly an error or success.""" if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') return -3001 self.__sync_data_length = data_length self.__param = param self.__start_address = start_address self.__index = 0 self.__mode = 'sync' return 0
[docs] def readRx(self, port, dxl_id, length): """Mocks a read package received. Used by SyncRead and BulkRead. It will attempt to produce a response based on the data already exiting in the registers. If the register is a read-only one, we will add a random value between (-10, 10) to the exiting value and then trim it to the ``min`` and ``max`` limits of the register. When passing back the data, for registers that are more than 1 byte a *low endian* conversion is executed (see :py:meth:`DynamixelRegister.register_low_endian`). """ if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') return 0, -3001, 0 # we're not going to check the device and register as we # expect both to be available since we checked them when # we setup the sync if self.__mode == 'sync': device = self.__robot.device_by_id(self.__param[self.__index]) register = device.register_by_address(self.__start_address) else: # bulk idx = self.__index * 5 dev_id = self.__param[idx] device = self.__robot.device_by_id(dev_id) assert dev_id == dxl_id address = self.__param[idx + 1] + self.__param[idx + 2] * 256 register = device.register_by_address(address) # assert register.size == length self.__start_address = address self.__sync_data_length = self.__param[idx + 3] + \ self.__param[idx + 4] * 256 data = [] pos = register.address - self.__start_address while pos < self.__sync_data_length: if register.access == 'RW': value = register.int_value else: value = register.int_value + random.randint(-10, 10) value = max(register.minim, min(register.maxim, value)) data.extend(device.register_low_endian(value, register.size)) pos += register.size self.__index += 1 return data, 0, 0
[docs] def readTxRx(self, port, dxl_id, address, length): """Mocks a read package received. Used by RangeRead. It will attempt to produce a response based on the data already exiting in the registers. If the register is a read-only one, we will add a random value between (-10, 10) to the exiting value and then trim it to the ``min`` and ``max`` limits of the register. When passing back the data, for registers that are more than 1 byte a *low endian* conversion is executed (see :py:meth:`DynamixelRegister.register_low_endian`). """ if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') raise OSError if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') return [0], -3001, 0 device = self.__robot.device_by_id(dxl_id) parsed_len = 0 res = [] register = None while parsed_len < length: while not register: register = device.register_by_address(address) if not register: address += 1 parsed_len += 1 res.append(0) if register.access == 'RW': value = register.int_value else: value = register.int_value + random.randint(-10, 10) value = max(register.minim, min(register.maxim, value)) res.extend(device.register_low_endian(value, register.size)) address += register.size parsed_len += register.size register = None if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') derr = 4 # overheat else: derr = 0 return res, 0, derr
[docs] def bulkWriteTxOnly(self, port, param, param_length): """Simulate a BulkWrite transmit package. We return randomly an error or success.""" if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') return -3001 return 0
[docs] def bulkReadTx(self, port, param, param_length): """"Simulate a BulkWrite transmit of response request package. We return randomly an error or success.""" if random.random() < self.__err: logger.error('** Random error generated by MockPacketHandler **') return -3001 # self.__sync_data_length = data_length self.__param = param # self.__start_address = start_address self.__index = 0 self.__mode = 'bulk' return 0
[docs] def ping(self, ph, dxl_id): """Simulates a ``ping`` on the Dynamixel bus.""" for device in list(self.__robot.devices.values()): if device.dev_id == dxl_id: return device.model_number, 0, 0 return 0, -3001, 0