Skip to content

Commit b34f90c

Browse files
authored
[ENH] Add otel support for query and compaction service (#2122)
## Description of changes Export traces and spans in open telemetry format so that it can be consumed by backends like Jaeger and Honeycomb. For Jaeger verified that it works locally. Will test honeycomb once it is merged ## Test plan All existing tests pass. Verified locally via tilt up and tilt down that traces are exported to Jaeger ## Documentation Changes NA ---------
1 parent 3eb26d5 commit b34f90c

File tree

8 files changed

+329
-19
lines changed

8 files changed

+329
-19
lines changed

Cargo.lock

Lines changed: 235 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/worker/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,11 @@ arrow = "50.0.0"
4545
roaring = "0.10.3"
4646
tantivy = "0.21.1"
4747
tracing = "0.1"
48-
tracing-subscriber = "0.3"
48+
tracing-bunyan-formatter = "0.3.3"
49+
tracing-opentelemetry = "0.19.0"
50+
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
51+
opentelemetry = { version = "0.19.0", default-features = false, features = ["trace", "rt-tokio"] }
52+
opentelemetry-otlp = "0.12.0"
4953

5054
[dev-dependencies]
5155
proptest = "1.4.0"

rust/worker/chroma_config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
# for now we nest it in the worker directory
55

66
query_service:
7+
service_name: "query-service"
8+
otel_endpoint: "http://jaeger:4317"
79
my_ip: "10.244.0.9"
810
my_port: 50051
911
assignment_policy:
@@ -32,6 +34,8 @@ query_service:
3234
worker_queue_size: 100
3335

3436
compaction_service:
37+
service_name: "compaction-service"
38+
otel_endpoint: "http://jaeger:4317"
3539
my_ip: "10.244.0.9"
3640
my_port: 50051
3741
assignment_policy:

rust/worker/src/bin/query_service.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,5 @@ use worker::query_service_entrypoint;
22

33
#[tokio::main]
44
async fn main() {
5-
tracing_subscriber::fmt()
6-
.with_max_level(tracing::Level::INFO)
7-
.init();
85
query_service_entrypoint().await;
96
}

rust/worker/src/config.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ impl RootConfig {
9393
/// Each submodule that needs to be configured from the config object should implement the Configurable trait and
9494
/// have its own field in this struct for its Config struct.
9595
pub(crate) struct QueryServiceConfig {
96+
pub(crate) service_name: String,
97+
pub(crate) otel_endpoint: String,
9698
pub(crate) my_ip: String,
9799
pub(crate) my_port: u16,
98100
pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig,
@@ -115,6 +117,8 @@ pub(crate) struct QueryServiceConfig {
115117
/// Each submodule that needs to be configured from the config object should implement the Configurable trait and
116118
/// have its own field in this struct for its Config struct.
117119
pub(crate) struct CompactionServiceConfig {
120+
pub(crate) service_name: String,
121+
pub(crate) otel_endpoint: String,
118122
pub(crate) my_ip: String,
119123
pub(crate) my_port: u16,
120124
pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig,
@@ -150,6 +154,8 @@ mod tests {
150154
"chroma_config.yaml",
151155
r#"
152156
query_service:
157+
service_name: "query-service"
158+
otel_endpoint: "http://jaeger:4317"
153159
my_ip: "192.0.0.1"
154160
my_port: 50051
155161
assignment_policy:
@@ -178,6 +184,8 @@ mod tests {
178184
worker_queue_size: 100
179185
180186
compaction_service:
187+
service_name: "compaction-service"
188+
otel_endpoint: "http://jaeger:4317"
181189
my_ip: "192.0.0.1"
182190
my_port: 50051
183191
assignment_policy:
@@ -227,6 +235,8 @@ mod tests {
227235
"random_path.yaml",
228236
r#"
229237
query_service:
238+
service_name: "query-service"
239+
otel_endpoint: "http://jaeger:4317"
230240
my_ip: "192.0.0.1"
231241
my_port: 50051
232242
assignment_policy:
@@ -255,6 +265,8 @@ mod tests {
255265
worker_queue_size: 100
256266
257267
compaction_service:
268+
service_name: "compaction-service"
269+
otel_endpoint: "http://jaeger:4317"
258270
my_ip: "192.0.0.1"
259271
my_port: 50051
260272
assignment_policy:
@@ -322,6 +334,8 @@ mod tests {
322334
"chroma_config.yaml",
323335
r#"
324336
query_service:
337+
service_name: "query-service"
338+
otel_endpoint: "http://jaeger:4317"
325339
my_ip: "192.0.0.1"
326340
my_port: 50051
327341
assignment_policy:
@@ -350,6 +364,8 @@ mod tests {
350364
worker_queue_size: 100
351365
352366
compaction_service:
367+
service_name: "compaction-service"
368+
otel_endpoint: "http://jaeger:4317"
353369
my_ip: "192.0.0.1"
354370
my_port: 50051
355371
assignment_policy:
@@ -401,6 +417,8 @@ mod tests {
401417
"chroma_config.yaml",
402418
r#"
403419
query_service:
420+
service_name: "query-service"
421+
otel_endpoint: "http://jaeger:4317"
404422
assignment_policy:
405423
RendezvousHashing:
406424
hasher: Murmur3
@@ -427,6 +445,8 @@ mod tests {
427445
worker_queue_size: 100
428446
429447
compaction_service:
448+
service_name: "compaction-service"
449+
otel_endpoint: "http://jaeger:4317"
430450
assignment_policy:
431451
RendezvousHashing:
432452
hasher: Murmur3

rust/worker/src/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod server;
1313
mod storage;
1414
mod sysdb;
1515
mod system;
16+
mod tracing;
1617
mod types;
1718

1819
use config::Configurable;
@@ -35,6 +36,12 @@ pub async fn query_service_entrypoint() {
3536
};
3637

3738
let config = config.query_service;
39+
40+
crate::tracing::opentelemetry_config::init_otel_tracing(
41+
&config.service_name,
42+
&config.otel_endpoint,
43+
);
44+
3845
let system: system::System = system::System::new();
3946
let dispatcher =
4047
match execution::dispatcher::Dispatcher::try_from_config(&config.dispatcher).await {
@@ -94,6 +101,12 @@ pub async fn compaction_service_entrypoint() {
94101
};
95102

96103
let config = config.compaction_service;
104+
105+
crate::tracing::opentelemetry_config::init_otel_tracing(
106+
&config.service_name,
107+
&config.otel_endpoint,
108+
);
109+
97110
let system: system::System = system::System::new();
98111

99112
let mut memberlist = match memberlist::CustomResourceMemberlistProvider::try_from_config(

rust/worker/src/tracing/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub(crate) mod opentelemetry_config;
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use opentelemetry::global;
2+
use opentelemetry::sdk::propagation::TraceContextPropagator;
3+
use opentelemetry::sdk::trace;
4+
use opentelemetry_otlp::WithExportConfig;
5+
use tracing_bunyan_formatter::BunyanFormattingLayer;
6+
use tracing_subscriber::{layer::SubscriberExt, EnvFilter, Layer};
7+
8+
pub(crate) fn init_otel_tracing(service_name: &String, otel_endpoint: &String) {
9+
println!(
10+
"Registering jaeger subscriber for {} at endpoint {}",
11+
service_name, otel_endpoint
12+
);
13+
let resource = opentelemetry::sdk::Resource::new(vec![opentelemetry::KeyValue::new(
14+
"service.name",
15+
service_name.clone(),
16+
)]);
17+
// Prepare trace config.
18+
let trace_config = trace::config()
19+
.with_sampler(opentelemetry::sdk::trace::Sampler::AlwaysOn)
20+
.with_resource(resource);
21+
// Prepare exporter.
22+
let exporter = opentelemetry_otlp::new_exporter()
23+
.tonic()
24+
.with_endpoint(otel_endpoint);
25+
let otlp_tracer = opentelemetry_otlp::new_pipeline()
26+
.tracing()
27+
.with_exporter(exporter)
28+
.with_trace_config(trace_config)
29+
.install_batch(opentelemetry::runtime::Tokio)
30+
.expect("Error - Failed to create tracer.");
31+
// Layer for adding our configured tracer.
32+
// Export everything at this layer. The backend i.e. honeycomb or jaeger will filter at its end.
33+
let exporter_layer = tracing_opentelemetry::layer()
34+
.with_tracer(otlp_tracer)
35+
.with_filter(tracing_subscriber::filter::LevelFilter::TRACE);
36+
// Layer for printing spans to stdout. Only print INFO logs by default.
37+
let stdout_layer =
38+
BunyanFormattingLayer::new(service_name.clone().to_string(), std::io::stdout)
39+
.with_filter(tracing_subscriber::filter::LevelFilter::INFO);
40+
// global filter layer. Don't filter anything at global layer.
41+
let global_layer = EnvFilter::new("TRACE");
42+
// Create subscriber.
43+
let subscriber = tracing_subscriber::registry()
44+
.with(global_layer)
45+
.with(stdout_layer)
46+
.with(exporter_layer);
47+
global::set_text_map_propagator(TraceContextPropagator::new());
48+
tracing::subscriber::set_global_default(subscriber)
49+
.expect("Set global default subscriber failed");
50+
println!("Set global subscriber for {}", service_name);
51+
}

0 commit comments

Comments
 (0)