From 855d1b1a150f1fbd4a63ed77d30e9c2d67b838f3 Mon Sep 17 00:00:00 2001 From: me-no-dev Date: Thu, 9 Sep 2021 15:39:59 +0300 Subject: [PATCH 1/2] Use Ringbuf for CDC write to improve responsiveness and throughput --- cores/esp32/USBCDC.cpp | 133 ++++++++++++++++++++++++++--------------- cores/esp32/USBCDC.h | 5 +- 2 files changed, 90 insertions(+), 48 deletions(-) diff --git a/cores/esp32/USBCDC.cpp b/cores/esp32/USBCDC.cpp index ce2380feebf..6de02e97eb9 100644 --- a/cores/esp32/USBCDC.cpp +++ b/cores/esp32/USBCDC.cpp @@ -67,58 +67,72 @@ void tud_cdc_send_break_cb(uint8_t itf, uint16_t duration_ms){ // Invoked when space becomes available in TX buffer void tud_cdc_tx_complete_cb(uint8_t itf){ - if(itf < MAX_USB_CDC_DEVICES && devices[itf] != NULL && devices[itf]->tx_sem != NULL){ - xSemaphoreGive(devices[itf]->tx_sem); + if(itf < MAX_USB_CDC_DEVICES && devices[itf] != NULL){ devices[itf]->_onTX(); } } -static size_t tinyusb_cdc_write(uint8_t itf, const uint8_t *buffer, size_t size){ - if(itf >= MAX_USB_CDC_DEVICES || devices[itf] == NULL || devices[itf]->tx_sem == NULL){ - return 0; - } +static size_t ring_buf_flush(uint8_t itf, RingbufHandle_t ring_buf){ if(!tud_cdc_n_connected(itf)){ return 0; } - size_t tosend = size, sofar = 0; - while(tosend){ - uint32_t space = tud_cdc_n_write_available(itf); - if(!space){ - //make sure that we do not get previous semaphore - xSemaphoreTake(devices[itf]->tx_sem, 0); - //wait for tx_complete - if(xSemaphoreTake(devices[itf]->tx_sem, 200 / portTICK_PERIOD_MS) == pdTRUE){ - space = tud_cdc_n_write_available(itf); - } - if(!space){ - return sofar; - } + size_t max_size = tud_cdc_n_write_available(itf); + size_t queued_size = 0; + if(max_size == 0){ + if(tud_cdc_n_write_flush(itf) > 0){ + // no space but we were able to flush some out + max_size = tud_cdc_n_write_available(itf); } - if(tosend < space){ - space = tosend; + if(max_size == 0){ + // no space and could not flush + return 0; } - uint32_t sent = tud_cdc_n_write(itf, buffer + sofar, space); - if(!sent){ - return sofar; - } - sofar += sent; - tosend -= sent; + } + uint8_t *queued_buff = (uint8_t *)xRingbufferReceiveUpTo(ring_buf, &queued_size, 0, max_size); + if (queued_buff == NULL) { + //log_d("nothing to send"); + return 0; + } + size_t sent = tud_cdc_n_write(itf, queued_buff, queued_size); + if(sent && sent < max_size){ tud_cdc_n_write_flush(itf); - //xSemaphoreTake(devices[itf]->tx_sem, portMAX_DELAY); } - return sofar; + vRingbufferReturnItem(ring_buf, queued_buff); + return sent; } -static void ARDUINO_ISR_ATTR cdc0_write_char(char c) -{ - tinyusb_cdc_write(0, (const uint8_t *)&c, 1); +static size_t ring_buf_write_nb(uint8_t itf, RingbufHandle_t ring_buf, const uint8_t *buffer, size_t size){ + if(buffer == NULL || ring_buf == NULL || size == 0){ + return 0; + } + size_t space = xRingbufferGetCurFreeSize(ring_buf); + if(space == 0){ + // ring buff is full, maybe we can flush some? + if(ring_buf_flush(itf, ring_buf)){ + space = xRingbufferGetCurFreeSize(ring_buf); + } + } + if(size > space){ + size = space; + } + if(size && xRingbufferSend(ring_buf, (void*) (buffer), size, 0) != pdTRUE){ + return 0; + } + ring_buf_flush(itf, ring_buf); + return size; +} + +static void ARDUINO_ISR_ATTR cdc0_write_char(char c){ + if(devices[0] != NULL){ + devices[0]->write(c); + } } static void usb_unplugged_cb(void* arg, esp_event_base_t event_base, int32_t event_id, void* event_data){ ((USBCDC*)arg)->_onUnplugged(); } -USBCDC::USBCDC(uint8_t itfn) : itf(itfn), bit_rate(0), stop_bits(0), parity(0), data_bits(0), dtr(false), rts(false), connected(false), reboot_enable(true), rx_queue(NULL), tx_sem(NULL) { +USBCDC::USBCDC(uint8_t itfn) : itf(itfn), bit_rate(0), stop_bits(0), parity(0), data_bits(0), dtr(false), rts(false), connected(false), reboot_enable(true), rx_queue(NULL), tx_ring_buf(NULL) { tinyusb_enable_interface(USB_INTERFACE_CDC, TUD_CDC_DESC_LEN, load_cdc_descriptor); if(itf < MAX_USB_CDC_DEVICES){ arduino_usb_event_handler_register_with(ARDUINO_USB_EVENTS, ARDUINO_USB_STOPPED_EVENT, usb_unplugged_cb, this); @@ -151,13 +165,25 @@ size_t USBCDC::setRxBufferSize(size_t rx_queue_len){ return rx_queue_len; } +size_t USBCDC::setTxBufferSize(size_t tx_queue_len){ + if(tx_ring_buf){ + if(!tx_queue_len){ + vRingbufferDelete(tx_ring_buf); + tx_ring_buf = NULL; + } + return 0; + } + tx_ring_buf = xRingbufferCreate(tx_queue_len, RINGBUF_TYPE_BYTEBUF); + if(!tx_ring_buf){ + return 0; + } + return tx_queue_len; +} + void USBCDC::begin(unsigned long baud) { - if(tx_sem == NULL){ - tx_sem = xSemaphoreCreateBinary(); - xSemaphoreTake(tx_sem, 0); - } setRxBufferSize(256);//default if not preset + setTxBufferSize(256);//default if not preset devices[itf] = this; } @@ -166,10 +192,7 @@ void USBCDC::end() connected = false; devices[itf] = NULL; setRxBufferSize(0); - if (tx_sem != NULL) { - vSemaphoreDelete(tx_sem); - tx_sem = NULL; - } + setTxBufferSize(0); } void USBCDC::_onUnplugged(void){ @@ -278,6 +301,7 @@ void USBCDC::_onRX(){ } void USBCDC::_onTX(){ + ring_buf_flush(itf, tx_ring_buf); arduino_usb_cdc_event_data_t p = {0}; arduino_usb_event_post(ARDUINO_USB_CDC_EVENTS, ARDUINO_USB_CDC_TX_EVENT, &p, sizeof(arduino_usb_cdc_event_data_t), portMAX_DELAY); } @@ -336,23 +360,38 @@ size_t USBCDC::read(uint8_t *buffer, size_t size) void USBCDC::flush(void) { - if(itf >= MAX_USB_CDC_DEVICES || tx_sem == NULL){ + if(itf >= MAX_USB_CDC_DEVICES || tx_ring_buf == NULL){ return; } - tud_cdc_n_write_flush(itf); + UBaseType_t uxItemsWaiting = 0; + vRingbufferGetInfo(tx_ring_buf, NULL, NULL, NULL, NULL, &uxItemsWaiting); + while(uxItemsWaiting){ + ring_buf_flush(itf, tx_ring_buf); + delay(5); + vRingbufferGetInfo(tx_ring_buf, NULL, NULL, NULL, NULL, &uxItemsWaiting); + } } int USBCDC::availableForWrite(void) { - if(itf >= MAX_USB_CDC_DEVICES || tx_sem == NULL){ - return -1; + if(itf >= MAX_USB_CDC_DEVICES || tx_ring_buf == NULL){ + return 0; } - return tud_cdc_n_write_available(itf); + return xRingbufferGetCurFreeSize(tx_ring_buf); } size_t USBCDC::write(const uint8_t *buffer, size_t size) { - return tinyusb_cdc_write(itf, buffer, size); + if(itf >= MAX_USB_CDC_DEVICES || tx_ring_buf == NULL){ + return 0; + } + size_t sent = ring_buf_write_nb(itf, tx_ring_buf, buffer, size); + if(sent < size){ + //log_w("sent %u less bytes", size - sent); + //maybe wait to send the rest? + //xRingbufferSend(tx_ring_buf, (void*) (buffer+sent), size-sent, timeout_ms / portTICK_PERIOD_MS) + } + return sent; } size_t USBCDC::write(uint8_t c) diff --git a/cores/esp32/USBCDC.h b/cores/esp32/USBCDC.h index ced588f45f2..daf164cc292 100644 --- a/cores/esp32/USBCDC.h +++ b/cores/esp32/USBCDC.h @@ -18,6 +18,8 @@ #include #include "esp_event.h" +#include "freertos/FreeRTOS.h" +#include "freertos/ringbuf.h" #include "Stream.h" ESP_EVENT_DECLARE_BASE(ARDUINO_USB_CDC_EVENTS); @@ -59,6 +61,7 @@ class USBCDC: public Stream void onEvent(arduino_usb_cdc_event_t event, esp_event_handler_t callback); size_t setRxBufferSize(size_t); + size_t setTxBufferSize(size_t); void begin(unsigned long baud=0); void end(); @@ -113,7 +116,6 @@ class USBCDC: public Stream void _onRX(void); void _onTX(void); void _onUnplugged(void); - xSemaphoreHandle tx_sem; protected: uint8_t itf; @@ -126,6 +128,7 @@ class USBCDC: public Stream bool connected; bool reboot_enable; xQueueHandle rx_queue; + RingbufHandle_t tx_ring_buf; }; From c2a645d91da52872bf319e4e36d22452468ca887 Mon Sep 17 00:00:00 2001 From: me-no-dev Date: Wed, 15 Sep 2021 10:45:24 +0300 Subject: [PATCH 2/2] Do not use tx_complete. It is not reliable --- cores/esp32/USBCDC.cpp | 162 +++++++++++++++++++---------------------- cores/esp32/USBCDC.h | 10 ++- 2 files changed, 82 insertions(+), 90 deletions(-) diff --git a/cores/esp32/USBCDC.cpp b/cores/esp32/USBCDC.cpp index 6de02e97eb9..bb3140035cd 100644 --- a/cores/esp32/USBCDC.cpp +++ b/cores/esp32/USBCDC.cpp @@ -62,7 +62,7 @@ void tud_cdc_rx_cb(uint8_t itf) // Invoked when received send break void tud_cdc_send_break_cb(uint8_t itf, uint16_t duration_ms){ - //isr_log_v("itf: %u, duration_ms: %u", itf, duration_ms); + //log_v("itf: %u, duration_ms: %u", itf, duration_ms); } // Invoked when space becomes available in TX buffer @@ -72,56 +72,6 @@ void tud_cdc_tx_complete_cb(uint8_t itf){ } } -static size_t ring_buf_flush(uint8_t itf, RingbufHandle_t ring_buf){ - if(!tud_cdc_n_connected(itf)){ - return 0; - } - size_t max_size = tud_cdc_n_write_available(itf); - size_t queued_size = 0; - if(max_size == 0){ - if(tud_cdc_n_write_flush(itf) > 0){ - // no space but we were able to flush some out - max_size = tud_cdc_n_write_available(itf); - } - if(max_size == 0){ - // no space and could not flush - return 0; - } - } - uint8_t *queued_buff = (uint8_t *)xRingbufferReceiveUpTo(ring_buf, &queued_size, 0, max_size); - if (queued_buff == NULL) { - //log_d("nothing to send"); - return 0; - } - size_t sent = tud_cdc_n_write(itf, queued_buff, queued_size); - if(sent && sent < max_size){ - tud_cdc_n_write_flush(itf); - } - vRingbufferReturnItem(ring_buf, queued_buff); - return sent; -} - -static size_t ring_buf_write_nb(uint8_t itf, RingbufHandle_t ring_buf, const uint8_t *buffer, size_t size){ - if(buffer == NULL || ring_buf == NULL || size == 0){ - return 0; - } - size_t space = xRingbufferGetCurFreeSize(ring_buf); - if(space == 0){ - // ring buff is full, maybe we can flush some? - if(ring_buf_flush(itf, ring_buf)){ - space = xRingbufferGetCurFreeSize(ring_buf); - } - } - if(size > space){ - size = space; - } - if(size && xRingbufferSend(ring_buf, (void*) (buffer), size, 0) != pdTRUE){ - return 0; - } - ring_buf_flush(itf, ring_buf); - return size; -} - static void ARDUINO_ISR_ATTR cdc0_write_char(char c){ if(devices[0] != NULL){ devices[0]->write(c); @@ -132,7 +82,20 @@ static void usb_unplugged_cb(void* arg, esp_event_base_t event_base, int32_t eve ((USBCDC*)arg)->_onUnplugged(); } -USBCDC::USBCDC(uint8_t itfn) : itf(itfn), bit_rate(0), stop_bits(0), parity(0), data_bits(0), dtr(false), rts(false), connected(false), reboot_enable(true), rx_queue(NULL), tx_ring_buf(NULL) { +USBCDC::USBCDC(uint8_t itfn) +: itf(itfn) +, bit_rate(0) +, stop_bits(0) +, parity(0) +, data_bits(0) +, dtr(false) +, rts(false) +, connected(false) +, reboot_enable(true) +, rx_queue(NULL) +, tx_lock(NULL) +, tx_timeout_ms(250) +{ tinyusb_enable_interface(USB_INTERFACE_CDC, TUD_CDC_DESC_LEN, load_cdc_descriptor); if(itf < MAX_USB_CDC_DEVICES){ arduino_usb_event_handler_register_with(ARDUINO_USB_EVENTS, ARDUINO_USB_STOPPED_EVENT, usb_unplugged_cb, this); @@ -165,25 +128,12 @@ size_t USBCDC::setRxBufferSize(size_t rx_queue_len){ return rx_queue_len; } -size_t USBCDC::setTxBufferSize(size_t tx_queue_len){ - if(tx_ring_buf){ - if(!tx_queue_len){ - vRingbufferDelete(tx_ring_buf); - tx_ring_buf = NULL; - } - return 0; - } - tx_ring_buf = xRingbufferCreate(tx_queue_len, RINGBUF_TYPE_BYTEBUF); - if(!tx_ring_buf){ - return 0; - } - return tx_queue_len; -} - void USBCDC::begin(unsigned long baud) { + if(tx_lock == NULL) { + tx_lock = xSemaphoreCreateMutex(); + } setRxBufferSize(256);//default if not preset - setTxBufferSize(256);//default if not preset devices[itf] = this; } @@ -192,7 +142,13 @@ void USBCDC::end() connected = false; devices[itf] = NULL; setRxBufferSize(0); - setTxBufferSize(0); + if(tx_lock != NULL) { + vSemaphoreDelete(tx_lock); + } +} + +void USBCDC::setTxTimeoutMs(uint32_t timeout){ + tx_timeout_ms = timeout; } void USBCDC::_onUnplugged(void){ @@ -301,7 +257,6 @@ void USBCDC::_onRX(){ } void USBCDC::_onTX(){ - ring_buf_flush(itf, tx_ring_buf); arduino_usb_cdc_event_data_t p = {0}; arduino_usb_event_post(ARDUINO_USB_CDC_EVENTS, ARDUINO_USB_CDC_TX_EVENT, &p, sizeof(arduino_usb_cdc_event_data_t), portMAX_DELAY); } @@ -360,38 +315,73 @@ size_t USBCDC::read(uint8_t *buffer, size_t size) void USBCDC::flush(void) { - if(itf >= MAX_USB_CDC_DEVICES || tx_ring_buf == NULL){ + if(itf >= MAX_USB_CDC_DEVICES || tx_lock == NULL || !tud_cdc_n_connected(itf)){ return; } - UBaseType_t uxItemsWaiting = 0; - vRingbufferGetInfo(tx_ring_buf, NULL, NULL, NULL, NULL, &uxItemsWaiting); - while(uxItemsWaiting){ - ring_buf_flush(itf, tx_ring_buf); - delay(5); - vRingbufferGetInfo(tx_ring_buf, NULL, NULL, NULL, NULL, &uxItemsWaiting); + if(xSemaphoreTake(tx_lock, tx_timeout_ms / portTICK_PERIOD_MS) != pdPASS){ + return; } + tud_cdc_n_write_flush(itf); + xSemaphoreGive(tx_lock); } int USBCDC::availableForWrite(void) { - if(itf >= MAX_USB_CDC_DEVICES || tx_ring_buf == NULL){ + if(itf >= MAX_USB_CDC_DEVICES || tx_lock == NULL || !tud_cdc_n_connected(itf)){ + return 0; + } + if(xSemaphoreTake(tx_lock, tx_timeout_ms / portTICK_PERIOD_MS) != pdPASS){ return 0; } - return xRingbufferGetCurFreeSize(tx_ring_buf); + size_t a = tud_cdc_n_write_available(itf); + xSemaphoreGive(tx_lock); + return a; } size_t USBCDC::write(const uint8_t *buffer, size_t size) { - if(itf >= MAX_USB_CDC_DEVICES || tx_ring_buf == NULL){ + if(itf >= MAX_USB_CDC_DEVICES || tx_lock == NULL || buffer == NULL || size == 0 || !tud_cdc_n_connected(itf)){ return 0; } - size_t sent = ring_buf_write_nb(itf, tx_ring_buf, buffer, size); - if(sent < size){ - //log_w("sent %u less bytes", size - sent); - //maybe wait to send the rest? - //xRingbufferSend(tx_ring_buf, (void*) (buffer+sent), size-sent, timeout_ms / portTICK_PERIOD_MS) + if(xPortInIsrContext()){ + BaseType_t taskWoken = false; + if(xSemaphoreTakeFromISR(tx_lock, &taskWoken) != pdPASS){ + return 0; + } + } else if(xSemaphoreTake(tx_lock, tx_timeout_ms / portTICK_PERIOD_MS) != pdPASS){ + return 0; + } + size_t to_send = size, so_far = 0; + while(to_send){ + if(!tud_cdc_n_connected(itf)){ + size = so_far; + break; + } + size_t space = tud_cdc_n_write_available(itf); + if(!space){ + tud_cdc_n_write_flush(itf); + continue; + } + if(space > to_send){ + space = to_send; + } + size_t sent = tud_cdc_n_write(itf, buffer+so_far, space); + if(sent){ + so_far += sent; + to_send -= sent; + tud_cdc_n_write_flush(itf); + } else { + size = so_far; + break; + } } - return sent; + if(xPortInIsrContext()){ + BaseType_t taskWoken = false; + xSemaphoreGiveFromISR(tx_lock, &taskWoken); + } else { + xSemaphoreGive(tx_lock); + } + return size; } size_t USBCDC::write(uint8_t c) diff --git a/cores/esp32/USBCDC.h b/cores/esp32/USBCDC.h index daf164cc292..3dfe7d9e69c 100644 --- a/cores/esp32/USBCDC.h +++ b/cores/esp32/USBCDC.h @@ -19,7 +19,8 @@ #include #include "esp_event.h" #include "freertos/FreeRTOS.h" -#include "freertos/ringbuf.h" +#include "freertos/queue.h" +#include "freertos/semphr.h" #include "Stream.h" ESP_EVENT_DECLARE_BASE(ARDUINO_USB_CDC_EVENTS); @@ -60,8 +61,8 @@ class USBCDC: public Stream void onEvent(esp_event_handler_t callback); void onEvent(arduino_usb_cdc_event_t event, esp_event_handler_t callback); - size_t setRxBufferSize(size_t); - size_t setTxBufferSize(size_t); + size_t setRxBufferSize(size_t size); + void setTxTimeoutMs(uint32_t timeout); void begin(unsigned long baud=0); void end(); @@ -128,7 +129,8 @@ class USBCDC: public Stream bool connected; bool reboot_enable; xQueueHandle rx_queue; - RingbufHandle_t tx_ring_buf; + xSemaphoreHandle tx_lock; + uint32_t tx_timeout_ms; };