reapply: "fix: refactor coder logger to allow flush without deadlock (#375)" #377
Merged
Changes from 2 commits
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"net/url"
 	"os"
+	"sync"
 	"time"

 	"cdr.dev/slog"
@@ -27,13 +28,14 @@ var (
 	minAgentAPIV2 = "v2.9"
 )

-// Coder establishes a connection to the Coder instance located at
-// coderURL and authenticates using token. It then establishes a
-// dRPC connection to the Agent API and begins sending logs.
-// If the version of Coder does not support the Agent API, it will
-// fall back to using the PatchLogs endpoint.
-// The returned function is used to block until all logs are sent.
-func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(), error) {
+// Coder establishes a connection to the Coder instance located at coderURL and
+// authenticates using token. It then establishes a dRPC connection to the Agent
+// API and begins sending logs. If the version of Coder does not support the
+// Agent API, it will fall back to using the PatchLogs endpoint. The closer is
+// used to close the logger and to wait at most logSendGracePeriod for logs to
+// be sent. Cancelling the context will close the logs immediately without
+// waiting for logs to be sent.
+func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, closer func(), err error) {
 	// To troubleshoot issues, we need some way of logging.
 	metaLogger := slog.Make(sloghuman.Sink(os.Stderr))
 	defer metaLogger.Sync()
@@ -44,18 +46,39 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(),
 	}
 	if semver.Compare(semver.MajorMinor(bi.Version), minAgentAPIV2) < 0 {
 		metaLogger.Warn(ctx, "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API", slog.F("coder_version", bi.Version))
-		sendLogs, flushLogs := sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
-		return sendLogs, flushLogs, nil
+		logger, closer = sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
+		return logger, closer, nil
 	}

+	// Create a new context so we can ensure the connection is torn down.
+	ctx, cancel := context.WithCancel(ctx)
+	defer func() {
+		if err != nil {
+			cancel()
+		}
+	}()
+	// Note that ctx passed to initRPC will be inherited by the
+	// underlying connection, nothing we can do about that here.
 	dac, err := initRPC(ctx, client, metaLogger.Named("init_rpc"))
 	if err != nil {
 		// Logged externally
 		return nil, nil, fmt.Errorf("init coder rpc client: %w", err)
 	}
 	ls := agentsdk.NewLogSender(metaLogger.Named("coder_log_sender"))
 	metaLogger.Warn(ctx, "Sending logs via AgentAPI v2", slog.F("coder_version", bi.Version))
-	sendLogs, doneFunc := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
-	return sendLogs, doneFunc, nil
+	logger, loggerCloser := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
Review comment on the sendLogsV2 line above (illustrated by the sketch after this hunk): This used to be called closer, i.e. the return variable name. After a last-minute change in #375 we wrapped this function to ensure we can close the drpcconn as well. This resulted in the variable being reassigned and calling itself.
+
+	var closeOnce sync.Once
+	closer = func() {
+		loggerCloser()
+
+		closeOnce.Do(func() {
+			// Typically cancel would be after Close, but we want to be
+			// sure there's nothing that might block on Close.
+			cancel()
+			_ = dac.DRPCConn().Close()
+		})
+	}
+	return logger, closer, nil
 }
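To illustrate the self-call bug the review describes, here is a contrived, self-contained sketch (not the original code): assigning a wrapper to the named return variable makes the wrapper close over itself, so calling it recurses instead of invoking the function it wrapped.

    package main

    import "fmt"

    func newCloser() (closer func()) {
        closer = func() { fmt.Println("inner close") }
        // BUG: the wrapper reads the named return variable "closer", which
        // now refers to the wrapper itself, so closer() recurses until the
        // stack overflows instead of calling the inner function.
        closer = func() {
            closer()
            fmt.Println("extra teardown")
        }
        return closer
    }

    // The fix mirrors this PR: bind the wrapped function to a distinct name
    // (loggerCloser) so the wrapper calls the inner function, not itself.
    func newCloserFixed() (closer func()) {
        loggerCloser := func() { fmt.Println("inner close") }
        closer = func() {
            loggerCloser()
            fmt.Println("extra teardown")
        }
        return closer
    }

    func main() {
        newCloserFixed()() // prints "inner close", then "extra teardown"
        // newCloser()()   // would recurse forever
    }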
@@ -74,7 +97,7 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
 func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto.DRPCAgentClient20, error) {
 	var c proto.DRPCAgentClient20
 	var err error
-	retryCtx, retryCancel := context.WithTimeout(context.Background(), rpcConnectTimeout)
+	retryCtx, retryCancel := context.WithTimeout(ctx, rpcConnectTimeout)
 	defer retryCancel()
 	attempts := 0
 	for r := retry.New(100*time.Millisecond, time.Second); r.Wait(retryCtx); {
@@ -95,65 +118,67 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto

 // sendLogsV1 uses the PatchLogs endpoint to send logs.
 // This is deprecated, but required for backward compatibility with older versions of Coder.
-func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Func, func()) {
+func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (logger Func, closer func()) {
 	// nolint: staticcheck // required for backwards compatibility
-	sendLogs, flushLogs := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
+	sendLog, flushAndClose := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
+	var mu sync.Mutex
 	return func(lvl Level, msg string, args ...any) {
 		log := agentsdk.Log{
 			CreatedAt: time.Now(),
 			Output:    fmt.Sprintf(msg, args...),
 			Level:     codersdk.LogLevel(lvl),
 		}
-		if err := sendLogs(ctx, log); err != nil {
+		mu.Lock()
+		defer mu.Unlock()
+		if err := sendLog(ctx, log); err != nil {
 			l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
 		}
 	}, func() {
-		if err := flushLogs(ctx); err != nil {
+		ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
+		defer cancel()
+		if err := flushAndClose(ctx); err != nil {
 			l.Warn(ctx, "failed to flush logs", slog.Error(err))
 		}
 	}
 }

 // sendLogsV2 uses the v2 agent API to send logs. Only compatibile with coder versions >= 2.9.
-func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) {
+func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (logger Func, closer func()) {
+	sendCtx, sendCancel := context.WithCancel(ctx)
 	done := make(chan struct{})
 	uid := uuid.New()
 	go func() {
 		defer close(done)
-		if err := ls.SendLoop(ctx, dest); err != nil {
+		if err := ls.SendLoop(sendCtx, dest); err != nil {
 			if !errors.Is(err, context.Canceled) {
 				l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
 			}
 		}
-
-		// Wait for up to 10 seconds for logs to finish sending.
-		sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod)
-		defer sendCancel()
-		// Try once more to send any pending logs
-		if err := ls.SendLoop(sendCtx, dest); err != nil {
-			if !errors.Is(err, context.DeadlineExceeded) {
-				l.Warn(ctx, "failed to send remaining logs to Coder", slog.Error(err))
-			}
-		}
-		ls.Flush(uid)
-		if err := ls.WaitUntilEmpty(sendCtx); err != nil {
-			if !errors.Is(err, context.DeadlineExceeded) {
-				l.Warn(ctx, "log sender did not empty", slog.Error(err))
-			}
-		}
 	}()

-	logFunc := func(l Level, msg string, args ...any) {
-		ls.Enqueue(uid, agentsdk.Log{
-			CreatedAt: time.Now(),
-			Output:    fmt.Sprintf(msg, args...),
-			Level:     codersdk.LogLevel(l),
-		})
-	}
+	var closeOnce sync.Once
+	return func(l Level, msg string, args ...any) {
+			ls.Enqueue(uid, agentsdk.Log{
+				CreatedAt: time.Now(),
+				Output:    fmt.Sprintf(msg, args...),
+				Level:     codersdk.LogLevel(l),
+			})
+		}, func() {
+			closeOnce.Do(func() {
+				// Trigger a flush and wait for logs to be sent.
+				ls.Flush(uid)
+				ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
+				defer cancel()
+				err := ls.WaitUntilEmpty(ctx)
+				if err != nil {
+					l.Warn(ctx, "log sender did not empty", slog.Error(err))
+				}

-	doneFunc := func() {
-		<-done
-	}
+				// Stop the send loop.
+				sendCancel()
+			})

-	return logFunc, doneFunc
+			// Wait for the send loop to finish.
+			<-done
+		}
 }
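The closer in sendLogsV2 removes the deadlock by ordering shutdown as flush, bounded wait, cancel, then join: the flush and wait happen while the send loop is still running, so the queue can actually drain. A stripped-down sketch of the same ordering (all names illustrative, assuming the context, sync, and time imports):

    import (
        "context"
        "sync"
        "time"
    )

    // flushThenStop returns a closer that is safe to call more than once.
    func flushThenStop(
        ctx context.Context,
        flush func(), // ask the running loop to send what it has buffered
        waitEmpty func(context.Context) error, // blocks until the queue drains
        stopLoop context.CancelFunc, // cancels the loop's context
        done <-chan struct{}, // closed when the loop goroutine exits
    ) func() {
        var once sync.Once
        return func() {
            once.Do(func() {
                flush()
                waitCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
                defer cancel()
                _ = waitEmpty(waitCtx) // bounded wait; the loop is still alive
                stopLoop()             // only now is it safe to stop the loop
            })
            <-done // join the send-loop goroutine
        }
    }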
Review comment on the initRPC change (context.WithTimeout(ctx, rpcConnectTimeout)): This is a cautious addition to ensure there's no internal logic in client.ConnectRPC20(ctx) that might block or leave the connection open. This is to make it safer to close the drpcconn later.
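The practical effect of deriving the retry timeout from the caller's ctx rather than context.Background() is shown in this small runnable demonstration (timeout value arbitrary): cancelling the parent context now aborts the connection attempt immediately, instead of letting it run until the timeout.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    func main() {
        parent, cancel := context.WithCancel(context.Background())
        // Derive the timeout from parent, as initRPC now does with ctx.
        retryCtx, retryCancel := context.WithTimeout(parent, 30*time.Second)
        defer retryCancel()

        go func() {
            time.Sleep(10 * time.Millisecond)
            cancel() // the caller gives up; retryCtx is torn down too
        }()

        <-retryCtx.Done()
        fmt.Println(retryCtx.Err()) // context.Canceled, well before the 30s timeout
    }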