@@ -12,9 +12,21 @@ suppressPackageStartupMessages({
12
12
library(tibble )
13
13
library(tidyr )
14
14
library(rlang )
15
- library(epidatr )
16
15
})
17
16
17
+ # The external scores processing causes the pipeline to exit with an error,
18
+ # apparently due to running out of memory. Set up a non-parallel `crew`
19
+ # controller to avoid.
20
+ # https://books.ropensci.org/targets/crew.html#heterogeneous-workers
21
+ main_controller <- crew_controller_local(
22
+ name = " main_controller" ,
23
+ workers = parallel :: detectCores() - 5
24
+ )
25
+ serial_controller <- crew_controller_local(
26
+ name = " serial_controller" ,
27
+ workers = 1L
28
+ )
29
+
18
30
tar_option_set(
19
31
packages = c(
20
32
" assertthat" ,
@@ -30,159 +42,29 @@ tar_option_set(
30
42
), # packages that your targets need to run
31
43
imports = c(" epieval" , " parsnip" ),
32
44
format = " qs" , # Optionally set the default storage format. qs is fast.
33
- controller = crew :: crew_controller_local(workers = parallel :: detectCores() - 5 ),
34
- )
35
- # Run the R scripts in the R/ folder with your custom functions:
36
- # tar_source()
37
- linreg <- parsnip :: linear_reg()
38
- quantreg <- epipredict :: quantile_reg()
39
-
40
- grids <- list (
41
- tidyr :: expand_grid(
42
- forecaster = " scaled_pop" ,
43
- trainer = c(" linreg" , " quantreg" ),
44
- ahead = 1 : 4 ,
45
- pop_scaling = c(TRUE , FALSE )
46
- ),
47
- tidyr :: expand_grid(
48
- forecaster = " scaled_pop" ,
49
- trainer = c(" linreg" , " quantreg" ),
50
- ahead = 5 : 7 ,
51
- lags = list (c(0 , 3 , 5 , 7 , 14 ), c(0 , 7 , 14 )),
52
- pop_scaling = c(TRUE , FALSE )
53
- )
54
- )
55
- # bind them together and give static ids; if you add a new field to a given
56
- # expand_grid, everything will get a new id, so it's better to add a new
57
- # expand_grid instead
58
- param_grid <- bind_rows(map(grids , add_id )) %> % relocate(id , .after = last_col())
59
-
60
- forecaster_param_grids <- make_target_param_grid(param_grid ) %> %
61
- # # TODO This forecaster is hanging. Filter it out for now.
62
- filter(id != " necessary endless 5" )
63
-
64
- # not actually used downstream, this is for lookup during plotting and human evaluation
65
- forecasters <- list (
66
- tar_target(
67
- name = forecasters ,
68
- command = {
69
- param_grid
70
- }
71
- )
72
- )
73
-
74
- response_signal <- " confirmed_admissions_influenza_1d_prop_7dav"
75
- target_range <- epirange(from = " 20211001" , to = " 20220401" )
76
- data <- list (
77
- tar_target(
78
- name = hhs_evaluation_data ,
79
- command = {
80
- epidatr :: pub_covidcast(
81
- source = " hhs" ,
82
- signals = response_signal ,
83
- geo_type = " state" ,
84
- time_type = " day" ,
85
- geo_values = " *" ,
86
- time_values = epirange(from = " 2020-01-01" , to = " 2024-01-01" ),
87
- ) %> %
88
- rename(
89
- actual = value ,
90
- target_end_date = time_value
91
- )
92
- }
93
- ),
94
- tar_target(
95
- name = hhs_archive_data_2022 ,
96
- command = {
97
- epidatr :: pub_covidcast(
98
- source = " hhs" ,
99
- signals = response_signal ,
100
- geo_type = " state" ,
101
- time_type = " day" ,
102
- geo_values = " *" ,
103
- time_values = target_range ,
104
- issues = " *" ,
105
- fetch_params = fetch_params_list(return_empty = TRUE , timeout_seconds = 100 )
106
- )
107
- }
108
- ),
109
- tar_target(
110
- name = chng_archive_data_2022 ,
111
- command = {
112
- epidatr :: pub_covidcast(
113
- source = " chng" ,
114
- signals = " smoothed_adj_outpatient_flu" ,
115
- geo_type = " state" ,
116
- time_type = " day" ,
117
- geo_values = " *" ,
118
- time_values = target_range ,
119
- issues = " *" ,
120
- fetch_params = fetch_params_list(return_empty = TRUE , timeout_seconds = 100 )
121
- )
122
- }
123
- ),
124
- tar_target(
125
- name = joined_archive_data_2022 ,
126
- command = {
127
- hhs_archive_data_2022 %<> %
128
- select(geo_value , time_value , value , issue ) %> %
129
- rename(" hhs" : = value ) %> %
130
- rename(version = issue ) %> %
131
- as_epi_archive(
132
- geo_type = " state" ,
133
- time_type = " day" ,
134
- compactify = TRUE
135
- )
136
- chng_archive_data_2022 %<> %
137
- select(geo_value , time_value , value , issue ) %> %
138
- rename(" chng" : = value ) %> %
139
- rename(version = issue ) %> %
140
- as_epi_archive(
141
- geo_type = " state" ,
142
- time_type = " day" ,
143
- compactify = TRUE
144
- )
145
- epix_merge(hhs_archive_data_2022 , chng_archive_data_2022 , sync = " locf" )
146
- }
147
- ),
148
- tar_target(
149
- name = hhs_latest_data_2022 ,
150
- command = {
151
- epidatr :: pub_covidcast(
152
- source = " hhs" ,
153
- signals = " confirmed_admissions_covid_1d" ,
154
- geo_type = " state" ,
155
- time_type = " day" ,
156
- geo_values = " *" ,
157
- time_values = epirange(from = " 20220101" , to = " 20220401" ),
158
- fetch_params = fetch_params_list(return_empty = TRUE , timeout_seconds = 100 )
159
- )
160
- }
161
- ),
162
- tar_target(
163
- name = chng_latest_data_2022 ,
164
- command = {
165
- epidatr :: pub_covidcast(
166
- source = " chng" ,
167
- signals = " smoothed_adj_outpatient_covid" ,
168
- geo_type = " state" ,
169
- time_type = " day" ,
170
- geo_values = " *" ,
171
- time_values = epirange(from = " 20220101" , to = " 20220401" ),
172
- fetch_params = fetch_params_list(return_empty = TRUE , timeout_seconds = 100 )
173
- )
174
- }
45
+ controller = crew_controller_group(main_controller , serial_controller ),
46
+ # Set default crew controller.
47
+ # https://books.ropensci.org/targets/crew.html#heterogeneous-workers
48
+ resources = tar_resources(
49
+ crew = tar_resources_crew(controller = " main_controller" )
175
50
)
176
51
)
177
52
53
+ # Run the R scripts in the R/ folder with your custom functions:
54
+ # tar_source()
55
+ # where the forecasters and parameters are joined; see either the variable param_grid or `tar_read(forecasters)`
56
+ source(" flu_hosp_explore/forecaster_instantiation.R" )
57
+ source(" flu_hosp_explore/data_targets.R" )
58
+ # Auto-generated in run.R
59
+ source(" flu_hosp_explore/dynamic_constants.R" )
178
60
179
- forecasts_and_scores <- tar_map(
61
+ forecasts_and_scores_by_ahead <- tar_map(
180
62
values = forecaster_param_grids ,
181
63
names = id ,
182
64
unlist = FALSE ,
183
- tar_target (
184
- name = forecast ,
185
- command = {
65
+ tar_target_raw (
66
+ name = ONE_AHEAD_FORECAST_NAME ,
67
+ command = expression(
186
68
forecaster_pred(
187
69
data = joined_archive_data_2022 ,
188
70
outcome = " hhs" ,
@@ -192,34 +74,51 @@ forecasts_and_scores <- tar_map(
192
74
forecaster_args = params ,
193
75
forecaster_args_names = param_names
194
76
)
195
- }
77
+ )
196
78
),
197
- tar_target (
198
- name = score ,
199
- command = {
79
+ tar_target_raw (
80
+ name = ONE_AHEAD_SCORE_NAME ,
81
+ command = expression(
200
82
run_evaluation_measure(
201
- data = forecast ,
83
+ data = forecast_by_ahead ,
202
84
evaluation_data = hhs_evaluation_data ,
203
85
measure = list (
204
86
wis = weighted_interval_score ,
205
87
ae = absolute_error ,
206
88
cov_80 = interval_coverage(0.8 )
207
89
)
208
90
)
209
- }
91
+ )
210
92
)
211
93
)
212
94
213
- ensemble_keys <- list (a = c(300 , 15 ))
214
- ensembles <- list (
95
+ forecasts_and_scores <- tar_map(
96
+ values = forecaster_parent_id_map ,
97
+ names = parent_id ,
98
+ tar_target(
99
+ name = forecast ,
100
+ command = {
101
+ bind_rows(forecast_component_ids ) %> %
102
+ mutate(parent_forecaster = parent_id )
103
+ }
104
+ ),
215
105
tar_target(
216
- name = ensembles ,
106
+ name = score ,
217
107
command = {
218
- ensemble_keys
108
+ bind_rows(score_component_ids ) %> %
109
+ mutate(parent_forecaster = parent_id )
219
110
}
220
111
)
221
112
)
222
113
114
+ ensemble_keys <- list (a = c(300 , 15 ))
115
+ ensemble_targets <- tar_target(
116
+ name = ensembles ,
117
+ command = {
118
+ ensemble_keys
119
+ }
120
+ )
121
+
223
122
# The combine approach below is taken from the manual:
224
123
# https://books.ropensci.org/targets/static.html#combine
225
124
# The key is that the map above has unlist = FALSE.
@@ -229,8 +128,8 @@ ensemble_forecast <- tar_map(
229
128
name = ensemble_forecast ,
230
129
# TODO: Needs a lookup table to select the right forecasters
231
130
list (
232
- forecasts_and_scores [[" forecast " ]][[1 ]],
233
- forecasts_and_scores [[" forecast " ]][[2 ]]
131
+ forecasts_and_scores_by_ahead [[" forecast_by_ahead " ]][[1 ]],
132
+ forecasts_and_scores_by_ahead [[" forecast_by_ahead " ]][[2 ]]
234
133
),
235
134
command = {
236
135
bind_rows(!!! .x , .id = " forecaster" ) %> %
@@ -261,10 +160,69 @@ ensemble_forecast <- tar_map(
261
160
)
262
161
)
263
162
163
+ if (LOAD_EXTERNAL_SCORES ) {
164
+ external_names_and_scores <- list (
165
+ tar_target(
166
+ name = external_scores_df ,
167
+ command = {
168
+ readRDS(external_scores_path ) %> %
169
+ group_by(forecaster ) %> %
170
+ targets :: tar_group()
171
+ },
172
+ iteration = " group" ,
173
+ garbage_collection = TRUE
174
+ ),
175
+ tar_target(
176
+ name = external_names ,
177
+ command = {
178
+ external_scores_df %> %
179
+ group_by(forecaster ) %> %
180
+ group_keys() %> %
181
+ pull(forecaster )
182
+ },
183
+ garbage_collection = TRUE
184
+ ),
185
+ tar_target(
186
+ name = external_scores ,
187
+ pattern = map(external_scores_df ),
188
+ command = {
189
+ external_scores_df
190
+ },
191
+ # This step causes the pipeline to exit with an error, apparently due to
192
+ # running out of memory. Run this in series on a non-parallel `crew`
193
+ # controller to avoid.
194
+ # https://books.ropensci.org/targets/crew.html#heterogeneous-workers
195
+ resources = tar_resources(
196
+ crew = tar_resources_crew(controller = " serial_controller" )
197
+ ),
198
+ memory = " transient" ,
199
+ garbage_collection = TRUE
200
+ )
201
+ )
202
+ } else {
203
+ external_names_and_scores <- list (
204
+ tar_target(
205
+ name = external_names ,
206
+ command = {
207
+ c()
208
+ }
209
+ ),
210
+ tar_target(
211
+ name = external_scores ,
212
+ command = {
213
+ data.frame ()
214
+ }
215
+ )
216
+ )
217
+ }
218
+
219
+
264
220
list (
265
- data ,
266
- forecasters ,
221
+ data_targets ,
222
+ forecaster_targets ,
223
+ forecasts_and_scores_by_ahead ,
267
224
forecasts_and_scores ,
268
- ensembles ,
269
- ensemble_forecast
225
+ ensemble_targets ,
226
+ ensemble_forecast ,
227
+ external_names_and_scores
270
228
)
0 commit comments