
Commit 8e63fdc

move automated backfill to run.py

1 parent 9839573

File tree: 2 files changed, +44 −19 lines

  • dsew_community_profile/delphi_dsew_community_profile

dsew_community_profile/delphi_dsew_community_profile/pull.py

Lines changed: 0 additions & 18 deletions
@@ -5,7 +5,6 @@
 import os
 import re
 from urllib.parse import quote_plus as quote_as_url
-import covidcast
 
 import pandas as pd
 import requests
@@ -361,23 +360,6 @@ def nation_from_state(df, sig, geomapper):
 
 def fetch_new_reports(params, logger=None):
     """Retrieve, compute, and collate all data we haven't seen yet."""
-    # Fetch metadata to check how recent each signal is
-    metadata = covidcast.metadata()
-    sensor_names = {
-        SIGNALS[key][name_field]
-        for key in params["indicator"]["export_signals"]
-        for name_field in ["api_name", "api_prop_name"]
-        if name_field in SIGNALS[key]
-    }
-
-    # Filter to only those we currently want to produce, ignore any old or deprecated signals
-    cpr_metadata = metadata[(metadata.data_source == "dsew-cpr") &
-                            (metadata.signal.isin(sensor_names))]
-
-    if sensor_names.difference(set(cpr_metadata.signal)):
-        # If any signal not in metadata yet, we need to backfill its full history.
-        params['indicator']['reports'] = 'all'
-
     listing = fetch_listing(params)
 
     # download and parse individual reports
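For orientation, the removed comprehension (re-added to run.py below, keyed by API name) walks the package's SIGNALS constant and collects the API-facing name of every requested export signal. A minimal sketch of the shape it assumes follows; the keys and signal names are placeholders, not values taken from constants.py:

# Hypothetical SIGNALS entries, shaped like the fields the comprehension reads:
# "api_name" is always present, "api_prop_name" only exists for signals that
# also publish a proportion variant.
SIGNALS = {
    "example signal": {
        "api_name": "example_signal_7dav",            # placeholder name
        "api_prop_name": "example_signal_prop_7dav",  # placeholder name
    },
    "another signal": {
        "api_name": "another_signal_7dav",            # placeholder name, no prop variant
    },
}
export_signals = ["example signal", "another signal"]

# pull.py built a plain set of API names ...
sensor_names = {
    SIGNALS[key][name_field]
    for key in export_signals
    for name_field in ["api_name", "api_prop_name"]
    if name_field in SIGNALS[key]
}
# ... whereas the run.py version below maps each API name back to its internal
# key, so signals missing from the covidcast metadata can be translated back
# into export_signals entries.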

dsew_community_profile/delphi_dsew_community_profile/run.py

Lines changed: 44 additions & 1 deletion
@@ -19,8 +19,9 @@
 from delphi_utils import get_structured_logger
 from delphi_utils.export import create_export_csv
 import pandas as pd
+import covidcast
 
-from .constants import make_signal_name
+from .constants import make_signal_name, SIGNALS
 from .pull import fetch_new_reports
 
 
@@ -71,6 +72,48 @@ def replace_date_param(p):
         if len(dates)>0:
             run_stats.append((max(dates), len(dates)))
 
+    ## If any requested signal is not in metadata, generate it for all dates.
+    #
+    # Only do so if params.reports is set to "new". If set to "all", the
+    # previous fetch_new_reports + CSV loop will already have generated the full
+    # history for new signals. If params.reports is set to a specific date
+    # range, that request overrides automated backfill.
+    if params['indicator']['reports'] == 'new':
+        # Fetch metadata to check how recent signals are
+        metadata = covidcast.metadata()
+        sensor_names = {
+            SIGNALS[key][name_field]: key
+            for key in params["indicator"]["export_signals"]
+            for name_field in ["api_name", "api_prop_name"]
+            if name_field in SIGNALS[key].keys()
+        }
+
+        # Filter to only those we currently want to produce
+        cpr_metadata = metadata[(metadata.data_source == "dsew-cpr") &
+                                (metadata.signal.isin(sensor_names.keys()))]
+
+        new_signals = set(sensor_names.keys()).difference(set(cpr_metadata.signal))
+        if new_signals:
+            # If any signal not in metadata yet, we need to backfill its full
+            # history.
+            params['indicator']['reports'] = 'all'
+            params['indicator']['export_signals'] = {sensor_names[key] for key in new_signals}
+
+            dfs = fetch_new_reports(params, logger)
+            for key, df in dfs.items():
+                (geo, sig, is_prop) = key
+                if sig not in params["indicator"]["export_signals"]:
+                    continue
+                dates = create_export_csv(
+                    df,
+                    params['common']['export_dir'],
+                    geo,
+                    make_signal_name(sig, is_prop),
+                    **export_params
+                )
+                if len(dates)>0:
+                    run_stats.append((max(dates), len(dates)))
+
     ## log this indicator run
     elapsed_time_in_seconds = round(time.time() - start_time, 2)
     min_max_date = run_stats and min(s[0] for s in run_stats)
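To make the three modes described in the comment block concrete, here is a minimal sketch of the indicator params the new branch inspects. The structure is inferred from the accesses in the diff rather than copied from the package's params template, and the values are placeholders:

# Illustrative params dict, assuming the nesting implied by run.py's accesses;
# the export directory and signal key are placeholders.
params = {
    "common": {
        "export_dir": "./receiving",               # placeholder path
    },
    "indicator": {
        # "new": fetch only unseen reports; the branch above may widen this to
        #        "all" when a requested signal has no covidcast metadata yet.
        # "all": fetch the full history up front, so no automated backfill runs.
        # A specific date range overrides automated backfill entirely.
        "reports": "new",
        "export_signals": ["example signal"],      # placeholder key into SIGNALS
    },
}

With this setup, the first run after a brand-new signal is added to export_signals automatically regenerates that signal's full history, while routine runs stay limited to reports the pipeline has not seen yet.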
