
Standardize logging #1256


Merged
12 commits merged on Sep 22, 2021
20 changes: 10 additions & 10 deletions combo_cases_and_deaths/delphi_combo_cases_and_deaths/run.py
@@ -33,14 +33,6 @@
covidcast.covidcast._ASYNC_CALL = True # pylint: disable=protected-access


def check_none_data_frame(data_frame, label, date_range):
"""Log and return True when a data frame is None."""
if data_frame is None:
print(f"{label} completely unavailable in range {date_range}")
return True
return False


def maybe_append(usa_facts, jhu):
"""
Append dataframes if available, otherwise return USAFacts.
@@ -133,7 +125,7 @@ def get_updated_dates(signal, geo, date_range, issue_range=None, fetcher=covidca
issues=issue_range
)

if check_none_data_frame(usafacts_df, "USA-FACTS", date_range):
if usafacts_df is None:
return None

merged_df = merge_dfs_by_geos(usafacts_df, jhu_df, geo)
@@ -142,7 +134,8 @@ def get_updated_dates(signal, geo, date_range, issue_range=None, fetcher=covidca
return unique_dates


def combine_usafacts_and_jhu(signal, geo, date_range, issue_range=None, fetcher=covidcast.signal):
def combine_usafacts_and_jhu(signal, geo, date_range, logger,
issue_range=None, fetcher=covidcast.signal):
"""Add rows for PR from JHU signals to USA-FACTS signals.

For hhs and nation, fetch the county `num` data so we can compute the proportions correctly
@@ -158,6 +151,7 @@ def combine_usafacts_and_jhu(signal, geo, date_range, issue_range=None, fetcher=

# This occurs if the usafacts and the jhu queries were empty
if unique_dates is None:
logger.info("USA-FACTS completely unavailable for dates", date_range=date_range)
return EMPTY_FRAME

# Query only the represented window so that every geo is represented; a single window call is
@@ -329,9 +323,15 @@ def run_module(params):
log_exceptions=params["common"].get("log_exceptions", True))

for metric, geo_res, sensor_name, signal in variants:
logger.info("Generating signal and exporting to CSV",
geo_res = geo_res,
metric = metric,
sensor = sensor_name,
signal = signal)
df = combine_usafacts_and_jhu(signal,
geo_res,
extend_raw_date_range(params, sensor_name),
logger,
params['indicator']['issue_range'])
df["timestamp"] = pd.to_datetime(df["timestamp"])
start_date = pd.to_datetime(params['indicator']['export_start_date'])
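A minimal sketch of the pattern this file now follows: the print-based check_none_data_frame helper is dropped and the missing-data case is logged inline. Plain stdlib logging stands in for the delphi_utils structured logger, the keyword-field call in the comment assumes a structlog-style interface, and the function name below is illustrative rather than taken from the module.

import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = logging.getLogger("combo_cases_and_deaths")

def get_unique_dates(usafacts_df, jhu_df, date_range):
    """Log and bail out when USAFacts is missing, instead of printing (sketch)."""
    if usafacts_df is None:
        # Stand-in for the structured call shown in this diff:
        #   logger.info("USA-FACTS completely unavailable for dates", date_range=date_range)
        logger.info("USA-FACTS completely unavailable in range %s", date_range)
        return None
    dates = set(usafacts_df["timestamp"])
    if jhu_df is not None:
        dates |= set(jhu_df["timestamp"])
    return sorted(dates)

# get_unique_dates(None, None, ("2021-01-01", "2021-09-01")) logs the range and returns None.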
14 changes: 8 additions & 6 deletions combo_cases_and_deaths/tests/test_run.py
@@ -1,4 +1,5 @@
"""Tests for running combo cases and deaths indicator."""
import logging
from datetime import date
from itertools import product
import os
@@ -17,6 +18,7 @@
COLUMN_MAPPING)
from delphi_combo_cases_and_deaths.constants import METRICS, SMOOTH_TYPES, SENSORS

TEST_LOGGER = logging.getLogger()

def test_issue_dates():
"""The smoothed value for a particular date is computed from the raw
@@ -98,7 +100,7 @@ def make_mock(geo):
("1 1", 4, 1 if geo in ["nation", "hhs"] else 2),
("0 0", 2, 0)
]:
df = combine_usafacts_and_jhu("", geo, date_range, fetcher=mock_covidcast_signal)
df = combine_usafacts_and_jhu("", geo, date_range, TEST_LOGGER, fetcher=mock_covidcast_signal)
assert df.size == expected_size * len(COLUMN_MAPPING), f"""
Wrong number of rows in combined data frame for the number of available signals.

@@ -126,7 +128,7 @@ def test_multiple_issues(mock_covidcast_signal):
}),
None
] * 2
result = combine_usafacts_and_jhu("confirmed_incidence_num", "county", date_range=(0, 1), fetcher=mock_covidcast_signal)
result = combine_usafacts_and_jhu("confirmed_incidence_num", "county", date_range=(0, 1), logger=TEST_LOGGER, fetcher=mock_covidcast_signal)
pd.testing.assert_frame_equal(
result,
pd.DataFrame(
@@ -186,23 +188,23 @@ def test_combine_usafacts_and_jhu_special_geos(mock_covidcast_signal):
] * 6 # each call to combine_usafacts_and_jhu makes (2 + 2 * len(unique_timestamps)) = 12 calls to the fetcher

pd.testing.assert_frame_equal(
combine_usafacts_and_jhu("confirmed_incidence_num", "nation", date_range=(0, 1), fetcher=mock_covidcast_signal),
combine_usafacts_and_jhu("confirmed_incidence_num", "nation", date_range=(0, 1), logger=TEST_LOGGER, fetcher=mock_covidcast_signal),
pd.DataFrame({"timestamp": [20200101],
"geo_id": ["us"],
"val": [50 + 100 + 200],
"se": [None],
"sample_size": [None]})
)
pd.testing.assert_frame_equal(
combine_usafacts_and_jhu("confirmed_incidence_prop", "nation", date_range=(0, 1), fetcher=mock_covidcast_signal),
combine_usafacts_and_jhu("confirmed_incidence_prop", "nation", date_range=(0, 1), logger=TEST_LOGGER, fetcher=mock_covidcast_signal),
pd.DataFrame({"timestamp": [20200101],
"geo_id": ["us"],
"val": [(50 + 100 + 200) / (4903185 + 3723066) * 100000],
"se": [None],
"sample_size": [None]})
)
pd.testing.assert_frame_equal(
combine_usafacts_and_jhu("confirmed_incidence_num", "county", date_range=(0, 1), fetcher=mock_covidcast_signal),
combine_usafacts_and_jhu("confirmed_incidence_num", "county", date_range=(0, 1), logger=TEST_LOGGER, fetcher=mock_covidcast_signal),
pd.DataFrame({"geo_id": ["01000", "01001", "72001"],
"val": [50, 100, 200],
"timestamp": [20200101, 20200101, 20200101]},
@@ -229,7 +231,7 @@ def test_no_nation_jhu(mock_covidcast_signal):
"value": [1],
"timestamp": [20200101]})
]
result = combine_usafacts_and_jhu("_num", "nation", date_range=(0, 1), fetcher=mock_covidcast_signal)
result = combine_usafacts_and_jhu("_num", "nation", date_range=(0, 1), logger=TEST_LOGGER, fetcher=mock_covidcast_signal)

assert mock_covidcast_signal.call_args_list[-1] == call(
"jhu-csse",
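Beyond a module-level logging.getLogger(), pytest's caplog fixture can supply the logger argument and let a test assert on what was emitted. A hedged sketch, with a toy function standing in for combine_usafacts_and_jhu; neither the test name nor the helper exists in this PR.

import logging

def log_if_unavailable(df, logger):
    """Toy stand-in for the logging path exercised by the tests above."""
    if df is None:
        logger.info("USA-FACTS completely unavailable for dates")
        return None
    return df

def test_logs_when_unavailable(caplog):
    with caplog.at_level(logging.INFO):
        assert log_if_unavailable(None, logging.getLogger()) is None
    assert any("completely unavailable" in message for message in caplog.messages)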
7 changes: 4 additions & 3 deletions covid_act_now/delphi_covid_act_now/run.py
@@ -45,7 +45,7 @@ def run_module(params):
parquet_url = params["indicator"]["parquet_url"]

# Load CAN county-level testing data
print("Pulling CAN data")
logger.info("Pulling CAN data")
df_pq = load_data(parquet_url)
df_county_testing = extract_testing_metrics(df_pq)

@@ -54,7 +54,8 @@
max_dates_exported = []
# Perform geo aggregations and export to receiving
for geo_res in GEO_RESOLUTIONS:
print(f"Processing {geo_res}")
logger.info("Generating signal and exporting to CSV",
geo_res = geo_res)
df = geo_map(df_county_testing, geo_res)

# Export 'pcr_specimen_positivity_rate'
@@ -79,7 +80,7 @@
max_dates_exported.append(latest)
# x2 to count both positivity and tests signals
num_exported_files += exported_csv_dates.size * 2
print(f"Exported dates: {earliest} to {latest}")
logger.info("Exported for dates between", earliest=earliest, latest=latest)

elapsed_time_in_seconds = round(time.time() - start_time, 2)
max_lag_in_days = (datetime.now() - min(max_dates_exported)).days
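The calls above pass arbitrary keyword fields (earliest=..., latest=...), which assumes the delphi_utils structured logger renders an event name plus fields, as structlog does. As an illustration only, a few lines of stdlib logging can mimic that output; this is not the delphi_utils implementation.

import logging

class KeyValueLogger:
    """Tiny illustration of event-plus-fields logging; not the real structured logger."""

    def __init__(self, name):
        self._logger = logging.getLogger(name)

    def info(self, event, **fields):
        rendered = " ".join(f"{key}={value!r}" for key, value in sorted(fields.items()))
        self._logger.info("%s %s", event, rendered)

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = KeyValueLogger("covid_act_now")
logger.info("Exported for dates between", earliest="2021-01-01", latest="2021-09-01")
# -> INFO Exported for dates between earliest='2021-01-01' latest='2021-09-01'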
3 changes: 3 additions & 0 deletions hhs_facilities/delphi_hhs_facilities/run.py
@@ -36,6 +36,9 @@ def run_module(params) -> None:
filled_fips_df = fill_missing_fips(raw_df, gmpr)
stats = []
for geo, (sig_name, sig_cols, sig_func, sig_offset) in product(GEO_RESOLUTIONS, SIGNALS):
logger.info("Generating signal and exporting to CSV",
geo_res = geo,
signal_name = sig_name)
mapped_df = convert_geo(filled_fips_df, geo, gmpr)
output_df = generate_signal(mapped_df, sig_cols, sig_func, sig_offset)
dates = create_export_csv(output_df, params["common"]["export_dir"], geo, sig_name)
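If the structured logger is structlog-based (an assumption; the diff only shows call sites), the per-iteration fields could also be bound once so every message emitted inside the loop body carries geo_res and signal_name automatically. A sketch with made-up signal values:

from itertools import product

import structlog

logger = structlog.get_logger()

GEO_RESOLUTIONS = ["county", "state"]    # illustrative values only
SIGNALS = [("total_admissions", None)]   # illustrative values only

for geo, (sig_name, _extra) in product(GEO_RESOLUTIONS, SIGNALS):
    step_logger = logger.bind(geo_res=geo, signal_name=sig_name)
    step_logger.info("Generating signal and exporting to CSV")
    # ... convert_geo / generate_signal / create_export_csv would run here ...
    step_logger.info("Finished signal")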
4 changes: 4 additions & 0 deletions hhs_hosp/delphi_hhs/run.py
@@ -105,6 +105,10 @@ def run_module(params):
geo_mapper = GeoMapper()
stats = []
for sensor, smoother, geo in product(SIGNALS, SMOOTHERS, GEOS):
logger.info("Generating signal and exporting to CSV",
geo_res = geo,
sensor = sensor,
smoother = smoother)
df = geo_mapper.add_geocode(make_signal(all_columns, sensor),
"state_id",
"state_code",
8 changes: 5 additions & 3 deletions nchs_mortality/delphi_nchs_mortality/archive_diffs.py
@@ -8,7 +8,7 @@

from delphi_utils import S3ArchiveDiffer

def arch_diffs(params, daily_arch_diff):
def arch_diffs(params, daily_arch_diff, logger):
"""
Archive differences between new updates and existing data.

@@ -23,6 +23,8 @@ def arch_diffs(params, daily_arch_diff):
Read from params.json
daily_arch_diff: S3ArchiveDiffer
Used to store and update cache
logger: logging.Logger
The structured logger.
"""
weekly_export_dir = params["common"]["weekly_export_dir"]
daily_export_dir = params["common"]["daily_export_dir"]
@@ -59,7 +61,7 @@

# Report failures: someone should probably look at them
for exported_file in fails:
print(f"Failed to archive (weekly) '{exported_file}'")
logger.info("Failed to archive (weekly)", filename={exported_file})

# Daily run of archiving utility
# - Uploads changed files to S3
@@ -83,4 +85,4 @@

# Report failures: someone should probably look at them
for exported_file in fails:
print(f"Failed to archive (daily) '{exported_file}'")
logger.info("Failed to archive (daily)", filename={exported_file})
9 changes: 6 additions & 3 deletions nchs_mortality/delphi_nchs_mortality/run.py
@@ -62,7 +62,8 @@ def run_module(params: Dict[str, Any]):
df_pull = pull_nchs_mortality_data(token, test_file)
for metric in METRICS:
if metric == 'percent_of_expected_deaths':
print(metric)
logger.info("Generating signal and exporting to CSV",
metric = metric)
df = df_pull.copy()
df["val"] = df[metric]
df["se"] = np.nan
@@ -80,7 +81,9 @@
stats.append((max(dates), len(dates)))
else:
for sensor in SENSORS:
print(metric, sensor)
logger.info("Generating signal and exporting to CSV",
metric = metric,
sensor = sensor)
df = df_pull.copy()
if sensor == "num":
df["val"] = df[metric]
@@ -107,7 +110,7 @@
# - Uploads changed files to S3
# - Does not export any issues into receiving
if "archive" in params:
arch_diffs(params, daily_arch_diff)
arch_diffs(params, daily_arch_diff, logger)

elapsed_time_in_seconds = round(time.time() - start_time, 2)
min_max_date = stats and min(s[0] for s in stats)
12 changes: 4 additions & 8 deletions quidel/delphi_quidel/data_tools.py
@@ -86,8 +86,6 @@ def _geographical_pooling(tpooled_tests, tpooled_ptests, min_obs, max_borrow_obs
Same length as tests; proportion of parent observations to borrow.
"""
if (np.any(np.isnan(tpooled_tests)) or np.any(np.isnan(tpooled_ptests))):
print(tpooled_tests)
print(tpooled_ptests)
raise ValueError('[parent] tests should be non-negative '
'with no np.nan')
if max_borrow_obs > min_obs:
@@ -153,7 +151,6 @@ def raw_positive_prop(positives, tests, min_obs):
positives = positives.astype(float)
tests = tests.astype(float)
if np.any(np.isnan(positives)) or np.any(np.isnan(tests)):
print(positives, tests)
raise ValueError('positives and tests should be non-negative '
'with no np.nan')
if np.any(positives > tests):
@@ -290,11 +287,10 @@ def raw_tests_per_device(devices, tests, min_obs):
"""
devices = devices.astype(float)
tests = tests.astype(float)
if (np.any(np.isnan(devices)) or np.any(np.isnan(tests))):
print(devices)
print(tests)
raise ValueError('devices and tests should be non-negative '
'with no np.nan')
if np.any(np.isnan(devices)) or np.any(devices < 0):
raise ValueError("devices should be non-negative with no np.nan")
if np.any(np.isnan(tests)) or np.any(tests < 0):
raise ValueError("tests should be non-negative with no np.nan")
if min_obs <= 0:
raise ValueError('min_obs should be positive')
tests[tests < min_obs] = np.nan
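The quidel change replaces array dumps via print with plain ValueErrors raised per input. A compact way to express that split validation is a small helper; the function names below are illustrative and not part of the module.

import numpy as np

def _validate_counts(name, values):
    """Raise instead of printing when a count array has NaNs or negative entries."""
    values = np.asarray(values, dtype=float)
    if np.any(np.isnan(values)) or np.any(values < 0):
        raise ValueError(f"{name} should be non-negative with no np.nan")
    return values

def raw_tests_per_device_sketch(devices, tests, min_obs):
    """Sketch of the validation split shown above; the helper name is made up."""
    devices = _validate_counts("devices", devices)
    tests = _validate_counts("tests", tests)
    if min_obs <= 0:
        raise ValueError("min_obs should be positive")
    tests[tests < min_obs] = np.nan
    return tests / devices

# raw_tests_per_device_sketch([2, 3], [10, 20], min_obs=5) -> array([5., 6.66666667])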
34 changes: 17 additions & 17 deletions quidel/delphi_quidel/pull.py
@@ -82,7 +82,7 @@ def regulate_column_names(df, test_type):
return df

def get_from_email(column_names, start_dates, end_dates, mail_server,
account, sender, password):
account, sender, password, logger):
"""
Get raw data from email account.

@@ -98,6 +98,8 @@ def get_from_email(column_names, start_dates, end_dates, mail_server,
email account of the sender
password: str
password of the datadrop email
logger: logging.Logger
The structured logger.

Returns:
df: pd.DataFrame
@@ -131,7 +133,7 @@ def get_from_email(column_names, start_dates, end_dates, mail_server,
if not whether_in_range:
continue

print(f"Pulling {test} data received on %s"%search_date.date())
logger.info("Pulling data", test=test, date=search_date.date())
toread = io.BytesIO()
toread.write(att.payload)
toread.seek(0) # reset the pointer
@@ -153,10 +155,9 @@ def fix_zipcode(df):
zipcode = int(float(zipcode))
zipcode5.append(zipcode)
df['zip'] = zipcode5
# print('Fixing %.2f %% of the data' % (fixnum * 100 / len(zipcode5)))
return df

def fix_date(df):
def fix_date(df, logger):
"""
Remove invalid dates and select correct test date to use.

@@ -175,16 +176,16 @@ def fix_date(df):
df.insert(2, "timestamp", df["TestDate"])

mask = df["TestDate"] <= df["StorageDate"]
print("Removing %.2f%% of unusual data" % ((len(df) - np.sum(mask)) * 100 / len(df)))
logger.info(f"Removing {((len(df) - np.sum(mask)) * 100 / len(df)):.2f} of unusual data")
df = df[mask]

mask = df["StorageDate"] - df["TestDate"] > pd.Timedelta(days=90)
print("Fixing %.2f%% of outdated data" % (np.sum(mask) * 100 / len(df)))
logger.info(f"Fixing {(np.sum(mask) * 100 / len(df)):.2f} of outdated data")
df["timestamp"].values[mask] = df["StorageDate"].values[mask]
return df

def preprocess_new_data(start_dates, end_dates, mail_server, account,
sender, password, test_mode):
sender, password, test_mode, logger):
"""
Pull and pre-process Quidel Antigen Test data from datadrop email.

Expand All @@ -206,6 +207,8 @@ def preprocess_new_data(start_dates, end_dates, mail_server, account,
password of the datadrop email
test_mode: bool
pull raw data from email or not
logger: logging.Logger
The structured logger.
Returns:
df: pd.DataFrame
time_flag: datetime.date:
@@ -220,21 +223,20 @@
else:
# Get new data from email
dfs, time_flag = get_from_email(COLUMN_NAMES, start_dates, end_dates,
mail_server, account, sender, password)
mail_server, account, sender, password, logger)

# No new data can be pulled
if time_flag is None:
return dfs, time_flag

df_finals = {}
for test_type in TEST_TYPES:
print(f"For {test_type}:")
logger.info(f"For {test_type}:")
df = dfs[test_type]
# Fix some of the fipcodes that are 9 digit instead of 5 digit
df = fix_zipcode(df)
# Create a column CanonicalDate according to StarageDate and TestDate
df = fix_date(df)

df = fix_date(df, logger)
# Compute numUniqueDevices
numUniqueDevices = df.groupby(
by=["timestamp", "zip"],
@@ -309,17 +311,15 @@ def check_intermediate_file(cache_dir, pull_start_dates):
sep=",", parse_dates=["timestamp"])
return previous_dfs, pull_start_dates

def pull_quidel_data(params):
def pull_quidel_data(params, logger):
"""
Pull new quidel test data and decide whether to combine it with historical records in ./cache.

Parameters:
params: dict
including all the information read from params.json
END_FROM_TODAY_MINUS: int
report data until - X days
EXPORT_DAY_RANGE: int
number of dates to report
logger: logging.Logger
The structured logger.

Returns:
DataFrame:
@@ -355,7 +355,7 @@
# Use _end_date to check the most recent date that we received data
dfs, _end_date = preprocess_new_data(
pull_start_dates, pull_end_dates, mail_server,
account, sender, password, test_mode)
account, sender, password, test_mode, logger)

# Utilize previously stored data
for test_type in TEST_TYPES:
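Rather than interpolating the share of dropped rows into an f-string, the percentage could travel as its own field so it stays machine-readable. A sketch under the assumption that the structured logger accepts keyword fields like structlog; the stdlib call below folds the value into the message instead, and the sample frame is made up.

import logging

import pandas as pd

def drop_unusual_rows(df, logger):
    """Keep rows where TestDate <= StorageDate and log the share removed (sketch)."""
    mask = df["TestDate"] <= df["StorageDate"]
    pct_removed = round(float((len(df) - mask.sum()) * 100 / len(df)), 2)
    # Structured alternative (assumed interface):
    #   logger.info("Removing unusual data", percent=pct_removed)
    logger.info("Removing %.2f%% of unusual data", pct_removed)
    return df[mask]

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
frame = pd.DataFrame({"TestDate": pd.to_datetime(["2021-09-01", "2021-09-30"]),
                      "StorageDate": pd.to_datetime(["2021-09-05", "2021-09-02"])})
drop_unusual_rows(frame, logging.getLogger("quidel"))
# -> INFO Removing 50.00% of unusual data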