Skip to content

Make these connection errors distinct from other error types. #164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 47 additions & 47 deletions adafruit_esp32spi/adafruit_esp32spi.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ def _wait_for_ready(self):
print(".", end="")
time.sleep(0.05)
else:
raise RuntimeError("ESP32 not responding")
raise TimeoutError("ESP32 not responding")
if self._debug >= 3:
print()

Expand Down Expand Up @@ -242,7 +242,7 @@ def _send_command(self, cmd, params=None, *, param_len_16=False):
if self._ready.value: # ok ready to send!
break
else:
raise RuntimeError("ESP32 timed out on SPI select")
raise TimeoutError("ESP32 timed out on SPI select")
spi.write(
self._sendbuf, start=0, end=packet_len
) # pylint: disable=no-member
Expand Down Expand Up @@ -271,17 +271,17 @@ def _wait_spi_char(self, spi, desired):
for _ in range(10):
r = self._read_byte(spi)
if r == _ERR_CMD:
raise RuntimeError("Error response to command")
raise BrokenPipeError("Error response to command")
if r == desired:
return True
time.sleep(0.01)
raise RuntimeError("Timed out waiting for SPI char")
raise TimeoutError("Timed out waiting for SPI char")

def _check_data(self, spi, desired):
"""Read a byte and verify its the value we want"""
r = self._read_byte(spi)
if r != desired:
raise RuntimeError("Expected %02X but got %02X" % (desired, r))
raise BrokenPipeError("Expected %02X but got %02X" % (desired, r))

def _wait_response_cmd(self, cmd, num_responses=None, *, param_len_16=False):
"""Wait for ready, then parse the response"""
Expand All @@ -294,7 +294,7 @@ def _wait_response_cmd(self, cmd, num_responses=None, *, param_len_16=False):
if self._ready.value: # ok ready to send!
break
else:
raise RuntimeError("ESP32 timed out on SPI select")
raise TimeoutError("ESP32 timed out on SPI select")

self._wait_spi_char(spi, _START_CMD)
self._check_data(spi, cmd | _REPLY_FLAG)
Expand Down Expand Up @@ -377,7 +377,7 @@ def start_scan_networks(self):
print("Start scan")
resp = self._send_command_get_response(_START_SCAN_NETWORKS)
if resp[0][0] != 1:
raise RuntimeError("Failed to start AP scan")
raise OSError("Failed to start AP scan")

