Skip to content

Commit 74d8da9

Browse files
feat: Receipt modack (#1540)
* receipt-modack for exactly once * changing setup * changing the pendingReceipt List * using scheduled fixed rate * using blocked queues * using blocked queues * using blocked queues * adding null safety * adding null safety * removing list * adding list back * if permanent failure, remove outstandingmsg from queue * adding snippet of test * adding method to streaming subscriber * adding method to streaming subscriber * adding notifyAcks * changing notifyAckFailed calls * addressing some comments * changed logic to use one datastructure * fixing notifyFailed * fixing notifyFailed * changing Pair to custom class * removing the not needed data structure * Fixing test * Fixing test * Fixing test * Fixing test * fixing format * fixing test to call receiveMessage * testing test failure * testing test failure * testing test failure * increasing timestamp to test * increasing timestamp to test * adding log statement for testing * Fixing lint * Adding more logs * batch size log * changing method to syncronized * fixing for loop to not remove as we are iterating * trying a concurrent map * fix: syncronizing notifyFailed * fix: removing unused import * fix: reformat * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: removing System.out.println statements * fix: reviewign comments * fix: lint * adding another ordering key test example * fix: trying to run this test again * fix: trying to run this test again * fix: removing commented code * fix: removing commented code * resolving the comments from review * adding custom matcher * adding custom matcher * adding custom matcher * adding custom matcher * adding custom matcher correcting the matching statement * lint * removing comments * removing comments * removing comments * changing messageMatcher to messageDataMatcher, and fixing other nit things * lint * addressing review comments * addressing review comments --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 9634a48 commit 74d8da9

File tree

4 files changed

+200
-4
lines changed

4 files changed

+200
-4
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@
3030
import com.google.pubsub.v1.ReceivedMessage;
3131
import java.util.ArrayList;
3232
import java.util.HashMap;
33+
import java.util.Iterator;
34+
import java.util.LinkedHashMap;
3335
import java.util.List;
3436
import java.util.Map;
37+
import java.util.Map.Entry;
3538
import java.util.concurrent.ConcurrentHashMap;
3639
import java.util.concurrent.ConcurrentMap;
3740
import java.util.concurrent.Executor;
@@ -89,7 +92,8 @@ class MessageDispatcher {
8992
private final LinkedBlockingQueue<AckRequestData> pendingAcks = new LinkedBlockingQueue<>();
9093
private final LinkedBlockingQueue<AckRequestData> pendingNacks = new LinkedBlockingQueue<>();
9194
private final LinkedBlockingQueue<AckRequestData> pendingReceipts = new LinkedBlockingQueue<>();
92-
95+
private final LinkedHashMap<String, ReceiptCompleteData> outstandingReceipts =
96+
new LinkedHashMap<String, ReceiptCompleteData>();
9397
private final AtomicInteger messageDeadlineSeconds = new AtomicInteger();
9498
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
9599
private final Lock jobLock;
@@ -350,6 +354,28 @@ private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandle
350354
}
351355
}
352356

357+
private static class ReceiptCompleteData {
358+
private OutstandingMessage outstandingMessage;
359+
private Boolean receiptComplete;
360+
361+
private ReceiptCompleteData(OutstandingMessage outstandingMessage) {
362+
this.outstandingMessage = outstandingMessage;
363+
this.receiptComplete = false;
364+
}
365+
366+
private OutstandingMessage getOutstandingMessage() {
367+
return this.outstandingMessage;
368+
}
369+
370+
private Boolean isReceiptComplete() {
371+
return this.receiptComplete;
372+
}
373+
374+
private void notifyReceiptComplete() {
375+
this.receiptComplete = true;
376+
}
377+
}
378+
353379
void processReceivedMessages(List<ReceivedMessage> messages) {
354380
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
355381
List<OutstandingMessage> outstandingBatch = new ArrayList<>(messages.size());
@@ -361,7 +387,13 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
361387
AckRequestData ackRequestData = builder.build();
362388
AckHandler ackHandler =
363389
new AckHandler(ackRequestData, message.getMessage().getSerializedSize(), totalExpiration);
364-
if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) {
390+
OutstandingMessage outstandingMessage = new OutstandingMessage(message, ackHandler);
391+
392+
if (this.exactlyOnceDeliveryEnabled.get()) {
393+
// For exactly once deliveries we don't add to outstanding batch because we first
394+
// process the receipt modack. If that is successful then we process the message.
395+
outstandingReceipts.put(message.getAckId(), new ReceiptCompleteData(outstandingMessage));
396+
} else if (pendingMessages.putIfAbsent(message.getAckId(), ackHandler) != null) {
365397
// putIfAbsent puts ackHandler if ackID isn't previously mapped, then return the
366398
// previously-mapped element.
367399
// If the previous element is not null, we already have the message and the new one is
@@ -371,14 +403,44 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
371403
// we want to eventually
372404
// totally expire so that pubsub service sends us the message again.
373405
continue;
406+
} else {
407+
outstandingBatch.add(outstandingMessage);
374408
}
375-
outstandingBatch.add(new OutstandingMessage(message, ackHandler));
376409
pendingReceipts.add(ackRequestData);
377410
}
378-
379411
processBatch(outstandingBatch);
380412
}
381413

