forked from kubernetes-sigs/gateway-api-inference-extension
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
117 lines (103 loc) · 3.61 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package handlers
import (
"io"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
klog "k8s.io/klog/v2"
"ext-proc/backend"
"ext-proc/scheduling"
)
// NewServer returns a Server wired with the given pod provider, scheduler and
// target-pod header key. The arguments are stored as-is; no validation is done.
func NewServer(pp PodProvider, scheduler Scheduler, targetPodHeader string) *Server {
	srv := new(Server)
	srv.podProvider = pp
	srv.scheduler = scheduler
	srv.targetPodHeader = targetPodHeader
	return srv
}
// Server implements the Envoy external processing server.
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
//
// Its Process method reads messages from an ext_proc stream and dispatches each
// one to a per-phase handler. Instances are built with NewServer.
type Server struct {
// scheduler is the Scheduler passed to NewServer.
// NOTE(review): presumably consulted by the request-body handler to pick a
// target pod; that handler is not visible in this file section — confirm.
scheduler Scheduler
// podProvider is the PodProvider passed to NewServer.
podProvider PodProvider
// The key of the header to specify the target pod address. This value needs to match Envoy
// configuration.
targetPodHeader string
}
// Scheduler is the scheduling dependency of Server: given an LLM request it
// returns the backend pod the request should be sent to.
type Scheduler interface {
// Schedule returns the target pod for request b, or an error.
// Process treats a handler error whose gRPC code is ResourceExhausted as
// "no capacity" and answers the client with HTTP 429 instead of failing the
// stream, so implementations can use that code to shed load.
Schedule(b *scheduling.LLMRequest) (targetPod *backend.Pod, err error)
}
// PodProvider is an interface to provide the set of pods in the backend and
// information about them, such as metrics.
type PodProvider interface {
// GetPodMetrics returns the metrics held for pod.
// NOTE(review): the bool presumably reports whether an entry for pod was
// found — confirm against the implementation.
GetPodMetrics(pod backend.Pod) (*backend.PodMetrics, bool)
// UpdatePodMetrics records pm as the metrics for pod.
// NOTE(review): whether this replaces or merges with existing data is not
// visible here — confirm against the implementation.
UpdatePodMetrics(pod backend.Pod, pm *backend.PodMetrics)
}
// Process implements the Envoy ext_proc streaming RPC.
//
// It repeatedly receives a ProcessingRequest from srv, dispatches it to the
// handler for its phase (request headers, request body or response headers)
// and sends the resulting ProcessingResponse back to Envoy. One RequestContext
// is created per stream and handed to every handler so state can be shared
// across phases.
//
// Return values:
//   - nil when the peer closes the stream (Recv returns io.EOF);
//   - ctx.Err() when the stream context is done;
//   - a gRPC status error when receiving or sending fails, when the message
//     carries an unsupported request type, or when a handler fails with any
//     code other than ResourceExhausted.
//
// A handler error with code ResourceExhausted does not end the stream; it is
// turned into an immediate HTTP 429 (Too Many Requests) response.
func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
	klog.V(3).Info("Processing")
	ctx := srv.Context()
	// Create request context to share states during life time of an HTTP request.
	// See https://github.com/envoyproxy/envoy/issues/17540.
	reqCtx := &RequestContext{}

	for {
		// Non-blocking cancellation check before each receive.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		req, err := srv.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			// This error occurs very frequently, though it doesn't seem to have any impact.
			// TODO Figure out if we can remove this noise.
			klog.V(3).Infof("cannot receive stream request: %v", err)
			return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
		}

		// Every switch arm below either assigns resp or returns, so no
		// placeholder allocation is needed.
		var resp *extProcPb.ProcessingResponse
		switch v := req.Request.(type) {
		case *extProcPb.ProcessingRequest_RequestHeaders:
			resp = HandleRequestHeaders(reqCtx, req)
			klog.V(3).Infof("Request context after HandleRequestHeaders: %v", reqCtx)
		case *extProcPb.ProcessingRequest_RequestBody:
			resp, err = s.HandleRequestBody(reqCtx, req)
			klog.V(3).Infof("Request context after HandleRequestBody: %v", reqCtx)
		case *extProcPb.ProcessingRequest_ResponseHeaders:
			resp, err = s.HandleResponseHeaders(reqCtx, req)
			klog.V(3).Infof("Request context after HandleResponseHeaders: %v", reqCtx)
		default:
			klog.Errorf("Unknown Request type %+v", v)
			return status.Error(codes.Unknown, "unknown request type")
		}

		if err != nil {
			klog.Errorf("failed to process request: %v", err)
			switch status.Code(err) {
			// This code can be returned by scheduler when there is no capacity for sheddable
			// requests.
			case codes.ResourceExhausted:
				resp = &extProcPb.ProcessingResponse{
					Response: &extProcPb.ProcessingResponse_ImmediateResponse{
						ImmediateResponse: &extProcPb.ImmediateResponse{
							Status: &envoyTypePb.HttpStatus{
								Code: envoyTypePb.StatusCode_TooManyRequests,
							},
						},
					},
				}
			default:
				// status.Errorf formats with fmt.Sprintf, which does not understand
				// %w; use %v so the cause is rendered instead of "%!w(...)".
				return status.Errorf(status.Code(err), "failed to handle request: %v", err)
			}
		}

		klog.V(3).Infof("response: %v", resp)
		if err := srv.Send(resp); err != nil {
			klog.Errorf("send error %v", err)
			// %v rather than %w: see the note on status.Errorf above.
			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
		}
	}
}
// RequestContext stores context information during the life time of an HTTP request.
// Process creates one per ext_proc stream and passes the same pointer to every
// phase handler, so values written in one phase are visible in later ones.
type RequestContext struct {
// TargetPod is the backend pod associated with this request.
// NOTE(review): presumably set by the request-body handler from the
// scheduler's result — the handler is not visible here; confirm.
TargetPod *backend.Pod
// Model is the model name associated with this request.
// NOTE(review): presumably taken from the request body — confirm.
Model string
}