Skip to content

Commit 5e2ad37

Browse files
committed
extract code in helper function and uniform the code
reintroduce the closing of input channel (it's required)
1 parent 0bbb45b commit 5e2ad37

File tree

3 files changed

+72
-66
lines changed

3 files changed

+72
-66
lines changed

bufferflow_timed.go

+23-24
Original file line numberDiff line numberDiff line change
@@ -31,33 +31,32 @@ func NewBufferflowTimed(port string, output chan []byte) *BufferflowTimed {
3131

3232
func (b *BufferflowTimed) Init() {
3333
log.Println("Initting timed buffer flow (output once every 16ms)")
34+
go b.consumeInput()
35+
}
3436

35-
go func() {
36-
Loop:
37-
for {
38-
select {
39-
case data := <-b.input:
40-
b.bufferedOutput = b.bufferedOutput + data
41-
b.sPort = b.port
42-
case <-b.ticker.C:
43-
if b.bufferedOutput != "" {
44-
m := SpPortMessage{b.sPort, b.bufferedOutput}
45-
buf, _ := json.Marshal(m)
46-
// data is now encoded in base64 format
47-
// need a decoder on the other side
48-
b.output <- []byte(buf)
49-
b.bufferedOutput = ""
50-
b.sPort = ""
51-
}
52-
case <-b.done:
53-
break Loop
37+
func (b *BufferflowTimed) consumeInput() {
38+
Loop:
39+
for {
40+
select {
41+
case data := <-b.input: // use the buffer and append data to it
42+
b.bufferedOutput = b.bufferedOutput + data
43+
b.sPort = b.port
44+
case <-b.ticker.C: // after 16ms send the buffered output message
45+
if b.bufferedOutput != "" {
46+
m := SpPortMessage{b.sPort, b.bufferedOutput}
47+
buf, _ := json.Marshal(m)
48+
// data is now encoded in base64 format
49+
// need a decoder on the other side
50+
b.output <- buf
51+
// reset the buffer and the port
52+
b.bufferedOutput = ""
53+
b.sPort = ""
5454
}
55+
case <-b.done:
56+
break Loop //this is required, a simple break statement would only exit the innermost switch statement
5557
}
56-
57-
close(b.input)
58-
59-
}()
60-
58+
}
59+
close(b.input)
6160
}
6261

6362
func (b *BufferflowTimed) BlockUntilReady(cmd string, id string) (bool, bool) {

bufferflow_timedbinary.go

+25-20
Original file line numberDiff line numberDiff line change
@@ -31,28 +31,32 @@ func NewBufferflowTimedBinary(port string, output chan []byte) *BufferflowTimedB
3131

3232
func (b *BufferflowTimedBinary) Init() {
3333
log.Println("Initting timed buffer binary flow (output once every 16ms)")
34-
go func() {
35-
Loop:
36-
for {
37-
select {
38-
case data := <-b.input:
39-
b.bufferedOutputBinary = append(b.bufferedOutputBinary, data...)
40-
b.sPortBinary = b.port
41-
case <-b.ticker.C:
42-
if b.bufferedOutputBinary != nil {
43-
m := SpPortMessageRaw{b.sPortBinary, b.bufferedOutputBinary}
44-
buf, _ := json.Marshal(m)
45-
b.output <- buf
46-
b.bufferedOutputBinary = nil
47-
b.sPortBinary = ""
48-
}
49-
case <-b.done:
50-
break Loop
34+
go b.consumeInput()
35+
}
36+
37+
func (b *BufferflowTimedBinary) consumeInput() {
38+
Loop:
39+
for {
40+
select {
41+
case data := <-b.input: // use the buffer and append data to it
42+
b.bufferedOutputBinary = append(b.bufferedOutputBinary, data...)
43+
b.sPortBinary = b.port
44+
case <-b.ticker.C: // after 16ms send the buffered output message
45+
if b.bufferedOutputBinary != nil {
46+
m := SpPortMessageRaw{b.sPortBinary, b.bufferedOutputBinary}
47+
buf, _ := json.Marshal(m)
48+
// data is now encoded in base64 format
49+
// need a decoder on the other side
50+
b.output <- buf
51+
// reset the buffer and the port
52+
b.bufferedOutputBinary = nil
53+
b.sPortBinary = ""
5154
}
55+
case <-b.done:
56+
break Loop //this is required, a simple break statement would only exit the innermost switch statement
5257
}
53-
54-
close(b.input)
55-
}()
58+
}
59+
close(b.input)
5660
}
5761

5862
func (b *BufferflowTimedBinary) BlockUntilReady(cmd string, id string) (bool, bool) {
@@ -70,5 +74,6 @@ func (b *BufferflowTimedBinary) IsBufferGloballySendingBackIncomingData() bool {
7074

7175
func (b *BufferflowTimedBinary) Close() {
7276
b.ticker.Stop()
77+
b.done <- true
7378
close(b.input)
7479
}

bufferflow_timedraw.go

+24-22
Original file line numberDiff line numberDiff line change
@@ -31,31 +31,32 @@ func NewBufferflowTimedRaw(port string, output chan []byte) *BufferflowTimedRaw
3131

3232
func (b *BufferflowTimedRaw) Init() {
3333
log.Println("Initting timed buffer raw flow (output once every 16ms)")
34+
go b.consumeInput()
35+
}
3436

35-
go func() {
36-
Loop:
37-
for {
38-
select {
39-
case data := <-b.input:
40-
b.bufferedOutputRaw = append(b.bufferedOutputRaw, []byte(data)...)
41-
b.sPortRaw = b.port
42-
case <-b.ticker.C:
43-
if b.bufferedOutputRaw != nil {
44-
m := SpPortMessageRaw{b.sPortRaw, b.bufferedOutputRaw}
45-
buf, _ := json.Marshal(m)
46-
// data is now encoded in base64 format
47-
// need a decoder on the other side
48-
b.output <- []byte(buf)
49-
b.bufferedOutputRaw = nil
50-
b.sPortRaw = ""
51-
}
52-
case <-b.done:
53-
break Loop
37+
func (b *BufferflowTimedRaw) consumeInput() {
38+
Loop:
39+
for {
40+
select {
41+
case data := <-b.input: // use the buffer and append data to it
42+
b.bufferedOutputRaw = append(b.bufferedOutputRaw, []byte(data)...)
43+
b.sPortRaw = b.port
44+
case <-b.ticker.C: // after 16ms send the buffered output message
45+
if b.bufferedOutputRaw != nil {
46+
m := SpPortMessageRaw{b.sPortRaw, b.bufferedOutputRaw}
47+
buf, _ := json.Marshal(m)
48+
// data is now encoded in base64 format
49+
// need a decoder on the other side
50+
b.output <- buf
51+
// reset the buffer and the port
52+
b.bufferedOutputRaw = nil
53+
b.sPortRaw = ""
5454
}
55+
case <-b.done:
56+
break Loop //this is required, a simple break statement would only exit the innermost switch statement
5557
}
56-
57-
close(b.input)
58-
}()
58+
}
59+
close(b.input)
5960
}
6061

6162
func (b *BufferflowTimedRaw) BlockUntilReady(cmd string, id string) (bool, bool) {
@@ -73,5 +74,6 @@ func (b *BufferflowTimedRaw) IsBufferGloballySendingBackIncomingData() bool {
7374

7475
func (b *BufferflowTimedRaw) Close() {
7576
b.ticker.Stop()
77+
b.done <- true
7678
close(b.input)
7779
}

0 commit comments

Comments
 (0)