Skip to content

Commit a109eca

Browse files
thenewguy39Rupayan Ghoshsmirnoal
authored
add timestamps to tlv (#393)
Co-authored-by: Rupayan Ghosh <[email protected]> Co-authored-by: Alexander Smirnov <[email protected]>
1 parent efc835e commit a109eca

File tree

3 files changed

+35
-8
lines changed

3 files changed

+35
-8
lines changed

aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/logging/FrameType.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
public enum FrameType {
66

7-
LOG(0xa55a0001);
7+
LOG(0xa55a0003);
88

99
private final int val;
1010

aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSink.java

+13-6
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,27 @@
77
import java.io.IOException;
88
import java.nio.ByteBuffer;
99
import java.nio.ByteOrder;
10+
import java.time.Instant;
1011

1112
/**
1213
* FramedTelemetryLogSink implements the logging contract between runtimes and the platform. It implements a simple
1314
* framing protocol so message boundaries can be determined. Each frame can be visualized as follows:
1415
*
1516
* <pre>
1617
* {@code
17-
* +----------------------+------------------------+-----------------------+
18-
* | Frame Type - 4 bytes | Length (len) - 4 bytes | Message - 'len' bytes |
19-
* +----------------------+------------------------+-----------------------+
18+
* +----------------------+------------------------+---------------------+-----------------------+
19+
* | Frame Type - 4 bytes | Length (len) - 4 bytes | Timestamp - 8 bytes | Message - 'len' bytes |
20+
* +----------------------+------------------------+---------------------+-----------------------+
2021
* }
2122
* </pre>
2223
*
2324
* The first 4 bytes indicate the type of the frame - log frames have a type defined as the hex value 0xa55a0001. The
24-
* second 4 bytes should indicate the message's length. The next 'len' bytes contain the message. The byte order is
25-
* big-endian.
25+
* second 4 bytes should indicate the message's length. The next 8 bytes contain UNIX timestamp of the message in
26+
* microsecond accuracy. The next 'len' bytes contain the message. The byte order is big-endian.
2627
*/
2728
public class FramedTelemetryLogSink implements LogSink {
2829

29-
private static final int HEADER_LENGTH = 8;
30+
private static final int HEADER_LENGTH = 16;
3031

3132
private final FileOutputStream logOutputStream;
3233
private final ByteBuffer headerBuf;
@@ -51,13 +52,19 @@ private void writeFrame(byte[] message) throws IOException {
5152
this.logOutputStream.write(message);
5253
}
5354

55+
private long timestamp() {
56+
Instant instant = Instant.now();
57+
return instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1000; // microsecond precision
58+
}
59+
5460
/**
5561
* Updates the header ByteBuffer with the provided length. The header comprises the frame type and message length.
5662
*/
5763
private void updateHeader(int length) {
5864
this.headerBuf.clear();
5965
this.headerBuf.putInt(FrameType.LOG.getValue());
6066
this.headerBuf.putInt(length);
67+
this.headerBuf.putLong(timestamp());
6168
this.headerBuf.flip();
6269
}
6370

aws-lambda-java-runtime-interface-client/src/test/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSinkTest.java

+21-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.nio.ByteBuffer;
1414
import java.nio.channels.ReadableByteChannel;
1515
import java.nio.file.Path;
16+
import java.time.Instant;
1617
import java.util.Arrays;
1718

1819
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -24,6 +25,11 @@ public class FramedTelemetryLogSinkTest {
2425
private static final int DEFAULT_BUFFER_SIZE = 256;
2526
private static final byte ZERO_BYTE = (byte) 0;
2627

28+
private long timestamp() {
29+
Instant instant = Instant.now();
30+
return instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1000;
31+
}
32+
2733
@TempDir
2834
public Path tmpFolder;
2935

@@ -33,9 +39,11 @@ public void logSingleFrame() throws IOException {
3339
File tmpFile = tmpFolder.resolve("pipe").toFile();
3440
FileOutputStream fos = new FileOutputStream(tmpFile);
3541
FileDescriptor fd = fos.getFD();
42+
long before = timestamp();
3643
try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(fd)) {
3744
logSink.log(message);
3845
}
46+
long after = timestamp();
3947

4048
ByteBuffer buf = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
4149
ReadableByteChannel readChannel = new FileInputStream(tmpFile).getChannel();
@@ -52,6 +60,11 @@ public void logSingleFrame() throws IOException {
5260
int len = buf.getInt();
5361
assertEquals(message.length, len);
5462

63+
// next 8 bytes should indicate the timestamp
64+
long timestamp = buf.getLong();
65+
assertTrue(before <= timestamp);
66+
assertTrue(timestamp <= after);
67+
5568
// use `len` to allocate a byte array to read the logged message into
5669
byte[] actual = new byte[len];
5770
buf.get(actual);
@@ -69,10 +82,12 @@ public void logMultipleFrames() throws IOException {
6982
File tmpFile = tmpFolder.resolve("pipe").toFile();
7083
FileOutputStream fos = new FileOutputStream(tmpFile);
7184
FileDescriptor fd = fos.getFD();
85+
long before = timestamp();
7286
try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(fd)) {
7387
logSink.log(firstMessage);
7488
logSink.log(secondMessage);
7589
}
90+
long after = timestamp();
7691

7792
ByteBuffer buf = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
7893
ReadableByteChannel readChannel = new FileInputStream(tmpFile).getChannel();
@@ -90,6 +105,11 @@ public void logMultipleFrames() throws IOException {
90105
int len = buf.getInt();
91106
assertEquals(message.length, len);
92107

108+
// next 8 bytes should indicate the timestamp
109+
long timestamp = buf.getLong();
110+
assertTrue(before <= timestamp);
111+
assertTrue(timestamp <= after);
112+
93113
// use `len` to allocate a byte array to read the logged message into
94114
byte[] actual = new byte[len];
95115
buf.get(actual);
@@ -125,7 +145,7 @@ public void interruptedThread() throws IOException {
125145
FileInputStream logInputStream = new FileInputStream(tmpFile);
126146
int readBytes = logInputStream.read(buffer);
127147

128-
int headerSizeBytes = 8; // message type (4 bytes) + len (4 bytes)
148+
int headerSizeBytes = 16; // message type (4 bytes) + len (4 bytes) + timestamp (8 bytes)
129149
int expectedBytes = headerSizeBytes + message.length;
130150

131151
assertEquals(expectedBytes, readBytes);

0 commit comments

Comments
 (0)