-
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathConnection.ts
121 lines (101 loc) · 3.53 KB
/
Connection.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import jws from 'jws';
import mqtt from 'mqtt';
import { Observable, Subject } from 'rxjs';
import SenML from '../senML';
import Utils from '../utils';
import { CloudMessageValue } from '../client/ICloudClient';
import { IConnection, CloudMessage, ConnectionOptions } from './IConnection';
/**
 * Defaults merged into every connection's options by `Connection.From`.
 * Per-connection values (client id, username, password) are spread after
 * these, so they win on any key collision.
 *
 * NOTE(review): the meanings noted below follow the MQTT.js client options;
 * confirm against `ConnectionOptions`, which is declared in './IConnection'.
 */
const BaseConnectionOptions: Partial<ConnectionOptions> = {
  // Presumably "clean session": do not resume a previous session.
  clean: true,
  // Keep-alive interval (seconds in MQTT.js).
  keepalive: 30,
  // No extra properties by default.
  properties: {},
  // Protocol level 4 (MQTT 3.1.1 in MQTT.js terms).
  protocolVersion: 4,
  // Connection timeout (milliseconds in MQTT.js).
  connectTimeout: 30_000,
};
/**
 * MQTT-backed implementation of `IConnection`.
 *
 * Wraps an `mqtt.MqttClient`, forwards the usual client operations
 * (subscribe / unsubscribe / publish / reconnect / end / on) to it, and
 * republishes every incoming MQTT message on the `messages` stream.
 * Instances are normally created through `Connection.From`.
 */
export class Connection implements IConnection {
  /** Token this connection was opened with; assigned by `Connection.From`. */
  public token: string;
  /** Stream of incoming messages; re-created each time a client is assigned. */
  public messages: Observable<CloudMessage>;
  private _client: mqtt.MqttClient;

  private get client(): mqtt.MqttClient {
    return this._client;
  }

  /**
   * Stores the client and wires its 'message' event to a fresh `messages`
   * subject.
   */
  private set client(client: mqtt.MqttClient) {
    this._client = client;
    const messages = (this.messages = new Subject<CloudMessage>());
    this._client.on('message', (topic, msg) => {
      if (topic.includes('/s/o')) {
        // Topics containing '/s/o' are forwarded as plain text, undecoded.
        messages.next({ topic, value: msg.toString() });
      } else {
        // Everything else is decoded as CBOR SenML, one message per property.
        this.messagesFrom(topic, msg).forEach((m) => messages.next(m));
      }
    });
  }

  /**
   * Opens a connection to `wss://<host>:<port>/mqtt`.
   *
   * The user id is read from the token's 'http://arduino.cc/id' claim and is
   * used both as the username and as the prefix of the client id; the token
   * itself is sent as the password.
   *
   * @param host broker host name
   * @param port broker port
   * @param token JWS token used to authenticate
   * @param mqttConnect factory that creates the underlying MQTT client from a
   *   URL and an options object
   * @returns the new connection (wrapped in a promise because the method is async)
   * @throws Error (as a rejected promise) when `token` or `host` is missing,
   *   or when the token cannot be decoded
   */
  public static async From(
    host: string,
    port: string | number,
    token: string,
    // NOTE(review): `string` and `IClientOptions` are parameter NAMES here, so
    // both parameters are implicitly `any`. Left unchanged to keep the public
    // signature identical for existing callers.
    mqttConnect: (string, IClientOptions) => mqtt.MqttClient
  ): Promise<IConnection> {
    if (!token) throw new Error('connection failed: you need to provide a valid token');
    if (!host) throw new Error('connection failed: you need to provide a valid host (broker)');

    // Guard the decode result so a malformed token yields a clear error
    // instead of a TypeError on the `.payload` access below.
    const decoded = jws.decode(token);
    if (!decoded) throw new Error('connection failed: you need to provide a valid token');

    const userId = decoded.payload['http://arduino.cc/id'];
    const options = {
      // The timestamp suffix gives every connection attempt its own client id.
      clientId: `${userId}:${new Date().getTime()}`,
      username: userId,
      password: token,
    };

    const connection = new Connection();
    connection.client = mqttConnect(`wss://${host}:${port}/mqtt`, {
      ...BaseConnectionOptions,
      ...options,
    });
    connection.token = token;
    return connection;
  }

  /** Registers `cb` for `event` on the underlying client; returns `this` for chaining. */
  public on(event: any, cb: any): IConnection {
    this.client.on(event, cb);
    return this;
  }

  /** Forwards to the underlying client's `end`; returns `this` for chaining. */
  public end(force?: boolean, opts?: Record<string, any>, cb?: mqtt.CloseCallback): IConnection {
    this.client.end(force, opts, cb);
    return this;
  }

  /** Forwards to the underlying client's `reconnect`; returns `this` for chaining. */
  public reconnect(opts?: mqtt.IClientReconnectOptions): IConnection {
    this.client.reconnect(opts);
    return this;
  }

  /**
   * Unsubscribes from one or more topics; returns `this` for chaining.
   *
   * Fix: this previously called `subscribe`, which re-subscribed instead.
   */
  public unsubscribe(topic: string | string[], opts?: any, callback?: any): IConnection {
    this.client.unsubscribe(topic, opts, callback);
    return this;
  }

  /** Forwards to the underlying client's `publish`; returns `this` for chaining. */
  public publish(topic: any, message: any, opts?: any, callback?: any): IConnection {
    this.client.publish(topic, message, opts, callback);
    return this;
  }

  /** Forwards to the underlying client's `subscribe`; returns `this` for chaining. */
  public subscribe(topic: any, callback?: any): IConnection {
    this.client.subscribe(topic, callback);
    return this;
  }

  /**
   * Decodes a CBOR SenML payload into one `CloudMessage` per property.
   *
   * Record names have the form `property` or `property:attribute`.
   * Consecutive records sharing a property name are merged into one object
   * value keyed by attribute; a record without an attribute supplies the
   * value directly.
   *
   * @param topic topic the payload arrived on; copied into every message
   * @param msg raw payload
   * @returns the decoded messages, in payload order
   */
  private messagesFrom(topic: string, msg: Buffer): CloudMessage[] {
    let current = '';
    let attribute = '';
    let previous = '';
    let valueToSend: CloudMessageValue = {};
    const messages: CloudMessage[] = [];

    const properties = SenML.CBOR.decode(Utils.toArrayBuffer(msg));
    properties.forEach((p) => {
      const value = SenML.valueFrom(p);
      [current, attribute] = SenML.nameFrom(p).split(':');

      if (previous === '') previous = current;

      // The property name changed: flush what was accumulated for the
      // previous property and start a new accumulator.
      if (previous !== current) {
        messages.push({ topic, propertyName: previous, value: valueToSend });
        previous = current;
        valueToSend = {};
      }

      if (attribute) valueToSend[attribute] = value;
      else valueToSend = value;
    });

    // Flush the last property, unless nothing was accumulated (value still {}).
    if (Utils.isNotAnEmptyObject(valueToSend)) {
      messages.push({ topic, propertyName: current, value: valueToSend });
    }

    return messages;
  }
}