Skip to content

Annotation and hinting for BitbangIO #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 54 additions & 46 deletions adafruit_bitbangio.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@

"""

try:
from typing import Optional
from typing_extensions import Literal
from circuitpython_typing import WriteableBuffer, ReadableBuffer
from microcontroller import Pin
except ImportError:
pass

# imports
from time import monotonic
from digitalio import DigitalInOut
Expand All @@ -37,36 +45,36 @@
class _BitBangIO:
"""Base class for subclassing only"""

def __init__(self):
def __init__(self) -> None:
self._locked = False

def try_lock(self):
def try_lock(self) -> bool:
"""Attempt to grab the lock. Return True on success, False if the lock is already taken."""
if self._locked:
return False
self._locked = True
return True

def unlock(self):
def unlock(self) -> None:
"""Release the lock so others may use the resource."""
if self._locked:
self._locked = False
else:
raise ValueError("Not locked")

def _check_lock(self):
def _check_lock(self) -> Optional[bool]:
if not self._locked:
raise RuntimeError("First call try_lock()")
return True

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
def __exit__(self, exc_type, exc_value, traceback) -> None:
self.deinit()

# pylint: disable=no-self-use
def deinit(self):
def deinit(self) -> None:
"""Free any hardware used by the object."""
return

Expand All @@ -76,7 +84,7 @@ def deinit(self):
class I2C(_BitBangIO):
"""Software-based implementation of the I2C protocol over GPIO pins."""

def __init__(self, scl, sda, *, frequency=400000, timeout=1):
def __init__(self, scl: microcontroller.Pin, sda: microcontroller.Pin, *, frequency: int = 400000, timeout: int = 1) -> None:
"""Initialize bitbang (or software) based I2C. Must provide the I2C
clock, and data pin numbers.
"""
Expand All @@ -95,17 +103,17 @@ def __init__(self, scl, sda, *, frequency=400000, timeout=1):
self._delay = (1 / frequency) / 2 # half period
self._timeout = timeout

def deinit(self):
def deinit(self) -> None:
"""Free any hardware used by the object."""
self._sda.deinit()
self._scl.deinit()

def _wait(self):
def _wait(self) -> None:
end = monotonic() + self._delay # half period
while end > monotonic():
pass

def scan(self):
def scan(self) -> List[int]:
"""Perform an I2C Device Scan"""
found = []
if self._check_lock():
Expand All @@ -114,14 +122,14 @@ def scan(self):
found.append(address)
return found

def writeto(self, address, buffer, *, start=0, end=None):
def writeto(self, address: int, buffer: ReadableBuffer, *, start=0, end=None) -> None:
"""Write data from the buffer to an address"""
if end is None:
end = len(buffer)
if self._check_lock():
self._write(address, buffer[start:end], True)

def readfrom_into(self, address, buffer, *, start=0, end=None):
def readfrom_into(self, address: int, buffer: WriteableBuffer, *, start: int = 0, end: Optional[int] = None) -> None:
"""Read data from an address and into the buffer"""
if end is None:
end = len(buffer)
Expand All @@ -133,15 +141,15 @@ def readfrom_into(self, address, buffer, *, start=0, end=None):

def writeto_then_readfrom(
self,
address,
buffer_out,
buffer_in,
address: int,
buffer_out: ReadableBuffer,
buffer_in: WriteableBuffer,
*,
out_start=0,
out_end=None,
in_start=0,
in_end=None
):
out_start: int = 0,
out_end: Optional[int] = None,
in_start: int = 0,
in_end: Optional[int] = None
) -> None:
"""Write data from buffer_out to an address and then
read data from an address and into buffer_in
"""
Expand All @@ -153,30 +161,30 @@ def writeto_then_readfrom(
self._write(address, buffer_out[out_start:out_end], False)
self.readfrom_into(address, buffer_in, start=in_start, end=in_end)

def _scl_low(self):
def _scl_low(self) -> None:
self._scl.switch_to_output(value=False)

def _sda_low(self):
def _sda_low(self) -> None:
self._sda.switch_to_output(value=False)

def _scl_release(self):
def _scl_release(self) -> None:
"""Release and let the pullups lift"""
# Use self._timeout to add clock stretching
self._scl.switch_to_input()

def _sda_release(self):
def _sda_release(self) -> None:
"""Release and let the pullups lift"""
# Use self._timeout to add clock stretching
self._sda.switch_to_input()

def _start(self):
def _start(self) -> None:
self._sda_release()
self._scl_release()
self._wait()
self._sda_low()
self._wait()

def _stop(self):
def _stop(self) -> None:
self._scl_low()
self._wait()
self._sda_low()
Expand All @@ -186,7 +194,7 @@ def _stop(self):
self._sda_release()
self._wait()

def _repeated_start(self):
def _repeated_start(self) -> None:
self._scl_low()
self._wait()
self._sda_release()
Expand All @@ -196,7 +204,7 @@ def _repeated_start(self):
self._sda_low()
self._wait()

def _write_byte(self, byte):
def _write_byte(self, byte: int ) -> bool:
for bit_position in range(8):
self._scl_low()

Expand All @@ -222,7 +230,7 @@ def _write_byte(self, byte):

return not ack

def _read_byte(self, ack=False):
def _read_byte(self, ack: bool = False) -> int:
self._scl_low()
self._wait()
# sda will already be an input as we are simulating open drain
Expand All @@ -246,13 +254,13 @@ def _read_byte(self, ack=False):

return data & 0xFF

def _probe(self, address):
def _probe(self, address: int) -> bool:
self._start()
ok = self._write_byte(address << 1)
self._stop()
return ok > 0

def _write(self, address, buffer, transmit_stop):
def _write(self, address: int, buffer: ReadableBuffer, transmit_stop: bool) -> None:
self._start()
if not self._write_byte(address << 1):
raise RuntimeError("Device not responding at 0x{:02X}".format(address))
Expand All @@ -261,7 +269,7 @@ def _write(self, address, buffer, transmit_stop):
if transmit_stop:
self._stop()

def _read(self, address, length):
def _read(self, address: int, length: int) -> ReadableBuffer:
self._start()
if not self._write_byte(address << 1 | 1):
raise RuntimeError("Device not responding at 0x{:02X}".format(address))
Expand All @@ -275,7 +283,7 @@ def _read(self, address, length):
class SPI(_BitBangIO):
"""Software-based implementation of the SPI protocol over GPIO pins."""

def __init__(self, clock, MOSI=None, MISO=None):
def __init__(self, clock: microcontroller.Pin, MOSI: Optional[microcontroller.Pin] = None, MISO: Optional[microcontroller.Pin] = None) -> None:
"""Initialize bit bang (or software) based SPI. Must provide the SPI
clock, and optionally MOSI and MISO pin numbers. If MOSI is set to None
then writes will be disabled and fail with an error, likewise for MISO
Expand Down Expand Up @@ -304,15 +312,15 @@ def __init__(self, clock, MOSI=None, MISO=None):
self._miso = DigitalInOut(MISO)
self._miso.switch_to_input()

