diff --git a/adafruit_bitbangio.py b/adafruit_bitbangio.py index 92d2b17..551f245 100644 --- a/adafruit_bitbangio.py +++ b/adafruit_bitbangio.py @@ -23,6 +23,14 @@ """ +try: + from typing import Optional, List + from typing_extensions import Literal + from circuitpython_typing import WriteableBuffer, ReadableBuffer + from microcontroller import Pin +except ImportError: + pass + # imports from time import monotonic from digitalio import DigitalInOut @@ -37,24 +45,24 @@ class _BitBangIO: """Base class for subclassing only""" - def __init__(self): + def __init__(self) -> None: self._locked = False - def try_lock(self): + def try_lock(self) -> bool: """Attempt to grab the lock. Return True on success, False if the lock is already taken.""" if self._locked: return False self._locked = True return True - def unlock(self): + def unlock(self) -> None: """Release the lock so others may use the resource.""" if self._locked: self._locked = False else: raise ValueError("Not locked") - def _check_lock(self): + def _check_lock(self) -> Literal[True]: if not self._locked: raise RuntimeError("First call try_lock()") return True @@ -62,11 +70,11 @@ def _check_lock(self): def __enter__(self): return self - def __exit__(self, exc_type, exc_value, traceback): + def __exit__(self, exc_type, exc_value, traceback) -> None: self.deinit() # pylint: disable=no-self-use - def deinit(self): + def deinit(self) -> None: """Free any hardware used by the object.""" return @@ -76,7 +84,9 @@ def deinit(self): class I2C(_BitBangIO): """Software-based implementation of the I2C protocol over GPIO pins.""" - def __init__(self, scl, sda, *, frequency=400000, timeout=1): + def __init__( + self, scl: Pin, sda: Pin, *, frequency: int = 400000, timeout: float = 1 + ) -> None: """Initialize bitbang (or software) based I2C. Must provide the I2C clock, and data pin numbers. """ @@ -95,17 +105,17 @@ def __init__(self, scl, sda, *, frequency=400000, timeout=1): self._delay = (1 / frequency) / 2 # half period self._timeout = timeout - def deinit(self): + def deinit(self) -> None: """Free any hardware used by the object.""" self._sda.deinit() self._scl.deinit() - def _wait(self): + def _wait(self) -> None: end = monotonic() + self._delay # half period while end > monotonic(): pass - def scan(self): + def scan(self) -> List[int]: """Perform an I2C Device Scan""" found = [] if self._check_lock(): @@ -114,14 +124,28 @@ def scan(self): found.append(address) return found - def writeto(self, address, buffer, *, start=0, end=None): + def writeto( + self, + address: int, + buffer: ReadableBuffer, + *, + start: int = 0, + end: Optional[int] = None, + ) -> None: """Write data from the buffer to an address""" if end is None: end = len(buffer) if self._check_lock(): self._write(address, buffer[start:end], True) - def readfrom_into(self, address, buffer, *, start=0, end=None): + def readfrom_into( + self, + address: int, + buffer: WriteableBuffer, + *, + start: int = 0, + end: Optional[int] = None, + ) -> None: """Read data from an address and into the buffer""" if end is None: end = len(buffer) @@ -133,15 +157,15 @@ def readfrom_into(self, address, buffer, *, start=0, end=None): def writeto_then_readfrom( self, - address, - buffer_out, - buffer_in, + address: int, + buffer_out: ReadableBuffer, + buffer_in: WriteableBuffer, *, - out_start=0, - out_end=None, - in_start=0, - in_end=None - ): + out_start: int = 0, + out_end: Optional[int] = None, + in_start: int = 0, + in_end: Optional[int] = None, + ) -> None: """Write data from buffer_out to an address and then read data from an address and into buffer_in """ @@ -153,30 +177,30 @@ def writeto_then_readfrom( self._write(address, buffer_out[out_start:out_end], False) self.readfrom_into(address, buffer_in, start=in_start, end=in_end) - def _scl_low(self): + def _scl_low(self) -> None: self._scl.switch_to_output(value=False) - def _sda_low(self): + def _sda_low(self) -> None: self._sda.switch_to_output(value=False) - def _scl_release(self): + def _scl_release(self) -> None: """Release and let the pullups lift""" # Use self._timeout to add clock stretching self._scl.switch_to_input() - def _sda_release(self): + def _sda_release(self) -> None: """Release and let the pullups lift""" # Use self._timeout to add clock stretching self._sda.switch_to_input() - def _start(self): + def _start(self) -> None: self._sda_release() self._scl_release() self._wait() self._sda_low() self._wait() - def _stop(self): + def _stop(self) -> None: self._scl_low() self._wait() self._sda_low() @@ -186,7 +210,7 @@ def _stop(self): self._sda_release() self._wait() - def _repeated_start(self): + def _repeated_start(self) -> None: self._scl_low() self._wait() self._sda_release() @@ -196,7 +220,7 @@ def _repeated_start(self): self._sda_low() self._wait() - def _write_byte(self, byte): + def _write_byte(self, byte: int) -> bool: for bit_position in range(8): self._scl_low() @@ -222,7 +246,7 @@ def _write_byte(self, byte): return not ack - def _read_byte(self, ack=False): + def _read_byte(self, ack: bool = False) -> int: self._scl_low() self._wait() # sda will already be an input as we are simulating open drain @@ -246,25 +270,27 @@ def _read_byte(self, ack=False): return data & 0xFF - def _probe(self, address): + def _probe(self, address: int) -> bool: self._start() ok = self._write_byte(address << 1) self._stop() return ok > 0 - def _write(self, address, buffer, transmit_stop): + def _write(self, address: int, buffer: ReadableBuffer, transmit_stop: bool) -> None: self._start() if not self._write_byte(address << 1): - raise RuntimeError("Device not responding at 0x{:02X}".format(address)) + # raise RuntimeError("Device not responding at 0x{:02X}".format(address)) + raise RuntimeError(f"Device not responding at 0x{address:02X}") for byte in buffer: self._write_byte(byte) if transmit_stop: self._stop() - def _read(self, address, length): + def _read(self, address: int, length: int) -> bytearray: self._start() if not self._write_byte(address << 1 | 1): - raise RuntimeError("Device not responding at 0x{:02X}".format(address)) + # raise RuntimeError("Device not responding at 0x{:02X}".format(address)) + raise RuntimeError(f"Device not responding at 0x{address:02X}") buffer = bytearray(length) for byte_position in range(length): buffer[byte_position] = self._read_byte(ack=(byte_position != length - 1)) @@ -275,7 +301,9 @@ def _read(self, address, length): class SPI(_BitBangIO): """Software-based implementation of the SPI protocol over GPIO pins.""" - def __init__(self, clock, MOSI=None, MISO=None): + def __init__( + self, clock: Pin, MOSI: Optional[Pin] = None, MISO: Optional[Pin] = None + ) -> None: """Initialize bit bang (or software) based SPI. Must provide the SPI clock, and optionally MOSI and MISO pin numbers. If MOSI is set to None then writes will be disabled and fail with an error, likewise for MISO @@ -304,7 +332,7 @@ def __init__(self, clock, MOSI=None, MISO=None): self._miso = DigitalInOut(MISO) self._miso.switch_to_input() - def deinit(self): + def deinit(self) -> None: """Free any hardware used by the object.""" self._sclk.deinit() if self._miso: @@ -312,7 +340,14 @@ def deinit(self): if self._mosi: self._mosi.deinit() - def configure(self, *, baudrate=100000, polarity=0, phase=0, bits=8): + def configure( + self, + *, + baudrate: int = 100000, + polarity: Literal[0, 1] = 0, + phase: Literal[0, 1] = 0, + bits: int = 8, + ) -> None: """Configures the SPI bus. Only valid when locked.""" if self._check_lock(): if not isinstance(baudrate, int): @@ -331,13 +366,15 @@ def configure(self, *, baudrate=100000, polarity=0, phase=0, bits=8): self._bits = bits self._half_period = (1 / self._baudrate) / 2 # 50% Duty Cyle delay - def _wait(self, start=None): + def _wait(self, start: Optional[int] = None) -> float: """Wait for up to one half cycle""" while (start + self._half_period) > monotonic(): pass return monotonic() # Return current time - def write(self, buffer, start=0, end=None): + def write( + self, buffer: ReadableBuffer, start: int = 0, end: Optional[int] = None + ) -> None: """Write the data contained in buf. Requires the SPI being locked. If the buffer is empty, nothing happens. """ @@ -369,7 +406,13 @@ def write(self, buffer, start=0, end=None): self._sclk.value = self._polarity # pylint: disable=too-many-branches - def readinto(self, buffer, start=0, end=None, write_value=0): + def readinto( + self, + buffer: WriteableBuffer, + start: int = 0, + end: Optional[int] = None, + write_value: int = 0, + ) -> None: """Read into the buffer specified by buf while writing zeroes. Requires the SPI being locked. If the number of bytes to read is 0, nothing happens. """ @@ -417,14 +460,14 @@ def readinto(self, buffer, start=0, end=None, write_value=0): def write_readinto( self, - buffer_out, - buffer_in, + buffer_out: ReadableBuffer, + buffer_in: WriteableBuffer, *, - out_start=0, - out_end=None, - in_start=0, - in_end=None - ): + out_start: int = 0, + out_end: Optional[int] = None, + in_start: int = 0, + in_end: Optional[int] = None, + ) -> None: """Write out the data in buffer_out while simultaneously reading data into buffer_in. The lengths of the slices defined by buffer_out[out_start:out_end] and buffer_in[in_start:in_end] must be equal. If buffer slice lengths are @@ -482,6 +525,6 @@ def write_readinto( # pylint: enable=too-many-branches @property - def frequency(self): + def frequency(self) -> int: """Return the currently configured baud rate""" return self._baudrate diff --git a/requirements.txt b/requirements.txt index 7a984a4..452ef26 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ # SPDX-License-Identifier: Unlicense Adafruit-Blinka +typing-extensions