31
31
32
32
import struct
33
33
34
- class AdvertisingData :
35
- """Build up a BLE advertising data packet."""
34
+ class AdvertisingPacket :
35
+ """Build up a BLE advertising data or scan response packet."""
36
36
# BR/EDR flags not included here, since we don't support BR/EDR.
37
37
FLAG_LIMITED_DISCOVERY = 0x01
38
38
"""Discoverable only for a limited time period."""
@@ -53,7 +53,7 @@ class AdvertisingData:
53
53
"""Complete list of 128 bit service UUIDs."""
54
54
SHORT_LOCAL_NAME = 0x08
55
55
"""Short local device name (shortened to fit)."""
56
- COMPLETE_LOCALNAME = 0x09
56
+ COMPLETE_LOCAL_NAME = 0x09
57
57
"""Complete local device name."""
58
58
TX_POWER = 0x0A
59
59
"""Transmit power level"""
@@ -81,32 +81,151 @@ class AdvertisingData:
81
81
MAX_DATA_SIZE = 31
82
82
"""Data size in a regular BLE packet."""
83
83
84
- def __init__ (self , flags = (FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY ), max_length = MAX_DATA_SIZE ):
85
- """Initalize an advertising packet, with the given flags, no larger than max_length."""
86
- self .data = bytearray ((2 , self .FLAGS , flags ))
84
+ def __init__ (self , data = None , * , max_length = MAX_DATA_SIZE ):
85
+ """Create an advertising packet, no larger than max_length.
86
+
87
+ :param buf data: if not supplied (None), create an empty packet
88
+ if supplied, create a packet with supplied data. This is usually used
89
+ to parse an existing packet.
90
+ :param int max_length: maximum length of packet
91
+ """
92
+ self ._packet_bytes = bytearray (data ) if data else bytearray ()
87
93
self ._max_length = max_length
88
94
self ._check_length ()
89
95
96
+ @property
97
+ def packet_bytes (self ):
98
+ """The raw packet bytes."""
99
+ return self ._packet_bytes
100
+
101
+ @packet_bytes .setter
102
+ def packet_bytes (self , value ):
103
+ self ._packet_bytes = value
104
+
105
+ def __getitem__ (self , element_type ):
106
+ """Return the bytes stored in the advertising packet for the given element type.
107
+
108
+ :param int element_type: An integer designating an advertising element type.
109
+ A number of types are defined in `AdvertisingPacket`,
110
+ such as `AdvertisingPacket.TX_POWER`.
111
+ :returns: bytes that are the value for the given element type.
112
+ If the element type is not present in the packet, raise KeyError.
113
+ """
114
+ i = 0
115
+ adv_bytes = self .packet_bytes
116
+ while i < len (adv_bytes ):
117
+ item_length = adv_bytes [i ]
118
+ if element_type != adv_bytes [i + 1 ]:
119
+ # Type doesn't match: skip to next item.
120
+ i += item_length + 1
121
+ else :
122
+ return adv_bytes [i + 2 :i + 1 + item_length ]
123
+ raise KeyError
124
+
125
+ def get (self , element_type , default = None ):
126
+ """Return the bytes stored in the advertising packet for the given element type,
127
+ returning the default value if not found.
128
+ """
129
+ try :
130
+ return self .__getitem__ (element_type )
131
+ except KeyError :
132
+ return default
133
+
134
+ @property
135
+ def bytes_remaining (self ):
136
+ """Number of bytes still available for use in the packet."""
137
+ return self ._max_length - len (self ._packet_bytes )
138
+
90
139
def _check_length (self ):
91
- if len (self .data ) > self ._max_length :
92
- raise IndexError ("Advertising data exceeds max_length " )
140
+ if len (self ._packet_bytes ) > self ._max_length :
141
+ raise IndexError ("Advertising data too long " )
93
142
94
143
def add_field (self , field_type , field_data ):
95
144
"""Append an advertising data field to the current packet, of the given type.
96
145
The length field is calculated from the length of field_data."""
97
- self .data .append (1 + len (field_data ))
98
- self .data .append (field_type )
99
- self .data .extend (field_data )
146
+ self ._packet_bytes .append (1 + len (field_data ))
147
+ self ._packet_bytes .append (field_type )
148
+ self ._packet_bytes .extend (field_data )
100
149
self ._check_length ()
101
150
151
+ def add_flags (self , flags = (FLAG_GENERAL_DISCOVERY | FLAG_LE_ONLY )):
152
+ """Add default or custom advertising flags."""
153
+ self .add_field (self .FLAGS , struct .pack ("<B" , flags ))
154
+
102
155
def add_16_bit_uuids (self , uuids ):
103
156
"""Add a complete list of 16 bit service UUIDs."""
104
- self .add_field (self .ALL_16_BIT_SERVICE_UUIDS , bytes (uuid .uuid16 for uuid in uuids ))
157
+ for uuid in uuids :
158
+ self .add_field (self .ALL_16_BIT_SERVICE_UUIDS , struct .pack ("<H" , uuid .uuid16 ))
105
159
106
160
def add_128_bit_uuids (self , uuids ):
107
161
"""Add a complete list of 128 bit service UUIDs."""
108
- self .add_field (self .ALL_128_BIT_SERVICE_UUIDS , bytes (uuid .uuid128 for uuid in uuids ))
162
+ for uuid in uuids :
163
+ self .add_field (self .ALL_128_BIT_SERVICE_UUIDS , uuid .uuid128 )
109
164
110
165
def add_mfr_specific_data (self , mfr_id , data ):
111
166
"""Add manufacturer-specific data bytes."""
112
167
self .add_field (self .MANUFACTURER_SPECIFIC_DATA , struct .pack ('<H' , mfr_id ) + data )
168
+
169
+
170
+ class ServerAdvertisement :
171
+ """
172
+ Data to advertise a peripheral's services.
173
+
174
+ The advertisement consists of an advertising data packet and an optional scan response packet,
175
+ The scan response packet is created only if there is not room in the
176
+ advertising data packet for the complete peripheral name.
177
+
178
+ :param peripheral Peripheral the Peripheral to advertise. Use its services and name
179
+ :param int tx_power: transmit power in dBm at 0 meters (8 bit signed value). Default 0 dBm
180
+ """
181
+
182
+ def __init__ (self , peripheral , * , tx_power = 0 ):
183
+ self ._peripheral = peripheral
184
+
185
+ packet = AdvertisingPacket ()
186
+ packet .add_flags ()
187
+ self ._scan_response_packet = None
188
+
189
+ # Need to check service.secondary
190
+ uuids_16_bits = [service .uuid for service in peripheral .services
191
+ if service .uuid .size == 16 and not service .secondary ]
192
+ if uuids_16_bits :
193
+ packet .add_16_bit_uuids (uuids_16_bits )
194
+
195
+ uuids_128_bits = [service .uuid for service in peripheral .services
196
+ if service .uuid .size == 128 and not service .secondary ]
197
+ if uuids_128_bits :
198
+ packet .add_128_bit_uuids (uuids_128_bits )
199
+
200
+ packet .add_field (AdvertisingPacket .TX_POWER , struct .pack ("<b" , tx_power ))
201
+
202
+ # 2 bytes needed for field length and type.
203
+ bytes_available = packet .bytes_remaining - 2
204
+ if bytes_available <= 0 :
205
+ raise IndexError ("No room for name" )
206
+
207
+ name_bytes = bytes (peripheral .name , 'utf-8' )
208
+ if bytes_available >= len (name_bytes ):
209
+ packet .add_field (AdvertisingPacket .COMPLETE_LOCAL_NAME , name_bytes )
210
+ else :
211
+ packet .add_field (AdvertisingPacket .SHORT_LOCAL_NAME , name_bytes [:bytes_available ])
212
+ self ._scan_response_packet = AdvertisingPacket ()
213
+ try :
214
+ self ._scan_response_packet .add_field (AdvertisingPacket .COMPLETE_LOCAL_NAME ,
215
+ name_bytes )
216
+ except IndexError :
217
+ raise IndexError ("Name too long" )
218
+
219
+ self ._advertising_data_packet = packet
220
+
221
+ @property
222
+ def advertising_data_bytes (self ):
223
+ """The raw bytes for the initial advertising data packet."""
224
+ return self ._advertising_data_packet .packet_bytes
225
+
226
+ @property
227
+ def scan_response_bytes (self ):
228
+ """The raw bytes for the scan response packet. None if there is no response packet."""
229
+ if self ._scan_response_packet :
230
+ return self ._scan_response_packet .packet_bytes
231
+ return None
0 commit comments