Skip to content

Implementation of Source/Sinks and connected blocks paradigm #14

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions examples/Blocks/Blocks.ino
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* This examples demonstrates the SOURCE/SINK abstraction.
* Each thread may have any number of SOURCES and SINKS that can be connected
* together using the "connectTo" method.
*/

void setup() {
data_reader.out.connectTo(data_writer.in);
data_reader.start();
data_writer.start();

// put your setup code here, to run once:
pinMode(LEDR, OUTPUT);
}

void loop() {
// put your main code here, to run repeatedly:
digitalWrite(LEDR, HIGH);
delay(1000);
digitalWrite(LEDR, LOW);
delay(1000);
}
Empty file.
12 changes: 12 additions & 0 deletions examples/Blocks/data_reader.inot
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

/* The output SOURCE, it sends 'int' */
SOURCE(out, int)

void setup() {
}

// a '1' is sent every 100 ms
void loop() {
out.send(1);
delay(100);
}
21 changes: 21 additions & 0 deletions examples/Blocks/data_writer.inot
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

/*
* An 'int' SINK with a size of '0'. This kind of SINK has no buffer so the reading thread
* will block until the writing thread has written something, or viceversa.
*/
SINK(in, int, 0)

void setup() {
pinMode(LEDB, OUTPUT);
}

void loop() {
// Read an 'int' from the SINK and discards it. Since there is basically no delay in the loop
// this call will surely block until something comes from the connected SOURCE. In this case
// the pace is dictated by the SOURCE that sends data every 100 ms.
in.read();
digitalWrite(LEDB, HIGH);

in.read();
digitalWrite(LEDB, LOW);
}
164 changes: 164 additions & 0 deletions src/Arduino_Threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,170 @@

#include <MemoryPool.h>

#define SOURCE(name, type) \
public: \
Source<type> name; \
private:

#define SINK(name, type, size) \
public: \
Sink<type> name{size}; \
private:
// we need to call the Sink<T>(int size) non-default constructor using size as parameter.
// This is done by writing
// Sink<type> name{size};
// instead of:
// Sink<type> name(size);
// otherwise the compiler will read it as a declaration of a method called "name" and we
// get a syntax error.
// This is called "C++11 uniform init" (using "{}" instead of "()" without "="... yikes!)
// https://chromium.googlesource.com/chromium/src/+/master/styleguide/c++/c++-dos-and-donts.md

// Forward declaration of Sink and Source
template<class T>
class Sink;
template<class T>
class Source;

template<class T>
class Sink
{
private:
rtos::Mutex dataMutex;
rtos::ConditionVariable dataAvailable;
rtos::ConditionVariable slotAvailable;
T latest;
Sink *next;
const int size;
int first, last;
bool full;
T *queue;

public:
Sink(int s) :
dataAvailable(dataMutex),
slotAvailable(dataMutex),
size(s),
queue((size > 0) ? new T[size] : nullptr),
first(0), last(0), full(false)
{};

~Sink() {
if (queue != nullptr) { delete queue; }
}


//protected: TODO
void connectTo(Sink &sink)
{
if (next == nullptr) {
next = &sink;
} else {
next->connectTo(sink);
}
}

T read()
{
// Non-blocking shared variable
if (size == -1) {
dataMutex.lock();
T res = latest;
dataMutex.unlock();
return res;
}

// Blocking shared variable
if (size == 0) {
dataMutex.lock();
while (!full) {
dataAvailable.wait();
}
T res = latest;
full = false;
slotAvailable.notify_all();
dataMutex.unlock();
return res;
}

// Blocking queue
dataMutex.lock();
while (first == last && !full) {
dataAvailable.wait();
}
T res = queue[first++];
first %= size;
if (full) {
full = false;
slotAvailable.notify_one();
}
dataMutex.unlock();
return res;
}

//protected: TODO
void inject(const T &value)
{
dataMutex.lock();

// Non-blocking shared variable
if (size == -1) {
latest = value;
}

// Blocking shared variable
else if (size == 0) {
while (full) {
slotAvailable.wait();
}
latest = value;
full = true;
dataAvailable.notify_one();
slotAvailable.wait();
}

// Blocking queue
else {
while (full) {
slotAvailable.wait();
}
if (first == last) {
dataAvailable.notify_one();
}
queue[last++] = value;
last %= size;
if (first == last) {
full = true;
}
}
dataMutex.unlock();

if (next) next->inject(value);
}
};

template<class T>
class Source
{
public:
Source() {};

void connectTo(Sink<T> &sink) {
if (destination == nullptr) {
destination = &sink;
} else {
destination->connectTo(sink);
}
}

void send(const T &value) {
if (destination) destination->inject(value);
}

private:
Sink<T> *destination;
};

template<class T, size_t QUEUE_SIZE = 16>
class Shared // template definition
{
Expand Down