Skip to content

Commit 4219cf8

Browse files
authored
fix: synchronize lazy ResultSet decoding (#3267)
Using one of the options DecodeMode.LAZY_PER_ROW or DecodeMode.LAZY_PER_COL in combination with multi-threaded access to the ResultSet could lead to ClassCastExceptions, as the underlying decode methods were not synchronized. Multiple threads could access the row data at the same time, with one thread expecting the raw proto value while another thread had already replaced it with the decoded value (or the other way around).
1 parent 61f0ab7 commit 4219cf8

File tree

3 files changed

+180
-94
lines changed

3 files changed

+180
-94
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.google.spanner.v1.ResultSetMetadata;
2626
import com.google.spanner.v1.ResultSetStats;
2727
import java.util.ArrayList;
28+
import java.util.Collections;
2829
import java.util.List;
2930
import javax.annotation.Nullable;
3031

@@ -35,6 +36,7 @@ class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufR
3536
private final DecodeMode decodeMode;
3637
private ResultSetMetadata metadata;
3738
private GrpcStruct currRow;
39+
private List<Object> rowData;
3840
private SpannerException error;
3941
private ResultSetStats statistics;
4042
private boolean closed;
@@ -85,7 +87,15 @@ public boolean next() throws SpannerException {
8587
throw SpannerExceptionFactory.newSpannerException(
8688
ErrorCode.FAILED_PRECONDITION, AbstractReadContext.NO_TRANSACTION_RETURNED_MSG);
8789
}
88-
currRow = new GrpcStruct(iterator.type(), new ArrayList<>(), decodeMode);
90+
if (rowData == null) {
91+
rowData = new ArrayList<>(metadata.getRowType().getFieldsCount());
92+
if (decodeMode != DecodeMode.DIRECT) {
93+
rowData = Collections.synchronizedList(rowData);
94+
}
95+
} else {
96+
rowData.clear();
97+
}
98+
currRow = new GrpcStruct(iterator.type(), rowData, decodeMode);
8999
}
90100
boolean hasNext = currRow.consumeRow(iterator);
91101
if (!hasNext) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStruct.java

+93-51
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.Collections;
5050
import java.util.Iterator;
5151
import java.util.List;
52+
import java.util.concurrent.atomic.AtomicBoolean;
5253
import java.util.function.Function;
5354
import java.util.stream.Collectors;
5455

