Skip to content

Commit 61649ec

Browse files
Merge pull request #94 from awslabs/caowei
Fix TCP client seeing "already connected" error when resend message after having exception in the first send
2 parents 1a7dd30 + 93d156f commit 61649ec

File tree

4 files changed

+53
-3
lines changed

4 files changed

+53
-3
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,11 +324,12 @@ We have 2 different types of tests:
324324
```sh
325325
export AWS_ACCESS_KEY_ID=YOUR_ACCESS_KEY_ID
326326
export AWS_SECRET_ACCESS_KEY=YOUR_ACCESS_KEY
327+
export AWS_SESSION_TOKEN=YOUR_AWS_SESSION_TOKEN
327328
export AWS_REGION=us-west-2
328329
./gradlew integ
329330
```
330331

331-
**NOTE**: You need to replace the access key id and access key with your own AWS credentials.
332+
**NOTE**: You need to replace the access key id and access key with your own AWS credentials. Another option is using IAM user access key pair without session token.
332333

333334
### Formatting
334335

bin/start-agent.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
# usage:
77
# export AWS_ACCESS_KEY_ID=
88
# export AWS_SECRET_ACCESS_KEY=
9+
# export AWS_SESSION_TOKEN=
910
# export AWS_REGION=us-west-2
1011
# ./start-agent.sh
1112

@@ -22,6 +23,7 @@ pushd $rootdir/src/integration-test/resources/agent
2223
echo "[AmazonCloudWatchAgent]
2324
aws_access_key_id = $AWS_ACCESS_KEY_ID
2425
aws_secret_access_key = $AWS_SECRET_ACCESS_KEY
26+
aws_session_token = $AWS_SESSION_TOKEN
2527
" > ./.aws/credentials
2628

2729
echo "[profile AmazonCloudWatchAgent]
@@ -33,5 +35,6 @@ docker run -p 25888:25888/udp -p 25888:25888/tcp \
3335
-e AWS_REGION=$AWS_REGION \
3436
-e AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID \
3537
-e AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY \
38+
-e AWS_SESSION_TOKEN=$AWS_SESSION_TOKEN \
3639
agent:latest &> $tempfile &
3740
popd

src/main/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ public class TCPClient implements SocketClient {
3131
private boolean shouldConnect = true;
3232

3333
public TCPClient(Endpoint endpoint) {
34-
socket = createSocket();
3534
this.endpoint = endpoint;
3635
}
3736

3837
private void connect() {
3938
try {
39+
socket = createSocket();
4040
socket.connect(new InetSocketAddress(endpoint.getHost(), endpoint.getPort()));
4141
shouldConnect = false;
4242
} catch (Exception e) {
@@ -51,7 +51,7 @@ protected Socket createSocket() {
5151

5252
@Override
5353
public synchronized void sendMessage(String message) {
54-
if (socket.isClosed() || shouldConnect) {
54+
if (socket == null || socket.isClosed() || shouldConnect) {
5555
connect();
5656
}
5757

src/test/java/software/amazon/cloudwatchlogs/emf/sinks/TCPClientTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,50 @@ protected Socket createSocket() {
4848

4949
assertEquals(bos.toString(), message);
5050
}
51+
52+
@Test
53+
public void testSendMessageWithGetOSException_THEN_createSocketTwice() throws IOException {
54+
Socket socket = mock(Socket.class);
55+
doNothing().when(socket).connect(any());
56+
when(socket.getOutputStream()).thenThrow(IOException.class);
57+
58+
Endpoint endpoint = Endpoint.DEFAULT_TCP_ENDPOINT;
59+
TCPClient client =
60+
new TCPClient(endpoint) {
61+
@Override
62+
protected Socket createSocket() {
63+
return socket;
64+
}
65+
};
66+
67+
TCPClient spyClient = spy(client);
68+
69+
String message = "Test message";
70+
spyClient.sendMessage(message);
71+
verify(spyClient, atLeast(2)).createSocket();
72+
}
73+
74+
@Test
75+
public void testSendMessageWithWriteOSException_THEN_createSocketTwice() throws IOException {
76+
Socket socket = mock(Socket.class);
77+
doNothing().when(socket).connect(any());
78+
ByteArrayOutputStream bos = mock(ByteArrayOutputStream.class);
79+
when(socket.getOutputStream()).thenReturn(bos);
80+
doThrow(IOException.class).when(bos).write(any());
81+
82+
Endpoint endpoint = Endpoint.DEFAULT_TCP_ENDPOINT;
83+
TCPClient client =
84+
new TCPClient(endpoint) {
85+
@Override
86+
protected Socket createSocket() {
87+
return socket;
88+
}
89+
};
90+
91+
TCPClient spyClient = spy(client);
92+
93+
String message = "Test message";
94+
spyClient.sendMessage(message);
95+
verify(spyClient, atLeast(2)).createSocket();
96+
}
5197
}

0 commit comments

Comments
 (0)