Skip to content

Commit 98d7e44

Browse files
risnayakRishabh Nayak
and
Rishabh Nayak
authored
feat: add functions to set missing value map in the stream writers (#1966)
* feat: add functions to set missing value map in the stream writers * fix syntax error * fix lint errors --------- Co-authored-by: Rishabh Nayak <[email protected]>
1 parent 3d67443 commit 98d7e44

File tree

5 files changed

+136
-0
lines changed

5 files changed

+136
-0
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
* <p>TODO: support updated schema
6060
*/
6161
class ConnectionWorker implements AutoCloseable {
62+
6263
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
6364

6465
// Maximum wait time on inflight quota before error out.
@@ -280,6 +281,8 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
280281
requestBuilder.setOffset(Int64Value.of(offset));
281282
}
282283
requestBuilder.setWriteStream(streamWriter.getStreamName());
284+
requestBuilder.putAllMissingValueInterpretations(
285+
streamWriter.getMissingValueInterpretationMap());
283286
return appendInternal(streamWriter, requestBuilder.build());
284287
}
285288

@@ -853,6 +856,7 @@ synchronized TableSchemaAndTimestamp getUpdatedSchema() {
853856

854857
// Class that wraps AppendRowsRequest and its corresponding Response future.
855858
static final class AppendRequestAndResponse {
859+
856860
final SettableApiFuture<AppendRowsResponse> appendResult;
857861
final AppendRowsRequest message;
858862
final long messageSize;
@@ -884,6 +888,7 @@ public Load getLoad() {
884888
*/
885889
@AutoValue
886890
public abstract static class Load {
891+
887892
// Consider the load on this worker to be overwhelmed when above some percentage of
888893
// in-flight bytes or in-flight requests count.
889894
private static double overwhelmedInflightCount = 0.2;
@@ -957,6 +962,7 @@ static void setMaxInflightQueueWaitTime(long waitTime) {
957962

958963
@AutoValue
959964
abstract static class TableSchemaAndTimestamp {
965+
960966
// Shows the timestamp updated schema is reported from response
961967
abstract long updateTimeStamp();
962968

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,24 @@ public long getInflightWaitSeconds() {
275275
return streamWriter.getInflightWaitSeconds();
276276
}
277277

278+
/**
279+
* Sets the missing value interpretation map for the JsonStreamWriter. The input
280+
* missingValueInterpretationMap is used for all append requests unless otherwise changed.
281+
*
282+
* @param missingValueInterpretationMap the missing value interpretation map used by the
283+
* JsonStreamWriter.
284+
*/
285+
public void setMissingValueInterpretationMap(
286+
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
287+
streamWriter.setMissingValueInterpretationMap(missingValueInterpretationMap);
288+
}
289+
290+
/** @return the missing value interpretation map used for the writer. */
291+
public Map<String, AppendRowsRequest.MissingValueInterpretation>
292+
getMissingValueInterpretationMap() {
293+
return streamWriter.getMissingValueInterpretationMap();
294+
}
295+
278296
/** Sets all StreamWriter settings. */
279297
private void setStreamWriterSettings(
280298
@Nullable TransportChannelProvider channelProvider,

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import io.grpc.StatusRuntimeException;
3333
import java.io.IOException;
3434
import java.time.Duration;
35+
import java.util.HashMap;
3536
import java.util.Map;
3637
import java.util.Map.Entry;
3738
import java.util.Objects;
@@ -61,6 +62,11 @@ public class StreamWriter implements AutoCloseable {
6162
// Cache of location info for a given dataset.
6263
private static Map<String, String> projectAndDatasetToLocation = new ConcurrentHashMap<>();
6364

65+
// Map of fields to their MissingValueInterpretation, which dictates how a field should be
66+
// populated when it is missing from an input user row.
67+
private Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap =
68+
new HashMap();
69+
6470
/*
6571
* The identifier of stream to write to.
6672
*/
@@ -336,6 +342,18 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
336342
}
337343
}
338344

345+
/**
346+
* Sets the missing value interpretation map for the stream writer. The input
347+
* missingValueInterpretationMap is used for all write requests unless otherwise changed.
348+
*
349+
* @param missingValueInterpretationMap the missing value interpretation map used by stream
350+
* writer.
351+
*/
352+
public void setMissingValueInterpretationMap(
353+
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueInterpretationMap) {
354+
this.missingValueInterpretationMap = missingValueInterpretationMap;
355+
}
356+
339357
/**
340358
* Schedules the writing of rows at the end of current stream.
341359
*
@@ -419,6 +437,12 @@ public String getLocation() {
419437
return location;
420438
}
421439

440+
/** @return the missing value interpretation map used for the writer. */
441+
public Map<String, AppendRowsRequest.MissingValueInterpretation>
442+
getMissingValueInterpretationMap() {
443+
return missingValueInterpretationMap;
444+
}
445+
422446
/**
423447
* @return if a stream writer can no longer be used for writing. It is due to either the
424448
* StreamWriter is explicitly closed or the underlying connection is broken when connection

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.math.BigDecimal;
4949
import java.math.RoundingMode;
5050
import java.util.Arrays;
51+
import java.util.HashMap;
5152
import java.util.Map;
5253
import java.util.UUID;
5354
import java.util.concurrent.ExecutionException;
@@ -1290,4 +1291,43 @@ private AppendRowsResponse createAppendResponse(long offset) {
12901291
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build())
12911292
.build();
12921293
}
1294+
1295+
@Test
1296+
public void testAppendWithMissingValueMap() throws Exception {
1297+
TableFieldSchema field =
1298+
TableFieldSchema.newBuilder()
1299+
.setType(TableFieldSchema.Type.STRING)
1300+
.setMode(TableFieldSchema.Mode.NULLABLE)
1301+
.setName("test-列")
1302+
.build();
1303+
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, field).build();
1304+
FlexibleType expectedProto = FlexibleType.newBuilder().setColDGVzdC3LiJc("allen").build();
1305+
JSONObject flexible = new JSONObject();
1306+
flexible.put("test-列", "allen");
1307+
JSONArray jsonArr = new JSONArray();
1308+
jsonArr.put(flexible);
1309+
1310+
try (JsonStreamWriter writer =
1311+
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {
1312+
1313+
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap();
1314+
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
1315+
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
1316+
writer.setMissingValueInterpretationMap(missingValueMap);
1317+
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
1318+
1319+
testBigQueryWrite.addResponse(
1320+
AppendRowsResponse.newBuilder()
1321+
.setAppendResult(
1322+
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
1323+
.build());
1324+
1325+
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
1326+
assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
1327+
appendFuture.get();
1328+
assertEquals(
1329+
testBigQueryWrite.getAppendRequests().get(0).getMissingValueInterpretations(),
1330+
missingValueMap);
1331+
}
1332+
}
12931333
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@
4949
import java.io.IOException;
5050
import java.util.ArrayList;
5151
import java.util.Arrays;
52+
import java.util.HashMap;
5253
import java.util.List;
54+
import java.util.Map;
5355
import java.util.UUID;
5456
import java.util.concurrent.ExecutionException;
5557
import java.util.concurrent.Future;
@@ -67,6 +69,7 @@
6769

6870
@RunWith(JUnit4.class)
6971
public class StreamWriterTest {
72+
7073
private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
7174
private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default";
7275
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
@@ -1227,6 +1230,51 @@ public void testCloseDisconnectedStream() throws Exception {
12271230
writer.close();
12281231
}
12291232

1233+
@Test
1234+
public void testSetAndGetMissingValueInterpretationMap() throws Exception {
1235+
StreamWriter writer = getTestStreamWriter();
1236+
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap();
1237+
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
1238+
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
1239+
writer.setMissingValueInterpretationMap(missingValueMap);
1240+
assertEquals(missingValueMap, writer.getMissingValueInterpretationMap());
1241+
}
1242+
1243+
@Test
1244+
public void testAppendWithMissingValueMap() throws Exception {
1245+
StreamWriter writer = getTestStreamWriter();
1246+
1247+
long appendCount = 2;
1248+
testBigQueryWrite.addResponse(createAppendResponse(0));
1249+
testBigQueryWrite.addResponse(createAppendResponse(1));
1250+
1251+
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
1252+
// The first append doesn't use a missing value map.
1253+
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0));
1254+
1255+
// The second append uses a missing value map.
1256+
Map<String, AppendRowsRequest.MissingValueInterpretation> missingValueMap = new HashMap();
1257+
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
1258+
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
1259+
writer.setMissingValueInterpretationMap(missingValueMap);
1260+
futures.add(writer.append(createProtoRows(new String[] {String.valueOf(1)}), 1));
1261+
1262+
for (int i = 0; i < appendCount; i++) {
1263+
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
1264+
}
1265+
1266+
// Ensure that the AppendRowsRequest for the first append operation does not have a missing
1267+
// value map, and that the second AppendRowsRequest has the missing value map provided in the
1268+
// second append.
1269+
verifyAppendRequests(appendCount);
1270+
AppendRowsRequest request1 = testBigQueryWrite.getAppendRequests().get(0);
1271+
AppendRowsRequest request2 = testBigQueryWrite.getAppendRequests().get(1);
1272+
assertTrue(request1.getMissingValueInterpretations().isEmpty());
1273+
assertEquals(request2.getMissingValueInterpretations(), missingValueMap);
1274+
1275+
writer.close();
1276+
}
1277+
12301278
@Test(timeout = 10000)
12311279
public void testStreamWriterUserCloseMultiplexing() throws Exception {
12321280
StreamWriter writer =

0 commit comments

Comments
 (0)