Skip to content

Commit bdc77da

Browse files
authored
Do not reuse the SetBackedScalingCuckooFilter during merging (#75111)
Fixes an IAE that is thrown when the underlying circuit breaker is closed.
1 parent 063a1f2 commit bdc77da

File tree

4 files changed

+94
-21
lines changed

4 files changed

+94
-21
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.search.aggregations.bucket.terms;
10+
11+
import org.elasticsearch.action.bulk.BulkRequestBuilder;
12+
import org.elasticsearch.action.index.IndexRequest;
13+
import org.elasticsearch.action.search.SearchRequestBuilder;
14+
import org.elasticsearch.action.search.SearchResponse;
15+
import org.elasticsearch.cluster.metadata.IndexMetadata;
16+
import org.elasticsearch.common.settings.Settings;
17+
import org.elasticsearch.common.xcontent.XContentType;
18+
import org.elasticsearch.test.ESSingleNodeTestCase;
19+
import org.hamcrest.Matchers;
20+
21+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
22+
23+
/**
24+
* Tests that index enough data to trigger the creation of cuckoo filters.
25+
*/
26+
27+
public class RareTermsIT extends ESSingleNodeTestCase {
28+
29+
private static final String index = "idx";
30+
31+
private void indexDocs(int numDocs) {
32+
final BulkRequestBuilder bulk = client().prepareBulk();
33+
for (int i = 0; i < numDocs; ++i) {
34+
bulk.add(new IndexRequest(index).source("{\"str_value\" : \"s" + i + "\"}", XContentType.JSON));
35+
}
36+
assertNoFailures(bulk.get());
37+
}
38+
39+
public void testSingleValuedString() {
40+
final Settings.Builder settings = Settings.builder()
41+
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 2)
42+
.put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0);
43+
createIndex(index, settings.build());
44+
// We want to trigger the usage of cuckoo filters, which happens only when there are
45+
// more than 10k distinct values in one shard.
46+
final int numDocs = randomIntBetween(12000, 17000);
47+
// Index every value 3 times
48+
for (int i = 0; i < 3; i++) {
49+
indexDocs(numDocs);
50+
assertNoFailures(client().admin().indices().prepareRefresh(index).get());
51+
}
52+
// There are no rare terms that only appear in one document
53+
assertNumRareTerms(1, 0);
54+
// Every term appears in only 3 documents, so all of them are rare when the max doc count is 10
55+
assertNumRareTerms(10, numDocs);
56+
}
57+
58+
private void assertNumRareTerms(int maxDocs, int rareTerms) {
59+
final SearchRequestBuilder requestBuilder = client().prepareSearch(index);
60+
requestBuilder.addAggregation(new RareTermsAggregationBuilder("rareTerms").field("str_value.keyword").maxDocCount(maxDocs));
61+
final SearchResponse response = requestBuilder.get();
62+
assertNoFailures(response);
63+
final RareTerms terms = response.getAggregations().get("rareTerms");
64+
assertThat(terms.getBuckets().size(), Matchers.equalTo(rareTerms));
65+
}
66+
}

server/src/main/java/org/elasticsearch/common/util/SetBackedScalingCuckooFilter.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -103,23 +103,6 @@ public SetBackedScalingCuckooFilter(int threshold, Random rng, double fpp) {
103103
this.fpp = fpp;
104104
}
105105

106-
public SetBackedScalingCuckooFilter(SetBackedScalingCuckooFilter other) {
107-
this.threshold = other.threshold;
108-
this.isSetMode = other.isSetMode;
109-
this.rng = other.rng;
110-
this.breaker = other.breaker;
111-
this.capacity = other.capacity;
112-
this.fpp = other.fpp;
113-
if (isSetMode) {
114-
this.hashes = new HashSet<>(other.hashes);
115-
} else {
116-
this.filters = new ArrayList<>(other.filters);
117-
this.numBuckets = filters.get(0).getNumBuckets();
118-
this.fingerprintMask = filters.get(0).getFingerprintMask();
119-
this.bitsPerEntry = filters.get(0).getBitsPerEntry();
120-
}
121-
}
122-
123106
public SetBackedScalingCuckooFilter(StreamInput in, Random rng) throws IOException {
124107
this.threshold = in.readVInt();
125108
this.isSetMode = in.readBoolean();
@@ -150,6 +133,27 @@ public void writeTo(StreamOutput out) throws IOException {
150133
}
151134
}
152135

136+
/**
137+
* Returns the number of distinct values that are tracked before converting to an approximate representation.
138+
* */
139+
public int getThreshold() {
140+
return threshold;
141+
}
142+
143+
/**
144+
* Returns the random number generator used for the cuckoo hashing process.
145+
* */
146+
public Random getRng() {
147+
return rng;
148+
}
149+
150+
/**
151+
* Returns the false-positive rate used for the cuckoo filters.
152+
* */
153+
public double getFpp() {
154+
return fpp;
155+
}
156+
153157
/**
154158
* Registers a circuit breaker with the datastructure.
155159
*

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalMappedRareTerms.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,10 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Reduce
107107

108108
SetBackedScalingCuckooFilter otherFilter = ((InternalMappedRareTerms)aggregation).getFilter();
109109
if (filter == null) {
110-
filter = new SetBackedScalingCuckooFilter(otherFilter);
111-
} else {
112-
filter.merge(otherFilter);
110+
filter = new SetBackedScalingCuckooFilter(otherFilter.getThreshold(), otherFilter.getRng(), otherFilter.getFpp());
113111
}
112+
filter.merge(otherFilter);
113+
114114
}
115115

116116
final List<B> rare = new ArrayList<>();

server/src/test/java/org/elasticsearch/common/util/SetBackedScalingCuckooFilterTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.hamcrest.Matchers.equalTo;
2020
import static org.hamcrest.Matchers.greaterThan;
21+
import static org.hamcrest.Matchers.in;
2122
import static org.hamcrest.Matchers.lessThanOrEqualTo;
2223

2324
public class SetBackedScalingCuckooFilterTests extends AbstractWireSerializingTestCase<SetBackedScalingCuckooFilter> {
@@ -41,7 +42,9 @@ protected Writeable.Reader<SetBackedScalingCuckooFilter> instanceReader() {
4142

4243
@Override
4344
protected SetBackedScalingCuckooFilter mutateInstance(SetBackedScalingCuckooFilter instance) throws IOException {
44-
SetBackedScalingCuckooFilter newInstance = new SetBackedScalingCuckooFilter(instance);
45+
SetBackedScalingCuckooFilter newInstance =
46+
new SetBackedScalingCuckooFilter(instance.getThreshold(), instance.getRng(), instance.getFpp());
47+
newInstance.merge(instance);
4548
int num = randomIntBetween(1, 10);
4649
for (int i = 0; i < num; i++) {
4750
newInstance.add(randomLong());

0 commit comments

Comments
 (0)