Skip to content

Commit 074f520

Browse files
authored
C++ migration: implement WatchStream (#1770)
This is a fairly close port of `FSTStream`/ stream on other platforms. Comments are mostly taken from Web client.
1 parent d195f6e commit 074f520

File tree

13 files changed

+1439
-9
lines changed

13 files changed

+1439
-9
lines changed

Firestore/Example/Firestore.xcodeproj/project.pbxproj

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
0535C1B65DADAE1CE47FA3CA /* string_format_apple_test.mm in Sources */ = {isa = PBXBuildFile; fileRef = 9CFD366B783AE27B9E79EE7A /* string_format_apple_test.mm */; };
2828
132E3E53179DE287D875F3F2 /* FSTLevelDBTransactionTests.mm in Sources */ = {isa = PBXBuildFile; fileRef = 132E36BB104830BD806351AC /* FSTLevelDBTransactionTests.mm */; };
2929
132E3EE56C143B2C9ACB6187 /* FSTLevelDBBenchmarkTests.mm in Sources */ = {isa = PBXBuildFile; fileRef = 132E3BB3D5C42282B4ACFB20 /* FSTLevelDBBenchmarkTests.mm */; };
30-
162B29FF814B3E21B95E3156 /* grpc_stream_tester.cc in Sources */ = {isa = PBXBuildFile; fileRef = 3564F6872205C4F7001D5436 /* grpc_stream_tester.cc */; };
3130
1CAA9012B25F975D445D5978 /* strerror_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = 358C3B5FE573B1D60A4F7592 /* strerror_test.cc */; };
31+
333FCB7BB0C9986B5DF28FC8 /* grpc_stream_tester.cc in Sources */ = {isa = PBXBuildFile; fileRef = B1A7E1959AF8141FA7E6B888 /* grpc_stream_tester.cc */; };
3232
36FD4CE79613D18BC783C55B /* string_apple_test.mm in Sources */ = {isa = PBXBuildFile; fileRef = 0EE5300F8233D14025EF0456 /* string_apple_test.mm */; };
3333
3B843E4C1F3A182900548890 /* remote_store_spec_test.json in Resources */ = {isa = PBXBuildFile; fileRef = 3B843E4A1F3930A400548890 /* remote_store_spec_test.json */; };
3434
4AA4ABE36065DB79CD76DD8D /* Pods_Firestore_Benchmarks_iOS.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = F694C3CE4B77B3C0FA4BBA53 /* Pods_Firestore_Benchmarks_iOS.framework */; };
@@ -199,6 +199,7 @@
199199
ABF6506C201131F8005F2C74 /* timestamp_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = ABF6506B201131F8005F2C74 /* timestamp_test.cc */; };
200200
B6152AD7202A53CB000E5744 /* document_key_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B6152AD5202A5385000E5744 /* document_key_test.cc */; };
201201
B65D34A9203C995B0076A5E1 /* FIRTimestampTest.m in Sources */ = {isa = PBXBuildFile; fileRef = B65D34A7203C99090076A5E1 /* FIRTimestampTest.m */; };
202+
B66D8996213609EE0086DA0C /* stream_test.mm in Sources */ = {isa = PBXBuildFile; fileRef = B66D8995213609EE0086DA0C /* stream_test.mm */; };
202203
B686F2AF2023DDEE0028D6BE /* field_path_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B686F2AD2023DDB20028D6BE /* field_path_test.cc */; };
203204
B686F2B22025000D0028D6BE /* resource_path_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B686F2B02024FFD70028D6BE /* resource_path_test.cc */; };
204205
B6BBE43121262CF400C6A53E /* grpc_stream_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B6BBE42F21262CF400C6A53E /* grpc_stream_test.cc */; };
@@ -421,7 +422,6 @@
421422
54E9282A1F339CAD00C1953E /* XCTestCase+Await.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = "XCTestCase+Await.h"; sourceTree = "<group>"; };
422423
54EB764C202277B30088B8F3 /* array_sorted_map_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = array_sorted_map_test.cc; sourceTree = "<group>"; };
423424
5918805E993304321A05E82B /* Pods_Firestore_Example_iOS.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_Firestore_Example_iOS.framework; sourceTree = BUILT_PRODUCTS_DIR; };
424-
5AF7BEEF4494840A0AF2DC0D /* grpc_stream_tester.h */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.c.h; path = grpc_stream_tester.h; sourceTree = "<group>"; };
425425
5CAE131920FFFED600BE9A4A /* Firestore_Benchmarks_iOS.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = Firestore_Benchmarks_iOS.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
426426
5CAE131D20FFFED600BE9A4A /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
427427
5CC9650120A0E93200A2D6A1 /* FSTLRUGarbageCollectorTests.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = FSTLRUGarbageCollectorTests.h; sourceTree = "<group>"; };
@@ -507,9 +507,11 @@
507507
ABC1D7DF2023A3EF00BA84F0 /* token_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = token_test.cc; sourceTree = "<group>"; };
508508
ABC1D7E22023CDC500BA84F0 /* firebase_credentials_provider_test.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = firebase_credentials_provider_test.mm; sourceTree = "<group>"; };
509509
ABF6506B201131F8005F2C74 /* timestamp_test.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = timestamp_test.cc; sourceTree = "<group>"; };
510+
B1A7E1959AF8141FA7E6B888 /* grpc_stream_tester.cc */ = {isa = PBXFileReference; includeInIndex = 1; path = grpc_stream_tester.cc; sourceTree = "<group>"; };
510511
B3F5B3AAE791A5911B9EAA82 /* Pods-Firestore_Tests_iOS.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Firestore_Tests_iOS.release.xcconfig"; path = "Pods/Target Support Files/Pods-Firestore_Tests_iOS/Pods-Firestore_Tests_iOS.release.xcconfig"; sourceTree = "<group>"; };
511512
B6152AD5202A5385000E5744 /* document_key_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = document_key_test.cc; sourceTree = "<group>"; };
512513
B65D34A7203C99090076A5E1 /* FIRTimestampTest.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = FIRTimestampTest.m; sourceTree = "<group>"; };
514+
B66D8995213609EE0086DA0C /* stream_test.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = stream_test.mm; sourceTree = "<group>"; };
513515
B686F2AD2023DDB20028D6BE /* field_path_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = field_path_test.cc; sourceTree = "<group>"; };
514516
B686F2B02024FFD70028D6BE /* resource_path_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = resource_path_test.cc; sourceTree = "<group>"; };
515517
B6BBE42F21262CF400C6A53E /* grpc_stream_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = grpc_stream_test.cc; sourceTree = "<group>"; };
@@ -545,6 +547,7 @@
545547
DE51B1A71F0D48AC0013853F /* README.md */ = {isa = PBXFileReference; lastKnownFileType = net.daringfireball.markdown; path = README.md; sourceTree = "<group>"; };
546548
E592181BFD7C53C305123739 /* Pods-Firestore_Tests_iOS.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Firestore_Tests_iOS.debug.xcconfig"; path = "Pods/Target Support Files/Pods-Firestore_Tests_iOS/Pods-Firestore_Tests_iOS.debug.xcconfig"; sourceTree = "<group>"; };
547549
ECEBABC7E7B693BE808A1052 /* Pods_Firestore_IntegrationTests_iOS.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_Firestore_IntegrationTests_iOS.framework; sourceTree = BUILT_PRODUCTS_DIR; };
550+
ED4B3E3EA0EBF3ED19A07060 /* grpc_stream_tester.h */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.c.h; path = grpc_stream_tester.h; sourceTree = "<group>"; };
548551
F354C0FE92645B56A6C6FD44 /* Pods-Firestore_IntegrationTests_iOS.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Firestore_IntegrationTests_iOS.release.xcconfig"; path = "Pods/Target Support Files/Pods-Firestore_IntegrationTests_iOS/Pods-Firestore_IntegrationTests_iOS.release.xcconfig"; sourceTree = "<group>"; };
549552
F51859B394D01C0C507282F1 /* filesystem_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = filesystem_test.cc; sourceTree = "<group>"; };
550553
F694C3CE4B77B3C0FA4BBA53 /* Pods_Firestore_Benchmarks_iOS.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_Firestore_Benchmarks_iOS.framework; sourceTree = BUILT_PRODUCTS_DIR; };
@@ -650,6 +653,7 @@
650653
B6D1B68420E2AB1A00B35856 /* exponential_backoff_test.cc */,
651654
B6BBE42F21262CF400C6A53E /* grpc_stream_test.cc */,
652655
61F72C5520BC48FD001A68CB /* serializer_test.cc */,
656+
B66D8995213609EE0086DA0C /* stream_test.mm */,
653657
);
654658
path = remote;
655659
sourceTree = "<group>";
@@ -670,8 +674,8 @@
670674
B6FB4688208F9B9100554BA2 /* executor_test.cc */,
671675
B6FB468A208F9B9100554BA2 /* executor_test.h */,
672676
F51859B394D01C0C507282F1 /* filesystem_test.cc */,
673-
3564F6872205C4F7001D5436 /* grpc_stream_tester.cc */,
674-
5AF7BEEF4494840A0AF2DC0D /* grpc_stream_tester.h */,
677+
B1A7E1959AF8141FA7E6B888 /* grpc_stream_tester.cc */,
678+
ED4B3E3EA0EBF3ED19A07060 /* grpc_stream_tester.h */,
675679
444B7AB3F5A2929070CB1363 /* hard_assert_test.cc */,
676680
54511E8D209805F8005BD28F /* hashing_test.cc */,
677681
54A0353420A3D8CB003E0143 /* iterator_adaptors_test.cc */,
@@ -1900,7 +1904,7 @@
19001904
618BBEAA20B89AAC00B5BCE7 /* firestore.pb.cc in Sources */,
19011905
AB7BAB342012B519001E0872 /* geo_point_test.cc in Sources */,
19021906
B6BBE43121262CF400C6A53E /* grpc_stream_test.cc in Sources */,
1903-
162B29FF814B3E21B95E3156 /* grpc_stream_tester.cc in Sources */,
1907+
333FCB7BB0C9986B5DF28FC8 /* grpc_stream_tester.cc in Sources */,
19041908
73FE5066020EF9B2892C86BF /* hard_assert_test.cc in Sources */,
19051909
54511E8E209805F8005BD28F /* hashing_test.cc in Sources */,
19061910
618BBEB020B89AAC00B5BCE7 /* http.pb.cc in Sources */,
@@ -1928,6 +1932,7 @@
19281932
618BBEB120B89AAC00B5BCE7 /* status.pb.cc in Sources */,
19291933
54A0352F20A3B3D8003E0143 /* status_test.cc in Sources */,
19301934
54A0353020A3B3D8003E0143 /* statusor_test.cc in Sources */,
1935+
B66D8996213609EE0086DA0C /* stream_test.mm in Sources */,
19311936
1CAA9012B25F975D445D5978 /* strerror_test.cc in Sources */,
19321937
36FD4CE79613D18BC783C55B /* string_apple_test.mm in Sources */,
19331938
0535C1B65DADAE1CE47FA3CA /* string_format_apple_test.mm in Sources */,

