Skip to content

Commit 7bfd800

Browse files
committed
A big rework of the source/sink system.
1 parent 318d7ff commit 7bfd800

15 files changed

+159
-141
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
void setup() {
1+
void setup()
2+
{
23
Serial.begin(115200);
34
while (!Serial) { }
45

56
Producer.start();
67
Consumer.start();
78
}
89

9-
void loop() {
10+
void loop()
11+
{
1012
rtos::ThisThread::yield();
1113
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
SINK(counter, int);
2+
3+
void setup()
4+
{
5+
6+
}
7+
8+
void loop()
9+
{
10+
Serial.println(counter.read());
11+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* This examples demonstrates the SOURCE/SINK abstraction.
3+
* Each thread may have any number of SOURCES and SINKS that can be connected
4+
* together using the "connectTo" method.
5+
*/
6+
7+
void setup()
8+
{
9+
Producer.counter.connectTo(Consumer.counter);
10+
Producer.start();
11+
Consumer.start();
12+
}
13+
14+
void loop() {
15+
rtos::ThisThread::yield();
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
SOURCE(counter, int);
2+
3+
void setup()
4+
{
5+
6+
}
7+
8+
void loop()
9+
{
10+
static int i = 0;
11+
counter.write(i);
12+
i++;
13+
}

examples/Demo_Source_Sink_LED/SharedVariables.h

Whitespace-only changes.

examples/Demo_Source_Sink/Sink_Thread.inot renamed to examples/Demo_Source_Sink_LED/Sink_Thread.inot

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* An 'bool' SINK with a size of '0'. This kind of SINK has no buffer so the reading thread
44
* will block until the writing thread has written something, or vice versa.
55
*/
6-
SINK(led, bool, 0)
6+
SINK(led, bool);
77

88
void setup()
99
{
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* The output SOURCE, it sends 'bool' values. */
2-
SOURCE(led, bool)
2+
SOURCE(led, bool);
33

44
void setup()
55
{
@@ -8,8 +8,8 @@ void setup()
88

99
void loop()
1010
{
11-
led.send(true);
11+
led.write(true);
1212
delay(100);
13-
led.send(false);
13+
led.write(false);
1414
delay(100);
1515
}

src/Arduino_Threads.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ public: \
3939
Source<type> name; \
4040
private:
4141

42-
#define SINK(name, type, size) \
42+
#define SINK(name, type) \
4343
public: \
44-
Sink<type> name{size}; \
44+
SinkBlocking<type> name; \
4545
private:
4646
// we need to call the Sink<T>(int size) non-default constructor using size as parameter.
4747
// This is done by writing

src/Sink.hpp

+72-114
Original file line numberDiff line numberDiff line change
@@ -29,121 +29,79 @@
2929
* CLASS DECLARATION
3030
**************************************************************************************/
3131

32-
template<class T>
33-
class Sink
32+
template<typename T>
33+
class SinkBase
3434
{
35-
private:
36-
rtos::Mutex dataMutex;
37-
rtos::ConditionVariable dataAvailable;
38-
rtos::ConditionVariable slotAvailable;
39-
T latest;
40-
Sink *next;
41-
const int size;
42-
int first, last;
43-
bool full;
44-
T *queue;
45-
46-
public:
47-
Sink(int s) :
48-
dataAvailable(dataMutex),
49-
slotAvailable(dataMutex),
50-
size(s),
51-
queue((size > 0) ? new T[size] : nullptr),
52-
first(0), last(0), full(false)
53-
{};
54-
55-
~Sink() {
56-
if (queue != nullptr) { delete queue; }
57-
}
58-
59-
60-
//protected: TODO
61-
void connectTo(Sink &sink)
62-
{
63-
if (next == nullptr) {
64-
next = &sink;
65-
} else {
66-
next->connectTo(sink);
67-
}
68-
}
69-
70-
T read()
71-
{
72-
// Non-blocking shared variable
73-
if (size == -1) {
74-
dataMutex.lock();
75-
T res = latest;
76-
dataMutex.unlock();
77-
return res;
78-
}
79-
80-
// Blocking shared variable
81-
if (size == 0) {
82-
dataMutex.lock();
83-
while (!full) {
84-
dataAvailable.wait();
85-
}
86-
T res = latest;
87-
full = false;
88-
slotAvailable.notify_all();
89-
dataMutex.unlock();
90-
return res;
91-
}
92-
93-
// Blocking queue
94-
dataMutex.lock();
95-
while (first == last && !full) {
96-
dataAvailable.wait();
97-
}
98-
T res = queue[first++];
99-
first %= size;
100-
if (full) {
101-
full = false;
102-
slotAvailable.notify_one();
103-
}
104-
dataMutex.unlock();
105-
return res;
106-
}
107-
108-
//protected: TODO
109-
void inject(const T &value)
110-
{
111-
dataMutex.lock();
112-
113-
// Non-blocking shared variable
114-
if (size == -1) {
115-
latest = value;
116-
}
117-
118-
// Blocking shared variable
119-
else if (size == 0) {
120-
while (full) {
121-
slotAvailable.wait();
122-
}
123-
latest = value;
124-
full = true;
125-
dataAvailable.notify_one();
126-
slotAvailable.wait();
127-
}
128-
129-
// Blocking queue
130-
else {
131-
while (full) {
132-
slotAvailable.wait();
133-
}
134-
if (first == last) {
135-
dataAvailable.notify_one();
136-
}
137-
queue[last++] = value;
138-
last %= size;
139-
if (first == last) {
140-
full = true;
141-
}
142-
}
143-
dataMutex.unlock();
144-
145-
if (next) next->inject(value);
146-
}
35+
public:
36+
37+
virtual ~SinkBase() { }
38+
39+
virtual T read() = 0;
40+
virtual void inject(T const & value) = 0;
41+
};
42+
43+
template<typename T>
44+
class SinkNonBlocking : public SinkBase<T>
45+
{
46+
/* TODO - Do we really need this? */
14747
};
14848

49+
template<typename T>
50+
class SinkBlocking : public SinkBase<T>
51+
{
52+
public:
53+
54+
SinkBlocking();
55+
virtual ~SinkBlocking() { }
56+
57+
virtual T read() override;
58+
virtual void inject(T const & value) override;
59+
60+
61+
private:
62+
63+
T _data;
64+
bool _is_data_available;
65+
rtos::Mutex _mutex;
66+
rtos::ConditionVariable _cond_data_available;
67+
rtos::ConditionVariable _cond_slot_available;
68+
69+
};
70+
71+
/**************************************************************************************
72+
* PUBLIC MEMBER FUNCTIONS - SinkBlocking
73+
**************************************************************************************/
74+
75+
template<typename T>
76+
SinkBlocking<T>::SinkBlocking()
77+
: _is_data_available{false}
78+
, _cond_data_available(_mutex)
79+
, _cond_slot_available(_mutex)
80+
{ }
81+
82+
template<typename T>
83+
T SinkBlocking<T>::read()
84+
{
85+
_mutex.lock();
86+
while (!_is_data_available)
87+
_cond_data_available.wait();
88+
T const d = _data;
89+
_is_data_available = false;
90+
_cond_slot_available.notify_all();
91+
_mutex.unlock();
92+
return d;
93+
}
94+
95+
template<typename T>
96+
void SinkBlocking<T>::inject(T const & value)
97+
{
98+
_mutex.lock();
99+
while (_is_data_available)
100+
_cond_slot_available.wait();
101+
_data = value;
102+
_is_data_available = true;
103+
_cond_data_available.notify_all();
104+
_mutex.unlock();
105+
}
106+
149107
#endif /* ARDUINO_THREADS_SINK_HPP_ */

src/Source.hpp

+37-19
Original file line numberDiff line numberDiff line change
@@ -19,37 +19,55 @@
1919
#ifndef ARDUINO_THREADS_SOURCE_HPP_
2020
#define ARDUINO_THREADS_SOURCE_HPP_
2121

22+
/**************************************************************************************
23+
* INCLUDE
24+
**************************************************************************************/
25+
26+
#include <list>
27+
#include <algorithm>
28+
2229
/**************************************************************************************
2330
* FORWARD DECLARATION
2431
**************************************************************************************/
2532

2633
template<class T>
27-
class Sink;
34+
class SinkBase;
2835

2936
/**************************************************************************************
3037
* CLASS DECLARATION
3138
**************************************************************************************/
3239

33-
template<class T>
40+
template<typename T>
3441
class Source
3542
{
36-
public:
37-
Source() {};
38-
39-
void connectTo(Sink<T> &sink) {
40-
if (destination == nullptr) {
41-
destination = &sink;
42-
} else {
43-
destination->connectTo(sink);
44-
}
45-
}
46-
47-
void send(const T &value) {
48-
if (destination) destination->inject(value);
49-
}
50-
51-
private:
52-
Sink<T> *destination;
43+
public:
44+
45+
void connectTo(SinkBase<T> & sink);
46+
void write(T const & value);
47+
48+
private:
49+
std::list<SinkBase<T> *> _sink_list;
5350
};
5451

52+
/**************************************************************************************
53+
* PUBLIC MEMBER FUNCTIONS
54+
**************************************************************************************/
55+
56+
template<typename T>
57+
void Source<T>::connectTo(SinkBase<T> & sink)
58+
{
59+
_sink_list.push_back(&sink);
60+
}
61+
62+
template<typename T>
63+
void Source<T>::write(T const & value)
64+
{
65+
std::for_each(std::begin(_sink_list),
66+
std::end (_sink_list),
67+
[value](SinkBase<T> * sink)
68+
{
69+
sink->inject(value);
70+
});
71+
}
72+
5573
#endif /* ARDUINO_THREADS_SOURCE_HPP_ */

0 commit comments

Comments
 (0)