Skip to content

Commit ae6f011

Browse files
minhkhuldsweber2nmdefriesdshemetov
authored
nssp pipeline code (#1952)
* to make nssp run in staging * add nssp to Jenkinsfile * nssp_token name change * et code * Update nssp/delphi_nssp/run.py Co-authored-by: David Weber <[email protected]> * Update nssp/README.md Co-authored-by: David Weber <[email protected]> * Update nssp/DETAILS.md Co-authored-by: David Weber <[email protected]> * Update nssp/delphi_nssp/__main__.py Co-authored-by: nmdefries <[email protected]> * Update nssp/delphi_nssp/pull.py Co-authored-by: nmdefries <[email protected]> * Update nssp/delphi_nssp/run.py Co-authored-by: nmdefries <[email protected]> * readme update * column names mapping + signals name standardization to fit with other available sources and signals" * improve readability * Add type_dict constant * more type_dict * add more unit test pull * data for unit test of pull * hrr + msa geos * use enumerate for clarity * set nssp sircal max_age to 13 days * set nssp sircal max_age to 15 days, to account for nighttime run * add validation to params * Update nssp/DETAILS.md Co-authored-by: nmdefries <[email protected]> * Update nssp/delphi_nssp/constants.py Co-authored-by: nmdefries <[email protected]> * et code * Update nssp/delphi_nssp/run.py Co-authored-by: David Weber <[email protected]> * Update nssp/README.md Co-authored-by: David Weber <[email protected]> * Update nssp/DETAILS.md Co-authored-by: David Weber <[email protected]> * Update nssp/delphi_nssp/__main__.py Co-authored-by: nmdefries <[email protected]> * Update nssp/delphi_nssp/pull.py Co-authored-by: nmdefries <[email protected]> * Update nssp/delphi_nssp/run.py Co-authored-by: nmdefries <[email protected]> * readme update * column names mapping + signals name standardization to fit with other available sources and signals" * improve readability * Add type_dict constant * more type_dict * add more unit test pull * data for unit test of pull * hrr + msa geos * use enumerate for clarity * to make nssp run in staging * add nssp to Jenkinsfile * nssp_token name change * set nssp sircal max_age to 15 days, to account for nighttime run * set nssp sircal max_age to 13 days * add validation to params * Update nssp/DETAILS.md Co-authored-by: nmdefries <[email protected]> * Update nssp/delphi_nssp/constants.py Co-authored-by: nmdefries <[email protected]> * nssp correlation rmd and general notebook folder * making Black happy * update to new geomapper function * following 120 line convention everywhere * happy linter * happy black formatter in nssp * drop unneeded nssp tests * updates borked old tests, caught by @dshemetov * rebase woes and version consistency * Update nssp-params-prod.json.j2 min/max lag to 13 * Update params.json.template min/max lag to 7 and 13 * missed column renames for geo_mapper, unneeded index * lint+fix: update from linter changes * add make format to nssp Makefile * run make format on nssp * ci: update ci to lint nssp * lint: linter happy * lint: pydocstyle happy * lint: pydocstyle happy * pct_visits to pct_ed_visits --------- Co-authored-by: David Weber <[email protected]> Co-authored-by: nmdefries <[email protected]> Co-authored-by: Dmitry Shemetov <[email protected]>
1 parent 38ea2da commit ae6f011

37 files changed

+3589
-2
lines changed

.github/workflows/python-ci.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ jobs:
3131
dir: "delphi_hhs"
3232
- package: "nchs_mortality"
3333
dir: "delphi_nchs_mortality"
34+
- package: "nssp"
35+
dir: "delphi_nssp"
3436
- package: "nwss_wastewater"
3537
dir: "delphi_nwss"
3638
- package: "quidel_covidtest"

Jenkinsfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
- TODO: #527 Get this list automatically from python-ci.yml at runtime.
1111
*/
1212

13-
def indicator_list = ["backfill_corrections", "changehc", "claims_hosp", "google_symptoms", "hhs_hosp", "nchs_mortality", "quidel_covidtest", "sir_complainsalot", "doctor_visits", "nwss_wastewater"]
13+
def indicator_list = ["backfill_corrections", "changehc", "claims_hosp", "google_symptoms", "hhs_hosp", "nchs_mortality", "quidel_covidtest", "sir_complainsalot", "doctor_visits", "nwss_wastewater", "nssp"]
1414
def build_package_main = [:]
1515
def build_package_prod = [:]
1616
def deploy_staging = [:]

_delphi_utils_python/tests/test_weekday.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,4 @@ def test_calc_adjustment(self):
5858
# The date and "den" column are unchanged by this function
5959
assert np.allclose(result["num"].values, expected_nums)
6060
assert np.allclose(result["den"].values, self.TEST_DATA["den"].values)
61-
assert np.array_equal(result["date"].values, self.TEST_DATA["date"].values)
61+
assert np.array_equal(result["date"].values, self.TEST_DATA["date"].values)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
{
2+
"common": {
3+
"export_dir": "/common/covidcast/receiving/nssp",
4+
"log_filename": "/var/log/indicators/nssp.log",
5+
"log_exceptions": false
6+
},
7+
"indicator": {
8+
"wip_signal": true,
9+
"static_file_dir": "./static",
10+
"socrata_token": "{{ nssp_token }}"
11+
},
12+
"validation": {
13+
"common": {
14+
"data_source": "nssp",
15+
"api_credentials": "{{ validation_api_key }}",
16+
"span_length": 15,
17+
"min_expected_lag": {"all": "7"},
18+
"max_expected_lag": {"all": "13"},
19+
"dry_run": true,
20+
"suppressed_errors": []
21+
},
22+
"static": {
23+
"minimum_sample_size": 0,
24+
"missing_se_allowed": true,
25+
"missing_sample_size_allowed": true
26+
},
27+
"dynamic": {}
28+
}
29+
}

ansible/templates/sir_complainsalot-params-prod.json.j2

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@
5050
"hhs": {
5151
"max_age":15,
5252
"maintainers": []
53+
},
54+
"nssp": {
55+
"max_age":13,
56+
"maintainers": []
5357
}
5458
}
5559
}

