Skip to content

Commit b62d3d1

Browse files
aepflitoddbaertgruebel
authored
feat(flagd-rpc)!: add events for rpc mode, some breaking config fixes (#108)
* build(flagd): auto generate proto files from schema Signed-off-by: Simon Schrottner <[email protected]> * fixup: changing to mypy-protobuf Signed-off-by: Simon Schrottner <[email protected]> * fixup: changing to mypy-protobuf Signed-off-by: Simon Schrottner <[email protected]> * Update providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/grpc.py Co-authored-by: Todd Baert <[email protected]> Signed-off-by: Simon Schrottner <[email protected]> * Apply suggestions from code review Signed-off-by: Simon Schrottner <[email protected]> * Update providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py Co-authored-by: Anton Grübel <[email protected]> Signed-off-by: Simon Schrottner <[email protected]> --------- Signed-off-by: Simon Schrottner <[email protected]> Co-authored-by: Todd Baert <[email protected]> Co-authored-by: Anton Grübel <[email protected]>
1 parent 61e42e7 commit b62d3d1

File tree

13 files changed

+656
-123
lines changed

13 files changed

+656
-123
lines changed

providers/openfeature-provider-flagd/README.md

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,14 @@ pip install openfeature-provider-flagd
1010

1111
## Configuration and Usage
1212

13+
The flagd provider can operate in two modes: [RPC](#remote-resolver-rpc) (evaluation takes place in flagd, via gRPC calls) or [in-process](#in-process-resolver) (evaluation takes place in-process, with the provider getting a ruleset from a compliant sync-source).
14+
15+
### Remote resolver (RPC)
16+
17+
This is the default mode of operation of the provider.
18+
In this mode, `FlagdProvider` communicates with [flagd](https://github.com/open-feature/flagd) via the gRPC protocol.
19+
Flag evaluations take place remotely at the connected flagd instance.
20+
1321
Instantiate a new FlagdProvider instance and configure the OpenFeature SDK to use it:
1422

1523
```python
@@ -19,7 +27,9 @@ from openfeature.contrib.provider.flagd import FlagdProvider
1927
api.set_provider(FlagdProvider())
2028
```
2129

22-
To use in-process evaluation in offline mode with a file as source:
30+
### In-process resolver
31+
32+
This mode performs flag evaluations locally (in-process).
2333

2434
```python
2535
from openfeature import api
@@ -36,12 +46,71 @@ api.set_provider(FlagdProvider(
3646

3747
The default options can be defined in the FlagdProvider constructor.
3848

39-
| Option name | Type & Values | Default |
40-
|----------------|---------------|-----------|
41-
| host | str | localhost |
42-
| port | int | 8013 |
43-
| schema | str | http |
44-
| timeout | int | 2 |
49+
| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
50+
| ------------------------ | ------------------------------ | -------------------------- | ----------------------------- | ------------------- |
51+
| resolver_type | FLAGD_RESOLVER | enum - `rpc`, `in-process` | rpc | |
52+
| host | FLAGD_HOST | str | localhost | rpc & in-process |
53+
| port | FLAGD_PORT | int | 8013 (rpc), 8015 (in-process) | rpc & in-process |
54+
| tls | FLAGD_TLS | bool | false | rpc & in-process |
55+
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process |
56+
| stream_deadline_ms | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
57+
| keep_alive_time | FLAGD_KEEP_ALIVE_TIME_MS | int | 0 | rpc & in-process |
58+
| selector | FLAGD_SOURCE_SELECTOR | str | null | in-process |
59+
| cache_type | FLAGD_CACHE | enum - `lru`, `disabled` | lru | rpc |
60+
| max_cache_size | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc |
61+
| retry_backoff_ms | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc |
62+
| offline_flag_source_path | FLAGD_OFFLINE_FLAG_SOURCE_PATH | str | null | in-process |
63+
64+
<!-- not implemented
65+
| target_uri | FLAGD_TARGET_URI | alternative to host/port, supporting custom name resolution | string | null | rpc & in-process |
66+
| socket_path | FLAGD_SOCKET_PATH | alternative to host port, unix socket | String | null | rpc & in-process |
67+
| cert_path | FLAGD_SERVER_CERT_PATH | tls cert path | String | null | rpc & in-process |
68+
| max_event_stream_retries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
69+
| context_enricher | - | sync-metadata to evaluation context mapping function | function | identity function | in-process |
70+
| offline_pollIntervalMs | FLAGD_OFFLINE_POLL_MS | poll interval for reading offlineFlagSourcePath | int | 5000 | in-process |
71+
-->
72+
73+
> [!NOTE]
74+
> Some configurations are only applicable for RPC resolver.
75+
76+
<!--
77+
### Unix socket support
78+
Unix socket communication with flagd is facilitated by usaging of the linux-native `epoll` library on `linux-x86_64`
79+
only (ARM support is pending the release of `netty-transport-native-epoll` v5).
80+
Unix sockets are not supported on other platforms or architectures.
81+
-->
82+
83+
### Reconnection
84+
85+
Reconnection is supported by the underlying gRPC connections.
86+
If the connection to flagd is lost, it will reconnect automatically.
87+
A failure to connect will result in an [error event](https://openfeature.dev/docs/reference/concepts/events#provider_error) from the provider, though it will attempt to reconnect indefinitely.
88+
89+
### Deadlines
90+
91+
Deadlines are used to define how long the provider waits to complete initialization or flag evaluations.
92+
They behave differently based on the resolver type.
93+
94+
#### Deadlines with Remote resolver (RPC)
95+
96+
If the remote evaluation call is not completed within this deadline, the gRPC call is terminated with the error `DEADLINE_EXCEEDED`
97+
and the evaluation will default.
98+
99+
### TLS
100+
101+
TLS is available in situations where flagd is running on another host.
102+
103+
<!--
104+
You may optionally supply an X.509 certificate in PEM format. Otherwise, the default certificate store will be used.
105+
```java
106+
FlagdProvider flagdProvider = new FlagdProvider(
107+
FlagdOptions.builder()
108+
.host("myflagdhost")
109+
.tls(true) // use TLS
110+
.certPath("etc/cert/ca.crt") // PEM cert
111+
.build());
112+
```
113+
-->
45114

46115
## License
47116

providers/openfeature-provider-flagd/pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ classifiers = [
1818
keywords = []
1919
dependencies = [
2020
"openfeature-sdk>=0.6.0",
21-
"grpcio>=1.60.0",
21+
"grpcio>=1.68.0",
2222
"protobuf>=4.25.2",
2323
"mmh3>=4.1.0",
2424
"panzi-json-logic>=1.0.1",

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/config.py

Lines changed: 87 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,33 @@
22
import typing
33
from enum import Enum
44

5+
6+
class ResolverType(Enum):
7+
RPC = "rpc"
8+
IN_PROCESS = "in-process"
9+
10+
11+
DEFAULT_DEADLINE = 500
12+
DEFAULT_HOST = "localhost"
13+
DEFAULT_KEEP_ALIVE = 0
14+
DEFAULT_OFFLINE_SOURCE_PATH: typing.Optional[str] = None
15+
DEFAULT_PORT_IN_PROCESS = 8015
16+
DEFAULT_PORT_RPC = 8013
17+
DEFAULT_RESOLVER_TYPE = ResolverType.RPC
18+
DEFAULT_RETRY_BACKOFF = 1000
19+
DEFAULT_STREAM_DEADLINE = 600000
20+
DEFAULT_TLS = False
21+
22+
ENV_VAR_DEADLINE_MS = "FLAGD_DEADLINE_MS"
23+
ENV_VAR_HOST = "FLAGD_HOST"
24+
ENV_VAR_KEEP_ALIVE_TIME_MS = "FLAGD_KEEP_ALIVE_TIME_MS"
25+
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH = "FLAGD_OFFLINE_FLAG_SOURCE_PATH"
26+
ENV_VAR_PORT = "FLAGD_PORT"
27+
ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER_TYPE"
28+
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
29+
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
30+
ENV_VAR_TLS = "FLAGD_TLS"
31+
532
T = typing.TypeVar("T")
633

734

@@ -18,42 +45,83 @@ def env_or_default(
1845
return val if cast is None else cast(val)
1946

2047

21-
class ResolverType(Enum):
22-
GRPC = "grpc"
23-
IN_PROCESS = "in-process"
24-
25-
2648
class Config:
2749
def __init__( # noqa: PLR0913
2850
self,
2951
host: typing.Optional[str] = None,
3052
port: typing.Optional[int] = None,
3153
tls: typing.Optional[bool] = None,
32-
timeout: typing.Optional[int] = None,
3354
resolver_type: typing.Optional[ResolverType] = None,
3455
offline_flag_source_path: typing.Optional[str] = None,
35-
offline_poll_interval_seconds: typing.Optional[float] = None,
56+
retry_backoff_ms: typing.Optional[int] = None,
57+
deadline: typing.Optional[int] = None,
58+
stream_deadline_ms: typing.Optional[int] = None,
59+
keep_alive_time: typing.Optional[int] = None,
3660
):
37-
self.host = env_or_default("FLAGD_HOST", "localhost") if host is None else host
38-
self.port = (
39-
env_or_default("FLAGD_PORT", 8013, cast=int) if port is None else port
40-
)
61+
self.host = env_or_default(ENV_VAR_HOST, DEFAULT_HOST) if host is None else host
62+
4163
self.tls = (
42-
env_or_default("FLAGD_TLS", False, cast=str_to_bool) if tls is None else tls
64+
env_or_default(ENV_VAR_TLS, DEFAULT_TLS, cast=str_to_bool)
65+
if tls is None
66+
else tls
4367
)
44-
self.timeout = 5 if timeout is None else timeout
68+
69+
self.retry_backoff_ms: int = (
70+
int(
71+
env_or_default(
72+
ENV_VAR_RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF, cast=int
73+
)
74+
)
75+
if retry_backoff_ms is None
76+
else retry_backoff_ms
77+
)
78+
4579
self.resolver_type = (
46-
ResolverType(env_or_default("FLAGD_RESOLVER_TYPE", "grpc"))
80+
ResolverType(env_or_default(ENV_VAR_RESOLVER_TYPE, DEFAULT_RESOLVER_TYPE))
4781
if resolver_type is None
4882
else resolver_type
4983
)
84+
85+
default_port = (
86+
DEFAULT_PORT_RPC
87+
if self.resolver_type is ResolverType.RPC
88+
else DEFAULT_PORT_IN_PROCESS
89+
)
90+
91+
self.port: int = (
92+
int(env_or_default(ENV_VAR_PORT, default_port, cast=int))
93+
if port is None
94+
else port
95+
)
96+
5097
self.offline_flag_source_path = (
51-
env_or_default("FLAGD_OFFLINE_FLAG_SOURCE_PATH", None)
98+
env_or_default(
99+
ENV_VAR_OFFLINE_FLAG_SOURCE_PATH, DEFAULT_OFFLINE_SOURCE_PATH
100+
)
52101
if offline_flag_source_path is None
53102
else offline_flag_source_path
54103
)
55-
self.offline_poll_interval_seconds = (
56-
float(env_or_default("FLAGD_OFFLINE_POLL_INTERVAL_SECONDS", 1.0))
57-
if offline_poll_interval_seconds is None
58-
else offline_poll_interval_seconds
104+
105+
self.deadline: int = (
106+
int(env_or_default(ENV_VAR_DEADLINE_MS, DEFAULT_DEADLINE, cast=int))
107+
if deadline is None
108+
else deadline
109+
)
110+
111+
self.stream_deadline_ms: int = (
112+
int(
113+
env_or_default(
114+
ENV_VAR_STREAM_DEADLINE_MS, DEFAULT_STREAM_DEADLINE, cast=int
115+
)
116+
)
117+
if stream_deadline_ms is None
118+
else stream_deadline_ms
119+
)
120+
121+
self.keep_alive_time: int = (
122+
int(
123+
env_or_default(ENV_VAR_KEEP_ALIVE_TIME_MS, DEFAULT_KEEP_ALIVE, cast=int)
124+
)
125+
if keep_alive_time is None
126+
else keep_alive_time
59127
)

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/provider.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
"""
2323

2424
import typing
25+
import warnings
2526

2627
from openfeature.evaluation_context import EvaluationContext
2728
from openfeature.flag_evaluation import FlagResolutionDetails
@@ -42,41 +43,68 @@ def __init__( # noqa: PLR0913
4243
host: typing.Optional[str] = None,
4344
port: typing.Optional[int] = None,
4445
tls: typing.Optional[bool] = None,
46+
deadline: typing.Optional[int] = None,
4547
timeout: typing.Optional[int] = None,
48+
retry_backoff_ms: typing.Optional[int] = None,
4649
resolver_type: typing.Optional[ResolverType] = None,
4750
offline_flag_source_path: typing.Optional[str] = None,
48-
offline_poll_interval_seconds: typing.Optional[float] = None,
51+
stream_deadline_ms: typing.Optional[int] = None,
52+
keep_alive_time: typing.Optional[int] = None,
4953
):
5054
"""
5155
Create an instance of the FlagdProvider
5256
5357
:param host: the host to make requests to
5458
:param port: the port the flagd service is available on
5559
:param tls: enable/disable secure TLS connectivity
56-
:param timeout: the maximum to wait before a request times out
60+
:param deadline: the maximum to wait before a request times out
61+
:param timeout: the maximum time to wait before a request times out
62+
:param retry_backoff_ms: the number of milliseconds to backoff
63+
:param offline_flag_source_path: the path to the flag source file
64+
:param stream_deadline_ms: the maximum time to wait before a request times out
65+
:param keep_alive_time: the number of milliseconds to keep alive
66+
:param resolver_type: the type of resolver to use
5767
"""
68+
if deadline is None and timeout is not None:
69+
deadline = timeout * 1000
70+
warnings.warn(
71+
"'timeout' property is deprecated, please use 'deadline' instead, be aware that 'deadline' is in milliseconds",
72+
DeprecationWarning,
73+
stacklevel=2,
74+
)
75+
5876
self.config = Config(
5977
host=host,
6078
port=port,
6179
tls=tls,
62-
timeout=timeout,
80+
deadline=deadline,
81+
retry_backoff_ms=retry_backoff_ms,
6382
resolver_type=resolver_type,
6483
offline_flag_source_path=offline_flag_source_path,
65-
offline_poll_interval_seconds=offline_poll_interval_seconds,
84+
stream_deadline_ms=stream_deadline_ms,
85+
keep_alive_time=keep_alive_time,
6686
)
6787

6888
self.resolver = self.setup_resolver()
6989

7090
def setup_resolver(self) -> AbstractResolver:
71-
if self.config.resolver_type == ResolverType.GRPC:
72-
return GrpcResolver(self.config)
91+
if self.config.resolver_type == ResolverType.RPC:
92+
return GrpcResolver(
93+
self.config,
94+
self.emit_provider_ready,
95+
self.emit_provider_error,
96+
self.emit_provider_configuration_changed,
97+
)
7398
elif self.config.resolver_type == ResolverType.IN_PROCESS:
7499
return InProcessResolver(self.config, self)
75100
else:
76101
raise ValueError(
77102
f"`resolver_type` parameter invalid: {self.config.resolver_type}"
78103
)
79104

105+
def initialize(self, evaluation_context: EvaluationContext) -> None:
106+
self.resolver.initialize(evaluation_context)
107+
80108
def shutdown(self) -> None:
81109
if self.resolver:
82110
self.resolver.shutdown()

providers/openfeature-provider-flagd/src/openfeature/contrib/provider/flagd/resolvers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88

99

1010
class AbstractResolver(typing.Protocol):
11+
def initialize(self, evaluation_context: EvaluationContext) -> None: ...
12+
1113
def shutdown(self) -> None: ...
1214

1315
def resolve_boolean_details(

0 commit comments

Comments
 (0)