CLI signal for CHNG #587

Merged
merged 8 commits into from
Dec 2, 2020
Changes from 5 commits
20 changes: 17 additions & 3 deletions changehc/delphi_changehc/config.py
@@ -25,15 +25,29 @@ class Config:
## data columns
COVID_COL = "COVID"
DENOM_COL = "Denominator"
COUNT_COLS = ["COVID"] + ["Denominator"]
FLU_COL = "Flu"
MIXED_COL = "Mixed"
FLU_LIKE_COL = "Flu-like"
COVID_LIKE_COL = "Covid-like"
COUNT_COLS = [COVID_COL,DENOM_COL,FLU_COL,MIXED_COL,FLU_LIKE_COL,COVID_LIKE_COL]
DATE_COL = "date"
GEO_COL = "fips"
ID_COLS = [DATE_COL] + [GEO_COL]
FILT_COLS = ID_COLS + COUNT_COLS

DENOM_COLS = [GEO_COL, DATE_COL, DENOM_COL]
COVID_COLS = [GEO_COL, DATE_COL, COVID_COL]
DENOM_DTYPES = {"date": str, "Denominator": str, "fips": str}
COVID_DTYPES = {"date": str, "COVID": str, "fips": str}
FLU_COLS = [GEO_COL, DATE_COL, FLU_COL]
MIXED_COLS = [GEO_COL, DATE_COL, MIXED_COL]
FLU_LIKE_COLS = [GEO_COL, DATE_COL, FLU_LIKE_COL]
COVID_LIKE_COLS = [GEO_COL, DATE_COL, COVID_LIKE_COL]

DENOM_DTYPES = {DATE_COL: str, DENOM_COL: str, GEO_COL: str}
COVID_DTYPES = {DATE_COL: str, COVID_COL: str, GEO_COL: str}
FLU_DTYPES = {DATE_COL: str, FLU_COL: str, GEO_COL: str}
MIXED_DTYPES = {DATE_COL: str, MIXED_COL: str, GEO_COL: str}
FLU_LIKE_DTYPES = {DATE_COL: str, FLU_LIKE_COL: str, GEO_COL: str}
COVID_LIKE_DTYPES = {DATE_COL: str, COVID_LIKE_COL: str, GEO_COL: str}

SMOOTHER_BANDWIDTH = 100 # bandwidth for the linear left Gaussian filter
MIN_DEN = 100 # number of total visits needed to produce a sensor
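
For reference, a minimal sketch of how each stream's column list and dtype map from the new Config pair up when reading a raw pipe-delimited count file; the local file name here is a placeholder:

import pandas as pd
from delphi_changehc.config import Config

# Hypothetical local copy of the flu stream; the real loader passes these
# same constants into load_chng_data (see load_data.py below).
flu = pd.read_csv("Flu_Patient_Count_By_County.dat", sep="|", header=None,
                  names=Config.FLU_COLS, dtype=Config.FLU_DTYPES)
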
4 changes: 3 additions & 1 deletion changehc/delphi_changehc/constants.py
@@ -1,7 +1,9 @@
"""Registry for signal names and geo types"""
SMOOTHED = "smoothed_outpatient_covid"
SMOOTHED_ADJ = "smoothed_adj_outpatient_covid"
SIGNALS = [SMOOTHED, SMOOTHED_ADJ]
SMOOTHED_CLI = "smoothed_outpatient_cli"
SMOOTHED_ADJ_CLI = "smoothed_adj_outpatient_cli"
SIGNALS = [SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI]
NA = "NA"
HRR = "hrr"
FIPS = "fips"
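
A quick sketch of what the expanded registry exports after this change, with the values copied from the diff above:

from delphi_changehc.constants import SIGNALS

# Both the covid and the new cli variants, smoothed and adjusted:
assert SIGNALS == ["smoothed_outpatient_covid", "smoothed_adj_outpatient_covid",
                   "smoothed_outpatient_cli", "smoothed_adj_outpatient_cli"]
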
42 changes: 40 additions & 2 deletions changehc/delphi_changehc/download_ftp_files.py
@@ -44,8 +44,8 @@ def get_files_from_dir(sftp, out_path):
sftp.get(infile, outfile, callback=callback_for_filename)


def download(out_path, ftp_conn):
"""Downloads files necessary to create CHC signal from ftp server.
def download_covid(out_path, ftp_conn):
"""Downloads files necessary to create chng-covid signal from ftp server.
Args:
out_path: Path to local directory into which to download the files
ftp_conn: Dict containing login credentials to ftp server
@@ -71,3 +71,41 @@ def download(out_path, ftp_conn):
finally:
if client:
client.close()


def download_cli(out_path, ftp_conn):
"""Downloads files necessary to create chng-cli signal from ftp server.
Args:
out_path: Path to local directory into which to download the files
ftp_conn: Dict containing login credentials to ftp server
"""

# open client
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

client.connect(ftp_conn["host"], username=ftp_conn["user"],
password=ftp_conn["pass"],
port=ftp_conn["port"],
allow_agent=False, look_for_keys=False)
sftp = client.open_sftp()

sftp.chdir('/dailycounts/All_Outpatients_By_County')
get_files_from_dir(sftp, out_path)

sftp.chdir('/dailycounts/Flu_Patient_Count_By_County')
get_files_from_dir(sftp, out_path)

sftp.chdir('/dailycounts/Mixed_Patient_Count_By_County')
get_files_from_dir(sftp, out_path)

sftp.chdir('/dailycounts/Flu_Like_Patient_Count_By_County')
get_files_from_dir(sftp, out_path)

sftp.chdir('/dailycounts/Covid_Like_Patient_Count_By_County')
get_files_from_dir(sftp, out_path)

finally:
if client:
client.close()
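
A minimal usage sketch for the new downloader; the ftp_conn keys are the ones read inside the function, while the values and the output path are placeholders:

from delphi_changehc.download_ftp_files import download_cli

ftp_conn = {"host": "sftp.example.com",  # placeholder host
            "user": "chng_user", "pass": "********", "port": 22}
download_cli("./cache", ftp_conn)  # pulls all five daily-count directories
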
155 changes: 83 additions & 72 deletions changehc/delphi_changehc/load_data.py
@@ -12,116 +12,114 @@
from .config import Config


def load_denom_data(denom_filepath, dropdate, base_geo):
"""Load in and set up denominator data.
def load_chng_data(filepath, dropdate, base_geo,
col_names, col_types, counts_col):
"""Load in and set up daily count data from Change.

Args:
denom_filepath: path to the aggregated denominator data
filepath: path to aggregated data
dropdate: data drop date (datetime object)
base_geo: base geographic unit before aggregation ('fips')
col_names: column names of data
col_types: column types of data
counts_col: name of column containing counts

Returns:
cleaned denominator dataframe
cleaned dataframe
"""
assert base_geo == "fips", "base unit must be 'fips'"

denom_suffix = denom_filepath.split("/")[-1].split(".")[0][9:]
assert denom_suffix == "All_Outpatients_By_County"
denom_filetype = denom_filepath.split("/")[-1].split(".")[1]
assert denom_filetype == "dat"

denom_data = pd.read_csv(
denom_filepath,
assert base_geo == "fips", "base unit must be 'fips'"
count_flag = False
date_flag = False
geo_flag = False
for n in col_names:
if n == counts_col:
count_flag = True
elif n == Config.DATE_COL:
date_flag = True
elif n == "fips":
geo_flag = True
assert count_flag, "counts_col must be present in col_names"
assert date_flag, "'%s' must be present in col_names"%(Config.DATE_COL)
assert geo_flag, "'fips' must be present in col_names"

data = pd.read_csv(
filepath,
sep="|",
header=None,
names=Config.DENOM_COLS,
dtype=Config.DENOM_DTYPES,
names=col_names,
dtype=col_types,
)

denom_data[Config.DATE_COL] = \
pd.to_datetime(denom_data[Config.DATE_COL],errors="coerce")
data[Config.DATE_COL] = \
pd.to_datetime(data[Config.DATE_COL],errors="coerce")

# restrict to start and end date
denom_data = denom_data[
(denom_data[Config.DATE_COL] >= Config.FIRST_DATA_DATE) &
(denom_data[Config.DATE_COL] < dropdate)
data = data[
(data[Config.DATE_COL] >= Config.FIRST_DATA_DATE) &
(data[Config.DATE_COL] < dropdate)
]

# counts between 1 and 3 are coded as "3 or less", we convert to 1
denom_data[Config.DENOM_COL][
denom_data[Config.DENOM_COL] == "3 or less"
data[counts_col][
data[counts_col] == "3 or less"
] = "1"
denom_data[Config.DENOM_COL] = denom_data[Config.DENOM_COL].astype(int)
data[counts_col] = data[counts_col].astype(int)

assert (
(denom_data[Config.DENOM_COL] >= 0).all().all()
), "Denominator counts must be nonnegative"
(data[counts_col] >= 0).all().all()
), "Counts must be nonnegative"

# aggregate age groups (so data is unique by date and base geography)
denom_data = denom_data.groupby([base_geo, Config.DATE_COL]).sum()
denom_data.dropna(inplace=True) # drop rows with any missing entries
data = data.groupby([base_geo, Config.DATE_COL]).sum()
data.dropna(inplace=True) # drop rows with any missing entries

return data

return denom_data

def load_covid_data(covid_filepath, dropdate, base_geo):
"""Load in and set up denominator data.
def load_combined_data(denom_filepath, covid_filepath, dropdate, base_geo):
"""Load in denominator and covid data, and combine them.

Args:
denom_filepath: path to the aggregated denominator data
covid_filepath: path to the aggregated covid data
dropdate: data drop date (datetime object)
base_geo: base geographic unit before aggregation ('fips')

Returns:
cleaned denominator dataframe
combined multiindexed dataframe, index 0 is geo_base, index 1 is date
"""
assert base_geo == "fips", "base unit must be 'fips'"

covid_suffix = covid_filepath.split("/")[-1].split(".")[0][9:]
assert covid_suffix == "Covid_Outpatients_By_County"
covid_filetype = covid_filepath.split("/")[-1].split(".")[1]
assert covid_filetype == "dat"

covid_data = pd.read_csv(
covid_filepath,
sep="|",
header=None,
names=Config.COVID_COLS,
dtype=Config.COVID_DTYPES,
parse_dates=[Config.DATE_COL]
)

covid_data[Config.DATE_COL] = \
pd.to_datetime(covid_data[Config.DATE_COL],errors="coerce")

# restrict to start and end date
covid_data = covid_data[
(covid_data[Config.DATE_COL] >= Config.FIRST_DATA_DATE) &
(covid_data[Config.DATE_COL] < dropdate)
]

# counts between 1 and 3 are coded as "3 or less", we convert to 1
covid_data[Config.COVID_COL][
covid_data[Config.COVID_COL] == "3 or less"
] = "1"
covid_data[Config.COVID_COL] = covid_data[Config.COVID_COL].astype(int)
# load each data stream
denom_data = load_chng_data(denom_filepath, dropdate, base_geo,
Config.DENOM_COLS, Config.DENOM_DTYPES, Config.DENOM_COL)
covid_data = load_chng_data(covid_filepath, dropdate, base_geo,
Config.COVID_COLS, Config.COVID_DTYPES, Config.COVID_COL)

assert (
(covid_data[Config.COVID_COL] >= 0).all().all()
), "COVID counts must be nonnegative"
# merge data
data = denom_data.merge(covid_data, how="outer", left_index=True, right_index=True)
assert data.isna().all(axis=1).sum() == 0, "entire row is NA after merge"

# aggregate age groups (so data is unique by date and base geography)
covid_data = covid_data.groupby([base_geo, Config.DATE_COL]).sum()
covid_data.dropna(inplace=True) # drop rows with any missing entries
# calculate combined numerator and denominator
data.fillna(0, inplace=True)
data["num"] = data[Config.COVID_COL]
data["den"] = data[Config.DENOM_COL]
data = data[["num", "den"]]

return covid_data
return data


def load_combined_data(denom_filepath, covid_filepath, dropdate, base_geo):
"""Load in denominator and covid data, and combine them.
def load_cli_data(denom_filepath, flu_filepath, mixed_filepath, flu_like_filepath,
covid_like_filepath, dropdate, base_geo):
"""Load in denominator and covid-like data, and combine them.

Args:
denom_filepath: path to the aggregated denominator data
covid_filepath: path to the aggregated covid data
flu_filepath: path to the aggregated flu data
mixed_filepath: path to the aggregated mixed data
flu_like_filepath: path to the aggregated flu-like data
covid_like_filepath: path to the aggregated covid-like data
dropdate: data drop date (datetime object)
base_geo: base geographic unit before aggregation ('fips')

@@ -131,16 +129,29 @@ def load_combined_data(denom_filepath, covid_filepath, dropdate, base_geo):
assert base_geo == "fips", "base unit must be 'fips'"

# load each data stream
denom_data = load_denom_data(denom_filepath, dropdate, base_geo)
covid_data = load_covid_data(covid_filepath, dropdate, base_geo)
denom_data = load_chng_data(denom_filepath, dropdate, base_geo,
Config.DENOM_COLS, Config.DENOM_DTYPES, Config.DENOM_COL)
flu_data = load_chng_data(flu_filepath, dropdate, base_geo,
Config.FLU_COLS, Config.FLU_DTYPES, Config.FLU_COL)
mixed_data = load_chng_data(mixed_filepath, dropdate, base_geo,
Config.MIXED_COLS, Config.MIXED_DTYPES, Config.MIXED_COL)
flu_like_data = load_chng_data(flu_like_filepath, dropdate, base_geo,
Config.FLU_LIKE_COLS, Config.FLU_LIKE_DTYPES, Config.FLU_LIKE_COL)
covid_like_data = load_chng_data(covid_like_filepath, dropdate, base_geo,
Config.COVID_LIKE_COLS, Config.COVID_LIKE_DTYPES, Config.COVID_LIKE_COL)

# merge data
data = denom_data.merge(covid_data, how="outer", left_index=True, right_index=True)
data = denom_data.merge(flu_data, how="outer", left_index=True, right_index=True)
data = data.merge(mixed_data, how="outer", left_index=True, right_index=True)
data = data.merge(flu_like_data, how="outer", left_index=True, right_index=True)
data = data.merge(covid_like_data, how="outer", left_index=True, right_index=True)
assert data.isna().all(axis=1).sum() == 0, "entire row is NA after merge"

# calculate combined numerator and denominator
data.fillna(0, inplace=True)
data["num"] = data[Config.COVID_COL]
data["num"] = -data[Config.FLU_COL] + data[Config.MIXED_COL] + data[Config.FLU_LIKE_COL]
data["num"] = data["num"].clip(lower=0)
data["num"] = data["num"] + data[Config.COVID_LIKE_COL]
data["den"] = data[Config.DENOM_COL]
data = data[["num", "den"]]

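
A minimal sketch of calling the new generic loader, which replaces the separate load_denom_data and load_covid_data; the file path and drop date are placeholders:

from datetime import datetime
from delphi_changehc.config import Config
from delphi_changehc.load_data import load_chng_data

dropdate = datetime(2020, 12, 2)  # placeholder drop date
# One call per stream; the (COLS, DTYPES, COL) triple selects the stream.
denom_data = load_chng_data("Denominator.dat", dropdate, "fips",
                            Config.DENOM_COLS, Config.DENOM_DTYPES,
                            Config.DENOM_COL)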
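
For clarity, load_cli_data builds its numerator as max(0, Mixed + Flu-like - Flu) + Covid-like per county-date, with the all-outpatient count as the denominator; a worked sketch with made-up counts:

# Hypothetical counts for a single county-date row:
flu, mixed, flu_like, covid_like = 7, 3, 2, 4
num = max(0, -flu + mixed + flu_like)  # -7 + 3 + 2 = -2, clipped to 0
num = num + covid_like                 # 0 + 4 = 4
den = 100                              # placeholder all-outpatient count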