# SPDX-FileCopyrightText: Copyright (c) 2019 ladyada for Adafruit Industries
#
# SPDX-License-Identifier: MIT

"""
`adafruit_esp32spi`
================================================================================

CircuitPython driver library for using ESP32 as WiFi  co-processor using SPI


* Author(s): ladyada

Implementation Notes
--------------------

**Hardware:**

**Software and Dependencies:**

* Adafruit CircuitPython firmware for the supported boards:
  https://github.com/adafruit/circuitpython/releases

* Adafruit's Bus Device library:
  https://github.com/adafruit/Adafruit_CircuitPython_BusDevice

"""

import struct
import time
import warnings
from micropython import const
from adafruit_bus_device.spi_device import SPIDevice
from digitalio import Direction

__version__ = "0.0.0+auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_ESP32SPI.git"

# Command opcodes for the ESP32 SPI protocol. `_send_command` writes the opcode
# (with the reply bit cleared) as the second byte of every outgoing packet.

# Configuration "set" commands (network, passphrase, IP/DNS, hostname, AP mode).
_SET_NET_CMD = const(0x10)
_SET_PASSPHRASE_CMD = const(0x11)
_SET_IP_CONFIG = const(0x14)
_SET_DNS_CONFIG = const(0x15)
_SET_HOSTNAME = const(0x16)
_SET_AP_NET_CMD = const(0x18)
_SET_AP_PASSPHRASE_CMD = const(0x19)
_SET_DEBUG_CMD = const(0x1A)

# Queries about the current connection (used by `status`, `network_data`,
# `MAC_address` and the `Network` properties).
_GET_CONN_STATUS_CMD = const(0x20)
_GET_IPADDR_CMD = const(0x21)
_GET_MACADDR_CMD = const(0x22)
_GET_CURR_SSID_CMD = const(0x23)
_GET_CURR_BSSID_CMD = const(0x24)
_GET_CURR_RSSI_CMD = const(0x25)
_GET_CURR_ENCT_CMD = const(0x26)

# Scan, socket, DNS lookup, ping and other miscellaneous commands.
_SCAN_NETWORKS = const(0x27)
_START_SERVER_TCP_CMD = const(0x28)
_GET_SOCKET_CMD = const(0x3F)
_GET_STATE_TCP_CMD = const(0x29)
_DATA_SENT_TCP_CMD = const(0x2A)
_AVAIL_DATA_TCP_CMD = const(0x2B)
_GET_DATA_TCP_CMD = const(0x2C)
_START_CLIENT_TCP_CMD = const(0x2D)
_STOP_CLIENT_TCP_CMD = const(0x2E)
_GET_CLIENT_STATE_TCP_CMD = const(0x2F)
_DISCONNECT_CMD = const(0x30)
_GET_IDX_RSSI_CMD = const(0x32)
_GET_IDX_ENCT_CMD = const(0x33)
_REQ_HOST_BY_NAME_CMD = const(0x34)
_GET_HOST_BY_NAME_CMD = const(0x35)
_START_SCAN_NETWORKS = const(0x36)
_GET_FW_VERSION_CMD = const(0x37)
_SEND_UDP_DATA_CMD = const(0x39)
_GET_REMOTE_DATA_CMD = const(0x3A)
_GET_TIME = const(0x3B)
_GET_IDX_BSSID_CMD = const(0x3C)
_GET_IDX_CHAN_CMD = const(0x3D)
_PING_CMD = const(0x3E)

# Socket data transfer and WPA2 Enterprise credential commands.
# NOTE(review): _SET_CLI_CERT / _SET_PK are presumably client-certificate and
# private-key uploads; they are not used in the code visible here.
_SEND_DATA_TCP_CMD = const(0x44)
_GET_DATABUF_TCP_CMD = const(0x45)
_INSERT_DATABUF_TCP_CMD = const(0x46)
_SET_ENT_IDENT_CMD = const(0x4A)
_SET_ENT_UNAME_CMD = const(0x4B)
_SET_ENT_PASSWD_CMD = const(0x4C)
_SET_ENT_ENABLE_CMD = const(0x4F)
_SET_CLI_CERT = const(0x40)
_SET_PK = const(0x41)

# Pin-control commands (presumably GPIO on the ESP32; not used in the code
# visible here).
_SET_PIN_MODE_CMD = const(0x50)
_SET_DIGITAL_WRITE_CMD = const(0x51)
_SET_ANALOG_WRITE_CMD = const(0x52)
_SET_DIGITAL_READ_CMD = const(0x53)
_SET_ANALOG_READ_CMD = const(0x54)

# Packet framing: first byte of a packet, last byte of a packet, and the byte
# that `_wait_spi_char` treats as an error reply. `_REPLY_FLAG` is cleared in
# outgoing command bytes and expected to be set in the echoed reply byte.
_START_CMD = const(0xE0)
_END_CMD = const(0xEE)
_ERR_CMD = const(0xEF)
_REPLY_FLAG = const(1 << 7)
_CMD_FLAG = const(0)

# Socket states, as listed in the `socket_status` docstring.
SOCKET_CLOSED = const(0)
SOCKET_LISTEN = const(1)
SOCKET_SYN_SENT = const(2)
SOCKET_SYN_RCVD = const(3)
SOCKET_ESTABLISHED = const(4)
SOCKET_FIN_WAIT_1 = const(5)
SOCKET_FIN_WAIT_2 = const(6)
SOCKET_CLOSE_WAIT = const(7)
SOCKET_CLOSING = const(8)
SOCKET_LAST_ACK = const(9)
SOCKET_TIME_WAIT = const(10)

# WiFi core states, as listed in the `status` property docstring.
# WL_NO_SHIELD and WL_NO_MODULE share the same value.
WL_NO_SHIELD = const(0xFF)
WL_NO_MODULE = const(0xFF)
WL_IDLE_STATUS = const(0)
WL_NO_SSID_AVAIL = const(1)
WL_SCAN_COMPLETED = const(2)
WL_CONNECTED = const(3)
WL_CONNECT_FAILED = const(4)
WL_CONNECTION_LOST = const(5)
WL_DISCONNECTED = const(6)
WL_AP_LISTENING = const(7)
WL_AP_CONNECTED = const(8)
WL_AP_FAILED = const(9)

# Presumably ADC attenuation settings for analog reads; not used in the code
# visible here.
ADC_ATTEN_DB_0 = const(0)
ADC_ATTEN_DB_2_5 = const(1)
ADC_ATTEN_DB_6 = const(2)
ADC_ATTEN_DB_11 = const(3)

# pylint: disable=too-many-lines


