Skip to content

Refactor how we enable reports #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 125 additions & 61 deletions adafruit_bno080/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@

from struct import unpack_from, pack_into
from collections import namedtuple
from time import sleep, monotonic, monotonic_ns
import time
from micropython import const
import digitalio

# TODO: Remove on release
from .debug import channels, reports
Expand Down Expand Up @@ -66,18 +67,18 @@


# Calibrated Acceleration (m/s2)
_BNO_REPORT_ACCELEROMETER = const(0x01)
BNO_REPORT_ACCELEROMETER = const(0x01)
# Calibrated gyroscope (rad/s).
_BNO_REPORT_GYROSCOPE = const(0x02)
BNO_REPORT_GYROSCOPE = const(0x02)
# Magnetic field calibrated (in µTesla). The fully calibrated magnetic field measurement.
_BNO_REPORT_MAGNETIC_FIELD = const(0x03)
BNO_REPORT_MAGNETIC_FIELD = const(0x03)
# Linear acceleration (m/s2). Acceleration of the device with gravity removed
_BNO_REPORT_LINEAR_ACCELERATION = const(0x04)
BNO_REPORT_LINEAR_ACCELERATION = const(0x04)
# Rotation Vector
_BNO_REPORT_ROTATION_VECTOR = const(0x05)
_BNO_REPORT_GEOMAGNETIC_ROTATION_VECTOR = const(0x09)
_BNO_REPORT_STEP_COUNTER = const(0x11)
_BNO_REPORT_SHAKE_DETECTOR = const(0x19)
BNO_REPORT_ROTATION_VECTOR = const(0x05)
BNO_REPORT_GEOMAGNETIC_ROTATION_VECTOR = const(0x09)
BNO_REPORT_STEP_COUNTER = const(0x11)
BNO_REPORT_SHAKE_DETECTOR = const(0x19)


