@@ -48,7 +48,8 @@ import CBOR from 'cbor-js';
48
48
49
49
import ArduinoCloudError from './ArduinoCloudError' ;
50
50
51
- const connections = { } ;
51
+ let connection = null ;
52
+ let connectionOptions = null ;
52
53
const subscribedTopics = { } ;
53
54
const propertyCallback = { } ;
54
55
const arduinoCloudPort = 8443 ;
@@ -82,6 +83,12 @@ const connect = options => new Promise((resolve, reject) => {
82
83
onConnected : options . onConnected ,
83
84
} ;
84
85
86
+ connectionOptions = opts ;
87
+
88
+ if ( connection ) {
89
+ return reject ( new Error ( 'connection failed: connection already open' ) ) ;
90
+ }
91
+
85
92
if ( ! opts . host ) {
86
93
return reject ( new Error ( 'connection failed: you need to provide a valid host (broker)' ) ) ;
87
94
}
@@ -140,15 +147,13 @@ const connect = options => new Promise((resolve, reject) => {
140
147
if ( reconnect === true ) {
141
148
// This is a re-connection: re-subscribe to all topics subscribed before the
142
149
// connection loss
143
- Object . getOwnPropertySymbols ( subscribedTopics ) . forEach ( ( connectionId ) => {
144
- Object . values ( subscribedTopics [ connectionId ] ) . forEach ( ( subscribeParams ) => {
145
- subscribe ( connectionId , subscribeParams . topic , subscribeParams . cb )
146
- } ) ;
150
+ Object . values ( subscribedTopics ) . forEach ( ( subscribeParams ) => {
151
+ subscribe ( subscribeParams . topic , subscribeParams . cb ) ;
147
152
} ) ;
148
153
}
149
154
150
155
if ( typeof opts . onConnected === 'function' ) {
151
- opts . onConnected ( reconnect )
156
+ opts . onConnected ( reconnect ) ;
152
157
}
153
158
} ;
154
159
@@ -170,9 +175,8 @@ const connect = options => new Promise((resolve, reject) => {
170
175
reconnect : true ,
171
176
keepAliveInterval : 30 ,
172
177
onSuccess : ( ) => {
173
- const id = Symbol ( clientID ) ;
174
- connections [ id ] = client ;
175
- return resolve ( id ) ;
178
+ connection = client ;
179
+ return resolve ( ) ;
176
180
} ,
177
181
onFailure : ( { errorCode, errorMessage } ) => reject (
178
182
new ArduinoCloudError ( errorCode , errorMessage ) ,
@@ -192,13 +196,19 @@ const connect = options => new Promise((resolve, reject) => {
192
196
} , reject ) ;
193
197
} ) ;
194
198
195
- const disconnect = id => new Promise ( ( resolve , reject ) => {
196
- const client = connections [ id ] ;
197
- if ( ! client ) {
198
- return reject ( new Error ( 'disconnection failed: client not found' ) ) ;
199
+ const disconnect = ( ) => new Promise ( ( resolve , reject ) => {
200
+ if ( ! connection ) {
201
+ return reject ( new Error ( 'disconnection failed: connection closed' ) ) ;
202
+ }
203
+
204
+ try {
205
+ connection . disconnect ( ) ;
206
+ } catch ( error ) {
207
+ return reject ( error ) ;
199
208
}
200
209
201
- client . disconnect ( ) ;
210
+ // Remove the connection
211
+ connection = null ;
202
212
203
213
// Remove property callbacks to allow resubscribing in a later connect()
204
214
Object . keys ( propertyCallback ) . forEach ( ( topic ) => {
@@ -209,39 +219,78 @@ const disconnect = id => new Promise((resolve, reject) => {
209
219
210
220
// Clean up subscribed topics - a new connection might not need the same topics
211
221
Object . keys ( subscribedTopics ) . forEach ( ( topic ) => {
212
- if ( subscribedTopics [ topic ] ) {
213
- delete subscribedTopics [ topic ] ;
214
- }
222
+ delete subscribedTopics [ topic ] ;
215
223
} ) ;
216
224
217
225
return resolve ( ) ;
218
226
} ) ;
219
227
220
- const subscribe = ( id , topic , cb ) => new Promise ( ( resolve , reject ) => {
221
- const client = connections [ id ] ;
222
- if ( ! client ) {
223
- return reject ( new Error ( 'disconnection failed: client not found' ) ) ;
228
+ const updateToken = async function updateToken ( token ) {
229
+ // This infinite loop will exit once the reconnection is successful -
230
+ // and will pause between each reconnection tentative, every 5 secs.
231
+ // eslint-disable-next-line no-constant-condition
232
+ while ( true ) {
233
+ try {
234
+ if ( connection ) {
235
+ // Disconnect to the connection that is using the old token
236
+ connection . disconnect ( ) ;
237
+
238
+ // Remove the connection
239
+ connection = null ;
240
+ }
241
+
242
+ // Reconnect using the new token
243
+ const reconnectOptions = Object . assign ( { } , connectionOptions , { token } ) ;
244
+ await connect ( reconnectOptions ) ;
245
+
246
+ // Re-subscribe to all topics subscribed before the reconnection
247
+ Object . values ( subscribedTopics ) . forEach ( ( subscribeParams ) => {
248
+ subscribe ( subscribeParams . topic , subscribeParams . cb ) ;
249
+ } ) ;
250
+
251
+ if ( typeof connectionOptions . onConnected === 'function' ) {
252
+ // Call the connection callback (with the reconnection param set to true)
253
+ connectionOptions . onConnected ( true ) ;
254
+ }
255
+
256
+ // Exit the infinite loop
257
+ return ;
258
+ } catch ( error ) {
259
+ // Expose paho-mqtt errors
260
+ // eslint-disable-next-line no-console
261
+ console . error ( error ) ;
262
+
263
+ // Something went wrong during the reconnection - retry in 5 secs.
264
+ await new Promise ( ( resolve ) => {
265
+ setTimeout ( resolve , 5000 ) ;
266
+ } ) ;
267
+ }
224
268
}
269
+ } ;
225
270
226
- return client . subscribe ( topic , {
271
+ const subscribe = ( topic , cb ) => new Promise ( ( resolve , reject ) => {
272
+ if ( ! connection ) {
273
+ return reject ( new Error ( 'subscription failed: connection closed' ) ) ;
274
+ }
275
+
276
+ return connection . subscribe ( topic , {
227
277
onSuccess : ( ) => {
228
- if ( ! client . topics [ topic ] ) {
229
- client . topics [ topic ] = [ ] ;
278
+ if ( ! connection . topics [ topic ] ) {
279
+ connection . topics [ topic ] = [ ] ;
230
280
}
231
- client . topics [ topic ] . push ( cb ) ;
281
+ connection . topics [ topic ] . push ( cb ) ;
232
282
return resolve ( topic ) ;
233
283
} ,
234
284
onFailure : ( ) => reject ( ) ,
235
285
} ) ;
236
286
} ) ;
237
287
238
- const unsubscribe = ( id , topic ) => new Promise ( ( resolve , reject ) => {
239
- const client = connections [ id ] ;
240
- if ( ! client ) {
241
- return reject ( new Error ( 'disconnection failed: client not found' ) ) ;
288
+ const unsubscribe = topic => new Promise ( ( resolve , reject ) => {
289
+ if ( ! connection ) {
290
+ return reject ( new Error ( 'disconnection failed: connection closed' ) ) ;
242
291
}
243
292
244
- return client . unsubscribe ( topic , {
293
+ return connection . unsubscribe ( topic , {
245
294
onSuccess : ( ) => resolve ( topic ) ,
246
295
onFailure : ( ) => reject ( ) ,
247
296
} ) ;
@@ -258,32 +307,31 @@ const arrayBufferToBase64 = (buffer) => {
258
307
return window . btoa ( binary ) ;
259
308
} ;
260
309
261
- const sendMessage = ( id , topic , message ) => new Promise ( ( resolve , reject ) => {
262
- const client = connections [ id ] ;
263
- if ( ! client ) {
264
- return reject ( new Error ( 'disconnection failed: client not found' ) ) ;
310
+ const sendMessage = ( topic , message ) => new Promise ( ( resolve , reject ) => {
311
+ if ( ! connection ) {
312
+ return reject ( new Error ( 'disconnection failed: connection closed' ) ) ;
265
313
}
266
314
267
- client . publish ( topic , message , 1 , false ) ;
315
+ connection . publish ( topic , message , 1 , false ) ;
268
316
return resolve ( ) ;
269
317
} ) ;
270
318
271
- const openCloudMonitor = ( id , deviceId , cb ) => {
319
+ const openCloudMonitor = ( deviceId , cb ) => {
272
320
const cloudMonitorOutputTopic = `/a/d/${ deviceId } /s/o` ;
273
- return subscribe ( id , cloudMonitorOutputTopic , cb ) ;
321
+ return subscribe ( cloudMonitorOutputTopic , cb ) ;
274
322
} ;
275
323
276
- const writeCloudMonitor = ( id , deviceId , message ) => {
324
+ const writeCloudMonitor = ( deviceId , message ) => {
277
325
const cloudMonitorInputTopic = `/a/d/${ deviceId } /s/i` ;
278
- return sendMessage ( id , cloudMonitorInputTopic , message ) ;
326
+ return sendMessage ( cloudMonitorInputTopic , message ) ;
279
327
} ;
280
328
281
- const closeCloudMonitor = ( id , deviceId ) => {
329
+ const closeCloudMonitor = ( deviceId ) => {
282
330
const cloudMonitorOutputTopic = `/a/d/${ deviceId } /s/o` ;
283
- return unsubscribe ( id , cloudMonitorOutputTopic ) ;
331
+ return unsubscribe ( cloudMonitorOutputTopic ) ;
284
332
} ;
285
333
286
- const sendProperty = ( connectionId , thingId , name , value , timestamp ) => {
334
+ const sendProperty = ( thingId , name , value , timestamp ) => {
287
335
const propertyInputTopic = `/a/t/${ thingId } /e/i` ;
288
336
289
337
if ( timestamp && ! Number . isInteger ( timestamp ) ) {
@@ -313,7 +361,7 @@ const sendProperty = (connectionId, thingId, name, value, timestamp) => {
313
361
break ;
314
362
}
315
363
316
- return sendMessage ( connectionId , propertyInputTopic , CBOR . encode ( [ cborValue ] ) ) ;
364
+ return sendMessage ( propertyInputTopic , CBOR . encode ( [ cborValue ] ) ) ;
317
365
} ;
318
366
319
367
const getSenml = ( deviceId , name , value , timestamp ) => {
@@ -355,7 +403,7 @@ const getCborValue = (senMl) => {
355
403
return arrayBufferToBase64 ( cborEncoded ) ;
356
404
} ;
357
405
358
- const sendPropertyAsDevice = ( connectionId , deviceId , thingId , name , value , timestamp ) => {
406
+ const sendPropertyAsDevice = ( deviceId , thingId , name , value , timestamp ) => {
359
407
const propertyInputTopic = `/a/t/${ thingId } /e/o` ;
360
408
361
409
if ( timestamp && ! Number . isInteger ( timestamp ) ) {
@@ -367,10 +415,10 @@ const sendPropertyAsDevice = (connectionId, deviceId, thingId, name, value, time
367
415
}
368
416
369
417
const senMlValue = getSenml ( deviceId , name , value , timestamp ) ;
370
- return sendMessage ( connectionId , propertyInputTopic , CBOR . encode ( [ senMlValue ] ) ) ;
418
+ return sendMessage ( propertyInputTopic , CBOR . encode ( [ senMlValue ] ) ) ;
371
419
} ;
372
420
373
- const onPropertyValue = ( connectionId , thingId , name , cb ) => {
421
+ const onPropertyValue = ( thingId , name , cb ) => {
374
422
if ( ! name ) {
375
423
throw new Error ( 'Invalid property name' ) ;
376
424
}
@@ -379,19 +427,15 @@ const onPropertyValue = (connectionId, thingId, name, cb) => {
379
427
}
380
428
const propOutputTopic = `/a/t/${ thingId } /e/o` ;
381
429
382
- if ( ! subscribedTopics [ connectionId ] ) {
383
- subscribedTopics [ connectionId ] = { } ;
384
- }
385
-
386
- subscribedTopics [ connectionId ] [ thingId ] = {
430
+ subscribedTopics [ thingId ] = {
387
431
topic : propOutputTopic ,
388
- cb : cb ,
432
+ cb,
389
433
} ;
390
434
391
435
if ( ! propertyCallback [ propOutputTopic ] ) {
392
436
propertyCallback [ propOutputTopic ] = { } ;
393
437
propertyCallback [ propOutputTopic ] [ name ] = cb ;
394
- subscribe ( connectionId , propOutputTopic , cb ) ;
438
+ subscribe ( propOutputTopic , cb ) ;
395
439
} else if ( propertyCallback [ propOutputTopic ] && ! propertyCallback [ propOutputTopic ] [ name ] ) {
396
440
propertyCallback [ propOutputTopic ] [ name ] = cb ;
397
441
}
@@ -401,6 +445,7 @@ const onPropertyValue = (connectionId, thingId, name, cb) => {
401
445
export default {
402
446
connect,
403
447
disconnect,
448
+ updateToken,
404
449
subscribe,
405
450
unsubscribe,
406
451
sendMessage,
0 commit comments