15
15
16
16
import static com .rabbitmq .stream .impl .TestUtils .b ;
17
17
import static com .rabbitmq .stream .impl .TestUtils .declareSuperStreamTopology ;
18
+ import static com .rabbitmq .stream .impl .TestUtils .deleteSuperStreamTopology ;
18
19
import static com .rabbitmq .stream .impl .TestUtils .latchAssert ;
20
+ import static com .rabbitmq .stream .impl .TestUtils .localhost ;
19
21
import static org .assertj .core .api .Assertions .assertThat ;
20
22
21
23
import com .rabbitmq .client .Connection ;
24
+ import com .rabbitmq .client .ConnectionFactory ;
22
25
import com .rabbitmq .stream .Consumer ;
23
26
import com .rabbitmq .stream .Environment ;
27
+ import com .rabbitmq .stream .EnvironmentBuilder ;
24
28
import com .rabbitmq .stream .OffsetSpecification ;
25
29
import com .rabbitmq .stream .impl .Client .ClientParameters ;
26
30
import io .netty .channel .EventLoopGroup ;
31
35
import java .util .concurrent .ConcurrentMap ;
32
36
import java .util .concurrent .CountDownLatch ;
33
37
import java .util .concurrent .atomic .AtomicInteger ;
38
+ import org .junit .jupiter .api .AfterEach ;
39
+ import org .junit .jupiter .api .BeforeEach ;
34
40
import org .junit .jupiter .api .Test ;
41
+ import org .junit .jupiter .api .TestInfo ;
35
42
import org .junit .jupiter .api .extension .ExtendWith ;
36
43
37
44
@ ExtendWith (TestUtils .StreamTestInfrastructureExtension .class )
@@ -47,6 +54,26 @@ public class SuperStreamConsumerTest {
47
54
String [] routingKeys = null ;
48
55
TestUtils .ClientFactory cf ;
49
56
57
+ @ BeforeEach
58
+ void init (TestInfo info ) throws Exception {
59
+ EnvironmentBuilder environmentBuilder = Environment .builder ().eventLoopGroup (eventLoopGroup );
60
+ environmentBuilder .addressResolver (add -> localhost ());
61
+ environment = environmentBuilder .build ();
62
+ superStream = TestUtils .streamName (info );
63
+ connection = new ConnectionFactory ().newConnection ();
64
+ }
65
+
66
+ @ AfterEach
67
+ void tearDown () throws Exception {
68
+ environment .close ();
69
+ if (routingKeys == null ) {
70
+ deleteSuperStreamTopology (connection , superStream , partitionCount );
71
+ } else {
72
+ deleteSuperStreamTopology (connection , superStream , routingKeys );
73
+ }
74
+ connection .close ();
75
+ }
76
+
50
77
private static void publishToPartitions (
51
78
TestUtils .ClientFactory cf , List <String > partitions , int messageCount ) {
52
79
CountDownLatch publishLatch = new CountDownLatch (messageCount );
0 commit comments