Skip to content

Fix code formatting for linter. #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions aiotcloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,42 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from .ucloud import AIOTClient
from .ucloud import AIOTClient # noqa
from .ucloud import AIOTObject
from .ucloud import timestamp

try:
import asyncio
except ImportError:
import uasyncio as asyncio


class Location(AIOTObject):
def __init__(self, name, **kwargs):
super().__init__(name, keys={"lat", "lon"}, **kwargs)


class Color(AIOTObject):
def __init__(self, name, **kwargs):
super().__init__(name, keys={"hue", "sat", "bri"}, **kwargs)


class ColoredLight(AIOTObject):
def __init__(self, name, **kwargs):
super().__init__(name, keys={"swi", "hue", "sat", "bri"}, **kwargs)


class DimmedLight(AIOTObject):
def __init__(self, name, **kwargs):
super().__init__(name, keys={"swi", "bri"}, **kwargs)


class Schedule(AIOTObject):
def __init__(self, name, **kwargs):
kwargs.update({("runnable", True)}) # Force task creation.
kwargs.update({("runnable", True)}) # Force task creation.
self.on_active = kwargs.pop("on_active", None)
# Uncomment to allow the schedule to change in runtime.
#kwargs["on_write"] = kwargs.get("on_write", lambda aiot, value: None)
# kwargs["on_write"] = kwargs.get("on_write", lambda aiot, value: None)
self.active = False
super().__init__(name, keys={"frm", "to", "len", "msk"}, **kwargs)

