Skip to content

Rework cbuf to use FreeRTOS Ringbuffer #7860

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Feb 7, 2024
277 changes: 185 additions & 92 deletions cores/esp32/cbuf.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,178 +19,271 @@
*/

#include "cbuf.h"
#include "esp32-hal-log.h"

#if CONFIG_DISABLE_HAL_LOCKS
#define CBUF_MUTEX_CREATE()
#define CBUF_MUTEX_LOCK()
#define CBUF_MUTEX_UNLOCK()
#define CBUF_MUTEX_DELETE()
#else
#define CBUF_MUTEX_CREATE() if(_lock == NULL){_lock = xSemaphoreCreateMutex(); if(_lock == NULL){log_e("failed to create mutex");}}
#define CBUF_MUTEX_LOCK() if(_lock != NULL){xSemaphoreTakeRecursive(_lock, portMAX_DELAY);}
#define CBUF_MUTEX_UNLOCK() if(_lock != NULL){xSemaphoreGiveRecursive(_lock);}
#define CBUF_MUTEX_DELETE() if(_lock != NULL){SemaphoreHandle_t l = _lock; _lock = NULL; vSemaphoreDelete(l);}
#endif

cbuf::cbuf(size_t size) :
next(NULL), _size(size+1), _buf(new char[size+1]), _bufend(_buf + size + 1), _begin(_buf), _end(_begin)
next(NULL),
has_peek(false),
_buf(xRingbufferCreate(size, RINGBUF_TYPE_BYTEBUF))
{
if(_buf == NULL) {
log_e("failed to allocate ring buffer");
}
CBUF_MUTEX_CREATE();
}

cbuf::~cbuf()
{
delete[] _buf;
CBUF_MUTEX_LOCK();
if(_buf != NULL){
RingbufHandle_t b = _buf;
_buf = NULL;
vRingbufferDelete(b);
}
CBUF_MUTEX_UNLOCK();
CBUF_MUTEX_DELETE();
}

size_t cbuf::resizeAdd(size_t addSize)
{
return resize(_size + addSize);
return resize(size() + addSize);
}

