Skip to content

301/302 Redirects, FormData files, Cookies, Token authentication #67

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
503522c
Added tuple to possible data types of route(methods=...)
michalpokusa Aug 27, 2023
4ef72f1
Refactor of Redirect response class to allow 301 and 302 codes
michalpokusa Aug 27, 2023
bc579e8
Updated example for redirects
michalpokusa Aug 27, 2023
d743d1c
Minor docstring, typing and encoding fixes
michalpokusa Aug 27, 2023
6031bec
Added Headers.get_directive() and Headers.get_parameter()
michalpokusa Aug 27, 2023
60a472e
Added option to construct Headers from str and refactor of parsing re…
michalpokusa Aug 27, 2023
5da48c2
Added Token authentication method
michalpokusa Aug 27, 2023
9a79039
Refactor of QueryParams and FormData, moved interfaces to separate file
michalpokusa Aug 28, 2023
46c22bf
Refactor of Headers, implemented _IFieldStorage interface
michalpokusa Aug 28, 2023
4ec5f9a
Added cookies handling and example for it
michalpokusa Aug 28, 2023
e671f8e
Added elapsed time to _debug_response_sent
michalpokusa Aug 28, 2023
8c16b27
Added info abotu adding cookies using Set-Cookie headers
michalpokusa Aug 29, 2023
dc7bec0
Added pool_interval parameter to Server.serve_forever
michalpokusa Aug 31, 2023
4ec080e
Fix #69: `'None'` instead of `None` in FormData.get()
michalpokusa Sep 1, 2023
f2773cd
Fix #68: Incorrect parsing of x-www-from-urlencoded in some cases
michalpokusa Sep 1, 2023
d2e9c58
Improvements in FormData parsing and Content-Type determining
michalpokusa Sep 1, 2023
d1f8fda
Pylint CI fix
michalpokusa Sep 1, 2023
615290d
Minor change in redirect example
michalpokusa Sep 1, 2023
eff94fa
Fix typing in Server.route
michalpokusa Sep 1, 2023
36dc073
Fix: Missing headers in Response constructor
michalpokusa Sep 1, 2023
d051ef5
Made Request.json() available for PUT, PATCH and DELETE requests
michalpokusa Sep 1, 2023
403fe01
Adjusted mDNS docs about hotname without .local
michalpokusa Sep 2, 2023
ccef0a8
Minor change in docstring
michalpokusa Sep 2, 2023
af880fc
Added File.content_bytes
michalpokusa Sep 2, 2023
d945f6f
Minor typo fixes in docs
michalpokusa Sep 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions adafruit_httpserver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from .authentication import (
Basic,
Token,
Bearer,
check_authentication,
require_authentication,
Expand Down Expand Up @@ -77,6 +78,8 @@
ACCEPTED_202,
NO_CONTENT_204,
PARTIAL_CONTENT_206,
MOVED_PERMANENTLY_301,
FOUND_302,
TEMPORARY_REDIRECT_307,
PERMANENT_REDIRECT_308,
BAD_REQUEST_400,
Expand Down
24 changes: 18 additions & 6 deletions adafruit_httpserver/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,27 @@ def __str__(self) -> str:
return f"Basic {self._value}"


class Bearer:
"""Represents HTTP Bearer Token Authentication."""
class Token:
"""Represents HTTP Token Authentication."""

prefix = "Token"

def __init__(self, token: str) -> None:
self._value = token

def __str__(self) -> str:
return f"Bearer {self._value}"
return f"{self.prefix} {self._value}"


class Bearer(Token): # pylint: disable=too-few-public-methods
"""Represents HTTP Bearer Token Authentication."""

prefix = "Bearer"


def check_authentication(request: Request, auths: List[Union[Basic, Bearer]]) -> bool:
def check_authentication(
request: Request, auths: List[Union[Basic, Token, Bearer]]
) -> bool:
"""
Returns ``True`` if request is authorized by any of the authentications, ``False`` otherwise.

Expand All @@ -47,15 +57,17 @@ def check_authentication(request: Request, auths: List[Union[Basic, Bearer]]) ->
check_authentication(request, [Basic("username", "password")])
"""

auth_header = request.headers.get("Authorization")
auth_header = request.headers.get_directive("Authorization")

if auth_header is None:
return False

return any(auth_header == str(auth) for auth in auths)


def require_authentication(request: Request, auths: List[Union[Basic, Bearer]]) -> None:
def require_authentication(
request: Request, auths: List[Union[Basic, Token, Bearer]]
) -> None:
"""
Checks if the request is authorized and raises ``AuthenticationError`` if not.

Expand Down
113 changes: 79 additions & 34 deletions adafruit_httpserver/headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
"""

try:
from typing import Dict, Tuple
from typing import Dict, List, Union
except ImportError:
pass

from .interfaces import _IFieldStorage

class Headers:

class Headers(_IFieldStorage):
"""
A dict-like class for storing HTTP headers.

Expand All @@ -23,6 +25,8 @@ class Headers:

Examples::

headers = Headers("Content-Type: text/html\\r\\nContent-Length: 1024\\r\\n")
# or
headers = Headers({"Content-Type": "text/html", "Content-Length": "1024"})

len(headers)
Expand All @@ -45,60 +49,101 @@ class Headers:
# True
"""

_storage: Dict[str, Tuple[str, str]]
_storage: Dict[str, List[str]]

def __init__(self, headers: Union[str, Dict[str, str]] = None) -> None:
self._storage = {}

def __init__(self, headers: Dict[str, str] = None) -> None:
headers = headers or {}
if isinstance(headers, str):
for header_line in headers.strip().splitlines():
name, value = header_line.split(": ", 1)
self.add(name, value)
else:
for key, value in (headers or {}).items():
self.add(key, value)

self._storage = {key.lower(): [key, value] for key, value in headers.items()}
def add(self, field_name: str, value: str):
"""
Adds a header with the given field name and value.
Allows adding multiple headers with the same name.
"""
self._add_field_value(field_name.lower(), value)

def get(self, name: str, default: str = None):
def get(self, field_name: str, default: str = None) -> Union[str, None]:
"""Returns the value for the given header name, or default if not found."""
return self._storage.get(name.lower(), [None, default])[1]
return super().get(field_name.lower(), default)

def setdefault(self, name: str, default: str = None):
"""Sets the value for the given header name if it does not exist."""
return self._storage.setdefault(name.lower(), [name, default])[1]
def get_list(self, field_name: str) -> List[str]:
"""Get the list of values of a field."""
return super().get_list(field_name.lower())

def get_directive(self, name: str, default: str = None) -> Union[str, None]:
"""
Returns the main value (directive) for the given header name, or default if not found.

Example::

headers = Headers({"Content-Type": "text/html; charset=utf-8"})
headers.get_directive("Content-Type")
# 'text/html'
"""

header_value = self.get(name)
if header_value is None:
return default
return header_value.split(";")[0].strip('" ')

def get_parameter(
self, name: str, parameter: str, default: str = None
) -> Union[str, None]:
"""
Returns the value of the given parameter for the given header name, or default if not found.

def items(self):
"""Returns a list of (name, value) tuples."""
return dict(self._storage.values()).items()
Example::

def keys(self):
"""Returns a list of header names."""
return dict(self._storage.values()).keys()
headers = Headers({"Content-Type": "text/html; charset=utf-8"})
headers.get_parameter("Content-Type", "charset")
# 'utf-8'
"""

def values(self):
"""Returns a list of header values."""
return dict(self._storage.values()).values()
header_value = self.get(name)
if header_value is None:
return default
for header_parameter in header_value.split(";"):
if header_parameter.strip().startswith(parameter):
return header_parameter.strip().split("=")[1].strip('" ')
return default

def set(self, name: str, value: str):
"""Sets the value for the given header name."""
self._storage[name.lower()] = [value]

def setdefault(self, name: str, default: str = None):
"""Sets the value for the given header name if it does not exist."""
return self._storage.setdefault(name.lower(), [default])

def update(self, headers: Dict[str, str]):
"""Updates the headers with the given dict."""
return self._storage.update(
{key.lower(): [key, value] for key, value in headers.items()}
{key.lower(): [value] for key, value in headers.items()}
)

def copy(self):
"""Returns a copy of the headers."""
return Headers(dict(self._storage.values()))
return Headers(
"\r\n".join(
f"{key}: {value}" for key in self.fields for value in self.get_list(key)
)
)

def __getitem__(self, name: str):
return self._storage[name.lower()][1]
return super().__getitem__(name.lower())

def __setitem__(self, name: str, value: str):
self._storage[name.lower()] = [name, value]
self._storage[name.lower()] = [value]

def __delitem__(self, name: str):
del self._storage[name.lower()]

def __iter__(self):
return iter(dict(self._storage.values()))

def __len__(self):
return len(self._storage)

def __contains__(self, key: str):
return key.lower() in self._storage.keys()

def __repr__(self):
return f"{self.__class__.__name__}({dict(self._storage.values())})"
return super().__contains__(key.lower())
109 changes: 109 additions & 0 deletions adafruit_httpserver/interfaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# SPDX-FileCopyrightText: Copyright (c) 2023 Michał Pokusa
#
# SPDX-License-Identifier: MIT
"""
`adafruit_httpserver.interfaces`
====================================================
* Author(s): Michał Pokusa
"""

try:
from typing import List, Dict, Union, Any
except ImportError:
pass


class _IFieldStorage:
"""Interface with shared methods for QueryParams, FormData and Headers."""

_storage: Dict[str, List[Any]]

def _add_field_value(self, field_name: str, value: Any) -> None:
if field_name not in self._storage:
self._storage[field_name] = [value]
else:
self._storage[field_name].append(value)

def get(self, field_name: str, default: Any = None) -> Union[Any, None]:
"""Get the value of a field."""
return self._storage.get(field_name, [default])[0]

def get_list(self, field_name: str) -> List[Any]:
"""Get the list of values of a field."""
return self._storage.get(field_name, [])

@property
def fields(self):
"""Returns a list of field names."""
return list(self._storage.keys())

def items(self):
"""Returns a list of (name, value) tuples."""
return [(key, value) for key in self.fields for value in self.get_list(key)]

def keys(self):
"""Returns a list of header names."""
return self.fields

def values(self):
"""Returns a list of header values."""
return [value for key in self.keys() for value in self.get_list(key)]

def __getitem__(self, field_name: str):
return self._storage[field_name][0]

def __iter__(self):
return iter(self._storage)

def __len__(self) -> int:
return len(self._storage)

def __contains__(self, key: str) -> bool:
return key in self._storage

def __repr__(self) -> str:
return f"{self.__class__.__name__}({repr(self._storage)})"


def _encode_html_entities(value: Union[str, None]) -> Union[str, None]:
"""Encodes unsafe HTML characters that could enable XSS attacks."""
if value is None:
return None

return (
str(value)
.replace("&", "&")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
.replace("'", "&apos;")
)


class _IXSSSafeFieldStorage(_IFieldStorage):
def get(
self, field_name: str, default: Any = None, *, safe=True
) -> Union[Any, None]:
if safe:
return _encode_html_entities(super().get(field_name, default))

_debug_warning_nonencoded_output()
return super().get(field_name, default)

def get_list(self, field_name: str, *, safe=True) -> List[Any]:
if safe:
return [
_encode_html_entities(value) for value in super().get_list(field_name)
]

_debug_warning_nonencoded_output()
return super().get_list(field_name)


def _debug_warning_nonencoded_output():
"""Warns about XSS risks."""
print(
"WARNING: Setting safe to False makes XSS vulnerabilities possible by "
"allowing access to raw untrusted values submitted by users. If this data is reflected "
"or shown within HTML without proper encoding it could enable Cross-Site Scripting."
)
Loading