Skip to content

Commit ffeeebf

Browse files
author
Michal Ploski
committed
Add new tests. Fix cyclomatic complexity check error
1 parent a659301 commit ffeeebf

File tree

2 files changed

+92
-35
lines changed

2 files changed

+92
-35
lines changed

aws_lambda_powertools/logging/utils.py

+48-13
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,58 @@
88

99
def copy_config_to_registered_loggers(
1010
source_logger: PowertoolsLogger, exclude: Optional[List[str]] = None, include: Optional[List[str]] = None
11-
):
12-
root_loggers = [
11+
) -> None:
12+
"""Enable powertools logging for imported libraries.
13+
14+
Attach source logger handlers to external loggers.
15+
Modify logger level based on source logger attribute.
16+
"""
17+
root_loggers = _find_root_loggers(source_logger, exclude, include)
18+
for logger in root_loggers:
19+
_configure_logger(source_logger, logger)
20+
21+
22+
def _include_root_loggers_filter(logger_list: List[str]):
23+
return [
24+
logging.getLogger(name) for name in logging.root.manager.loggerDict if "." not in name and name in logger_list
25+
]
26+
27+
28+
def _exclude_root_loggers_filter(logger_list: List[str]) -> List[logging.Logger]:
29+
return [
1330
logging.getLogger(name)
1431
for name in logging.root.manager.loggerDict
15-
if "." not in name and name != source_logger.name
32+
if "." not in name and name not in logger_list
1633
]
17-
source_logger.debug(f"Found registered root loggers: {root_loggers}")
34+
35+
36+
def _find_root_loggers(
37+
source_logger: PowertoolsLogger, exclude: Optional[List[str]] = None, include: Optional[List[str]] = None
38+
) -> list[logging.Logger]:
39+
"""Filter root loggers based on provided parameters.
40+
41+
Ensure powertools logger itself is excluded from final list.
42+
"""
43+
root_loggers = []
1844
if include and not exclude:
19-
root_loggers = [logger for logger in root_loggers if logger.name in include]
20-
elif not include and exclude:
21-
root_loggers = [logger for logger in root_loggers if logger.name not in exclude]
45+
root_loggers = _include_root_loggers_filter(logger_list=include)
2246
elif include and exclude:
23-
root_loggers = [logger for logger in root_loggers if logger.name in include and logger.name not in exclude]
47+
exclude = [source_logger.name, *exclude]
48+
root_loggers = _include_root_loggers_filter(logger_list=list(set(include) - set(exclude)))
49+
elif not include and exclude:
50+
exclude = [source_logger.name, *exclude]
51+
root_loggers = _exclude_root_loggers_filter(logger_list=exclude)
52+
else:
53+
root_loggers = _exclude_root_loggers_filter(logger_list=[source_logger.name])
2454

2555
source_logger.debug(f"Filtered root loggers: {root_loggers}")
26-
for logger in root_loggers:
27-
logger.handlers = []
28-
logger.setLevel(source_logger.level)
29-
for source_handler in source_logger.handlers:
30-
logger.addHandler(source_handler)
56+
return root_loggers
57+
58+
59+
def _configure_logger(source_logger: PowertoolsLogger, logger: logging.Logger) -> None:
60+
logger.handlers = []
61+
logger.setLevel(source_logger.level)
62+
source_logger.debug(f"Logger {logger} reconfigured to use logging level {source_logger.level}")
63+
for source_handler in source_logger.handlers:
64+
logger.addHandler(source_handler)
65+
source_logger.debug(f"Logger {logger} reconfigured to use {source_handler}")

tests/functional/test_logger_utils.py

+44-22
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,6 @@ class LogLevel(Enum):
2525
return LogLevel
2626

2727

28-
def capture_logging_output(stdout):
29-
return json.loads(stdout.getvalue().strip())
30-
31-
32-
def service_name():
33-
chars = string.ascii_letters + string.digits
34-
return "".join(random.SystemRandom().choice(chars) for _ in range(15))
35-
36-
3728
@pytest.fixture
3829
def logger(stdout, log_level):
3930
def _logger():
@@ -44,30 +35,43 @@ def _logger():
4435
return _logger
4536

