
[WIP] API server code health pass #1058


Closed · wants to merge 8 commits
2 changes: 1 addition & 1 deletion docs/api/covidcast.md
@@ -158,7 +158,7 @@ require knowing when we last confirmed an unchanged value, please get in touch.
 | Field | Description | Type |
 | --- | --- | --- |
 | `result` | result code: 1 = success, 2 = too many results, -2 = no results | integer |
-| `epidata` | list of results, 1 per geo/time pair | array of objects |
+| `epidata` | list of results, 1 per geo/time filter | array of objects |
 | `epidata[].source` | selected `data_source` | string |
 | `epidata[].signal` | selected `signal` | string |
 | `epidata[].geo_type` | selected `geo_type` | string |
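For orientation (not part of the diff): a minimal sketch of querying the covidcast endpoint and reading the response fields documented above. The base URL and the filter values are illustrative assumptions, not taken from this PR.

import requests

# Hypothetical query; the signal/geo filter values are invented examples.
resp = requests.get(
    "https://api.delphi.cmu.edu/epidata/covidcast/",
    params={
        "signal": "src:sig",              # source:signal filter
        "geo": "county:42003",            # geo_type:geo_value filter
        "time": "day:20200401-20200405",
    },
)
resp.raise_for_status()
body = resp.json()
if body["result"] == 1:                   # 1 = success (see table above)
    for row in body["epidata"]:           # one object per matching geo/time
        print(row["source"], row["signal"], row["geo_type"])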
2 changes: 1 addition & 1 deletion docs/api/covidcast_meta.md
@@ -26,7 +26,7 @@ None required.
 | Field | Description | Type |
 |---------------------------|---------------------------------------------------------------------|------------------|
 | `result` | result code: 1 = success, 2 = too many results, -2 = no results | integer |
-| `epidata` | list of results, one per name/geo_type pair | array of objects |
+| `epidata` | list of results, one per name/geo_type filter | array of objects |
 | `epidata[].data_source` | data source | string |
 | `epidata[].signal` | signal name | string |
 | `epidata[].time_type` | temporal resolution of the signal (e.g., `day`, `week`) | string |
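For orientation: the same reading pattern applies to the meta response fields above; the URL is an assumption based on the public Epidata API, not something introduced by this PR.

import requests

meta = requests.get("https://api.delphi.cmu.edu/epidata/covidcast_meta/").json()
if meta["result"] == 1:
    for entry in meta["epidata"]:  # one object per data_source/signal/geo_type combination
        print(entry["data_source"], entry["signal"], entry["time_type"])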
18 changes: 9 additions & 9 deletions integrations/server/test_covidcast_endpoints.py
@@ -53,7 +53,7 @@ def test_basic(self):
self.assertEqual(out["result"], -1)

with self.subTest("simple"):
out = self._fetch("/", signal=first.signal_pair(), geo=first.geo_pair(), time="day:*")
out = self._fetch("/", signal=first.signal_filter(), geo=first.geo_filter(), time="day:*")
self.assertEqual(len(out["epidata"]), len(rows))

def test_trend(self):
@@ -66,7 +66,7 @@ def test_trend(self):
         ref = rows[num_rows // 2]
         self._insert_rows(rows)
 
-        out = self._fetch("/trend", signal=first.signal_pair(), geo=first.geo_pair(), date=last.time_value, window="20200401-20201212", basis=ref.time_value)
+        out = self._fetch("/trend", signal=first.signal_filter(), geo=first.geo_filter(), date=last.time_value, window="20200401-20201212", basis=ref.time_value)
 
         self.assertEqual(out["result"], 1)
         self.assertEqual(len(out["epidata"]), 1)
@@ -99,7 +99,7 @@ def test_trendseries(self):
         last = rows[-1]
         self._insert_rows(rows)
 
-        out = self._fetch("/trendseries", signal=first.signal_pair(), geo=first.geo_pair(), date=last.time_value, window="20200401-20200410", basis=1)
+        out = self._fetch("/trendseries", signal=first.signal_filter(), geo=first.geo_filter(), date=last.time_value, window="20200401-20200410", basis=1)
 
         self.assertEqual(out["result"], 1)
         self.assertEqual(len(out["epidata"]), 3)
@@ -167,7 +167,7 @@ def test_correlation(self):
         self._insert_rows(other_rows)
         max_lag = 3
 
-        out = self._fetch("/correlation", reference=first.signal_pair(), others=other.signal_pair(), geo=first.geo_pair(), window="20200401-20201212", lag=max_lag)
+        out = self._fetch("/correlation", reference=first.signal_filter(), others=other.signal_filter(), geo=first.geo_filter(), window="20200401-20201212", lag=max_lag)
         self.assertEqual(out["result"], 1)
         df = pd.DataFrame(out["epidata"])
         self.assertEqual(len(df), max_lag * 2 + 1)  # -...0...+
@@ -191,7 +191,7 @@ def test_csv(self):

         response = requests.get(
             f"{BASE_URL}/csv",
-            params=dict(signal=first.signal_pair(), start_day="2020-04-01", end_day="2020-12-12", geo_type=first.geo_type),
+            params=dict(signal=first.signal_filter(), start_day="2020-04-01", end_day="2020-12-12", geo_type=first.geo_type),
         )
         response.raise_for_status()
         out = response.text
@@ -209,7 +209,7 @@ def test_backfill(self):
         self._insert_rows([*issue_0, *issue_1, *last_issue])
         first = issue_0[0]
 
-        out = self._fetch("/backfill", signal=first.signal_pair(), geo=first.geo_pair(), time="day:20200401-20201212", anchor_lag=3)
+        out = self._fetch("/backfill", signal=first.signal_filter(), geo=first.geo_filter(), time="day:20200401-20201212", anchor_lag=3)
         self.assertEqual(out["result"], 1)
         df = pd.DataFrame(out["epidata"])
         self.assertEqual(len(df), 3 * num_rows)  # num issues
@@ -277,17 +277,17 @@ def test_coverage(self):
         first = rows[0]
 
         with self.subTest("default"):
-            out = self._fetch("/coverage", signal=first.signal_pair(), geo_type=first.geo_type, latest=dates[-1], format="json")
+            out = self._fetch("/coverage", signal=first.signal_filter(), geo_type=first.geo_type, latest=dates[-1], format="json")
             self.assertEqual(len(out), len(num_geos_per_date))
             self.assertEqual([o["time_value"] for o in out], dates)
             self.assertEqual([o["count"] for o in out], num_geos_per_date)
 
         with self.subTest("specify window"):
-            out = self._fetch("/coverage", signal=first.signal_pair(), geo_type=first.geo_type, window=f"{dates[0]}-{dates[1]}", format="json")
+            out = self._fetch("/coverage", signal=first.signal_filter(), geo_type=first.geo_type, window=f"{dates[0]}-{dates[1]}", format="json")
             self.assertEqual(len(out), 2)
             self.assertEqual([o["time_value"] for o in out], dates[:2])
             self.assertEqual([o["count"] for o in out], num_geos_per_date[:2])
 
         with self.subTest("invalid geo_type"):
-            out = self._fetch("/coverage", signal=first.signal_pair(), geo_type="doesnt_exist", format="json")
+            out = self._fetch("/coverage", signal=first.signal_filter(), geo_type="doesnt_exist", format="json")
             self.assertEqual(len(out), 0)
2 changes: 1 addition & 1 deletion src/acquisition/covidcast/covidcast_meta_cache_updater.py
@@ -15,7 +15,7 @@ def get_argument_parser():

     parser = argparse.ArgumentParser()
     parser.add_argument("--log_file", help="filename for log output")
-    parser.add_argument("--num_threads", type=int, help="number of worker threads to spawn for processing source/signal pairs")
+    parser.add_argument("--num_threads", type=int, help="number of worker threads to spawn for processing source/signal filters")
Collaborator:

Suggested change:
-    parser.add_argument("--num_threads", type=int, help="number of worker threads to spawn for processing source/signal filters")
+    parser.add_argument("--num_threads", type=int, help="number of worker threads to spawn for processing source/signal pairs")

this was acceptable usage; each unique pair of (source, signal) makes up a workload

     return parser


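For orientation: a sketch of how the parser above might be consumed; the main() wiring and the fallback default are assumptions, not code from this PR.

def main():
    # Hypothetical entry point: parse CLI args and pick a worker count.
    args = get_argument_parser().parse_args()
    num_threads = args.num_threads or 1  # argparse leaves the flag as None when omitted
    print(f"spawning {num_threads} worker thread(s)")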
6 changes: 3 additions & 3 deletions src/acquisition/covidcast/database.py
@@ -60,10 +60,10 @@ def __init__(self, source, signal, time_type, geo_type, time_value, geo_value, v
         self.issue = issue
         self.lag = lag
 
-    def signal_pair(self):
+    def signal_filter(self):
         return f"{self.source}:{self.signal}"
 
-    def geo_pair(self):
+    def geo_filter(self):
         return f"{self.geo_type}:{self.geo_value}"


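For orientation: the renamed helpers simply join two fields with a colon, producing the filter strings the API endpoints accept. A tiny illustration with invented values:

source, signal = "src", "sig"
geo_type, geo_value = "county", "01234"
print(f"{source}:{signal}")       # "src:sig"      -> format of the `signal` parameter
print(f"{geo_type}:{geo_value}")  # "county:01234" -> format of the `geo` parameter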
@@ -551,7 +551,7 @@ def worker():
     try:
         while True:
             (source, signal) = srcsigs.get_nowait()  # this will throw the Empty caught below
-            logger.info("starting pair", thread=name, pair=f"({source}, {signal})")
+            logger.info("starting filter", thread=name, filter=f"({source}, {signal})")
Collaborator:

Suggested change:
-            logger.info("starting filter", thread=name, filter=f"({source}, {signal})")
+            logger.info("starting pair", thread=name, pair=f"({source}, {signal})")

this was acceptable usage; each unique pair of (source, signal) makes up a workload

             w_cursor.execute(inner_sql, (source, signal))
             with meta_lock:
                 meta.extend(list(
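For orientation: a self-contained sketch of the queue-and-threads pattern the hunk above lives in. Everything except the pattern itself (a queue of (source, signal) workloads drained with get_nowait, and a lock guarding a shared result list) is an illustrative assumption.

import queue
import threading

srcsigs = queue.Queue()  # one (source, signal) workload per entry
for pair in [("src-a", "sig-1"), ("src-b", "sig-2"), ("src-c", "sig-3")]:
    srcsigs.put(pair)

meta = []
meta_lock = threading.Lock()

def worker():
    try:
        while True:
            source, signal = srcsigs.get_nowait()  # raises queue.Empty once drained
            result = {"source": source, "signal": signal}  # stand-in for the real SQL work
            with meta_lock:  # serialize appends to the shared list
                meta.append(result)
    except queue.Empty:
        pass  # queue drained; this thread is done

threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(meta))  # 3: every workload processed exactly once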
9 changes: 5 additions & 4 deletions src/server/_common.py
@@ -2,15 +2,16 @@
 import time
 
 from flask import Flask, g, request
-from sqlalchemy import event
-from sqlalchemy.engine import Connection
+from sqlalchemy import create_engine, event
+from sqlalchemy.engine import Connection, Engine
 from werkzeug.local import LocalProxy
 
 from .utils.logger import get_structured_logger
-from ._config import SECRET
-from ._db import engine
+from ._config import SECRET, SQLALCHEMY_DATABASE_URI, SQLALCHEMY_ENGINE_OPTIONS
 from ._exceptions import DatabaseErrorException, EpiDataException
 
+engine: Engine = create_engine(SQLALCHEMY_DATABASE_URI, **SQLALCHEMY_ENGINE_OPTIONS)
+
 app = Flask("EpiData", static_url_path="")
 app.config["SECRET"] = SECRET
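For orientation: the inlined engine construction reads two values from _config, which is not part of this diff. A hypothetical sketch of what those values could look like; the URI and option values are assumptions, though both keyword arguments are real SQLAlchemy create_engine options.

# Hypothetical _config.py contents (illustrative only):
SQLALCHEMY_DATABASE_URI = "mysql+mysqldb://user:pass@localhost/epidata"
SQLALCHEMY_ENGINE_OPTIONS = {
    "pool_pre_ping": True,  # test connections before handing them out
    "pool_recycle": 3600,   # recycle connections before the server's idle timeout
}

# With those values, the new module-level line in _common.py is equivalent to:
# engine = create_engine("mysql+mysqldb://user:pass@localhost/epidata",
#                        pool_pre_ping=True, pool_recycle=3600)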
26 changes: 0 additions & 26 deletions src/server/_db.py

This file was deleted.
