Skip to content

Commit 8c39b16

Browse files
committed
Handle lag, issues & as_of
1 parent f713e39 commit 8c39b16

File tree

2 files changed: +25 −35 lines

src/server/_query.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,27 @@ def where_time_pair(
466466
)
467467
return self
468468

469+
def apply_lag_filter(self, history_table: str, lag: Optional[int]) -> "QueryBuilder":
    """Restrict the query to rows with the given reporting lag.

    When ``lag`` is given, the query is retargeted at ``history_table``,
    because only the history table carries the full spectrum of lag values
    (the latest table does not). A ``lag`` of None is a no-op.

    :param history_table: name of the history table to retarget the query to
    :param lag: reporting lag to filter on, or None to leave the query unchanged
    :returns: self, for chaining (consistent with the other builder methods)
    """
    if lag is not None:
        # history_table has full spectrum of lag values to search from
        # whereas the latest_table does not
        self.retable(history_table)
        self.where(lag=lag)
    return self
474+
475+
def apply_issues_filter(self, history_table: str, issues: Optional[TimeValues]) -> "QueryBuilder":
    """Restrict the query to rows matching the given issue dates.

    When ``issues`` is non-empty, the query is retargeted at
    ``history_table`` (only the history table has all issues) and filtered
    on the ``issue`` column. A None or empty ``issues`` is a no-op — note
    that an empty list is deliberately treated the same as "no filter".

    :param history_table: name of the history table to retarget the query to
    :param issues: issue dates (ints and/or ranges) to filter on, or None
    :returns: self, for chaining (consistent with the other builder methods)
    """
    if issues:
        self.retable(history_table)
        self.where_integers("issue", issues)
    return self
479+
480+
def apply_as_of_filter(self, history_table: str, as_of: Optional[int]) -> "QueryBuilder":
    """Fetch the most recent issue *as of* a certain date (not to be
    confused with the plain-old "most recent issue").

    When ``as_of`` is given, the query is retargeted at ``history_table``
    and joined against a subquery that picks, per
    (time_type, time_value, source, signal, geo_type, geo_value) group, the
    maximum issue no later than ``as_of``. A None ``as_of`` is a no-op.

    :param history_table: name of the history table to retarget the query to
    :param as_of: latest issue date to consider, or None to skip the filter
    :returns: self, for chaining (consistent with the other builder methods)
    """
    if as_of is not None:
        self.retable(history_table)
        # bound as a named query parameter, not interpolated into the SQL
        sub_condition_asof = "(issue <= :as_of)"
        self.params["as_of"] = as_of
        sub_fields = "max(issue) max_issue, time_type, time_value, `source`, `signal`, geo_type, geo_value"
        sub_group = "time_type, time_value, `source`, `signal`, geo_type, geo_value"
        # join the per-group max issue back onto the outer query's rows
        sub_condition = f"x.max_issue = {self.alias}.issue AND x.time_type = {self.alias}.time_type AND x.time_value = {self.alias}.time_value AND x.source = {self.alias}.source AND x.signal = {self.alias}.signal AND x.geo_type = {self.alias}.geo_type AND x.geo_value = {self.alias}.geo_value"
        self.subquery = f"JOIN (SELECT {sub_fields} FROM {self.table} WHERE {self.conditions_clause} AND {sub_condition_asof} GROUP BY {sub_group}) x ON {sub_condition}"
    return self
489+
469490
def set_fields(self, *fields: Iterable[str]) -> "QueryBuilder":
    """Replace the selected columns with the given ones, each qualified by
    this query's table alias, and return self for chaining."""
    qualified = []
    for group in fields:
        for column in group:
            qualified.append(f"{self.alias}.{column}")
    self.fields = qualified
    return self

src/server/endpoints/covidcast.py

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -91,28 +91,6 @@ def parse_time_pairs() -> TimePair:
9191
return parse_time_arg()
9292

9393

