
Commit cdf6995

Merge pull request #950 from cmu-delphi/main
Deploy new retired signals to sircal (nmf, surveys local-health) to production
2 parents: 324a2f8 + 0cca967

23 files changed: +356 -196 lines

ansible/templates/safegraph-params-prod.json.j2

Lines changed: 26 additions & 2 deletions
@@ -14,7 +14,31 @@
     "sync": true,
     "wip_signal" : []
   },
-  "archive": {
-    "cache_dir": "./cache"
+  "validation": {
+    "common": {
+      "data_source": "safegraph",
+      "span_length": 14,
+      "end_date": "today",
+      "suppressed_errors": [
+        {"signal": "bars_visit_num"},
+        {"signal": "bars_visit_prop"},
+        {"signal": "restaurants_visit_num"},
+        {"signal": "restaurants_visit_prop"}
+      ]
+    },
+    "static": {
+      "minimum_sample_size": 100,
+      "missing_se_allowed": false,
+      "missing_sample_size_allowed": false
+    },
+    "dynamic": {
+      "ref_window_size": 7,
+      "smoothed_signals": [
+        "completely_home_prop_7dav",
+        "full_time_work_prop_7dav",
+        "part_time_work_prop_7dav",
+        "median_home_dwell_time_7dav"
+      ]
+    }
   }
 }
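
The new validation block follows the common/static/dynamic layout shown above, replacing the archive step. As a rough sketch of how the suppressed_errors entries get used -- illustrative stdlib code, not the actual validator:

    import json

    # Illustrative subset of the rendered params file above.
    params = json.loads("""
    {
      "validation": {
        "common": {
          "suppressed_errors": [
            {"signal": "bars_visit_num"},
            {"signal": "bars_visit_prop"}
          ]
        }
      }
    }
    """)

    # Errors raised for these signals are recorded but not treated as failures.
    suppressed = {e["signal"] for e in params["validation"]["common"]["suppressed_errors"]}
    print("bars_visit_num" in suppressed)             # True
    print("completely_home_prop_7dav" in suppressed)  # False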

ansible/templates/sir_complainsalot-params-prod.json.j2

Lines changed: 2 additions & 2 deletions
@@ -34,12 +34,12 @@
   "fb-survey": {
     "max_age": 3,
     "maintainers": ["U01069KCRS7"],
-    "retired-signals": ["smoothed_anxious_5d", "smoothed_wanxious_5d", "smoothed_depressed_5d", "smoothed_wdepressed_5d", "smoothed_felt_isolated_5d", "smoothed_wfelt_isolated_5d", "smoothed_large_event_1d", "smoothed_wlarge_event_1d", "smoothed_restaurant_1d", "smoothed_wrestaurant_1d", "smoothed_shop_1d", "smoothed_wshop_1d", "smoothed_spent_time_1d", "smoothed_wspent_time_1d", "smoothed_travel_outside_state_5d", "smoothed_wtravel_outside_state_5d", "smoothed_work_outside_home_1d", "smoothed_wwork_outside_home_1d", "smoothed_wearing_mask", "smoothed_wwearing_mask"]
+    "retired-signals": ["smoothed_anxious_5d", "smoothed_wanxious_5d", "smoothed_depressed_5d", "smoothed_wdepressed_5d", "smoothed_felt_isolated_5d", "smoothed_wfelt_isolated_5d", "smoothed_large_event_1d", "smoothed_wlarge_event_1d", "smoothed_restaurant_1d", "smoothed_wrestaurant_1d", "smoothed_shop_1d", "smoothed_wshop_1d", "smoothed_spent_time_1d", "smoothed_wspent_time_1d", "smoothed_travel_outside_state_5d", "smoothed_wtravel_outside_state_5d", "smoothed_work_outside_home_1d", "smoothed_wwork_outside_home_1d", "smoothed_wearing_mask", "smoothed_wwearing_mask", "smoothed_vaccine_likely_local_health", "smoothed_wvaccine_likely_local_health"]
   },
   "indicator-combination": {
     "max_age": 4,
     "maintainers": ["U01AP8GSWG3","U01069KCRS7"],
-    "retired-signals": ["nmf_day_doc_fbs_ght"]
+    "retired-signals": ["nmf_day_doc_fbs_ght", "nmf_day_doc_fbc_fbs_ght"]
   },
   "quidel": {
     "max_age":6,

claims_hosp/delphi_claims_hosp/config.py

Lines changed: 1 addition & 1 deletion
@@ -63,7 +63,7 @@ class GeoConstants:
     NUM_COUNTIES = 3141 + 52
     NUM_HRRS = 308
     NUM_MSAS = 392 + 52 # MSA + States
-    NUM_STATES = 52 # including DC and PR
+    NUM_STATES = 54 # including DC, PR, VI, GU
     NUM_HHSS = 10
     NUM_NATIONS = 1
 
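
Bumping NUM_STATES keeps the constant in step with state-level coverage that now includes VI and GU. These GeoConstants read like bounds for output sanity checks; a hypothetical example of such a check, not taken from claims_hosp:

    # Illustrative sanity check: a state-level output should never contain
    # more distinct geo codes than the known universe of state-like units.
    NUM_STATES = 54  # 50 states plus DC, PR, VI, GU

    def check_state_output(geo_ids):
        unique = set(geo_ids)
        if len(unique) > NUM_STATES:
            raise ValueError(f"got {len(unique)} states, expected at most {NUM_STATES}")

    check_state_output(["pa", "ny", "dc", "pr"])  # passes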

claims_hosp/params.json.template

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@
     "start_date": "2020-02-01",
     "end_date": null,
     "drop_date": null,
-    "n_backfill_days": 60,
+    "n_backfill_days": 70,
     "n_waiting_days": 3,
     "write_se": false,
     "obfuscated_prefix": "foo_obfuscated",

doctor_visits/delphi_doctor_visits/geo_maps.py

Lines changed: 43 additions & 0 deletions
@@ -7,6 +7,7 @@
 Created: 2020-04-18
 Last modified: 2020-04-30 by Aaron Rumack (add megacounty code)
 """
+from functools import partial
 
 import pandas as pd
 from delphi_utils.geomap import GeoMapper
@@ -20,6 +21,14 @@ class GeoMaps:
     def __init__(self):
         """Create the underlying GeoMapper."""
         self.gmpr = GeoMapper()
+        self.geo_func = {"county": partial(self.county_to_megacounty,
+                                           threshold_visits=Config.MIN_RECENT_VISITS,
+                                           threshold_len=Config.RECENT_LENGTH),
+                         "state": self.county_to_state,
+                         "msa": self.county_to_msa,
+                         "hrr": self.county_to_hrr,
+                         "hhs": self.county_to_hhs,
+                         "nation": self.county_to_nation}
 
     @staticmethod
     def convert_fips(x):
@@ -61,6 +70,40 @@ def county_to_state(self, data):
 
         return data.groupby("state_id"), "state_id"
 
+    def county_to_hhs(self, data):
+        """Aggregate county data to the HHS region resolution.
+
+        Args:
+            data: dataframe aggregated to the daily-county resolution (all 7 cols expected)
+
+        Returns: tuple of dataframe at the daily-HHS resolution, and geo_id column name
+        """
+        data = self.gmpr.add_geocode(data,
+                                     "fips",
+                                     "hhs",
+                                     from_col="PatCountyFIPS")
+        data.drop(columns="PatCountyFIPS", inplace=True)
+        data = data.groupby(["ServiceDate", "hhs"]).sum().reset_index()
+
+        return data.groupby("hhs"), "hhs"
+
+    def county_to_nation(self, data):
+        """Aggregate county data to the nation resolution.
+
+        Args:
+            data: dataframe aggregated to the daily-county resolution (all 7 cols expected)
+
+        Returns: tuple of dataframe at the daily-nation resolution, and geo_id column name
+        """
+        data = self.gmpr.add_geocode(data,
+                                     "fips",
+                                     "nation",
+                                     from_col="PatCountyFIPS")
+        data.drop(columns="PatCountyFIPS", inplace=True)
+        data = data.groupby(["ServiceDate", "nation"]).sum().reset_index()
+
+        return data.groupby("nation"), "nation"
+
     def county_to_hrr(self, data):
         """Aggregate county data to the HRR resolution.
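
The new geo_func table replaces branch-per-geo dispatch: functools.partial bakes the megacounty thresholds into the county entry, so every value in the dict can be called uniformly as mapping_func(data) (see the update_sensor.py diff below). A self-contained sketch of the pattern, with a toy class standing in for GeoMaps:

    from functools import partial

    class Mapper:
        """Toy stand-in for GeoMaps: every dispatch entry takes just `data`."""
        def __init__(self):
            self.geo_func = {
                "county": partial(self.to_megacounty, threshold=100),
                "state": self.to_state,
            }

        def to_megacounty(self, data, threshold):
            return [f"mega:{x}" for x in data if x >= threshold]

        def to_state(self, data):
            return [f"state:{x}" for x in data]

    m = Mapper()
    print(m.geo_func["county"]([50, 150]))  # ['mega:150']
    print(m.geo_func["state"]([50, 150]))   # ['state:50', 'state:150']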

doctor_visits/delphi_doctor_visits/run.py

Lines changed: 6 additions & 6 deletions
@@ -61,14 +61,14 @@ def run_module(params):
     startdate_dt = enddate_dt - timedelta(days=n_backfill_days)
     enddate = str(enddate_dt.date())
     startdate = str(startdate_dt.date())
-    logging.info("drop date:\t\t{dropdate}")
-    logging.info("first sensor date:\t{startdate}")
-    logging.info("last sensor date:\t{enddate}")
-    logging.info("n_backfill_days:\t{n_backfill_days}")
-    logging.info("n_waiting_days:\t{n_waiting_days}")
+    logging.info("drop date:\t\t%s", dropdate)
+    logging.info("first sensor date:\t%s", startdate)
+    logging.info("last sensor date:\t%s", enddate)
+    logging.info("n_backfill_days:\t%s", n_backfill_days)
+    logging.info("n_waiting_days:\t%s", n_waiting_days)
 
     ## geographies
-    geos = ["state", "msa", "hrr", "county"]
+    geos = ["state", "msa", "hrr", "county", "hhs", "nation"]
 
 
     ## print out other vars
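
The logging fix is the notable part: the old calls used f-string placeholders without the f prefix, so the literal text {dropdate} was written to the log. The replacement uses logging's lazy %-style, where the logging module interpolates the values (and only when the level is enabled). A runnable illustration with made-up dates, also showing how n_backfill_days (raised to 70 in the param templates above) sets the first sensor date:

    import logging
    from datetime import datetime, timedelta

    logging.basicConfig(level=logging.INFO)

    n_backfill_days = 70  # matches the new template value
    enddate_dt = datetime(2021, 4, 1)
    startdate_dt = enddate_dt - timedelta(days=n_backfill_days)

    # Bug: no f prefix, so the placeholder is logged verbatim.
    logging.info("first sensor date:\t{startdate_dt}")
    # Fix: lazy %-formatting; the value is substituted by the logging module.
    logging.info("first sensor date:\t%s", startdate_dt.date())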

doctor_visits/delphi_doctor_visits/update_sensor.py

Lines changed: 3 additions & 14 deletions
@@ -78,7 +78,7 @@ def update_sensor(
         startdate: first sensor date (YYYY-mm-dd)
         enddate: last sensor date (YYYY-mm-dd)
         dropdate: data drop date (YYYY-mm-dd)
-        geo: geographic resolution, one of ["county", "state", "msa", "hrr"]
+        geo: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
         parallel: boolean to run the sensor update in parallel
         weekday: boolean to adjust for weekday effects
         se: boolean to write out standard errors, if true, use an obfuscated name
@@ -132,19 +132,8 @@ def update_sensor(
 
     # get right geography
     geo_map = GeoMaps()
-    if geo.lower() == "county":
-        data_groups, _ = geo_map.county_to_megacounty(
-            data, Config.MIN_RECENT_VISITS, Config.RECENT_LENGTH
-        )
-    elif geo.lower() == "state":
-        data_groups, _ = geo_map.county_to_state(data)
-    elif geo.lower() == "msa":
-        data_groups, _ = geo_map.county_to_msa(data)
-    elif geo.lower() == "hrr":
-        data_groups, _ = geo_map.county_to_hrr(data)
-    else:
-        logging.error(f"{geo} is invalid, pick one of 'county', 'state', 'msa', 'hrr'")
-        return {}
+    mapping_func = geo_map.geo_func[geo.lower()]
+    data_groups, _ = mapping_func(data)
     unique_geo_ids = list(data_groups.groups.keys())
 
     # run sensor fitting code (maybe in parallel)
# run sensor fitting code (maybe in parallel)

doctor_visits/params.json.template

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@
   "indicator": {
     "input_file": "./input/SYNEDI_AGG_OUTPATIENT_18052020_1455CDT.csv.gz",
     "drop_date": "",
-    "n_backfill_days": 60,
+    "n_backfill_days": 70,
     "n_waiting_days": 3,
     "weekday": [true, false],
     "se": false,

facebook/delphiFacebook/NAMESPACE

Lines changed: 0 additions & 1 deletion
@@ -17,7 +17,6 @@ export(filter_complete_responses)
 export(filter_data_for_aggregation)
 export(filter_responses)
 export(floor_epiweek)
-export(get_date_range_from_filenames)
 export(get_filenames_in_range)
 export(get_range_prev_full_month)
 export(get_range_prev_full_period)

facebook/delphiFacebook/R/contingency_aggregate.R

Lines changed: 46 additions & 32 deletions
@@ -24,7 +24,7 @@
 #' @return none
 #'
 #' @import data.table
-#' @importFrom dplyr full_join %>%
+#' @importFrom dplyr full_join %>% select all_of
 #' @importFrom purrr reduce
 #'
 #' @export
@@ -44,9 +44,18 @@ produce_aggregates <- function(df, aggregations, cw_list, params) {
   df <- output[[1]]
   aggregations <- output[[2]]
 
+  ## Keep only columns used in indicators, plus supporting columns.
+  group_vars <- unique( unlist(aggregations$group_by) )
+  df <- select(df,
+               all_of(unique(aggregations$metric)),
+               all_of(unique(aggregations$var_weight)),
+               all_of( group_vars[group_vars != "geo_id"] ),
+               zip5,
+               start_dt)
+
   agg_groups <- unique(aggregations[c("group_by", "geo_level")])
 
-  # For each unique combination of groupby_vars and geo level, run aggregation process once
+  # For each unique combination of group_vars and geo level, run aggregation process once
   # and calculate all desired aggregations on the grouping. Rename columns. Save
   # to individual files
   for (group_ind in seq_along(agg_groups$group_by)) {
@@ -158,37 +167,43 @@ post_process_aggs <- function(df, aggregations, cw_list) {
   # - multi-select items are converted to a series of binary columns, one for
   #   each unique level/response code; multi-select used for grouping are left as-is.
   # - multiple choice items are left as-is
-
+
   #### TODO: How do we want to handle multi-select items when used for grouping?
-  agg_groups <- unique(aggregations$group_by)
-  group_cols_to_convert <- unique(do.call(c, agg_groups))
-  group_cols_to_convert <- group_cols_to_convert[startsWith(group_cols_to_convert, "b_")]
-
-  metric_cols_to_convert <- unique(aggregations$metric)
-
-  for (col_var in c(group_cols_to_convert, metric_cols_to_convert)) {
-    if ( is.null(df[[col_var]]) ) {
-      aggregations <- aggregations[aggregations$metric != col_var &
-                                     !mapply(aggregations$group_by,
-                                             FUN=function(x) {col_var %in% x}), ]
-      msg_plain(
-        paste0(
-          col_var, " is not defined. Removing all aggregations that use it. ",
-          nrow(aggregations), " remaining")
-      )
+  group_vars <- unique( unlist(aggregations$group_by) )
+  group_vars <- group_vars[group_vars != "geo_id"]
+
+  metric_cols <- unique(aggregations$metric)
+
+  cols_check_available <- unique(c(group_vars, metric_cols))
+  available <- cols_check_available %in% names(df)
+  cols_not_available <- cols_check_available[ !available ]
+  for (col_var in cols_not_available) {
+    # Remove from aggregations
+    aggregations <- aggregations[aggregations$metric != col_var &
+                                   !mapply(aggregations$group_by,
+                                           FUN=function(x) {col_var %in% x}), ]
+    msg_plain(paste0(
+      col_var, " is not defined. Removing all aggregations that use it. ",
+      nrow(aggregations), " remaining")
+    )
+  }
+
+  cols_available <- cols_check_available[ available ]
+  for (col_var in cols_available) {
+    if ( col_var %in% group_vars & !(col_var %in% metric_cols) & !startsWith(col_var, "b_") ) {
       next
     }
 
     if (startsWith(col_var, "b_")) { # Binary
       output <- code_binary(df, aggregations, col_var)
-    } else if (startsWith(col_var, "ms_")) { # Multiselect
-      output <- code_multiselect(df, aggregations, col_var)
     } else if (startsWith(col_var, "n_")) { # Numeric free response
      output <- code_numeric_freeresponse(df, aggregations, col_var)
-    } else if (startsWith(col_var, "mc_")) { # Multiple choice
+    } else if (startsWith(col_var, "ms_")) { # Multi-select
+      output <- code_multiselect(df, aggregations, col_var)
+    } else {
+      # Multiple choice and variables that are formatted differently
       output <- list(df, aggregations)
     }
-
     df <- output[[1]]
     aggregations <- output[[2]]
   }
@@ -233,28 +248,27 @@ summarize_aggs <- function(df, crosswalk_data, aggregations, geo_level, params)
   ## inefficient; profiling shows the cost to be negligible, so shut it up
   df <- suppressWarnings(inner_join(df, crosswalk_data, by = "zip5"))
 
-  groupby_vars <- aggregations$group_by[[1]]
+  group_vars <- aggregations$group_by[[1]]
 
-  if (all(groupby_vars %in% names(df))) {
-    unique_group_combos <- unique(df[, groupby_vars, with=FALSE])
+  if (all(group_vars %in% names(df))) {
+    unique_group_combos <- unique(df[, group_vars, with=FALSE])
     unique_group_combos <- unique_group_combos[complete.cases(unique_group_combos)]
   } else {
     msg_plain(
       sprintf(
         "not all of groupby columns %s available in data; skipping aggregation",
-        paste(groupby_vars, collapse=", ")
+        paste(group_vars, collapse=", ")
       ))
   }
 
   if ( !exists("unique_group_combos") || nrow(unique_group_combos) == 0 ) {
     return(list())
   }
 
-
   ## Set an index on the groupby var columns so that the groupby step can be
   ## faster; data.table stores the sort order of the column and
   ## uses a binary search to find matching values, rather than a linear scan.
-  setindexv(df, groupby_vars)
+  setindexv(df, group_vars)
 
   calculate_group <- function(ii) {
     target_group <- unique_group_combos[ii]
@@ -287,15 +301,15 @@ summarize_aggs <- function(df, crosswalk_data, aggregations, geo_level, params)
   ## Do post-processing.
   for (row in seq_len(nrow(aggregations))) {
     aggregation <- aggregations$id[row]
-    groupby_vars <- aggregations$group_by[[row]]
+    group_vars <- aggregations$group_by[[row]]
     post_fn <- aggregations$post_fn[[row]]
 
     dfs_out[[aggregation]] <- dfs_out[[aggregation]][
-      rowSums(is.na(dfs_out[[aggregation]][, c("val", "sample_size", groupby_vars)])) == 0,
+      rowSums(is.na(dfs_out[[aggregation]][, c("val", "sample_size", group_vars)])) == 0,
     ]
 
     if (geo_level == "county") {
-      df_megacounties <- megacounty(dfs_out[[aggregation]], params$num_filter, groupby_vars)
+      df_megacounties <- megacounty(dfs_out[[aggregation]], params$num_filter, group_vars)
       dfs_out[[aggregation]] <- bind_rows(dfs_out[[aggregation]], df_megacounties)
     }
