diff --git a/CLUE_Rock_Paper_Scissors/advanced/clue-multi-rpsgame.py b/CLUE_Rock_Paper_Scissors/advanced/clue-multi-rpsgame.py new file mode 100644 index 000000000..8ece18d97 --- /dev/null +++ b/CLUE_Rock_Paper_Scissors/advanced/clue-multi-rpsgame.py @@ -0,0 +1,567 @@ +# clue-multi-rpsgame v1.20 +# CircuitPython massively multiplayer rock paper scissors game over Bluetooth LE + +# Tested with CLUE and Circuit Playground Bluefruit Alpha with TFT Gizmo +# using CircuitPython and 5.3.0 + +# copy this file to CLUE/CPB board as code.py + +# MIT License + +# Copyright (c) 2020 Kevin J. Walters + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + + +import gc +import os +import random + +from micropython import const +import board +import digitalio + +import neopixel +from adafruit_ble import BLERadio + +# These imports works on CLUE, CPB (and CPX on 5.x) +try: + from audioio import AudioOut +except ImportError: + from audiopwmio import PWMAudioOut as AudioOut + +# RPS module files +from rps_advertisements import JoinGameAdvertisement, \ + RpsEncDataAdvertisement, \ + RpsKeyDataAdvertisement, \ + RpsRoundEndAdvertisement +from rps_audio import SampleJukebox +from rps_comms import broadcastAndReceive, addrToText, MIN_AD_INTERVAL +from rps_crypto import bytesPad, strUnpad, generateOTPadKey, \ + enlargeKey, encrypt, decrypt +from rps_display import RPSDisplay, blankScreen + + +# Look for our name in secrets.py file if present +ble_name = None +try: + from secrets import secrets + ble_name = secrets.get("rps_name") + if ble_name is None: + ble_name = secrets.get("ble_name") + if ble_name is None: + print("INFO: No rps_name or ble_name entry found in secrets dict") +except ImportError: + pass # File is optional, reaching here is not a program error + + +debug = 1 + +def d_print(level, *args, **kwargs): + """A simple conditional print for debugging based on global debug level.""" + if not isinstance(level, int): + print(level, *args, **kwargs) + elif debug >= level: + print(*args, **kwargs) + + +def tftGizmoPresent(): + """Determine if the TFT Gizmo is attached. + The TFT's Gizmo circuitry for backlight features a 10k pull-down resistor. + This attempts to verify the presence of the pull-down to determine + if TFT Gizmo is present. + This is likely to get confused if anything else is connected to pad A3. + Only use this on Circuit Playground Express (CPX) + or Circuit Playground Bluefruit (CPB) boards.""" + present = True + try: + with digitalio.DigitalInOut(board.A3) as backlight_pin: + backlight_pin.pull = digitalio.Pull.UP + present = not backlight_pin.value + except ValueError: + # The Gizmo is already initialised, i.e. showing console output + pass + + return present + + +# Assuming CLUE if it's not a Circuit Playround (Bluefruit) +clue_less = "Circuit Playground" in os.uname().machine + +# Note: difference in pull-up and pull-down +# and logical not use for buttons +if clue_less: + # CPB with TFT Gizmo (240x240) + # from adafruit_circuitplayground import cp # Avoiding to save memory + + # Outputs + if tftGizmoPresent(): + from adafruit_gizmo import tft_gizmo + display = tft_gizmo.TFT_Gizmo() + JG_RX_COL = 0x0000ff + BUTTON_Y_POS = 120 + else: + display = None + JG_RX_COL = 0x000030 # dimmer blue for upward facing CPB NeoPixels + BUTTON_Y_POS = None + + audio_out = AudioOut(board.SPEAKER) + #pixels = cp.pixels + pixels = neopixel.NeoPixel(board.NEOPIXEL, 10) + + # Enable the onboard amplifier for speaker + #cp._speaker_enable.value = True # pylint: disable=protected-access + speaker_enable = digitalio.DigitalInOut(board.SPEAKER_ENABLE) + speaker_enable.switch_to_output(value=False) + speaker_enable.value = True + + # Inputs + # buttons reversed if it is used upside-down with Gizmo + _button_a = digitalio.DigitalInOut(board.BUTTON_A) + _button_a.switch_to_input(pull=digitalio.Pull.DOWN) + _button_b = digitalio.DigitalInOut(board.BUTTON_B) + _button_b.switch_to_input(pull=digitalio.Pull.DOWN) + if display is None: + def button_left(): + return _button_a.value + def button_right(): + return _button_b.value + else: + def button_left(): + return _button_b.value + def button_right(): + return _button_a.value + +else: + # CLUE with builtin screen (240x240) + # from adafruit_clue import clue # Avoiding to save memory + + # Outputs + display = board.DISPLAY + audio_out = AudioOut(board.SPEAKER) + #pixels = clue.pixel + pixels = neopixel.NeoPixel(board.NEOPIXEL, 1) + JG_RX_COL = 0x0000ff + BUTTON_Y_POS = 152 + + # Inputs + _button_a = digitalio.DigitalInOut(board.BUTTON_A) + _button_a.switch_to_input(pull=digitalio.Pull.UP) + _button_b = digitalio.DigitalInOut(board.BUTTON_B) + _button_b.switch_to_input(pull=digitalio.Pull.UP) + def button_left(): + return not _button_a.value + def button_right(): + return not _button_b.value + + +blankScreen(display, pixels) + +# Set to True for blue flashing when devices are joining the playing group +JG_FLASH = False + +IMAGE_DIR = "rps/images" +AUDIO_DIR = "rps/audio" + +audio_files = (("searching", "welcome-to", "arena", "ready") + + ("rock", "paper", "scissors") + + ("start-tx", "end-tx", "txing") + + ("rock-scissors", "paper-rock", "scissors-paper") + + ("you-win", "draw", "you-lose", "error") + + ("humiliation", "excellent")) + +gc.collect() +d_print(2, "GC before SJ", gc.mem_free()) +sample = SampleJukebox(audio_out, audio_files, + directory=AUDIO_DIR) +del audio_files # not needed anymore +gc.collect() +d_print(2, "GC after SJ", gc.mem_free()) + +# A lookup table in Dict form for win/lose, each value is a sample name +# Does not need to cater for draw (tie) condition +WAV_VICTORY_NAME = { "rp": "paper-rock", + "pr": "paper-rock", + "ps": "scissors-paper", + "sp": "scissors-paper", + "sr": "rock-scissors", + "rs": "rock-scissors"} + +# This limit is based on displaying names on screen with scale=2 font +MAX_PLAYERS = 8 +# Some code is dependent on these being lower-case +CHOICES = ("rock", "paper", "scissors") + +rps_display = RPSDisplay(display, pixels, + CHOICES, sample, WAV_VICTORY_NAME, + MAX_PLAYERS, BUTTON_Y_POS, + IMAGE_DIR + "/rps-sprites-ind4.bmp", + ble_color=JG_RX_COL) + +# Transmit maximum times in seconds +JG_MSG_TIME_S = 20 +FIRST_MSG_TIME_S = 12 +STD_MSG_TIME_S = 4 +LAST_ACK_TIME_S = 1.5 + + +# Intro screen with audio +rps_display.introductionScreen() + +# Enable the Bluetooth LE radio and set player's name (from secrets.py) +ble = BLERadio() +if ble_name is not None: + ble.name = ble_name + + +game_no = 1 +round_no = 1 +wins = losses = draws = voids = 0 + +# TOTAL_ROUNDS = 5 +TOTAL_ROUNDS = 3 + +CRYPTO_ALGO = "chacha20" +KEY_SIZE = 8 # in bytes +KEY_ENLARGE = 256 // KEY_SIZE // 8 + +# Scoring values +POINTS_WIN = 2 +POINTS_DRAW = 1 + +WIN = const(1) +DRAW = const(2) # AKA tie +LOSE = const(3) +INVALID = const(4) + +def evaluateRound(mine, yours): + """Determine who won the round in this game based on the two strings mine and yours. + Returns WIN, DRAW, LOSE or INVALID for bad input.""" + # Return INVALID if any input is None + try: + mine_lc = mine.lower() + yours_lc = yours.lower() + except AttributeError: + return INVALID + + if mine_lc not in CHOICES or yours_lc not in CHOICES: + return INVALID + + # Both inputs are valid choices if we got this far + # pylint: disable=too-many-boolean-expressions + if mine_lc == yours_lc: + return DRAW + elif (mine_lc == "rock" and yours_lc == "scissors" + or mine_lc == "paper" and yours_lc == "rock" + or mine_lc == "scissors" and yours_lc == "paper"): + return WIN + + return LOSE + + +rps_display.playerListScreen() + +def addPlayer(name, addr_text, address, ad): + # pylint: disable=unused-argument + # address is part of call back + """Add the player name and mac address to players global variable + and the name and rssi (if present) to on-screen list.""" + + rssi = ad.rssi if ad else None + + players.append((name, addr_text)) + rps_display.addPlayer(name, rssi=rssi) + + +# Make a list of all the player's (name, mac address as text) +# where both are strings with this player as first entry +players = [] +my_name = ble.name +rps_display.fadeUpDown("down") +addPlayer(my_name, addrToText(ble.address_bytes), None, None) + + +# These two functions mainly serve to adapt the call back arguments +# to the called functions which do not use them +def jgAdCallbackFlashBLE(_a, _b, _c): + """Used in broadcastAndReceive to flash the NeoPixels + when advertising messages are received.""" + return rps_display.flashBLE() + +def jgEndscanCallback(_a, _b, _c): + """Used in broadcastAndReceive to allow early termination of the scanning + when the left button is pressed. + Button may need to be held down for a second.""" + return button_left() + +# Join Game +gc.collect() +d_print(2, "GC before JG", gc.mem_free()) + +sample.play("searching", loop=True) +rps_display.fadeUpDown("up") +jg_msg = JoinGameAdvertisement(game="RPS") +(_, _, _) = broadcastAndReceive(ble, + jg_msg, + scan_time=JG_MSG_TIME_S, + scan_response_request=True, + ad_cb=(jgAdCallbackFlashBLE + if JG_FLASH + else None), + endscan_cb=jgEndscanCallback, + name_cb=addPlayer) +del _ # To clean-up with GC below +sample.stop() +gc.collect() +d_print(2, "GC after JG", gc.mem_free()) + +# Wait for button release - this stops a long press +# being acted upon in the main loop further down +while button_left(): + pass + +scores = [0] * len(players) +num_other_players = len(players) - 1 + +# Set the advertising interval to the minimum for four or fewer players +# and above that extend value by players multiplied by 7ms +ad_interval = MIN_AD_INTERVAL if len(players) <= 4 else len(players) * 0.007 + +d_print(1, "PLAYERS", players) + +# Sequence numbers - real packets start range between 1-255 inclusive +seq_tx = [1] # The next number to send + +new_round_init = True + +# A nonce by definition must not be reused but here a random key is +# generated per round and this is used once per round so this is ok +static_nonce = bytes(range(12, 0, -1)) + +while True: + if round_no > TOTAL_ROUNDS: + print("Summary: ", + "wins {:d}, losses {:d}," + " draws {:d}, void {:d}\n\n".format(wins, losses, draws, voids)) + + rps_display.showGameResult(players, scores, rounds_tot=TOTAL_ROUNDS) + + # Reset variables for another game + round_no = 1 + wins = losses = draws = voids = 0 + scores = [0] * len(players) + game_no += 1 + + if new_round_init: + rps_display.showGameRound(game_no=game_no, round_no=round_no, rounds_tot=TOTAL_ROUNDS) + # Make a new initial random choice for the player and show it + my_choice_idx = random.randrange(len(CHOICES)) + rps_display.fadeUpDown("down") + rps_display.showChoice(my_choice_idx, + game_no=game_no, round_no=round_no, rounds_tot=TOTAL_ROUNDS, + won_sf=wins, drew_sf=draws, lost_sf=losses) + rps_display.fadeUpDown("up") + new_round_init = False + + if button_left(): + while button_left(): # Wait for button release + pass + my_choice_idx = (my_choice_idx + 1) % len(CHOICES) + rps_display.showChoice(my_choice_idx, + game_no=game_no, round_no=round_no, rounds_tot=TOTAL_ROUNDS, + won_sf=wins, drew_sf=draws, lost_sf=losses) + + if button_right(): + gc.collect() + d_print(2, "GC before comms", gc.mem_free()) + + # This sound cue is really for other players + sample.play("ready") + + my_choice = CHOICES[my_choice_idx] + player_choices = [my_choice] + + # Repeating key four times to make key for ChaCha20 + short_key = generateOTPadKey(KEY_SIZE) + key = enlargeKey(short_key, KEY_ENLARGE) + d_print(3, "KEY", key) + + plain_bytes = bytesPad(my_choice, size=8, pad=0) + cipher_bytes = encrypt(plain_bytes, key, CRYPTO_ALGO, + nonce=static_nonce) + enc_data_msg = RpsEncDataAdvertisement(enc_data=cipher_bytes, + round_no=round_no) + + # Wait for ready sound sample to stop playing + sample.wait() + sample.play("start-tx") + sample.wait() + sample.play("txing", loop=True) + # Players will not be synchronised at this point as they do not + # have to make their choices simultaneously - much longer 12 second + # time to accomodate this + _, enc_data_by_addr, _ = broadcastAndReceive(ble, + enc_data_msg, + RpsEncDataAdvertisement, + RpsKeyDataAdvertisement, + scan_time=FIRST_MSG_TIME_S, + ad_interval=ad_interval, + receive_n=num_other_players, + seq_tx=seq_tx) + + key_data_msg = RpsKeyDataAdvertisement(key_data=short_key, round_no=round_no) + # All of the programs will be loosely synchronised now + _, key_data_by_addr, _ = broadcastAndReceive(ble, + key_data_msg, + RpsEncDataAdvertisement, + RpsKeyDataAdvertisement, + RpsRoundEndAdvertisement, + scan_time=STD_MSG_TIME_S, + ad_interval=ad_interval, + receive_n=num_other_players, + seq_tx=seq_tx, + ads_by_addr=enc_data_by_addr) + del enc_data_by_addr + + # Play end transmit sound while doing next decrypt bit + sample.play("end-tx") + + re_msg = RpsRoundEndAdvertisement(round_no=round_no) + # The round end message is really about acknowledging receipt of + # the key_data_msg by sending a non-critical message with the ack + _, re_by_addr, _ = broadcastAndReceive(ble, + re_msg, + RpsEncDataAdvertisement, + RpsKeyDataAdvertisement, + RpsRoundEndAdvertisement, + scan_time=LAST_ACK_TIME_S, + ad_interval=ad_interval, + receive_n=num_other_players, + seq_tx=seq_tx, + ads_by_addr=key_data_by_addr) + del key_data_by_addr, _ # To allow GC + + # This will have accumulated all the messages for this round + allmsg_by_addr = re_by_addr + del re_by_addr + + # Decrypt results + # If any data is incorrect the opponent_choice is left as None + for p_idx1 in range(1, len(players)): + print("DECRYPT GC", p_idx1, gc.mem_free()) + opponent_name = players[p_idx1][0] + opponent_macaddr = players[p_idx1][1] + opponent_choice = None + opponent_msgs = allmsg_by_addr.get(opponent_macaddr) + if opponent_msgs is None: + opponent_msgs = [] + cipher_ad = cipher_bytes = cipher_round = None + key_ad = key_bytes = key_round = None + # There should be either one or two messges per type + # two occurs when there + for msg_idx in range(len(opponent_msgs)): + if (cipher_ad is None + and isinstance(opponent_msgs[msg_idx][0], + RpsEncDataAdvertisement)): + cipher_ad = opponent_msgs[msg_idx][0] + cipher_bytes = cipher_ad.enc_data + cipher_round = cipher_ad.round_no + elif (key_ad is None + and isinstance(opponent_msgs[msg_idx][0], + RpsKeyDataAdvertisement)): + key_ad = opponent_msgs[msg_idx][0] + key_bytes = key_ad.key_data + key_round = key_ad.round_no + + if cipher_ad and key_ad: + if round_no == cipher_round == key_round: + key = enlargeKey(key_bytes, KEY_ENLARGE) + plain_bytes = decrypt(cipher_bytes, key, CRYPTO_ALGO, + nonce=static_nonce) + opponent_choice = strUnpad(plain_bytes) + else: + print("Received wrong round for {:d} {:d}: {:d} {:d}", + opponent_name, round_no, cipher_round, key_round) + else: + print("Missing packets: RpsEncDataAdvertisement " + "and RpsKeyDataAdvertisement:", cipher_ad, key_ad) + player_choices.append(opponent_choice) + + # Free up some memory by deleting any data that's no longer needed + del allmsg_by_addr + gc.collect() + d_print(2, "GC after comms", gc.mem_free()) + + sample.wait() # Ensure end-tx has completed + + # Chalk up wins and losses - checks this player but also has to + # check other players against each other to calculate all the + # scores for the high score table at the end of game + for p_idx0, (p0_name, _) in enumerate(players[:len(players) - 1]): + for p_idx1, (p1_name, _) in enumerate(players[p_idx0 + 1:], p_idx0 + 1): + # evaluateRound takes text strings for RPS + result = evaluateRound(player_choices[p_idx0], + player_choices[p_idx1]) + + # this_player is used to control incrementing the summary + # for the tally for this local player + this_player = 0 + void = False + if p_idx0 == 0: + this_player = 1 + p0_ch_idx = None + p1_ch_idx = None + try: + p0_ch_idx = CHOICES.index(player_choices[p_idx0]) + p1_ch_idx = CHOICES.index(player_choices[p_idx1]) + except ValueError: + void = True # Ensure this is marked void + print("ERROR", "failed to decode", + player_choices[p_idx0], player_choices[p_idx1]) + + # showPlayerVPlayer takes int index values for RPS + rps_display.showPlayerVPlayer(p0_name, p1_name, p_idx1, + p0_ch_idx, p1_ch_idx, + result == WIN, + result == DRAW, + result == INVALID or void) + + if result == INVALID or void: + voids += this_player + elif result == DRAW: + draws += this_player + scores[p_idx0] += POINTS_DRAW + scores[p_idx1] += POINTS_DRAW + elif result == WIN: + wins += this_player + scores[p_idx0] += POINTS_WIN + else: + losses += this_player + scores[p_idx1] += POINTS_WIN + + d_print(2, + p0_name, player_choices[p_idx0], "vs", + p1_name, player_choices[p_idx1], + "result", result) + + print("Game {:d}, round {:d}, wins {:d}, losses {:d}, draws {:d}, " + "void {:d}".format(game_no, round_no, wins, losses, draws, voids)) + + round_no += 1 + new_round_init = True diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/arena.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/arena.wav new file mode 100644 index 000000000..7be9d6b31 Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/arena.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/draw.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/draw.wav new file mode 100644 index 000000000..b3c6a66cd Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/draw.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/end-tx.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/end-tx.wav new file mode 100644 index 000000000..5e4c66055 Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/end-tx.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/error.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/error.wav new file mode 100644 index 000000000..6912c2421 Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/error.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/excellent.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/excellent.wav new file mode 100644 index 000000000..d0bcd18dc Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/excellent.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/humiliation.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/humiliation.wav new file mode 100644 index 000000000..117c2a21f Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/humiliation.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/paper-rock.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/paper-rock.wav new file mode 100644 index 000000000..77344139a Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/paper-rock.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/paper.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/paper.wav new file mode 100644 index 000000000..cf18a8e71 Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/paper.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/ready.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/ready.wav new file mode 100644 index 000000000..0518a6ac5 Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/ready.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/rock-scissors.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/rock-scissors.wav new file mode 100644 index 000000000..10fe99dbb Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/rock-scissors.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/rock.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/rock.wav new file mode 100644 index 000000000..b2c7a6dce Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/rock.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/scissors-paper.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/scissors-paper.wav new file mode 100644 index 000000000..408543a0c Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/scissors-paper.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/scissors.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/scissors.wav new file mode 100644 index 000000000..b33605bf8 Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/scissors.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/searching.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/searching.wav new file mode 100644 index 000000000..ed73f22ab Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/searching.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/start-tx.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/start-tx.wav new file mode 100644 index 000000000..ca2b7682b Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/start-tx.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/txing.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/txing.wav new file mode 100644 index 000000000..6ea1fd987 Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/txing.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/welcome-to.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/welcome-to.wav new file mode 100644 index 000000000..4ea00d8b6 Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/welcome-to.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/you-lose.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/you-lose.wav new file mode 100644 index 000000000..f3e44da3e Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/you-lose.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/audio/you-win.wav b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/you-win.wav new file mode 100644 index 000000000..0fe1b6b61 Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/audio/you-win.wav differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps/images/rps-sprites-ind4.bmp b/CLUE_Rock_Paper_Scissors/advanced/rps/images/rps-sprites-ind4.bmp new file mode 100644 index 000000000..96643bf1b Binary files /dev/null and b/CLUE_Rock_Paper_Scissors/advanced/rps/images/rps-sprites-ind4.bmp differ diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps_advertisements.py b/CLUE_Rock_Paper_Scissors/advanced/rps_advertisements.py new file mode 100644 index 000000000..c80c2afb0 --- /dev/null +++ b/CLUE_Rock_Paper_Scissors/advanced/rps_advertisements.py @@ -0,0 +1,240 @@ +# MIT License + +# Copyright (c) 2020 Kevin J. Walters + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import struct + +from adafruit_ble.advertising import Advertisement, LazyObjectField +from adafruit_ble.advertising.standard import ManufacturerData, ManufacturerDataField + +# These message should really include version numbers for the +# the protocol and a descriptor for the encryption type + +# From adafruit_ble.advertising +# 0xFF is "Manufacturer Specific Data" as per list of types in +# https://www.bluetooth.com/specifications/assigned-numbers/generic-access-profile/ +MANUFACTURING_DATA_ADT = 0xFF +ADAFRUIT_COMPANY_ID = 0x0822 + +# pylint: disable=line-too-long +# From https://github.com/adafruit/Adafruit_CircuitPython_BLE_BroadcastNet/blob/c6328d5c7edf8a99ff719c3b1798cb4111bab397/adafruit_ble_broadcastnet.py#L84-L85 +ADAFRUIT_SEQ_ID = 0x0003 + +# According to https://github.com/adafruit/Adafruit_CircuitPython_BLE/blob/master/adafruit_ble/advertising/adafruit.py +# 0xf000 (to 0xffff) is for range for Adafruit customers + +# These four are used as part of prefix matching +RPS_ENC_DATA_ID = 0xfe41 +RPS_KEY_DATA_ID = 0xfe42 +RPS_ROUND_ID = 0xfe43 +GM_JOIN_ID = 0xfe44 + +RPS_ACK_ID = 0xfe51 + +# Data formats for shared fields +_DATA_FMT_ROUND = "B" +_DATA_FMT_ACK = "B" +_SEQ_FMT = "B" + + +class RpsEncDataAdvertisement(Advertisement): + """An RPS (broadcast) message. + This sends the encrypted choice of the player. + This is not connectable and does not elicit a scan response + based on defaults in Advertisement parent class. + """ + flags = None + + _PREFIX_FMT = "= level: + print(*args, **kwargs) + + +def addrToText(mac_addr, big_endian=False, sep=""): + """Convert a mac_addr in bytes to text.""" + # Note use of reversed() - macs are returned in an unusual LSB order + # pylint: disable=superfluous-parens + return sep.join(["{:02x}".format(b) + for b in (mac_addr if big_endian else reversed(mac_addr))]) + + +def maxAck(acklist): + """Return the highest ack number from a contiguous run. + Returns 0 for an empty list.""" + + if len(acklist) == 0: + return 0 + elif len(acklist) == 1: + return acklist[0] + + ordered_acklist = sorted(acklist) + max_ack_sofar = ordered_acklist[0] + for ack in ordered_acklist[1:]: + if ack - max_ack_sofar > 1: + break + max_ack_sofar = ack + return max_ack_sofar + + +def startScan(radio, send_ad, send_advertising, + sequence_number, receive_n, + ss_rx_ad_classes, rx_ad_classes, + scan_time, ad_interval, + buffer_size, minimum_rssi, + match_locally, scan_response_request, + enable_ack, awaiting_allrx, awaiting_allacks, + ad_cb, name_cb, endscan_cb, + received_ads_by_addr, blenames_by_addr, + send_ad_rxs, acks): + # pylint: disable=too-many-locals,too-many-branches,too-many-statements + """Send an Advertisement send_ad and then wait for up to scan_time to + receive receive_n Advertisement packets from other devices. + If receive_n is 0 then wait for the remaining scan_time. + The callbacks can only be called when packets are received + so endscan_db has limited functionality. + This is called repeatedly by broadcastAndReceive. + """ + complete = False + + if send_advertising: + try: + radio.start_advertising(send_ad, interval=ad_interval) + except _bleio.BluetoothError: + pass # catch and ignore "Already advertising." + + # Timeout value is in seconds + # RSSI -100 is probably practical minimum, -128 would be 8bit signed min + # window and interval are 0.1 by default - same value means + # continuous scanning although actual BLE implementations do have a + # brief gaps in scanning + # The 1800 byte buffer_size is a quirky workaround for + # MemoryError: memory allocation failed, allocating 1784 bytes + # from CP's symbol table growing as the program executes + cls_send_ad = type(send_ad) + matching_ads = 0 + for adv_ss in radio.start_scan(*ss_rx_ad_classes, + minimum_rssi=minimum_rssi, + buffer_size=buffer_size, # default is 512, was 1536 + active=scan_response_request, + timeout=scan_time): + addr_text = addrToText(adv_ss.address.address_bytes) + + # Add name of the device to dict limiting + # this to devices of interest by checking received_ads_by_addr + # plus pass data to any callback function + if (addr_text not in blenames_by_addr + and addr_text in received_ads_by_addr): + name = adv_ss.complete_name # None indicates no value + if name: # This test ignores any empty strings too + blenames_by_addr[addr_text] = name + if name_cb is not None: + name_cb(name, addr_text, adv_ss.address, adv_ss) + + # If using application Advertisement type matching then + # check the Advertisement's prefix and continue for loop if it + # does not match + adv = None + d_print(5, "RXed RTA", match_locally, addr_text, repr(adv_ss)) + if match_locally: + adv_ss_as_bytes = adafruit_ble.advertising.standard.encode_data(adv_ss.data_dict) + for cls in rx_ad_classes: + prefix = cls.prefix + # This DOES NOT IMPLEMENT PROPER MATCHING + # proper matching would involve parsing prefix and then matching each + # resulting prefix against each dict entry from decode_data() + # starting at 1 skips over the message length value + if adv_ss_as_bytes[1:len(prefix)] == prefix[1:]: + adv = cls() + # Only populating fields in use + adv.data_dict = adafruit_ble.advertising.standard.decode_data(adv_ss_as_bytes) + adv.address = adv_ss.address + d_print(4, "RXed mm RTA", addr_text, adv) + break + + else: + if any(isinstance(adv_ss, cls) for cls in rx_ad_classes): + adv = adv_ss + + # Continue loop after an endscan callback if ad is not of interest + if adv is None: # this means adv was not in rx_ad_classes + if endscan_cb is not None and endscan_cb(addr_text, adv_ss.address, adv_ss): + complete = True + break + else: + continue + + # Must be a match if this is reached + matching_ads += 1 + if ad_cb is not None: + ad_cb(addr_text, adv.address, adv) + + # Look for an ack and record it in acks if not already there + if hasattr(adv, "ack") and isinstance(adv.ack, int): + d_print(4, "Found ack") + if addr_text not in acks: + acks[addr_text] = [adv.ack] + elif adv.ack not in acks[addr_text]: + acks[addr_text].append(adv.ack) + + if addr_text in received_ads_by_addr: + this_ad_b = bytes(adv) + for existing_ad in received_ads_by_addr[addr_text]: + if this_ad_b == existing_ad[1]: + break # already present + else: # Python's unusual for/break/else + received_ads_by_addr[addr_text].append((adv, bytes(adv))) + if isinstance(adv, cls_send_ad): + send_ad_rxs[addr_text] = True + else: + received_ads_by_addr[addr_text] = [(adv, bytes(adv))] + if isinstance(adv, cls_send_ad): + send_ad_rxs[addr_text] = True + + d_print(5, "send_ad_rxs", len(send_ad_rxs), "ack", len(acks)) + + if awaiting_allrx: + if receive_n > 0 and len(send_ad_rxs) == receive_n: + if enable_ack and sequence_number is not None: + awaiting_allrx = False + awaiting_allacks = True + if send_advertising: + radio.stop_advertising() + d_print(4, "old ack", send_ad.ack, "new ack", sequence_number) + send_ad.ack = sequence_number + if send_advertising: + radio.start_advertising(send_ad, interval=ad_interval) + d_print(3, "TXing with ack", send_ad, + "ack_count", len(acks)) + else: + complete = True + break # packets received but not sending ack nor waiting for acks + elif awaiting_allacks: + if len(acks) == receive_n: + ack_count = 0 + for addr_text, acks_for_addr in acks.items(): + if maxAck(acks_for_addr) >= sequence_number: + ack_count += 1 + if ack_count == receive_n: + complete = True + break # all acks received, can stop transmitting now + + if endscan_cb is not None: + if endscan_cb(addr_text, adv_ss.address, adv_ss): + complete = True + break + + return (complete, matching_ads, awaiting_allrx, awaiting_allacks) + + +def broadcastAndReceive(radio, + send_ad, + *receive_ads_types, + scan_time=DEF_SEND_TIME_S, + ad_interval=MIN_AD_INTERVAL, + buffer_size=1800, + minimum_rssi=-90, + receive_n=0, + seq_tx=None, + match_locally=False, + scan_response_request=False, + ad_cb=None, + ads_by_addr={}, + names_by_addr={}, + name_cb=None, + endscan_cb=None + ): + # pylint: disable=too-many-locals,too-many-branches,too-many-statements + """Send an Advertisement send_ad and then wait for up to scan_time to + receive receive_n Advertisement packets from other devices. + If receive_n is 0 then wait for the remaining scan_time. + Returns list of received Advertisements not necessarily in arrival order and + dictionary indexed by the compressed text representation of the address with a list + of tuples of (advertisement, bytes(advertisement)). + This MODIFIES send_ad by setting sequence_number and ack if those + properties are present. + This is likely to run for a fraction of second longer than scan_time. + The default scan_response_request of False should reduce traffic and + may reduce collisions. + The buffer_size of 1800 helps to prevent 1784 MemoryError + from dict enlargement including the interpreter's symbol table. + """ + + sequence_number = None + if seq_tx is not None and hasattr(send_ad, "sequence_number"): + sequence_number = seq_tx[0] + send_ad.sequence_number = sequence_number + seq_tx[0] += 1 + + # A dict to store unique Advertisement indexed by mac address + # as text string + cls_send_ad = type(send_ad) + received_ads_by_addr = dict(ads_by_addr) # Will not be a deep copy + if receive_ads_types: + rx_ad_classes = receive_ads_types + else: + rx_ad_classes = (cls_send_ad,) + + if match_locally: + ss_rx_ad_classes = (Advertisement,) + elif scan_response_request: + ss_rx_ad_classes = rx_ad_classes + (Advertisement,) + else: + ss_rx_ad_classes = rx_ad_classes + + blenames_by_addr = dict(names_by_addr) # Will not be a deep copy + + # Look for packets already received of the cls_send_ad class (type) + send_ad_rxs = {} + # And make a list of sequence numbers already acknowledged + acks = {} + for addr_text, adsnb_per_addr in received_ads_by_addr.items(): + if cls_send_ad in [type(andb[0]) for andb in adsnb_per_addr]: + send_ad_rxs[addr_text] = True + + # Pick out any Advertisements with an ack field with a value + acks_thisaddr = [adnb for adnb in adsnb_per_addr + if hasattr(adnb[0], "ack") + and isinstance(adnb[0].ack, int)] + + if acks_thisaddr: + seqs = [adnb[0].ack for adnb in acks_thisaddr] + acks[addr_text] = seqs + d_print(5, "Acks received for", addr_text, + "of", seqs, "in", acks_thisaddr) + + # Determine whether there is a second phase of sending acks + enable_ack = hasattr(send_ad, "ack") + # Set an initial ack for anything previously received + if enable_ack and acks: + send_ad.ack = max(max(li) for li in acks.values()) + awaiting_allacks = False + awaiting_allrx = True + + d_print(2, "TXing", send_ad, "interval", ad_interval) + matched_ads = 0 + complete = False + d_print(1, "Listening for", ss_rx_ad_classes) + start_ns = time.monotonic_ns() + target_end_ns = start_ns + round(scan_time * NS_IN_S) + advertising_duration = 0.0 + + scan_no = 0 + while not complete and time.monotonic_ns() < target_end_ns: + if endscan_cb is not None and endscan_cb(None, None, None): + break + scan_no += 1 + a_rand = random.random() + # Decide on whether to transmit Advertisement packets + # or not - this is a workaround for reception problems + if scan_no > 1 and a_rand < 0.4: + send_advertising = False + duration = 0.5 + 2.5 * a_rand # 50-150ms + else: + send_advertising = True + duration = 0.9 # 900ms + advertising_duration += duration + + # Lots of arguments passed here, would be safer as keyword args + (complete, ss_matched, + awaiting_allrx, + awaiting_allacks) = startScan(radio, send_ad, send_advertising, + sequence_number, receive_n, + ss_rx_ad_classes, rx_ad_classes, + duration, ad_interval, + buffer_size, minimum_rssi, + match_locally, scan_response_request, + enable_ack, awaiting_allrx, awaiting_allacks, + ad_cb, name_cb, endscan_cb, + received_ads_by_addr, blenames_by_addr, + send_ad_rxs, acks) + matched_ads += ss_matched + + if advertising_duration > 0.0: + radio.stop_advertising() + radio.stop_scan() + d_print(2, "Matched ads", matched_ads, "with scans", scan_no) + + end_send_ns = time.monotonic_ns() + d_print(4, "TXRX time", (end_send_ns - start_ns) / 1e9) + + # Make a single list of all the received adverts from the dict + received_ads = [] + for ads in received_ads_by_addr.values(): + # Pick out the first value, second value is just bytes() version + received_ads.extend([a[0] for a in ads]) + return (received_ads, received_ads_by_addr, blenames_by_addr) diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps_crypto.py b/CLUE_Rock_Paper_Scissors/advanced/rps_crypto.py new file mode 100644 index 000000000..383dcb5b5 --- /dev/null +++ b/CLUE_Rock_Paper_Scissors/advanced/rps_crypto.py @@ -0,0 +1,98 @@ +# MIT License + +# Copyright (c) 2020 Kevin J. Walters + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + + +import os +import random + +from rps_crypto_chacha import ChaCha + + +def bytesPad(text, size=8, pad=0): + """Convert a string to bytes and add pad value if necessary to make the length up to size. + """ + text_as_bytes = text.encode("utf-8") + if len(text_as_bytes) >= size: + return text_as_bytes + else: + return text_as_bytes + bytes([pad] * (size - len(text_as_bytes))) + + +def strUnpad(text_as_bytes, pad=0): + """Convert a bytes or bytearray + to a str removing trailing bytes matching int pad.""" + text_b = bytes(text_as_bytes) # bytes type has the useful methods + if pad is not None: + text_b = text_b.rstrip(bytes([pad])) + + return text_b.decode("utf-8") + + +def enlargeKey(small_key, mult): + """Enlarge a key using a primtive, probably risky replication algorithm!""" + return small_key * mult + + +def generateOTPadKey(n_bytes): + """Generate a random key of n_bytes bytes returned as type bytes. + This uses the hardware TNG on boards with the feature + like the nRF52840-based CLUE and CPB. + Others use the PRNG. + """ + try: + key = os.urandom(n_bytes) + except NotImplementedError: + key = bytes([random.getrandbits(8) for _ in range(n_bytes)]) + return key + + +def encrypt(plain_text, key, algorithm, *, nonce=None, counter=None): + """Encrypt plain_text bytes with key bytes using algorithm. + Algorithm "xor" can be used for stream ciphers. + """ + + key_data = key(len(plain_text)) if callable(key) else key + + if algorithm == "xor": + return bytes([plain_text[i] ^ key_data[i] for i in range(len(plain_text))]) + elif algorithm == "chacha20": + c_counter = 0 if counter is None else counter + algo = ChaCha(key, nonce, counter=c_counter) + return algo.encrypt(plain_text) + else: + return ValueError("Algorithm not implemented") + + +def decrypt(cipher_text, key, algorithm, *, nonce=None, counter=None): + """Decrypt plain_text bytes with key bytes using algorithm. + Algorithm "xor" can be used for stream ciphers. + """ + key_data = key(len(cipher_text)) if callable(key) else key + + if algorithm == "xor": + return encrypt(cipher_text, key_data, "xor") # enc/dec are same + elif algorithm == "chacha20": + c_counter = 0 if counter is None else counter + algo = ChaCha(key, nonce, counter=c_counter) + return algo.decrypt(cipher_text) + else: + return ValueError("Algorithm not implemented") diff --git a/CLUE_Rock_Paper_Scissors/advanced/rps_crypto_chacha.py b/CLUE_Rock_Paper_Scissors/advanced/rps_crypto_chacha.py new file mode 100644 index 000000000..baa92a85e --- /dev/null +++ b/CLUE_Rock_Paper_Scissors/advanced/rps_crypto_chacha.py @@ -0,0 +1,154 @@ +# Copyright (c) 2015 Hubert Kario (code from tlslite-ng library) +# Copyright (c) 2020 Kevin J. Walters (very minor CP tweaks) + +# GNU Lesser General Public License, version 2.1 +# https://www.gnu.org/licenses/old-licenses/lgpl-2.1.en.html + + +"""Pure Python implementation of ChaCha cipher +Implementation that follows RFC 7539 closely. +""" + +import struct + +MASK32 = 0xffffffff + + +class ChaCha(): + """Pure python implementation of ChaCha cipher""" + + constants = [0x61707865, 0x3320646e, 0x79622d32, 0x6b206574] + + # pylint: disable=invalid-name + @staticmethod + def rotl32(v, c): + """Rotate left a 32 bit integer v by c bits""" + return ((v << c) & MASK32) | (v >> (32 - c)) + + @staticmethod + def quarter_round(x, a, b, c, d): + """Perform a ChaCha quarter round""" + xa = x[a] + xb = x[b] + xc = x[c] + xd = x[d] + + xa = (xa + xb) & MASK32 + xd = xd ^ xa + xd = ((xd << 16) & MASK32 | (xd >> 16)) + + xc = (xc + xd) & MASK32 + xb = xb ^ xc + xb = ((xb << 12) & MASK32 | (xb >> 20)) + + xa = (xa + xb) & MASK32 + xd = xd ^ xa + xd = ((xd << 8) & MASK32 | (xd >> 24)) + + xc = (xc + xd) & MASK32 + xb = xb ^ xc + xb = ((xb << 7) & MASK32 | (xb >> 25)) + + x[a] = xa + x[b] = xb + x[c] = xc + x[d] = xd + + _round_mixup_box = [(0, 4, 8, 12), + (1, 5, 9, 13), + (2, 6, 10, 14), + (3, 7, 11, 15), + (0, 5, 10, 15), + (1, 6, 11, 12), + (2, 7, 8, 13), + (3, 4, 9, 14)] + + @classmethod + def double_round(cls, x): + """Perform two rounds of ChaCha cipher""" + for a, b, c, d in cls._round_mixup_box: + xa = x[a] + xb = x[b] + xc = x[c] + xd = x[d] + + xa = (xa + xb) & MASK32 + xd = xd ^ xa + xd = ((xd << 16) & MASK32 | (xd >> 16)) + + xc = (xc + xd) & MASK32 + xb = xb ^ xc + xb = ((xb << 12) & MASK32 | (xb >> 20)) + + xa = (xa + xb) & MASK32 + xd = xd ^ xa + xd = ((xd << 8) & MASK32 | (xd >> 24)) + + xc = (xc + xd) & MASK32 + xb = xb ^ xc + xb = ((xb << 7) & MASK32 | (xb >> 25)) + + x[a] = xa + x[b] = xb + x[c] = xc + x[d] = xd + + @staticmethod + def chacha_block(key, counter, nonce, rounds): + """Generate a state of a single block""" + state = ChaCha.constants + key + [counter] + nonce + + working_state = state[:] + dbl_round = ChaCha.double_round + for _ in range(0, rounds // 2): + dbl_round(working_state) + + return [(st + wrkSt) & MASK32 for st, wrkSt + in zip(state, working_state)] + + @staticmethod + def word_to_bytearray(state): + """Convert state to little endian bytestream""" + return bytearray(struct.pack(' prev_score: + descending = False + prev_score = sco[idx] + + fmt = "{:" + str(max_len) + "s} {:2d}" + x_pos = (self.width - (max_len + 3) * 2 * self.font_width) // 2 + scale = 2 + spacing = 4 if len(pla) <= 6 else 0 + top_y_pos = round((self.height + - len(pla) * scale * self.font_height + - (len(pla) - 1) * spacing) / 2 + + scale * self.font_height / 2) + scores_group = Group(max_size=len(pla)) + gs_group.append(scores_group) + for idx, (name, _) in enumerate(pla): + op_dob = Label(self.font, + text=fmt.format(name, sco[idx]), + scale=2, + color=(PLAYER_NAME_COL_FG if idx == 0 else OPP_NAME_COL_FG)) + op_dob.x = x_pos + op_dob.y = top_y_pos + idx * (scale * self.font_height + spacing) + scores_group.append(op_dob) + time.sleep(0.2) + + # Sort the entries if needed + sort_scores = list(sco) # Make an independent local copy + if not descending: + empty_group = Group() # minor hack to aid swaps in scores_group + step = 3 + qm_dob = Label(self.font, + text="?", + scale=2, + color=QM_SORT_FG) + qm_dob.x = round(x_pos - 1.5 * scale * self.font_width) + gs_group.append(qm_dob) + while True: + swaps = 0 + for idx in range(0, len(sort_scores) -1): + above_score = sort_scores[idx] + above_y = scores_group[idx].y + below_y = scores_group[idx + 1].y + qm_dob.y = (above_y + below_y) // 2 + if above_score < sort_scores[idx + 1]: + qm_dob.text = "<" + qm_dob.color = QM_SORTING_FG + swaps += 1 + + # make list of steps + range_y = below_y - above_y + offsets = list(range(step, range_y + 1, step)) + # Ensure this goes to the exact final position + if offsets[-1] != range_y: + offsets.append(range_y) + + for offset in offsets: + scores_group[idx].y = above_y + offset + scores_group[idx + 1].y = below_y - offset + time.sleep(0.050) + + # swap the scores around + sort_scores[idx] = sort_scores[idx + 1] + sort_scores[idx + 1] = above_score + + # swap the graphical objects around using empty_group + # to avoid ValueError: Layer already in a group + old_above_dob = scores_group[idx] + old_below_dob = scores_group[idx + 1] + scores_group[idx + 1] = empty_group + scores_group[idx] = old_below_dob + scores_group[idx + 1] = old_above_dob + + qm_dob.text = "?" + qm_dob.color = QM_SORT_FG + time.sleep(0.2) + else: + time.sleep(0.6) + + if swaps == 0: + break # Sort complete if no values were swapped + gs_group.remove(qm_dob) + + + def showGameResultNeoPixels(self, pla, sco, rounds_tot=None): + # pylint: disable=unused-argument + """Display a high score table on NeoPixels. + Sorted into highest first order then displayed by + flashing position pixel to indicate player number and + gradually lighting up pixels in a circle using rainbow + colours for each revolution of the NeoPixels starting at orange. + """ + def selectSecondElem(seq): + """Used in a sort() method to sort my second element in a list.""" + return seq[1] + + # Sort the scores into highest first order + idx_n_score = [(s, sco[s]) for s in range(len(sco))] + idx_n_score.sort(key=selectSecondElem, reverse=True) + ##idx_n_score.sort(key=lambda s: s[1], reverse=True) + + bg_col = BLACK + for idx, score in idx_n_score: + playerIdx = self.choiceToPixIdx(idx) + for scoreRise in range(score): + scoreIdx = self.choiceToPixIdx(scoreRise) + rev = min(scoreRise // self.pix_len, len(SCORE_COLS) - 1) + self.pix[scoreIdx] = SCORE_COLS[rev] + if scoreIdx == playerIdx: + bg_col = SCORE_COLS[rev] + time.sleep(0.09) + self.pix[playerIdx] = PLAYER_COL + time.sleep(0.09) + self.pix[playerIdx] = bg_col + + for _ in range(4): + self.pix[playerIdx] = bg_col + time.sleep(0.5) + self.pix[playerIdx] = PLAYER_COL + time.sleep(0.5) + + self.pix.fill(BLACK) + + + def showGameResult(self, pla, sco, rounds_tot=None): + + if self.disp is None: + self.showGameResultNeoPixels(pla, sco, rounds_tot=rounds_tot) + else: + self.showGameResultScreen(pla, sco, rounds_tot=rounds_tot) + + # Sound samples for exceptional scores + if sco[0] == 0: + self.sample.play("humiliation") + elif sco[0] >= int((len(sco) - 1) * rounds_tot * 1.5): + self.sample.play("excellent") + + if self.disp is not None: + time.sleep(10) # Leave displayed scores visible + + self.sample.wait() + + + def winnerWav(self, mine_idx, yours_idx): + """Return the sound file to play to describe victory or None for draw.""" + + # Take the first characters + mine = self.choices[mine_idx][0] + yours = self.choices[yours_idx][0] + + return self.sample_victory.get(mine + yours) + + + def showPlayerVPlayerScreen(self, me_name, op_name, my_ch_idx, op_ch_idx, + result, summary, win, draw, void): + # pylint: disable=too-many-locals,too-many-branches,too-many-statements + """Display a win, draw, lose or error message.""" + self.fadeUpDown("down") + self.emptyGroup(self.disp_group) + + if void: + error_tot = 3 + error_group = Group(max_size=error_tot + 1) + # Opponent's name helps pinpoint the error + op_dob = Label(self.font, + text=op_name, + scale=2, + color=OPP_NAME_COL_FG) + op_dob.x = 40 + op_dob.y = self.font_height + error_group.append(op_dob) + self.showGroup(error_group) + self.fadeUpDown("up", duration=0.4) + if result is not None: + self.sample.play(result) + font_scale = 2 + # Attempting to put the three pieces of "Error!" text on screen + # synchronised with the sound sample repeating the word + for idx in range(error_tot): + error_dob = Label(self.font, + text="Error!", + scale=font_scale, + color=ERROR_COL_FG) + error_dob.x = 40 + error_dob.y = 60 + idx * 60 + error_group.append(error_dob) + time.sleep(0.5) # Small attempt to synchronise audio with text + font_scale += 1 + + else: + # Would be slightly better to create this Group once and re-use it + pvp_group = Group(max_size=3) + + # Add player's name and sprite just off left side of screen + # and opponent's just off right + player_detail = [[me_name, self.sprites[my_ch_idx], -16 - 3 * self.sprite_size, + PLAYER_NAME_COL_FG, PLAYER_NAME_COL_BG], + [op_name, self.opp_sprites[op_ch_idx], 16 + self.width, + OPP_NAME_COL_FG, OPP_NAME_COL_BG]] + idx_lr = [0, 1] # index for left and right sprite + if win: + player_detail.reverse() # this player is winner so put last + idx_lr.reverse() + + # Add some whitespace around winner's name + player_detail[1][0] = " " + player_detail[1][0] + " " + + for (name, sprite, + start_x, + fg, bg) in player_detail: + s_group = Group(scale=2, max_size=2) # Audio is choppy at scale=3 + s_group.x = start_x + s_group.y = (self.height - 2 * (self.sprite_size + self.font_height)) // 2 + + s_group.append(sprite) + p_name_dob = Label(self.font, + text=name, + scale=1, # This is scaled by the group + color=fg, + background_color=bg) + # Centre text below sprite - values are * Group scale + p_name_dob.x = (self.sprite_size - len(name) * self.font_width) // 2 + p_name_dob.y = self.sprite_size + 4 + s_group.append(p_name_dob) + + pvp_group.append(s_group) + + if draw: + sum_text = "Draw" + elif win: + sum_text = "You win" + else: + sum_text = "You lose" + # Text starts invisible (BLACK) and color is later changed + summary_dob = Label(self.font, + text=sum_text, + scale=3, + color=BLACK) + summary_dob.x = round((self.width + - 3 * self.font_width * len(sum_text)) / 2) + summary_dob.y = round(self.height - (3 * self.font_height / 2)) + pvp_group.append(summary_dob) + + self.showGroup(pvp_group) + self.fadeUpDown("up", duration=0.4) + + # Start audio half way through animations + if draw: + # Move sprites onto the screen leaving them at either side + for idx in range(16): + pvp_group[idx_lr[0]].x += 6 + pvp_group[idx_lr[1]].x -= 6 + if idx == 8 and result is not None: + self.sample.play(result) + time.sleep(0.2) + else: + # Move sprites together, winning sprite overlaps and covers loser + for idx in range(16): + pvp_group[idx_lr[0]].x += 10 + pvp_group[idx_lr[1]].x -= 10 + if idx == 8 and result is not None: + self.sample.play(result) + time.sleep(0.2) + + self.sample.wait() # Wait for first sample to finish + + if summary is not None: + self.sample.play(summary) + + # Flash colours for win, fad up to blue for rest + if not draw and win: + colours = [YELLOW_COL, ORANGE_COL, RED_COL] * 5 + else: + colours = [DRAWLOSE_COL * sc // 15 for sc in range(1, 15 + 1)] + for col in colours: + summary_dob.color = col + time.sleep(0.120) + + self.sample.wait() # Ensure second sample has completed + + + def showPlayerVPlayerNeoPixels(self, op_idx, my_ch_idx, op_ch_idx, + result, summary, win, draw, void): + # pylint: disable=too-many-locals + """This indicates the choices by putting the colours + associated with rock paper scissors on the first pixel + for the player and on subsequent pixels for opponents. + A win brightens the winning pixel as the result audio plays. + Pixel order is based on the game's definition and not + the native NeoPixel list order. + Errors are indicated by flashing all pixels but keeping the + opponent's one dark.""" + + pix_op_idx = self.choiceToPixIdx(op_idx) + if void: + if result is not None: + self.sample.play(result) + ramp_updown = (list(range(8, 128 + 1, 8)) + + list(range(128 - 8, 0 - 1, -8))) + # This fills all NeoPixels so will clear the RPS choice + for _ in range(3): + for ramp in ramp_updown: + self.pix.fill((ramp, 0, 0)) # modulate red led from RGB + self.pix[pix_op_idx] = BLACK # blackout the opponent pixel + time.sleep(0.013) # attempt to to sync with audio + + else: + if result is not None: + self.sample.play(result) + + # Clear the RPS choice and show the player and opponent choices + self.pix.fill(BLACK) + if draw: + self.pix[0] = CHOICE_COL[my_ch_idx] << 2 + self.pix[pix_op_idx] = CHOICE_COL[op_ch_idx] << 2 + time.sleep(0.25) + else: + self.pix[0] = CHOICE_COL[my_ch_idx] + self.pix[pix_op_idx] = CHOICE_COL[op_ch_idx] + # Brighten the pixel for winner + brigten_idx = 0 if win else pix_op_idx + for _ in range(5): + rr, gg, bb = self.pix[brigten_idx] + self.pix[brigten_idx] = (rr << 1, gg << 1, bb << 1) + time.sleep(0.45) + + if summary is not None: + self.sample.wait() + self.sample.play(summary) + self.sample.wait() # Ensure first or second sample have completed + time.sleep(0.5) + self.pix.fill(BLACK) + + + def showPlayerVPlayer(self, + me_name, op_name, op_idx, my_ch_idx, op_ch_idx, + win, draw, void): + if void: + result_wav = "error" + summary_wav = None + elif draw: + result_wav = None + summary_wav = "draw" + else: + result_wav = self.winnerWav(my_ch_idx, op_ch_idx) + summary_wav = "you-win" if win else "you-lose" + + if self.disp is None: + self.showPlayerVPlayerNeoPixels(op_idx, + my_ch_idx, op_ch_idx, + result_wav, summary_wav, win, draw, void) + else: + self.showPlayerVPlayerScreen(me_name, op_name, + my_ch_idx, op_ch_idx, + result_wav, summary_wav, win, draw, void) diff --git a/CLUE_Rock_Paper_Scissors/simple/clue-simple-rpsgame.py b/CLUE_Rock_Paper_Scissors/simple/clue-simple-rpsgame.py new file mode 100644 index 000000000..cce150b79 --- /dev/null +++ b/CLUE_Rock_Paper_Scissors/simple/clue-simple-rpsgame.py @@ -0,0 +1,421 @@ +# clue-simple-rpsgame v1.3 +# CircuitPython rock paper scissors game over Bluetooth LE + +# Tested with CLUE and Circuit Playground Bluefruit Alpha with TFT Gizmo +# and CircuitPython and 5.3.0 + +# copy this file to CLUE/CPB board as code.py + +# MIT License + +# Copyright (c) 2020 Kevin J. Walters + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import time +import os +import struct +import sys + +import board +from displayio import Group +import terminalio +import digitalio + +from adafruit_ble import BLERadio +from adafruit_ble.advertising import Advertisement, LazyObjectField +from adafruit_ble.advertising.standard import ManufacturerData, ManufacturerDataField + +from adafruit_display_text.label import Label + + +debug = 3 + +def d_print(level, *args, **kwargs): + """A simple conditional print for debugging based on global debug level.""" + if not isinstance(level, int): + print(level, *args, **kwargs) + elif debug >= level: + print(*args, **kwargs) + + +def tftGizmoPresent(): + """Determine if the TFT Gizmo is attached. + The TFT's Gizmo circuitry for backlight features a 10k pull-down resistor. + This attempts to verify the presence of the pull-down to determine + if TFT Gizmo is present. + Only use this on Circuit Playground Express (CPX) + or Circuit Playground Bluefruit (CPB) boards.""" + present = True + try: + with digitalio.DigitalInOut(board.A3) as backlight_pin: + backlight_pin.pull = digitalio.Pull.UP + present = not backlight_pin.value + except ValueError: + # The Gizmo is already initialised, i.e. showing console output + pass + + return present + + +# Assuming CLUE if it's not a Circuit Playround (Bluefruit) +clue_less = "Circuit Playground" in os.uname().machine + +# Note: difference in pull-up and pull-down +# and not use for buttons +if clue_less: + # CPB with TFT Gizmo (240x240) + + # Outputs + if tftGizmoPresent(): + from adafruit_gizmo import tft_gizmo + display = tft_gizmo.TFT_Gizmo() + else: + display = None + + # Inputs + # buttons reversed if it is used upside-down with Gizmo + _button_a = digitalio.DigitalInOut(board.BUTTON_A) + _button_a.switch_to_input(pull=digitalio.Pull.DOWN) + _button_b = digitalio.DigitalInOut(board.BUTTON_B) + _button_b.switch_to_input(pull=digitalio.Pull.DOWN) + if display is None: + def button_left(): + return _button_a.value + def button_right(): + return _button_b.value + else: + def button_left(): + return _button_b.value + def button_right(): + return _button_a.value + +else: + # CLUE with builtin screen (240x240) + + # Outputs + display = board.DISPLAY + + # Inputs + _button_a = digitalio.DigitalInOut(board.BUTTON_A) + _button_a.switch_to_input(pull=digitalio.Pull.UP) + _button_b = digitalio.DigitalInOut(board.BUTTON_B) + _button_b.switch_to_input(pull=digitalio.Pull.UP) + def button_left(): + return not _button_a.value + def button_right(): + return not _button_b.value + +if display is None: + print("FATAL:", "This version of program only works with a display") + sys.exit(1) + +choices = ("rock", "paper", "scissors") +my_choice_idx = 0 + +# Top y position of first choice and pixel separate between choices +top_y_pos = 60 +choice_sep = 60 + +DIM_TXT_COL_FG = 0x505050 +DEFAULT_TXT_COL_FG = 0xa0a0a0 +DEFAULT_TXT_COL_BG = 0x000000 +CURSOR_COL_FG = 0xc0c000 +OPP_CURSOR_COL_FG = 0x00c0c0 + + +def setCursor(c_idx, who, visibility=None): + """Set the position of the cursor on-screen to indicate the player's selection.""" + char = None + + if visibility == "show": + char = ">" + elif visibility == "hide": + char = " " + + if 0 <= c_idx < len(choices): + dob = cursor_dob if who == "mine" else opp_cursor_dob + dob.y = top_y_pos + choice_sep * c_idx + if char is not None: + dob.text = char + + +def flashWinner(c_idx, who): + """Invert foreground/background colour a few times + to indicate the winning choice.""" + + if who == "mine": + sg_idx = rps_dob_idx[0] + c_idx + elif who == "opp": + sg_idx = rps_dob_idx[1] + c_idx + else: + raise ValueError("who is mine or opp") + + # An even number will leave colours on original values + for _ in range(5 * 2): + tmp_col = screen_group[sg_idx].color + screen_group[sg_idx].color = screen_group[sg_idx].background_color + screen_group[sg_idx].background_color = tmp_col + time.sleep(0.5) + + +# The 6x14 terminalio classic font +FONT_WIDTH, FONT_HEIGHT = terminalio.FONT.get_bounding_box() +screen_group = Group(max_size=len(choices) * 2 + 1 + 1) + +# The position of the two players RPS Label objects inside screen_group +rps_dob_idx = [] + +# Create the simple arrow cursors +left_col = 20 +right_col = display.width // 2 + left_col +for x_pos in (left_col, right_col): + y_pos = top_y_pos + rps_dob_idx.append(len(screen_group)) + for label_text in choices: + rps_dob = Label(terminalio.FONT, + text=label_text, + scale=2, + color=DEFAULT_TXT_COL_FG, + background_color=DEFAULT_TXT_COL_BG) + rps_dob.x = x_pos + rps_dob.y = y_pos + y_pos += 60 + screen_group.append(rps_dob) + +cursor_dob = Label(terminalio.FONT, + text=">", + scale=3, + color=CURSOR_COL_FG) +cursor_dob.x = left_col - 20 +setCursor(my_choice_idx, "mine") +cursor_dob.y = top_y_pos +screen_group.append(cursor_dob) + +# Initially set to a space to not show it +opp_cursor_dob = Label(terminalio.FONT, + text=" ", + scale=3, + color=OPP_CURSOR_COL_FG, + background_color=DEFAULT_TXT_COL_BG) +opp_cursor_dob.x = right_col - 20 +setCursor(my_choice_idx, "your") +opp_cursor_dob.y = top_y_pos +screen_group.append(opp_cursor_dob) + +display.show(screen_group) + +# From adafruit_ble.advertising +MANUFACTURING_DATA_ADT = 0xFF +ADAFRUIT_COMPANY_ID = 0x0822 + +# pylint: disable=line-too-long +# According to https://github.com/adafruit/Adafruit_CircuitPython_BLE/blob/master/adafruit_ble/advertising/adafruit.py +# 0xf000 (to 0xffff) is for range for Adafruit customers +RPS_ACK_ID = 0xfe30 +RPS_DATA_ID = 0xfe31 + + +class RpsAdvertisement(Advertisement): + """Broadcast an RPS message. + This is not connectable and elicits no scan_response based on defaults + in Advertisement parent class.""" + + flags = None + + _PREFIX_FMT = " TOTAL_ROUND: + print("Summary: ", + "wins {:d}, losses {:d}, draws {:d}, void {:d}".format(wins, losses, draws, voids)) + + # Reset variables for another game + round_no = 1 + wins = 0 + losses = 0 + draws = 0 + voids = 0 + round_no = 1 + + if button_left(): + while button_left(): + pass + my_choice_idx = (my_choice_idx + 1) % len(choices) + setCursor(my_choice_idx, "mine") + + if button_right(): + tx_message = RpsAdvertisement() + + choice = choices[my_choice_idx] + tx_message.test_string = choice + d_print(2, "TXing RTA", choice) + + opponent_choice = None + ble.start_advertising(tx_message, interval=MIN_AD_INTERVAL) + sending_ns = time.monotonic_ns() + + # Timeout value is in seconds + # RSSI -100 is probably minimum, -128 would be 8bit signed min + # window and interval are 0.1 by default - same value means + # continuous scanning (sending Advertisement will interrupt this) + for adv in ble.start_scan(RpsAdvertisement, + minimum_rssi=-90, + timeout=MAX_SEND_TIME_S): + received_ns = time.monotonic_ns() + d_print(2, "RXed RTA", + adv.test_string) + opponent_choice_bytes = adv.test_string + + # Trim trailing NUL chars from bytes + idx = 0 + while idx < len(opponent_choice_bytes): + if opponent_choice_bytes[idx] == 0: + break + idx += 1 + opponent_choice = opponent_choice_bytes[0:idx].decode("utf-8") + break + + # We have received one message or exceeded MAX_SEND_TIME_S + ble.stop_scan() + + # Ensure we send our message for a minimum period of time + # constrained by the ultimate duration cap + if opponent_choice is not None: + timeout = False + remaining_ns = MAX_SEND_TIME_NS - (received_ns - sending_ns) + extra_ad_time_ns = min(remaining_ns, MIN_SEND_TIME_NS) + # Only sleep if we need to, the value here could be a small + # negative one too so this caters for this + if extra_ad_time_ns > 0: + sleep_t = extra_ad_time_ns / NS_IN_S + d_print(2, "Additional {:f} seconds of advertising".format(sleep_t)) + time.sleep(sleep_t) + else: + timeout = True + + ble.stop_advertising() + + d_print(1, "ROUND", round_no, + "MINE", choice, + "| OPPONENT", opponent_choice) + win, draw, void = evaluate_game(choice, opponent_choice) + + if void: + voids += 1 + else: + opp_choice_idx = choices.index(opponent_choice) + setCursor(opp_choice_idx, "opp", visibility="show") + if draw: + time.sleep(4) + draws += 1 + elif win: + flashWinner(my_choice_idx, "mine") + wins += 1 + else: + flashWinner(opp_choice_idx, "opp") + losses += 1 + setCursor(opp_choice_idx, "opp", visibility="hide") + d_print(1, "wins {:d}, losses {:d}, draws {:d}, void {:d}".format(wins, losses, draws, voids)) + + round_no += 1 diff --git a/CLUE_Rock_Paper_Scissors/tests/test_rps_advertisements.py b/CLUE_Rock_Paper_Scissors/tests/test_rps_advertisements.py new file mode 100644 index 000000000..d2d784234 --- /dev/null +++ b/CLUE_Rock_Paper_Scissors/tests/test_rps_advertisements.py @@ -0,0 +1,133 @@ +# The MIT License (MIT) +# +# Copyright (c) 2020 Kevin J. Walters +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import sys +import os + +import unittest +from unittest.mock import MagicMock + +verbose = int(os.getenv('TESTVERBOSE', '2')) + +# PYTHONPATH needs to be set to find adafruit_ble + +# Mocking library used by adafruit_ble +sys.modules['_bleio'] = MagicMock() + +# Borrowing the dhalbert/tannewt technique from adafruit/Adafruit_CircuitPython_Motor +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# import what we are testing or will test in future +# pylint: disable=unused-import,wrong-import-position +from rps_advertisements import JoinGameAdvertisement, \ + RpsEncDataAdvertisement, \ + RpsKeyDataAdvertisement, \ + RpsRoundEndAdvertisement + +# pylint: disable=line-too-long + +class Test_RpsEncDataAdvertisement(unittest.TestCase): + + def test_bytes_order(self): + """Testing the order of data inside the manufacturer's field to ensure it follows the + fields are set in. This is new behaviour to benefit prefix matching.""" + + rpsedad1 = RpsEncDataAdvertisement(enc_data=b"FIRST", round_no=33, sequence_number=17) + + # This checks value is not the old incorrect order + self.assertNotEqual(bytes(rpsedad1), + b"\x16\xff\x22\x08\x03\x03\x00\x11\nA\xfeFIRST\x00\x00\x00\x03C\xfe\x21", + msg="Checking order of serialised data for" + " ackless RpsEncDataAdvertisement does" + " not follow previous incorrect order") + + # This check for correct order + self.assertEqual(bytes(rpsedad1), + b"\x16\xff\x22\x08\x0a\x41\xfeFIRST\x00\x00\x00\x03C\xfe\x21\x03\x03\x00\x11", + msg="Checking order of serialised data for" + " ackless RpsEncDataAdvertisement") + + rpsedad1.ack = 29 + self.assertEqual(bytes(rpsedad1), + b"\x1a\xff\x22\x08\nA\xfeFIRST\x00\x00\x00\x03C\xfe!\x03\x03\x00\x11\x03Q\xfe\x1d", + msg="Checking order of serialised data for" + " RpsEncDataAdvertisement with ack set post construction") + + +class Test_RpsKeyDataAdvertisement(unittest.TestCase): + + def test_bytes_order(self): + """Testing the order of data inside the manufacturer's field to ensure it follows the + fields are set in. This is new behaviour to benefit prefix matching.""" + + rpskdad1 = RpsKeyDataAdvertisement(key_data=b"FIRST", round_no=33, sequence_number=17) + + # This checks value is not the old incorrect order + self.assertNotEqual(bytes(rpskdad1), + b"\x16\xff\x22\x08\x03\x03\x00\x11\nB\xfeFIRST\x00\x00\x00\x03C\xfe\x21", + msg="Checking order of serialised data for" + " ackless RpsKeyDataAdvertisement does" + " not follow previous incorrect order") + + # This check for correct order + self.assertEqual(bytes(rpskdad1), + b"\x16\xff\x22\x08\x0a\x42\xfeFIRST\x00\x00\x00\x03C\xfe\x21\x03\x03\x00\x11", + msg="Checking order of serialised data for" + " ackless RpsKeyDataAdvertisement") + + rpskdad1.ack = 29 + self.assertEqual(bytes(rpskdad1), + b"\x1a\xff\x22\x08\nB\xfeFIRST\x00\x00\x00\x03C\xfe!\x03\x03\x00\x11\x03Q\xfe\x1d", + msg="Checking order of serialised data for" + " RpsKeyDataAdvertisement with ack set post construction") + + +class Test_RpsRoundEndAdvertisement(unittest.TestCase): + + def test_bytes_order(self): + """Testing the order of data inside the manufacturer's field to ensure it follows the + fields are set in. This is new behaviour to benefit prefix matching.""" + + rpsread1 = RpsRoundEndAdvertisement(round_no=133, sequence_number=201) + + # This checks value is not the old incorrect order + self.assertNotEqual(bytes(rpsread1), + b"\x0b\xff\x22\x08\x03\x03\x00\xc9\x03C\xfe\x85", + msg="Checking order of serialised data for" + " ackless RpsRoundEndAdvertisement does" + " not follow previous incorrect order") + + # This check for correct order + self.assertEqual(bytes(rpsread1), + b"\x0b\xff\x22\x08\x03C\xfe\x85\x03\x03\x00\xc9", + msg="Checking order of serialised data for" + " ackless RpsRoundEndAdvertisement") + + rpsread1.ack = 200 + self.assertEqual(bytes(rpsread1), + b"\x0f" b"\xff\x22\x08\x03C\xfe\x85\x03\x03\x00\xc9" b"\x03Q\xfe\xc8", + msg="Checking order of serialised data for" + " RpsRoundEndAdvertisement with ack set post construction") + + +if __name__ == '__main__': + unittest.main(verbosity=verbose) diff --git a/CLUE_Rock_Paper_Scissors/tests/test_rps_crypto.py b/CLUE_Rock_Paper_Scissors/tests/test_rps_crypto.py new file mode 100644 index 000000000..a93d265d1 --- /dev/null +++ b/CLUE_Rock_Paper_Scissors/tests/test_rps_crypto.py @@ -0,0 +1,98 @@ +# The MIT License (MIT) +# +# Copyright (c) 2020 Kevin J. Walters +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + +import sys +import os + +import unittest + +verbose = int(os.getenv('TESTVERBOSE', '2')) + +# Borrowing the dhalbert/tannewt technique from adafruit/Adafruit_CircuitPython_Motor +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +# import what we are testing or will test in future +# pylint: disable=unused-import,wrong-import-position +from rps_crypto import encrypt, decrypt + + +# pylint: disable=protected-access +class Test_Chacha20(unittest.TestCase): + + algo_name = "chacha20" + + def test_encdec_one(self): + """Test using values from RFC8439 section 2.3.2.""" + + key = bytes(range(32)) + nonce = b"\x00\x00\x00\x00\x00\x00\x00\x4a\x00\x00\x00\x00" + ##nonce = b"\x00\x00\x00\x09\x00\x00\x00\x4a\x00\x00\x00\x00" + counter = 1 + plain_text = (b"Ladies and Gentlemen of the class of '99: If I could" + b" offer you only one tip for the future," + b" sunscreen would be it.") + + cipher_text = encrypt(plain_text, key, self.algo_name, + nonce=nonce, counter=counter) + + expected_cipher_text = ( + b"\x6e\x2e\x35\x9a\x25\x68\xf9\x80\x41\xba\x07\x28\xdd\x0d\x69\x81" + b"\xe9\x7e\x7a\xec\x1d\x43\x60\xc2\x0a\x27\xaf\xcc\xfd\x9f\xae\x0b" + b"\xf9\x1b\x65\xc5\x52\x47\x33\xab\x8f\x59\x3d\xab\xcd\x62\xb3\x57" + b"\x16\x39\xd6\x24\xe6\x51\x52\xab\x8f\x53\x0c\x35\x9f\x08\x61\xd8" + b"\x07\xca\x0d\xbf\x50\x0d\x6a\x61\x56\xa3\x8e\x08\x8a\x22\xb6\x5e" + b"\x52\xbc\x51\x4d\x16\xcc\xf8\x06\x81\x8c\xe9\x1a\xb7\x79\x37\x36" + b"\x5a\xf9\x0b\xbf\x74\xa3\x5b\xe6\xb4\x0b\x8e\xed\xf2\x78\x5e\x42" + b"\x87\x4d") + + self.assertEqual(cipher_text, expected_cipher_text, + msg="Checking cipher text matches expected value") + + decrypted_text = decrypt(cipher_text, key, self.algo_name, + nonce=nonce, counter=counter) + + self.assertEqual(plain_text, decrypted_text, + msg="Checking decryption of encrypted text gives original plain text") + + + def test_encdec_two(self): + """Test using values approximating that from RPS game.""" + + key = b"TPSecret" * 4 + nonce = bytearray(range(12, 0, -1)) + plain_text = b"rock" + + cipher_text = encrypt(plain_text, key, self.algo_name, + nonce=nonce) + + decrypted_text = decrypt(cipher_text, key, self.algo_name, + nonce=nonce) + + # It is possible for these to match but very unlikely in the general case + self.assertNotEqual(plain_text, cipher_text, + msg="Check cipher_text is not plain_text") + self.assertEqual(plain_text, decrypted_text, + msg="Checking decryption of encrypted text gives original plain text") + + +if __name__ == '__main__': + unittest.main(verbosity=verbose) diff --git a/CLUE_Rock_Paper_Scissors/very-simple/clue-verysimple-rpsgame.py b/CLUE_Rock_Paper_Scissors/very-simple/clue-verysimple-rpsgame.py new file mode 100644 index 000000000..7dd1ceb91 --- /dev/null +++ b/CLUE_Rock_Paper_Scissors/very-simple/clue-verysimple-rpsgame.py @@ -0,0 +1,51 @@ +# clue-verysimple-rpsgame v1.0 +# CircuitPython rock paper scissors game simple text game +# based on https://www.youtube.com/watch?v=dhaaZQyBP2g + +# Tested with CLUE and Circuit Playground Bluefruit (Alpha) +# and CircuitPython and 5.3.0 + +# copy this file to CLUE/CPB board as code.py + +# MIT License + +# Copyright (c) 2015 Chris Bradfield, KidsCanCode LLC + +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: + +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +import random + +moves = ["r", "p", "s"] +player_wins = ["pr", "sp", "rs"] + +print("Rock, paper scissors game: enter first letter for move or q for quit") +while True: + player_move = input("Your move: ") + if player_move == "q": + break + + computer_move = random.choice(moves) + print("You:", player_move) + print("Me:", computer_move) + if player_move == computer_move: + print("Tie") + elif player_move + computer_move in player_wins: + print("You win!") + else: + print("You lose!")