Skip to content

Add type annotations #73

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 9, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 75 additions & 53 deletions adafruit_rfm9x.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@
HAS_SUPERVISOR = True
except ImportError:
pass

try:
from typing import Optional, Type, Literal
from digitalio import DigitalInOut
from busio import SPI
from circuitpython_typing import WriteableBuffer, ReadableBuffer

except ImportError:
pass

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RFM9x.git"

Expand Down Expand Up @@ -120,7 +130,7 @@
# pylint: disable=too-many-statements


def ticks_diff(ticks1, ticks2):
def ticks_diff(ticks1: int, ticks2: int) -> int:
"""Compute the signed difference between two ticks values
assuming that they are within 2**28 ticks
"""
Expand All @@ -131,7 +141,7 @@ def ticks_diff(ticks1, ticks2):

class RFM9x:
"""Interface to a RFM95/6/7/8 LoRa radio module. Allows sending and
receivng bytes of data in long range LoRa mode at a support board frequency
receiving bytes of data in long range LoRa mode at a support board frequency
(433/915mhz).

You must specify the following parameters:
Expand Down Expand Up @@ -186,7 +196,7 @@ class _RegisterBits:
# check from pylint.
# pylint: disable=protected-access

def __init__(self, address, *, offset=0, bits=1):
def __init__(self, address: int, *, offset: int = 0, bits: int = 1) -> None:
assert 0 <= offset <= 7
assert 1 <= bits <= 8
assert (offset + bits) <= 8
Expand All @@ -198,11 +208,11 @@ def __init__(self, address, *, offset=0, bits=1):
self._mask <<= offset
self._offset = offset

def __get__(self, obj, objtype):
def __get__(self, obj: "RFM9x", objtype: Type["RFM9x"]) -> int:
reg_value = obj._read_u8(self._address)
return (reg_value & self._mask) >> self._offset

def __set__(self, obj, val):
def __set__(self, obj: "RFM9x", val: int) -> None:
reg_value = obj._read_u8(self._address)
reg_value &= ~self._mask
reg_value |= (val & 0xFF) << self._offset
Expand Down Expand Up @@ -243,21 +253,23 @@ def __set__(self, obj, val):

def __init__(
self,
spi,
cs,
reset,
frequency,
spi: SPI,
cs: DigitalInOut,
reset: DigitalInOut,
frequency: int,
*,
preamble_length=8,
high_power=True,
baudrate=5000000,
agc=False,
crc=True
):
preamble_length: int = 8,
high_power: bool = True,
baudrate: int = 5000000,
agc: bool = False,
crc: bool = True
) -> None:
self.high_power = high_power
# Device support SPI mode 0 (polarity & phase = 0) up to a max of 10mhz.
# Set Default Baudrate to 5MHz to avoid problems
self._device = spidev.SPIDevice(spi, cs, baudrate=baudrate, polarity=0, phase=0)
self._device: spidev.SPIDevice = spidev.SPIDevice(
spi, cs, baudrate=baudrate, polarity=0, phase=0
)
# Setup reset as a digital output - initially High
# This line is pulled low as an output quickly to trigger a reset.
self._reset = reset
Expand Down Expand Up @@ -363,7 +375,9 @@ def __init__(

# pylint: disable=no-member
# Reconsider pylint: disable when this can be tested
def _read_into(self, address, buf, length=None):
def _read_into(
self, address: int, buf: WriteableBuffer, length: Optional[int] = None
) -> None:
# Read a number of bytes from the specified address into the provided
# buffer. If length is not specified (the default) the entire buffer
# will be filled.
Expand All @@ -375,12 +389,14 @@ def _read_into(self, address, buf, length=None):
device.write(self._BUFFER, end=1)
device.readinto(buf, end=length)

def _read_u8(self, address):
def _read_u8(self, address: int) -> int:
# Read a single byte from the provided address and return it.
self._read_into(address, self._BUFFER, length=1)
return self._BUFFER[0]

def _write_from(self, address, buf, length=None):
def _write_from(
self, address: int, buf: ReadableBuffer, length: Optional[int] = None
) -> None:
# Write a number of bytes to the provided address and taken from the
# provided buffer. If no length is specified (the default) the entire
# buffer is written.
Expand All @@ -392,39 +408,40 @@ def _write_from(self, address, buf, length=None):
device.write(self._BUFFER, end=1)
device.write(buf, end=length)

def _write_u8(self, address, val):
def _write_u8(self, address: int, val: int) -> None:
# Write a byte register to the chip. Specify the 7-bit address and the
# 8-bit value to write to that address.
with self._device as device:
self._BUFFER[0] = (address | 0x80) & 0xFF # Set top bit to 1 to
# indicate a write.
self._BUFFER[0] = (
address | 0x80
) & 0xFF # Set top bit to 1 to indicate a write.
self._BUFFER[1] = val & 0xFF
device.write(self._BUFFER, end=2)

def reset(self):
def reset(self) -> None:
"""Perform a reset of the chip."""
# See section 7.2.2 of the datasheet for reset description.
self._reset.value = False # Set Reset Low
time.sleep(0.0001) # 100 us
self._reset.value = True # set Reset High
time.sleep(0.005) # 5 ms

def idle(self):
def idle(self) -> None:
"""Enter idle standby mode."""
self.operation_mode = STANDBY_MODE

def sleep(self):
def sleep(self) -> None:
"""Enter sleep mode."""
self.operation_mode = SLEEP_MODE

def listen(self):
def listen(self) -> None:
"""Listen for packets to be received by the chip. Use :py:func:`receive`
to listen, wait and retrieve packets as they're available.
"""
self.operation_mode = RX_MODE
self.dio0_mapping = 0b00 # Interrupt on rx done.

def transmit(self):
def transmit(self) -> None:
"""Transmit a packet which is queued in the FIFO. This is a low level
function for entering transmit mode and more. For generating and
transmitting a packet of data use :py:func:`send` instead.
Expand All @@ -433,7 +450,7 @@ def transmit(self):
self.dio0_mapping = 0b01 # Interrupt on tx done.

