[PoC] Route around APrinter with Pandas Dataframe #1099

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft · wants to merge 5 commits into dev
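In outline, the PoC materializes each query result in a pandas DataFrame and serializes it once per request, rather than streaming rows through APrinter. A minimal sketch of the idea (the rows_to_response wrapper below is illustrative, not code from this diff):

    from flask import Response
    from pandas import DataFrame

    def rows_to_response(rows) -> Response:
        # materialize the rows, then serialize the whole frame in one pass
        df = DataFrame(rows)
        body = '{"epidata":' + df.to_json(orient="records") + ', "result": 1, "message": "success"}'
        return Response(body, mimetype="application/json")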
5 changes: 4 additions & 1 deletion .github/workflows/ci.yaml
@@ -116,7 +116,7 @@ jobs:
needs: build
# only on main and dev branch
# TODO: #1112 Remove `|| github.ref == 'refs/heads/api-keys'` after transition to production status.
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' || github.ref == 'refs/heads/api-keys'
if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' || github.ref == 'refs/heads/api-keys' || github.ref == 'refs/heads/ds/dev-no-aprinter'

runs-on: ubuntu-latest
steps:
@@ -141,6 +141,9 @@
if [ "$imageTag" = "main" ] ; then
imageTag="latest"
fi
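          # map the PoC branch to a stable image tag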
if [ "$imageTag" = "ds/dev-no-aprinter" ] ; then
imageTag="dev-no-aprinter"
fi
echo "::set-output name=tag::$imageTag"
echo "::set-output name=repo::ghcr.io/${{ github.repository }}"
- name: Push Dev Tag
165 changes: 156 additions & 9 deletions src/server/endpoints/covidcast.py
@@ -1,14 +1,16 @@
from typing import List, Optional, Tuple, Dict, Any
from typing import Callable, List, Optional, Tuple, Dict, Any
from itertools import groupby
from datetime import date, timedelta
from epiweeks import Week
from flask import Blueprint, request
from flask import Blueprint, request, Response
from flask.json import loads, jsonify
from bisect import bisect_right
from sqlalchemy import text
from pandas import read_csv, to_datetime
from pandas import read_csv, to_datetime, DataFrame, concat
import pandas as pd

from .._common import is_compatibility_mode, db
from .._config import MAX_RESULTS
from .._exceptions import ValidationFailedException, DatabaseErrorException
from .._params import (
GeoSet,
@@ -42,6 +44,147 @@

latest_table = "epimetric_latest_v"
history_table = "epimetric_full_v"
MIMETYPE_JSON = "application/json"
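# Nullable extension dtypes ("Int8"/"Int64") keep SQL NULLs as pandas <NA>
# instead of silently coercing the integer columns to float.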
PANDAS_DTYPES = {
"source": str,
"signal": str,
"time_type": str,
"time_value": int,
"geo_type": str,
"geo_value": str,
"value": float,
"stderr": float,
"sample_size": float,
"missing_value": "Int8",
"missing_stderr": "Int8",
"missing_sample_size": "Int8",
"issue": "Int64",
"lag": "Int64",
"id": "Int64",
"direction": "Int8",
}


def df_to_response(
df: DataFrame,
filename: Optional[str] = None,
) -> Response:
"""This function attempts to capture the substantial logic of APrinter.

TODO: Double check that this is correct and there are no edge cases missed.
"""
is_compatibility = is_compatibility_mode()
if df.empty:
if is_compatibility:
return Response(
"""{"result": -2, "message": "no results"}""",
mimetype=MIMETYPE_JSON
)
else:
return Response(
"""{"epidata": [], "result": -2, "message": "no results"}""",
mimetype=MIMETYPE_JSON
)

if is_compatibility:
df.drop(columns=["source", "geo_type", "time_type"], inplace=True, errors="ignore")

fields = request.values.get("fields")
if fields:
keep_fields = []
for field in fields.split(","):
if field.startswith("-") and field[1:] in df.columns:
df.drop(columns=[field[1:]], inplace=True)
elif field in df.columns:
keep_fields.append(field)
if keep_fields:
df = df[keep_fields]
else:
keep_fields = df.columns

return_format = request.values.get("format", "classic")
if return_format == "classic":
json_str = df.to_json(orient="records")
return Response(
"""{"epidata":""" + json_str + """, "result": 1, "message": "success"}""",
mimetype=MIMETYPE_JSON
)
elif return_format == "json":
json_str = df.to_json(orient="records")
return Response(json_str, mimetype=MIMETYPE_JSON)
elif return_format == "csv":
column_order = [
"geo_value", "signal", "time_value", "direction", "issue", "lag", "missing_value",
"missing_stderr", "missing_sample_size", "value", "stderr", "sample_size"
]
cols = [col for col in column_order if col in keep_fields]
filename = "epidata" if not filename else filename
headers = {"Content-Disposition": f"attachment; filename={filename}.csv"}
return Response(
df[cols].to_csv(index=False),
mimetype="text/csv; charset=utf8",
headers=headers
)
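
# Illustrative use (hypothetical values; requires an active Flask request
# context, e.g. app.test_request_context()):
#
#   df = DataFrame([{"geo_value": "pa", "signal": "sig", "time_value": 20200401, "value": 1.0}])
#   resp = df_to_response(df)  # defaults to the "classic" format
#   resp.get_data(as_text=True)
#   # -> '{"epidata":[{"geo_value":"pa",...}], "result": 1, "message": "success"}'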


def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
"""Set the dataframe column datatypes."""
for d in dtypes.values():
try:
pd.api.types.pandas_dtype(d)
except TypeError:
raise ValueError(f"Invalid dtype {d}")

sub_dtypes = {k: v for k, v in dtypes.items() if k in df.columns}
return df.astype(sub_dtypes)
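
# Example: _set_df_dtypes(DataFrame({"issue": [20200401, None]}), PANDAS_DTYPES)
# yields an "Int64" column where the missing issue is <NA> rather than a float NaN.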


def query_to_df(
q: QueryBuilder,
fields_string: List[str],
fields_int: List[str],
fields_float: List[str],
alias_mapper: Optional[Callable[[str, str], str]] = None,
) -> DataFrame:
try:
query, params = text(f"{str(q)} LIMIT {MAX_RESULTS}"), q.params
rows = (parse_row(row, fields_string, fields_int, fields_float) for row in db.execute(query, params))
except Exception as e:
raise DatabaseErrorException(str(e))

# parse rows
all_columns = set(PANDAS_DTYPES.keys())
selected_columns = set(fields_string + fields_int + fields_float)
dfs = []
if is_compatibility_mode():
remove_columns = set(["source", "direction"])
columns = list(all_columns.intersection(selected_columns).difference(remove_columns))

df = DataFrame(rows, columns=columns)
df = _set_df_dtypes(df, PANDAS_DTYPES)

df["direction"] = None
dfs.append(df)
else:
remove_columns = set(["direction"])
columns = list(all_columns.intersection(selected_columns).difference(remove_columns))

        # itertools.groupby only groups adjacent rows, so this relies on the
        # query ordering results by source and signal
        for (source_name, signal_name), group in groupby(rows, lambda row: (row["source"], row["signal"])):
            df = DataFrame(group, columns=columns)
            df = _set_df_dtypes(df, PANDAS_DTYPES)

            # assign() returns a new DataFrame; the result must be rebound
            df = df.assign(
                source=alias_mapper(source_name, signal_name) if alias_mapper else source_name,
                signal=signal_name,
                direction=None
            )
            dfs.append(df)

if dfs:
return concat(dfs, sort=False)
else:
        # no rows at all: return an empty frame with the expected columns and dtypes
        # (DataFrame's second positional argument is the index, not the dtypes)
        return _set_df_dtypes(DataFrame(columns=list(PANDAS_DTYPES.keys())), PANDAS_DTYPES)


@bp.route("/", methods=("GET", "POST"))
def handle():
@@ -81,14 +224,18 @@ def handle():
q.apply_lag_filter(history_table, lag)
q.apply_as_of_filter(history_table, as_of)

-    def transform_row(row, proxy):
-        if is_compatibility or not alias_mapper or "source" not in row:
-            return row
-        row["source"] = alias_mapper(row["source"], proxy["signal"])
-        return row
-
-    # send query
-    return execute_query(str(q), q.params, fields_string, fields_int, fields_float, transform=transform_row)
+    if request.values.get("format") == "tree":
+        def transform_row(row, proxy):
+            if is_compatibility or not alias_mapper or "source" not in row:
+                return row
+            row["source"] = alias_mapper(row["source"], proxy["signal"])
+            return row
+
+        # send query
+        return execute_query(str(q), q.params, fields_string, fields_int, fields_float, transform=transform_row)
+    else:
+        df = query_to_df(q, fields_string, fields_int, fields_float)
+        return df_to_response(df)


def _verify_argument_time_type_matches(is_day_argument: bool, count_daily_signal: int, count_weekly_signal: int) -> None: