diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 1f7664b46..68ca9bec5 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -116,7 +116,7 @@ jobs:
     needs: build
     # only on main and dev branch
     # TODO: #1112 Remove `|| github.ref == 'refs/heads/api-keys'` after transition to production status.
-    if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' || github.ref == 'refs/heads/api-keys'
+    if: github.ref == 'refs/heads/main' || github.ref == 'refs/heads/dev' || github.ref == 'refs/heads/api-keys' || github.ref == 'refs/heads/ds/dev-no-aprinter'
     runs-on: ubuntu-latest
     steps:
@@ -141,6 +141,9 @@ jobs:
           if [ "$imageTag" = "main" ] ; then
             imageTag="latest"
          fi
+          if [ "$imageTag" = "ds/dev-no-aprinter" ] ; then
+            imageTag="dev-no-aprinter"
+          fi
           echo "::set-output name=tag::$imageTag"
           echo "::set-output name=repo::ghcr.io/${{ github.repository }}"
       - name: Push Dev Tag
diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py
index c1350b490..67c83cce6 100644
--- a/src/server/endpoints/covidcast.py
+++ b/src/server/endpoints/covidcast.py
@@ -1,14 +1,16 @@
-from typing import List, Optional, Tuple, Dict, Any
+from typing import Callable, List, Optional, Tuple, Dict, Any
 from itertools import groupby
 from datetime import date, timedelta
 from epiweeks import Week
-from flask import Blueprint, request
+from flask import Blueprint, request, Response
 from flask.json import loads, jsonify
 from bisect import bisect_right
 from sqlalchemy import text
-from pandas import read_csv, to_datetime
+from pandas import read_csv, to_datetime, DataFrame, concat
+import pandas as pd
 
 from .._common import is_compatibility_mode, db
+from .._config import MAX_RESULTS
 from .._exceptions import ValidationFailedException, DatabaseErrorException
 from .._params import (
     GeoSet,
@@ -42,6 +44,147 @@
 latest_table = "epimetric_latest_v"
 history_table = "epimetric_full_v"
 
+MIMETYPE_JSON = "application/json"
+PANDAS_DTYPES = {
+    "source": str,
+    "signal": str,
+    "time_type": str,
+    "time_value": int,
+    "geo_type": str,
+    "geo_value": str,
+    "value": float,
+    "stderr": float,
+    "sample_size": float,
+    "missing_value": "Int8",
+    "missing_stderr": "Int8",
+    "missing_sample_size": "Int8",
+    "issue": "Int64",
+    "lag": "Int64",
+    "id": "Int64",
+    "direction": "Int8",
+}
+
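+
+# df_to_response mirrors the APrinter response contract:
+#   format=classic -> {"epidata": [...], "result": 1, "message": "success"}
+#   format=json    -> a bare JSON array of row objects
+#   format=csv     -> a text/csv attachment in a fixed column order
+# An empty frame short-circuits to the standard "no results" (result -2) payload.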
+ """ + is_compatibility = is_compatibility_mode() + if df.empty: + if is_compatibility: + return Response( + """{"result": -2, "message": "no results"}""", + mimetype=MIMETYPE_JSON + ) + else: + return Response( + """{"epidata": [], "result": -2, "message": "no results"}""", + mimetype=MIMETYPE_JSON + ) + + if is_compatibility: + df.drop(columns=["source", "geo_type", "time_type"], inplace=True, errors="ignore") + + fields = request.values.get("fields") + if fields: + keep_fields = [] + for field in fields.split(","): + if field.startswith("-") and field[1:] in df.columns: + df.drop(columns=[field[1:]], inplace=True) + elif field in df.columns: + keep_fields.append(field) + if keep_fields: + df = df[keep_fields] + else: + keep_fields = df.columns + + return_format = request.values.get("format", "classic") + if return_format == "classic": + json_str = df.to_json(orient="records") + return Response( + """{"epidata":""" + json_str + """, "result": 1, "message": "success"}""", + mimetype=MIMETYPE_JSON + ) + elif return_format == "json": + json_str = df.to_json(orient="records") + return Response(json_str, mimetype=MIMETYPE_JSON) + elif return_format == "csv": + column_order = [ + "geo_value", "signal", "time_value", "direction", "issue", "lag", "missing_value", + "missing_stderr", "missing_sample_size", "value", "stderr", "sample_size" + ] + cols = [col for col in column_order if col in keep_fields] + filename = "epidata" if not filename else filename + headers = {"Content-Disposition": f"attachment; filename={filename}.csv"} + return Response( + df[cols].to_csv(index=False), + mimetype="text/csv; charset=utf8", + headers=headers + ) + + +def _set_df_dtypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame: + """Set the dataframe column datatypes.""" + for d in dtypes.values(): + try: + pd.api.types.pandas_dtype(d) + except TypeError: + raise ValueError(f"Invalid dtype {d}") + + sub_dtypes = {k: v for k, v in dtypes.items() if k in df.columns} + return df.astype(sub_dtypes) + + +def query_to_df( + q: QueryBuilder, + fields_string: List[str], + fields_int: List[str], + fields_float: List[str], + alias_mapper: Optional[Callable[[str, str], str]] = None, +) -> DataFrame: + try: + query, params = text(f"{str(q)} LIMIT {MAX_RESULTS}"), q.params + rows = (parse_row(row, fields_string, fields_int, fields_float) for row in db.execute(query, params)) + except Exception as e: + raise DatabaseErrorException(str(e)) + + # parse rows + all_columns = set(PANDAS_DTYPES.keys()) + selected_columns = set(fields_string + fields_int + fields_float) + dfs = [] + if is_compatibility_mode(): + remove_columns = set(["source", "direction"]) + columns = list(all_columns.intersection(selected_columns).difference(remove_columns)) + + df = DataFrame(rows, columns=columns) + df = _set_df_dtypes(df, PANDAS_DTYPES) + + df["direction"] = None + dfs.append(df) + else: + remove_columns = set(["direction"]) + columns = list(all_columns.intersection(selected_columns).difference(remove_columns)) + + for (source_name, signal_name), rows in groupby(rows, lambda row: (row["source"], row["signal"])): + df = DataFrame(rows, columns=columns) + df = _set_df_dtypes(df, PANDAS_DTYPES) + + df.assign( + source = alias_mapper(source_name, signal_name) if alias_mapper else source_name, + signal = signal_name, + direction = None + ) + dfs.append(df) + + if dfs: + return concat(dfs, sort=False) + else: + return DataFrame([], PANDAS_DTYPES) + @bp.route("/", methods=("GET", "POST")) def handle(): @@ -81,14 +224,18 @@ def handle(): 
+def query_to_df(
+    q: QueryBuilder,
+    fields_string: List[str],
+    fields_int: List[str],
+    fields_float: List[str],
+    alias_mapper: Optional[Callable[[str, str], str]] = None,
+) -> DataFrame:
+    try:
+        query, params = text(f"{str(q)} LIMIT {MAX_RESULTS}"), q.params
+        rows = (parse_row(row, fields_string, fields_int, fields_float) for row in db.execute(query, params))
+    except Exception as e:
+        raise DatabaseErrorException(str(e))
+
+    # parse rows
+    all_columns = set(PANDAS_DTYPES.keys())
+    selected_columns = set(fields_string + fields_int + fields_float)
+    dfs = []
+    if is_compatibility_mode():
+        remove_columns = set(["source", "direction"])
+        columns = list(all_columns.intersection(selected_columns).difference(remove_columns))
+
+        df = DataFrame(rows, columns=columns)
+        df = _set_df_dtypes(df, PANDAS_DTYPES)
+
+        df["direction"] = None
+        dfs.append(df)
+    else:
+        remove_columns = set(["direction"])
+        columns = list(all_columns.intersection(selected_columns).difference(remove_columns))
+
+        for (source_name, signal_name), group_rows in groupby(rows, lambda row: (row["source"], row["signal"])):
+            df = DataFrame(group_rows, columns=columns)
+            df = _set_df_dtypes(df, PANDAS_DTYPES)
+
+            # DataFrame.assign returns a new frame; keep the result.
+            df = df.assign(
+                source=alias_mapper(source_name, signal_name) if alias_mapper else source_name,
+                signal=signal_name,
+                direction=None
+            )
+            dfs.append(df)
+
+    if dfs:
+        return concat(dfs, sort=False)
+    else:
+        # No rows at all: return an empty frame with the full set of known columns.
+        return DataFrame(columns=list(PANDAS_DTYPES.keys()))
+
 
 @bp.route("/", methods=("GET", "POST"))
 def handle():
@@ -81,14 +224,18 @@ def handle():
     q.apply_lag_filter(history_table, lag)
     q.apply_as_of_filter(history_table, as_of)
 
-    def transform_row(row, proxy):
-        if is_compatibility or not alias_mapper or "source" not in row:
-            return row
-        row["source"] = alias_mapper(row["source"], proxy["signal"])
-        return row
-
-    # send query
-    return execute_query(str(q), q.params, fields_string, fields_int, fields_float, transform=transform_row)
+    if request.values.get("format") == "tree":
+        # The DataFrame path does not implement the tree format; keep the
+        # original streaming path for it.
+        def transform_row(row, proxy):
+            if is_compatibility or not alias_mapper or "source" not in row:
+                return row
+            row["source"] = alias_mapper(row["source"], proxy["signal"])
+            return row
+
+        # send query
+        return execute_query(str(q), q.params, fields_string, fields_int, fields_float, transform=transform_row)
+    else:
+        # Pass alias_mapper so aliased sources render as in the streaming path.
+        df = query_to_df(q, fields_string, fields_int, fields_float, alias_mapper)
+        return df_to_response(df)
 
 
 def _verify_argument_time_type_matches(is_day_argument: bool, count_daily_signal: int, count_weekly_signal: int) -> None:
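
A quick sanity check for the dtype coercion in _set_df_dtypes (a sketch, not part of the diff; the import path is an assumption about the installed package layout):

    import pandas as pd

    # Assumed import path; adjust to match the local checkout.
    from delphi.epidata.server.endpoints.covidcast import PANDAS_DTYPES, _set_df_dtypes

    df = pd.DataFrame({
        "time_value": ["20200401", "20200402"],  # strings, as raw query values may arrive
        "value": ["1.5", "2.0"],
        "missing_value": [None, 0],
        "extra": ["a", "b"],  # columns outside PANDAS_DTYPES pass through untouched
    })
    out = _set_df_dtypes(df, PANDAS_DTYPES)

    assert pd.api.types.is_integer_dtype(out["time_value"])
    assert pd.api.types.is_float_dtype(out["value"])
    assert str(out["missing_value"].dtype) == "Int8"  # nullable: None survives as <NA>
    assert out["extra"].dtype == object

The nullable Int8/Int64 dtypes are the reason PANDAS_DTYPES cannot use plain NumPy integers: a NumPy int column cannot hold missing values, so the cast would fail on NULLs coming back from the database.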