Optimize with dask #1981
Changes from 5 commits
@@ -0,0 +1,93 @@
import dask.dataframe as dd
from datetime import datetime
import numpy as np
import pandas as pd
from pathlib import Path

from .config import Config

def write_to_csv(output_df: pd.DataFrame, geo_level: str, se: bool, out_name: str, logger, output_path="."):
    """Write sensor values to csv.

    Args:
        output_df: dataframe containing sensor rates, se, unique dates, and unique geo_id
        geo_level: geographic resolution, one of ["county", "state", "msa", "hrr", "nation", "hhs"]
        se: boolean to write out standard errors, if true, use an obfuscated name
        out_name: name of the output file
        logger: the structured logger
        output_path: outfile path to write the csv (default is current directory)
    """
    if se:
        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")

    out_n = 0
    for d in set(output_df["date"]):
        filename = "%s/%s_%s_%s.csv" % (output_path,
                                        (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
                                        geo_level,
                                        out_name)
        single_date_df = output_df[output_df["date"] == d]
        with open(filename, "w") as outfile:
            outfile.write("geo_id,val,se,direction,sample_size\n")

            for line in single_date_df.itertuples():
                geo_id = line.geo_id
                sensor = 100 * line.val  # report percentages
                se_val = 100 * line.se
                assert not np.isnan(sensor), "sensor value is nan, check pipeline"
                assert sensor < 90, f"strangely high percentage {geo_id, sensor}"
                if not np.isnan(se_val):
                    assert se_val < 5, f"standard error suspiciously high! investigate {geo_id}"

                if se:
                    assert sensor > 0 and se_val > 0, "p=0, std_err=0 invalid"
                    outfile.write(
                        "%s,%f,%s,%s,%s\n" % (geo_id, sensor, se_val, "NA", "NA"))
                else:
                    # for privacy reasons we will not report the standard error
                    outfile.write(
                        "%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA"))
                out_n += 1
    logger.debug(f"wrote {out_n} rows for {geo_level}")

# TODO: clean the date params
def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: datetime, logger) -> pd.DataFrame:
    '''
    Read the aggregated claims csv with Dask, filter to the relevant date range,
    drop currently unused columns, then convert back into a pandas dataframe.

    Parameters
    ----------
    filepath: path to the aggregated doctor-visits data
    startdate: first sensor date (YYYY-mm-dd)
    enddate: last sensor date (YYYY-mm-dd)
    dropdate: data drop date (YYYY-mm-dd)
    logger: the structured logger

    Returns
    -------
    pd.DataFrame of the filtered claims data
    '''
    filename = Path(filepath).name
    logger.info(f"Processing {filename}")
    ddata = dd.read_csv(
        filepath,
        compression="gzip",
        dtype=Config.DTYPES,
        blocksize=None,
    )

    ddata = ddata.dropna()
    ddata = ddata.rename(columns=Config.DEVIANT_COLS_MAP)
    ddata = ddata[Config.FILT_COLS]
    ddata[Config.DATE_COL] = dd.to_datetime(ddata[Config.DATE_COL])

    # restrict to training start and end date
    startdate = startdate - Config.DAY_SHIFT

    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
    assert startdate < enddate, "Start date >= end date"
    assert enddate <= dropdate, "End date > drop date"

    date_filter = ((ddata[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & (ddata[Config.DATE_COL] < dropdate))

    df = ddata[date_filter].compute()
    logger.info(f"Done processing {filename}")
    return df
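
For context, a rough sketch of how this Dask read behaves: gzip files cannot be split into blocks, so blocksize=None makes Dask load the whole compressed file as a single partition, and compute() then materializes it as an ordinary pandas dataframe. The file name and dtype mapping below are hypothetical stand-ins for the real Config values, not part of this PR.

import dask.dataframe as dd

# Illustrative stand-in for the Config-driven read above; the path and the
# dtype override are hypothetical examples, not the indicator's actual config.
ddata = dd.read_csv(
    "claims_sample.csv.gz",        # hypothetical gzip input
    compression="gzip",
    dtype={"PatCountyFIPS": str},  # example dtype override
    blocksize=None,                # compressed files cannot be chunked
)
print(ddata.npartitions)           # -> 1, the whole file is one partition
df = ddata.compute()               # materialize into a pandas DataFrame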
@@ -9,13 +9,14 @@
""" | ||
|
||
# standard packages | ||
from datetime import timedelta | ||
from datetime import timedelta, datetime | ||
from multiprocessing import Pool, cpu_count | ||
from pathlib import Path | ||
|
||
# third party | ||
import numpy as np | ||
import pandas as pd | ||
import dask.dataframe as dd | ||
|
||
|
||
# first party | ||
from delphi_utils import Weekday | ||
|
@@ -24,57 +25,14 @@
from .sensor import DoctorVisitsSensor

def write_to_csv(output_df: pd.DataFrame, geo_level, se, out_name, logger, output_path="."):
    """Write sensor values to csv.

    Args:
        output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id
        se: boolean to write out standard errors, if true, use an obfuscated name
        out_name: name of the output file
        output_path: outfile path to write the csv (default is current directory)
    """
    if se:
        logger.info(f"========= WARNING: WRITING SEs TO {out_name} =========")

    out_n = 0
    for d in set(output_df["date"]):
        filename = "%s/%s_%s_%s.csv" % (output_path,
                                        (d + Config.DAY_SHIFT).strftime("%Y%m%d"),
                                        geo_level,
                                        out_name)
        single_date_df = output_df[output_df["date"] == d]
        with open(filename, "w") as outfile:
            outfile.write("geo_id,val,se,direction,sample_size\n")

            for line in single_date_df.itertuples():
                geo_id = line.geo_id
                sensor = 100 * line.val  # report percentages
                se_val = 100 * line.se
                assert not np.isnan(sensor), "sensor value is nan, check pipeline"
                assert sensor < 90, f"strangely high percentage {geo_id, sensor}"
                if not np.isnan(se_val):
                    assert se_val < 5, f"standard error suspiciously high! investigate {geo_id}"

                if se:
                    assert sensor > 0 and se_val > 0, "p=0, std_err=0 invalid"
                    outfile.write(
                        "%s,%f,%s,%s,%s\n" % (geo_id, sensor, se_val, "NA", "NA"))
                else:
                    # for privacy reasons we will not report the standard error
                    outfile.write(
                        "%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA"))
                out_n += 1
    logger.debug(f"wrote {out_n} rows for {geo_level}")

def update_sensor(
    filepath, startdate, enddate, dropdate, geo, parallel,
    weekday, se, logger
    data: pd.DataFrame, startdate: datetime, enddate: datetime, dropdate: datetime, geo: str, parallel: bool,
    weekday: bool, se: bool, logger
):

Review comments on the new type-annotated signature:
- 👍 we should start doing type specification.
- yeah, but I think that should be a separate ticket maybe? Don't want to have more PRs/confusion within an already scoped-out feature.

"""Generate sensor values. | ||
|
||
Args: | ||
filepath: path to the aggregated doctor-visits data | ||
data: dataframe of the cleaned claims file | ||
startdate: first sensor date (YYYY-mm-dd) | ||
enddate: last sensor date (YYYY-mm-dd) | ||
dropdate: data drop date (YYYY-mm-dd) | ||
|
@@ -84,45 +42,14 @@ def update_sensor( | |
se: boolean to write out standard errors, if true, use an obfuscated name | ||
logger: the structured logger | ||
""" | ||
    # as of 2020-05-11, input file expected to have 10 columns
    # id cols: ServiceDate, PatCountyFIPS, PatAgeGroup, Pat HRR ID/Pat HRR Name
    # value cols: Denominator, Covid_like, Flu_like, Flu1, Mixed
    filename = Path(filepath).name
    data = pd.read_csv(
        filepath,
        dtype=Config.DTYPES,
    )
    logger.info(f"Starting processing {filename} ")
    data.rename(columns=Config.DEVIANT_COLS_MAP, inplace=True)
    data = data[Config.FILT_COLS]
    data[Config.DATE_COL] = data[Config.DATE_COL].apply(pd.to_datetime)
    logger.info(f"finished processing {filename} ")
    assert (
        np.sum(data.duplicated(subset=Config.ID_COLS)) == 0
    ), "Duplicated data! Check the input file"

    # drop HRR columns - unused for now since we assign HRRs by FIPS
    data.drop(columns=Config.HRR_COLS, inplace=True)
    data.dropna(inplace=True)  # drop rows with any missing entries

    # aggregate age groups (so data is unique by service date and FIPS)
    data = data.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
    assert np.sum(data.duplicated()) == 0, "Duplicates after age group aggregation"
    assert (data[Config.COUNT_COLS] >= 0).all().all(), "Counts must be nonnegative"

    ## collect dates
    # restrict to training start and end date
    drange = lambda s, e: np.array([s + timedelta(days=x) for x in range((e - s).days)])
    startdate = pd.to_datetime(startdate) - Config.DAY_SHIFT
    burnindate = startdate - Config.DAY_SHIFT
    enddate = pd.to_datetime(enddate)
    dropdate = pd.to_datetime(dropdate)
    assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data"
    assert startdate < enddate, "Start date >= end date"
    assert enddate <= dropdate, "End date > drop date"
    data = data[(data[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & \
                (data[Config.DATE_COL] < dropdate)]
    fit_dates = drange(Config.FIRST_DATA_DATE, dropdate)
    burnindate = startdate - Config.DAY_SHIFT
    burn_in_dates = drange(burnindate, dropdate)
    sensor_dates = drange(startdate, enddate)
    # The ordering of sensor dates corresponds to the order of burn-in dates

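To see how the pieces fit together after this refactor, here is a minimal sketch of the new calling convention: the claims file is loaded once (for example via csv_to_df) and the resulting dataframe is passed into update_sensor, whose output can then go to write_to_csv. The module name process_data, the input/output paths, and the smoothed_cli signal name are assumptions for illustration only; the actual run.py wiring may differ.

from datetime import datetime
import logging

# NOTE: "process_data" is a guessed module name for the new helper file; the
# diff does not show its file name. update_sensor's import path matches the test.
from delphi_doctor_visits.process_data import csv_to_df, write_to_csv
from delphi_doctor_visits.update_sensor import update_sensor

logger = logging.getLogger(__name__)
startdate = datetime(2020, 2, 4)
enddate = datetime(2020, 2, 5)
dropdate = datetime(2020, 2, 6)

# Load and clean once with Dask, then hand the pandas dataframe to the sensor step.
claims_df = csv_to_df("./input/claims.csv.gz", startdate, enddate, dropdate, logger)
sensor_df = update_sensor(
    data=claims_df, startdate=startdate, enddate=enddate, dropdate=dropdate,
    geo="state", parallel=False, weekday=False, se=False, logger=logger,
)
write_to_csv(sensor_df, geo_level="state", se=False, out_name="smoothed_cli",
             logger=logger, output_path="./receiving")
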
@@ -11,6 +11,7 @@
"pytest-cov", | ||
"pytest", | ||
"scikit-learn", | ||
"dask", | ||
] | ||
|
||
setup( | ||
|
@@ -0,0 +1,24 @@
"""Tests for update_sensor.py.""" | ||
import logging | ||
import pandas as pd | ||
|
||
from delphi_doctor_visits.update_sensor import update_sensor | ||
|
||
TEST_LOGGER = logging.getLogger() | ||
|
||
class TestProcessData: | ||
def test_csv_to_df(self): | ||
actual = update_sensor( | ||
filepath="./test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz", | ||
startdate="2020-02-04", | ||
enddate="2020-02-05", | ||
dropdate="2020-02-06", | ||
geo="state", | ||
parallel=False, | ||
weekday=False, | ||
se=False, | ||
logger=TEST_LOGGER, | ||
) | ||
|
||
comparison = pd.read_csv("./comparison/update_sensor/all.csv", parse_dates=["date"]) | ||
pd.testing.assert_frame_equal(actual.reset_index(drop=True), comparison) |
Review comment: nitpick: filepath might be more helpful, as it includes input dir.