Skip to content

Commit 98c37c5

Browse files
committed
Add simple Packet Buffer test examples
1 parent d48acb0 commit 98c37c5

File tree

3 files changed

+146
-0
lines changed

3 files changed

+146
-0
lines changed

examples/ble_packet_buffer_client.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# SPDX-FileCopyrightText: 2022 Scott Shawcroft for Adafruit Industries
2+
# SPDX-License-Identifier: MIT
3+
4+
"""
5+
Used with ble_packet_buffer_test.py. Transmits "echo" to
6+
PacketBufferService and receives it back.
7+
"""
8+
9+
import time
10+
11+
from ble_packet_buffer_service import PacketBufferService
12+
13+
from adafruit_ble import BLERadio
14+
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
15+
16+
ble = BLERadio()
17+
buf = bytearray(512)
18+
while True:
19+
while ble.connected and any(
20+
PacketBufferService in connection for connection in ble.connections
21+
):
22+
for connection in ble.connections:
23+
if PacketBufferService not in connection:
24+
continue
25+
print("echo")
26+
pb = connection[PacketBufferService]
27+
pb.write(b"echo")
28+
# Returns 0 if nothing was read.
29+
packet_len = pb.readinto(buf)
30+
if packet_len > 0:
31+
print(buf[:packet_len])
32+
print()
33+
time.sleep(1)
34+
35+
print("disconnected, scanning")
36+
for advertisement in ble.start_scan(ProvideServicesAdvertisement, timeout=1):
37+
if PacketBufferService not in advertisement.services:
38+
continue
39+
ble.connect(advertisement)
40+
print("connected")
41+
break
42+
ble.stop_scan()

examples/ble_packet_buffer_service.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
# SPDX-FileCopyrightText: 2022 Scott Shawcroft for Adafruit Industries
2+
# SPDX-License-Identifier: MIT
3+
4+
"""
5+
Used with ble_packet_buffer_*.py. Establishes a simple server to test
6+
PacketBuffer with.
7+
"""
8+
9+
import _bleio
10+
11+
from adafruit_ble.services import Service
12+
from adafruit_ble.characteristics import (
13+
Attribute,
14+
Characteristic,
15+
ComplexCharacteristic,
16+
)
17+
from adafruit_ble.uuid import VendorUUID
18+
19+
20+
class PacketBufferUUID(VendorUUID):
21+
"""UUIDs with the PacketBuffer base UUID."""
22+
23+
# pylint: disable=too-few-public-methods
24+
25+
def __init__(self, uuid16):
26+
uuid128 = bytearray("reffuBtekcaP".encode("utf-8") + b"\x00\x00\xaf\xad")
27+
uuid128[-3] = uuid16 >> 8
28+
uuid128[-4] = uuid16 & 0xFF
29+
super().__init__(uuid128)
30+
31+
32+
class PacketBufferCharacteristic(ComplexCharacteristic):
33+
def __init__(
34+
self,
35+
*,
36+
uuid=None,
37+
buffer_size=4,
38+
properties=Characteristic.WRITE_NO_RESPONSE
39+
| Characteristic.NOTIFY
40+
| Characteristic.READ,
41+
read_perm=Attribute.OPEN,
42+
write_perm=Attribute.OPEN
43+
):
44+
self.buffer_size = buffer_size
45+
super().__init__(
46+
uuid=uuid,
47+
properties=properties,
48+
read_perm=read_perm,
49+
write_perm=write_perm,
50+
max_length=512,
51+
fixed_length=False,
52+
)
53+
54+
def bind(self, service):
55+
"""Binds the characteristic to the given Service."""
56+
bound_characteristic = super().bind(service)
57+
return _bleio.PacketBuffer(
58+
bound_characteristic, buffer_size=self.buffer_size, max_packet_size=512
59+
)
60+
61+
62+
class PacketBufferService(Service):
63+
"""Test service that has one packet buffer"""
64+
65+
uuid = PacketBufferUUID(0x0001)
66+
67+
packets = PacketBufferCharacteristic(uuid=PacketBufferUUID(0x0101))
68+
69+
def readinto(self, buf):
70+
return self.packets.readinto(buf)
71+
72+
def write(self, buf, *, header=None):
73+
return self.packets.write(buf, header=header)

examples/ble_packet_buffer_test.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# SPDX-FileCopyrightText: 2020 ladyada for Adafruit Industries
2+
# SPDX-License-Identifier: MIT
3+
4+
"""
5+
Can be used with ble_packet_buffer_client.py. Receives packets from the
6+
PacketBufferService and transmits them back.
7+
"""
8+
9+
from packet_buffer_service import PacketBufferService
10+
11+
from adafruit_ble import BLERadio
12+
from adafruit_ble.advertising.standard import ProvideServicesAdvertisement
13+
14+
15+
ble = BLERadio()
16+
pbs = PacketBufferService()
17+
advertisement = ProvideServicesAdvertisement(pbs)
18+
19+
buf = bytearray(512)
20+
21+
while True:
22+
ble.start_advertising(advertisement)
23+
while not ble.connected:
24+
pass
25+
while ble.connected:
26+
# Returns b'' if nothing was read.
27+
packet_len = pbs.readinto(buf)
28+
if packet_len > 0:
29+
packet = buf[:packet_len]
30+
print(packet)
31+
pbs.write(packet)

0 commit comments

Comments
 (0)