Skip to content

Commit 9b32459

Browse files
committed
lib/opta: Add support for running on CPython.
Signed-off-by: iabdalkader <[email protected]>
1 parent 8312406 commit 9b32459

File tree

2 files changed

+71
-15
lines changed

2 files changed

+71
-15
lines changed

lib/opta/example.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
level=logging.INFO # Switch to DEBUG to see raw commands
1515
)
1616

17+
# On CPython, something like the following should be used:
18+
# opta = opta.Opta(bus_id=3, pin_id=("/dev/gpiochip1", 5))
1719
opta = opta.Opta(bus_id=3)
1820

1921
# enum_devices initializes the bus, resets all expansions, and returns a list of

lib/opta/opta.py

Lines changed: 69 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,23 @@
55
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
66
import struct
77
import logging
8-
from time import sleep_ms
9-
from machine import I2C
10-
from machine import Pin
11-
from micropython import const
8+
from time import sleep
9+
import sys
10+
11+
_is_micropython = sys.implementation.name == "micropython"
12+
13+
if _is_micropython:
14+
from machine import Pin
15+
from machine import I2C
16+
from micropython import const
17+
else:
18+
import gpiod
19+
from gpiod.line import Direction
20+
from gpiod.line import Value
21+
from smbus2 import SMBus, i2c_msg
22+
def const(x):
23+
return x
24+
1225

1326
_MIN_ADDRESS = const(0x0B)
1427
_TMP_ADDRESS = const(0x0A)
@@ -70,6 +83,47 @@
7083
_CHANNEL_TYPES = const(("adc", "dac", "rtd", "pwm", "hiz", "din"))
7184

7285

86+
class IOPin:
87+
def __init__(self, pin_id):
88+
if _is_micropython:
89+
self.pin = Pin(pin_id, Pin.IN, Pin.PULL_UP)
90+
else:
91+
self.pin = gpiod.request_lines(
92+
pin_id[0],
93+
consumer="Opta",
94+
config={pin_id[1]: gpiod.LineSettings(direction=Direction.INPUT)},
95+
)
96+
97+
def read(self):
98+
if _is_micropython:
99+
return self.pin.value()
100+
else:
101+
return self.pin.get_values()[0] == Value.ACTIVE
102+
103+
104+
class I2CBus:
105+
def __init__(self, bus_id, freq):
106+
if _is_micropython:
107+
self.bus = I2C(bus_id, freq=freq)
108+
else:
109+
self.bus = SMBus(bus_id)
110+
111+
def read(self, addr, buf):
112+
if _is_micropython:
113+
self.bus.readfrom_into(addr, buf)
114+
else:
115+
msg = i2c_msg.read(addr, len(buf))
116+
self.bus.i2c_rdwr(msg)
117+
buf[:] = msg.buf[0:len(buf)]
118+
119+
def write(self, addr, buf):
120+
if _is_micropython:
121+
self.bus.writeto(addr, buf)
122+
else:
123+
msg = i2c_msg.write(addr, buf)
124+
self.bus.i2c_rdwr(msg)
125+
126+
73127
class Expansion:
74128
def __init__(self, opta, type, addr, name):
75129
self.opta = opta
@@ -274,7 +328,7 @@ def get_bool(k, d):
274328
if "default_value" in kwargs:
275329
value = kwargs["default_value"]
276330
self.opta._cmd(self.addr, _CMD_SET_ANALOG_DAC_DEF, "<BHB", channel, value, 1)
277-
sleep_ms(250) # DAC requires at leas 250ms to update after a write.
331+
sleep(0.250) # DAC requires at leas 250ms to update after a write.
278332
elif channel_type == "din":
279333
deb_mode = kwargs.get("debounce_mode", "simple")
280334
self.opta._cmd(
@@ -309,18 +363,18 @@ def get_bool(k, d):
309363

310364

311365
class Opta:
312-
def __init__(self, bus_id, freq=400_000, det=None):
366+
def __init__(self, bus_id, freq=400_000, pin_id="BUS_DETECT"):
313367
"""
314368
Initializes an Opta controller.
315369
316370
Parameters:
317371
- bus_id : The I2C bus identifier.
318372
- freq : I2C bus frequency (default=400_000).
319-
- det : GPIO pin used for bus detection (default is a PULL_UP input pin named "BUS_DETECT").
373+
- pin_id : GPIO pin ID used for bus detection (default="BUS_DETECT").
320374
"""
321-
self.bus = I2C(bus_id, freq=freq)
375+
self.pin = IOPin(pin_id)
376+
self.bus = I2CBus(bus_id, freq)
322377
self.cmd_buf = memoryview(bytearray(256 + 2))
323-
self.det = Pin("BUS_DETECT", Pin.IN, Pin.PULL_UP) if det is None else det
324378
self.exp_types = {
325379
0x02: ("digital", "Opta Digital Mechanical"),
326380
0x03: ("digital", "Opta Digital Solid State"),
@@ -336,14 +390,14 @@ def _log_enabled(self, level):
336390
return logging.getLogger().isEnabledFor(level)
337391

338392
def _bus_read(self, addr, buf):
339-
self.bus.readfrom_into(addr, buf)
393+
self.bus.read(addr, buf)
340394
if self._log_enabled(logging.DEBUG):
341395
self._log_debug("Recv: " + " ".join(["%02X" % (a) for a in buf]))
342396

343397
def _bus_write(self, addr, buf):
344398
if self._log_enabled(logging.DEBUG):
345399
self._log_debug("Send: " + " ".join(["%02X" % (a) for a in buf]))
346-
self.bus.writeto(addr, buf)
400+
self.bus.write(addr, buf)
347401

348402
def _crc8(self, buf, poly=0x07, crc=0x00):
349403
for byte in buf:
@@ -382,7 +436,7 @@ def _cmd(self, addr, cmd, fmt=None, *args):
382436

383437
def _reset_bus(self, addr):
384438
self._cmd(addr, _CMD_CHIP_RESET, "B", 0x56)
385-
sleep_ms(2000)
439+
sleep(2)
386440

387441
def _set_address(self, addr, addr_new=None):
388442
if addr_new is not None:
@@ -413,9 +467,9 @@ def enum_devices(self):
413467

414468
addr = _MAX_ADDRESS
415469
# Assign temp I2C addresses to expansions.
416-
while not self.det.value():
470+
while not self.pin.read():
417471
self._set_address(0x0A, addr_new=addr)
418-
sleep_ms(100)
472+
sleep(0.100)
419473
try:
420474
xaddr, xtype = self._set_address(addr)
421475
if xaddr == addr:
@@ -428,7 +482,7 @@ def enum_devices(self):
428482
# Assign final I2C addresses to expansions.
429483
for addr_new in range(_MIN_ADDRESS, _MIN_ADDRESS + addr - _MAX_ADDRESS):
430484
self._set_address(addr - 1, addr_new)
431-
sleep_ms(100)
485+
sleep(0.100)
432486
try:
433487
xaddr, xtype = self._set_address(addr_new)
434488
addr -= 1

0 commit comments

Comments
 (0)