@property
def preamble_length(self):
def preamble_length(self) -> int:
"""The length of the preamble for sent and received packets, an unsigned
16-bit value. Received packets must match this length or they are
ignored! Set to 8 to match the RadioHead RFM95 library.
Expand All @@ -443,13 +460,13 @@ def preamble_length(self):
return ((msb << 8) | lsb) & 0xFFFF

@preamble_length.setter
def preamble_length(self, val):
def preamble_length(self, val: int) -> None:
assert 0 <= val <= 65535
self._write_u8(_RH_RF95_REG_20_PREAMBLE_MSB, (val >> 8) & 0xFF)
self._write_u8(_RH_RF95_REG_21_PREAMBLE_LSB, val & 0xFF)

@property
def frequency_mhz(self):
def frequency_mhz(self) -> Literal[433.0, 915.0]:
"""The frequency of the radio in Megahertz. Only the allowed values for
your radio must be specified (i.e. 433 vs. 915 mhz)!
"""
Expand All @@ -461,7 +478,7 @@ def frequency_mhz(self):
return frequency

@frequency_mhz.setter
def frequency_mhz(self, val):
def frequency_mhz(self, val: Literal[433.0, 915.0]) -> None:
if val < 240 or val > 960:
raise RuntimeError("frequency_mhz must be between 240 and 960")
# Calculate FRF register 24-bit value.
Expand All @@ -475,7 +492,7 @@ def frequency_mhz(self, val):
self._write_u8(_RH_RF95_REG_08_FRF_LSB, lsb)

@property
def tx_power(self):
def tx_power(self) -> int:
"""The transmit power in dBm. Can be set to a value from 5 to 23 for
high power devices (RFM95/96/97/98, high_power=True) or -1 to 14 for low
power devices. Only integer power levels are actually set (i.e. 12.5
Expand All @@ -490,7 +507,7 @@ def tx_power(self):
return self.output_power - 1

@tx_power.setter
def tx_power(self, val):
def tx_power(self, val: int) -> None:
val = int(val)
if self.high_power:
if val < 5 or val > 23:
Expand All @@ -511,7 +528,7 @@ def tx_power(self, val):
self.output_power = (val + 1) & 0x0F

@property
def rssi(self):
def rssi(self) -> int:
"""The received strength indicator (in dBm) of the last received message."""
# Read RSSI register and convert to value using formula in datasheet.
# Remember in LoRa mode the payload register changes function to RSSI!
Expand All @@ -523,7 +540,7 @@ def rssi(self):
return raw_rssi

@property
def snr(self):
def snr(self) -> float:
"""The SNR (in dB) of the last received message."""
# Read SNR 0x19 register and convert to value using formula in datasheet.
# SNR(dB) = PacketSnr [twos complement] / 4
Expand All @@ -533,7 +550,7 @@ def snr(self):
return snr_byte / 4

@property
def signal_bandwidth(self):
def signal_bandwidth(self) -> int:
"""The signal bandwidth used by the radio (try setting to a higher
value to increase throughput or to a lower value to increase the
likelihood of successfully received payloads). Valid values are
Expand All @@ -546,7 +563,7 @@ def signal_bandwidth(self):
return current_bandwidth

@signal_bandwidth.setter
def signal_bandwidth(self, val):
def signal_bandwidth(self, val: int) -> None:
# Set signal bandwidth (set to 125000 to match RadioHead Bw125).
for bw_id, cutoff in enumerate(self.bw_bins):
if val <= cutoff:
Expand Down Expand Up @@ -581,7 +598,7 @@ def signal_bandwidth(self, val):
self._write_u8(0x30, 0)

@property
def coding_rate(self):
def coding_rate(self) -> Literal[5, 6, 7, 8]:
"""The coding rate used by the radio to control forward error
correction (try setting to a higher value to increase tolerance of
short bursts of interference or to a lower value to increase bit
Expand All @@ -591,7 +608,7 @@ def coding_rate(self):
return denominator

@coding_rate.setter
def coding_rate(self, val):
def coding_rate(self, val: Literal[5, 6, 7, 8]) -> None:
# Set coding rate (set to 5 to match RadioHead Cr45).
denominator = min(max(val, 5), 8)
cr_id = denominator - 4
Expand All @@ -601,7 +618,7 @@ def coding_rate(self, val):
)

@property
def spreading_factor(self):
def spreading_factor(self) -> Literal[6, 7, 8, 9, 10, 11, 12]:
"""The spreading factor used by the radio (try setting to a higher
value to increase the receiver's ability to distinguish signal from
noise or to a lower value to increase the data transmission rate).
Expand All @@ -610,7 +627,7 @@ def spreading_factor(self):
return sf_id

@spreading_factor.setter
def spreading_factor(self, val):
def spreading_factor(self, val: Literal[6, 7, 8, 9, 10, 11, 12]) -> None:
# Set spreading factor (set to 7 to match RadioHead Sf128).
val = min(max(val, 6), 12)

Expand All @@ -629,14 +646,14 @@ def spreading_factor(self, val):
)

@property
def enable_crc(self):
def enable_crc(self) -> bool:
"""Set to True to enable hardware CRC checking of incoming packets.
Incoming packets that fail the CRC check are not processed. Set to
False to disable CRC checking and process all incoming packets."""
return (self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) & 0x04) == 0x04

@enable_crc.setter
def enable_crc(self, val):
def enable_crc(self, val: bool) -> None:
# Optionally enable CRC checking on incoming packets.
if val:
self._write_u8(
Expand All @@ -649,29 +666,29 @@ def enable_crc(self, val):
self._read_u8(_RH_RF95_REG_1E_MODEM_CONFIG2) & 0xFB,
)

def tx_done(self):
def tx_done(self) -> bool:
"""Transmit status"""
return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x8) >> 3

def rx_done(self):
def rx_done(self) -> bool:
"""Receive status"""
return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x40) >> 6

def crc_error(self):
def crc_error(self) -> bool:
"""crc status"""
return (self._read_u8(_RH_RF95_REG_12_IRQ_FLAGS) & 0x20) >> 5

# pylint: disable=too-many-branches
def send(
self,
data,
data: ReadableBuffer,
*,
keep_listening=False,
keep_listening: Optional[int] = False,
destination=None,
node=None,
identifier=None,
flags=None
):
) -> bool:
"""Send a string of data using the transmitter.
You can only send 252 bytes at a time
(limited by chip's FIFO size and appended headers).
Expand Down Expand Up @@ -743,7 +760,7 @@ def send(
self._write_u8(_RH_RF95_REG_12_IRQ_FLAGS, 0xFF)
return not timed_out

def send_with_ack(self, data):
def send_with_ack(self, data) -> bool:
"""Reliable Datagram mode:
Send a packet with data and wait for an ACK response.
The packet header is automatically generated.
Expand Down Expand Up @@ -781,15 +798,20 @@ def send_with_ack(self, data):
return got_ack

def receive(
self, *, keep_listening=True, with_header=False, with_ack=False, timeout=None
):
self,
*,
keep_listening: Optional[int] = True,
with_header: bool = False,
with_ack: bool = False,
timeout: Optional[float] = None
) -> Optional[bytearray]:
"""Wait to receive a packet from the receiver. If a packet is found the payload bytes
are returned, otherwise None is returned (which indicates the timeout elapsed with no
reception).
If keep_listening is True (the default) the chip will immediately enter listening mode
after reception of a packet, otherwise it will fall back to idle mode and ignore any
future reception.
All packets must have a 4-byte header for compatibilty with the
All packets must have a 4-byte header for compatibility with the
RadioHead library.
The header consists of 4 bytes (To,From,ID,Flags). The default setting will strip
the header before returning the packet to the caller.
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
#
# SPDX-License-Identifier: Unlicense

Adafruit-Blinka
Adafruit-Blinka>=7.2.3
adafruit-circuitpython-busdevice
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
# Author details
author="Adafruit Industries",
author_email="[email protected]",
install_requires=["Adafruit-Blinka", "adafruit-circuitpython-busdevice"],
install_requires=["Adafruit-Blinka>=7.2.3", "adafruit-circuitpython-busdevice"],
# Choose your license
license="MIT",
# See https://pypi.python.org/pypi?%3Aaction=list_classifiers
Expand Down