|
12 | 12 | import java.util.HashSet;
|
13 | 13 | import java.util.Map;
|
14 | 14 | import java.util.Set;
|
| 15 | +import java.util.concurrent.CountDownLatch; |
| 16 | +import java.util.concurrent.ExecutorService; |
| 17 | +import java.util.concurrent.Executors; |
15 | 18 |
|
16 | 19 | /**
|
17 | 20 | * <p>
|
@@ -50,6 +53,8 @@ public class AutoPilot {
|
50 | 53 | */
|
51 | 54 | private static final AutoPilotResult[] ZERO_AUTO_PILOT_RESULT_ARRAY = new AutoPilotResult[0];
|
52 | 55 |
|
| 56 | + private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(20); |
| 57 | + |
53 | 58 | /**
|
54 | 59 | * <p>
|
55 | 60 | * Represents the AutoPilotSource instance that is used to retrieve a list of project ids to
|
@@ -158,37 +163,48 @@ public AutoPilotResult[] advanceProjects(long[] projectId, String operator) thro
|
158 | 163 |
|
159 | 164 | // Map key is Long (project id). Map value is AutoPilotResult instance.
|
160 | 165 | Map resMap = new HashMap();
|
161 |
| - for (int i = 0; i < projectId.length; i++) { |
162 |
| - AutoPilotResult result = null; |
163 |
| - Long longProjectId = new Long(projectId[i]); |
164 |
| - |
165 |
| - // Check if the project is processing by another thread |
166 |
| - synchronized (processingProjectIds) { |
167 |
| - if (processingProjectIds.contains(longProjectId)) { |
168 |
| - log.info(new LogMessage(null, operator, "Stopped in synchronized for projectId=" + longProjectId).toString()); |
169 |
| - continue; |
170 |
| - } else { |
171 |
| - processingProjectIds.add(longProjectId); |
172 |
| - } |
173 |
| - } |
174 |
| - |
175 |
| - try { |
176 |
| - result = advanceProject(projectId[i], operator); |
177 |
| - // store/aggregate into Map |
178 |
| - if (resMap.containsKey(longProjectId)) { |
179 |
| - // Aggregate the result only if at least one of counters > 0. |
180 |
| - if (result.getPhaseEndedCount() > 0 || result.getPhaseStartedCount() > 0) { |
181 |
| - ((AutoPilotResult) resMap.get(longProjectId)).aggregate(result); |
| 166 | + CountDownLatch latch = new CountDownLatch(projectId.length); |
| 167 | + for (long id : projectId) { |
| 168 | + THREAD_POOL.execute(() -> { |
| 169 | + AutoPilotResult result = null; |
| 170 | + Long longProjectId = new Long(id); |
| 171 | + |
| 172 | + // Check if the project is processing by another thread |
| 173 | + synchronized (processingProjectIds) { |
| 174 | + if (processingProjectIds.contains(longProjectId)) { |
| 175 | + log.info( |
| 176 | + new LogMessage(null, operator, "Stopped in synchronized for projectId=" + longProjectId) |
| 177 | + .toString()); |
| 178 | + return; |
| 179 | + } else { |
| 180 | + processingProjectIds.add(longProjectId); |
182 | 181 | }
|
183 |
| - } else { |
184 |
| - resMap.put(longProjectId, result); |
185 | 182 | }
|
186 |
| - } finally { |
187 |
| - // Make sure this project can be processed by next thread |
188 |
| - synchronized (processingProjectIds) { |
189 |
| - processingProjectIds.remove(longProjectId); |
| 183 | + |
| 184 | + try { |
| 185 | + result = advanceProject(longProjectId, operator); |
| 186 | + // store/aggregate into Map |
| 187 | + if (resMap.containsKey(longProjectId)) { |
| 188 | + // Aggregate the result only if at least one of counters > 0. |
| 189 | + if (result.getPhaseEndedCount() > 0 || result.getPhaseStartedCount() > 0) { |
| 190 | + ((AutoPilotResult) resMap.get(longProjectId)).aggregate(result); |
| 191 | + } |
| 192 | + } else { |
| 193 | + resMap.put(longProjectId, result); |
| 194 | + } |
| 195 | + } finally { |
| 196 | + // Make sure this project can be processed by next thread |
| 197 | + synchronized (processingProjectIds) { |
| 198 | + processingProjectIds.remove(longProjectId); |
| 199 | + latch.countDown(); |
| 200 | + } |
190 | 201 | }
|
191 |
| - } |
| 202 | + }); |
| 203 | + } |
| 204 | + try { |
| 205 | + latch.await(); |
| 206 | + } catch (InterruptedException E) { |
| 207 | + |
192 | 208 | }
|
193 | 209 |
|
194 | 210 | return (AutoPilotResult[]) resMap.values().toArray(new AutoPilotResult[resMap.size()]);
|
|
0 commit comments