|
| 1 | +/** |
| 2 | + * Copyright 2018 Google Inc. |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +import { firestore } from 'firebase/app'; |
| 18 | +import { fromCollectionRef } from '../fromRef'; |
| 19 | +import { Observable } from 'rxjs'; |
| 20 | +import { map, filter, scan } from 'rxjs/operators'; |
| 21 | + |
| 22 | +const ALL_EVENTS: firestore.DocumentChangeType[] = [ |
| 23 | + 'added', |
| 24 | + 'modified', |
| 25 | + 'removed' |
| 26 | +]; |
| 27 | + |
| 28 | +/** |
| 29 | + * Create an operator that determines if a the stream of document changes |
| 30 | + * are specified by the event filter. If the document change type is not |
| 31 | + * in specified events array, it will not be emitted. |
| 32 | + */ |
| 33 | +const filterEvents = (events?: firestore.DocumentChangeType[]) => |
| 34 | + filter((changes: firestore.DocumentChange[]) => { |
| 35 | + let hasChange = false; |
| 36 | + for (let i = 0; i < changes.length; i++) { |
| 37 | + const change = changes[i]; |
| 38 | + if (events.indexOf(change.type) >= 0) { |
| 39 | + hasChange = true; |
| 40 | + break; |
| 41 | + } |
| 42 | + } |
| 43 | + return hasChange; |
| 44 | + }); |
| 45 | + |
| 46 | +/** |
| 47 | + * Create an operator that filters out empty changes. We provide the |
| 48 | + * ability to filter on events, which means all changes can be filtered out. |
| 49 | + * This creates an empty array and would be incorrect to emit. |
| 50 | + */ |
| 51 | +const filterEmpty = filter( |
| 52 | + (changes: firestore.DocumentChange[]) => changes.length > 0 |
| 53 | +); |
| 54 | + |
| 55 | +/** |
| 56 | + * Creates a new sorted array from a new change. |
| 57 | + * @param combined |
| 58 | + * @param change |
| 59 | + */ |
| 60 | +function processIndividualChange( |
| 61 | + combined: firestore.DocumentChange[], |
| 62 | + change: firestore.DocumentChange |
| 63 | +): firestore.DocumentChange[] { |
| 64 | + switch (change.type) { |
| 65 | + case 'added': |
| 66 | + if ( |
| 67 | + combined[change.newIndex] && |
| 68 | + combined[change.newIndex].doc.id == change.doc.id |
| 69 | + ) { |
| 70 | + // Skip duplicate emissions. This is rare. |
| 71 | + // TODO: Investigate possible bug in SDK. |
| 72 | + } else { |
| 73 | + combined.splice(change.newIndex, 0, change); |
| 74 | + } |
| 75 | + break; |
| 76 | + case 'modified': |
| 77 | + // When an item changes position we first remove it |
| 78 | + // and then add it's new position |
| 79 | + if (change.oldIndex !== change.newIndex) { |
| 80 | + combined.splice(change.oldIndex, 1); |
| 81 | + combined.splice(change.newIndex, 0, change); |
| 82 | + } else { |
| 83 | + combined[change.newIndex] = change; |
| 84 | + } |
| 85 | + break; |
| 86 | + case 'removed': |
| 87 | + combined.splice(change.oldIndex, 1); |
| 88 | + break; |
| 89 | + } |
| 90 | + return combined; |
| 91 | +} |
| 92 | + |
| 93 | +/** |
| 94 | + * Combines the total result set from the current set of changes from an incoming set |
| 95 | + * of changes. |
| 96 | + * @param current |
| 97 | + * @param changes |
| 98 | + * @param events |
| 99 | + */ |
| 100 | +function processDocumentChanges( |
| 101 | + current: firestore.DocumentChange[], |
| 102 | + changes: firestore.DocumentChange[], |
| 103 | + events: firestore.DocumentChangeType[] = ALL_EVENTS |
| 104 | +) { |
| 105 | + changes.forEach(change => { |
| 106 | + // skip unwanted change types |
| 107 | + if (events.indexOf(change.type) > -1) { |
| 108 | + current = processIndividualChange(current, change); |
| 109 | + } |
| 110 | + }); |
| 111 | + return current; |
| 112 | +} |
| 113 | + |
| 114 | +/** |
| 115 | + * Return a stream of document changes on a query. These results are not in sort order but in |
| 116 | + * order of occurence. |
| 117 | + * @param query |
| 118 | + */ |
| 119 | +export function docChanges( |
| 120 | + query: firestore.Query, |
| 121 | + events: firestore.DocumentChangeType[] = ALL_EVENTS |
| 122 | +) { |
| 123 | + return fromCollectionRef(query).pipe( |
| 124 | + map(snapshot => snapshot.docChanges()), |
| 125 | + filterEvents(events), |
| 126 | + filterEmpty |
| 127 | + ); |
| 128 | +} |
| 129 | + |
| 130 | +/** |
| 131 | + * Return a stream of document snapshots on a query. These results are in sort order. |
| 132 | + * @param query |
| 133 | + */ |
| 134 | +export function collection(query: firestore.Query) { |
| 135 | + return fromCollectionRef(query).pipe(map(changes => changes.docs)); |
| 136 | +} |
| 137 | + |
| 138 | +/** |
| 139 | + * Return a stream of document changes on a query. These results are in sort order. |
| 140 | + * @param query |
| 141 | + */ |
| 142 | +export function sortedChanges( |
| 143 | + query: firestore.Query, |
| 144 | + events?: firestore.DocumentChangeType[] |
| 145 | +) { |
| 146 | + return docChanges(query, events).pipe( |
| 147 | + scan( |
| 148 | + ( |
| 149 | + current: firestore.DocumentChange[], |
| 150 | + changes: firestore.DocumentChange[] |
| 151 | + ) => processDocumentChanges(current, changes, events), |
| 152 | + [] |
| 153 | + ) |
| 154 | + ); |
| 155 | +} |
| 156 | + |
| 157 | +/** |
| 158 | + * Create a stream of changes as they occur it time. This method is similar |
| 159 | + * to docChanges() but it collects each event in an array over time. |
| 160 | + */ |
| 161 | +export function auditTrail( |
| 162 | + query: firestore.Query, |
| 163 | + events?: firestore.DocumentChangeType[] |
| 164 | +): Observable<firestore.DocumentChange[]> { |
| 165 | + return docChanges(query, events).pipe( |
| 166 | + scan((current, action) => [...current, ...action], []) |
| 167 | + ); |
| 168 | +} |
0 commit comments