def deinit(self):
def deinit(self) -> None:
"""Free any hardware used by the object."""
self._sclk.deinit()
if self._miso:
self._miso.deinit()
if self._mosi:
self._mosi.deinit()

def configure(self, *, baudrate=100000, polarity=0, phase=0, bits=8):
def configure(self, *, baudrate: int = 100000, polarity: Literal[0,1] = 0, phase: Literal[0,1] = 0, bits: int = 8) -> None:
"""Configures the SPI bus. Only valid when locked."""
if self._check_lock():
if not isinstance(baudrate, int):
Expand All @@ -331,13 +339,13 @@ def configure(self, *, baudrate=100000, polarity=0, phase=0, bits=8):
self._bits = bits
self._half_period = (1 / self._baudrate) / 2 # 50% Duty Cyle delay

def _wait(self, start=None):
def _wait(self, start: Optional[int] = None) -> float:
"""Wait for up to one half cycle"""
while (start + self._half_period) > monotonic():
pass
return monotonic() # Return current time

def write(self, buffer, start=0, end=None):
def write(self, buffer: ReadableBuffer, start: int = 0, end: Optional[int] = None) -> None:
"""Write the data contained in buf. Requires the SPI being locked.
If the buffer is empty, nothing happens.
"""
Expand Down Expand Up @@ -369,7 +377,7 @@ def write(self, buffer, start=0, end=None):
self._sclk.value = self._polarity

# pylint: disable=too-many-branches
def readinto(self, buffer, start=0, end=None, write_value=0):
def readinto(self, buffer: WriteableBuffer, start: int = 0, end: Optional[int] = None, write_value: int = 0) -> None:
"""Read into the buffer specified by buf while writing zeroes. Requires the SPI being
locked. If the number of bytes to read is 0, nothing happens.
"""
Expand Down Expand Up @@ -417,14 +425,14 @@ def readinto(self, buffer, start=0, end=None, write_value=0):

def write_readinto(
self,
buffer_out,
buffer_in,
buffer_out: ReadableBuffer,
buffer_in: WriteableBuffer,
*,
out_start=0,
out_end=None,
in_start=0,
in_end=None
):
out_start: int = 0,
out_end: Optional[int] = None,
in_start: int = 0,
in_end: Optional[int] = None
) -> None:
"""Write out the data in buffer_out while simultaneously reading data into buffer_in.
The lengths of the slices defined by buffer_out[out_start:out_end] and
buffer_in[in_start:in_end] must be equal. If buffer slice lengths are
Expand Down Expand Up @@ -482,6 +490,6 @@ def write_readinto(
# pylint: enable=too-many-branches

@property
def frequency(self):
def frequency(self) -> int:
"""Return the currently configured baud rate"""
return self._baudrate
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
# SPDX-License-Identifier: Unlicense

Adafruit-Blinka
typing-extensions
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
author_email="[email protected]",
install_requires=[
"Adafruit-Blinka",
"typing-extensions",
],
# Choose your license
license="MIT",
Expand Down