_DEFAULT_REPORT_INTERVAL = const(50000) # in microseconds = 50ms
Expand Down Expand Up @@ -111,15 +112,15 @@
_BNO_CMD_TIMESTAMP_REBASE: 5,
}
# length is probably deterministic, like axes * 2 +4
_ENABLED_SENSOR_REPORTS = {
_BNO_REPORT_ACCELEROMETER: (_Q_POINT_8_SCALAR, 3, 10),
_BNO_REPORT_GYROSCOPE: (_Q_POINT_9_SCALAR, 3, 10),
_BNO_REPORT_MAGNETIC_FIELD: (_Q_POINT_4_SCALAR, 3, 10),
_BNO_REPORT_LINEAR_ACCELERATION: (_Q_POINT_8_SCALAR, 3, 10),
_BNO_REPORT_ROTATION_VECTOR: (_Q_POINT_14_SCALAR, 4, 14,),
_BNO_REPORT_GEOMAGNETIC_ROTATION_VECTOR: (_Q_POINT_12_SCALAR, 4, 14),
_BNO_REPORT_STEP_COUNTER: (1, 1, 12),
_BNO_REPORT_SHAKE_DETECTOR: (1, 1, 6),
_AVAIL_SENSOR_REPORTS = {
BNO_REPORT_ACCELEROMETER: (_Q_POINT_8_SCALAR, 3, 10),
BNO_REPORT_GYROSCOPE: (_Q_POINT_9_SCALAR, 3, 10),
BNO_REPORT_MAGNETIC_FIELD: (_Q_POINT_4_SCALAR, 3, 10),
BNO_REPORT_LINEAR_ACCELERATION: (_Q_POINT_8_SCALAR, 3, 10),
BNO_REPORT_ROTATION_VECTOR: (_Q_POINT_14_SCALAR, 4, 14,),
BNO_REPORT_GEOMAGNETIC_ROTATION_VECTOR: (_Q_POINT_12_SCALAR, 4, 14),
BNO_REPORT_STEP_COUNTER: (1, 1, 12),
BNO_REPORT_SHAKE_DETECTOR: (1, 1, 6),
}

DATA_BUFFER_SIZE = const(512) # data buffer size. obviously eats ram
Expand All @@ -131,17 +132,22 @@
REPORT_STATUS = ["Unreliable", "Accuracy low", "Accuracy medium", "Accuracy high"]


class PacketError(Exception):
"""Raised when the packet couldnt be parsed"""
pass


def _elapsed(start_time):
return monotonic() - start_time
return time.monotonic() - start_time


def elapsed_time(func):
"""Print the runtime of the decorated function"""

def wrapper_timer(*args, **kwargs):
start_time = monotonic_ns() # 1
start_time = time.monotonic_ns() # 1
value = func(*args, **kwargs)
end_time = monotonic_ns() # 2
end_time = time.monotonic_ns() # 2
run_time = end_time - start_time # 3
print("Finished", func.__name__, "in", (run_time / 1000000.0), "ms")
return value
Expand All @@ -152,7 +158,7 @@ def wrapper_timer(*args, **kwargs):
def _parse_sensor_report_data(report_bytes):
data_offset = 4 # this may not always be true
report_id = report_bytes[0]
scalar, count, _report_length = _ENABLED_SENSOR_REPORTS[report_id]
scalar, count, _report_length = _AVAIL_SENSOR_REPORTS[report_id]

results = []

Expand Down Expand Up @@ -198,7 +204,7 @@ def parse_sensor_id(buffer):

def _report_length(report_id):
if report_id < 0xF0: # it's a sensor report
return _ENABLED_SENSOR_REPORTS[report_id][2]
return _AVAIL_SENSOR_REPORTS[report_id][2]

return _REPORT_LENGTHS[report_id]

Expand Down Expand Up @@ -248,7 +254,7 @@ def __str__(self):
_BNO_CHANNEL_INPUT_SENSOR_REPORTS,
]:
if self.report_id in reports:
outstr += "DBG::\t\t \tReport Type: %s(%d)\n" % (
outstr += "DBG::\t\t \tReport Type: %s (0x%x)\n" % (
reports[self.report_id],
self.report_id,
)
Expand Down Expand Up @@ -332,8 +338,9 @@ class BNO080:

"""

def __init__(self, debug=False):
def __init__(self, reset=None, debug=False):
self._debug = debug
self._reset = reset
self._dbg("********** __init__ *************")
self._data_buffer = bytearray(DATA_BUFFER_SIZE)
self._packet_slices = []
Expand All @@ -351,56 +358,76 @@ def __init__(self, debug=False):

def initialize(self):
"""Initialize the sensor"""
self.reset()
self.hard_reset()
self.soft_reset()
if not self._check_id():
raise RuntimeError("Could not read ID")
for report_type in _ENABLED_SENSOR_REPORTS:
self._enable_feature(report_type)

@property
def magnetic(self):
"""A tuple of the current magnetic field measurements on the X, Y, and Z axes"""
self._process_available_packets() # decorator?
return self._readings[_BNO_REPORT_MAGNETIC_FIELD]
try:
return self._readings[BNO_REPORT_MAGNETIC_FIELD]
except KeyError:
raise RuntimeError("No magfield report found, is it enabled?") from None

@property
def quaternion(self):
"""A quaternion representing the current rotation vector"""
self._process_available_packets()
return self._readings[_BNO_REPORT_ROTATION_VECTOR]
try:
return self._readings[BNO_REPORT_ROTATION_VECTOR]
except KeyError:
raise RuntimeError("No quaternion report found, is it enabled?") from None

@property
def geomagnetic_quaternion(self):
"""A quaternion representing the current geomagnetic rotation vector"""
self._process_available_packets()
return self._readings[_BNO_REPORT_GEOMAGNETIC_ROTATION_VECTOR]
try:
return self._readings[BNO_REPORT_GEOMAGNETIC_ROTATION_VECTOR]
except KeyError:
raise RuntimeError("No geomag quaternion report found, is it enabled?") from None

@property
def steps(self):
"""The number of steps detected since the sensor was initialized"""
self._process_available_packets()
return self._readings[_BNO_REPORT_STEP_COUNTER]
try:
return self._readings[BNO_REPORT_STEP_COUNTER]
except KeyError:
raise RuntimeError("No steps report found, is it enabled?") from None

@property
def linear_acceleration(self):
"""A tuple representing the current linear acceleration values on the X, Y, and Z
axes in meters per second squared"""
self._process_available_packets()
return self._readings[_BNO_REPORT_LINEAR_ACCELERATION]
try:
return self._readings[BNO_REPORT_LINEAR_ACCELERATION]
except KeyError:
raise RuntimeError("No lin. accel report found, is it enabled?") from None

@property
def acceleration(self):
"""A tuple representing the acceleration measurements on the X, Y, and Z
axes in meters per second squared"""
self._process_available_packets()
return self._readings[_BNO_REPORT_ACCELEROMETER]
try:
return self._readings[BNO_REPORT_ACCELEROMETER]
except KeyError:
raise RuntimeError("No accel report found, is it enabled?") from None

@property
def gyro(self):
"""A tuple representing Gyro's rotation measurements on the X, Y, and Z
axes in radians per second"""
self._process_available_packets()
return self._readings[_BNO_REPORT_GYROSCOPE]
try:
return self._readings[BNO_REPORT_GYROSCOPE]
except KeyError:
raise RuntimeError("No gyro report found, is it enabled?") from None

@property
def shake(self):
Expand All @@ -411,20 +438,27 @@ def shake(self):
this property is not guaranteed to reflect the shake state at the moment it is read
"""
self._process_available_packets()
shake_detected = self._readings[_BNO_REPORT_SHAKE_DETECTOR]
# clear on read
if shake_detected:
self._readings[_BNO_REPORT_SHAKE_DETECTOR] = False
try:
shake_detected = self._readings[BNO_REPORT_SHAKE_DETECTOR]
# clear on read
if shake_detected:
self._readings[BNO_REPORT_SHAKE_DETECTOR] = False
except KeyError:
raise RuntimeError("No shake report found, is it enabled?") from None

# # decorator?
def _process_available_packets(self):
processed_count = 0
while self._data_ready:
new_packet = self._read_packet()
print("reading a packet")
try:
new_packet = self._read_packet()
except PacketError:
continue
self._handle_packet(new_packet)
processed_count += 1
self._dbg("")
self._dbg("Processesd", processed_count, "packets")
#print("Processed", processed_count, "packets")
self._dbg("")
# we'll probably need an exit here for fast sensor rates
self._dbg("")
Expand All @@ -436,7 +470,7 @@ def _wait_for_packet_type(self, channel_number, report_id=None, timeout=5.0):
else:
report_id_str = ""
self._dbg("** Waiting for packet on channel", channel_number, report_id_str)
start_time = monotonic()
start_time = time.monotonic()
while _elapsed(start_time) < timeout:
new_packet = self._wait_for_packet()

Expand All @@ -452,7 +486,7 @@ def _wait_for_packet_type(self, channel_number, report_id=None, timeout=5.0):
raise RuntimeError("Timed out waiting for a packet on channel", channel_number)

def _wait_for_packet(self, timeout=_PACKET_READ_TIMEOUT):
start_time = monotonic()
start_time = time.monotonic()
while _elapsed(start_time) < timeout:
if not self._data_ready:
continue
Expand Down Expand Up @@ -503,14 +537,14 @@ def _process_report(self, report_id, report_bytes):
print(outstr)
self._dbg("")

if report_id == _BNO_REPORT_STEP_COUNTER:
if report_id == BNO_REPORT_STEP_COUNTER:
self._readings[report_id] = _parse_step_couter_report(report_bytes)
return
if report_id == _BNO_REPORT_SHAKE_DETECTOR:
if report_id == BNO_REPORT_SHAKE_DETECTOR:
shake_detected = _parse_shake_report(report_bytes)
# shake not previously detected - auto cleared by 'shake' property
if not self._readings[_BNO_REPORT_SHAKE_DETECTOR]:
self._readings[_BNO_REPORT_SHAKE_DETECTOR] = shake_detected
if not self._readings[BNO_REPORT_SHAKE_DETECTOR]:
self._readings[BNO_REPORT_SHAKE_DETECTOR] = shake_detected
return

sensor_data = _parse_sensor_report_data(report_bytes)
Expand All @@ -533,10 +567,11 @@ def _get_feature_enable_report(
pack_into("<I", set_feature_report, 5, report_interval)
return set_feature_report

def _enable_feature(self, feature_id):
def enable_feature(self, feature_id):
self._dbg("\n********** Enabling feature id:", feature_id, "**********")

set_feature_report = self._get_feature_enable_report(feature_id)
print("Enabling", feature_id)
self._send_packet(_BNO_CHANNEL_CONTROL, set_feature_report)
while True:
packet = self._wait_for_packet_type(
Expand All @@ -545,15 +580,16 @@ def _enable_feature(self, feature_id):

if packet.data[1] == feature_id:
if (
feature_id == _BNO_REPORT_ROTATION_VECTOR
feature_id == BNO_REPORT_ROTATION_VECTOR
): # check for other vector types as well
self._readings[feature_id] = (0.0, 0.0, 0.0, 0.0)
else:
self._readings[feature_id] = (0.0, 0.0, 0.0)
self._dbg("Enabled")
return True
print("Enabled", feature_id)
break
else:
raise RuntimeError("Was not able to enable feature", feature_id)

return False

def _check_id(self):
self._dbg("\n********** READ ID **********")
Expand Down Expand Up @@ -605,22 +641,50 @@ def _get_data(self, index, fmt_string):
data_index = index + 4
return unpack_from(fmt_string, self._data_buffer, offset=data_index)[0]

def _read_header(self):
"""Reads the first 4 bytes available as a header"""
with self.bus_device_obj as bus_dev: # pylint:disable=no-member
bus_dev.readinto(self._data_buffer, end=4)
packet_header = Packet.header_from_buffer(self._data_buffer)
self._dbg(packet_header)
return packet_header

# pylint:disable=no-self-use
@property
def _data_ready(self):
raise RuntimeError("Not implemented")

def reset(self):
def hard_reset(self):
"""Hardware reset the sensor to an initial unconfigured state"""
if not self._reset:
return
#print("Hard resetting...")
self._reset.direction = digitalio.Direction.OUTPUT
self._reset.value = True
time.sleep(0.01)
self._reset.value = False
time.sleep(0.01)
self._reset.value = True
time.sleep(0.5)

def soft_reset(self):
"""Reset the sensor to an initial unconfigured state"""
raise RuntimeError("Not implemented")
print("Soft resetting...", end="")
data = bytearray(1)
data[0] = 1
seq = self._send_packet(BNO_CHANNEL_EXE, data)
time.sleep(0.5)

for i in range(3):
while True: # retry reading packets until ready!
try:
packet = self._read_packet()
break
except PacketError:
time.sleep(0.1)

#print(packet)
if i == 0 and packet.channel_number != _BNO_CHANNEL_SHTP_COMMAND:
raise RuntimeError("Expected an SHTP announcement")
if i == 1 and packet.channel_number != BNO_CHANNEL_EXE:
raise RuntimeError("Expected a reset reply")
if i == 2 and packet.channel_number != _BNO_CHANNEL_CONTROL:
raise RuntimeError("Expected a control announcement")
print("OK!");
# all is good!

def _send_packet(self, channel, data):
raise RuntimeError("Not implemented")
Expand Down
Loading