Skip to content

Add Missing Type Annotations #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 72 additions & 55 deletions adafruit_rfm69.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@
except ImportError:
pass

try:
from typing import Callable, Optional, Type
from circuitpython_typing import WriteableBuffer, ReadableBuffer
from digitalio import DigitalInOut
from busio import SPI
except ImportError:
pass

__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RFM69.git"

Expand Down Expand Up @@ -134,7 +142,7 @@
# pylint: disable=too-many-instance-attributes


def ticks_diff(ticks1, ticks2):
def ticks_diff(ticks1: int, ticks2: int) -> int:
"""Compute the signed difference between two ticks values
assuming that they are within 2**28 ticks
"""
Expand All @@ -143,7 +151,7 @@ def ticks_diff(ticks1, ticks2):
return diff


def check_timeout(flag, limit):
def check_timeout(flag: Callable, limit: float) -> bool:
"""test for timeout waiting for specified flag"""
timed_out = False
if HAS_SUPERVISOR:
Expand Down Expand Up @@ -224,7 +232,7 @@ class _RegisterBits:
# check from pylint.
# pylint: disable=protected-access

def __init__(self, address, *, offset=0, bits=1):
def __init__(self, address: int, *, offset: int = 0, bits: int = 1) -> None:
assert 0 <= offset <= 7
assert 1 <= bits <= 8
assert (offset + bits) <= 8
Expand All @@ -236,11 +244,11 @@ def __init__(self, address, *, offset=0, bits=1):
self._mask <<= offset
self._offset = offset

def __get__(self, obj, objtype):
def __get__(self, obj: Optional["RFM69"], objtype: Type["RFM69"]):
reg_value = obj._read_u8(self._address)
return (reg_value & self._mask) >> self._offset

def __set__(self, obj, val):
def __set__(self, obj: Optional["RFM69"], val: int) -> None:
reg_value = obj._read_u8(self._address)
reg_value &= ~self._mask
reg_value |= (val & 0xFF) << self._offset
Expand Down Expand Up @@ -274,19 +282,19 @@ def __set__(self, obj, val):
dio_0_mapping = _RegisterBits(_REG_DIO_MAPPING1, offset=6, bits=2)