size_t cbuf::resize(size_t newSize)
{
CBUF_MUTEX_LOCK();
size_t _size = size();
if(newSize == _size) {
return _size;
}

size_t bytes_available = available();
newSize += 1;
// not lose any data
// if data can be lost use remove or flush before resize
if((newSize < bytes_available) || (newSize == _size)) {
size_t bytes_available = available();
if(newSize < bytes_available) {
CBUF_MUTEX_UNLOCK();
log_e("new size is less than the currently available data size");
return _size;
}

char *newbuf = new char[newSize];
char *oldbuf = _buf;

if(!newbuf) {
RingbufHandle_t newbuf = xRingbufferCreate(newSize, RINGBUF_TYPE_BYTEBUF);
if(newbuf == NULL) {
CBUF_MUTEX_UNLOCK();
log_e("failed to allocate new ring buffer");
return _size;
}

if(_buf) {
read(newbuf, bytes_available);
memset((newbuf + bytes_available), 0x00, (newSize - bytes_available));
if(_buf != NULL) {
if(bytes_available){
char * old_data = (char *)malloc(bytes_available);
if(old_data == NULL){
vRingbufferDelete(newbuf);
CBUF_MUTEX_UNLOCK();
log_e("failed to allocate temporary buffer");
return _size;
}
bytes_available = read(old_data, bytes_available);
if(!bytes_available){
free(old_data);
vRingbufferDelete(newbuf);
CBUF_MUTEX_UNLOCK();
log_e("failed to read previous data");
return _size;
}
if(xRingbufferSend(newbuf, (void*)old_data, bytes_available, 0) != pdTRUE){
write(old_data, bytes_available);
free(old_data);
vRingbufferDelete(newbuf);
CBUF_MUTEX_UNLOCK();
log_e("failed to restore previous data");
return _size;
}
free(old_data);
}

RingbufHandle_t b = _buf;
_buf = newbuf;
vRingbufferDelete(b);
} else {
_buf = newbuf;
}

_begin = newbuf;
_end = newbuf + bytes_available;
_bufend = newbuf + newSize;
_size = newSize;

_buf = newbuf;
delete[] oldbuf;

return _size;
CBUF_MUTEX_UNLOCK();
return newSize;
}

size_t cbuf::available() const
{
if(_end >= _begin) {
return _end - _begin;
size_t available = 0;
if(_buf != NULL){
vRingbufferGetInfo(_buf, NULL, NULL, NULL, NULL, (UBaseType_t *)&available);
}
return _size - (_begin - _end);
if (has_peek) available++;
return available;
}

size_t cbuf::size()
{
size_t _size = 0;
if(_buf != NULL){
_size = xRingbufferGetMaxItemSize(_buf);
}
return _size;
}

size_t cbuf::room() const
{
if(_end >= _begin) {
return _size - (_end - _begin) - 1;
size_t _room = 0;
if(_buf != NULL){
_room = xRingbufferGetCurFreeSize(_buf);
}
return _begin - _end - 1;
return _room;
}

bool cbuf::empty() const
{
return available() == 0;
}

bool cbuf::full() const
{
return room() == 0;
}

int cbuf::peek()
{
if(empty()) {
if (!available()) {
return -1;
}

return static_cast<int>(*_begin);
}
int c;

size_t cbuf::peek(char *dst, size_t size)
{
size_t bytes_available = available();
size_t size_to_read = (size < bytes_available) ? size : bytes_available;
size_t size_read = size_to_read;
char * begin = _begin;
if(_end < _begin && size_to_read > (size_t) (_bufend - _begin)) {
size_t top_size = _bufend - _begin;
memcpy(dst, _begin, top_size);
begin = _buf;
size_to_read -= top_size;
dst += top_size;
}
memcpy(dst, begin, size_to_read);
return size_read;
CBUF_MUTEX_LOCK();
if (has_peek) {
c = peek_byte;
} else {
c = read();
if (c >= 0) {
has_peek = true;
peek_byte = c;
}
}
CBUF_MUTEX_UNLOCK();
return c;
}

int cbuf::read()
{
if(empty()) {
char result = 0;
if(!read(&result, 1)){
return -1;
}

char result = *_begin;
_begin = wrap_if_bufend(_begin + 1);
return static_cast<int>(result);
}

size_t cbuf::read(char* dst, size_t size)
{
CBUF_MUTEX_LOCK();
size_t bytes_available = available();
size_t size_to_read = (size < bytes_available) ? size : bytes_available;
size_t size_read = size_to_read;
if(_end < _begin && size_to_read > (size_t) (_bufend - _begin)) {
size_t top_size = _bufend - _begin;
memcpy(dst, _begin, top_size);
_begin = _buf;
size_to_read -= top_size;
dst += top_size;
}
memcpy(dst, _begin, size_to_read);
_begin = wrap_if_bufend(_begin + size_to_read);
if(!bytes_available || !size){
CBUF_MUTEX_UNLOCK();
return 0;
}

if (has_peek) {
if (dst != NULL) {
*dst++ = peek_byte;
}
size--;
}

size_t size_read = 0;
if (size) {
size_t received_size = 0;
size_t size_to_read = (size < bytes_available) ? size : bytes_available;
uint8_t *received_buff = (uint8_t *)xRingbufferReceiveUpTo(_buf, &received_size, 0, size_to_read);
if (received_buff != NULL) {
if(dst != NULL){
memcpy(dst, received_buff, received_size);
}
vRingbufferReturnItem(_buf, received_buff);
size_read = received_size;
size_to_read -= received_size;
// wrap around data
if(size_to_read){
received_size = 0;
received_buff = (uint8_t *)xRingbufferReceiveUpTo(_buf, &received_size, 0, size_to_read);
if (received_buff != NULL) {
if(dst != NULL){
memcpy(dst+size_read, received_buff, received_size);
}
vRingbufferReturnItem(_buf, received_buff);
size_read += received_size;
} else {
log_e("failed to read wrap around data from ring buffer");
}
}
} else {
log_e("failed to read from ring buffer");
}
}

if (has_peek) {
has_peek = false;
size_read++;
}

CBUF_MUTEX_UNLOCK();
return size_read;
}

size_t cbuf::write(char c)
{
if(full()) {
return 0;
}

*_end = c;
_end = wrap_if_bufend(_end + 1);
return 1;
return write(&c, 1);
}

size_t cbuf::write(const char* src, size_t size)
{
CBUF_MUTEX_LOCK();
size_t bytes_available = room();
if(!bytes_available || !size){
CBUF_MUTEX_UNLOCK();
return 0;
}
size_t size_to_write = (size < bytes_available) ? size : bytes_available;
size_t size_written = size_to_write;
if(_end >= _begin && size_to_write > (size_t) (_bufend - _end)) {
size_t top_size = _bufend - _end;
memcpy(_end, src, top_size);
_end = _buf;
size_to_write -= top_size;
src += top_size;
if(xRingbufferSend(_buf, (void*)src, size_to_write, 0) != pdTRUE){
CBUF_MUTEX_UNLOCK();
log_e("failed to write to ring buffer");
return 0;
}
memcpy(_end, src, size_to_write);
_end = wrap_if_bufend(_end + size_to_write);
return size_written;
CBUF_MUTEX_UNLOCK();
return size_to_write;
}

void cbuf::flush()
{
_begin = _buf;
_end = _buf;
read(NULL, available());
}

size_t cbuf::remove(size_t size)
{
CBUF_MUTEX_LOCK();
size_t bytes_available = available();
if(size >= bytes_available) {
flush();
return 0;
}
size_t size_to_remove = (size < bytes_available) ? size : bytes_available;
if(_end < _begin && size_to_remove > (size_t) (_bufend - _begin)) {
size_t top_size = _bufend - _begin;
_begin = _buf;
size_to_remove -= top_size;
if(bytes_available && size){
size_t size_to_remove = (size < bytes_available) ? size : bytes_available;
bytes_available -= read(NULL, size_to_remove);
}
_begin = wrap_if_bufend(_begin + size_to_remove);
return available();
CBUF_MUTEX_UNLOCK();
return bytes_available;
}
Loading