Skip to content

Commit 285f5d7

Browse files
committed
Merge pull request arduino#82 from mapnull/hotfix-mqtt-gw_distribute_nodeId
fix mqtt gateway doesn't distribute node ids when no clients
2 parents be54af1 + 8d3f244 commit 285f5d7

File tree

3 files changed

+127
-100
lines changed

3 files changed

+127
-100
lines changed

libraries/MySensors/MyMQTT.cpp

Lines changed: 125 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ version 2 as published by the Free Software Foundation.
1313
#include "utility/MsTimer2.h"
1414

1515
char V_0[] PROGMEM = "TEMP"; //V_TEMP
16-
char V_1[] PROGMEM = "HUM"; //V_HUM
16+
char V_1[] PROGMEM = "HUM"; //V_HUM
1717
char V_2[] PROGMEM = "LIGHT"; //V_LIGHT
1818
char V_3[] PROGMEM = "DIMMER"; //V_DIMMER
1919
char V_4[] PROGMEM = "PRESSURE"; //V_PRESSURE
@@ -23,7 +23,7 @@ char V_7[] PROGMEM = "RAINRATE"; //V_RAINRATE
2323
char V_8[] PROGMEM = "WIND"; //V_WIND
2424
char V_9[] PROGMEM = "GUST"; //V_GUST
2525
char V_10[] PROGMEM = "DIRECTON"; //V_DIRECTON
26-
char V_11[] PROGMEM = "UV"; //V_UV
26+
char V_11[] PROGMEM = "UV"; //V_UV
2727
char V_12[] PROGMEM = "WEIGHT"; //V_WEIGHT
2828
char V_13[] PROGMEM = "DISTANCE"; //V_DISTANCE
2929
char V_14[] PROGMEM = "IMPEDANCE"; //V_IMPEDANCE
@@ -35,20 +35,20 @@ char V_19[] PROGMEM = "SCENE_ON"; //V_SCENE_ON
3535
char V_20[] PROGMEM = "SCENE_OFF"; //V_SCENE_OFF
3636
char V_21[] PROGMEM = "HEATER"; //V_HEATER
3737
char V_22[] PROGMEM = "HEATER_SW"; //V_HEATER_SW
38-
char V_23[] PROGMEM = "LIGHT_LEVEL"; //V_LIGHT_LEVEL
38+
char V_23[] PROGMEM = "LIGHT_LEVEL";//V_LIGHT_LEVEL
3939
char V_24[] PROGMEM = "VAR1"; //V_VAR1
4040
char V_25[] PROGMEM = "VAR2"; //V_VAR2
4141
char V_26[] PROGMEM = "VAR3"; //V_VAR3
4242
char V_27[] PROGMEM = "VAR4"; //V_VAR4
4343
char V_28[] PROGMEM = "VAR5"; //V_VAR5
44-
char V_29[] PROGMEM = "UP"; //V_UP
44+
char V_29[] PROGMEM = "UP"; //V_UP
4545
char V_30[] PROGMEM = "DOWN"; //V_DOWN
4646
char V_31[] PROGMEM = "STOP"; //V_STOP
4747
char V_32[] PROGMEM = "IR_SEND"; //V_IR_SEND
4848
char V_33[] PROGMEM = "IR_RECEIVE"; //V_IR_RECEIVE
4949
char V_34[] PROGMEM = "FLOW"; //V_FLOW
5050
char V_35[] PROGMEM = "VOLUME"; //V_VOLUME
51-
char V_36[] PROGMEM = "LOCK_STATUS"; //V_LOCK_STATUS
51+
char V_36[] PROGMEM = "LOCK_STATUS";//V_LOCK_STATUS
5252
char V_37[] PROGMEM = "DUST_LEVEL"; //V_DUST_LEVEL
5353
char V_38[] PROGMEM = "VOLTAGE"; //V_VOLTAGE
5454
char V_39[] PROGMEM = "CURRENT"; //V_CURRENT
@@ -72,10 +72,10 @@ char V_56[] PROGMEM = ""; //
7272
char V_57[] PROGMEM = ""; //
7373
char V_58[] PROGMEM = ""; //
7474
char V_59[] PROGMEM = ""; //
75-
char V_60[] PROGMEM = "Started!\n"; //Custom for MQTTGateway
75+
char V_60[] PROGMEM = "Started!\n"; //Custom for MQTTGateway
7676
char V_61[] PROGMEM = "SKETCH_NAME"; //Custom for MQTTGateway
7777
char V_62[] PROGMEM = "SKETCH_VERSION"; //Custom for MQTTGateway
78-
char V_63[] PROGMEM = "UNKNOWN"; //Custom for MQTTGateway
78+
char V_63[] PROGMEM = "UNKNOWN"; //Custom for MQTTGateway
7979

