|
| 1 | +# SPDX-FileCopyrightText: 2024 J Fletcher |
| 2 | +# |
| 3 | +# SPDX-License-Identifier: MIT |
| 4 | + |
| 5 | +# SIMPLE ASYNCIO EVENT EXAMPLE |
| 6 | + |
| 7 | +# Brief program that illustrates using Events to coordinate tasks |
| 8 | +# within Asyncio programs. The present example involves only one led |
| 9 | +# and one event, but Asyncio allows a high degree of scaling. Adding |
| 10 | +# several copies of the functions 'blink', 'input_poll', or 'state' |
| 11 | +# should be straightforward with changes to names and objects. |
| 12 | + |
| 13 | +import asyncio |
| 14 | +import board |
| 15 | +import digitalio |
| 16 | +from adafruit_debouncer import Debouncer |
| 17 | +import neopixel |
| 18 | + |
| 19 | + |
| 20 | +# Import library modules, as is tradition |
| 21 | + |
| 22 | +pin = digitalio.DigitalInOut(board.BUTTON) |
| 23 | +pin.direction = digitalio.Direction.INPUT |
| 24 | +pin.pull = digitalio.Pull.UP |
| 25 | +button = Debouncer(pin) |
| 26 | + |
| 27 | +# Instantiate the input, in this case, the 'BOOT' button on a |
| 28 | +# QT Py 2040. The debouncer ensures a clean hit. |
| 29 | + |
| 30 | +BLANK = (0, 0, 0, 0) |
| 31 | +RED = (255, 0, 0) |
| 32 | +GREEN = (0, 255, 0) |
| 33 | +BLUE = (0, 0, 255) |
| 34 | + |
| 35 | +COLORS = {0: BLANK, 1: RED, 2: GREEN, 3: BLUE} |
| 36 | + |
| 37 | +# Define the various colors according to preference and set them into |
| 38 | +# a dictionary for later retrieval. (Blue is not used in this code.) |
| 39 | + |
| 40 | + |
| 41 | +class Color: |
| 42 | + # pylint: disable=too-few-public-methods |
| 43 | + def __init__(self, initial_value): |
| 44 | + self.value = initial_value |
| 45 | + |
| 46 | + |
| 47 | +# Create a class to hold and track the color while code executes. |
| 48 | + |
| 49 | + |
| 50 | +async def blink(color): |
| 51 | + with neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.1) as led: |
| 52 | + while True: |
| 53 | + led[0] = COLORS.get(0) |
| 54 | + await asyncio.sleep(1) |
| 55 | + led[0] = COLORS.get(color.value) |
| 56 | + await asyncio.sleep(0) |
| 57 | + |
| 58 | + |
| 59 | +# Instantiate the led using 'with ... as' construction to keep this |
| 60 | +# function from blocking. 'COLORS.get(0)' indicates the led should show |
| 61 | +# no color (i.e., turn off), while 'COLORS.get(color.value)' instructs |
| 62 | +# the led to show the color pulled from the dictionary via the color |
| 63 | +# class' color.value. The line 'asyncio.sleep(1)' sets the blink rate; |
| 64 | +# in this case, once per second. |
| 65 | + |
| 66 | + |
| 67 | +async def input_poll(swapper): |
| 68 | + count = 0 |
| 69 | + while True: |
| 70 | + button.update() |
| 71 | + if button.fell: |
| 72 | + print("Press!") |
| 73 | + if count == 0: |
| 74 | + count += 1 |
| 75 | + print("Event is set!") |
| 76 | + swapper.set() |
| 77 | + elif count == 1: |
| 78 | + count -= 1 |
| 79 | + print("Event is clear!") |
| 80 | + swapper.clear() |
| 81 | + await asyncio.sleep(0) |
| 82 | + |
| 83 | + |
| 84 | +# This function checks the button for activity and sets or clears the |
| 85 | +# Event depending on the button activity reflected in the 'count' variable. |
| 86 | +# The count begins set at 0 and is alternatingly incremented (count += 1) |
| 87 | +# and decremented (count -= 1) with each press of the button. |
| 88 | + |
| 89 | + |
| 90 | +async def state(swapper, color): |
| 91 | + while True: |
| 92 | + if swapper.is_set(): |
| 93 | + color.value = 2 |
| 94 | + else: |
| 95 | + color.value = 1 |
| 96 | + await asyncio.sleep(0) |
| 97 | + |
| 98 | + |
| 99 | +async def main(): |
| 100 | + color = Color(1) |
| 101 | + COLORS.get(color) |
| 102 | + |
| 103 | + # Sets the color the led will first show on start |
| 104 | + |
| 105 | + swapper = asyncio.Event() |
| 106 | + |
| 107 | + # Creates and names the Event that signals the led to change color |
| 108 | + |
| 109 | + blinky = asyncio.create_task(blink(color)) |
| 110 | + poll = asyncio.create_task(input_poll(swapper)) |
| 111 | + monitor = asyncio.create_task(state(swapper, color)) |
| 112 | + |
| 113 | + # Creates and names Tasks from the functions defined above |
| 114 | + |
| 115 | + await asyncio.gather(monitor, blinky, poll) |
| 116 | + |
| 117 | + |
| 118 | +# Don't forget the 'await'! The 'asyncio.gather()' command passes the |
| 119 | +# listed tasks to the asynchronous scheduler, where processing resources |
| 120 | +# are directed from one task to another depending upon whether said task |
| 121 | +# has signalled its ability to 'give up' control by reaching the 'await |
| 122 | +# asyncio.sleep()' line. |
| 123 | + |
| 124 | +asyncio.run(main()) |
0 commit comments