
Commit 2b70e4a

Merge pull request #2000 from cmu-delphi/nssp_patching
nssp patching code
2 parents 51620ea + 2a67c27 commit 2b70e4a

File tree

13 files changed: +627 −27 lines

_delphi_utils_python/delphi_utils/export.py

Lines changed: 2 additions & 1 deletion

```diff
@@ -205,4 +205,5 @@ def create_backup_csv(
                 )
         # pylint: disable=W0703
         except Exception as e:
-            logger.info("Backup file creation failed", msg=e)
+            if logger:
+                logger.info("Backup file creation failed", msg=e)
```

nssp/README.md

Lines changed: 7 additions & 0 deletions

````diff
@@ -76,3 +76,10 @@ with the percentage of code covered by the tests.
 
 None of the linting or unit tests should fail, and the code lines that are not covered by unit tests should be small and
 should not include critical sub-routines.
+
+## Running Patches
+A daily backup of the source, in the form of CSV files, can be found on prod under `/common/covidcast/source_backup/nssp/`. This data is needed to create patches; talk to your sysadmin for access.
+Once your credentials are working, to create patch data for a specific date range in batch-issue format, adjust `params.json` according to the instructions in `patch.py`, then run
+```
+env/bin/python -m delphi_nssp.patch
+```
````
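For quick reference, here is the patch-related configuration as a Python dict (a sketch mirroring the docstring of `patch.py` below; the host, username, and paths are placeholders):

```python
# Sketch of the "patch" configuration described in patch.py's docstring.
# Every value below is a placeholder, not a real host or path.
params = {
    "common": {
        "custom_run": True,  # required: patching is a custom run
    },
    "patch": {
        "source_host": "prod.server.edu",  # server holding daily source backups
        "source_dir": "nssp/source_data",  # local download target for backups
        "user": "username",                # account with key-based SSH access
        "patch_dir": "nssp/AprilPatch",    # output root for batch issue dirs
        "start_issue": "2024-04-20",       # first issue date, YYYY-MM-DD
        "end_issue": "2024-04-21",         # last issue date, YYYY-MM-DD
    },
}
```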

nssp/delphi_nssp/patch.py

Lines changed: 186 additions & 0 deletions

@@ -0,0 +1,186 @@

```python
"""
This module is used for patching data in the delphi_nssp package.

The code assumes the user can use key-based auth to access the prod server
where historical source data is stored.

To use this module, configure params.json like so:

{
  "common": {
    "custom_run": true,
    ...
  },
  "validation": {
    ...
  },
  "patch": {
    "source_host": "prod.server.edu",
    "source_dir": "delphi/covidcast-indicators/nssp/source_data",
    "user": "username",
    "patch_dir": "delphi/covidcast-indicators/nssp/AprilPatch",
    "start_issue": "2024-04-20",
    "end_issue": "2024-04-21"
  }
}

In this params.json, we
- Turn on the "custom_run" flag under "common"
- Add a "patch" section, which contains:
    + "source_host": the prod server where source data is backed up
    + "source_dir": the local directory where source data is downloaded to
    + "user": the username to log in to the remote server where source data is backed up
    + "patch_dir": the local directory where all patch issue output is written
    + "start_issue": str, YYYY-MM-DD format, first issue date
    + "end_issue": str, YYYY-MM-DD format, last issue date

If "source_dir" doesn't exist locally or has no files in it, we download source data to source_dir;
otherwise, we assume all needed source files are already in source_dir.

This module will generate data for that range of issue dates, and store them in batch issue format in the patch_dir:
[patch_dir]/issue_[issue-date]/nssp/actual_data_file.csv
"""

import sys
from datetime import datetime, timedelta
from os import listdir, makedirs, path
from shutil import rmtree

import pandas as pd
from delphi_utils import get_structured_logger, read_params
from epiweeks import Week

from .pull import get_source_data
from .run import run_module


def good_patch_config(params, logger):
    """
    Check if the params.json file is correctly configured for patching.

    params: Dict[str, Any]
        Nested dictionary of parameters, typically loaded from the params.json file.
    logger: Logger object
        Logger object to log messages.
    """
    valid_config = True
    custom_run = params["common"].get("custom_run", False)
    if not custom_run:
        logger.error("Calling patch.py without custom_run flag set true.")
        valid_config = False

    patch_config = params.get("patch", {})
    if patch_config == {}:
        logger.error("Custom flag is on, but patch section is missing.")
        valid_config = False
    else:
        required_patch_keys = ["start_issue", "end_issue", "patch_dir", "source_dir"]

        source_dir = params["patch"]["source_dir"]
        if not path.isdir(source_dir) or not listdir(source_dir):
            required_patch_keys.append("user")
            required_patch_keys.append("source_host")

        missing_keys = [key for key in required_patch_keys if key not in patch_config]
        if missing_keys:
            logger.error("Patch section is missing required key(s)", missing_keys=missing_keys)
            valid_config = False
        else:
            try:  # issue dates validity check
                start_issue = datetime.strptime(patch_config["start_issue"], "%Y-%m-%d")
                end_issue = datetime.strptime(patch_config["end_issue"], "%Y-%m-%d")
                if start_issue > end_issue:
                    logger.error("Start issue date is after end issue date.")
                    valid_config = False
            except ValueError:
                logger.error("Issue dates must be in YYYY-MM-DD format.")
                valid_config = False

    if valid_config:
        logger.info("Good patch configuration.")
        return True
    logger.info("Bad patch configuration.")
    return False


def get_patch_dates(start_issue, end_issue, source_dir):
    """
    Get the dates to run patch on, given a range of issue dates.

    Due to the weekly cadence of nssp data, dates to run patch on are not necessarily the same as issue dates.
    We use the latest date with source data in each epiweek as the reporting date for patching that week's data.

    start_issue: datetime object
    end_issue: datetime object
    source_dir: str, local directory holding the downloaded source data
    """
    patch_dates = []
    date_range = pd.date_range(start=start_issue, end=end_issue)
    dates_with_source_data = {
        date for date in date_range if path.isfile(f"""{source_dir}/{date.strftime("%Y%m%d")}.csv.gz""")
    }
    epiweek_start_dates = {Week.fromdate(date).startdate() for date in date_range}
    for epiweek_start_date in epiweek_start_dates:
        epiweek = Week.fromdate(epiweek_start_date)
        dates_with_data_in_epiweek = [date for date in dates_with_source_data if date.date() in epiweek.iterdates()]
        if dates_with_data_in_epiweek == []:
            continue
        latest_date_with_data = max(dates_with_data_in_epiweek)
        patch_dates.append(latest_date_with_data)
    patch_dates.sort()
    return patch_dates


def patch():
    """Run the nssp indicator for a range of issue dates."""
    params = read_params()
    logger = get_structured_logger("delphi_nssp.patch", filename=params["common"]["log_filename"])
    if not good_patch_config(params, logger):
        sys.exit(1)

    source_dir = params["patch"]["source_dir"]
    download_source = False
    if not path.isdir(source_dir) or not listdir(source_dir):  # no source dir or empty source dir
        download_source = True
        get_source_data(params, logger)
    else:
        logger.info("Source data already exists locally.")

    start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
    end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")

    logger.info(start_issue=start_issue.strftime("%Y-%m-%d"))
    logger.info(end_issue=end_issue.strftime("%Y-%m-%d"))
    logger.info(source_dir=source_dir)
    logger.info(patch_dir=params["patch"]["patch_dir"])
    makedirs(params["patch"]["patch_dir"], exist_ok=True)

    patch_dates = get_patch_dates(start_issue, end_issue, source_dir)

    for current_issue in patch_dates:
        logger.info("patching issue", issue_date=current_issue.strftime("%Y%m%d"))

        current_issue_source_csv = f"""{source_dir}/{current_issue.strftime("%Y%m%d")}.csv.gz"""
        if not path.isfile(current_issue_source_csv):
            logger.info("No source data at this path", current_issue_source_csv=current_issue_source_csv)
            current_issue += timedelta(days=1)
            continue

        params["patch"]["current_issue"] = current_issue.strftime("%Y%m%d")

        # current_issue_date can be different from params["patch"]["current_issue"]
        # due to the weekly cadence of nssp data. For weekly sources, the issue date in our
        # db matches the first date of the epiweek that the reporting date falls in,
        # rather than the reporting date itself.
        current_issue_date = Week.fromdate(current_issue).startdate()
        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_date.strftime("%Y%m%d")}/nssp"""
        makedirs(f"{current_issue_dir}", exist_ok=True)
        params["common"]["export_dir"] = f"""{current_issue_dir}"""

        run_module(params, logger)

    if download_source:
        rmtree(source_dir)


if __name__ == "__main__":
    patch()
```
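To make the epiweek reduction in `get_patch_dates` concrete, a standalone sketch (illustrative dates, not from the commit): backups exist for four consecutive days spanning an epiweek boundary, so only the last backup day of each epiweek becomes a patch date.

```python
# Standalone sketch of the epiweek reduction used by get_patch_dates.
# Dates are illustrative; MMWR epiweeks run Sunday through Saturday.
from datetime import date

from epiweeks import Week

dates_with_source_data = [
    date(2024, 4, 18),  # Thu, epiweek 2024-16
    date(2024, 4, 19),  # Fri, epiweek 2024-16
    date(2024, 4, 20),  # Sat, last day of epiweek 2024-16
    date(2024, 4, 21),  # Sun, first day of epiweek 2024-17
]

# Keep only the latest backup date within each epiweek.
latest_per_epiweek = {}
for d in dates_with_source_data:
    week = Week.fromdate(d)
    key = (week.year, week.week)
    if key not in latest_per_epiweek or d > latest_per_epiweek[key]:
        latest_per_epiweek[key] = d

print(sorted(latest_per_epiweek.values()))
# [datetime.date(2024, 4, 20), datetime.date(2024, 4, 21)]
```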

nssp/delphi_nssp/pull.py

Lines changed: 98 additions & 11 deletions

```diff
@@ -1,20 +1,84 @@
 # -*- coding: utf-8 -*-
 """Functions for pulling NSSP ER data."""
+
+import functools
 import logging
+import sys
 import textwrap
+from os import makedirs, path
 from typing import Optional
 
 import pandas as pd
+import paramiko
 from delphi_utils import create_backup_csv
 from sodapy import Socrata
 
-from .constants import (
-    NEWLINE,
-    SIGNALS,
-    SIGNALS_MAP,
-    TYPE_DICT,
-)
+from .constants import NEWLINE, SIGNALS, SIGNALS_MAP, TYPE_DICT
+
+
+def print_callback(remote_file_name, logger, bytes_so_far, bytes_total, progress_chunks):
+    """Log the progress of a file transfer."""
+    rough_percent_transferred = int(100 * (bytes_so_far / bytes_total))
+    if rough_percent_transferred in progress_chunks:
+        logger.info("Transfer in progress", remote_file_name=remote_file_name, percent=rough_percent_transferred)
+        # Remove progress chunk, so it is not logged again
+        progress_chunks.remove(rough_percent_transferred)
+
+
+def get_source_data(params, logger):
+    """
+    Download historical source data from a backup server.
+
+    This function is typically used in patching only. Normal runs grab the latest data from the SODA API.
+
+    This function uses the "user" configuration under the "patch" section in params.json to specify
+    a username with local key-based access to connect to the server where backup nssp source data is stored.
+    It uses the "backup_dir" config under the "common" section to locate backup files on the remote server.
+    It then searches for CSV files that match the inclusive range of issue dates
+    specified by the 'start_issue' and 'end_issue' config.
+
+    These CSV files are then downloaded and stored in the local 'source_dir' directory.
+    """
+    makedirs(params["patch"]["source_dir"], exist_ok=True)
+    ssh = paramiko.SSHClient()
+    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+    host = params["patch"]["source_host"]
+    user = params["patch"]["user"]
+    ssh.connect(host, username=user)
+
+    # Generate file names of source files to download
+    dates = pd.date_range(start=params["patch"]["start_issue"], end=params["patch"]["end_issue"])
+    remote_source_files = [f"{date.strftime('%Y%m%d')}.csv.gz" for date in dates]
+
+    # Download source files
+    sftp = ssh.open_sftp()
+    try:
+        sftp.stat(params["common"]["backup_dir"])
+    except IOError:
+        logger.error("Source backup directory does not exist on the remote server.")
+
+    sftp.chdir(params["common"]["backup_dir"])
+
+    num_files_transferred = 0
+    for remote_file_name in remote_source_files:
+        callback_for_filename = functools.partial(print_callback, remote_file_name, logger, progress_chunks=[0, 50])
+        local_file_path = path.join(params["patch"]["source_dir"], remote_file_name)
+        try:
+            sftp.stat(remote_file_name)
+        except IOError:
+            logger.warning(
+                "Source backup for this date does not exist on the remote server.",
+                missing_filename=remote_file_name,
+            )
+            continue
+        sftp.get(remote_file_name, local_file_path, callback=callback_for_filename)
+        logger.info("Transfer finished", remote_file_name=remote_file_name, local_file_path=local_file_path)
+        num_files_transferred += 1
+    ssh.close()
+
+    if num_files_transferred == 0:
+        logger.error("No source data was transferred. Check the source backup server for potential issues.")
+        sys.exit(1)
 
 
 def warn_string(df, type_dict):
     """Format the warning string."""
@@ -44,6 +108,7 @@ def pull_with_socrata_api(socrata_token: str, dataset_id: str):
     dataset_id: str
         The dataset id to pull data from
 
+
     Returns
     -------
     list of dictionaries, each representing a row in the dataset
@@ -61,8 +126,14 @@
     return results
 
 
-def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger: Optional[logging.Logger] = None):
-    """Pull the latest NSSP ER visits primary dataset.
+def pull_nssp_data(
+    socrata_token: str,
+    backup_dir: str,
+    custom_run: bool,
+    issue_date: Optional[str] = None,
+    logger: Optional[logging.Logger] = None,
+):
+    """Pull the NSSP ER visits primary dataset.
 
     https://data.cdc.gov/Public-Health-Surveillance/NSSP-Emergency-Department-Visit-Trajectories-by-St/rdmq-nq56/data_preview
 
@@ -76,9 +147,25 @@ def pull_nssp_data(socrata_token: str, backup_dir: str, custom_run: bool, logger
     pd.DataFrame
         Dataframe as described above.
     """
-    socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
-    df_ervisits = pd.DataFrame.from_records(socrata_results)
-    create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
+    if not custom_run:
+        socrata_results = pull_with_socrata_api(socrata_token, "rdmq-nq56")
+        df_ervisits = pd.DataFrame.from_records(socrata_results)
+        create_backup_csv(df_ervisits, backup_dir, custom_run, logger=logger)
+        logger.info("Number of records grabbed", num_records=len(df_ervisits), source="Socrata API")
+    elif custom_run and logger.name == "delphi_nssp.patch":
+        if issue_date is None:
+            raise ValueError("Issue date is required for patching")
+        source_filename = f"{backup_dir}/{issue_date}.csv.gz"
+        if not path.isfile(source_filename):
+            logger.warning("No primary source data found", source=source_filename, issue_date=issue_date)
+            return None
+        df_ervisits = pd.read_csv(source_filename)
+        logger.info(
+            "Number of records grabbed",
+            num_records=len(df_ervisits),
+            source=source_filename,
+        )
+
 
     df_ervisits = df_ervisits.rename(columns={"week_end": "timestamp"})
     df_ervisits = df_ervisits.rename(columns=SIGNALS_MAP)
```
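How the two pull paths in `pull_nssp_data` are selected, as a usage sketch (not from the commit; the token, directories, and issue date are placeholders): a normal run pulls from the Socrata API and writes a backup CSV, while a patch run is recognized by `custom_run=True` plus a logger named `delphi_nssp.patch` and reads a local backup file instead, which is why `issue_date` is required on that path.

```python
# Usage sketch only: token, directories, and issue date are placeholders.
from delphi_utils import get_structured_logger

from delphi_nssp.pull import pull_nssp_data

# Normal run: data comes from the Socrata API and a backup CSV is written.
logger = get_structured_logger("delphi_nssp")
df = pull_nssp_data(
    socrata_token="MY_TOKEN",                           # placeholder
    backup_dir="/common/covidcast/source_backup/nssp",  # where backups land
    custom_run=False,
    logger=logger,
)

# Patch run: the logger's name routes the pull to a local backup file named
# <backup_dir>/<issue_date>.csv.gz, so issue_date must be provided.
patch_logger = get_structured_logger("delphi_nssp.patch")
df = pull_nssp_data(
    socrata_token="",               # unused on this path
    backup_dir="nssp/source_data",  # local source_dir during a patch run
    custom_run=True,
    issue_date="20240420",
    logger=patch_logger,
)
```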