Skip to content

Commit 3b9f7eb

Browse files
schmidt-sebastianscottcrossen
authored andcommitted
Untangle Datastore (#2971)
1 parent a123af9 commit 3b9f7eb

File tree

11 files changed

+308
-294
lines changed

11 files changed

+308
-294
lines changed

packages/firestore/src/core/firestore_client.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import { Document, MaybeDocument, NoDocument } from '../model/document';
2323
import { DocumentKey } from '../model/document_key';
2424
import { Mutation } from '../model/mutation';
2525
import { Platform } from '../platform/platform';
26-
import { Datastore } from '../remote/datastore';
26+
import { newDatastore } from '../remote/datastore';
2727
import { RemoteStore } from '../remote/remote_store';
2828
import { AsyncQueue } from '../util/async_queue';
2929
import { Code, FirestoreError } from '../util/error';
@@ -238,12 +238,7 @@ export class FirestoreClient {
238238
const serializer = this.platform.newSerializer(
239239
this.databaseInfo.databaseId
240240
);
241-
const datastore = new Datastore(
242-
this.asyncQueue,
243-
connection,
244-
this.credentials,
245-
serializer
246-
);
241+
const datastore = newDatastore(connection, this.credentials, serializer);
247242

248243
await componentProvider.initialize({
249244
asyncQueue: this.asyncQueue,

packages/firestore/src/core/transaction.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,12 @@ import {
2626
Precondition,
2727
VerifyMutation
2828
} from '../model/mutation';
29-
import { Datastore } from '../remote/datastore';
30-
import { debugAssert, fail } from '../util/assert';
29+
import {
30+
Datastore,
31+
invokeBatchGetDocumentsRpc,
32+
invokeCommitRpc
33+
} from '../remote/datastore';
34+
import { fail, debugAssert } from '../util/assert';
3135
import { Code, FirestoreError } from '../util/error';
3236
import { SnapshotVersion } from './snapshot_version';
3337

@@ -66,7 +70,7 @@ export class Transaction {
6670
'Firestore transactions require all reads to be executed before all writes.'
6771
);
6872
}
69-
const docs = await this.datastore.lookup(keys);
73+
const docs = await invokeBatchGetDocumentsRpc(this.datastore, keys);
7074
docs.forEach(doc => {
7175
if (doc instanceof NoDocument || doc instanceof Document) {
7276
this.recordVersion(doc);
@@ -112,7 +116,7 @@ export class Transaction {
112116
unwritten.forEach((key, _version) => {
113117
this.mutations.push(new VerifyMutation(key, this.precondition(key)));
114118
});
115-
await this.datastore.commit(this.mutations);
119+
await invokeCommitRpc(this.datastore, this.mutations);
116120
this.committed = true;
117121
}
118122

packages/firestore/src/protos/firestore_proto_api.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ export declare namespace firestoreV1ApiClientInterfaces {
141141
values?: Value[];
142142
}
143143
interface BatchGetDocumentsRequest {
144+
database?: string;
144145
documents?: string[];
145146
mask?: DocumentMask;
146147
transaction?: string;
@@ -164,6 +165,7 @@ export declare namespace firestoreV1ApiClientInterfaces {
164165
allDescendants?: boolean;
165166
}
166167
interface CommitRequest {
168+
database?: string;
167169
writes?: Write[];
168170
transaction?: string;
169171
}

packages/firestore/src/remote/datastore.ts

Lines changed: 107 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -16,113 +16,44 @@
1616
*/
1717

1818
import { CredentialsProvider } from '../api/credentials';
19-
import { maybeDocumentMap } from '../model/collections';
2019
import { MaybeDocument } from '../model/document';
2120
import { DocumentKey } from '../model/document_key';
2221
import { Mutation, MutationResult } from '../model/mutation';
2322
import * as api from '../protos/firestore_proto_api';
24-
import { hardAssert } from '../util/assert';
25-
import { AsyncQueue } from '../util/async_queue';
23+
import { debugCast, hardAssert } from '../util/assert';
2624
import { Code, FirestoreError } from '../util/error';
2725
import { Connection } from './connection';
26+
import { JsonProtoSerializer } from './serializer';
2827
import {
29-
WatchStreamListener,
30-
WriteStreamListener,
3128
PersistentListenStream,
32-
PersistentWriteStream
29+
PersistentWriteStream,
30+
WatchStreamListener,
31+
WriteStreamListener
3332
} from './persistent_stream';
33+
import { AsyncQueue } from '../util/async_queue';
3434

35-
import { JsonProtoSerializer } from './serializer';
36-
37-
// The generated proto interfaces for these class are missing the database
38-
// field. So we add it here.
39-
// TODO(b/36015800): Remove this once the api generator is fixed.
40-
interface BatchGetDocumentsRequest extends api.BatchGetDocumentsRequest {
41-
database?: string;
42-
}
43-
interface CommitRequest extends api.CommitRequest {
44-
database?: string;
45-
}
35+
/**
36+
* Datastore and its related methods are a wrapper around the external Google
37+
* Cloud Datastore grpc API, which provides an interface that is more convenient
38+
* for the rest of the client SDK architecture to consume.
39+
*/
40+
export class Datastore {}
4641

4742
/**
48-
* Datastore is a wrapper around the external Google Cloud Datastore grpc API,
49-
* which provides an interface that is more convenient for the rest of the
50-
* client SDK architecture to consume.
43+
* An implementation of Datastore that exposes additional state for internal
44+
* consumption.
5145
*/
52-
export class Datastore {
46+
class DatastoreImpl extends Datastore {
5347
constructor(
54-
private queue: AsyncQueue,
55-
private connection: Connection,
56-
private credentials: CredentialsProvider,
57-
private serializer: JsonProtoSerializer
58-
) {}
59-
60-
newPersistentWriteStream(
61-
listener: WriteStreamListener
62-
): PersistentWriteStream {
63-
return new PersistentWriteStream(
64-
this.queue,
65-
this.connection,
66-
this.credentials,
67-
this.serializer,
68-
listener
69-
);
70-
}
71-
72-
newPersistentWatchStream(
73-
listener: WatchStreamListener
74-
): PersistentListenStream {
75-
return new PersistentListenStream(
76-
this.queue,
77-
this.connection,
78-
this.credentials,
79-
this.serializer,
80-
listener
81-
);
82-
}
83-
84-
commit(mutations: Mutation[]): Promise<MutationResult[]> {
85-
const params: CommitRequest = {
86-
database: this.serializer.encodedDatabaseId,
87-
writes: mutations.map(m => this.serializer.toMutation(m))
88-
};
89-
return this.invokeRPC<CommitRequest, api.CommitResponse>(
90-
'Commit',
91-
params
92-
).then(response => {
93-
return this.serializer.fromWriteResults(
94-
response.writeResults,
95-
response.commitTime
96-
);
97-
});
98-
}
99-
100-
lookup(keys: DocumentKey[]): Promise<MaybeDocument[]> {
101-
const params: BatchGetDocumentsRequest = {
102-
database: this.serializer.encodedDatabaseId,
103-
documents: keys.map(k => this.serializer.toName(k))
104-
};
105-
return this.invokeStreamingRPC<
106-
BatchGetDocumentsRequest,
107-
api.BatchGetDocumentsResponse
108-
>('BatchGetDocuments', params).then(response => {
109-
let docs = maybeDocumentMap();
110-
response.forEach(proto => {
111-
const doc = this.serializer.fromMaybeDocument(proto);
112-
docs = docs.insert(doc.key, doc);
113-
});
114-
const result: MaybeDocument[] = [];
115-
keys.forEach(key => {
116-
const doc = docs.get(key);
117-
hardAssert(!!doc, 'Missing entity in write response for ' + key);
118-
result.push(doc);
119-
});
120-
return result;
121-
});
48+
public readonly connection: Connection,
49+
public readonly credentials: CredentialsProvider,
50+
public readonly serializer: JsonProtoSerializer
51+
) {
52+
super();
12253
}
12354

12455
/** Gets an auth token and invokes the provided RPC. */
125-
private invokeRPC<Req, Resp>(rpcName: string, request: Req): Promise<Resp> {
56+
invokeRPC<Req, Resp>(rpcName: string, request: Req): Promise<Resp> {
12657
return this.credentials
12758
.getToken()
12859
.then(token => {
@@ -137,7 +68,7 @@ export class Datastore {
13768
}
13869

13970
/** Gets an auth token and invokes the provided RPC with streamed results. */
140-
private invokeStreamingRPC<Req, Resp>(
71+
invokeStreamingRPC<Req, Resp>(
14172
rpcName: string,
14273
request: Req
14374
): Promise<Resp[]> {
@@ -158,3 +89,88 @@ export class Datastore {
15889
});
15990
}
16091
}
92+
93+
export function newDatastore(
94+
connection: Connection,
95+
credentials: CredentialsProvider,
96+
serializer: JsonProtoSerializer
97+
): Datastore {
98+
return new DatastoreImpl(connection, credentials, serializer);
99+
}
100+
101+
export async function invokeCommitRpc(
102+
datastore: Datastore,
103+
mutations: Mutation[]
104+
): Promise<MutationResult[]> {
105+
const datastoreImpl = debugCast(datastore, DatastoreImpl);
106+
const params = {
107+
database: datastoreImpl.serializer.encodedDatabaseId,
108+
writes: mutations.map(m => datastoreImpl.serializer.toMutation(m))
109+
};
110+
const response = await datastoreImpl.invokeRPC<
111+
api.CommitRequest,
112+
api.CommitResponse
113+
>('Commit', params);
114+
return datastoreImpl.serializer.fromWriteResults(
115+
response.writeResults,
116+
response.commitTime
117+
);
118+
}
119+
120+
export async function invokeBatchGetDocumentsRpc(
121+
datastore: Datastore,
122+
keys: DocumentKey[]
123+
): Promise<MaybeDocument[]> {
124+
const datastoreImpl = debugCast(datastore, DatastoreImpl);
125+
const params = {
126+
database: datastoreImpl.serializer.encodedDatabaseId,
127+
documents: keys.map(k => datastoreImpl.serializer.toName(k))
128+
};
129+
const response = await datastoreImpl.invokeStreamingRPC<
130+
api.BatchGetDocumentsRequest,
131+
api.BatchGetDocumentsResponse
132+
>('BatchGetDocuments', params);
133+
134+
const docs = new Map<string, MaybeDocument>();
135+
response.forEach(proto => {
136+
const doc = datastoreImpl.serializer.fromMaybeDocument(proto);
137+
docs.set(doc.key.toString(), doc);
138+
});
139+
const result: MaybeDocument[] = [];
140+
keys.forEach(key => {
141+
const doc = docs.get(key.toString());
142+
hardAssert(!!doc, 'Missing entity in write response for ' + key);
143+
result.push(doc);
144+
});
145+
return result;
146+
}
147+
148+
export function newPersistentWriteStream(
149+
datastore: Datastore,
150+
queue: AsyncQueue,
151+
listener: WriteStreamListener
152+
): PersistentWriteStream {
153+
const datastoreImpl = debugCast(datastore, DatastoreImpl);
154+
return new PersistentWriteStream(
155+
queue,
156+
datastoreImpl.connection,
157+
datastoreImpl.credentials,
158+
datastoreImpl.serializer,
159+
listener
160+
);
161+
}
162+
163+
export function newPersistentWatchStream(
164+
datastore: Datastore,
165+
queue: AsyncQueue,
166+
listener: WatchStreamListener
167+
): PersistentListenStream {
168+
const datastoreImpl = debugCast(datastore, DatastoreImpl);
169+
return new PersistentListenStream(
170+
queue,
171+
datastoreImpl.connection,
172+
datastoreImpl.credentials,
173+
datastoreImpl.serializer,
174+
listener
175+
);
176+
}

packages/firestore/src/remote/remote_store.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ import { logDebug } from '../util/log';
3232
import { DocumentKeySet } from '../model/collections';
3333
import { AsyncQueue } from '../util/async_queue';
3434
import { ConnectivityMonitor, NetworkStatus } from './connectivity_monitor';
35-
import { Datastore } from './datastore';
35+
import {
36+
Datastore,
37+
newPersistentWatchStream,
38+
newPersistentWriteStream
39+
} from './datastore';
3640
import { OnlineStateTracker } from './online_state_tracker';
3741
import {
3842
PersistentListenStream,
@@ -151,13 +155,13 @@ export class RemoteStore implements TargetMetadataProvider {
151155
);
152156

153157
// Create streams (but note they're not started yet).
154-
this.watchStream = this.datastore.newPersistentWatchStream({
158+
this.watchStream = newPersistentWatchStream(this.datastore, asyncQueue, {
155159
onOpen: this.onWatchStreamOpen.bind(this),
156160
onClose: this.onWatchStreamClose.bind(this),
157161
onWatchChange: this.onWatchStreamChange.bind(this)
158162
});
159163

160-
this.writeStream = this.datastore.newPersistentWriteStream({
164+
this.writeStream = newPersistentWriteStream(this.datastore, asyncQueue, {
161165
onOpen: this.onWriteStreamOpen.bind(this),
162166
onClose: this.onWriteStreamClose.bind(this),
163167
onHandshakeComplete: this.onWriteHandshakeComplete.bind(this),

packages/firestore/src/util/assert.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,3 +70,19 @@ export function debugAssert(
7070
fail(message);
7171
}
7272
}
73+
74+
/**
75+
* Casts `obj` to `T`. In non-production builds, verifies that `obj` is an
76+
* instance of `T` before casting.
77+
*/
78+
export function debugCast<T>(
79+
obj: object,
80+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
81+
constructor: { new (...args: any[]): T }
82+
): T {
83+
debugAssert(
84+
obj instanceof constructor,
85+
`Expected type '${constructor.name}', but was '${obj.constructor.name}'`
86+
);
87+
return obj as T;
88+
}

0 commit comments

Comments
 (0)