diff --git a/doctor_visits/.gitignore b/doctor_visits/.gitignore new file mode 100644 index 000000000..552154e09 --- /dev/null +++ b/doctor_visits/.gitignore @@ -0,0 +1,120 @@ +# You should hard commit a prototype for this file, but we +# want to avoid accidental adding of API tokens and other +# private data parameters +params.json + +# Do not commit output files +receiving/*.csv + +# Remove macOS files +.DS_Store + +# virtual environment +dview/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +coverage.xml +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +.static_storage/ +.media/ +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ diff --git a/doctor_visits/.pylintrc b/doctor_visits/.pylintrc new file mode 100644 index 000000000..a14b269cc --- /dev/null +++ b/doctor_visits/.pylintrc @@ -0,0 +1,8 @@ +[DESIGN] + +min-public-methods=0 + + +[MESSAGES CONTROL] + +disable=R0801, C0200, C0330, E1101, E0611, E1136, C0114, C0116, C0103, R0913, R0914, R0915, W1401, W1202, W1203, W0702 diff --git a/doctor_visits/README.md b/doctor_visits/README.md new file mode 100644 index 000000000..ba2acf43e --- /dev/null +++ b/doctor_visits/README.md @@ -0,0 +1,55 @@ +# Doctor Visits Indicator + +## Running the Indicator + +The indicator is run by directly executing the Python module contained in this +directory. The safest way to do this is to create a virtual environment, +install the common DELPHI tools, and then install the module and its +dependencies. To do this, run the following code from this directory: + +``` +python -m venv env +source env/bin/activate +pip install ../_delphi_utils_python/. +pip install . +``` + +All of the user-changeable parameters are stored in `params.json`. To execute +the module and produce the output datasets (by default, in `receiving`), run +the following: + +``` +env/bin/python -m delphi_doctor_visits +``` + +Once you are finished with the code, you can deactivate the virtual environment +and (optionally) remove the environment itself. + +``` +deactivate +rm -r env +``` + +## Testing the code + +To do a static test of the code style, it is recommended to run **pylint** on +the module. To do this, run the following from the main module directory: + +``` +env/bin/pylint delphi_doctor_visits +``` + +The most aggressive checks are turned off; only relatively important issues +should be raised and they should be manually checked (or better, fixed).
+ +Unit tests are also included in the module. To execute these, run the following +command from this directory: + +``` +(cd tests && ../env/bin/pytest --cov=delphi_doctor_visits --cov-report=term-missing) +``` + +The output will show the number of unit tests that passed and failed, along +with the percentage of code covered by the tests. None of the tests should +fail and the code lines that are not covered by unit tests should be small and +should not include critical sub-routines. diff --git a/doctor_visits/REVIEW.md b/doctor_visits/REVIEW.md new file mode 100644 index 000000000..93a5a6579 --- /dev/null +++ b/doctor_visits/REVIEW.md @@ -0,0 +1,39 @@ +## Code Review (Python) + +A code review of this module should include a careful look at the code and the +output. To assist in the process, but certainly not as a replacement for it, please +check the following items. + +**Documentation** + +- [ ] the README.md file template is filled out and currently accurate; it is +possible to load and test the code using only the instructions given +- [ ] minimal docstrings (one line describing what the function does) are +included for all functions; full docstrings describing the inputs and expected +outputs should be given for non-trivial functions + +**Structure** + +- [ ] code should use 4 spaces for indentation; other style decisions are +flexible, but be consistent within a module +- [ ] any required metadata files are checked into the repository and placed +within the directory `static` +- [ ] any intermediate files that are created and stored by the module should +be placed in the directory `cache` +- [ ] final expected output files to be uploaded to the API are placed in the +`receiving` directory; output files should not be committed to the repository +- [ ] all options and API keys are passed through the file `params.json` +- [ ] template parameter file (`params.json.template`) is checked into the +code; no personal (i.e., usernames) or private (i.e., API keys) information is +included in this template file + +**Testing** + +- [ ] module can be installed in a new virtual environment +- [ ] pylint with the default `.pylintrc` settings run over the module produces +minimal warnings; warnings that do exist have been confirmed as false positives +- [ ] reasonably high level of unit test coverage covering all of the main logic +of the code (e.g., missing coverage for raised errors that do not currently seem +possible to reach are okay; missing coverage for options that will be needed are +not) +- [ ] all unit tests run without errors diff --git a/doctor_visits/cache/.gitignore b/doctor_visits/cache/.gitignore new file mode 100644 index 000000000..e69de29bb diff --git a/doctor_visits/delphi_doctor_visits/__init__.py b/doctor_visits/delphi_doctor_visits/__init__.py new file mode 100644 index 000000000..0aceb317a --- /dev/null +++ b/doctor_visits/delphi_doctor_visits/__init__.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +"""Module to pull and clean indicators from the Doctor's Visits source. + +This file defines the functions that are made public by the module. As the +module is intended to be executed through the main method, these are primarily +for testing. +""" + +from __future__ import absolute_import + +from . import config +from . import direction +from . import geo_maps +from . import run +from . import sensor +from . import smooth +from . import update_sensor +from . 
import weekday + +__version__ = "0.1.0" diff --git a/doctor_visits/delphi_doctor_visits/__main__.py b/doctor_visits/delphi_doctor_visits/__main__.py new file mode 100644 index 000000000..150856e3a --- /dev/null +++ b/doctor_visits/delphi_doctor_visits/__main__.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- +"""Call the function run_module when executed. + +This file indicates that calling the module (`python -m delphi_doctor_visits`) will +call the function `run_module` found within the run.py file. There should be +no need to change this template. +""" + +from .run import run_module # pragma: no cover + +run_module() # pragma: no cover diff --git a/doctor_visits/delphi_doctor_visits/config.py b/doctor_visits/delphi_doctor_visits/config.py new file mode 100644 index 000000000..5d787efb3 --- /dev/null +++ b/doctor_visits/delphi_doctor_visits/config.py @@ -0,0 +1,41 @@ +""" +This file contains configuration variables used to generate the doctor visits signal. + +Author: Maria +Created: 2020-04-16 +Last modified: 2020-06-17 +""" + +from datetime import datetime, timedelta + + +class Config: + """Static configuration variables. + """ + + # dates + FIRST_DATA_DATE = datetime(2020, 1, 1) + DAY_SHIFT = timedelta(days=1) # shift dates forward for labeling purposes + + # data columns + CLI_COLS = ["Covid_like", "Flu_like", "Mixed"] + FLU1_COL = ["Flu1"] + COUNT_COLS = CLI_COLS + FLU1_COL + ["Denominator"] + DATE_COL = "ServiceDate" + GEO_COL = "PatCountyFIPS" + AGE_COL = "PatAgeGroup" + HRR_COLS = ["Pat HRR Name", "Pat HRR ID"] + ID_COLS = [DATE_COL] + [GEO_COL] + [AGE_COL] + HRR_COLS + FILT_COLS = ID_COLS + COUNT_COLS + DTYPES = {"ServiceDate": str, "PatCountyFIPS": str, + "Denominator": int, "Flu1": int, + "Covid_like": int, "Flu_like": int, + "Mixed": int, "PatAgeGroup": str, + "Pat HRR Name": str, "Pat HRR ID": float} + + SMOOTHER_BANDWIDTH = 100 # bandwidth for the linear left Gaussian filter + MAX_BACKFILL_WINDOW = 7 # maximum number of days used to average a backfill correction + MIN_CUM_VISITS = 500 # need to observe at least 500 counts before averaging + RECENT_LENGTH = 7 # number of days to sum over for sparsity threshold + MIN_RECENT_VISITS = 100 # min numbers of visits needed to include estimate + MIN_RECENT_OBS = 3 # minimum days needed to produce an estimate for latest time diff --git a/doctor_visits/delphi_doctor_visits/direction.py b/doctor_visits/delphi_doctor_visits/direction.py new file mode 100644 index 000000000..b88b42751 --- /dev/null +++ b/doctor_visits/delphi_doctor_visits/direction.py @@ -0,0 +1,53 @@ +""" +Functions used to calculate direction. (Thanks to Addison Hu) + +Author: Maria Jahja +Created: 2020-04-17 + +""" + +import numpy as np + + +def running_mean(s): + """Compute running mean.""" + return np.cumsum(s) / np.arange(1, len(s) + 1) + + +def running_sd(s, mu=None): + """ + Compute running standard deviation. Running mean can be pre-supplied + to save on computation. + """ + if mu is None: + mu = running_mean(s) + sqmu = running_mean(s ** 2) + sd = np.sqrt(sqmu - mu ** 2) + return sd + + +def first_difference_direction(s): + """ + Code taken from Addison Hu. Modified to return directional strings. + Declares "notable" increases and decreases based on the distribution + of past first differences. 
+ + Args: + s: input data + + Returns: Directions in "-1", "0", "+1", or "NA" for first 3 values + """ + T = s[1:] - s[:-1] + mu = running_mean(T) + sd = running_sd(T, mu=mu) + d = np.full(s.shape, "NA") + + for idx in range(2, len(T)): + if T[idx] < min(mu[idx - 1] - sd[idx - 1], 0): + d[idx + 1] = "-1" + elif T[idx] > max(mu[idx - 1] + sd[idx - 1], 0): + d[idx + 1] = "+1" + else: + d[idx + 1] = "0" + + return d diff --git a/doctor_visits/delphi_doctor_visits/geo_maps.py b/doctor_visits/delphi_doctor_visits/geo_maps.py new file mode 100644 index 000000000..b7fc0fbb1 --- /dev/null +++ b/doctor_visits/delphi_doctor_visits/geo_maps.py @@ -0,0 +1,114 @@ +"""Contains geographic mapping tools. + +NOTE: This file is mostly duplicated in the Quidel pipeline; bugs fixed here +should be fixed there as well. + +Author: Maria Jahja +Created: 2020-04-18 +Last modified: 2020-04-30 by Aaron Rumack (add megacounty code) +""" + +from os.path import join + +import pandas as pd +import numpy as np +from delphi_utils.geomap import GeoMapper + +from .config import Config +from .sensor import DoctorVisitsSensor + + +class GeoMaps: + """Class to map counties to other geographic resolutions.""" + + def __init__(self): + self.gmpr = GeoMapper() + + @staticmethod + def convert_fips(x): + """Ensure fips is a string of length 5.""" + return str(x).zfill(5) + + def county_to_msa(self, data): + """Aggregate county data to the msa resolution. + + Args: + data: dataframe aggregated to the daily-county resolution (all 7 cols expected) + + Returns: tuple of dataframe at the daily-msa resolution, and the geo_id column name + """ + data = self.gmpr.add_geocode(data, + "fips", + "msa", + from_col="PatCountyFIPS", + new_col="cbsa_id") + data.drop(columns="PatCountyFIPS", inplace=True) + data = data.groupby(["ServiceDate", "cbsa_id"]).sum().reset_index() + + return data.groupby("cbsa_id"), "cbsa_id" + + def county_to_state(self, data): + """Aggregate county data to the state resolution. + + Args: + data: dataframe aggregated to the daily-county resolution (all 7 cols expected) + + Returns: tuple of dataframe at the daily-state resolution, and geo_id column name + """ + data = self.gmpr.add_geocode(data, + "fips", + "state_id", + from_col="PatCountyFIPS") + data.drop(columns="PatCountyFIPS", inplace=True) + data = data.groupby(["ServiceDate", "state_id"]).sum().reset_index() + + return data.groupby("state_id"), "state_id" + + def county_to_hrr(self, data): + """Aggregate county data to the HRR resolution. + + Note that counties are not strictly contained within HRRs. When a county + spans boundaries, we report it with the same rate in each containing HRR, + but with a sample size weighted by how much it overlaps that HRR. + + Args: + data: dataframe aggregated to the daily-county resolution (all 7 cols expected) + + Returns: + tuple of (data frame at daily-HRR resolution, geo_id column name) + + """ + data = self.gmpr.add_geocode(data, + "fips", + "hrr", + from_col="PatCountyFIPS") + data.drop(columns="PatCountyFIPS", inplace=True) + + ## do a weighted sum by the wpop column to get each HRR's contribution + tmp = data.groupby(["ServiceDate", "hrr"]) + wtsum = lambda g: g["weight"].values @ g[Config.COUNT_COLS] + data = tmp.apply(wtsum).reset_index() + + return data.groupby("hrr"), "hrr" + + def county_to_megacounty(self, data, threshold_visits, threshold_len): + """Convert to megacounty and groupby FIPS using GeoMapper package. 
+ + Args: + data: dataframe aggregated to the daily-county resolution (all 7 cols expected) + threshold_visits: count threshold to determine when to convert to megacounty. + threshold_len: number of days to use when thresholding. + + Returns: tuple of dataframe at the daily-state resolution, and geo_id column name + """ + all_data = self.gmpr.fips_to_megacounty(data, + threshold_visits, + threshold_len, + fips_col="PatCountyFIPS", + thr_col="Denominator", + date_col="ServiceDate") + all_data.rename({"megafips": "PatCountyFIPS"}, axis=1, inplace=True) + megacounties = all_data[all_data.PatCountyFIPS.str.endswith("000")] + data = pd.concat([data, megacounties]) + + return data.groupby("PatCountyFIPS"), "PatCountyFIPS" diff --git a/doctor_visits/delphi_doctor_visits/input/SYNEDI_AGG_OUTPATIENT_18052020_1455CDT.csv.gz b/doctor_visits/delphi_doctor_visits/input/SYNEDI_AGG_OUTPATIENT_18052020_1455CDT.csv.gz new file mode 100644 index 000000000..b2a52a8f9 Binary files /dev/null and b/doctor_visits/delphi_doctor_visits/input/SYNEDI_AGG_OUTPATIENT_18052020_1455CDT.csv.gz differ diff --git a/doctor_visits/delphi_doctor_visits/run.py b/doctor_visits/delphi_doctor_visits/run.py new file mode 100644 index 000000000..63840e4ef --- /dev/null +++ b/doctor_visits/delphi_doctor_visits/run.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +"""Functions to call when running the function. + +This module should contain a function called `run_module`, that is executed +when the module is run with `python -m delphi_doctor_visits`. +""" + +# standard packages +import logging +from datetime import datetime, timedelta +from pathlib import Path + +#  third party +from delphi_utils import read_params + +# first party +from .update_sensor import update_sensor + + +def run_module(): + + params = read_params() + + logging.basicConfig(level=logging.DEBUG) + + ## get end date from input file + # the filename is expected to be in the format: + # "EDI_AGG_OUTPATIENT_DDMMYYYY_HHMM{timezone}.csv.gz" + if params["drop_date"] == "": + dropdate_dt = datetime.strptime( + Path(params["input_file"]).name.split("_")[3], "%d%m%Y" + ) + else: + dropdate_dt = datetime.strptime(params["drop_date"], "%Y-%m-%d") + dropdate = str(dropdate_dt.date()) + + # range of estimates to produce + n_backfill_days = params["n_backfill_days"] # produce estimates for n_backfill_days + n_waiting_days = params["n_waiting_days"] # most recent n_waiting_days won't be est + enddate_dt = dropdate_dt - timedelta(days=n_waiting_days) + startdate_dt = enddate_dt - timedelta(days=n_backfill_days) + enddate = str(enddate_dt.date()) + startdate = str(startdate_dt.date()) + logging.info(f"drop date:\t\t{dropdate}") + logging.info(f"first sensor date:\t{startdate}") + logging.info(f"last sensor date:\t{enddate}") + logging.info(f"n_backfill_days:\t{n_backfill_days}") + logging.info(f"n_waiting_days:\t{n_waiting_days}") + + ## geographies + geos = ["state", "msa", "hrr", "county"] + + ## print out other vars + logging.info("outpath:\t\t%s", params["export_dir"]) + logging.info("parallel:\t\t%s", params["parallel"]) + logging.info(f"weekday:\t\t%s", params["weekday"]) + logging.info(f"write se:\t\t%s", params["se"]) + logging.info(f"obfuscated prefix:\t%s", params["obfuscated_prefix"]) + + ## start generating + for geo in geos: + for weekday in params["weekday"]: + if weekday: + logging.info("starting %s, weekday adj", geo) + else: + logging.info("starting %s, no adj", geo) + update_sensor( + filepath=params["input_file"], + outpath=params["export_dir"], + 
staticpath=params["static_file_dir"], + startdate=startdate, + enddate=enddate, + dropdate=dropdate, + geo=geo, + parallel=params["parallel"], + weekday=weekday, + se=params["se"], + prefix=params["obfuscated_prefix"] + ) + logging.info("finished %s", geo) + + logging.info("finished all") diff --git a/doctor_visits/delphi_doctor_visits/sensor.py b/doctor_visits/delphi_doctor_visits/sensor.py new file mode 100644 index 000000000..f1bc3f26d --- /dev/null +++ b/doctor_visits/delphi_doctor_visits/sensor.py @@ -0,0 +1,243 @@ +""" +Sensor class to fit a signal using CLI counts from doctor visits. + +Author: Maria Jahja +Created: 2020-04-17 + +""" + +# standard packages +import logging + +# third party +import numpy as np +import pandas as pd +from sklearn.preprocessing import MinMaxScaler + +# first party +from .config import Config +from .smooth import left_gauss_linear + + +class DoctorVisitsSensor: + """Sensor class to fit a signal using CLI counts from doctor visits + """ + + @staticmethod + def transform( + sig, h=Config.SMOOTHER_BANDWIDTH, smoother=left_gauss_linear, base=None + ): + """Transform signal by applying a smoother, and/or adjusting by a base. + + Args: + signal: 1D signal to transform + h: smoothing bandwidth + base: signal to adjust arr with + + Returns: smoothed and/or adjusted 1D signal + """ + + scaler = MinMaxScaler(feature_range=(0, 1)) + sc_sig = scaler.fit_transform(sig) + sm_sig = smoother(sc_sig, h) + + if base is not None: + base_scaler = MinMaxScaler(feature_range=(0, 1)) + base = base_scaler.fit_transform(base) + sc_base = smoother(base, h) + sm_sig = np.clip(sm_sig - sc_base, 0, None) + else: + sm_sig = np.clip(sm_sig, 0, None) + + return scaler.inverse_transform(sm_sig) + + @staticmethod + def fill_dates(y_data, dates): + """Ensure all dates are listed in the data, otherwise, add days with 0 counts. + + Args: + y_data: dataframe with datetime index + dates: list of datetime to include + + Returns: dataframe containing all dates given + """ + first_date = dates[0] + last_date = dates[-1] + cols = y_data.columns + + if first_date not in y_data.index: + y_data = y_data.append( + pd.DataFrame(dict.fromkeys(cols, 0.0), columns=cols, index=[first_date]) + ) + if last_date not in y_data.index: + y_data = y_data.append( + pd.DataFrame(dict.fromkeys(cols, 0.0), columns=cols, index=[last_date]) + ) + + y_data.sort_index(inplace=True) + y_data = y_data.asfreq("D", fill_value=0) + return y_data + + @staticmethod + def backfill( + num, + den, + k=Config.MAX_BACKFILL_WINDOW, + min_visits_to_fill=Config.MIN_CUM_VISITS, + min_visits_to_include=Config.MIN_RECENT_VISITS, + min_recent_obs_to_include=Config.MIN_RECENT_OBS, + ): + """ + Adjust for backfill (retroactively added observations) by using a + variable length smoother, which starts from the RHS and moves + leftwards (backwards through time). We cumulatively sum the total + visits (denominator), until we have observed some minimum number of + counts, then calculate the sum over that bin. We restrict the + bin size so to avoid inluding long-past values. 
+ + Args: + num: dataframe of covid counts + den: dataframe of total visits + k: maximum number of days used to average a backfill correction + min_visits_to_fill: minimum number of total visits needed in order to sum a bin + min_visits_to_include: minimum number of total visits needed to include a date + min_recent_obs_to_include: day window where we need to observe at least 1 count + (inclusive) + + Returns: dataframes of adjusted covid counts, adjusted visit counts, inclusion array + """ + revden = den[::-1].values + revnum = num[::-1].values + new_num = np.full_like(num, np.nan, dtype=float) + new_den = np.full_like(den, np.nan, dtype=float) + include = np.full_like(den, True, dtype=bool) + n, p = num.shape + + for i in range(n): + visit_cumsum = revden[i:].cumsum() + + # calculate backfill window + closest_fill_day = np.where(visit_cumsum >= min_visits_to_fill)[0] + if len(closest_fill_day) > 0: + closest_fill_day = min(k, closest_fill_day[0]) + else: + closest_fill_day = k + + if closest_fill_day == 0: + new_den[i] = revden[i] + + for j in range(p): + new_num[i, j] = revnum[i, j] + else: + den_bin = revden[i: (i + closest_fill_day + 1)] + new_den[i] = den_bin.sum() + + for j in range(p): + num_bin = revnum[i: (i + closest_fill_day + 1), j] + new_num[i, j] = num_bin.sum() + + # if we do not observe at least min_visits_to_include in the denominator or + # if we observe 0 counts for min_recent_obs window, don't show. + if (new_den[i] < min_visits_to_include) or ( + revden[i:][:min_recent_obs_to_include].sum() == 0 + ): + include[i] = False + + new_num = new_num[::-1] + new_den = new_den[::-1] + include = include[::-1] + + # reset date index and format + new_num = pd.DataFrame(new_num, columns=num.columns) + new_num.set_index(num.index, inplace=True) + new_den = pd.Series(new_den) + new_den.index = den.index + + return new_num, new_den, include + + @staticmethod + def fit(y_data, + fit_dates, + sensor_dates, + geo_id, + recent_min_visits, + min_recent_obs, + jeffreys): + """Fitting routine. + + Args: + y_data: dataframe for one geo_id, with all 7 cols + fit_dates: list of sorted datetime for which to use as training + sensor_dates: list of sorted datetime for which to produce sensor values + geo_id: unique identifier for the location column + recent_min_visits: location is sparse if it has fewer than min_recent_visits over + days + min_recent_obs: location is sparse also if it has 0 observations in the + last min_recent_obs days + jeffreys: boolean whether to use Jeffreys estimate for binomial proportion, this + is currently only applied if we are writing SEs out. The estimate is + p_hat = (x + 0.5)/(n + 1). 
+ + Returns: dictionary of results + """ + y_data.set_index("ServiceDate", inplace=True) + y_data = DoctorVisitsSensor.fill_dates(y_data, fit_dates) + sensor_idxs = np.where(y_data.index >= sensor_dates[0])[0] + n_dates = y_data.shape[0] + + # combine Flu_like and Mixed columns + y_data["Flu_like_Mixed"] = y_data["Flu_like"] + y_data["Mixed"] + NEW_CLI_COLS = list(set(Config.CLI_COLS) - {"Flu_like", "Mixed"}) + [ + "Flu_like_Mixed"] + + # small backfill correction + total_visits = y_data["Denominator"] + total_counts = y_data[NEW_CLI_COLS + Config.FLU1_COL] + total_counts, total_visits, include = DoctorVisitsSensor.backfill( + total_counts, + total_visits, + min_visits_to_include=recent_min_visits, + min_recent_obs_to_include=min_recent_obs + ) + + # jeffreys inflation + if jeffreys: + total_counts[NEW_CLI_COLS] = total_counts[NEW_CLI_COLS] + 0.5 + total_rates = total_counts.div(total_visits + 1, axis=0) + else: + total_rates = total_counts.div(total_visits, axis=0) + + total_rates.fillna(0, inplace=True) + flu1 = total_rates[Config.FLU1_COL] + new_rates = [] + for code in NEW_CLI_COLS: + code_vals = total_rates[code] + + # if all rates are zero, don't bother + if code_vals.sum() == 0: + if jeffreys: + logging.error("p is 0 even though we used Jefferys estimate") + new_rates.append(np.zeros((n_dates,))) + continue + + # include adjustment for flu like codes + base = flu1 if code in ["Flu_like_Mixed"] else None + fitted_codes = DoctorVisitsSensor.transform( + code_vals.values.reshape(-1, 1), base=base + ) + new_rates.append(fitted_codes.flatten()) + + new_rates = np.array(new_rates).sum(axis=0) + + # cut off at sensor indexes + new_rates = new_rates[sensor_idxs] + include = include[sensor_idxs] + den = total_visits[sensor_idxs].values + + # calculate standard error + se = np.full_like(new_rates, np.nan) + se[include] = np.sqrt( + np.divide((new_rates[include] * (1 - new_rates[include])), den[include])) + + logging.debug(f"{geo_id}: {new_rates[-1]:.3f},[{se[-1]:.3f}]") + return {"geo_id": geo_id, "rate": new_rates, "se": se, "incl": include} diff --git a/doctor_visits/delphi_doctor_visits/smooth.py b/doctor_visits/delphi_doctor_visits/smooth.py new file mode 100644 index 000000000..11564d000 --- /dev/null +++ b/doctor_visits/delphi_doctor_visits/smooth.py @@ -0,0 +1,93 @@ +""" +This file contains various filters used to smooth the 1-d signals. +Code is courtesy of Addison Hu (minor adjustments by Maria). + +Author: Maria Jahja +Created: 2020-04-16 + +""" +import numpy as np + + +def moving_avg(x, y, k=7): + """Smooth the y-values using a rolling window with k observations. + + Args: + x: indexing array of the signal + y: one dimensional signal to smooth + k: the number of observations to average + + Returns: tuple of indexing array, without the first k-1 obs, and smoothed values + """ + + n = len(y) + sy = np.zeros((n - k + 1, 1)) + for i in range(len(sy)): + sy[i] = np.mean(y[i : (i + k)]) + + return x[(k - 1) :], sy + + +def padded_moving_avg(y, k=7): + """Smooth the y-values using a rolling window with k observations. Pad the first k. + + Args: + y: one dimensional signal to smooth. 
+ k: the number of observations to average + + Returns: smoothed values, where the first k-1 obs are padded with 0 + """ + + n = len(y) + sy = np.zeros((n - k + 1, 1)) + for i in range(len(sy)): + sy[i] = np.mean(y[i : (i + k)]) + + # pad first k obs with 0 + for i in range(k - 1): + sy = np.insert(sy, i, 0) + return sy.reshape(-1, 1) + + +def left_gauss(y, h=100): + """Smooth the y-values using a left Gaussian filter. + + Args: + y: one dimensional signal to smooth. + h: smoothing bandwidth (in terms of variance) + + Returns: a smoothed 1D signal. + """ + + t = np.zeros_like(y) + n = len(t) + indices = np.arange(n) + for i in range(1, n): + wts = np.exp(-(((i - 1) - indices[:i]) ** 2) / h) + t[i] = np.dot(wts, y[:i]) / np.sum(wts) + return t + + +def left_gauss_linear(s, h=250): + """Smooth the y-values using a local linear left Gaussian filter. + + Args: + y: one dimensional signal to smooth. + h: smoothing bandwidth (in terms of variance) + + Returns: a smoothed 1D signal. + """ + + n = len(s) + t = np.zeros_like(s) + X = np.vstack([np.ones(n), np.arange(n)]).T + for idx in range(n): + wts = np.exp(-((np.arange(idx + 1) - idx) ** 2) / h) + XwX = np.dot(X[: (idx + 1), :].T * wts, X[: (idx + 1), :]) + Xwy = np.dot(X[: (idx + 1), :].T * wts, s[: (idx + 1)].reshape(-1, 1)) + try: + beta = np.linalg.solve(XwX, Xwy) + t[idx] = np.dot(X[: (idx + 1), :], beta)[-1] + except np.linalg.LinAlgError: + t[idx] = np.nan + return t diff --git a/doctor_visits/delphi_doctor_visits/update_sensor.py b/doctor_visits/delphi_doctor_visits/update_sensor.py new file mode 100644 index 000000000..5a257ad68 --- /dev/null +++ b/doctor_visits/delphi_doctor_visits/update_sensor.py @@ -0,0 +1,236 @@ +""" +Generate doctor-visits sensors. + +Author: Maria Jahja +Created: 2020-04-18 +Modified: + - 2020-04-30: Aaron Rumack (add megacounty code) + - 2020-05-06: Aaron and Maria (weekday effects/other adjustments) +""" + +# standard packages +import logging +from datetime import timedelta +from multiprocessing import Pool, cpu_count + +# third party +import numpy as np +import pandas as pd + +# first party +from .config import Config +from .geo_maps import GeoMaps +from .sensor import DoctorVisitsSensor +from .weekday import Weekday + + +def write_to_csv(output_dict, se, out_name, output_path="."): + """Write sensor values to csv. 
+ + Args: + output_dict: dictionary containing sensor rates, se, unique dates, and unique geo_id + se: boolean to write out standard errors, if true, use an obfuscated name + out_name: name of the output file + output_path: outfile path to write the csv (default is current directory) + """ + + if se: + logging.info(f"========= WARNING: WRITING SEs TO {out_name} =========") + + geo_level = output_dict["geo_level"] + dates = output_dict["dates"] + geo_ids = output_dict["geo_ids"] + all_rates = output_dict["rates"] + all_se = output_dict["se"] + all_include = output_dict["include"] + + out_n = 0 + for i, d in enumerate(dates): + filename = "%s/%s_%s_%s.csv" % (output_path, + (d + Config.DAY_SHIFT).strftime("%Y%m%d"), + geo_level, + out_name) + + with open(filename, "w") as outfile: + outfile.write("geo_id,val,se,direction,sample_size\n") + + for geo_id in geo_ids: + sensor = all_rates[geo_id][i] * 100 # report percentage + se_val = all_se[geo_id][i] * 100 + + if all_include[geo_id][i]: + assert not np.isnan(sensor), "sensor value is nan, check pipeline" + assert sensor < 90, f"strangely high percentage {geo_id, sensor}" + if not np.isnan(se_val): + assert se_val < 5, f"standard error suspiciously high! investigate {geo_id}" + + if se: + assert sensor > 0 and se_val > 0, "p=0, std_err=0 invalid" + outfile.write( + "%s,%f,%s,%s,%s\n" % (geo_id, sensor, se_val, "NA", "NA")) + else: + # for privacy reasons we will not report the standard error + outfile.write( + "%s,%f,%s,%s,%s\n" % (geo_id, sensor, "NA", "NA", "NA")) + out_n += 1 + logging.debug(f"wrote {out_n} rows for {len(geo_ids)} {geo_level}") + + +def update_sensor( + filepath, outpath, staticpath, startdate, enddate, dropdate, geo, parallel, + weekday, se, prefix=None +): + """Generate sensor values, and write to csv format. + + Args: + filepath: path to the aggregated doctor-visits data + outpath: output path for the csv results + staticpath: path for the static geographic files + startdate: first sensor date (YYYY-mm-dd) + enddate: last sensor date (YYYY-mm-dd) + dropdate: data drop date (YYYY-mm-dd) + geo: geographic resolution, one of ["county", "state", "msa", "hrr"] + parallel: boolean to run the sensor update in parallel + weekday: boolean to adjust for weekday effects + se: boolean to write out standard errors, if true, use an obfuscated name + prefix: string to prefix to output files (used for obfuscation in producing SEs) + """ + + # as of 2020-05-11, input file expected to have 10 columns + # id cols: ServiceDate, PatCountyFIPS, PatAgeGroup, Pat HRR ID/Pat HRR Name + # value cols: Denominator, Covid_like, Flu_like, Flu1, Mixed + data = pd.read_csv( + filepath, + usecols=Config.FILT_COLS, + dtype=Config.DTYPES, + parse_dates=[Config.DATE_COL], + ) + assert ( + np.sum(data.duplicated(subset=Config.ID_COLS)) == 0 + ), "Duplicated data! 
Check the input file" + + # drop HRR columns - unused for now since we assign HRRs by FIPS + data.drop(columns=Config.HRR_COLS, inplace=True) + data.dropna(inplace=True) # drop rows with any missing entries + + # aggregate age groups (so data is unique by service date and FIPS) + data = data.groupby([Config.DATE_COL, Config.GEO_COL]).sum().reset_index() + assert np.sum(data.duplicated()) == 0, "Duplicates after age group aggregation" + assert (data[Config.COUNT_COLS] >= 0).all().all(), "Counts must be nonnegative" + + ## collect dates + # restrict to training start and end date + drange = lambda s, e: np.array([s + timedelta(days=x) for x in range((e - s).days)]) + startdate = pd.to_datetime(startdate) - Config.DAY_SHIFT + burnindate = startdate - Config.DAY_SHIFT + enddate = pd.to_datetime(enddate) + dropdate = pd.to_datetime(dropdate) + assert startdate > Config.FIRST_DATA_DATE, "Start date <= first day of data" + assert startdate < enddate, "Start date >= end date" + assert enddate <= dropdate, "End date > drop date" + data = data[(data[Config.DATE_COL] >= Config.FIRST_DATA_DATE) & \ + (data[Config.DATE_COL] < dropdate)] + fit_dates = drange(Config.FIRST_DATA_DATE, dropdate) + burn_in_dates = drange(burnindate, dropdate) + sensor_dates = drange(startdate, enddate) + final_sensor_idxs = np.where( + (burn_in_dates >= startdate) & (burn_in_dates <= enddate)) + + # handle if we need to adjust by weekday + params = Weekday.get_params(data) if weekday else None + + # handle explicitly if we need to use Jeffreys estimate for binomial proportions + jeffreys = True if se else False + + # get right geography + geo_map = GeoMaps() + if geo.lower() == "county": + data_groups, _ = geo_map.county_to_megacounty( + data, Config.MIN_RECENT_VISITS, Config.RECENT_LENGTH + ) + elif geo.lower() == "state": + data_groups, _ = geo_map.county_to_state(data) + elif geo.lower() == "msa": + data_groups, _ = geo_map.county_to_msa(data) + elif geo.lower() == "hrr": + data_groups, _ = geo_map.county_to_hrr(data) + else: + logging.error(f"{geo} is invalid, pick one of 'county', 'state', 'msa', 'hrr'") + return False + unique_geo_ids = list(data_groups.groups.keys()) + + # run sensor fitting code (maybe in parallel) + sensor_rates = {} + sensor_se = {} + sensor_include = {} + if not parallel: + for geo_id in unique_geo_ids: + sub_data = data_groups.get_group(geo_id).copy() + if weekday: + sub_data = Weekday.calc_adjustment(params, sub_data) + + res = DoctorVisitsSensor.fit( + sub_data, + fit_dates, + burn_in_dates, + geo_id, + Config.MIN_RECENT_VISITS, + Config.MIN_RECENT_OBS, + jeffreys + ) + sensor_rates[geo_id] = res["rate"][final_sensor_idxs] + sensor_se[geo_id] = res["se"][final_sensor_idxs] + sensor_include[geo_id] = res["incl"][final_sensor_idxs] + + else: + n_cpu = min(10, cpu_count()) + logging.debug(f"starting pool with {n_cpu} workers") + + with Pool(n_cpu) as pool: + pool_results = [] + for geo_id in unique_geo_ids: + sub_data = data_groups.get_group(geo_id).copy() + if weekday: + sub_data = Weekday.calc_adjustment(params, sub_data) + + pool_results.append( + pool.apply_async( + DoctorVisitsSensor.fit, + args=( + sub_data, + fit_dates, + burn_in_dates, + geo_id, + Config.MIN_RECENT_VISITS, + Config.MIN_RECENT_OBS, + jeffreys, + ), + ) + ) + pool_results = [proc.get() for proc in pool_results] + + for res in pool_results: + geo_id = res["geo_id"] + sensor_rates[geo_id] = res["rate"][final_sensor_idxs] + sensor_se[geo_id] = res["se"][final_sensor_idxs] + sensor_include[geo_id] = 
res["incl"][final_sensor_idxs] + + unique_geo_ids = list(sensor_rates.keys()) + output_dict = { + "rates": sensor_rates, + "se": sensor_se, + "dates": sensor_dates, + "geo_ids": unique_geo_ids, + "geo_level": geo, + "include": sensor_include, + } + + # write out results + out_name = "smoothed_adj_cli" if weekday else "smoothed_cli" + if se: + assert prefix is not None, "template has no obfuscated prefix" + out_name = prefix + "_" + out_name + + write_to_csv(output_dict, se, out_name, outpath) + logging.debug(f"wrote files to {outpath}") + return True diff --git a/doctor_visits/delphi_doctor_visits/weekday.py b/doctor_visits/delphi_doctor_visits/weekday.py new file mode 100644 index 000000000..0c1bb3572 --- /dev/null +++ b/doctor_visits/delphi_doctor_visits/weekday.py @@ -0,0 +1,129 @@ +""" +Weekday effects (code from Aaron Rumack). + +Created: 2020-05-06 +""" + +# third party +import cvxpy as cp +import numpy as np + +# first party +from .config import Config + + +class Weekday: + """Class to handle weekday effects.""" + + @staticmethod + def get_params(data): + """Correct a signal estimated as numerator/denominator for weekday effects. + + The ordinary estimate would be numerator_t/denominator_t for each time point + t. Instead, model + + log(numerator_t/denominator_t) = alpha_{wd(t)} + phi_t + + where alpha is a vector of fixed effects for each weekday. For + identifiability, we constrain \sum alpha_j = 0, and to enforce this we set + Sunday's fixed effect to be the negative sum of the other weekdays. + + We estimate this as a penalized Poisson GLM problem with log link. We + rewrite the problem as + + log(numerator_t) = alpha_{wd(t)} + phi_t + log(denominator_t) + + and set a design matrix X with one row per time point. The first six columns + of X are weekday indicators; the remaining columns are the identity matrix, + so that each time point gets a unique phi. Using this X, we write + + log(numerator_t) = X beta + log(denominator_t) + + Hence the first six entries of beta correspond to alpha, and the remaining + entries to phi. + + The penalty is on the L1 norm of third differences of phi (so the third + differences of the corresponding columns of beta), to enforce smoothness. + Third differences ensure smoothness without removing peaks or valleys. + + Objective function is negative mean Poisson log likelihood plus penalty: + + ll = (numerator * (X*b + log(denominator)) - sum(exp(X*b) + log(denominator))) + / num_days + + Return a matrix of parameters: the entire vector of betas, for each time + series column in the data. + """ + + denoms = data.groupby(Config.DATE_COL).sum()["Denominator"] + nums = data.groupby(Config.DATE_COL).sum()[Config.CLI_COLS + Config.FLU1_COL] + + # Construct design matrix to have weekday indicator columns and then day + # indicators. + X = np.zeros((nums.shape[0], 6 + nums.shape[0])) + not_sunday = np.where(nums.index.dayofweek != 6)[0] + X[not_sunday, np.array(nums.index.dayofweek)[not_sunday]] = 1 + X[np.where(nums.index.dayofweek == 6)[0], :6] = -1 + X[:, 6:] = np.eye(X.shape[0]) + + npnums, npdenoms = np.array(nums), np.array(denoms) + params = np.zeros((nums.shape[1], X.shape[1])) + + # Loop over the available numerator columns and smooth each separately. 
+ for i in range(nums.shape[1]): + b = cp.Variable((X.shape[1])) + + lmbda = cp.Parameter(nonneg=True) + lmbda.value = 10 # Hard-coded for now, seems robust to changes + + ll = ( + cp.matmul(npnums[:, i], cp.matmul(X, b) + np.log(npdenoms)) + - cp.sum(cp.exp(cp.matmul(X, b) + np.log(npdenoms))) + ) / X.shape[0] + penalty = ( + lmbda * cp.norm(cp.diff(b[6:], 3), 1) / (X.shape[0] - 2) + ) # L-1 Norm of third differences, rewards smoothness + try: + prob = cp.Problem(cp.Minimize(-ll + lmbda * penalty)) + _ = prob.solve() + except: + # If the magnitude of the objective function is too large, an error is + # thrown; Rescale the objective function + prob = cp.Problem(cp.Minimize((-ll + lmbda * penalty) / 1e5)) + _ = prob.solve() + params[i, :] = b.value + + return params + + @staticmethod + def calc_adjustment(params, sub_data): + """Apply the weekday adjustment to a specific time series. + + Extracts the weekday fixed effects from the parameters and uses these to + adjust the time series. + + Since + + log(numerator_t / denominator_t) = alpha_{wd(t)} + phi_t, + + we have that + + numerator_t / denominator_t = exp(alpha_{wd(t)}) exp(phi_t) + + and can divide by exp(alpha_{wd(t)}) to get a weekday-corrected ratio. In + this case, we only divide the numerator, leaving the denominator unchanged + -- this has the same effect. + + """ + + for i, c in enumerate(Config.CLI_COLS + Config.FLU1_COL): + wd_correction = np.zeros((len(sub_data[c]))) + + for wd in range(7): + mask = sub_data[Config.DATE_COL].dt.dayofweek == wd + wd_correction[mask] = sub_data.loc[mask, c] / ( + np.exp(params[i, wd]) if wd < 6 else np.exp(-np.sum(params[i, :6])) + ) + sub_data.loc[:, c] = wd_correction + + return sub_data diff --git a/doctor_visits/input/SYNEDI_AGG_OUTPATIENT_18052020_1455CDT.csv.gz b/doctor_visits/input/SYNEDI_AGG_OUTPATIENT_18052020_1455CDT.csv.gz new file mode 100644 index 000000000..b2a52a8f9 Binary files /dev/null and b/doctor_visits/input/SYNEDI_AGG_OUTPATIENT_18052020_1455CDT.csv.gz differ diff --git a/doctor_visits/params.json.template b/doctor_visits/params.json.template new file mode 100644 index 000000000..f18e0b696 --- /dev/null +++ b/doctor_visits/params.json.template @@ -0,0 +1,12 @@ +{ + "static_file_dir": "./static", + "export_dir": "./receiving", + "input_file": "./input/SYNEDI_AGG_OUTPATIENT_18052020_1455CDT.csv.gz", + "drop_date": "", + "n_backfill_days": 60, + "n_waiting_days": 3, + "weekday": [true, false], + "se": false, + "obfuscated_prefix": "wip_XXXXX", + "parallel": false +} diff --git a/doctor_visits/receiving/.gitignore b/doctor_visits/receiving/.gitignore new file mode 100644 index 000000000..e69de29bb diff --git a/doctor_visits/setup.py b/doctor_visits/setup.py new file mode 100644 index 000000000..c9e5c6c4f --- /dev/null +++ b/doctor_visits/setup.py @@ -0,0 +1,29 @@ +from setuptools import setup +from setuptools import find_packages + +required = [ + "numpy", + "pandas", + "sklearn", + "cvxpy", + "pytest", + "pytest-cov", + "pylint", + "delphi-utils" +] + +setup( + name="delphi_doctor_visits", + version="0.1.0", + description="Parse information for the doctors visits indicator", + author="", + author_email="", + url="https://github.com/cmu-delphi/covidcast-indicators", + install_requires=required, + classifiers=[ + "Development Status :: 5 - Production/Stable", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3.7", + ], + packages=find_packages(), +) diff --git a/doctor_visits/static/.gitignore b/doctor_visits/static/.gitignore new file mode 
100644 index 000000000..e69de29bb diff --git a/doctor_visits/tests/params.json.template b/doctor_visits/tests/params.json.template new file mode 100644 index 000000000..6142736a5 --- /dev/null +++ b/doctor_visits/tests/params.json.template @@ -0,0 +1,8 @@ +{ + "static_file_dir": "./static", + "export_dir": "./receiving", + "input_file": "./input/SYNEDI_AGG_OUTPATIENT_18052020_1455CDT.csv.gz", + "start_date": "2020-01-01", + "end_date": "", + "parallel": false +} diff --git a/doctor_visits/tests/test_classconfig.py b/doctor_visits/tests/test_classconfig.py new file mode 100644 index 000000000..ec167e74b --- /dev/null +++ b/doctor_visits/tests/test_classconfig.py @@ -0,0 +1,13 @@ +import pytest + +from delphi_doctor_visits.config import Config + + +class TestConfigValues: + def test_values(self): + + conf = Config() + + assert conf.CLI_COLS == ["Covid_like", "Flu_like", "Mixed"] + assert conf.MIN_RECENT_VISITS == 100 + assert conf.MIN_RECENT_OBS == 3 diff --git a/doctor_visits/tests/test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz b/doctor_visits/tests/test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz new file mode 100644 index 000000000..6aee227b9 Binary files /dev/null and b/doctor_visits/tests/test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz differ diff --git a/doctor_visits/tests/test_direction.py b/doctor_visits/tests/test_direction.py new file mode 100644 index 000000000..e400390b9 --- /dev/null +++ b/doctor_visits/tests/test_direction.py @@ -0,0 +1,33 @@ +import pytest + +import numpy as np + +from delphi_doctor_visits.direction import ( + running_mean, + running_sd, + first_difference_direction, +) + + +class TestDirection: + def test_running_mean(self): + + input = np.array([1, 2, 3, 4]) + output = running_mean(input) + assert (output == np.array([1, 1.5, 2, 2.5])).all() + + def test_running_sd(self): + + input = np.array([1, 2, 3, 4]) + output = running_sd(input) + assert np.max(np.abs(output ** 2 - np.array([0, 0.25, 2 / 3, 1.25]))) < 1e-8 + + def test_first_difference_direction(self): + + input = np.array([1, 2, 3, 2, 3]) + output = first_difference_direction(input) + assert (output == np.array(["NA", "NA", "NA", "-1", "0"])).all() + + input = np.array([1, 2, 3, 4, 6]) + output = first_difference_direction(input) + assert (output == np.array(["NA", "NA", "NA", "0", "+1"])).all() diff --git a/doctor_visits/tests/test_geomap.py b/doctor_visits/tests/test_geomap.py new file mode 100644 index 000000000..1ee054016 --- /dev/null +++ b/doctor_visits/tests/test_geomap.py @@ -0,0 +1,58 @@ +import pytest + +import pandas as pd + +from delphi_doctor_visits.geo_maps import GeoMaps +from delphi_doctor_visits.config import Config + +CONFIG = Config() +DATA = pd.read_csv( + "test_data/SYNEDI_AGG_OUTPATIENT_07022020_1455CDT.csv.gz", + usecols=CONFIG.FILT_COLS, + dtype=CONFIG.DTYPES, + parse_dates=[CONFIG.DATE_COL], + nrows=9, +) + +GM = GeoMaps() + + +class TestGeoMap: + def test_convert_fips(self): + + assert GM.convert_fips("23") == "00023" + + def test_county_to_msa(self): + + out, name = GM.county_to_msa(DATA) + assert name == "cbsa_id" + assert set(out.groups.keys()) == {"11500", "13820", "19300", "33860"} + + def test_county_to_state(self): + + out, name = GM.county_to_state(DATA) + assert name == "state_id" + assert set(out.groups.keys()) == {"al"} + + def test_county_to_hrr(self): + + out, name = GM.county_to_hrr(DATA) + assert name == "hrr" + assert set(out.groups.keys()) == {"1", "134", "144", "146", "2", "5", "6", "7", "9"} + + def test_county_to_megacounty(self): 
+ + out, name = GM.county_to_megacounty(DATA, 100000, 10) + assert name == "PatCountyFIPS" + assert set(out.groups.keys()) == { + "01001", + "01003", + "01005", + "01007", + "01009", + "01011", + "01013", + "01015", + "01017", + "01000" + } \ No newline at end of file diff --git a/doctor_visits/tests/test_run.py b/doctor_visits/tests/test_run.py new file mode 100644 index 000000000..cddd48d37 --- /dev/null +++ b/doctor_visits/tests/test_run.py @@ -0,0 +1,11 @@ +import pytest + +import pandas as pd + +from delphi_doctor_visits.run import run_module + + +class TestRun: + def todo_test_run(self): + + run_module()
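
A possible follow-up for the `todo_test_run` placeholder above: the sketch below is not part of this diff, but shows one way the placeholder could be turned into a collected test. It assumes a fully filled-in `params.json` (modeled on the top-level `params.json.template`) and the input file it references are available in the directory where the test runs.

```python
import os

from delphi_utils import read_params
from delphi_doctor_visits.run import run_module


class TestRun:
    def test_run(self):
        # hypothetical completion of todo_test_run: requires a complete
        # params.json and its input file in the test working directory
        params = read_params()
        run_module()

        # run_module should leave at least one dated CSV in the export directory
        exported = [f for f in os.listdir(params["export_dir"]) if f.endswith(".csv")]
        assert len(exported) > 0
```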