@@ -310,7 +310,74 @@ uint16_t PubSubClient::readPacket(uint8_t* lengthLength) {
310
310
return len;
311
311
}
312
312
313
+ bool PubSubClient::loop_read () {
314
+ if (_client == nullptr ) {
315
+ return false ;
316
+ }
317
+ if (!_client->available ()) {
318
+ return false ;
319
+ }
320
+ uint8_t llen;
321
+ uint16_t len = readPacket (&llen);
322
+ if (len == 0 ) {
323
+ return false ;
324
+ }
325
+ unsigned long t = millis ();
326
+ lastInActivity = t;
327
+ uint8_t type = buffer[0 ]&0xF0 ;
328
+
329
+ switch (type) {
330
+ case MQTTPUBLISH:
331
+ {
332
+ if (callback) {
333
+ uint16_t tl = (buffer[llen+1 ]<<8 )+buffer[llen+2 ]; /* topic length in bytes */
334
+ memmove (buffer+llen+2 ,buffer+llen+3 ,tl); /* move topic inside buffer 1 byte to front */
335
+ buffer[llen+2 +tl] = 0 ; /* end the topic as a 'C' string with \x00 */
336
+ char *topic = (char *) buffer+llen+2 ;
337
+ uint8_t *payload;
338
+ // msgId only present for QOS>0
339
+ if ((buffer[0 ]&0x06 ) == MQTTQOS1) {
340
+ const uint16_t msgId = (buffer[llen+3 +tl]<<8 )+buffer[llen+3 +tl+1 ];
341
+ payload = buffer+llen+3 +tl+2 ;
342
+ callback (topic,payload,len-llen-3 -tl-2 );
343
+ if (_client->connected ()) {
344
+ buffer[0 ] = MQTTPUBACK;
345
+ buffer[1 ] = 2 ;
346
+ buffer[2 ] = (msgId >> 8 );
347
+ buffer[3 ] = (msgId & 0xFF );
348
+ if (_client->write (buffer,4 ) != 0 ) {
349
+ lastOutActivity = t;
350
+ }
351
+ }
352
+ } else {
353
+ payload = buffer+llen+3 +tl;
354
+ callback (topic,payload,len-llen-3 -tl);
355
+ }
356
+ }
357
+ break ;
358
+ }
359
+ case MQTTPINGREQ:
360
+ {
361
+ if (_client->connected ()) {
362
+ buffer[0 ] = MQTTPINGRESP;
363
+ buffer[1 ] = 0 ;
364
+ _client->write (buffer,2 );
365
+ }
366
+ break ;
367
+ }
368
+ case MQTTPINGRESP:
369
+ {
370
+ pingOutstanding = false ;
371
+ break ;
372
+ }
373
+ default :
374
+ return false ;
375
+ }
376
+ return true ;
377
+ }
378
+
313
379
boolean PubSubClient::loop () {
380
+ loop_read ();
314
381
if (connected ()) {
315
382
unsigned long t = millis ();
316
383
if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL ) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL )) {
@@ -328,50 +395,6 @@ boolean PubSubClient::loop() {
328
395
pingOutstanding = true ;
329
396
}
330
397
}
331
- if (_client->available ()) {
332
- uint8_t llen;
333
- uint16_t len = readPacket (&llen);
334
- uint16_t msgId = 0 ;
335
- uint8_t *payload;
336
- if (len > 0 ) {
337
- lastInActivity = t;
338
- uint8_t type = buffer[0 ]&0xF0 ;
339
- if (type == MQTTPUBLISH) {
340
- if (callback) {
341
- uint16_t tl = (buffer[llen+1 ]<<8 )+buffer[llen+2 ]; /* topic length in bytes */
342
- memmove (buffer+llen+2 ,buffer+llen+3 ,tl); /* move topic inside buffer 1 byte to front */
343
- buffer[llen+2 +tl] = 0 ; /* end the topic as a 'C' string with \x00 */
344
- char *topic = (char *) buffer+llen+2 ;
345
- // msgId only present for QOS>0
346
- if ((buffer[0 ]&0x06 ) == MQTTQOS1) {
347
- msgId = (buffer[llen+3 +tl]<<8 )+buffer[llen+3 +tl+1 ];
348
- payload = buffer+llen+3 +tl+2 ;
349
- callback (topic,payload,len-llen-3 -tl-2 );
350
-
351
- buffer[0 ] = MQTTPUBACK;
352
- buffer[1 ] = 2 ;
353
- buffer[2 ] = (msgId >> 8 );
354
- buffer[3 ] = (msgId & 0xFF );
355
- if (_client->write (buffer,4 ) != 0 ) {
356
- lastOutActivity = t;
357
- }
358
- } else {
359
- payload = buffer+llen+3 +tl;
360
- callback (topic,payload,len-llen-3 -tl);
361
- }
362
- }
363
- } else if (type == MQTTPINGREQ) {
364
- buffer[0 ] = MQTTPINGRESP;
365
- buffer[1 ] = 0 ;
366
- _client->write (buffer,2 );
367
- } else if (type == MQTTPINGRESP) {
368
- pingOutstanding = false ;
369
- }
370
- } else if (!connected ()) {
371
- // readPacket has closed the connection
372
- return false ;
373
- }
374
- }
375
398
return true ;
376
399
}
377
400
return false ;
0 commit comments