Skip to content

Commit 4b8d399

Browse files
committed
Part of #1260: write a manually run concurrency test to tease out problem with pool retention (#1265)
1 parent 33c4260 commit 4b8d399

File tree

1 file changed

+175
-0
lines changed

1 file changed

+175
-0
lines changed
+175
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package perf;
2+
3+
import java.io.*;
4+
import java.nio.charset.StandardCharsets;
5+
import java.util.Arrays;
6+
import java.util.List;
7+
import java.util.Random;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.TimeUnit;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
import java.util.concurrent.atomic.AtomicLong;
13+
14+
import com.fasterxml.jackson.core.JsonFactory;
15+
import com.fasterxml.jackson.core.JsonGenerator;
16+
import com.fasterxml.jackson.core.JsonParser;
17+
import com.fasterxml.jackson.core.util.BufferRecycler;
18+
import com.fasterxml.jackson.core.util.JsonRecyclerPools;
19+
import com.fasterxml.jackson.core.util.RecyclerPool;
20+
21+
/**
22+
* High-concurrency test that tries to see if unbounded {@link RecyclerPool}
23+
* implementations grow without bounds or not.
24+
*/
25+
public class RecyclerPoolTest
26+
{
27+
final static int THREAD_COUNT = 100;
28+
29+
final static int RUNTIME_SECS = 60;
30+
31+
private final int _threadCount;
32+
33+
RecyclerPoolTest(int threadCount) {
34+
_threadCount = threadCount;
35+
}
36+
37+
public String testPool(JsonFactory jsonF, int runtimeMinutes)
38+
throws InterruptedException
39+
{
40+
RecyclerPool<BufferRecycler> poolImpl = jsonF._getRecyclerPool();
41+
42+
final String poolName = poolImpl.getClass().getSimpleName();
43+
final ExecutorService exec = Executors.newFixedThreadPool(_threadCount);
44+
final AtomicLong calls = new AtomicLong();
45+
final long startTime = System.currentTimeMillis();
46+
final long runtimeMsecs = TimeUnit.SECONDS.toMillis(runtimeMinutes);
47+
final long endtimeMsecs = startTime + runtimeMsecs;
48+
final AtomicInteger threadsRunning = new AtomicInteger();
49+
50+
System.out.printf("Starting test of '%s' with %d threads, for %d seconds.\n",
51+
poolImpl.getClass().getName(),
52+
_threadCount, runtimeMsecs / 1000L);
53+
54+
for (int i = 0; i < _threadCount; ++i) {
55+
final int id = i;
56+
threadsRunning.incrementAndGet();
57+
exec.execute(new Runnable() {
58+
@Override
59+
public void run() {
60+
testUntil(jsonF, endtimeMsecs, id, calls);
61+
threadsRunning.decrementAndGet();
62+
}
63+
});
64+
}
65+
66+
long currentTime;
67+
long nextPrint = 0L;
68+
// Print if exceeds threshold (3 x threadcount), otherwise every 2.5 seconds
69+
final int thresholdToPrint = _threadCount * 3;
70+
int maxPooled = 0;
71+
72+
while ((currentTime = System.currentTimeMillis()) < endtimeMsecs) {
73+
int poolSize;
74+
75+
if ((poolSize = poolImpl.pooledCount()) > thresholdToPrint
76+
|| (currentTime > nextPrint)) {
77+
double secs = (currentTime - startTime) / 1000.0;
78+
System.out.printf(" (%s) %.1fs, %dk calls; %d threads; pool size: %d (max seen: %d)\n",
79+
poolName, secs, calls.get()>>10, threadsRunning.get(), poolSize, maxPooled);
80+
if (poolSize > maxPooled) {
81+
maxPooled = poolSize;
82+
}
83+
Thread.sleep(100L);
84+
nextPrint = currentTime + 2500L;
85+
}
86+
}
87+
88+
String desc = String.format("Completed test of '%s': max size seen = %d",
89+
poolName, maxPooled);
90+
System.out.printf("%s. Wait termination of threads..\n", desc);
91+
if (!exec.awaitTermination(2000, TimeUnit.MILLISECONDS)) {
92+
System.out.printf("WARNING: ExecutorService.awaitTermination() failed: %d threads left; will shut down.\n",
93+
threadsRunning.get());
94+
exec.shutdown();
95+
}
96+
return desc;
97+
}
98+
99+
void testUntil(JsonFactory jsonF,
100+
long endTimeMsecs, int threadId, AtomicLong calls)
101+
{
102+
final Random rnd = new Random(threadId);
103+
final byte[] JSON_INPUT = "\"abc\"".getBytes(StandardCharsets.UTF_8);
104+
105+
while (System.currentTimeMillis() < endTimeMsecs) {
106+
try {
107+
// Randomize call order a bit
108+
switch (rnd.nextInt() & 3) {
109+
case 0:
110+
_testRead(jsonF, JSON_INPUT);
111+
break;
112+
case 1:
113+
_testWrite(jsonF);
114+
break;
115+
case 2:
116+
_testRead(jsonF, JSON_INPUT);
117+
_testWrite(jsonF);
118+
break;
119+
default:
120+
_testWrite(jsonF);
121+
_testRead(jsonF, JSON_INPUT);
122+
break;
123+
}
124+
} catch (Exception e) {
125+
System.err.printf("ERROR: thread %d fail, will exit: (%s) %s\n",
126+
threadId, e.getClass().getName(), e.getMessage());
127+
break;
128+
}
129+
calls.incrementAndGet();
130+
}
131+
}
132+
133+
private void _testRead(JsonFactory jsonF, byte[] input) throws Exception
134+
{
135+
JsonParser p = jsonF.createParser(new ByteArrayInputStream(input));
136+
while (p.nextToken() != null) {
137+
;
138+
}
139+
p.close();
140+
}
141+
142+
private void _testWrite(JsonFactory jsonF) throws Exception
143+
{
144+
StringWriter w = new StringWriter(16);
145+
JsonGenerator g = jsonF.createGenerator(w);
146+
g.writeStartArray();
147+
g.writeString("foobar");
148+
g.writeEndArray();
149+
g.close();
150+
}
151+
152+
public static void main(String[] args) throws Exception
153+
{
154+
RecyclerPoolTest test = new RecyclerPoolTest(THREAD_COUNT);
155+
List<String> results = Arrays.asList(
156+
test.testPool(JsonFactory.builder()
157+
.recyclerPool(JsonRecyclerPools.newConcurrentDequePool())
158+
.build(),
159+
RUNTIME_SECS),
160+
test.testPool(JsonFactory.builder()
161+
.recyclerPool(JsonRecyclerPools.newBoundedPool(THREAD_COUNT - 5))
162+
.build(),
163+
RUNTIME_SECS),
164+
test.testPool(JsonFactory.builder()
165+
.recyclerPool(JsonRecyclerPools.newLockFreePool())
166+
.build(),
167+
RUNTIME_SECS)
168+
);
169+
170+
System.out.println("Tests complete! Results:\n");
171+
for (String result : results) {
172+
System.out.printf(" * %s\n", result);
173+
}
174+
}
175+
}

0 commit comments

Comments
 (0)