Skip to content

Commit 518881e

Browse files
authored
Merge pull request #2641 from dsyer/portforward
Fix resource lifecycle issues in KubectlPortForward
2 parents 0a7627d + 5fadb77 commit 518881e

File tree

1 file changed

+28
-28
lines changed

1 file changed

+28
-28
lines changed

extended/src/main/java/io/kubernetes/client/extended/kubectl/KubectlPortForward.java

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.net.ServerSocket;
2525
import java.net.Socket;
2626
import java.util.ArrayList;
27+
import java.util.Arrays;
2728
import java.util.List;
2829
import java.util.Optional;
2930
import java.util.function.Consumer;
@@ -78,47 +79,46 @@ public void shutdown() {
7879
private void executeInternal()
7980
throws ApiException, KubectlException, IOException, InterruptedException {
8081
PortForward pf = new PortForward(apiClient);
81-
PortForward.PortForwardResult result = pf.forward(namespace, name, targetPorts);
82-
if (result == null) {
83-
throw new KubectlException("PortForward failed!");
84-
}
8582
// TODO: Convert this to NIO to reduce the number of threads?
8683
List<Thread> threads = new ArrayList<>();
8784
for (int i = 0; i < localPorts.size(); i++) {
8885
int targetPort = targetPorts.get(i);
8986
threads.add(
90-
portForward(
91-
new ServerSocket(localPorts.get(i)),
92-
result.getInputStream(targetPort),
93-
result.getOutboundStream(targetPort)));
87+
portForward(pf,
88+
new ServerSocket(localPorts.get(i)), targetPort));
9489
}
9590
for (Thread t : threads) {
9691
t.join();
9792
}
9893
}
9994

100-
private Thread portForward(ServerSocket server, InputStream in, OutputStream out) {
101-
Thread t =
102-
new Thread(
103-
new Runnable() {
104-
@Override
105-
public void run() {
106-
while (running) {
107-
try {
108-
Socket sock = server.accept();
109-
Thread t1 = copyAsync(sock.getInputStream(), out, onUnhandledError);
110-
Thread t2 = copyAsync(in, sock.getOutputStream(), onUnhandledError);
111-
112-
t1.join();
113-
t2.join();
114-
} catch (InterruptedException | IOException ex) {
115-
Optional.ofNullable(onUnhandledError)
116-
.orElse(Throwable::printStackTrace)
117-
.accept(ex);
118-
}
95+
private Thread portForward(PortForward pf, ServerSocket server, int targetPort) {
96+
Thread t = new Thread(
97+
new Runnable() {
98+
@Override
99+
public void run() {
100+
while (running) {
101+
try (Socket sock = server.accept()) {
102+
PortForward.PortForwardResult result = pf.forward(namespace, name, Arrays.asList(targetPort));
103+
if (result == null) {
104+
throw new KubectlException("PortForward failed!");
119105
}
106+
InputStream in = result.getInputStream(targetPort);
107+
OutputStream out = result.getOutboundStream(targetPort);
108+
Thread t1 = copyAsync(sock.getInputStream(), out, onUnhandledError);
109+
Thread t2 = copyAsync(in, sock.getOutputStream(), onUnhandledError);
110+
111+
t1.join();
112+
in.close();
113+
t2.join();
114+
} catch (Exception ex) {
115+
Optional.ofNullable(onUnhandledError)
116+
.orElse(Throwable::printStackTrace)
117+
.accept(ex);
120118
}
121-
});
119+
}
120+
}
121+
});
122122
t.start();
123123
return t;
124124
}

0 commit comments

Comments
 (0)