#' @details An `epi_archive` is an R6 class which contains a data table `DT`, of
#' class `data.table` from the `data.table` package, with (at least) the
#' following columns:
#'
#' * `geo_value`: the geographic value associated with each row of measurements.
#' * `time_value`: the time value associated with each row of measurements.
#' * `version`: the time value specifying the version for each row of
#'
#' on `DT` directly). There can only be a single row per unique combination of
#' key variables, and thus the key variables are critical for figuring out how
#' to generate a snapshot of data from the archive, as of a given version.
#'
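#' As a quick sketch of these semantics (the `cases` column and its values are
#' hypothetical, added only for illustration):
#'
#' ```
#' df <- data.frame(geo_value = "ca",
#'                  time_value = as.Date("2020-06-01"),
#'                  version = as.Date(c("2020-06-02", "2020-06-05")),
#'                  cases = c(10, 12))
#' x <- as_epi_archive(df)
#' # Snapshot as of June 3: only the version released on June 2 is visible,
#' # so the snapshot reports cases = 10
#' epix_as_of(x, max_version = as.Date("2020-06-03"))
#' ```
#'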
#' In general, last observation carried forward (LOCF) is used to fill in data
#' between recorded versions. Currently, deletions must be represented as
#' revising a row to a special state (e.g., making the entries `NA` or
#'
#' reference semantics. A primary consequence of this is that objects are not
#' copied when modified. You can read more about this in Hadley Wickham's
#' [Advanced R](https://adv-r.hadley.nz/r6.html#r6-semantics) book.
#'
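#' For instance, because assignment does not copy an R6 object, an independent
#' copy must be requested explicitly, via the `$clone()` method that R6
#' provides:
#'
#' ```
#' y <- x         # y and x refer to the same archive; changes to one affect both
#' z <- x$clone() # z is an independent copy
#' ```
#'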
#' @section Metadata:
#' The following pieces of metadata are included as fields in an `epi_archive`
#' object:
#'
#' sliding computation at any given reference time point t is performed on
#' **data that would have been available as of t**. More details on `slide()`
#' are documented in the wrapper function `epix_slide()`.
#'
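#' For example, a version-aware trailing average over the last 7 days might
#' look like the following sketch (the `cases` column is hypothetical):
#'
#' ```
#' epix_slide(x, ~ mean(.x$cases), n = 7, new_col_name = "cases_7dav")
#' ```
#'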
#' @importFrom R6 R6Class
#' @export
epi_archive =
  R6::R6Class(
@@ -88,7 +89,7 @@ epi_archive =
    additional_metadata = NULL,
    #' @description Creates a new `epi_archive` object.
    #' @param x A data frame, data table, or tibble, with columns `geo_value`,
    #'   `time_value`, `version`, and then any additional number of columns.
    #' @param geo_type Type for the geo values. If missing, then the function will
    #'   attempt to infer it from the geo values present; if this fails, then it
    #'   will be set to "custom".
@@ -104,12 +105,12 @@ epi_archive =
    #' @return An `epi_archive` object.
    #' @importFrom data.table as.data.table key setkeyv
    initialize = function(x, geo_type, time_type, other_keys,
                          additional_metadata) {
      # Check that we have a data frame
      if (!is.data.frame(x)) {
        Abort("`x` must be a data frame.")
      }

      # Check that we have geo_value, time_value, version columns
      if (!("geo_value" %in% names(x))) {
        Abort("`x` must contain a `geo_value` column.")
@@ -120,7 +121,7 @@ epi_archive =
      if (!("version" %in% names(x))) {
        Abort("`x` must contain a `version` column.")
      }

      # If geo type is missing, then try to guess it
      if (missing(geo_type)) {
        geo_type = guess_geo_type(x$geo_value)
@@ -130,7 +131,7 @@ epi_archive =
      if (missing(time_type)) {
        time_type = guess_time_type(x$time_value)
      }

      # Finish off with small checks on key variables and metadata
      if (missing(other_keys)) other_keys = NULL
      if (missing(additional_metadata)) additional_metadata = list()
@@ -144,7 +145,7 @@ epi_archive =
                  c("geo_type", "time_type"))) {
        Warn("`additional_metadata` names overlap with existing metadata fields \"geo_type\", \"time_type\".")
      }

      # Create the data table; if x was an un-keyed data.table itself,
      # then the call to as.data.table() will fail to set keys, so we
      # need to check this, then do it manually if needed
@@ -162,8 +163,8 @@ epi_archive =
      cat("An `epi_archive` object, with metadata:\n")
      cat(sprintf("* %-9s = %s\n", "geo_type", self$geo_type))
      cat(sprintf("* %-9s = %s\n", "time_type", self$time_type))
      if (!is.null(self$additional_metadata)) {
        sapply(self$additional_metadata, function(m) {
          cat(sprintf("* %-9s = %s\n", names(m), m))
        })
      }
@@ -177,7 +178,7 @@ epi_archive =
      cat(sprintf("* %-14s = %s\n", "max version",
                  max(self$DT$version)))
      cat("----------\n")
      cat(sprintf("Data archive (stored in DT field): %i x %i\n",
                  nrow(self$DT), ncol(self$DT)))
      cat("----------\n")
      cat(sprintf("Public methods: %s",
@@ -194,7 +195,7 @@ epi_archive =
      other_keys = setdiff(key(self$DT),
                           c("geo_value", "time_value", "version"))
      if (length(other_keys) == 0) other_keys = NULL

      # Check a few things on max_version
      if (!identical(class(max_version), class(self$DT$version))) {
        Abort("`max_version` and `DT$version` must have same class.")
@@ -208,25 +209,25 @@ epi_archive =
      if (max_version == self_max) {
        Warn("Getting data as of the latest version possible. For a variety of reasons, it is possible that we only have a preliminary picture of this version (e.g., the upstream source has updated it but we have not seen it due to latency in synchronization). Thus, the snapshot that we produce here might not be reproducible at a later time (e.g., when the archive has caught up in terms of synchronization).")
      }

      # Filter by version and return
      return(
        # Make sure to use data.table ways of filtering and selecting
        self$DT[between(time_value,
                        min_time_value,
                        max_version) &
                version <= max_version, ] %>%
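          # Within each group of key values, rows are sorted by version, so
          # keeping only the last row per group (fromLast = TRUE) retains the
          # most recent version at or before max_version: the LOCF snapshot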
          unique(by = c("geo_value", "time_value", other_keys),
                 fromLast = TRUE) %>%
          tibble::as_tibble() %>%
          dplyr::select(-.data$version) %>%
          as_epi_df(geo_type = self$geo_type,
                    time_type = self$time_type,
                    as_of = max_version,
                    additional_metadata = c(self$additional_metadata,
                                            other_keys = other_keys))
      )
    },
#####
    #' @description Merges another `data.table` with the current one, and allows for
    #' a post-filling of `NA` values by last observation carried forward (LOCF).
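    #'
    #' A rough usage sketch (assuming `y` is an archive or keyed `data.table`
    #' compatible with `x`; extra arguments such as `all = TRUE` are passed
    #' through to the underlying `data.table` merge):
    #' ```
    #' x$merge(y, all = TRUE)
    #' ```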
@@ -235,7 +236,7 @@ epi_archive =
    merge = function(y, ..., locf = TRUE, nan = NA) {
      # Check we have a `data.table` object
      if (!(inherits(y, "data.table") || inherits(y, "epi_archive"))) {
        Abort("`y` must be of class `data.table` or `epi_archive`.")
      }

      # Use the data.table merge function, carrying through ... args
@@ -250,42 +251,42 @@ epi_archive =

      # Important: use nafill and not setnafill because the latter
      # returns the entire data frame by reference, and the former can
      # be set to act on particular columns by reference using :=
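      # (As a concrete LOCF example: within a group, a column holding
      # c(1, NA, NA, 2) is filled to c(1, 1, 1, 2).)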
        self$DT[,
                (cols) := nafill(.SD, type = "locf", nan = nan),
                .SDcols = cols,
                by = by]
      }
    },
#####
    #' @description Slides a given function over variables in an `epi_archive`
    #' object. See the documentation for the wrapper function `epix_slide()` for
    #' details.
    #' @importFrom data.table key
    #' @importFrom rlang !! !!! enquo enquos is_quosure sym syms
    slide = function(f, ..., n = 7, group_by, ref_time_values,
                     time_step, new_col_name = "slide_value",
                     as_list_col = FALSE, names_sep = "_",
                     all_rows = FALSE) {
      # If missing, then set ref time values to be everything; else make
      # sure we intersect with observed time values
      if (missing(ref_time_values)) {
        ref_time_values = unique(self$DT$time_value)
      }
      else {
        ref_time_values = ref_time_values[ref_time_values %in%
                                            unique(self$DT$time_value)]
      }

      # If a custom time step is specified, then redefine units
      before_num = n - 1
      if (!missing(time_step)) before_num = time_step(n - 1)
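      # (For example, passing time_step = lubridate::days makes the lookback
      # window days(n - 1) rather than the bare integer n - 1.)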

      # What to group by? If missing, set according to internal keys
      if (missing(group_by)) {
        group_by = setdiff(key(self$DT), c("time_value", "version"))
      }

      # Symbolize column name, defuse grouping variables. We have to do
      # the middle step here which is a bit complicated (unfortunately)
      # since the function epix_slide() could have called the current one,
@@ -297,20 +298,20 @@ epi_archive =

      # Key variable names, apart from time value and version
      key_vars = setdiff(key(self$DT), c("time_value", "version"))

      # Computation for one group, one time value
      comp_one_grp = function(.data_group,
                              f, ...,
                              time_value,
                              key_vars,
                              new_col) {
        # Carry out the specified computation
        comp_value = f(.data_group, ...)

        # Count the number of appearances of the reference time value.
        # Note: ideally, we want to directly count occurrences of the ref
        # time value but due to latency, this will often not appear in the
        # data group. So we count the number of unique key values, outside
        # of the time value column
        count = sum(!duplicated(.data_group[, key_vars]))

@@ -344,23 +345,23 @@ epi_archive =
        else {
          Abort("The slide computation must return an atomic vector or a data frame.")
        }

        # Note that we've already recycled comp value to make size stable,
        # so tibble() will just recycle time value appropriately
        return(tibble::tibble(time_value = time_value,
                              !!new_col := comp_value))
      }

      # If f is not missing, then just go ahead, slide by group
      if (!missing(f)) {
        if (rlang::is_formula(f)) f = rlang::as_function(f)
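        # (A one-sided formula such as ~ mean(.x$cases) becomes
        # function(.x) mean(.x$cases), so formulas serve as lightweight
        # anonymous functions here.)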

        x = purrr::map_dfr(ref_time_values, function(t) {
          self$as_of(t, min_time_value = t - before_num) %>%
            tibble::as_tibble() %>%
            dplyr::group_by(!!!group_by) %>%
            dplyr::group_modify(comp_one_grp,
                                f = f, ...,
                                time_value = t,
                                key_vars = key_vars,
                                new_col = new_col,
@@ -378,14 +379,14 @@ epi_archive =
        if (length(quos) > 1) {
          Abort("If `f` is missing then only a single computation can be specified via `...`.")
        }

        quo = quos[[1]]
        f = function(x, quo, ...) rlang::eval_tidy(quo, x)
        new_col = sym(names(rlang::quos_auto_name(quos)))

        x = purrr::map_dfr(ref_time_values, function(t) {
          self$as_of(t, min_time_value = t - before_num) %>%
            tibble::as_tibble() %>%
            dplyr::group_by(!!!group_by) %>%
            dplyr::group_modify(comp_one_grp,
                                f = f, quo = quo,
@@ -396,12 +397,12 @@ epi_archive =
            dplyr::ungroup()
        })
      }

      # Unnest if we need to
      if (!as_list_col) {
        x = tidyr::unnest(x, !!new_col, names_sep = names_sep)
      }

      # Join to get all rows, if we need to, then return
      if (all_rows) {
        cols = c(as.character(group_by), "time_value")
@@ -412,7 +413,7 @@ epi_archive =
      }
    )
  )

#' Convert to `epi_archive` format
#'
#' Converts a data frame, data table, or tibble into an `epi_archive`
@@ -448,15 +449,15 @@ epi_archive =
#'
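#' @examples
#' # A small sketch; the `cases` column and its values are made up for
#' # illustration. Any columns beyond the three required ones are treated as
#' # measurement columns.
#' df <- data.frame(geo_value = "ca",
#'                  time_value = as.Date("2020-06-01") + 0:1,
#'                  version = as.Date("2020-06-02") + 0:1,
#'                  cases = c(10, 12))
#' x <- as_epi_archive(df)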
#' @export
as_epi_archive = function(x, geo_type, time_type, other_keys,
                          additional_metadata = list()) {
  epi_archive$new(x, geo_type, time_type, other_keys, additional_metadata)
}

#' Test for `epi_archive` format
#'
#' @param x An object.
#' @return `TRUE` if the object inherits from `epi_archive`.
#'
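#' @examples
#' is_epi_archive(1:10) # FALSE
#' # an object built by as_epi_archive() returns TRUE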
#' @export
is_epi_archive = function(x) {
  inherits(x, "epi_archive")
}