class Network:
    """A wifi network provided by a nearby access point.

    An instance is backed either by raw values captured during a scan (the
    ``raw_*`` arguments, as built by ``ESP_SPIcontrol.get_scan_networks``) or
    by a live ``esp_spi_control`` object (as built by ``ESP_SPIcontrol.ap_info``).
    Whenever a raw value is missing or empty, the matching property asks the
    ESP32 about the currently joined network instead.
    """

    # Encryption-type codes reported by the firmware mapped to display names.
    _AUTHMODE_NAMES = {7: "OPEN", 5: "WEP", 2: "PSK", 4: "WPA2"}

    def __init__(  # pylint: disable=too-many-arguments
        self,
        esp_spi_control=None,
        raw_ssid=None,
        raw_bssid=None,
        raw_rssi=None,
        raw_channel=None,
        raw_country=None,
        raw_authmode=None,
    ):
        self._esp_spi_control = esp_spi_control
        self._raw_ssid = raw_ssid
        self._raw_bssid = raw_bssid
        self._raw_rssi = raw_rssi
        self._raw_channel = raw_channel
        self._raw_country = raw_country
        self._raw_authmode = raw_authmode

    def _get_response(self, cmd):
        """Send ``cmd`` to the ESP32 and return the first reply parameter."""
        response = self._esp_spi_control._send_command_get_response(  # pylint: disable=protected-access
            cmd, [b"\xFF"]
        )
        return response[0]

    @property
    def ssid(self):
        """String id of the network"""
        if self._raw_ssid:
            response = self._raw_ssid
        else:
            response = self._get_response(_GET_CURR_SSID_CMD)
        return response.decode("utf-8")

    @property
    def bssid(self):
        """BSSID of the network (usually the AP’s MAC address)"""
        if self._raw_bssid:
            response = self._raw_bssid
        else:
            response = self._get_response(_GET_CURR_BSSID_CMD)
        return bytes(response)

    @property
    def rssi(self):
        """Signal strength of the network"""
        # Decide on the RSSI value itself; keying this on the BSSID would
        # unpack ``None`` or query a missing controller for scan results.
        if self._raw_rssi:
            response = self._raw_rssi
        else:
            response = self._get_response(_GET_CURR_RSSI_CMD)
        return struct.unpack("<i", response)[0]

    @property
    def channel(self):
        """Channel number the network is operating on, or None when unknown"""
        if self._raw_channel:
            return self._raw_channel[0]
        return None

    @property
    def country(self):
        """String id of the country code"""
        return self._raw_country

    @property
    def authmode(self):
        """String id of the authmode

        derived from Nina code:
        https://github.com/adafruit/nina-fw/blob/master/arduino/libraries/WiFi/src/WiFi.cpp#L385
        """
        if self._raw_authmode:
            response = self._raw_authmode[0]
        else:
            response = self._get_response(_GET_CURR_ENCT_CMD)[0]
        return self._AUTHMODE_NAMES.get(response, "UNKNOWN")


