Skip to content

Commit 8bb2232

Browse files
authored
Merge pull request #16 from dhalbert/pairing
Pairing
2 parents f38c77c + ab44563 commit 8bb2232

File tree

5 files changed

+528
-55
lines changed

5 files changed

+528
-55
lines changed

adafruit_ble/advertising.py

Lines changed: 94 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ class AdvertisingPacket:
5151
"""Incomplete list of 128 bit service UUIDs."""
5252
ALL_128_BIT_SERVICE_UUIDS = 0x07
5353
"""Complete list of 128 bit service UUIDs."""
54+
SOLICITED_16_BIT_SERVICE_UUIDS = 0x14
55+
"""List of 16 bit service UUIDs solicited by a peripheral."""
56+
SOLICITED_128_BIT_SERVICE_UUIDS = 0x15
57+
"""List of 128 bit service UUIDs solicited by a peripheral."""
5458
SHORT_LOCAL_NAME = 0x08
5559
"""Short local device name (shortened to fit)."""
5660
COMPLETE_LOCAL_NAME = 0x09
@@ -131,101 +135,142 @@ def get(self, element_type, default=None):
131135
except KeyError:
132136
return default
133137

138+
@property
139+
def length(self):
140+
"""Current number of bytes in packet."""
141+
return len(self._packet_bytes)
142+
134143
@property
135144
def bytes_remaining(self):
136145
"""Number of bytes still available for use in the packet."""
137-
return self._max_length - len(self._packet_bytes)
146+
return self._max_length - self.length
138147

139148
def _check_length(self):
140-
if len(self._packet_bytes) > self._max_length:
149+
if self.length > self._max_length:
141150
raise IndexError("Advertising data too long")
142151

152+
def add_flags(self, flags=(FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY)):
153+
"""Add advertising flags."""
154+
self.add_field(self.FLAGS, struct.pack("<B", flags))
155+
143156
def add_field(self, field_type, field_data):
144-
"""Append an advertising data field to the current packet, of the given type.
157+
"""Append byte data to the current packet, of the given type.
145158
The length field is calculated from the length of field_data."""
146159
self._packet_bytes.append(1 + len(field_data))
147160
self._packet_bytes.append(field_type)
148161
self._packet_bytes.extend(field_data)
149162
self._check_length()
150163

151-
def add_flags(self, flags=(FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY)):
152-
"""Add default or custom advertising flags."""
153-
self.add_field(self.FLAGS, struct.pack("<B", flags))
154-
155-
def add_16_bit_uuids(self, uuids):
156-
"""Add a complete list of 16 bit service UUIDs."""
157-
for uuid in uuids:
158-
self.add_field(self.ALL_16_BIT_SERVICE_UUIDS, struct.pack("<H", uuid.uuid16))
159-
160-
def add_128_bit_uuids(self, uuids):
161-
"""Add a complete list of 128 bit service UUIDs."""
162-
for uuid in uuids:
163-
self.add_field(self.ALL_128_BIT_SERVICE_UUIDS, uuid.uuid128)
164-
165164
def add_mfr_specific_data(self, mfr_id, data):
166165
"""Add manufacturer-specific data bytes."""
167166
self.add_field(self.MANUFACTURER_SPECIFIC_DATA, struct.pack('<H', mfr_id) + data)
168167

168+
def add_tx_power(self, tx_power):
169+
"""Add transmit power value."""
170+
self.add_field(AdvertisingPacket.TX_POWER, struct.pack("<b", tx_power))
169171

170-
class ServerAdvertisement:
171-
"""
172-
Data to advertise a peripheral's services.
172+
def add_appearance(self, appearance):
173+
"""Add BLE Appearance value."""
174+
self.add_field(AdvertisingPacket.APPEARANCE, struct.pack("<H", appearance))
173175

174-
The advertisement consists of an advertising data packet and an optional scan response packet,
175-
The scan response packet is created only if there is not room in the
176-
advertising data packet for the complete peripheral name.
177176

