14 | 14 | from pathlib import Path
15 | 15 |
16 | 16 | # third party
   | 17 | +import dask.dataframe as dd
17 | 18 | import numpy as np
18 | 19 | import pandas as pd
19 | 20 |
   | 21 | +
20 | 22 | # first party
21 | 23 | from delphi_utils import Weekday
22 | 24 | from .config import Config
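Note on the new `dask.dataframe` import: gzip-compressed CSVs cannot be split into byte ranges, so `dd.read_csv` needs `blocksize=None`, which loads each input file as a single partition. A minimal sketch of that call pattern follows; the file path and dtype map are hypothetical stand-ins, not values from this repo:

```python
# Minimal sketch, not code from this PR: reading a gzipped CSV lazily with dask.
# "claims_sample.csv.gz" and the dtype map are hypothetical placeholders.
import dask.dataframe as dd

ddata = dd.read_csv(
    "claims_sample.csv.gz",         # placeholder path
    compression="gzip",
    dtype={"PatCountyFIPS": str},   # placeholder for the real dtype map
    blocksize=None,                 # gzip is not splittable, so read each file whole
)
print(ddata.npartitions)            # one partition per input file when blocksize=None
```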
@@ -89,22 +91,30 @@ def update_sensor(
 89 |  91 |     # value cols: Denominator, Covid_like, Flu_like, Flu1, Mixed
 90 |  92 |     filename = Path(filepath).name
 91 |     | -    data = pd.read_csv(
    |  93 | +    ddata = dd.read_csv(
 92 |  94 |         filepath,
    |  95 | +        compression="gzip",
 93 |  96 |         dtype=Config.DTYPES,
    |  97 | +        blocksize=None,
 94 |  98 |     )
 95 |     | -    logger.info(f"Starting processing {filename} ")
 96 |     | -    data.rename(columns=Config.DEVIANT_COLS_MAP, inplace=True)
 97 |     | -    data = data[Config.FILT_COLS]
 98 |     | -    data[Config.DATE_COL] = data[Config.DATE_COL].apply(pd.to_datetime)
 99 |     | -    logger.info(f"finished processing {filename} ")
100 |     | -    assert (
101 |     | -        np.sum(data.duplicated(subset=Config.ID_COLS)) == 0
102 |     | -    ), "Duplicated data! Check the input file"
103 |  99 |
104 |     | -    # drop HRR columns - unused for now since we assign HRRs by FIPS
105 |     | -    data.drop(columns=Config.HRR_COLS, inplace=True)
106 |     | -    data.dropna(inplace=True)  # drop rows with any missing entries
    | 100 | +    ddata = ddata.dropna()
    | 101 | +    ddata = ddata.rename(columns=Config.DEVIANT_COLS_MAP)
    | 102 | +    ddata = ddata[Config.FILT_COLS]
    | 103 | +
    | 104 | +
    | 105 | +    data = ddata.compute()
107 | 106 |
    | 107 | +    # data.dropna(inplace=True)  # drop rows with any missing entries
    | 108 | +
    | 109 | +    # data.columns = data.columns.to_series().replace(Config.DEVIANT_COLS_MAP)
    | 110 | +    #
    | 111 | +    # data = data[Config.FILT_COLS]
    | 112 | +    #
    | 113 | +    # # drop HRR columns - unused for now since we assign HRRs by FIPS
    | 114 | +    # data.drop(columns=Config.HRR_COLS, inplace=True)
    | 115 | +    # data.dropna(inplace=True)  # drop rows with any missing entries
    | 116 | +
    | 117 | +    data[Config.DATE_COL] = data[Config.DATE_COL].apply(pd.to_datetime)
108 | 118 |     # aggregate age groups (so data is unique by service date and FIPS)
109 | 119 |     data = data.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
110 | 120 |     assert np.sum(data.duplicated()) == 0, "Duplicates after age group aggregation"
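Read end to end, the reworked block is: lazy dask read, `dropna()`, rename deviant columns, keep the filtered columns, materialize with `.compute()`, parse the date column, then aggregate age groups by date and FIPS. A standalone sketch under assumed column names (the real names come from `Config.DTYPES`, `Config.DEVIANT_COLS_MAP`, `Config.FILT_COLS`, `Config.DATE_COL`, and `Config.GEO_COL`):

```python
# Standalone sketch, not the indicator's code: column names below are
# hypothetical stand-ins for the Config constants used in update_sensor().
import dask.dataframe as dd
import pandas as pd

DATE_COL, GEO_COL = "ServiceDate", "PatCountyFIPS"   # assumed names

ddata = dd.read_csv("claims_sample.csv.gz", compression="gzip", blocksize=None)
ddata = ddata.dropna()                                    # drop rows with missing entries
ddata = ddata.rename(columns={"servicedate": DATE_COL})   # normalize deviant column names
ddata = ddata[[DATE_COL, GEO_COL, "Denominator", "Covid_like"]]  # keep value cols only

data = ddata.compute()                                    # materialize as a pandas DataFrame
data[DATE_COL] = pd.to_datetime(data[DATE_COL])           # parse service dates
# aggregate age groups so each (date, FIPS) pair appears once
data = data.groupby([DATE_COL, GEO_COL]).sum(numeric_only=True).reset_index()
assert not data.duplicated().any(), "Duplicates after age group aggregation"
```

Passing the whole Series to `pd.to_datetime` is equivalent to the `.apply(pd.to_datetime)` call kept in the diff, just vectorized; either way the parse happens after `.compute()`, on a pandas DataFrame. One behavioral difference to be aware of in the diff itself: `dropna()` now runs before the column filtering, so rows that are missing only values in columns that later get dropped (e.g. the HRR columns) are discarded too.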