class ESP_SPIcontrol:  # pylint: disable=too-many-public-methods, too-many-instance-attributes
    """A class that will talk to an ESP32 module programmed with special firmware
    that lets it act as a fast and efficient WiFi co-processor"""

    TCP_MODE = const(0)
    UDP_MODE = const(1)
    TLS_MODE = const(2)

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        spi,
        cs_dio,
        ready_dio,
        reset_dio,
        gpio0_dio=None,
        *,
        debug=False,
        debug_show_secrets=False,
    ):
        """Allocate buffers, configure the control pins and hard-reset the ESP32.

        :param spi: SPI bus the ESP32 is attached to; wrapped in an ``SPIDevice``
            at 8 MHz.
        :param cs_dio: chip-select pin, configured as an output.
        :param ready_dio: ready pin, configured as an input.
        :param reset_dio: reset pin, configured as an output.
        :param gpio0_dio: optional GPIO0 pin; when given it is configured as an
            input here (``reset`` drives it temporarily).
        :param debug: debug verbosity. Any truthy value enables basic messages;
            other methods compare it against 2 and 3 for more detail.
        :param bool debug_show_secrets: when true, ``connect_AP`` debug output
            shows the password instead of asterisks.
        """
        self._debug = debug
        self._debug_show_secrets = debug_show_secrets
        self.set_psk = False
        self.set_crt = False
        self._buffer = bytearray(10)
        self._pbuf = bytearray(1)  # buffer for param read
        self._sendbuf = bytearray(256)  # buffer for command sending
        self._socknum_ll = [[0]]  # pre-made list of list of socket #

        self._spi_device = SPIDevice(spi, cs_dio, baudrate=8000000)
        self._cs = cs_dio
        self._ready = ready_dio
        self._reset = reset_dio
        self._gpio0 = gpio0_dio
        self._cs.direction = Direction.OUTPUT
        self._ready.direction = Direction.INPUT
        self._reset.direction = Direction.OUTPUT
        # Only one TLS socket at a time is supported so track when we already have one.
        self._tls_socket = None
        if self._gpio0:
            self._gpio0.direction = Direction.INPUT
        # Start from a known state: constructing the object always resets the chip.
        self.reset()

    # pylint: enable=too-many-arguments

    def reset(self):
        """Hard reset the ESP32 using the reset pin

        Pulses the reset pin low for 10 ms, then waits 0.75 s for the ESP32 to
        boot. If a GPIO0 pin was supplied it is driven high for the duration of
        the reset and returned to an input afterwards.
        """
        if self._debug:
            print("Reset ESP32")
        if self._gpio0:
            # Hold GPIO0 high across the reset so the chip does not boot into
            # its bootloader.
            self._gpio0.direction = Direction.OUTPUT
            self._gpio0.value = True  # not bootload mode
        # Deselect the chip before toggling reset.
        self._cs.value = True
        self._reset.value = False
        time.sleep(0.01)  # reset
        self._reset.value = True
        time.sleep(0.75)  # wait for it to boot up
        if self._gpio0:
            # Release GPIO0 again now that the boot mode has been latched.
            self._gpio0.direction = Direction.INPUT

    def _wait_for_ready(self):
        """Block until the ready pin reads low, giving up after 10 seconds.

        :raises TimeoutError: if the pin is still high once 10 seconds have passed.
        """
        verbose = self._debug >= 3
        if verbose:
            print("Wait for ESP32 ready", end="")
        started = time.monotonic()
        while True:
            if time.monotonic() - started >= 10:  # waited the full 10 seconds
                raise TimeoutError("ESP32 not responding")
            if not self._ready.value:  # pin is low: the ESP32 is ready
                break
            if verbose:
                # Progress dots; the short sleep only throttles this debug output.
                print(".", end="")
                time.sleep(0.05)
        if verbose:
            print()

    # pylint: disable=too-many-branches
    def _send_command(self, cmd, params=None, *, param_len_16=False):
        """Frame ``cmd`` and its parameters into a packet and write it over SPI.

        Packet layout: start byte, command byte (reply bit cleared), parameter
        count, then each parameter prefixed by its length (one byte, or two
        bytes high-byte-first when ``param_len_16`` is set), then the end byte.
        The number of bytes written is rounded up to a multiple of four.

        :param int cmd: command opcode.
        :param params: sequence of parameters, each itself a sequence of byte
            values; ``None`` or empty sends no parameters.
        :param bool param_len_16: use two-byte parameter length prefixes.
        :raises TimeoutError: if the ESP32 never signals ready, or does not
            raise the ready pin within 1 second of being selected.
        """
        params = params or ()

        prefix_size = 2 if param_len_16 else 1
        # 3 header bytes + 1 end byte, plus each parameter and its length prefix
        packet_len = 4 + sum(prefix_size + len(param) for param in params)
        packet_len = (packet_len + 3) // 4 * 4  # pad up to a multiple of 4
        if len(self._sendbuf) < packet_len:
            self._sendbuf = bytearray(packet_len)  # grow the reusable buffer

        out = self._sendbuf
        out[0] = _START_CMD
        out[1] = cmd & ~_REPLY_FLAG
        out[2] = len(params)

        # Append every parameter behind its length prefix.
        pos = 3
        for index, param in enumerate(params):
            size = len(param)
            if self._debug >= 2:
                print("\tSending param #%d is %d bytes long" % (index, size))
            if param_len_16:
                out[pos] = (size >> 8) & 0xFF
                pos += 1
            out[pos] = size & 0xFF
            pos += 1
            for offset, value in enumerate(param):
                out[pos + offset] = value
            pos += size
        out[pos] = _END_CMD

        self._wait_for_ready()
        with self._spi_device as spi:
            selected_at = time.monotonic()
            while True:  # wait up to 1000ms for the ready pin to go high
                if time.monotonic() - selected_at >= 1:
                    raise TimeoutError("ESP32 timed out on SPI select")
                if self._ready.value:  # ok ready to send!
                    break
            spi.write(out, start=0, end=packet_len)  # pylint: disable=no-member
            if self._debug >= 3:
                print("Wrote: ", [hex(b) for b in out[0:packet_len]])

    # pylint: disable=too-many-branches

    def _read_byte(self, spi):
        """Read a single byte from ``spi`` via the one-byte scratch buffer and return it."""
        scratch = self._pbuf
        spi.readinto(scratch)
        value = scratch[0]
        if self._debug >= 3:
            print("\t\tRead:", hex(value))
        return value

    def _read_bytes(self, spi, buffer, start=0, end=None):
        """Fill ``buffer[start:end]`` with bytes read from ``spi``.

        :param spi: SPI object providing ``readinto``.
        :param buffer: writable buffer to fill.
        :param int start: first index to write.
        :param end: index to stop at; any falsy value (``None`` or 0) means
            ``len(buffer)``.
        """
        stop = end if end else len(buffer)
        spi.readinto(buffer, start=start, end=stop)
        if self._debug >= 3:
            # The whole buffer is shown, not only the slice just read.
            print("\t\tRead:", [hex(byte) for byte in buffer])

    def _wait_spi_char(self, spi, desired):
        """Poll ``spi`` up to 10 times, 10 ms apart, until ``desired`` is read.

        :return: ``True`` once ``desired`` has been read.
        :raises BrokenPipeError: if the error byte is read instead.
        :raises TimeoutError: if ``desired`` is not seen in 10 reads.
        """
        attempts_left = 10
        while attempts_left:
            got = self._read_byte(spi)
            if got == _ERR_CMD:
                raise BrokenPipeError("Error response to command")
            if got == desired:
                return True
            time.sleep(0.01)
            attempts_left -= 1
        raise TimeoutError("Timed out waiting for SPI char")

    def _check_data(self, spi, desired):
        """Read one byte from ``spi`` and insist that it equals ``desired``.

        :raises BrokenPipeError: if a different byte was read.
        """
        got = self._read_byte(spi)
        if got == desired:
            return
        raise BrokenPipeError("Expected %02X but got %02X" % (desired, got))

    def _wait_response_cmd(self, cmd, num_responses=None, *, param_len_16=False):
        """Wait for the ESP32 to be ready, then read and parse the reply to ``cmd``.

        :param int cmd: opcode that was sent; the reply must echo it with the
            reply flag set.
        :param num_responses: exact number of reply parameters expected, or
            ``None`` to accept whatever count the ESP32 reports.
        :param bool param_len_16: reply parameter lengths are two bytes (high
            byte first) instead of one.
        :return: list of ``bytearray``, one per reply parameter (possibly empty).
        :raises TimeoutError: if the ESP32 never signals ready, does not raise
            the ready pin within 1 second of being selected, or never sends the
            start byte.
        :raises BrokenPipeError: on an error byte or any unexpected byte in the
            reply framing.
        """
        self._wait_for_ready()

        responses = []
        with self._spi_device as spi:
            times = time.monotonic()
            while (time.monotonic() - times) < 1:  # wait up to 1000ms
                if self._ready.value:  # ok ready to send!
                    break
            else:
                raise TimeoutError("ESP32 timed out on SPI select")

            self._wait_spi_char(spi, _START_CMD)
            self._check_data(spi, cmd | _REPLY_FLAG)
            if num_responses is not None:
                self._check_data(spi, num_responses)
            else:
                num_responses = self._read_byte(spi)
            for num in range(num_responses):
                param_len = self._read_byte(spi)
                if param_len_16:
                    param_len <<= 8
                    param_len |= self._read_byte(spi)
                if self._debug >= 2:
                    print("\tParameter #%d length is %d" % (num, param_len))
                response = bytearray(param_len)
                self._read_bytes(spi, response)
                responses.append(response)
            self._check_data(spi, _END_CMD)

        if self._debug >= 2:
            # A reply may legitimately carry zero parameters (e.g. a scan that
            # found nothing), so do not index into an empty list here.
            first_len = len(responses[0]) if responses else 0
            print("Read %d: " % first_len, responses)
        return responses

    def _send_command_get_response(  # pylint: disable=too-many-arguments
        self,
        cmd,
        params=None,
        *,
        reply_params=1,
        sent_param_len_16=False,
        recv_param_len_16=False,
    ):
        """Send ``cmd`` with ``params`` and return the parsed reply parameters.

        :param int cmd: command opcode.
        :param params: parameters to send, or ``None`` for none.
        :param reply_params: number of reply parameters to require; ``None``
            accepts any count.
        :param bool sent_param_len_16: send two-byte parameter length prefixes.
        :param bool recv_param_len_16: expect two-byte parameter length prefixes.
        :return: list of ``bytearray`` reply parameters.
        """
        self._send_command(cmd, params, param_len_16=sent_param_len_16)
        replies = self._wait_response_cmd(
            cmd, reply_params, param_len_16=recv_param_len_16
        )
        return replies

    @property
    def status(self):
        """The status of the ESP32 WiFi core. Can be WL_NO_SHIELD or WL_NO_MODULE
        (not found), WL_IDLE_STATUS, WL_NO_SSID_AVAIL, WL_SCAN_COMPLETED,
        WL_CONNECTED, WL_CONNECT_FAILED, WL_CONNECTION_LOST, WL_DISCONNECTED,
        WL_AP_LISTENING, WL_AP_CONNECTED, WL_AP_FAILED"""
        reply = self._send_command_get_response(_GET_CONN_STATUS_CMD)
        code = reply[0][0]  # the reply is a single status byte
        if self._debug:
            print("Connection status:", code)
        return code

    @property
    def firmware_version(self):
        """The ESP32 firmware version as a string, with NUL characters removed"""
        if self._debug:
            print("Firmware version")
        raw = self._send_command_get_response(_GET_FW_VERSION_CMD)[0]
        text = raw.decode("utf-8")
        return text.replace("\x00", "")

    @property
    def MAC_address(self):  # pylint: disable=invalid-name
        """The MAC address bytes exactly as the ESP32 reports them, as a bytearray

        ``MAC_address_actual`` returns the same bytes in reverse order.
        """
        if self._debug:
            print("MAC address")
        reply = self._send_command_get_response(_GET_MACADDR_CMD, [b"\xFF"])
        reported = reply[0]
        return reported

    @property
    def MAC_address_actual(self):  # pylint: disable=invalid-name
        """The bytes of ``MAC_address`` in reverse order, as a new bytearray"""
        reported = self.MAC_address
        flipped = reversed(reported)
        return bytearray(flipped)

    @property
    def mac_address(self):
        """An immutable ``bytes`` copy of ``MAC_address_actual``"""
        actual = self.MAC_address_actual
        return bytes(actual)

    def start_scan_networks(self):
        """Ask the ESP32 to start scanning for access points.

        Collect the results afterwards with ``get_scan_networks``.

        :raises OSError: if the ESP32 does not acknowledge the request.
        """
        if self._debug:
            print("Start scan")
        reply = self._send_command_get_response(_START_SCAN_NETWORKS)
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to start AP scan")

    def get_scan_networks(self):
        """Fetch the results of the latest access point scan.

        The ESP32 first returns the SSIDs; BSSID, RSSI, channel and encryption
        type are then requested per network by index.

        :return: list of ``Network`` objects, one per access point found
            (empty when there are none).
        """
        # reply_params=None: the number of SSIDs in the reply is not known up front
        names = self._send_command_get_response(_SCAN_NETWORKS, reply_params=None)
        networks = []
        for index, name in enumerate(names):
            which = ((index,),)  # single one-byte parameter: the scan index
            # Keyword arguments are evaluated in order, so the queries are sent
            # as BSSID, RSSI, channel, encryption type.
            networks.append(
                Network(
                    raw_ssid=name,
                    raw_bssid=self._send_command_get_response(
                        _GET_IDX_BSSID_CMD, which
                    )[0],
                    raw_rssi=self._send_command_get_response(
                        _GET_IDX_RSSI_CMD, which
                    )[0],
                    raw_channel=self._send_command_get_response(
                        _GET_IDX_CHAN_CMD, which
                    )[0],
                    raw_authmode=self._send_command_get_response(
                        _GET_IDX_ENCT_CMD, which
                    )[0],
                )
            )
        return networks

    def scan_networks(self):
        """Scan for visible access points and wait for results.

        Starts a scan, then checks for results every 2 seconds, up to 10 times.

        :return: non-empty list of ``Network`` objects as returned by
            ``get_scan_networks``, or ``None`` if every attempt came back empty.
        """
        self.start_scan_networks()
        attempts_left = 10
        while attempts_left > 0:
            time.sleep(2)
            found = self.get_scan_networks()
            if found:
                return found
            attempts_left -= 1
        return None

    def set_ip_config(self, ip_address, gateway, mask="255.255.255.0"):
        """Tells the ESP32 to set ip, gateway and network mask

        :param str ip_address: IP address (as a string).
        :param str gateway: Gateway (as a string).
        :param str mask: Mask, defaults to 255.255.255.0 (as a string).
        :return: the reply parameters from the ESP32, unchecked.
        """
        # A single zero byte leads the parameters, followed by the packed addresses.
        params = [b"\x00"]
        for dotted in (ip_address, gateway, mask):
            params.append(self.unpretty_ip(dotted))
        return self._send_command_get_response(_SET_IP_CONFIG, params=params)

    def set_dns_config(self, dns1, dns2):
        """Tells the ESP32 to set DNS

        :param str dns1: DNS server 1 IP as a string.
        :param str dns2: DNS server 2 IP as a string.
        :raises OSError: if the ESP32 does not acknowledge the setting.
        """
        primary = self.unpretty_ip(dns1)
        secondary = self.unpretty_ip(dns2)
        reply = self._send_command_get_response(
            _SET_DNS_CONFIG, [b"\x00", primary, secondary]
        )
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to set dns with esp32")

    def set_hostname(self, hostname):
        """Tells the ESP32 to set hostname for DHCP.

        :param str hostname: The new host name.
        :raises OSError: if the ESP32 does not acknowledge the setting.
        """
        encoded = hostname.encode()
        reply = self._send_command_get_response(_SET_HOSTNAME, [encoded])
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to set hostname with esp32")

    def wifi_set_network(self, ssid):
        """Tells the ESP32 to join the access point with the given ssid (no passphrase).

        :param ssid: SSID, sent unchanged as the command parameter.
        :raises OSError: if the ESP32 does not acknowledge the request.
        """
        reply = self._send_command_get_response(_SET_NET_CMD, [ssid])
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to set network")

    def wifi_set_passphrase(self, ssid, passphrase):
        """Sets the desired access point ssid and passphrase.

        Both values are sent unchanged as command parameters.

        :raises OSError: if the ESP32 does not acknowledge the request.
        """
        credentials = [ssid, passphrase]
        reply = self._send_command_get_response(_SET_PASSPHRASE_CMD, credentials)
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to set passphrase")

    def wifi_set_entidentity(self, ident):
        """Sets the WPA2 Enterprise anonymous identity.

        :raises OSError: if the ESP32 does not acknowledge the setting.
        """
        reply = self._send_command_get_response(_SET_ENT_IDENT_CMD, [ident])
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to set enterprise anonymous identity")

    def wifi_set_entusername(self, username):
        """Sets the desired WPA2 Enterprise username.

        :raises OSError: if the ESP32 does not acknowledge the setting.
        """
        reply = self._send_command_get_response(_SET_ENT_UNAME_CMD, [username])
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to set enterprise username")

    def wifi_set_entpassword(self, password):
        """Sets the desired WPA2 Enterprise password.

        :raises OSError: if the ESP32 does not acknowledge the setting.
        """
        reply = self._send_command_get_response(_SET_ENT_PASSWD_CMD, [password])
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to set enterprise password")

    def wifi_set_entenable(self):
        """Enables WPA2 Enterprise mode.

        :raises OSError: if the ESP32 does not acknowledge the request.
        """
        reply = self._send_command_get_response(_SET_ENT_ENABLE_CMD)
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to enable enterprise mode")

    def _wifi_set_ap_network(self, ssid, channel):
        """Creates an open access point with the given SSID and channel.

        Both values are sent unchanged as command parameters.

        :raises OSError: if the ESP32 does not acknowledge the request.
        """
        settings = [ssid, channel]
        reply = self._send_command_get_response(_SET_AP_NET_CMD, settings)
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to setup AP network")

    def _wifi_set_ap_passphrase(self, ssid, passphrase, channel):
        """Creates a password-protected access point with SSID, passphrase and channel.

        All three values are sent unchanged as command parameters.

        :raises OSError: if the ESP32 does not acknowledge the request.
        """
        settings = [ssid, passphrase, channel]
        reply = self._send_command_get_response(_SET_AP_PASSPHRASE_CMD, settings)
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to setup AP password")

    @property
    def ap_info(self):
        """A ``Network`` describing the access point we are connected to, or
        ``None`` when not connected. The returned object queries the ESP32
        each time one of its properties is read."""
        return Network(esp_spi_control=self) if self.is_connected else None

    @property
    def network_data(self):
        """A dictionary of current connection details with the keys 'ip_addr',
        'netmask' and 'gateway', each holding the raw bytes from the ESP32"""
        fields = self._send_command_get_response(
            _GET_IPADDR_CMD, [b"\xFF"], reply_params=3
        )
        ip_addr, netmask, gateway = fields[0], fields[1], fields[2]
        return {"ip_addr": ip_addr, "netmask": netmask, "gateway": gateway}

    @property
    def ip_address(self):
        """Our local IP address, as the raw 'ip_addr' entry of ``network_data``"""
        details = self.network_data
        return details["ip_addr"]

    @property
    def connected(self):
        """Whether the ESP32 is connected to an access point.

        If reading the status raises ``OSError`` the ESP32 is reset and
        ``False`` is returned."""
        try:
            current = self.status
        except OSError:
            self.reset()
            return False
        return current == WL_CONNECTED

    @property
    def is_connected(self):
        """Alias for ``connected``: whether the ESP32 is connected to an access point"""
        linked = self.connected
        return linked

    @property
    def ap_listening(self):
        """Whether the ESP32 is in access point mode and listening for connections.

        If reading the status raises ``OSError`` the ESP32 is reset and
        ``False`` is returned."""
        try:
            current = self.status
        except OSError:
            self.reset()
            return False
        return current == WL_AP_LISTENING

    def disconnect(self):
        """Disconnect from the access point.

        :raises OSError: if the ESP32 does not acknowledge the request.
        """
        reply = self._send_command_get_response(_DISCONNECT_CMD)
        acknowledged = reply[0][0] == 1
        if not acknowledged:
            raise OSError("Failed to disconnect")

    def connect(self, ssid, password=None, timeout=10):
        """Connect to an access point with given name and password.

        Delegates to ``connect_AP``; nothing is returned.

        **Deprecated functionality:** If the first argument (``ssid``) is a ``dict``,
        assume it is a dictionary with entries for keys ``"ssid"`` and, optionally, ``"password"``.
        This mimics the previous signature for ``connect()``.
        This upward compatibility will be removed in a future release.

        :param ssid: the SSID to connect to (or a legacy secrets ``dict``).
        :param password: the password of the access point, if any.
        :param timeout: number of seconds to wait for the connection.
        """
        if isinstance(ssid, dict):  # legacy secrets dictionary
            warnings.warn(
                "The passing in of `secrets`, is deprecated. Use connect() with `ssid` and "
                "`password` instead and fetch values from settings.toml with `os.getenv()`."
            )
            secrets = ssid
            ssid = secrets["ssid"]
            password = secrets.get("password")
        self.connect_AP(ssid, password, timeout_s=timeout)

    def connect_AP(self, ssid, password, timeout_s=10):  # pylint: disable=invalid-name
        """Connect to an access point with given name and password.
        Will wait until specified timeout seconds and return on success
        or raise an exception on failure.

        :param ssid: the SSID to connect to (``str`` values are UTF-8 encoded)
        :param password: the password of the access point; ``None`` or empty
            joins an open network
        :param timeout_s: number of seconds until we time out and fail to connect
        :return: ``WL_CONNECTED`` once the connection is established
        :raises ConnectionError: if the connection failed or the SSID was not found
        :raises OSError: if the final status is anything else
        """
        if self._debug:
            # Open networks pass password=None, which has no length to mask.
            if password is None or self._debug_show_secrets:
                shown_password = repr(password)
            else:
                shown_password = repr("*" * len(password))
            print(f"Connect to AP: {ssid=}, password={shown_password}")
        if isinstance(ssid, str):
            ssid = bytes(ssid, "utf-8")
        if password:
            if isinstance(password, str):
                password = bytes(password, "utf-8")
            self.wifi_set_passphrase(ssid, password)
        else:
            self.wifi_set_network(ssid)
        started = time.monotonic()
        while True:
            # Poll at least once so ``stat`` is defined even for timeout_s <= 0.
            stat = self.status
            if stat == WL_CONNECTED:
                return stat
            if time.monotonic() - started >= timeout_s:
                break
            time.sleep(0.05)
        if stat in (WL_CONNECT_FAILED, WL_CONNECTION_LOST, WL_DISCONNECTED):
            raise ConnectionError("Failed to connect to ssid", ssid)
        if stat == WL_NO_SSID_AVAIL:
            raise ConnectionError("No such ssid", ssid)
        raise OSError("Unknown error 0x%02X" % stat)

    def create_AP(
        self, ssid, password, channel=1, timeout=10
    ):  # pylint: disable=invalid-name
        """Create an access point with the given name, password, and channel.
        Will wait until specified timeout seconds and return on success
        or raise an exception on failure.

        :param str ssid: the SSID of the created Access Point. Must be at most 32 chars.
        :param str password: the password of the created Access Point. Must be 8-63 chars;
            ``None`` or empty creates an open access point.
        :param int channel: channel of created Access Point (1 - 14).
        :param int timeout: number of seconds until we time out and fail to create AP
        :return: ``WL_AP_LISTENING`` once the access point is up
        :raises ValueError: if ssid, password or channel is out of range
        :raises ConnectionError: if the ESP32 reports that creating the AP failed
        :raises OSError: if the final status is anything else
        """
        if len(ssid) > 32:
            raise ValueError("ssid must be no more than 32 characters")
        if password and not 8 <= len(password) <= 63:
            raise ValueError("password must be 8 - 63 characters")

        if isinstance(channel, int):
            if not 1 <= channel <= 14:
                raise ValueError("channel must be between 1 and 14")
            # bytes(n) would create n zero bytes; wrap the number in a list so
            # the single parameter byte carries the channel itself.
            channel = bytes([channel])
        if isinstance(ssid, str):
            ssid = bytes(ssid, "utf-8")
        if password:
            if isinstance(password, str):
                password = bytes(password, "utf-8")
            self._wifi_set_ap_passphrase(ssid, password, channel)
        else:
            self._wifi_set_ap_network(ssid, channel)

        started = time.monotonic()
        while True:
            # Poll at least once so ``stat`` is defined even for timeout <= 0.
            stat = self.status
            if stat == WL_AP_LISTENING:
                return stat
            if time.monotonic() - started >= timeout:
                break
            time.sleep(0.05)
        if stat == WL_AP_FAILED:
            raise ConnectionError("Failed to create AP", ssid)
        raise OSError("Unknown error 0x%02x" % stat)

    @property
    def ipv4_address(self):
        """Our local IP address (``ip_address``) formatted as a dotted-quad string."""
        raw = self.ip_address
        return self.pretty_ip(raw)

    def pretty_ip(self, ip):  # pylint: disable=no-self-use, invalid-name
        """Format the first four bytes of ``ip`` as a dotted-quad string for printing"""
        first, second, third, fourth = ip[0], ip[1], ip[2], ip[3]
        return "%d.%d.%d.%d" % (first, second, third, fourth)

    def unpretty_ip(self, ip):  # pylint: disable=no-self-use, invalid-name
        """Pack a dotted-quad string into ``bytes``, one byte per dot-separated number"""
        parts = ip.split(".")
        return bytes([int(part) for part in parts])

    def get_host_by_name(self, hostname):
        """Resolve ``hostname`` on the ESP32 and return the first reply
        parameter of the lookup (the packed IP address) as a bytearray.

        :param hostname: host name; ``str`` values are UTF-8 encoded first.
        :raises ConnectionError: if the ESP32 rejects the lookup request.
        """
        if self._debug:
            print("*** Get host by name")
        name = bytes(hostname, "utf-8") if isinstance(hostname, str) else hostname
        # Two steps: request the lookup, then fetch its result.
        requested = self._send_command_get_response(_REQ_HOST_BY_NAME_CMD, (name,))
        if requested[0][0] != 1:
            raise ConnectionError("Failed to request hostname")
        return self._send_command_get_response(_GET_HOST_BY_NAME_CMD)[0]

    def ping(self, dest, ttl=250):
        """Ping a destination IP address or hostname, with a max time-to-live
        (ttl). Returns a millisecond timing value.

        :param dest: packed IP address, or a ``str`` hostname that is resolved
            with ``get_host_by_name`` first.
        :param int ttl: time-to-live, clamped to the range 0-255.
        """
        target = self.get_host_by_name(dest) if isinstance(dest, str) else dest
        clamped_ttl = max(0, min(ttl, 255))  # ttl has to fit in a single byte
        reply = self._send_command_get_response(_PING_CMD, (target, (clamped_ttl,)))
        (elapsed,) = struct.unpack("<H", reply[0])
        return elapsed

    def get_socket(self):
        """Ask the ESP32 to allocate a socket and return its number, for use
        with the other socket commands.

        :raises OSError: errno 23 when the ESP32 answers 255 (no socket available).
        """
        if self._debug:
            print("*** Get socket")
        socket_num = self._send_command_get_response(_GET_SOCKET_CMD)[0][0]
        if socket_num == 255:
            raise OSError(23)  # ENFILE - File table overflow
        if self._debug:
            print("Allocated socket #%d" % socket_num)
        return socket_num

    def socket_open(self, socket_num, dest, port, conn_mode=TCP_MODE):
        """Open a socket to a destination IP address or hostname
        using the ESP32's internal reference number. By default we use
        'conn_mode' TCP_MODE but can also use UDP_MODE or TLS_MODE
        (dest must be hostname for TLS_MODE!)

        :param int socket_num: socket number obtained from ``get_socket``.
        :param dest: ``str`` hostname or packed IP address.
        :param int port: destination port.
        :param int conn_mode: TCP_MODE, UDP_MODE or TLS_MODE.
        :raises OSError: errno 23 if a TLS socket is already being tracked.
        :raises ConnectionError: if the ESP32 reports that it could not connect.
        """
        self._socknum_ll[0][0] = socket_num
        if self._debug:
            print("*** Open socket to", dest, port, conn_mode)
        wants_tls = conn_mode == ESP_SPIcontrol.TLS_MODE
        if wants_tls and self._tls_socket is not None:
            raise OSError(23, "Only one open SSL connection allowed")

        # Parameters shared by both forms: port (high byte first), socket
        # number and connection mode.
        shared = (struct.pack(">H", port), self._socknum_ll[0], (conn_mode,))
        if isinstance(dest, str):
            # Hostname form: five parameters, with a zeroed IP placeholder.
            params = (bytes(dest, "utf-8"), b"\x00\x00\x00\x00") + shared
        else:
            # IP address form: four parameters.
            params = (dest,) + shared
        reply = self._send_command_get_response(_START_CLIENT_TCP_CMD, params)
        if reply[0][0] != 1:
            raise ConnectionError("Could not connect to remote server")
        if wants_tls:
            self._tls_socket = socket_num
    def socket_status(self, socket_num):
        """Get the socket connection status, can be SOCKET_CLOSED, SOCKET_LISTEN,
        SOCKET_SYN_SENT, SOCKET_SYN_RCVD, SOCKET_ESTABLISHED, SOCKET_FIN_WAIT_1,
        SOCKET_FIN_WAIT_2, SOCKET_CLOSE_WAIT, SOCKET_CLOSING, SOCKET_LAST_ACK, or
        SOCKET_TIME_WAIT"""
        socket_param = self._socknum_ll
        socket_param[0][0] = socket_num  # reuse the pre-made parameter list
        reply = self._send_command_get_response(
            _GET_CLIENT_STATE_TCP_CMD, socket_param
        )
        state = reply[0][0]
        return state

    def socket_connected(self, socket_num):
        """Return True when the socket's status is SOCKET_ESTABLISHED, else False"""
        state = self.socket_status(socket_num)
        return state == SOCKET_ESTABLISHED

    def socket_write(self, socket_num, buffer, conn_mode=TCP_MODE):
        """Write ``buffer`` to a socket in chunks of up to 64 bytes.

        For TCP the byte counts acknowledged by the ESP32 must add up to
        ``len(buffer)``; otherwise the socket is closed. For UDP each chunk is
        queued and a final command performs the actual send.

        :param int socket_num: socket to write to.
        :param buffer: bytes-like data to send.
        :param int conn_mode: TCP_MODE (default) or UDP_MODE.
        :raises ConnectionError: if the data could not be queued, sent or verified.
        """
        if self._debug:
            print("Writing:", buffer)
        self._socknum_ll[0][0] = socket_num
        is_udp = conn_mode == self.UDP_MODE
        # UDP requires a different command to write
        command = _INSERT_DATABUF_TCP_CMD if is_udp else _SEND_DATA_TCP_CMD
        # Note: an exact multiple of 64 bytes yields one trailing empty chunk.
        total_chunks = (len(buffer) // 64) + 1
        view = memoryview(buffer)
        sent = 0
        for start in range(0, total_chunks * 64, 64):
            reply = self._send_command_get_response(
                command,
                (self._socknum_ll[0], view[start : start + 64]),
                sent_param_len_16=True,
            )
            sent += reply[0][0]

        if is_udp:
            # UDP verifies chunks on write, not bytes
            if sent != total_chunks:
                raise ConnectionError(
                    "Failed to write %d chunks (sent %d)" % (total_chunks, sent)
                )
            # UDP needs to finalize with this command, does the actual sending
            reply = self._send_command_get_response(
                _SEND_UDP_DATA_CMD, self._socknum_ll
            )
            if reply[0][0] != 1:
                raise ConnectionError("Failed to send UDP data")
            return

        if sent != len(buffer):
            self.socket_close(socket_num)
            raise ConnectionError(
                "Failed to send %d bytes (sent %d)" % (len(buffer), sent)
            )

        reply = self._send_command_get_response(_DATA_SENT_TCP_CMD, self._socknum_ll)
        if reply[0][0] != 1:
            raise ConnectionError("Failed to verify data sent")

    def socket_available(self, socket_num):
        """Return the number of bytes waiting to be read on the socket.

        The reply's first parameter is unpacked as a little-endian unsigned
        16-bit integer.
        """
        self._socknum_ll[0][0] = socket_num
        raw = self._send_command_get_response(_AVAIL_DATA_TCP_CMD, self._socknum_ll)
        (pending,) = struct.unpack("<H", raw[0])
        if self._debug:
            print("ESPSocket: %d bytes available" % pending)
        return pending

    def socket_read(self, socket_num, size):
        """Read up to 'size' bytes from the socket number. Returns a bytes

        :param int socket_num: ESP32 socket number to read from.
        :param int size: maximum number of bytes requested; only the low 16 bits
            are transmitted.
        """
        if self._debug:
            status = self.socket_status(socket_num)
            print(
                "Reading %d bytes from ESP socket with status %d" % (size, status)
            )
        self._socknum_ll[0][0] = socket_num
        # Requested size is sent as two bytes, low byte first.
        size_param = (size & 0xFF, (size >> 8) & 0xFF)
        reply = self._send_command_get_response(
            _GET_DATABUF_TCP_CMD,
            (self._socknum_ll[0], size_param),
            sent_param_len_16=True,
            recv_param_len_16=True,
        )
        payload = reply[0]
        return bytes(payload)

    def socket_connect(self, socket_num, dest, port, conn_mode=TCP_MODE):
        """Open a socket to a destination IP address or hostname and confirm it connected,
        using the ESP32's internal reference number. 'conn_mode' defaults to
        TCP_MODE but may also be UDP_MODE or TLS_MODE (dest must be hostname
        for TLS_MODE!)

        Returns True on success; raises TimeoutError when the socket has not
        reported as connected within 3 seconds.
        """
        if self._debug:
            print("*** Socket connect mode", conn_mode)

        self.socket_open(socket_num, dest, port, conn_mode=conn_mode)

        if conn_mode == self.UDP_MODE:
            # UDP doesn't actually establish a connection
            # but the socket for writing is created via start_server
            self.start_server(port, socket_num, conn_mode)
            return True

        wait_seconds = 3
        started = time.monotonic()
        while (time.monotonic() - started) < wait_seconds:
            if self.socket_connected(socket_num):
                return True
            time.sleep(0.01)  # brief pause between status polls
        raise TimeoutError("Failed to establish connection")

    def socket_close(self, socket_num):
        """Close a socket using the ESP32's internal reference number.

        An OSError raised while sending the stop command is ignored. If the
        socket was the recorded TLS socket, that record is cleared.
        """
        if self._debug:
            print("*** Closing socket #%d" % socket_num)
        self._socknum_ll[0][0] = socket_num
        try:
            self._send_command_get_response(_STOP_CLIENT_TCP_CMD, self._socknum_ll)
        except OSError:
            # Closing is best effort: a failed stop command is not reported.
            pass
        closing_tls_socket = socket_num == self._tls_socket
        if closing_tls_socket:
            self._tls_socket = None

    def start_server(
        self, port, socket_num, conn_mode=TCP_MODE, ip=None
    ):  # pylint: disable=invalid-name
        """Open a server on the given port, using the ESP32's internal reference number.

        :param int port: port number, sent as a big-endian unsigned 16-bit value.
        :param int socket_num: ESP32 socket number to use.
        :param int conn_mode: connection mode, TCP_MODE by default.
        :param ip: optional value sent as the first parameter when truthy.
        :raises OSError: if the ESP32 does not reply with 1.
        """
        if self._debug:
            print("*** starting server")
        self._socknum_ll[0][0] = socket_num
        packed_port = struct.pack(">H", port)
        params = [packed_port, self._socknum_ll[0], (conn_mode,)]
        if ip:
            # When supplied, the address goes ahead of the other parameters.
            params = [ip] + params
        reply = self._send_command_get_response(_START_SERVER_TCP_CMD, params)

        started_ok = reply[0][0] == 1
        if not started_ok:
            raise OSError("Could not start server")

    def server_state(self, socket_num):
        """Return the state of the ESP32's internal reference server socket number"""
        self._socknum_ll[0][0] = socket_num
        reply = self._send_command_get_response(_GET_STATE_TCP_CMD, self._socknum_ll)
        state = reply[0][0]
        return state

    def get_remote_data(self, socket_num):
        """Return the IP address and port of the remote host.

        The result is a dict with key "ip_addr" (the first reply parameter, as
        received) and key "port" (the second reply parameter unpacked as a
        little-endian unsigned 16-bit integer).
        """
        self._socknum_ll[0][0] = socket_num
        reply = self._send_command_get_response(
            _GET_REMOTE_DATA_CMD, self._socknum_ll, reply_params=2
        )
        remote_ip = reply[0]
        remote_port = struct.unpack("<H", reply[1])[0]
        return {"ip_addr": remote_ip, "port": remote_port}

    def set_esp_debug(self, enabled):
        """Turn debug mode on the ESP32 on or off. Debug messages will be
        written to the ESP32's UART.

        :param enabled: truthy to enable, falsy to disable.
        :raises OSError: if the ESP32 does not reply with 1.
        """
        flag = bool(enabled)
        reply = self._send_command_get_response(_SET_DEBUG_CMD, ((flag,),))
        if reply[0][0] != 1:
            raise OSError("Failed to set debug mode")

    def set_pin_mode(self, pin, mode):
        """Set the io mode for a GPIO pin.

        :param int pin: ESP32 GPIO pin to set.
        :param mode: direction for pin, digitalio.Direction or integer (0=input, 1=output).
        :raises OSError: if the ESP32 does not reply with 1.
        """
        # Anything that is not a recognised Direction is passed through as given.
        pin_mode = mode
        if mode == Direction.OUTPUT:
            pin_mode = 1
        elif mode == Direction.INPUT:
            pin_mode = 0
        reply = self._send_command_get_response(
            _SET_PIN_MODE_CMD, ((pin,), (pin_mode,))
        )
        if reply[0][0] != 1:
            raise OSError("Failed to set pin mode")

    def set_digital_write(self, pin, value):
        """Drive the digital output of a pin.

        :param int pin: ESP32 GPIO pin to write to.
        :param bool value: Value for the pin.
        :raises OSError: if the ESP32 does not reply with 1.
        """
        params = ((pin,), (value,))
        reply = self._send_command_get_response(_SET_DIGITAL_WRITE_CMD, params)
        write_ok = reply[0][0] == 1
        if not write_ok:
            raise OSError("Failed to write to pin")

    def set_analog_write(self, pin, analog_value):
        """Set the analog output value of pin, using PWM.

        :param int pin: ESP32 GPIO pin to write to.
        :param float analog_value: 0=off 1.0=full on
        :raises OSError: if the ESP32 does not reply with 1.
        """
        # Scale the fractional level to an integer duty value (1.0 -> 255).
        duty = int(255 * analog_value)
        params = ((pin,), (duty,))
        reply = self._send_command_get_response(_SET_ANALOG_WRITE_CMD, params)
        if reply[0][0] != 1:
            raise OSError("Failed to write to pin")

    def set_digital_read(self, pin):
        """Get the digital input value of pin. Returns the boolean value of the pin.

        :param int pin: ESP32 GPIO pin to read from.
        :raises AssertionError: if the reported nina-fw version is below 1.5.
        :raises OSError: if the ESP32 replies with something other than 0 or 1.
        """
        # Verify nina-fw => 1.5.0
        # Compare the parsed (major, minor) pair instead of a single character
        # of the version, so versions such as "2.0.0" or "1.10.0" are judged
        # correctly whether the version is reported as text or as bytes.
        fw_version = self.firmware_version
        if not isinstance(fw_version, str):
            fw_version = str(fw_version, "utf-8")
        fw_parts = fw_version.replace("\x00", "").split(".")
        fw_parts = (fw_parts + ["0", "0"])[:2]  # tolerate a missing minor field
        fw_major_minor = (int(fw_parts[0]), int(fw_parts[1]))
        assert fw_major_minor >= (1, 5), "Please update nina-fw to 1.5.0 or above."

        resp = self._send_command_get_response(_SET_DIGITAL_READ_CMD, ((pin,),))[0]
        if resp[0] == 0:
            return False
        if resp[0] == 1:
            return True
        raise OSError(
            "_SET_DIGITAL_READ response error: response is not boolean", resp[0]
        )

    def set_analog_read(self, pin, atten=ADC_ATTEN_DB_11):
        """Get the analog input value of pin. Returns an int between 0 and 65536.

        The ESP32's reply is unpacked as a little-endian signed 32-bit integer
        and multiplied by 16 before being returned.

        :param int pin: ESP32 GPIO pin to read from.
        :param int atten: attenuation constant
        :raises AssertionError: if the reported nina-fw version is below 1.5.
        :raises ValueError: if the ESP32 replies with a negative value.
        """
        # Verify nina-fw => 1.5.0
        # Compare the parsed (major, minor) pair instead of a single character
        # of the version, so versions such as "2.0.0" or "1.10.0" are judged
        # correctly whether the version is reported as text or as bytes.
        fw_version = self.firmware_version
        if not isinstance(fw_version, str):
            fw_version = str(fw_version, "utf-8")
        fw_parts = fw_version.replace("\x00", "").split(".")
        fw_parts = (fw_parts + ["0", "0"])[:2]  # tolerate a missing minor field
        fw_major_minor = (int(fw_parts[0]), int(fw_parts[1]))
        assert fw_major_minor >= (1, 5), "Please update nina-fw to 1.5.0 or above."

        resp = self._send_command_get_response(_SET_ANALOG_READ_CMD, ((pin,), (atten,)))
        resp_analog = struct.unpack("<i", resp[0])
        if resp_analog[0] < 0:
            raise ValueError(
                "_SET_ANALOG_READ parameter error: invalid pin", resp_analog[0]
            )
        if self._debug:
            print(resp, resp_analog, resp_analog[0], 16 * resp_analog[0])
        return 16 * resp_analog[0]

    def get_time(self):
        """The current unix timestamp.

        The ESP32's reply is unpacked as a little-endian signed 32-bit integer
        and the resulting one-element tuple is returned as-is.

        :raises OSError: if the reply is 0, or the status is not WL_CONNECTED.
        """
        if self.status != WL_CONNECTED:
            # Give a more specific message when the status is an AP mode.
            if self.status in (WL_AP_LISTENING, WL_AP_CONNECTED):
                raise OSError(
                    "Cannot obtain NTP while in AP mode, must be connected to internet"
                )
            raise OSError("Must be connected to WiFi before obtaining NTP.")

        reply = self._send_command_get_response(_GET_TIME)
        timestamp = struct.unpack("<i", reply[0])
        if timestamp == (0,):
            raise OSError("_GET_TIME returned 0")
        return timestamp

    def set_certificate(self, client_certificate):
        """Sets client certificate. Must be called
        BEFORE a network connection is established.

        :param str client_certificate: User-provided .PEM certificate up to 1300 bytes.
        :raises ValueError: if the status is already WL_CONNECTED.
        :raises TypeError: if the certificate lacks the "-----BEGIN CERTIFICATE" marker.
        :raises AssertionError: if the certificate is 1300 bytes or longer.
        :raises OSError: if the ESP32 does not reply with 1.
        :return: the first parameter of the ESP32's reply.
        """
        if self._debug:
            print("** Setting client certificate")
        if self.status == WL_CONNECTED:
            raise ValueError(
                "set_certificate must be called BEFORE a connection is established."
            )
        if isinstance(client_certificate, str):
            client_certificate = bytes(client_certificate, "utf-8")
        # The certificate is bytes at this point, so the marker must be bytes
        # too; a str needle in a bytes haystack raises TypeError on standard Python.
        if b"-----BEGIN CERTIFICATE" not in client_certificate:
            raise TypeError(".PEM must start with -----BEGIN CERTIFICATE")
        assert len(client_certificate) < 1300, ".PEM must be less than 1300 bytes."
        resp = self._send_command_get_response(_SET_CLI_CERT, (client_certificate,))
        if resp[0][0] != 1:
            raise OSError("Failed to set client certificate")
        self.set_crt = True  # record that a certificate has been set
        return resp[0]

    def set_private_key(self, private_key):
        """Sets private key. Must be called
        BEFORE a network connection is established.

        :param str private_key: User-provided .PEM file up to 1700 bytes.
        :raises ValueError: if the status is already WL_CONNECTED.
        :raises TypeError: if the key lacks the "-----BEGIN RSA" marker.
        :raises AssertionError: if the key is 1700 bytes or longer.
        :raises OSError: if the ESP32 does not reply with 1.
        :return: the first parameter of the ESP32's reply.
        """
        if self._debug:
            print("** Setting client's private key.")
        if self.status == WL_CONNECTED:
            raise ValueError(
                "set_private_key must be called BEFORE a connection is established."
            )
        if isinstance(private_key, str):
            private_key = bytes(private_key, "utf-8")
        # The key is bytes at this point, so the marker must be bytes too; a
        # str needle in a bytes haystack raises TypeError on standard Python.
        if b"-----BEGIN RSA" not in private_key:
            raise TypeError(".PEM must start with -----BEGIN RSA")
        assert len(private_key) < 1700, ".PEM must be less than 1700 bytes."
        resp = self._send_command_get_response(_SET_PK, (private_key,))
        if resp[0][0] != 1:
            raise OSError("Failed to set private key.")
        self.set_psk = True  # record that a private key has been set
        return resp[0]