Commit 8e23a70

feat: attempts with connection improvements (#118)
* feat(flagd-rpc): add caching with tests
* fixup: using new test-harness
* fixup(flagd): remove merge conflict error as stated by warber
* feat(flagd): add graceful attempts
* feat: better reconnect gherkins
* fixup: unblock
* fixup: incorporating feedback from PR review

Signed-off-by: Simon Schrottner <[email protected]>
1 parent f156ea5 commit 8e23a70

File tree

9 files changed: +166 −109 lines

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py

Lines changed: 9 additions & 7 deletions
@@ -26,7 +26,7 @@ class CacheType(Enum):
 DEFAULT_RESOLVER_TYPE = ResolverType.RPC
 DEFAULT_RETRY_BACKOFF = 1000
 DEFAULT_RETRY_BACKOFF_MAX = 120000
-DEFAULT_RETRY_GRACE_ATTEMPTS = 5
+DEFAULT_RETRY_GRACE_PERIOD_SECONDS = 5
 DEFAULT_STREAM_DEADLINE = 600000
 DEFAULT_TLS = False

@@ -41,7 +41,7 @@ class CacheType(Enum):
 ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER"
 ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
 ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS"
-ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS"
+ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD"
 ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
 ENV_VAR_TLS = "FLAGD_TLS"

@@ -81,7 +81,7 @@ def __init__(  # noqa: PLR0913
         offline_poll_interval_ms: typing.Optional[int] = None,
         retry_backoff_ms: typing.Optional[int] = None,
         retry_backoff_max_ms: typing.Optional[int] = None,
-        retry_grace_attempts: typing.Optional[int] = None,
+        retry_grace_period: typing.Optional[int] = None,
         deadline_ms: typing.Optional[int] = None,
         stream_deadline_ms: typing.Optional[int] = None,
         keep_alive_time: typing.Optional[int] = None,

@@ -115,14 +115,16 @@ def __init__(  # noqa: PLR0913
             else retry_backoff_max_ms
         )

-        self.retry_grace_attempts: int = (
+        self.retry_grace_period: int = (
             int(
                 env_or_default(
-                    ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int
+                    ENV_VAR_RETRY_GRACE_PERIOD_SECONDS,
+                    DEFAULT_RETRY_GRACE_PERIOD_SECONDS,
+                    cast=int,
                 )
             )
-            if retry_grace_attempts is None
-            else retry_grace_attempts
+            if retry_grace_period is None
+            else retry_grace_period
         )

         self.resolver = (
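
In effect, the renamed setting resolves with the usual precedence: an explicit retry_grace_period argument wins, then the FLAGD_RETRY_GRACE_PERIOD environment variable, then the default of 5 seconds. A rough standalone sketch of that fallback order (illustrative only, not the provider's actual env_or_default helper):

import os
import typing

ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD"
DEFAULT_RETRY_GRACE_PERIOD_SECONDS = 5

def resolve_retry_grace_period(arg: typing.Optional[int] = None) -> int:
    # explicit argument > environment variable > built-in default
    if arg is not None:
        return arg
    return int(
        os.environ.get(
            ENV_VAR_RETRY_GRACE_PERIOD_SECONDS, DEFAULT_RETRY_GRACE_PERIOD_SECONDS
        )
    )

# resolve_retry_grace_period()    -> 5 when nothing is set
# FLAGD_RETRY_GRACE_PERIOD=10     -> 10 via the environment
# resolve_retry_grace_period(2)   -> 2, the argument wins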

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py

Lines changed: 2 additions & 2 deletions
@@ -53,7 +53,7 @@ def __init__(  # noqa: PLR0913
         cache_type: typing.Optional[CacheType] = None,
         max_cache_size: typing.Optional[int] = None,
         retry_backoff_max_ms: typing.Optional[int] = None,
-        retry_grace_attempts: typing.Optional[int] = None,
+        retry_grace_period: typing.Optional[int] = None,
     ):
         """
         Create an instance of the FlagdProvider

@@ -84,7 +84,7 @@ def __init__(  # noqa: PLR0913
             deadline_ms=deadline,
             retry_backoff_ms=retry_backoff_ms,
             retry_backoff_max_ms=retry_backoff_max_ms,
-            retry_grace_attempts=retry_grace_attempts,
+            retry_grace_period=retry_grace_period,
             resolver=resolver_type,
             offline_flag_source_path=offline_flag_source_path,
             stream_deadline_ms=stream_deadline_ms,
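
For callers the visible change is only the renamed keyword argument. A minimal usage sketch, assuming the standard OpenFeature Python SDK registration API; the host and port values here are illustrative:

from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider

# retry_grace_period is passed through to Config and interpreted in seconds;
# it replaces the former retry_grace_attempts parameter.
api.set_provider(
    FlagdProvider(
        host="localhost",
        port=8013,
        retry_backoff_ms=1000,
        retry_backoff_max_ms=120000,
        retry_grace_period=5,
    )
)
client = api.get_client()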

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py

Lines changed: 81 additions & 68 deletions
@@ -7,6 +7,7 @@
 from cachebox import BaseCacheImpl, LRUCache
 from google.protobuf.json_format import MessageToDict
 from google.protobuf.struct_pb2 import Struct
+from grpc import ChannelConnectivity

 from openfeature.evaluation_context import EvaluationContext
 from openfeature.event import ProviderEventDetails

@@ -47,6 +48,7 @@ def __init__(
             [ProviderEventDetails], None
         ],
     ):
+        self.active = False
         self.config = config
         self.emit_provider_ready = emit_provider_ready
         self.emit_provider_error = emit_provider_error

@@ -57,26 +59,30 @@ def __init__(
             if self.config.cache == CacheType.LRU
             else None
         )
-        self.stub, self.channel = self._create_stub()
-        self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
-        self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001
-        self.retry_grace_attempts = config.retry_grace_attempts
+
+        self.retry_grace_period = config.retry_grace_period
         self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
         self.deadline = config.deadline_ms * 0.001
         self.connected = False
-
-    def _create_stub(
-        self,
-    ) -> typing.Tuple[evaluation_pb2_grpc.ServiceStub, grpc.Channel]:
-        config = self.config
         channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel
-        channel = channel_factory(
+
+        # Create the channel with the service config
+        options = [
+            ("grpc.keepalive_time_ms", config.keep_alive_time),
+            ("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
+            ("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
+            ("grpc.min_reconnect_backoff_ms", config.deadline_ms),
+        ]
+        self.channel = channel_factory(
             f"{config.host}:{config.port}",
-            options=(("grpc.keepalive_time_ms", config.keep_alive_time),),
+            options=options,
         )
-        stub = evaluation_pb2_grpc.ServiceStub(channel)
+        self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)

-        return stub, channel
+        self.thread: typing.Optional[threading.Thread] = None
+        self.timer: typing.Optional[threading.Timer] = None
+
+        self.start_time = time.time()

     def initialize(self, evaluation_context: EvaluationContext) -> None:
         self.connect()
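
With this hunk, reconnect backoff moves from a hand-rolled retry loop to gRPC's built-in reconnect policy, driven by standard channel arguments. A self-contained sketch of the same channel setup outside the provider (endpoint and values are illustrative):

import grpc

# These are standard gRPC channel arguments: the channel itself retries the
# underlying connection with exponential backoff bounded by the min/max values,
# so no manual sleep-and-retry loop is needed in application code.
options = [
    ("grpc.keepalive_time_ms", 1000),
    ("grpc.initial_reconnect_backoff_ms", 1000),
    ("grpc.max_reconnect_backoff_ms", 120000),
    ("grpc.min_reconnect_backoff_ms", 500),
]
channel = grpc.insecure_channel("localhost:8013", options=options)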
@@ -89,11 +95,12 @@ def shutdown(self) -> None:

     def connect(self) -> None:
         self.active = True
-        self.thread = threading.Thread(
-            target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread"
-        )
-        self.thread.start()

+        # Run monitoring in a separate thread
+        self.monitor_thread = threading.Thread(
+            target=self.monitor, daemon=True, name="FlagdGrpcServiceMonitorThread"
+        )
+        self.monitor_thread.start()
         ## block until ready or deadline reached
         timeout = self.deadline + time.time()
         while not self.connected and time.time() < timeout:

@@ -105,81 +112,87 @@ def connect(self) -> None:
                 "Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
             )

+    def monitor(self) -> None:
+        self.channel.subscribe(self._state_change_callback, try_to_connect=True)
+
+    def _state_change_callback(self, new_state: ChannelConnectivity) -> None:
+        logger.debug(f"gRPC state change: {new_state}")
+        if new_state == ChannelConnectivity.READY:
+            if not self.thread or not self.thread.is_alive():
+                self.thread = threading.Thread(
+                    target=self.listen,
+                    daemon=True,
+                    name="FlagdGrpcServiceWorkerThread",
+                )
+                self.thread.start()
+
+            if self.timer and self.timer.is_alive():
+                logger.debug("gRPC error timer expired")
+                self.timer.cancel()
+
+        elif new_state == ChannelConnectivity.TRANSIENT_FAILURE:
+            # this is the failed reconnect attempt so we are going into stale
+            self.emit_provider_stale(
+                ProviderEventDetails(
+                    message="gRPC sync disconnected, reconnecting",
+                )
+            )
+            self.start_time = time.time()
+            # adding a timer, so we can emit the error event after time
+            self.timer = threading.Timer(self.retry_grace_period, self.emit_error)
+
+            logger.debug("gRPC error timer started")
+            self.timer.start()
+            self.connected = False
+
+    def emit_error(self) -> None:
+        logger.debug("gRPC error emitted")
+        if self.cache:
+            self.cache.clear()
+        self.emit_provider_error(
+            ProviderEventDetails(
+                message="gRPC sync disconnected, reconnecting",
+                error_code=ErrorCode.GENERAL,
+            )
+        )
+
     def listen(self) -> None:
-        retry_delay = self.retry_backoff_seconds
+        logger.debug("gRPC starting listener thread")
         call_args = (
             {"timeout": self.streamline_deadline_seconds}
             if self.streamline_deadline_seconds > 0
             else {}
         )
-        retry_counter = 0
-        while self.active:
-            request = evaluation_pb2.EventStreamRequest()
+        call_args["wait_for_ready"] = True
+        request = evaluation_pb2.EventStreamRequest()

+        # defining a never ending loop to recreate the stream
+        while self.active:
             try:
                 logger.debug("Setting up gRPC sync flags connection")
                 for message in self.stub.EventStream(request, **call_args):
                     if message.type == "provider_ready":
-                        if not self.connected:
-                            self.emit_provider_ready(
-                                ProviderEventDetails(
-                                    message="gRPC sync connection established"
-                                )
+                        self.connected = True
+                        self.emit_provider_ready(
+                            ProviderEventDetails(
+                                message="gRPC sync connection established"
                             )
-                            self.connected = True
-                            retry_counter = 0
-                            # reset retry delay after successsful read
-                            retry_delay = self.retry_backoff_seconds
-
+                        )
                     elif message.type == "configuration_change":
                         data = MessageToDict(message)["data"]
                         self.handle_changed_flags(data)

                     if not self.active:
                         logger.info("Terminating gRPC sync thread")
                         return
-            except grpc.RpcError as e:
-                logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
-                # re-create the stub if there's a connection issue - otherwise reconnect does not work as expected
-                self.stub, self.channel = self._create_stub()
+            except grpc.RpcError as e:  # noqa: PERF203
+                # although it seems like this error log is not interesting, without it, the retry is not working as expected
+                logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}")
             except ParseError:
                 logger.exception(
                     f"Could not parse flag data using flagd syntax: {message=}"
                 )

-            self.connected = False
-            self.on_connection_error(retry_counter, retry_delay)
-
-            retry_delay = self.handle_retry(retry_counter, retry_delay)
-
-            retry_counter = retry_counter + 1
-
-    def handle_retry(self, retry_counter: int, retry_delay: float) -> float:
-        if retry_counter == 0:
-            logger.info("gRPC sync disconnected, reconnecting immediately")
-        else:
-            logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
-            time.sleep(retry_delay)
-        retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds)
-        return retry_delay
-
-    def on_connection_error(self, retry_counter: int, retry_delay: float) -> None:
-        if retry_counter == self.retry_grace_attempts:
-            if self.cache:
-                self.cache.clear()
-            self.emit_provider_error(
-                ProviderEventDetails(
-                    message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
-                    error_code=ErrorCode.GENERAL,
-                )
-            )
-        elif retry_counter == 1:
-            self.emit_provider_stale(
-                ProviderEventDetails(
-                    message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
-                )
-            )
-
     def handle_changed_flags(self, data: typing.Any) -> None:
         changed_flags = list(data["flags"].keys())

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/process/file_watcher.py

Lines changed: 3 additions & 1 deletion
@@ -31,17 +31,19 @@ def __init__(
         self.last_modified = 0.0
         self.flag_data: typing.Mapping[str, Flag] = {}
         self.load_data()
+        self.active = True
         self.thread = threading.Thread(target=self.refresh_file, daemon=True)
         self.thread.start()

     def shutdown(self) -> None:
+        self.active = False
         pass

     def get_flag(self, key: str) -> typing.Optional[Flag]:
         return self.flag_data.get(key)

     def refresh_file(self) -> None:
-        while True:
+        while self.active:
             time.sleep(self.poll_interval_seconds)
             logger.debug("checking for new flag store contents from file")
             last_modified = os.path.getmtime(self.file_path)
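
The watcher change applies the same graceful-shutdown idiom used in the gRPC resolver: the polling loop checks a shared active flag instead of running forever, so shutdown() can actually stop the background thread. A generic sketch of the pattern (the real loop also compares file modification times):

import threading
import time

class PollingWorker:
    def __init__(self, poll_interval_seconds: float = 1.0):
        self.poll_interval_seconds = poll_interval_seconds
        self.active = True  # checked on every loop iteration
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self) -> None:
        while self.active:
            time.sleep(self.poll_interval_seconds)
            # ... periodic work goes here, e.g. re-reading a flag file ...

    def shutdown(self) -> None:
        # the loop exits after at most one more poll interval
        self.active = False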
Lines changed: 0 additions & 21 deletions
@@ -1,8 +1,5 @@
 import typing

-import pytest
-from testcontainers.core.container import DockerContainer
-from tests.e2e.flagd_container import FlagdContainer
 from tests.e2e.steps import *  # noqa: F403

 JsonPrimitive = typing.Union[str, bool, float, int]

@@ -18,21 +15,3 @@ def pytest_collection_modifyitems(config):
     # this seems to not work with python 3.8
     if hasattr(config.option, "markexpr") and config.option.markexpr == "":
         config.option.markexpr = marker
-
-
-@pytest.fixture(autouse=True, scope="module")
-def setup(request, port, image):
-    container: DockerContainer = FlagdContainer(
-        image=image,
-        port=port,
-    )
-    # Setup code
-    c = container.start()
-
-    def fin():
-        c.stop()
-
-    # Teardown code
-    request.addfinalizer(fin)
-
-    return c.get_exposed_port(port)
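
The deleted fixture started a flagd test container per module and tore it down with request.addfinalizer; per the commit message it is presumably superseded by the new test-harness. For reference, the same setup/teardown shape is usually written today as a yield fixture, sketched here with a stand-in class since FlagdContainer is no longer imported:

import pytest

class FakeContainer:
    # stand-in for the removed FlagdContainer, only to keep the sketch runnable
    def start(self):
        return self
    def stop(self):
        pass
    def get_exposed_port(self, port):
        return port

@pytest.fixture(autouse=True, scope="module")
def setup():
    container = FakeContainer().start()
    yield container.get_exposed_port(8013)
    # teardown after the module's tests, replacing request.addfinalizer(fin)
    container.stop()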
