Skip to content

feat(firestore): Add support for SnapshotListenOptions #1813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/firestore/collection/changes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { fromCollectionRef } from '../observable/fromRef';
import { Observable } from 'rxjs';
import { map, filter, scan } from 'rxjs/operators';
import { map, scan } from 'rxjs/operators';
import { firestore } from 'firebase';

import { Query, DocumentChangeType, DocumentChange, DocumentChangeAction, Action } from '../interfaces';

Expand All @@ -9,8 +10,8 @@ import { Query, DocumentChangeType, DocumentChange, DocumentChangeAction, Action
* order of occurence.
* @param query
*/
export function docChanges<T>(query: Query): Observable<DocumentChangeAction<T>[]> {
return fromCollectionRef(query)
export function docChanges<T>(query: Query, options?: firestore.SnapshotListenOptions): Observable<DocumentChangeAction<T>[]> {
return fromCollectionRef(query, options)
.pipe(
map(action =>
action.payload.docChanges()
Expand All @@ -21,8 +22,8 @@ export function docChanges<T>(query: Query): Observable<DocumentChangeAction<T>[
* Return a stream of document changes on a query. These results are in sort order.
* @param query
*/
export function sortedChanges<T>(query: Query, events: DocumentChangeType[]): Observable<DocumentChangeAction<T>[]> {
return fromCollectionRef(query)
export function sortedChanges<T>(query: Query, events: DocumentChangeType[], options?: firestore.SnapshotListenOptions): Observable<DocumentChangeAction<T>[]> {
return fromCollectionRef(query, options)
.pipe(
map(changes => changes.payload.docChanges()),
scan((current, changes) => combineChanges(current, changes, events), []),
Expand Down
33 changes: 29 additions & 4 deletions src/firestore/collection/collection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ describe('AngularFirestoreCollection', () => {
deleteThemAll(names, ref).then(done).catch(done.fail);
}
});

});

it('should be able to filter snapshotChanges() types - modified', async (done) => {
Expand Down Expand Up @@ -274,7 +275,7 @@ describe('AngularFirestoreCollection', () => {
const ITEMS = 10;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);

const sub = stocks.snapshotChanges(['added', 'removed']).pipe(skip(1)).subscribe(data => {
const sub = stocks.snapshotChanges(['added', 'removed'], { includeMetadataChanges: true }).pipe(skip(1)).subscribe(data => {
sub.unsubscribe();
const change = data.filter(x => x.payload.doc.id === names[0]);
expect(data.length).toEqual(ITEMS - 1);
Expand All @@ -286,6 +287,30 @@ describe('AngularFirestoreCollection', () => {
delayDelete(stocks, names[0], 400);
});

it('should listen to all snapshotChanges() by default with listener options', async (done) => {
const ITEMS = 10;
let count = 0;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const sub = stocks.snapshotChanges({ includeMetadataChanges: true }).subscribe(data => {
const ids = data.map(d => d.payload.doc.id);
count = count + 1;
// the first time should all be 'added'
if(count === 1) {
// make an update
stocks.doc(names[0]).update({ price: 2});
}
// on the second round, make sure the array is still the same
// length but the updated item is now modified
if(count === 2) {
expect(data.length).toEqual(ITEMS);
const change = data.filter(x => x.payload.doc.id === names[0])[0];
expect(change.type).toEqual('modified');
sub.unsubscribe();
deleteThemAll(names, ref).then(done).catch(done.fail);
}
});
});

});

describe('stateChanges()', () => {
Expand Down Expand Up @@ -346,7 +371,7 @@ describe('AngularFirestoreCollection', () => {
it('should handle multiple subscriptions (warm)', async (done: any) => {
const ITEMS = 4;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);
const changes = stocks.stateChanges();
const changes = stocks.stateChanges({ includeMetadataChanges: true });
changes.pipe(take(1)).subscribe(() => {}).add(() => {
const sub = changes.pipe(take(1)).subscribe(data => {
expect(data.length).toEqual(ITEMS);
Expand All @@ -361,7 +386,7 @@ describe('AngularFirestoreCollection', () => {
let count = 0;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);

const sub = stocks.stateChanges(['modified']).subscribe(data => {
const sub = stocks.stateChanges(['modified'], { includeMetadataChanges: true }).subscribe(data => {
sub.unsubscribe();
expect(data.length).toEqual(1);
expect(data[0].payload.doc.data().price).toEqual(2);
Expand Down Expand Up @@ -431,7 +456,7 @@ describe('AngularFirestoreCollection', () => {
const ITEMS = 10;
const { randomCollectionName, ref, stocks, names } = await collectionHarness(afs, ITEMS);

const sub = stocks.auditTrail(['removed']).subscribe(data => {
const sub = stocks.auditTrail(['removed'], { includeMetadataChanges: true }).subscribe(data => {
sub.unsubscribe();
expect(data.length).toEqual(1);
expect(data[0].type).toEqual('removed');
Expand Down
42 changes: 31 additions & 11 deletions src/firestore/collection/collection.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Observable, Subscriber } from 'rxjs';
import { fromCollectionRef } from '../observable/fromRef';
import { map, filter, scan } from 'rxjs/operators';

import { Injectable } from '@angular/core';
import { firestore } from 'firebase';

import { DocumentChangeType, CollectionReference, Query, DocumentReference, DocumentData, QueryFn, AssociatedReference, DocumentChangeAction, DocumentChange } from '../interfaces';
import { docChanges, sortedChanges } from './changes';
Expand All @@ -16,6 +15,17 @@ export function validateEventsArray(events?: DocumentChangeType[]) {
return events;
}

function validateEventsOrOptions(eventsOrOptions, options) {
let events: DocumentChangeType[] = [];
let listenerOptions: firestore.SnapshotListenOptions | undefined = options;
if(Array.isArray(eventsOrOptions)) {
events = eventsOrOptions;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make sense to pass into validateEventsArray here?

} else {
listenerOptions = eventsOrOptions;
}
return { events, listenerOptions };
}

/**
* AngularFirestoreCollection service
*
Expand Down Expand Up @@ -61,17 +71,18 @@ export class AngularFirestoreCollection<T=DocumentData> {
* your own data structure.
* @param events
*/
stateChanges(events?: DocumentChangeType[]): Observable<DocumentChangeAction<T>[]> {
stateChanges(eventsOrOptions?: DocumentChangeType[] | firestore.SnapshotListenOptions, options?: firestore.SnapshotListenOptions): Observable<DocumentChangeAction<T>[]> {
const { events, listenerOptions } = validateEventsOrOptions(eventsOrOptions, options);
if(!events || events.length === 0) {
return this.afs.scheduler.keepUnstableUntilFirst(
this.afs.scheduler.runOutsideAngular(
docChanges<T>(this.query)
docChanges<T>(this.query, listenerOptions)
)
);
}
return this.afs.scheduler.keepUnstableUntilFirst(
this.afs.scheduler.runOutsideAngular(
docChanges<T>(this.query)
docChanges<T>(this.query, listenerOptions)
)
)
.pipe(
Expand All @@ -85,27 +96,36 @@ export class AngularFirestoreCollection<T=DocumentData> {
* but it collects each event in an array over time.
* @param events
*/
auditTrail(events?: DocumentChangeType[]): Observable<DocumentChangeAction<T>[]> {
return this.stateChanges(events).pipe(scan((current, action) => [...current, ...action], []));
auditTrail(eventsOrOptions?: DocumentChangeType[] | firestore.SnapshotListenOptions, options?: firestore.SnapshotListenOptions): Observable<DocumentChangeAction<T>[]> {
const { events, listenerOptions } = validateEventsOrOptions(eventsOrOptions, options);
return this.stateChanges(events, listenerOptions).pipe(scan((current, action) => [...current, ...action], []));
}

/**
* Create a stream of synchronized changes. This method keeps the local array in sorted
* query order.
* @param events
*/
snapshotChanges(events?: DocumentChangeType[]): Observable<DocumentChangeAction<T>[]> {
snapshotChanges(eventsOrOptions?: DocumentChangeType[] | firestore.SnapshotListenOptions, options?: firestore.SnapshotListenOptions): Observable<DocumentChangeAction<T>[]> {
let events: DocumentChangeType[] = [];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use validateEventsOrOptions here?

let listenerOptions: firestore.SnapshotListenOptions | undefined = options;
if(Array.isArray(eventsOrOptions)) {
events = eventsOrOptions;
} else {
listenerOptions = eventsOrOptions;
}

const validatedEvents = validateEventsArray(events);
const sortedChanges$ = sortedChanges<T>(this.query, validatedEvents);
const sortedChanges$ = sortedChanges<T>(this.query, validatedEvents, listenerOptions);
const scheduledSortedChanges$ = this.afs.scheduler.runOutsideAngular(sortedChanges$);
return this.afs.scheduler.keepUnstableUntilFirst(scheduledSortedChanges$);
}

/**
* Listen to all documents in the collection and its possible query as an Observable.
*/
valueChanges(): Observable<T[]> {
const fromCollectionRef$ = fromCollectionRef<T>(this.query);
valueChanges(options?: firestore.SnapshotListenOptions): Observable<T[]> {
const fromCollectionRef$ = fromCollectionRef<T>(this.query, options);
const scheduled$ = this.afs.scheduler.runOutsideAngular(fromCollectionRef$);
return this.afs.scheduler.keepUnstableUntilFirst(scheduled$)
.pipe(
Expand Down
8 changes: 4 additions & 4 deletions src/firestore/document/document.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { FirebaseApp, AngularFireModule } from 'angularfire2';
import { AngularFirestore } from '../firestore';
import { AngularFirestoreModule } from '../firestore.module';
import { AngularFirestoreDocument } from '../document/document';
import { Observable, Subscription } from 'rxjs';
import { Subscription } from 'rxjs';
import { take } from 'rxjs/operators';

import { TestBed, inject } from '@angular/core/testing';
Expand Down Expand Up @@ -40,10 +40,10 @@ describe('AngularFirestoreDocument', () => {
await stock.set(FAKE_STOCK_DATA);
const sub = stock
.snapshotChanges()
.subscribe(async a => {
.subscribe(a => {
sub.unsubscribe();
if (a.payload.exists) {
expect(a.payload.data()).toEqual(FAKE_STOCK_DATA);
if (a.exists) {
expect(a.data()).toEqual(FAKE_STOCK_DATA);
stock.delete().then(done).catch(done.fail);
}
});
Expand Down
8 changes: 2 additions & 6 deletions src/firestore/document/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ import { DocumentReference, SetOptions, DocumentData, QueryFn, AssociatedReferen
import { fromDocRef } from '../observable/fromRef';
import { map } from 'rxjs/operators';

import { Injectable } from '@angular/core';

import { AngularFirestore, associateQuery } from '../firestore';
import { AngularFirestoreCollection } from '../collection/collection';

Expand Down Expand Up @@ -78,7 +76,7 @@ export class AngularFirestoreDocument<T=DocumentData> {
/**
* Listen to snapshot updates from the document.
*/
snapshotChanges(): Observable<Action<DocumentSnapshot<T>>> {
snapshotChanges(): Observable<DocumentSnapshot<T>> {
const fromDocRef$ = fromDocRef<T>(this.ref);
const scheduledFromDocRef$ = this.afs.scheduler.runOutsideAngular(fromDocRef$);
return this.afs.scheduler.keepUnstableUntilFirst(scheduledFromDocRef$);
Expand All @@ -89,9 +87,7 @@ export class AngularFirestoreDocument<T=DocumentData> {
*/
valueChanges(): Observable<T|undefined> {
return this.snapshotChanges().pipe(
map(action => {
return action.payload.data();
})
map(snap => snap.data())
);
}
}
4 changes: 1 addition & 3 deletions src/firestore/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ export interface Action<T> {
payload: T;
};

export interface Reference<T> {
onSnapshot: (sub: Subscriber<any>) => any;
}
export interface Reference<T> extends Query { }

// A convience type for making a query.
// Example: const query = (ref) => ref.where('name', == 'david');
Expand Down
23 changes: 12 additions & 11 deletions src/firestore/observable/fromRef.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import { Observable, Subscriber } from 'rxjs';
import { DocumentReference, Query, Action, Reference, DocumentSnapshot, QuerySnapshot } from '../interfaces';
import { map, share } from 'rxjs/operators';
import { firestore } from 'firebase';

function _fromRef<T, R>(ref: Reference<T>): Observable<R> {
function _fromRef<T, R>(ref: Query, options?: firestore.SnapshotListenOptions): Observable<R> {
return new Observable(subscriber => {
const unsubscribe = ref.onSnapshot(subscriber);
const unsubscribe = ref.onSnapshot(options || {}, subscriber as any)
return { unsubscribe };
});
}

export function fromRef<R>(ref: DocumentReference | Query) {
return _fromRef<typeof ref, R>(ref).pipe(share());
export function fromRef<R>(ref: any, options?: firestore.SnapshotListenOptions) {
return _fromRef<typeof ref, R>(ref, options).pipe(share());
}

export function fromDocRef<T>(ref: DocumentReference): Observable<Action<DocumentSnapshot<T>>>{
return fromRef<DocumentSnapshot<T>>(ref)
.pipe(
map(payload => ({ payload, type: 'value' }))
);
export function fromDocRef<T>(ref: DocumentReference): Observable<DocumentSnapshot<T>>{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to drop the payload/type in this commit? If so let's make sure to address the docs.

return new Observable(subscriber => {
const unsubscribe = ref.onSnapshot(subscriber as any)
return { unsubscribe };
});
}

export function fromCollectionRef<T>(ref: Query): Observable<Action<QuerySnapshot<T>>> {
return fromRef<QuerySnapshot<T>>(ref).pipe(map(payload => ({ payload, type: 'query' })));
export function fromCollectionRef<T>(ref: Query, options?: firestore.SnapshotListenOptions,): Observable<Action<QuerySnapshot<T>>> {
return fromRef<QuerySnapshot<T>>(ref, options).pipe(map(payload => ({ payload, type: 'query' })));
}