Skip to content

Commit 04ef611

Browse files
authored
Support sockets as transport for framed telemetry (#378)
Sockets cannot be re-opened (unlike pipes and regular files). We have to reflectively construct the `FileDescriptor` with the right number as it does not have a constructor that takes int.
1 parent 8696626 commit 04ef611

File tree

3 files changed

+36
-11
lines changed

3 files changed

+36
-11
lines changed

aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/AWSLambda.java

+22-5
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@
2424

2525
import java.io.ByteArrayOutputStream;
2626
import java.io.File;
27+
import java.io.FileDescriptor;
2728
import java.io.FileInputStream;
2829
import java.io.IOException;
2930
import java.io.OutputStream;
3031
import java.io.PrintStream;
32+
import java.lang.reflect.Constructor;
3133
import java.lang.reflect.Method;
3234
import java.net.URLClassLoader;
3335
import java.nio.file.Paths;
@@ -169,16 +171,31 @@ public static String getEnvOrExit(String envVariableName) {
169171

170172
protected static URLClassLoader customerClassLoader;
171173

174+
/**
175+
* convert an integer into a FileDescriptor object using reflection to access private members.
176+
*/
177+
private static FileDescriptor intToFd(int fd) throws RuntimeException {
178+
try {
179+
Class<FileDescriptor> clazz = FileDescriptor.class;
180+
Constructor<FileDescriptor> c = clazz.getDeclaredConstructor(new Class<?>[] { Integer.TYPE });
181+
c.setAccessible(true);
182+
return c.newInstance(new Integer(fd));
183+
} catch(Exception e) {
184+
throw new RuntimeException(e);
185+
}
186+
}
187+
172188
private static LogSink createLogSink() {
173-
final String fd = System.getenv("_LAMBDA_TELEMETRY_LOG_FD");
174-
if(fd == null) {
189+
final String fdStr = System.getenv("_LAMBDA_TELEMETRY_LOG_FD");
190+
if(fdStr == null) {
175191
return new StdOutLogSink();
176192
}
177193

178194
try {
179-
File pipeFdFile = Paths.get("/proc", "self", "fd", fd).toFile();
180-
return new FramedTelemetryLogSink(pipeFdFile);
181-
} catch (IOException e) {
195+
int fdInt = Integer.parseInt(fdStr);
196+
FileDescriptor fd = intToFd(fdInt);
197+
return new FramedTelemetryLogSink(fd);
198+
} catch (Exception e) {
182199
return new StdOutLogSink();
183200
}
184201
}

aws-lambda-java-runtime-interface-client/src/main/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSink.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
package com.amazonaws.services.lambda.runtime.api.client.logging;
44

5-
import java.io.File;
5+
import java.io.FileDescriptor;
66
import java.io.FileOutputStream;
77
import java.io.IOException;
88
import java.nio.ByteBuffer;
@@ -31,8 +31,8 @@ public class FramedTelemetryLogSink implements LogSink {
3131
private final FileOutputStream logOutputStream;
3232
private final ByteBuffer headerBuf;
3333

34-
public FramedTelemetryLogSink(File file) throws IOException {
35-
this.logOutputStream = new FileOutputStream(file);
34+
public FramedTelemetryLogSink(FileDescriptor fd) throws IOException {
35+
this.logOutputStream = new FileOutputStream(fd);
3636
this.headerBuf = ByteBuffer.allocate(HEADER_LENGTH).order(ByteOrder.BIG_ENDIAN);
3737
}
3838

aws-lambda-java-runtime-interface-client/src/test/java/com/amazonaws/services/lambda/runtime/api/client/logging/FramedTelemetryLogSinkTest.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import org.junit.jupiter.api.io.TempDir;
77

88
import java.io.File;
9+
import java.io.FileDescriptor;
910
import java.io.FileInputStream;
11+
import java.io.FileOutputStream;
1012
import java.io.IOException;
1113
import java.nio.ByteBuffer;
1214
import java.nio.channels.ReadableByteChannel;
@@ -29,7 +31,9 @@ public class FramedTelemetryLogSinkTest {
2931
public void logSingleFrame() throws IOException {
3032
byte[] message = "hello world\nsomething on a new line!\n".getBytes();
3133
File tmpFile = tmpFolder.resolve("pipe").toFile();
32-
try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(tmpFile)) {
34+
FileOutputStream fos = new FileOutputStream(tmpFile);
35+
FileDescriptor fd = fos.getFD();
36+
try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(fd)) {
3337
logSink.log(message);
3438
}
3539

@@ -63,7 +67,9 @@ public void logMultipleFrames() throws IOException {
6367
byte[] firstMessage = "hello world\nsomething on a new line!".getBytes();
6468
byte[] secondMessage = "hello again\nhere's another message\n".getBytes();
6569
File tmpFile = tmpFolder.resolve("pipe").toFile();
66-
try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(tmpFile)) {
70+
FileOutputStream fos = new FileOutputStream(tmpFile);
71+
FileDescriptor fd = fos.getFD();
72+
try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(fd)) {
6773
logSink.log(firstMessage);
6874
logSink.log(secondMessage);
6975
}
@@ -107,7 +113,9 @@ public void interruptedThread() throws IOException {
107113
try {
108114
byte[] message = "hello world\nsomething on a new line!\n".getBytes();
109115
File tmpFile = tmpFolder.resolve("pipe").toFile();
110-
try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(tmpFile)) {
116+
FileOutputStream fos = new FileOutputStream(tmpFile);
117+
FileDescriptor fd = fos.getFD();
118+
try (FramedTelemetryLogSink logSink = new FramedTelemetryLogSink(fd)) {
111119
Thread.currentThread().interrupt();
112120

113121
logSink.log(message);

0 commit comments

Comments
 (0)