Skip to content

Commit c6264a9

Browse files
authored
examples: add an example of flow control behavior (#6648)
1 parent ee4b62c commit c6264a9

File tree

4 files changed

+270
-0
lines changed

4 files changed

+270
-0
lines changed

examples/examples_test.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ EXAMPLES=(
5959
"features/encryption/TLS"
6060
"features/error_details"
6161
"features/error_handling"
62+
"features/flow_control"
6263
"features/interceptor"
6364
"features/load_balancing"
6465
"features/metadata"
@@ -112,6 +113,7 @@ declare -A EXPECTED_SERVER_OUTPUT=(
112113
["features/encryption/TLS"]=""
113114
["features/error_details"]=""
114115
["features/error_handling"]=""
116+
["features/flow_control"]="Stream ended successfully."
115117
["features/interceptor"]="unary echoing message \"hello world\""
116118
["features/load_balancing"]="serving on :50051"
117119
["features/metadata"]="message:\"this is examples/metadata\", sending echo"
@@ -134,6 +136,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=(
134136
["features/encryption/TLS"]="UnaryEcho: hello world"
135137
["features/error_details"]="Greeting: Hello world"
136138
["features/error_handling"]="Received error"
139+
["features/flow_control"]="Stream ended successfully."
137140
["features/interceptor"]="UnaryEcho: hello world"
138141
["features/load_balancing"]="calling helloworld.Greeter/SayHello with pick_first"
139142
["features/metadata"]="this is examples/metadata"
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
# Flow Control
2+
3+
Flow control is a feature in gRPC that prevents senders from writing more data
4+
on a stream than a receiver is capable of handling. This feature behaves the
5+
same for both clients and servers. Because gRPC-Go uses a blocking-style API
6+
for stream operations, flow control pushback is implemented by simply blocking
7+
send operations on a stream when that stream's flow control limits have been
8+
reached. When the receiver has read enough data from the stream, the send
9+
operation will unblock automatically. Flow control is configured automatically
10+
based on a connection's Bandwidth Delay Product (BDP) to ensure the buffer is
11+
the minimum size necessary to allow for maximum throughput on the stream if the
12+
receiver is reading at its maximum speed.
13+
14+
## Try it
15+
16+
```
17+
go run ./server
18+
```
19+
20+
```
21+
go run ./client
22+
```
23+
24+
## Example explanation
25+
26+
The example client and server are written to demonstrate the blocking by
27+
intentionally sending messages while the other side is not receiving. The
28+
bidirectional echo stream in the example begins by having the client send
29+
messages until it detects it has blocked (utilizing another goroutine). The
30+
server sleeps for 2 seconds to allow this to occur. Then the server will read
31+
all of these messages, and the roles of the client and server are swapped so the
32+
server attempts to send continuously while the client sleeps. After the client
33+
sleeps for 2 seconds, it will read again to unblock the server. The server will
34+
detect that it has blocked, and end the stream once it has unblocked.
35+
36+
### Expected Output
37+
38+
The client output should look like:
39+
```
40+
2023/09/19 15:49:49 New stream began.
41+
2023/09/19 15:49:50 Sending is blocked.
42+
2023/09/19 15:49:51 Sent 25 messages.
43+
2023/09/19 15:49:53 Read 25 messages.
44+
2023/09/19 15:49:53 Stream ended successfully.
45+
```
46+
47+
while the server should output the following logs:
48+
49+
```
50+
2023/09/19 15:49:49 New stream began.
51+
2023/09/19 15:49:51 Read 25 messages.
52+
2023/09/19 15:49:52 Sending is blocked.
53+
2023/09/19 15:49:53 Sent 25 messages.
54+
2023/09/19 15:49:53 Stream ended successfully.
55+
```
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
*
3+
* Copyright 2023 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
// Binary client is an example client.
20+
package main
21+
22+
import (
23+
"context"
24+
"flag"
25+
"io"
26+
"log"
27+
"time"
28+
29+
"google.golang.org/grpc"
30+
"google.golang.org/grpc/credentials/insecure"
31+
pb "google.golang.org/grpc/examples/features/proto/echo"
32+
"google.golang.org/grpc/internal/grpcsync"
33+
)
34+
35+
var addr = flag.String("addr", "localhost:50052", "the address to connect to")
36+
37+
var payload string = string(make([]byte, 8*1024)) // 8KB
38+
39+
func main() {
40+
flag.Parse()
41+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
42+
defer cancel()
43+
44+
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
45+
if err != nil {
46+
log.Fatalf("did not connect: %v", err)
47+
}
48+
defer conn.Close()
49+
50+
c := pb.NewEchoClient(conn)
51+
52+
stream, err := c.BidirectionalStreamingEcho(ctx)
53+
if err != nil {
54+
log.Fatalf("Error creating stream: %v", err)
55+
}
56+
log.Printf("New stream began.")
57+
58+
// First we will send data on the stream until we cannot send any more. We
59+
// detect this by not seeing a message sent 1s after the last sent message.
60+
stopSending := grpcsync.NewEvent()
61+
sentOne := make(chan struct{})
62+
go func() {
63+
i := 0
64+
for !stopSending.HasFired() {
65+
i++
66+
if err := stream.Send(&pb.EchoRequest{Message: payload}); err != nil {
67+
log.Fatalf("Error sending data: %v", err)
68+
}
69+
sentOne <- struct{}{}
70+
}
71+
log.Printf("Sent %v messages.", i)
72+
stream.CloseSend()
73+
}()
74+
75+
for !stopSending.HasFired() {
76+
after := time.NewTimer(time.Second)
77+
select {
78+
case <-sentOne:
79+
after.Stop()
80+
case <-after.C:
81+
log.Printf("Sending is blocked.")
82+
stopSending.Fire()
83+
<-sentOne
84+
}
85+
}
86+
87+
// Next, we wait 2 seconds before reading from the stream, to give the
88+
// server an opportunity to block while sending its responses.
89+
time.Sleep(2 * time.Second)
90+
91+
// Finally, read all the data sent by the server to allow it to unblock.
92+
for i := 0; true; i++ {
93+
if _, err := stream.Recv(); err != nil {
94+
log.Printf("Read %v messages.", i)
95+
if err == io.EOF {
96+
log.Printf("Stream ended successfully.")
97+
return
98+
}
99+
log.Fatalf("Error receiving data: %v", err)
100+
}
101+
}
102+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
*
3+
* Copyright 2023 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
// Binary server is an example server.
20+
package main
21+
22+
import (
23+
"flag"
24+
"fmt"
25+
"io"
26+
"log"
27+
"net"
28+
"time"
29+
30+
"google.golang.org/grpc"
31+
32+
pb "google.golang.org/grpc/examples/features/proto/echo"
33+
"google.golang.org/grpc/internal/grpcsync"
34+
)
35+
36+
var port = flag.Int("port", 50052, "port number")
37+
38+
var payload string = string(make([]byte, 8*1024)) // 8KB
39+
40+
// server is used to implement EchoServer.
41+
type server struct {
42+
pb.UnimplementedEchoServer
43+
}
44+
45+
func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error {
46+
log.Printf("New stream began.")
47+
// First, we wait 2 seconds before reading from the stream, to give the
48+
// client an opportunity to block while sending its requests.
49+
time.Sleep(2 * time.Second)
50+
51+
// Next, read all the data sent by the client to allow it to unblock.
52+
for i := 0; true; i++ {
53+
if _, err := stream.Recv(); err != nil {
54+
log.Printf("Read %v messages.", i)
55+
if err == io.EOF {
56+
break
57+
}
58+
log.Printf("Error receiving data: %v", err)
59+
return err
60+
}
61+
}
62+
63+
// Finally, send data until we block, then end the stream after we unblock.
64+
stopSending := grpcsync.NewEvent()
65+
sentOne := make(chan struct{})
66+
go func() {
67+
for !stopSending.HasFired() {
68+
after := time.NewTimer(time.Second)
69+
select {
70+
case <-sentOne:
71+
after.Stop()
72+
case <-after.C:
73+
log.Printf("Sending is blocked.")
74+
stopSending.Fire()
75+
<-sentOne
76+
}
77+
}
78+
}()
79+
80+
i := 0
81+
for !stopSending.HasFired() {
82+
i++
83+
if err := stream.Send(&pb.EchoResponse{Message: payload}); err != nil {
84+
log.Printf("Error sending data: %v", err)
85+
return err
86+
}
87+
sentOne <- struct{}{}
88+
}
89+
log.Printf("Sent %v messages.", i)
90+
91+
log.Printf("Stream ended successfully.")
92+
return nil
93+
}
94+
95+
func main() {
96+
flag.Parse()
97+
98+
address := fmt.Sprintf(":%v", *port)
99+
lis, err := net.Listen("tcp", address)
100+
if err != nil {
101+
log.Fatalf("failed to listen: %v", err)
102+
}
103+
104+
grpcServer := grpc.NewServer()
105+
pb.RegisterEchoServer(grpcServer, &server{})
106+
107+
if err := grpcServer.Serve(lis); err != nil {
108+
log.Fatalf("failed to serve: %v", err)
109+
}
110+
}

0 commit comments

Comments
 (0)