Skip to content

Commit 0297314

Browse files
committed
Add circular_buffer.py
1 parent fff34ed commit 0297314

File tree

1 file changed

+239
-0
lines changed

1 file changed

+239
-0
lines changed

Diff for: data_structures/arrays/circular_buffer.py

+239
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
class CircularBuffer:
2+
def __init__(self, capacity):
3+
self._tail = 0
4+
self._head = 0
5+
self._array = [None] * capacity
6+
7+
def _push(self, item):
8+
if self.is_full:
9+
raise Exception("push to full queue")
10+
self._array[self._head] = item
11+
self._head = self.wrap(self._head + 1)
12+
13+
def push(self, item, overwrite=True):
14+
res = None
15+
if self.is_full and overwrite:
16+
res = self.pop()
17+
self._push(item)
18+
return res
19+
20+
@property
21+
def is_full(self):
22+
return self._tail == self.wrap(self._head + 1)
23+
24+
@property
25+
def is_empty(self):
26+
return self._tail == self._head
27+
28+
def pop(self):
29+
if self.is_empty:
30+
raise Exception("pop from empty queue")
31+
item = self._array[self._tail]
32+
self._tail = self.wrap(self._tail + 1)
33+
return item
34+
35+
def wrap(self, value):
36+
"""
37+
>>> cb = CircularBuffer(6)
38+
>>> cb.wrap(5)
39+
5
40+
>>> cb.wrap(6)
41+
0
42+
43+
>>> cb = CircularBuffer(8)
44+
>>> cb.wrap(7)
45+
7
46+
>>> cb.wrap(8)
47+
0
48+
"""
49+
if self._is_power_of_two():
50+
# optimization taken from https://github.com/AndersKaloer/Ring-Buffer/blob/1468e24fc55986/ringbuffer.c#L26
51+
return value & (self.capacity - 1)
52+
return value % self.capacity
53+
54+
def _is_power_of_two(self):
55+
return not self.capacity & (self.capacity - 1)
56+
57+
@property
58+
def capacity(self):
59+
"""Length of the underlying storage array
60+
61+
One slot is always left empty to distinguish between “full” and “empty”
62+
states. As a result, although len(self._array) is N, the usable
63+
capacity is N-1.
64+
"""
65+
return len(self._array)
66+
67+
@property
68+
def size(self):
69+
return self.wrap(self._head - self._tail)
70+
71+
def __iter__(self):
72+
# WARNING: the object should not be changed during iteration
73+
i = self._tail
74+
while i != self._head:
75+
yield self._array[i]
76+
i = self.wrap(i + 1)
77+
78+
def __getitem__(self, index_from_tail):
79+
"""
80+
>>> cb = CircularBuffer(8)
81+
>>> for i in range(10):
82+
... cb.push(i);
83+
0
84+
1
85+
2
86+
>>> cb.pop()
87+
3
88+
>>> cb.pop()
89+
4
90+
>>> print(cb, end='')
91+
8
92+
9
93+
h -> <free>
94+
<free>
95+
<free>
96+
t -> 5
97+
6
98+
7
99+
100+
>>> cb[0]
101+
5
102+
>>> cb[1]
103+
6
104+
>>> cb[3]
105+
8
106+
>>> cb[5]
107+
Traceback (most recent call last):
108+
...
109+
IndexError: ...
110+
111+
>>> cb[-1]
112+
9
113+
>>> cb[-5]
114+
5
115+
>>> cb[-6]
116+
Traceback (most recent call last):
117+
...
118+
IndexError: ...
119+
"""
120+
121+
if not -self.size <= index_from_tail < self.size:
122+
raise IndexError(
123+
"%d not in range [%d, %d)" % (index_from_tail, -self.size, self.size)
124+
)
125+
if index_from_tail >= 0:
126+
index_array = index_from_tail + self._tail
127+
else:
128+
index_array = self._head + index_from_tail
129+
return self._array[self.wrap(index_array)]
130+
131+
def __str__(self):
132+
res = ""
133+
for i in range(self.capacity):
134+
if i == self._head == self._tail:
135+
res += "h=t -> "
136+
elif i == self._head:
137+
res += " h -> "
138+
elif i == self._tail:
139+
res += " t -> "
140+
else:
141+
res += " "
142+
143+
if self.wrap(i - self._tail) < self.size:
144+
res += str(self._array[i])
145+
else:
146+
res += "<free>"
147+
res += "\n"
148+
return res
149+
150+
151+
import pytest
152+
153+
154+
def test():
155+
b = CircularBuffer(8)
156+
assert b.capacity == 8
157+
158+
assert (
159+
str(b)
160+
== """\
161+
h=t -> <free>
162+
<free>
163+
<free>
164+
<free>
165+
<free>
166+
<free>
167+
<free>
168+
<free>
169+
"""
170+
)
171+
172+
for i in range(6):
173+
b.push(i)
174+
assert b.size == 6
175+
176+
assert list(iter(b)) == list(range(6))
177+
178+
assert b.pop() == 0
179+
assert b.pop() == 1
180+
181+
assert (
182+
str(b)
183+
== """\
184+
<free>
185+
<free>
186+
t -> 2
187+
3
188+
4
189+
5
190+
h -> <free>
191+
<free>
192+
"""
193+
)
194+
195+
for _ in range(4):
196+
b.pop()
197+
198+
with pytest.raises(Exception, match="pop from empty queue"):
199+
b.pop()
200+
201+
202+
def test_overflow():
203+
b = CircularBuffer(8)
204+
205+
for i in range(10):
206+
b.push(i)
207+
208+
assert list(iter(b)) == list(range(3, 10))
209+
assert b.size == 7
210+
211+
assert (
212+
str(b)
213+
== """\
214+
8
215+
9
216+
h -> <free>
217+
t -> 3
218+
4
219+
5
220+
6
221+
7
222+
"""
223+
)
224+
225+
assert b.pop() == 3
226+
assert b.pop() == 4
227+
assert (
228+
str(b)
229+
== """\
230+
8
231+
9
232+
h -> <free>
233+
<free>
234+
<free>
235+
t -> 5
236+
6
237+
7
238+
"""
239+
)

0 commit comments

Comments
 (0)