Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 5ecbc00

Browse files
committedOct 15, 2024·
lib: Add Arduino Opta expansion protocol support.
1 parent fe67529 commit 5ecbc00

File tree

3 files changed

+559
-0
lines changed

3 files changed

+559
-0
lines changed
 

‎lib/opta/example.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# This file is part of the blueprint package.
2+
# Copyright (c) 2024 Arduino SA
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
import opta
7+
import logging
8+
9+
10+
if __name__ == "__main__":
11+
logging.basicConfig(
12+
datefmt="%H:%M:%S",
13+
format="%(asctime)s.%(msecs)03d %(message)s",
14+
level=logging.INFO # Switch to DEBUG to see raw commands
15+
)
16+
17+
opta = opta.Opta(bus_id=3)
18+
19+
# enum_devices initializes the bus, resets all expansions, and returns a list of
20+
# detected expansions. NOTE: keep a reference to the list of expansion for later
21+
# use, as every time this function is called it restarts the enumeration process.
22+
for e in opta.enum_devices():
23+
print("")
24+
logging.info(f"Expansion type: {e.type} address: 0x{e.addr:02X} name: {e.name}")
25+
26+
# Read firmware version.
27+
major, minor, patch = e.firmware_version()
28+
logging.info(f"Firmware Version Major: {major} Minor: {minor} Patch: {patch}")
29+
30+
# Read product ID.
31+
pid = e.product_id()
32+
logging.info("Product ID bytes: " + ", ".join(["0x%02X" % a for a in pid[0:8]]))
33+
34+
if e.type == "digital":
35+
# Write digital pins. If the default state and timeout (in milliseconds) are
36+
# also specified, the pins will revert to the specified default state after
37+
# the timeout expires.
38+
e.digital(pins=0b10101010, default=0b01010101, timeout=3000)
39+
40+
# If no args are specified, digital() returns all digital pins.
41+
pins = e.digital()
42+
logging.info(f"Digital pins: 0b{pins:08b}")
43+
44+
# Read analog pins.
45+
logging.info("Analog pin [0 ]: %d" % e.analog(channel=0))
46+
logging.info("Analog pins [0..7]: " + str(e.analog()))
47+
48+
if e.type == "analog":
49+
# Configure LEDs on Opta Analog
50+
e.digital(pins=0b10011001)
51+
52+
# Configure analog channels. Note almost all of the possible args are used here
53+
# for demonstration purposes, however only a few are actually required (such as
54+
# the channel type and mode), most of the other args have reasonable defaults.
55+
56+
# Configure channel (0) as PWM.
57+
e.analog(channel=0, channel_type="pwm", period=1000, duty=50, default_period=500)
58+
59+
# Configure channel (2) as DAC.
60+
e.analog(channel=2, channel_type="dac", channel_mode="voltage", value=7540)
61+
62+
# Configure channel (3) as ADC.
63+
e.analog(channel=3, channel_type="adc", channel_mode="voltage",
64+
pulldown=True, rejection=False, diagnostic=False, average=0, secondary=False)
65+
66+
# Configure channel (4) as RTD.
67+
e.analog(channel=4, channel_type="rtd", use_3_wire=False, current_ma=1.2, update_time=0)
68+
69+
# Configure channel (5) as digital input.
70+
e.analog(channel=5, channel_type="din", comparator=True, inverted=False, filtered=True,
71+
scale=False, sink=1, threshold=9, debounce_mode="simple", debounce_time=24)
72+
73+
# Read ADC channel (3).
74+
# This should print 65535 if channels 2 and 3 are connected.
75+
logging.info("ADC channel [3 ]: %d" % e.analog(channel=3))
76+
77+
# Read all analog channels.
78+
logging.info("Analog channels [0..7]: " + str(e.analog()))
79+
80+
# Read RTD channel (4).
81+
logging.info("RTD channel [4 ]: %.1f ohm" % e.analog(channel=4))
82+
83+
# Read DIN channel (5).
84+
logging.info("DIN channel [5 ]: %d" % e.analog(channel=5))
85+
86+
# Read all analog channels.
87+
logging.info("Analog channels [0..7]: " + str(e.analog()))
88+
89+
# Configure channel (2) as DAC.
90+
e.analog(channel=2, channel_type="dac", channel_mode="voltage", value=3770)
91+
92+
# Configure channel (3) as ADC.
93+
e.analog(channel=3, channel_type="adc", channel_mode="voltage", pulldown=True,
94+
rejection=False, diagnostic=False, average=0, secondary=False)
95+
96+
logging.info("ADC channel [3 ]: %d" % e.analog(channel=3))