ansible/vars.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ nchs_mortality_token: "{{ vault_cdc_socrata_token }}"
5656
# NWSS
5757
nwss_wastewater_token: "{{ vault_cdc_socrata_token }}"
5858

59+
# nssp
60+
nssp_token: "{{ vault_cdc_socrata_token }}"
61+
5962
# SirCAL
6063
sir_complainsalot_api_key: "{{ vault_sir_complainsalot_api_key }}"
6164
sir_complainsalot_slack_token: "{{ vault_sir_complainsalot_slack_token }}"

notebooks/.Rprofile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
source("renv/activate.R")

notebooks/nssp/cor_dashboard.Rmd

Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
---
2+
title: "Correlation Analyses for COVID-19 Indicators"
3+
author: "Delphi Group"
4+
date: "`r format(Sys.time(), '%B %d, %Y')`"
5+
output:
6+
html_document:
7+
code_folding: hide
8+
---
9+
10+
```{r, include = FALSE}
11+
knitr::opts_chunk$set(message = FALSE, warning = FALSE, fig.width = 8,
12+
fig.height = 7)
13+
```
14+
15+
### Getting data
16+
This requires that you've already run the nssp pipeline. See the `nssp` directory for instructions on doing that.
17+
First loading some libraries and reading the results from the pipeline:
18+
```{r}
19+
library(covidcast)
20+
library(epidatr)
21+
library(dplyr)
22+
library(ggplot2)
23+
24+
library(purrr)
25+
library(tidyverse)
26+
library(dplyr)
27+
library(readr)
28+
files <- list.files(here::here("nssp/receiving"), pattern="\\.csv$", full.names = TRUE)
29+
read_row <- function(filename) {
30+
split_name <- filename %>%
31+
tools::file_path_sans_ext() %>%
32+
strsplit("/") %>% `[[`(1) %>% tail(n=1) %>%
33+
strsplit("_") %>% `[[`(1)
34+
week_number <- split_name[[2]]
35+
geo_type <- split_name[[3]]
36+
col_name <- split_name[-(1:3)] %>% paste(collapse = "_")
37+
read_csv(filename, show_col_types = FALSE) %>%
38+
as_tibble %>%
39+
mutate(signal = col_name,
40+
geo_type = geo_type,
41+
week_number = week_number) %>%
42+
mutate(across(geo_id, factor)) %>%
43+
rename(geo_value = geo_id, time_value = week_number) %>%
44+
select(-missing_se, -se, -sample_size, -missing_sample_size) %>%
45+
return
46+
}
47+
res <- map(files, read_row)
48+
nssp_data <- bind_rows(res)
49+
nssp_state <- nssp_data %>%
50+
filter(geo_type == "state") %>%
51+
mutate(time_value = epidatr:::parse_api_week(time_value)) %>%
52+
as_epi_df(time_type = "week", geo_type = "state") %>%
53+
select(-missing_val, -geo_type)
54+
unique(nssp_data$time_value)
55+
```
56+
And epidatr versions of hhs for comparison
57+
```{r}
58+
library(epidatr)
59+
eval_time <- epidatr::epirange(from = "2020-01-01", to = Sys.Date())
60+
fetch_args <- epidatr::fetch_args_list(return_empty = TRUE, timeout_seconds = 300)
61+
62+
flu_hhs <- epidatr::pub_covidcast(
63+
source = "hhs",
64+
signals = "confirmed_admissions_influenza_1d_prop_7dav",
65+
geo_type = "state",
66+
time_type = "day",
67+
geo_values = "*",
68+
time_values = eval_time,
69+
fetch_args = fetch_args
70+
) %>%
71+
select(-signal, -source, - time_type)
72+
73+
covid_hhs <- epidatr::pub_covidcast(
74+
source = "hhs",
75+
signals = "confirmed_admissions_covid_1d_prop_7dav",
76+
geo_type = "state",
77+
time_type = "day",
78+
geo_values = "*",
79+
time_values = eval_time,
80+
fetch_args = fetch_args
81+
) %>%
82+
select(-signal, -source, - time_type)
83+
84+
85+
nchs <- epidatr::pub_covidcast(
86+
source = "nchs-mortality",
87+
signals = "deaths_allcause_incidence_num",
88+
geo_type = "state",
89+
time_type = "week",
90+
geo_values = "*",
91+
time_values = epidatr::epirange(from = "202001", to = "202418"),
92+
fetch_args = epidatr::fetch_args_list(return_empty = TRUE, timeout_seconds = 300)
93+
)
94+
```
95+
# Flu
96+
```{r}
97+
library(epiprocess)
98+
nssp_flu_state <- nssp_state %>% filter(signal == "pct_ed_visits_influenza") %>% select(-signal) %>% drop_na %>% rename(pct_flu_visits = val) %>% as_epi_df(time_type = "week", geo_type = "state")
99+
week_starts <- nssp_flu_state$time_value %>% unique()
100+
flu_hhs_weekly <- flu_hhs %>% select(geo_value, time_value, value) %>% filter(time_value %in% week_starts) %>% rename(conf_admission = value) %>% drop_na %>% as_epi_df(time_type = "week", geo_type = "state")
101+
joined <- nssp_flu_state %>% left_join(flu_hhs_weekly)
102+
```
103+
104+
After the necessary joining, lets look at the average correlations
105+
```{r}
106+
cor(joined$pct_flu_visits, joined$conf_admission, method = "spearman")
107+
```
108+
So the overall correlation is pretty high.
109+
110+
## Correlations sliced by state
111+
```{r}
112+
correlations_space_flu <- epi_cor(joined, pct_flu_visits, conf_admission, cor_by = "geo_value", use = "complete.obs", method = "spearman")
113+
library(maps) # For map data
114+
states_map <- map_data("state")
115+
mapped <- states_map %>% as_tibble %>% mutate(geo_value = setNames(tolower(state.abb), tolower(state.name))[region]) %>% right_join(correlations_space_flu) %>% arrange(group, order)
116+
library(viridis)
117+
ggplot(mapped, aes(x = long, y = lat, group = group, fill = cor)) +
118+
geom_polygon(colour = "black") +
119+
scale_fill_viridis(discrete=FALSE, option="viridis", limits = c(0,1)) +
120+
coord_map("polyconic") +
121+
labs(title = "Spearman Correlations between Flu ER visits and Flu hospital admissions")
122+
ggsave("flu_ER_admissions_state_correlations.pdf")
123+
```
124+
Over space, hospital admissions look like they're highly correlated with ER visits (which makes sense, frequently when one is admitted it is via the ER).
125+
The lowest overall correlation is
126+
```{r}
127+
correlations_space_flu %>% summarize(across(where(is.numeric), .fns = list(min = min, median = median, mean = mean, std = sd, q25 = ~quantile(.,0.25), q75 = ~quantile(.,0.75), max = max)))
128+
```
129+
### Lag evaluation
130+
```{r}
131+
library(purrr)
132+
lags <- 0:35
133+
134+
lagged_flu_state <- map_dfr(lags, function(lag) {
135+
epi_cor(joined, pct_flu_visits, conf_admission, cor_by = geo_value, dt1 = lag, use = "complete.obs", method = "spearman") %>%
136+
mutate(lag = .env$lag)
137+
})
138+
139+
lagged_flu_state %>%
140+
group_by(lag) %>%
141+
summarize(mean = mean(cor, na.rm = TRUE)) %>%
142+
ggplot(aes(x = lag, y = mean)) +
143+
geom_line() +
144+
geom_point() +
145+
labs(x = "Lag", y = "Mean correlation", title = "Lag comparison for state spearman correlations for flu ER and Hosp admissions")
146+
ggsave("flu_ER_admissions_state_lag_cor.pdf")
147+
```
148+
Somewhat unsurprisingly, the correlation is highest immediately afterward.
149+
## Correlations sliced by time
150+
```{r}
151+
correlations_time_flu <- epi_cor(joined, pct_flu_visits, conf_admission, cor_by = "time_value", use = "complete.obs", method = "spearman")
152+
correlations_time_flu
153+
ggplot(correlations_time_flu, aes(x = time_value, y = cor)) + geom_line() + lims(y=c(0,1)) + labs(title = "Spearman Correlations between Flu ER visits and Flu hospital admissions")
154+
ggsave("flu_ER_admissions_time_correlations.pdf")
155+
```
156+
Strangely, sliced by time, we get significantly lower correlations
157+
```{r}
158+
correlations_time_flu %>% summarize(across(where(is.numeric), .fns = list(min = min, median = median, mean = mean, std = sd, q25 = ~quantile(.,0.25), q75 = ~quantile(.,0.75), max = max)))
159+
```
160+
Seems like we have a Simpson's paradox adjacent result, since for any given location the signals are fairly well correlated when averaged over time, but at a given time, averaging over different locations suggests they're not very well correlated.
161+
If the typical explanation applies, this means that there are large differences in the number of points.
162+
163+
so, getting the counts:
164+
```{r}
165+
joined %>% group_by(geo_value) %>% count %>% arrange(n) %>% ungroup %>% summarise(across(where(is.numeric), .fns = list(min = min, max = max)))
166+
```
167+
Each location has 82
168+
169+
```{r}
170+
joined %>% group_by(time_value) %>% count %>% arrange(n) %>% ungroup %>% summarise(across(where(is.numeric), .fns = list(min = min, max = max)))
171+
```
172+
# Covid
173+
```{r}
174+
library(epiprocess)
175+
nssp_data %>% pull(signal) %>% unique
176+
nssp_state <- nssp_data %>%
177+
filter(geo_type == "state") %>%
178+
mutate(time_value = epidatr:::parse_api_week(time_value)) %>%
179+
as_epi_df(time_type = "week", geo_type = "state") %>%
180+
select(-missing_val, -geo_type)
181+
nssp_covid_state <- nssp_state %>% filter(signal == "pct_ed_visits_covid") %>% select(-signal) %>% drop_na %>% rename(pct_covid_visits = val) %>% as_epi_df(time_type = "week", geo_type = "state")
182+
week_starts <- nssp_covid_state$time_value %>% unique()
183+
covid_hhs_weekly <- covid_hhs %>% select(geo_value, time_value, value) %>% filter(time_value %in% week_starts) %>% rename(conf_admission = value) %>% drop_na %>% as_epi_df(time_type = "week", geo_type = "state")
184+
joined_covid <- nssp_covid_state %>% left_join(covid_hhs_weekly)
185+
```
186+
187+
After the necessary joining, lets look at the average correlations
188+
```{r}
189+
cor(joined_covid$pct_covid_visits, joined_covid$conf_admission, method = "spearman")
190+
```
191+
So the overall correlation is pretty high, but lower than flu.
192+
193+
## Correlations sliced by state
194+
```{r}
195+
correlations_space_covid <- epi_cor(joined_covid, pct_covid_visits, conf_admission, cor_by = "geo_value", use = "complete.obs", method = "spearman")
196+
library(maps) # For map data
197+
states_map <- map_data("state")
198+
mapped <- states_map %>% as_tibble %>% mutate(geo_value = setNames(tolower(state.abb), tolower(state.name))[region]) %>% right_join(correlations_space_covid) %>% arrange(group, order)
199+
library(viridis)
200+
ggplot(mapped, aes(x = long, y = lat, group = group, fill = cor)) +
201+
geom_polygon(colour = "black") +
202+
scale_fill_viridis(discrete=FALSE, option="viridis", limits = c(0,1)) +
203+
coord_map("polyconic") +
204+
labs(title = "Spearman Correlations between covid ER visits and covid hospital admissions")
205+
ggsave("covid_ER_admissions_state_correlations.pdf")
206+
ggsave("covid_ER_admissions_state_correlations.png")
207+
```
208+
Over space, hospital admissions look like they're highly correlated with ER visits (which makes sense, frequently when one is admitted it is via the ER).
209+
The lowest overall correlation is
210+
```{r}
211+
correlations_space_covid %>% summarize(across(where(is.numeric), .fns = list(min = min, median = median, mean = mean, std = sd, q25 = ~quantile(.,0.25), q75 = ~quantile(.,0.75), max = max)))
212+
```
213+
### Lag evaluation
214+
```{r}
215+
library(purrr)
216+
lags <- 0:35
217+
218+
lagged_covid_state <- map_dfr(lags, function(lag) {
219+
epi_cor(joined_covid, pct_covid_visits, conf_admission, cor_by = geo_value, dt1 = -lag, use = "complete.obs", method = "spearman") %>%
220+
mutate(lag = .env$lag)
221+
})
222+
223+
lagged_covid_state %>%
224+
group_by(lag) %>%
225+
summarize(mean = mean(cor, na.rm = TRUE)) %>%
226+
ggplot(aes(x = lag, y = mean)) +
227+
geom_line() +
228+
geom_point() +
229+
labs(x = "Lag", y = "Mean correlation", title = "Lag comparison for state spearman correlations for covid ER and Hosp admissions")
230+
ggsave("covid_ER_admissions_state_lag_cor.pdf")
231+
ggsave("covid_ER_admissions_state_lag_cor.png")
232+
```
233+
Somewhat unsurprisingly, the correlation is highest immediately afterward, though its significantly lower than in the flu case.
234+
## Correlations sliced by time
235+
```{r}
236+
correlations_time_covid <- epi_cor(joined_covid, pct_covid_visits, conf_admission, cor_by = "time_value", use = "complete.obs", method = "spearman")
237+
correlations_time_covid
238+
ggplot(correlations_time_covid, aes(x = time_value, y = cor)) + geom_line() + lims(y=c(0,1)) + labs(title = "Spearman Correlations between covid ER visits and covid hospital admissions")
239+
ggsave("covid_ER_admissions_time_correlations.pdf")
240+
ggsave("covid_ER_admissions_time_correlations.png")
241+
```
242+
Strangely, sliced by time, we get significantly lower correlations, some of them are even negative
243+
```{r}
244+
correlations_time_covid %>% summarize(across(where(is.numeric), .fns = list(min = min, median = median, mean = mean, std = sd, q25 = ~quantile(.,0.25), q75 = ~quantile(.,0.75), max = max)))
245+
```
246+
Seems like we have a Simpson's paradox adjacent result, since for any given location the signals are fairly well correlated when averaged over time, but at a given time, averaging over different locations suggests they're not very well correlated.
247+
If the typical explanation applies, this means that there are large differences in the number of points.
248+
249+
so, getting the counts:
250+
```{r}
251+
joined_covid %>% group_by(geo_value) %>% count %>% arrange(n) %>% ungroup %>% summarise(across(where(is.numeric), .fns = list(min = min, max = max)))
252+
```
253+
Each location has 82
254+
255+
```{r}
256+
joined_covid %>% group_by(time_value) %>% count %>% arrange(n) %>% ungroup %>% summarise(across(where(is.numeric), .fns = list(min = min, max = max)))
257+
```
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 commit comments

Comments
 (0)