@@ -58,7 +58,10 @@ def check_last_updated(socrata_token, dataset_id, logger):
 def pull_data(socrata_token: str, dataset_id: str, backup_dir: str, logger):
     """Pull data from Socrata API."""
     client = Socrata("data.cdc.gov", socrata_token)
-    logger.info("Pulling data from Socrata API")
+    logger.info(
+        f"Pulling {'main' if dataset_id == MAIN_DATASET_ID else 'preliminary'} data from Socrata API",
+        dataset_id=dataset_id,
+    )
     results = []
     offset = 0
     limit = 50000  # maximum limit allowed by SODA 2.0
@@ -80,7 +83,8 @@ def pull_data(socrata_token: str, dataset_id: str, backup_dir: str, logger):
 
     if results:
         df = pd.DataFrame.from_records(results)
-        create_backup_csv(df, backup_dir, False, logger=logger)
+        sensor = "prelim" if dataset_id == PRELIM_DATASET_ID else None
+        create_backup_csv(df, backup_dir, False, sensor=sensor, logger=logger)
     else:
         df = pd.DataFrame()
     return df
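The paging loop between these two hunks is elided from the diff. For readers unfamiliar with SODA paging, here is a minimal sketch of the pattern the surrounding context implies; the `fetch_all` wrapper is hypothetical, and `client.get` is sodapy's standard query call:

    from sodapy import Socrata

    def fetch_all(socrata_token: str, dataset_id: str) -> list:
        """Hypothetical helper: page through a Socrata dataset 50,000 rows at a time."""
        client = Socrata("data.cdc.gov", socrata_token)
        results = []
        offset = 0
        limit = 50000  # maximum limit allowed by SODA 2.0
        while True:
            page = client.get(dataset_id, limit=limit, offset=offset)
            if not page:
                break  # an empty page means the dataset is exhausted
            results.extend(page)
            offset += limit
        return results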
@@ -120,6 +124,7 @@ def pull_nhsn_data(
     backup_dir: str,
     custom_run: bool,
     issue_date: Optional[str],
+    preliminary: bool = False,
     logger: Optional[logging.Logger] = None,
 ):
     """Pull the latest NHSN hospital admission data, and conforms it into a dataset.
@@ -137,6 +142,10 @@ def pull_nhsn_data(
         Directory to which to save raw backup data
     custom_run: bool
         Flag indicating if the current run is a patch. If so, don't save any data to disk
+    preliminary: bool
+        Flag indicating whether to grab main or preliminary data
+    issue_date: Optional[str]
+        Date indicating which backup file to pull when patching
     logger: Optional[logging.Logger]
         logger object
 
@@ -145,22 +154,26 @@ def pull_nhsn_data(
     pd.DataFrame
         Dataframe as described above.
     """
+    dataset_id = PRELIM_DATASET_ID if preliminary else MAIN_DATASET_ID
     # Pull data from Socrata API
     df = (
-        pull_data(socrata_token, MAIN_DATASET_ID, backup_dir, logger)
+        pull_data(socrata_token, dataset_id, backup_dir, logger)
         if not custom_run
-        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=False)
+        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=preliminary)
     )
 
-    recently_updated = True if custom_run else check_last_updated(socrata_token, MAIN_DATASET_ID, logger)
+    recently_updated = True if custom_run else check_last_updated(socrata_token, dataset_id, logger)
+
+    type_dict = PRELIM_TYPE_DICT if preliminary else TYPE_DICT
+    keep_columns = list(type_dict.keys())
+    filtered_type_dict = copy.deepcopy(type_dict)
 
-    keep_columns = list(TYPE_DICT.keys())
+    signal_map = PRELIM_SIGNALS_MAP if preliminary else SIGNALS_MAP
 
     if not df.empty and recently_updated:
         df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
-        filtered_type_dict = copy.deepcopy(TYPE_DICT)
 
-        for signal, col_name in SIGNALS_MAP.items():
+        for signal, col_name in signal_map.items():
             # older backups don't have certain columns
             try:
                 df[signal] = df[col_name]
@@ -178,66 +191,3 @@ def pull_nhsn_data(
         df = pd.DataFrame(columns=keep_columns)
 
     return df
-
-
-def pull_preliminary_nhsn_data(
-    socrata_token: str,
-    backup_dir: str,
-    custom_run: bool,
-    issue_date: Optional[str],
-    logger: Optional[logging.Logger] = None,
-):
-    """Pull the latest preliminary NHSN hospital admission data, and conforms it into a dataset.
-
-    The output dataset has:
-
-    - Each row corresponds to a single observation
-    - Each row additionally has columns for the signals in SIGNALS
-
-    Parameters
-    ----------
-    socrata_token: str
-        My App Token for pulling the NHSN data
-    backup_dir: str
-        Directory to which to save raw backup data
-    custom_run: bool
-        Flag indicating if the current run is a patch. If so, don't save any data to disk
-    logger: Optional[logging.Logger]
-        logger object
-
-    Returns
-    -------
-    pd.DataFrame
-        Dataframe as described above.
-    """
-    # Pull data from Socrata API
-    df = (
-        pull_data(socrata_token, PRELIM_DATASET_ID, backup_dir, logger)
-        if not custom_run
-        else pull_data_from_file(backup_dir, issue_date, logger, prelim_flag=True)
-    )
-
-    keep_columns = list(PRELIM_TYPE_DICT.keys())
-    recently_updated = True if custom_run else check_last_updated(socrata_token, PRELIM_DATASET_ID, logger)
-
-    if not df.empty and recently_updated:
-        df = df.rename(columns={"weekendingdate": "timestamp", "jurisdiction": "geo_id"})
-        filtered_type_dict = copy.deepcopy(PRELIM_TYPE_DICT)
-
-        for signal, col_name in PRELIM_SIGNALS_MAP.items():
-            try:
-                df[signal] = df[col_name]
-            except KeyError:
-                logger.info("column not available in data", col_name=col_name, signal=signal)
-                keep_columns.remove(signal)
-                del filtered_type_dict[signal]
-
-        df = df[keep_columns]
-        df = df.astype(filtered_type_dict)
-
-        df["geo_id"] = df["geo_id"].str.lower()
-        df.loc[df["geo_id"] == "usa", "geo_id"] = "us"
-    else:
-        df = pd.DataFrame(columns=keep_columns)
-
-    return df
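After this change, callers select the dataset through the `preliminary` flag instead of calling a second function. A sketch of the consolidated call site, assuming `token` and `logger` are already defined by the caller and the backup path is hypothetical:

    # Main dataset (default) vs. preliminary dataset via the new flag.
    main_df = pull_nhsn_data(
        socrata_token=token,
        backup_dir="./raw_data_backups",  # hypothetical path
        custom_run=False,
        issue_date=None,
        logger=logger,
    )
    prelim_df = pull_nhsn_data(
        socrata_token=token,
        backup_dir="./raw_data_backups",
        custom_run=False,
        issue_date=None,
        preliminary=True,  # routes to PRELIM_DATASET_ID / PRELIM_TYPE_DICT / PRELIM_SIGNALS_MAP
        logger=logger,
    )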