
create meta data covidcast file + use in /meta #564


Merged: 46 commits from sgratzl/covidcast_meta_file into dev on Jul 13, 2021

Commits (46)
1db286c
feat: start with google doc parser
sgratzl May 27, 2021
6544b75
feat: generate CSV meta data and use it for /covidcast/meta
sgratzl May 27, 2021
fdbde8e
test: fix for new meta data structure
sgratzl May 27, 2021
d5a82ea
feat: support default values and allow meta filter flags
sgratzl May 28, 2021
7dca07f
feat: update meta data
sgratzl May 28, 2021
46caba9
feat: support db alias
sgratzl May 28, 2021
83b4e17
fix: typo
sgratzl May 28, 2021
326d6fd
fix: resolve alias:* signals to only the subset one
sgratzl Jun 7, 2021
8f34ae1
Merge remote-tracking branch 'origin/dev' into sgratzl/covidcast_meta…
sgratzl Jun 10, 2021
afc1aff
feat: adapt to latest sheets
sgratzl Jun 10, 2021
be62bf1
refactor: add more type annotations
sgratzl Jun 10, 2021
9129db1
Merge remote-tracking branch 'origin/dev' into sgratzl/covidcast_meta…
sgratzl Jun 11, 2021
67286d1
fix: publish dua field
sgratzl Jun 11, 2021
faa3e42
feat: adapt to latest sheets
sgratzl Jun 14, 2021
4a56614
fix: back to "source" name
sgratzl Jun 14, 2021
883ab24
feat(covidcast names): adapt to latest CSV changes
sgratzl Jun 15, 2021
bd32cc9
Merge remote-tracking branch 'origin/dev' into sgratzl/covidcast_meta…
sgratzl Jun 15, 2021
93b389d
chore: update meta file
sgratzl Jun 17, 2021
4975fb0
Merge remote-tracking branch 'origin/dev' into sgratzl/covidcast_meta…
sgratzl Jun 23, 2021
c73327b
feat: update to current google sheets
sgratzl Jun 23, 2021
886cd7e
feat: support text replacements
sgratzl Jun 23, 2021
1efd87f
feat: add active filter
sgratzl Jun 23, 2021
e619162
feat: start with google doc parser
sgratzl May 27, 2021
e2a19c1
feat: generate CSV meta data and use it for /covidcast/meta
sgratzl May 27, 2021
d4d5d7d
test: fix for new meta data structure
sgratzl May 27, 2021
6441c81
feat: support default values and allow meta filter flags
sgratzl May 28, 2021
f6724fe
feat: update meta data
sgratzl May 28, 2021
f060177
feat: support db alias
sgratzl May 28, 2021
cca22db
fix: publish dua field
sgratzl Jun 11, 2021
f69fbfb
feat: adapt to latest sheets
sgratzl Jun 14, 2021
bb694ae
fix: back to "source" name
sgratzl Jun 14, 2021
5063856
feat(covidcast names): adapt to latest CSV changes
sgratzl Jun 15, 2021
137913c
chore: update meta file
sgratzl Jun 17, 2021
c8df415
fix: typo
sgratzl May 28, 2021
6e80e65
feat: update to current google sheets
sgratzl Jun 23, 2021
e43e002
fix: resolve alias:* signals to only the subset one
sgratzl Jun 7, 2021
fe1c29c
feat: support text replacements
sgratzl Jun 23, 2021
5c5428d
feat: adapt to latest sheets
sgratzl Jun 10, 2021
4f39018
feat: add active filter
sgratzl Jun 23, 2021
0031ee2
refactor: add more type annotations
sgratzl Jun 10, 2021
f84c9ce
Merge branch 'sgratzl/covidcast_meta_file' of github.com:cmu-delphi/d…
sgratzl Jun 25, 2021
a5d8dd2
feat: update sheets and adapt
sgratzl Jun 25, 2021
9dc29ba
Merge remote-tracking branch 'origin/dev' into sgratzl/covidcast_meta…
sgratzl Jul 13, 2021
3e59b6d
feat: update docs
sgratzl Jul 13, 2021
16f8104
Revert "refactor: add more type annotations"
sgratzl Jul 13, 2021
21917fe
Merge branch 'sgratzl/covidcast_meta_file' of github.com:cmu-delphi/d…
sgratzl Jul 13, 2021
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -84,7 +84,7 @@ jobs:
docker stop delphi_database_epidata delphi_web_epidata
docker network remove delphi-net

build_js_clients:
build_js_client:
runs-on: ubuntu-latest
defaults:
run:
37 changes: 37 additions & 0 deletions .github/workflows/update_gdocs_data.yml
@@ -0,0 +1,37 @@
name: Update Google Docs Meta Data
on:
workflow_dispatch:
jobs:
update_gdocs:
runs-on: ubuntu-latest
steps:
- name: Check out code
uses: actions/checkout@v2
with:
branch: dev
ssh-key: ${{ secrets.CMU_DELPHI_DEPLOY_MACHINE_SSH }}
- name: Set up Python 3.8
uses: actions/setup-python@v2
with:
python-version: 3.8
- uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pipd-${{ hashFiles('requirements-dev.txt') }}
restore-keys: |
${{ runner.os }}-pipd-
- name: Install Dependencies
run: pip install -r requirements-dev.txt
- name: Update Docs
run: inv update-gdoc
- name: Create pull request into dev
uses: peter-evans/create-pull-request@v3
with:
branch: bot/update-docs
commit-message: 'chore: update docs'
title: Update Google Docs Meta Data
labels: chore
reviewers: krivard
assignees: krivard
body: |
Updating Google Docs Meta Data
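
The "Update Docs" step runs inv update-gdoc. As a rough sketch only, such an invoke task could look like the following; the published-sheet URL and the output path are placeholders, not the repository's actual values:

    # Hypothetical sketch of an `inv update-gdoc` task; URL and output path are placeholders.
    from invoke import task
    import requests

    SHEET_CSV_URL = "https://docs.google.com/spreadsheets/d/e/<PUBLISHED_ID>/pub?output=csv"  # placeholder
    OUTPUT_CSV = "src/server/endpoints/covidcast_utils/db_signals.csv"  # assumed output location

    @task
    def update_gdoc(c, url=SHEET_CSV_URL, out=OUTPUT_CSV):
        """Fetch the published Google Sheet as CSV and write it to the meta data file."""
        r = requests.get(url)
        r.raise_for_status()
        with open(out, "w", encoding="utf-8", newline="") as f:
            f.write(r.text)

Invoke exposes update_gdoc on the command line as update-gdoc, which is how the workflow invokes it.
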
13 changes: 10 additions & 3 deletions integrations/server/test_covidcast_endpoints.py
@@ -339,7 +339,7 @@ def test_meta(self):
"""Request a signal the /meta endpoint."""

num_rows = 10
rows = [CovidcastRow(time_value=20200401 + i, value=i) for i in range(num_rows)]
rows = [CovidcastRow(time_value=20200401 + i, value=i, source="fb-survey", signal="smoothed_cli") for i in range(num_rows)]
self._insert_rows(rows)
first = rows[0]
last = rows[-1]
@@ -349,7 +349,10 @@ def test_meta(self):
with self.subTest("plain"):
out = self._fetch("/meta")
self.assertEqual(len(out), 1)
stats = out[0]
data_source = out[0]
self.assertEqual(data_source["source"], first.source)
self.assertEqual(len(data_source["signals"]), 1)
stats = data_source["signals"][0]
self.assertEqual(stats["source"], first.source)
self.assertEqual(stats["signal"], first.signal)
self.assertEqual(stats["min_time"], first.time_value)
@@ -364,7 +367,11 @@ def test_meta(self):
with self.subTest("filtered"):
out = self._fetch("/meta", signal=f"{first.source}:*")
self.assertEqual(len(out), 1)
self.assertEqual(out[0]["source"], first.source)
data_source = out[0]
self.assertEqual(data_source["source"], first.source)
self.assertEqual(len(data_source["signals"]), 1)
stats = data_source["signals"][0]
self.assertEqual(stats["source"], first.source)
out = self._fetch("/meta", signal=f"{first.source}:X")
self.assertEqual(len(out), 0)
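
For reference, the assertions above imply roughly this /meta response shape, sketched here as a Python literal; only the asserted fields are certain, and any further per-source or per-signal fields come from the Google Doc metadata:

    # Rough shape implied by the test; additional fields depend on the Google Doc metadata.
    expected_meta = [
        {
            "source": "fb-survey",            # data source entry with its spreadsheet metadata
            "signals": [
                {
                    "source": "fb-survey",
                    "signal": "smoothed_cli",
                    "min_time": 20200401,
                    "max_time": 20200410,     # illustrative; not asserted in the visible hunk
                }
            ],
        }
    ]
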

1 change: 1 addition & 0 deletions requirements.dev.txt
@@ -5,3 +5,4 @@ mypy>=0.790
pytest
tenacity==7.0.0
bump2version
requests
148 changes: 115 additions & 33 deletions src/server/endpoints/covidcast.py
@@ -1,4 +1,4 @@
from typing import List, Optional, Union, Tuple, Dict, Any, Set
from typing import List, Optional, Union, Tuple, Dict, Any
from itertools import groupby
from datetime import date, datetime, timedelta
from flask import Blueprint, request
@@ -33,8 +33,9 @@
require_any,
)
from .._pandas import as_pandas, print_pandas
from .covidcast_utils import compute_trend, compute_trends, compute_correlations, compute_trend_value, CovidcastMetaEntry, AllSignalsMap
from .covidcast_utils import compute_trend, compute_trends, compute_correlations, compute_trend_value, CovidcastMetaEntry
from ..utils import shift_time_value, date_to_time_value, time_value_to_iso, time_value_to_date
from .covidcast_utils.model import TimeType, data_sources, create_source_signal_alias_mapper

# first argument is the endpoint name
bp = Blueprint("covidcast", __name__)
@@ -124,6 +125,7 @@ def guess_index_to_use(time: List[TimePair], geo: List[GeoPair], issues: Optiona
@bp.route("/", methods=("GET", "POST"))
def handle():
source_signal_pairs = parse_source_signal_pairs()
source_signal_pairs, alias_mapper = create_source_signal_alias_mapper(source_signal_pairs)
time_pairs = parse_time_pairs()
geo_pairs = parse_geo_pairs()

@@ -137,8 +139,8 @@ def handle():
fields_string = ["geo_value", "signal"]
fields_int = ["time_value", "direction", "issue", "lag", "missing_value", "missing_stderr", "missing_sample_size"]
fields_float = ["value", "stderr", "sample_size"]

if is_compatibility_mode():
is_compatibility = is_compatibility_mode()
if is_compatibility:
q.set_order("signal", "time_value", "geo_value", "issue")
else:
# transfer also the new detail columns
@@ -158,14 +160,22 @@

_handle_lag_issues_as_of(q, issues, lag, as_of)

def transform_row(row, _):
if is_compatibility or not alias_mapper:
return row
row["source"] = alias_mapper(row["source"], row["signal"])
return row

# send query
return execute_query(str(q), q.params, fields_string, fields_int, fields_float)
return execute_query(str(q), q.params, fields_string, fields_int, fields_float, transform=transform_row)
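
Several endpoints in this file now follow the same pattern: expand the requested source/signal pairs through create_source_signal_alias_mapper, query with the database source names, and map them back to the requested names on output. As a rough illustration of that contract only (the real implementation lives in covidcast_utils.model and is driven by the metadata file; the alias table below is invented):

    # Sketch only: shows the (pairs, mapper) contract, not the repository's logic.
    from typing import Callable, Dict, List, Optional, Tuple

    PUBLIC_TO_DB: Dict[str, str] = {"public-source": "db-source"}  # invented alias table

    def create_source_signal_alias_mapper_sketch(
        pairs: List[SourceSignalPair],  # the request-parameter type already used in this file
    ) -> Tuple[List[SourceSignalPair], Optional[Callable[[str, str], str]]]:
        db_pairs: List[SourceSignalPair] = []
        back: Dict[str, str] = {}
        for pair in pairs:
            db_source = PUBLIC_TO_DB.get(pair.source, pair.source)
            if db_source != pair.source:
                back[db_source] = pair.source
            db_pairs.append(SourceSignalPair(db_source, pair.signal))
        if not back:
            return db_pairs, None  # no alias involved; endpoints can skip the row transform
        return db_pairs, lambda source, signal: back.get(source, source)
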


@bp.route("/trend", methods=("GET", "POST"))
def handle_trend():
require_all("date", "window")
source_signal_pairs = parse_source_signal_pairs()
source_signal_pairs, alias_mapper = create_source_signal_alias_mapper(source_signal_pairs)
# TODO alias
geo_pairs = parse_geo_pairs()

time_value = parse_day_arg("date")
@@ -192,7 +202,10 @@ def handle_trend():

def gen(rows):
for key, group in groupby((parse_row(row, fields_string, fields_int, fields_float) for row in rows), lambda row: (row["geo_type"], row["geo_value"], row["source"], row["signal"])):
trend = compute_trend(key[0], key[1], key[2], key[3], time_value, basis_time_value, ((row["time_value"], row["value"]) for row in group))
geo_type, geo_value, source, signal = key
if alias_mapper:
source = alias_mapper(source, signal)
trend = compute_trend(geo_type, geo_value, source, signal, time_value, basis_time_value, ((row["time_value"], row["value"]) for row in group))
yield trend.asdict()

# execute first query
@@ -209,6 +222,7 @@ def gen(rows):
def handle_trendseries():
require_all("window")
source_signal_pairs = parse_source_signal_pairs()
source_signal_pairs, alias_mapper = create_source_signal_alias_mapper(source_signal_pairs)
geo_pairs = parse_geo_pairs()

time_window = parse_day_range_arg("window")
@@ -238,9 +252,12 @@ def handle_trendseries():

def gen(rows):
for key, group in groupby((parse_row(row, fields_string, fields_int, fields_float) for row in rows), lambda row: (row["geo_type"], row["geo_value"], row["source"], row["signal"])):
trends = compute_trends(key[0], key[1], key[2], key[3], shifter, ((row["time_value"], row["value"]) for row in group))
for trend in trends:
yield trend.asdict()
geo_type, geo_value, source, signal = key
if alias_mapper:
source = alias_mapper(source, signal)
trends = compute_trends(geo_type, geo_value, source, signal, shifter, ((row["time_value"], row["value"]) for row in group))
for t in trends:
yield t.asdict()

# execute first query
try:
@@ -257,6 +274,7 @@ def handle_correlation():
require_all("reference", "window", "others", "geo")
reference = parse_single_source_signal_arg("reference")
other_pairs = parse_source_signal_arg("others")
source_signal_pairs, alias_mapper = create_source_signal_alias_mapper(other_pairs + [reference])
geo_pairs = parse_geo_arg()
time_window = parse_day_range_arg("window")
lag = extract_integer("lag")
@@ -272,7 +290,11 @@
q.set_fields(fields_string, fields_int, fields_float)
q.set_order("geo_type", "geo_value", "source", "signal", "time_value")

q.where_source_signal_pairs("source", "signal", other_pairs + [reference])
q.where_source_signal_pairs(
"source",
"signal",
source_signal_pairs,
)
q.where_geo_pairs("geo_type", "geo_value", geo_pairs)
q.where_time_pairs("time_type", "time_value", [TimePair("day", [time_window])])

@@ -305,6 +327,8 @@ def gen():
continue # no other signals

for (source, signal), other_group in other_groups:
if alias_mapper:
source = alias_mapper(source, signal)
for cor in compute_correlations(geo_type, geo_value, source, signal, lag, reference_group, other_group):
yield cor.asdict()

@@ -315,6 +339,7 @@ def gen():
@bp.route("/csv", methods=("GET", "POST"))
def handle_export():
source, signal = request.args.get("signal", "jhu-csse:confirmed_incidence_num").split(":")
source_signal_pairs, alias_mapper = create_source_signal_alias_mapper([SourceSignalPair(source, [signal])])
start_day = request.args.get("start_day", "2020-04-01")
end_day = request.args.get("end_day", "2020-09-01")
geo_type = request.args.get("geo_type", "county")
@@ -336,7 +361,8 @@ def handle_export():

q.set_fields(["geo_value", "signal", "time_value", "issue", "lag", "value", "stderr", "sample_size", "geo_type", "source"], [], [])
q.set_order("time_value", "geo_value")
q.where(source=source, signal=signal, time_type="day")
q.where(time_type="day")
q.where_source_signal_pairs("source", "signal", source_signal_pairs)
q.conditions.append("time_value BETWEEN :start_day AND :end_day")
q.params["start_day"] = date_to_time_value(start_day)
q.params["end_day"] = date_to_time_value(end_day)
@@ -362,7 +388,7 @@ def parse_row(i, row):
"stderr": row["stderr"],
"sample_size": row["sample_size"],
"geo_type": row["geo_type"],
"data_source": row["source"],
"data_source": alias_mapper(row["source"], row["signal"]) if alias_mapper else row["source"],
}

def gen(first_row, rows):
@@ -394,6 +420,9 @@ def handle_backfill():
"""
require_all("geo", "time", "signal")
signal_pair = parse_single_source_signal_arg("signal")
source_signal_pairs, _ = create_source_signal_alias_mapper([signal_pair])
# don't need the alias mapper since we don't return the source

time_pair = parse_single_time_arg("time")
geo_pair = parse_single_geo_arg("geo")
reference_anchor_lag = extract_integer("anchor_lag") # in days
@@ -410,7 +439,7 @@ def handle_backfill():
q.set_order(time_value=True, issue=True)
q.set_fields(fields_string, fields_int, fields_float, ["is_latest_issue"])

q.where_source_signal_pairs("source", "signal", [signal_pair])
q.where_source_signal_pairs("source", "signal", source_signal_pairs)
q.where_geo_pairs("geo_type", "geo_value", [geo_pair])
q.where_time_pairs("time_type", "time_value", [time_pair])

@@ -463,31 +492,79 @@ def handle_meta():
similar to /covidcast_meta but returns a structured, optimized JSON form for the app
"""

signal = parse_source_signal_arg("signal")
filter_signal = parse_source_signal_arg("signal")
flags = ",".join(request.values.getlist("flags")).split(",")
filter_smoothed: Optional[bool] = None
filter_weighted: Optional[bool] = None
filter_cumulative: Optional[bool] = None
filter_active: Optional[bool] = None
filter_time_type: Optional[TimeType] = None

if "smoothed" in flags:
filter_smoothed = True
elif "not_smoothed" in flags:
filter_smoothed = False
if "weighted" in flags:
filter_weighted = True
elif "not_weighted" in flags:
filter_weighted = False
if "cumulative" in flags:
filter_cumulative = True
elif "not_cumulative" in flags:
filter_cumulative = False
if "active" in flags:
filter_active = True
elif "inactive" in flags:
filter_active = False
if "day" in flags:
filter_time_type = TimeType.day
elif "week" in flags:
filter_time_type = TimeType.week

row = db.execute(text("SELECT epidata FROM covidcast_meta_cache LIMIT 1")).fetchone()

data = loads(row["epidata"]) if row and row["epidata"] else []

all_signals: AllSignalsMap = {}
by_signal: Dict[Tuple[str, str], List[Dict[str, Any]]] = {}
for row in data:
if row["time_type"] != "day":
continue
entry: Set[str] = all_signals.setdefault(row["data_source"], set())
entry.add(row["signal"])
entry = by_signal.setdefault((row["data_source"], row["signal"]), [])
entry.append(row)

out: Dict[str, CovidcastMetaEntry] = {}
for row in data:
if row["time_type"] != "day":
sources: List[Dict[str, Any]] = []
for source in data_sources:
if filter_active is not None and source.active != filter_active:
continue
if signal and all((not s.matches(row["data_source"], row["signal"]) for s in signal)):

meta_signals: List[Dict[str, Any]] = []

for signal in source.signals:
if filter_signal and all((not s.matches(signal.source, signal.signal) for s in filter_signal)):
continue
if filter_smoothed is not None and signal.is_smoothed != filter_smoothed:
continue
if filter_weighted is not None and signal.is_weighted != filter_weighted:
continue
if filter_cumulative is not None and signal.is_cumulative != filter_cumulative:
continue
if filter_time_type is not None and signal.time_type != filter_time_type:
continue
meta_data = by_signal.get(signal.key)
if not meta_data:
continue
row = meta_data[0]
entry = CovidcastMetaEntry(signal, row["min_time"], row["max_time"], row["max_issue"])
for row in meta_data:
entry.intergrate(row)
meta_signals.append(entry.asdict())

if not meta_signals: # none found or no signals
continue
entry = out.setdefault(
f"{row['data_source']}:{row['signal']}", CovidcastMetaEntry(row["data_source"], row["signal"], row["min_time"], row["max_time"], row["max_issue"], {}, all_signals=all_signals)
)
entry.intergrate(row)

return jsonify([r.asdict() for r in out.values()])
s = source.asdict()
s["signals"] = meta_signals
sources.append(s)

return jsonify(sources)
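
For illustration, a client call against the reworked endpoint might look like this; the host is a placeholder, while the flag and parameter names are the ones parsed above:

    # Illustrative /covidcast/meta request; the base URL is a placeholder for the deployed API.
    import requests

    resp = requests.get(
        "https://api.example.org/epidata/covidcast/meta",  # placeholder host
        params={"signal": "fb-survey:*", "flags": "active,smoothed,not_cumulative,day"},
    )
    resp.raise_for_status()
    for data_source in resp.json():
        print(data_source["source"], [s["signal"] for s in data_source["signals"]])
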


@bp.route("/coverage", methods=("GET", "POST"))
@@ -496,7 +573,8 @@ def handle_coverage():
similar to /signal_dashboard_coverage; for a specific signal, returns the coverage (number of locations for a given geo_type)
"""

signal = parse_source_signal_pairs()
source_signal_pairs = parse_source_signal_pairs()
source_signal_pairs, alias_mapper = create_source_signal_alias_mapper(source_signal_pairs)
geo_type = request.args.get("geo_type", "county")
if "window" in request.values:
time_window = parse_day_range_arg("window")
@@ -523,14 +601,20 @@ def handle_coverage():
q.conditions.append('geo_value not like "%000"')
else:
q.where(geo_type=geo_type)
q.where_source_signal_pairs("source", "signal", signal)
q.where_source_signal_pairs("source", "signal", source_signal_pairs)
q.where_time_pairs("time_type", "time_value", [TimePair("day", [time_window])])
q.group_by = "c.source, c.signal, c.time_value"
q.set_order("source", "signal", "time_value")

_handle_lag_issues_as_of(q, None, None, None)

return execute_query(q.query, q.params, fields_string, fields_int, [])
def transform_row(row, _):
if not alias_mapper:
return row
row["source"] = alias_mapper(row["source"], row["signal"])
return row

return execute_query(q.query, q.params, fields_string, fields_int, [], transform=transform_row)
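
A similar hedged usage sketch for the coverage endpoint; the host is a placeholder and the window format is assumed to be the usual YYYYMMDD-YYYYMMDD day range:

    # Illustrative /covidcast/coverage request; host and parameter values are examples only.
    import requests

    resp = requests.get(
        "https://api.example.org/epidata/covidcast/coverage",  # placeholder host
        params={"signal": "fb-survey:smoothed_cli", "geo_type": "county", "window": "20200401-20200430"},
    )
    resp.raise_for_status()
    print(resp.json())  # per-day location counts for the requested signal
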


@bp.route("/anomalies", methods=("GET", "POST"))
@@ -539,8 +623,6 @@ def handle_anomalies():
proxy to the excel sheet about data anomalies
"""

signal = parse_source_signal_arg("signal")

df = read_csv(
"https://docs.google.com/spreadsheets/d/e/2PACX-1vToGcf9x5PNJg-eSrxadoR5b-LM2Cqs9UML97587OGrIX0LiQDcU1HL-L2AA8o5avbU7yod106ih0_n/pub?gid=0&single=true&output=csv", skip_blank_lines=True
)