From 8810719b6142f11b4c387e7a942fbfd6aaf13e8b Mon Sep 17 00:00:00 2001 From: Rupayan Ghosh Date: Thu, 5 Jan 2023 21:23:44 +0000 Subject: [PATCH] add timestamps to tlv --- .../runtime/api/client/logging/FrameType.java | 2 +- .../logging/FramedTelemetryLogSink.java | 19 +++++++++++----- .../logging/FramedTelemetryLogSinkTest.java | 22 ++++++++++++++++++- 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/logging/FrameType.java b/aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/logging/FrameType.java index 1284bc64..351a39ed 100644 --- a/aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/logging/FrameType.java +++ b/aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/logging/FrameType.java @@ -4,7 +4,7 @@ public enum FrameType { - LOG(0xa55a0001); + LOG(0xa55a0003); private final int val; diff --git a/aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSink.java b/aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSink.java index 7bb80ac2..bb436217 100644 --- a/aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSink.java +++ b/aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSink.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.time.Instant; /** * FramedTelemetryLogSink implements the logging contract between runtimes and the platform. It implements a simple @@ -14,19 +15,19 @@ * *
  * {@code
- * +----------------------+------------------------+-----------------------+
- * | Frame Type - 4 bytes | Length (len) - 4 bytes | Message - 'len' bytes |
- * +----------------------+------------------------+-----------------------+
+ * +----------------------+------------------------+---------------------+-----------------------+
+ * | Frame Type - 4 bytes | Length (len) - 4 bytes | Timestamp - 8 bytes | Message - 'len' bytes |
+ * +----------------------+------------------------+---------------------+-----------------------+
  * }
  * 
* * The first 4 bytes indicate the type of the frame - log frames have a type defined as the hex value 0xa55a0001. The - * second 4 bytes should indicate the message's length. The next 'len' bytes contain the message. The byte order is - * big-endian. + * second 4 bytes should indicate the message's length. The next 8 bytes contain UNIX timestamp of the message in + * microsecond accuracy. The next 'len' bytes contain the message. The byte order is big-endian. */ public class FramedTelemetryLogSink implements LogSink { - private static final int HEADER_LENGTH = 8; + private static final int HEADER_LENGTH = 16; private final FileOutputStream logOutputStream; private final ByteBuffer headerBuf; @@ -51,6 +52,11 @@ private void writeFrame(byte[] message) throws IOException { this.logOutputStream.write(message); } + private long timestamp() { + Instant instant = Instant.now(); + return instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1000; // microsecond precision + } + /** * Updates the header ByteBuffer with the provided length. The header comprises the frame type and message length. */ @@ -58,6 +64,7 @@ private void updateHeader(int length) { this.headerBuf.clear(); this.headerBuf.putInt(FrameType.LOG.getValue()); this.headerBuf.putInt(length); + this.headerBuf.putLong(timestamp()); this.headerBuf.flip(); } diff --git a/aws-lambda-java-runtime-interface-client/src/test/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSinkTest.java b/aws-lambda-java-runtime-interface-client/src/test/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSinkTest.java index b5181b46..e8dbb73b 100644 --- a/aws-lambda-java-runtime-interface-client/src/test/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSinkTest.java +++ b/aws-lambda-java-runtime-interface-client/src/test/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSinkTest.java @@ -13,6 +13,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; import java.nio.file.Path; +import java.time.Instant; import java.util.Arrays; import static org.junit.jupiter.api.Assertions.assertArrayEquals; @@ -24,6 +25,11 @@ public class FramedTelemetryLogSinkTest { private static final int DEFAULT_BUFFER_SIZE = 256; private static final byte ZERO_BYTE = (byte) 0; + private long timestamp() { + Instant instant = Instant.now(); + return instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1000; + } + @TempDir public Path tmpFolder; @@ -33,9 +39,11 @@ public void logSingleFrame() throws IOException { File tmpFile = tmpFolder.resolve("pipe").toFile(); FileOutputStream fos = new FileOutputStream(tmpFile); FileDescriptor fd = fos.getFD(); + long before = timestamp(); try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(fd)) { logSink.log(message); } + long after = timestamp(); ByteBuffer buf = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); ReadableByteChannel readChannel = new FileInputStream(tmpFile).getChannel(); @@ -52,6 +60,11 @@ public void logSingleFrame() throws IOException { int len = buf.getInt(); assertEquals(message.length, len); + // next 8 bytes should indicate the timestamp + long timestamp = buf.getLong(); + assertTrue(before <= timestamp); + assertTrue(timestamp <= after); + // use `len` to allocate a byte array to read the logged message into byte[] actual = new byte[len]; buf.get(actual); @@ -69,10 +82,12 @@ public void logMultipleFrames() throws IOException { File tmpFile = tmpFolder.resolve("pipe").toFile(); FileOutputStream fos = new FileOutputStream(tmpFile); FileDescriptor fd = fos.getFD(); + long before = timestamp(); try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(fd)) { logSink.log(firstMessage); logSink.log(secondMessage); } + long after = timestamp(); ByteBuffer buf = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE); ReadableByteChannel readChannel = new FileInputStream(tmpFile).getChannel(); @@ -90,6 +105,11 @@ public void logMultipleFrames() throws IOException { int len = buf.getInt(); assertEquals(message.length, len); + // next 8 bytes should indicate the timestamp + long timestamp = buf.getLong(); + assertTrue(before <= timestamp); + assertTrue(timestamp <= after); + // use `len` to allocate a byte array to read the logged message into byte[] actual = new byte[len]; buf.get(actual); @@ -125,7 +145,7 @@ public void interruptedThread() throws IOException { FileInputStream logInputStream = new FileInputStream(tmpFile); int readBytes = logInputStream.read(buffer); - int headerSizeBytes = 8; // message type (4 bytes) + len (4 bytes) + int headerSizeBytes = 16; // message type (4 bytes) + len (4 bytes) + timestamp (8 bytes) int expectedBytes = headerSizeBytes + message.length; assertEquals(expectedBytes, readBytes);