@@ -14,6 +14,7 @@ suppressPackageStartupMessages({
   library(dplyr)
   library(readr)
   library(purrr)
+  library(delphiFacebook)
 })
 
 
@@ -28,7 +29,6 @@ suppressPackageStartupMessages({
 #' open. By default, selects all `.csv` files with the standard table date prefix.
 run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]csv$") {
   files <- list.files(input_dir, pattern = pattern)
-
   if (length(files) == 0) {
     stop("No matching data files.")
   }
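
As a side note on the default `pattern` argument: it matches only filenames that start with two 8-digit dates, i.e. the standard table date prefix. A quick sketch of what matches, using an invented filename:

    grepl("^[0-9]{8}_[0-9]{8}.*[.]csv$",
          c("20210101_20210107_weekly_nation_gender.csv", "summary.csv"))
    #> [1]  TRUE FALSE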
@@ -38,12 +38,19 @@ run_rollup <- function(input_dir, output_dir, pattern = "^[0-9]{8}_[0-9]{8}.*[.]csv$") {
   # Reformat files as a list such that input files with same grouping variables
   # (and thus same output file) are in a character vector named with the output
   # file.
-  files <- lapply(split(files, files$rollupname), function(x) {x$filename})
+  files <- lapply(split(files, files$rollup_name), function(x) {x$filename})
+
+  if (!dir.exists(output_dir)) { dir.create(output_dir) }
+  seen_file <- file.path(output_dir, "seen.txt")
+  seen_files <- load_seen_file(seen_file)
 
   for (output_name in names(files)) {
-    combine_and_save_tables(
-      file.path(input_dir, files[[output_name]]),
+    newly_seen_files <- combine_and_save_tables(
+      seen_files,
+      input_dir,
+      files[[output_name]],
       file.path(output_dir, output_name))
+    write(newly_seen_files, seen_file, append = TRUE)
   }
 
   return(NULL)
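
The rewritten loop keeps its bookkeeping in the output directory: each input file that has been folded into a rollup is appended to `seen.txt`, one filename per line, so later runs can tell fresh files from reissues. A minimal sketch of the intended invocation, with placeholder directory names:

    # Directory names here are hypothetical.
    run_rollup("./receiving", "./rollup")
    # Afterwards, ./rollup/seen.txt lists each processed input CSV, and each
    # <grouping>.csv.gz rollup file holds the combined tables for that grouping.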
@@ -55,24 +62,40 @@ get_file_properties <- function(filename) {
   parts <- strsplit(short, "_", fixed = TRUE)[[1]]
 
   group <- parts[3:length(parts)]
-  # Specify compression format in name, to be parsed by `write_csv` later.
-  partialname <- paste0(paste0(group, collapse = "_"), ".csv.gz")
+  # Specify compression format via name, to be parsed by `write_csv` later.
+  partial_name <- paste0(paste0(group, collapse = "_"), ".csv.gz")
 
   return(data.frame(
     filename = filename,
-    rollupname = partialname))
+    rollup_name = partial_name))
+}
+
+## Helper function to load "seen" file.
+load_seen_file <- function(seen_file) {
+  if (!file.exists(seen_file)) {
+    file.create(seen_file)
+  }
+
+  seen_files <- readLines(seen_file)
+  return(seen_files)
 }
 
 #' Combine set of input files with existing output file, and save to disk.
 #'
-#' If a date range has been seen before, the input and output data are
+#' If an input filename has been seen before, the input and output data are
 #' deduplicated to use the newer set of data. Output is saved in gzip-compressed
 #' format.
 #'
+#' @param seen_files Vector of filenames that have been previously loaded into
+#'   an output file.
+#' @param input_dir Directory in which to look for survey CSV files, relative to
+#'   the current working directory.
 #' @param input_files Vector of paths to input files that share a set of
 #'   grouping variables.
 #' @param output_file Path to corresponding output file.
-combine_and_save_tables <- function(input_files, output_file) {
+#'
+#' @return Character vector of newly-seen filenames.
+combine_and_save_tables <- function(seen_files, input_dir, input_files, output_file) {
   cols <- cols(
     .default = col_guess(),
     survey_geo = col_character(),
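
For concreteness, here is how `get_file_properties` maps an input filename to its rollup name under the renamed fields: the two leading date fields are dropped, the remaining grouping parts are rejoined with underscores, and `.csv.gz` is appended so `write_csv` compresses the output. The filename below is invented, and this assumes the elided code strips the `.csv` extension when building `short`:

    get_file_properties("20210101_20210107_weekly_nation_gender.csv")
    #> filename:    20210101_20210107_weekly_nation_gender.csv
    #> rollup_name: weekly_nation_gender.csv.gz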
@@ -90,48 +113,60 @@ combine_and_save_tables <- function(input_files, output_file) {
     county_fips = col_character()
   )
 
+  # Get input data.
   input_df <- map_dfr(
-    input_files,
+    file.path(input_dir, input_files),
     function(f) {
       read_csv(f, col_types = cols)
     }
   )
 
-  if (!file.exists(output_file)) {
-    warning(paste0("Output file ", output_file, " does not exist. Creating a new copy."))
-    # Create an empty starting df with the expected column names, order, and type.
-    output_df <- input_df[FALSE, ]
-  } else {
-    output_df <- read_csv(output_file, col_types = cols)
+  if (file.exists(output_file)) {
+    output_names <- names(read_csv(output_file, n_max = 0L))
+    assert(identical(output_names, names(input_df)),
+           paste0("Column names and/or order differ between new and old input for ", output_file))
   }
 
-  # For finding unique group/geo-level/date combinations, use all columns up to
-  # the first "val" column. This generalizes the process of finding unique rows,
-  # when we might be using different grouping variables or different geo levels
-  # (county/state/nation appear in different columns).
-  group_names <- names(output_df)
-  group_names <- group_names[1:min(which(startsWith(group_names, "val_"))) - 1]
+  # If no input files have been seen before, we can append directly to the
+  # output file without needing to deduplicate. File is created if it doesn't
+  # already exist.
+  any_prev_seen <- any(input_files %in% seen_files)
 
-  ## Deduplicate, keeping newest version by issue date of each unique row.
-  # Merge the new data with the existing data, taking the last issue date for
-  # any given grouping/geo level/date combo. This prevents duplication in case
-  # of reissues. Note that the order matters: since arrange() uses order(),
-  # which is a stable sort, ties will result in the input data being used in
-  # preference over the existing rollup data.
-  output_df <- bind_rows(output_df, input_df) %>%
-    arrange(issue_date) %>%
-    group_by(across(all_of(group_names))) %>%
-    slice_tail() %>%
-    ungroup()
-
-  # Automatically uses gzip compression based on output name.
-  write_csv(output_df, output_file)
+  if (!any_prev_seen) {
+    write_csv(input_df, output_file, append = file.exists(output_file))
+  } else {
+    assert(file.exists(output_file),
+           paste0("The output file ", output_file, " does not exist, but ",
+                  "some input files using the same grouping have been seen before."))
+
+    output_df <- read_csv(output_file, col_types = cols)
+
+    # Use all columns up to the first "val" column to find unique rows.
+    group_names <- names(output_df)
+    ind_first_val_col <- min(which(startsWith(group_names, "val_")))
+    group_names <- group_names[seq_len(ind_first_val_col - 1)]
+
+    ## Deduplicate, keeping newest version by issue date of each unique row.
+    # Merge the new data with the existing data, taking the last issue date for
+    # any given grouping/geo level/date combo. This prevents duplication in case
+    # of reissues. Note that the order matters: since arrange() uses order(),
+    # which is a stable sort, ties will result in the input data being used in
+    # preference over the existing rollup data.
+    output_df <- bind_rows(output_df, input_df) %>%
+      arrange(issue_date) %>%
+      group_by(across(all_of(group_names))) %>%
+      slice_tail() %>%
+      ungroup()
+
+    # Automatically uses gzip compression based on output file name.
+    write_csv(output_df, output_file)
+  }
 
-  return(NULL)
+  newly_seen <- setdiff(input_files, seen_files)
+  return(newly_seen)
 }
 
 
-
 args <- commandArgs(TRUE)
 
 if (length(args) < 2) {
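
One subtlety in the deduplication branch worth illustrating: `arrange()` is a stable sort, so when an existing rollup row and an input row share the same grouping key and `issue_date`, `slice_tail()` keeps the input row, because `bind_rows()` placed it last. A self-contained toy example with invented columns:

    library(dplyr)

    old <- data.frame(geo = "pa", period = "2021-01", val_x = 1, issue_date = "2021-02-01")
    new <- data.frame(geo = "pa", period = "2021-01", val_x = 2, issue_date = "2021-02-01")

    # Grouping columns are everything before the first "val_" column: geo, period.
    bind_rows(old, new) %>%
      arrange(issue_date) %>%
      group_by(geo, period) %>%
      slice_tail() %>%
      ungroup()
    #> Returns the row with val_x = 2: the reissued input row wins the tie.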