From 0d3fde39892cb011b6c16395f559b13bad9aec21 Mon Sep 17 00:00:00 2001 From: Alec Delaney Date: Thu, 6 Jan 2022 12:37:54 -0500 Subject: [PATCH 1/4] Change rom variable initial value to None Easier for typing, but gives the same effect with respect to truthiness --- adafruit_onewire/bus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adafruit_onewire/bus.py b/adafruit_onewire/bus.py index 1938777..9054338 100644 --- a/adafruit_onewire/bus.py +++ b/adafruit_onewire/bus.py @@ -129,7 +129,7 @@ def scan(self): """Scan for devices on the bus and return a list of addresses.""" devices = [] diff = 65 - rom = False + rom = None count = 0 for _ in range(0xFF): rom, diff = self._search_rom(rom, diff) From c49def89b5d5acefb1cab0f60049551752e85d43 Mon Sep 17 00:00:00 2001 From: Alec Delaney Date: Thu, 6 Jan 2022 12:49:29 -0500 Subject: [PATCH 2/4] List out parameters for OneWireDevice __exit__() --- adafruit_onewire/device.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adafruit_onewire/device.py b/adafruit_onewire/device.py index ba62baa..7f2fbd3 100644 --- a/adafruit_onewire/device.py +++ b/adafruit_onewire/device.py @@ -28,7 +28,7 @@ def __enter__(self): self._select_rom() return self - def __exit__(self, *exc): + def __exit__(self, exception_type: Optional[Type[type]], exception_value: Optional[BaseException], traceback: Optional[TracebackType]) -> bool: return False def readinto(self, buf, *, start=0, end=None): From b1e3ac2157339a691cc4c2cac6dae7c442436c00 Mon Sep 17 00:00:00 2001 From: Alec Delaney Date: Thu, 6 Jan 2022 12:50:48 -0500 Subject: [PATCH 3/4] Add type hints --- adafruit_onewire/bus.py | 38 ++++++++++++++++++++++---------------- adafruit_onewire/device.py | 17 ++++++++++++----- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/adafruit_onewire/bus.py b/adafruit_onewire/bus.py index 9054338..2949fc2 100644 --- a/adafruit_onewire/bus.py +++ b/adafruit_onewire/bus.py @@ -17,6 +17,12 @@ import busio from micropython import const +try: + from typing import Optional, List, Tuple + from microcontroller import Pin +except ImportError: + pass + _SEARCH_ROM = const(0xF0) _MATCH_ROM = const(0x55) _SKIP_ROM = const(0xCC) @@ -30,26 +36,26 @@ class OneWireError(Exception): class OneWireAddress: """A class to represent a 1-Wire address.""" - def __init__(self, rom): + def __init__(self, rom: bytearray) -> None: self._rom = rom @property - def rom(self): + def rom(self) -> bytearray: """The unique 64 bit ROM code.""" return self._rom @property - def crc(self): + def crc(self) -> int: """The 8 bit CRC.""" return self._rom[7] @property - def serial_number(self): + def serial_number(self) -> bytearray: """The 48 bit serial number.""" return self._rom[1:7] @property - def family_code(self): + def family_code(self) -> int: """The 8 bit family code.""" return self._rom[0] @@ -57,7 +63,7 @@ def family_code(self): class OneWireBus: """A class to represent a 1-Wire bus.""" - def __init__(self, pin): + def __init__(self, pin: Pin) -> None: # pylint: disable=no-member self._ow = busio.OneWire(pin) self._readbit = self._ow.read_bit @@ -65,21 +71,21 @@ def __init__(self, pin): self._maximum_devices = _MAX_DEV @property - def maximum_devices(self): + def maximum_devices(self) -> int: """The maximum number of devices the bus will scan for. Valid range is 1 to 255. It is an error to have more devices on the bus than this number. Having less is OK. """ return self._maximum_devices @maximum_devices.setter - def maximum_devices(self, count): + def maximum_devices(self, count: int) -> None: if not isinstance(count, int): raise ValueError("Maximum must be an integer value 1 - 255.") if count < 1 or count > 0xFF: raise ValueError("Maximum must be an integer value 1 - 255.") self._maximum_devices = count - def reset(self, required=False): + def reset(self, required: bool = False) -> bool: """ Perform a reset and check for presence pulse. @@ -90,7 +96,7 @@ def reset(self, required=False): raise OneWireError("No presence pulse found. Check devices and wiring.") return not reset - def readinto(self, buf, *, start=0, end=None): + def readinto(self, buf: bytearray, *, start: int = 0, end: Optional[int] = None) -> None: """ Read into ``buf`` from the device. The number of bytes read will be the length of ``buf``. @@ -108,7 +114,7 @@ def readinto(self, buf, *, start=0, end=None): for i in range(start, end): buf[i] = self._readbyte() - def write(self, buf, *, start=0, end=None): + def write(self, buf: bytearray, *, start: int = 0, end: Optional[int] = None) -> None: """ Write the bytes from ``buf`` to the device. @@ -125,7 +131,7 @@ def write(self, buf, *, start=0, end=None): for i in range(start, end): self._writebyte(buf[i]) - def scan(self): + def scan(self) -> List[OneWireAddress]: """Scan for devices on the bus and return a list of addresses.""" devices = [] diff = 65 @@ -146,18 +152,18 @@ def scan(self): break return devices - def _readbyte(self): + def _readbyte(self) -> int: val = 0 for i in range(8): val |= self._ow.read_bit() << i return val - def _writebyte(self, value): + def _writebyte(self, value: int) -> None: for i in range(8): bit = (value >> i) & 0x1 self._ow.write_bit(bit) - def _search_rom(self, l_rom, diff): + def _search_rom(self, l_rom: Optional[bytearray], diff: int) -> Tuple[bytearray, int]: if not self.reset(): return None, 0 self._writebyte(_SEARCH_ROM) @@ -185,7 +191,7 @@ def _search_rom(self, l_rom, diff): return rom, next_diff @staticmethod - def crc8(data): + def crc8(data: bytearray) -> int: """ Perform the 1-Wire CRC check on the provided data. diff --git a/adafruit_onewire/device.py b/adafruit_onewire/device.py index 7f2fbd3..50b9df1 100644 --- a/adafruit_onewire/device.py +++ b/adafruit_onewire/device.py @@ -14,24 +14,31 @@ __version__ = "0.0.0-auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_OneWire.git" +try: + from typing import Optional, Type + from types import TracebackType + from adafruit_onewire.bus import OneWireBus, OneWireAddress +except ImportError: + pass + _MATCH_ROM = b"\x55" class OneWireDevice: """A class to represent a single device on the 1-Wire bus.""" - def __init__(self, bus, address): + def __init__(self, bus: OneWireBus, address: OneWireAddress): self._bus = bus self._address = address - def __enter__(self): + def __enter__(self) -> 'OneWireDevice': self._select_rom() return self def __exit__(self, exception_type: Optional[Type[type]], exception_value: Optional[BaseException], traceback: Optional[TracebackType]) -> bool: return False - def readinto(self, buf, *, start=0, end=None): + def readinto(self, buf: bytearray, *, start: int = 0, end: Optional[int] = None) -> None: """ Read into ``buf`` from the device. The number of bytes read will be the length of ``buf``. @@ -49,7 +56,7 @@ def readinto(self, buf, *, start=0, end=None): if self._bus.crc8(buf): raise RuntimeError("CRC error.") - def write(self, buf, *, start=0, end=None): + def write(self, buf: bytearray, *, start: int = 0, end: Optional[int] = None) -> None: """ Write the bytes from ``buf`` to the device. @@ -63,7 +70,7 @@ def write(self, buf, *, start=0, end=None): """ return self._bus.write(buf, start=start, end=end) - def _select_rom(self): + def _select_rom(self) -> None: self._bus.reset() self.write(_MATCH_ROM) self.write(self._address.rom) From 3da156b9c80724e42299dccdde0b98630c0cd783 Mon Sep 17 00:00:00 2001 From: Alec Delaney Date: Thu, 6 Jan 2022 13:01:53 -0500 Subject: [PATCH 4/4] Reformatted per pre-commit --- adafruit_onewire/bus.py | 12 +++++++++--- adafruit_onewire/device.py | 17 +++++++++++++---- 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/adafruit_onewire/bus.py b/adafruit_onewire/bus.py index 2949fc2..51a55d1 100644 --- a/adafruit_onewire/bus.py +++ b/adafruit_onewire/bus.py @@ -96,7 +96,9 @@ def reset(self, required: bool = False) -> bool: raise OneWireError("No presence pulse found. Check devices and wiring.") return not reset - def readinto(self, buf: bytearray, *, start: int = 0, end: Optional[int] = None) -> None: + def readinto( + self, buf: bytearray, *, start: int = 0, end: Optional[int] = None + ) -> None: """ Read into ``buf`` from the device. The number of bytes read will be the length of ``buf``. @@ -114,7 +116,9 @@ def readinto(self, buf: bytearray, *, start: int = 0, end: Optional[int] = None) for i in range(start, end): buf[i] = self._readbyte() - def write(self, buf: bytearray, *, start: int = 0, end: Optional[int] = None) -> None: + def write( + self, buf: bytearray, *, start: int = 0, end: Optional[int] = None + ) -> None: """ Write the bytes from ``buf`` to the device. @@ -163,7 +167,9 @@ def _writebyte(self, value: int) -> None: bit = (value >> i) & 0x1 self._ow.write_bit(bit) - def _search_rom(self, l_rom: Optional[bytearray], diff: int) -> Tuple[bytearray, int]: + def _search_rom( + self, l_rom: Optional[bytearray], diff: int + ) -> Tuple[bytearray, int]: if not self.reset(): return None, 0 self._writebyte(_SEARCH_ROM) diff --git a/adafruit_onewire/device.py b/adafruit_onewire/device.py index 50b9df1..36aabf0 100644 --- a/adafruit_onewire/device.py +++ b/adafruit_onewire/device.py @@ -31,14 +31,21 @@ def __init__(self, bus: OneWireBus, address: OneWireAddress): self._bus = bus self._address = address - def __enter__(self) -> 'OneWireDevice': + def __enter__(self) -> "OneWireDevice": self._select_rom() return self - def __exit__(self, exception_type: Optional[Type[type]], exception_value: Optional[BaseException], traceback: Optional[TracebackType]) -> bool: + def __exit__( + self, + exception_type: Optional[Type[type]], + exception_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> bool: return False - def readinto(self, buf: bytearray, *, start: int = 0, end: Optional[int] = None) -> None: + def readinto( + self, buf: bytearray, *, start: int = 0, end: Optional[int] = None + ) -> None: """ Read into ``buf`` from the device. The number of bytes read will be the length of ``buf``. @@ -56,7 +63,9 @@ def readinto(self, buf: bytearray, *, start: int = 0, end: Optional[int] = None) if self._bus.crc8(buf): raise RuntimeError("CRC error.") - def write(self, buf: bytearray, *, start: int = 0, end: Optional[int] = None) -> None: + def write( + self, buf: bytearray, *, start: int = 0, end: Optional[int] = None + ) -> None: """ Write the bytes from ``buf`` to the device.