diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 5ccea7b4c..d105db950 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.4.2 +current_version = 0.4.3 commit = False tag = False diff --git a/dev/local/Makefile b/dev/local/Makefile index 48c63a958..cf3834aec 100644 --- a/dev/local/Makefile +++ b/dev/local/Makefile @@ -83,7 +83,7 @@ web: @# Run the web server @docker run --rm -p 127.0.0.1:10080:80 \ --env "SQLALCHEMY_DATABASE_URI=mysql+mysqldb://user:pass@delphi_database_epidata:3306/epidata" \ - --env "FLASK_SECRET=abc" --env "FLASK_PREFIX=/epidata" \ + --env "FLASK_SECRET=abc" --env "FLASK_PREFIX=/epidata" --env "LOG_DEBUG" \ --network delphi-net --name delphi_web_epidata \ delphi_web_epidata >$(LOG_WEB) 2>&1 & @@ -139,6 +139,20 @@ test: --env "FLASK_SECRET=abc" \ delphi_web_python python -m pytest --import-mode importlib $(pdb) $(test) | tee test_output_$(NOW).log +.PHONY=bash +bash: + @docker run -it --rm --network delphi-net \ + --mount type=bind,source=$(CWD)repos/delphi/delphi-epidata,target=/usr/src/app/repos/delphi/delphi-epidata,readonly \ + --mount type=bind,source=$(CWD)repos/delphi/delphi-epidata/src,target=/usr/src/app/delphi/epidata,readonly \ + --env "SQLALCHEMY_DATABASE_URI=mysql+mysqldb://user:pass@delphi_database_epidata:3306/epidata" \ + --env "FLASK_SECRET=abc" \ + delphi_web_python bash + +.PHONY=sql +sql: + @docker run --rm -it --network delphi-net --cap-add=sys_nice \ + percona mysql --user=user --password=pass --port 3306 --host delphi_database_epidata epidata + .PHONY=clean clean: @docker images -f "dangling=true" -q | xargs docker rmi >/dev/null 2>&1 diff --git a/requirements.txt b/requirements.txt index 8e240e526..21f87fa1d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,3 +12,4 @@ tenacity==7.0.0 newrelic epiweeks==2.1.2 typing-extensions +structlog==22.1.0 diff --git a/src/acquisition/covidcast/logger.py b/src/acquisition/covidcast/logger.py index 62f2ff460..1db86ec57 
100644 --- a/src/acquisition/covidcast/logger.py +++ b/src/acquisition/covidcast/logger.py @@ -1,10 +1,10 @@ """Structured logger utility for creating JSON logs in Delphi pipelines.""" import logging +import os import sys import threading import structlog - def handle_exceptions(logger): """Handle exceptions using the provided logger.""" def exception_handler(etype, value, traceback): @@ -45,12 +45,24 @@ def get_structured_logger(name=__name__, if filename: handlers.append(logging.FileHandler(filename)) + if "LOG_DEBUG" in os.environ: + log_level = logging.DEBUG + else: + log_level = logging.INFO + logging.basicConfig( format="%(message)s", - level=logging.INFO, + level=log_level, handlers=handlers ) + def add_pid(logger, method_name, event_dict): + """ + Add current PID to the event dict. + """ + event_dict["pid"] = os.getpid() + return event_dict + # Configure structlog. This uses many of the standard suggestions from # the structlog documentation. structlog.configure( @@ -61,6 +73,8 @@ def get_structured_logger(name=__name__, structlog.stdlib.add_logger_name, # Include log level in output. structlog.stdlib.add_log_level, + # Include PID in output. 
+ add_pid, # Allow formatting into arguments e.g., logger.info("Hello, %s", # name) structlog.stdlib.PositionalArgumentsFormatter(), diff --git a/src/client/delphi_epidata.R b/src/client/delphi_epidata.R index 95475085f..4ed595d59 100644 --- a/src/client/delphi_epidata.R +++ b/src/client/delphi_epidata.R @@ -15,7 +15,7 @@ Epidata <- (function() { # API base url BASE_URL <- 'https://delphi.cmu.edu/epidata/api.php' - client_version <- '0.4.2' + client_version <- '0.4.3' # Helper function to cast values and/or ranges to strings .listitem <- function(value) { diff --git a/src/client/delphi_epidata.js b/src/client/delphi_epidata.js index 34028f320..0a3ba1053 100644 --- a/src/client/delphi_epidata.js +++ b/src/client/delphi_epidata.js @@ -22,7 +22,7 @@ } })(this, function (exports, fetchImpl, jQuery) { const BASE_URL = "https://delphi.cmu.edu/epidata/"; - const client_version = "0.4.2"; + const client_version = "0.4.3"; // Helper function to cast values and/or ranges to strings function _listitem(value) { diff --git a/src/client/packaging/npm/package.json b/src/client/packaging/npm/package.json index 8d3e27dc6..c674ef0a2 100644 --- a/src/client/packaging/npm/package.json +++ b/src/client/packaging/npm/package.json @@ -2,7 +2,7 @@ "name": "delphi_epidata", "description": "Delphi Epidata API Client", "authors": "Delphi Group", - "version": "0.4.2", + "version": "0.4.3", "license": "MIT", "homepage": "https://github.com/cmu-delphi/delphi-epidata", "bugs": { diff --git a/src/client/packaging/pypi/delphi_epidata/__init__.py b/src/client/packaging/pypi/delphi_epidata/__init__.py index 75369662e..2a32d0f10 100644 --- a/src/client/packaging/pypi/delphi_epidata/__init__.py +++ b/src/client/packaging/pypi/delphi_epidata/__init__.py @@ -1,4 +1,4 @@ from .delphi_epidata import Epidata name = 'delphi_epidata' -__version__ = '0.4.2' +__version__ = '0.4.3' diff --git a/src/client/packaging/pypi/setup.py b/src/client/packaging/pypi/setup.py index ce25c8b5f..f902e38b0 100644 
--- a/src/client/packaging/pypi/setup.py +++ b/src/client/packaging/pypi/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="delphi_epidata", - version="0.4.2", + version="0.4.3", author="David Farrow", author_email="dfarrow0@gmail.com", description="A programmatic interface to Delphi's Epidata API.", diff --git a/src/server/_common.py b/src/server/_common.py index f4fbbd916..45813e451 100644 --- a/src/server/_common.py +++ b/src/server/_common.py @@ -1,12 +1,15 @@ from typing import cast +import time from flask import Flask, g, request +from sqlalchemy import event from sqlalchemy.engine import Connection from werkzeug.local import LocalProxy +from .utils.logger import get_structured_logger from ._config import SECRET from ._db import engine -from ._exceptions import DatabaseErrorException +from ._exceptions import DatabaseErrorException, EpiDataException app = Flask("EpiData", static_url_path="") app.config["SECRET"] = SECRET @@ -24,19 +27,53 @@ def _get_db() -> Connection: """ db: Connection = cast(Connection, LocalProxy(_get_db)) +@event.listens_for(engine, "before_cursor_execute") +def before_cursor_execute(conn, cursor, statement, parameters, context, executemany): + context._query_start_time = time.time() + + +@event.listens_for(engine, "after_cursor_execute") +def after_cursor_execute(conn, cursor, statement, parameters, context, executemany): + # this timing info may be suspect, at least in terms of dbms cpu time... 
+ # it is likely that it includes that time as well as any overhead that + # comes from throttling or flow control on the streamed data, as well as + # any row transform/processing time + total_time = time.time() - context._query_start_time + + # Convert to milliseconds + total_time *= 1000 + get_structured_logger('server_api').info("Executed SQL", statement=statement, params=parameters, elapsed_time_ms=total_time) + @app.before_request -def connect_db(): +def before_request_execute(): + # Set timer for statement + g._request_start_time = time.time() + + # Log statement + get_structured_logger('server_api').info("Received API request", method=request.method, url=request.url, form_args=request.form, req_length=request.content_length, remote_addr=request.remote_addr, user_agent=request.user_agent.string) + if request.path.startswith('/lib'): return # try to get the db try: _get_db() - except: - app.logger.error('database connection error', exc_info=True) + except Exception as e: + get_structured_logger('server_error').error('database connection error', exception=e, exc_info=True) raise DatabaseErrorException() +@app.after_request +def after_request_execute(response): + total_time = time.time() - g._request_start_time + # Convert to milliseconds + total_time *= 1000 + get_structured_logger('server_api').info('Served API request', method=request.method, url=request.url, form_args=request.form, req_length=request.content_length, remote_addr=request.remote_addr, user_agent=request.user_agent.string, + values=request.values.to_dict(flat=False), blueprint=request.blueprint, endpoint=request.endpoint, + response_status=response.status, content_length=response.calculate_content_length(), elapsed_time_ms=total_time) + return response + + @app.teardown_appcontext def teardown_db(exception=None): # close the db connection @@ -46,6 +83,16 @@ def teardown_db(exception=None): db.close() +@app.errorhandler(EpiDataException) +def handle_exception(e): + # Log error and pass through;
EpiDataExceptions are HTTPExceptions which are valid WSGI responses (see https://werkzeug.palletsprojects.com/en/2.2.x/exceptions/ ) + if isinstance(e, DatabaseErrorException): + get_structured_logger('server_error').error('Received DatabaseErrorException', exception=str(e), exc_info=True) + else: + get_structured_logger('server_error').warning('Encountered user-side error', exception=str(e)) + return e + + def is_compatibility_mode() -> bool: """ checks whether this request is in compatibility mode diff --git a/src/server/_config.py b/src/server/_config.py index 560fdd796..f19bec766 100644 --- a/src/server/_config.py +++ b/src/server/_config.py @@ -5,7 +5,7 @@ load_dotenv() -VERSION = "0.4.2" +VERSION = "0.4.3" MAX_RESULTS = int(10e6) MAX_COMPATIBILITY_RESULTS = int(3650) diff --git a/src/server/_printer.py b/src/server/_printer.py index 715214e19..04196c71d 100644 --- a/src/server/_printer.py +++ b/src/server/_printer.py @@ -7,7 +7,8 @@ import orjson from ._config import MAX_RESULTS, MAX_COMPATIBILITY_RESULTS -from ._common import app, is_compatibility_mode +from ._common import is_compatibility_mode +from .utils.logger import get_structured_logger def print_non_standard(data): @@ -58,7 +59,7 @@ def gen(): if r is not None: yield r except Exception as e: - app.logger.exception(f"error executing: {str(e)}") + get_structured_logger('server_error').error("Exception while executing printer", exception=e) self.result = -1 yield self._error(e) diff --git a/src/server/_query.py b/src/server/_query.py index 0b34310ab..47d91a2df 100644 --- a/src/server/_query.py +++ b/src/server/_query.py @@ -241,7 +241,6 @@ def run_query(p: APrinter, query_tuple: Tuple[str, Dict[str, Any]]): query, params = query_tuple # limit rows + 1 for detecting whether we would have more full_query = text(limit_query(query, p.remaining_rows + 1)) - app.logger.info("full_query: %s, params: %s", full_query, params) return db.execution_options(stream_results=True).execute(full_query, **params) diff --git
a/src/server/endpoints/covidcast_meta.py b/src/server/endpoints/covidcast_meta.py index f4d634bba..87476d271 100644 --- a/src/server/endpoints/covidcast_meta.py +++ b/src/server/endpoints/covidcast_meta.py @@ -5,10 +5,11 @@ from flask.json import loads from sqlalchemy import text -from .._common import db, app +from .._common import db from .._printer import create_printer from .._query import filter_fields from .._validate import extract_strings +from ..utils.logger import get_structured_logger bp = Blueprint("covidcast_meta", __name__) @@ -41,11 +42,12 @@ def fetch_data( ).fetchone() if not row or not row["epidata"]: + get_structured_logger('server_api').warning("no data in covidcast_meta cache") return age = row["age"] if age > max_age and row["epidata"]: - app.logger.warning("covidcast_meta cache is stale: %d", age) + get_structured_logger('server_api').warning("covidcast_meta cache is stale", cache_age=age) pass epidata = loads(row["epidata"]) diff --git a/src/server/main.py b/src/server/main.py index 5434b744b..7471a2491 100644 --- a/src/server/main.py +++ b/src/server/main.py @@ -57,5 +57,4 @@ def send_lib_file(path: str): app.logger.handlers = gunicorn_logger.handlers app.logger.setLevel(gunicorn_logger.level) sqlalchemy_logger = logging.getLogger("sqlalchemy") - sqlalchemy_logger.handlers = gunicorn_logger.handlers - sqlalchemy_logger.setLevel(gunicorn_logger.level) + sqlalchemy_logger.setLevel(logging.WARN) diff --git a/src/server/utils/dates.py b/src/server/utils/dates.py index ffd296c5a..eb293c744 100644 --- a/src/server/utils/dates.py +++ b/src/server/utils/dates.py @@ -5,10 +5,10 @@ Tuple, Union ) +from .logger import get_structured_logger from datetime import date, timedelta from epiweeks import Week, Year from typing_extensions import TypeAlias -import logging # Alias for a sequence of date ranges (int, int) or date integers TimeValues: TypeAlias = Sequence[Union[Tuple[int, int], int]] @@ -86,16 +86,22 @@ def time_values_to_ranges(values: 
Optional[TimeValues]) -> Optional[TimeValues]: e.g. [20200101, 20200102, (20200101, 20200104), 20200106] -> [(20200101, 20200104), 20200106] (the first two values of the original list are merged into a single range) """ + logger = get_structured_logger('server_utils') if not values or len(values) <= 1: + logger.info("List of dates looks like 0-1 elements, nothing to optimize", time_values=values) return values # determine whether the list is of days (YYYYMMDD) or weeks (YYYYWW) based on first element first_element = values[0][0] if isinstance(values[0], tuple) else values[0] if guess_time_value_is_day(first_element): + # TODO: reduce this and other date logging to DEBUG after prod metrics gathered + logger.info("Treating time value as day", time_value=first_element) return days_to_ranges(values) elif guess_time_value_is_week(first_element): + logger.info("Treating time value as week", time_value=first_element) return weeks_to_ranges(values) else: + logger.info("Time value unclear, not optimizing", time_value=first_element) return values def days_to_ranges(values: TimeValues) -> TimeValues: @@ -138,7 +144,9 @@ def _to_ranges(values: TimeValues, value_to_date: Callable, date_to_value: Calla else: ranges.append((date_to_value(m[0]), date_to_value(m[1]))) + get_structured_logger('server_utils').info("Optimized list of date values", original=values, optimized=ranges, original_length=len(values), optimized_length=len(ranges)) + return ranges except Exception as e: - logging.info('bad input to date ranges', input=values, exception=e) + get_structured_logger('server_utils').error('bad input to date ranges', time_values=values, exception=e) return values diff --git a/src/server/utils/logger.py b/src/server/utils/logger.py new file mode 100644 index 000000000..4d7d7d257 --- /dev/null +++ b/src/server/utils/logger.py @@ -0,0 +1,105 @@ +import logging +import os +import sys +import threading +import structlog + +def handle_exceptions(logger): + """Handle exceptions using the 
provided logger.""" + def exception_handler(etype, value, traceback): + logger.exception("Top-level exception occurred", + exc_info=(etype, value, traceback)) + + def multithread_exception_handler(args): + exception_handler(args.exc_type, args.exc_value, args.exc_traceback) + + sys.excepthook = exception_handler + threading.excepthook = multithread_exception_handler + + +def get_structured_logger(name=__name__, + filename=None, + log_exceptions=True): + """Create a new structlog logger. + + Use the logger returned from this in indicator code using the standard + wrapper calls, e.g.: + + logger = get_structured_logger(__name__) + logger.warning("Error", type="Signal too low"). + + The output will be rendered as JSON which can easily be consumed by logs + processors. + + See the structlog documentation for details. + + Parameters + --------- + name: Name to use for logger (included in log lines), __name__ from caller + is a good choice. + filename: An (optional) file to write log output. + """ + # Configure the underlying logging configuration + handlers = [logging.StreamHandler()] + if filename: + handlers.append(logging.FileHandler(filename)) + + if "LOG_DEBUG" in os.environ: + log_level = logging.DEBUG + else: + log_level = logging.INFO + + logging.basicConfig( + format="%(message)s", + level=log_level, + handlers=handlers + ) + + def add_pid(logger, method_name, event_dict): + """ + Add current PID to the event dict. + """ + event_dict["pid"] = os.getpid() + return event_dict + + # Configure structlog. This uses many of the standard suggestions from + # the structlog documentation. + structlog.configure( + processors=[ + # Filter out log levels we are not tracking. + structlog.stdlib.filter_by_level, + # Include logger name in output. + structlog.stdlib.add_logger_name, + # Include log level in output. + structlog.stdlib.add_log_level, + # Include PID in output. 
+ add_pid, + # Allow formatting into arguments e.g., logger.info("Hello, %s", + # name) + structlog.stdlib.PositionalArgumentsFormatter(), + # Add timestamps. + structlog.processors.TimeStamper(fmt="iso"), + # Match support for exception logging in the standard logger. + structlog.processors.StackInfoRenderer(), + structlog.processors.format_exc_info, + # Decode unicode characters + structlog.processors.UnicodeDecoder(), + # Render as JSON + structlog.processors.JSONRenderer() + ], + # Use a dict class for keeping track of data. + context_class=dict, + # Use a standard logger for the actual log call. + logger_factory=structlog.stdlib.LoggerFactory(), + # Use a standard wrapper class for utilities like log.warning() + wrapper_class=structlog.stdlib.BoundLogger, + # Cache the logger + cache_logger_on_first_use=True, + ) + + logger = structlog.get_logger(name) + + if log_exceptions: + handle_exceptions(logger) + + return logger