# pylint: disable=too-many-statements
def __init__(
def __init__( # pylint: disable=invalid-name
self,
spi,
cs,
reset,
frequency,
spi: SPI,
cs: DigitalInOut,
reset: DigitalInOut,
frequency: int,
*,
sync_word=b"\x2D\xD4",
preamble_length=4,
encryption_key=None,
high_power=True,
baudrate=2000000
):
sync_word: bytes = b"\x2D\xD4",
preamble_length: int = 4,
encryption_key: Optional[bytes] = None,
high_power: bool = True,
baudrate: int = 2000000
) -> None:
self._tx_power = 13
self.high_power = high_power
# Device support SPI mode 0 (polarity & phase = 0) up to a max of 10mhz.
Expand Down Expand Up @@ -389,7 +397,9 @@ def __init__(

# pylint: disable=no-member
# Reconsider this disable when it can be tested.
def _read_into(self, address, buf, length=None):
def _read_into(
self, address: int, buf: WriteableBuffer, length: Optional[int] = None
) -> None:
# Read a number of bytes from the specified address into the provided
# buffer. If length is not specified (the default) the entire buffer
# will be filled.
Expand All @@ -401,12 +411,14 @@ def _read_into(self, address, buf, length=None):
device.write(self._BUFFER, end=1)
device.readinto(buf, end=length)

def _read_u8(self, address):
def _read_u8(self, address: int) -> int:
# Read a single byte from the provided address and return it.
self._read_into(address, self._BUFFER, length=1)
return self._BUFFER[0]

def _write_from(self, address, buf, length=None):
def _write_from(
self, address: int, buf: ReadableBuffer, length: Optional[int] = None
) -> None:
# Write a number of bytes to the provided address and taken from the
# provided buffer. If no length is specified (the default) the entire
# buffer is written.
Expand All @@ -418,7 +430,7 @@ def _write_from(self, address, buf, length=None):
device.write(self._BUFFER, end=1)
device.write(buf, end=length) # send data

def _write_u8(self, address, val):
def _write_u8(self, address: int, val: int) -> None:
# Write a byte register to the chip. Specify the 7-bit address and the
# 8-bit value to write to that address.
with self._device as device:
Expand All @@ -427,27 +439,27 @@ def _write_u8(self, address, val):
self._BUFFER[1] = val & 0xFF
device.write(self._BUFFER, end=2)

def reset(self):
def reset(self) -> None:
"""Perform a reset of the chip."""
# See section 7.2.2 of the datasheet for reset description.
self._reset.value = True
time.sleep(0.0001) # 100 us
self._reset.value = False
time.sleep(0.005) # 5 ms

def idle(self):
def idle(self) -> None:
"""Enter idle standby mode (switching off high power amplifiers if necessary)."""
# Like RadioHead library, turn off high power boost if enabled.
if self._tx_power >= 18:
self._write_u8(_REG_TEST_PA1, _TEST_PA1_NORMAL)
self._write_u8(_REG_TEST_PA2, _TEST_PA2_NORMAL)
self.operation_mode = STANDBY_MODE

def sleep(self):
def sleep(self) -> None:
"""Enter sleep mode."""
self.operation_mode = SLEEP_MODE

def listen(self):
def listen(self) -> None:
"""Listen for packets to be received by the chip. Use :py:func:`receive` to listen, wait
and retrieve packets as they're available.
"""
Expand All @@ -460,7 +472,7 @@ def listen(self):
# Enter RX mode (will clear FIFO!).
self.operation_mode = RX_MODE

def transmit(self):
def transmit(self) -> None:
"""Transmit a packet which is queued in the FIFO. This is a low level function for
entering transmit mode and more. For generating and transmitting a packet of data use
:py:func:`send` instead.
Expand All @@ -475,7 +487,7 @@ def transmit(self):
self.operation_mode = TX_MODE

@property
def temperature(self):
def temperature(self) -> float:
"""The internal temperature of the chip in degrees Celsius. Be warned this is not
calibrated or very accurate.

Expand All @@ -491,7 +503,7 @@ def temperature(self):
return 166.0 - temp

@property
def operation_mode(self):
def operation_mode(self) -> int:
"""The operation mode value. Unless you're manually controlling the chip you shouldn't
change the operation_mode with this property as other side-effects are required for
changing logical modes--use :py:func:`idle`, :py:func:`sleep`, :py:func:`transmit`,
Expand All @@ -501,7 +513,7 @@ def operation_mode(self):
return (op_mode >> 2) & 0b111

@operation_mode.setter
def operation_mode(self, val):
def operation_mode(self, val: int) -> None:
assert 0 <= val <= 4
# Set the mode bits inside the operation mode register.
op_mode = self._read_u8(_REG_OP_MODE)
Expand All @@ -521,7 +533,7 @@ def operation_mode(self, val):
raise TimeoutError("Operation Mode failed to set.")

@property
def sync_word(self):
def sync_word(self) -> bytearray:
"""The synchronization word value. This is a byte string up to 8 bytes long (64 bits)
which indicates the synchronization word for transmitted and received packets. Any
received packet which does not include this sync word will be ignored. The default value
Expand All @@ -539,7 +551,7 @@ def sync_word(self):
return sync_word

@sync_word.setter
def sync_word(self, val):
def sync_word(self, val: Optional[bytearray]) -> None:
# Handle disabling sync word when None value is set.
if val is None:
self.sync_on = 0
Expand All @@ -553,7 +565,7 @@ def sync_word(self, val):
self.sync_on = 1

@property
def preamble_length(self):
def preamble_length(self) -> int:
"""The length of the preamble for sent and received packets, an unsigned 16-bit value.
Received packets must match this length or they are ignored! Set to 4 to match the
RadioHead RFM69 library.
Expand All @@ -563,13 +575,13 @@ def preamble_length(self):
return ((msb << 8) | lsb) & 0xFFFF

@preamble_length.setter
def preamble_length(self, val):
def preamble_length(self, val: int) -> None:
assert 0 <= val <= 65535
self._write_u8(_REG_PREAMBLE_MSB, (val >> 8) & 0xFF)
self._write_u8(_REG_PREAMBLE_LSB, val & 0xFF)

@property
def frequency_mhz(self):
def frequency_mhz(self) -> float:
"""The frequency of the radio in Megahertz. Only the allowed values for your radio must be
specified (i.e. 433 vs. 915 mhz)!
"""
Expand All @@ -584,7 +596,7 @@ def frequency_mhz(self):
return frequency

@frequency_mhz.setter
def frequency_mhz(self, val):
def frequency_mhz(self, val: float) -> None:
assert 290 <= val <= 1020
# Calculate FRF register 24-bit value using section 6.2 of the datasheet.
frf = int((val * 1000000.0) / _FSTEP) & 0xFFFFFF
Expand All @@ -597,7 +609,7 @@ def frequency_mhz(self, val):
self._write_u8(_REG_FRF_LSB, lsb)

@property
def encryption_key(self):
def encryption_key(self) -> bytearray:
"""The AES encryption key used to encrypt and decrypt packets by the chip. This can be set
to None to disable encryption (the default), otherwise it must be a 16 byte long byte
string which defines the key (both the transmitter and receiver must use the same key
Expand All @@ -612,7 +624,7 @@ def encryption_key(self):
return key

@encryption_key.setter
def encryption_key(self, val):
def encryption_key(self, val: bytearray) -> None:
# Handle if unsetting the encryption key (None value).
if val is None:
self.aes_on = 0
Expand All @@ -623,7 +635,7 @@ def encryption_key(self, val):
self.aes_on = 1

@property
def tx_power(self):
def tx_power(self) -> int:
"""The transmit power in dBm. Can be set to a value from -2 to 20 for high power devices
(RFM69HCW, high_power=True) or -18 to 13 for low power devices. Only integer power
levels are actually set (i.e. 12.5 will result in a value of 12 dBm).
Expand All @@ -648,7 +660,7 @@ def tx_power(self):
raise RuntimeError("Power amplifiers in unknown state!")

@tx_power.setter
def tx_power(self, val):
def tx_power(self, val: float):
val = int(val)
# Determine power amplifier and output power values depending on
# high power state and requested power.
Expand Down Expand Up @@ -685,7 +697,7 @@ def tx_power(self, val):
self._tx_power = val

@property
def rssi(self):
def rssi(self) -> float:
"""The received strength indicator (in dBm).
May be inaccuate if not read immediatey. last_rssi contains the value read immediately
receipt of the last packet.
Expand All @@ -694,7 +706,7 @@ def rssi(self):
return -self._read_u8(_REG_RSSI_VALUE) / 2.0

@property
def bitrate(self):
def bitrate(self) -> float:
"""The modulation bitrate in bits/second (or chip rate if Manchester encoding is enabled).
Can be a value from ~489 to 32mbit/s, but see the datasheet for the exact supported
values.
Expand All @@ -704,47 +716,47 @@ def bitrate(self):
return _FXOSC / ((msb << 8) | lsb)

@bitrate.setter
def bitrate(self, val):
def bitrate(self, val: float) -> None:
assert (_FXOSC / 65535) <= val <= 32000000.0
# Round up to the next closest bit-rate value with addition of 0.5.
bitrate = int((_FXOSC / val) + 0.5) & 0xFFFF
self._write_u8(_REG_BITRATE_MSB, bitrate >> 8)
self._write_u8(_REG_BITRATE_LSB, bitrate & 0xFF)

@property
def frequency_deviation(self):
def frequency_deviation(self) -> float:
"""The frequency deviation in Hertz."""
msb = self._read_u8(_REG_FDEV_MSB)
lsb = self._read_u8(_REG_FDEV_LSB)
return _FSTEP * ((msb << 8) | lsb)

@frequency_deviation.setter
def frequency_deviation(self, val):
def frequency_deviation(self, val: float) -> None:
assert 0 <= val <= (_FSTEP * 16383) # fdev is a 14-bit unsigned value
# Round up to the next closest integer value with addition of 0.5.
fdev = int((val / _FSTEP) + 0.5) & 0x3FFF
self._write_u8(_REG_FDEV_MSB, fdev >> 8)
self._write_u8(_REG_FDEV_LSB, fdev & 0xFF)

def packet_sent(self):
def packet_sent(self) -> bool:
"""Transmit status"""
return (self._read_u8(_REG_IRQ_FLAGS2) & 0x8) >> 3

def payload_ready(self):
def payload_ready(self) -> bool:
"""Receive status"""
return (self._read_u8(_REG_IRQ_FLAGS2) & 0x4) >> 2

# pylint: disable=too-many-branches
def send(
self,
data,
data: ReadableBuffer,
*,
keep_listening=False,
destination=None,
node=None,
identifier=None,
flags=None
):
keep_listening: bool = False,
destination: Optional[int] = None,
node: Optional[int] = None,
identifier: Optional[int] = None,
flags: Optional[int] = None
) -> bool:
"""Send a string of data using the transmitter.
You can only send 60 bytes at a time
(limited by chip's FIFO size and appended headers).
Expand Down Expand Up @@ -801,7 +813,7 @@ def send(
self.idle()
return not timed_out

def send_with_ack(self, data):
def send_with_ack(self, data: int) -> bool:
"""Reliable Datagram mode:
Send a packet with data and wait for an ACK response.
The packet header is automatically generated.
Expand Down Expand Up @@ -839,8 +851,13 @@ def send_with_ack(self, data):
return got_ack

def receive(
self, *, keep_listening=True, with_ack=False, timeout=None, with_header=False
):
self,
*,
keep_listening: bool = True,
with_ack: bool = False,
timeout: Optional[float] = None,
with_header: bool = False
) -> int:
"""Wait to receive a packet from the receiver. If a packet is found the payload bytes
are returned, otherwise None is returned (which indicates the timeout elapsed with no
reception).
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@

Adafruit-Blinka
adafruit-circuitpython-busdevice
adafruit-circuitpython-typing