414+
synchronized void notifyAckSuccess(AckRequestData ackRequestData) {
415+
416+
if (outstandingReceipts.containsKey(ackRequestData.getAckId())) {
417+
outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete();
418+
List<OutstandingMessage> outstandingBatch = new ArrayList<>();
419+
420+
for (Iterator<Entry<String, ReceiptCompleteData>> it =
421+
outstandingReceipts.entrySet().iterator();
422+
it.hasNext(); ) {
423+
Map.Entry<String, ReceiptCompleteData> receipt = it.next();
424+
// If receipt is complete then add to outstandingBatch to process the batch
425+
if (receipt.getValue().isReceiptComplete()) {
426+
it.remove();
427+
if (pendingMessages.putIfAbsent(
428+
receipt.getKey(), receipt.getValue().getOutstandingMessage().ackHandler)
429+
== null) {
430+
outstandingBatch.add(receipt.getValue().getOutstandingMessage());
431+
}
432+
} else {
433+
break;
434+
}
435+
}
436+
processBatch(outstandingBatch);
437+
}
438+
}
439+
440+
synchronized void notifyAckFailed(AckRequestData ackRequestData) {
441+
outstandingReceipts.remove(ackRequestData.getAckId());
442+
}
443+
382444
private void processBatch(List<OutstandingMessage> batch) {
383445
messagesWaiter.incrementPendingCount(batch.size());
384446
for (OutstandingMessage message : batch) {
@@ -519,6 +581,7 @@ void extendDeadlines() {
519581

520582
@InternalApi
521583
void processOutstandingOperations() {
584+
522585
List<ModackRequestData> modackRequestData = new ArrayList<ModackRequestData>();
523586

524587
// Nacks are modacks with an expiration of 0

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,7 @@ public void onSuccess(Empty empty) {
527527
for (AckRequestData ackRequestData : ackRequestDataList) {
528528
// This will check if a response is needed, and if it has already been set
529529
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
530+
messageDispatcher.notifyAckSuccess(ackRequestData);
530531
// Remove from our pending operations
531532
pendingRequests.remove(ackRequestData);
532533
}
@@ -564,12 +565,15 @@ public void onFailure(Throwable t) {
564565
"Permanent error invalid ack id message, will not resend",
565566
errorMessage);
566567
ackRequestData.setResponse(AckResponse.INVALID, setResponseOnSuccess);
568+
messageDispatcher.notifyAckFailed(ackRequestData);
567569
} else {
568570
logger.log(Level.INFO, "Unknown error message, will not resend", errorMessage);
569571
ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess);
572+
messageDispatcher.notifyAckFailed(ackRequestData);
570573
}
571574
} else {
572575
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
576+
messageDispatcher.notifyAckSuccess(ackRequestData);
573577
}
574578
// Remove from our pending
575579
pendingRequests.remove(ackRequestData);
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2017 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsub.v1;
18+
19+
import com.google.protobuf.ByteString;
20+
import com.google.pubsub.v1.PubsubMessage;
21+
import org.mockito.ArgumentMatcher;
22+
23+
public class MessageDataMatcher implements ArgumentMatcher<PubsubMessage> {
24+
25+
private ByteString expectedData;
26+
27+
public MessageDataMatcher(ByteString expectedData) {
28+
this.expectedData = expectedData;
29+
}
30+
31+
@Override
32+
public boolean matches(PubsubMessage message2) {
33+
return (expectedData.equals(message2.getData()));
34+
}
35+
}

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,100 @@ public void testReceiptMessageReceiver() {
139139
.receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumer.class));
140140
}
141141

