Skip to content

Commit 5a435fa

Browse files
Merge pull request #2 from kamalaboulhosn/ML_experiments
Add examples for limited and unlimited exeuctors
2 parents c3a5725 + 2e58cd5 commit 5a435fa

File tree

2 files changed

+266
-0
lines changed

2 files changed

+266
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/*
2+
* Copyright 2016 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package pubsub;
18+
19+
import com.google.api.gax.core.FixedExecutorProvider;
20+
import com.google.api.gax.grpc.GrpcTransportChannel;
21+
import com.google.api.gax.rpc.FixedTransportChannelProvider;
22+
import com.google.api.gax.rpc.TransportChannelProvider;
23+
import com.google.cloud.pubsub.v1.AckReplyConsumer;
24+
import com.google.cloud.pubsub.v1.MessageReceiver;
25+
import com.google.cloud.pubsub.v1.Subscriber;
26+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
27+
import com.google.pubsub.v1.ProjectSubscriptionName;
28+
import com.google.pubsub.v1.PubsubMessage;
29+
import io.grpc.ManagedChannel;
30+
import io.grpc.ManagedChannelBuilder;
31+
import java.io.IOException;
32+
import java.util.ArrayList;
33+
import java.util.List;
34+
import java.util.Set;
35+
import java.util.concurrent.Executor;
36+
import java.util.concurrent.ScheduledThreadPoolExecutor;
37+
import java.util.concurrent.ThreadFactory;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.TimeoutException;
40+
import java.util.concurrent.atomic.AtomicLong;
41+
42+
public class SubscribeAsyncLimitedConcurrencyExample {
43+
public static void main(String... args) throws Exception {
44+
// TODO(developer): Replace these variables before running the sample.
45+
String projectId = "my-project";
46+
String subscriptionId = "my-subscription";
47+
48+
subscribeAsyncLimitedConcurrencyExample(projectId, subscriptionId);
49+
}
50+
51+
static final int MAX_INBOUND_MESSAGE_SIZE = 20 * 1024 * 1024; // 20MB API maximum message size.
52+
static final int MAX_INBOUND_METADATA_SIZE = 4 * 1024 * 1024; // 4MB API maximum metadata size
53+
54+
private static ManagedChannel createSingleChannel(
55+
String serviceAddress, int port, Executor executor, Executor offloadExecutor)
56+
throws IOException {
57+
ManagedChannelBuilder<?> builder;
58+
builder = ManagedChannelBuilder.forAddress(serviceAddress, port);
59+
builder =
60+
builder
61+
.executor(executor)
62+
.offloadExecutor(offloadExecutor)
63+
.maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
64+
.maxInboundMetadataSize(MAX_INBOUND_METADATA_SIZE)
65+
.keepAliveTime(30, TimeUnit.SECONDS);
66+
67+
ManagedChannel managedChannel = builder.build();
68+
return managedChannel;
69+
}
70+
71+
public static void subscribeAsyncLimitedConcurrencyExample(
72+
String projectId, String subscriptionId) {
73+
final int subCount = 100;
74+
final int transportChannelCount = 20;
75+
final AtomicLong receivedCount = new AtomicLong();
76+
77+
// Instantiate an asynchronous message receiver.
78+
MessageReceiver receiver =
79+
(PubsubMessage message, AckReplyConsumer consumer) -> {
80+
// Handle incoming message, then ack the received message.
81+
consumer.ack();
82+
long currentCount = receivedCount.incrementAndGet();
83+
if (currentCount % 100 == 0) {
84+
System.out.println("Received " + currentCount);
85+
}
86+
};
87+
88+
ThreadFactory callbackThreadFactory =
89+
new ThreadFactoryBuilder().setNameFormat("callback-pool-%d").build();
90+
ScheduledThreadPoolExecutor callbackExecutor =
91+
new ScheduledThreadPoolExecutor(10, callbackThreadFactory);
92+
callbackExecutor.setMaximumPoolSize(10);
93+
FixedExecutorProvider callbackExecutorProvider = FixedExecutorProvider.create(callbackExecutor);
94+
ThreadFactory leaseThreadFactory =
95+
new ThreadFactoryBuilder().setNameFormat("lease-pool-%d").build();
96+
ScheduledThreadPoolExecutor leaseExecutor =
97+
new ScheduledThreadPoolExecutor(10, leaseThreadFactory);
98+
leaseExecutor.setMaximumPoolSize(10);
99+
FixedExecutorProvider leaseExecutorProvider = FixedExecutorProvider.create(leaseExecutor);
100+
ThreadFactory channelThreadFactory =
101+
new ThreadFactoryBuilder().setNameFormat("channel-pool-%d").build();
102+
ScheduledThreadPoolExecutor channelExecutor =
103+
new ScheduledThreadPoolExecutor(10, channelThreadFactory);
104+
ThreadFactory channelOffloadThreadFactory =
105+
new ThreadFactoryBuilder().setNameFormat("channel-offload-pool-%d").build();
106+
ScheduledThreadPoolExecutor channelOffloadExecutor =
107+
new ScheduledThreadPoolExecutor(10, channelOffloadThreadFactory);
108+
109+
ArrayList<TransportChannelProvider> transportChannelProviders =
110+
new ArrayList<>(transportChannelCount);
111+
112+
for (int i = 0; i < transportChannelCount; ++i) {
113+
TransportChannelProvider channelProvider = null;
114+
try {
115+
channelProvider =
116+
FixedTransportChannelProvider.create(
117+
GrpcTransportChannel.create(
118+
createSingleChannel(
119+
"pubsub.googleapis.com", 443, channelExecutor, channelOffloadExecutor)));
120+
transportChannelProviders.add(channelProvider);
121+
} catch (Exception e) {
122+
System.out.println("Could not create channel provider: " + e);
123+
return;
124+
}
125+
}
126+
127+
List<Subscriber> subscribers = new ArrayList<>();
128+
for (int i = 0; i < subCount; ++i) {
129+
130+
ProjectSubscriptionName subscriptionName =
131+
ProjectSubscriptionName.of(projectId, subscriptionId + i);
132+
Subscriber subscriber = null;
133+
subscriber =
134+
Subscriber.newBuilder(subscriptionName, receiver)
135+
.setChannelProvider(transportChannelProviders.get(i % transportChannelCount))
136+
.setExecutorProvider(callbackExecutorProvider)
137+
.setSystemExecutorProvider(leaseExecutorProvider)
138+
.build();
139+
// Start the subscriber.
140+
subscriber.startAsync().awaitRunning();
141+
subscribers.add(subscriber);
142+
}
143+
printThreads();
144+
System.out.println("Listening for messages for 30s before checking threads again.");
145+
try {
146+
Thread.sleep(30000);
147+
} catch (Exception e) {
148+
System.out.println("Could not sleep: " + e);
149+
return;
150+
}
151+
printThreads();
152+
153+
for (Subscriber subscriber : subscribers) {
154+
try {
155+
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
156+
subscriber.awaitTerminated(120, TimeUnit.SECONDS);
157+
} catch (TimeoutException timeoutException) {
158+
// Shut down the subscriber after 30s. Stop receiving messages.
159+
subscriber.stopAsync();
160+
}
161+
}
162+
}
163+
164+
private static void printThreads() {
165+
System.out.println("Thread names:");
166+
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
167+
for (Thread t : threadSet) {
168+
System.out.println("\t" + t.getName());
169+
}
170+
System.out.printf("Thread count: %d\n", Thread.activeCount());
171+
}
172+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2016 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package pubsub;
18+
19+
import com.google.cloud.pubsub.v1.AckReplyConsumer;
20+
import com.google.cloud.pubsub.v1.MessageReceiver;
21+
import com.google.cloud.pubsub.v1.Subscriber;
22+
import com.google.pubsub.v1.ProjectSubscriptionName;
23+
import com.google.pubsub.v1.PubsubMessage;
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import java.util.Set;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.TimeoutException;
29+
import java.util.concurrent.atomic.AtomicLong;
30+
31+
public class SubscribeAsyncUnlimitedConcurrencyExample {
32+
public static void main(String... args) throws Exception {
33+
// TODO(developer): Replace these variables before running the sample.
34+
String projectId = "my-project";
35+
String subscriptionId = "my-subscription";
36+
37+
subscribeAsyncUnlimitedConcurrencyExample(projectId, subscriptionId);
38+
}
39+
40+
public static void subscribeAsyncUnlimitedConcurrencyExample(
41+
String projectId, String subscriptionId) {
42+
final int subCount = 100;
43+
final AtomicLong receivedCount = new AtomicLong();
44+
45+
// Instantiate an asynchronous message receiver.
46+
MessageReceiver receiver =
47+
(PubsubMessage message, AckReplyConsumer consumer) -> {
48+
// Handle incoming message, then ack the received message.
49+
consumer.ack();
50+
long currentCount = receivedCount.incrementAndGet();
51+
if (currentCount % 100 == 0) {
52+
System.out.println("Received " + currentCount);
53+
}
54+
};
55+
56+
List<Subscriber> subscribers = new ArrayList<>();
57+
for (int i = 0; i < subCount; ++i) {
58+
ProjectSubscriptionName subscriptionName =
59+
ProjectSubscriptionName.of(projectId, subscriptionId + i);
60+
Subscriber subscriber = null;
61+
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
62+
// Start the subscriber.
63+
subscriber.startAsync().awaitRunning();
64+
subscribers.add(subscriber);
65+
}
66+
printThreads();
67+
System.out.println("Listening for messages for 30s before checking threads again.");
68+
try {
69+
Thread.sleep(30000);
70+
} catch (Exception e) {
71+
System.out.println("Could not sleep: " + e);
72+
return;
73+
}
74+
printThreads();
75+
for (Subscriber subscriber : subscribers) {
76+
try {
77+
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
78+
subscriber.awaitTerminated(300, TimeUnit.SECONDS);
79+
} catch (TimeoutException timeoutException) {
80+
// Shut down the subscriber after 30s. Stop receiving messages.
81+
subscriber.stopAsync();
82+
}
83+
}
84+
}
85+
86+
private static void printThreads() {
87+
System.out.println("Thread names:");
88+
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
89+
for (Thread t : threadSet) {
90+
System.out.println("\t" + t.getName());
91+
}
92+
System.out.printf("Thread count: %d\n", Thread.activeCount());
93+
}
94+
}

0 commit comments

Comments
 (0)