Skip to content

Query get API for RTDB #2087

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3419,15 +3419,11 @@ private static FirebaseApp appForDatabaseUrl(String url, String name) {
}

@Test
public void emptyQueryGet() throws DatabaseException, InterruptedException {
public void emptyQueryGet() throws DatabaseException, InterruptedException, ExecutionException {
FirebaseApp app =
appForDatabaseUrl(IntegrationTestValues.getNamespace(), UUID.randomUUID().toString());
FirebaseDatabase db = FirebaseDatabase.getInstance(app);
try {
assertNull(Tasks.await(db.getReference(UUID.randomUUID().toString()).get()).getValue());
} catch (ExecutionException e) {
fail(e.getMessage());
}
assertNull(Tasks.await(db.getReference(UUID.randomUUID().toString()).get()).getValue());
}

@Test
Expand Down Expand Up @@ -3524,11 +3520,7 @@ public void onCancelled(@NonNull DatabaseError error) {}
write = new WriteFuture(writer, 43L);
assertNull(write.timedGet());

try {
assertEquals(43L, Tasks.await(reader.get()).getValue());
} catch (ExecutionException e) {
fail(e.getMessage());
}
assertEquals(43L, Tasks.await(reader.get()).getValue());
}

@Test
Expand All @@ -3551,19 +3543,7 @@ public void getUpdatesPersistenceCacheWhenEnabled()
readerDb.goOffline();

Semaphore semaphore = new Semaphore(0);
reader.addListenerForSingleValueEvent(
new ValueEventListener() {
@Override
public void onDataChange(@NonNull DataSnapshot snapshot) {
if (snapshot.getValue() != null && snapshot.getValue().equals(42L)) {
semaphore.release();
}
}

@Override
public void onCancelled(@NonNull DatabaseError error) {}
});
IntegrationTestHelpers.waitFor(semaphore);
assertNotNull(ReadFuture.untilEquals(reader, 42L).timedGet());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PersistentConnectionImpl implements Connection.Delegate, PersistentConnection {

Expand Down Expand Up @@ -114,17 +113,17 @@ public String toString() {
}

private static class OutstandingGet {
private Map<String, Object> request;
private ConnectionRequestCallback onComplete;
private String action;
private AtomicBoolean sent;
private final Map<String, Object> request;
private final ConnectionRequestCallback onComplete;
private final String action;
private boolean sent;

private OutstandingGet(
String action, Map<String, Object> request, ConnectionRequestCallback onComplete) {
this.action = action;
this.request = request;
this.onComplete = onComplete;
this.sent = new AtomicBoolean(false);
this.sent = false;
}

private String getAction() {
Expand All @@ -140,11 +139,12 @@ private Map<String, Object> getRequest() {
}

private boolean markSent() {
return sent.compareAndSet(false, true);
}

private boolean wasSent() {
return sent.get();
boolean prev = sent;
if (prev) {
return false;
}
this.sent = true;
return true;
}
}

Expand Down Expand Up @@ -399,8 +399,7 @@ public Task<Object> get(List<String> path, Map<String, Object> queryParams) {
request.put(REQUEST_PATH, ConnectionUtils.pathToString(query.path));
request.put(REQUEST_QUERIES, query.queryParams);

outstandingGets.put(
readId,
OutstandingGet outstandingGet =
new OutstandingGet(
REQUEST_ACTION_GET,
request,
Expand All @@ -417,15 +416,16 @@ public void onResponse(Map<String, Object> response) {
new Exception((String) response.get(SERVER_DATA_UPDATE_BODY)));
}
}
}));
});
outstandingGets.put(readId, outstandingGet);

if (!connected()) {
executorService.schedule(
new Runnable() {
@Override
public void run() {
OutstandingGet get = outstandingGets.get(readId);
if (get == null || !get.markSent()) {
if (get == null || get != outstandingGet || !get.markSent()) {
return;
}
if (logger.logsDebug()) {
Expand Down Expand Up @@ -1067,13 +1067,6 @@ private void restoreState() {
sendPut(put);
}

if (logger.logsDebug()) logger.debug("Restoring reads.");
ArrayList<Long> outstandingGetKeys = new ArrayList<Long>(outstandingGets.keySet());
Collections.sort(outstandingGetKeys);
for (Long getId : outstandingGetKeys) {
sendGet(getId);
}

// Restore disconnect operations
for (OutstandingDisconnect disconnect : onDisconnectRequestQueue) {
sendOnDisconnect(
Expand All @@ -1083,6 +1076,13 @@ private void restoreState() {
disconnect.getOnComplete());
}
onDisconnectRequestQueue.clear();

if (logger.logsDebug()) logger.debug("Restoring reads.");
ArrayList<Long> outstandingGetKeys = new ArrayList<Long>(outstandingGets.keySet());
Collections.sort(outstandingGetKeys);
for (Long getId : outstandingGetKeys) {
sendGet(getId);
}
}

private void handleTimestamp(long timestamp) {
Expand Down Expand Up @@ -1167,7 +1167,7 @@ private void sendGet(final Long readId) {
OutstandingGet get = outstandingGets.get(readId);
if (!get.markSent()) {
if (logger.logsDebug()) {
logger.debug("get" + readId + " already sent or cancelled, ignoring.");
logger.debug("get" + readId + " cancelled, ignoring.");
return;
}
}
Expand All @@ -1181,10 +1181,9 @@ public void onResponse(Map<String, Object> response) {
if (currentGet == get) {
outstandingGets.remove(readId);
get.getOnComplete().onResponse(response);
} else {
if (logger.logsDebug())
logger.debug(
"Ignoring on complete for get " + readId + " because it was removed already.");
} else if (logger.logsDebug()) {
logger.debug(
"Ignoring on complete for get " + readId + " because it was removed already.");
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import static com.google.firebase.database.core.utilities.Utilities.hardAssert;

import androidx.annotation.NonNull;
import com.google.android.gms.tasks.Continuation;
import com.google.android.gms.tasks.OnCompleteListener;
import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.TaskCompletionSource;
Expand Down Expand Up @@ -513,12 +512,11 @@ public void onComplete(@NonNull Task<Object> task) {
});
return source
.getTask()
.continueWithTask(
new Continuation<DataSnapshot, Task<DataSnapshot>>() {
.addOnCompleteListener(
new OnCompleteListener<DataSnapshot>() {
@Override
public Task<DataSnapshot> then(@NonNull Task<DataSnapshot> task) throws Exception {
public void onComplete(@NonNull Task<DataSnapshot> task) {
serverSyncTree.setQueryInactive(query.getSpec());
return task;
}
});
}
Expand Down