|
22 | 22 | import static java.lang.String.format;
|
23 | 23 | import static org.assertj.core.api.Assertions.assertThat;
|
24 | 24 | import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
25 |
| -import static org.mockito.ArgumentMatchers.any; |
26 |
| -import static org.mockito.ArgumentMatchers.anyByte; |
27 |
| -import static org.mockito.ArgumentMatchers.anyInt; |
28 |
| -import static org.mockito.ArgumentMatchers.anyMap; |
29 |
| -import static org.mockito.ArgumentMatchers.anyString; |
30 |
| -import static org.mockito.ArgumentMatchers.isNull; |
| 25 | +import static org.mockito.ArgumentMatchers.*; |
31 | 26 | import static org.mockito.Mockito.mock;
|
32 | 27 | import static org.mockito.Mockito.times;
|
33 | 28 | import static org.mockito.Mockito.verify;
|
|
54 | 49 | import java.util.concurrent.ScheduledExecutorService;
|
55 | 50 | import java.util.concurrent.atomic.AtomicInteger;
|
56 | 51 | import java.util.function.Consumer;
|
| 52 | +import java.util.function.Supplier; |
57 | 53 | import java.util.stream.Collectors;
|
58 | 54 | import java.util.stream.IntStream;
|
59 | 55 | import java.util.stream.Stream;
|
|
62 | 58 | import org.junit.jupiter.api.Test;
|
63 | 59 | import org.junit.jupiter.params.ParameterizedTest;
|
64 | 60 | import org.junit.jupiter.params.provider.MethodSource;
|
65 |
| -import org.mockito.ArgumentCaptor; |
66 |
| -import org.mockito.Captor; |
67 |
| -import org.mockito.Mock; |
68 |
| -import org.mockito.MockitoAnnotations; |
| 61 | +import org.mockito.*; |
69 | 62 |
|
70 | 63 | public class ConsumersCoordinatorTest {
|
71 | 64 |
|
@@ -1657,6 +1650,122 @@ void subscribeUnsubscribeInDifferentThreadsShouldNotDeadlock() {
|
1657 | 1650 | }
|
1658 | 1651 | }
|
1659 | 1652 |
|
| 1653 | + @ParameterizedTest |
| 1654 | + @MethodSource("disruptionArguments") |
| 1655 | + @SuppressWarnings("unchecked") |
| 1656 | + void shouldCallConsumerFlowControlHandlers(Consumer<ConsumersCoordinatorTest> configurator) |
| 1657 | + throws Exception { |
| 1658 | + |
| 1659 | + scheduledExecutorService = createScheduledExecutorService(); |
| 1660 | + when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService); |
| 1661 | + Duration retryDelay = Duration.ofMillis(100); |
| 1662 | + when(environment.recoveryBackOffDelayPolicy()).thenReturn(BackOffDelayPolicy.fixed(retryDelay)); |
| 1663 | + when(environment.topologyUpdateBackOffDelayPolicy()) |
| 1664 | + .thenReturn(BackOffDelayPolicy.fixed(retryDelay)); |
| 1665 | + when(consumer.isOpen()).thenReturn(true); |
| 1666 | + when(locator.metadata("stream")) |
| 1667 | + .thenReturn(metadata(null, replicas())) |
| 1668 | + .thenReturn(metadata(null, Collections.emptyList())) |
| 1669 | + .thenReturn(metadata(null, replicas())); |
| 1670 | + |
| 1671 | + when(clientFactory.client(any())).thenReturn(client); |
| 1672 | + |
| 1673 | + String consumerName = "consumer-name"; |
| 1674 | + long lastStoredOffset = 5; |
| 1675 | + long lastReceivedOffset = 10; |
| 1676 | + when(client.queryOffset(consumerName, "stream")) |
| 1677 | + .thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, 0L)) |
| 1678 | + .thenReturn(new QueryOffsetResponse(Constants.RESPONSE_CODE_OK, lastStoredOffset)); |
| 1679 | + |
| 1680 | + ArgumentCaptor<OffsetSpecification> offsetSpecificationArgumentCaptor = |
| 1681 | + ArgumentCaptor.forClass(OffsetSpecification.class); |
| 1682 | + ArgumentCaptor<Map<String, String>> subscriptionPropertiesArgumentCaptor = |
| 1683 | + ArgumentCaptor.forClass(Map.class); |
| 1684 | + when(client.subscribe( |
| 1685 | + subscriptionIdCaptor.capture(), |
| 1686 | + anyString(), |
| 1687 | + offsetSpecificationArgumentCaptor.capture(), |
| 1688 | + anyInt(), |
| 1689 | + subscriptionPropertiesArgumentCaptor.capture())) |
| 1690 | + .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK)); |
| 1691 | + |
| 1692 | + ConsumerFlowControlStrategy mockedConsumerFlowControlStrategy = Mockito.mock(ConsumerFlowControlStrategy.class); |
| 1693 | + |
| 1694 | + int numberOfInitialCreditsOnSubscribe = 7; |
| 1695 | + |
| 1696 | + when(mockedConsumerFlowControlStrategy.handleSubscribe(anyByte(), anyString(), any(), anyMap())).thenReturn(numberOfInitialCreditsOnSubscribe); |
| 1697 | + |
| 1698 | + ConsumerFlowControlStrategyBuilder<ConsumerFlowControlStrategy> mockedConsumerFlowControlStrategyBuilder = Mockito.mock(ConsumerFlowControlStrategyBuilder.class); |
| 1699 | + when(mockedConsumerFlowControlStrategyBuilder.build(any())).thenReturn(mockedConsumerFlowControlStrategy); |
| 1700 | + |
| 1701 | + Runnable closingRunnable = |
| 1702 | + coordinator.subscribe( |
| 1703 | + consumer, |
| 1704 | + "stream", |
| 1705 | + null, |
| 1706 | + consumerName, |
| 1707 | + NO_OP_SUBSCRIPTION_LISTENER, |
| 1708 | + NO_OP_TRACKING_CLOSING_CALLBACK, |
| 1709 | + (offset, message) -> {}, |
| 1710 | + mockedConsumerFlowControlStrategyBuilder, |
| 1711 | + Collections.emptyMap(), |
| 1712 | + initialCredits); |
| 1713 | + verify(clientFactory, times(1)).client(any()); |
| 1714 | + verify(client, times(1)) |
| 1715 | + .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), eq(numberOfInitialCreditsOnSubscribe), anyMap()); |
| 1716 | + verify(mockedConsumerFlowControlStrategy, times(1)) |
| 1717 | + .handleSubscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyMap()); |
| 1718 | + assertThat(offsetSpecificationArgumentCaptor.getAllValues()) |
| 1719 | + .element(0) |
| 1720 | + .isEqualTo(OffsetSpecification.next()); |
| 1721 | + assertThat(subscriptionPropertiesArgumentCaptor.getAllValues()) |
| 1722 | + .element(0) |
| 1723 | + .isEqualTo(Collections.singletonMap("name", "consumer-name")); |
| 1724 | + |
| 1725 | + Message message = new WrapperMessageBuilder().build(); |
| 1726 | + |
| 1727 | + messageListener.handle( |
| 1728 | + subscriptionIdCaptor.getValue(), |
| 1729 | + lastReceivedOffset, |
| 1730 | + 0, |
| 1731 | + 0, |
| 1732 | + message); |
| 1733 | + |
| 1734 | + verify(mockedConsumerFlowControlStrategy).handleMessage( |
| 1735 | + subscriptionIdCaptor.getValue(), |
| 1736 | + lastReceivedOffset, |
| 1737 | + 0, |
| 1738 | + 0, |
| 1739 | + message |
| 1740 | + ); |
| 1741 | + |
| 1742 | + configurator.accept(this); |
| 1743 | + |
| 1744 | + Thread.sleep(retryDelay.toMillis() * 5); |
| 1745 | + |
| 1746 | + verify(client, times(2)) |
| 1747 | + .subscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyInt(), anyMap()); |
| 1748 | + |
| 1749 | + verify(mockedConsumerFlowControlStrategy, times(2)) |
| 1750 | + .handleSubscribe(anyByte(), anyString(), any(OffsetSpecification.class), anyMap()); |
| 1751 | + |
| 1752 | + assertThat(offsetSpecificationArgumentCaptor.getAllValues()) |
| 1753 | + .element(1) |
| 1754 | + .isEqualTo(OffsetSpecification.offset(lastStoredOffset + 1)) |
| 1755 | + .isNotEqualTo(OffsetSpecification.offset(lastReceivedOffset)); |
| 1756 | + assertThat(subscriptionPropertiesArgumentCaptor.getAllValues()) |
| 1757 | + .element(1) |
| 1758 | + .isEqualTo(Collections.singletonMap("name", "consumer-name")); |
| 1759 | + when(client.unsubscribe(subscriptionIdCaptor.getValue())) |
| 1760 | + .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK)); |
| 1761 | + |
| 1762 | + closingRunnable.run(); |
| 1763 | + |
| 1764 | + verify(client, times(1)).unsubscribe(subscriptionIdCaptor.getValue()); |
| 1765 | + verify(mockedConsumerFlowControlStrategy, times(1)) |
| 1766 | + .handleUnsubscribe(subscriptionIdCaptor.getValue()); |
| 1767 | + } |
| 1768 | + |
1660 | 1769 | Client.Broker leader() {
|
1661 | 1770 | return new Client.Broker("leader", -1);
|
1662 | 1771 | }
|
|
0 commit comments