4637

38+
def capture_logging_output(stdout):
39+
return json.loads(stdout.getvalue().strip())
40+
41+
42+
def capture_multiple_logging_statements_output(stdout):
43+
return [json.loads(line.strip()) for line in stdout.getvalue().split("\n") if line]
44+
45+
46+
def service_name():
47+
chars = string.ascii_letters + string.digits
48+
return "".join(random.SystemRandom().choice(chars) for _ in range(15))
49+
50+
4751
def test_copy_config_to_ext_loggers(stdout, logger, log_level):
4852

4953
msg = "test message"
5054

5155
# GIVEN a external logger and powertools logger initialized
52-
logger = logger()
53-
logger_initial_handlers = logger.handlers.copy()
54-
logger_initial_level = logger.level
56+
logger_1 = logger()
57+
logger_2 = logger()
58+
5559
powertools_logger = Logger(service=service_name(), level=log_level.INFO.value, stream=stdout)
5660

5761
# WHEN configuration copied from powertools logger to ALL external loggers AND our external logger used
5862
utils.copy_config_to_registered_loggers(source_logger=powertools_logger)
59-
logger.info(msg)
60-
log = capture_logging_output(stdout)
63+
logger_1.info(msg)
64+
logger_2.info(msg)
65+
logs = capture_multiple_logging_statements_output(stdout)
6166

6267
# THEN
63-
assert not logger_initial_handlers
64-
assert logger_initial_level == log_level.NOTSET.value
65-
assert len(logger.handlers) == 1
66-
assert type(logger.handlers[0]) is logging.StreamHandler
67-
assert type(logger.handlers[0].formatter) is formatter.LambdaPowertoolsFormatter
68-
assert logger.level == log_level.INFO.value
69-
assert log["message"] == msg
70-
assert log["level"] == log_level.INFO.name
68+
for index, logger in enumerate([logger_1, logger_2]):
69+
assert len(logger.handlers) == 1
70+
assert type(logger.handlers[0]) is logging.StreamHandler
71+
assert type(logger.handlers[0].formatter) is formatter.LambdaPowertoolsFormatter
72+
assert logger.level == log_level.INFO.value
73+
assert logs[index]["message"] == msg
74+
assert logs[index]["level"] == log_level.INFO.name
7175

7276

7377
def test_copy_config_to_ext_loggers_include(stdout, logger, log_level):
@@ -134,6 +138,7 @@ def test_copy_config_to_ext_loggers_include_exclude(stdout, logger, log_level):
134138
)
135139
logger_2.info(msg)
136140
log = capture_logging_output(stdout)
141+
137142
# THEN
138143
assert not logger_1.handlers
139144
assert len(logger_2.handlers) == 1
@@ -142,3 +147,20 @@ def test_copy_config_to_ext_loggers_include_exclude(stdout, logger, log_level):
142147
assert logger_2.level == log_level.INFO.value
143148
assert log["message"] == msg
144149
assert log["level"] == log_level.INFO.name
150+
151+
152+
def test_copy_config_to_ext_loggers_clean_old_handlers(stdout, logger, log_level):
153+
154+
# GIVEN a external logger with handler and powertools logger initialized
155+
logger = logger()
156+
handler = logging.FileHandler("logfile")
157+
logger.addHandler(handler)
158+
powertools_logger = Logger(service=service_name(), level=log_level.INFO.value, stream=stdout)
159+
160+
# WHEN configuration copied from powertools logger to ALL external loggers AND our external logger used
161+
utils.copy_config_to_registered_loggers(source_logger=powertools_logger)
162+
163+
# THEN
164+
assert len(logger.handlers) == 1
165+
assert type(logger.handlers[0]) is logging.StreamHandler
166+
assert type(logger.handlers[0].formatter) is formatter.LambdaPowertoolsFormatter

0 commit comments

Comments
 (0)