Skip to content

lib/opta: Add support for running on CPython. #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/opta/example.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
level=logging.INFO # Switch to DEBUG to see raw commands
)

# On CPython, something like the following should be used:
# opta = opta.Opta(bus_id=3, pin_id=("/dev/gpiochip1", 5))
opta = opta.Opta(bus_id=3)

# enum_devices initializes the bus, resets all expansions, and returns a list of
Expand Down
85 changes: 70 additions & 15 deletions lib/opta/opta.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,24 @@
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
import struct
import logging
from time import sleep_ms
from machine import I2C
from machine import Pin
from micropython import const
from time import sleep
import sys

_is_micropython = sys.implementation.name == "micropython"

if _is_micropython:
from machine import Pin
from machine import I2C
from micropython import const
else:
import gpiod
from gpiod.line import Direction
from gpiod.line import Value
from smbus2 import SMBus, i2c_msg

def const(x):
return x


_MIN_ADDRESS = const(0x0B)
_TMP_ADDRESS = const(0x0A)
Expand Down Expand Up @@ -70,6 +84,47 @@
_CHANNEL_TYPES = const(("adc", "dac", "rtd", "pwm", "hiz", "din"))


class IOPin:
def __init__(self, pin_id):
if _is_micropython:
self.pin = Pin(pin_id, Pin.IN, Pin.PULL_UP)
else:
self.pin = gpiod.request_lines(
pin_id[0],
consumer="Opta",
config={pin_id[1]: gpiod.LineSettings(direction=Direction.INPUT)},
)

def read(self):
if _is_micropython:
return self.pin.value()
else:
return self.pin.get_values()[0] == Value.ACTIVE


class I2CBus:
def __init__(self, bus_id, freq):
if _is_micropython:
self.bus = I2C(bus_id, freq=freq)
else:
self.bus = SMBus(bus_id)

def read(self, addr, buf):
if _is_micropython:
self.bus.readfrom_into(addr, buf)
else:
msg = i2c_msg.read(addr, len(buf))
self.bus.i2c_rdwr(msg)
buf[:] = msg.buf[0:len(buf)]

def write(self, addr, buf):
if _is_micropython:
self.bus.writeto(addr, buf)
else:
msg = i2c_msg.write(addr, buf)
self.bus.i2c_rdwr(msg)


class Expansion:
def __init__(self, opta, type, addr, name):
self.opta = opta
Expand Down Expand Up @@ -274,7 +329,7 @@ def get_bool(k, d):
if "default_value" in kwargs:
value = kwargs["default_value"]
self.opta._cmd(self.addr, _CMD_SET_ANALOG_DAC_DEF, "<BHB", channel, value, 1)
sleep_ms(250) # DAC requires at leas 250ms to update after a write.
sleep(0.250) # DAC requires at leas 250ms to update after a write.
elif channel_type == "din":
deb_mode = kwargs.get("debounce_mode", "simple")
self.opta._cmd(
Expand Down Expand Up @@ -309,18 +364,18 @@ def get_bool(k, d):


class Opta:
def __init__(self, bus_id, freq=400_000, det=None):
def __init__(self, bus_id, freq=400_000, pin_id="BUS_DETECT"):
"""
Initializes an Opta controller.

Parameters:
- bus_id : The I2C bus identifier.
- freq : I2C bus frequency (default=400_000).
- det : GPIO pin used for bus detection (default is a PULL_UP input pin named "BUS_DETECT").
- pin_id : GPIO pin ID used for bus detection (default="BUS_DETECT").
"""
self.bus = I2C(bus_id, freq=freq)
self.pin = IOPin(pin_id)
self.bus = I2CBus(bus_id, freq)
self.cmd_buf = memoryview(bytearray(256 + 2))
self.det = Pin("BUS_DETECT", Pin.IN, Pin.PULL_UP) if det is None else det
self.exp_types = {
0x02: ("digital", "Opta Digital Mechanical"),
0x03: ("digital", "Opta Digital Solid State"),
Expand All @@ -336,14 +391,14 @@ def _log_enabled(self, level):
return logging.getLogger().isEnabledFor(level)

def _bus_read(self, addr, buf):
self.bus.readfrom_into(addr, buf)
self.bus.read(addr, buf)
if self._log_enabled(logging.DEBUG):
self._log_debug("Recv: " + " ".join(["%02X" % (a) for a in buf]))

def _bus_write(self, addr, buf):
if self._log_enabled(logging.DEBUG):
self._log_debug("Send: " + " ".join(["%02X" % (a) for a in buf]))
self.bus.writeto(addr, buf)
self.bus.write(addr, buf)

def _crc8(self, buf, poly=0x07, crc=0x00):
for byte in buf:
Expand Down Expand Up @@ -382,7 +437,7 @@ def _cmd(self, addr, cmd, fmt=None, *args):

def _reset_bus(self, addr):
self._cmd(addr, _CMD_CHIP_RESET, "B", 0x56)
sleep_ms(2000)
sleep(2)

def _set_address(self, addr, addr_new=None):
if addr_new is not None:
Expand Down Expand Up @@ -413,9 +468,9 @@ def enum_devices(self):

addr = _MAX_ADDRESS
# Assign temp I2C addresses to expansions.
while not self.det.value():
while not self.pin.read():
self._set_address(0x0A, addr_new=addr)
sleep_ms(100)
sleep(0.100)
try:
xaddr, xtype = self._set_address(addr)
if xaddr == addr:
Expand All @@ -428,7 +483,7 @@ def enum_devices(self):
# Assign final I2C addresses to expansions.
for addr_new in range(_MIN_ADDRESS, _MIN_ADDRESS + addr - _MAX_ADDRESS):
self._set_address(addr - 1, addr_new)
sleep_ms(100)
sleep(0.100)
try:
xaddr, xtype = self._set_address(addr_new)
addr -= 1
Expand Down
Loading