Skip to content

Commit f912aad

Browse files
garyrussellartembilan
authored andcommitted
INT-4465: Fix delay in close propagation with NIO
JIRA: https://jira.spring.io/browse/INT-4465 There is a one second delay before a socket close is propagated if there is an active assembler. This is generally only a problem with deserializers that use EOF to signal message end (such as the `ByteArrayElasticRawDeserializer`). Attempt to insert an EOF marker into the buffer queue so that the `getNextBuffer()` will exit immediately on `close()` if it is blocked awaiting a buffer. **cherry-pick to 5.0.x, 4.3.x** (cherry picked from commit d9186f1) # Conflicts: # spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java
1 parent c625f9e commit f912aad

File tree

2 files changed

+54
-2
lines changed

2 files changed

+54
-2
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -58,6 +58,8 @@ public class TcpNioConnection extends TcpConnectionSupport {
5858

5959
private static final long DEFAULT_PIPE_TIMEOUT = 60000;
6060

61+
private static final byte[] EOF = new byte[0]; // EOF marker buffer
62+
6163
private final SocketChannel socketChannel;
6264

6365
private final ChannelOutputStream channelOutputStream;
@@ -703,7 +705,7 @@ private byte[] getNextBuffer() throws IOException {
703705
while (buffer == null) {
704706
try {
705707
buffer = this.buffers.poll(1, TimeUnit.SECONDS);
706-
if (buffer == null && this.isClosed) {
708+
if (buffer == EOF || (buffer == null && this.isClosed)) {
707709
return null;
708710
}
709711
}
@@ -747,6 +749,12 @@ public void write(ByteBuffer byteBuffer) throws IOException {
747749
public void close() throws IOException {
748750
super.close();
749751
this.isClosed = true;
752+
try {
753+
this.buffers.offer(EOF, TcpNioConnection.this.pipeTimeout, TimeUnit.SECONDS);
754+
}
755+
catch (InterruptedException e) {
756+
Thread.currentThread().interrupt();
757+
}
750758
}
751759

752760
@Override

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpNioConnectionTests.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.ip.tcp.connection;
1818

1919
import static org.hamcrest.Matchers.containsString;
20+
import static org.hamcrest.Matchers.lessThan;
2021
import static org.hamcrest.Matchers.not;
2122
import static org.junit.Assert.assertEquals;
2223
import static org.junit.Assert.assertNotNull;
@@ -65,6 +66,7 @@
6566
import org.apache.commons.logging.Log;
6667
import org.apache.commons.logging.LogFactory;
6768
import org.apache.log4j.Level;
69+
import org.junit.Ignore;
6870
import org.junit.Rule;
6971
import org.junit.Test;
7072
import org.junit.rules.TestName;
@@ -91,6 +93,7 @@
9193
import org.springframework.util.ReflectionUtils;
9294
import org.springframework.util.ReflectionUtils.FieldCallback;
9395
import org.springframework.util.ReflectionUtils.FieldFilter;
96+
import org.springframework.util.StopWatch;
9497

9598

9699
/**
@@ -129,6 +132,7 @@ public void run() {
129132
Socket s = server.accept();
130133
// block so we fill the buffer
131134
done.await(10, TimeUnit.SECONDS);
135+
s.close();
132136
}
133137
catch (Exception e) {
134138
e.printStackTrace();
@@ -822,6 +826,46 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
822826
factory.stop();
823827
}
824828

829+
@Test
830+
@Ignore // Timing is too short for CI/Travis
831+
public void testNoDelayOnClose() throws Exception {
832+
TcpNioServerConnectionFactory cf = new TcpNioServerConnectionFactory(0);
833+
final CountDownLatch reading = new CountDownLatch(1);
834+
final StopWatch watch = new StopWatch();
835+
cf.setDeserializer(is -> {
836+
reading.countDown();
837+
watch.start();
838+
is.read();
839+
is.read();
840+
watch.stop();
841+
return null;
842+
});
843+
cf.registerListener(m -> false);
844+
final CountDownLatch listening = new CountDownLatch(1);
845+
cf.setApplicationEventPublisher(new ApplicationEventPublisher() {
846+
847+
@Override
848+
public void publishEvent(Object e) {
849+
listening.countDown();
850+
}
851+
852+
@Override
853+
public void publishEvent(ApplicationEvent event) {
854+
publishEvent((Object) event);
855+
}
856+
857+
});
858+
cf.afterPropertiesSet();
859+
cf.start();
860+
assertTrue(listening.await(10, TimeUnit.SECONDS));
861+
Socket socket = SocketFactory.getDefault().createSocket("localhost", cf.getPort());
862+
socket.getOutputStream().write("x".getBytes());
863+
assertTrue(reading.await(10, TimeUnit.SECONDS));
864+
socket.close();
865+
cf.stop();
866+
assertThat(watch.getLastTaskTimeMillis(), lessThan(950L));
867+
}
868+
825869
private void readFully(InputStream is, byte[] buff) throws IOException {
826870
for (int i = 0; i < buff.length; i++) {
827871
buff[i] = (byte) is.read();

0 commit comments

Comments
 (0)