# Copyright (C) 2020 Alex Sonea
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import random
from smbus2 import SMBus
from ..utils import check_options
from ..base import BaseBus, SharedBus
logger = logging.getLogger(__name__)
[docs]class I2CBus(BaseBus):
"""Implements a communication bus for I2C devices.
``I2CBus`` has the same paramters as :py:class:`BaseBus`. Please
refer to this class for the details of the parameters.
In addition there is an extra parameter `mock`.
At this moment the ``I2CBus`` supports devices with byte and word
registers and permits defining composed regsiters with ``size`` > 1
that are treated as a single register.
.. note:: A gyroscope sensor might have registers for the z, y and z
axes reading that are stored as pairs of registers like this::
gyro_x_l #0x28
gyro_x_h #0x29
gyro_y_l #0x2A
gyro_y_h #0x2B
gyro_z_l #0x2C
gyro_z_h #0x2D
For simplicity it is possible to define these registers like this
in the device template::
registers:
gyro_x:
address: 0x28
size: 2
gyro_y:
address: 0x2A
size: 2
gyro_z:
address: 0x2C
size: 2
By default the registers are ``Byte`` and the order of data is
low-high as described in the :py:class:roboglia.base.`BaseRegister`.
The bus will handle this by reading the two registers sequentially
and computing the register's value using the size of the register
and the order.
Parameters
----------
name: str
The name of the bus
robot: BaseRobot
A reference to the robot using the bus
port: any
An identification for the physical bus access. Some busses have
string description like ``/dev/ttySC0`` while others could be just
integers (like in the case of I2C or SPI buses)
auto: Bool
If ``True`` the bus will be opened when the robot is started by
calling :py:meth:`BaseRobot.start`. If ``False`` the bus will be
left closed during robot initialization and needs to be opened
by the programmer.
mock: bool
Indicates if the I2C bus will use mock communication. It is
provided for testing of functionality in CI environment. If ``True``
the bus will use the :py:class:`MockSMBus` class for performing
read and write operations.
err: float
For mock buses this parameter controls the probability of generating
a random mock communication error.
"""
[docs] def __init__(self, name='I2CBUS', robot=None, port='', auto=True,
mock=False, err=0.1):
super().__init__(name=name, robot=robot, port=port, auto=auto)
check_options(mock, [True, False], 'bus', self.name, logger)
if mock:
self.__i2cbus = MockSMBus(self.robot, err=err)
else:
self.__i2cbus = SMBus() # not opened
@property
def port_handler(self):
return self.__i2cbus
[docs] def open(self):
"""Opens the communication port."""
# SMBus throws exceptions; we need to handle them
try:
# this will also attempt to open the bus
self.port_handler.open(self.port)
except Exception as e:
logger.error(f'failed to open I2C bus {self.name}')
logger.error(str(e))
[docs] def close(self):
"""Closes the communication port, if the ``super().close()`` allows
it. If the bus is used in any sync loops, the close request might
fail.
"""
if super().close(): # pragma: no branch
try:
self.port_handler.close()
except Exception as e:
logger.error(f'failed to close I2C bus {self.name}')
logger.error(str(e))
@property
def is_open(self):
"""Returns `True` or `False` if the bus is open."""
return self.port_handler.fd is not None
[docs] def read(self, reg):
"""Depending on the size of the register is calls the corresponding
function from the ``SMBus``.
"""
if not self.is_open:
logger.error(f'attempted to read from a closed bus: {self.name}')
return None
dev = reg.device
if reg.word:
function = self.__i2cbus.read_word_data
base = 65536
else:
function = self.__i2cbus.read_byte_data
base = 256
values = [0] * reg.size
for pos in range(reg.size):
try:
values[pos] = function(dev.dev_id, reg.address + pos)
except Exception as e:
logger.error(f'failed to execute read command on I2C bus '
f'{self.name} for device {dev.name} and '
f'register {reg.name}')
logger.error(str(e))
return None
if reg.order == 'HL':
values = values.reverse()
value = 0
for pos in range(reg.size):
value = value * base + values[-pos-1]
return value
[docs] def write(self, reg, value):
"""Depending on the size of the register it calls the corresponding
write function from ``SMBus``.
"""
if not self.is_open:
logger.error(f'attempted to write to a closed bus: {self.name}')
dev = reg.device
if reg.word:
function = self.__i2cbus.write_word_data
base = 65536
else:
function = self.__i2cbus.write_byte_data
base = 256
buffer = [0] * reg.size
data = reg.int_value
for pos in range(reg.size):
buffer[pos] = data % base
data = data // base
if reg.order == 'HL':
buffer = buffer.reverse()
for pos, item in enumerate(buffer):
try:
function(dev.dev_id, reg.address + pos, item)
except Exception as e:
logger.error(f'Failed to execute write command on I2C bus '
f'{self.name} for device {dev.name} and '
f'register {reg.name}')
logger.error(str(e))
return None
[docs] def read_block(self, device, start_address, length):
"""Reads a block of registers of given length.
Parameters
----------
device: I2CDevice or subclass
The device on the I2C bus
start_addr: int
The start address to read from
length: int
Number of bytes to read from the device
Returns
-------
list of int:
A list of bytes of length ``length`` with the values from the
device. It intercepts any exceptions and logs them, in that case
the return will be ``None``.
"""
if not self.is_open:
logger.error(f'attempted to read from a closed bus: {self.name}')
return None
try:
data = self.__i2cbus.read_i2c_block_data(
device.dev_id, start_address, length)
except Exception as e:
logger.error(f'Failed to execute read block command on I2C bus '
f'{self.name} for device {device.name}')
logger.error(str(e))
return None
return data
[docs] def write_block(self, device, start_address, data):
"""Writes a block of registers of given length.
Parameters
----------
device: I2CDevice or subclass
The device on the I2C bus
start_addr: int
The start address to read from
data: list of int
The bytes to write to the device
Returns
-------
``None``:
It intercepts any exceptions and logs them.
"""
if not self.is_open:
logger.error(f'attempted to write to a closed bus: {self.name}')
try:
self.__i2cbus.write_i2c_block_data(
device.dev_id, start_address, data)
except Exception as e:
logger.error(f'Failed to execute write block command on I2C bus '
f'{self.name} for device {device.name}')
logger.error(str(e))
[docs]class SharedI2CBus(SharedBus):
"""An I2C bus that can be shared between threads in a multi-threaded
environment.
It inherits all the initialization paramters from :py:class:`SharedBus` and
:py:class:`I2CBus`.
"""
[docs] def __init__(self, **kwargs):
super().__init__(I2CBus, **kwargs)
[docs]class MockSMBus(SMBus):
"""Class for testing. Overides the ``SMBus`` methods in order to
simulate the data exchange. Intended for use in the CI testing.
Parameters
----------
robot: BaseRobot
The robot (we need it to access the registers)
err: float
A small number that will be used for generating random communication
errors so that we can perform testing of the code handling those.
"""
[docs] def __init__(self, robot, err=0.1):
self.__robot = robot
self.__err = err
self.fd = None
[docs] def open(self, port):
"""mock opens the bus."""
self.fd = port
[docs] def close(self):
"""Mock closes the bus. It raises a OSError at the end so that
the code can be checked for this behavior too.
"""
self.fd = None
# we do this so that the testing covers
# the error part of the branch
raise OSError('** auto generated: error closing the bus **')
def __common_read(self, dev_id, address):
if random.random() < self.__err:
logger.error('*** random generated read error ***')
raise OSError
# non-error case
device = self.__robot.device_by_id(dev_id)
reg = device.register_by_address(address)
if reg is None:
# it's the higher digits of the register; we'll return
# 0 and the lower digits will return the full register value
return 0
if reg.access == 'R':
# we randomize the read
plus = random.randint(-10, 10)
value = max(reg.minim, min(reg.maxim, reg.int_value + plus))
else:
value = reg.int_value
return value
[docs] def read_byte_data(self, dev_id, address):
"""Simulates the read of 1 Byte."""
logger.debug(f'reading BYTE from I2C bus {self.fd} '
f'device {dev_id} address {address}')
return self.__common_read(dev_id, address)
[docs] def read_word_data(self, dev_id, address):
"""Simulates the read of 1 Word."""
logger.debug(f'reading WORD from I2C bus {self.fd} '
f'device {dev_id} address {address}')
return self.__common_read(dev_id, address)
def __common_write(self, dev_id, address, value):
if random.random() < self.__err:
logger.error('*** random generated write error ***')
raise OSError('*** random generated write error ***')
return None
[docs] def write_byte_data(self, dev_id, address, value):
"""Simulates the write of one byte."""
logger.debug(f'writting BYTE to I2C bus {self.fd} '
f'device {dev_id} address {address} value {value}')
self.__common_write(dev_id, address, value)
[docs] def write_word_data(self, dev_id, address, value):
"""Simulates the write of one word."""
logger.debug(f'writting WORD to I2C bus {self.fd} '
f'device {dev_id} address {address} value {value}')
self.__common_write(dev_id, address, value)
[docs] def read_i2c_block_data(self, dev_id, address, length, force=None):
"""Simulates the read of one block of data."""
if random.random() < self.__err:
logger.error('*** random generated read-block error ***')
raise OSError
# we'll just return a random list of numbers
return random.sample(list(range(0, 255)), length)
[docs] def write_i2c_block_data(self, dev_id, address, data):
"""Simulates the write of one block of data."""
if random.random() < self.__err:
logger.error('*** random generated write-block error ***')
raise OSError