channel sender / receiver disagree on content #834

Closed
@Licenser

Description

Let me apologize ahead of time for this less-than-ideal bug report: I couldn't manage to reproduce the problem outside of a rather complex application.

The issue: "randomly" (or under conditions I haven't tracked down yet) the sender and receiver of a channel disagree on the number of messages in it.

My gut feeling says there is some kind of race condition I've not yet found but I'm going to file this issue in the hope that there is an "obvious" answer I'm missing :)

Basically what happens, and I've seen the same issue on two different code paths, is that the sender thinks the channel is full while the receiver thinks it's empty. As a result the sender never sends another message and the receiver is never able to read one.

This seems to trigger more often when using merge() on two channels.

The code that triggers this has a layout of:

source -c1-> processor -c2-> dst -c3-> processor -c4-> source

where the processor prioritizes c3 over c2, and the source loop uses try_send so that it can still read from c4 even if c1 is full.
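To make the source side of that layout concrete, here is a minimal sketch of one loop iteration. It uses `std::sync::mpsc::sync_channel` as a stand-in for the async channels in the real application, and `source_step`, the `u64` message type and the `acked` vector are hypothetical names chosen for illustration, not code from the project.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TryRecvError, TrySendError};

/// One iteration of a hypothetical source loop: drain c4 first, then try to
/// push the pending message into c1 without blocking.
/// Returns the pending message back to the caller if c1 was full.
fn source_step(
    c1_tx: &SyncSender<u64>,
    c4_rx: &Receiver<u64>,
    pending: u64,
    acked: &mut Vec<u64>,
) -> Option<u64> {
    // Always service the feedback channel, even when c1 is full.
    loop {
        match c4_rx.try_recv() {
            Ok(id) => acked.push(id),
            Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
        }
    }
    match c1_tx.try_send(pending) {
        Ok(()) => None,
        Err(TrySendError::Full(msg)) => Some(msg), // retry on the next iteration
        Err(TrySendError::Disconnected(_)) => None, // downstream is gone; drop the message
    }
}

fn main() {
    let (c1_tx, c1_rx) = sync_channel::<u64>(1);
    let (c4_tx, c4_rx) = sync_channel::<u64>(1);
    let mut acked = Vec::new();

    assert_eq!(source_step(&c1_tx, &c4_rx, 1, &mut acked), None); // c1 had room
    c4_tx.send(1).unwrap(); // feedback arrives on c4
    // c1 is now full, but the step still reads from c4.
    assert_eq!(source_step(&c1_tx, &c4_rx, 2, &mut acked), Some(2));
    assert_eq!(acked, vec![1]);
    drop(c1_rx);
}
```

The point of the non-blocking send is that a full c1 can never prevent the loop from draining c4, which is what should keep the cycle from deadlocking.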

The issue manifests in that the c4 sender thinks c4 is full

[src/pipeline.rs:309] o.len() = 64

and the receiver thinks c4 is empty

[src/source.rs:364] self.rx.len() = 0
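For contrast, this is roughly what I would expect from a healthy bounded channel, sketched with `std::sync::mpsc::sync_channel` rather than the async channel in question. The std channel has no `len()`, so the sketch counts messages itself; `fill_then_drain` is a hypothetical helper, and the capacity of 64 is an assumption inferred from the `o.len() = 64` line above.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Fills a bounded channel until `try_send` reports Full, then drains it.
/// Returns (messages accepted by the sender, messages seen by the receiver).
fn fill_then_drain(capacity: usize) -> (usize, usize) {
    let (tx, rx) = sync_channel::<usize>(capacity);
    let mut sent = 0;
    loop {
        match tx.try_send(sent) {
            Ok(()) => sent += 1,
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => unreachable!("rx is still alive"),
        }
    }
    // Everything the sender managed to enqueue must be visible to the receiver.
    let received = rx.try_iter().count();
    (sent, received)
}

fn main() {
    let (sent, received) = fill_then_drain(64);
    assert_eq!(sent, received);
    println!("sent = {sent}, received = {received}");
}
```

In the failing runs the two sides break this invariant: the sender sees a full channel while the receiver sees nothing to read.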

It is also worth noting that this only triggers in release mode. As long as each step is "slow enough", it does not seem to manifest.

This is reproducible from https://github.com/wayfair-tremor/tremor-runtime/pull/new/async-channel-issue by running:

cargo build -p tremor-server --release;perf record ./bench/run real-workflow-throughput-json

I'll add a few lines of debug output below for the sake of having context.

[src/pipeline.rs:309] o.len() = 1
[src/source.rs:364] self.rx.len() = 0
tremor://localhost/pipeline/main/01/out: 61
[src/pipeline.rs:309] o.len() = 0
[src/pipeline.rs:309] o.len() = 0
[src/pipeline.rs:309] o.len() = 1
[src/pipeline.rs:309] o.len() = 1
[src/pipeline.rs:309] o.len() = 2
[src/pipeline.rs:309] o.len() = 3
[src/pipeline.rs:309] o.len() = 4
[src/pipeline.rs:309] o.len() = 5
[src/pipeline.rs:309] o.len() = 6
[src/pipeline.rs:309] o.len() = 7
[src/pipeline.rs:309] o.len() = 8
[src/pipeline.rs:309] o.len() = 9
[src/pipeline.rs:309] o.len() = 10
[src/pipeline.rs:309] o.len() = 11
[src/pipeline.rs:309] o.len() = 12
[src/pipeline.rs:309] o.len() = 13
[src/pipeline.rs:309] o.len() = 14
[src/pipeline.rs:309] o.len() = 15
[src/pipeline.rs:309] o.len() = 16
[src/pipeline.rs:309] o.len() = 17
[src/pipeline.rs:309] o.len() = 18
[src/pipeline.rs:309] o.len() = 19
[src/pipeline.rs:309] o.len() = 20
[src/pipeline.rs:309] o.len() = 21
[src/pipeline.rs:309] o.len() = 22
[src/pipeline.rs:309] o.len() = 23
[src/pipeline.rs:309] o.len() = 24
[src/pipeline.rs:309] o.len() = 25
[src/pipeline.rs:309] o.len() = 26
[src/pipeline.rs:309] o.len() = 27
[src/pipeline.rs:309] o.len() = 28
[src/pipeline.rs:309] o.len() = 29
[src/pipeline.rs:309] o.len() = 30
[src/pipeline.rs:309] o.len() = 31
[src/pipeline.rs:309] o.len() = 32
[src/pipeline.rs:309] o.len() = 33
[src/pipeline.rs:309] o.len() = 34
[src/pipeline.rs:309] o.len() = 35
[src/pipeline.rs:309] o.len() = 36
[src/pipeline.rs:309] o.len() = 37
[src/pipeline.rs:309] o.len() = 38
[src/pipeline.rs:309] o.len() = 39
[src/pipeline.rs:309] o.len() = 40
[src/pipeline.rs:309] o.len() = 41
[src/pipeline.rs:309] o.len() = 42
[src/pipeline.rs:309] o.len() = 43
[src/pipeline.rs:309] o.len() = 44
[src/pipeline.rs:309] o.len() = 45
[src/pipeline.rs:309] o.len() = 46
[src/pipeline.rs:309] o.len() = 47
[src/pipeline.rs:309] o.len() = 48
[src/pipeline.rs:309] o.len() = 49
[src/pipeline.rs:309] o.len() = 50
[src/pipeline.rs:309] o.len() = 51
[src/pipeline.rs:309] o.len() = 52
[src/pipeline.rs:309] o.len() = 53
[src/pipeline.rs:309] o.len() = 54
[src/pipeline.rs:309] o.len() = 55
[src/pipeline.rs:309] o.len() = 56
[src/pipeline.rs:309] o.len() = 57
[src/pipeline.rs:309] o.len() = 58
[src/pipeline.rs:309] o.len() = 59
[src/pipeline.rs:309] o.len() = 60
[src/pipeline.rs:309] o.len() = 61
[src/pipeline.rs:309] o.len() = 62
[src/pipeline.rs:309] o.len() = 63
[src/pipeline.rs:309] o.len() = 64
[src/source.rs:364] self.rx.len() = 0
tremor://localhost/pipeline/main/01/out: 64
