Skip to content

Add query logging #630

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 23 commits into from
Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
88410d5
proper fix for 'anomalous backslash' linter warning
dmig Oct 1, 2020
2c645e4
add query logger to Connection class and connect() function
dmig Oct 1, 2020
e139d77
query logging in Cursor and PreparedStatement
dmig Oct 1, 2020
8609acd
fix undefined argument for _parse_connect_arguments
dmig Oct 1, 2020
0835532
fix AttributeError _query_logger
dmig Oct 1, 2020
64af95b
add pool query_logging option
dmig Oct 1, 2020
4013208
fix flake8 errors
dmig Oct 1, 2020
b1746f6
remove query text from _get_statement() log messages
dmig Oct 2, 2020
1bd8e57
lower 'uncacheable query' message priority
dmig Oct 2, 2020
8c28612
add log message to _prepare()
dmig Oct 2, 2020
3605c57
return only beginning of query to _get_statement log messages
dmig Oct 2, 2020
2b34cb9
move connect_kwargs description to the place it's expected to be
dmig Jan 27, 2021
670d505
declare `query_logging` explicitly in `create_pool()`
dmig Jan 27, 2021
c15d62a
add documentation section about query logging
dmig Jan 27, 2021
102aa23
Avoid unnecessary overhead during connection reset (#648)
kitogo Feb 10, 2021
1089e49
docs: fix simple typo, verifiction -> verification (#682)
timgates42 Feb 10, 2021
f5484b2
Don't build aarch64 wheels for now
elprans Feb 10, 2021
3e392d5
asyncpg v0.22.0
elprans Dec 21, 2020
4319d57
Post-release version bump
elprans Feb 10, 2021
26389fe
Merge remote-tracking branch 'upstream/master' into add_logging
dmig Mar 10, 2021
79194ed
Merge branch 'master' into add_logging
dmig May 11, 2022
392fa06
Merge branch 'master' into add_logging
dmig Jun 16, 2022
de08a01
Merge branch 'master' into add_logging
dmig Jun 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions asyncpg/connect_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,8 @@ async def _connect_addr(
params,
config,
connection_class,
record_class
record_class,
query_logging
):
assert loop is not None

Expand Down Expand Up @@ -642,7 +643,8 @@ async def _connect_addr(
tr.close()
raise

con = connection_class(pr, tr, loop, addr, config, params_input)
con = connection_class(pr, tr, loop, addr, config, params_input,
query_logging)
pr.set_connection(con)
return con

Expand All @@ -651,6 +653,7 @@ async def _connect(*, loop, timeout, connection_class, record_class, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()

query_logging = kwargs.pop('query_logging', False)
addrs, params, config = _parse_connect_arguments(timeout=timeout, **kwargs)

last_error = None
Expand All @@ -666,6 +669,7 @@ async def _connect(*, loop, timeout, connection_class, record_class, **kwargs):
config=config,
connection_class=connection_class,
record_class=record_class,
query_logging=query_logging
)
except (OSError, asyncio.TimeoutError, ConnectionError) as ex:
last_error = ex
Expand Down
65 changes: 60 additions & 5 deletions asyncpg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import collections
import collections.abc
import itertools
import logging
import sys
import time
import traceback
Expand Down Expand Up @@ -41,7 +42,7 @@ class Connection(metaclass=ConnectionMeta):
"""

__slots__ = ('_protocol', '_transport', '_loop',
'_top_xact', '_aborted',
'_top_xact', '_aborted', '_query_logger',
'_pool_release_ctr', '_stmt_cache', '_stmts_to_close',
'_listeners', '_server_version', '_server_caps',
'_intro_query', '_reset_query', '_proxy',
Expand All @@ -52,7 +53,8 @@ class Connection(metaclass=ConnectionMeta):
def __init__(self, protocol, transport, loop,
addr,
config: connect_utils._ClientConfiguration,
params: connect_utils._ConnectionParameters):
params: connect_utils._ConnectionParameters,
query_logging=False):
self._protocol = protocol
self._transport = transport
self._loop = loop
Expand All @@ -67,6 +69,13 @@ def __init__(self, protocol, transport, loop,
self._config = config
self._params = params

if query_logging:
# use distinct logger name
self._query_logger = logging.getLogger('asyncpg.query')
self._query_logger.info('Query logging enabled')
else:
self._query_logger = None

self._stmt_cache = _StatementCache(
loop=loop,
max_size=config.statement_cache_size,
Expand Down Expand Up @@ -294,6 +303,8 @@ async def execute(self, query: str, *args, timeout: float=None) -> str:
self._check_open()

if not args:
if self._query_logger:
self._query_logger.debug('Executing: %s', query)
return await self._protocol.query(query, timeout)

_, status, _ = await self._execute(
Expand Down Expand Up @@ -356,6 +367,9 @@ async def _get_statement(
(query, record_class, ignore_custom_codec)
)
if statement is not None:
if self._query_logger:
self._query_logger.debug('Cache hit: %s for %.40r',
statement.name, query)
return statement

# Only use the cache when:
Expand All @@ -367,6 +381,16 @@ async def _get_statement(
len(query) > self._config.max_cacheable_statement_size):
use_cache = False

if self._query_logger:
self._query_logger.log(
5,
# cut query length by max_cacheable_statement_size
'Uncacheable query: %.{}r...'.format(
self._config.max_cacheable_statement_size
),
query
)

if use_cache or named:
stmt_name = self._get_unique_id('stmt')
else:
Expand Down Expand Up @@ -425,6 +449,13 @@ async def _get_statement(
self._stmt_cache.put(
(query, record_class, ignore_custom_codec), statement)

if self._query_logger:
self._query_logger.debug(
'New cached query: %s for %.40r' if use_cache
else 'Prepared query: %s for %.40r',
statement.name, query
)

# If we've just created a new statement object, check if there
# are any statements for GC.
if self._stmts_to_close:
Expand Down Expand Up @@ -542,6 +573,8 @@ async def _prepare(
use_cache: bool=False,
record_class=None
):
if self._query_logger:
self._query_logger.debug('Preparing query: %s', query)
self._check_open()
stmt = await self._get_statement(
query,
Expand Down Expand Up @@ -986,6 +1019,8 @@ async def _writer(data):
writer = _writer

try:
if self._query_logger:
self._query_logger.debug('Copy out query: %s', copy_stmt)
return await self._protocol.copy_out(copy_stmt, writer, timeout)
finally:
if opened_by_us:
Expand Down Expand Up @@ -1038,13 +1073,17 @@ async def __anext__(self):
reader = _Reader()

try:
if self._query_logger:
self._query_logger.debug('Copy in query: %s', copy_stmt)
return await self._protocol.copy_in(
copy_stmt, reader, data, None, None, timeout)
finally:
if opened_by_us:
await run_in_executor(None, f.close)

async def _copy_in_records(self, copy_stmt, records, intro_stmt, timeout):
if self._query_logger:
self._query_logger.debug('Copy in query: %s', copy_stmt)
return await self._protocol.copy_in(
copy_stmt, None, None, records, intro_stmt, timeout)

Expand Down Expand Up @@ -1656,6 +1695,11 @@ async def __execute(
executor = lambda stmt, timeout: self._protocol.bind_execute(
stmt, args, '', limit, return_status, timeout)
timeout = self._protocol._get_timeout(timeout)

if self._query_logger:
self._query_logger.debug('Executing query: %s', query)
self._query_logger.log(5, 'Arguments: %r', args)

return await self._do_execute(
query,
executor,
Expand All @@ -1668,6 +1712,11 @@ async def _executemany(self, query, args, timeout):
executor = lambda stmt, timeout: self._protocol.bind_execute_many(
stmt, args, '', timeout)
timeout = self._protocol._get_timeout(timeout)

if self._query_logger:
self._query_logger.debug('Executemany query: %s', query)
self._query_logger.log(5, 'Arguments: %r', args)

with self._stmt_exclusive_section:
result, _ = await self._do_execute(query, executor, timeout)
return result
Expand Down Expand Up @@ -1766,8 +1815,9 @@ async def connect(dsn=None, *,
ssl=None,
connection_class=Connection,
record_class=protocol.Record,
server_settings=None):
r"""A coroutine to establish a connection to a PostgreSQL server.
server_settings=None,
query_logging=False):
"""A coroutine to establish a connection to a PostgreSQL server.

The connection parameters may be specified either as a connection
URI in *dsn*, or as specific keyword arguments, or both.
Expand Down Expand Up @@ -1851,7 +1901,7 @@ async def connect(dsn=None, *,

:param passfile:
The name of the file used to store passwords
(defaults to ``~/.pgpass``, or ``%APPDATA%\postgresql\pgpass.conf``
(defaults to ``~/.pgpass``, or ``%APPDATA%\\postgresql\\pgpass.conf``
on Windows).

:param loop:
Expand Down Expand Up @@ -1920,6 +1970,10 @@ async def connect(dsn=None, *,
this connection object. Must be a subclass of
:class:`~asyncpg.Record`.

:param bool query_logging:
If set, a logger named `asyncpg.query` will be created and used for
query and query argument logging.

:return: A :class:`~asyncpg.connection.Connection` instance.

Example:
Expand Down Expand Up @@ -2004,6 +2058,7 @@ async def connect(dsn=None, *,
statement_cache_size=statement_cache_size,
max_cached_statement_lifetime=max_cached_statement_lifetime,
max_cacheable_statement_size=max_cacheable_statement_size,
query_logging=query_logging
)


Expand Down
20 changes: 20 additions & 0 deletions asyncpg/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ def __init__(self, connection, query, state, args, record_class):
self._exhausted = False
self._query = query
self._record_class = record_class
logger = connection._query_logger
if logger:
logger.debug('Created cursor for query', query)
logger.log(5, 'Cursor query arguments %r', args)

def _check_ready(self):
if self._state is None:
Expand All @@ -126,8 +130,15 @@ async def _bind_exec(self, n, timeout):

con = self._connection
protocol = con._protocol
logger = con._query_logger

self._portal_name = con._get_unique_id('portal')
if logger:
logger.log(5, 'Opened cursor portal %r for query: %s',
self._portal_name, self._query)
logger.log(5, 'Fetching %d records from cursor %r', n,
self._portal_name)

buffer, _, self._exhausted = await protocol.bind_execute(
self._state, self._args, self._portal_name, n, True, timeout)
return buffer
Expand All @@ -143,6 +154,10 @@ async def _bind(self, timeout):
protocol = con._protocol

self._portal_name = con._get_unique_id('portal')
if con._query_logger:
con._query_logger.log(5, 'Opened cursor portal %r for query: %s',
self._portal_name, self._query)

buffer = await protocol.bind(self._state, self._args,
self._portal_name,
timeout)
Expand All @@ -155,6 +170,11 @@ async def _exec(self, n, timeout):
raise exceptions.InterfaceError(
'cursor does not have an open portal')

if self._connection._query_logger:
self._connection._query_logger.log(
5, 'Fetching %d records from cursor %r', n, self._portal_name
)

protocol = self._connection._protocol
buffer, _, self._exhausted = await protocol.execute(
self._state, self._portal_name, n, True, timeout)
Expand Down
20 changes: 15 additions & 5 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ class Pool:

__slots__ = (
'_queue', '_loop', '_minsize', '_maxsize',
'_init', '_connect_args', '_connect_kwargs',
'_init', '_connect_args', '_connect_kwargs', '_query_logging',
'_working_addr', '_working_config', '_working_params',
'_holders', '_initialized', '_initializing', '_closing',
'_closed', '_connection_class', '_record_class', '_generation',
Expand All @@ -325,6 +325,7 @@ def __init__(self, *connect_args,
loop,
connection_class,
record_class,
query_logging=False,
**connect_kwargs):

if len(connect_args) > 1:
Expand Down Expand Up @@ -393,6 +394,7 @@ def __init__(self, *connect_args,
self._max_queries = max_queries
self._max_inactive_connection_lifetime = \
max_inactive_connection_lifetime
self._query_logging = query_logging

async def _async__init__(self):
if self._initialized:
Expand Down Expand Up @@ -479,6 +481,7 @@ async def _get_new_connection(self):
loop=self._loop,
connection_class=self._connection_class,
record_class=self._record_class,
query_logging=self._query_logging,
**self._connect_kwargs)

self._working_addr = con._addr
Expand All @@ -496,6 +499,7 @@ async def _get_new_connection(self):
params=self._working_params,
connection_class=self._connection_class,
record_class=self._record_class,
query_logging=self._query_logging,
)

if self._init is not None:
Expand Down Expand Up @@ -806,6 +810,7 @@ def create_pool(dsn=None, *,
loop=None,
connection_class=connection.Connection,
record_class=protocol.Record,
query_logging=False,
**connect_kwargs):
r"""Create a connection pool.

Expand Down Expand Up @@ -856,10 +861,6 @@ def create_pool(dsn=None, *,
the following format:
``postgres://user:pass@host:port/database?option=value``.

:param \*\*connect_kwargs:
Keyword arguments for the :func:`~asyncpg.connection.connect`
function.

:param Connection connection_class:
The class to use for connections. Must be a subclass of
:class:`~asyncpg.connection.Connection`.
Expand Down Expand Up @@ -901,6 +902,14 @@ def create_pool(dsn=None, *,
An asyncio event loop instance. If ``None``, the default
event loop will be used.

:param bool query_logging:
If set, a logger named `asyncpg.query` will be created and used for
query and query argument logging for every connection created.

:param \*\*connect_kwargs:
Keyword arguments for the :func:`~asyncpg.connection.connect`
function.

:return: An instance of :class:`~asyncpg.pool.Pool`.

.. versionchanged:: 0.10.0
Expand Down Expand Up @@ -930,4 +939,5 @@ def create_pool(dsn=None, *,
min_size=min_size, max_size=max_size,
max_queries=max_queries, loop=loop, setup=setup, init=init,
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
query_logging=query_logging,
**connect_kwargs)
9 changes: 9 additions & 0 deletions asyncpg/prepared_stmt.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ async def executemany(self, args, *, timeout: float=None):

.. versionadded:: 0.22.0
"""
if self._connection._query_logger:
self._connection._query_logger.log(
5, 'Executing %s with arguments: %r', self._state.name, args
)
return await self.__do_execute(
lambda protocol: protocol.bind_execute_many(
self._state, args, '', timeout))
Expand All @@ -230,6 +234,11 @@ async def __do_execute(self, executor):
raise

async def __bind_execute(self, args, limit, timeout):
if self._connection._query_logger:
self._connection._query_logger.log(
5, 'Executing %s with limit %r and arguments: %r',
self._state.name, limit, args
)
data, status, _ = await self.__do_execute(
lambda protocol: protocol.bind_execute(
self._state, args, '', limit, True, timeout))
Expand Down
Loading