Skip to content

Emit multi-line logs with timestamps #393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

public enum FrameType {

LOG(0xa55a0001);
LOG(0xa55a0003);

private final int val;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,27 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;

/**
* FramedTelemetryLogSink implements the logging contract between runtimes and the platform. It implements a simple
* framing protocol so message boundaries can be determined. Each frame can be visualized as follows:
*
* <pre>
* {@code
* +----------------------+------------------------+-----------------------+
* | Frame Type - 4 bytes | Length (len) - 4 bytes | Message - 'len' bytes |
* +----------------------+------------------------+-----------------------+
* +----------------------+------------------------+---------------------+-----------------------+
* | Frame Type - 4 bytes | Length (len) - 4 bytes | Timestamp - 8 bytes | Message - 'len' bytes |
* +----------------------+------------------------+---------------------+-----------------------+
* }
* </pre>
*
* The first 4 bytes indicate the type of the frame - log frames have a type defined as the hex value 0xa55a0001. The
* second 4 bytes should indicate the message's length. The next 'len' bytes contain the message. The byte order is
* big-endian.
* second 4 bytes should indicate the message's length. The next 8 bytes contain UNIX timestamp of the message in
* microsecond accuracy. The next 'len' bytes contain the message. The byte order is big-endian.
*/
public class FramedTelemetryLogSink implements LogSink {

private static final int HEADER_LENGTH = 8;
private static final int HEADER_LENGTH = 16;

private final FileOutputStream logOutputStream;
private final ByteBuffer headerBuf;
Expand All @@ -51,13 +52,19 @@ private void writeFrame(byte[] message) throws IOException {
this.logOutputStream.write(message);
}

private long timestamp() {
Instant instant = Instant.now();
return instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1000; // microsecond precision
}

/**
* Updates the header ByteBuffer with the provided length. The header comprises the frame type and message length.
*/
private void updateHeader(int length) {
this.headerBuf.clear();
this.headerBuf.putInt(FrameType.LOG.getValue());
this.headerBuf.putInt(length);
this.headerBuf.putLong(timestamp());
this.headerBuf.flip();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Arrays;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
Expand All @@ -24,6 +25,11 @@ public class FramedTelemetryLogSinkTest {
private static final int DEFAULT_BUFFER_SIZE = 256;
private static final byte ZERO_BYTE = (byte) 0;

private long timestamp() {
Instant instant = Instant.now();
return instant.getEpochSecond() * 1_000_000 + instant.getNano() / 1000;
}

@TempDir
public Path tmpFolder;

Expand All @@ -33,9 +39,11 @@ public void logSingleFrame() throws IOException {
File tmpFile = tmpFolder.resolve("pipe").toFile();
FileOutputStream fos = new FileOutputStream(tmpFile);
FileDescriptor fd = fos.getFD();
long before = timestamp();
try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(fd)) {
logSink.log(message);
}
long after = timestamp();

ByteBuffer buf = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
ReadableByteChannel readChannel = new FileInputStream(tmpFile).getChannel();
Expand All @@ -52,6 +60,11 @@ public void logSingleFrame() throws IOException {
int len = buf.getInt();
assertEquals(message.length, len);

// next 8 bytes should indicate the timestamp
long timestamp = buf.getLong();
assertTrue(before <= timestamp);
assertTrue(timestamp <= after);

// use `len` to allocate a byte array to read the logged message into
byte[] actual = new byte[len];
buf.get(actual);
Expand All @@ -69,10 +82,12 @@ public void logMultipleFrames() throws IOException {
File tmpFile = tmpFolder.resolve("pipe").toFile();
FileOutputStream fos = new FileOutputStream(tmpFile);
FileDescriptor fd = fos.getFD();
long before = timestamp();
try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(fd)) {
logSink.log(firstMessage);
logSink.log(secondMessage);
}
long after = timestamp();

ByteBuffer buf = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
ReadableByteChannel readChannel = new FileInputStream(tmpFile).getChannel();
Expand All @@ -90,6 +105,11 @@ public void logMultipleFrames() throws IOException {
int len = buf.getInt();
assertEquals(message.length, len);

// next 8 bytes should indicate the timestamp
long timestamp = buf.getLong();
assertTrue(before <= timestamp);
assertTrue(timestamp <= after);

// use `len` to allocate a byte array to read the logged message into
byte[] actual = new byte[len];
buf.get(actual);
Expand Down Expand Up @@ -125,7 +145,7 @@ public void interruptedThread() throws IOException {
FileInputStream logInputStream = new FileInputStream(tmpFile);
int readBytes = logInputStream.read(buffer);

int headerSizeBytes = 8; // message type (4 bytes) + len (4 bytes)
int headerSizeBytes = 16; // message type (4 bytes) + len (4 bytes) + timestamp (8 bytes)
int expectedBytes = headerSizeBytes + message.length;

assertEquals(expectedBytes, readBytes);
Expand Down