|
58 | 58 | import java.util.function.Supplier;
|
59 | 59 | import java.util.regex.Pattern;
|
60 | 60 | import java.util.stream.Collectors;
|
| 61 | +import java.util.stream.Stream; |
61 | 62 |
|
62 | 63 | import org.aopalliance.intercept.MethodInterceptor;
|
63 | 64 | import org.apache.commons.logging.LogFactory;
|
|
83 | 84 | import org.assertj.core.api.InstanceOfAssertFactories;
|
84 | 85 | import org.junit.jupiter.api.BeforeAll;
|
85 | 86 | import org.junit.jupiter.api.Test;
|
| 87 | +import org.junit.jupiter.params.ParameterizedTest; |
| 88 | +import org.junit.jupiter.params.provider.Arguments; |
| 89 | +import org.junit.jupiter.params.provider.EnumSource; |
| 90 | +import org.junit.jupiter.params.provider.MethodSource; |
86 | 91 | import org.mockito.ArgumentCaptor;
|
87 | 92 | import org.mockito.InOrder;
|
88 | 93 |
|
@@ -657,18 +662,10 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
|
657 | 662 | assertThat(container.isRunning()).isFalse();
|
658 | 663 | }
|
659 | 664 |
|
660 |
| - @Test |
661 |
| - void testInOrderAckManual() throws Exception { |
662 |
| - testInOrderAck(AckMode.MANUAL); |
663 |
| - } |
664 |
| - |
665 |
| - @Test |
666 |
| - void testInOrderAckManualImm() throws Exception { |
667 |
| - testInOrderAck(AckMode.MANUAL_IMMEDIATE); |
668 |
| - } |
669 |
| - |
| 665 | + @ParameterizedTest(name = "{index} AckMode.{0}") |
| 666 | + @EnumSource(value = AckMode.class, names = { "MANUAL", "MANUAL_IMMEDIATE" }) |
670 | 667 | @SuppressWarnings("unchecked")
|
671 |
| - private void testInOrderAck(AckMode ackMode) throws Exception { |
| 668 | + void testInOrderAck(AckMode ackMode) throws Exception { |
672 | 669 | ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
|
673 | 670 | Consumer<Integer, String> consumer = mock(Consumer.class);
|
674 | 671 | given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
|
@@ -723,28 +720,18 @@ private void testInOrderAck(AckMode ackMode) throws Exception {
|
723 | 720 | container.stop();
|
724 | 721 | }
|
725 | 722 |
|
726 |
| - @Test |
727 |
| - void testInOrderAckPauseUntilAckedManual() throws Exception { |
728 |
| - testInOrderAckPauseUntilAcked(AckMode.MANUAL, false); |
729 |
| - } |
730 |
| - |
731 |
| - @Test |
732 |
| - void testInOrderAckPauseUntilAckedManualImm() throws Exception { |
733 |
| - testInOrderAckPauseUntilAcked(AckMode.MANUAL_IMMEDIATE, false); |
734 |
| - } |
735 |
| - |
736 |
| - @Test |
737 |
| - void testInOrderAckPauseUntilAckedManualBatch() throws Exception { |
738 |
| - testInOrderAckPauseUntilAcked(AckMode.MANUAL, true); |
739 |
| - } |
740 |
| - |
741 |
| - @Test |
742 |
| - void testInOrderAckPauseUntilAckedManualImmBatch() throws Exception { |
743 |
| - testInOrderAckPauseUntilAcked(AckMode.MANUAL_IMMEDIATE, true); |
| 723 | + private static Stream<Arguments> testInOrderAckPauseUntilAckedParamters() { |
| 724 | + return Stream.of( |
| 725 | + Arguments.of(AckMode.MANUAL, false), |
| 726 | + Arguments.of(AckMode.MANUAL, true), |
| 727 | + Arguments.of(AckMode.MANUAL_IMMEDIATE, false), |
| 728 | + Arguments.of(AckMode.MANUAL_IMMEDIATE, true)); |
744 | 729 | }
|
745 | 730 |
|
| 731 | + @ParameterizedTest(name = "{index} AckMode.{0} batch:{1}") |
| 732 | + @MethodSource("testInOrderAckPauseUntilAckedParamters") |
746 | 733 | @SuppressWarnings("unchecked")
|
747 |
| - private void testInOrderAckPauseUntilAcked(AckMode ackMode, boolean batch) throws Exception { |
| 734 | + void testInOrderAckPauseUntilAcked(AckMode ackMode, boolean batch) throws Exception { |
748 | 735 | ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
|
749 | 736 | Consumer<Integer, String> consumer = mock(Consumer.class);
|
750 | 737 | given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
|
@@ -998,18 +985,10 @@ public void onMessage(ConsumerRecord<Integer, String> data) {
|
998 | 985 | verify(consumer, never()).wakeup();
|
999 | 986 | }
|
1000 | 987 |
|
1001 |
| - @Test |
1002 |
| - public void testRecordAckMockForeignThread() throws Exception { |
1003 |
| - testRecordAckMockForeignThreadGuts(AckMode.MANUAL); |
1004 |
| - } |
1005 |
| - |
1006 |
| - @Test |
1007 |
| - public void testRecordAckMockForeignThreadImmediate() throws Exception { |
1008 |
| - testRecordAckMockForeignThreadGuts(AckMode.MANUAL_IMMEDIATE); |
1009 |
| - } |
1010 |
| - |
| 988 | + @ParameterizedTest(name = "{index} AckMode.{0}") |
| 989 | + @EnumSource(value = AckMode.class, names = { "MANUAL", "MANUAL_IMMEDIATE" }) |
1011 | 990 | @SuppressWarnings("unchecked")
|
1012 |
| - private void testRecordAckMockForeignThreadGuts(AckMode ackMode) throws Exception { |
| 991 | + void testRecordAckMockForeignThread(AckMode ackMode) throws Exception { |
1013 | 992 | ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
|
1014 | 993 | Consumer<Integer, String> consumer = mock(Consumer.class);
|
1015 | 994 | given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
|
@@ -1499,25 +1478,6 @@ public void onMessage(List<ConsumerRecord<Integer, String>> data) {
|
1499 | 1478 | container.stop();
|
1500 | 1479 | }
|
1501 | 1480 |
|
1502 |
| - @Test |
1503 |
| - public void testSeek() throws Exception { |
1504 |
| - Map<String, Object> props = KafkaTestUtils.consumerProps("test11", "false", embeddedKafka); |
1505 |
| - testSeekGuts(props, topic11, false); |
1506 |
| - } |
1507 |
| - |
1508 |
| - @Test |
1509 |
| - public void testSeekAutoCommit() throws Exception { |
1510 |
| - Map<String, Object> props = KafkaTestUtils.consumerProps("test12", "true", embeddedKafka); |
1511 |
| - testSeekGuts(props, topic12, true); |
1512 |
| - } |
1513 |
| - |
1514 |
| - @Test |
1515 |
| - public void testSeekAutoCommitDefault() throws Exception { |
1516 |
| - Map<String, Object> props = KafkaTestUtils.consumerProps("test15", "true", embeddedKafka); |
1517 |
| - props.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); // test false by default |
1518 |
| - testSeekGuts(props, topic15, false); |
1519 |
| - } |
1520 |
| - |
1521 | 1481 | @Test
|
1522 | 1482 | public void testSeekBatch() throws Exception {
|
1523 | 1483 | logger.info("Start seek batch seek");
|
@@ -1573,7 +1533,18 @@ public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekC
|
1573 | 1533 | container.stop();
|
1574 | 1534 | }
|
1575 | 1535 |
|
1576 |
| - private void testSeekGuts(Map<String, Object> props, String topic, boolean autoCommit) throws Exception { |
| 1536 | + private static Stream<Arguments> testSeekParameters() { |
| 1537 | + Map<String, Object> noAutoCommit = KafkaTestUtils.consumerProps("test15", "true", embeddedKafka); |
| 1538 | + noAutoCommit.remove(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); // test false by default |
| 1539 | + return Stream.of( |
| 1540 | + Arguments.of(KafkaTestUtils.consumerProps("test11", "false", embeddedKafka), topic11, false), |
| 1541 | + Arguments.of(KafkaTestUtils.consumerProps("test12", "true", embeddedKafka), topic12, true), |
| 1542 | + Arguments.of(noAutoCommit, topic15, false)); |
| 1543 | + } |
| 1544 | + |
| 1545 | + @ParameterizedTest(name = "topic:{1} autocommit:{2}") |
| 1546 | + @MethodSource("testSeekParameters") |
| 1547 | + void testSeek(Map<String, Object> props, String topic, boolean autoCommit) throws Exception { |
1577 | 1548 | logger.info("Start seek " + topic);
|
1578 | 1549 | DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
|
1579 | 1550 | ContainerProperties containerProps = new ContainerProperties(topic);
|
|
0 commit comments