Skip to content

PortalBase refactoring to work with PyPortal better #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions adafruit_portalbase/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class PortalBase:
on-board NeoPixel. Defaults to ``None``, to not use the status LED
:param json_transform: A function or a list of functions to call with the parsed JSON.
Changes and additions are permitted for the ``dict`` object.
:param success_callback: A function we'll call if you like, when we fetch data successfully.
Defaults to ``None``.
:param debug: Turn on debug print outs. Defaults to False.

"""
Expand All @@ -64,6 +66,7 @@ def __init__(
json_path=None,
regexp_path=None,
json_transform=None,
success_callback=None,
debug=False,
):
self.network = network
Expand All @@ -88,6 +91,7 @@ def __init__(
self.json_path = json_path

self._regexp_path = regexp_path
self._success_callback = success_callback

# Add any JSON translators
if json_transform:
Expand Down Expand Up @@ -362,6 +366,17 @@ def fetch(self, refresh_url=None, timeout=10):
timeout=timeout,
)

# if we have a callback registered, call it now
if self._success_callback:
self._success_callback(values)

self._fill_text_labels(values)

if len(values) == 1:
return values[0]
return values

def _fill_text_labels(self, values):
# fill out all the text blocks
if self._text:
value_index = 0 # In case values and text is not the same
Expand All @@ -379,9 +394,6 @@ def fetch(self, refresh_url=None, timeout=10):
string = values[value_index] # ok it's a string
self._fetch_set_text(string, index=i)
value_index += 1
if len(values) == 1:
return values[0]
return values

def get_local_time(self, location=None):
"""Accessor function for get_local_time()"""
Expand Down
2 changes: 1 addition & 1 deletion adafruit_portalbase/graphics.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def set_background(self, file_or_color, position=None):
def qrcode(
self, qr_data, *, qr_size=1, x=0, y=0, qr_color=0x000000
): # pylint: disable=invalid-name
"""Display a QR code on the eInk
"""Display a QR code

:param qr_data: The data for the QR code.
:param int qr_size: The scale of the QR code.
Expand Down
188 changes: 130 additions & 58 deletions adafruit_portalbase/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ def __init__(
self.json_transform = []
self._extract_values = extract_values

self._json_types = [
"application/json",
"application/javascript",
"application/geo+json",
]

# This may be removed. Using for testing
self.requests = None

Expand Down Expand Up @@ -185,7 +191,7 @@ def get_local_time(self, location=None):
"Error connection to Adafruit IO. The response was: "
+ response.text
)
raise ValueError(error_message)
raise RuntimeError(error_message)
if self._debug:
print("Time request: ", api_url)
print("Time reply: ", response.text)
Expand Down Expand Up @@ -427,47 +433,40 @@ def fetch(self, url, *, headers=None, timeout=10):

return response

def fetch_data(
self,
url,
*,
headers=None,
json_path=None,
regexp_path=None,
timeout=10,
):
"""Fetch data from the specified url and perfom any parsing
def add_json_content_type(self, content_type):
"""
Add a JSON content type

:param str url: The URL to fetch from.
:param list headers: Extra headers to include in the request.
:param json_path: The path to drill down into the JSON data.
:param regexp_path: The path formatted as a regular expression to drill down
into the JSON data.
:param int timeout: The timeout period in seconds.
:param str type: The content JSON type like 'application/json'

"""
json_out = None
values = []
content_type = CONTENT_TEXT
if isinstance(content_type, str):
self._json_types.append(content_type)

def _detect_content_type(self, headers):
if "content-type" in headers:
if "image/" in headers["content-type"]:
return CONTENT_IMAGE
for json_type in self._json_types:
if json_type in headers["content-type"]:
return CONTENT_JSON
return CONTENT_TEXT

def check_response(self, response):
"""
Check the response object status code, change the lights, and return content type

response = self.fetch(url, headers=headers, timeout=timeout)
:param response: The response object from a network call

"""
headers = self._get_headers(response)

