Skip to content

Commit 4498fcc

Browse files
authored
Add large payload support to datatransport. (#1431)
* Add large payload support to datatransport. Large payloads are stored in a separate table as 80KB chunks of data keyed by (sequence_num,event_id). The motivation for the change is to avoid the Android SQLite implementation limitation, that uses `CursorWindow`s of a limited size that is unknown at runtime and vendors can change arbitrarily, hence the conservative choice of 80KB instead of 2048KB that is present in AOSP source code. The way it manifests is that it is possible to write rows in the the db that is impossible to read back as an exception gets thrown. An altertative approach that was considered was to store file paths in the DB and store blobs in the file-system. But that would introduce more complexity to the storage system: * Loss of ACID semantics, i.e. if we fail to commit we may fail to delete the underlying file. * Should we store it in cache dir or in file dir? * What if cache is cleared but the event reference still exists in the db? * etc. * Fix integ test * Upgrade to latest dagger. * Increase device coverage of sqlite tests. Specifically moved sqlite tests to instrumentation such that they run on all configured devices in FTL. Additionally added migration test from v3 to v4. * upgrade robolectric. * Address review comments.
1 parent 1022fe4 commit 4498fcc

File tree

9 files changed

+217
-32
lines changed

9 files changed

+217
-32
lines changed
Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019 Google LLC
1+
// Copyright 2020 Google LLC
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -17,6 +17,9 @@
1717
import static com.google.android.datatransport.runtime.scheduling.persistence.SchemaManager.SCHEMA_VERSION;
1818
import static com.google.common.truth.Truth.assertThat;
1919

20+
import android.database.DatabaseUtils;
21+
import androidx.test.core.app.ApplicationProvider;
22+
import androidx.test.ext.junit.runners.AndroidJUnit4;
2023
import com.google.android.datatransport.Encoding;
2124
import com.google.android.datatransport.Priority;
2225
import com.google.android.datatransport.runtime.EncodedPayload;
@@ -28,12 +31,11 @@
2831
import java.nio.charset.Charset;
2932
import java.util.Arrays;
3033
import java.util.Collections;
34+
import java.util.UUID;
3135
import org.junit.Test;
3236
import org.junit.runner.RunWith;
33-
import org.robolectric.RobolectricTestRunner;
34-
import org.robolectric.RuntimeEnvironment;
3537

36-
@RunWith(RobolectricTestRunner.class)
38+
@RunWith(AndroidJUnit4.class)
3739
public class SQLiteEventStoreTest {
3840
private static final TransportContext TRANSPORT_CONTEXT =
3941
TransportContext.builder().setBackendName("backend1").build();
@@ -52,8 +54,14 @@ public class SQLiteEventStoreTest {
5254
.build();
5355

5456
private static final long HOUR = 60 * 60 * 1000;
57+
private static final int MAX_BLOB_SIZE_BYTES = 6;
5558
private static final EventStoreConfig CONFIG =
56-
EventStoreConfig.DEFAULT.toBuilder().setLoadBatchSize(5).setEventCleanUpAge(HOUR).build();
59+
EventStoreConfig.DEFAULT
60+
.toBuilder()
61+
.setLoadBatchSize(5)
62+
.setEventCleanUpAge(HOUR)
63+
.setMaxBlobByteSizePerRow(MAX_BLOB_SIZE_BYTES)
64+
.build();
5765

5866
private final TestClock clock = new TestClock(1);
5967
private final SQLiteEventStore store = newStoreWithConfig(clock, CONFIG);
@@ -63,7 +71,10 @@ private static SQLiteEventStore newStoreWithConfig(Clock clock, EventStoreConfig
6371
clock,
6472
new UptimeClock(),
6573
config,
66-
new SchemaManager(RuntimeEnvironment.application, SCHEMA_VERSION));
74+
new SchemaManager(
75+
ApplicationProvider.getApplicationContext(),
76+
UUID.randomUUID().toString(),
77+
SCHEMA_VERSION));
6778
}
6879

6980
@Test
@@ -75,6 +86,49 @@ public void persist_correctlyRoundTrips() {
7586
assertThat(events).containsExactly(newEvent);
7687
}
7788

89+
@Test
90+
public void persist_withNonInlineBlob_correctlyRoundTrips() {
91+
byte[] payload = "LongerThanSixBytes".getBytes(Charset.defaultCharset());
92+
EventInternal event =
93+
EVENT.toBuilder().setEncodedPayload(new EncodedPayload(JSON_ENCODING, payload)).build();
94+
PersistedEvent newEvent = store.persist(TRANSPORT_CONTEXT, event);
95+
Iterable<PersistedEvent> events = store.loadBatch(TRANSPORT_CONTEXT);
96+
97+
assertThat(newEvent.getEvent()).isEqualTo(event);
98+
assertThat(events).containsExactly(newEvent);
99+
}
100+
101+
@Test
102+
public void persist_withNonInlineBlob_correctlyStoresPayloadInSeparateTable() {
103+
byte[] payload = "LongerThanSixBytes".getBytes(Charset.defaultCharset());
104+
EventInternal event =
105+
EVENT.toBuilder().setEncodedPayload(new EncodedPayload(JSON_ENCODING, payload)).build();
106+
PersistedEvent newEvent = store.persist(TRANSPORT_CONTEXT, event);
107+
108+
long expectedRows = payload.length / MAX_BLOB_SIZE_BYTES;
109+
if (payload.length % MAX_BLOB_SIZE_BYTES != 0) {
110+
expectedRows += 1;
111+
}
112+
113+
long payloadRows =
114+
DatabaseUtils.queryNumEntries(
115+
store.getDb(),
116+
"event_payloads",
117+
"event_id = ?",
118+
new String[] {String.valueOf(newEvent.getId())});
119+
assertThat(payloadRows).isEqualTo(expectedRows);
120+
121+
store.recordSuccess(store.loadBatch(TRANSPORT_CONTEXT));
122+
assertThat(store.loadBatch(TRANSPORT_CONTEXT)).isEmpty();
123+
payloadRows =
124+
DatabaseUtils.queryNumEntries(
125+
store.getDb(),
126+
"event_payloads",
127+
"event_id = ?",
128+
new String[] {String.valueOf(newEvent.getId())});
129+
assertThat(payloadRows).isEqualTo(0);
130+
}
131+
78132
@Test
79133
public void persist_withEventsOfDifferentPriority_shouldEndBeStoredUnderDifferentContexts() {
80134
TransportContext ctx1 =

transport/transport-runtime/src/androidTest/java/com/google/android/datatransport/runtime/scheduling/persistence/SpyEventStoreModule.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,10 @@ static SQLiteEventStore sqliteEventStore(
5454
static int schemaVersion() {
5555
return SchemaManager.SCHEMA_VERSION;
5656
}
57+
58+
@Provides
59+
@Named("SQLITE_DB_NAME")
60+
static String dbName() {
61+
return SchemaManager.DB_NAME;
62+
}
5763
}

transport/transport-runtime/src/androidTest/java/com/google/android/datatransport/runtime/scheduling/persistence/TestEventStoreModule.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414

1515
package com.google.android.datatransport.runtime.scheduling.persistence;
1616

17-
import static com.google.android.datatransport.runtime.scheduling.persistence.SchemaManager.SCHEMA_VERSION;
18-
1917
import com.google.android.datatransport.runtime.synchronization.SynchronizationGuard;
2018
import dagger.Binds;
2119
import dagger.Module;
@@ -35,6 +33,7 @@ static EventStoreConfig storeConfig() {
3533
.setLoadBatchSize(LOAD_BATCH_SIZE)
3634
.setCriticalSectionEnterTimeoutMs(LOCK_TIME_OUT_MS)
3735
.setEventCleanUpAge(60 * 1000)
36+
.setMaxBlobByteSizePerRow(80 * 1000)
3837
.build();
3938
}
4039

@@ -47,6 +46,12 @@ static EventStoreConfig storeConfig() {
4746
@Provides
4847
@Named("SCHEMA_VERSION")
4948
static int schemaVersion() {
50-
return SCHEMA_VERSION;
49+
return SchemaManager.SCHEMA_VERSION;
50+
}
51+
52+
@Provides
53+
@Named("SQLITE_DB_NAME")
54+
static String dbName() {
55+
return SchemaManager.DB_NAME;
5156
}
5257
}

transport/transport-runtime/src/main/java/com/google/android/datatransport/runtime/scheduling/persistence/EventStoreConfig.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ abstract class EventStoreConfig {
2222
private static final int LOAD_BATCH_SIZE = 200;
2323
private static final int LOCK_TIME_OUT_MS = 10000;
2424
private static final long DURATION_ONE_WEEK_MS = 7 * 24 * 60 * 60 * 1000;
25+
private static final int MAX_BLOB_BYTE_SIZE_PER_ROW = 80 * 1024;
2526

2627
static final EventStoreConfig DEFAULT =
2728
EventStoreConfig.builder()
2829
.setMaxStorageSizeInBytes(MAX_DB_STORAGE_SIZE_IN_BYTES)
2930
.setLoadBatchSize(LOAD_BATCH_SIZE)
3031
.setCriticalSectionEnterTimeoutMs(LOCK_TIME_OUT_MS)
3132
.setEventCleanUpAge(DURATION_ONE_WEEK_MS)
33+
.setMaxBlobByteSizePerRow(MAX_BLOB_BYTE_SIZE_PER_ROW)
3234
.build();
3335

3436
abstract long getMaxStorageSizeInBytes();
@@ -39,6 +41,8 @@ abstract class EventStoreConfig {
3941

4042
abstract long getEventCleanUpAge();
4143

44+
abstract int getMaxBlobByteSizePerRow();
45+
4246
static EventStoreConfig.Builder builder() {
4347
return new AutoValue_EventStoreConfig.Builder();
4448
}
@@ -48,7 +52,8 @@ Builder toBuilder() {
4852
.setMaxStorageSizeInBytes(getMaxStorageSizeInBytes())
4953
.setLoadBatchSize(getLoadBatchSize())
5054
.setCriticalSectionEnterTimeoutMs(getCriticalSectionEnterTimeoutMs())
51-
.setEventCleanUpAge(getEventCleanUpAge());
55+
.setEventCleanUpAge(getEventCleanUpAge())
56+
.setMaxBlobByteSizePerRow(getMaxBlobByteSizePerRow());
5257
}
5358

5459
@AutoValue.Builder
@@ -61,6 +66,8 @@ abstract static class Builder {
6166

6267
abstract Builder setEventCleanUpAge(long value);
6368

69+
abstract Builder setMaxBlobByteSizePerRow(int value);
70+
6471
abstract EventStoreConfig build();
6572
}
6673
}

transport/transport-runtime/src/main/java/com/google/android/datatransport/runtime/scheduling/persistence/EventStoreModule.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414

1515
package com.google.android.datatransport.runtime.scheduling.persistence;
1616

17-
import static com.google.android.datatransport.runtime.scheduling.persistence.SchemaManager.SCHEMA_VERSION;
18-
1917
import com.google.android.datatransport.runtime.synchronization.SynchronizationGuard;
2018
import dagger.Binds;
2119
import dagger.Module;
@@ -39,6 +37,12 @@ static EventStoreConfig storeConfig() {
3937
@Provides
4038
@Named("SCHEMA_VERSION")
4139
static int schemaVersion() {
42-
return SCHEMA_VERSION;
40+
return SchemaManager.SCHEMA_VERSION;
41+
}
42+
43+
@Provides
44+
@Named("SQLITE_DB_NAME")
45+
static String dbName() {
46+
return SchemaManager.DB_NAME;
4347
}
4448
}

transport/transport-runtime/src/main/java/com/google/android/datatransport/runtime/scheduling/persistence/SQLiteEventStore.java

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
@Singleton
5252
@WorkerThread
5353
public class SQLiteEventStore implements EventStore, SynchronizationGuard {
54+
5455
private static final String LOG_TAG = "SQLiteEventStore";
5556

5657
static final int MAX_RETRIES = 10;
@@ -76,7 +77,8 @@ public class SQLiteEventStore implements EventStore, SynchronizationGuard {
7677
this.config = config;
7778
}
7879

79-
private SQLiteDatabase getDb() {
80+
@VisibleForTesting
81+
SQLiteDatabase getDb() {
8082
return retryIfDbLocked(
8183
schemaManager::getWritableDatabase,
8284
ex -> {
@@ -104,16 +106,37 @@ public PersistedEvent persist(TransportContext transportContext, EventInternal e
104106
}
105107

106108
long contextId = ensureTransportContext(db, transportContext);
109+
int maxBlobSizePerRow = config.getMaxBlobByteSizePerRow();
110+
111+
byte[] payloadBytes = event.getEncodedPayload().getBytes();
112+
boolean inline = payloadBytes.length <= maxBlobSizePerRow;
107113
ContentValues values = new ContentValues();
108114
values.put("context_id", contextId);
109115
values.put("transport_name", event.getTransportName());
110116
values.put("timestamp_ms", event.getEventMillis());
111117
values.put("uptime_ms", event.getUptimeMillis());
112118
values.put("payload_encoding", event.getEncodedPayload().getEncoding().getName());
113-
values.put("payload", event.getEncodedPayload().getBytes());
114119
values.put("code", event.getCode());
115120
values.put("num_attempts", 0);
121+
values.put("inline", inline);
122+
values.put("payload", inline ? payloadBytes : new byte[0]);
116123
long newEventId = db.insert("events", null, values);
124+
if (!inline) {
125+
int numChunks = (int) Math.ceil((double) payloadBytes.length / maxBlobSizePerRow);
126+
127+
for (int chunk = 1; chunk <= numChunks; chunk++) {
128+
byte[] chunkBytes =
129+
Arrays.copyOfRange(
130+
payloadBytes,
131+
(chunk - 1) * maxBlobSizePerRow,
132+
Math.min((chunk) * maxBlobSizePerRow, payloadBytes.length));
133+
ContentValues payloadValues = new ContentValues();
134+
payloadValues.put("event_id", newEventId);
135+
payloadValues.put("sequence_num", chunk);
136+
payloadValues.put("bytes", chunkBytes);
137+
db.insert("event_payloads", null, payloadValues);
138+
}
139+
}
117140

118141
// TODO: insert all with one sql query.
119142
for (Map.Entry<String, String> entry : event.getMetadata().entrySet()) {
@@ -358,7 +381,8 @@ private List<PersistedEvent> loadEvents(SQLiteDatabase db, TransportContext tran
358381
"uptime_ms",
359382
"payload_encoding",
360383
"payload",
361-
"code"
384+
"code",
385+
"inline",
362386
},
363387
"context_id = ?",
364388
new String[] {contextId.toString()},
@@ -369,13 +393,19 @@ private List<PersistedEvent> loadEvents(SQLiteDatabase db, TransportContext tran
369393
cursor -> {
370394
while (cursor.moveToNext()) {
371395
long id = cursor.getLong(0);
396+
boolean inline = cursor.getInt(7) != 0;
372397
EventInternal.Builder event =
373398
EventInternal.builder()
374399
.setTransportName(cursor.getString(1))
375400
.setEventMillis(cursor.getLong(2))
376-
.setUptimeMillis(cursor.getLong(3))
377-
.setEncodedPayload(
378-
new EncodedPayload(toEncoding(cursor.getString(4)), cursor.getBlob(5)));
401+
.setUptimeMillis(cursor.getLong(3));
402+
if (inline) {
403+
event.setEncodedPayload(
404+
new EncodedPayload(toEncoding(cursor.getString(4)), cursor.getBlob(5)));
405+
} else {
406+
event.setEncodedPayload(
407+
new EncodedPayload(toEncoding(cursor.getString(4)), readPayload(id)));
408+
}
379409
if (!cursor.isNull(6)) {
380410
event.setCode(cursor.getInt(6));
381411
}
@@ -386,6 +416,37 @@ private List<PersistedEvent> loadEvents(SQLiteDatabase db, TransportContext tran
386416
return events;
387417
}
388418

419+
private byte[] readPayload(long eventId) {
420+
return tryWithCursor(
421+
getDb()
422+
.query(
423+
"event_payloads",
424+
new String[] {"bytes"},
425+
"event_id = ?",
426+
new String[] {String.valueOf(eventId)},
427+
null,
428+
null,
429+
"sequence_num"),
430+
cursor -> {
431+
List<byte[]> chunks = new ArrayList<>();
432+
int totalLength = 0;
433+
while (cursor.moveToNext()) {
434+
byte[] chunk = cursor.getBlob(0);
435+
chunks.add(chunk);
436+
totalLength += chunk.length;
437+
}
438+
439+
byte[] payloadBytes = new byte[totalLength];
440+
int offset = 0;
441+
for (int i = 0; i < chunks.size(); i++) {
442+
byte[] chunk = chunks.get(i);
443+
System.arraycopy(chunk, 0, payloadBytes, offset, chunk.length);
444+
offset += chunk.length;
445+
}
446+
return payloadBytes;
447+
});
448+
}
449+
389450
private static Encoding toEncoding(@Nullable String value) {
390451
if (value == null) {
391452
return PROTOBUF_ENCODING;

transport/transport-runtime/src/main/java/com/google/android/datatransport/runtime/scheduling/persistence/SchemaManager.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
final class SchemaManager extends SQLiteOpenHelper {
2626
// TODO: when we do schema upgrades in the future we need to make sure both downgrades and
2727
// upgrades work as expected, e.g. `up+down+up` is equivalent to `up`.
28-
private static final String DB_NAME = "com.google.android.datatransport.events";
28+
static final String DB_NAME = "com.google.android.datatransport.events";
2929
private final int schemaVersion;
3030
private boolean configured = false;
3131

@@ -73,7 +73,15 @@ final class SchemaManager extends SQLiteOpenHelper {
7373

7474
private static final String DROP_CONTEXTS_SQL = "DROP TABLE transport_contexts";
7575

76-
static int SCHEMA_VERSION = 3;
76+
private static final String CREATE_PAYLOADS_TABLE_V4 =
77+
"CREATE TABLE event_payloads "
78+
+ "(sequence_num INTEGER NOT NULL,"
79+
+ " event_id INTEGER NOT NULL,"
80+
+ " bytes BLOB NOT NULL,"
81+
+ "FOREIGN KEY (event_id) REFERENCES events(_id) ON DELETE CASCADE,"
82+
+ "PRIMARY KEY (sequence_num, event_id))";
83+
84+
static int SCHEMA_VERSION = 4;
7785

7886
private static final SchemaManager.Migration MIGRATE_TO_V1 =
7987
(db) -> {
@@ -95,12 +103,21 @@ final class SchemaManager extends SQLiteOpenHelper {
95103
private static final SchemaManager.Migration MIGRATE_TO_V3 =
96104
db -> db.execSQL("ALTER TABLE events ADD COLUMN payload_encoding TEXT");
97105

106+
private static final SchemaManager.Migration MIGRATE_TO_V4 =
107+
db -> {
108+
db.execSQL("ALTER TABLE events ADD COLUMN inline BOOLEAN NOT NULL DEFAULT 1");
109+
db.execSQL(CREATE_PAYLOADS_TABLE_V4);
110+
};
111+
98112
private static final List<Migration> INCREMENTAL_MIGRATIONS =
99-
Arrays.asList(MIGRATE_TO_V1, MIGRATE_TO_V2, MIGRATE_TO_V3);
113+
Arrays.asList(MIGRATE_TO_V1, MIGRATE_TO_V2, MIGRATE_TO_V3, MIGRATE_TO_V4);
100114

101115
@Inject
102-
SchemaManager(Context context, @Named("SCHEMA_VERSION") int schemaVersion) {
103-
super(context, DB_NAME, null, schemaVersion);
116+
SchemaManager(
117+
Context context,
118+
@Named("SQLITE_DB_NAME") String dbName,
119+
@Named("SCHEMA_VERSION") int schemaVersion) {
120+
super(context, dbName, null, schemaVersion);
104121
this.schemaVersion = schemaVersion;
105122
}
106123

0 commit comments

Comments
 (0)