Skip to content

Commit 17bfc89

Browse files
Make extension use RuntimeDone events
1 parent b11f8b4 commit 17bfc89

File tree

2 files changed

+35
-16
lines changed

2 files changed

+35
-16
lines changed

go-example-logs-api-extension/logsapi/client.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ const (
4141
Extension EventType = "extension"
4242
)
4343

44+
type SubEventType string
45+
46+
const (
47+
// RuntimeDone event is sent when lambda function is finished it's execution
48+
RuntimeDone SubEventType = "platform.runtimeDone"
49+
)
50+
4451
// BufferingCfg is the configuration set for receiving logs from Logs API. Whichever of the conditions below is met first, the logs will be sent
4552
type BufferingCfg struct {
4653
// MaxItems is the maximum number of events to be buffered in memory. (default: 10000, minimum: 1000, maximum: 10000)
@@ -86,11 +93,19 @@ type Destination struct {
8693
Encoding HttpEncoding `json:"encoding"`
8794
}
8895

96+
type SchemaVersion string
97+
98+
const (
99+
SchemaVersion20210318 = "2021-03-18"
100+
SchemaVersionLatest = SchemaVersion20210318
101+
)
102+
89103
// SubscribeRequest is the request body that is sent to Logs API on subscribe
90104
type SubscribeRequest struct {
91-
EventTypes []EventType `json:"types"`
92-
BufferingCfg BufferingCfg `json:"buffering"`
93-
Destination Destination `json:"destination"`
105+
SchemaVersion SchemaVersion `json:"schemaVersion"`
106+
EventTypes []EventType `json:"types"`
107+
BufferingCfg BufferingCfg `json:"buffering"`
108+
Destination Destination `json:"destination"`
94109
}
95110

96111
// SubscribeResponse is the response body that is received from Logs API on subscribe
@@ -103,9 +118,10 @@ func (c *Client) Subscribe(types []EventType, bufferingCfg BufferingCfg, destina
103118

104119
data, err := json.Marshal(
105120
&SubscribeRequest{
106-
EventTypes: types,
107-
BufferingCfg: bufferingCfg,
108-
Destination: destination,
121+
SchemaVersion: SchemaVersionLatest,
122+
EventTypes: types,
123+
BufferingCfg: bufferingCfg,
124+
Destination: destination,
109125
})
110126
if err != nil {
111127
return nil, errors.WithMessage(err, "failed to marshal SubscribeRequest")

go-example-logs-api-extension/main.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@
44
package main
55

66
import (
7+
"aws-lambda-extensions/go-example-logs-api-extension/agent"
8+
"aws-lambda-extensions/go-example-logs-api-extension/extension"
9+
"aws-lambda-extensions/go-example-logs-api-extension/logsapi"
710
"context"
811
"fmt"
12+
"github.com/golang-collections/go-datastructures/queue"
13+
log "github.com/sirupsen/logrus"
914
"os"
1015
"os/signal"
1116
"path"
17+
"strings"
1218
"syscall"
13-
14-
"aws-lambda-extensions/go-example-logs-api-extension/agent"
15-
"aws-lambda-extensions/go-example-logs-api-extension/extension"
16-
"github.com/golang-collections/go-datastructures/queue"
17-
log "github.com/sirupsen/logrus"
1819
)
1920

2021
// INITIAL_QUEUE_SIZE is the initial size set for the synchronous logQueue
@@ -54,14 +55,16 @@ func main() {
5455
// and process the logs from main goroutine (consumer)
5556
logQueue := queue.New(INITIAL_QUEUE_SIZE)
5657
// Helper function to empty the log queue
57-
flushLogQueue := func() {
58-
for !logQueue.Empty() {
58+
var logsStr string = ""
59+
flushLogQueue := func(force bool) {
60+
for !(logQueue.Empty() && (force || strings.Contains(logsStr, string(logsapi.RuntimeDone)))) {
5961
logs, err := logQueue.Get(1)
6062
if err != nil {
6163
logger.Error(printPrefix, err)
6264
return
6365
}
64-
err = logsApiLogger.PushLog(fmt.Sprintf("%v", logs[0]))
66+
logsStr = fmt.Sprintf("%v", logs[0])
67+
err = logsApiLogger.PushLog(logsStr)
6568
if err != nil {
6669
logger.Error(printPrefix, err)
6770
return
@@ -98,11 +101,11 @@ func main() {
98101
return
99102
}
100103
// Flush log queue in here after waking up
101-
flushLogQueue()
104+
flushLogQueue(false)
102105
// Exit if we receive a SHUTDOWN event
103106
if res.EventType == extension.Shutdown {
104107
logger.Info(printPrefix, "Received SHUTDOWN event")
105-
flushLogQueue()
108+
flushLogQueue(true)
106109
logsApiAgent.Shutdown()
107110
logger.Info(printPrefix, "Exiting")
108111
return

0 commit comments

Comments
 (0)