headers = {}
for title, content in response.headers.items():
headers[title.lower()] = content
gc.collect()
if self._debug:
print("Headers:", headers)
if response.status_code == 200:
print("Reply is OK!")
self.neo_status(STATUS_DATA_RECEIVED) # green = got data
if "content-type" in headers:
if "image/" in headers["content-type"]:
content_type = CONTENT_IMAGE
elif "application/json" in headers["content-type"]:
content_type = CONTENT_JSON
elif "application/javascript" in headers["content-type"]:
content_type = CONTENT_JSON
content_type = self._detect_content_type(headers)
else:
if self._debug:
if "content-length" in headers:
Expand All @@ -481,11 +480,56 @@ def fetch_data(
)
)

if content_type == CONTENT_JSON and json_path is not None:
if isinstance(json_path, (list, tuple)) and (
not json_path or not isinstance(json_path[0], (list, tuple))
):
json_path = (json_path,)
return content_type

@staticmethod
def _get_headers(response):
headers = {}
for title, content in response.headers.items():
headers[title.lower()] = content
gc.collect()
return headers

def fetch_data(
self,
url,
*,
headers=None,
json_path=None,
regexp_path=None,
timeout=10,
):
"""Fetch data from the specified url and perfom any parsing

:param str url: The URL to fetch from.
:param list headers: Extra headers to include in the request.
:param json_path: The path to drill down into the JSON data.
:param regexp_path: The path formatted as a regular expression to search
the text data.
:param int timeout: The timeout period in seconds.

"""
response = self.fetch(url, headers=headers, timeout=timeout)
return self._parse_data(response, json_path=json_path, regexp_path=regexp_path)

def _parse_data(
self,
response,
*,
json_path=None,
regexp_path=None,
):

json_out = None
content_type = self.check_response(response)

if content_type == CONTENT_JSON:
if json_path is not None:
# Drill down to the json path and set json_out as that node
if isinstance(json_path, (list, tuple)) and (
not json_path or not isinstance(json_path[0], (list, tuple))
):
json_path = (json_path,)
try:
gc.collect()
json_out = response.json()
Expand All @@ -498,43 +542,71 @@ def fetch_data(
except MemoryError:
supervisor.reload()

if content_type == CONTENT_JSON:
values = self.process_json(json_out, json_path)
elif content_type == CONTENT_TEXT:
values = self.process_text(response.text, regexp_path)

# Clean up
json_out = None
response = None
if self._extract_values and len(values) == 1:
values = values[0]

gc.collect()

return values

@staticmethod
def process_text(text, regexp_path):
"""
Process text content

:param str text: The entire text content
:param regexp_path: The path formatted as a regular expression to search
the text data.

"""
values = []
if regexp_path:
import re # pylint: disable=import-outside-toplevel

for regexp in regexp_path:
values.append(re.search(regexp, text).group(1))
else:
values = text
return values

def process_json(self, json_data, json_path):
"""
Process JSON content

:param dict json_data: The JSON data as a dict
:param json_path: The path to drill down into the JSON data.

"""
values = []

# optional JSON post processing, apply any transformations
# these MAY change/add element
for idx, json_transform in enumerate(self.json_transform):
try:
json_transform(json_out)
json_transform(json_data)
except Exception as error:
print("Exception from json_transform: ", idx, error)
raise

# extract desired text/values from json
if json_out is not None and json_path:
if json_data is not None and json_path:
for path in json_path:
try:
values.append(self.json_traverse(json_out, path))
values.append(self.json_traverse(json_data, path))
except KeyError:
print(json_out)
print(json_data)
raise
elif content_type == CONTENT_TEXT and regexp_path:
for regexp in regexp_path:
values.append(re.search(regexp, response.text).group(1))
else:
if json_out:
# No path given, so return JSON as string for compatibility
import json # pylint: disable=import-outside-toplevel

values = json.dumps(response.json())
else:
values = response.text

# we're done with the requests object, lets delete it so we can do more!
json_out = None
response = None
gc.collect()
if self._extract_values and len(values) == 1:
return values[0]
# No path given, so return JSON as string for compatibility
import json # pylint: disable=import-outside-toplevel

values = json.dumps(json_data)
return values