Skip to content

ussl: Add support for sign callback on MicroPython. #93

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/client-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install build==0.10.0 cbor2==5.4.6 M2Crypto==0.38.0 micropython-senml==0.1.0
sudo apt-get update
sudo apt-get install softhsm2 gnutls-bin libengine-pkcs11-openssl

- name: '📦 Build package'
Expand Down
20 changes: 14 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,25 @@ python examples/example.py
```

## Testing on MicroPython
MicroPython currently does Not support secure elements. The username and password can be used, or the key and cert files must be stored in DER format on the filesystem. To test the client on MicroPython, first convert the key and certificate to DER, using the following commands, then copy the files to the internal storage.

#### Convert key and certificate to `.DER`
MicroPython supports both modes of authentication: basic mode, using a username and password, and mTLS with the key and certificate stored on the filesystem or a secure element (for provisioned boards). To use key and certificate files stored on the filesystem, they must first be converted to DER format. The following commands can be used to convert from PEM to DER:
```bash
openssl ec -in key.pem -out key.der -outform DER
openssl x509 -in cert.pem -out cert.der -outform DER
```

#### Run the MicroPython example script
* Set `KEY_PATH`, `CERT_PATH`, to key and certificate DER paths respectively.
* run `examples/micropython.py`
In this case `KEY_PATH`, `CERT_PATH`, can be set to the key and certificate DER paths, respectively:
```Python
KEY_PATH = "path/to/key.der"
CERT_PATH = "path/to/cert.der"
```

Alternatively, if the key and certificate are stored on the SE, their URIs can be specified in the following format:
```Python
KEY_PATH = "se05x:token=0x00000064"
CERT_PATH = "se05x:token=0x00000065"
```

With the key and certificate set, the example can be run with the following command `examples/micropython_advanced.py`

## Useful links

Expand Down
145 changes: 145 additions & 0 deletions examples/micropython_advanced.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# This file is part of the Python Arduino IoT Cloud.
# Any copyright is dedicated to the Public Domain.
# https://creativecommons.org/publicdomain/zero/1.0/
import time
import ssl # noqa
import network
import logging
from time import strftime
from arduino_iot_cloud import ArduinoCloudClient
from arduino_iot_cloud import Location
from arduino_iot_cloud import Schedule
from arduino_iot_cloud import ColoredLight
from arduino_iot_cloud import Task
from arduino_iot_cloud import CADATA # noqa
from random import uniform
from secrets import WIFI_SSID
from secrets import WIFI_PASS
from secrets import DEVICE_ID


# Provisioned boards with secure elements can provide key and
# certificate URIs in the SE, in following format:
KEY_PATH = "se05x:token=0x00000064" # noqa
CERT_PATH = "se05x:token=0x00000065" # noqa

# Alternatively, the key and certificate files can be stored
# on the internal filesystem in DER format:
#KEY_PATH = "key.der" # noqa
#CERT_PATH = "cert.der" # noqa


def on_switch_changed(client, value):
# This is a write callback for the switch that toggles the LED variable. The LED
# variable can be accessed via the client object passed in the first argument.
client["led"] = value


def on_clight_changed(client, clight):
logging.info(f"ColoredLight changed. Swi: {clight.swi} Bri: {clight.bri} Sat: {clight.sat} Hue: {clight.hue}")


def user_task(client, args):
# NOTE: this function should not block.
# This is a user-defined task that updates the colored light. Note any registered
# cloud object can be accessed using the client object passed to this function.
# The composite ColoredLight object fields can be assigned to individually, using dot:
client["clight"].hue = round(uniform(0, 100), 1)
client["clight"].bri = round(uniform(0, 100), 1)


def wdt_task(client, wdt):
# Update the WDT to prevent it from resetting the system
wdt.feed()


def wifi_connect():
if not WIFI_SSID or not WIFI_PASS:
raise (Exception("Network is not configured. Set SSID and passwords in secrets.py"))
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(WIFI_SSID, WIFI_PASS)
while not wlan.isconnected():
logging.info("Trying to connect. Note this may take a while...")
time.sleep_ms(500)
logging.info(f"WiFi Connected {wlan.ifconfig()}")


if __name__ == "__main__":
# Configure the logger.
# All message equal or higher to the logger level are printed.
# To see more debugging messages, set level=logging.DEBUG.
logging.basicConfig(
datefmt="%H:%M:%S",
format="%(asctime)s.%(msecs)03d %(message)s",
level=logging.INFO,
)

# NOTE: Add networking code here or in boot.py
wifi_connect()

# Create a client object to connect to the Arduino IoT cloud.
# For mTLS authentication, "keyfile" and "certfile" can be paths to a DER-encoded key and
# a DER-encoded certificate, or secure element (SE) URIs in the format: provider:token=slot
client = ArduinoCloudClient(
device_id=DEVICE_ID,
ssl_params={
"keyfile": KEY_PATH, "certfile": CERT_PATH, "cadata": CADATA,
"verify_mode": ssl.CERT_REQUIRED, "server_hostname": "iot.arduino.cc"
},
sync_mode=False,
)

# Register cloud objects.
# Note: The following objects must be created first in the dashboard and linked to the device.
# This cloud object is initialized with its last known value from the cloud. When this object is updated
# from the dashboard, the on_switch_changed function is called with the client object and the new value.
client.register("sw1", value=None, on_write=on_switch_changed, interval=0.250)

# This cloud object is updated manually in the switch's on_write_change callback.
client.register("led", value=None)

# This is a periodic cloud object that gets updated at fixed intervals (in this case 1 seconed) with the
# value returned from its on_read function (a formatted string of the current time). Note this object's
# initial value is None, it will be initialized by calling the on_read function.
client.register("clk", value=None, on_read=lambda x: strftime("%H:%M:%S", time.localtime()), interval=1.0)

# This is an example of a composite cloud object (a cloud object with multiple variables). In this case
# a colored light with switch, hue, saturation and brightness attributes. Once initialized, the object's
# attributes can be accessed using dot notation. For example: client["clight"].swi = False.
client.register(ColoredLight("clight", swi=True, on_write=on_clight_changed))

# This is another example of a composite cloud object, a map location with lat and long attributes.
client.register(Location("treasureisland", lat=31.264694, lon=29.979987))

# This object allows scheduling recurring events from the cloud UI. On activation of the event, if the
# on_active callback is provided, it gets called with the client object and the schedule object value.
# Note: The activation status of the object can also be polled using client["schedule"].active.
client.register(Schedule("schedule", on_active=lambda client, value: logging.info(f"Schedule activated {value}!")))

# The client can also schedule user code in a task and run it along with the other cloud objects.
# To schedule a user function, use the Task object and pass the task name and function in "on_run"
# to client.register().
client.register(Task("user_task", on_run=user_task, interval=1.0))

# If a Watchdog timer is available, it can be used to recover the system by resetting it, if it ever
# hangs or crashes for any reason. NOTE: once the WDT is enabled it must be reset periodically to
# prevent it from resetting the system, which is done in another user task.
# NOTE: Change the following to True to enable the WDT.
if False:
try:
from machine import WDT
# Enable the WDT with a timeout of 5s (1s is the minimum)
wdt = WDT(timeout=7500)
client.register(Task("watchdog_task", on_run=wdt_task, interval=1.0, args=wdt))
except (ImportError, AttributeError):
pass

# Start the Arduino IoT cloud client. In synchronous mode, this function returns immediately
# after connecting to the cloud.
client.start()

# In sync mode, start returns after connecting, and the client must be polled periodically.
while True:
client.update()
time.sleep(0.100)
19 changes: 1 addition & 18 deletions examples/micropython_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@
from secrets import DEVICE_ID
from secrets import SECRET_KEY # noqa

KEY_PATH = "key.der" # noqa
CERT_PATH = "cert.der" # noqa


def on_switch_changed(client, value):
# This is a write callback for the switch that toggles the LED variable. The LED
Expand Down Expand Up @@ -65,7 +62,7 @@ def wifi_connect():
logging.basicConfig(
datefmt="%H:%M:%S",
format="%(asctime)s.%(msecs)03d %(message)s",
level=logging.DEBUG,
level=logging.INFO,
)

# NOTE: Add networking code here or in boot.py
Expand All @@ -76,20 +73,6 @@ def wifi_connect():
# ID, and the password is the secret key obtained from the IoT cloud when provisioning a device.
client = ArduinoCloudClient(device_id=DEVICE_ID, username=DEVICE_ID, password=SECRET_KEY, sync_mode=False)

# Alternatively, the client supports key and certificate-based authentication. To use this
# mode, set "keyfile" and "certfile", and specify the CA certificate (if any) in "ssl_params".
# Secure elements, which can be used to store the key and certificate, are also supported.
# To use secure elements, provide the key and certificate URIs (in provider:token format) and
# set the token's PIN (if applicable). For example:
# client = ArduinoCloudClient(
# device_id=DEVICE_ID,
# ssl_params={
# "pin": "1234", "keyfile": KEY_PATH, "certfile": CERT_PATH, "cadata": CADATA,
# "verify_mode": ssl.CERT_REQUIRED, "server_hostname" : "iot.arduino.cc"
# },
# sync_mode=False,
# )

# Register cloud objects.
# Note: The following objects must be created first in the dashboard and linked to the device.
# This cloud object is initialized with its last known value from the cloud. When this object is updated
Expand Down
78 changes: 63 additions & 15 deletions src/arduino_iot_cloud/ussl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,56 @@
import ssl
import sys
import logging
import binascii
try:
from micropython import const
except (ImportError, AttributeError):
def const(x):
return x

pkcs11 = None
se_dev = None

# Default engine and provider.
_ENGINE_PATH = "/usr/lib/engines-3/libpkcs11.so"
_MODULE_PATH = "/usr/lib/softhsm/libsofthsm2.so"

# Reference EC key for NXP's PlugNTrust
_EC_REF_KEY = binascii.unhexlify(
b"3041020100301306072a8648ce3d020106082a8648ce3d03010704273025"
b"0201010420100000000000000000000000000000000000ffffffffa5a6b5"
b"b6a5a6b5b61000"
# Reference EC key for the SE.
_EC_REF_KEY = const(
b"\x30\x41\x02\x01\x00\x30\x13\x06\x07\x2A\x86\x48\xCE\x3D\x02\x01"
b"\x06\x08\x2A\x86\x48\xCE\x3D\x03\x01\x07\x04\x27\x30\x25\x02\x01"
b"\x01\x04\x20\xA5\xA6\xB5\xB6\xA5\xA6\xB5\xB6\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xFF"
b"\xFF\xFF\xFF"
)


def log_level_enabled(level):
return logging.getLogger().isEnabledFor(level)


def ecdsa_sign_callback(key, data):
if log_level_enabled(logging.DEBUG):
key_hex = "".join("%02X" % b for b in key)
logging.debug(f"ecdsa_sign_callback key:{key_hex}")

if key[0:8] != b"\xA5\xA6\xB5\xB6\xA5\xA6\xB5\xB6":
if log_level_enabled(logging.DEBUG):
logging.debug("ecdsa_sign_callback falling back to default sign")
return None

obj_id = int.from_bytes(key[-4:], "big")
if log_level_enabled(logging.DEBUG):
logging.debug(f"ecdsa_sign_callback oid: 0x{obj_id:02X}")

# Sign data on SE using reference key object id.
sig = se_dev.sign(obj_id, data)
if log_level_enabled(logging.DEBUG):
sig_hex = "".join("%02X" % b for b in sig)
logging.debug(f"ecdsa_sign_callback sig: {sig_hex}")
logging.info("Signed using secure element")
return sig


def wrap_socket(sock, ssl_params={}):
keyfile = ssl_params.get("keyfile", None)
certfile = ssl_params.get("certfile", None)
Expand All @@ -33,16 +67,27 @@ def wrap_socket(sock, ssl_params={}):
ciphers = ssl_params.get("ciphers", None)
verify = ssl_params.get("verify_mode", ssl.CERT_NONE)
hostname = ssl_params.get("server_hostname", None)
micropython = sys.implementation.name == "micropython"

if keyfile is not None and "token" in keyfile and micropython:
# Create a reference EC key for NXP EdgeLock device.
objid = int(keyfile.split("=")[1], 16).to_bytes(4, "big")
keyfile = _EC_REF_KEY[0:53] + objid + _EC_REF_KEY[57:]
# Load the certificate from the secure element (when supported).
# import cryptoki
# with cryptoki.open() as token:
# cert = token.read(0x65, 412)
se_key_token = keyfile is not None and "token" in keyfile
se_crt_token = certfile is not None and "token" in certfile
sys_micropython = sys.implementation.name == "micropython"

if sys_micropython and (se_key_token or se_crt_token):
import se05x

# Create and initialize SE05x device.
global se_dev
if se_dev is None:
se_dev = se05x.SE05X()

if se_key_token:
# Create a reference key for the secure element.
obj_id = int(keyfile.split("=")[1], 16)
keyfile = _EC_REF_KEY[0:-4] + obj_id.to_bytes(4, "big")

if se_crt_token:
# Load the certificate from the secure element.
certfile = se_dev.read(0x65, 412)

if keyfile is None or "token" not in keyfile:
# Use MicroPython/CPython SSL to wrap socket.
Expand All @@ -58,6 +103,9 @@ def wrap_socket(sock, ssl_params={}):
ctx.set_ciphers(ciphers)
if cafile is not None or cadata is not None:
ctx.load_verify_locations(cafile=cafile, cadata=cadata)
if sys_micropython and se_key_token:
# Set alternate ECDSA sign function.
ctx._context.ecdsa_sign_callback = ecdsa_sign_callback
return ctx.wrap_socket(sock, server_hostname=hostname)
else:
# Use M2Crypto to load key and cert from HSM.
Expand Down
Loading