diff --git a/asyncpg/connect_utils.py b/asyncpg/connect_utils.py index 8b29c0fc..43e51a55 100644 --- a/asyncpg/connect_utils.py +++ b/asyncpg/connect_utils.py @@ -778,7 +778,8 @@ async def _connect_addr( params, config, connection_class, - record_class + record_class, + query_logging ): assert loop is not None @@ -899,7 +900,8 @@ async def __connect_addr( tr.close() raise - con = connection_class(pr, tr, loop, addr, config, params_input) + con = connection_class(pr, tr, loop, addr, config, params_input, + query_logging) pr.set_connection(con) return con @@ -972,6 +974,7 @@ async def _connect(*, loop, timeout, connection_class, record_class, **kwargs): if loop is None: loop = asyncio.get_event_loop() + query_logging = kwargs.pop('query_logging', False) addrs, params, config = _parse_connect_arguments(timeout=timeout, **kwargs) target_attr = params.target_session_attrs @@ -989,6 +992,7 @@ async def _connect(*, loop, timeout, connection_class, record_class, **kwargs): config=config, connection_class=connection_class, record_class=record_class, + query_logging=query_logging ) candidates.append(conn) if await _can_use_connection(conn, target_attr): diff --git a/asyncpg/connection.py b/asyncpg/connection.py index 0b13d356..188b4f41 100644 --- a/asyncpg/connection.py +++ b/asyncpg/connection.py @@ -12,6 +12,7 @@ import functools import itertools import inspect +import logging import os import sys import time @@ -45,7 +46,7 @@ class Connection(metaclass=ConnectionMeta): """ __slots__ = ('_protocol', '_transport', '_loop', - '_top_xact', '_aborted', + '_top_xact', '_aborted', '_query_logger', '_pool_release_ctr', '_stmt_cache', '_stmts_to_close', '_listeners', '_server_version', '_server_caps', '_intro_query', '_reset_query', '_proxy', @@ -56,7 +57,8 @@ class Connection(metaclass=ConnectionMeta): def __init__(self, protocol, transport, loop, addr, config: connect_utils._ClientConfiguration, - params: connect_utils._ConnectionParameters): + params: connect_utils._ConnectionParameters, + query_logging=False): self._protocol = protocol self._transport = transport self._loop = loop @@ -71,6 +73,13 @@ def __init__(self, protocol, transport, loop, self._config = config self._params = params + if query_logging: + # use distinct logger name + self._query_logger = logging.getLogger('asyncpg.query') + self._query_logger.info('Query logging enabled') + else: + self._query_logger = None + self._stmt_cache = _StatementCache( loop=loop, max_size=config.statement_cache_size, @@ -314,6 +323,8 @@ async def execute(self, query: str, *args, timeout: float=None) -> str: self._check_open() if not args: + if self._query_logger: + self._query_logger.debug('Executing: %s', query) return await self._protocol.query(query, timeout) _, status, _ = await self._execute( @@ -376,6 +387,9 @@ async def _get_statement( (query, record_class, ignore_custom_codec) ) if statement is not None: + if self._query_logger: + self._query_logger.debug('Cache hit: %s for %.40r', + statement.name, query) return statement # Only use the cache when: @@ -387,6 +401,16 @@ async def _get_statement( len(query) > self._config.max_cacheable_statement_size): use_cache = False + if self._query_logger: + self._query_logger.log( + 5, + # cut query length by max_cacheable_statement_size + 'Uncacheable query: %.{}r...'.format( + self._config.max_cacheable_statement_size + ), + query + ) + if isinstance(named, str): stmt_name = named elif use_cache or named: @@ -447,6 +471,13 @@ async def _get_statement( self._stmt_cache.put( (query, record_class, ignore_custom_codec), statement) + if self._query_logger: + self._query_logger.debug( + 'New cached query: %s for %.40r' if use_cache + else 'Prepared query: %s for %.40r', + statement.name, query + ) + # If we've just created a new statement object, check if there # are any statements for GC. if self._stmts_to_close: @@ -579,6 +610,8 @@ async def _prepare( use_cache: bool=False, record_class=None ): + if self._query_logger: + self._query_logger.debug('Preparing query: %s', query) self._check_open() stmt = await self._get_statement( query, @@ -1046,6 +1079,8 @@ async def _writer(data): writer = _writer try: + if self._query_logger: + self._query_logger.debug('Copy out query: %s', copy_stmt) return await self._protocol.copy_out(copy_stmt, writer, timeout) finally: if opened_by_us: @@ -1097,6 +1132,8 @@ async def __anext__(self): reader = _Reader() try: + if self._query_logger: + self._query_logger.debug('Copy in query: %s', copy_stmt) return await self._protocol.copy_in( copy_stmt, reader, data, None, None, timeout) finally: @@ -1681,6 +1718,11 @@ async def __execute( executor = lambda stmt, timeout: self._protocol.bind_execute( stmt, args, '', limit, return_status, timeout) timeout = self._protocol._get_timeout(timeout) + + if self._query_logger: + self._query_logger.debug('Executing query: %s', query) + self._query_logger.log(5, 'Arguments: %r', args) + return await self._do_execute( query, executor, @@ -1693,6 +1735,11 @@ async def _executemany(self, query, args, timeout): executor = lambda stmt, timeout: self._protocol.bind_execute_many( stmt, args, '', timeout) timeout = self._protocol._get_timeout(timeout) + + if self._query_logger: + self._query_logger.debug('Executemany query: %s', query) + self._query_logger.log(5, 'Arguments: %r', args) + with self._stmt_exclusive_section: result, _ = await self._do_execute(query, executor, timeout) return result @@ -1793,7 +1840,8 @@ async def connect(dsn=None, *, connection_class=Connection, record_class=protocol.Record, server_settings=None, - target_session_attrs=None): + target_session_attrs=None, + query_logging=False): r"""A coroutine to establish a connection to a PostgreSQL server. The connection parameters may be specified either as a connection @@ -1885,7 +1933,7 @@ async def connect(dsn=None, *, :param passfile: The name of the file used to store passwords - (defaults to ``~/.pgpass``, or ``%APPDATA%\postgresql\pgpass.conf`` + (defaults to ``~/.pgpass``, or ``%APPDATA%\\postgresql\\pgpass.conf`` on Windows). :param loop: @@ -2020,6 +2068,10 @@ async def connect(dsn=None, *, from the environment. Defaults to "any" if no value is set. + :param bool query_logging: + If set, a logger named `asyncpg.query` will be created and used for + query and query argument logging. + :return: A :class:`~asyncpg.connection.Connection` instance. Example: @@ -2126,7 +2178,8 @@ async def connect(dsn=None, *, statement_cache_size=statement_cache_size, max_cached_statement_lifetime=max_cached_statement_lifetime, max_cacheable_statement_size=max_cacheable_statement_size, - target_session_attrs=target_session_attrs + target_session_attrs=target_session_attrs, + query_logging=query_logging ) diff --git a/asyncpg/cursor.py b/asyncpg/cursor.py index 7ec159ba..b092bcfa 100644 --- a/asyncpg/cursor.py +++ b/asyncpg/cursor.py @@ -101,6 +101,10 @@ def __init__(self, connection, query, state, args, record_class): self._exhausted = False self._query = query self._record_class = record_class + logger = connection._query_logger + if logger: + logger.debug('Created cursor for query', query) + logger.log(5, 'Cursor query arguments %r', args) def _check_ready(self): if self._state is None: @@ -124,8 +128,15 @@ async def _bind_exec(self, n, timeout): con = self._connection protocol = con._protocol + logger = con._query_logger self._portal_name = con._get_unique_id('portal') + if logger: + logger.log(5, 'Opened cursor portal %r for query: %s', + self._portal_name, self._query) + logger.log(5, 'Fetching %d records from cursor %r', n, + self._portal_name) + buffer, _, self._exhausted = await protocol.bind_execute( self._state, self._args, self._portal_name, n, True, timeout) return buffer @@ -141,6 +152,10 @@ async def _bind(self, timeout): protocol = con._protocol self._portal_name = con._get_unique_id('portal') + if con._query_logger: + con._query_logger.log(5, 'Opened cursor portal %r for query: %s', + self._portal_name, self._query) + buffer = await protocol.bind(self._state, self._args, self._portal_name, timeout) @@ -153,6 +168,11 @@ async def _exec(self, n, timeout): raise exceptions.InterfaceError( 'cursor does not have an open portal') + if self._connection._query_logger: + self._connection._query_logger.log( + 5, 'Fetching %d records from cursor %r', n, self._portal_name + ) + protocol = self._connection._protocol buffer, _, self._exhausted = await protocol.execute( self._state, self._portal_name, n, True, timeout) diff --git a/asyncpg/pool.py b/asyncpg/pool.py index eaf501f4..c5af6988 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -310,7 +310,7 @@ class Pool: __slots__ = ( '_queue', '_loop', '_minsize', '_maxsize', - '_init', '_connect_args', '_connect_kwargs', + '_init', '_connect_args', '_connect_kwargs', '_query_logging', '_working_addr', '_working_config', '_working_params', '_holders', '_initialized', '_initializing', '_closing', '_closed', '_connection_class', '_record_class', '_generation', @@ -327,6 +327,7 @@ def __init__(self, *connect_args, loop, connection_class, record_class, + query_logging=False, **connect_kwargs): if len(connect_args) > 1: @@ -395,6 +396,7 @@ def __init__(self, *connect_args, self._max_queries = max_queries self._max_inactive_connection_lifetime = \ max_inactive_connection_lifetime + self._query_logging = query_logging async def _async__init__(self): if self._initialized: @@ -516,6 +518,7 @@ async def _get_new_connection(self): loop=self._loop, connection_class=self._connection_class, record_class=self._record_class, + query_logging=self._query_logging, **self._connect_kwargs) self._working_addr = con._addr @@ -533,6 +536,7 @@ async def _get_new_connection(self): params=self._working_params, connection_class=self._connection_class, record_class=self._record_class, + query_logging=self._query_logging, ) if self._init is not None: @@ -1026,6 +1030,7 @@ def create_pool(dsn=None, *, loop=None, connection_class=connection.Connection, record_class=protocol.Record, + query_logging=False, **connect_kwargs): r"""Create a connection pool. @@ -1076,10 +1081,6 @@ def create_pool(dsn=None, *, the following format: ``postgres://user:pass@host:port/database?option=value``. - :param \*\*connect_kwargs: - Keyword arguments for the :func:`~asyncpg.connection.connect` - function. - :param Connection connection_class: The class to use for connections. Must be a subclass of :class:`~asyncpg.connection.Connection`. @@ -1121,6 +1122,14 @@ def create_pool(dsn=None, *, An asyncio event loop instance. If ``None``, the default event loop will be used. + :param bool query_logging: + If set, a logger named `asyncpg.query` will be created and used for + query and query argument logging for every connection created. + + :param \*\*connect_kwargs: + Keyword arguments for the :func:`~asyncpg.connection.connect` + function. + :return: An instance of :class:`~asyncpg.pool.Pool`. .. versionchanged:: 0.10.0 @@ -1151,4 +1160,5 @@ def create_pool(dsn=None, *, min_size=min_size, max_size=max_size, max_queries=max_queries, loop=loop, setup=setup, init=init, max_inactive_connection_lifetime=max_inactive_connection_lifetime, + query_logging=query_logging, **connect_kwargs) diff --git a/asyncpg/prepared_stmt.py b/asyncpg/prepared_stmt.py index 8e241d67..296b6250 100644 --- a/asyncpg/prepared_stmt.py +++ b/asyncpg/prepared_stmt.py @@ -220,6 +220,10 @@ async def executemany(self, args, *, timeout: float=None): .. versionadded:: 0.22.0 """ + if self._connection._query_logger: + self._connection._query_logger.log( + 5, 'Executing %s with arguments: %r', self._state.name, args + ) return await self.__do_execute( lambda protocol: protocol.bind_execute_many( self._state, args, '', timeout)) @@ -238,6 +242,11 @@ async def __do_execute(self, executor): raise async def __bind_execute(self, args, limit, timeout): + if self._connection._query_logger: + self._connection._query_logger.log( + 5, 'Executing %s with limit %r and arguments: %r', + self._state.name, limit, args + ) data, status, _ = await self.__do_execute( lambda protocol: protocol.bind_execute( self._state, args, '', limit, True, timeout)) diff --git a/docs/usage.rst b/docs/usage.rst index a6c62b41..df8a5fe8 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -416,3 +416,61 @@ Web service that computes the requested power of two. web.run_app(app) See :ref:`asyncpg-api-pool` API documentation for more information. + +Query logging +============= + +Sometimes one may need to see queries being executed. +For example, if they are built dynamically. ``asyncpg`` uses python standard +``logging`` library to emit debug messages of levels ``DEBUG`` and ``TRACE``. +Logging is disabled by default to avoid perfomance affection. + + +.. note:: + ``TRACE`` level is custom and not defined inside ``asyncpg``. Define it + yourself if you plan to use it with numeric value ``5`` + (using :func:`logging.addLevelName() `) or just + use ``5`` as level value. + + +.. code-block:: python + + import asyncio + import asyncpg + import datetime + import logging + + async def main(): + # Establish a connection to an existing database named "test" + # as a "postgres" user. + conn = await asyncpg.connect('postgresql://postgres@localhost/test', + query_logging=True) + # Execute a statement to create a new table. + await conn.execute(''' + CREATE TABLE users( + id serial PRIMARY KEY, + name text, + dob date + ) + ''') + + # by default root logger level is set to logging.WARNING, + # lets lower it to DEBUG to see the query + logging.getLogger().setLevel(logging.DEBUG) + # Insert a record into the created table. + await conn.execute(''' + INSERT INTO users(name, dob) VALUES($1, $2) + ''', 'Bob', datetime.date(1984, 3, 1)) + + # lets lower it to TRACE to see query parameters + logging.getLogger().setLevel(5) + # Select a row from the table. + row = await conn.fetchrow( + 'SELECT * FROM users WHERE name = $1', 'Bob') + # *row* now contains + # asyncpg.Record(id=1, name='Bob', dob=datetime.date(1984, 3, 1)) + + # Close the connection. + await conn.close() + + asyncio.get_event_loop().run_until_complete(main())