8080
//////////////////////////////////////////////////////////////////
8181

@@ -89,7 +89,6 @@ PROGMEM const char *vType[] = {
8989
V_61, V_62, V_63
9090
};
9191

92-
9392
char broker[] PROGMEM = MQTT_BROKER_PREFIX;
9493

9594
#define S_FIRSTCUSTOM 60
@@ -132,7 +131,7 @@ void MyMQTT::begin(rf24_pa_dbm_e paLevel, uint8_t channel, rf24_datarate_e dataR
132131
Serial.begin(BAUD_RATE);
133132
repeaterMode = true;
134133
isGateway = true;
135-
MQTTClientConnected = false;
134+
MQTTClients = 0;
136135

137136
setupRepeaterMode();
138137
dataCallback = inDataCallback;
@@ -163,77 +162,120 @@ void MyMQTT::processRadioMessage() {
163162
if (process()) {
164163
// A new message was received from one of the sensors
165164
MyMessage message = getLastMessage();
166-
// Pass along the message from sensors to serial line
167165
rxBlink(1);
168-
SendMQTT(message);
169-
}
170166

167+
if (msg.isAck()) {
168+
Serial.println("msg is ack!");
169+
if (msg.sender == 255 && mGetCommand(msg) == C_INTERNAL && msg.type == I_ID_REQUEST) {
170+
// TODO: sending ACK request on id_response fucks node up. doesn't work.
171+
// The idea was to confirm id and save to EEPROM_LATEST_NODE_ADDRESS.
172+
}
173+
} else {
174+
// we have to check every message if its a newly assigned id or not.
175+
// Ack on I_ID_RESPONSE does not work, and checking on C_PRESENTATION isn't reliable.
176+
uint8_t newNodeID = loadState(EEPROM_LATEST_NODE_ADDRESS)+1;
177+
if (newNodeID <= MQTT_FIRST_SENSORID) {
178+
newNodeID = MQTT_FIRST_SENSORID;
179+
}
180+
if (msg.sender == newNodeID) {
181+
saveState(EEPROM_LATEST_NODE_ADDRESS,newNodeID);
182+
}
183+
184+
if (mGetCommand(msg) == C_INTERNAL) {
185+
if (msg.type == I_CONFIG) {
186+
txBlink(1);
187+
if (!sendRoute(build(msg, GATEWAY_ADDRESS, msg.sender, 255, C_INTERNAL, I_CONFIG, 0).set("M"))) {
188+
errBlink(1);
189+
}
190+
} else if (msg.type == I_ID_REQUEST && msg.sender == 255) {
191+
uint8_t newNodeID = loadState(EEPROM_LATEST_NODE_ADDRESS)+1;
192+
if (newNodeID <= MQTT_FIRST_SENSORID) {
193+
newNodeID = MQTT_FIRST_SENSORID;
194+
}
195+
if (newNodeID >= MQTT_LAST_SENSORID) {
196+
// Sorry no more id's left :(
197+
newNodeID = AUTO;
198+
}
199+
txBlink(1);
200+
if (!sendRoute(build(msg, GATEWAY_ADDRESS, msg.sender, 255, C_INTERNAL, I_ID_RESPONSE, 0).set(newNodeID))) {
201+
errBlink(1);
202+
}
203+
}
204+
} else if (mGetCommand(msg)!= C_PRESENTATION) {
205+
// Pass along the message from sensors to MQTT
206+
SendMQTT(message);
207+
}
208+
}
209+
}
171210
}
172211

