16
16
package software .amazon .awssdk .services .kinesis ;
17
17
18
18
import static org .junit .Assert .assertThat ;
19
+ import static org .junit .Assert .assertTrue ;
19
20
20
21
import java .math .BigInteger ;
21
22
import java .time .Duration ;
22
23
import java .time .Instant ;
23
24
import java .util .List ;
24
25
import org .hamcrest .Matchers ;
25
26
import org .junit .Assert ;
26
- import org .junit .Ignore ;
27
27
import org .junit .Test ;
28
28
import software .amazon .awssdk .core .SdkBytes ;
29
29
import software .amazon .awssdk .core .exception .SdkServiceException ;
37
37
import software .amazon .awssdk .services .kinesis .model .GetShardIteratorResponse ;
38
38
import software .amazon .awssdk .services .kinesis .model .HashKeyRange ;
39
39
import software .amazon .awssdk .services .kinesis .model .InvalidArgumentException ;
40
- import software .amazon .awssdk .services .kinesis .model .ListStreamsRequest ;
41
- import software .amazon .awssdk .services .kinesis .model .ListStreamsResponse ;
42
- import software .amazon .awssdk .services .kinesis .model .MergeShardsRequest ;
43
40
import software .amazon .awssdk .services .kinesis .model .PutRecordRequest ;
44
41
import software .amazon .awssdk .services .kinesis .model .PutRecordResponse ;
45
42
import software .amazon .awssdk .services .kinesis .model .Record ;
46
43
import software .amazon .awssdk .services .kinesis .model .ResourceNotFoundException ;
47
44
import software .amazon .awssdk .services .kinesis .model .SequenceNumberRange ;
48
45
import software .amazon .awssdk .services .kinesis .model .Shard ;
49
46
import software .amazon .awssdk .services .kinesis .model .ShardIteratorType ;
50
- import software .amazon .awssdk .services .kinesis .model .SplitShardRequest ;
51
47
import software .amazon .awssdk .services .kinesis .model .StreamDescription ;
52
48
import software .amazon .awssdk .services .kinesis .model .StreamStatus ;
53
49
@@ -105,45 +101,29 @@ public void testGetFromBogusIterator() {
105
101
} catch (InvalidArgumentException exception ) {
106
102
// Ignored or expected.
107
103
}
108
-
109
104
}
110
105
111
106
@ Test
112
- @ Ignore
113
- public void testKinesisOperations () throws Exception {
107
+ public void testCreatePutGetDelete () throws Exception {
114
108
String streamName = "java-test-stream-" + System .currentTimeMillis ();
115
109
boolean created = false ;
116
110
117
111
try {
118
112
119
113
// Create a stream with one shard.
120
- System .out .println ("Creating Stream..." );
121
114
client .createStream (CreateStreamRequest .builder ().streamName (streamName ).shardCount (1 ).build ());
122
- System .out .println (" OK" );
123
115
created = true ;
124
116
125
- // Verify that it shows up in a list call.
126
- findStreamInList (streamName );
127
-
128
117
// Wait for it to become ACTIVE.
129
- System .out .println ("Waiting for stream to become active..." );
130
118
List <Shard > shards = waitForStream (streamName );
131
- System .out .println (" OK" );
132
119
133
120
Assert .assertEquals (1 , shards .size ());
134
121
Shard shard = shards .get (0 );
135
122
136
- // Just to be really sure in case of eventual consistency...
137
- Thread .sleep (5000 );
138
-
139
- testPuts (streamName , shard );
123
+ putRecord (streamName , "See No Evil" );
124
+ putRecord (streamName , "Hear No Evil" );
140
125
141
- // Wait a bit to make sure the records propagate.
142
- Thread .sleep (5000 );
143
-
144
- System .out .println ("Reading..." );
145
126
testGets (streamName , shard );
146
- System .out .println (" OK" );
147
127
148
128
} finally {
149
129
if (created ) {
@@ -152,7 +132,8 @@ public void testKinesisOperations() throws Exception {
152
132
}
153
133
}
154
134
155
- private void testGets (final String streamName , final Shard shard ) {
135
+ private void testGets (final String streamName , final Shard shard ) throws InterruptedException {
136
+ // Wait for the shard to be in an active state
156
137
// Get an iterator for the first shard.
157
138
GetShardIteratorResponse iteratorResult = client .getShardIterator (
158
139
GetShardIteratorRequest .builder ()
@@ -166,6 +147,19 @@ private void testGets(final String streamName, final Shard shard) {
166
147
String iterator = iteratorResult .shardIterator ();
167
148
Assert .assertNotNull (iterator );
168
149
150
+ GetRecordsResponse result = getOneRecord (iterator );
151
+ validateRecord (result .records ().get (0 ), "See No Evil" );
152
+
153
+ result = getOneRecord (result .nextShardIterator ());
154
+ validateRecord (result .records ().get (0 ), "Hear No Evil" );
155
+
156
+ result = client .getRecords (GetRecordsRequest .builder ()
157
+ .shardIterator (result .nextShardIterator ())
158
+ .build ());
159
+ assertTrue (result .records ().isEmpty ());
160
+ }
161
+
162
+ private GetRecordsResponse getOneRecord (String iterator ) {
169
163
int tries = 0 ;
170
164
GetRecordsResponse result ;
171
165
List <Record > records ;
@@ -203,31 +197,7 @@ private void testGets(final String streamName, final Shard shard) {
203
197
204
198
iterator = result .nextShardIterator ();
205
199
}
206
-
207
- System .out .println (" [Succeeded after " + tries + " tries]" );
208
- Assert .assertEquals (1 , records .size ());
209
- validateRecord (records .get (0 ), "See No Evil" );
210
-
211
- // Read the second record from the first shard.
212
- result = client .getRecords (GetRecordsRequest .builder ()
213
- .shardIterator (result .nextShardIterator ())
214
- .build ());
215
- Assert .assertNotNull (result );
216
- Assert .assertNotNull (result .records ());
217
- Assert .assertNotNull (result .nextShardIterator ());
218
-
219
- records = result .records ();
220
- Assert .assertEquals (1 , records .size ());
221
- validateRecord (records .get (0 ), "See No Evil" );
222
-
223
- // Try to read some more, get EOF.
224
- result = client .getRecords (GetRecordsRequest .builder ()
225
- .shardIterator (result .nextShardIterator ())
226
- .build ());
227
- Assert .assertNotNull (result );
228
- Assert .assertNotNull (result .records ());
229
- Assert .assertTrue (result .records ().isEmpty ());
230
- Assert .assertNull (result .nextShardIterator ());
200
+ return result ;
231
201
}
232
202
233
203
private void validateRecord (final Record record , String data ) {
@@ -245,134 +215,8 @@ private void validateRecord(final Record record, String data) {
245
215
Assert .assertTrue (Duration .between (record .approximateArrivalTimestamp (), Instant .now ()).toMinutes () < 5 );
246
216
}
247
217
248
- private void testPuts (final String streamName , final Shard shard )
249
- throws InterruptedException {
250
-
251
- // Put a record into the shard.
252
- System .out .println ("Putting two records..." );
253
- PutRecordResponse r1 = putRecord (streamName , "See No Evil" );
254
- Assert .assertEquals (shard .shardId (), r1 .shardId ());
255
-
256
- // Check that it's sequence number is sane.
257
- BigInteger startingSQN = new BigInteger (
258
- shard .sequenceNumberRange ().startingSequenceNumber ()
259
- );
260
- BigInteger sqn1 = new BigInteger (r1 .sequenceNumber ());
261
- Assert .assertTrue (sqn1 .compareTo (startingSQN ) >= 0 );
262
-
263
- // Put another record, which should show up later in the same shard.
264
- PutRecordResponse r2 = putRecord (streamName , "See No Evil" );
265
- Assert .assertEquals (shard .shardId (), r2 .shardId ());
266
- BigInteger sqn2 = new BigInteger (r2 .sequenceNumber ());
267
- System .out .println (" OK" );
268
-
269
- // Not guaranteed an order unless we explicitly ask for one, but
270
- // it has to at least be larger than the starting sqn.
271
- Assert .assertTrue (sqn2 .compareTo (startingSQN ) >= 0 );
272
-
273
- // Split the shard in two: [0-1000) and [1000-*]
274
- System .out .println ("Splitting the shard..." );
275
- List <Shard > shards = splitShard (streamName , shard , 1000 );
276
- System .out .println (" OK" );
277
-
278
- // Sleep a bit for eventual consistency.
279
- Thread .sleep (5000 );
280
-
281
-
282
- // Put records into the two new shards, one after another.
283
- System .out .println ("Putting some more..." );
284
- PutRecordResponse r3 = putRecordExplicit (streamName , "999" );
285
- PutRecordResponse r4 = putRecordExplicit (streamName ,
286
- "1000" ,
287
- r3 .sequenceNumber ());
288
-
289
- BigInteger sqn3 = new BigInteger (r3 .sequenceNumber ());
290
- BigInteger sqn4 = new BigInteger (r4 .sequenceNumber ());
291
- Assert .assertTrue (sqn4 .compareTo (sqn3 ) >= 0 );
292
- System .out .println (" OK" );
293
-
294
- // Merge the two shards back together.
295
- System .out .println ("Merging the shards back together..." );
296
- mergeShards (streamName ,
297
- shards .get (1 ).shardId (),
298
- shards .get (2 ).shardId ());
299
- System .out .println (" OK" );
300
- }
301
-
302
-
303
- private List <Shard > splitShard (final String streamName ,
304
- final Shard shard ,
305
- final long splitHashKey )
306
- throws InterruptedException {
307
-
308
- client .splitShard (SplitShardRequest .builder ()
309
- .streamName (streamName )
310
- .shardToSplit (shard .shardId ())
311
- .newStartingHashKey (Long .toString (splitHashKey ))
312
- .build ());
313
-
314
- List <Shard > shards = waitForStream (streamName );
315
-
316
- Assert .assertEquals (3 , shards .size ());
317
-
318
- Shard old = shards .get (0 );
319
- Assert .assertEquals (shard .shardId (), old .shardId ());
320
- Assert .assertNotNull (
321
- old .sequenceNumberRange ().endingSequenceNumber ()
322
- );
323
-
324
- Shard new1 = shards .get (1 );
325
- Assert .assertEquals (shard .shardId (), new1 .parentShardId ());
326
- validateHashKeyRange (new1 .hashKeyRange (), 0L , splitHashKey - 1 );
327
-
328
- Shard new2 = shards .get (2 );
329
- Assert .assertEquals (shard .shardId (), new2 .parentShardId ());
330
- validateHashKeyRange (new2 .hashKeyRange (), splitHashKey , null );
331
- Assert .assertEquals (old .hashKeyRange ().endingHashKey (),
332
- new2 .hashKeyRange ().endingHashKey ());
333
-
334
- return shards ;
335
- }
336
-
337
- private List <Shard > mergeShards (final String streamName ,
338
- final String shard1 ,
339
- final String shard2 )
340
- throws InterruptedException {
341
-
342
- client .mergeShards (MergeShardsRequest .builder ()
343
- .streamName (streamName )
344
- .shardToMerge (shard1 )
345
- .adjacentShardToMerge (shard2 )
346
- .build ());
347
218
348
- List <Shard > shards = waitForStream (streamName );
349
219
350
- Assert .assertEquals (4 , shards .size ());
351
- Shard merged = shards .get (3 );
352
-
353
- BigInteger start =
354
- new BigInteger (merged .hashKeyRange ().startingHashKey ());
355
- BigInteger end =
356
- new BigInteger (merged .hashKeyRange ().endingHashKey ());
357
-
358
- Assert .assertEquals (BigInteger .valueOf (0 ), start );
359
- Assert .assertTrue (end .compareTo (BigInteger .valueOf (1000 )) >= 0 );
360
-
361
- return shards ;
362
- }
363
-
364
- private void validateHashKeyRange (final HashKeyRange range ,
365
- final Long start ,
366
- final Long end ) {
367
- if (start != null ) {
368
- Assert .assertEquals (BigInteger .valueOf (start ),
369
- new BigInteger (range .startingHashKey ()));
370
- }
371
- if (end != null ) {
372
- Assert .assertEquals (BigInteger .valueOf (end ),
373
- new BigInteger (range .endingHashKey ()));
374
- }
375
- }
376
220
377
221
private PutRecordResponse putRecord (final String streamName ,
378
222
final String data ) {
@@ -391,71 +235,6 @@ private PutRecordResponse putRecord(final String streamName,
391
235
return result ;
392
236
}
393
237
394
- private PutRecordResponse putRecordExplicit (final String streamName ,
395
- final String hashKey ) {
396
-
397
- PutRecordResponse result = client .putRecord (PutRecordRequest .builder ()
398
- .streamName (streamName )
399
- .partitionKey ("foobar" )
400
- .explicitHashKey (hashKey )
401
- .data (SdkBytes .fromUtf8String ("Speak No Evil" ))
402
- .build ());
403
- Assert .assertNotNull (result );
404
-
405
- Assert .assertNotNull (result .shardId ());
406
- Assert .assertNotNull (result .sequenceNumber ());
407
-
408
- return result ;
409
- }
410
-
411
- private PutRecordResponse putRecordExplicit (final String streamName ,
412
- final String hashKey ,
413
- final String minSQN ) {
414
-
415
- PutRecordResponse result = client .putRecord (PutRecordRequest .builder ()
416
- .streamName (streamName )
417
- .partitionKey ("foobar" )
418
- .explicitHashKey (hashKey )
419
- .sequenceNumberForOrdering (minSQN )
420
- .data (SdkBytes .fromUtf8String ("Hear No Evil" ))
421
- .build ());
422
- Assert .assertNotNull (result );
423
-
424
- Assert .assertNotNull (result .shardId ());
425
- Assert .assertNotNull (result .sequenceNumber ());
426
-
427
- return result ;
428
- }
429
-
430
- private void findStreamInList (final String streamName ) {
431
- boolean found = false ;
432
-
433
- String start = null ;
434
- while (true ) {
435
-
436
- ListStreamsResponse result = client .listStreams (ListStreamsRequest .builder ().exclusiveStartStreamName (start ).build ());
437
-
438
- Assert .assertNotNull (result );
439
-
440
- List <String > names = result .streamNames ();
441
- Assert .assertNotNull (names );
442
-
443
- if (names .size () > 0 ) {
444
- if (names .contains (streamName )) {
445
- found = true ;
446
- }
447
-
448
- start = names .get (names .size () - 1 );
449
- }
450
-
451
- if (!result .hasMoreStreams ()) {
452
- break ;
453
- }
454
-
455
- }
456
-
457
- Assert .assertTrue (found );
458
- }
459
238
460
239
private List <Shard > waitForStream (final String streamName )
461
240
throws InterruptedException {
0 commit comments