Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e4ca73e

Browse files
committedOct 16, 2024·
lib: Add Arduino Opta expansion protocol support.
1 parent fe67529 commit e4ca73e

File tree

3 files changed

+541
-0
lines changed

3 files changed

+541
-0
lines changed
 

‎lib/opta/example.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# This file is part of the blueprint package.
2+
# Copyright (c) 2024 Arduino SA
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
import opta
7+
import logging
8+
9+
10+
if __name__ == "__main__":
11+
logging.basicConfig(
12+
datefmt="%H:%M:%S",
13+
format="%(asctime)s.%(msecs)03d %(message)s",
14+
level=logging.INFO # Switch to DEBUG to see raw commands
15+
)
16+
17+
opta = opta.Opta(bus_id=3)
18+
19+
# enum_devices initializes the bus, resets all expansions, and returns a list of
20+
# detected expansions. NOTE: keep a reference to the list of expansion for later
21+
# use, as every time this function is called it restarts the enumeration process.
22+
for e in opta.enum_devices():
23+
print("")
24+
logging.info(f"Expansion type: {e.type} address: 0x{e.addr:02X} name: {e.name}")
25+
26+
# Read firmware version.
27+
major, minor, patch = e.firmware_version()
28+
logging.info(f"Firmware Version Major: {major} Minor: {minor} Patch: {patch}")
29+
30+
# Read product ID.
31+
pid = e.product_id()
32+
logging.info("Product ID bytes: " + ", ".join(["0x%02X" % a for a in pid[0:8]]))
33+
34+
if e.type == "digital":
35+
# Write digital pins. If the default state and timeout (in milliseconds) are
36+
# also specified, the pins will revert to the specified default state after
37+
# the timeout expires.
38+
e.digital(pins=0b10101010, default=0b01010101, timeout=3000)
39+
40+
# If no args are specified, digital() returns all digital pins.
41+
pins = e.digital()
42+
logging.info(f"Digital pins: 0b{pins:08b}")
43+
44+
# Read analog pins.
45+
logging.info("Analog pin [0 ]: %d" % e.analog(channel=0))
46+
logging.info("Analog pins [0..7]: " + str(e.analog()))
47+
48+
if e.type == "analog":
49+
# Configure LEDs on Opta Analog
50+
e.digital(pins=0b10011001)
51+
52+
# Configure analog channels. Note almost all of the possible args are used here
53+
# for demonstration purposes, however only a few are actually required (such as
54+
# the channel type and mode). Most of the other args have reasonable defaults.
55+
56+
# Configure channel (0) as PWM.
57+
e.analog(channel=0, channel_type="pwm", period=1000, duty=50, default_period=500)
58+
59+
# Configure channel (2) as DAC.
60+
e.analog(channel=2, channel_type="dac", channel_mode="voltage", value=7540)
61+
62+
# Configure channel (3) as ADC.
63+
e.analog(channel=3, channel_type="adc", channel_mode="voltage", pulldown=True,
64+
rejection=False, diagnostic=False, average=0, secondary=False)
65+
66+
# Configure channel (4) as RTD.
67+
e.analog(channel=4, channel_type="rtd", use_3_wire=False, current_ma=1.2, update_time=0)
68+
69+
# Configure channel (5) as digital input.
70+
e.analog(channel=5, channel_type="din", comparator=True, inverted=False, filtered=True,
71+
scale=False, sink=1, threshold=9, debounce_mode="simple", debounce_time=24)
72+
73+
# Read ADC channel (3).
74+
# This should print 65535 if channels 2 and 3 are connected.
75+
logging.info("ADC channel [3 ]: %d" % e.analog(channel=3))
76+
77+
# Read all analog channels.
78+
logging.info("Analog channels [0..7]: " + str(e.analog()))
79+
80+
# Read RTD channel (4).
81+
logging.info("RTD channel [4 ]: %.1f ohm" % e.analog(channel=4))
82+
83+
# Read DIN channel (5).
84+
logging.info("DIN channel [5 ]: %d" % e.analog(channel=5))
85+
86+
# Read all analog channels.
87+
logging.info("Analog channels [0..7]: " + str(e.analog()))
88+
89+
# Re-configure channel (2) as DAC.
90+
e.analog(channel=2, channel_type="dac", channel_mode="voltage", value=3770)
91+
logging.info("ADC channel [3 ]: %d" % e.analog(channel=3))

