Skip to content

Commit 0fa97bc

Browse files
committed
Honor timeout in ZipkinWebClientSender
Unfortunately there's no good way to configure connect and read timeout separately, which works for all supported reactive clients. This implementation applies a timeout through Reactor's timeout method. The timeout from the properties is summed together and this is the applied timeout. While not perfect, this is better than no timeout at all. Closes spring-projectsgh-31496
1 parent 1d60e42 commit 0fa97bc

File tree

5 files changed

+110
-32
lines changed

5 files changed

+110
-32
lines changed

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinConfigurations.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ ZipkinWebClientSender webClientSender(ZipkinProperties properties,
119119
.getIfAvailable(() -> new PropertiesZipkinConnectionDetails(properties));
120120
WebClient.Builder builder = WebClient.builder();
121121
customizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
122-
return new ZipkinWebClientSender(connectionDetails.getSpanEndpoint(), builder.build());
122+
return new ZipkinWebClientSender(connectionDetails.getSpanEndpoint(), builder.build(),
123+
properties.getConnectTimeout().plus(properties.getReadTimeout()));
123124
}
124125

125126
}

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinWebClientSender.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
1818

19+
import java.time.Duration;
20+
1921
import reactor.core.publisher.Mono;
2022
import zipkin2.Call;
2123
import zipkin2.Callback;
@@ -28,21 +30,25 @@
2830
* An {@link HttpSender} which uses {@link WebClient} for HTTP communication.
2931
*
3032
* @author Stefan Bratanov
33+
* @author Moritz Halbritter
3134
*/
3235
class ZipkinWebClientSender extends HttpSender {
3336

3437
private final String endpoint;
3538

3639
private final WebClient webClient;
3740

38-
ZipkinWebClientSender(String endpoint, WebClient webClient) {
41+
private final Duration timeout;
42+
43+
ZipkinWebClientSender(String endpoint, WebClient webClient, Duration timeout) {
3944
this.endpoint = endpoint;
4045
this.webClient = webClient;
46+
this.timeout = timeout;
4147
}
4248

4349
@Override
4450
public HttpPostCall sendSpans(byte[] batchedEncodedSpans) {
45-
return new WebClientHttpPostCall(this.endpoint, batchedEncodedSpans, this.webClient);
51+
return new WebClientHttpPostCall(this.endpoint, batchedEncodedSpans, this.webClient, this.timeout);
4652
}
4753

4854
private static class WebClientHttpPostCall extends HttpPostCall {
@@ -51,15 +57,18 @@ private static class WebClientHttpPostCall extends HttpPostCall {
5157

5258
private final WebClient webClient;
5359

54-
WebClientHttpPostCall(String endpoint, byte[] body, WebClient webClient) {
60+
private final Duration timeout;
61+
62+
WebClientHttpPostCall(String endpoint, byte[] body, WebClient webClient, Duration timeout) {
5563
super(body);
5664
this.endpoint = endpoint;
5765
this.webClient = webClient;
66+
this.timeout = timeout;
5867
}
5968

6069
@Override
6170
public Call<Void> clone() {
62-
return new WebClientHttpPostCall(this.endpoint, getUncompressedBody(), this.webClient);
71+
return new WebClientHttpPostCall(this.endpoint, getUncompressedBody(), this.webClient, this.timeout);
6372
}
6473

6574
@Override
@@ -79,7 +88,8 @@ private Mono<ResponseEntity<Void>> sendRequest() {
7988
.headers(this::addDefaultHeaders)
8089
.bodyValue(getBody())
8190
.retrieve()
82-
.toBodilessEntity();
91+
.toBodilessEntity()
92+
.timeout(this.timeout);
8393
}
8494

8595
private void addDefaultHeaders(HttpHeaders headers) {

spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinHttpSenderTests.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2012-2022 the original author or authors.
2+
* Copyright 2012-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.atomic.AtomicReference;
2626

2727
import org.awaitility.Awaitility;
28+
import org.junit.jupiter.api.AfterEach;
2829
import org.junit.jupiter.api.BeforeEach;
2930
import org.junit.jupiter.api.Test;
3031
import zipkin2.Callback;
@@ -42,19 +43,25 @@
4243
*/
4344
abstract class ZipkinHttpSenderTests {
4445

45-
protected Sender sut;
46+
protected Sender sender;
4647

47-
abstract Sender createSut();
48+
abstract Sender createSender();
4849

4950
@BeforeEach
50-
void setUp() {
51-
this.sut = createSut();
51+
void beforeEach() throws Exception {
52+
this.sender = createSender();
53+
}
54+
55+
@AfterEach
56+
void afterEach() throws IOException {
57+
this.sender.close();
5258
}
5359

5460
@Test
5561
void sendSpansShouldThrowIfCloseWasCalled() throws IOException {
56-
this.sut.close();
57-
assertThatThrownBy(() -> this.sut.sendSpans(Collections.emptyList())).isInstanceOf(ClosedSenderException.class);
62+
this.sender.close();
63+
assertThatThrownBy(() -> this.sender.sendSpans(Collections.emptyList()))
64+
.isInstanceOf(ClosedSenderException.class);
5865
}
5966

6067
protected void makeRequest(List<byte[]> encodedSpans, boolean async) throws IOException {
@@ -68,8 +75,12 @@ protected void makeRequest(List<byte[]> encodedSpans, boolean async) throws IOEx
6875
}
6976

7077
protected CallbackResult makeAsyncRequest(List<byte[]> encodedSpans) {
78+
return makeAsyncRequest(this.sender, encodedSpans);
79+
}
80+
81+
protected CallbackResult makeAsyncRequest(Sender sender, List<byte[]> encodedSpans) {
7182
AtomicReference<CallbackResult> callbackResult = new AtomicReference<>();
72-
this.sut.sendSpans(encodedSpans).enqueue(new Callback<>() {
83+
sender.sendSpans(encodedSpans).enqueue(new Callback<>() {
7384
@Override
7485
public void onSuccess(Void value) {
7586
callbackResult.set(new CallbackResult(true, null));
@@ -84,7 +95,11 @@ public void onError(Throwable t) {
8495
}
8596

8697
protected void makeSyncRequest(List<byte[]> encodedSpans) throws IOException {
87-
this.sut.sendSpans(encodedSpans).execute();
98+
makeSyncRequest(this.sender, encodedSpans);
99+
}
100+
101+
protected void makeSyncRequest(Sender sender, List<byte[]> encodedSpans) throws IOException {
102+
sender.sendSpans(encodedSpans).execute();
88103
}
89104

90105
protected byte[] toByteArray(String input) {

spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinRestTemplateSenderTests.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,16 @@ class ZipkinRestTemplateSenderTests extends ZipkinHttpSenderTests {
5454
private MockRestServiceServer mockServer;
5555

5656
@Override
57-
Sender createSut() {
57+
Sender createSender() {
5858
RestTemplate restTemplate = new RestTemplate();
5959
this.mockServer = MockRestServiceServer.createServer(restTemplate);
6060
return new ZipkinRestTemplateSender(ZIPKIN_URL, restTemplate);
6161
}
6262

6363
@AfterEach
64-
void tearDown() {
64+
@Override
65+
void afterEach() throws IOException {
66+
super.afterEach();
6567
this.mockServer.verify();
6668
}
6769

@@ -71,15 +73,15 @@ void checkShouldSendEmptySpanList() {
7173
.andExpect(method(HttpMethod.POST))
7274
.andExpect(content().string("[]"))
7375
.andRespond(withStatus(HttpStatus.ACCEPTED));
74-
assertThat(this.sut.check()).isEqualTo(CheckResult.OK);
76+
assertThat(this.sender.check()).isEqualTo(CheckResult.OK);
7577
}
7678

7779
@Test
7880
void checkShouldNotRaiseException() {
7981
this.mockServer.expect(requestTo(ZIPKIN_URL))
8082
.andExpect(method(HttpMethod.POST))
8183
.andRespond(withStatus(HttpStatus.INTERNAL_SERVER_ERROR));
82-
CheckResult result = this.sut.check();
84+
CheckResult result = this.sender.check();
8385
assertThat(result.ok()).isFalse();
8486
assertThat(result.error()).hasMessageContaining("500 Internal Server Error");
8587
}

spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/tracing/zipkin/ZipkinWebClientSenderTests.java

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,21 @@
1717
package org.springframework.boot.actuate.autoconfigure.tracing.zipkin;
1818

1919
import java.io.IOException;
20+
import java.time.Duration;
2021
import java.util.Base64;
2122
import java.util.Collections;
2223
import java.util.List;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.TimeoutException;
2326
import java.util.function.Consumer;
2427

2528
import okhttp3.mockwebserver.MockResponse;
2629
import okhttp3.mockwebserver.MockWebServer;
30+
import okhttp3.mockwebserver.QueueDispatcher;
2731
import okhttp3.mockwebserver.RecordedRequest;
2832
import org.junit.jupiter.api.AfterAll;
2933
import org.junit.jupiter.api.BeforeAll;
34+
import org.junit.jupiter.api.BeforeEach;
3035
import org.junit.jupiter.api.Test;
3136
import org.junit.jupiter.params.ParameterizedTest;
3237
import org.junit.jupiter.params.provider.ValueSource;
@@ -45,33 +50,48 @@
4550
*/
4651
class ZipkinWebClientSenderTests extends ZipkinHttpSenderTests {
4752

53+
private static ClearableDispatcher dispatcher;
54+
4855
private static MockWebServer mockBackEnd;
4956

5057
private static String ZIPKIN_URL;
5158

5259
@BeforeAll
5360
static void beforeAll() throws IOException {
61+
dispatcher = new ClearableDispatcher();
5462
mockBackEnd = new MockWebServer();
63+
mockBackEnd.setDispatcher(dispatcher);
5564
mockBackEnd.start();
56-
ZIPKIN_URL = "http://localhost:%s/api/v2/spans".formatted(mockBackEnd.getPort());
65+
ZIPKIN_URL = mockBackEnd.url("/api/v2/spans").toString();
5766
}
5867

5968
@AfterAll
60-
static void tearDown() throws IOException {
69+
static void afterAll() throws IOException {
6170
mockBackEnd.shutdown();
6271
}
6372

6473
@Override
65-
Sender createSut() {
74+
@BeforeEach
75+
void beforeEach() throws Exception {
76+
super.beforeEach();
77+
clearResponses();
78+
clearRequests();
79+
}
80+
81+
@Override
82+
Sender createSender() {
83+
return createSender(Duration.ofSeconds(10));
84+
}
85+
86+
Sender createSender(Duration timeout) {
6687
WebClient webClient = WebClient.builder().build();
67-
return new ZipkinWebClientSender(ZIPKIN_URL, webClient);
88+
return new ZipkinWebClientSender(ZIPKIN_URL, webClient, timeout);
6889
}
6990

7091
@Test
7192
void checkShouldSendEmptySpanList() throws InterruptedException {
7293
mockBackEnd.enqueue(new MockResponse());
73-
assertThat(this.sut.check()).isEqualTo(CheckResult.OK);
74-
94+
assertThat(this.sender.check()).isEqualTo(CheckResult.OK);
7595
requestAssertions((request) -> {
7696
assertThat(request.getMethod()).isEqualTo("POST");
7797
assertThat(request.getBody().readUtf8()).isEqualTo("[]");
@@ -81,10 +101,9 @@ void checkShouldSendEmptySpanList() throws InterruptedException {
81101
@Test
82102
void checkShouldNotRaiseException() throws InterruptedException {
83103
mockBackEnd.enqueue(new MockResponse().setResponseCode(500));
84-
CheckResult result = this.sut.check();
104+
CheckResult result = this.sender.check();
85105
assertThat(result.ok()).isFalse();
86106
assertThat(result.error()).hasMessageContaining("500 Internal Server Error");
87-
88107
requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST"));
89108
}
90109

@@ -94,7 +113,6 @@ void sendSpansShouldSendSpansToZipkin(boolean async) throws IOException, Interru
94113
mockBackEnd.enqueue(new MockResponse());
95114
List<byte[]> encodedSpans = List.of(toByteArray("span1"), toByteArray("span2"));
96115
makeRequest(encodedSpans, async);
97-
98116
requestAssertions((request) -> {
99117
assertThat(request.getMethod()).isEqualTo("POST");
100118
assertThat(request.getHeader("Content-Type")).isEqualTo("application/json");
@@ -115,7 +133,6 @@ void sendSpansShouldHandleHttpFailures(boolean async) throws InterruptedExceptio
115133
assertThatThrownBy(() -> makeSyncRequest(Collections.emptyList()))
116134
.hasMessageContaining("500 Internal Server Error");
117135
}
118-
119136
requestAssertions((request) -> assertThat(request.getMethod()).isEqualTo("POST"));
120137
}
121138

@@ -126,23 +143,56 @@ void sendSpansShouldCompressData(boolean async) throws IOException, InterruptedE
126143
// This is gzip compressed 10000 times 'a'
127144
byte[] compressed = Base64.getDecoder()
128145
.decode("H4sIAAAAAAAA/+3BMQ0AAAwDIKFLj/k3UR8NcA8AAAAAAAAAAAADUsAZfeASJwAA");
129-
130146
mockBackEnd.enqueue(new MockResponse());
131-
132147
makeRequest(List.of(toByteArray(uncompressed)), async);
133-
134148
requestAssertions((request) -> {
135149
assertThat(request.getMethod()).isEqualTo("POST");
136150
assertThat(request.getHeader("Content-Type")).isEqualTo("application/json");
137151
assertThat(request.getHeader("Content-Encoding")).isEqualTo("gzip");
138152
assertThat(request.getBody().readByteArray()).isEqualTo(compressed);
139153
});
154+
}
140155

156+
@ParameterizedTest
157+
@ValueSource(booleans = { true, false })
158+
void shouldTimeout(boolean async) {
159+
Sender sender = createSender(Duration.ofMillis(1));
160+
MockResponse response = new MockResponse().setResponseCode(200).setHeadersDelay(100, TimeUnit.MILLISECONDS);
161+
mockBackEnd.enqueue(response);
162+
if (async) {
163+
CallbackResult callbackResult = makeAsyncRequest(sender, Collections.emptyList());
164+
assertThat(callbackResult.success()).isFalse();
165+
assertThat(callbackResult.error()).isNotNull().isInstanceOf(TimeoutException.class);
166+
}
167+
else {
168+
assertThatThrownBy(() -> makeSyncRequest(sender, Collections.emptyList()))
169+
.hasCauseInstanceOf(TimeoutException.class);
170+
}
141171
}
142172

143173
private void requestAssertions(Consumer<RecordedRequest> assertions) throws InterruptedException {
144174
RecordedRequest request = mockBackEnd.takeRequest();
145175
assertThat(request).satisfies(assertions);
146176
}
147177

178+
private static void clearRequests() throws InterruptedException {
179+
RecordedRequest request;
180+
do {
181+
request = mockBackEnd.takeRequest(0, TimeUnit.SECONDS);
182+
}
183+
while (request != null);
184+
}
185+
186+
private static void clearResponses() {
187+
dispatcher.clear();
188+
}
189+
190+
private static class ClearableDispatcher extends QueueDispatcher {
191+
192+
void clear() {
193+
getResponseQueue().clear();
194+
}
195+
196+
}
197+
148198
}

0 commit comments

Comments
 (0)