diff --git a/.github/workflows/python-linter.yml b/.github/workflows/python-linter.yml index a372aec..7a5ea9a 100644 --- a/.github/workflows/python-linter.yml +++ b/.github/workflows/python-linter.yml @@ -34,5 +34,5 @@ jobs: - name: '😾 Lint with flake8' run: | - flake8 --count --ignore=C901 --select=E9,F63,F7,F82 --exclude=manifest.py --show-source --statistics lib/ - flake8 --count --ignore=C901 --max-complexity=15 --max-line-length=120 --exclude=manifest.py --statistics lib/ + flake8 --count --ignore=C901,E221,E203 --select=E9,F63,F7,F82 --exclude=manifest.py --show-source --statistics lib/ + flake8 --count --ignore=C901,E221,E203 --max-complexity=15 --max-line-length=120 --exclude=manifest.py --statistics lib/ diff --git a/lib/opta/example.py b/lib/opta/example.py new file mode 100644 index 0000000..e4bfae5 --- /dev/null +++ b/lib/opta/example.py @@ -0,0 +1,91 @@ +# This file is part of the blueprint package. +# Copyright (c) 2024 Arduino SA +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +import opta +import logging + + +if __name__ == "__main__": + logging.basicConfig( + datefmt="%H:%M:%S", + format="%(asctime)s.%(msecs)03d %(message)s", + level=logging.INFO # Switch to DEBUG to see raw commands + ) + + opta = opta.Opta(bus_id=3) + + # enum_devices initializes the bus, resets all expansions, and returns a list of + # detected expansions. NOTE: keep a reference to the list of expansion for later + # use, as every time this function is called it restarts the enumeration process. + for e in opta.enum_devices(): + print("") + logging.info(f"Expansion type: {e.type} address: 0x{e.addr:02X} name: {e.name}") + + # Read firmware version. + major, minor, patch = e.firmware_version() + logging.info(f"Firmware Version Major: {major} Minor: {minor} Patch: {patch}") + + # Read product ID. + pid = e.product_id() + logging.info("Product ID bytes: " + ", ".join(["0x%02X" % a for a in pid[0:8]])) + + if e.type == "digital": + # Write digital pins. If the default state and timeout (in milliseconds) are + # also specified, the pins will revert to the specified default state after + # the timeout expires. + e.digital(pins=0b10101010, default=0b01010101, timeout=3000) + + # If no args are specified, digital() returns all digital pins. + pins = e.digital() + logging.info(f"Digital pins: 0b{pins:08b}") + + # Read analog pins. + logging.info("Analog pin [0 ]: %d" % e.analog(channel=0)) + logging.info("Analog pins [0..7]: " + str(e.analog())) + + if e.type == "analog": + # Configure LEDs on Opta Analog + e.digital(pins=0b10011001) + + # Configure analog channels. Note almost all of the possible args are used here + # for demonstration purposes, however only a few are actually required (such as + # the channel type and mode). Most of the other args have reasonable defaults. + + # Configure channel (0) as PWM. + e.analog(channel=0, channel_type="pwm", period=1000, duty=50, default_period=500) + + # Configure channel (2) as DAC. + e.analog(channel=2, channel_type="dac", channel_mode="voltage", value=7540) + + # Configure channel (3) as ADC. + e.analog(channel=3, channel_type="adc", channel_mode="voltage", pulldown=True, + rejection=False, diagnostic=False, average=0, secondary=False) + + # Configure channel (4) as RTD. + e.analog(channel=4, channel_type="rtd", use_3_wire=False, current_ma=1.2, update_time=0) + + # Configure channel (5) as digital input. + e.analog(channel=5, channel_type="din", comparator=True, inverted=False, filtered=True, + scale=False, sink=1, threshold=9, debounce_mode="simple", debounce_time=24) + + # Read ADC channel (3). + # This should print 65535 if channels 2 and 3 are connected. + logging.info("ADC channel [3 ]: %d" % e.analog(channel=3)) + + # Read all analog channels. + logging.info("Analog channels [0..7]: " + str(e.analog())) + + # Read RTD channel (4). + logging.info("RTD channel [4 ]: %.1f ohm" % e.analog(channel=4)) + + # Read DIN channel (5). + logging.info("DIN channel [5 ]: %d" % e.analog(channel=5)) + + # Read all analog channels. + logging.info("Analog channels [0..7]: " + str(e.analog())) + + # Re-configure channel (2) as DAC. + e.analog(channel=2, channel_type="dac", channel_mode="voltage", value=3770) + logging.info("ADC channel [3 ]: %d" % e.analog(channel=3)) diff --git a/lib/opta/manifest.py b/lib/opta/manifest.py new file mode 100644 index 0000000..cd8384e --- /dev/null +++ b/lib/opta/manifest.py @@ -0,0 +1,6 @@ +metadata( + description="Modules for Arduino Opta.", + version="0.0.1", +) + +module("opta.py") diff --git a/lib/opta/opta.py b/lib/opta/opta.py new file mode 100644 index 0000000..29dc8ab --- /dev/null +++ b/lib/opta/opta.py @@ -0,0 +1,444 @@ +# This file is part of the blueprint package. +# Copyright (c) 2024 Arduino SA +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at https://mozilla.org/MPL/2.0/. +import struct +import logging +from time import sleep_ms +from machine import I2C +from machine import Pin +from micropython import const + +_MIN_ADDRESS = const(0x0B) +_TMP_ADDRESS = const(0x0A) +_MAX_ADDRESS = const(0x0B + 0x0A) + +# Header and CRC length. +_CMD_HDR_LEN = const(0x03) +_CMD_CRC_LEN = const(0x01) + +# Command direction (SET == 1, GET == 2) +_CMD_DIR_SET = const(0x01) +_CMD_DIR_GET = const(0x02) + +# Commands are encoded as follows: +# [direction, command opcode, response length, ack required] +_CMD_CHIP_RESET = const((_CMD_DIR_SET, 0x01, 0, False)) +_CMD_SET_ADDRESS = const((_CMD_DIR_SET, 0x02, 0, False)) +_CMD_GET_ADDRESS = const((_CMD_DIR_GET, 0x03, 2, False)) + +# Misc commands such as product ID, firmware etc... +_CMD_GET_PRODUCT_ID = const((_CMD_DIR_GET, 0x25, 33, False)) +_CMD_GET_FW_VERSION = const((_CMD_DIR_GET, 0x16, 3, False)) +_CMD_SET_BOOTLOADER = const((_CMD_DIR_SET, 0xF3, 0, False)) + +# Flash commands. +_CMD_SET_FLASH_WRITE = const((_CMD_DIR_SET, 0x17, 0, False)) +_CMD_GET_FLASH_READ = const((_CMD_DIR_GET, 0x18, 32, False)) + +# Digital pins commands. +_CMD_SET_DIGITAL_PIN = const((_CMD_DIR_SET, 0x06, 0, False)) +_CMD_GET_DIGITAL_BUS = const((_CMD_DIR_GET, 0x04, 2, False)) +_CMD_SET_DIGITAL_DEF = const((_CMD_DIR_SET, 0x08, 0, False)) +_CMD_SET_DIGITAL_BUS_OA = const((_CMD_DIR_SET, 0x15, 0, True)) + +# Analog channels commands. +_CMD_CFG_ANALOG_ADC = const((_CMD_DIR_SET, 0x09, 0, True)) +_CMD_CFG_ANALOG_DIN = const((_CMD_DIR_SET, 0x11, 0, True)) +_CMD_CFG_ANALOG_PWM = const((_CMD_DIR_SET, 0x13, 0, True)) +_CMD_CFG_ANALOG_DAC = const((_CMD_DIR_SET, 0x0C, 0, True)) +_CMD_CFG_ANALOG_RTD = const((_CMD_DIR_SET, 0x0E, 0, True)) +_CMD_CFG_ANALOG_HIZ = const((_CMD_DIR_SET, 0x24, 0, True)) +_CMD_SET_ANALOG_DAC = const((_CMD_DIR_SET, 0x0D, 0, True)) +_CMD_SET_ANALOG_RTD_TIM = const((_CMD_DIR_SET, 0x10, 0, True)) + +# Read analog channels (Analog pin, ADC, RTD, Digital In) +_CMD_GET_ANALOG_PIN = const((_CMD_DIR_GET, 0x05, 2, False)) +_CMD_GET_ANALOG_PIN_ALL = const((_CMD_DIR_GET, 0x07, 32, False)) +_CMD_GET_ANALOG_ADC = const((_CMD_DIR_GET, 0x0A, 3, False)) +_CMD_GET_ANALOG_ADC_ALL = const((_CMD_DIR_GET, 0x0B, 16, False)) +_CMD_GET_ANALOG_RTD = const((_CMD_DIR_GET, 0x0F, 5, False)) +_CMD_GET_ANALOG_DIN = const((_CMD_DIR_GET, 0x12, 1, False)) + +# Default analog channels values and timeout. +_CMD_SET_ANALOG_PWM_DEF = const((_CMD_DIR_SET, 0x33, 0, True)) +_CMD_SET_ANALOG_DAC_DEF = const((_CMD_DIR_SET, 0x3D, 0, True)) +_CMD_SET_ANALOG_TIMEOUT = const((_CMD_DIR_SET, 0x31, 0, True)) + +_CHANNEL_MODES = const((None, "voltage", "current")) +_CHANNEL_TYPES = const(("adc", "dac", "rtd", "pwm", "hiz", "din")) + + +class Expansion: + def __init__(self, opta, type, addr, name): + self.opta = opta + self.type = type + self.addr = addr + self.name = name + self.channels = {} + + def product_id(self): + """ + Returns the product ID bytes of the expansion. + """ + return self.opta._cmd(self.addr, _CMD_GET_PRODUCT_ID) + + def firmware_version(self): + """ + Returns the firmware version of the expansion. + """ + return self.opta._cmd(self.addr, _CMD_GET_FW_VERSION) + + def _flash(self, address, size=0, data=None): + """ + Reads from or writes to the flash memory at the specified address. + + NOTE: This should be used for production purposes only! + + Parameters: + - address : The memory address to read from or write to. + - size : Number of bytes to read from the flash memory. + - data : Bytes to write to the flash memory. + + Returns: + Data read from the flash memory as bytes, if reading. Returns None if writing. + """ + size = size if data is None else len(data) + if size < 0 or size > 32: + raise RuntimeError("Maximum flash read/write size is 32") + if data is None: + resp = self.opta._cmd(self.addr, _CMD_GET_FLASH_READ, " 7): + raise ValueError("Invalid channel specified") + if self.type == "digital" and (channel < 0 or channel > 16): + raise ValueError("Invalid channel specified") + + # Set default analog channels timeout. + if timeout is not None: + if self.type != "analog": + raise RuntimeError("Function is not supported on digital expansions") + return self.opta._cmd(self.addr, _CMD_SET_ANALOG_TIMEOUT, " 4: + raise ValueError("Invalid PWM channel specified") + if "period" not in kwargs: + raise ValueError("PWM requires a period argument") + period = kwargs["period"] + duty = int(kwargs.get("duty", 50) / 100 * period) + self.opta._cmd(self.addr, _CMD_CFG_ANALOG_PWM, "