178-
:param peripheral Peripheral the Peripheral to advertise. Use its services and name
179-
:param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm
180-
"""
181-
182-
def __init__(self, peripheral, *, tx_power=0):
183-
self._peripheral = peripheral
177+
class Advertisement:
178+
"""Superclass for common code to construct a BLE advertisement,
179+
consisting of an advertising data packet and an optional scan response packet.
184180
185-
packet = AdvertisingPacket()
186-
packet.add_flags()
181+
:param int flags: advertising flags. Default is general discovery, and BLE only (not classic)
182+
"""
183+
def __init__(self, flags=None, tx_power=None):
184+
self._packet = AdvertisingPacket()
187185
self._scan_response_packet = None
186+
if flags:
187+
self._packet.add_flags(flags)
188+
else:
189+
self._packet.add_flags()
188190

189-
# Need to check service.secondary
190-
uuids_16_bits = [service.uuid for service in peripheral.services
191-
if service.uuid.size == 16 and not service.secondary]
192-
if uuids_16_bits:
193-
packet.add_16_bit_uuids(uuids_16_bits)
194-
195-
uuids_128_bits = [service.uuid for service in peripheral.services
196-
if service.uuid.size == 128 and not service.secondary]
197-
if uuids_128_bits:
198-
packet.add_128_bit_uuids(uuids_128_bits)
199-
200-
packet.add_field(AdvertisingPacket.TX_POWER, struct.pack("<b", tx_power))
191+
if tx_power is not None:
192+
self._packet.add_tx_power(tx_power)
201193

194+
def add_name(self, name):
195+
"""Add name to advertisement. If it doesn't fit, add truncated name to packet,
196+
and add complete name to scan response packet.
197+
"""
202198
# 2 bytes needed for field length and type.
203-
bytes_available = packet.bytes_remaining - 2
199+
bytes_available = self._packet.bytes_remaining - 2
204200
if bytes_available <= 0:
205201
raise IndexError("No room for name")
206202

207-
name_bytes = bytes(peripheral.name, 'utf-8')
203+
name_bytes = bytes(name, 'utf-8')
208204
if bytes_available >= len(name_bytes):
209-
packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, name_bytes)
205+
self._packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME, name_bytes)
210206
else:
211-
packet.add_field(AdvertisingPacket.SHORT_LOCAL_NAME, name_bytes[:bytes_available])
207+
self._packet.add_field(AdvertisingPacket.SHORT_LOCAL_NAME, name_bytes[:bytes_available])
212208
self._scan_response_packet = AdvertisingPacket()
213209
try:
214210
self._scan_response_packet.add_field(AdvertisingPacket.COMPLETE_LOCAL_NAME,
215211
name_bytes)
216212
except IndexError:
217213
raise IndexError("Name too long")
218214

219-
self._advertising_data_packet = packet
215+
def add_uuids(self, uuids, field_type_16_bit_uuids, field_type_128_bit_uuids):
216+
"""Add 16-bit and 128-bit uuids to the packet, using the given field types."""
217+
concatenated_16_bit_uuids = b''.join(
218+
struct.pack("<H", uuid.uuid16) for uuid in uuids if uuid.size == 16)
219+
if concatenated_16_bit_uuids:
220+
self._packet.add_field(field_type_16_bit_uuids, concatenated_16_bit_uuids)
221+
222+
uuids_128_bits = [uuid for uuid in uuids if uuid.size == 128]
223+
if len(uuids_128_bits) > 1:
224+
raise ValueError("Only one 128 bit UUID will fit")
225+
if uuids_128_bits:
226+
self._packet.add_field(field_type_128_bit_uuids, uuids_128_bits[0].uuid128)
220227

221228
@property
222229
def advertising_data_bytes(self):
223230
"""The raw bytes for the initial advertising data packet."""
224-
return self._advertising_data_packet.packet_bytes
231+
return self._packet.packet_bytes
225232

226233
@property
227234
def scan_response_bytes(self):
228235
"""The raw bytes for the scan response packet. None if there is no response packet."""
229236
if self._scan_response_packet:
230237
return self._scan_response_packet.packet_bytes
231238
return None
239+
240+
241+
class ServerAdvertisement(Advertisement):
242+
"""Build an advertisement for a peripheral's services.
243+
244+
There is room in the packet for only one 128-bit UUID. Giving UUIDs in the scan response
245+
is not yet implemented.
246+
247+
:param Peripheral peripheral: the Peripheral to advertise. Use its services and name.
248+
:param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm
249+
"""
250+
251+
def __init__(self, peripheral, *, tx_power=0):
252+
super().__init__()
253+
uuids = [service.uuid for service in peripheral.services if not service.secondary]
254+
self.add_uuids(uuids,
255+
AdvertisingPacket.ALL_16_BIT_SERVICE_UUIDS,
256+
AdvertisingPacket.ALL_128_BIT_SERVICE_UUIDS)
257+
self.add_name(peripheral.name)
258+
259+
260+
class SolicitationAdvertisement(Advertisement):
261+
"""Build an advertisement for a peripheral to solicit one or more services from a central.
262+
263+
There is room in the packet for only one 128-bit UUID. Giving UUIDs in the scan response
264+
is not yet implemented.
265+
266+
:param string name: Name to use in advertisement.
267+
:param iterable service_uuids: One or more services requested from a central
268+
:param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm.
269+
"""
270+
271+
def __init__(self, name, service_uuids, *, tx_power=0):
272+
super().__init__()
273+
self.add_uuids(service_uuids,
274+
AdvertisingPacket.SOLICITED_16_BIT_SERVICE_UUIDS,
275+
AdvertisingPacket.SOLICITED_128_BIT_SERVICE_UUIDS)
276+
self.add_name(name)

adafruit_ble/current_time_client.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# The MIT License (MIT)
2+
#
3+
# Copyright (c) 2019 Dan Halbert for Adafruit Industries
4+
#
5+
# Permission is hereby granted, free of charge, to any person obtaining a copy
6+
# of this software and associated documentation files (the "Software"), to deal
7+
# in the Software without restriction, including without limitation the rights
8+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
# copies of the Software, and to permit persons to whom the Software is
10+
# furnished to do so, subject to the following conditions:
11+
#
12+
# The above copyright notice and this permission notice shall be included in
13+
# all copies or substantial portions of the Software.
14+
#
15+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
# THE SOFTWARE.
22+
"""
23+
`adafruit_ble.current_time_client`
24+
====================================================
25+
26+
Connect to a Current Time Service, as a peripheral.
27+
28+
* Author(s): Dan Halbert for Adafruit Industries
29+
30+
"""
31+
import struct
32+
import time
33+
34+
from bleio import Peripheral, UUID
35+
from .advertising import SolicitationAdvertisement
36+
37+
class CurrentTimeClient:
38+
"""
39+
Set up a peripheral that solicits centrals for Current Time Service.
40+
41+
:param str name: Name to advertise for server. If None, use default Advertisement name.
42+
43+
Example::
44+
45+
from adafruit_ble.current_time_client import CurrentTimeClient
46+
import time
47+
48+
cts_client = CurrentTimeClient()
49+
cts_client.start_advertising()
50+
while not cts_client.connected:
51+
pass
52+
# The first time a property is read, the client
53+
# will do discovery and pairing.
54+
while True:
55+
print(cts_client.current_time)
56+
time.sleep(5)
57+
58+
To try the example above, open Settings->Bluetooth on your iOS device.
59+
After the program starts advertising, ``CIRCUITPYxxxx` will show up as a Bluetooth
60+
device for possible connection. Tap it, and then accept the pairing request.
61+
Then the time should print.
62+
"""
63+
64+
CTS_UUID = UUID(0x1805)
65+
CURRENT_TIME_UUID = UUID(0x2A2B)
66+
LOCAL_TIME_INFORMATION_UUID = UUID(0x2A0F)
67+
68+
def __init__(self, name=None, tx_power=0):
69+
self._periph = Peripheral(name=name)
70+
self._advertisement = SolicitationAdvertisement(self._periph.name,
71+
(self.CTS_UUID,), tx_power=tx_power)
72+
self._current_time_char = self._local_time_char = None
73+
74+
75+
def start_advertising(self):
76+
"""Start advertising to solicit a central that supports Current Time Service."""
77+
self._periph.start_advertising(self._advertisement.advertising_data_bytes,
78+
scan_response=self._advertisement.scan_response_bytes)
79+
80+
def stop_advertising(self):
81+
"""Stop advertising the service."""
82+
self._periph.stop_advertising()
83+
84+
@property
85+
def connected(self):
86+
"""True if a central connected to this peripheral."""
87+
return self._periph.connected
88+
89+
def disconnect(self):
90+
"""Disconnect from central."""
91+
self._periph.disconnect()
92+
93+
def _check_connected(self):
94+
if not self.connected:
95+
raise OSError("Not connected")
96+
# Do discovery and pairing if not already done.
97+
if not self._current_time_char:
98+
self._discover()
99+
self._periph.pair()
100+
101+
def _discover(self):
102+
"""Discover service information."""
103+
services = self._periph.discover_remote_services((self.CTS_UUID,))
104+
if not services:
105+
raise OSError("Unable to discover service")
106+
for characteristic in services[0].characteristics:
107+
if characteristic.uuid == self.CURRENT_TIME_UUID:
108+
self._current_time_char = characteristic
109+
elif characteristic.uuid == self.LOCAL_TIME_INFORMATION_UUID:
110+
self._local_time_char = characteristic
111+
if not self._current_time_char or not self._local_time_char:
112+
raise OSError("Remote service missing needed characteristic")
113+
114+
@property
115+
def current_time(self):
116+
"""Get a tuple describing the current time from the server:
117+
(year, month, day, hour, minute, second, weekday, subsecond, adjust_reason)
118+
"""
119+
self._check_connected()
120+
if self._current_time_char:
121+
# year, month, day, hour, minute, second, weekday, subsecond, adjust_reason
122+
values = struct.unpack('<HBBBBBBBB', self._current_time_char.value)
123+
return values
124+
else:
125+
raise OSError("Characteristic not discovered")
126+
127+
128+
@property
129+
def local_time_information(self):
130+
"""Get a tuple of location information from the server:
131+
(timezone, dst_offset)
132+
"""
133+
self._check_connected()
134+
if self._local_time_char:
135+
# timezone, dst_offset
136+
values = struct.unpack('<bB', self._local_time_char.value)
137+
return values
138+
else:
139+
raise OSError("Characteristic not discovered")
140+
141+
@property
142+
def struct_time(self):
143+
"""Return the current time as a `time.struct_time` Day of year and whether DST is in effect
144+
is not available from Current Time Service, so these are set to -1.
145+
"""
146+
_, month, day, hour, minute, second, weekday, _, _ = self.current_time
147+
# Bluetooth weekdays count from 1. struct_time counts from 0.
148+
return time.struct_time((month, day, hour, minute, second, weekday - 1, -1))

0 commit comments

Comments
 (0)