 """
 
 # standard packages
-from datetime import timedelta
+from datetime import timedelta, datetime
 from multiprocessing import Pool, cpu_count
-from pathlib import Path
-
 # third party
 import dask.dataframe as dd
 import numpy as np
@@ -68,29 +66,23 @@ def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, logger, outpu
                 out_n += 1
     logger.debug(f"wrote {out_n} rows for {geo_level}")
 
-
-def update_sensor(
-    filepath, startdate, enddate, dropdate, geo, parallel,
-    weekday, se, logger
-):
-    """Generate sensor values.
-
-    Args:
+#TODO clean the date params
+def process_csv(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime) -> pd.DataFrame:
+    '''
+    Reads csv using Dask and filters out based on date range and currently unused column,
+    then converts back into pandas dataframe.
+    Parameters
+    ----------
         filepath: path to the aggregated doctor-visits data
         startdate: first sensor date (YYYY-mm-dd)
         enddate: last sensor date (YYYY-mm-dd)
         dropdate: data drop date (YYYY-mm-dd)
-        geo: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
-        parallel: boolean to run the sensor update in parallel
-        weekday: boolean to adjust for weekday effects
-        se: boolean to write out standard errors, if true, use an obfuscated name
-        logger: the structured logger
-    """
-    # as of 2020-05-11, input file expected to have 10 columns
-    # id cols: ServiceDate, PatCountyFIPS, PatAgeGroup, Pat HRR ID/Pat HRR Name
-    # value cols: Denominator, Covid_like, Flu_like, Flu1, Mixed
-    filename = Path(filepath).name
-    data = pd.read_csv(
+
+    Returns
+    -------
+    cleaned dataframe
+    '''
+
     ddata = dd.read_csv(
         filepath,
         compression="gzip",
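For context, a minimal sketch of the Dask pattern this hunk switches to: dd.read_csv builds a lazy task graph, filters are recorded without loading data, and .compute() materializes a pandas DataFrame at the end. The file path and column name below are illustrative placeholders, not the indicator's real Config values.

    # Sketch only: "visits.csv.gz" and "ServiceDate" are hypothetical stand-ins.
    import dask.dataframe as dd
    import pandas as pd

    ddf = dd.read_csv("visits.csv.gz", compression="gzip", blocksize=None)
    ddf["ServiceDate"] = dd.to_datetime(ddf["ServiceDate"])
    ddf = ddf[ddf["ServiceDate"] >= pd.Timestamp("2020-02-01")]  # lazy filter, nothing loaded yet
    df = ddf.compute()  # runs the graph and returns a pandas DataFrame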
@@ -101,39 +93,46 @@ def update_sensor(
     ddata = ddata.dropna()
     ddata = ddata.rename(columns=Config.DEVIANT_COLS_MAP)
     ddata = ddata[Config.FILT_COLS]
+    ddata[Config.DATE_COL] = dd.to_datetime(ddata[Config.DATE_COL])
 
+    # restrict to training start and end date
+    startdate = startdate - Config.DAY_SHIFT
 
-    data = ddata.compute()
+    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
+    assert startdate < enddate, "Start date >= end date"
+    assert enddate <= dropdate, "End date > drop date"
 
-    # data.dropna(inplace=True) # drop rows with any missing entries
+    date_filter = ((ddata[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (ddata[Config.DATE_COL] < dropdate))
 
-    # data.columns = data.columns.to_series().replace(Config.DEVIANT_COLS_MAP)
-    #
-    # data = data[Config.FILT_COLS]
-    #
-    # # drop HRR columns - unused for now since we assign HRRs by FIPS
-    # data.drop(columns=Config.HRR_COLS, inplace=True)
-    # data.dropna(inplace=True) # drop rows with any missing entries
+    return ddata[date_filter].compute()
+
+def update_sensor(
+    filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, geo: str, parallel: bool,
+    weekday: bool, se: bool, logger
+):
+    """Generate sensor values.
+
+    Args:
+        filepath: path to the aggregated doctor-visits data
+        startdate: first sensor date (YYYY-mm-dd)
+        enddate: last sensor date (YYYY-mm-dd)
+        dropdate: data drop date (YYYY-mm-dd)
+        geo: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
+        parallel: boolean to run the sensor update in parallel
+        weekday: boolean to adjust for weekday effects
+        se: boolean to write out standard errors, if true, use an obfuscated name
+        logger: the structured logger
+    """
+    data = process_csv(filepath, startdate, enddate, dropdate)
 
-    data[Config.DATE_COL] = data[Config.DATE_COL].apply(pd.to_datetime)
     # aggregate age groups (so data is unique by service date and FIPS)
     data = data.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
     assert np.sum(data.duplicated()) == 0, "Duplicates after age group aggregation"
     assert (data[Config.COUNT_COLS] >= 0).all().all(), "Counts must be nonnegative"
 
-    ## collect dates
-    # restrict to training start and end date
     drange = lambda s, e: np.array([s + timedelta(days=x) for x in range((e - s).days)])
-    startdate = pd.to_datetime(startdate) - Config.DAY_SHIFT
-    burnindate = startdate - Config.DAY_SHIFT
-    enddate = pd.to_datetime(enddate)
-    dropdate = pd.to_datetime(dropdate)
-    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
-    assert startdate < enddate, "Start date >= end date"
-    assert enddate <= dropdate, "End date > drop date"
-    data = data[(data[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & \
-                (data[Config.DATE_COL] < dropdate)]
     fit_dates = drange(Config.FIRST_DATA_DATE, dropdate)
+    burnindate = startdate - Config.DAY_SHIFT
     burn_in_dates = drange(burnindate, dropdate)
     sensor_dates = drange(startdate, enddate)
     # The ordering of sensor dates corresponds to the order of burn-in dates
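For context, a small self-contained sketch of how the three date windows above relate, using made-up dates and one-day stand-ins for Config.DAY_SHIFT and Config.FIRST_DATA_DATE; the real values come from Config.

    from datetime import datetime, timedelta
    import numpy as np

    drange = lambda s, e: np.array([s + timedelta(days=x) for x in range((e - s).days)])

    first_data_date = datetime(2020, 2, 1)           # stand-in for Config.FIRST_DATA_DATE
    day_shift = timedelta(days=1)                    # stand-in for Config.DAY_SHIFT

    startdate = datetime(2020, 2, 10) - day_shift    # training start, shifted back one day
    enddate = datetime(2020, 2, 20)
    dropdate = datetime(2020, 2, 22)
    burnindate = startdate - day_shift

    fit_dates = drange(first_data_date, dropdate)    # all dates available for fitting
    burn_in_dates = drange(burnindate, dropdate)     # sensor window plus burn-in padding
    sensor_dates = drange(startdate, enddate)        # dates actually reported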