Skip to content

Single connection #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,8 @@ module.exports = {
"env": {
"browser": true,
"jest": true
},
"rules": {
"no-await-in-loop": 0
}
};
26 changes: 16 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,51 +24,57 @@ import ArduinoCloud from 'arduino-iot-js';
// apiUrl: 'AUTH SERVER URL', // Default is https://auth.arduino.cc
// onDisconnect: message => { /* Disconnection callback */ }
// }
ArduinoCloud.connect(options).then(connectionId => {
ArduinoCloud.connect(options).then(() => {
// Connected
});

ArduinoCloud.disconnect(connectionId).then(() => {
ArduinoCloud.disconnect().then(() => {
// Disconnected
});

ArduinoCloud.subscribe(connectionId, topic, cb).then(topic => {
ArduinoCloud.subscribe(topic, cb).then(topic => {
// Subscribed to topic, messaged fired in the cb
});

ArduinoCloud.unsubscribe(connectionId, topic).then(topic => {
ArduinoCloud.unsubscribe(topic).then(topic => {
// Unsubscribed to topic
});

ArduinoCloud.sendMessage(connectionId, topic, message).then(() => {
ArduinoCloud.sendMessage(topic, message).then(() => {
// Message sent
});

ArduinoCloud.openCloudMonitor(connectionId, deviceId, cb).then(topic => {
ArduinoCloud.openCloudMonitor(deviceId, cb).then(topic => {
// Cloud monitor messages fired to cb
});

ArduinoCloud.writeCloudMonitor(connectionId, deviceId, message).then(() => {
ArduinoCloud.writeCloudMonitor(deviceId, message).then(() => {
// Message sent to cloud monitor
});

ArduinoCloud.closeCloudMonitor(connectionId, deviceId).then(topic => {
ArduinoCloud.closeCloudMonitor(deviceId).then(topic => {
// Close cloud monitor
});

// Send a property value to a device
// - value can be a string, a boolean or a number
// - timestamp is a unix timestamp, not required
ArduinoCloud.sendProperty(connectionId, thingId, name, value, timestamp).then(() => {
ArduinoCloud.sendProperty(thingId, name, value, timestamp).then(() => {
// Property value sent
});

// Register a callback on a property value change
//
ArduinoCloud.onPropertyValue(connectionId, thingId, propertyName, updateCb).then(() => {
ArduinoCloud.onPropertyValue(thingId, propertyName, updateCb).then(() => {
// updateCb(message) will be called every time a new value is available. Value can be string, number, or a boolean depending on the property type
});

// Re-connect with a new authentication token, keeping the subscriptions
// to the Things topics
ArduinoCloud.updateToken(newToken).then(() => {
// Successful reconnection with the provided new token
});

```

## Run tests
Expand Down
151 changes: 98 additions & 53 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ import CBOR from 'cbor-js';

import ArduinoCloudError from './ArduinoCloudError';

const connections = {};
let connection = null;
let connectionOptions = null;
const subscribedTopics = {};
const propertyCallback = {};
const arduinoCloudPort = 8443;
Expand Down Expand Up @@ -82,6 +83,12 @@ const connect = options => new Promise((resolve, reject) => {
onConnected: options.onConnected,
};

connectionOptions = opts;

if (connection) {
return reject(new Error('connection failed: connection already open'));
}

if (!opts.host) {
return reject(new Error('connection failed: you need to provide a valid host (broker)'));
}
Expand Down Expand Up @@ -140,15 +147,13 @@ const connect = options => new Promise((resolve, reject) => {
if (reconnect === true) {
// This is a re-connection: re-subscribe to all topics subscribed before the
// connection loss
Object.getOwnPropertySymbols(subscribedTopics).forEach((connectionId) => {
Object.values(subscribedTopics[connectionId]).forEach((subscribeParams) => {
subscribe(connectionId, subscribeParams.topic, subscribeParams.cb)
});
Object.values(subscribedTopics).forEach((subscribeParams) => {
subscribe(subscribeParams.topic, subscribeParams.cb);
});
}

if (typeof opts.onConnected === 'function') {
opts.onConnected(reconnect)
opts.onConnected(reconnect);
}
};

Expand All @@ -170,9 +175,8 @@ const connect = options => new Promise((resolve, reject) => {
reconnect: true,
keepAliveInterval: 30,
onSuccess: () => {
const id = Symbol(clientID);
connections[id] = client;
return resolve(id);
connection = client;
return resolve();
},
onFailure: ({ errorCode, errorMessage }) => reject(
new ArduinoCloudError(errorCode, errorMessage),
Expand All @@ -192,13 +196,19 @@ const connect = options => new Promise((resolve, reject) => {
}, reject);
});

const disconnect = id => new Promise((resolve, reject) => {
const client = connections[id];
if (!client) {
return reject(new Error('disconnection failed: client not found'));
const disconnect = () => new Promise((resolve, reject) => {
if (!connection) {
return reject(new Error('disconnection failed: connection closed'));
}

try {
connection.disconnect();
} catch (error) {
return reject(error);
}

client.disconnect();
// Remove the connection
connection = null;

// Remove property callbacks to allow resubscribing in a later connect()
Object.keys(propertyCallback).forEach((topic) => {
Expand All @@ -209,39 +219,78 @@ const disconnect = id => new Promise((resolve, reject) => {

// Clean up subscribed topics - a new connection might not need the same topics
Object.keys(subscribedTopics).forEach((topic) => {
if (subscribedTopics[topic]) {
delete subscribedTopics[topic];
}
delete subscribedTopics[topic];
});

return resolve();
});

const subscribe = (id, topic, cb) => new Promise((resolve, reject) => {
const client = connections[id];
if (!client) {
return reject(new Error('disconnection failed: client not found'));
const updateToken = async function updateToken(token) {
// This infinite loop will exit once the reconnection is successful -
// and will pause between each reconnection tentative, every 5 secs.
// eslint-disable-next-line no-constant-condition
while (true) {
try {
if (connection) {
// Disconnect to the connection that is using the old token
connection.disconnect();

// Remove the connection
connection = null;
}

// Reconnect using the new token
const reconnectOptions = Object.assign({}, connectionOptions, { token });
await connect(reconnectOptions);

// Re-subscribe to all topics subscribed before the reconnection
Object.values(subscribedTopics).forEach((subscribeParams) => {
subscribe(subscribeParams.topic, subscribeParams.cb);
});

if (typeof connectionOptions.onConnected === 'function') {
// Call the connection callback (with the reconnection param set to true)
connectionOptions.onConnected(true);
}

// Exit the infinite loop
return;
} catch (error) {
// Expose paho-mqtt errors
// eslint-disable-next-line no-console
console.error(error);

// Something went wrong during the reconnection - retry in 5 secs.
await new Promise((resolve) => {
setTimeout(resolve, 5000);
});
}
}
};

return client.subscribe(topic, {
const subscribe = (topic, cb) => new Promise((resolve, reject) => {
if (!connection) {
return reject(new Error('subscription failed: connection closed'));
}

return connection.subscribe(topic, {
onSuccess: () => {
if (!client.topics[topic]) {
client.topics[topic] = [];
if (!connection.topics[topic]) {
connection.topics[topic] = [];
}
client.topics[topic].push(cb);
connection.topics[topic].push(cb);
return resolve(topic);
},
onFailure: () => reject(),
});
});

const unsubscribe = (id, topic) => new Promise((resolve, reject) => {
const client = connections[id];
if (!client) {
return reject(new Error('disconnection failed: client not found'));
const unsubscribe = topic => new Promise((resolve, reject) => {
if (!connection) {
return reject(new Error('disconnection failed: connection closed'));
}

return client.unsubscribe(topic, {
return connection.unsubscribe(topic, {
onSuccess: () => resolve(topic),
onFailure: () => reject(),
});
Expand All @@ -258,32 +307,31 @@ const arrayBufferToBase64 = (buffer) => {
return window.btoa(binary);
};

const sendMessage = (id, topic, message) => new Promise((resolve, reject) => {
const client = connections[id];
if (!client) {
return reject(new Error('disconnection failed: client not found'));
const sendMessage = (topic, message) => new Promise((resolve, reject) => {
if (!connection) {
return reject(new Error('disconnection failed: connection closed'));
}

client.publish(topic, message, 1, false);
connection.publish(topic, message, 1, false);
return resolve();
});

const openCloudMonitor = (id, deviceId, cb) => {
const openCloudMonitor = (deviceId, cb) => {
const cloudMonitorOutputTopic = `/a/d/${deviceId}/s/o`;
return subscribe(id, cloudMonitorOutputTopic, cb);
return subscribe(cloudMonitorOutputTopic, cb);
};

const writeCloudMonitor = (id, deviceId, message) => {
const writeCloudMonitor = (deviceId, message) => {
const cloudMonitorInputTopic = `/a/d/${deviceId}/s/i`;
return sendMessage(id, cloudMonitorInputTopic, message);
return sendMessage(cloudMonitorInputTopic, message);
};

const closeCloudMonitor = (id, deviceId) => {
const closeCloudMonitor = (deviceId) => {
const cloudMonitorOutputTopic = `/a/d/${deviceId}/s/o`;
return unsubscribe(id, cloudMonitorOutputTopic);
return unsubscribe(cloudMonitorOutputTopic);
};

const sendProperty = (connectionId, thingId, name, value, timestamp) => {
const sendProperty = (thingId, name, value, timestamp) => {
const propertyInputTopic = `/a/t/${thingId}/e/i`;

if (timestamp && !Number.isInteger(timestamp)) {
Expand Down Expand Up @@ -313,7 +361,7 @@ const sendProperty = (connectionId, thingId, name, value, timestamp) => {
break;
}

return sendMessage(connectionId, propertyInputTopic, CBOR.encode([cborValue]));
return sendMessage(propertyInputTopic, CBOR.encode([cborValue]));
};

const getSenml = (deviceId, name, value, timestamp) => {
Expand Down Expand Up @@ -355,7 +403,7 @@ const getCborValue = (senMl) => {
return arrayBufferToBase64(cborEncoded);
};

const sendPropertyAsDevice = (connectionId, deviceId, thingId, name, value, timestamp) => {
const sendPropertyAsDevice = (deviceId, thingId, name, value, timestamp) => {
const propertyInputTopic = `/a/t/${thingId}/e/o`;

if (timestamp && !Number.isInteger(timestamp)) {
Expand All @@ -367,10 +415,10 @@ const sendPropertyAsDevice = (connectionId, deviceId, thingId, name, value, time
}

const senMlValue = getSenml(deviceId, name, value, timestamp);
return sendMessage(connectionId, propertyInputTopic, CBOR.encode([senMlValue]));
return sendMessage(propertyInputTopic, CBOR.encode([senMlValue]));
};

const onPropertyValue = (connectionId, thingId, name, cb) => {
const onPropertyValue = (thingId, name, cb) => {
if (!name) {
throw new Error('Invalid property name');
}
Expand All @@ -379,19 +427,15 @@ const onPropertyValue = (connectionId, thingId, name, cb) => {
}
const propOutputTopic = `/a/t/${thingId}/e/o`;

if (!subscribedTopics[connectionId]) {
subscribedTopics[connectionId] = {};
}

subscribedTopics[connectionId][thingId] = {
subscribedTopics[thingId] = {
topic: propOutputTopic,
cb: cb,
cb,
};

if (!propertyCallback[propOutputTopic]) {
propertyCallback[propOutputTopic] = {};
propertyCallback[propOutputTopic][name] = cb;
subscribe(connectionId, propOutputTopic, cb);
subscribe(propOutputTopic, cb);
} else if (propertyCallback[propOutputTopic] && !propertyCallback[propOutputTopic][name]) {
propertyCallback[propOutputTopic][name] = cb;
}
Expand All @@ -401,6 +445,7 @@ const onPropertyValue = (connectionId, thingId, name, cb) => {
export default {
connect,
disconnect,
updateToken,
subscribe,
unsubscribe,
sendMessage,
Expand Down
Loading