|
19 | 19 | from delphi_utils import get_structured_logger
|
20 | 20 | from delphi_utils.export import create_export_csv
|
21 | 21 | import pandas as pd
|
| 22 | +import covidcast |
22 | 23 |
|
23 |
| -from .constants import make_signal_name |
| 24 | +from .constants import make_signal_name, SIGNALS |
24 | 25 | from .pull import fetch_new_reports
|
25 | 26 |
|
26 | 27 |
|
@@ -71,6 +72,48 @@ def replace_date_param(p):
|
71 | 72 | if len(dates)>0:
|
72 | 73 | run_stats.append((max(dates), len(dates)))
|
73 | 74 |
|
| 75 | + ## If any requested signal is not in metadata, generate it for all dates. |
| 76 | + # |
| 77 | + # Only do so if params.reports is set to "new". If set to "all", the |
| 78 | + # previous fetch_new_reports + CSV loop will already have generated the full |
| 79 | + # history for new signals. If params.reports is set to a specific date |
| 80 | + # range, that request overrides automated backfill. |
| 81 | + if params['indicator']['reports'] == 'new': |
| 82 | + # Fetch metadata to check which requested signals it already lists |
| 83 | + metadata = covidcast.metadata() |
| 84 | + sensor_names = { |
| 85 | + SIGNALS[key][name_field]: key |
| 86 | + for key in params["indicator"]["export_signals"] |
| 87 | + for name_field in ["api_name", "api_prop_name"] |
| 88 | + if name_field in SIGNALS[key].keys() |
| 89 | + } |
| 90 | + |
| 91 | + # Filter to only those we currently want to produce |
| 92 | + cpr_metadata = metadata[(metadata.data_source == "dsew-cpr") & |
| 93 | + (metadata.signal.isin(sensor_names.keys()))] |
| 94 | + |
| 95 | + new_signals = set(sensor_names.keys()).difference(set(cpr_metadata.signal)) |
| 96 | + if new_signals: |
| 97 | + # If any signal not in metadata yet, we need to backfill its full |
| 98 | + # history. |
| 99 | + params['indicator']['reports'] = 'all' |
| 100 | + params['indicator']['export_signals'] = {sensor_names[key] for key in new_signals} |
| 101 | + |
| 102 | + dfs = fetch_new_reports(params, logger) |
| 103 | + for key, df in dfs.items(): |
| 104 | + (geo, sig, is_prop) = key |
| 105 | + if sig not in params["indicator"]["export_signals"]: |
| 106 | + continue |
| 107 | + dates = create_export_csv( |
| 108 | + df, |
| 109 | + params['common']['export_dir'], |
| 110 | + geo, |
| 111 | + make_signal_name(sig, is_prop), |
| 112 | + **export_params |
| 113 | + ) |
| 114 | + if len(dates)>0: |
| 115 | + run_stats.append((max(dates), len(dates))) |
| 116 | + |
74 | 117 | ## log this indicator run
|
75 | 118 | elapsed_time_in_seconds = round(time.time() - start_time, 2)
|
76 | 119 | min_max_date = run_stats and min(s[0] for s in run_stats)
|
|
0 commit comments