Firestore/core/src/firebase/firestore/model/types.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919

2020
#include <cstdint>
2121

22+
#if defined(__OBJC__)
23+
#import <Foundation/Foundation.h>
24+
#endif
25+
2226
namespace firebase {
2327
namespace firestore {
2428
namespace model {
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*
2+
* Copyright 2018 Google
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_REMOTE_STREAM_H_
18+
#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_REMOTE_STREAM_H_
19+
20+
#include <memory>
21+
#include <string>
22+
23+
#include "Firestore/core/src/firebase/firestore/auth/credentials_provider.h"
24+
#include "Firestore/core/src/firebase/firestore/auth/token.h"
25+
#include "Firestore/core/src/firebase/firestore/remote/datastore.h"
26+
#include "Firestore/core/src/firebase/firestore/remote/exponential_backoff.h"
27+
#include "Firestore/core/src/firebase/firestore/remote/grpc_completion.h"
28+
#include "Firestore/core/src/firebase/firestore/remote/grpc_stream.h"
29+
#include "Firestore/core/src/firebase/firestore/remote/stream_objc_bridge.h"
30+
#include "Firestore/core/src/firebase/firestore/util/async_queue.h"
31+
#include "Firestore/core/src/firebase/firestore/util/status.h"
32+
#include "Firestore/core/src/firebase/firestore/util/statusor.h"
33+
#include "absl/strings/string_view.h"
34+
#include "grpcpp/support/byte_buffer.h"
35+
36+
namespace firebase {
37+
namespace firestore {
38+
namespace remote {
39+
40+
/**
41+
* A `Stream` is an abstract base class that represents a bidirectional
42+
* streaming connection to the Firestore backend. It's built on top of gRPC C++
43+
* library and adds several critical features for our clients:
44+
*
45+
* - Exponential backoff on failure (independent of the gRPC mechanism)
46+
* - Authentication via CredentialsProvider
47+
* - Dispatching all callbacks into the shared Firestore async queue
48+
* - Closing idle streams after 60 seconds of inactivity
49+
*
50+
* Subclasses of `Stream`:
51+
*
52+
* - Implement serialization and deserialization of protocol buffers
53+
* - Notify their delegate about stream open/read/error events
54+
* - Create and finish the underlying gRPC streams.
55+
*
56+
* ## Starting and Stopping
57+
*
58+
* Streams are stateful and need to be `Start`ed before messages can
59+
* be sent and received. A `Stream` can be started and stopped repeatedly.
60+
*/
61+
class Stream : public GrpcStreamObserver,
62+
public std::enable_shared_from_this<Stream> {
63+
public:
64+
/**
65+
* `Stream` can be in one of 5 states (each described in detail below)
66+
* shown in the following state transition diagram:
67+
*
68+
* Start() called auth & connection succeeded
69+
* INITIAL ----------------> STARTING -----------------------------> OPEN
70+
* ^ | |
71+
* | | error occurred |
72+
* | \-----------------------------v-----/
73+
* | |
74+
* backoff | |
75+
* elapsed | Start() called |
76+
* \--- BACKOFF <---------------- ERROR
77+
*
78+
* [any state] --------------------------> INITIAL
79+
* Stop() called or
80+
* idle timer expired
81+
*/
82+
enum class State {
83+
/**
84+
* The stream is not yet running and there's no error condition.
85+
* Calling `Start` will start the stream immediately without backoff.
86+
* While in this state, `IsStarted` will return false.
87+
*/
88+
Initial,
89+
90+
/**
91+
* The stream is starting, either waiting for an auth token or for the
92+
* stream to successfully open. While in this state, `IsStarted` will
93+
* return true but `IsOpen` will return false.
94+
*/
95+
Starting,
96+
97+
/**
98+
* The stream is up and running. Requests and responses can flow
99+
* freely. Both `IsStarted` and `IsOpen` will return true.
100+
*/
101+
Open,
102+
103+
/**
104+
* The stream encountered an error. The next start attempt will back off.
105+
* While in this state, `IsStarted` will return false.
106+
*/
107+
Error,
108+
109+
/**
110+
* An in-between state after an error where the stream is waiting before
111+
* re-starting. After waiting is complete, the stream will try to open.
112+
* While in this state, `IsStarted` will return true but `IsOpen` will
113+
* return false.
114+
*/
115+
Backoff
116+
};
117+
118+
Stream(util::AsyncQueue* async_queue,
119+
auth::CredentialsProvider* credentials_provider,
120+
Datastore* datastore,
121+
util::TimerId backoff_timer_id,
122+
util::TimerId idle_timer_id);
123+
124+
/**
125+
* Starts the stream. Only allowed if `IsStarted` returns false. The stream is
126+
* not immediately ready for use: `OnStreamStart` will be invoked when the
127+
* stream is ready for outbound requests, at which point `IsOpen` will return
128+
* true.
129+
*
130+
* When start returns, `IsStarted` will return true.
131+
*/
132+
void Start();
133+
134+
/**
135+
* Stops the stream. This call is idempotent and allowed regardless of the
136+
* current `IsStarted` state.
137+
*
138+
* When stop returns, `IsStarted` and `IsOpen` will both return false.
139+
*/
140+
void Stop();
141+
142+
/**
143+
* Returns true if `Start` has been called and no error has occurred. True
144+
* indicates the stream is open or in the process of opening (which
145+
* encompasses respecting backoff, getting auth tokens, and starting the
146+
* actual stream). Use `IsOpen` to determine if the stream is open and ready
147+
* for outbound requests.
148+
*/
149+
bool IsStarted() const;
150+
151+
/**
152+
* Returns true if the underlying stream is open (`OnStreamStart` has been
153+
* called) and the stream is ready for outbound requests.
154+
*/
155+
bool IsOpen() const;
156+
157+
/**
158+
* After an error, the stream will usually back off on the next attempt to
159+
* start it. If the error warrants an immediate restart of the stream, the
160+
* caller can use this to indicate that the stream should not back off.
161+
*
162+
* Each error will call `OnStreamClose`. That function can decide to
163+
* cancel backoff if required.
164+
*/
165+
void InhibitBackoff();
166+
167+
/**
168+
* Marks this stream as idle. If no further actions are performed on the
169+
* stream for one minute, the stream will automatically close itself and
170+
* notify the stream's `OnClose` handler with Status::OK. The stream will then
171+
* be in a non-started state, requiring the caller to start the stream again
172+
* before further use.
173+
*
174+
* Only streams that are in state 'Open' can be marked idle, as all other
175+
* states imply pending network operations.
176+
*/
177+
void MarkIdle();
178+
179+
/**
180+
* Marks the stream as active again, preventing auto-closing of the stream.
181+
* Can be called from any state -- if the stream is not in state `Open`, this
182+
* is a no-op.
183+
*/
184+
void CancelIdleCheck();
185+
186+
// `GrpcStreamObserver` interface -- do not use.
187+
void OnStreamStart() override;
188+
void OnStreamRead(const grpc::ByteBuffer& message) override;
189+
void OnStreamError(const util::Status& status) override;
190+
191+
protected:
192+
// `Stream` expects all its methods to be called on the worker queue.
193+
void EnsureOnQueue() const;
194+
void Write(grpc::ByteBuffer&& message);
195+
std::string GetDebugDescription() const;
196+
197+
ExponentialBackoff backoff_;
198+
199+
private:
200+
// The interface for the derived classes.
201+
202+
virtual std::unique_ptr<GrpcStream> CreateGrpcStream(
203+
Datastore* datastore, absl::string_view token) = 0;
204+
virtual void TearDown(GrpcStream* stream) = 0;
205+
virtual void NotifyStreamOpen() = 0;
206+
virtual util::Status NotifyStreamResponse(
207+
const grpc::ByteBuffer& message) = 0;
208+
virtual void NotifyStreamClose(const util::Status& status) = 0;
209+
// PORTING NOTE: C++ cannot rely on RTTI, unlike other platforms.
210+
virtual std::string GetDebugName() const = 0;
211+
212+
void Close(const util::Status& status);
213+
void HandleErrorStatus(const util::Status& status);
214+
215+
void RequestCredentials();
216+
void ResumeStartWithCredentials(
217+
const util::StatusOr<auth::Token>& maybe_token);
218+
219+
void BackoffAndTryRestarting();
220+
void StopDueToIdleness();
221+
222+
State state_ = State::Initial;
223+
224+
std::unique_ptr<GrpcStream> grpc_stream_;
225+
226+
auth::CredentialsProvider* credentials_provider_ = nullptr;
227+
util::AsyncQueue* worker_queue_ = nullptr;
228+
Datastore* datastore_ = nullptr;
229+
230+
util::TimerId idle_timer_id_{};
231+
util::DelayedOperation idleness_timer_;
232+
233+
// Used to prevent auth if the stream happens to be restarted before token is
234+
// received.
235+
int close_count_ = 0;
236+
};
237+
238+
} // namespace remote
239+
} // namespace firestore
240+
} // namespace firebase
241+
242+
#endif // FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_REMOTE_STREAM_H_

0 commit comments

Comments
 (0)