Skip to content

Fixed example for RPLidar firmware 1.29 #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 6 additions & 15 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,33 +33,24 @@ Usage Example
.. code-block:: python
import os
from math import cos, sin, pi, floor
import pygame
from adafruit_circuitpython_rplidar import RPLidar
# Set up pygame and the display
os.putenv('SDL_FBDEV', '/dev/fb1')
pygame.init()
lcd = pygame.display.set_mode((320,240))
pygame.mouse.set_visible(False)
lcd.fill((0,0,0))
pygame.display.update()
from math import floor
from adafruit_rplidar import RPLidar
# Setup the RPLidar
PORT_NAME = '/dev/ttyUSB0'
lidar = RPLidar(None, PORT_NAME)
lidar = RPLidar(None, PORT_NAME, timeout=3)
# used to scale data to fit on the screen
max_distance = 0
def process_data(data):
# Do something useful with the data
pass
print(data)
scan_data = [0]*360
try:
print(lidar.get_info())
# print(lidar.get_info())
for scan in lidar.iter_scans():
for (_, angle, distance) in scan:
scan_data[min([359, floor(angle)])] = distance
Expand Down
219 changes: 179 additions & 40 deletions adafruit_rplidar.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

* Author(s): Dave Astels
* Based on https://github.com/SkoltechRobotics/rplidar by Artyom Pavlov
* and updates from https://github.com/Roboticia/RPLidar by Julien JEHL

Implementation Notes
--------------------
Expand All @@ -23,16 +24,17 @@
* Adafruit CircuitPython firmware for the supported boards:
https://github.com/adafruit/circuitpython/releases

