Skip to content

Commit 1f2752f

Browse files
authored
feat: add routing header for multiplexed connection (#2035)
1 parent e4c5e97 commit 1f2752f

File tree

2 files changed

+30
-0
lines changed

2 files changed

+30
-0
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import java.util.concurrent.locks.ReentrantLock;
5252
import java.util.logging.Level;
5353
import java.util.logging.Logger;
54+
import java.util.regex.Matcher;
55+
import java.util.regex.Pattern;
5456
import javax.annotation.concurrent.GuardedBy;
5557

5658
/**
@@ -219,11 +221,29 @@ class ConnectionWorker implements AutoCloseable {
219221
private RuntimeException testOnlyRunTimeExceptionInAppendLoop = null;
220222
private long testOnlyAppendLoopSleepTime = 0;
221223

224+
private static String projectMatching = "projects/[^/]+/";
225+
private static Pattern streamPatternProject = Pattern.compile(projectMatching);
226+
222227
/** The maximum size of one request. Defined by the API. */
223228
public static long getApiMaxRequestBytes() {
224229
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
225230
}
226231

232+
static String extractProjectName(String streamName) {
233+
Matcher streamMatcher = streamPatternProject.matcher(streamName);
234+
if (streamMatcher.find()) {
235+
return streamMatcher.group();
236+
} else {
237+
throw new IllegalStateException(
238+
String.format("The passed in stream name does not match standard format %s", streamName));
239+
}
240+
}
241+
242+
static String getRoutingHeader(String streamName, String location) {
243+
String project = extractProjectName(streamName);
244+
return project + "locations/" + location;
245+
}
246+
227247
public ConnectionWorker(
228248
String streamName,
229249
String location,
@@ -259,6 +279,10 @@ public ConnectionWorker(
259279
newHeaders.putAll(clientSettings.toBuilder().getHeaderProvider().getHeaders());
260280
if (this.location == null) {
261281
newHeaders.put("x-goog-request-params", "write_stream=" + this.streamName);
282+
} else {
283+
newHeaders.put(
284+
"x-goog-request-params",
285+
"write_location=" + getRoutingHeader(this.streamName, this.location));
262286
}
263287
BigQueryWriteSettings stubSettings =
264288
clientSettings

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,4 +710,10 @@ public void testLongTimeIdleWontFail() throws Exception {
710710
assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
711711
}
712712
}
713+
714+
@Test
715+
public void testLocationName() throws Exception {
716+
assertEquals(
717+
"projects/p1/locations/us", ConnectionWorker.getRoutingHeader(TEST_STREAM_1, "us"));
718+
}
713719
}

0 commit comments

Comments
 (0)