Skip to content

reapply: "fix: refactor coder logger to allow flush without deadlock (#375)" #377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions cmd/envbuilder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ func envbuilderCmd() serpent.Command {
Options: o.CLI(),
Handler: func(inv *serpent.Invocation) error {
o.SetDefaults()
var preExecs []func()
preExec := func() {
for _, fn := range preExecs {
fn()
}
preExecs = nil
}
defer preExec() // Ensure cleanup in case of error.

o.Logger = log.New(os.Stderr, o.Verbose)
if o.CoderAgentURL != "" {
if o.CoderAgentToken == "" {
Expand All @@ -49,7 +58,9 @@ func envbuilderCmd() serpent.Command {
coderLog, closeLogs, err := log.Coder(inv.Context(), u, o.CoderAgentToken)
if err == nil {
o.Logger = log.Wrap(o.Logger, coderLog)
defer closeLogs()
preExecs = append(preExecs, func() {
closeLogs()
})
// This adds the envbuilder subsystem.
// If telemetry is enabled in a Coder deployment,
// this will be reported and help us understand
Expand Down Expand Up @@ -78,7 +89,7 @@ func envbuilderCmd() serpent.Command {
return nil
}

err := envbuilder.Run(inv.Context(), o)
err := envbuilder.Run(inv.Context(), o, preExec)
if err != nil {
o.Logger(log.LevelError, "error: %s", err)
}
Expand Down
7 changes: 6 additions & 1 deletion envbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ type execArgsInfo struct {
// Logger is the logf to use for all operations.
// Filesystem is the filesystem to use for all operations.
// Defaults to the host filesystem.
func Run(ctx context.Context, opts options.Options) error {
// preExec are any functions that should be called before exec'ing the init
// command. This is useful for ensuring that defers get run.
func Run(ctx context.Context, opts options.Options, preExec ...func()) error {
var args execArgsInfo
// Run in a separate function to ensure all defers run before we
// setuid or exec.
Expand All @@ -103,6 +105,9 @@ func Run(ctx context.Context, opts options.Options) error {
}

opts.Logger(log.LevelInfo, "=== Running the init command %s %+v as the %q user...", opts.InitCommand, args.InitArgs, args.UserInfo.user.Username)
for _, fn := range preExec {
fn()
}

err = syscall.Exec(args.InitCommand, append([]string{args.InitCommand}, args.InitArgs...), args.Environ)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/gliderlabs/ssh v0.3.7
github.com/go-git/go-billy/v5 v5.5.0
github.com/go-git/go-git/v5 v5.12.0
github.com/google/go-cmp v0.6.0
github.com/google/go-containerregistry v0.20.1
github.com/google/uuid v1.6.0
github.com/hashicorp/go-multierror v1.1.1
Expand Down Expand Up @@ -149,7 +150,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/nftables v0.2.0 // indirect
github.com/google/pprof v0.0.0-20230817174616-7a8ec2ada47b // indirect
github.com/gorilla/handlers v1.5.1 // indirect
Expand Down
67 changes: 67 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"github.com/coder/coder/v2/codersdk"
"github.com/coder/coder/v2/codersdk/agentsdk"
"github.com/coder/envbuilder"
"github.com/coder/envbuilder/devcontainer/features"
"github.com/coder/envbuilder/internal/magicdir"
Expand Down Expand Up @@ -58,6 +60,71 @@ const (
testImageUbuntu = "localhost:5000/envbuilder-test-ubuntu:latest"
)

// TestLogs verifies that envbuilder ships its logs to the Coder agent API
// (via the PatchLogs endpoint of a fake server) and that the final
// "Closing logs" message is emitted before the process exits.
func TestLogs(t *testing.T) {
	t.Parallel()

	agentToken := uuid.NewString()
	sawCloseMsg := make(chan struct{})

	// Fake Coder agent API: advertises an old version so envbuilder falls
	// back to the deprecated PatchLogs endpoint, then watches for the
	// final log line.
	fakeAgentAPI := func(w http.ResponseWriter, r *http.Request) {
		switch r.URL.Path {
		case "/api/v2/buildinfo":
			w.Header().Set("Content-Type", "application/json")
			_, _ = w.Write([]byte(`{"version": "v2.8.9"}`))
		case "/api/v2/workspaceagents/me/logs":
			w.WriteHeader(http.StatusOK)
			assert.Equal(t, agentToken, r.Header.Get(codersdk.SessionTokenHeader))
			var patch agentsdk.PatchLogs
			if err := json.NewDecoder(r.Body).Decode(&patch); err != nil {
				http.Error(w, err.Error(), http.StatusBadRequest)
				return
			}
			for _, entry := range patch.Logs {
				t.Logf("got log: %+v", entry)
				if strings.Contains(entry.Output, "Closing logs") {
					close(sawCloseMsg)
					return
				}
			}
		default:
			t.Errorf("unexpected request to %s", r.URL.Path)
			w.WriteHeader(http.StatusNotFound)
		}
	}
	logSrv := httptest.NewServer(http.HandlerFunc(fakeAgentAPI))
	defer logSrv.Close()

	// Ensures that a Git repository with a devcontainer.json is cloned and built.
	srv := gittest.CreateGitServer(t, gittest.Options{
		Files: map[string]string{
			"devcontainer.json": `{
	"build": {
		"dockerfile": "Dockerfile"
	},
}`,
			"Dockerfile": fmt.Sprintf(`FROM %s`, testImageUbuntu),
		},
	})
	_, err := runEnvbuilder(t, runOpts{env: []string{
		envbuilderEnv("GIT_URL", srv.URL),
		"CODER_AGENT_URL=" + logSrv.URL,
		"CODER_AGENT_TOKEN=" + agentToken,
	}})
	require.NoError(t, err)

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	select {
	case <-sawCloseMsg:
	case <-ctx.Done():
		t.Fatal("timed out waiting for logs")
	}
}

func TestInitScriptInitCommand(t *testing.T) {
t.Parallel()

Expand Down
115 changes: 70 additions & 45 deletions log/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"net/url"
"os"
"sync"
"time"

"cdr.dev/slog"
Expand All @@ -27,13 +28,14 @@ var (
minAgentAPIV2 = "v2.9"
)

// Coder establishes a connection to the Coder instance located at
// coderURL and authenticates using token. It then establishes a
// dRPC connection to the Agent API and begins sending logs.
// If the version of Coder does not support the Agent API, it will
// fall back to using the PatchLogs endpoint.
// The returned function is used to block until all logs are sent.
func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(), error) {
// Coder establishes a connection to the Coder instance located at coderURL and
// authenticates using token. It then establishes a dRPC connection to the Agent
// API and begins sending logs. If the version of Coder does not support the
// Agent API, it will fall back to using the PatchLogs endpoint. The closer is
// used to close the logger and to wait at most logSendGracePeriod for logs to
// be sent. Cancelling the context will close the logs immediately without
// waiting for logs to be sent.
func Coder(ctx context.Context, coderURL *url.URL, token string) (logger Func, closer func(), err error) {
// To troubleshoot issues, we need some way of logging.
metaLogger := slog.Make(sloghuman.Sink(os.Stderr))
defer metaLogger.Sync()
Expand All @@ -44,18 +46,39 @@ func Coder(ctx context.Context, coderURL *url.URL, token string) (Func, func(),
}
if semver.Compare(semver.MajorMinor(bi.Version), minAgentAPIV2) < 0 {
metaLogger.Warn(ctx, "Detected Coder version incompatible with AgentAPI v2, falling back to deprecated API", slog.F("coder_version", bi.Version))
sendLogs, flushLogs := sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
return sendLogs, flushLogs, nil
logger, closer = sendLogsV1(ctx, client, metaLogger.Named("send_logs_v1"))
return logger, closer, nil
}

// Create a new context so we can ensure the connection is torn down.
ctx, cancel := context.WithCancel(ctx)
defer func() {
if err != nil {
cancel()
}
}()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: This is a cautious addition to ensure there's no internal logic in client.ConnectRPC20(ctx) that might block or leave the connection open. This is to make it safer to call drpcconn close later.

// Note that ctx passed to initRPC will be inherited by the
// underlying connection, nothing we can do about that here.
dac, err := initRPC(ctx, client, metaLogger.Named("init_rpc"))
if err != nil {
// Logged externally
return nil, nil, fmt.Errorf("init coder rpc client: %w", err)
}
ls := agentsdk.NewLogSender(metaLogger.Named("coder_log_sender"))
metaLogger.Warn(ctx, "Sending logs via AgentAPI v2", slog.F("coder_version", bi.Version))
sendLogs, doneFunc := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
return sendLogs, doneFunc, nil
logger, loggerCloser := sendLogsV2(ctx, dac, ls, metaLogger.Named("send_logs_v2"))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: This used to be called closer, i.e. return variable name. After a last minute change in #375 we wrapped this function to ensure we can close drpcconn as well. This resulted in the variable being reassigned and calling itself.

 goroutine stack exceeds 1000000000-byte limit
runtime: sp=0xc048806380 stack=[0xc048806000, 0xc068806000]
fatal error: stack overflow

runtime stack:
runtime.throw({0x2088d94?, 0x200000001?})
	/usr/local/go/src/runtime/panic.go:1023 +0x5c fp=0x7ffdb0730768 sp=0x7ffdb0730738 pc=0x43e05c
runtime.newstack()
	/usr/local/go/src/runtime/stack.go:1103 +0x5bd fp=0x7ffdb0730918 sp=0x7ffdb0730768 pc=0x45a77d
runtime.morestack()
	/usr/local/go/src/runtime/asm_amd64.s:616 +0x7a fp=0x7ffdb0730920 sp=0x7ffdb0730918 pc=0x477b3a

var closeOnce sync.Once
closer = func() {
loggerCloser()

closeOnce.Do(func() {
// Typically cancel would be after Close, but we want to be
// sure there's nothing that might block on Close.
cancel()
_ = dac.DRPCConn().Close()
})
}
return logger, closer, nil
}

type coderLogSender interface {
Expand All @@ -74,7 +97,7 @@ func initClient(coderURL *url.URL, token string) *agentsdk.Client {
func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto.DRPCAgentClient20, error) {
var c proto.DRPCAgentClient20
var err error
retryCtx, retryCancel := context.WithTimeout(context.Background(), rpcConnectTimeout)
retryCtx, retryCancel := context.WithTimeout(ctx, rpcConnectTimeout)
defer retryCancel()
attempts := 0
for r := retry.New(100*time.Millisecond, time.Second); r.Wait(retryCtx); {
Expand All @@ -95,65 +118,67 @@ func initRPC(ctx context.Context, client *agentsdk.Client, l slog.Logger) (proto

// sendLogsV1 uses the PatchLogs endpoint to send logs.
// This is deprecated, but required for backward compatibility with older versions of Coder.
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (Func, func()) {
func sendLogsV1(ctx context.Context, client *agentsdk.Client, l slog.Logger) (logger Func, closer func()) {
// nolint: staticcheck // required for backwards compatibility
sendLogs, flushLogs := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
sendLog, flushAndClose := agentsdk.LogsSender(agentsdk.ExternalLogSourceID, client.PatchLogs, slog.Logger{})
var mu sync.Mutex
return func(lvl Level, msg string, args ...any) {
log := agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(lvl),
}
if err := sendLogs(ctx, log); err != nil {
mu.Lock()
defer mu.Unlock()
if err := sendLog(ctx, log); err != nil {
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
}
}, func() {
if err := flushLogs(ctx); err != nil {
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
defer cancel()
if err := flushAndClose(ctx); err != nil {
l.Warn(ctx, "failed to flush logs", slog.Error(err))
}
}
}

// sendLogsV2 uses the v2 agent API to send logs. Only compatible with coder versions >= 2.9.
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (Func, func()) {
func sendLogsV2(ctx context.Context, dest agentsdk.LogDest, ls coderLogSender, l slog.Logger) (logger Func, closer func()) {
sendCtx, sendCancel := context.WithCancel(ctx)
done := make(chan struct{})
uid := uuid.New()
go func() {
defer close(done)
if err := ls.SendLoop(ctx, dest); err != nil {
if err := ls.SendLoop(sendCtx, dest); err != nil {
if !errors.Is(err, context.Canceled) {
l.Warn(ctx, "failed to send logs to Coder", slog.Error(err))
}
}

// Wait for up to 10 seconds for logs to finish sending.
sendCtx, sendCancel := context.WithTimeout(context.Background(), logSendGracePeriod)
defer sendCancel()
// Try once more to send any pending logs
if err := ls.SendLoop(sendCtx, dest); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
l.Warn(ctx, "failed to send remaining logs to Coder", slog.Error(err))
}
}
ls.Flush(uid)
if err := ls.WaitUntilEmpty(sendCtx); err != nil {
if !errors.Is(err, context.DeadlineExceeded) {
l.Warn(ctx, "log sender did not empty", slog.Error(err))
}
}
}()

logFunc := func(l Level, msg string, args ...any) {
ls.Enqueue(uid, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(l),
})
}
var closeOnce sync.Once
return func(l Level, msg string, args ...any) {
ls.Enqueue(uid, agentsdk.Log{
CreatedAt: time.Now(),
Output: fmt.Sprintf(msg, args...),
Level: codersdk.LogLevel(l),
})
}, func() {
closeOnce.Do(func() {
// Trigger a flush and wait for logs to be sent.
ls.Flush(uid)
ctx, cancel := context.WithTimeout(ctx, logSendGracePeriod)
defer cancel()
err := ls.WaitUntilEmpty(ctx)
if err != nil {
l.Warn(ctx, "log sender did not empty", slog.Error(err))
}

doneFunc := func() {
<-done
}
// Stop the send loop.
sendCancel()
})

return logFunc, doneFunc
// Wait for the send loop to finish.
<-done
}
}
Loading
Loading