Version 0.0.1 does NOT support CircuitPython. Future versions will.
The Current Version does NOT support CircuitPython. Future versions will.
"""

import struct
import sys
import time
import warnings
from collections import namedtuple

# pylint:disable=invalid-name,undefined-variable,global-variable-not-assigned
# pylint:disable=too-many-arguments,raise-missing-from
# pylint:disable=too-many-arguments,raise-missing-from,too-many-instance-attributes

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_RPLIDAR.git"
Expand All @@ -46,16 +48,12 @@
STOP_BYTE = b"\x25"
RESET_BYTE = b"\x40"

SCAN_BYTE = b"\x20"
FORCE_SCAN_BYTE = b"\x21"

DESCRIPTOR_LEN = 7
INFO_LEN = 20
HEALTH_LEN = 3

INFO_TYPE = 4
HEALTH_TYPE = 6
SCAN_TYPE = 129

# Constants & Command to start A2 motor
MAX_MOTOR_PWM = 1023
Expand All @@ -68,6 +66,18 @@
2: "Error",
}

SCAN_TYPE_NORMAL = 0
SCAN_TYPE_FORCE = 1
SCAN_TYPE_EXPRESS = 2

_SCAN_TYPES = (
{"byte": b"\x20", "response": 129, "size": 5},
{"byte": b"\x21", "response": 129, "size": 5},
{"byte": b"\x82", "response": 130, "size": 84},
)

express_packet = namedtuple("express_packet", "distance angle new_scan start_angle")


class RPLidarException(Exception):
"""Basic exception class for RPLidar"""
Expand All @@ -88,6 +98,17 @@ def _process_scan(raw):
return new_scan, quality, angle, distance


def _process_express_scan(data, new_angle, frame):
new_scan = (new_angle < data.start_angle) & (frame == 1)
angle = (
data.start_angle
+ ((new_angle - data.start_angle) % 360) / 32 * frame
- data.angle[frame - 1]
) % 360
distance = data.distance[frame - 1]
return new_scan, None, angle, distance


class RPLidar:
"""Class for communicating with RPLidar rangefinder scanners"""

Expand All @@ -97,6 +118,12 @@ class RPLidar:
timeout = 1 #: Serial port timeout
motor = False #: Is motor running?
baudrate = 115200 #: Baudrate for serial port
scanning = False
descriptor_size = 0
scan_type = SCAN_TYPE_NORMAL
express_frame = 32
express_data = False
express_old_data = None

def __init__(self, motor_pin, port, baudrate=115200, timeout=1, logging=False):
"""Initialize RPLidar object for communicating with the sensor.
Expand Down Expand Up @@ -152,7 +179,6 @@ def connect(self):
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=self.timeout,
dsrdtr=True,
)
except serial.SerialException as err:
raise RPLidarException(
Expand Down Expand Up @@ -253,7 +279,7 @@ def info(self):
if dtype != INFO_TYPE:
raise RPLidarException("Wrong response data type")
raw = self._read_response(dsize)
serialnumber_bytes = struct.unpack("BBBBBBBBBBBBBBBB", raw[4:])
serialnumber_bytes = struct.unpack("B" * len(raw[4:]), raw[4:])
serialnumber = "".join(reversed(["%02x" % b for b in serialnumber_bytes]))
data = {
"model": raw[0],
Expand Down Expand Up @@ -294,13 +320,67 @@ def health(self):

def clear_input(self):
"""Clears input buffer by reading all available data"""
if self.scanning:
raise RPLidarException("Clearing not allowed during active scanning!")
self._serial_port.flushInput()
self.express_frame = 32
self.express_data = False

def start(self, scan_type=SCAN_TYPE_NORMAL):
"""Start the scanning process
Parameters
----------
scan : normal, force or express.
"""
if self.scanning:
raise RPLidarException("Scanning already running!")
# Start the scanning process, enable laser diode and the
# measurement system
status, error_code = self.health
self.log("debug", "Health status: %s [%d]" % (status, error_code))
if status == _HEALTH_STATUSES[2]:
self.log(
"warning",
"Trying to reset sensor due to the error. "
"Error code: %d" % (error_code),
)
self.reset()
status, error_code = self.health
if status == _HEALTH_STATUSES[2]:
raise RPLidarException(
"RPLidar hardware failure. " "Error code: %d" % error_code
)
elif status == _HEALTH_STATUSES[1]:
self.log(
"warning",
"Warning sensor status detected! " "Error code: %d" % (error_code),
)
cmd = _SCAN_TYPES[scan_type]["byte"]
self.log("info", "starting scan process in %s mode" % scan_type)

if scan_type == "express":
self._send_payload_cmd(cmd, b"\x00\x00\x00\x00\x00")
else:
self._send_cmd(cmd)

dsize, is_single, dtype = self._read_descriptor()
if dsize != _SCAN_TYPES[scan_type]["size"]:
raise RPLidarException("Wrong info reply length")
if is_single:
raise RPLidarException("Not a multiple response mode")
if dtype != _SCAN_TYPES[scan_type]["response"]:
raise RPLidarException("Wrong response data type")
self.descriptor_size = dsize
self.scan_type = scan_type
self.scanning = True

def stop(self):
"""Stops scanning process, disables laser diode and the measurement
system, moves sensor to the idle state."""
self.log("info", "Stopping scanning")
self._send_cmd(STOP_BYTE)
time.sleep(0.001)
self.scanning = False
self.clear_input()

def reset(self):
Expand All @@ -309,8 +389,9 @@ def reset(self):
self.log("info", "Resetting the sensor")
self._send_cmd(RESET_BYTE)
time.sleep(0.002)
self.clear_input()

def iter_measurements(self, max_buf_meas=500):
def iter_measurements(self, max_buf_meas=500, scan_type=SCAN_TYPE_NORMAL):
"""Iterate over measurements. Note that consumer must be fast enough,
otherwise data will be accumulated inside buffer and consumer will get
data with increasing lag.
Expand All @@ -334,37 +415,11 @@ def iter_measurements(self, max_buf_meas=500):
In millimeter unit. Set to 0 when measurement is invalid.
"""
self.start_motor()
status, error_code = self.health
self.log("debug", "Health status: %s [%d]" % (status, error_code))
if status == _HEALTH_STATUSES[2]:
self.log(
"warning",
"Trying to reset sensor due to the error. "
"Error code: %d" % (error_code),
)
self.reset()
status, error_code = self.health
if status == _HEALTH_STATUSES[2]:
raise RPLidarException(
"RPLidar hardware failure. " "Error code: %d" % error_code
)
elif status == _HEALTH_STATUSES[1]:
self.log(
"warning",
"Warning sensor status detected! " "Error code: %d" % (error_code),
)
cmd = SCAN_BYTE
self._send_cmd(cmd)
dsize, is_single, dtype = self._read_descriptor()
if dsize != 5:
raise RPLidarException("Wrong info reply length")
if is_single:
raise RPLidarException("Not a multiple response mode")
if dtype != SCAN_TYPE:
raise RPLidarException("Wrong response data type")
if not self.scanning:
self.start(scan_type)

while True:
raw = self._read_response(dsize)
self.log_bytes("debug", "Received scan response: ", raw)
dsize = self.descriptor_size
if max_buf_meas:
data_in_buf = self._serial_port.in_waiting
if data_in_buf > max_buf_meas * dsize:
Expand All @@ -374,7 +429,50 @@ def iter_measurements(self, max_buf_meas=500):
"Clearing buffer..." % (data_in_buf // dsize, max_buf_meas),
)
self._serial_port.read(data_in_buf // dsize * dsize)
yield _process_scan(raw)
if self.scan_type == SCAN_TYPE_NORMAL:
raw = self._read_response(dsize)
self.log_bytes("debug", "Received scan response: ", raw)
yield _process_scan(raw)
elif self.scan_type == SCAN_TYPE_EXPRESS:
if self.express_frame == 32:
self.express_frame = 0
if not self.express_data:
self.log("debug", "reading first time bytes")
self.express_data = ExpressPacket.from_string(
self._read_response(dsize)
)

self.express_old_data = self.express_data
self.log(
"debug",
"set old_data with start_angle %f"
% self.express_old_data.start_angle,
)
self.express_data = ExpressPacket.from_string(
self._read_response(dsize)
)
self.log(
"debug",
"set new_data with start_angle %f"
% self.express_data.start_angle,
)

self.express_frame += 1
self.log(
"debug",
"process scan of frame %d with angle : "
"%f and angle new : %f"
% (
self.express_frame,
self.express_old_data.start_angle,
self.express_data.start_angle,
),
)
yield _process_express_scan(
self.express_old_data,
self.express_data.start_angle,
self.express_frame,
)

def iter_measurments(self, max_buf_meas=500):
"""For compatibility, this method wraps `iter_measurements`"""
Expand Down Expand Up @@ -414,3 +512,44 @@ def iter_scans(self, max_buf_meas=500, min_len=5):
scan = []
if quality > 0 and distance > 0:
scan.append((quality, angle, distance))


class ExpressPacket(express_packet):
"""Class representing a Express type Packet"""

sync1 = 0xA
sync2 = 0x5
sign = {0: 1, 1: -1}

@classmethod
def from_string(cls, data):
"""Decode and Instantiate the class from a string packet"""
packet = bytearray(data)

if (packet[0] >> 4) != cls.sync1 or (packet[1] >> 4) != cls.sync2:
raise ValueError("try to parse corrupted data ({})".format(packet))

checksum = 0
for b in packet[2:]:
checksum ^= b
if checksum != (packet[0] & 0b00001111) + ((packet[1] & 0b00001111) << 4):
raise ValueError("Invalid checksum ({})".format(packet))

new_scan = packet[3] >> 7
start_angle = (packet[2] + ((packet[3] & 0b01111111) << 8)) / 64

d = a = ()
for i in range(0, 80, 5):
d += ((packet[i + 4] >> 2) + (packet[i + 5] << 6),)
a += (
((packet[i + 8] & 0b00001111) + ((packet[i + 4] & 0b00000001) << 4))
/ 8
* cls.sign[(packet[i + 4] & 0b00000010) >> 1],
)
d += ((packet[i + 6] >> 2) + (packet[i + 7] << 6),)
a += (
((packet[i + 8] >> 4) + ((packet[i + 6] & 0b00000001) << 4))
/ 8
* cls.sign[(packet[i + 6] & 0b00000010) >> 1],
)
return cls(d, a, new_scan, start_angle)
Loading