Skip to content

Commit 74a8b90

Browse files
authored
Fix issue with Flask instrumentation when a request spawn children threads and copies the request context (#1654)
1 parent 3bcc043 commit 74a8b90

File tree

4 files changed

+94
-3
lines changed

4 files changed

+94
-3
lines changed

CHANGELOG.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## Unreleased
99

10+
### Added
11+
1012
- Add connection attributes to sqlalchemy connect span
1113
([#1608](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1608))
1214

15+
### Fixed
16+
17+
- Fix Flask instrumentation to only close the span if it was created by the same thread.
18+
([#1654](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1654))
19+
1320
## Version 1.16.0/0.37b0 (2023-02-17)
1421

1522
### Added
1623

1724
- Support `aio_pika` 9.x (([#1670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1670])
1825
- `opentelemetry-instrumentation-redis` Add `sanitize_query` config option to allow query sanitization. ([#1572](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1572))
19-
- `opentelemetry-instrumentation-elasticsearch` Add optional db.statement query sanitization.
26+
- `opentelemetry-instrumentation-elasticsearch` Add optional db.statement query sanitization.
2027
([#1598](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1598))
2128
- `opentelemetry-instrumentation-celery` Record exceptions as events on the span.
2229
([#1573](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1573))

instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,8 @@ def response_hook(span: Span, status: str, response_headers: List):
238238
API
239239
---
240240
"""
241-
242241
from logging import getLogger
242+
from threading import get_ident
243243
from time import time_ns
244244
from timeit import default_timer
245245
from typing import Collection
@@ -265,6 +265,7 @@ def response_hook(span: Span, status: str, response_headers: List):
265265
_ENVIRON_STARTTIME_KEY = "opentelemetry-flask.starttime_key"
266266
_ENVIRON_SPAN_KEY = "opentelemetry-flask.span_key"
267267
_ENVIRON_ACTIVATION_KEY = "opentelemetry-flask.activation_key"
268+
_ENVIRON_THREAD_ID_KEY = "opentelemetry-flask.thread_id_key"
268269
_ENVIRON_TOKEN = "opentelemetry-flask.token"
269270

270271
_excluded_urls_from_env = get_excluded_urls("FLASK")
@@ -398,6 +399,7 @@ def _before_request():
398399
activation = trace.use_span(span, end_on_exit=True)
399400
activation.__enter__() # pylint: disable=E1101
400401
flask_request_environ[_ENVIRON_ACTIVATION_KEY] = activation
402+
flask_request_environ[_ENVIRON_THREAD_ID_KEY] = get_ident()
401403
flask_request_environ[_ENVIRON_SPAN_KEY] = span
402404
flask_request_environ[_ENVIRON_TOKEN] = token
403405

@@ -437,10 +439,17 @@ def _teardown_request(exc):
437439
return
438440

439441
activation = flask.request.environ.get(_ENVIRON_ACTIVATION_KEY)
440-
if not activation:
442+
thread_id = flask.request.environ.get(_ENVIRON_THREAD_ID_KEY)
443+
if not activation or thread_id != get_ident():
441444
# This request didn't start a span, maybe because it was created in
442445
# a way that doesn't run `before_request`, like when it is created
443446
# with `app.test_request_context`.
447+
#
448+
# Similarly, check the thread_id against the current thread to ensure
449+
# tear down only happens on the original thread. This situation can
450+
# arise if the original thread handling the request spawn children
451+
# threads and then uses something like copy_current_request_context
452+
# to copy the request context.
444453
return
445454
if exc is None:
446455
activation.__exit__(None, None, None)

instrumentation/opentelemetry-instrumentation-flask/tests/base_test.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from concurrent.futures import ThreadPoolExecutor, as_completed
16+
from random import randint
17+
1518
import flask
1619
from werkzeug.test import Client
1720
from werkzeug.wrappers import Response
@@ -34,6 +37,25 @@ def _sqlcommenter_endpoint():
3437
)
3538
return sqlcommenter_flask_values
3639

40+
@staticmethod
41+
def _multithreaded_endpoint(count):
42+
def do_random_stuff():
43+
@flask.copy_current_request_context
44+
def inner():
45+
return randint(0, 100)
46+
47+
return inner
48+
49+
executor = ThreadPoolExecutor(count)
50+
futures = []
51+
for _ in range(count):
52+
futures.append(executor.submit(do_random_stuff()))
53+
numbers = []
54+
for future in as_completed(futures):
55+
numbers.append(future.result())
56+
57+
return " ".join([str(i) for i in numbers])
58+
3759
@staticmethod
3860
def _custom_response_headers():
3961
resp = flask.Response("test response")
@@ -61,6 +83,7 @@ def excluded2_endpoint():
6183
# pylint: disable=no-member
6284
self.app.route("/hello/<int:helloid>")(self._hello_endpoint)
6385
self.app.route("/sqlcommenter")(self._sqlcommenter_endpoint)
86+
self.app.route("/multithreaded")(self._multithreaded_endpoint)
6487
self.app.route("/excluded/<int:helloid>")(self._hello_endpoint)
6588
self.app.route("/excluded")(excluded_endpoint)
6689
self.app.route("/excluded2")(excluded2_endpoint)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Copyright The OpenTelemetry Authors
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import flask
16+
from werkzeug.test import Client
17+
from werkzeug.wrappers import Response
18+
19+
from opentelemetry.instrumentation.flask import FlaskInstrumentor
20+
from opentelemetry.test.wsgitestutil import WsgiTestBase
21+
22+
# pylint: disable=import-error
23+
from .base_test import InstrumentationTest
24+
25+
26+
class TestMultiThreading(InstrumentationTest, WsgiTestBase):
27+
def setUp(self):
28+
super().setUp()
29+
FlaskInstrumentor().instrument()
30+
self.app = flask.Flask(__name__)
31+
self._common_initialization()
32+
33+
def tearDown(self):
34+
super().tearDown()
35+
with self.disable_logging():
36+
FlaskInstrumentor().uninstrument()
37+
38+
def test_multithreaded(self):
39+
"""Test that instrumentation tear down does not blow up
40+
when the request thread spawn children threads and the request
41+
context is copied to the children threads
42+
"""
43+
self.app = flask.Flask(__name__)
44+
self.app.route("/multithreaded/<int:count>")(
45+
self._multithreaded_endpoint
46+
)
47+
client = Client(self.app, Response)
48+
count = 5
49+
resp = client.get(f"/multithreaded/{count}")
50+
self.assertEqual(200, resp.status_code)
51+
# Should return the specified number of random integers
52+
self.assertEqual(count, len(resp.text.split(" ")))

0 commit comments

Comments
 (0)