diff --git a/adafruit_rfm69.py b/adafruit_rfm69.py index d564c68..8892907 100644 --- a/adafruit_rfm69.py +++ b/adafruit_rfm69.py @@ -61,6 +61,14 @@ except ImportError: pass +try: + from typing import Callable, Optional, Type + from circuitpython_typing import WriteableBuffer, ReadableBuffer + from digitalio import DigitalInOut + from busio import SPI +except ImportError: + pass + __version__ = "0.0.0+auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RFM69.git" @@ -134,7 +142,7 @@ # pylint: disable=too-many-instance-attributes -def ticks_diff(ticks1, ticks2): +def ticks_diff(ticks1: int, ticks2: int) -> int: """Compute the signed difference between two ticks values assuming that they are within 2**28 ticks """ @@ -143,7 +151,7 @@ def ticks_diff(ticks1, ticks2): return diff -def check_timeout(flag, limit): +def check_timeout(flag: Callable, limit: float) -> bool: """test for timeout waiting for specified flag""" timed_out = False if HAS_SUPERVISOR: @@ -224,7 +232,7 @@ class _RegisterBits: # check from pylint. # pylint: disable=protected-access - def __init__(self, address, *, offset=0, bits=1): + def __init__(self, address: int, *, offset: int = 0, bits: int = 1) -> None: assert 0 <= offset <= 7 assert 1 <= bits <= 8 assert (offset + bits) <= 8 @@ -236,11 +244,11 @@ def __init__(self, address, *, offset=0, bits=1): self._mask <<= offset self._offset = offset - def __get__(self, obj, objtype): + def __get__(self, obj: Optional["RFM69"], objtype: Type["RFM69"]): reg_value = obj._read_u8(self._address) return (reg_value & self._mask) >> self._offset - def __set__(self, obj, val): + def __set__(self, obj: Optional["RFM69"], val: int) -> None: reg_value = obj._read_u8(self._address) reg_value &= ~self._mask reg_value |= (val & 0xFF) << self._offset @@ -274,19 +282,19 @@ def __set__(self, obj, val): dio_0_mapping = _RegisterBits(_REG_DIO_MAPPING1, offset=6, bits=2) # pylint: disable=too-many-statements - def __init__( + def __init__( # pylint: disable=invalid-name self, - spi, - cs, - reset, - frequency, + spi: SPI, + cs: DigitalInOut, + reset: DigitalInOut, + frequency: int, *, - sync_word=b"\x2D\xD4", - preamble_length=4, - encryption_key=None, - high_power=True, - baudrate=2000000 - ): + sync_word: bytes = b"\x2D\xD4", + preamble_length: int = 4, + encryption_key: Optional[bytes] = None, + high_power: bool = True, + baudrate: int = 2000000 + ) -> None: self._tx_power = 13 self.high_power = high_power # Device support SPI mode 0 (polarity & phase = 0) up to a max of 10mhz. @@ -389,7 +397,9 @@ def __init__( # pylint: disable=no-member # Reconsider this disable when it can be tested. - def _read_into(self, address, buf, length=None): + def _read_into( + self, address: int, buf: WriteableBuffer, length: Optional[int] = None + ) -> None: # Read a number of bytes from the specified address into the provided # buffer. If length is not specified (the default) the entire buffer # will be filled. @@ -401,12 +411,14 @@ def _read_into(self, address, buf, length=None): device.write(self._BUFFER, end=1) device.readinto(buf, end=length) - def _read_u8(self, address): + def _read_u8(self, address: int) -> int: # Read a single byte from the provided address and return it. self._read_into(address, self._BUFFER, length=1) return self._BUFFER[0] - def _write_from(self, address, buf, length=None): + def _write_from( + self, address: int, buf: ReadableBuffer, length: Optional[int] = None + ) -> None: # Write a number of bytes to the provided address and taken from the # provided buffer. If no length is specified (the default) the entire # buffer is written. @@ -418,7 +430,7 @@ def _write_from(self, address, buf, length=None): device.write(self._BUFFER, end=1) device.write(buf, end=length) # send data - def _write_u8(self, address, val): + def _write_u8(self, address: int, val: int) -> None: # Write a byte register to the chip. Specify the 7-bit address and the # 8-bit value to write to that address. with self._device as device: @@ -427,7 +439,7 @@ def _write_u8(self, address, val): self._BUFFER[1] = val & 0xFF device.write(self._BUFFER, end=2) - def reset(self): + def reset(self) -> None: """Perform a reset of the chip.""" # See section 7.2.2 of the datasheet for reset description. self._reset.value = True @@ -435,7 +447,7 @@ def reset(self): self._reset.value = False time.sleep(0.005) # 5 ms - def idle(self): + def idle(self) -> None: """Enter idle standby mode (switching off high power amplifiers if necessary).""" # Like RadioHead library, turn off high power boost if enabled. if self._tx_power >= 18: @@ -443,11 +455,11 @@ def idle(self): self._write_u8(_REG_TEST_PA2, _TEST_PA2_NORMAL) self.operation_mode = STANDBY_MODE - def sleep(self): + def sleep(self) -> None: """Enter sleep mode.""" self.operation_mode = SLEEP_MODE - def listen(self): + def listen(self) -> None: """Listen for packets to be received by the chip. Use :py:func:`receive` to listen, wait and retrieve packets as they're available. """ @@ -460,7 +472,7 @@ def listen(self): # Enter RX mode (will clear FIFO!). self.operation_mode = RX_MODE - def transmit(self): + def transmit(self) -> None: """Transmit a packet which is queued in the FIFO. This is a low level function for entering transmit mode and more. For generating and transmitting a packet of data use :py:func:`send` instead. @@ -475,7 +487,7 @@ def transmit(self): self.operation_mode = TX_MODE @property - def temperature(self): + def temperature(self) -> float: """The internal temperature of the chip in degrees Celsius. Be warned this is not calibrated or very accurate. @@ -491,7 +503,7 @@ def temperature(self): return 166.0 - temp @property - def operation_mode(self): + def operation_mode(self) -> int: """The operation mode value. Unless you're manually controlling the chip you shouldn't change the operation_mode with this property as other side-effects are required for changing logical modes--use :py:func:`idle`, :py:func:`sleep`, :py:func:`transmit`, @@ -501,7 +513,7 @@ def operation_mode(self): return (op_mode >> 2) & 0b111 @operation_mode.setter - def operation_mode(self, val): + def operation_mode(self, val: int) -> None: assert 0 <= val <= 4 # Set the mode bits inside the operation mode register. op_mode = self._read_u8(_REG_OP_MODE) @@ -521,7 +533,7 @@ def operation_mode(self, val): raise TimeoutError("Operation Mode failed to set.") @property - def sync_word(self): + def sync_word(self) -> bytearray: """The synchronization word value. This is a byte string up to 8 bytes long (64 bits) which indicates the synchronization word for transmitted and received packets. Any received packet which does not include this sync word will be ignored. The default value @@ -539,7 +551,7 @@ def sync_word(self): return sync_word @sync_word.setter - def sync_word(self, val): + def sync_word(self, val: Optional[bytearray]) -> None: # Handle disabling sync word when None value is set. if val is None: self.sync_on = 0 @@ -553,7 +565,7 @@ def sync_word(self, val): self.sync_on = 1 @property - def preamble_length(self): + def preamble_length(self) -> int: """The length of the preamble for sent and received packets, an unsigned 16-bit value. Received packets must match this length or they are ignored! Set to 4 to match the RadioHead RFM69 library. @@ -563,13 +575,13 @@ def preamble_length(self): return ((msb << 8) | lsb) & 0xFFFF @preamble_length.setter - def preamble_length(self, val): + def preamble_length(self, val: int) -> None: assert 0 <= val <= 65535 self._write_u8(_REG_PREAMBLE_MSB, (val >> 8) & 0xFF) self._write_u8(_REG_PREAMBLE_LSB, val & 0xFF) @property - def frequency_mhz(self): + def frequency_mhz(self) -> float: """The frequency of the radio in Megahertz. Only the allowed values for your radio must be specified (i.e. 433 vs. 915 mhz)! """ @@ -584,7 +596,7 @@ def frequency_mhz(self): return frequency @frequency_mhz.setter - def frequency_mhz(self, val): + def frequency_mhz(self, val: float) -> None: assert 290 <= val <= 1020 # Calculate FRF register 24-bit value using section 6.2 of the datasheet. frf = int((val * 1000000.0) / _FSTEP) & 0xFFFFFF @@ -597,7 +609,7 @@ def frequency_mhz(self, val): self._write_u8(_REG_FRF_LSB, lsb) @property - def encryption_key(self): + def encryption_key(self) -> bytearray: """The AES encryption key used to encrypt and decrypt packets by the chip. This can be set to None to disable encryption (the default), otherwise it must be a 16 byte long byte string which defines the key (both the transmitter and receiver must use the same key @@ -612,7 +624,7 @@ def encryption_key(self): return key @encryption_key.setter - def encryption_key(self, val): + def encryption_key(self, val: bytearray) -> None: # Handle if unsetting the encryption key (None value). if val is None: self.aes_on = 0 @@ -623,7 +635,7 @@ def encryption_key(self, val): self.aes_on = 1 @property - def tx_power(self): + def tx_power(self) -> int: """The transmit power in dBm. Can be set to a value from -2 to 20 for high power devices (RFM69HCW, high_power=True) or -18 to 13 for low power devices. Only integer power levels are actually set (i.e. 12.5 will result in a value of 12 dBm). @@ -648,7 +660,7 @@ def tx_power(self): raise RuntimeError("Power amplifiers in unknown state!") @tx_power.setter - def tx_power(self, val): + def tx_power(self, val: float): val = int(val) # Determine power amplifier and output power values depending on # high power state and requested power. @@ -685,7 +697,7 @@ def tx_power(self, val): self._tx_power = val @property - def rssi(self): + def rssi(self) -> float: """The received strength indicator (in dBm). May be inaccuate if not read immediatey. last_rssi contains the value read immediately receipt of the last packet. @@ -694,7 +706,7 @@ def rssi(self): return -self._read_u8(_REG_RSSI_VALUE) / 2.0 @property - def bitrate(self): + def bitrate(self) -> float: """The modulation bitrate in bits/second (or chip rate if Manchester encoding is enabled). Can be a value from ~489 to 32mbit/s, but see the datasheet for the exact supported values. @@ -704,7 +716,7 @@ def bitrate(self): return _FXOSC / ((msb << 8) | lsb) @bitrate.setter - def bitrate(self, val): + def bitrate(self, val: float) -> None: assert (_FXOSC / 65535) <= val <= 32000000.0 # Round up to the next closest bit-rate value with addition of 0.5. bitrate = int((_FXOSC / val) + 0.5) & 0xFFFF @@ -712,39 +724,39 @@ def bitrate(self, val): self._write_u8(_REG_BITRATE_LSB, bitrate & 0xFF) @property - def frequency_deviation(self): + def frequency_deviation(self) -> float: """The frequency deviation in Hertz.""" msb = self._read_u8(_REG_FDEV_MSB) lsb = self._read_u8(_REG_FDEV_LSB) return _FSTEP * ((msb << 8) | lsb) @frequency_deviation.setter - def frequency_deviation(self, val): + def frequency_deviation(self, val: float) -> None: assert 0 <= val <= (_FSTEP * 16383) # fdev is a 14-bit unsigned value # Round up to the next closest integer value with addition of 0.5. fdev = int((val / _FSTEP) + 0.5) & 0x3FFF self._write_u8(_REG_FDEV_MSB, fdev >> 8) self._write_u8(_REG_FDEV_LSB, fdev & 0xFF) - def packet_sent(self): + def packet_sent(self) -> bool: """Transmit status""" return (self._read_u8(_REG_IRQ_FLAGS2) & 0x8) >> 3 - def payload_ready(self): + def payload_ready(self) -> bool: """Receive status""" return (self._read_u8(_REG_IRQ_FLAGS2) & 0x4) >> 2 # pylint: disable=too-many-branches def send( self, - data, + data: ReadableBuffer, *, - keep_listening=False, - destination=None, - node=None, - identifier=None, - flags=None - ): + keep_listening: bool = False, + destination: Optional[int] = None, + node: Optional[int] = None, + identifier: Optional[int] = None, + flags: Optional[int] = None + ) -> bool: """Send a string of data using the transmitter. You can only send 60 bytes at a time (limited by chip's FIFO size and appended headers). @@ -801,7 +813,7 @@ def send( self.idle() return not timed_out - def send_with_ack(self, data): + def send_with_ack(self, data: int) -> bool: """Reliable Datagram mode: Send a packet with data and wait for an ACK response. The packet header is automatically generated. @@ -839,8 +851,13 @@ def send_with_ack(self, data): return got_ack def receive( - self, *, keep_listening=True, with_ack=False, timeout=None, with_header=False - ): + self, + *, + keep_listening: bool = True, + with_ack: bool = False, + timeout: Optional[float] = None, + with_header: bool = False + ) -> int: """Wait to receive a packet from the receiver. If a packet is found the payload bytes are returned, otherwise None is returned (which indicates the timeout elapsed with no reception). diff --git a/requirements.txt b/requirements.txt index a45c547..76ea39a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ Adafruit-Blinka adafruit-circuitpython-busdevice +adafruit-circuitpython-typing