diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 7c34c9165..fa394e9d6 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 4.1.24 +current_version = 4.1.25 commit = False tag = False diff --git a/.git-blame-ignore-revs b/.git-blame-ignore-revs index bfaa3109e..3abbfed0e 100644 --- a/.git-blame-ignore-revs +++ b/.git-blame-ignore-revs @@ -20,3 +20,5 @@ b9ceb400d9248c8271e8342275664ac5524e335d 07ed83e5768f717ab0f9a62a9209e4e2cffa058d # style(black): format wiki acquisition 923852eafa86b8f8b182d499489249ba8f815843 +# lint: trailing whitespace changes +81179c5f144b8f25421e799e823e18cde43c84f9 diff --git a/dev/local/setup.cfg b/dev/local/setup.cfg index 4e67b3354..dd30723a4 100644 --- a/dev/local/setup.cfg +++ b/dev/local/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = Delphi Development -version = 4.1.24 +version = 4.1.25 [options] packages = diff --git a/docs/api/covidcast-signals/covid-act-now.md b/docs/api/covidcast-signals/covid-act-now.md index 72a5a9a20..670bda61f 100644 --- a/docs/api/covidcast-signals/covid-act-now.md +++ b/docs/api/covidcast-signals/covid-act-now.md @@ -15,11 +15,13 @@ grand_parent: COVIDcast Main Endpoint * **Time type:** day (see [date format docs](../covidcast_times.md)) * **License:** [CC BY-NC](../covidcast_licensing.md#creative-commons-attribution-noncommercial) -The COVID Act Now (CAN) data source provides COVID-19 testing statistics, such as positivity rates and total tests performed. -The county-level positivity rates and test totals are pulled directly from CAN. -While CAN provides this data potentially from multiple sources, we only use data sourced from the +The [COVID Act Now (CAN)](https://covidactnow.org/) data source provides COVID-19 testing statistics, such as positivity rates and total tests performed. +The county-level positivity rates and test totals are pulled directly from CAN using [their API](https://covidactnow.org/data-api). 
+While CAN provides this data potentially from multiple sources, we only use data that CAN sources from the [CDC's COVID-19 Integrated County View](https://covid.cdc.gov/covid-data-tracker/#county-view). +Delphi's mirror of the CAN data was deactivated in December 2021 (last issue 2021-12-10) in favor of the [DSEW CPR data](./dsew-cpr.md), which reports the same information under the `covid_naat_pct_positive_7dav` signal. + | Signal | Description | |--------------------------------|----------------------------------------------------------------| @@ -34,9 +36,9 @@ While CAN provides this data potentially from multiple sources, we only use data ## Estimation -The quantities received from CAN / CDC are the county-level positivity rate and total tests, -which are based on the counts of PCR specimens tested. -In particular, they are also already smoothed with a 7-day-average. +We receive county-level positivity rate and total tests from CAN, originating from the CDC. +These quantities are based on the counts of PCR specimens tested. +They are also already smoothed with a 7-day-average. For a fixed location $$i$$ and time $$t$$, let $$Y_{it}$$ denote the number of PCR specimens tested that have a positive result. Let $$N_{it}$$ denote the total number of PCR specimens tested. @@ -79,38 +81,41 @@ $$ ### Smoothing -No additional smoothing is done to avoid double-smoothing, since the data pulled from CAN / CDC +No additional smoothing is done to avoid double-smoothing, since the CAN data is already smoothed with a 7-day-average. ## Limitations -Estimates for geographical levels beyond counties may be inaccurate due to how aggregations -are done on smoothed values instead of the raw values. Ideally we would aggregate raw values +Estimates for geographical levels beyond counties may be inaccurate because our aggregations -are performed on smoothed values instead of the raw values. 
+Ideally we would aggregate raw values then smooth, but the raw values are not accessible in this case. -The positivity rate here should not be interpreted as the population positivity rate as +The reported test positivity rate should not be interpreted as the population positivity rate as the testing performed are typically not randomly sampled, especially for early data with lower testing volumes. A few counties, most notably in California, are also not covered by this data source. -Entries with zero total tests performed are also suppressed, even if it was actually the case that +Entries with zero total tests performed are suppressed, even if it was actually the case that no tests were performed for the day. ## Lag and Backfill The lag for these signals varies depending on the reporting patterns of individual counties. Most counties have their latest data report with a lag of 2 days, while others can take 9 days -or more in the case of California counties. +or more, as is the case with California counties. -These signals are also backfilled as backlogged test results could get assigned to older 7-day timeframes. -Most recent test positivity rates do not change substantially with backfill (having a median delta of close to 0). -However, most recent total tests performed is expected to increase in later data revisions (having a median increase of 7%). +Revisions are sometimes made to the data. For example, backlogged test results can get assigned to past dates. +The majority of recent test positivity rates do not change substantially with backfill (having a median delta of close to 0). +However, the majority of recent total tests performed is expected to increase in later data revisions (having a median increase of 7%). Values more than 5 days in the past are expected to remain fairly static (with total tests performed having a median increase of 1% of less), as most major revisions have already occurred. 
## Source and Licensing -County-level testing data is scraped by CAN from the +County-level testing data is scraped by [CAN](https://covidactnow.org/) from the [CDC's COVID-19 Integrated County View](https://covid.cdc.gov/covid-data-tracker/#county-view), and made available through [CAN's API](https://covidactnow.org/tools). + +The data is made available under a [CC BY-NC](../covidcast_licensing.md#creative-commons-attribution-noncommercial) license. diff --git a/docs/api/covidcast-signals/hhs.md b/docs/api/covidcast-signals/hhs.md index b6f916e17..a77187532 100644 --- a/docs/api/covidcast-signals/hhs.md +++ b/docs/api/covidcast-signals/hhs.md @@ -1,6 +1,6 @@ --- title: Department of Health & Human Services -parent: Data Sources and Signals +parent: Inactive Signals grand_parent: COVIDcast Main Endpoint --- diff --git a/docs/api/covidcast-signals/indicator-combination b/docs/api/covidcast-signals/indicator-combination.md similarity index 100% rename from docs/api/covidcast-signals/indicator-combination rename to docs/api/covidcast-signals/indicator-combination.md diff --git a/docs/epidata_development.md b/docs/epidata_development.md index 024f1b4dc..eb38e6ae9 100644 --- a/docs/epidata_development.md +++ b/docs/epidata_development.md @@ -49,7 +49,7 @@ $ [sudo] make test pdb=1 $ [sudo] make test test=repos/delphi/delphi-epidata/integrations/acquisition ``` -You can read the commands executed by the Makefile [here](../dev/local/Makefile). +You can read the commands executed by the Makefile [here](https://github.com/cmu-delphi/delphi-epidata/blob/dev/dev/local/Makefile). ## Rapid Iteration and Bind Mounts @@ -87,8 +87,8 @@ You can test your changes manually by: What follows is a worked demonstration based on the `fluview` endpoint. Before starting, make sure that you have the `delphi_database_epidata`, -`delphi_web_epidata`, and `delphi_redis` containers running; if you don't, see -the Makefile instructions above. 
+`delphi_web_epidata`, and `delphi_redis` containers running (with `docker ps`); +if you don't, see the Makefile instructions above. First, let's insert some fake data into the `fluview` table: diff --git a/integrations/client/test_delphi_epidata.py b/integrations/client/test_delphi_epidata.py index 3b69eb4cf..f4cf773c6 100644 --- a/integrations/client/test_delphi_epidata.py +++ b/integrations/client/test_delphi_epidata.py @@ -2,8 +2,8 @@ # standard library import time +import json from json import JSONDecodeError -from requests.models import Response from unittest.mock import MagicMock, patch # first party @@ -306,6 +306,39 @@ def test_sandbox(self, get, post): Epidata.debug = False Epidata.sandbox = False + @patch('requests.get') + def test_version_check(self, get): + """Test that the _version_check() function correctly logs a version discrepancy.""" + class MockJson: + def __init__(self, content, status_code): + self.content = content + self.status_code = status_code + def raise_for_status(self): pass + def json(self): return json.loads(self.content) + get.reset_mock() + get.return_value = MockJson(b'{"info": {"version": "0.0.1"}}', 200) + + # "back up" the value of this private class var and replace w/ default + # so the ._version_check() method runs unencumbered: + e_vdc__save = Epidata._version_checked + Epidata._version_checked = False + # run version check: + Epidata._version_check() + # "restore" class var: + Epidata._version_checked = e_vdc__save + + captured = self.capsys.readouterr() + output = captured.err.splitlines() + self.assertEqual(len(output), 1) + self.assertIn("Client version not up to date", output[0]) + self.assertIn("\'latest_version\': \'0.0.1\'", output[0]) + + @patch('delphi.epidata.client.delphi_epidata.Epidata._version_check') + def test_version_check_once(self, version_check): + """Test that the _version_check() function is only called once on initial module import.""" + from delphi.epidata.client.delphi_epidata import Epidata + 
version_check.assert_not_called() + def test_geo_value(self): """test different variants of geo types: single, *, multi.""" diff --git a/src/acquisition/covid_hosp/common/database.py b/src/acquisition/covid_hosp/common/database.py index efbdb6c45..18c7f377f 100644 --- a/src/acquisition/covid_hosp/common/database.py +++ b/src/acquisition/covid_hosp/common/database.py @@ -11,7 +11,7 @@ # first party import delphi.operations.secrets as secrets -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger Columndef = namedtuple("Columndef", "csv_name sql_name dtype") diff --git a/src/acquisition/covidcast/csv_importer.py b/src/acquisition/covidcast/csv_importer.py index f6122e610..e9893c0da 100644 --- a/src/acquisition/covidcast/csv_importer.py +++ b/src/acquisition/covidcast/csv_importer.py @@ -13,10 +13,9 @@ import pandas as pd # first party -from delphi_utils import Nans +from delphi_utils import get_structured_logger, Nans from delphi.utils.epiweek import delta_epiweeks from delphi.epidata.common.covidcast_row import CovidcastRow -from delphi.epidata.common.logger import get_structured_logger DataFrameRow = NamedTuple('DFRow', [ ('geo_id', str), diff --git a/src/acquisition/covidcast/csv_to_database.py b/src/acquisition/covidcast/csv_to_database.py index be9dad86c..b3642fc51 100644 --- a/src/acquisition/covidcast/csv_to_database.py +++ b/src/acquisition/covidcast/csv_to_database.py @@ -11,7 +11,7 @@ from delphi.epidata.acquisition.covidcast.csv_importer import CsvImporter, PathDetails from delphi.epidata.acquisition.covidcast.database import Database, DBLoadStateException from delphi.epidata.acquisition.covidcast.file_archiver import FileArchiver -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger def get_argument_parser(): diff --git a/src/acquisition/covidcast/database.py b/src/acquisition/covidcast/database.py index 871061b81..5fd56923b 100644 --- 
a/src/acquisition/covidcast/database.py +++ b/src/acquisition/covidcast/database.py @@ -14,7 +14,7 @@ # first party import delphi.operations.secrets as secrets -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger from delphi.epidata.common.covidcast_row import CovidcastRow @@ -117,16 +117,16 @@ def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20, get_structured_logger("insert_or_update_batch").fatal(err_msg) raise DBLoadStateException(err_msg) - # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and + # NOTE: `value_update_timestamp` is hardcoded to "NOW" (which is appropriate) and # `is_latest_issue` is hardcoded to 1 (which is temporary and addressed later in this method) insert_into_loader_sql = f''' INSERT INTO `{self.load_table}` (`source`, `signal`, `time_type`, `geo_type`, `time_value`, `geo_value`, - `value_updated_timestamp`, `value`, `stderr`, `sample_size`, `issue`, `lag`, + `value_updated_timestamp`, `value`, `stderr`, `sample_size`, `issue`, `lag`, `is_latest_issue`, `missing_value`, `missing_stderr`, `missing_sample_size`) VALUES - (%s, %s, %s, %s, %s, %s, - UNIX_TIMESTAMP(NOW()), %s, %s, %s, %s, %s, + (%s, %s, %s, %s, %s, %s, + UNIX_TIMESTAMP(NOW()), %s, %s, %s, %s, %s, 1, %s, %s, %s) ''' @@ -134,11 +134,11 @@ def insert_or_update_batch(self, cc_rows: List[CovidcastRow], batch_size=2**20, # if an entry in the load table is NOT in the latest table, it is clearly now the latest value for that key (so we do nothing (thanks to INNER join)). # if an entry *IS* in both load and latest tables, but latest table issue is newer, unmark is_latest_issue in load. 
fix_is_latest_issue_sql = f''' - UPDATE - `{self.load_table}` JOIN `{self.latest_view}` - USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) - SET `{self.load_table}`.`is_latest_issue`=0 - WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue` + UPDATE + `{self.load_table}` JOIN `{self.latest_view}` + USING (`source`, `signal`, `geo_type`, `geo_value`, `time_type`, `time_value`) + SET `{self.load_table}`.`is_latest_issue`=0 + WHERE `{self.load_table}`.`issue` < `{self.latest_view}`.`issue` ''' # TODO: consider handling cc_rows as a generator instead of a list diff --git a/src/acquisition/covidcast/file_archiver.py b/src/acquisition/covidcast/file_archiver.py index 802590871..07bd453f9 100644 --- a/src/acquisition/covidcast/file_archiver.py +++ b/src/acquisition/covidcast/file_archiver.py @@ -6,7 +6,7 @@ import shutil # first party -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger class FileArchiver: """Archives files by moving and compressing.""" diff --git a/src/client/delphi_epidata.R b/src/client/delphi_epidata.R index 96e168e48..fd461de00 100644 --- a/src/client/delphi_epidata.R +++ b/src/client/delphi_epidata.R @@ -15,7 +15,7 @@ Epidata <- (function() { # API base url BASE_URL <- getOption('epidata.url', default = 'https://api.delphi.cmu.edu/epidata/') - client_version <- '4.1.24' + client_version <- '4.1.25' auth <- getOption("epidata.auth", default = NA) diff --git a/src/client/delphi_epidata.js b/src/client/delphi_epidata.js index 375c3ba01..7afa235c0 100644 --- a/src/client/delphi_epidata.js +++ b/src/client/delphi_epidata.js @@ -22,7 +22,7 @@ } })(this, function (exports, fetchImpl, jQuery) { const BASE_URL = "https://api.delphi.cmu.edu/epidata/"; - const client_version = "4.1.24"; + const client_version = "4.1.25"; // Helper function to cast values and/or ranges to strings function _listitem(value) { diff --git a/src/client/delphi_epidata.py 
b/src/client/delphi_epidata.py index 9c55e0b73..c72e12b89 100644 --- a/src/client/delphi_epidata.py +++ b/src/client/delphi_epidata.py @@ -18,7 +18,7 @@ from aiohttp import ClientSession, TCPConnector, BasicAuth -__version__ = "4.1.24" +__version__ = "4.1.25" _HEADERS = {"user-agent": "delphi_epidata/" + __version__ + " (Python)"} @@ -43,17 +43,42 @@ class Epidata: BASE_URL = "https://api.delphi.cmu.edu/epidata" auth = None - client_version = __version__ - debug = False # if True, prints extra logging statements sandbox = False # if True, will not execute any queries + _version_checked = False + @staticmethod def log(evt, **kwargs): kwargs['event'] = evt kwargs['timestamp'] = time.strftime("%Y-%m-%d %H:%M:%S %z") return sys.stderr.write(str(kwargs) + "\n") + # Check that this client's version matches the most recent available. + # This is intended to run just once per program execution, on initial module load. + # See the bottom of this file for the ultimate call to this method. + @staticmethod + def _version_check(): + if Epidata._version_checked: + # already done; nothing to do! 
+ return + + Epidata._version_checked = True + + try: + request = requests.get('https://pypi.org/pypi/delphi-epidata/json', timeout=5) + latest_version = request.json()['info']['version'] + except Exception as e: + Epidata.log("Error getting latest client version", exception=str(e)) + return + + if latest_version != __version__: + Epidata.log( + "Client version not up to date", + client_version=__version__, + latest_version=latest_version + ) + # Helper function to cast values and/or ranges to strings @staticmethod def _listitem(value): @@ -692,3 +717,7 @@ async def async_make_calls(param_combos): future = asyncio.ensure_future(async_make_calls(param_list)) responses = loop.run_until_complete(future) return responses + + + +Epidata._version_check() diff --git a/src/client/packaging/npm/package.json b/src/client/packaging/npm/package.json index 44468d7f7..c88d0c6ec 100644 --- a/src/client/packaging/npm/package.json +++ b/src/client/packaging/npm/package.json @@ -2,7 +2,7 @@ "name": "delphi_epidata", "description": "Delphi Epidata API Client", "authors": "Delphi Group", - "version": "4.1.24", + "version": "4.1.25", "license": "MIT", "homepage": "https://github.com/cmu-delphi/delphi-epidata", "bugs": { diff --git a/src/client/packaging/pypi/.bumpversion.cfg b/src/client/packaging/pypi/.bumpversion.cfg index 5413f5eca..643580e4b 100644 --- a/src/client/packaging/pypi/.bumpversion.cfg +++ b/src/client/packaging/pypi/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 4.1.24 +current_version = 4.1.25 commit = False tag = False diff --git a/src/client/packaging/pypi/CHANGELOG.md b/src/client/packaging/pypi/CHANGELOG.md index e053619c8..3c465e3e3 100644 --- a/src/client/packaging/pypi/CHANGELOG.md +++ b/src/client/packaging/pypi/CHANGELOG.md @@ -3,6 +3,14 @@ All notable future changes to the `delphi_epidata` python client will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/). 
+## [4.1.25] - 2024-07-18 + +### Includes +- https://github.com/cmu-delphi/delphi-epidata/pull/1456 + +### Changed +- Added a one-time check which logs a warning when the newest client version does not match the client version in use. + ## [4.1.24] - 2024-07-09 ### Includes diff --git a/src/client/packaging/pypi/pyproject.toml b/src/client/packaging/pypi/pyproject.toml index 7acf8dea7..d869f42e4 100644 --- a/src/client/packaging/pypi/pyproject.toml +++ b/src/client/packaging/pypi/pyproject.toml @@ -18,7 +18,7 @@ build-backend = "setuptools.build_meta" # If not defined, then legacy behavior c [project] name = "delphi_epidata" # REQUIRED, is the only field that cannot be marked as dynamic. -version = "4.1.24" +version = "4.1.25" description = "A programmatic interface to Delphi's Epidata API." readme = "README.md" license = { file = "LICENSE" } diff --git a/src/common/logger.py b/src/common/logger.py deleted file mode 100644 index d04ff7673..000000000 --- a/src/common/logger.py +++ /dev/null @@ -1,254 +0,0 @@ -"""Structured logger utility for creating JSON logs.""" - -# the Delphi group uses two ~identical versions of this file. -# try to keep them in sync with edits, for sanity. 
-# https://github.com/cmu-delphi/covidcast-indicators/blob/main/_delphi_utils_python/delphi_utils/logger.py # pylint: disable=line-too-long -# https://github.com/cmu-delphi/delphi-epidata/blob/dev/src/common/logger.py - -import contextlib -import logging -import multiprocessing -import os -import sys -import threading -from traceback import format_exception - -import structlog - - -def handle_exceptions(logger): - """Handle exceptions using the provided logger.""" - - def exception_handler(scope, etype, value, traceback): - logger.exception("Top-level exception occurred", - scope=scope, exc_info=(etype, value, traceback)) - - def sys_exception_handler(etype, value, traceback): - exception_handler("sys", etype, value, traceback) - - def threading_exception_handler(args): - if args.exc_type == SystemExit and args.exc_value.code == 0: - # `sys.exit(0)` is considered "successful termination": - # https://docs.python.org/3/library/sys.html#sys.exit - logger.debug("normal thread exit", thread=args.thread, - stack="".join( - format_exception( - args.exc_type, args.exc_value, args.exc_traceback))) - else: - exception_handler(f"thread: {args.thread}", - args.exc_type, args.exc_value, args.exc_traceback) - - sys.excepthook = sys_exception_handler - threading.excepthook = threading_exception_handler - - -def get_structured_logger(name=__name__, - filename=None, - log_exceptions=True): - """Create a new structlog logger. - - Use the logger returned from this in indicator code using the standard - wrapper calls, e.g.: - - logger = get_structured_logger(__name__) - logger.warning("Error", type="Signal too low"). - - The output will be rendered as JSON which can easily be consumed by logs - processors. - - See the structlog documentation for details. - - Parameters - --------- - name: Name to use for logger (included in log lines), __name__ from caller - is a good choice. - filename: An (optional) file to write log output. 
- """ - # Set the underlying logging configuration - if "LOG_DEBUG" in os.environ: - log_level = logging.DEBUG - else: - log_level = logging.INFO - - logging.basicConfig( - format="%(message)s", - level=log_level, - handlers=[logging.StreamHandler()]) - - def add_pid(_logger, _method_name, event_dict): - """Add current PID to the event dict.""" - event_dict["pid"] = os.getpid() - return event_dict - - # Configure structlog. This uses many of the standard suggestions from - # the structlog documentation. - structlog.configure( - processors=[ - # Filter out log levels we are not tracking. - structlog.stdlib.filter_by_level, - # Include logger name in output. - structlog.stdlib.add_logger_name, - # Include log level in output. - structlog.stdlib.add_log_level, - # Include PID in output. - add_pid, - # Allow formatting into arguments e.g., logger.info("Hello, %s", - # name) - structlog.stdlib.PositionalArgumentsFormatter(), - # Add timestamps. - structlog.processors.TimeStamper(fmt="iso"), - # Match support for exception logging in the standard logger. - structlog.processors.StackInfoRenderer(), - structlog.processors.format_exc_info, - # Decode unicode characters - structlog.processors.UnicodeDecoder(), - # Render as JSON - structlog.processors.JSONRenderer(), - ], - # Use a dict class for keeping track of data. - context_class=dict, - # Use a standard logger for the actual log call. 
- logger_factory=structlog.stdlib.LoggerFactory(), - # Use a standard wrapper class for utilities like log.warning() - wrapper_class=structlog.stdlib.BoundLogger, - # Cache the logger - cache_logger_on_first_use=True, - ) - - # Create the underlying python logger and wrap it with structlog - system_logger = logging.getLogger(name) - if filename and not system_logger.handlers: - system_logger.addHandler(logging.FileHandler(filename)) - system_logger.setLevel(log_level) - logger = structlog.wrap_logger(system_logger) - - if log_exceptions: - handle_exceptions(logger) - - return logger - - -class LoggerThread(): - """ - A construct to use a logger from multiprocessing workers/jobs. - - the bare structlog loggers are thread-safe but not multiprocessing-safe. - a `LoggerThread` will spawn a thread that listens to a mp.Queue - and logs messages from it with the provided logger, - so other processes can send logging messages to it - via the logger-like `SubLogger` interface. - the SubLogger even logs the pid of the caller. - - this is good to use with a set of jobs that are part of a mp.Pool, - but isnt recommended for general use - because of overhead from threading and multiprocessing, - and because it might introduce lag to log messages. 
- - somewhat inspired by: - docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes - """ - - class SubLogger(): - """MP-safe logger-like interface to convey log messages to a listening LoggerThread.""" - - def __init__(self, queue): - """Create SubLogger with a bound queue.""" - self.queue = queue - - def _log(self, level, *args, **kwargs): - kwargs_plus = {'sub_pid': multiprocessing.current_process().pid} - kwargs_plus.update(kwargs) - self.queue.put([level, args, kwargs_plus]) - - def debug(self, *args, **kwargs): - """Log a DEBUG level message.""" - self._log(logging.DEBUG, *args, **kwargs) - - def info(self, *args, **kwargs): - """Log an INFO level message.""" - self._log(logging.INFO, *args, **kwargs) - - def warning(self, *args, **kwargs): - """Log a WARNING level message.""" - self._log(logging.WARNING, *args, **kwargs) - - def error(self, *args, **kwargs): - """Log an ERROR level message.""" - self._log(logging.ERROR, *args, **kwargs) - - def critical(self, *args, **kwargs): - """Log a CRITICAL level message.""" - self._log(logging.CRITICAL, *args, **kwargs) - - - def get_sublogger(self): - """Retrieve SubLogger for this LoggerThread.""" - return self.sublogger - - def __init__(self, logger, q=None): - """Create and start LoggerThread with supplied logger, creating a queue if not provided.""" - self.logger = logger - if q: - self.msg_queue = q - else: - self.msg_queue = multiprocessing.Queue() - - def logger_thread_worker(): - logger.info('thread started') - while True: - msg = self.msg_queue.get() - if msg == 'STOP': - logger.debug('received stop signal') - break - level, args, kwargs = msg - if level in [logging.DEBUG, logging.INFO, logging.WARNING, - logging.ERROR, logging.CRITICAL]: - logger.log(level, *args, **kwargs) - else: - logger.error('received unknown logging level! 
exiting...', - level=level, args_kwargs=(args, kwargs)) - break - logger.debug('stopping thread') - - self.thread = threading.Thread(target=logger_thread_worker, - name="LoggerThread__"+logger.name) - logger.debug('starting thread') - self.thread.start() - - self.sublogger = LoggerThread.SubLogger(self.msg_queue) - self.running = True - - def stop(self): - """Terminate this LoggerThread.""" - if not self.running: - self.logger.warning('thread already stopped') - return - self.logger.debug('sending stop signal') - self.msg_queue.put('STOP') - self.thread.join() - self.running = False - self.logger.info('thread stopped') - - -@contextlib.contextmanager -def pool_and_threadedlogger(logger, *poolargs): - """ - Provide (to a context) a multiprocessing Pool and a proxy to the supplied logger. - - Emulates the multiprocessing.Pool() context manager, - but also provides (via a LoggerThread) a SubLogger proxy to logger - that can be safely used by pool workers. - The SubLogger proxy interface supports these methods: debug, info, warning, error, - and critical. - Also "cleans up" the pool by waiting for workers to complete - as it exits the context. 
- """ - with multiprocessing.Manager() as manager: - logger_thread = LoggerThread(logger, manager.Queue()) - try: - with multiprocessing.Pool(*poolargs) as pool: - yield pool, logger_thread.get_sublogger() - pool.close() - pool.join() - finally: - logger_thread.stop() diff --git a/src/maintenance/covidcast_meta_cache_updater.py b/src/maintenance/covidcast_meta_cache_updater.py index c5f7fe3e8..cb0b2703f 100644 --- a/src/maintenance/covidcast_meta_cache_updater.py +++ b/src/maintenance/covidcast_meta_cache_updater.py @@ -7,7 +7,7 @@ # first party from delphi.epidata.acquisition.covidcast.database import Database -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger from delphi.epidata.client.delphi_epidata import Epidata def get_argument_parser(): diff --git a/src/maintenance/delete_batch.py b/src/maintenance/delete_batch.py index 31a25ef2a..8e8298817 100644 --- a/src/maintenance/delete_batch.py +++ b/src/maintenance/delete_batch.py @@ -8,7 +8,7 @@ # first party from delphi.epidata.acquisition.covidcast.database import Database -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger def get_argument_parser(): diff --git a/src/maintenance/signal_dash_data_generator.py b/src/maintenance/signal_dash_data_generator.py index b7f1048f5..5a7067f83 100644 --- a/src/maintenance/signal_dash_data_generator.py +++ b/src/maintenance/signal_dash_data_generator.py @@ -15,7 +15,7 @@ # first party import covidcast import delphi.operations.secrets as secrets -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger LOOKBACK_DAYS_FOR_COVERAGE = 56 @@ -150,11 +150,11 @@ def write_coverage( def get_enabled_signals(self) -> List[DashboardSignal]: """Retrieve all enabled signals from the database""" - select_statement = f'''SELECT `id`, + select_statement = f'''SELECT `id`, `name`, `source`, `covidcast_signal`, - 
`latest_coverage_update`, + `latest_coverage_update`, `latest_status_update` FROM `{Database.SIGNAL_TABLE_NAME}` WHERE `enabled` @@ -208,7 +208,7 @@ def get_coverage(dashboard_signal: DashboardSignal) -> List[DashboardSignalCover lambda x: pd.to_datetime(Week(x // 100, x % 100).startdate())) signal_coverage_list = [] - + for _, row in count_by_geo_type_df.iterrows(): signal_coverage = DashboardSignalCoverage( signal_id=dashboard_signal.db_id, diff --git a/src/server/_common.py b/src/server/_common.py index 33a3f9c48..692b83491 100644 --- a/src/server/_common.py +++ b/src/server/_common.py @@ -7,7 +7,7 @@ from werkzeug.exceptions import Unauthorized from werkzeug.local import LocalProxy -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger from ._config import SECRET, REVERSE_PROXY_DEPTH from ._db import engine from ._exceptions import DatabaseErrorException, EpiDataException diff --git a/src/server/_config.py b/src/server/_config.py index 9ef373d1e..7ca9ae486 100644 --- a/src/server/_config.py +++ b/src/server/_config.py @@ -7,7 +7,7 @@ load_dotenv() -VERSION = "4.1.24" +VERSION = "4.1.25" MAX_RESULTS = int(10e6) MAX_COMPATIBILITY_RESULTS = int(3650) diff --git a/src/server/_printer.py b/src/server/_printer.py index 5616787a2..6df6d62b9 100644 --- a/src/server/_printer.py +++ b/src/server/_printer.py @@ -8,7 +8,7 @@ from ._config import MAX_RESULTS, MAX_COMPATIBILITY_RESULTS from ._common import is_compatibility_mode, log_info_with_request -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger def print_non_standard(format: str, data): diff --git a/src/server/_security.py b/src/server/_security.py index c47f948a5..2e127debf 100644 --- a/src/server/_security.py +++ b/src/server/_security.py @@ -3,7 +3,7 @@ from typing import Optional, cast import redis -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import 
get_structured_logger from flask import g, request from werkzeug.exceptions import Unauthorized from werkzeug.local import LocalProxy diff --git a/src/server/admin/models.py b/src/server/admin/models.py index f5c0d54ed..e0ef86b0f 100644 --- a/src/server/admin/models.py +++ b/src/server/admin/models.py @@ -4,7 +4,7 @@ from copy import deepcopy from .._db import Session, WriteSession, default_session -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger from typing import Set, Optional, List from datetime import datetime as dtime diff --git a/src/server/endpoints/covidcast.py b/src/server/endpoints/covidcast.py index 11de3cbca..3d7d99e82 100644 --- a/src/server/endpoints/covidcast.py +++ b/src/server/endpoints/covidcast.py @@ -36,7 +36,7 @@ from .covidcast_utils import compute_trend, compute_trends, compute_trend_value, CovidcastMetaEntry from ..utils import shift_day_value, day_to_time_value, time_value_to_iso, time_value_to_day, shift_week_value, time_value_to_week, guess_time_value_is_day, week_to_time_value, TimeValues from .covidcast_utils.model import TimeType, count_signal_time_types, data_sources, create_source_signal_alias_mapper -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger # first argument is the endpoint name bp = Blueprint("covidcast", __name__) diff --git a/src/server/endpoints/covidcast_meta.py b/src/server/endpoints/covidcast_meta.py index 35dc9f12e..8c2219ae7 100644 --- a/src/server/endpoints/covidcast_meta.py +++ b/src/server/endpoints/covidcast_meta.py @@ -9,7 +9,7 @@ from .._printer import create_printer from .._query import filter_fields from .._security import current_user, sources_protected_by_roles -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger bp = Blueprint("covidcast_meta", __name__) diff --git a/src/server/main.py b/src/server/main.py index 
2ec07e5a5..9d308c8ac 100644 --- a/src/server/main.py +++ b/src/server/main.py @@ -6,7 +6,7 @@ from flask import request, send_file, Response, send_from_directory, jsonify -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger from ._config import URL_PREFIX, VERSION from ._common import app, set_compatibility_mode diff --git a/src/server/utils/dates.py b/src/server/utils/dates.py index 4d6c242c9..010a6d27f 100644 --- a/src/server/utils/dates.py +++ b/src/server/utils/dates.py @@ -10,7 +10,7 @@ from epiweeks import Week, Year from typing_extensions import TypeAlias -from delphi.epidata.common.logger import get_structured_logger +from delphi_utils import get_structured_logger # Alias for a sequence of date ranges (int, int) or date integers IntRange: TypeAlias = Union[Tuple[int, int], int]