def get_scan_networks(self):
"""The results of the latest SSID scan. Returns a list of dictionaries with
Expand Down Expand Up @@ -440,7 +440,7 @@ def set_dns_config(self, dns1, dns2):
_SET_DNS_CONFIG, [b"\x00", self.unpretty_ip(dns1), self.unpretty_ip(dns2)]
)
if resp[0][0] != 1:
raise RuntimeError("Failed to set dns with esp32")
raise OSError("Failed to set dns with esp32")

def set_hostname(self, hostname):
"""Tells the ESP32 to set hostname for DHCP.
Expand All @@ -449,57 +449,57 @@ def set_hostname(self, hostname):
"""
resp = self._send_command_get_response(_SET_HOSTNAME, [hostname.encode()])
if resp[0][0] != 1:
raise RuntimeError("Failed to set hostname with esp32")
raise OSError("Failed to set hostname with esp32")

def wifi_set_network(self, ssid):
"""Tells the ESP32 to set the access point to the given ssid"""
resp = self._send_command_get_response(_SET_NET_CMD, [ssid])
if resp[0][0] != 1:
raise RuntimeError("Failed to set network")
raise OSError("Failed to set network")

def wifi_set_passphrase(self, ssid, passphrase):
"""Sets the desired access point ssid and passphrase"""
resp = self._send_command_get_response(_SET_PASSPHRASE_CMD, [ssid, passphrase])
if resp[0][0] != 1:
raise RuntimeError("Failed to set passphrase")
raise OSError("Failed to set passphrase")

def wifi_set_entidentity(self, ident):
"""Sets the WPA2 Enterprise anonymous identity"""
resp = self._send_command_get_response(_SET_ENT_IDENT_CMD, [ident])
if resp[0][0] != 1:
raise RuntimeError("Failed to set enterprise anonymous identity")
raise OSError("Failed to set enterprise anonymous identity")

def wifi_set_entusername(self, username):
"""Sets the desired WPA2 Enterprise username"""
resp = self._send_command_get_response(_SET_ENT_UNAME_CMD, [username])
if resp[0][0] != 1:
raise RuntimeError("Failed to set enterprise username")
raise OSError("Failed to set enterprise username")

def wifi_set_entpassword(self, password):
"""Sets the desired WPA2 Enterprise password"""
resp = self._send_command_get_response(_SET_ENT_PASSWD_CMD, [password])
if resp[0][0] != 1:
raise RuntimeError("Failed to set enterprise password")
raise OSError("Failed to set enterprise password")

def wifi_set_entenable(self):
"""Enables WPA2 Enterprise mode"""
resp = self._send_command_get_response(_SET_ENT_ENABLE_CMD)
if resp[0][0] != 1:
raise RuntimeError("Failed to enable enterprise mode")
raise OSError("Failed to enable enterprise mode")

def _wifi_set_ap_network(self, ssid, channel):
"""Creates an Access point with SSID and Channel"""
resp = self._send_command_get_response(_SET_AP_NET_CMD, [ssid, channel])
if resp[0][0] != 1:
raise RuntimeError("Failed to setup AP network")
raise OSError("Failed to setup AP network")

def _wifi_set_ap_passphrase(self, ssid, passphrase, channel):
"""Creates an Access point with SSID, passphrase, and Channel"""
resp = self._send_command_get_response(
_SET_AP_PASSPHRASE_CMD, [ssid, passphrase, channel]
)
if resp[0][0] != 1:
raise RuntimeError("Failed to setup AP password")
raise OSError("Failed to setup AP password")

@property
def ssid(self):
Expand Down Expand Up @@ -539,7 +539,7 @@ def is_connected(self):
"""Whether the ESP32 is connected to an access point"""
try:
return self.status == WL_CONNECTED
except RuntimeError:
except OSError:
self.reset()
return False

Expand All @@ -548,15 +548,15 @@ def ap_listening(self):
"""Returns if the ESP32 is in access point mode and is listening for connections"""
try:
return self.status == WL_AP_LISTENING
except RuntimeError:
except OSError:
self.reset()
return False

def disconnect(self):
"""Disconnect from the access point"""
resp = self._send_command_get_response(_DISCONNECT_CMD)
if resp[0][0] != 1:
raise RuntimeError("Failed to disconnect")
raise OSError("Failed to disconnect")

def connect(self, secrets):
"""Connect to an access point using a secrets dictionary
Expand Down Expand Up @@ -589,10 +589,10 @@ def connect_AP(self, ssid, password, timeout_s=10): # pylint: disable=invalid-n
return stat
time.sleep(0.05)
if stat in (WL_CONNECT_FAILED, WL_CONNECTION_LOST, WL_DISCONNECTED):
raise RuntimeError("Failed to connect to ssid", ssid)
raise ConnectionError("Failed to connect to ssid", ssid)
if stat == WL_NO_SSID_AVAIL:
raise RuntimeError("No such ssid", ssid)
raise RuntimeError("Unknown error 0x%02X" % stat)
raise ConnectionError("No such ssid", ssid)
raise OSError("Unknown error 0x%02X" % stat)

