diff --git a/adafruit_rfm9x.py b/adafruit_rfm9x.py index 772ab18..cc9561a 100644 --- a/adafruit_rfm9x.py +++ b/adafruit_rfm9x.py @@ -26,6 +26,16 @@ HAS_SUPERVISOR = True except ImportError: pass + +try: + from typing import Optional, Type, Literal + from digitalio import DigitalInOut + from busio import SPI + from circuitpython_typing import WriteableBuffer, ReadableBuffer + +except ImportError: + pass + __version__ = "0.0.0-auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RFM9x.git" @@ -120,7 +130,7 @@ # pylint: disable=too-many-statements -def ticks_diff(ticks1, ticks2): +def ticks_diff(ticks1: int, ticks2: int) -> int: """Compute the signed difference between two ticks values assuming that they are within 2**28 ticks """ @@ -131,7 +141,7 @@ def ticks_diff(ticks1, ticks2): class RFM9x: """Interface to a RFM95/6/7/8 LoRa radio module. Allows sending and - receivng bytes of data in long range LoRa mode at a support board frequency + receiving bytes of data in long range LoRa mode at a support board frequency (433/915mhz). You must specify the following parameters: @@ -186,7 +196,7 @@ class _RegisterBits: # check from pylint. # pylint: disable=protected-access - def __init__(self, address, *, offset=0, bits=1): + def __init__(self, address: int, *, offset: int = 0, bits: int = 1) -> None: assert 0 <= offset <= 7 assert 1 <= bits <= 8 assert (offset + bits) <= 8 @@ -198,11 +208,11 @@ def __init__(self, address, *, offset=0, bits=1): self._mask <<= offset self._offset = offset - def __get__(self, obj, objtype): + def __get__(self, obj: "RFM9x", objtype: Type["RFM9x"]) -> int: reg_value = obj._read_u8(self._address) return (reg_value & self._mask) >> self._offset - def __set__(self, obj, val): + def __set__(self, obj: "RFM9x", val: int) -> None: reg_value = obj._read_u8(self._address) reg_value &= ~self._mask reg_value |= (val & 0xFF) << self._offset @@ -243,17 +253,17 @@ def __set__(self, obj, val): def __init__( self, - spi, - cs, - reset, - frequency, + spi: SPI, + cs: DigitalInOut, + reset: DigitalInOut, + frequency: int, *, - preamble_length=8, - high_power=True, - baudrate=5000000, - agc=False, - crc=True - ): + preamble_length: int = 8, + high_power: bool = True, + baudrate: int = 5000000, + agc: bool = False, + crc: bool = True + ) -> None: self.high_power = high_power # Device support SPI mode 0 (polarity & phase = 0) up to a max of 10mhz. # Set Default Baudrate to 5MHz to avoid problems @@ -363,7 +373,9 @@ def __init__( # pylint: disable=no-member # Reconsider pylint: disable when this can be tested - def _read_into(self, address, buf, length=None): + def _read_into( + self, address: int, buf: WriteableBuffer, length: Optional[int] = None + ) -> None: # Read a number of bytes from the specified address into the provided # buffer. If length is not specified (the default) the entire buffer # will be filled. @@ -375,12 +387,14 @@ def _read_into(self, address, buf, length=None): device.write(self._BUFFER, end=1) device.readinto(buf, end=length) - def _read_u8(self, address): + def _read_u8(self, address: int) -> int: # Read a single byte from the provided address and return it. self._read_into(address, self._BUFFER, length=1) return self._BUFFER[0] - def _write_from(self, address, buf, length=None): + def _write_from( + self, address: int, buf: ReadableBuffer, length: Optional[int] = None + ) -> None: # Write a number of bytes to the provided address and taken from the # provided buffer. If no length is specified (the default) the entire # buffer is written. @@ -392,16 +406,17 @@ def _write_from(self, address, buf, length=None): device.write(self._BUFFER, end=1) device.write(buf, end=length) - def _write_u8(self, address, val): + def _write_u8(self, address: int, val: int) -> None: # Write a byte register to the chip. Specify the 7-bit address and the # 8-bit value to write to that address. with self._device as device: - self._BUFFER[0] = (address | 0x80) & 0xFF # Set top bit to 1 to - # indicate a write. + self._BUFFER[0] = ( + address | 0x80 + ) & 0xFF # Set top bit to 1 to indicate a write. self._BUFFER[1] = val & 0xFF device.write(self._BUFFER, end=2) - def reset(self): + def reset(self) -> None: """Perform a reset of the chip.""" # See section 7.2.2 of the datasheet for reset description. self._reset.value = False # Set Reset Low @@ -409,22 +424,22 @@ def reset(self): self._reset.value = True # set Reset High time.sleep(0.005) # 5 ms - def idle(self): + def idle(self) -> None: """Enter idle standby mode.""" self.operation_mode = STANDBY_MODE - def sleep(self): + def sleep(self) -> None: """Enter sleep mode.""" self.operation_mode = SLEEP_MODE - def listen(self): + def listen(self) -> None: """Listen for packets to be received by the chip. Use :py:func:`receive` to listen, wait and retrieve packets as they're available. """ self.operation_mode = RX_MODE self.dio0_mapping = 0b00 # Interrupt on rx done. - def transmit(self): + def transmit(self) -> None: """Transmit a packet which is queued in the FIFO. This is a low level function for entering transmit mode and more. For generating and transmitting a packet of data use :py:func:`send` instead. @@ -433,7 +448,7 @@ def transmit(self): self.dio0_mapping = 0b01 # Interrupt on tx done. @property - def preamble_length(self): + def preamble_length(self) -> int: """The length of the preamble for sent and received packets, an unsigned 16-bit value. Received packets must match this length or they are ignored! Set to 8 to match the RadioHead RFM95 library. @@ -443,13 +458,13 @@ def preamble_length(self): return ((msb << 8) | lsb) & 0xFFFF @preamble_length.setter - def preamble_length(self, val): + def preamble_length(self, val: int) -> None: assert 0 <= val <= 65535 self._write_u8(_RH_RF95_REG_20_PREAMBLE_MSB, (val >> 8) & 0xFF) self._write_u8(_RH_RF95_REG_21_PREAMBLE_LSB, val & 0xFF) @property - def frequency_mhz(self): + def frequency_mhz(self) -> Literal[433.0, 915.0]: """The frequency of the radio in Megahertz. Only the allowed values for your radio must be specified (i.e. 433 vs. 915 mhz)! """ @@ -461,7 +476,7 @@ def frequency_mhz(self): return frequency @frequency_mhz.setter - def frequency_mhz(self, val): + def frequency_mhz(self, val: Literal[433.0, 915.0]) -> None: if val < 240 or val > 960: raise RuntimeError("frequency_mhz must be between 240 and 960") # Calculate FRF register 24-bit value. @@ -475,7 +490,7 @@ def frequency_mhz(self, val): self._write_u8(_RH_RF95_REG_08_FRF_LSB, lsb) @property - def tx_power(self): + def tx_power(self) -> int: """The transmit power in dBm. Can be set to a value from 5 to 23 for high power devices (RFM95/96/97/98, high_power=True) or -1 to 14 for low power devices. Only integer power levels are actually set (i.e. 12.5 @@ -490,7 +505,7 @@ def tx_power(self): return self.output_power - 1 @tx_power.setter - def tx_power(self, val): + def tx_power(self, val: int) -> None: val = int(val) if self.high_power: if val < 5 or val > 23: @@ -511,7 +526,7 @@ def tx_power(self, val): self.output_power = (val + 1) & 0x0F @property - def rssi(self): + def rssi(self) -> int: """The received strength indicator (in dBm) of the last received message.""" # Read RSSI register and convert to value using formula in datasheet. # Remember in LoRa mode the payload register changes function to RSSI! @@ -523,7 +538,7 @@ def rssi(self): return raw_rssi @property - def snr(self): + def snr(self) -> float: """The SNR (in dB) of the last received message.""" # Read SNR 0x19 register and convert to value using formula in datasheet. # SNR(dB) = PacketSnr [twos complement] / 4 @@ -533,7 +548,7 @@ def snr(self): return snr_byte / 4 @property - def signal_bandwidth(self): + def signal_bandwidth(self) -> int: """The signal bandwidth used by the radio (try setting to a higher value to increase throughput or to a lower value to increase the likelihood of successfully received payloads). Valid values are @@ -546,7 +561,7 @@ def signal_bandwidth(self): return current_bandwidth @signal_bandwidth.setter - def signal_bandwidth(self, val): + def signal_bandwidth(self, val: int) -> None: # Set signal bandwidth (set to 125000 to match RadioHead Bw125). for bw_id, cutoff in enumerate(self.bw_bins): if val <= cutoff: @@ -581,7 +596,7 @@ def signal_bandwidth(self, val): self._write_u8(0x30, 0) @property - def coding_rate(self): + def coding_rate(self) -> Literal[5, 6, 7, 8]: """The coding rate used by the radio to control forward error correction (try setting to a higher value to increase tolerance of short bursts of interference or to a lower value to increase bit @@ -591,7 +606,7 @@ def coding_rate(self): return denominator @coding_rate.setter - def coding_rate(self, val): + def coding_rate(self, val: Literal[5, 6, 7, 8]) -> None: # Set coding rate (set to 5 to match RadioHead Cr45). denominator = min(max(val, 5), 8) cr_id = denominator - 4 @@ -601,7 +616,7 @@ def coding_rate(self, val): ) @property - def spreading_factor(self): + def spreading_factor(self) -> Literal[6, 7, 8, 9, 10, 11, 12]: """The spreading factor used by the radio (try setting to a higher value to increase the receiver's ability to distinguish signal from noise or to a lower value to increase the data transmission rate). @@ -610,7 +625,7 @@ def spreading_factor(self): return sf_id @spreading_factor.setter - def spreading_factor(self, val): + def spreading_factor(self, val: Literal[6, 7, 8, 9, 10, 11, 12]) -> None: # Set spreading factor (set to 7 to match RadioHead Sf128). val = min(max(val, 6), 12) @@ -629,14 +644,14 @@ def spreading_factor(self, val): ) @property - def enable_crc(self): + def enable_crc(self) -> bool: """Set to True to enable hardware CRC checking of incoming packets. Incoming packets that fail the CRC check are not processed. Set to False to disable CRC checking and process all incoming packets.""" return (self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) & 0x04) == 0x04 @enable_crc.setter - def enable_crc(self, val): + def enable_crc(self, val: bool) -> None: # Optionally enable CRC checking on incoming packets. if val: self._write_u8( @@ -649,29 +664,29 @@ def enable_crc(self, val): self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) & 0xFB, ) - def tx_done(self): + def tx_done(self) -> bool: """Transmit status""" return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x8) >> 3 - def rx_done(self): + def rx_done(self) -> bool: """Receive status""" return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x40) >> 6 - def crc_error(self): + def crc_error(self) -> bool: """crc status""" return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x20) >> 5 # pylint: disable=too-many-branches def send( self, - data, + data: ReadableBuffer, *, - keep_listening=False, - destination=None, - node=None, - identifier=None, - flags=None - ): + keep_listening: bool = False, + destination: Optional[int] = None, + node: Optional[int] = None, + identifier: Optional[int] = None, + flags: Optional[int] = None + ) -> bool: """Send a string of data using the transmitter. You can only send 252 bytes at a time (limited by chip's FIFO size and appended headers). @@ -743,7 +758,7 @@ def send( self._write_u8(_RH_RF95_REG_12_IRQ_FLAGS, 0xFF) return not timed_out - def send_with_ack(self, data): + def send_with_ack(self, data: ReadableBuffer) -> bool: """Reliable Datagram mode: Send a packet with data and wait for an ACK response. The packet header is automatically generated. @@ -781,15 +796,20 @@ def send_with_ack(self, data): return got_ack def receive( - self, *, keep_listening=True, with_header=False, with_ack=False, timeout=None - ): + self, + *, + keep_listening: bool = True, + with_header: bool = False, + with_ack: bool = False, + timeout: Optional[float] = None + ) -> Optional[bytearray]: """Wait to receive a packet from the receiver. If a packet is found the payload bytes are returned, otherwise None is returned (which indicates the timeout elapsed with no reception). If keep_listening is True (the default) the chip will immediately enter listening mode after reception of a packet, otherwise it will fall back to idle mode and ignore any future reception. - All packets must have a 4-byte header for compatibilty with the + All packets must have a 4-byte header for compatibility with the RadioHead library. The header consists of 4 bytes (To,From,ID,Flags). The default setting will strip the header before returning the packet to the caller. diff --git a/requirements.txt b/requirements.txt index f675e3b..ae2e224 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,5 @@ # # SPDX-License-Identifier: Unlicense -Adafruit-Blinka +Adafruit-Blinka>=7.2.3 adafruit-circuitpython-busdevice diff --git a/setup.py b/setup.py index 03ed1c0..4f525cd 100644 --- a/setup.py +++ b/setup.py @@ -34,7 +34,7 @@ # Author details author="Adafruit Industries", author_email="circuitpython@adafruit.com", - install_requires=["Adafruit-Blinka", "adafruit-circuitpython-busdevice"], + install_requires=["Adafruit-Blinka>=7.2.3", "adafruit-circuitpython-busdevice"], # Choose your license license="MIT", # See https://pypi.python.org/pypi?%3Aaction=list_classifiers