Skip to content

Commit ebb5d3b

Browse files
feat: Convert internal interfaces to use protos (#1335)
* feat: Convert internal interfaces to use protos * feat: Convert internal interfaces to use protos
1 parent 4daf3ab commit ebb5d3b

38 files changed

+382
-232
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/ResettableSubscriberFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
package com.google.cloud.pubsublite.cloudpubsub.internal;
1818

1919
import com.google.api.gax.rpc.ApiException;
20-
import com.google.cloud.pubsublite.SequencedMessage;
2120
import com.google.cloud.pubsublite.internal.wire.Subscriber;
2221
import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler;
22+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2323
import java.io.Serializable;
2424
import java.util.List;
2525
import java.util.function.Consumer;

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/SinglePartitionSubscriber.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,9 +83,10 @@ private boolean terminated() {
8383
}
8484

8585
@VisibleForTesting
86-
void onMessages(List<SequencedMessage> sequencedMessages) {
86+
void onMessages(List<com.google.cloud.pubsublite.proto.SequencedMessage> sequencedMessages) {
8787
try {
88-
for (SequencedMessage message : sequencedMessages) {
88+
for (com.google.cloud.pubsublite.proto.SequencedMessage proto : sequencedMessages) {
89+
SequencedMessage message = SequencedMessage.fromProto(proto);
8990
PubsubMessage userMessage = transformer.transform(message);
9091
long bytes = message.byteSize();
9192
Runnable trackerConsumer = ackSetTracker.track(message);

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/internal/WrappingPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
7373
return ApiFutures.immediateFailedFuture(e.underlying);
7474
}
7575
return ApiFutures.transform(
76-
wirePublisher.publish(wireMessage),
76+
wirePublisher.publish(wireMessage.toProto()),
7777
MessageMetadata::encode,
7878
MoreExecutors.directExecutor());
7979
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package com.google.cloud.pubsublite.internal;
1818

1919
import com.google.api.core.ApiFuture;
20-
import com.google.cloud.pubsublite.SequencedMessage;
20+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2121
import java.util.Optional;
2222
import javax.annotation.concurrent.ThreadSafe;
2323

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BlockingPullSubscriberImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@
2424
import com.google.api.core.ApiService.State;
2525
import com.google.api.core.SettableApiFuture;
2626
import com.google.api.gax.rpc.StatusCode;
27-
import com.google.cloud.pubsublite.SequencedMessage;
2827
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
2928
import com.google.cloud.pubsublite.internal.wire.Subscriber;
3029
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
3130
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
3231
import com.google.cloud.pubsublite.proto.FlowControlRequest;
32+
import com.google.cloud.pubsublite.proto.SequencedMessage;
3333
import com.google.errorprone.annotations.concurrent.GuardedBy;
3434
import java.util.ArrayDeque;
3535
import java.util.Collection;
@@ -110,7 +110,7 @@ public synchronized Optional<SequencedMessage> messageIfAvailable() throws Check
110110
underlying.allowFlow(
111111
FlowControlRequest.newBuilder()
112112
.setAllowedMessages(1)
113-
.setAllowedBytes(msg.byteSize())
113+
.setAllowedBytes(msg.getSizeBytes())
114114
.build());
115115
return Optional.of(msg);
116116
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/BufferingPullSubscriber.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import com.google.api.core.ApiService.Listener;
2222
import com.google.api.core.ApiService.State;
2323
import com.google.cloud.pubsublite.Offset;
24-
import com.google.cloud.pubsublite.SequencedMessage;
2524
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
2625
import com.google.cloud.pubsublite.internal.wire.Subscriber;
2726
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
2827
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
2928
import com.google.cloud.pubsublite.proto.FlowControlRequest;
29+
import com.google.cloud.pubsublite.proto.SequencedMessage;
3030
import com.google.common.collect.ImmutableList;
3131
import com.google.common.collect.Iterables;
3232
import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -85,13 +85,13 @@ public synchronized List<SequencedMessage> pull() throws CheckedApiException {
8585
}
8686
Deque<SequencedMessage> collection = messages;
8787
messages = new ArrayDeque<>();
88-
long bytes = collection.stream().mapToLong(SequencedMessage::byteSize).sum();
88+
long bytes = collection.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
8989
underlying.allowFlow(
9090
FlowControlRequest.newBuilder()
9191
.setAllowedBytes(bytes)
9292
.setAllowedMessages(collection.size())
9393
.build());
94-
lastDelivered = Optional.of(Iterables.getLast(collection).offset());
94+
lastDelivered = Optional.of(Offset.of(Iterables.getLast(collection).getCursor().getOffset()));
9595
return ImmutableList.copyOf(collection);
9696
}
9797

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/Publisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.ApiService;
21-
import com.google.cloud.pubsublite.Message;
21+
import com.google.cloud.pubsublite.proto.PubSubMessage;
2222
import java.io.Flushable;
2323

2424
/** A generic PubSub Lite publisher. Errors are handled out of band. Thread safe. */
@@ -28,7 +28,7 @@ public interface Publisher<ResponseT> extends ApiService, Flushable {
2828
//
2929
// Guarantees that if a single publish future has an exception set, all publish calls made after
3030
// that will also have an exception set.
31-
ApiFuture<ResponseT> publish(Message message);
31+
ApiFuture<ResponseT> publish(PubSubMessage message);
3232

3333
// Attempts to cancel all outstanding publishes.
3434
void cancelOutstandingPublishes();

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SequencedPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.ApiService;
21-
import com.google.cloud.pubsublite.Message;
21+
import com.google.cloud.pubsublite.proto.PubSubMessage;
2222
import java.io.Flushable;
2323

2424
/**
@@ -35,7 +35,7 @@ public interface SequencedPublisher<ResponseT> extends ApiService, Flushable {
3535
* <p>Guarantees that if a single publish future has an exception set, all publish calls made
3636
* after that will also have an exception set.
3737
*/
38-
ApiFuture<ResponseT> publish(Message message, PublishSequenceNumber sequenceNumber);
38+
ApiFuture<ResponseT> publish(PubSubMessage message, PublishSequenceNumber sequenceNumber);
3939

4040
/** Attempts to cancel all outstanding publishes. */
4141
void cancelOutstandingPublishes();

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ApiExceptionPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020

2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.gax.rpc.ApiException;
23-
import com.google.cloud.pubsublite.Message;
2423
import com.google.cloud.pubsublite.internal.ProxyService;
2524
import com.google.cloud.pubsublite.internal.Publisher;
25+
import com.google.cloud.pubsublite.proto.PubSubMessage;
2626
import java.io.IOException;
2727

2828
public class ApiExceptionPublisher<T> extends ProxyService implements Publisher<T> {
@@ -34,7 +34,7 @@ public class ApiExceptionPublisher<T> extends ProxyService implements Publisher<
3434
}
3535

3636
@Override
37-
public ApiFuture<T> publish(Message message) {
37+
public ApiFuture<T> publish(PubSubMessage message) {
3838
return toClientFuture(publisher.publish(message));
3939
}
4040

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedSubscriberFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.google.cloud.pubsublite.internal.wire;
1818

19-
import com.google.cloud.pubsublite.SequencedMessage;
19+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2020
import com.google.cloud.pubsublite.proto.SubscribeRequest;
2121
import com.google.cloud.pubsublite.proto.SubscribeResponse;
2222
import java.util.List;

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedSubscriberImpl.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,15 @@
2020

2121
import com.google.api.gax.rpc.ResponseObserver;
2222
import com.google.api.gax.rpc.StatusCode.Code;
23-
import com.google.cloud.pubsublite.SequencedMessage;
2423
import com.google.cloud.pubsublite.internal.CheckedApiException;
2524
import com.google.cloud.pubsublite.internal.wire.StreamFactories.SubscribeStreamFactory;
2625
import com.google.cloud.pubsublite.proto.FlowControlRequest;
2726
import com.google.cloud.pubsublite.proto.MessageResponse;
27+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2828
import com.google.cloud.pubsublite.proto.SubscribeRequest;
2929
import com.google.cloud.pubsublite.proto.SubscribeResponse;
3030
import com.google.common.base.Preconditions;
3131
import java.util.List;
32-
import java.util.stream.Collectors;
3332

3433
class ConnectedSubscriberImpl
3534
extends SingleConnection<SubscribeRequest, SubscribeResponse, List<SequencedMessage>>
@@ -103,14 +102,10 @@ private void onMessages(MessageResponse response) throws CheckedApiException {
103102
response.getMessagesCount() > 0,
104103
"Received an empty MessageResponse on stream with initial request %s.",
105104
initialRequest);
106-
List<SequencedMessage> messages =
107-
response.getMessagesList().stream()
108-
.map(SequencedMessage::fromProto)
109-
.collect(Collectors.toList());
110105
checkState(
111-
Predicates.isOrdered(messages),
106+
Predicates.isOrdered(response.getMessagesList()),
112107
"Received out of order messages on the stream with initial request %s.",
113108
initialRequest);
114-
sendToClient(messages);
109+
sendToClient(response.getMessagesList());
115110
}
116111
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/FlowControlBatcher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616

1717
package com.google.cloud.pubsublite.internal.wire;
1818

19-
import com.google.cloud.pubsublite.SequencedMessage;
2019
import com.google.cloud.pubsublite.internal.CheckedApiException;
2120
import com.google.cloud.pubsublite.proto.FlowControlRequest;
21+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2222
import java.util.Collection;
2323
import java.util.Optional;
2424

@@ -40,7 +40,7 @@ void onClientFlowRequest(FlowControlRequest request) throws CheckedApiException
4040
}
4141

4242
void onMessages(Collection<SequencedMessage> received) throws CheckedApiException {
43-
long byteSize = received.stream().mapToLong(SequencedMessage::byteSize).sum();
43+
long byteSize = received.stream().mapToLong(SequencedMessage::getSizeBytes).sum();
4444
clientTokens.sub(byteSize, received.size());
4545
}
4646

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/NextOffsetTracker.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
import static com.google.cloud.pubsublite.internal.wire.Predicates.isOrdered;
2222

2323
import com.google.cloud.pubsublite.Offset;
24-
import com.google.cloud.pubsublite.SequencedMessage;
2524
import com.google.cloud.pubsublite.internal.CheckedApiException;
2625
import com.google.cloud.pubsublite.proto.Cursor;
2726
import com.google.cloud.pubsublite.proto.SeekRequest;
27+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2828
import com.google.common.collect.Iterables;
2929
import java.util.Collection;
3030
import java.util.Optional;
@@ -37,13 +37,13 @@ public class NextOffsetTracker {
3737
void onMessages(Collection<SequencedMessage> messages) throws CheckedApiException {
3838
checkArgument(!messages.isEmpty());
3939
checkArgument(isOrdered(messages));
40-
Offset firstMessageOffset = messages.iterator().next().offset();
40+
long firstMessageOffset = messages.iterator().next().getCursor().getOffset();
4141
checkState(
42-
!nextOffset.isPresent() || (nextOffset.get().value() <= firstMessageOffset.value()),
42+
!nextOffset.isPresent() || (nextOffset.get().value() <= firstMessageOffset),
4343
String.format(
4444
"Received message with offset %s older than known cursor location %s.",
4545
firstMessageOffset, nextOffset));
46-
nextOffset = Optional.of(Offset.of(Iterables.getLast(messages).offset().value() + 1));
46+
nextOffset = Optional.of(Offset.of(Iterables.getLast(messages).getCursor().getOffset() + 1));
4747
}
4848

4949
// Gives the SeekRequest that should be sent on restart, or empty if none should be sent because

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424
import com.google.api.core.ApiFuture;
2525
import com.google.api.core.ApiFutures;
2626
import com.google.api.core.ApiService;
27-
import com.google.cloud.pubsublite.Message;
2827
import com.google.cloud.pubsublite.MessageMetadata;
2928
import com.google.cloud.pubsublite.Partition;
3029
import com.google.cloud.pubsublite.internal.CheckedApiException;
3130
import com.google.cloud.pubsublite.internal.CloseableMonitor;
3231
import com.google.cloud.pubsublite.internal.ProxyService;
3332
import com.google.cloud.pubsublite.internal.Publisher;
3433
import com.google.cloud.pubsublite.internal.RoutingPolicy;
34+
import com.google.cloud.pubsublite.proto.PubSubMessage;
3535
import com.google.common.collect.ImmutableMap;
3636
import com.google.common.flogger.GoogleLogger;
3737
import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -56,12 +56,12 @@ private PartitionsWithRouting(
5656
this.routingPolicy = routingPolicy;
5757
}
5858

59-
public ApiFuture<MessageMetadata> publish(Message message) throws CheckedApiException {
59+
public ApiFuture<MessageMetadata> publish(PubSubMessage message) throws CheckedApiException {
6060
try {
6161
Partition routedPartition =
62-
message.key().isEmpty()
62+
message.getKey().isEmpty()
6363
? routingPolicy.routeWithoutKey()
64-
: routingPolicy.route(message.key());
64+
: routingPolicy.route(message.getKey());
6565
checkState(
6666
publishers.containsKey(routedPartition),
6767
"Routed to partition %s for which there is no publisher available.",
@@ -108,7 +108,7 @@ public void stop() {
108108
}
109109

110110
@Override
111-
public ApiFuture<MessageMetadata> publish(Message message) {
111+
public ApiFuture<MessageMetadata> publish(PubSubMessage message) {
112112
Optional<PartitionsWithRouting> partitions;
113113
try (CloseableMonitor.Hold h = monitor.enter()) {
114114
partitions = partitionsWithRouting;

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
import com.google.api.gax.rpc.ApiException;
1919
import com.google.auto.value.AutoValue;
20-
import com.google.cloud.pubsublite.*;
20+
import com.google.cloud.pubsublite.AdminClient;
21+
import com.google.cloud.pubsublite.MessageMetadata;
22+
import com.google.cloud.pubsublite.TopicPath;
2123
import com.google.cloud.pubsublite.internal.AlarmFactory;
2224
import com.google.cloud.pubsublite.internal.DefaultRoutingPolicy;
2325
import com.google.cloud.pubsublite.internal.Publisher;

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/Predicates.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@
1616

1717
package com.google.cloud.pubsublite.internal.wire;
1818

19-
import com.google.cloud.pubsublite.SequencedMessage;
19+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2020
import com.google.common.collect.Ordering;
2121
import java.util.Comparator;
2222

2323
public final class Predicates {
2424
public static boolean isOrdered(Iterable<SequencedMessage> messages) {
25-
return Ordering.from(Comparator.comparingLong((SequencedMessage m) -> m.offset().value()))
25+
return Ordering.from(
26+
Comparator.comparingLong((SequencedMessage m) -> m.getCursor().getOffset()))
2627
.isStrictlyOrdered(messages);
2728
}
2829

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.google.api.gax.rpc.ApiException;
2929
import com.google.api.gax.rpc.StatusCode.Code;
3030
import com.google.cloud.pubsublite.Constants;
31-
import com.google.cloud.pubsublite.Message;
3231
import com.google.cloud.pubsublite.Offset;
3332
import com.google.cloud.pubsublite.internal.AlarmFactory;
3433
import com.google.cloud.pubsublite.internal.CheckedApiException;
@@ -246,8 +245,7 @@ private void terminateOutstandingPublishes(CheckedApiException e) {
246245
}
247246

248247
@Override
249-
public ApiFuture<Offset> publish(Message message, PublishSequenceNumber sequenceNumber) {
250-
PubSubMessage proto = message.toProto();
248+
public ApiFuture<Offset> publish(PubSubMessage message, PublishSequenceNumber sequenceNumber) {
251249
try (CloseableMonitor.Hold h = batcherMonitor.enter()) {
252250
ApiService.State currentState = state();
253251
switch (currentState) {
@@ -258,7 +256,7 @@ public ApiFuture<Offset> publish(Message message, PublishSequenceNumber sequence
258256
Code.FAILED_PRECONDITION);
259257
case STARTING:
260258
case RUNNING:
261-
return batcher.add(proto, sequenceNumber);
259+
return batcher.add(message, sequenceNumber);
262260
default:
263261
throw new CheckedApiException(
264262
"Cannot publish when Publisher state is " + currentState.name(),

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RoutingPublisher.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@
2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.core.ApiFutures;
2424
import com.google.api.gax.rpc.ApiException;
25-
import com.google.cloud.pubsublite.Message;
2625
import com.google.cloud.pubsublite.MessageMetadata;
2726
import com.google.cloud.pubsublite.Partition;
2827
import com.google.cloud.pubsublite.internal.CheckedApiException;
2928
import com.google.cloud.pubsublite.internal.ProxyService;
3029
import com.google.cloud.pubsublite.internal.Publisher;
3130
import com.google.cloud.pubsublite.internal.RoutingPolicy;
31+
import com.google.cloud.pubsublite.proto.PubSubMessage;
3232
import java.io.IOException;
3333
import java.util.Map;
3434

@@ -46,10 +46,10 @@ public class RoutingPublisher extends ProxyService implements Publisher<MessageM
4646

4747
// Publisher implementation.
4848
@Override
49-
public ApiFuture<MessageMetadata> publish(Message message) {
49+
public ApiFuture<MessageMetadata> publish(PubSubMessage message) {
5050
try {
5151
Partition routedPartition =
52-
message.key().isEmpty() ? policy.routeWithoutKey() : policy.route(message.key());
52+
message.getKey().isEmpty() ? policy.routeWithoutKey() : policy.route(message.getKey());
5353
checkState(
5454
partitionPublishers.containsKey(routedPartition),
5555
"Routed to partition %s for which there is no publisher available.",

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SequenceAssigningPublisher.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818

1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.gax.rpc.ApiException;
21-
import com.google.cloud.pubsublite.Message;
2221
import com.google.cloud.pubsublite.Offset;
2322
import com.google.cloud.pubsublite.internal.ProxyService;
2423
import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
2524
import com.google.cloud.pubsublite.internal.Publisher;
2625
import com.google.cloud.pubsublite.internal.SequencedPublisher;
26+
import com.google.cloud.pubsublite.proto.PubSubMessage;
2727
import java.io.IOException;
2828
import javax.annotation.concurrent.GuardedBy;
2929

@@ -44,7 +44,7 @@ public class SequenceAssigningPublisher extends ProxyService implements Publishe
4444

4545
// Publisher implementation.
4646
@Override
47-
public synchronized ApiFuture<Offset> publish(Message message) {
47+
public synchronized ApiFuture<Offset> publish(PubSubMessage message) {
4848
ApiFuture<Offset> future = publisher.publish(message, nextSequence);
4949
nextSequence = nextSequence.next();
5050
return future;

0 commit comments

Comments
 (0)