def create_AP(
self, ssid, password, channel=1, timeout=10
Expand All @@ -607,11 +607,11 @@ def create_AP(
:param int timeout: number of seconds until we time out and fail to create AP
"""
if len(ssid) > 32:
raise RuntimeError("ssid must be no more than 32 characters")
raise ValueError("ssid must be no more than 32 characters")
if password and (len(password) < 8 or len(password) > 64):
raise RuntimeError("password must be 8 - 63 characters")
raise ValueError("password must be 8 - 63 characters")
if channel < 1 or channel > 14:
raise RuntimeError("channel must be between 1 and 14")
raise ValueError("channel must be between 1 and 14")

if isinstance(channel, int):
channel = bytes(channel)
Expand All @@ -631,8 +631,8 @@ def create_AP(
return stat
time.sleep(0.05)
if stat == WL_AP_FAILED:
raise RuntimeError("Failed to create AP", ssid)
raise RuntimeError("Unknown error 0x%02x" % stat)
raise ConnectionError("Failed to create AP", ssid)
raise OSError("Unknown error 0x%02x" % stat)

def pretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name
"""Converts a bytearray IP address to a dotted-quad string for printing"""
Expand All @@ -652,7 +652,7 @@ def get_host_by_name(self, hostname):
hostname = bytes(hostname, "utf-8")
resp = self._send_command_get_response(_REQ_HOST_BY_NAME_CMD, (hostname,))
if resp[0][0] != 1:
raise RuntimeError("Failed to request hostname")
raise ConnectionError("Failed to request hostname")
resp = self._send_command_get_response(_GET_HOST_BY_NAME_CMD)
return resp[0]

Expand Down Expand Up @@ -708,7 +708,7 @@ def socket_open(self, socket_num, dest, port, conn_mode=TCP_MODE):
(dest, port_param, self._socknum_ll[0], (conn_mode,)),
)
if resp[0][0] != 1:
raise RuntimeError("Could not connect to remote server")
raise ConnectionError("Could not connect to remote server")
if conn_mode == ESP_SPIcontrol.TLS_MODE:
self._tls_socket = socket_num

Expand Down Expand Up @@ -751,24 +751,24 @@ def socket_write(self, socket_num, buffer, conn_mode=TCP_MODE):
if conn_mode == self.UDP_MODE:
# UDP verifies chunks on write, not bytes
if sent != total_chunks:
raise RuntimeError(
raise ConnectionError(
"Failed to write %d chunks (sent %d)" % (total_chunks, sent)
)
# UDP needs to finalize with this command, does the actual sending
resp = self._send_command_get_response(_SEND_UDP_DATA_CMD, self._socknum_ll)
if resp[0][0] != 1:
raise RuntimeError("Failed to send UDP data")
raise ConnectionError("Failed to send UDP data")
return

if sent != len(buffer):
self.socket_close(socket_num)
raise RuntimeError(
raise ConnectionError(
"Failed to send %d bytes (sent %d)" % (len(buffer), sent)
)

resp = self._send_command_get_response(_DATA_SENT_TCP_CMD, self._socknum_ll)
if resp[0][0] != 1:
raise RuntimeError("Failed to verify data sent")
raise ConnectionError("Failed to verify data sent")

def socket_available(self, socket_num):
"""Determine how many bytes are waiting to be read on the socket"""
Expand Down Expand Up @@ -815,7 +815,7 @@ def socket_connect(self, socket_num, dest, port, conn_mode=TCP_MODE):
if self.socket_connected(socket_num):
return True
time.sleep(0.01)
raise RuntimeError("Failed to establish connection")
raise TimeoutError("Failed to establish connection")

def socket_close(self, socket_num):
"""Close a socket using the ESP32's internal reference number"""
Expand All @@ -824,7 +824,7 @@ def socket_close(self, socket_num):
self._socknum_ll[0][0] = socket_num
try:
self._send_command_get_response(_STOP_CLIENT_TCP_CMD, self._socknum_ll)
except RuntimeError:
except OSError:
pass
if socket_num == self._tls_socket:
self._tls_socket = None
Expand All @@ -842,7 +842,7 @@ def start_server(
resp = self._send_command_get_response(_START_SERVER_TCP_CMD, params)

if resp[0][0] != 1:
raise RuntimeError("Could not start server")
raise OSError("Could not start server")

def server_state(self, socket_num):
"""Get the state of the ESP32's internal reference server socket number"""
Expand All @@ -863,7 +863,7 @@ def set_esp_debug(self, enabled):
written to the ESP32's UART."""
resp = self._send_command_get_response(_SET_DEBUG_CMD, ((bool(enabled),),))
if resp[0][0] != 1:
raise RuntimeError("Failed to set debug mode")
raise OSError("Failed to set debug mode")

def set_pin_mode(self, pin, mode):
"""Set the io mode for a GPIO pin.
Expand All @@ -879,7 +879,7 @@ def set_pin_mode(self, pin, mode):
pin_mode = mode
resp = self._send_command_get_response(_SET_PIN_MODE_CMD, ((pin,), (pin_mode,)))
if resp[0][0] != 1:
raise RuntimeError("Failed to set pin mode")
raise OSError("Failed to set pin mode")

def set_digital_write(self, pin, value):
"""Set the digital output value of pin.
Expand All @@ -891,7 +891,7 @@ def set_digital_write(self, pin, value):
_SET_DIGITAL_WRITE_CMD, ((pin,), (value,))
)
if resp[0][0] != 1:
raise RuntimeError("Failed to write to pin")
raise OSError("Failed to write to pin")

def set_analog_write(self, pin, analog_value):
"""Set the analog output value of pin, using PWM.
Expand All @@ -904,7 +904,7 @@ def set_analog_write(self, pin, analog_value):
_SET_ANALOG_WRITE_CMD, ((pin,), (value,))
)
if resp[0][0] != 1:
raise RuntimeError("Failed to write to pin")
raise OSError("Failed to write to pin")

def set_digital_read(self, pin):
"""Get the digital input value of pin. Returns the boolean value of the pin.
Expand Down Expand Up @@ -953,10 +953,10 @@ def get_time(self):
raise ValueError("_GET_TIME returned 0")
return resp_time
if self.status in (WL_AP_LISTENING, WL_AP_CONNECTED):
raise RuntimeError(
raise OSError(
"Cannot obtain NTP while in AP mode, must be connected to internet"
)
raise RuntimeError("Must be connected to WiFi before obtaining NTP.")
raise OSError("Must be connected to WiFi before obtaining NTP.")

def set_certificate(self, client_certificate):
"""Sets client certificate. Must be called
Expand All @@ -967,7 +967,7 @@ def set_certificate(self, client_certificate):
if self._debug:
print("** Setting client certificate")
if self.status == WL_CONNECTED:
raise RuntimeError(
raise ValueError(
"set_certificate must be called BEFORE a connection is established."
)
if isinstance(client_certificate, str):
Expand All @@ -977,7 +977,7 @@ def set_certificate(self, client_certificate):
assert len(client_certificate) < 1300, ".PEM must be less than 1300 bytes."
resp = self._send_command_get_response(_SET_CLI_CERT, (client_certificate,))
if resp[0][0] != 1:
raise RuntimeError("Failed to set client certificate")
raise OSError("Failed to set client certificate")
self.set_crt = True
return resp[0]

Expand All @@ -990,7 +990,7 @@ def set_private_key(self, private_key):
if self._debug:
print("** Setting client's private key.")
if self.status == WL_CONNECTED:
raise RuntimeError(
raise ValueError(
"set_private_key must be called BEFORE a connection is established."
)
if isinstance(private_key, str):
Expand All @@ -1000,6 +1000,6 @@ def set_private_key(self, private_key):
assert len(private_key) < 1700, ".PEM must be less than 1700 bytes."
resp = self._send_command_get_response(_SET_PK, (private_key,))
if resp[0][0] != 1:
raise RuntimeError("Failed to set private key.")
raise OSError("Failed to set private key.")
self.set_psk = True
return resp[0]
8 changes: 4 additions & 4 deletions adafruit_esp32spi/adafruit_esp32spi_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def getaddrinfo(host, port, family=0, socktype=0, proto=0, flags=0):
"""Given a hostname and a port name, return a 'socket.getaddrinfo'
compatible list of tuples. Honestly, we ignore anything but host & port"""
if not isinstance(port, int):
raise RuntimeError("Port must be an integer")
raise ValueError("Port must be an integer")
ipaddr = _the_interface.get_host_by_name(host)
return [(AF_INET, socktype, proto, "", (ipaddr, port))]

Expand All @@ -56,7 +56,7 @@ def __init__(
self, family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None, socknum=None
):
if family != AF_INET:
raise RuntimeError("Only AF_INET family supported")
raise ValueError("Only AF_INET family supported")
self._type = type
self._buffer = b""
self._socknum = socknum if socknum else _the_interface.get_socket()
Expand All @@ -74,7 +74,7 @@ def connect(self, address, conntype=None):
if not _the_interface.socket_connect(
self._socknum, host, port, conn_mode=conntype
):
raise RuntimeError("Failed to connect to host", host)
raise ConnectionError("Failed to connect to host", host)
self._buffer = b""

def send(self, data): # pylint: disable=no-self-use
Expand Down Expand Up @@ -105,7 +105,7 @@ def readline(self, eol=b"\r\n"):
self._buffer += _the_interface.socket_read(self._socknum, avail)
elif self._timeout > 0 and time.monotonic() - stamp > self._timeout:
self.close() # Make sure to close socket so that we don't exhaust sockets.
raise RuntimeError("Didn't receive full response, failing out")
raise OSError("Didn't receive full response, failing out")
firstline, self._buffer = self._buffer.split(eol, 1)
gc.collect()
return firstline
Expand Down
Loading