94-
def _handle_lag_issues_as_of(q: QueryBuilder, issues: Optional[TimeValues] = None, lag: Optional[int] = None, as_of: Optional[int] = None):
    """Apply at most one of the issues / lag / as_of filters to the query.

    Exactly one branch runs, in priority order: issues, then lag, then
    as_of; with none given, the query stays on the default latest table.

    NOTE(review): `history_table` is not a parameter of this function —
    presumably a module-level name; verify before reusing this code.
    """
    if issues:
        q.retable(history_table)
        q.where_integers("issue", issues)
    elif lag is not None:
        # history_table has full spectrum of lag values to search from whereas the latest_table does not
        q.retable(history_table)
        q.where(lag=lag)
    elif as_of is not None:
        # fetch the most recent issue as of a certain date (not to be confused w/ plain-old "most recent issue")
        q.retable(history_table)
        sub_condition_asof = "(issue <= :as_of)"
        q.params["as_of"] = as_of
        sub_fields = "max(issue) max_issue, time_type, time_value, `source`, `signal`, geo_type, geo_value"
        sub_group = "time_type, time_value, `source`, `signal`, geo_type, geo_value"
        sub_condition = f"x.max_issue = {q.alias}.issue AND x.time_type = {q.alias}.time_type AND x.time_value = {q.alias}.time_value AND x.source = {q.alias}.source AND x.signal = {q.alias}.signal AND x.geo_type = {q.alias}.geo_type AND x.geo_value = {q.alias}.geo_value"
        q.subquery = f"JOIN (SELECT {sub_fields} FROM {q.table} WHERE {q.conditions_clause} AND {sub_condition_asof} GROUP BY {sub_group}) x ON {sub_condition}"
    else:
        # else we are using the (standard/default) `latest_table`, to fetch the most recent issue quickly
        pass
114-
115-
11694
@bp.route("/", methods=("GET", "POST"))
11795
def handle():
11896
source_signal_pairs = parse_source_signal_pairs()
@@ -147,7 +125,9 @@ def handle():
147125
q.where_geo_pairs("geo_type", "geo_value", geo_pairs)
148126
q.where_time_pair("time_type", "time_value", time_pair)
149127

150-
_handle_lag_issues_as_of(q, issues, lag, as_of)
128+
q.apply_issues_filter(history_table, issues)
129+
q.apply_lag_filter(history_table, lag)
130+
q.apply_as_of_filter(history_table, as_of)
151131

152132
def transform_row(row, proxy):
153133
if is_compatibility or not alias_mapper or "source" not in row:
@@ -201,9 +181,6 @@ def handle_trend():
201181
q.where_geo_pairs("geo_type", "geo_value", geo_pairs)
202182
q.where_time_pair("time_type", "time_value", time_window)
203183

204-
# fetch most recent issue fast
205-
_handle_lag_issues_as_of(q, None, None, None)
206-
207184
p = create_printer()
208185

209186
def gen(rows):
@@ -252,9 +229,6 @@ def handle_trendseries():
252229
q.where_geo_pairs("geo_type", "geo_value", geo_pairs)
253230
q.where_time_pair("time_type", "time_value", time_window)
254231

255-
# fetch most recent issue fast
256-
_handle_lag_issues_as_of(q, None, None, None)
257-
258232
p = create_printer()
259233

260234
shifter = lambda x: shift_day_value(x, -basis_shift)
@@ -386,7 +360,7 @@ def handle_export():
386360
q.where_time_pair("time_type", "time_value", TimePair("day" if is_day else "week", [(start_day, end_day)]))
387361
q.where_geo_pairs("geo_type", "geo_value", [GeoPair(geo_type, True if geo_values == "*" else geo_values)])
388362

389-
_handle_lag_issues_as_of(q, None, None, as_of)
363+
q.apply_as_of_filter(history_table, as_of)
390364

391365
format_date = time_value_to_iso if is_day else lambda x: time_value_to_week(x).cdcformat()
392366
# tag as_of in filename, if it was specified
@@ -466,9 +440,6 @@ def handle_backfill():
466440
q.where_geo_pairs("geo_type", "geo_value", [geo_pair])
467441
q.where_time_pair("time_type", "time_value", time_pair)
468442

469-
# no restriction of issues or dates since we want all issues
470-
# _handle_lag_issues_as_of(q, issues, lag, as_of)
471-
472443
p = create_printer()
473444

474445
def find_anchor_row(rows: List[Dict[str, Any]], issue: int) -> Optional[Dict[str, Any]]:
@@ -644,8 +615,6 @@ def handle_coverage():
644615
q.group_by = "c.source, c.signal, c.time_value"
645616
q.set_order("source", "signal", "time_value")
646617

647-
_handle_lag_issues_as_of(q, None, None, None)
648-
649618
def transform_row(row, proxy):
650619
if not alias_mapper or "source" not in row:
651620
return row

0 commit comments

Comments
 (0)