Skip to content

Commit 99e5893

Browse files
committed
Merge remote-tracking branch 'upstream/master'
2 parents 93dfbcb + 7184bdc commit 99e5893

File tree

2 files changed

+205
-10
lines changed

2 files changed

+205
-10
lines changed

hystrix-contrib/hystrix-metrics-event-stream/src/main/java/com/netflix/hystrix/contrib/sample/stream/HystrixSampleSseServlet.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public abstract class HystrixSampleSseServlet extends HttpServlet {
4242

4343
private final int pausePollerThreadDelayInMs;
4444

45+
/* response is not thread-safe */
46+
private final Object responseWriteLock = new Object();
47+
4548
/* Set to true upon shutdown, so it's OK to be shared among all SampleSseServlets */
4649
private static volatile boolean isDestroyed = false;
4750

@@ -147,12 +150,15 @@ public void onError(Throwable e) {
147150
public void onNext(String sampleDataAsString) {
148151
if (sampleDataAsString != null) {
149152
try {
150-
writer.print("data: " + sampleDataAsString + "\n\n");
151-
// explicitly check for client disconnect - PrintWriter does not throw exceptions
152-
if (writer.checkError()) {
153-
moreDataWillBeSent.set(false);
153+
// avoid concurrent writes with ping
154+
synchronized (responseWriteLock) {
155+
writer.print("data: " + sampleDataAsString + "\n\n");
156+
// explicitly check for client disconnect - PrintWriter does not throw exceptions
157+
if (writer.checkError()) {
158+
moreDataWillBeSent.set(false);
159+
}
160+
writer.flush();
154161
}
155-
writer.flush();
156162
} catch (Exception ex) {
157163
moreDataWillBeSent.set(false);
158164
}
@@ -164,12 +170,16 @@ public void onNext(String sampleDataAsString) {
164170
try {
165171
Thread.sleep(pausePollerThreadDelayInMs);
166172
//in case stream has not started emitting yet, catch any clients which connect/disconnect before emits start
167-
writer.print("ping: \n\n");
168-
// explicitly check for client disconnect - PrintWriter does not throw exceptions
169-
if (writer.checkError()) {
170-
moreDataWillBeSent.set(false);
173+
174+
// avoid concurrent writes with sample
175+
synchronized (responseWriteLock) {
176+
writer.print("ping: \n\n");
177+
// explicitly check for client disconnect - PrintWriter does not throw exceptions
178+
if (writer.checkError()) {
179+
moreDataWillBeSent.set(false);
180+
}
181+
writer.flush();
171182
}
172-
writer.flush();
173183
} catch (Exception ex) {
174184
moreDataWillBeSent.set(false);
175185
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.netflix.hystrix.contrib.sample.stream;
17+
18+
import com.netflix.config.DynamicIntProperty;
19+
import com.netflix.config.DynamicPropertyFactory;
20+
import com.netflix.hystrix.config.HystrixConfiguration;
21+
import com.netflix.hystrix.config.HystrixConfigurationStream;
22+
23+
import org.junit.After;
24+
import org.junit.Before;
25+
import org.junit.Test;
26+
import org.mockito.Mock;
27+
import org.mockito.Mockito;
28+
import org.mockito.MockitoAnnotations;
29+
import org.mockito.invocation.InvocationOnMock;
30+
import org.mockito.stubbing.Answer;
31+
32+
import java.io.IOException;
33+
import java.io.PrintWriter;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.regex.Pattern;
36+
37+
import javax.servlet.ServletException;
38+
import javax.servlet.http.HttpServletRequest;
39+
import javax.servlet.http.HttpServletResponse;
40+
41+
import rx.Observable;
42+
import rx.Subscriber;
43+
import rx.functions.Func1;
44+
import rx.schedulers.Schedulers;
45+
46+
import static org.junit.Assert.assertFalse;
47+
import static org.junit.Assert.fail;
48+
import static org.mockito.Mockito.when;
49+
50+
public class HystrixSampleSseServletTest {
51+
52+
private static final String INTERJECTED_CHARACTER = "a";
53+
54+
@Mock HttpServletRequest mockReq;
55+
@Mock HttpServletResponse mockResp;
56+
@Mock HystrixConfiguration mockConfig;
57+
@Mock PrintWriter mockPrintWriter;
58+
59+
TestHystrixConfigSseServlet servlet;
60+
61+
@Before
62+
public void init() {
63+
MockitoAnnotations.initMocks(this);
64+
}
65+
66+
@After
67+
public void tearDown() {
68+
servlet.destroy();
69+
servlet.shutdown();
70+
}
71+
72+
@Test
73+
public void testNoConcurrentResponseWrites() throws IOException, InterruptedException {
74+
final Observable<HystrixConfiguration> limitedOnNexts = Observable.create(new Observable.OnSubscribe<HystrixConfiguration>() {
75+
@Override
76+
public void call(Subscriber<? super HystrixConfiguration> subscriber) {
77+
try {
78+
for (int i = 0; i < 500; i++) {
79+
Thread.sleep(10);
80+
subscriber.onNext(mockConfig);
81+
}
82+
83+
} catch (InterruptedException ex) {
84+
ex.printStackTrace();
85+
} catch (Exception e) {
86+
subscriber.onCompleted();
87+
}
88+
}
89+
}).subscribeOn(Schedulers.computation());
90+
91+
servlet = new TestHystrixConfigSseServlet(limitedOnNexts, 1);
92+
try {
93+
servlet.init();
94+
} catch (ServletException ex) {
95+
96+
}
97+
98+
final StringBuilder buffer = new StringBuilder();
99+
100+
when(mockReq.getParameter("delay")).thenReturn("100");
101+
when(mockResp.getWriter()).thenReturn(mockPrintWriter);
102+
Mockito.doAnswer(new Answer<Void>() {
103+
@Override
104+
public Void answer(InvocationOnMock invocation) throws Throwable {
105+
String written = (String) invocation.getArguments()[0];
106+
if (written.contains("ping")) {
107+
buffer.append(INTERJECTED_CHARACTER);
108+
} else {
109+
// slow down the append to increase chances to interleave
110+
for (int i = 0; i < written.length(); i++) {
111+
Thread.sleep(5);
112+
buffer.append(written.charAt(i));
113+
}
114+
}
115+
return null;
116+
}
117+
}).when(mockPrintWriter).print(Mockito.anyString());
118+
119+
Runnable simulateClient = new Runnable() {
120+
@Override
121+
public void run() {
122+
try {
123+
servlet.doGet(mockReq, mockResp);
124+
} catch (ServletException ex) {
125+
fail(ex.getMessage());
126+
} catch (IOException ex) {
127+
fail(ex.getMessage());
128+
}
129+
}
130+
};
131+
132+
Thread t = new Thread(simulateClient);
133+
t.start();
134+
135+
try {
136+
Thread.sleep(1000);
137+
System.out.println(System.currentTimeMillis() + " Woke up from sleep : " + Thread.currentThread().getName());
138+
} catch (InterruptedException ex) {
139+
fail(ex.getMessage());
140+
}
141+
142+
Pattern pattern = Pattern.compile("\\{[" + INTERJECTED_CHARACTER + "]+\\}");
143+
boolean hasInterleaved = pattern.matcher(buffer).find();
144+
assertFalse(hasInterleaved);
145+
}
146+
147+
private static class TestHystrixConfigSseServlet extends HystrixSampleSseServlet {
148+
149+
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
150+
private static DynamicIntProperty maxConcurrentConnections = DynamicPropertyFactory.getInstance().getIntProperty("hystrix.config.stream.maxConcurrentConnections", 5);
151+
152+
public TestHystrixConfigSseServlet() {
153+
this(HystrixConfigurationStream.getInstance().observe(), DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS);
154+
}
155+
156+
TestHystrixConfigSseServlet(Observable<HystrixConfiguration> sampleStream, int pausePollerThreadDelayInMs) {
157+
super(sampleStream.map(new Func1<HystrixConfiguration, String>() {
158+
@Override
159+
public String call(HystrixConfiguration hystrixConfiguration) {
160+
return "{}";
161+
}
162+
}), pausePollerThreadDelayInMs);
163+
}
164+
165+
@Override
166+
protected int getMaxNumberConcurrentConnectionsAllowed() {
167+
return maxConcurrentConnections.get();
168+
}
169+
170+
@Override
171+
protected int getNumberCurrentConnections() {
172+
return concurrentConnections.get();
173+
}
174+
175+
@Override
176+
protected int incrementAndGetCurrentConcurrentConnections() {
177+
return concurrentConnections.incrementAndGet();
178+
}
179+
180+
@Override
181+
protected void decrementCurrentConcurrentConnections() {
182+
concurrentConnections.decrementAndGet();
183+
}
184+
}
185+
}

0 commit comments

Comments
 (0)