23
23
import java .util .Optional ;
24
24
import java .util .UUID ;
25
25
import java .util .concurrent .CompletionException ;
26
- import java .util .concurrent .ExecutorService ;
27
26
import java .util .concurrent .Executors ;
28
27
import java .util .concurrent .ScheduledExecutorService ;
29
28
import java .util .concurrent .TimeUnit ;
37
36
import org .junit .Test ;
38
37
import org .reactivestreams .Subscriber ;
39
38
import org .reactivestreams .Subscription ;
40
- import software .amazon .awssdk .auth .credentials .ProfileCredentialsProvider ;
41
39
import software .amazon .awssdk .core .SdkBytes ;
42
40
import software .amazon .awssdk .core .async .SdkPublisher ;
43
- import software .amazon .awssdk .http .async .SdkAsyncHttpClient ;
44
- import software .amazon .awssdk .http .nio .netty .NettyNioAsyncHttpClient ;
45
41
import software .amazon .awssdk .regions .Region ;
46
42
import software .amazon .awssdk .services .kinesis .model .ConsumerStatus ;
47
43
import software .amazon .awssdk .services .kinesis .model .PutRecordRequest ;
48
- import software .amazon .awssdk .services .kinesis .model .PutRecordResponse ;
49
44
import software .amazon .awssdk .services .kinesis .model .Record ;
50
45
import software .amazon .awssdk .services .kinesis .model .ShardIteratorType ;
51
46
import software .amazon .awssdk .services .kinesis .model .StreamStatus ;
56
51
57
52
public class SubscribeToShardIntegrationTest {
58
53
59
- private static final String STREAM_NAME = "subscribe-to-shard-integ-test-" + System . currentTimeMillis () ;
54
+ private String streamName ;
60
55
private static final String CONSUMER_NAME = "subscribe-to-shard-consumer" ;
61
56
private KinesisAsyncClient client ;
62
57
private String consumerArn ;
63
58
private String shardId ;
64
59
65
60
@ Before
66
61
public void setup () throws InterruptedException {
62
+ streamName = "subscribe-to-shard-integ-test-" + System .currentTimeMillis ();
67
63
client = KinesisAsyncClient .builder ()
68
64
.region (Region .EU_CENTRAL_1 )
69
65
.build ();
70
- client .createStream (r -> r .streamName (STREAM_NAME )
66
+ client .createStream (r -> r .streamName (streamName )
71
67
.shardCount (1 )).join ();
72
68
waitForStreamToBeActive ();
73
- String streamARN = client .describeStream (r -> r .streamName (STREAM_NAME )).join ()
69
+ String streamARN = client .describeStream (r -> r .streamName (streamName )).join ()
74
70
.streamDescription ()
75
71
.streamARN ();
76
- this .shardId = client .listShards (r -> r .streamName (STREAM_NAME ))
72
+ this .shardId = client .listShards (r -> r .streamName (streamName ))
77
73
.join ()
78
74
.shards ().get (0 ).shardId ();
79
75
this .consumerArn = client .registerStreamConsumer (r -> r .streamARN (streamARN )
@@ -85,7 +81,7 @@ public void setup() throws InterruptedException {
85
81
86
82
@ After
87
83
public void tearDown () {
88
- client .deleteStream (r -> r .streamName (STREAM_NAME )
84
+ client .deleteStream (r -> r .streamName (streamName )
89
85
.enforceConsumerDeletion (true )).join ();
90
86
}
91
87
@@ -180,7 +176,7 @@ private void waitForConsumerToBeActive() throws InterruptedException {
180
176
}
181
177
182
178
private void waitForStreamToBeActive () throws InterruptedException {
183
- waitUntilTrue (() -> StreamStatus .ACTIVE == client .describeStream (r -> r .streamName (STREAM_NAME ))
179
+ waitUntilTrue (() -> StreamStatus .ACTIVE == client .describeStream (r -> r .streamName (streamName ))
184
180
.join ()
185
181
.streamDescription ()
186
182
.streamStatus ());
@@ -209,7 +205,7 @@ private Optional<SdkBytes> putRecord() {
209
205
try {
210
206
SdkBytes data = SdkBytes .fromByteArray (RandomUtils .nextBytes (50 ));
211
207
client .putRecord (PutRecordRequest .builder ()
212
- .streamName (STREAM_NAME )
208
+ .streamName (streamName )
213
209
.data (data )
214
210
.partitionKey (UUID .randomUUID ().toString ())
215
211
.build ())
0 commit comments