Skip to content

Commit f2e83c3

Browse files
authored
ref(cardinality): Log Sentry errors for cardinality limits (#3053)
To also track which namespace/limit was affected, this PR also adds a hashset of limit IDs to the cardinality limiter return value.
1 parent 32b4750 commit f2e83c3

File tree

5 files changed

+95
-46
lines changed

5 files changed

+95
-46
lines changed

relay-cardinality/benches/redis_impl.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ fn build_limiter(redis: RedisPool, reset_redis: bool) -> RedisSetLimiter {
3636

3737
struct NoopRejections;
3838

39-
impl Rejections for NoopRejections {
40-
fn reject(&mut self, _entry_id: EntryId) {}
39+
impl<'a> Rejections<'a> for NoopRejections {
40+
fn reject(&mut self, _limit_id: &'a str, _entry_id: EntryId) {}
4141
}
4242

4343
#[derive(Debug)]

relay-cardinality/src/limiter.rs

Lines changed: 56 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,26 @@ pub struct Scoping {
2020
}
2121

2222
/// Accumulator of all cardinality limiter rejections.
23-
pub trait Rejections {
23+
pub trait Rejections<'a> {
2424
/// Called for ever [`Entry`] which was rejected from the [`Limiter`].
25-
fn reject(&mut self, entry_id: EntryId);
25+
fn reject(&mut self, limit_id: &'a str, entry_id: EntryId);
2626
}
2727

2828
/// Limiter responsible to enforce limits.
2929
pub trait Limiter {
3030
/// Verifies cardinality limits.
3131
///
3232
/// Returns an iterator containing only accepted entries.
33-
fn check_cardinality_limits<E, R>(
33+
fn check_cardinality_limits<'a, E, R>(
3434
&self,
3535
scoping: Scoping,
36-
limits: &[CardinalityLimit],
36+
limits: &'a [CardinalityLimit],
3737
entries: E,
3838
rejections: &mut R,
3939
) -> Result<()>
4040
where
4141
E: IntoIterator<Item = Entry>,
42-
R: Rejections;
42+
R: Rejections<'a>;
4343
}
4444

4545
/// Unit of operation for the cardinality limiter.
@@ -100,30 +100,30 @@ impl<T: Limiter> CardinalityLimiter<T> {
100100
/// Checks cardinality limits of a list of buckets.
101101
///
102102
/// Returns an iterator of all buckets that have been accepted.
103-
pub fn check_cardinality_limits<I: CardinalityItem>(
103+
pub fn check_cardinality_limits<'a, I: CardinalityItem>(
104104
&self,
105105
scoping: Scoping,
106-
limits: &[CardinalityLimit],
106+
limits: &'a [CardinalityLimit],
107107
items: Vec<I>,
108-
) -> Result<CardinalityLimits<I>, (Vec<I>, Error)> {
108+
) -> Result<CardinalityLimits<'a, I>, (Vec<I>, Error)> {
109109
metric!(timer(CardinalityLimiterTimers::CardinalityLimiter), {
110110
let entries = items.iter().enumerate().filter_map(|(id, item)| {
111111
Some(Entry::new(EntryId(id), item.namespace()?, item.to_hash()))
112112
});
113113

114-
let mut rejections = RejectedIds::default();
114+
let mut rejections = RejectionTracker::default();
115115
if let Err(err) =
116116
self.limiter
117117
.check_cardinality_limits(scoping, limits, entries, &mut rejections)
118118
{
119119
return Err((items, err));
120120
}
121121

122-
if !rejections.0.is_empty() {
122+
if !rejections.entries.is_empty() {
123123
relay_log::debug!(
124124
scoping = ?scoping,
125125
"rejected {} metrics due to cardinality limit",
126-
rejections.0.len(),
126+
rejections.entries.len(),
127127
);
128128
}
129129

@@ -136,30 +136,46 @@ impl<T: Limiter> CardinalityLimiter<T> {
136136
///
137137
/// The result can be used directly by [`CardinalityLimits`].
138138
#[derive(Debug, Default)]
139-
struct RejectedIds(HashSet<usize>);
139+
struct RejectionTracker<'a> {
140+
limits: HashSet<&'a str>,
141+
entries: HashSet<usize>,
142+
}
140143

141-
impl Rejections for RejectedIds {
144+
impl<'a> Rejections<'a> for RejectionTracker<'a> {
142145
#[inline(always)]
143-
fn reject(&mut self, entry_id: EntryId) {
144-
self.0.insert(entry_id.0);
146+
fn reject(&mut self, limit_id: &'a str, entry_id: EntryId) {
147+
self.limits.insert(limit_id);
148+
self.entries.insert(entry_id.0);
145149
}
146150
}
147151

148152
/// Result of [`CardinalityLimiter::check_cardinality_limits`].
149153
#[derive(Debug)]
150-
pub struct CardinalityLimits<T> {
154+
pub struct CardinalityLimits<'a, T> {
151155
source: Vec<T>,
152156
rejections: HashSet<usize>,
157+
limits: HashSet<&'a str>,
153158
}
154159

155-
impl<T> CardinalityLimits<T> {
156-
fn new(source: Vec<T>, rejections: RejectedIds) -> Self {
160+
impl<'a, T> CardinalityLimits<'a, T> {
161+
fn new(source: Vec<T>, rejections: RejectionTracker<'a>) -> Self {
157162
Self {
158163
source,
159-
rejections: rejections.0,
164+
rejections: rejections.entries,
165+
limits: rejections.limits,
160166
}
161167
}
162168

169+
/// Returns `true` if any items have been rejected.
170+
pub fn has_rejections(&self) -> bool {
171+
!self.rejections.is_empty()
172+
}
173+
174+
/// Returns all id's of cardinality limits which were exceeded.
175+
pub fn enforced_limits(&self) -> &HashSet<&'a str> {
176+
&self.limits
177+
}
178+
163179
/// Recovers the original list of items passed to the cardinality limiter.
164180
pub fn into_source(self) -> Vec<T> {
165181
self.source
@@ -259,21 +275,27 @@ mod tests {
259275
let limits = CardinalityLimits {
260276
source: vec!['a', 'b', 'c', 'd', 'e'],
261277
rejections: HashSet::from([0, 1, 3]),
278+
limits: HashSet::new(),
262279
};
263280
assert_rejected(&limits, ['a', 'b', 'd']);
281+
assert!(limits.has_rejections());
264282
assert_eq!(limits.into_accepted(), vec!['c', 'e']);
265283

266284
let limits = CardinalityLimits {
267285
source: vec!['a', 'b', 'c', 'd', 'e'],
268286
rejections: HashSet::from([]),
287+
limits: HashSet::new(),
269288
};
270289
assert_rejected(&limits, []);
290+
assert!(!limits.has_rejections());
271291
assert_eq!(limits.into_accepted(), vec!['a', 'b', 'c', 'd', 'e']);
272292

273293
let limits = CardinalityLimits {
274294
source: vec!['a', 'b', 'c', 'd', 'e'],
275295
rejections: HashSet::from([0, 1, 2, 3, 4]),
296+
limits: HashSet::new(),
276297
};
298+
assert!(limits.has_rejections());
277299
assert_rejected(&limits, ['a', 'b', 'c', 'd', 'e']);
278300
assert!(limits.into_accepted().is_empty());
279301
}
@@ -283,19 +305,19 @@ mod tests {
283305
struct RejectAllLimiter;
284306

285307
impl Limiter for RejectAllLimiter {
286-
fn check_cardinality_limits<I, T>(
308+
fn check_cardinality_limits<'a, I, T>(
287309
&self,
288310
_scoping: Scoping,
289-
_limits: &[CardinalityLimit],
311+
_limits: &'a [CardinalityLimit],
290312
entries: I,
291313
outcomes: &mut T,
292314
) -> Result<()>
293315
where
294316
I: IntoIterator<Item = Entry>,
295-
T: Rejections,
317+
T: Rejections<'a>,
296318
{
297319
for entry in entries {
298-
outcomes.reject(entry.id);
320+
outcomes.reject("test", entry.id);
299321
}
300322

301323
Ok(())
@@ -304,17 +326,19 @@ mod tests {
304326

305327
let limiter = CardinalityLimiter::new(RejectAllLimiter);
306328

329+
let limits = build_limits();
307330
let result = limiter
308331
.check_cardinality_limits(
309332
build_scoping(),
310-
&build_limits(),
333+
&limits,
311334
vec![
312335
Item::new(0, MetricNamespace::Transactions),
313336
Item::new(1, MetricNamespace::Transactions),
314337
],
315338
)
316339
.unwrap();
317340

341+
assert_eq!(result.enforced_limits(), &HashSet::from(["test"]));
318342
assert!(result.into_accepted().is_empty());
319343
}
320344

@@ -323,16 +347,16 @@ mod tests {
323347
struct AcceptAllLimiter;
324348

325349
impl Limiter for AcceptAllLimiter {
326-
fn check_cardinality_limits<I, T>(
350+
fn check_cardinality_limits<'a, I, T>(
327351
&self,
328352
_scoping: Scoping,
329-
_limits: &[CardinalityLimit],
353+
_limits: &'a [CardinalityLimit],
330354
_entries: I,
331355
_outcomes: &mut T,
332356
) -> Result<()>
333357
where
334358
I: IntoIterator<Item = Entry>,
335-
T: Rejections,
359+
T: Rejections<'a>,
336360
{
337361
Ok(())
338362
}
@@ -344,8 +368,9 @@ mod tests {
344368
Item::new(0, MetricNamespace::Transactions),
345369
Item::new(1, MetricNamespace::Spans),
346370
];
371+
let limits = build_limits();
347372
let result = limiter
348-
.check_cardinality_limits(build_scoping(), &build_limits(), items.clone())
373+
.check_cardinality_limits(build_scoping(), &limits, items.clone())
349374
.unwrap();
350375

351376
assert_eq!(result.into_accepted(), items);
@@ -356,7 +381,7 @@ mod tests {
356381
struct RejectEvenLimiter;
357382

358383
impl Limiter for RejectEvenLimiter {
359-
fn check_cardinality_limits<I, T>(
384+
fn check_cardinality_limits<'a, I, T>(
360385
&self,
361386
scoping: Scoping,
362387
limits: &[CardinalityLimit],
@@ -365,14 +390,14 @@ mod tests {
365390
) -> Result<()>
366391
where
367392
I: IntoIterator<Item = Entry>,
368-
T: Rejections,
393+
T: Rejections<'a>,
369394
{
370395
assert_eq!(scoping, build_scoping());
371396
assert_eq!(limits, &build_limits());
372397

373398
for entry in entries {
374399
if entry.id.0 % 2 == 0 {
375-
outcomes.reject(entry.id);
400+
outcomes.reject("test", entry.id);
376401
}
377402
}
378403

relay-cardinality/src/redis/limiter.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,16 @@ impl RedisSetLimiter {
9090
}
9191

9292
impl Limiter for RedisSetLimiter {
93-
fn check_cardinality_limits<E, R>(
93+
fn check_cardinality_limits<'a, E, R>(
9494
&self,
9595
scoping: Scoping,
96-
limits: &[CardinalityLimit],
96+
limits: &'a [CardinalityLimit],
9797
entries: E,
9898
rejections: &mut R,
9999
) -> Result<()>
100100
where
101101
E: IntoIterator<Item = Entry>,
102-
R: Rejections,
102+
R: Rejections<'a>,
103103
{
104104
let timestamp = UnixTimestamp::now();
105105
// Allows to fast forward time in tests.
@@ -124,7 +124,7 @@ impl Limiter for RedisSetLimiter {
124124
}
125125
CacheOutcome::Rejected => {
126126
// Rejected, add it to the rejected list and move on.
127-
rejections.reject(entry.id);
127+
rejections.reject(state.id, entry.id);
128128
state.cache_hit();
129129
state.rejected();
130130
}
@@ -156,7 +156,7 @@ impl Limiter for RedisSetLimiter {
156156
let mut cache = self.cache.update(state.scope, timestamp); // Acquire a write lock.
157157
for (entry, status) in results {
158158
if status.is_rejected() {
159-
rejections.reject(entry.id);
159+
rejections.reject(state.id, entry.id);
160160
state.rejected();
161161
} else {
162162
cache.accept(entry.hash);
@@ -346,8 +346,8 @@ mod tests {
346346
#[derive(Debug, Default, PartialEq, Eq)]
347347
struct Rejections(HashSet<EntryId>);
348348

349-
impl super::Rejections for Rejections {
350-
fn reject(&mut self, entry_id: EntryId) {
349+
impl<'a> super::Rejections<'a> for Rejections {
350+
fn reject(&mut self, _limit_id: &'a str, entry_id: EntryId) {
351351
self.0.insert(entry_id);
352352
}
353353
}

relay-dynamic-config/src/global.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@ impl GlobalConfig {
3838
Ok(None)
3939
}
4040
}
41-
42-
/// Returns the [`Options::cardinality_limiter_mode`] option.
43-
pub fn cardinality_limiter_mode(&self) -> CardinalityLimiterMode {
44-
self.options.cardinality_limiter_mode
45-
}
4641
}
4742

4843
/// All options passed down from Sentry to Relay.
@@ -79,6 +74,16 @@ pub struct Options {
7974
)]
8075
pub cardinality_limiter_mode: CardinalityLimiterMode,
8176

77+
/// Sample rate for Cardinality Limiter Sentry errors.
78+
///
79+
/// Rate needs to be between `0.0` and `1.0`.
80+
/// If set to `1.0` all cardinality limiter rejections will be logged as a Sentry error.
81+
#[serde(
82+
rename = "relay.cardinality-limiter.error-sample-rate",
83+
skip_serializing_if = "is_default"
84+
)]
85+
pub cardinality_limiter_error_sample_rate: f32,
86+
8287
/// Kill switch for disabling the span usage metric.
8388
///
8489
/// This metric is converted into outcomes in a sentry-side consumer.

relay-server/src/services/processor.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1774,7 +1774,7 @@ impl EnvelopeProcessorService {
17741774
mode: ExtractionMode,
17751775
) -> Vec<Bucket> {
17761776
let global_config = self.inner.global_config.current();
1777-
let cardinality_limiter_mode = global_config.cardinality_limiter_mode();
1777+
let cardinality_limiter_mode = global_config.options.cardinality_limiter_mode;
17781778

17791779
if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Disabled) {
17801780
return buckets;
@@ -1801,6 +1801,17 @@ impl EnvelopeProcessorService {
18011801
}
18021802
};
18031803

1804+
let error_sample_rate = global_config.options.cardinality_limiter_error_sample_rate;
1805+
if limits.has_rejections() && sample(error_sample_rate) {
1806+
for limit_id in limits.enforced_limits() {
1807+
relay_log::error!(
1808+
tags.organization_id = scoping.organization_id,
1809+
tags.limit_id = limit_id,
1810+
"Cardinality Limit"
1811+
);
1812+
}
1813+
}
1814+
18041815
if matches!(cardinality_limiter_mode, CardinalityLimiterMode::Passive) {
18051816
return limits.into_source();
18061817
}
@@ -2251,6 +2262,14 @@ impl UpstreamRequest for SendEnvelope {
22512262
}
22522263
}
22532264

2265+
/// Returns `true` if the current item should be sampled.
2266+
///
2267+
/// The passed `rate` is expected to be `0 <= rate <= 1`.
2268+
#[cfg(feature = "processing")]
2269+
fn sample(rate: f32) -> bool {
2270+
(rate >= 1.0) || (rate > 0.0 && rand::random::<f32>() < rate)
2271+
}
2272+
22542273
/// Computes a stable partitioning key for sharded metric requests.
22552274
fn partition_key(project_key: ProjectKey, bucket: &Bucket, partitions: Option<u64>) -> Option<u64> {
22562275
use std::hash::{Hash, Hasher};

0 commit comments

Comments
 (0)