Skip to content

fix: refactor coder logger to allow flush without deadlock #375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cmd/envbuilder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func envbuilderCmd() serpent.Command {
Options: o.CLI(),
Handler: func(inv *serpent.Invocation) error {
o.SetDefaults()
var preExec []func()
defer func() { // Ensure cleanup in case of error.
for _, fn := range preExec {
fn()
}
}()
o.Logger = log.New(os.Stderr, o.Verbose)
if o.CoderAgentURL != "" {
if o.CoderAgentToken == "" {
Expand All @@ -50,6 +56,10 @@ func envbuilderCmd() serpent.Command {
if err == nil {
o.Logger = log.Wrap(o.Logger, coderLog)
defer closeLogs()
preExec = append(preExec, func() {
o.Logger(log.LevelInfo, "Closing logs")
closeLogs()
})
// This adds the envbuilder subsystem.
// If telemetry is enabled in a Coder deployment,
// this will be reported and help us understand
Expand Down Expand Up @@ -78,7 +88,7 @@ func envbuilderCmd() serpent.Command {
return nil
}

err := envbuilder.Run(inv.Context(), o)
err := envbuilder.Run(inv.Context(), o, preExec...)
if err != nil {
o.Logger(log.LevelError, "error: %s", err)
}
Expand Down
7 changes: 6 additions & 1 deletion envbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ type execArgsInfo struct {
// Logger is the logf to use for all operations.
// Filesystem is the filesystem to use for all operations.
// Defaults to the host filesystem.
func Run(ctx context.Context, opts options.Options) error {
// preExec are any functions that should be called before exec'ing the init
// command. This is useful for ensuring that defers get run.
func Run(ctx context.Context, opts options.Options, preExec ...func()) error {
var args execArgsInfo
// Run in a separate function to ensure all defers run before we
// setuid or exec.
Expand All @@ -103,6 +105,9 @@ func Run(ctx context.Context, opts options.Options) error {
}

opts.Logger(log.LevelInfo, "=== Running the init command %s %+v as the %q user...", opts.InitCommand, args.InitArgs, args.UserInfo.user.Username)
for _, fn := range preExec {
fn()
}

err = syscall.Exec(args.InitCommand, append([]string{args.InitCommand}, args.InitArgs...), args.Environ)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/gliderlabs/ssh v0.3.7
github.com/go-git/go-billy/v5 v5.5.0
github.com/go-git/go-git/v5 v5.12.0
github.com/google/go-cmp v0.6.0
github.com/google/go-containerregistry v0.20.1
github.com/google/uuid v1.6.0
github.com/hashicorp/go-multierror v1.1.1
Expand Down Expand Up @@ -149,7 +150,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/nftables v0.2.0 // indirect
github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b // indirect
github.com/gorilla/handlers v1.5.1 // indirect
Expand Down
67 changes: 67 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/envbuilder"
"github.com/coder/envbuilder/devcontainer/features"
"github.com/coder/envbuilder/internal/magicdir"
Expand Down Expand Up @@ -58,6 +60,71 @@ const (
testImageUbuntu = "localhost:5000/envbuilder-test-ubuntu:latest"
)

// TestLogs verifies that envbuilder ships its logs to a Coder deployment and
// that the final "Closing logs" message (emitted by the pre-exec hook in
// cmd/envbuilder) is delivered.
//
// The fake Coder server reports version v2.8.9, which is older than the
// minimum Agent API v2 version (v2.9), so envbuilder is expected to fall back
// to the deprecated PatchLogs endpoint that this handler implements.
func TestLogs(t *testing.T) {
	t.Parallel()

	token := uuid.NewString()
	// Buffered and signalled with a non-blocking send instead of close():
	// the handler may see the marker in more than one request, and closing
	// an already-closed channel would panic inside the HTTP server goroutine.
	logsDone := make(chan struct{}, 1)

	logHandler := func(w http.ResponseWriter, r *http.Request) {
		switch r.URL.Path {
		case "/api/v2/buildinfo":
			w.Header().Set("Content-Type", "application/json")
			_, _ = w.Write([]byte(`{"version": "v2.8.9"}`))
		case "/api/v2/workspaceagents/me/logs":
			tokHdr := r.Header.Get(codersdk.SessionTokenHeader)
			assert.Equal(t, token, tokHdr)
			var req agentsdk.PatchLogs
			if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
				// The status must not have been written yet, otherwise
				// this 400 would be silently dropped.
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			w.WriteHeader(http.StatusOK)
			for _, entry := range req.Logs {
				t.Logf("got log: %+v", entry)
				if strings.Contains(entry.Output, "Closing logs") {
					// Signal at most once; drop the signal if one is
					// already pending.
					select {
					case logsDone <- struct{}{}:
					default:
					}
					return
				}
			}
		default:
			t.Errorf("unexpected request to %s", r.URL.Path)
			w.WriteHeader(http.StatusNotFound)
		}
	}
	logSrv := httptest.NewServer(http.HandlerFunc(logHandler))
	defer logSrv.Close()

	// Ensures that a Git repository with a devcontainer.json is cloned and built.
	srv := gittest.CreateGitServer(t, gittest.Options{
		Files: map[string]string{
			"devcontainer.json": `{
				"build": {
					"dockerfile": "Dockerfile"
				},
			}`,
			"Dockerfile": fmt.Sprintf(`FROM %s`, testImageUbuntu),
		},
	})
	_, err := runEnvbuilder(t, runOpts{env: []string{
		envbuilderEnv("GIT_URL", srv.URL),
		"CODER_AGENT_URL=" + logSrv.URL,
		"CODER_AGENT_TOKEN=" + token,
	}})
	require.NoError(t, err)

	// Give the log sender up to a minute to deliver the closing message.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	select {
	case <-ctx.Done():
		t.Fatal("timed out waiting for logs")
	case <-logsDone:
	}
}

func TestInitScriptInitCommand(t *testing.T) {
t.Parallel()

Expand Down
97 changes: 52 additions & 45 deletions log/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/url"
"os"
"sync"
"time"

"cdr.dev/slog"
Expand All @@ -27,13 +28,14 @@ var (
minAgentAPIV2 = "v2.9"
)

// Coder establishes a connection to the Coder instance located at
// coderURL and authenticates using token. It then establishes a
// dRPC connection to the Agent API and begins sending logs.
// If the version of Coder does not support the Agent API, it will
// fall back to using the PatchLogs endpoint.
// The returned function is used to block until all logs are sent.
func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(), error) {
// Coder establishes a connection to the Coder instance located at coderURL and
// authenticates using token. It then establishes a dRPC connection to the Agent
// API and begins sending logs. If the version of Coder does not support the
// Agent API, it will fall back to using the PatchLogs endpoint. The closer is
// used to close the logger and to wait at most logSendGracePeriod for logs to
// be sent. Cancelling the context will close the logs immediately without
// waiting for logs to be sent.
func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, closer func(), err error) {
// To troubleshoot issues, we need some way of logging.
metaLogger := slog.Make(sloghuman.Sink(os.Stderr))
defer metaLogger.Sync()
Expand All @@ -44,18 +46,20 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(),
}
if semver.Compare(semver.MajorMinor(bi.Version), minAgentAPIV2) < 0 {
metaLogger.Warn(ctx, "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API", slog.F("coder_version", bi.Version))
sendLogs, flushLogs := sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
return sendLogs, flushLogs, nil
logger, closer = sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 👍 I found those previous names hard to reason about.

return logger, closer, nil
}
// Note that ctx passed to initRPC will be inherited by the
// underlying connection, nothing we can do about that here.
dac, err := initRPC(ctx, client, metaLogger.Named("init_rpc"))
if err != nil {
// Logged externally
return nil, nil, fmt.Errorf("init coder rpc client: %w", err)
}
ls := agentsdk.NewLogSender(metaLogger.Named("coder_log_sender"))
metaLogger.Warn(ctx, "Sending logs via AgentAPI v2", slog.F("coder_version", bi.Version))
sendLogs, doneFunc := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
return sendLogs, doneFunc, nil
logger, closer = sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
return logger, closer, nil
}

type coderLogSender interface {
Expand All @@ -74,13 +78,14 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto.DRPCAgentClient20, error) {
var c proto.DRPCAgentClient20
var err error
retryCtx, retryCancel := context.WithTimeout(context.Background(), rpcConnectTimeout)
retryCtx, retryCancel := context.WithTimeout(ctx, rpcConnectTimeout)
defer retryCancel()
attempts := 0
for r := retry.New(100*time.Millisecond, time.Second); r.Wait(retryCtx); {
attempts++
// Maximize compatibility.
c, err = client.ConnectRPC20(ctx)
l.Info(ctx, "Connecting to Coder", slog.F("attempt", attempts), slog.F("error", err))
if err != nil {
l.Debug(ctx, "Failed to connect to Coder", slog.F("error", err), slog.F("attempt", attempts))
continue
Expand All @@ -95,65 +100,67 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto

// sendLogsV1 uses the PatchLogs endpoint to send logs.
// This is deprecated, but required for backward compatibility with older versions of Coder.
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Func, func()) {
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (logger Func, closer func()) {
// nolint: staticcheck // required for backwards compatibility
sendLogs, flushLogs := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
sendLog, flushAndClose := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
var mu sync.Mutex
return func(lvl Level, msg string, args ...any) {
log := agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(lvl),
}
if err := sendLogs(ctx, log); err != nil {
mu.Lock()
defer mu.Unlock()
if err := sendLog(ctx, log); err != nil {
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
}
}, func() {
if err := flushLogs(ctx); err != nil {
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
defer cancel()
if err := flushAndClose(ctx); err != nil {
l.Warn(ctx, "failed to flush logs", slog.Error(err))
}
}
}

// sendLogsV2 uses the v2 agent API to send logs. Only compatible with coder versions >= 2.9.
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) {
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (logger Func, closer func()) {
sendCtx, sendCancel := context.WithCancel(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should sendCancel be deferred, too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, we want to keep this context alive until we've flushed or hit the timeout in the closer function.

done := make(chan struct{})
uid := uuid.New()
go func() {
defer close(done)
if err := ls.SendLoop(ctx, dest); err != nil {
if err := ls.SendLoop(sendCtx, dest); err != nil {
if !errors.Is(err, context.Canceled) {
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
}
}

// Wait for up to 10 seconds for logs to finish sending.
sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod)
defer sendCancel()
// Try once more to send any pending logs
if err := ls.SendLoop(sendCtx, dest); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
l.Warn(ctx, "failed to send remaining logs to Coder", slog.Error(err))
}
}
ls.Flush(uid)
if err := ls.WaitUntilEmpty(sendCtx); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
l.Warn(ctx, "log sender did not empty", slog.Error(err))
}
}
}()

logFunc := func(l Level, msg string, args ...any) {
ls.Enqueue(uid, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(l),
})
}
var closeOnce sync.Once
return func(l Level, msg string, args ...any) {
ls.Enqueue(uid, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(l),
})
}, func() {
closeOnce.Do(func() {
// Trigger a flush and wait for logs to be sent.
ls.Flush(uid)
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we merge context.WithTimeout and context.WithCancel and use one above?

Copy link
Member Author

@mafredri mafredri Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's doable, but I feel their purpose is clearer as-is. We could actually make sendCtx not inherit the function's ctx instead, but that would also be pointless because ConnectRPC20 applies ctx to the connection, so messages wouldn't go through anyway.

defer cancel()
err := ls.WaitUntilEmpty(ctx)
if err != nil {
l.Warn(ctx, "log sender did not empty", slog.Error(err))
}

doneFunc := func() {
<-done
}
// Stop the send loop.
sendCancel()
})

return logFunc, doneFunc
// Wait for the send loop to finish.
<-done
}
}
Loading