Skip to content

Commit 34365d5

Browse files
committed
fix: refactor coder logger to allow flush without deadlock
1 parent eb45adc commit 34365d5

File tree

2 files changed

+215
-82
lines changed

2 files changed

+215
-82
lines changed

log/coder.go

+52-45
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"net/url"
88
"os"
9+
"sync"
910
"time"
1011

1112
"cdr.dev/slog"
@@ -27,13 +28,14 @@ var (
2728
minAgentAPIV2 = "v2.9"
2829
)
2930

30-
// Coder establishes a connection to the Coder instance located at
31-
// coderURL and authenticates using token. It then establishes a
32-
// dRPC connection to the Agent API and begins sending logs.
33-
// If the version of Coder does not support the Agent API, it will
34-
// fall back to using the PatchLogs endpoint.
35-
// The returned function is used to block until all logs are sent.
36-
func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(), error) {
31+
// Coder establishes a connection to the Coder instance located at coderURL and
32+
// authenticates using token. It then establishes a dRPC connection to the Agent
33+
// API and begins sending logs. If the version of Coder does not support the
34+
// Agent API, it will fall back to using the PatchLogs endpoint. The closer is
35+
// used to close the logger and to wait at most logSendGracePeriod for logs to
36+
// be sent. Cancelling the context will close the logs immediately without
37+
// waiting for logs to be sent.
38+
func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, closer func(), err error) {
3739
// To troubleshoot issues, we need some way of logging.
3840
metaLogger := slog.Make(sloghuman.Sink(os.Stderr))
3941
defer metaLogger.Sync()
@@ -44,18 +46,20 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(),
4446
}
4547
if semver.Compare(semver.MajorMinor(bi.Version), minAgentAPIV2) < 0 {
4648
metaLogger.Warn(ctx, "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API", slog.F("coder_version", bi.Version))
47-
sendLogs, flushLogs := sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
48-
return sendLogs, flushLogs, nil
49+
logger, closer = sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
50+
return logger, closer, nil
4951
}
52+
// Note that ctx passed to initRPC will be inherited by the
53+
// underlying connection; there is nothing we can do about that here.
5054
dac, err := initRPC(ctx, client, metaLogger.Named("init_rpc"))
5155
if err != nil {
5256
// Logged externally
5357
return nil, nil, fmt.Errorf("init coder rpc client: %w", err)
5458
}
5559
ls := agentsdk.NewLogSender(metaLogger.Named("coder_log_sender"))
5660
metaLogger.Warn(ctx, "Sending logs via AgentAPI v2", slog.F("coder_version", bi.Version))
57-
sendLogs, doneFunc := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
58-
return sendLogs, doneFunc, nil
61+
logger, closer = sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
62+
return logger, closer, nil
5963
}
6064

6165
type coderLogSender interface {
@@ -74,13 +78,14 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
7478
func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto.DRPCAgentClient20, error) {
7579
var c proto.DRPCAgentClient20
7680
var err error
77-
retryCtx, retryCancel := context.WithTimeout(context.Background(), rpcConnectTimeout)
81+
retryCtx, retryCancel := context.WithTimeout(ctx, rpcConnectTimeout)
7882
defer retryCancel()
7983
attempts := 0
8084
for r := retry.New(100*time.Millisecond, time.Second); r.Wait(retryCtx); {
8185
attempts++
8286
// Maximize compatibility.
8387
c, err = client.ConnectRPC20(ctx)
88+
l.Info(ctx, "Connecting to Coder", slog.F("attempt", attempts), slog.F("error", err))
8489
if err != nil {
8590
l.Debug(ctx, "Failed to connect to Coder", slog.F("error", err), slog.F("attempt", attempts))
8691
continue
@@ -95,65 +100,67 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto
95100

96101
// sendLogsV1 uses the PatchLogs endpoint to send logs.
97102
// This is deprecated, but required for backward compatibility with older versions of Coder.
98-
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Func, func()) {
103+
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (logger Func, closer func()) {
99104
// nolint: staticcheck // required for backwards compatibility
100-
sendLogs, flushLogs := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
105+
sendLog, flushAndClose := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
106+
var mu sync.Mutex
101107
return func(lvl Level, msg string, args ...any) {
102108
log := agentsdk.Log{
103109
CreatedAt: time.Now(),
104110
Output: fmt.Sprintf(msg, args...),
105111
Level: codersdk.LogLevel(lvl),
106112
}
107-
if err := sendLogs(ctx, log); err != nil {
113+
mu.Lock()
114+
defer mu.Unlock()
115+
if err := sendLog(ctx, log); err != nil {
108116
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
109117
}
110118
}, func() {
111-
if err := flushLogs(ctx); err != nil {
119+
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
120+
defer cancel()
121+
if err := flushAndClose(ctx); err != nil {
112122
l.Warn(ctx, "failed to flush logs", slog.Error(err))
113123
}
114124
}
115125
}
116126

117127
// sendLogsV2 uses the v2 agent API to send logs. Only compatible with Coder versions >= 2.9.
118-
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) {
128+
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (logger Func, closer func()) {
129+
sendCtx, sendCancel := context.WithCancel(ctx)
119130
done := make(chan struct{})
120131
uid := uuid.New()
121132
go func() {
122133
defer close(done)
123-
if err := ls.SendLoop(ctx, dest); err != nil {
134+
if err := ls.SendLoop(sendCtx, dest); err != nil {
124135
if !errors.Is(err, context.Canceled) {
125136
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
126137
}
127138
}
128-
129-
// Wait for up to 10 seconds for logs to finish sending.
130-
sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod)
131-
defer sendCancel()
132-
// Try once more to send any pending logs
133-
if err := ls.SendLoop(sendCtx, dest); err != nil {
134-
if !errors.Is(err, context.DeadlineExceeded) {
135-
l.Warn(ctx, "failed to send remaining logs to Coder", slog.Error(err))
136-
}
137-
}
138-
ls.Flush(uid)
139-
if err := ls.WaitUntilEmpty(sendCtx); err != nil {
140-
if !errors.Is(err, context.DeadlineExceeded) {
141-
l.Warn(ctx, "log sender did not empty", slog.Error(err))
142-
}
143-
}
144139
}()
145140

146-
logFunc := func(l Level, msg string, args ...any) {
147-
ls.Enqueue(uid, agentsdk.Log{
148-
CreatedAt: time.Now(),
149-
Output: fmt.Sprintf(msg, args...),
150-
Level: codersdk.LogLevel(l),
151-
})
152-
}
141+
var closeOnce sync.Once
142+
return func(l Level, msg string, args ...any) {
143+
ls.Enqueue(uid, agentsdk.Log{
144+
CreatedAt: time.Now(),
145+
Output: fmt.Sprintf(msg, args...),
146+
Level: codersdk.LogLevel(l),
147+
})
148+
}, func() {
149+
closeOnce.Do(func() {
150+
// Trigger a flush and wait for logs to be sent.
151+
ls.Flush(uid)
152+
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
153+
defer cancel()
154+
err := ls.WaitUntilEmpty(ctx)
155+
if err != nil {
156+
l.Warn(ctx, "log sender did not empty", slog.Error(err))
157+
}
153158

154-
doneFunc := func() {
155-
<-done
156-
}
159+
// Stop the send loop.
160+
sendCancel()
161+
})
157162

158-
return logFunc, doneFunc
163+
// Wait for the send loop to finish.
164+
<-done
165+
}
159166
}

0 commit comments

Comments
 (0)