Skip to content

Make full use of all hardware acceptance masks and filters #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 80 additions & 53 deletions adafruit_mcp2515/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,26 @@
# Filters & Masks
_RXM0SIDH = const(0x20)
_RXM1SIDH = const(0x24)
MASKS = [_RXM0SIDH, _RXM1SIDH]

_RXF0SIDH = const(0x00)
_RXF1SIDH = const(0x04)
_RXF2SIDH = const(0x08)
_RXF3SIDH = const(0x10)
_RXF4SIDH = const(0x14)
_RXF5SIDH = const(0x18)
FILTERS = [[_RXF0SIDH, _RXF1SIDH], [_RXF2SIDH, _RXF3SIDH, _RXF4SIDH, _RXF5SIDH]]

MASKS_FILTERS = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This data should be an instance member (self.masks_filters), because otherwise a system with more than one MCP2515 chip in it would not function properly.

_RXM0SIDH: [None, {
_RXF0SIDH: None,
_RXF1SIDH: None
}],
_RXM1SIDH: [None, {
_RXF2SIDH: None,
_RXF3SIDH: None,
_RXF4SIDH: None,
_RXF5SIDH: None
}]
}
# bits/flags
_RX0IF = const(0x01)
_RX1IF = const(0x02)
Expand Down Expand Up @@ -309,8 +320,6 @@ def __init__(
self._tx_buffers = []
self._rx0_overflow = False
self._rx1_overflow = False
self._masks_in_use = []
self._filters_in_use = [[], []]
self._mode = None
self._bus_state = BusState.ERROR_ACTIVE
self._baudrate = baudrate
Expand Down Expand Up @@ -538,13 +547,8 @@ def _start_transmit(self, tx_buffer):
in_end=1,
)

def _set_filter_register(self, filter_index, mask, extended):
filter_reg_addr = FILTERS[filter_index]
self._write_id_to_register(filter_reg_addr, mask, extended)

def _set_mask_register(self, mask_index, mask, extended):
mask_reg_addr = MASKS[mask_index]
self._write_id_to_register(mask_reg_addr, mask, extended)
def _set_acceptance_register(self, register, value, is_extended):
self._write_id_to_register(register, value, is_extended)

@staticmethod
def _unload_ids(raw_ids):
Expand Down Expand Up @@ -774,45 +778,72 @@ def _get_bus_status(self):
else:
self._bus_state = BusState.ERROR_ACTIVE

def _create_mask(self, match):
mask = match.mask
if mask == 0:
if match.extended:
mask = EXTID_BOTTOM_29_MASK
else:
mask = STDID_BOTTOM_11_MASK

masks_used = len(self._masks_in_use)
if masks_used < len(MASKS):
next_mask_index = masks_used

self._set_mask_register(next_mask_index, mask, match.extended)
self._masks_in_use.append(MASKS[next_mask_index])
return next_mask_index

raise RuntimeError("No Masks Available")

def _create_filter(self, match, mask_index):
def _find_mask_and_filter(self, match_mask: int):
"""Finds the optimal mask and filter registers for the given mask.

next_filter_index = len(self._filters_in_use[mask_index])
if next_filter_index == len(FILTERS[mask_index]):
raise RuntimeError("No Filters Available")
Returns a tuple of (mask, filter) registers.
Returns `None` if no mask and filter are available.

filter_register = FILTERS[mask_index][next_filter_index]
Args:
mask (int): The mask value to fit.
"""

self._write_id_to_register(filter_register, match.address, match.extended)
self._filters_in_use[mask_index].append(filter_register)
mask: int = None
filt: int = None

# First try to find a matching mask with a free filter
for mask_reg, [mask_val, filts] in MASKS_FILTERS.items():
if mask_val == match_mask:
# Matching mask, look for a free filter
for filt_reg, filt_val in filts.items():
if not filt_val:
# Free filter
mask = mask_reg
filt = filt_reg
break

if mask:
# We're done here
break

if mask is None:
# Then try to find a free mask
for mask_reg, [mask_val, filts] in MASKS_FILTERS.items():
if mask_val is None:
mask = mask_reg
filt = next(iter(filts.keys()))
break

return (mask, filt) if mask is not None else None

def _create_mask_and_filter(self, match: Match):
actual_mask = match.mask
if match.mask == 0:
actual_mask = EXTID_BOTTOM_29_MASK if match.extended \
else STDID_BOTTOM_11_MASK

result = self._find_mask_and_filter(actual_mask)
if not result:
raise Exception(
"No mask and filter is available for "
f"mask 0x{actual_mask:03x}, addr 0x{match.address:03x}"
)

(mask, filt) = result
self._set_acceptance_register(mask, actual_mask, match.extended)
self._set_acceptance_register(filt, match.address, match.extended)
MASKS_FILTERS[mask][0] = actual_mask
MASKS_FILTERS[mask][1][filt] = match.address

def deinit_filtering_registers(self):
"""Clears the Receive Mask and Filter Registers"""

for mask_index, mask_reg in enumerate(MASKS):
for mask_reg, mask_data in MASKS_FILTERS.items():
self._set_register(mask_reg, 0)
mask_data[0] = None # Mask value
for filt_reg in mask_data[1]:
self._set_register(filt_reg, 0)
mask_data[1][filt_reg] = None

for filter_reg in FILTERS[mask_index]:
self._set_register(filter_reg, 0)
self._masks_in_use = []
self._filters_in_use = [[], []]

######## CANIO API METHODS #############
@property
Expand Down Expand Up @@ -905,17 +936,13 @@ def listen(self, matches=None, *, timeout: float = 10):

for match in matches:
self._dbg("match:", match)
mask_index_used = self._create_mask(match)
self._create_filter(match, mask_index=mask_index_used)

used_masks = len(self._masks_in_use)
# if matches were made and there are unused masks
# set the unused masks to prevent them from leaking packets
if len(matches) > 0 and used_masks < len(MASKS):
next_mask_index = used_masks
for idx in range(next_mask_index, len(MASKS)):
print("using unused mask index:", idx)
self._create_mask(matches[-1])
self._create_mask_and_filter(match)

for mask_reg, [mask_val, _] in MASKS_FILTERS.items():
if mask_val is None:
# Mask unused, set it to a match to prevent leakage
self._set_acceptance_register(mask_reg, match.mask, match.extended)


return Listener(self, timeout)

Expand Down