feat: unbundle delphi-logger and re-export it in delphi-utils #1970

Closed · wants to merge 2 commits
12 changes: 6 additions & 6 deletions _delphi_utils_python/delphi_utils/__init__.py
@@ -3,16 +3,16 @@

from __future__ import absolute_import

from delphi_logger import get_structured_logger, pool_and_threadedlogger

from .archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer
from .export import create_export_csv
from .utils import read_params

from .slack_notifier import SlackNotifier
from .logger import get_structured_logger
from .geomap import GeoMapper
from .smooth import Smoother
from .signal import add_prefix
from .nancodes import Nans
from .signal import add_prefix
from .slack_notifier import SlackNotifier
from .smooth import Smoother
from .utils import read_params
from .weekday import Weekday

__version__ = "0.3.23"
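
Because the two logger entry points are re-exported here, downstream indicator code should not need to change its imports. A minimal sketch of what this preserves (assuming delphi_logger exposes the same callables named in the diff):

from delphi_logger import get_structured_logger as from_new_package
from delphi_utils import get_structured_logger, pool_and_threadedlogger

# The re-export means both import paths resolve to the same object.
assert get_structured_logger is from_new_package

logger = get_structured_logger(__name__)
logger.info("re-export check", source="delphi_utils")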
18 changes: 9 additions & 9 deletions _delphi_utils_python/delphi_utils/archive.py
@@ -26,25 +26,25 @@
Created: 2020-08-06
"""

from contextlib import contextmanager
import filecmp
from glob import glob
from os import remove, replace
from os.path import join, basename, abspath
import shutil
import time
from typing import Tuple, List, Dict, Optional
from contextlib import contextmanager
from glob import glob
from os import remove, replace
from os.path import abspath, basename, join
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from boto3 import Session
from boto3.exceptions import S3UploadFailedError
from delphi_logger import get_structured_logger
from git import Repo
from git.refs.head import Head
import pandas as pd
import numpy as np

from .utils import read_params
from .logger import get_structured_logger
from .nancodes import Nans
from .utils import read_params

Files = List[str]
FileDiffMap = Dict[str, Optional[str]]
257 changes: 6 additions & 251 deletions _delphi_utils_python/delphi_utils/logger.py
@@ -1,255 +1,10 @@
"""Structured logger utility for creating JSON logs.
"""Temporary migration compatibility file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: we'll need a followup issue once we remove this file to make sure that other logger imports continue to work, e.g..

The Delphi group uses two ~identical versions of this file.
Try to keep them in sync with edits, for sanity.
https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py
https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py
"""

import contextlib
import logging
import multiprocessing
import os
import sys
import threading
from traceback import format_exception

import structlog


def handle_exceptions(logger):
"""Handle exceptions using the provided logger."""

def exception_handler(scope, etype, value, traceback):
logger.exception("Top-level exception occurred",
scope=scope, exc_info=(etype, value, traceback))

def sys_exception_handler(etype, value, traceback):
exception_handler("sys", etype, value, traceback)

def threading_exception_handler(args):
if args.exc_type == SystemExit and args.exc_value.code == 0:
# `sys.exit(0)` is considered "successful termination":
# https://docs.python.org/3/library/sys.html#sys.exit
logger.debug("normal thread exit", thread=args.thread,
stack="".join(
format_exception(
args.exc_type, args.exc_value, args.exc_traceback)))
else:
exception_handler(f"thread: {args.thread}",
args.exc_type, args.exc_value, args.exc_traceback)

sys.excepthook = sys_exception_handler
threading.excepthook = threading_exception_handler


def get_structured_logger(name=__name__,
filename=None,
log_exceptions=True):
"""Create a new structlog logger.

Use the logger returned from this in indicator code using the standard
wrapper calls, e.g.:

logger = get_structured_logger(__name__)
logger.warning("Error", type="Signal too low").

The output will be rendered as JSON which can easily be consumed by logs
processors.

See the structlog documentation for details.

Parameters
---------
name: Name to use for logger (included in log lines), __name__ from caller
is a good choice.
filename: An (optional) file to write log output.
"""
# Set the underlying logging configuration
if "LOG_DEBUG" in os.environ:
log_level = logging.DEBUG
else:
log_level = logging.INFO

logging.basicConfig(
format="%(message)s",
level=log_level,
handlers=[logging.StreamHandler()])

def add_pid(_logger, _method_name, event_dict):
"""Add current PID to the event dict."""
event_dict["pid"] = os.getpid()
return event_dict

# Configure structlog. This uses many of the standard suggestions from
# the structlog documentation.
structlog.configure(
processors=[
# Filter out log levels we are not tracking.
structlog.stdlib.filter_by_level,
# Include logger name in output.
structlog.stdlib.add_logger_name,
# Include log level in output.
structlog.stdlib.add_log_level,
# Include PID in output.
add_pid,
# Allow formatting into arguments e.g., logger.info("Hello, %s",
# name)
structlog.stdlib.PositionalArgumentsFormatter(),
# Add timestamps.
structlog.processors.TimeStamper(fmt="iso"),
# Match support for exception logging in the standard logger.
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
# Decode unicode characters
structlog.processors.UnicodeDecoder(),
# Render as JSON
structlog.processors.JSONRenderer(),
],
# Use a dict class for keeping track of data.
context_class=dict,
# Use a standard logger for the actual log call.
logger_factory=structlog.stdlib.LoggerFactory(),
# Use a standard wrapper class for utilities like log.warning()
wrapper_class=structlog.stdlib.BoundLogger,
# Cache the logger
cache_logger_on_first_use=True,
)

# Create the underlying python logger and wrap it with structlog
system_logger = logging.getLogger(name)
if filename and not system_logger.handlers:
system_logger.addHandler(logging.FileHandler(filename))
system_logger.setLevel(log_level)
logger = structlog.wrap_logger(system_logger)

if log_exceptions:
handle_exceptions(logger)

return logger
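
To make the docstring's usage notes concrete, here is a small sketch of how an indicator might call this factory; the filename and event fields are hypothetical, and each call renders as one JSON line with the logger name, level, pid, and timestamp attached:

# Hypothetical indicator snippet; set LOG_DEBUG in the environment to get DEBUG-level output.
logger = get_structured_logger(__name__, filename="indicator_run.log")
logger.info("csv export complete", geo_type="county", num_rows=3122)
try:
    1 / 0
except ZeroDivisionError:
    logger.exception("backfill step failed", step="merge")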

class LoggerThread():
"""
A construct to use a logger from multiprocessing workers/jobs.

the bare structlog loggers are thread-safe but not multiprocessing-safe.
a `LoggerThread` will spawn a thread that listens to a mp.Queue
and logs messages from it with the provided logger,
so other processes can send logging messages to it
via the logger-like `SubLogger` interface.
the SubLogger even logs the pid of the caller.

this is good to use with a set of jobs that are part of a mp.Pool,
but isnt recommended for general use
because of overhead from threading and multiprocessing,
and because it might introduce lag to log messages.

somewhat inspired by:
docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes
"""

class SubLogger():
"""MP-safe logger-like interface to convey log messages to a listening LoggerThread."""

def __init__(self, queue):
"""Create SubLogger with a bound queue."""
self.queue = queue

def _log(self, level, *args, **kwargs):
kwargs_plus = {'sub_pid': multiprocessing.current_process().pid}
kwargs_plus.update(kwargs)
self.queue.put([level, args, kwargs_plus])

def debug(self, *args, **kwargs):
"""Log a DEBUG level message."""
self._log(logging.DEBUG, *args, **kwargs)

def info(self, *args, **kwargs):
"""Log an INFO level message."""
self._log(logging.INFO, *args, **kwargs)

def warning(self, *args, **kwargs):
"""Log a WARNING level message."""
self._log(logging.WARNING, *args, **kwargs)

def error(self, *args, **kwargs):
"""Log an ERROR level message."""
self._log(logging.ERROR, *args, **kwargs)

def critical(self, *args, **kwargs):
"""Log a CRITICAL level message."""
self._log(logging.CRITICAL, *args, **kwargs)


def get_sublogger(self):
"""Retrieve SubLogger for this LoggerThread."""
return self.sublogger

def __init__(self, logger, q=None):
"""Create and start LoggerThread with supplied logger, creating a queue if not provided."""
self.logger = logger
if q:
self.msg_queue = q
else:
self.msg_queue = multiprocessing.Queue()

def logger_thread_worker():
logger.info('thread started')
while True:
msg = self.msg_queue.get()
if msg == 'STOP':
logger.debug('received stop signal')
break
level, args, kwargs = msg
if level in [logging.DEBUG, logging.INFO, logging.WARNING,
logging.ERROR, logging.CRITICAL]:
logger.log(level, *args, **kwargs)
else:
logger.error('received unknown logging level! exiting...',
level=level, args_kwargs=(args, kwargs))
break
logger.debug('stopping thread')

self.thread = threading.Thread(target=logger_thread_worker,
name="LoggerThread__"+logger.name)
logger.debug('starting thread')
self.thread.start()

self.sublogger = LoggerThread.SubLogger(self.msg_queue)
self.running = True

def stop(self):
"""Terminate this LoggerThread."""
if not self.running:
self.logger.warning('thread already stopped')
return
self.logger.debug('sending stop signal')
self.msg_queue.put('STOP')
self.thread.join()
self.running = False
self.logger.info('thread stopped')
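
As a rough illustration of the queue-listener pattern described in the class docstring (a sketch only: log_from_child is a hypothetical function, and pool_and_threadedlogger below is the recommended entry point for pool-based jobs):

def log_from_child(sub):
    # Runs in a separate process; sub is the MP-safe SubLogger proxy.
    sub.info("child working", item=42)

main_logger = get_structured_logger(__name__)
logger_thread = LoggerThread(main_logger)
child = multiprocessing.Process(target=log_from_child, args=(logger_thread.get_sublogger(),))
child.start()
child.join()
logger_thread.stop()  # drains queued messages and joins the listener thread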


@contextlib.contextmanager
def pool_and_threadedlogger(logger, *poolargs):
"""
Provide (to a context) a multiprocessing Pool and a proxy to the supplied logger.

Emulates the multiprocessing.Pool() context manager,
but also provides (via a LoggerThread) a SubLogger proxy to logger
that can be safely used by pool workers.
The SubLogger proxy interface supports these methods: debug, info, warning, error,
and critical.
Also "cleans up" the pool by waiting for workers to complete
as it exits the context.
"""
with multiprocessing.Manager() as manager:
logger_thread = LoggerThread(logger, manager.Queue())
try:
with multiprocessing.Pool(*poolargs) as pool:
yield pool, logger_thread.get_sublogger()
pool.close()
pool.join()
finally:
logger_thread.stop()
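For pool-based jobs, a hedged usage sketch of pool_and_threadedlogger as documented above; the geo values and worker body are invented, and after this PR the names come from delphi_logger, re-exported through delphi_utils:

from delphi_utils import get_structured_logger, pool_and_threadedlogger

def process_geo(geo, sublogger):
    # Pool workers log through the SubLogger proxy, which also records their pid.
    sublogger.info("processing geo", geo=geo)
    return geo.upper()

if __name__ == "__main__":
    logger = get_structured_logger(__name__)
    with pool_and_threadedlogger(logger, 4) as (pool, sublogger):
        futures = [pool.apply_async(process_geo, (geo, sublogger)) for geo in ("pa", "ny", "tx")]
        results = [f.get() for f in futures]
    logger.info("all geos processed", count=len(results))
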
13 changes: 7 additions & 6 deletions _delphi_utils_python/delphi_utils/runner.py
@@ -1,16 +1,17 @@
"""Indicator running utilities."""
import argparse as ap
import importlib
import os
from typing import Any, Callable, Dict, Optional
import multiprocessing
import os
import time
from typing import Any, Callable, Dict, Optional

from delphi_logger import get_structured_logger

from .archive import ArchiveDiffer, archiver_from_params
from .logger import get_structured_logger
from .utils import read_params, transfer_files, delete_move_files
from .validator.validate import Validator
from .utils import delete_move_files, read_params, transfer_files
from .validator.run import validator_from_params

from .validator.validate import Validator

Params = Dict[str, Any]

5 changes: 4 additions & 1 deletion _delphi_utils_python/delphi_utils/validator/report.py
@@ -1,9 +1,12 @@
"""Validation output reports."""
import sys
from typing import List
from ..logger import get_structured_logger

from delphi_logger import get_structured_logger

from .errors import ValidationFailure


class ValidationReport:
"""Class for reporting the results of validation."""

6 changes: 3 additions & 3 deletions _delphi_utils_python/setup.py
@@ -9,6 +9,7 @@
"covidcast",
"cvxpy",
"darker[isort]~=2.1.1",
"delphi_logger @ git+https://github.com/cmu-delphi/delphi-logger",
"epiweeks",
"freezegun",
"gitpython",
@@ -22,8 +23,7 @@
"pytest",
"requests-mock",
"slackclient",
"structlog",
"xlrd"
"xlrd",
]

setup(
@@ -42,5 +42,5 @@
"Programming Language :: Python :: 3.8",
],
packages=find_packages(),
package_data={'': ['data/20*/*.csv']}
package_data={"": ["data/20*/*.csv"]},
)
4 changes: 2 additions & 2 deletions _delphi_utils_python/tests/test_export.py
@@ -321,8 +321,8 @@ def test_export_df_with_missingness(self, tmp_path):
).astype({"geo_id": str, "sample_size": int})
assert_frame_equal(df, expected_df)

@mock.patch("delphi_utils.logger")
def test_export_df_with_contradictory_missingness(self, mock_logger, tmp_path):
def test_export_df_with_contradictory_missingness(self, tmp_path):
mock_logger = mock.MagicMock()

create_export_csv(
df=self.DF3.copy(),
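
The test now builds its own mock instead of patching delphi_utils.logger, which would no longer intercept the function once it is re-exported from delphi_logger. A rough sketch of the resulting pattern, assuming create_export_csv accepts a logger keyword as the surrounding test suggests (argument values and the assertion are illustrative only):

from unittest import mock

def test_contradictory_missingness_logging(self, tmp_path):
    # Inject a MagicMock directly; no import-path patching required, so the test
    # keeps working regardless of which package provides get_structured_logger.
    mock_logger = mock.MagicMock()
    create_export_csv(
        df=self.DF3.copy(),
        export_dir=tmp_path,
        geo_res="county",   # hypothetical arguments for illustration
        sensor="test",
        logger=mock_logger,
    )
    assert mock_logger.info.called or mock_logger.warning.called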