@@ -60,7 +61,7 @@ class GrpcStruct extends Struct implements Serializable {
6061
private final List<Object> rowData;
6162
private final DecodeMode decodeMode;
6263
private final BitSet colDecoded;
63-
private boolean rowDecoded;
64+
private final AtomicBoolean rowDecoded;
6465

6566
/**
6667
* Builds an immutable version of this struct using {@link Struct#newBuilder()} which is used as a
@@ -224,7 +225,7 @@ private GrpcStruct(
224225
this.type = type;
225226
this.rowData = rowData;
226227
this.decodeMode = decodeMode;
227-
this.rowDecoded = rowDecoded;
228+
this.rowDecoded = new AtomicBoolean(rowDecoded);
228229
this.colDecoded = colDecoded;
229230
}
230231

@@ -234,29 +235,31 @@ public String toString() {
234235
}
235236

236237
boolean consumeRow(Iterator<com.google.protobuf.Value> iterator) {
237-
rowData.clear();
238-
if (decodeMode == DecodeMode.LAZY_PER_ROW) {
239-
rowDecoded = false;
240-
} else if (decodeMode == DecodeMode.LAZY_PER_COL) {
241-
colDecoded.clear();
242-
}
243-
if (!iterator.hasNext()) {
244-
return false;
245-
}
246-
for (Type.StructField fieldType : getType().getStructFields()) {
238+
synchronized (rowData) {
239+
rowData.clear();
240+
if (decodeMode == DecodeMode.LAZY_PER_ROW) {
241+
rowDecoded.set(false);
242+
} else if (decodeMode == DecodeMode.LAZY_PER_COL) {
243+
colDecoded.clear();
244+
}
247245
if (!iterator.hasNext()) {
248-
throw newSpannerException(
249-
ErrorCode.INTERNAL,
250-
"Invalid value stream: end of stream reached before row is complete");
246+
return false;
251247
}
252-
com.google.protobuf.Value value = iterator.next();
253-
if (decodeMode == DecodeMode.DIRECT) {
254-
rowData.add(decodeValue(fieldType.getType(), value));
255-
} else {
256-
rowData.add(value);
248+
for (Type.StructField fieldType : getType().getStructFields()) {
249+
if (!iterator.hasNext()) {
250+
throw newSpannerException(
251+
ErrorCode.INTERNAL,
252+
"Invalid value stream: end of stream reached before row is complete");
253+
}
254+
com.google.protobuf.Value value = iterator.next();
255+
if (decodeMode == DecodeMode.DIRECT) {
256+
rowData.add(decodeValue(fieldType.getType(), value));
257+
} else {
258+
rowData.add(value);
259+
}
257260
}
261+
return true;
258262
}
259-
return true;
260263
}
261264

262265
private static Object decodeValue(Type fieldType, com.google.protobuf.Value proto) {
@@ -367,12 +370,16 @@ private static void checkType(
367370
}
368371

369372
Struct immutableCopy() {
370-
return new GrpcStruct(
371-
type,
372-
new ArrayList<>(rowData),
373-
this.decodeMode,
374-
this.rowDecoded,
375-
this.colDecoded == null ? null : (BitSet) this.colDecoded.clone());
373+
synchronized (rowData) {
374+
return new GrpcStruct(
375+
type,
376+
this.decodeMode == DecodeMode.DIRECT
377+
? new ArrayList<>(rowData)
378+
: Collections.synchronizedList(new ArrayList<>(rowData)),
379+
this.decodeMode,
380+
this.rowDecoded.get(),
381+
this.colDecoded == null ? null : (BitSet) this.colDecoded.clone());
382+
}
376383
}
377384

378385
@Override
@@ -382,9 +389,14 @@ public Type getType() {
382389

383390
@Override
384391
public boolean isNull(int columnIndex) {
385-
if ((decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded)
386-
|| (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex))) {
387-
return ((com.google.protobuf.Value) rowData.get(columnIndex)).hasNullValue();
392+
if (decodeMode == DecodeMode.LAZY_PER_ROW || decodeMode == DecodeMode.LAZY_PER_COL) {
393+
synchronized (rowData) {
394+
if ((decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded.get())
395+
|| (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex))) {
396+
return ((com.google.protobuf.Value) rowData.get(columnIndex)).hasNullValue();
397+
}
398+
return rowData.get(columnIndex) == null;
399+
}
388400
}
389401
return rowData.get(columnIndex) == null;
390402
}
@@ -496,14 +508,18 @@ private boolean isUnrecognizedType(int columnIndex) {
496508
}
497509

498510
boolean canGetProtoValue(int columnIndex) {
499-
return isUnrecognizedType(columnIndex)
500-
|| (decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded)
501-
|| (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex));
511+
synchronized (rowData) {
512+
return isUnrecognizedType(columnIndex)
513+
|| (decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded.get())
514+
|| (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex));
515+
}
502516
}
503517

504518
protected com.google.protobuf.Value getProtoValueInternal(int columnIndex) {
505-
checkProtoValueSupported(columnIndex);
506-
return (com.google.protobuf.Value) rowData.get(columnIndex);
519+
synchronized (rowData) {
520+
checkProtoValueSupported(columnIndex);
521+
return (com.google.protobuf.Value) rowData.get(columnIndex);
522+
}
507523
}
508524

509525
private void checkProtoValueSupported(int columnIndex) {
@@ -515,30 +531,56 @@ private void checkProtoValueSupported(int columnIndex) {
515531
decodeMode != DecodeMode.DIRECT,
516532
"Getting proto value is not supported when DecodeMode#DIRECT is used.");
517533
Preconditions.checkState(
518-
!(decodeMode == DecodeMode.LAZY_PER_ROW && rowDecoded),
534+
!(decodeMode == DecodeMode.LAZY_PER_ROW && rowDecoded.get()),
519535
"Getting proto value after the row has been decoded is not supported.");
520536
Preconditions.checkState(
521537
!(decodeMode == DecodeMode.LAZY_PER_COL && colDecoded.get(columnIndex)),
522538
"Getting proto value after the column has been decoded is not supported.");
523539
}
524540

525541
private void ensureDecoded(int columnIndex) {
526-
if (decodeMode == DecodeMode.LAZY_PER_ROW && !rowDecoded) {
527-
for (int i = 0; i < rowData.size(); i++) {
528-
rowData.set(
529-
i,
530-
decodeValue(
531-
type.getStructFields().get(i).getType(),
532-
(com.google.protobuf.Value) rowData.get(i)));
542+
if (decodeMode == DecodeMode.LAZY_PER_ROW) {
543+
synchronized (rowData) {
544+
if (!rowDecoded.get()) {
545+
for (int i = 0; i < rowData.size(); i++) {
546+
rowData.set(
547+
i,
548+
decodeValue(
549+
type.getStructFields().get(i).getType(),
550+
(com.google.protobuf.Value) rowData.get(i)));
551+
}
552+
}
553+
rowDecoded.set(true);
554+
}
555+
} else if (decodeMode == DecodeMode.LAZY_PER_COL) {
556+
boolean decoded;
557+
Object value;
558+
synchronized (rowData) {
559+
decoded = colDecoded.get(columnIndex);
560+
value = rowData.get(columnIndex);
561+
}
562+
if (!decoded) {
563+
// Use the column as a lock during decoding to ensure that we decode once (mostly), but also
564+
// that multiple different columns can be decoded in parallel if requested.
565+
synchronized (type.getStructFields().get(columnIndex)) {
566+
// Note: It can be that we decode the value twice if two threads request this at the same
567+
// time, but the synchronization on rowData above and below makes sure that we always get
568+
// and set a consistent value (and only set it once).
569+
if (!colDecoded.get(columnIndex)) {
570+
value =
571+
decodeValue(
572+
type.getStructFields().get(columnIndex).getType(),
573+
(com.google.protobuf.Value) value);
574+
decoded = true;
575+
}
576+
}
577+
if (decoded) {
578+
synchronized (rowData) {
579+
rowData.set(columnIndex, value);
580+
colDecoded.set(columnIndex);
581+
}
582+
}
533583
}
534-
rowDecoded = true;
535-
} else if (decodeMode == DecodeMode.LAZY_PER_COL && !colDecoded.get(columnIndex)) {
536-
rowData.set(
537-
columnIndex,
538-
decodeValue(
539-
type.getStructFields().get(columnIndex).getType(),
540-
(com.google.protobuf.Value) rowData.get(columnIndex)));
541-
colDecoded.set(columnIndex);
542584
}
543585
}
544586

google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/DecodeModeTest.java

+76-42
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@
2727
import com.google.cloud.spanner.ResultSet;
2828
import com.google.cloud.spanner.SpannerException;
2929
import com.google.cloud.spanner.Statement;
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
import java.util.concurrent.ExecutorService;
33+
import java.util.concurrent.Executors;
34+
import java.util.concurrent.Future;
35+
import java.util.concurrent.ThreadLocalRandom;
3036
import org.junit.After;
3137
import org.junit.Test;
3238
import org.junit.runner.RunWith;
@@ -41,7 +47,7 @@ public void clearRequests() {
4147
}
4248

4349
@Test
44-
public void testAllDecodeModes() {
50+
public void testAllDecodeModes() throws Exception {
4551
int numRows = 10;
4652
RandomResultSetGenerator generator = new RandomResultSetGenerator(numRows);
4753
String sql = "select * from random";
@@ -50,57 +56,85 @@ public void testAllDecodeModes() {
5056
MockSpannerServiceImpl.StatementResult.query(statement, generator.generate()));
5157

5258
try (Connection connection = createConnection()) {
53-
for (boolean readonly : new boolean[] {true, false}) {
54-
for (boolean autocommit : new boolean[] {true, false}) {
55-
connection.setReadOnly(readonly);
56-
connection.setAutocommit(autocommit);
59+
for (boolean multiThreaded : new boolean[] {true, false}) {
60+
for (boolean readonly : new boolean[] {true, false}) {
61+
for (boolean autocommit : new boolean[] {true, false}) {
62+
connection.setReadOnly(readonly);
63+
connection.setAutocommit(autocommit);
5764

58-
int receivedRows = 0;
59-
// DecodeMode#DIRECT is not supported in read/write transactions, as the protobuf value is
60-
// used for checksum calculation.
61-
try (ResultSet direct =
62-
connection.executeQuery(
63-
statement,
64-
!readonly && !autocommit
65-
? Options.decodeMode(DecodeMode.LAZY_PER_ROW)
66-
: Options.decodeMode(DecodeMode.DIRECT));
67-
ResultSet lazyPerRow =
68-
connection.executeQuery(statement, Options.decodeMode(DecodeMode.LAZY_PER_ROW));
69-
ResultSet lazyPerCol =
70-
connection.executeQuery(statement, Options.decodeMode(DecodeMode.LAZY_PER_COL))) {
71-
while (direct.next() && lazyPerRow.next() && lazyPerCol.next()) {
72-
assertEquals(direct.getColumnCount(), lazyPerRow.getColumnCount());
73-
assertEquals(direct.getColumnCount(), lazyPerCol.getColumnCount());
74-
for (int col = 0; col < direct.getColumnCount(); col++) {
75-
// Test getting the entire row as a struct both as the first thing we do, and as the
76-
// last thing we do. This ensures that the method works as expected both when a row
77-
// is lazily decoded by this method, and when it has already been decoded by another
78-
// method.
79-
if (col % 2 == 0) {
80-
assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct());
81-
assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct());
82-
}
83-
assertEquals(direct.isNull(col), lazyPerRow.isNull(col));
84-
assertEquals(direct.isNull(col), lazyPerCol.isNull(col));
85-
assertEquals(direct.getValue(col), lazyPerRow.getValue(col));
86-
assertEquals(direct.getValue(col), lazyPerCol.getValue(col));
87-
if (col % 2 == 1) {
88-
assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct());
89-
assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct());
65+
int receivedRows = 0;
66+
// DecodeMode#DIRECT is not supported in read/write transactions, as the protobuf value
67+
// is
68+
// used for checksum calculation.
69+
try (ResultSet direct =
70+
connection.executeQuery(
71+
statement,
72+
!readonly && !autocommit
73+
? Options.decodeMode(DecodeMode.LAZY_PER_ROW)
74+
: Options.decodeMode(DecodeMode.DIRECT));
75+
ResultSet lazyPerRow =
76+
connection.executeQuery(
77+
statement, Options.decodeMode(DecodeMode.LAZY_PER_ROW));
78+
ResultSet lazyPerCol =
79+
connection.executeQuery(
80+
statement, Options.decodeMode(DecodeMode.LAZY_PER_COL))) {
81+
while (direct.next() && lazyPerRow.next() && lazyPerCol.next()) {
82+
assertEquals(direct.getColumnCount(), lazyPerRow.getColumnCount());
83+
assertEquals(direct.getColumnCount(), lazyPerCol.getColumnCount());
84+
if (multiThreaded) {
85+
ExecutorService service = Executors.newFixedThreadPool(direct.getColumnCount());
86+
List<Future<?>> futures = new ArrayList<>(direct.getColumnCount());
87+
for (int col = 0; col < direct.getColumnCount(); col++) {
88+
final int colNumber = col;
89+
futures.add(
90+
service.submit(
91+
() -> checkRowValues(colNumber, direct, lazyPerRow, lazyPerCol)));
92+
}
93+
service.shutdown();
94+
for (Future<?> future : futures) {
95+
future.get();
96+
}
97+
} else {
98+
for (int col = 0; col < direct.getColumnCount(); col++) {
99+
checkRowValues(col, direct, lazyPerRow, lazyPerCol);
100+
}
90101
}
102+
receivedRows++;
91103
}
92-
receivedRows++;
104+
assertEquals(numRows, receivedRows);
105+
}
106+
if (!autocommit) {
107+
connection.commit();
93108
}
94-
assertEquals(numRows, receivedRows);
95-
}
96-
if (!autocommit) {
97-
connection.commit();
98109
}
99110
}
100111
}
101112
}
102113
}
103114

115+
private void checkRowValues(
116+
int col, ResultSet direct, ResultSet lazyPerRow, ResultSet lazyPerCol) {
117+
// Randomly decode and get a column to trigger parallel decoding of one column.
118+
lazyPerCol.getValue(ThreadLocalRandom.current().nextInt(lazyPerCol.getColumnCount()));
119+
120+
// Test getting the entire row as a struct both as the first thing we do, and as the
121+
// last thing we do. This ensures that the method works as expected both when a row
122+
// is lazily decoded by this method, and when it has already been decoded by another
123+
// method.
124+
if (col % 2 == 0) {
125+
assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct());
126+
assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct());
127+
}
128+
assertEquals(direct.isNull(col), lazyPerRow.isNull(col));
129+
assertEquals(direct.isNull(col), lazyPerCol.isNull(col));
130+
assertEquals(direct.getValue(col), lazyPerRow.getValue(col));
131+
assertEquals(direct.getValue(col), lazyPerCol.getValue(col));
132+
if (col % 2 == 1) {
133+
assertEquals(direct.getCurrentRowAsStruct(), lazyPerRow.getCurrentRowAsStruct());
134+
assertEquals(direct.getCurrentRowAsStruct(), lazyPerCol.getCurrentRowAsStruct());
135+
}
136+
}
137+
104138
@Test
105139
public void testDecodeModeDirect_failsInReadWriteTransaction() {
106140
int numRows = 1;

0 commit comments

Comments
 (0)