‎lib/opta/manifest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
metadata(
2+
description="Modules for Arduino Opta.",
3+
version="0.0.1",
4+
)
5+
6+
module("opta.py")

‎lib/opta/opta.py

Lines changed: 457 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,457 @@
1+
# This file is part of the blueprint package.
2+
# Copyright (c) 2024 Arduino SA
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
import struct
7+
import logging
8+
from time import sleep_ms
9+
from machine import I2C
10+
from machine import Pin
11+
from micropython import const
12+
13+
_MIN_ADDRESS = const(0x0B)
14+
_TMP_ADDRESS = const(0x0A)
15+
_MAX_ADDRESS = const(0x0B + 0x0A)
16+
17+
# Header and CRC length.
18+
_CMD_HDR_LEN = const(0x03)
19+
_CMD_CRC_LEN = const(0x01)
20+
21+
# Command direction (SET == 1, GET == 2)
22+
_CMD_DIR_SET = const(0x01)
23+
_CMD_DIR_GET = const(0x02)
24+
25+
# Commands are encoded as follows:
26+
# [direction, command opcode, response length, ack required]
27+
_CMD_CHIP_RESET = const((_CMD_DIR_SET, 0x01, 0, False))
28+
_CMD_SET_ADDRESS = const((_CMD_DIR_SET, 0x02, 0, False))
29+
_CMD_GET_ADDRESS = const((_CMD_DIR_GET, 0x03, 2, False))
30+
31+
# Misc commands such as product ID, firmware etc...
32+
_CMD_GET_PRODUCT_ID = const((_CMD_DIR_GET, 0x25, 33, False))
33+
_CMD_GET_FW_VERSION = const((_CMD_DIR_GET, 0x16, 3, False))
34+
_CMD_SET_BOOTLOADER = const((_CMD_DIR_SET, 0xF3, 0, False))
35+
36+
# Flash commands.
37+
_CMD_SET_FLASH_WRITE = const((_CMD_DIR_SET, 0x17, 0, False))
38+
_CMD_GET_FLASH_READ = const((_CMD_DIR_GET, 0x18, 32, False))
39+
40+
# Digital pins commands.
41+
_CMD_SET_DIGITAL_PIN = const((_CMD_DIR_SET, 0x06, 0, False))
42+
_CMD_GET_DIGITAL_BUS = const((_CMD_DIR_GET, 0x04, 2, False))
43+
_CMD_SET_DIGITAL_DEF = const((_CMD_DIR_SET, 0x08, 0, False))
44+
_CMD_SET_DIGITAL_BUS_OA = const((_CMD_DIR_SET, 0x15, 0, True))
45+
46+
# Analog channels commands.
47+
_CMD_CFG_ANALOG_ADC = const((_CMD_DIR_SET, 0x09, 0, True))
48+
_CMD_CFG_ANALOG_DIN = const((_CMD_DIR_SET, 0x11, 0, True))
49+
_CMD_CFG_ANALOG_PWM = const((_CMD_DIR_SET, 0x13, 0, True))
50+
_CMD_CFG_ANALOG_DAC = const((_CMD_DIR_SET, 0x0C, 0, True))
51+
_CMD_CFG_ANALOG_RTD = const((_CMD_DIR_SET, 0x0E, 0, True))
52+
_CMD_CFG_ANALOG_HIZ = const((_CMD_DIR_SET, 0x24, 0, True))
53+
_CMD_SET_ANALOG_DAC = const((_CMD_DIR_SET, 0x0D, 0, True))
54+
_CMD_SET_ANALOG_RTD_TIM = const((_CMD_DIR_SET, 0x10, 0, True))
55+
56+
# Read analog channels (Analog pin, ADC, RTD, Digital In)
57+
_CMD_GET_ANALOG_PIN = const((_CMD_DIR_GET, 0x05, 2, False))
58+
_CMD_GET_ANALOG_PIN_ALL = const((_CMD_DIR_GET, 0x07, 32, False))
59+
_CMD_GET_ANALOG_ADC = const((_CMD_DIR_GET, 0x0A, 3, False))
60+
_CMD_GET_ANALOG_ADC_ALL = const((_CMD_DIR_GET, 0x0B, 16, False))
61+
_CMD_GET_ANALOG_RTD = const((_CMD_DIR_GET, 0x0F, 5, False))
62+
_CMD_GET_ANALOG_DIN = const((_CMD_DIR_GET, 0x12, 1, False))
63+
64+
# Default analog channels values and timeout.
65+
_CMD_SET_ANALOG_PWM_DEF = const((_CMD_DIR_SET, 0x33, 0, True))
66+
_CMD_SET_ANALOG_DAC_DEF = const((_CMD_DIR_SET, 0x3D, 0, True))
67+
_CMD_SET_ANALOG_TIMEOUT = const((_CMD_DIR_SET, 0x31, 0, True))
68+
69+
_CHANNEL_MODES = const((None, "voltage", "current"))
70+
_CHANNEL_TYPES = const(("adc", "dac", "rtd", "pwm", "hiz", "din"))
71+
72+
73+
class Expansion:
74+
def __init__(self, opta, type, addr, name):
75+
self.opta = opta
76+
self.type = type
77+
self.addr = addr
78+
self.name = name
79+
self.channels = {}
80+
81+
def product_id(self):
82+
"""
83+
Returns the product ID bytes of the expansion.
84+
"""
85+
return self.opta._cmd(self.addr, _CMD_GET_PRODUCT_ID)
86+
87+
def firmware_version(self):
88+
"""
89+
Returns the firmware version of the expansion.
90+
"""
91+
return self.opta._cmd(self.addr, _CMD_GET_FW_VERSION)
92+
93+
def _flash(self, address, size=0, data=None):
94+
"""
95+
Reads from or writes to the flash memory at the specified address.
96+
97+
NOTE: This should be used for production purposes only!
98+
99+
Parameters:
100+
- address : The memory address to read from or write to.
101+
- size : Number of bytes to read from the flash memory.
102+
- data : Bytes to write to the flash memory.
103+
104+
Returns:
105+
Data read from the flash memory as bytes, if reading. Returns None if writing.
106+
"""
107+
size = size if data is None else len(data)
108+
if size < 0 or size > 32:
109+
raise RuntimeError("Maximum flash read/write size is 32")
110+
if data is None:
111+
args = struct.pack("<HB", address, size)
112+
return struct.unpack("<HB32s", self.opta._cmd(self.addr, _CMD_GET_FLASH_READ, args))[-1][0:size]
113+
args = struct.pack(f"<HB{size}s", self.mem_buf, 0, address, size, data)
114+
self.opta._cmd(self.addr, _CMD_SET_FLASH_WRITE, args)
115+
116+
def digital(self, pins=None, timeout=None, default=0):
117+
"""
118+
Reads or writes digital pins.
119+
120+
Parameters:
121+
- pins : Digital pins mask to set. If None, returns the state of all pins.
122+
- timeout : The timeout in milliseconds, after which pins revert to the default state.
123+
- default : The default state to which the pins will revert to after the timeout expires.
124+
125+
Note:
126+
- For Opta Analog, this functions supports writing digital LEDs only.
127+
128+
Returns: The current state of the digital pins if reading, or None if setting the pins.
129+
"""
130+
if self.type == "analog":
131+
if pins is None:
132+
raise RuntimeError("Function is not supported on analog expansions")
133+
return self.opta._cmd(self.addr, _CMD_SET_DIGITAL_BUS_OA, pins)
134+
if pins is None and timeout is None:
135+
return struct.unpack("<H", self.opta._cmd(self.addr, _CMD_GET_DIGITAL_BUS))[0]
136+
if pins is not None:
137+
self.opta._cmd(self.addr, _CMD_SET_DIGITAL_PIN, pins)
138+
if timeout is not None:
139+
return self.opta._cmd(self.addr, _CMD_SET_DIGITAL_DEF, struct.pack("<BH", default, timeout))
140+
141+
def analog(self, channel=None, channel_type=None, channel_mode=None, timeout=None, **kwargs):
142+
"""
143+
Configures or reads analog channels.
144+
145+
Parameters:
146+
- timeout : Set timeout in milliseconds after which analog channels revert to their default
147+
states set previously with analog(). Note if this is set, all other args are ignored.
148+
- channel : The channel number to configure, read, or write. If None, reads all ADC channels.
149+
- channel_type : Channel type can be "adc", "dac", "pwm", "rtd", "hiz", "din".
150+
- channel_mode : "voltage" or "current" (default="voltage" for ADC channels).
151+
152+
kwargs :
153+
hiz configuration:
154+
- no args.
155+
156+
adc configuration:
157+
- average : Number of points for moving average (0-255, default=0).
158+
- rejection : Enable rejection (default=False).
159+
- diagnostic : Enable diagnostic (default=False).
160+
- pulldown : Enable pulldown (default=True for voltage, False for current).
161+
- secondary : This ADC channel is shared with another function (default=False).
162+
163+
dac configuration:
164+
- use_slew : Enable slew rate control (default=False).
165+
- slew_rate : Slew rate if `use_slew` is True (default=0).
166+
- limit_current : Limit current (default=False).
167+
- value : Value to write to a DAC channel.
168+
- default_value: The default value to revert to, after the timeout set with timeout() expires.
169+
170+
pwm configuration:
171+
- period: PWM period in uS (for example 1000uS).
172+
- duty: high duty cycle in % (for example 50%).
173+
- default_period: The default value to revert to, after the timeout set with timeout() expires.
174+
- default_duty: The default value to revert to, after the timeout set with timeout() expires.
175+
176+
rtd configuration:
177+
- use_3_wire: 3-Wire RTD (default = False).
178+
- current_ma: RTD current in mA (default = 1.2mA).
179+
- update_time: The update time of all RDT channels.
180+
181+
din (digital input) configuration:
182+
- comparator : Enable comparator (default=True).
183+
- filtered : Select the filtered input to the comparator (default=True).
184+
- inverted : Invert the output from the digital input comparator (default=False).
185+
- scale : Comparator voltage scale (default=False).
186+
- threshold : Comparator voltage threshold (default 9).
187+
- sink : Sets the sink current in digital input logic mode (0-31, default 1)
188+
- debounce_mode : Debounce mode ("simple", "integrator" or None to disable debounce, default="simple")
189+
- debounce_time : Debounce time (0-31, default 24)
190+
191+
Returns: ADC value(s) if reading, or None if configuring a channel or setting the analog timeout.
192+
"""
193+
if channel is not None:
194+
if channel < 0:
195+
raise ValueError("Invalid channel specified")
196+
if self.type == "analog" and channel > 7:
197+
raise ValueError("Invalid channel specified")
198+
if self.type == "digital" and channel > 16:
199+
raise ValueError("Invalid channel specified")
200+
201+
# Set default analog channels timeout.
202+
if timeout is not None:
203+
if self.type != "analog":
204+
raise RuntimeError("Function is not supported on digital expansions")
205+
return self.opta._cmd(self.addr, _CMD_SET_ANALOG_TIMEOUT, struct.pack("<H", timeout))
206+
207+
# Read a all analog channels.
208+
if channel is None:
209+
fmt = "<HHHHHHHH" if self.type == "analog" else "<HHHHHHHHHHHHHHHH"
210+
cmd = _CMD_GET_ANALOG_ADC_ALL if self.type == "analog" else _CMD_GET_ANALOG_PIN_ALL
211+
return struct.unpack(fmt, self.opta._cmd(self.addr, cmd))
212+
213+
# Read a single analog channel.
214+
if channel_type is None:
215+
channel_type = self.channels.get(channel, None)
216+
if self.type == "digital":
217+
return struct.unpack("<H", self.opta._cmd(self.addr, _CMD_GET_ANALOG_PIN, channel))[-1]
218+
elif channel_type == "rtd":
219+
return struct.unpack("<Bf", self.opta._cmd(self.addr, _CMD_GET_ANALOG_RTD, channel))[-1]
220+
elif channel_type == "adc":
221+
return struct.unpack("<BH", self.opta._cmd(self.addr, _CMD_GET_ANALOG_ADC, channel))[-1]
222+
elif channel_type == "din":
223+
return (struct.unpack("B", self.opta._cmd(self.addr, _CMD_GET_ANALOG_DIN))[-1] & channel) == channel
224+
else:
225+
raise RuntimeError("Channel is not configured or not readable")
226+
227+
# Configure a new channel
228+
if channel_type not in _CHANNEL_TYPES:
229+
raise ValueError("Invalid channel type.")
230+
if channel_mode not in _CHANNEL_MODES:
231+
raise ValueError("Invalid channel mode.")
232+
233+
def get_bool(k, d):
234+
# True is 1 and False is 2.
235+
return 1 if kwargs.get(k, d) else 2
236+
237+
if channel_type == "hiz":
238+
self.opta._cmd(self.addr, _CMD_CFG_ANALOG_HIZ, struct.pack("<B", channel))
239+
elif channel_type == "rtd":
240+
self.opta._cmd(
241+
self.addr,
242+
_CMD_CFG_ANALOG_RTD,
243+
struct.pack("<BBf", channel, get_bool("use_3_wire", False), kwargs.get("current_ma", 1.2)),
244+
)
245+
self.opta._cmd(self.addr, _CMD_SET_ANALOG_RTD_TIM, struct.pack("<H", kwargs.get("update_time", 800)))
246+
elif channel_type == "pwm":
247+
if channel > 4:
248+
raise ValueError("Invalid PWM channel specified")
249+
if "period" not in kwargs:
250+
raise ValueError("PWM requires a period argument")
251+
period = kwargs["period"]
252+
def_period = kwargs.get("default_period", None)
253+
self.opta._cmd(
254+
self.addr,
255+
_CMD_CFG_ANALOG_PWM,
256+
struct.pack("<BII", channel, period, int(kwargs.get("duty", 50) / 100 * period))
257+
)
258+
if "default_period" in kwargs:
259+
self.opta._cmd(
260+
self.addr,
261+
_CMD_SET_ANALOG_PWM_DEF,
262+
struct.pack("<BII", channel, def_period, int(kwargs.get("default_duty", 50) / 100 * def_period))
263+
)
264+
elif channel_type == "adc":
265+
self.opta._cmd(
266+
self.addr,
267+
_CMD_CFG_ANALOG_ADC,
268+
struct.pack(
269+
"BBBBBBB",
270+
channel,
271+
channel_mode == "current",
272+
get_bool("pulldown", channel_mode == "voltage"),
273+
get_bool("rejection", False),
274+
get_bool("diagnostic", False),
275+
kwargs.get("average", 0),
276+
get_bool("secondary", False),
277+
)
278+
)
279+
elif channel_type == "dac":
280+
if "value" not in kwargs:
281+
raise ValueError("DAC requires a value argument")
282+
# Configure DAC channel and update the output value.
283+
self.opta._cmd(
284+
self.addr,
285+
_CMD_CFG_ANALOG_DAC,
286+
struct.pack(
287+
"BBBBB",
288+
channel,
289+
channel_mode == "current",
290+
get_bool("limit_current", channel_mode == "voltage"),
291+
get_bool("slew_rate", False),
292+
kwargs.get("slew_rate", 0),
293+
)
294+
)
295+
self.opta._cmd(self.addr, _CMD_SET_ANALOG_DAC, struct.pack("<BHB", channel, kwargs["value"], 1))
296+
if "default_value" in kwargs:
297+
self.opta._cmd(
298+
self.addr,
299+
_CMD_SET_ANALOG_DAC_DEF,
300+
struct.pack("<BHB", channel, kwargs["default_value"], 1)
301+
)
302+
sleep_ms(200) # DAC requires at leas 200ms to update after a write.
303+
elif channel_type == "din":
304+
deb_mode = kwargs.get("debounce_mode", "simple")
305+
self.opta._cmd(
306+
self.addr,
307+
_CMD_CFG_ANALOG_DIN,
308+
struct.pack(
309+
"BBBBBBBBB",
310+
channel,
311+
get_bool("filtered", True),
312+
get_bool("inverted", False),
313+
get_bool("comparator", True),
314+
1 if (deb_mode is None or deb_mode == "simple") else 2,
315+
get_bool("scale", False),
316+
kwargs.get("threshold", 9),
317+
kwargs.get("sink", 1),
318+
kwargs.get("debounce_time", 0 if deb_mode is None else 24),
319+
)
320+
)
321+
# Save channel type for later.
322+
self.channels[channel] = channel_type
323+
324+
325+
class Opta:
326+
def __init__(self, bus_id, freq=400_000, det=None):
327+
"""
328+
Initializes an Opta controller.
329+
330+
Parameters:
331+
- bus_id : The I2C bus identifier.
332+
- freq : I2C bus frequency (default=400_000).
333+
- det : GPIO pin used for bus detection (default is a PULL_UP input pin named "BUS_DETECT").
334+
"""
335+
self.bus = I2C(bus_id, freq=freq)
336+
self.cmd_buf = memoryview(bytearray(256 + 2))
337+
self.mem_buf = memoryview(bytearray(struct.calcsize(f"<HB{32}s")))
338+
self.det = Pin("BUS_DETECT", Pin.IN, Pin.PULL_UP) if det is None else det
339+
self.exp_types = {
340+
0x02: ("digital", "Opta Digital Mechanical"),
341+
0x03: ("digital", "Opta Digital Solid State"),
342+
0x04: ("analog", "Opta Analog"),
343+
0x05: ("digital", "UNO R4 MINIMA"),
344+
}
345+
346+
def _log_debug(self, msg):
347+
# Blue protocol prints in blue
348+
logging.debug(f"\033[94m{msg}\033[0m")
349+
350+
def _log_enabled(self, level):
351+
return logging.getLogger().isEnabledFor(level)
352+
353+
def _bus_read(self, addr, buf):
354+
self.bus.readfrom_into(addr, buf)
355+
if self._log_enabled(logging.DEBUG):
356+
self._log_debug("Recv: " + " ".join(["%02X" % (a) for a in buf]))
357+
358+
def _bus_write(self, addr, buf):
359+
if self._log_enabled(logging.DEBUG):
360+
self._log_debug("Send: " + " ".join(["%02X" % (a) for a in buf]))
361+
self.bus.writeto(addr, buf)
362+
363+
def _crc8(self, buf, poly=0x07, crc=0x00):
364+
for byte in buf:
365+
crc ^= byte
366+
for bit in range(8):
367+
crc = (crc << 1) ^ poly if crc & 0x80 else crc << 1
368+
return crc & 0xFF
369+
370+
def _cmd(self, addr, cmd, arg=None):
371+
plen = 0 if arg is None else 1 if isinstance(arg, int) else len(arg)
372+
fmt = "BBB" if plen == 0 else "BBBB" if plen == 1 else f"BBB{plen}s"
373+
struct.pack_into(fmt, self.cmd_buf, 0, cmd[0], cmd[1], plen, arg)
374+
self.cmd_buf[_CMD_HDR_LEN + plen] = self._crc8(self.cmd_buf[0 : _CMD_HDR_LEN + plen])
375+
self._bus_write(addr, self.cmd_buf[0 : _CMD_HDR_LEN + _CMD_CRC_LEN + plen])
376+
if cmd[2] == 0 and not cmd[3]:
377+
return
378+
# Command has a response payload or an ACK.
379+
plen = 0 if cmd[3] else cmd[2]
380+
# Expected CMD and DIR
381+
exp_cmd = 0x20 if cmd[3] else cmd[1]
382+
exp_dir = 0x03 if cmd[0] == _CMD_DIR_GET else 0x04
383+
384+
# Read and check response
385+
self._bus_read(addr, self.cmd_buf[0 : _CMD_HDR_LEN + _CMD_CRC_LEN + plen])
386+
# Check CMD, ARG and LEN
387+
if self.cmd_buf[0] != exp_dir or self.cmd_buf[1] != exp_cmd or self.cmd_buf[2] != plen:
388+
raise ValueError("Unexpected response: " + "".join("%02X" % b for b in self.cmd_buf[0:3]))
389+
390+
# Check CRC
391+
if self.cmd_buf[_CMD_HDR_LEN + plen] != self._crc8(self.cmd_buf[0 : _CMD_HDR_LEN + plen]):
392+
raise ValueError("Invalid CRC")
393+
if not cmd[3]:
394+
return self.cmd_buf[_CMD_HDR_LEN : _CMD_HDR_LEN + plen]
395+
396+
def _reset_bus(self, addr):
397+
self._cmd(addr, _CMD_CHIP_RESET, 0x56)
398+
sleep_ms(2000)
399+
400+
def _set_address(self, addr, addr_new=None):
401+
if addr_new is not None:
402+
if self._log_enabled(logging.DEBUG):
403+
self._log_debug(f"set address: 0x{addr:02X} new_address: 0x{addr_new:02X}")
404+
return self._cmd(addr, _CMD_SET_ADDRESS, addr_new)
405+
return self._cmd(addr, _CMD_GET_ADDRESS)
406+
407+
def enum_devices(self):
408+
"""
409+
Initializes the bus, resets all expansions, and returns a list of detected expansions.
410+
Returns: A list of detected expansions on the bus.
411+
"""
412+
detected = False
413+
expansion_list = []
414+
415+
# Reset the first, last or temp device on the bus.
416+
for addr in [_MIN_ADDRESS, _MAX_ADDRESS, _TMP_ADDRESS]:
417+
try:
418+
self._reset_bus(addr)
419+
detected = True
420+
break
421+
except Exception as e:
422+
logging.debug(e)
423+
424+
if not detected:
425+
raise RuntimeError("No expansions detected on the bus")
426+
427+
addr = _MAX_ADDRESS
428+
# Assign temp I2C addresses to expansions.
429+
while not self.det.value():
430+
self._set_address(0x0A, addr_new=addr)
431+
sleep_ms(100)
432+
try:
433+
xaddr, xtype = self._set_address(addr)
434+
if xaddr == addr:
435+
addr += 1
436+
if self._log_enabled(logging.DEBUG):
437+
self._log_debug(f"phase 1 type: 0x{xtype:02X} address: 0x{xaddr:02X}")
438+
except Exception as e:
439+
logging.debug(e)
440+
441+
# Assign final I2C addresses to expansions.
442+
for addr_new in range(_MIN_ADDRESS, _MIN_ADDRESS + addr - _MAX_ADDRESS):
443+
self._set_address(addr - 1, addr_new)
444+
sleep_ms(100)
445+
try:
446+
xaddr, xtype = self._set_address(addr_new)
447+
addr -= 1
448+
if self._log_enabled(logging.DEBUG):
449+
self._log_debug(f"phase 2 type: 0x{xtype:02X} address: 0x{xaddr:02X}")
450+
if xtype not in self.exp_types:
451+
raise RuntimeError("Unsupported expansion type %d" % xtype)
452+
expansion_list.append(Expansion(self, self.exp_types[xtype][0], xaddr, self.exp_types[xtype][1]))
453+
454+
except Exception as e:
455+
logging.debug(e)
456+
457+
return expansion_list

0 commit comments

Comments
 (0)
Please sign in to comment.