Expand Down
10 changes: 4 additions & 6 deletions aiotcloud/ntptime.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
THE SOFTWARE.
"""
try:
import usocket as socket
except:
import socket
try:
import ustruct as struct
except:
import struct
except ImportError:
import usocket as socket
import ustruct as struct

# (date(2000, 1, 1) - date(1900, 1, 1)).days * 24*60*60
NTP_DELTA = 3155673600
Expand All @@ -44,7 +42,7 @@ def time():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.settimeout(1)
res = s.sendto(NTP_QUERY, addr)
s.sendto(NTP_QUERY, addr)
msg = s.recv(48)
finally:
s.close()
Expand Down
76 changes: 40 additions & 36 deletions aiotcloud/ucloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from kpn_senml import SenmlPack
from kpn_senml import SenmlRecord
from aiotcloud.umqtt import MQTTClient

try:
import logging
import asyncio
Expand All @@ -36,27 +37,31 @@
import uasyncio as asyncio
from aiotcloud import ntptime
from uasyncio.core import CancelledError

# MicroPython doesn't have this exception
class InvalidStateError(Exception):
pass


def timestamp():
return int(time.time())


class AIOTObject(SenmlRecord):
def __init__(self, name, **kwargs):
self.on_read = kwargs.pop("on_read", None)
self.on_read = kwargs.pop("on_read", None)
self.on_write = kwargs.pop("on_write", None)
self.interval = kwargs.pop("interval", 1.0)
self._runnable = kwargs.pop("runnable", False)
value = kwargs.pop("value", None)
if keys := kwargs.pop("keys", {}): # Create a complex object (with sub-records).
mkrec = lambda k, v: AIOTObject(f"{name}:{k}", value=v, callback=self.senml_callback)
value = {k : mkrec(k, v) for (k, v) in {k: kwargs.pop(k, None) for k in keys}.items()}
if keys := kwargs.pop("keys", {}):
value = { # Create a complex object (with sub-records).
k: AIOTObject(f"{name}:{k}", value=v, callback=self.senml_callback)
for (k, v) in {k: kwargs.pop(k, None) for k in keys}.items()
}
self._updated = False
self.on_write_scheduled = False
self.timestamp = timestamp()
self.dtype = type(value) # NOTE: must be set before calling super
callback = kwargs.pop("callback", self.senml_callback)
for key in kwargs: # kwargs should be empty by now, unless a wrong attr was used.
raise TypeError(f"'{self.__class__.__name__}' got an unexpected keyword argument '{key}'")
Expand Down Expand Up @@ -94,15 +99,17 @@ def runnable(self):
@SenmlRecord.value.setter
def value(self, value):
if value is not None:
if self.dtype is type(None):
self.dtype = type(value)
elif not isinstance(value, self.dtype):
raise TypeError(f"record: {self.name} invalid data type. Expected {self.dtype} not {type(value)}")
else:
if self.value is not None:
if not isinstance(self.value, type(value)):
raise TypeError(
f"record: {self.name} invalid data type. Expected {type(self.value)} not {type(value)}"
)
self._updated = True
self.timestamp = timestamp()
logging.debug(f"record: {self.name} %s: {value} ts: {self.timestamp}"
%("initialized" if self.value is None else "updated"))
logging.debug(
f"record: {self.name} %s: {value} ts: {self.timestamp}"
% ("initialized" if self.value is None else "updated")
)
self._value = value

def __getattr__(self, attr):
Expand All @@ -119,7 +126,7 @@ def __setattr__(self, name, value):
def _build_rec_dict(self, naming_map, appendTo):
if isinstance(self.value, dict):
for r in self.value.values():
if r.value is not None: # NOTE: should filter by updated when it's supported.
if r.value is not None: # NOTE: should filter by updated when it's supported.
r._build_rec_dict(naming_map, appendTo)
else:
super()._build_rec_dict(naming_map, appendTo)
Expand All @@ -128,7 +135,7 @@ def add_to_pack(self, pack):
if isinstance(self.value, dict):
for r in self.value.values():
# NOTE: If record value is None it can still be added to the pack for initialization.
pack.add(r) # NOTE: should filter by updated when it's supported.
pack.add(r) # NOTE: should filter by updated when it's supported.
else:
pack.add(self)
self.updated = False
Expand All @@ -143,14 +150,15 @@ def senml_callback(self, record, **kwargs):

async def run(self, aiot):
while True:
if (self.on_read is not None):
if self.on_read is not None:
self.value = self.on_read(aiot)
if (self.on_write is not None and self.on_write_scheduled):
if self.on_write is not None and self.on_write_scheduled:
self.on_write_scheduled = False
self.on_write(aiot, self if isinstance(self.value, dict) else self.value)
await asyncio.sleep(self.interval)

class AIOTClient():

class AIOTClient:
def __init__(self, device_id, ssl_params=None, server="mqtts-sa.iot.oniudra.cc", port=8883, keepalive=10):
self.tasks = {}
self.records = {}
Expand All @@ -159,7 +167,7 @@ def __init__(self, device_id, ssl_params=None, server="mqtts-sa.iot.oniudra.cc",
self.update_systime()
self.last_ping = timestamp()
self.device_topic = b"/a/d/" + device_id + b"/e/i"
self.senmlpack = SenmlPack("urn:uuid:"+device_id.decode("utf-8"), self.senml_generic_callback)
self.senmlpack = SenmlPack("urn:uuid:" + device_id.decode("utf-8"), self.senml_generic_callback)
self.mqtt = MQTTClient(device_id, server, port, ssl_params, keepalive=keepalive, callback=self.mqtt_callback)
# Note: the following internal objects are initialized by the cloud.
for name in ["thing_id", "tz_offset", "tz_dst_until"]:
Expand All @@ -183,11 +191,8 @@ def get(self, key, default=None):

def update_systime(self):
try:
from aiotcloud import ntptime
ntptime.settime()
logging.info("RTC time set from NTP.")
except ImportError:
pass
except Exception as e:
logging.error(f"Failed to set RTC time from NTP: {e}.")

Expand Down Expand Up @@ -239,8 +244,8 @@ async def discovery_task(self, interval=0.100):
self.mqtt.check_msg()
if self.records.get("thing_id").value is not None:
self.thing_id = self.records.pop("thing_id").value
if not self.thing_id: # Empty thing ID should not happen.
raise(Exception("Device is not linked to a Thing ID."))
if not self.thing_id: # Empty thing ID should not happen.
raise (Exception("Device is not linked to a Thing ID."))

self.topic_out = self.create_topic("e", "o")
self.mqtt.subscribe(self.create_topic("e", "i"))
Expand All @@ -249,7 +254,7 @@ async def discovery_task(self, interval=0.100):
lastval_record.add_to_pack(self.senmlpack)
self.mqtt.subscribe(self.create_topic("shadow", "i"))
self.mqtt.publish(self.create_topic("shadow", "o"), self.senmlpack.to_cbor(), qos=True)
logging.info(f"Device configured via discovery protocol.")
logging.info("Device configured via discovery protocol.")
await asyncio.sleep(interval)

async def mqtt_task(self, interval=0.100):
Expand All @@ -258,30 +263,28 @@ async def mqtt_task(self, interval=0.100):
if self.thing_id is not None:
self.senmlpack.clear()
for record in self.records.values():
if (record.updated):
if record.updated:
record.add_to_pack(self.senmlpack)
if len(self.senmlpack._data):
logging.debug("Pushing records to AIoT Cloud:")
if (self.debug):
for record in self.senmlpack:
logging.debug(f" ==> record: {record.name} value: {str(record.value)[:48]}...")
for record in self.senmlpack:
logging.debug(f" ==> record: {record.name} value: {str(record.value)[:48]}...")
self.mqtt.publish(self.topic_out, self.senmlpack.to_cbor(), qos=True)
self.last_ping = timestamp()
elif (self.keepalive and (timestamp() - self.last_ping) > self.keepalive):
elif self.keepalive and (timestamp() - self.last_ping) > self.keepalive:
self.mqtt.ping()
self.last_ping = timestamp()
logging.debug("No records to push, sent a ping request.")
await asyncio.sleep(interval)

async def run(self, user_main=None, debug=False):
self.debug = debug

async def run(self, user_main=None):
logging.info("Connecting to AIoT cloud...")
if not self.mqtt.connect():
logging.error("Failed to connect AIoT cloud.")
return

self.mqtt.subscribe(self.device_topic)
if (user_main is not None):
if user_main is not None:
self.create_task("user_main", user_main, self)
self.create_task("mqtt_task", self.mqtt_task)
self.create_task("discovery", self.discovery_task)
Expand All @@ -291,8 +294,8 @@ async def run(self, user_main=None, debug=False):
await asyncio.gather(*self.tasks.values(), return_exceptions=False)
logging.info("All tasks finished!")
break
except Exception as e:
pass #import traceback; traceback.print_exc()
except Exception:
pass # import traceback; traceback.print_exc()

for name in list(self.tasks):
task = self.tasks[name]
Expand All @@ -301,4 +304,5 @@ async def run(self, user_main=None, debug=False):
self.tasks.pop(name)
self.records.pop(name, None)
logging.error(f"Removed task: {name}. Raised exception: {task.exception()}.")
except (CancelledError, InvalidStateError) as e: pass
except (CancelledError, InvalidStateError):
pass
20 changes: 16 additions & 4 deletions aiotcloud/umqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
# Based on: https://github.com/micropython/micropython-lib/tree/master/micropython/umqtt.simple

import time

try:
import socket
import struct
Expand All @@ -34,12 +35,23 @@
import ulogging as logging
from ussl import wrap_socket


class MQTTException(Exception):
pass


class MQTTClient:
def __init__(self, client_id, server, port, ssl_params,
user=None, password=None, keepalive=0, callback=None):
def __init__(
self,
client_id,
server,
port,
ssl_params,
user=None,
password=None,
keepalive=0,
callback=None,
):
self.client_id = client_id
self.server = server
self.port = port
Expand Down Expand Up @@ -86,7 +98,7 @@ def _connect(self, clean_session=True):
self.sock = socket.socket()
self.sock = wrap_socket(self.sock, **self.ssl_params)
self.sock.connect(addr)
except:
except Exception:
self.sock.close()
self.sock = socket.socket()
self.sock.connect(addr)
Expand Down Expand Up @@ -211,7 +223,7 @@ def subscribe(self, topic, qos=0):
# messages processed internally.
def wait_msg(self):
res = self.sock.read(1)
if res == b"" or res == None:
if res == b"" or res is None:
return None
self.sock.setblocking(True)
if res == b"\xd0": # PINGRESP
Expand Down
21 changes: 16 additions & 5 deletions aiotcloud/ussl.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# ussl module with m2crypto backend for HSM support.
# ussl module with m2crypto backend for HSM support.

from M2Crypto import Engine, m2, BIO, SSL
from M2Crypto import Engine, m2, SSL

_key = None
_cert = None
Expand All @@ -33,6 +33,7 @@
ENGINE_PATH = "/usr/lib/engines-1.1/libpkcs11.so"
MODULE_PATH = "/usr/lib/softhsm/libsofthsm2.so"


def init(pin, certfile, keyfile, engine_path, module_path):
global _key, _cert
Engine.load_dynamic_engine("pkcs11", engine_path)
Expand All @@ -43,12 +44,22 @@ def init(pin, certfile, keyfile, engine_path, module_path):
_key = pkcs11.load_private_key(keyfile)
_cert = pkcs11.load_certificate(certfile)

def wrap_socket(sock_in, pin, certfile, keyfile, ca_certs=None, ciphers=None, engine_path=ENGINE_PATH, module_path=MODULE_PATH):

def wrap_socket(
sock_in,
pin,
certfile,
keyfile,
ca_certs=None,
ciphers=None,
engine_path=ENGINE_PATH,
module_path=MODULE_PATH,
):
if _key is None or _cert is None:
init(pin, certfile, keyfile, engine_path, module_path)

# Create SSL context
ctx = SSL.Context('tls')
ctx = SSL.Context("tls")
ctx.set_default_verify_paths()
ctx.set_allow_unknown_ca(False)

Expand All @@ -57,7 +68,7 @@ def wrap_socket(sock_in, pin, certfile, keyfile, ca_certs=None, ciphers=None, en

if ca_certs is not None:
if ctx.load_verify_locations(ca_certs) != 1:
raise Exception('Failed to load CA certs')
raise Exception("Failed to load CA certs")
ctx.set_verify(SSL.verify_peer, depth=9)

# Set key/cert
Expand Down
Loading