@@ -31,14 +31,14 @@ class BufferDataSource : public DataSource {
31
31
32
32
const uint8_t * get_buffer (size_t size) override
33
33
{
34
- (void ) size;
34
+ (void )size;
35
35
assert (_pos + size <= _size);
36
36
return _data + _pos;
37
37
}
38
38
39
39
void release_buffer (const uint8_t * buffer, size_t size) override
40
40
{
41
- (void ) buffer;
41
+ (void )buffer;
42
42
assert (buffer == _data + _pos);
43
43
_pos += size;
44
44
}
@@ -66,28 +66,65 @@ class BufferedStreamDataSource : public DataSource {
66
66
const uint8_t * get_buffer (size_t size) override
67
67
{
68
68
assert (_pos + size <= _size);
69
- if (_bufferSize < size) {
70
- _buffer.reset (new uint8_t [size]);
71
- _bufferSize = size;
69
+
70
+ // Data that was already read from the stream but not released (e.g. if tcp_write error occured). Otherwise this should be 0.
71
+ const size_t stream_read = _streamPos - _pos;
72
+
73
+ // Min required buffer size: max(requested size, previous stream data already in buffer)
74
+ const size_t min_buffer_size = size > stream_read ? size : stream_read;
75
+
76
+ // Buffer too small?
77
+ if (_bufferSize < min_buffer_size) {
78
+ uint8_t *new_buffer = new uint8_t [min_buffer_size];
79
+ // If stream reading is ahead, than some data is already in the old buffer and needs to be copied to new resized buffer
80
+ if (_buffer && stream_read > 0 ) {
81
+ memcpy (new_buffer, _buffer.get (), stream_read);
82
+ }
83
+ _buffer.reset (new_buffer);
84
+ _bufferSize = min_buffer_size;
85
+ }
86
+
87
+ // Fetch remaining data from stream
88
+ // If error in tcp_write in ClientContext::_write_some() occured earlier and therefore release_buffer was not called last time, than the requested stream data is already in the buffer.
89
+ if (size > stream_read) {
90
+ // Remaining bytes to read from stream
91
+ const size_t stream_rem = size - stream_read;
92
+ const size_t cb = _stream.readBytes (reinterpret_cast <char *>(_buffer.get () + stream_read), stream_rem);
93
+ assert (cb == stream_rem);
94
+ (void )cb;
95
+ _streamPos += stream_rem;
72
96
}
73
- size_t cb = _stream.readBytes (reinterpret_cast <char *>(_buffer.get ()), size);
74
- assert (cb == size);
75
- (void ) cb;
76
97
return _buffer.get ();
98
+
77
99
}
78
100
79
101
void release_buffer (const uint8_t * buffer, size_t size) override
80
102
{
81
- (void ) buffer;
82
- _pos += size;
103
+ if (size == 0 ) {
104
+ return ;
105
+ }
106
+
107
+ (void )buffer;
108
+ _pos += size;
109
+
110
+ // Cannot release more than acquired through get_buffer
111
+ assert (_pos <= _streamPos);
112
+
113
+ // Release less than requested with get_buffer?
114
+ if (_pos < _streamPos) {
115
+ // Move unreleased stream data in buffer to front
116
+ assert (_buffer);
117
+ memmove (_buffer.get (), _buffer.get () + size, _streamPos - _pos);
118
+ }
83
119
}
84
120
85
121
protected:
86
- TStream& _stream;
122
+ TStream & _stream;
87
123
std::unique_ptr<uint8_t []> _buffer;
88
124
size_t _size;
89
125
size_t _pos = 0 ;
90
126
size_t _bufferSize = 0 ;
127
+ size_t _streamPos = 0 ;
91
128
};
92
129
93
130
class ProgmemStream
@@ -104,7 +141,7 @@ class ProgmemStream
104
141
size_t will_read = (_left < size) ? _left : size;
105
142
memcpy_P ((void *)dst, (PGM_VOID_P)_buf, will_read);
106
143
_left -= will_read;
107
- _buf += will_read;
144
+ _buf += will_read;
108
145
return will_read;
109
146
}
110
147
0 commit comments