diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py
index 5a3b804b2..cb96cfa33 100644
--- a/_delphi_utils_python/delphi_utils/export.py
+++ b/_delphi_utils_python/delphi_utils/export.py
@@ -4,6 +4,7 @@
 from os.path import join
 from typing import Optional
 
+from epiweeks import Week
 import numpy as np
 import pandas as pd
 
@@ -16,7 +17,8 @@ def create_export_csv(
     start_date: Optional[datetime] = None,
     end_date: Optional[datetime] = None,
     remove_null_samples: Optional[bool] = False,
-    write_empty_days: Optional[bool] = False
+    write_empty_days: Optional[bool] = False,
+    weekly_dates = False,
 ):
     """Export data in the format expected by the Delphi API.
 
@@ -65,10 +67,15 @@
         dates = pd.date_range(start_date, end_date)
 
     for date in dates:
+        if weekly_dates:
+            t = Week.fromdate(pd.to_datetime(str(date)))
+            date_str = "weekly_" + str(t.year) + str(t.week).zfill(2)
+        else:
+            date_str = date.strftime('%Y%m%d')
         if metric is None:
-            export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{sensor}.csv"
+            export_filename = f"{date_str}_{geo_res}_{sensor}.csv"
         else:
-            export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv"
+            export_filename = f"{date_str}_{geo_res}_{metric}_{sensor}.csv"
         export_file = join(export_dir, export_filename)
         export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
         if remove_null_samples:
diff --git a/_delphi_utils_python/setup.py b/_delphi_utils_python/setup.py
index 731d7e957..016263c32 100644
--- a/_delphi_utils_python/setup.py
+++ b/_delphi_utils_python/setup.py
@@ -7,6 +7,7 @@
 required = [
     "boto3",
     "covidcast",
+    "epiweeks",
     "freezegun",
     "gitpython",
     "mock",
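The new `weekly_dates` flag keeps the daily `YYYYMMDD` file naming as the default and switches to epiweek-stamped names only when explicitly requested. A minimal sketch of how a caller would use the extended utility; `create_export_csv` and its `weekly_dates` argument come from the diff above, while the DataFrame contents are illustrative only:

```python
from datetime import datetime

import numpy as np
import pandas as pd
from delphi_utils import create_export_csv

# Toy frame in the shape create_export_csv expects; values are made up.
df = pd.DataFrame({
    "geo_id": ["ca", "tx"],
    "timestamp": [datetime(2020, 6, 2)] * 2,
    "val": [1.5, 2.5],
    "se": [0.1, 0.2],
    "sample_size": [np.nan, np.nan],
})

# With weekly_dates=True the output is named by epiweek, e.g.
# "weekly_202023_state_deaths.csv" rather than "20200602_state_deaths.csv".
create_export_csv(df, export_dir="./receiving", geo_res="state",
                  sensor="deaths", weekly_dates=True)
```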
diff --git a/nchs_mortality/delphi_nchs_mortality/export.py b/nchs_mortality/delphi_nchs_mortality/export.py
deleted file mode 100644
index 47033a50a..000000000
--- a/nchs_mortality/delphi_nchs_mortality/export.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# -*- coding: utf-8 -*-
-"""Function to export the dataset in the format expected of the API."""
-import pandas as pd
-from epiweeks import Week
-
-def export_csv(df, geo_name, sensor, export_dir, start_date):
-    """Export data set in format expected for ingestion by the API.
-
-    Parameters
-    ----------
-    df: pd.DataFrame
-        data frame with columns "geo_id", "timestamp", and "val"
-    geo_name: str
-        name of the geographic region, such as "state" or "hrr"
-    sensor: str
-        name of the sensor; only used for naming the output file
-    export_dir: str
-        path to location where the output CSV files to be uploaded should be stored
-    start_date: datetime.datetime
-        The first date to report
-    end_date: datetime.datetime
-        The last date to report
-    """
-    df = df.copy()
-    df = df[df["timestamp"] >= start_date]
-
-    dates = df["timestamp"].unique()
-    for date in dates:
-        t = Week.fromdate(pd.to_datetime(str(date)))
-        date_short = "weekly_" + str(t.year) + str(t.week).zfill(2)
-        export_fn = f"{date_short}_{geo_name}_{sensor}.csv"
-        result_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size"]]
-        result_df.to_csv(f"{export_dir}/{export_fn}",
-                         index=False,
-                         float_format="%.8f")
-    return pd.to_datetime(dates)
diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py
index 1cf3d36d5..ec5416bb2 100644
--- a/nchs_mortality/delphi_nchs_mortality/run.py
+++ b/nchs_mortality/delphi_nchs_mortality/run.py
@@ -9,12 +9,11 @@
 from typing import Dict, Any
 
 import numpy as np
 
-from delphi_utils import S3ArchiveDiffer, get_structured_logger
+from delphi_utils import S3ArchiveDiffer, get_structured_logger, create_export_csv
 
 from .archive_diffs import arch_diffs
 from .constants import (METRICS, SENSOR_NAME_MAP, SENSORS,
                         INCIDENCE_BASE, GEO_RES)
-from .export import export_csv
 from .pull import pull_nchs_mortality_data
 
@@ -70,12 +69,13 @@ def run_module(params: Dict[str, Any]):
             df["sample_size"] = np.nan
             df = df[~df["val"].isnull()]
             sensor_name = "_".join([SENSOR_NAME_MAP[metric]])
-            dates = export_csv(
+            dates = create_export_csv(
                 df,
-                geo_name=GEO_RES,
+                geo_res=GEO_RES,
                 export_dir=daily_export_dir,
                 start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
                 sensor=sensor_name,
+                weekly_dates=True
             )
             if len(dates) > 0:
                 stats.append((max(dates), len(dates)))
@@ -93,12 +93,13 @@
                 df["sample_size"] = np.nan
                 df = df[~df["val"].isnull()]
                 sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor])
-                dates = export_csv(
+                dates = create_export_csv(
                     df,
-                    geo_name=GEO_RES,
+                    geo_res=GEO_RES,
                     export_dir=daily_export_dir,
                     start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
                     sensor=sensor_name,
+                    weekly_dates=True
                 )
                 if len(dates) > 0:
                     stats.append((max(dates), len(dates)))
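Since both `run.py` call sites now pass `weekly_dates=True`, file stems are computed exactly as the old `export_csv` did: via `epiweeks.Week`. A small standalone demonstration of that mapping (MMWR weeks, the `epiweeks` default system):

```python
from epiweeks import Week
import pandas as pd

# 2020-06-02 falls in MMWR epiweek 23 of 2020, so the file stem becomes
# "weekly_202023", matching the expectations in the deleted test below.
week = Week.fromdate(pd.to_datetime("2020-06-02"))
date_str = "weekly_" + str(week.year) + str(week.week).zfill(2)
print(date_str)  # weekly_202023
```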
diff --git a/nchs_mortality/tests/test_export.py b/nchs_mortality/tests/test_export.py
deleted file mode 100644
index c05f287ca..000000000
--- a/nchs_mortality/tests/test_export.py
+++ /dev/null
@@ -1,51 +0,0 @@
-from datetime import datetime
-from os.path import join, exists
-
-import pandas as pd
-
-from delphi_nchs_mortality.export import export_csv
-
-
-class TestExport:
-    def test_export(self):
-
-        # create fake dataset and save in a temporary directory
-        input_data = pd.DataFrame(
-            {
-                "geo_id": ["a", "a", "b", "b", "c", "c"],
-                "val": [0, 2, 3, 5, 10, 12],
-                "timestamp": [datetime(2020, 6, 2), datetime(2020, 6, 9)] * 3,
-                "se": [0.01, 0.02, 0.01, 0.01, 0.005, 0.01],
-                "sample_size": [100, 200, 500, 50, 80, 10]
-            }
-        )
-
-        export_csv(
-            input_data,
-            geo_name = "state",
-            sensor="region_thing",
-            export_dir="./receiving",
-            start_date = datetime(2020, 6, 2),
-        )
-
-        # check data for 2020-06-02
-        expected_name = "weekly_202023_state_region_thing.csv"
-        assert exists(join("./receiving", expected_name))
-
-        output_data = pd.read_csv(join("./receiving", expected_name))
-
-        assert (output_data.columns == ["geo_id", "val", "se", "sample_size"]).all()
-        assert (output_data.geo_id == ["a", "b", "c"]).all()
-        assert (output_data.se.values == [0.01, 0.01, 0.005]).all()
-        assert (output_data.sample_size.values == [100, 500, 80]).all()
-
-        # check data for 2020-06-03
-        expected_name = "weekly_202024_state_region_thing.csv"
-        assert exists(join("./receiving", expected_name))
-
-        output_data = pd.read_csv(join("./receiving", expected_name))
-
-        assert (output_data.columns == ["geo_id", "val", "se", "sample_size"]).all()
-        assert (output_data.geo_id == ["a", "b", "c"]).all()
-        assert (output_data.se.values == [0.02, 0.01, 0.01]).all()
-        assert (output_data.sample_size.values == [200, 50, 10]).all()
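With the indicator-local test deleted, equivalent coverage would naturally live next to `create_export_csv` in `_delphi_utils_python`. A hypothetical sketch of such a test, not part of this diff (the test name, the `tmp_path` pytest fixture, and the single-week frame are assumptions):

```python
from datetime import datetime
from os.path import exists, join

import pandas as pd
from delphi_utils import create_export_csv

def test_weekly_export(tmp_path):
    # One epiweek of fake data; 2020-06-02 maps to epiweek 202023.
    df = pd.DataFrame({
        "geo_id": ["a", "b"],
        "timestamp": [datetime(2020, 6, 2)] * 2,
        "val": [0, 3],
        "se": [0.01, 0.01],
        "sample_size": [100, 500],
    })
    create_export_csv(df, export_dir=str(tmp_path), geo_res="state",
                      sensor="region_thing", start_date=datetime(2020, 6, 2),
                      weekly_dates=True)
    assert exists(join(str(tmp_path), "weekly_202023_state_region_thing.csv"))
```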