diff --git a/examples/Blocks/Blocks.ino b/examples/Blocks/Blocks.ino new file mode 100644 index 0000000..85efc0b --- /dev/null +++ b/examples/Blocks/Blocks.ino @@ -0,0 +1,22 @@ +/* + * This examples demonstrates the SOURCE/SINK abstraction. + * Each thread may have any number of SOURCES and SINKS that can be connected + * together using the "connectTo" method. + */ + +void setup() { + data_reader.out.connectTo(data_writer.in); + data_reader.start(); + data_writer.start(); + + // put your setup code here, to run once: + pinMode(LEDR, OUTPUT); +} + +void loop() { + // put your main code here, to run repeatedly: + digitalWrite(LEDR, HIGH); + delay(1000); + digitalWrite(LEDR, LOW); + delay(1000); +} diff --git a/examples/Blocks/SharedVariables.h b/examples/Blocks/SharedVariables.h new file mode 100644 index 0000000..e69de29 diff --git a/examples/Blocks/data_reader.inot b/examples/Blocks/data_reader.inot new file mode 100644 index 0000000..6bd9708 --- /dev/null +++ b/examples/Blocks/data_reader.inot @@ -0,0 +1,12 @@ + +/* The output SOURCE, it sends 'int' */ +SOURCE(out, int) + +void setup() { +} + +// a '1' is sent every 100 ms +void loop() { + out.send(1); + delay(100); +} diff --git a/examples/Blocks/data_writer.inot b/examples/Blocks/data_writer.inot new file mode 100644 index 0000000..fbd6b5e --- /dev/null +++ b/examples/Blocks/data_writer.inot @@ -0,0 +1,21 @@ + +/* + * An 'int' SINK with a size of '0'. This kind of SINK has no buffer so the reading thread + * will block until the writing thread has written something, or viceversa. + */ +SINK(in, int, 0) + +void setup() { + pinMode(LEDB, OUTPUT); +} + +void loop() { + // Read an 'int' from the SINK and discards it. Since there is basically no delay in the loop + // this call will surely block until something comes from the connected SOURCE. In this case + // the pace is dictated by the SOURCE that sends data every 100 ms. + in.read(); + digitalWrite(LEDB, HIGH); + + in.read(); + digitalWrite(LEDB, LOW); +} diff --git a/src/Arduino_Threads.h b/src/Arduino_Threads.h index 6b2a6f9..f176365 100644 --- a/src/Arduino_Threads.h +++ b/src/Arduino_Threads.h @@ -3,6 +3,170 @@ #include +#define SOURCE(name, type) \ +public: \ + Source name; \ +private: + +#define SINK(name, type, size) \ +public: \ + Sink name{size}; \ +private: +// we need to call the Sink(int size) non-default constructor using size as parameter. +// This is done by writing +// Sink name{size}; +// instead of: +// Sink name(size); +// otherwise the compiler will read it as a declaration of a method called "name" and we +// get a syntax error. +// This is called "C++11 uniform init" (using "{}" instead of "()" without "="... yikes!) +// https://chromium.googlesource.com/chromium/src/+/master/styleguide/c++/c++-dos-and-donts.md + +// Forward declaration of Sink and Source +template +class Sink; +template +class Source; + +template +class Sink +{ + private: + rtos::Mutex dataMutex; + rtos::ConditionVariable dataAvailable; + rtos::ConditionVariable slotAvailable; + T latest; + Sink *next; + const int size; + int first, last; + bool full; + T *queue; + + public: + Sink(int s) : + dataAvailable(dataMutex), + slotAvailable(dataMutex), + size(s), + queue((size > 0) ? new T[size] : nullptr), + first(0), last(0), full(false) + {}; + + ~Sink() { + if (queue != nullptr) { delete queue; } + } + + + //protected: TODO + void connectTo(Sink &sink) + { + if (next == nullptr) { + next = &sink; + } else { + next->connectTo(sink); + } + } + + T read() + { + // Non-blocking shared variable + if (size == -1) { + dataMutex.lock(); + T res = latest; + dataMutex.unlock(); + return res; + } + + // Blocking shared variable + if (size == 0) { + dataMutex.lock(); + while (!full) { + dataAvailable.wait(); + } + T res = latest; + full = false; + slotAvailable.notify_all(); + dataMutex.unlock(); + return res; + } + + // Blocking queue + dataMutex.lock(); + while (first == last && !full) { + dataAvailable.wait(); + } + T res = queue[first++]; + first %= size; + if (full) { + full = false; + slotAvailable.notify_one(); + } + dataMutex.unlock(); + return res; + } + + //protected: TODO + void inject(const T &value) + { + dataMutex.lock(); + + // Non-blocking shared variable + if (size == -1) { + latest = value; + } + + // Blocking shared variable + else if (size == 0) { + while (full) { + slotAvailable.wait(); + } + latest = value; + full = true; + dataAvailable.notify_one(); + slotAvailable.wait(); + } + + // Blocking queue + else { + while (full) { + slotAvailable.wait(); + } + if (first == last) { + dataAvailable.notify_one(); + } + queue[last++] = value; + last %= size; + if (first == last) { + full = true; + } + } + dataMutex.unlock(); + + if (next) next->inject(value); + } +}; + +template +class Source +{ + public: + Source() {}; + + void connectTo(Sink &sink) { + if (destination == nullptr) { + destination = &sink; + } else { + destination->connectTo(sink); + } + } + + void send(const T &value) { + if (destination) destination->inject(value); + } + + private: + Sink *destination; +}; + template class Shared // template definition {