@@ -168,19 +168,24 @@ const connect = options => new Promise((resolve, reject) => {
168
168
}
169
169
}
170
170
} ;
171
-
171
+
172
172
client . onConnected = ( reconnect ) => {
173
+ const reconnectPromises = [ ] ;
174
+
173
175
if ( reconnect === true ) {
174
176
// This is a re-connection: re-subscribe to all topics subscribed before the
175
177
// connection loss
176
178
Object . values ( subscribedTopics ) . forEach ( ( subscribeParams ) => {
177
- subscribe ( subscribeParams . topic , subscribeParams . cb ) ;
179
+ reconnectPromises . push ( ( ) => subscribe ( subscribeParams . topic , subscribeParams . cb ) ) ;
178
180
} ) ;
179
181
}
180
182
181
- if ( typeof opts . onConnected === 'function' ) {
182
- opts . onConnected ( reconnect ) ;
183
- }
183
+ return Promise . all ( reconnectPromises )
184
+ . then ( ( ) => {
185
+ if ( typeof opts . onConnected === 'function' ) {
186
+ opts . onConnected ( reconnect ) ;
187
+ }
188
+ } ) ;
184
189
} ;
185
190
186
191
if ( typeof onDisconnect === 'function' ) {
@@ -307,7 +312,7 @@ const subscribe = (topic, cb) => new Promise((resolve, reject) => {
307
312
connection . topics [ topic ] . push ( cb ) ;
308
313
return resolve ( topic ) ;
309
314
} ,
310
- onFailure : ( ) => reject ( ) ,
315
+ onFailure : error => reject ( new Error ( `subscription failed: ${ error . errorMessage } ` ) ) ,
311
316
} ) ;
312
317
} ) ;
313
318
@@ -612,8 +617,10 @@ const onPropertyValue = (thingId, name, cb) => {
612
617
if ( ! propertyCallback [ propOutputTopic ] ) {
613
618
propertyCallback [ propOutputTopic ] = { } ;
614
619
propertyCallback [ propOutputTopic ] [ name ] = cb ;
615
- subscribe ( propOutputTopic , cb ) ;
616
- } else if ( propertyCallback [ propOutputTopic ] && ! propertyCallback [ propOutputTopic ] [ name ] ) {
620
+ return subscribe ( propOutputTopic , cb ) ;
621
+ }
622
+
623
+ if ( propertyCallback [ propOutputTopic ] && ! propertyCallback [ propOutputTopic ] [ name ] ) {
617
624
propertyCallback [ propOutputTopic ] [ name ] = cb ;
618
625
}
619
626
return Promise . resolve ( propOutputTopic ) ;
0 commit comments