173212
void MyMQTT::processMQTTMessage(char *inputString, uint8_t inputPos) {
174-
char *str, *p;
213+
char *str, *p, *payload=NULL;
175214
uint8_t i = 0;
176215
buffer[0]= 0;
177216
buffsize = 0;
217+
uint8_t mqttMsgType = (uint8_t)inputString[0] >> 4;
178218

179-
if ((uint8_t)inputString[0] >> 4 == MQTTCONNECT) {
219+
if (mqttMsgType == MQTTCONNECT) {
180220
buffer[buffsize++] = MQTTCONNACK << 4;
181221
buffer[buffsize++] = 0x02; // Remaining length
182222
buffer[buffsize++] = 0x00; // Connection accepted
183223
buffer[buffsize++] = 0x00; // Reserved
184-
MQTTClientConnected=true; // We have connection!
185-
}
186-
if ((uint8_t)inputString[0] >> 4 == MQTTPINGREQ) {
224+
MQTTClients++; // We have a new client connected!
225+
} else if (mqttMsgType == MQTTPINGREQ) {
187226
buffer[buffsize++] = MQTTPINGRESP << 4;
188227
buffer[buffsize++] = 0x00;
189-
}
190-
if ((uint8_t)inputString[0] >> 4 == MQTTSUBSCRIBE) {
191-
buffer[buffsize++] = MQTTSUBACK << 4; // Just ack everything, we actually dont really care!
192-
buffer[buffsize++] = 0x03; // Remaining length
228+
} else if (mqttMsgType == MQTTSUBSCRIBE) {
229+
buffer[buffsize++] = MQTTSUBACK << 4; // Just ack everything, we actually dont really care!
230+
buffer[buffsize++] = 0x03; // Remaining length
193231
buffer[buffsize++] = (uint8_t)inputString[2]; // Message ID MSB
194232
buffer[buffsize++] = (uint8_t)inputString[3]; // Message ID LSB
195-
buffer[buffsize++] = MQTTQOS0; // QOS level
196-
}
197-
if ((uint8_t)inputString[0] >> 4 == MQTTUNSUBSCRIBE) {
233+
buffer[buffsize++] = MQTTQOS0; // QOS level
234+
} else if (mqttMsgType == MQTTUNSUBSCRIBE) {
198235
buffer[buffsize++] = MQTTUNSUBACK << 4;
199-
buffer[buffsize++] = 0x02; // Remaining length
236+
buffer[buffsize++] = 0x02; // Remaining length
200237
buffer[buffsize++] = (uint8_t)inputString[2]; // Message ID MSB
201238
buffer[buffsize++] = (uint8_t)inputString[3]; // Message ID LSB
239+
} else if (mqttMsgType == MQTTDISCONNECT) {
240+
MQTTClients--; // Client disconnected!
202241
}
203-
if ((uint8_t)inputString[0] >> 4 == MQTTDISCONNECT) {
204-
MQTTClientConnected=false; // We lost connection!
205-
}
242+
206243
if (buffsize > 0) {
207-
dataCallback(buffer,&buffsize);
244+
dataCallback(buffer, &buffsize);
208245
}
209246

210247
// We publish everything we get, we dont care if its subscribed or not!
211-
if ((uint8_t)inputString[0] >> 4 == MQTTPUBLISH || (MQTT_SEND_SUBSCRIPTION && (uint8_t)inputString[0] >> 4 == MQTTSUBSCRIBE)) {
212-
buffer[0]= 0;
248+
if (mqttMsgType == MQTTPUBLISH || (MQTT_SEND_SUBSCRIPTION && mqttMsgType == MQTTSUBSCRIBE)) {
249+
buffer[0] = 0;
213250
buffsize = 0;
214251
// Cut out address and payload depending on message type.
215-
if ((uint8_t)inputString[0] >> 4 == MQTTSUBSCRIBE) {
216-
strncat(buffer,inputString+6,inputString[5]);
252+
if (mqttMsgType == MQTTSUBSCRIBE) {
253+
strncat(buffer, inputString+6, inputString[5]);
217254
} else {
218-
strncat(buffer,inputString+4,inputString[3]);
255+
strncat(buffer, inputString+4, inputString[3]);
219256
}
220257

221258
// TODO: Check if we should send ack or not.
222-
for (str = strtok_r(buffer,"/",&p) ; str && i<4 ; str = strtok_r(NULL,"/",&p)) {
259+
for (str = strtok_r(buffer, "/", &p) ; str && i<4 ; str = strtok_r(NULL, "/", &p)) {
223260
if (i == 0) {
224-
if (strcmp_P(str,broker)!=0) { //look for MQTT_BROKER_PREFIX
225-
return; //Message not for us or malformatted!
261+
//look for MQTT_BROKER_PREFIX
262+
if (strcmp_P(str, broker) != 0) {
263+
//Message not for us or malformatted!
264+
return;
226265
}
227266
} else if (i==1) {
228-
msg.destination = atoi(str); //NodeID
267+
//NodeID
268+
msg.destination = atoi(str);
229269
} else if (i==2) {
230-
msg.sensor = atoi(str); //SensorID
270+
//SensorID
271+
msg.sensor = atoi(str);
231272
} else if (i==3) {
232-
char match=0; //SensorType
233-
//strcpy(str,(char*)&str[2]); //Strip V_
273+
//SensorType
274+
char match=0;
234275

235276
for (uint8_t j=0; strcpy_P(convBuf, (char*)pgm_read_word(&(vType[j]))) ; j++) {
236-
if (strcmp((char*)&str[2],convBuf)==0) { //Strip V_ and compare
277+
//Strip V_ and compare
278+
if (strcmp((char*)&str[2], convBuf)==0) {
237279
match=j;
238280
break;
239281
}
@@ -245,70 +287,58 @@ void MyMQTT::processMQTTMessage(char *inputString, uint8_t inputPos) {
245287
msg.type = match;
246288
}
247289
i++;
248-
} //Check if packge has payload
249-
if ((uint8_t)inputString[1] > (uint8_t)(inputString[3]+2) && !((uint8_t)inputString[0] >> 4 == MQTTSUBSCRIBE)) {
250-
strcpy(convBuf,inputString+(inputString[3]+4));
251-
msg.set(convBuf); //Payload
252-
} else {
253-
msg.set(""); //No payload
254290
}
291+
292+
// Check if package has payload
293+
if (mqttMsgType == MQTTPUBLISH) {
294+
uint8_t length = (uint8_t)inputString[1] - (uint8_t)(inputString[3]+2);
295+
if (length && length < MAX_PAYLOAD*2) {
296+
// Payload
297+
memcpy(convBuf, inputString+(inputString[3]+4), length);
298+
convBuf[length] = 0;
299+
payload = convBuf;
300+
}
301+
}
302+
msg.set(payload);
303+
255304
txBlink(1);
256305
if (!sendRoute(build(msg, GATEWAY_ADDRESS, msg.destination, msg.sensor, C_SET, msg.type, 0))) errBlink(1);
257-
258306
}
259307
}
260308

261309
void MyMQTT::SendMQTT(MyMessage &msg) {
262310
buffsize = 0;
263-
if (!MQTTClientConnected) return; //We have no connections - return
264-
if (msg.isAck()) {
265-
Serial.println("msg is ack!");
266-
if (msg.sender==255 && mGetCommand(msg)==C_INTERNAL && msg.type==I_ID_REQUEST) {
267-
// TODO: sending ACK request on id_response fucks node up. doesn't work.
268-
// The idea was to confirm id and save to EEPROM_LATEST_NODE_ADDRESS.
269-
}
270-
} else {
271-
// we have to check every message if its a newly assigned id or not.
272-
// Ack on I_ID_RESPONSE does not work, and checking on C_PRESENTATION isn't reliable.
273-
uint8_t newNodeID = loadState(EEPROM_LATEST_NODE_ADDRESS)+1;
274-
if (newNodeID <= MQTT_FIRST_SENSORID) newNodeID = MQTT_FIRST_SENSORID;
275-
if (msg.sender==newNodeID) {
276-
saveState(EEPROM_LATEST_NODE_ADDRESS,newNodeID);
277-
}
278-
if (mGetCommand(msg)==C_INTERNAL) {
279-
if (msg.type==I_CONFIG) {
280-
txBlink(1);
281-
if (!sendRoute(build(msg, GATEWAY_ADDRESS, msg.sender, 255, C_INTERNAL, I_CONFIG, 0).set("M"))) errBlink(1);
282-
} else if (msg.type==I_ID_REQUEST && msg.sender==255) {
283-
uint8_t newNodeID = loadState(EEPROM_LATEST_NODE_ADDRESS)+1;
284-
if (newNodeID <= MQTT_FIRST_SENSORID) newNodeID = MQTT_FIRST_SENSORID;
285-
if (newNodeID >= MQTT_LAST_SENSORID) return; // Sorry no more id's left :(
286-
txBlink(1);
287-
if (!sendRoute(build(msg, GATEWAY_ADDRESS, msg.sender, 255, C_INTERNAL, I_ID_RESPONSE, 0).set(newNodeID))) errBlink(1);
288-
}
289-
} else if (mGetCommand(msg)!=0) {
290-
if (mGetCommand(msg)==3) msg.type=msg.type+(S_FIRSTCUSTOM-10); //Special message
291-
292-
buffer[buffsize++] = MQTTPUBLISH << 4; // 0:
293-
buffer[buffsize++] = 0x09; // 1: Remaining length with no payload, we'll set this later to correct value, buffsize -2
294-
buffer[buffsize++] = 0x00; // 2: Length MSB (Remaing length can never exceed ff,so MSB must be 0!)
295-
buffer[buffsize++] = 0x08; // 3: Length LSB (ADDR), We'll set this later
296-
if (msg.type > V_TOTAL) msg.type=V_TOTAL;// If type > defined types set to unknown.
297-
strcpy_P(buffer+4, broker);
298-
buffsize+=strlen_P(broker);
299-
buffsize+=sprintf(&buffer[buffsize],"/%i/%i/V_%s",msg.sender,msg.sensor,getType(convBuf, &vType[msg.type]));
300-
buffer[3]=buffsize-4; // Set correct address length on byte 4.
311+
if (!MQTTClients) {
312+
//We have no clients connected - return
313+
return;
314+
}
315+
316+
if (mGetCommand(msg) == C_INTERNAL) {
317+
//Special message
318+
msg.type = msg.type+(S_FIRSTCUSTOM-10);
319+
}
320+
321+
buffer[buffsize++] = MQTTPUBLISH << 4; // 0:
322+
buffer[buffsize++] = 0x09; // 1: Remaining length with no payload, we'll set this later to correct value, buffsize -2
323+
buffer[buffsize++] = 0x00; // 2: Length MSB (Remaing length can never exceed ff,so MSB must be 0!)
324+
buffer[buffsize++] = 0x08; // 3: Length LSB (ADDR), We'll set this later
325+
if (msg.type > V_TOTAL) {
326+
// If type > defined types set to unknown.
327+
msg.type=V_TOTAL;
328+
}
329+
strcpy_P(buffer+4, broker);
330+
buffsize+=strlen_P(broker);
331+
buffsize+=sprintf(&buffer[buffsize], "/%i/%i/V_%s", msg.sender, msg.sensor, getType(convBuf, &vType[msg.type]));
332+
buffer[3]=buffsize-4; // Set correct address length on byte 4.
301333
#ifdef DEBUG
302-
Serial.println((char*)&buffer[4]);
334+
Serial.println((char*)&buffer[4]);
303335
#endif
304-
msg.getString(convBuf);
305-
for (uint8_t a=0; a<strlen(convBuf); a++) {// Payload
306-
buffer[buffsize++] = convBuf[a];
307-
}
308-
buffer[1]=buffsize-2; // Set correct Remaining length on byte 2.
309-
dataCallback(buffer,&buffsize);
310-
}
336+
msg.getString(convBuf);
337+
for (uint8_t a=0; a<strlen(convBuf); a++) { // Payload
338+
buffer[buffsize++] = convBuf[a];
311339
}
340+
buffer[1]=buffsize-2; // Set correct Remaining length on byte 2.
341+
dataCallback(buffer, &buffsize);
312342
}
313343

314344
void MyMQTT::rxBlink(uint8_t cnt) {
@@ -322,5 +352,3 @@ void MyMQTT::txBlink(uint8_t cnt) {
322352
void MyMQTT::errBlink(uint8_t cnt) {
323353
if(countErr == 255) { countErr = cnt; }
324354
}
325-
326-

libraries/MySensors/MyMQTT.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public MySensor {
7171
void txBlink(uint8_t cnt);
7272
void errBlink(uint8_t cnt);
7373

74-
bool MQTTClientConnected;
74+
uint8_t MQTTClients;
7575
char buffer[MQTT_MAX_PACKET_SIZE];
7676
char convBuf[MAX_PAYLOAD*2+1];
7777
uint8_t buffsize;

libraries/MySensors/examples/MQTTGateway/MQTTGateway.ino

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ void processEthernetMessages() {
100100
uint8_t inChar = client.read();
101101
readCnt++;
102102

103-
if (inputSize < MQTT_MAX_PACKET_SIZE-1) {
103+
if (inputSize < MQTT_MAX_PACKET_SIZE) {
104104
inputString[inputSize] = (char)inChar;
105105
inputSize++;
106106
}
@@ -112,7 +112,6 @@ void processEthernetMessages() {
112112
break;
113113
}
114114
}
115-
inputString[inputSize] = 0;
116115
#ifdef TCPDUMP
117116
Serial.print("<<");
118117
char buf[4];

0 commit comments

Comments
 (0)