142+
@Test
143+
public void testReceiptModackWithOrderingForExactlyOnceDelivered() {
144+
145+
MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse =
146+
mock(MessageReceiverWithAckResponse.class);
147+
MessageDispatcher messageDispatcher = getMessageDispatcher(mockMessageReceiverWithAckResponse);
148+
messageDispatcher.setExactlyOnceDeliveryEnabled(true);
149+
150+
ReceivedMessage TEST_MESSAGE1 =
151+
ReceivedMessage.newBuilder()
152+
.setAckId("ACK_ID1")
153+
.setMessage(
154+
PubsubMessage.newBuilder()
155+
.setData(ByteString.copyFromUtf8("message-data1"))
156+
.build())
157+
.setDeliveryAttempt(DELIVERY_INFO_COUNT)
158+
.build();
159+
ReceivedMessage TEST_MESSAGE2 =
160+
ReceivedMessage.newBuilder()
161+
.setAckId("ACK_ID2")
162+
.setMessage(
163+
PubsubMessage.newBuilder()
164+
.setData(ByteString.copyFromUtf8("message-data2"))
165+
.build())
166+
.setDeliveryAttempt(DELIVERY_INFO_COUNT)
167+
.build();
168+
ReceivedMessage TEST_MESSAGE3 =
169+
ReceivedMessage.newBuilder()
170+
.setAckId("ACK_ID3")
171+
.setMessage(
172+
PubsubMessage.newBuilder()
173+
.setData(ByteString.copyFromUtf8("message-data3"))
174+
.build())
175+
.setDeliveryAttempt(DELIVERY_INFO_COUNT)
176+
.build();
177+
178+
messageDispatcher.processReceivedMessages(
179+
Arrays.asList(TEST_MESSAGE3, TEST_MESSAGE2, TEST_MESSAGE1));
180+
181+
messageDispatcher.processOutstandingOperations();
182+
verify(mockMessageReceiverWithAckResponse, never())
183+
.receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumerWithResponse.class));
184+
185+
AckRequestData ackRequestData1 = AckRequestData.newBuilder(TEST_MESSAGE1.getAckId()).build();
186+
AckRequestData ackRequestData2 = AckRequestData.newBuilder(TEST_MESSAGE2.getAckId()).build();
187+
AckRequestData ackRequestData3 = AckRequestData.newBuilder(TEST_MESSAGE3.getAckId()).build();
188+
messageDispatcher.notifyAckSuccess(ackRequestData2);
189+
messageDispatcher.processOutstandingOperations();
190+
191+
messageDispatcher.notifyAckSuccess(ackRequestData1);
192+
messageDispatcher.notifyAckSuccess(ackRequestData3);
193+
messageDispatcher.processOutstandingOperations();
194+
195+
verify(mockMessageReceiverWithAckResponse, times(1))
196+
.receiveMessage(
197+
argThat(new MessageDataMatcher(TEST_MESSAGE3.getMessage().getData())),
198+
any(AckReplyConsumerWithResponse.class));
199+
verify(mockMessageReceiverWithAckResponse, times(1))
200+
.receiveMessage(
201+
argThat(new MessageDataMatcher(TEST_MESSAGE2.getMessage().getData())),
202+
any(AckReplyConsumerWithResponse.class));
203+
verify(mockMessageReceiverWithAckResponse, times(1))
204+
.receiveMessage(
205+
argThat(new MessageDataMatcher(TEST_MESSAGE1.getMessage().getData())),
206+
any(AckReplyConsumerWithResponse.class));
207+
}
208+
209+
@Test
210+
public void testReceiptModackForExactlyOnceDelivered() {
211+
212+
MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse =
213+
mock(MessageReceiverWithAckResponse.class);
214+
MessageDispatcher messageDispatcher = getMessageDispatcher(mockMessageReceiverWithAckResponse);
215+
messageDispatcher.setExactlyOnceDeliveryEnabled(true);
216+
217+
messageDispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
218+
219+
messageDispatcher.processOutstandingOperations();
220+
verify(mockMessageReceiverWithAckResponse, never())
221+
.receiveMessage(eq(TEST_MESSAGE.getMessage()), any(AckReplyConsumerWithResponse.class));
222+
223+
AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build();
224+
messageDispatcher.notifyAckSuccess(ackRequestData);
225+
messageDispatcher.processOutstandingOperations();
226+
227+
List<ModackRequestData> modackRequestDataList = new ArrayList<ModackRequestData>();
228+
modackRequestDataList.add(new ModackRequestData(MIN_ACK_DEADLINE_SECONDS, ackRequestData));
229+
230+
verify(mockMessageReceiverWithAckResponse, times(1))
231+
.receiveMessage(
232+
argThat(new MessageDataMatcher(TEST_MESSAGE.getMessage().getData())),
233+
any(AckReplyConsumerWithResponse.class));
234+
}
235+
142236
@Test
143237
public void testReceiptMessageReceiverWithAckResponse() {
144238
MessageReceiverWithAckResponse mockMessageReceiverWithAckResponse =

0 commit comments

Comments
 (0)