23
23
24
24
"""
25
25
26
+ try :
27
+ from typing import Optional , List
28
+ from typing_extensions import Literal
29
+ from circuitpython_typing import WriteableBuffer , ReadableBuffer
30
+ from microcontroller import Pin
31
+ except ImportError :
32
+ pass
33
+
26
34
# imports
27
35
from time import monotonic
28
36
from digitalio import DigitalInOut
37
45
class _BitBangIO :
38
46
"""Base class for subclassing only"""
39
47
40
- def __init__ (self ):
48
+ def __init__ (self ) -> None :
41
49
self ._locked = False
42
50
43
- def try_lock (self ):
51
+ def try_lock (self ) -> bool :
44
52
"""Attempt to grab the lock. Return True on success, False if the lock is already taken."""
45
53
if self ._locked :
46
54
return False
47
55
self ._locked = True
48
56
return True
49
57
50
- def unlock (self ):
58
+ def unlock (self ) -> None :
51
59
"""Release the lock so others may use the resource."""
52
60
if self ._locked :
53
61
self ._locked = False
54
62
else :
55
63
raise ValueError ("Not locked" )
56
64
57
- def _check_lock (self ):
65
+ def _check_lock (self ) -> Literal [ True ] :
58
66
if not self ._locked :
59
67
raise RuntimeError ("First call try_lock()" )
60
68
return True
61
69
62
70
def __enter__ (self ):
63
71
return self
64
72
65
- def __exit__ (self , exc_type , exc_value , traceback ):
73
+ def __exit__ (self , exc_type , exc_value , traceback ) -> None :
66
74
self .deinit ()
67
75
68
76
# pylint: disable=no-self-use
69
- def deinit (self ):
77
+ def deinit (self ) -> None :
70
78
"""Free any hardware used by the object."""
71
79
return
72
80
@@ -76,7 +84,9 @@ def deinit(self):
76
84
class I2C (_BitBangIO ):
77
85
"""Software-based implementation of the I2C protocol over GPIO pins."""
78
86
79
- def __init__ (self , scl , sda , * , frequency = 400000 , timeout = 1 ):
87
+ def __init__ (
88
+ self , scl : Pin , sda : Pin , * , frequency : int = 400000 , timeout : float = 1
89
+ ) -> None :
80
90
"""Initialize bitbang (or software) based I2C. Must provide the I2C
81
91
clock, and data pin numbers.
82
92
"""
@@ -95,17 +105,17 @@ def __init__(self, scl, sda, *, frequency=400000, timeout=1):
95
105
self ._delay = (1 / frequency ) / 2 # half period
96
106
self ._timeout = timeout
97
107
98
- def deinit (self ):
108
+ def deinit (self ) -> None :
99
109
"""Free any hardware used by the object."""
100
110
self ._sda .deinit ()
101
111
self ._scl .deinit ()
102
112
103
- def _wait (self ):
113
+ def _wait (self ) -> None :
104
114
end = monotonic () + self ._delay # half period
105
115
while end > monotonic ():
106
116
pass
107
117
108
- def scan (self ):
118
+ def scan (self ) -> List [ int ] :
109
119
"""Perform an I2C Device Scan"""
110
120
found = []
111
121
if self ._check_lock ():
@@ -114,14 +124,28 @@ def scan(self):
114
124
found .append (address )
115
125
return found
116
126
117
- def writeto (self , address , buffer , * , start = 0 , end = None ):
127
+ def writeto (
128
+ self ,
129
+ address : int ,
130
+ buffer : ReadableBuffer ,
131
+ * ,
132
+ start : int = 0 ,
133
+ end : Optional [int ] = None ,
134
+ ) -> None :
118
135
"""Write data from the buffer to an address"""
119
136
if end is None :
120
137
end = len (buffer )
121
138
if self ._check_lock ():
122
139
self ._write (address , buffer [start :end ], True )
123
140
124
- def readfrom_into (self , address , buffer , * , start = 0 , end = None ):
141
+ def readfrom_into (
142
+ self ,
143
+ address : int ,
144
+ buffer : WriteableBuffer ,
145
+ * ,
146
+ start : int = 0 ,
147
+ end : Optional [int ] = None ,
148
+ ) -> None :
125
149
"""Read data from an address and into the buffer"""
126
150
if end is None :
127
151
end = len (buffer )
@@ -133,15 +157,15 @@ def readfrom_into(self, address, buffer, *, start=0, end=None):
133
157
134
158
def writeto_then_readfrom (
135
159
self ,
136
- address ,
137
- buffer_out ,
138
- buffer_in ,
160
+ address : int ,
161
+ buffer_out : ReadableBuffer ,
162
+ buffer_in : WriteableBuffer ,
139
163
* ,
140
- out_start = 0 ,
141
- out_end = None ,
142
- in_start = 0 ,
143
- in_end = None
144
- ):
164
+ out_start : int = 0 ,
165
+ out_end : Optional [ int ] = None ,
166
+ in_start : int = 0 ,
167
+ in_end : Optional [ int ] = None ,
168
+ ) -> None :
145
169
"""Write data from buffer_out to an address and then
146
170
read data from an address and into buffer_in
147
171
"""
@@ -153,30 +177,30 @@ def writeto_then_readfrom(
153
177
self ._write (address , buffer_out [out_start :out_end ], False )
154
178
self .readfrom_into (address , buffer_in , start = in_start , end = in_end )
155
179
156
- def _scl_low (self ):
180
+ def _scl_low (self ) -> None :
157
181
self ._scl .switch_to_output (value = False )
158
182
159
- def _sda_low (self ):
183
+ def _sda_low (self ) -> None :
160
184
self ._sda .switch_to_output (value = False )
161
185
162
- def _scl_release (self ):
186
+ def _scl_release (self ) -> None :
163
187
"""Release and let the pullups lift"""
164
188
# Use self._timeout to add clock stretching
165
189
self ._scl .switch_to_input ()
166
190
167
- def _sda_release (self ):
191
+ def _sda_release (self ) -> None :
168
192
"""Release and let the pullups lift"""
169
193
# Use self._timeout to add clock stretching
170
194
self ._sda .switch_to_input ()
171
195
172
- def _start (self ):
196
+ def _start (self ) -> None :
173
197
self ._sda_release ()
174
198
self ._scl_release ()
175
199
self ._wait ()
176
200
self ._sda_low ()
177
201
self ._wait ()
178
202
179
- def _stop (self ):
203
+ def _stop (self ) -> None :
180
204
self ._scl_low ()
181
205
self ._wait ()
182
206
self ._sda_low ()
@@ -186,7 +210,7 @@ def _stop(self):
186
210
self ._sda_release ()
187
211
self ._wait ()
188
212
189
- def _repeated_start (self ):
213
+ def _repeated_start (self ) -> None :
190
214
self ._scl_low ()
191
215
self ._wait ()
192
216
self ._sda_release ()
@@ -196,7 +220,7 @@ def _repeated_start(self):
196
220
self ._sda_low ()
197
221
self ._wait ()
198
222
199
- def _write_byte (self , byte ) :
223
+ def _write_byte (self , byte : int ) -> bool :
200
224
for bit_position in range (8 ):
201
225
self ._scl_low ()
202
226
@@ -222,7 +246,7 @@ def _write_byte(self, byte):
222
246
223
247
return not ack
224
248
225
- def _read_byte (self , ack = False ):
249
+ def _read_byte (self , ack : bool = False ) -> int :
226
250
self ._scl_low ()
227
251
self ._wait ()
228
252
# sda will already be an input as we are simulating open drain
@@ -246,25 +270,27 @@ def _read_byte(self, ack=False):
246
270
247
271
return data & 0xFF
248
272
249
- def _probe (self , address ) :
273
+ def _probe (self , address : int ) -> bool :
250
274
self ._start ()
251
275
ok = self ._write_byte (address << 1 )
252
276
self ._stop ()
253
277
return ok > 0
254
278
255
- def _write (self , address , buffer , transmit_stop ) :
279
+ def _write (self , address : int , buffer : ReadableBuffer , transmit_stop : bool ) -> None :
256
280
self ._start ()
257
281
if not self ._write_byte (address << 1 ):
258
- raise RuntimeError ("Device not responding at 0x{:02X}" .format (address ))
282
+ # raise RuntimeError("Device not responding at 0x{:02X}".format(address))
283
+ raise RuntimeError (f"Device not responding at 0x{ address :02X} " )
259
284
for byte in buffer :
260
285
self ._write_byte (byte )
261
286
if transmit_stop :
262
287
self ._stop ()
263
288
264
- def _read (self , address , length ) :
289
+ def _read (self , address : int , length : int ) -> bytearray :
265
290
self ._start ()
266
291
if not self ._write_byte (address << 1 | 1 ):
267
- raise RuntimeError ("Device not responding at 0x{:02X}" .format (address ))
292
+ # raise RuntimeError("Device not responding at 0x{:02X}".format(address))
293
+ raise RuntimeError (f"Device not responding at 0x{ address :02X} " )
268
294
buffer = bytearray (length )
269
295
for byte_position in range (length ):
270
296
buffer [byte_position ] = self ._read_byte (ack = (byte_position != length - 1 ))
@@ -275,7 +301,9 @@ def _read(self, address, length):
275
301
class SPI (_BitBangIO ):
276
302
"""Software-based implementation of the SPI protocol over GPIO pins."""
277
303
278
- def __init__ (self , clock , MOSI = None , MISO = None ):
304
+ def __init__ (
305
+ self , clock : Pin , MOSI : Optional [Pin ] = None , MISO : Optional [Pin ] = None
306
+ ) -> None :
279
307
"""Initialize bit bang (or software) based SPI. Must provide the SPI
280
308
clock, and optionally MOSI and MISO pin numbers. If MOSI is set to None
281
309
then writes will be disabled and fail with an error, likewise for MISO
@@ -304,15 +332,22 @@ def __init__(self, clock, MOSI=None, MISO=None):
304
332
self ._miso = DigitalInOut (MISO )
305
333
self ._miso .switch_to_input ()
306
334
307
- def deinit (self ):
335
+ def deinit (self ) -> None :
308
336
"""Free any hardware used by the object."""
309
337
self ._sclk .deinit ()
310
338
if self ._miso :
311
339
self ._miso .deinit ()
312
340
if self ._mosi :
313
341
self ._mosi .deinit ()
314
342
315
- def configure (self , * , baudrate = 100000 , polarity = 0 , phase = 0 , bits = 8 ):
343
+ def configure (
344
+ self ,
345
+ * ,
346
+ baudrate : int = 100000 ,
347
+ polarity : Literal [0 , 1 ] = 0 ,
348
+ phase : Literal [0 , 1 ] = 0 ,
349
+ bits : int = 8 ,
350
+ ) -> None :
316
351
"""Configures the SPI bus. Only valid when locked."""
317
352
if self ._check_lock ():
318
353
if not isinstance (baudrate , int ):
@@ -331,13 +366,15 @@ def configure(self, *, baudrate=100000, polarity=0, phase=0, bits=8):
331
366
self ._bits = bits
332
367
self ._half_period = (1 / self ._baudrate ) / 2 # 50% Duty Cyle delay
333
368
334
- def _wait (self , start = None ):
369
+ def _wait (self , start : Optional [ int ] = None ) -> float :
335
370
"""Wait for up to one half cycle"""
336
371
while (start + self ._half_period ) > monotonic ():
337
372
pass
338
373
return monotonic () # Return current time
339
374
340
- def write (self , buffer , start = 0 , end = None ):
375
+ def write (
376
+ self , buffer : ReadableBuffer , start : int = 0 , end : Optional [int ] = None
377
+ ) -> None :
341
378
"""Write the data contained in buf. Requires the SPI being locked.
342
379
If the buffer is empty, nothing happens.
343
380
"""
@@ -369,7 +406,13 @@ def write(self, buffer, start=0, end=None):
369
406
self ._sclk .value = self ._polarity
370
407
371
408
# pylint: disable=too-many-branches
372
- def readinto (self , buffer , start = 0 , end = None , write_value = 0 ):
409
+ def readinto (
410
+ self ,
411
+ buffer : WriteableBuffer ,
412
+ start : int = 0 ,
413
+ end : Optional [int ] = None ,
414
+ write_value : int = 0 ,
415
+ ) -> None :
373
416
"""Read into the buffer specified by buf while writing zeroes. Requires the SPI being
374
417
locked. If the number of bytes to read is 0, nothing happens.
375
418
"""
@@ -417,14 +460,14 @@ def readinto(self, buffer, start=0, end=None, write_value=0):
417
460
418
461
def write_readinto (
419
462
self ,
420
- buffer_out ,
421
- buffer_in ,
463
+ buffer_out : ReadableBuffer ,
464
+ buffer_in : WriteableBuffer ,
422
465
* ,
423
- out_start = 0 ,
424
- out_end = None ,
425
- in_start = 0 ,
426
- in_end = None
427
- ):
466
+ out_start : int = 0 ,
467
+ out_end : Optional [ int ] = None ,
468
+ in_start : int = 0 ,
469
+ in_end : Optional [ int ] = None ,
470
+ ) -> None :
428
471
"""Write out the data in buffer_out while simultaneously reading data into buffer_in.
429
472
The lengths of the slices defined by buffer_out[out_start:out_end] and
430
473
buffer_in[in_start:in_end] must be equal. If buffer slice lengths are
@@ -482,6 +525,6 @@ def write_readinto(
482
525
# pylint: enable=too-many-branches
483
526
484
527
@property
485
- def frequency (self ):
528
+ def frequency (self ) -> int :
486
529
"""Return the currently configured baud rate"""
487
530
return self ._baudrate
0 commit comments