1
+ # SPDX-FileCopyrightText: 2024 J Fletcher
2
+ #
3
+ # SPDX-License-Identifier: MIT
4
+
1
5
# SIMPLE ASYNCIO EVENT EXAMPLE
2
6
3
7
# Brief program that illustrates using Events to coordinate tasks
33
37
# Define the various colors according to preference and set them into
34
38
# a dictionary for later retrieval. (Blue is not used in this code.)
35
39
40
+
36
41
class Color :
37
42
# pylint: disable=too-few-public-methods
38
43
def __init__ (self , initial_value ):
39
44
self .value = initial_value
40
45
46
+
41
47
# Create a class to hold and track the color while code executes.
42
48
49
+
43
50
async def blink (color ):
44
51
with neopixel .NeoPixel (board .NEOPIXEL , 1 , brightness = 0.1 ) as led :
45
52
while True :
@@ -48,34 +55,38 @@ async def blink(color):
48
55
led [0 ] = COLORS .get (color .value )
49
56
await asyncio .sleep (0 )
50
57
58
+
51
59
# Instantiate the led using 'with ... as' construction to keep this
52
60
# function from blocking. 'COLORS.get(0)' indicates the led should show
53
61
# no color (i.e., turn off), while 'COLORS.get(color.value)' instructs
54
62
# the led to show the color pulled from the dictionary via the color
55
63
# class' color.value. The line 'asyncio.sleep(1)' sets the blink rate;
56
64
# in this case, once per second.
57
65
66
+
58
67
async def input_poll (swapper ):
59
68
count = 0
60
69
while True :
61
70
button .update ()
62
71
if button .fell :
63
- print (' Press!' )
72
+ print (" Press!" )
64
73
if count == 0 :
65
74
count += 1
66
- print (' Event is set!' )
75
+ print (" Event is set!" )
67
76
swapper .set ()
68
77
elif count == 1 :
69
78
count -= 1
70
- print (' Event is clear!' )
79
+ print (" Event is clear!" )
71
80
swapper .clear ()
72
81
await asyncio .sleep (0 )
73
82
83
+
74
84
# This function checks the button for activity and sets or clears the
75
85
# Event depending on the button activity reflected in the 'count' variable.
76
86
# The count begins set at 0 and is alternatingly incremented (count += 1)
77
87
# and decremented (count -= 1) with each press of the button.
78
88
89
+
79
90
async def state (swapper , color ):
80
91
while True :
81
92
if swapper .is_set ():
@@ -84,25 +95,26 @@ async def state(swapper, color):
84
95
color .value = 1
85
96
await asyncio .sleep (0 )
86
97
87
- async def main ():
88
98
99
+ async def main ():
89
100
color = Color (1 )
90
101
COLORS .get (color )
91
102
92
- # Sets the color the led will first show on start
103
+ # Sets the color the led will first show on start
93
104
94
105
swapper = asyncio .Event ()
95
106
96
- # Creates and names the Event that signals the led to change color
107
+ # Creates and names the Event that signals the led to change color
97
108
98
109
blinky = asyncio .create_task (blink (color ))
99
110
poll = asyncio .create_task (input_poll (swapper ))
100
111
monitor = asyncio .create_task (state (swapper , color ))
101
112
102
- # Creates and names Tasks from the functions defined above
113
+ # Creates and names Tasks from the functions defined above
103
114
104
115
await asyncio .gather (monitor , blinky , poll )
105
116
117
+
106
118
# Don't forget the 'await'! The 'asyncio.gather()' command passes the
107
119
# listed tasks to the asynchronous scheduler, where processing resources
108
120
# are directed from one task to another depending upon whether said task
0 commit comments