
Commit e07c697

only processing once and passing along the dataframe
1 parent dfc3be2 commit e07c697

4 files changed (+32, -9 lines)

doctor_visits/delphi_doctor_visits/process_data.py

Lines changed: 0 additions & 2 deletions
@@ -63,9 +63,7 @@ def csv_to_df(filepath: str, startdate: datetime, enddate: datetime, dropdate: d
     enddate: last sensor date (YYYY-mm-dd)
     dropdate: data drop date (YYYY-mm-dd)
 
-    Returns
     -------
-    cleaned dataframe
     '''
     filename = Path(filepath).name
     logger.info(f"Processing {filename}")

doctor_visits/delphi_doctor_visits/run.py

Lines changed: 5 additions & 2 deletions
@@ -14,7 +14,8 @@
 from delphi_utils import get_structured_logger
 
 # first party
-from .update_sensor import update_sensor, write_to_csv
+from .update_sensor import update_sensor
+from .process_data import csv_to_df, write_to_csv
 from .download_claims_ftp_files import download
 from .get_latest_claims_name import get_latest_filename

@@ -85,6 +86,8 @@ def run_module(params): # pylint: disable=too-many-statements
     ## geographies
     geos = ["state", "msa", "hrr", "county", "hhs", "nation"]
 
+    claims_df = csv_to_df(claims_file, startdate_dt, enddate_dt, dropdate_dt, logger)
+
     ## print out other vars
     logger.info("outpath:\t\t%s", export_dir)
     logger.info("parallel:\t\t%s", params["indicator"]["parallel"])

@@ -102,7 +105,7 @@ def run_module(params): # pylint: disable=too-many-statements
         else:
             logger.info("starting %s, no adj", geo)
         sensor = update_sensor(
-            filepath=claims_file,
+            data=claims_df,
             startdate=startdate_dt,
             enddate=enddate_dt,
             dropdate=dropdate_dt,
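
Taken together, the two run.py hunks change the flow so the claims file is parsed once and the resulting dataframe is handed to every per-geo update_sensor call. A condensed sketch of that loop, with the weekday-adjustment branch elided; the variable names follow the diff, and the keyword arguments not shown in the hunk are assumptions:

# read and clean the claims file once, before the geo loop
claims_df = csv_to_df(claims_file, startdate_dt, enddate_dt, dropdate_dt, logger)

for geo in geos:  # ["state", "msa", "hrr", "county", "hhs", "nation"]
    logger.info("starting %s, no adj", geo)
    sensor = update_sensor(
        data=claims_df,  # was: filepath=claims_file
        startdate=startdate_dt,
        enddate=enddate_dt,
        dropdate=dropdate_dt,
        geo=geo,
        parallel=params["indicator"]["parallel"],
        weekday=False,
        se=False,
        logger=logger,
    )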

doctor_visits/delphi_doctor_visits/update_sensor.py

Lines changed: 3 additions & 5 deletions
@@ -15,24 +15,24 @@
 # third party
 import numpy as np
 import pandas as pd
+import dask.dataframe as dd
 
 
 # first party
 from delphi_utils import Weekday
 from .config import Config
 from .geo_maps import GeoMaps
-from .process_data import csv_to_df
 from .sensor import DoctorVisitsSensor
 
 
 def update_sensor(
-    filepath:str, startdate:datetime, enddate:datetime, dropdate:datetime, geo:str, parallel: bool,
+    data:pd.DataFrame, startdate:datetime, enddate:datetime, dropdate:datetime, geo:str, parallel: bool,
     weekday:bool, se:bool, logger
 ):
     """Generate sensor values.
 
     Args:
-        filepath: path to the aggregated doctor-visits data
+        data: dataframe of the cleaned claims file
        startdate: first sensor date (YYYY-mm-dd)
        enddate: last sensor date (YYYY-mm-dd)
        dropdate: data drop date (YYYY-mm-dd)

@@ -42,8 +42,6 @@ def update_sensor(
        se: boolean to write out standard errors, if true, use an obfuscated name
        logger: the structured logger
     """
-    data = csv_to_df(filepath, startdate, enddate, dropdate, logger)
-
     # aggregate age groups (so data is unique by service date and FIPS)
     data = data.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
     assert np.sum(data.duplicated()) == 0, "Duplicates after age group aggregation"
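
Assembled from the two hunks above, the entry point of update_sensor now looks roughly like the sketch below: it receives the already-cleaned dataframe and goes straight to the age-group aggregation. The docstring is abbreviated and the ellipsis stands for the unchanged remainder of the function:

def update_sensor(
    data: pd.DataFrame, startdate: datetime, enddate: datetime, dropdate: datetime,
    geo: str, parallel: bool, weekday: bool, se: bool, logger
):
    """Generate sensor values from a pre-cleaned claims dataframe."""
    # aggregate age groups (so data is unique by service date and FIPS)
    data = data.groupby([Config.DATE_COL, Config.GEO_COL]).sum(numeric_only=True).reset_index()
    assert np.sum(data.duplicated()) == 0, "Duplicates after age group aggregation"
    ...
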
Lines changed: 24 additions & 0 deletions (new file)
@@ -0,0 +1,24 @@
+"""Tests for update_sensor.py."""
+import logging
+import pandas as pd
+
+from delphi_doctor_visits.update_sensor import update_sensor
+
+TEST_LOGGER = logging.getLogger()
+
+class TestProcessData:
+    def test_csv_to_df(self):
+        actual = update_sensor(
+            filepath="./test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz",
+            startdate="2020-02-04",
+            enddate="2020-02-05",
+            dropdate="2020-02-06",
+            geo="state",
+            parallel=False,
+            weekday=False,
+            se=False,
+            logger=TEST_LOGGER,
+        )
+
+        comparison = pd.read_csv("./comparison/update_sensor/all.csv", parse_dates=["date"])
+        pd.testing.assert_frame_equal(actual.reset_index(drop=True), comparison)
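
One thing to note: this new test still passes filepath= to update_sensor, while the hunks above change that parameter to data=, so as committed the call would not match the new signature. A hedged sketch of the same test adapted to the new interface, assuming csv_to_df accepts the same fixture path and date strings the old call used (it may instead require datetime objects):

"""Tests for update_sensor.py, adapted to the data= interface."""
import logging
import pandas as pd

from delphi_doctor_visits.process_data import csv_to_df
from delphi_doctor_visits.update_sensor import update_sensor

TEST_LOGGER = logging.getLogger()

class TestUpdateSensor:
    def test_update_sensor(self):
        # pre-process the fixture once, mirroring the new run_module flow
        claims_df = csv_to_df(
            "./test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz",
            "2020-02-04", "2020-02-05", "2020-02-06", TEST_LOGGER,
        )
        actual = update_sensor(
            data=claims_df,
            startdate="2020-02-04",
            enddate="2020-02-05",
            dropdate="2020-02-06",
            geo="state",
            parallel=False,
            weekday=False,
            se=False,
            logger=TEST_LOGGER,
        )

        comparison = pd.read_csv("./comparison/update_sensor/all.csv", parse_dates=["date"])
        pd.testing.assert_frame_equal(actual.reset_index(drop=True), comparison)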
