Description
Let me apologize in advance for this less-than-ideal bug report: I couldn't manage to reproduce the problem outside of a rather complex application.
The issue: "randomly" (or under conditions I couldn't track down yet) the sender and receiver of a channel disagree on the number of messages in it.
My gut feeling says there is some kind of race condition I've not yet found, but I'm filing this issue in the hope that there is an "obvious" answer I'm missing :)
Basically what happens (and I've seen the same issue on two different code paths) is that the sender thinks the channel is full while the receiver thinks it's empty. As a result the sender never sends another message and the receiver is never able to read one.
This seems to trigger more often when using merge() on two channels.
The code that triggers this has a layout of:
source -c1-> processor -c2-> dst -c3-> processor -c4-> source
where the processor prioritizes c3 over c2, and the source loop uses try_send to ensure that it can still read from c4 even if c1 is full.
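To make the layout above concrete, here is a minimal sketch of the two pieces of logic described: a source step that never blocks on c1 and always services c4, and a processor receive that prefers c3 over c2. It is not the code from the application. It uses the standard library's bounded sync_channel as a stand-in for the async channels in question, and the names source_step and processor_recv as well as the u64 message type are assumptions made up for illustration.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// One iteration of a hypothetical source loop: try to push `msg` into c1
/// without blocking, then drain whatever is currently waiting on c4.
/// Returns the message if it could not be sent, plus the items read from c4.
fn source_step(
    c1: &SyncSender<u64>,
    c4: &Receiver<u64>,
    msg: u64,
) -> (Option<u64>, Vec<u64>) {
    let unsent = match c1.try_send(msg) {
        Ok(()) => None,
        // c1 is full: hand the message back for a later retry instead of blocking.
        Err(TrySendError::Full(m)) => Some(m),
        // The receiving side is gone; this sketch just hands the message back.
        Err(TrySendError::Disconnected(m)) => Some(m),
    };
    // Always service c4 so the stage feeding it can keep making progress.
    let drained: Vec<u64> = c4.try_iter().collect();
    (unsent, drained)
}

/// Hypothetical processor receive: take from c3 first, fall back to c2.
fn processor_recv(c3: &Receiver<u64>, c2: &Receiver<u64>) -> Option<u64> {
    c3.try_recv().ok().or_else(|| c2.try_recv().ok())
}

fn main() {
    // Capacity 1 for c1 so that it fills up immediately (nobody reads it here).
    let (c1_tx, _c1_rx) = sync_channel::<u64>(1);
    let (c4_tx, c4_rx) = sync_channel::<u64>(64);
    c4_tx.send(7).unwrap();

    for msg in 1..=2 {
        let (unsent, drained) = source_step(&c1_tx, &c4_rx, msg);
        println!("msg={} unsent={:?} drained={:?}", msg, unsent, drained);
    }
}
```

With this shape the source can only stall if c4 itself stops delivering, which is the situation the rest of this report describes.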
The issue manifests in that the c4 sender thinks c4 is full:
[src/pipeline.rs:309] o.len() = 64
and the receiver thinks c4 is empty:
[src/source.rs:364] self.rx.len() = 0
It is also worth noting that this only triggers in release mode; as long as each step is "slow enough" it does not seem to manifest.
This is reproducible from https://github.com/wayfair-tremor/tremor-runtime/pull/new/async-channel-issue by running:
cargo build -p tremor-server --release;perf record ./bench/run real-workflow-throughput-json
I'll add a few lines of debug output below for the sake of having context.
[src/pipeline.rs:309] o.len() = 1
[src/source.rs:364] self.rx.len() = 0
tremor://localhost/pipeline/main/01/out: 61
[src/pipeline.rs:309] o.len() = 0
[src/pipeline.rs:309] o.len() = 0
[src/pipeline.rs:309] o.len() = 1
[src/pipeline.rs:309] o.len() = 1
[src/pipeline.rs:309] o.len() = 2
[src/pipeline.rs:309] o.len() = 3
[src/pipeline.rs:309] o.len() = 4
[src/pipeline.rs:309] o.len() = 5
[src/pipeline.rs:309] o.len() = 6
[src/pipeline.rs:309] o.len() = 7
[src/pipeline.rs:309] o.len() = 8
[src/pipeline.rs:309] o.len() = 9
[src/pipeline.rs:309] o.len() = 10
[src/pipeline.rs:309] o.len() = 11
[src/pipeline.rs:309] o.len() = 12
[src/pipeline.rs:309] o.len() = 13
[src/pipeline.rs:309] o.len() = 14
[src/pipeline.rs:309] o.len() = 15
[src/pipeline.rs:309] o.len() = 16
[src/pipeline.rs:309] o.len() = 17
[src/pipeline.rs:309] o.len() = 18
[src/pipeline.rs:309] o.len() = 19
[src/pipeline.rs:309] o.len() = 20
[src/pipeline.rs:309] o.len() = 21
[src/pipeline.rs:309] o.len() = 22
[src/pipeline.rs:309] o.len() = 23
[src/pipeline.rs:309] o.len() = 24
[src/pipeline.rs:309] o.len() = 25
[src/pipeline.rs:309] o.len() = 26
[src/pipeline.rs:309] o.len() = 27
[src/pipeline.rs:309] o.len() = 28
[src/pipeline.rs:309] o.len() = 29
[src/pipeline.rs:309] o.len() = 30
[src/pipeline.rs:309] o.len() = 31
[src/pipeline.rs:309] o.len() = 32
[src/pipeline.rs:309] o.len() = 33
[src/pipeline.rs:309] o.len() = 34
[src/pipeline.rs:309] o.len() = 35
[src/pipeline.rs:309] o.len() = 36
[src/pipeline.rs:309] o.len() = 37
[src/pipeline.rs:309] o.len() = 38
[src/pipeline.rs:309] o.len() = 39
[src/pipeline.rs:309] o.len() = 40
[src/pipeline.rs:309] o.len() = 41
[src/pipeline.rs:309] o.len() = 42
[src/pipeline.rs:309] o.len() = 43
[src/pipeline.rs:309] o.len() = 44
[src/pipeline.rs:309] o.len() = 45
[src/pipeline.rs:309] o.len() = 46
[src/pipeline.rs:309] o.len() = 47
[src/pipeline.rs:309] o.len() = 48
[src/pipeline.rs:309] o.len() = 49
[src/pipeline.rs:309] o.len() = 50
[src/pipeline.rs:309] o.len() = 51
[src/pipeline.rs:309] o.len() = 52
[src/pipeline.rs:309] o.len() = 53
[src/pipeline.rs:309] o.len() = 54
[src/pipeline.rs:309] o.len() = 55
[src/pipeline.rs:309] o.len() = 56
[src/pipeline.rs:309] o.len() = 57
[src/pipeline.rs:309] o.len() = 58
[src/pipeline.rs:309] o.len() = 59
[src/pipeline.rs:309] o.len() = 60
[src/pipeline.rs:309] o.len() = 61
[src/pipeline.rs:309] o.len() = 62
[src/pipeline.rs:309] o.len() = 63
[src/pipeline.rs:309] o.len() = 64
[src/source.rs:364] self.rx.len() = 0
tremor://localhost/pipeline/main/01/out: 64