diff --git a/google_health/delphi_google_health/__init__.py b/google_health/delphi_google_health/__init__.py index 03e8af200..20f0b7932 100644 --- a/google_health/delphi_google_health/__init__.py +++ b/google_health/delphi_google_health/__init__.py @@ -8,7 +8,7 @@ from __future__ import absolute_import -from . import export +from . import data_tools from . import map_values from . import pull_api from . import run diff --git a/google_health/delphi_google_health/data_tools.py b/google_health/delphi_google_health/data_tools.py new file mode 100644 index 000000000..dec4e932e --- /dev/null +++ b/google_health/delphi_google_health/data_tools.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +"""Functions to reformat the data.""" + +import numpy as np +import pandas as pd + +from .smooth import smoothed_values_by_geo_id + +RESCALE_VAL = 4000 / 100 + +def format_for_export(df: pd.DataFrame, smooth: bool): + """Transform data columns of df to match those expected by `delphi_utils.create_export_csv()`. + Parameters + ---------- + df: pd.DataFrame + data frame with columns "geo_id", "timestamp", and "val" + smooth: bool + should the signal in "val" be smoothed? + + Returns + ------- + pd.DataFrame + A data frame with columns "val", "se", and "sample_size". + """ + df = df.copy() + if smooth: + df["val"] = smoothed_values_by_geo_id(df) + + df["val"] /= RESCALE_VAL + df["se"] = np.nan + df["sample_size"] = np.nan + return df diff --git a/google_health/delphi_google_health/export.py b/google_health/delphi_google_health/export.py deleted file mode 100644 index f25759f61..000000000 --- a/google_health/delphi_google_health/export.py +++ /dev/null @@ -1,59 +0,0 @@ -# -*- coding: utf-8 -*- -"""Function to export the dataset in the format expected of the API. -""" -from datetime import datetime - -import numpy as np -import pandas as pd - -from .smooth import smoothed_values_by_geo_id - -RESCALE_VAL = 4000 / 100 - - -def export_csv( - df: pd.DataFrame, geo_name: str, sensor: str, smooth: bool, - start_date: str, receiving_dir: str -) -> None: - """Export data set in format expected for injestion by the API - - Note that the output values will be multiplied by the value RESCALE_VAL - defined in this file. - - Parameters - ---------- - df: pd.DataFrame - data frame with columns "geo_id", "timestamp", and "val" - geo_name: str - name of the geographic region, such as "state" or "hrr" - sensor: str - name of the sensor; only used for naming the output file - smooth: bool - should the signal in "val" be smoothed? - start_date: str - Output start date as a string formated as "YYYY-MM-DD" - receiving_dir: str - path to location where the output CSV files to be uploaded should be stored - """ - - df = df.copy() - - if smooth: - df["val"] = smoothed_values_by_geo_id(df) - - df["val"] /= RESCALE_VAL - df["se"] = np.nan - df["sample_size"] = np.nan - - start_date = datetime.strptime(start_date, "%Y-%m-%d") - - for date in df["timestamp"].unique(): - if datetime.strptime(date, "%Y-%m-%d") >= start_date: - date_short = date.replace("-", "") - export_fn = f"{date_short}_{geo_name}_{sensor}.csv" - df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size"]].to_csv( - f"{receiving_dir}/{export_fn}", - index=False, - na_rep="NA", - float_format="%.8f", - ) diff --git a/google_health/delphi_google_health/run.py b/google_health/delphi_google_health/run.py index 446993134..86912d3a4 100644 --- a/google_health/delphi_google_health/run.py +++ b/google_health/delphi_google_health/run.py @@ -13,12 +13,13 @@ from delphi_utils import ( read_params, S3ArchiveDiffer, - add_prefix + add_prefix, + create_export_csv ) +from .data_tools import format_for_export from .pull_api import GoogleHealthTrends, get_counts_states, get_counts_dma from .map_values import derived_counts_from_dma -from .export import export_csv from .constants import (SIGNALS, RAW, SMOOTHED, MSA, HRR, STATE, DMA, PULL_START_DATE) @@ -68,45 +69,37 @@ def run_module(): logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO) logging.info("Creating data from %s through %s.", start_date, end_date) + # Dictionary mapping geo resolution to the data corresponding to that resolution. + df_by_geo_res = {} + if not params["test"]: # setup class to handle API calls ght = GoogleHealthTrends(ght_key=ght_key) # read data frame version of the data - df_state = get_counts_states( + df_by_geo_res[STATE] = get_counts_states( ght, PULL_START_DATE, end_date, static_dir=static_dir, data_dir=data_dir ) - df_dma = get_counts_dma( + df_by_geo_res[DMA] = get_counts_dma( ght, PULL_START_DATE, end_date, static_dir=static_dir, data_dir=data_dir ) else: - df_state = pd.read_csv(params["test_data_dir"].format(geo_res="state")) - df_dma = pd.read_csv(params["test_data_dir"].format(geo_res="dma")) + df_by_geo_res[STATE] = pd.read_csv(params["test_data_dir"].format(geo_res="state")) + df_by_geo_res[DMA] = pd.read_csv(params["test_data_dir"].format(geo_res="dma")) - df_hrr, df_msa = derived_counts_from_dma(df_dma, static_dir=static_dir) + df_by_geo_res[HRR], df_by_geo_res[MSA] = derived_counts_from_dma(df_by_geo_res[DMA], + static_dir=static_dir) signal_names = add_prefix(SIGNALS, wip_signal, prefix="wip_") for signal in signal_names: - if signal.endswith(SMOOTHED): - # export each geographic region, with both smoothed and unsmoothed data - export_csv(df_state, STATE, signal, smooth=True, - start_date=start_date, receiving_dir=export_dir) - export_csv(df_dma, DMA, signal, smooth=True, - start_date=start_date, receiving_dir=export_dir) - export_csv(df_hrr, HRR, signal, smooth=True, - start_date=start_date, receiving_dir=export_dir) - export_csv(df_msa, MSA, signal, smooth=True, - start_date = start_date, receiving_dir=export_dir) - elif signal.endswith(RAW): - export_csv(df_state, STATE, signal, smooth=False, - start_date=start_date, receiving_dir=export_dir) - export_csv(df_dma, DMA, signal, smooth=False, - start_date=start_date, receiving_dir=export_dir) - export_csv(df_hrr, HRR, signal, smooth=False, - start_date=start_date, receiving_dir=export_dir) - export_csv(df_msa, MSA, signal, smooth=False, - start_date=start_date, receiving_dir=export_dir) + is_smoothed = signal.endswith(SMOOTHED) + for geo_res, df in df_by_geo_res.items(): + create_export_csv(format_for_export(df, is_smoothed), + geo_res=geo_res, + sensor=signal, + start_date=start_date, + export_dir=export_dir) if not params["test"]: # Diff exports, and make incremental versions diff --git a/google_health/tests/test_export.py b/google_health/tests/test_export.py deleted file mode 100644 index 6dc9f3ba9..000000000 --- a/google_health/tests/test_export.py +++ /dev/null @@ -1,61 +0,0 @@ -import pytest - -from os.path import join, exists -from tempfile import TemporaryDirectory - -import pandas as pd -import numpy as np - -from delphi_google_health.export import export_csv, RESCALE_VAL - - -class TestGoogleHealthTrends: - def test_export(self): - - # create fake dataset and save in a temporary directory - input_data = pd.DataFrame( - { - "geo_id": ["a", "a", "b", "b", "c", "c"], - "val": [0, 2, 3, 5, 10, 12], - "timestamp": ["2020-02-02", "2020-02-03"] * 3, - } - ) - - start_date = "2020-02-02" - - td = TemporaryDirectory() - export_csv( - input_data, - geo_name="region", - sensor="thing", - smooth=False, - start_date=start_date, - receiving_dir=td.name, - ) - - # check data for 2020-02-02 - expected_name = f"20200202_region_thing.csv" - assert exists(join(td.name, expected_name)) - - output_data = pd.read_csv(join(td.name, expected_name)) - - assert (output_data.columns == ["geo_id", "val", "se", "sample_size"]).all() - assert (output_data.geo_id == ["a", "b", "c"]).all() - assert (output_data.val == (np.array([0, 3, 10]) / RESCALE_VAL)).all() - assert np.isnan(output_data.se.values).all() - assert np.isnan(output_data.sample_size.values).all() - - # check data for 2020-02-03 - expected_name = f"20200203_region_thing.csv" - assert exists(join(td.name, expected_name)) - - output_data = pd.read_csv(join(td.name, expected_name)) - - assert (output_data.columns == ["geo_id", "val", "se", "sample_size"]).all() - assert (output_data.geo_id == ["a", "b", "c"]).all() - assert (output_data.val == np.array([2, 5, 12]) / RESCALE_VAL).all() - assert np.isnan(output_data.se.values).all() - assert np.isnan(output_data.sample_size.values).all() - - # remove temporary directory - td.cleanup()