Commit dfc3be2

organizing code
1 parent 1394d3d commit dfc3be2

2 files changed (+98 −84 lines)

doctor_visits/delphi_doctor_visits/process_data.py

Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
import dask.dataframe as dd
from datetime import datetime
import numpy as np
import pandas as pd
from pathlib import Path

from .config import Config


def write_to_csv(output_df: pd.DataFrame, geo_level: str, se: bool, out_name: str, logger, output_path="."):
    """Write sensor values to csv.

    Args:
        output_df: dataframe containing sensor rates, standard errors, dates, and geo_id
        geo_level: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
        se: boolean to write out standard errors, if true, use an obfuscated name
        out_name: name of the output file
        logger: the structured logger
        output_path: outfile path to write the csv (default is current directory)
    """
    if se:
        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")

    out_n = 0
    for d in set(output_df["date"]):
        filename = "%s/%s_%s_%s.csv" % (output_path,
                                        (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
                                        geo_level,
                                        out_name)
        single_date_df = output_df[output_df["date"] == d]
        with open(filename, "w") as outfile:
            outfile.write("geo_id,val,se,direction,sample_size\n")

            for line in single_date_df.itertuples():
                geo_id = line.geo_id
                sensor = 100 * line.val  # report percentages
                se_val = 100 * line.se
                assert not np.isnan(sensor), "sensor value is nan, check pipeline"
                assert sensor < 90, f"strangely high percentage {geo_id, sensor}"
                if not np.isnan(se_val):
                    assert se_val < 5, f"standard error suspiciously high! investigate {geo_id}"

                if se:
                    assert sensor > 0 and se_val > 0, "p=0, std_err=0 invalid"
                    outfile.write(
                        "%s,%f,%s,%s,%s\n" % (geo_id, sensor, se_val, "NA", "NA"))
                else:
                    # for privacy reasons we will not report the standard error
                    outfile.write(
                        "%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA"))
                out_n += 1
    logger.debug(f"wrote {out_n} rows for {geo_level}")


# TODO: clean the date params
def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
    """
    Read the csv using Dask, filter rows by date range, drop currently unused columns,
    then convert back into a pandas dataframe.

    Parameters
    ----------
    filepath: path to the aggregated doctor-visits data
    startdate: first sensor date (YYYY-mm-dd)
    enddate: last sensor date (YYYY-mm-dd)
    dropdate: data drop date (YYYY-mm-dd)
    logger: the structured logger

    Returns
    -------
    cleaned dataframe
    """
    filename = Path(filepath).name
    logger.info(f"Processing {filename}")
    ddata = dd.read_csv(
        filepath,
        compression="gzip",
        dtype=Config.DTYPES,
        blocksize=None,
    )

    ddata = ddata.dropna()
    ddata = ddata.rename(columns=Config.DEVIANT_COLS_MAP)
    ddata = ddata[Config.FILT_COLS]
    ddata[Config.DATE_COL] = dd.to_datetime(ddata[Config.DATE_COL])

    # restrict to training start and end date
    startdate = startdate - Config.DAY_SHIFT

    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
    assert startdate < enddate, "Start date >= end date"
    assert enddate <= dropdate, "End date > drop date"

    date_filter = ((ddata[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (ddata[Config.DATE_COL] < dropdate))

    df = ddata[date_filter].compute()
    logger.info(f"Done processing {filename}")
    return df
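
For orientation, here is a minimal, hypothetical sketch of how the two helpers in this new module might be driven. The import path (the package is assumed to be delphi_doctor_visits with this file as process_data.py, per the relative imports), the input file, the dates, and the standard-library logger are illustrative assumptions, not part of this commit:

# Hypothetical driver; paths, dates, and signal name are made up for illustration.
import logging
from datetime import datetime

from delphi_doctor_visits.process_data import csv_to_df, write_to_csv  # assumed module path

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Load and clean the aggregated, gzip-compressed doctor-visits csv.
df = csv_to_df(
    "input/doctor_visits_agg.csv.gz",   # hypothetical input file
    startdate=datetime(2020, 2, 1),
    enddate=datetime(2020, 6, 1),
    dropdate=datetime(2020, 6, 2),
    logger=logger,
)

# ... sensor fitting would happen here; write_to_csv then expects a frame with
# columns ["geo_id", "date", "val", "se"]:
# write_to_csv(sensor_df, "state", se=False, out_name="hypothetical_signal", logger=logger)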

doctor_visits/delphi_doctor_visits/update_sensor.py

Lines changed: 3 additions & 84 deletions
@@ -11,8 +11,8 @@
 # standard packages
 from datetime import timedelta, datetime
 from multiprocessing import Pool, cpu_count
+
 # third party
-import dask.dataframe as dd
 import numpy as np
 import pandas as pd

@@ -21,91 +21,10 @@
 from delphi_utils import Weekday
 from .config import Config
 from .geo_maps import GeoMaps
+from .process_data import csv_to_df
 from .sensor import DoctorVisitsSensor


-def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, logger, output_path="."):
-    """Write sensor values to csv.
-
-    Args:
-        output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
-        se: boolean to write out standard errors, if true, use an obfuscated name
-        out_name: name of the output file
-        output_path: outfile path to write the csv (default is current directory)
-    """
-    if se:
-        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")
-
-    out_n = 0
-    for d in set(output_df["date"]):
-        filename = "%s/%s_%s_%s.csv" % (output_path,
-                                        (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
-                                        geo_level,
-                                        out_name)
-        single_date_df = output_df[output_df["date"] == d]
-        with open(filename, "w") as outfile:
-            outfile.write("geo_id,val,se,direction,sample_size\n")
-
-            for line in single_date_df.itertuples():
-                geo_id = line.geo_id
-                sensor = 100 * line.val  # report percentages
-                se_val = 100 * line.se
-                assert not np.isnan(sensor), "sensor value is nan, check pipeline"
-                assert sensor < 90, f"strangely high percentage {geo_id, sensor}"
-                if not np.isnan(se_val):
-                    assert se_val < 5, f"standard error suspiciously high! investigate {geo_id}"
-
-                if se:
-                    assert sensor > 0 and se_val > 0, "p=0, std_err=0 invalid"
-                    outfile.write(
-                        "%s,%f,%s,%s,%s\n" % (geo_id, sensor, se_val, "NA", "NA"))
-                else:
-                    # for privacy reasons we will not report the standard error
-                    outfile.write(
-                        "%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA"))
-                out_n += 1
-    logger.debug(f"wrote {out_n} rows for {geo_level}")
-
-#TODO clean the date params
-def process_csv(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime) -> pd.DataFrame:
-    '''
-    Reads csv using Dask and filters out based on date range and currently unused column,
-    then converts back into pandas dataframe.
-    Parameters
-    ----------
-    filepath: path to the aggregated doctor-visits data
-    startdate: first sensor date (YYYY-mm-dd)
-    enddate: last sensor date (YYYY-mm-dd)
-    dropdate: data drop date (YYYY-mm-dd)
-
-    Returns
-    -------
-    cleaned dataframe
-    '''
-
-    ddata = dd.read_csv(
-        filepath,
-        compression="gzip",
-        dtype=Config.DTYPES,
-        blocksize=None,
-    )
-
-    ddata = ddata.dropna()
-    ddata = ddata.rename(columns=Config.DEVIANT_COLS_MAP)
-    ddata = ddata[Config.FILT_COLS]
-    ddata[Config.DATE_COL] = dd.to_datetime(ddata[Config.DATE_COL])
-
-    # restrict to training start and end date
-    startdate = startdate - Config.DAY_SHIFT
-
-    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
-    assert startdate < enddate, "Start date >= end date"
-    assert enddate <= dropdate, "End date > drop date"
-
-    date_filter = ((ddata[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (ddata[Config.DATE_COL] < dropdate))
-
-    return ddata[date_filter].compute()
-
 def update_sensor(
     filepath:str, startdate:datetime, enddate:datetime, dropdate:datetime, geo:str, parallel: bool,
     weekday:bool, se:bool, logger
@@ -123,7 +42,7 @@ def update_sensor(
         se: boolean to write out standard errors, if true, use an obfuscated name
         logger: the structured logger
     """
-    data = process_csv(filepath, startdate, enddate, dropdate)
+    data = csv_to_df(filepath, startdate, enddate, dropdate, logger)

     # aggregate age groups (so data is unique by service date and FIPS)
     data = data.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
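
To make the aggregation step above concrete, here is a small pandas sketch of what the groupby-sum does: the frame returned by csv_to_df still has one row per (service date, FIPS, age group), and summing with numeric_only=True collapses the age groups. The column names below are illustrative stand-ins for Config.DATE_COL, Config.GEO_COL, and the count columns, not the pipeline's actual names:

# Illustrative only: column names are stand-ins, not the pipeline's Config values.
import pandas as pd

data = pd.DataFrame({
    "servicedate": ["2020-05-01"] * 3,
    "fips": ["42003"] * 3,
    "age_group": ["0-17", "18-64", "65+"],   # non-numeric, dropped by the numeric-only sum
    "covid_like": [1, 4, 2],
    "denominator": [50, 200, 80],
})

# Collapse age groups so there is one row per (service date, FIPS).
agg = data.groupby(["servicedate", "fips"]).sum(numeric_only=True).reset_index()
# -> one row: covid_like = 7, denominator = 330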
