18
18
19
19
// [START bigtable_writes_batch]
20
20
21
+ import com .google .api .core .ApiFuture ;
22
+ import com .google .api .gax .batching .Batcher ;
23
+ import com .google .api .gax .batching .BatchingException ;
21
24
import com .google .cloud .bigtable .data .v2 .BigtableDataClient ;
22
- import com .google .cloud .bigtable .data .v2 .models .BulkMutation ;
23
- import com .google .cloud .bigtable .data .v2 .models .Mutation ;
25
+ import com .google .cloud .bigtable .data .v2 .models .RowMutationEntry ;
24
26
import com .google .protobuf .ByteString ;
27
+ import java .util .ArrayList ;
28
+ import java .util .List ;
29
+ import java .util .concurrent .ExecutionException ;
25
30
26
31
public class WriteBatch {
27
32
private static final String COLUMN_FAMILY_NAME = "stats_summary" ;
@@ -32,34 +37,43 @@ public static void writeBatch(String projectId, String instanceId, String tableI
32
37
// String tableId = "mobile-time-series";
33
38
34
39
try (BigtableDataClient dataClient = BigtableDataClient .create (projectId , instanceId )) {
35
- long timestamp = System .currentTimeMillis () * 1000 ;
40
+ List <ApiFuture <Void >> batchFutures = new ArrayList <>();
41
+ try (Batcher <RowMutationEntry , Void > batcher = dataClient .newBulkMutationBatcher (tableId )) {
42
+ long timestamp = System .currentTimeMillis () * 1000 ;
43
+ batchFutures .add (
44
+ batcher .add (
45
+ RowMutationEntry .create ("tablet#a0b81f74#20190501" )
46
+ .setCell (
47
+ COLUMN_FAMILY_NAME , ByteString .copyFromUtf8 ("connected_wifi" ), timestamp , 1 )
48
+ .setCell (COLUMN_FAMILY_NAME , "os_build" , timestamp , "12155.0.0-rc1" )));
49
+ batchFutures .add (
50
+ batcher .add (
51
+ RowMutationEntry .create ("tablet#a0b81f74#20190502" )
52
+ .setCell (
53
+ COLUMN_FAMILY_NAME , ByteString .copyFromUtf8 ("connected_wifi" ), timestamp , 1 )
54
+ .setCell (COLUMN_FAMILY_NAME , "os_build" , timestamp , "12155.0.0-rc6" )));
36
55
37
- BulkMutation bulkMutation =
38
- BulkMutation .create (tableId )
39
- .add (
40
- "tablet#a0b81f74#20190501" ,
41
- Mutation .create ()
42
- .setCell (
43
- COLUMN_FAMILY_NAME ,
44
- ByteString .copyFrom ("connected_wifi" .getBytes ()),
45
- timestamp ,
46
- 1 )
47
- .setCell (COLUMN_FAMILY_NAME , "os_build" , timestamp , "12155.0.0-rc1" ))
48
- .add (
49
- "tablet#a0b81f74#20190502" ,
50
- Mutation .create ()
51
- .setCell (
52
- COLUMN_FAMILY_NAME ,
53
- ByteString .copyFrom ("connected_wifi" .getBytes ()),
54
- timestamp ,
55
- 1 )
56
- .setCell (COLUMN_FAMILY_NAME , "os_build" , timestamp , "12155.0.0-rc6" ));
57
-
58
- dataClient .bulkMutateRows (bulkMutation );
59
-
60
- System .out .print ("Successfully wrote 2 rows" );
56
+ // Blocks until mutations are applied on all submitted row entries.
57
+ // flush will be called automatically when a batch is full.
58
+ batcher .flush ();
59
+ // Before batcher is closed, all remaining (if any) mutations are applied.
60
+ } catch (BatchingException batchingException ) {
61
+ System .out .println (
62
+ "At least one entry failed to apply. Summary of the errors: \n " + batchingException );
63
+ // get individual entry error details
64
+ for (ApiFuture <Void > future : batchFutures ) {
65
+ try {
66
+ future .get ();
67
+ } catch (ExecutionException entryException ) {
68
+ System .out .println ("Entry failure: " + entryException .getCause ());
69
+ } catch (InterruptedException e ) {
70
+ // handle interrupted exception
71
+ }
72
+ }
73
+ }
74
+ System .out .println ("Successfully wrote 2 rows" );
61
75
} catch (Exception e ) {
62
- System .out .println ("Error during WriteBatch: \n " + e . toString () );
76
+ System .out .println ("Error during WriteBatch: \n " + e );
63
77
}
64
78
}
65
79
}
0 commit comments