‎lib/opta/manifest.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
metadata(
2+
description="Modules for Arduino Opta.",
3+
version="0.0.1",
4+
)
5+
6+
module("opta.py")

‎lib/opta/opta.py

Lines changed: 444 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,444 @@
1+
# This file is part of the blueprint package.
2+
# Copyright (c) 2024 Arduino SA
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
import struct
7+
import logging
8+
from time import sleep_ms
9+
from machine import I2C
10+
from machine import Pin
11+
from micropython import const
12+
13+
_MIN_ADDRESS = const(0x0B)
14+
_TMP_ADDRESS = const(0x0A)
15+
_MAX_ADDRESS = const(0x0B + 0x0A)
16+
17+
# Header and CRC length.
18+
_CMD_HDR_LEN = const(0x03)
19+
_CMD_CRC_LEN = const(0x01)
20+
21+
# Command direction (SET == 1, GET == 2)
22+
_CMD_DIR_SET = const(0x01)
23+
_CMD_DIR_GET = const(0x02)
24+
25+
# Commands are encoded as follows:
26+
# [direction, command opcode, response length, ack required]
27+
_CMD_CHIP_RESET = const((_CMD_DIR_SET, 0x01, 0, False))
28+
_CMD_SET_ADDRESS = const((_CMD_DIR_SET, 0x02, 0, False))
29+
_CMD_GET_ADDRESS = const((_CMD_DIR_GET, 0x03, 2, False))
30+
31+
# Misc commands such as product ID, firmware etc...
32+
_CMD_GET_PRODUCT_ID = const((_CMD_DIR_GET, 0x25, 33, False))
33+
_CMD_GET_FW_VERSION = const((_CMD_DIR_GET, 0x16, 3, False))
34+
_CMD_SET_BOOTLOADER = const((_CMD_DIR_SET, 0xF3, 0, False))
35+
36+
# Flash commands.
37+
_CMD_SET_FLASH_WRITE = const((_CMD_DIR_SET, 0x17, 0, False))
38+
_CMD_GET_FLASH_READ = const((_CMD_DIR_GET, 0x18, 32, False))
39+
40+
# Digital pins commands.
41+
_CMD_SET_DIGITAL_PIN = const((_CMD_DIR_SET, 0x06, 0, False))
42+
_CMD_GET_DIGITAL_BUS = const((_CMD_DIR_GET, 0x04, 2, False))
43+
_CMD_SET_DIGITAL_DEF = const((_CMD_DIR_SET, 0x08, 0, False))
44+
_CMD_SET_DIGITAL_BUS_OA = const((_CMD_DIR_SET, 0x15, 0, True))
45+
46+
# Analog channels commands.
47+
_CMD_CFG_ANALOG_ADC = const((_CMD_DIR_SET, 0x09, 0, True))
48+
_CMD_CFG_ANALOG_DIN = const((_CMD_DIR_SET, 0x11, 0, True))
49+
_CMD_CFG_ANALOG_PWM = const((_CMD_DIR_SET, 0x13, 0, True))
50+
_CMD_CFG_ANALOG_DAC = const((_CMD_DIR_SET, 0x0C, 0, True))
51+
_CMD_CFG_ANALOG_RTD = const((_CMD_DIR_SET, 0x0E, 0, True))
52+
_CMD_CFG_ANALOG_HIZ = const((_CMD_DIR_SET, 0x24, 0, True))
53+
_CMD_SET_ANALOG_DAC = const((_CMD_DIR_SET, 0x0D, 0, True))
54+
_CMD_SET_ANALOG_RTD_TIM = const((_CMD_DIR_SET, 0x10, 0, True))
55+
56+
# Read analog channels (Analog pin, ADC, RTD, Digital In)
57+
_CMD_GET_ANALOG_PIN = const((_CMD_DIR_GET, 0x05, 2, False))
58+
_CMD_GET_ANALOG_PIN_ALL = const((_CMD_DIR_GET, 0x07, 32, False))
59+
_CMD_GET_ANALOG_ADC = const((_CMD_DIR_GET, 0x0A, 3, False))
60+
_CMD_GET_ANALOG_ADC_ALL = const((_CMD_DIR_GET, 0x0B, 16, False))
61+
_CMD_GET_ANALOG_RTD = const((_CMD_DIR_GET, 0x0F, 5, False))
62+
_CMD_GET_ANALOG_DIN = const((_CMD_DIR_GET, 0x12, 1, False))
63+
64+
# Default analog channels values and timeout.
65+
_CMD_SET_ANALOG_PWM_DEF = const((_CMD_DIR_SET, 0x33, 0, True))
66+
_CMD_SET_ANALOG_DAC_DEF = const((_CMD_DIR_SET, 0x3D, 0, True))
67+
_CMD_SET_ANALOG_TIMEOUT = const((_CMD_DIR_SET, 0x31, 0, True))
68+
69+
_CHANNEL_MODES = const((None, "voltage", "current"))
70+
_CHANNEL_TYPES = const(("adc", "dac", "rtd", "pwm", "hiz", "din"))
71+
72+
73+
class Expansion:
74+
def __init__(self, opta, type, addr, name):
75+
self.opta = opta
76+
self.type = type
77+
self.addr = addr
78+
self.name = name
79+
self.channels = {}
80+
81+
def product_id(self):
82+
"""
83+
Returns the product ID bytes of the expansion.
84+
"""
85+
return self.opta._cmd(self.addr, _CMD_GET_PRODUCT_ID)
86+
87+
def firmware_version(self):
88+
"""
89+
Returns the firmware version of the expansion.
90+
"""
91+
return self.opta._cmd(self.addr, _CMD_GET_FW_VERSION)
92+
93+
def _flash(self, address, size=0, data=None):
94+
"""
95+
Reads from or writes to the flash memory at the specified address.
96+
97+
NOTE: This should be used for production purposes only!
98+
99+
Parameters:
100+
- address : The memory address to read from or write to.
101+
- size : Number of bytes to read from the flash memory.
102+
- data : Bytes to write to the flash memory.
103+
104+
Returns:
105+
Data read from the flash memory as bytes, if reading. Returns None if writing.
106+
"""
107+
size = size if data is None else len(data)
108+
if size < 0 or size > 32:
109+
raise RuntimeError("Maximum flash read/write size is 32")
110+
if data is None:
111+
resp = self.opta._cmd(self.addr, _CMD_GET_FLASH_READ, "<HB", address, size)
112+
return struct.unpack("<HB32s", resp)[-1][0:size]
113+
self.opta._cmd(self.addr, _CMD_SET_FLASH_WRITE, f"<HB{size}s", address, size, data)
114+
115+
def digital(self, pins=None, timeout=None, default=0):
116+
"""
117+
Reads or writes digital pins.
118+
119+
Parameters:
120+
- pins : Digital pins mask to set. If None, returns the state of all pins.
121+
- timeout : The timeout in milliseconds, after which pins revert to the default state.
122+
- default : The default state to which the pins will revert to after the timeout expires.
123+
124+
Note:
125+
- For Opta Analog, this functions supports writing digital LEDs only.
126+
127+
Returns: The current state of the digital pins if reading, or None if setting the pins.
128+
"""
129+
if self.type == "analog":
130+
if pins is None:
131+
raise RuntimeError("Function is not supported on analog expansions")
132+
return self.opta._cmd(self.addr, _CMD_SET_DIGITAL_BUS_OA, "B", pins)
133+
if pins is None and timeout is None:
134+
return struct.unpack("<H", self.opta._cmd(self.addr, _CMD_GET_DIGITAL_BUS))[0]
135+
if pins is not None:
136+
self.opta._cmd(self.addr, _CMD_SET_DIGITAL_PIN, "B", pins)
137+
if timeout is not None:
138+
return self.opta._cmd(self.addr, _CMD_SET_DIGITAL_DEF, "<BH", default, timeout)
139+
140+
def analog(self, channel=None, channel_type=None, channel_mode=None, timeout=None, **kwargs):
141+
"""
142+
Configures or reads analog channels.
143+
144+
Parameters:
145+
- timeout : Set timeout in milliseconds after which analog channels revert to their default
146+
states set previously with analog(). Note if this is set, all other args are ignored.
147+
- channel : The channel number to configure, read, or write. If None, reads all ADC channels.
148+
- channel_type : Channel type can be "adc", "dac", "pwm", "rtd", "hiz", "din".
149+
- channel_mode : "voltage" or "current" (default="voltage" for ADC channels).
150+
151+
kwargs :
152+
hiz configuration:
153+
- no args.
154+
155+
adc configuration:
156+
- average : Number of points for moving average (0-255, default=0).
157+
- rejection : Enable rejection (default=False).
158+
- diagnostic : Enable diagnostic (default=False).
159+
- pulldown : Enable pulldown (default=True for voltage, False for current).
160+
- secondary : This ADC channel is shared with another function (default=False).
161+
162+
dac configuration:
163+
- use_slew : Enable slew rate control (default=False).
164+
- slew_rate : Slew rate if `use_slew` is True (default=0).
165+
- limit_current : Limit current (default=False).
166+
- value : Value to write to a DAC channel.
167+
- default_value: The default value to revert to, after the timeout set with timeout() expires.
168+
169+
pwm configuration:
170+
- period: PWM period in uS (for example 1000uS).
171+
- duty: high duty cycle in % (for example 50%).
172+
- default_period: The default value to revert to, after the timeout set with timeout() expires.
173+
- default_duty: The default value to revert to, after the timeout set with timeout() expires.
174+
175+
rtd configuration:
176+
- use_3_wire: 3-Wire RTD (default = False).
177+
- current_ma: RTD current in mA (default = 1.2mA).
178+
- update_time: The update time of all RDT channels.
179+
180+
din (digital input) configuration:
181+
- comparator : Enable comparator (default=True).
182+
- filtered : Select the filtered input to the comparator (default=True).
183+
- inverted : Invert the output from the digital input comparator (default=False).
184+
- scale : Comparator voltage scale (default=False).
185+
- threshold : Comparator voltage threshold (default 9).
186+
- sink : Sets the sink current in digital input logic mode (0-31, default 1)
187+
- debounce_mode : Debounce mode ("simple", "integrator" or None to disable debounce, default="simple")
188+
- debounce_time : Debounce time (0-31, default 24)
189+
190+
Returns: ADC value(s) if reading, or None if configuring a channel or setting the analog timeout.
191+
"""
192+
if channel is not None:
193+
if self.type == "analog" and (channel < 0 or channel > 7):
194+
raise ValueError("Invalid channel specified")
195+
if self.type == "digital" and (channel < 0 or channel > 16):
196+
raise ValueError("Invalid channel specified")
197+
198+
# Set default analog channels timeout.
199+
if timeout is not None:
200+
if self.type != "analog":
201+
raise RuntimeError("Function is not supported on digital expansions")
202+
return self.opta._cmd(self.addr, _CMD_SET_ANALOG_TIMEOUT, "<H", timeout)
203+
204+
# Read a all analog channels.
205+
if channel is None:
206+
fmt = "<HHHHHHHH" if self.type == "analog" else "<HHHHHHHHHHHHHHHH"
207+
cmd = _CMD_GET_ANALOG_ADC_ALL if self.type == "analog" else _CMD_GET_ANALOG_PIN_ALL
208+
return struct.unpack(fmt, self.opta._cmd(self.addr, cmd))
209+
210+
# Read a single analog channel.
211+
if channel_type is None:
212+
channel_type = self.channels.get(channel, None)
213+
if self.type == "digital":
214+
return struct.unpack("<H", self.opta._cmd(self.addr, _CMD_GET_ANALOG_PIN, "B", channel))[-1]
215+
elif channel_type == "rtd":
216+
return struct.unpack("<Bf", self.opta._cmd(self.addr, _CMD_GET_ANALOG_RTD, "B", channel))[-1]
217+
elif channel_type == "adc":
218+
return struct.unpack("<BH", self.opta._cmd(self.addr, _CMD_GET_ANALOG_ADC, "B", channel))[-1]
219+
elif channel_type == "din":
220+
return (struct.unpack("B", self.opta._cmd(self.addr, _CMD_GET_ANALOG_DIN))[-1] & channel) == channel
221+
else:
222+
raise RuntimeError("Channel is not configured or not readable")
223+
224+
# Configure a new channel
225+
if channel_type not in _CHANNEL_TYPES:
226+
raise ValueError("Invalid channel type.")
227+
if channel_mode not in _CHANNEL_MODES:
228+
raise ValueError("Invalid channel mode.")
229+
230+
def get_bool(k, d):
231+
# True is 1 and False is 2.
232+
return 1 if kwargs.get(k, d) else 2
233+
234+
if channel_type == "hiz":
235+
self.opta._cmd(self.addr, _CMD_CFG_ANALOG_HIZ, "<B", channel)
236+
elif channel_type == "rtd":
237+
self.opta._cmd(
238+
self.addr,
239+
_CMD_CFG_ANALOG_RTD,
240+
"<BBf",
241+
channel,
242+
get_bool("use_3_wire", False),
243+
kwargs.get("current_ma", 1.2),
244+
)
245+
self.opta._cmd(self.addr, _CMD_SET_ANALOG_RTD_TIM, "<H", kwargs.get("update_time", 800))
246+
elif channel_type == "adc":
247+
self.opta._cmd(
248+
self.addr,
249+
_CMD_CFG_ANALOG_ADC,
250+
"BBBBBBB",
251+
channel,
252+
channel_mode == "current",
253+
get_bool("pulldown", channel_mode == "voltage"),
254+
get_bool("rejection", False),
255+
get_bool("diagnostic", False),
256+
kwargs.get("average", 0),
257+
get_bool("secondary", False),
258+
)
259+
elif channel_type == "dac":
260+
if "value" not in kwargs:
261+
raise ValueError("DAC requires a value argument")
262+
# Configure DAC channel and update the output value.
263+
self.opta._cmd(
264+
self.addr,
265+
_CMD_CFG_ANALOG_DAC,
266+
"BBBBB",
267+
channel,
268+
channel_mode == "current",
269+
get_bool("limit_current", channel_mode == "voltage"),
270+
get_bool("slew_rate", False),
271+
kwargs.get("slew_rate", 0),
272+
)
273+
self.opta._cmd(self.addr, _CMD_SET_ANALOG_DAC, "<BHB", channel, kwargs["value"], 1)
274+
if "default_value" in kwargs:
275+
value = kwargs["default_value"]
276+
self.opta._cmd(self.addr, _CMD_SET_ANALOG_DAC_DEF, "<BHB", channel, value, 1)
277+
sleep_ms(250) # DAC requires at leas 250ms to update after a write.
278+
elif channel_type == "din":
279+
deb_mode = kwargs.get("debounce_mode", "simple")
280+
self.opta._cmd(
281+
self.addr,
282+
_CMD_CFG_ANALOG_DIN,
283+
"BBBBBBBBB",
284+
channel,
285+
get_bool("filtered", True),
286+
get_bool("inverted", False),
287+
get_bool("comparator", True),
288+
1 if (deb_mode is None or deb_mode == "simple") else 2,
289+
get_bool("scale", False),
290+
kwargs.get("threshold", 9),
291+
kwargs.get("sink", 1),
292+
kwargs.get("debounce_time", 0 if deb_mode is None else 24),
293+
)
294+
elif channel_type == "pwm":
295+
if channel > 4:
296+
raise ValueError("Invalid PWM channel specified")
297+
if "period" not in kwargs:
298+
raise ValueError("PWM requires a period argument")
299+
period = kwargs["period"]
300+
duty = int(kwargs.get("duty", 50) / 100 * period)
301+
self.opta._cmd(self.addr, _CMD_CFG_ANALOG_PWM, "<BII", channel, period, duty)
302+
if "default_period" in kwargs:
303+
period = kwargs["default_period"]
304+
duty = int(kwargs.get("default_duty", 50) / 100 * period)
305+
self.opta._cmd(self.addr, _CMD_SET_ANALOG_PWM_DEF, "<BII", channel, period, duty)
306+
307+
# Save channel type for later.
308+
self.channels[channel] = channel_type
309+
310+
311+
class Opta:
312+
def __init__(self, bus_id, freq=400_000, det=None):
313+
"""
314+
Initializes an Opta controller.
315+
316+
Parameters:
317+
- bus_id : The I2C bus identifier.
318+
- freq : I2C bus frequency (default=400_000).
319+
- det : GPIO pin used for bus detection (default is a PULL_UP input pin named "BUS_DETECT").
320+
"""
321+
self.bus = I2C(bus_id, freq=freq)
322+
self.cmd_buf = memoryview(bytearray(256 + 2))
323+
self.det = Pin("BUS_DETECT", Pin.IN, Pin.PULL_UP) if det is None else det
324+
self.exp_types = {
325+
0x02: ("digital", "Opta Digital Mechanical"),
326+
0x03: ("digital", "Opta Digital Solid State"),
327+
0x04: ("analog", "Opta Analog"),
328+
0x05: ("digital", "UNO R4 MINIMA"),
329+
}
330+
331+
def _log_debug(self, msg):
332+
# Blue protocol prints in blue
333+
logging.debug(f"\033[94m{msg}\033[0m")
334+
335+
def _log_enabled(self, level):
336+
return logging.getLogger().isEnabledFor(level)
337+
338+
def _bus_read(self, addr, buf):
339+
self.bus.readfrom_into(addr, buf)
340+
if self._log_enabled(logging.DEBUG):
341+
self._log_debug("Recv: " + " ".join(["%02X" % (a) for a in buf]))
342+
343+
def _bus_write(self, addr, buf):
344+
if self._log_enabled(logging.DEBUG):
345+
self._log_debug("Send: " + " ".join(["%02X" % (a) for a in buf]))
346+
self.bus.writeto(addr, buf)
347+
348+
def _crc8(self, buf, poly=0x07, crc=0x00):
349+
for byte in buf:
350+
crc ^= byte
351+
for bit in range(8):
352+
crc = (crc << 1) ^ poly if crc & 0x80 else crc << 1
353+
return crc & 0xFF
354+
355+
def _cmd(self, addr, cmd, fmt=None, *args):
356+
plen = 0 if fmt is None else struct.calcsize(fmt)
357+
struct.pack_into("BBB", self.cmd_buf, 0, cmd[0], cmd[1], plen)
358+
# Pack args (if any)
359+
if fmt is not None:
360+
struct.pack_into(fmt, self.cmd_buf, 3, *args)
361+
self.cmd_buf[_CMD_HDR_LEN + plen] = self._crc8(self.cmd_buf[0 : _CMD_HDR_LEN + plen])
362+
self._bus_write(addr, self.cmd_buf[0 : _CMD_HDR_LEN + _CMD_CRC_LEN + plen])
363+
if cmd[2] == 0 and not cmd[3]:
364+
return
365+
# Command has a response payload or an ACK.
366+
plen = 0 if cmd[3] else cmd[2]
367+
# Expected CMD and DIR
368+
exp_cmd = 0x20 if cmd[3] else cmd[1]
369+
exp_dir = 0x03 if cmd[0] == _CMD_DIR_GET else 0x04
370+
371+
# Read and check response
372+
self._bus_read(addr, self.cmd_buf[0 : _CMD_HDR_LEN + _CMD_CRC_LEN + plen])
373+
# Check CMD, ARG and LEN
374+
if self.cmd_buf[0] != exp_dir or self.cmd_buf[1] != exp_cmd or self.cmd_buf[2] != plen:
375+
raise ValueError("Unexpected response: " + "".join("%02X" % b for b in self.cmd_buf[0:3]))
376+
377+
# Check CRC
378+
if self.cmd_buf[_CMD_HDR_LEN + plen] != self._crc8(self.cmd_buf[0 : _CMD_HDR_LEN + plen]):
379+
raise ValueError("Invalid CRC")
380+
if not cmd[3]:
381+
return self.cmd_buf[_CMD_HDR_LEN : _CMD_HDR_LEN + plen]
382+
383+
def _reset_bus(self, addr):
384+
self._cmd(addr, _CMD_CHIP_RESET, "B", 0x56)
385+
sleep_ms(2000)
386+
387+
def _set_address(self, addr, addr_new=None):
388+
if addr_new is not None:
389+
if self._log_enabled(logging.DEBUG):
390+
self._log_debug(f"set address: 0x{addr:02X} new_address: 0x{addr_new:02X}")
391+
return self._cmd(addr, _CMD_SET_ADDRESS, "B", addr_new)
392+
return self._cmd(addr, _CMD_GET_ADDRESS)
393+
394+
def enum_devices(self):
395+
"""
396+
Initializes the bus, resets all expansions, and returns a list of detected expansions.
397+
Returns: A list of detected expansions on the bus.
398+
"""
399+
detected = False
400+
expansion_list = []
401+
402+
# Reset the first, last or temp device on the bus.
403+
for addr in [_MIN_ADDRESS, _MAX_ADDRESS, _TMP_ADDRESS]:
404+
try:
405+
self._reset_bus(addr)
406+
detected = True
407+
break
408+
except Exception as e:
409+
logging.debug(e)
410+
411+
if not detected:
412+
raise RuntimeError("No expansions detected on the bus")
413+
414+
addr = _MAX_ADDRESS
415+
# Assign temp I2C addresses to expansions.
416+
while not self.det.value():
417+
self._set_address(0x0A, addr_new=addr)
418+
sleep_ms(100)
419+
try:
420+
xaddr, xtype = self._set_address(addr)
421+
if xaddr == addr:
422+
addr += 1
423+
if self._log_enabled(logging.DEBUG):
424+
self._log_debug(f"phase 1 type: 0x{xtype:02X} address: 0x{xaddr:02X}")
425+
except Exception as e:
426+
logging.debug(e)
427+
428+
# Assign final I2C addresses to expansions.
429+
for addr_new in range(_MIN_ADDRESS, _MIN_ADDRESS + addr - _MAX_ADDRESS):
430+
self._set_address(addr - 1, addr_new)
431+
sleep_ms(100)
432+
try:
433+
xaddr, xtype = self._set_address(addr_new)
434+
addr -= 1
435+
if self._log_enabled(logging.DEBUG):
436+
self._log_debug(f"phase 2 type: 0x{xtype:02X} address: 0x{xaddr:02X}")
437+
if xtype not in self.exp_types:
438+
raise RuntimeError("Unsupported expansion type %d" % xtype)
439+
expansion_list.append(Expansion(self, self.exp_types[xtype][0], xaddr, self.exp_types[xtype][1]))
440+
441+
except Exception as e:
442+
logging.debug(e)
443+
444+
return expansion_list

0 commit comments

Comments
 (0)
Please sign in to comment.