Skip to content

Commit 6391e4b

Browse files
garyrussellartembilan
authored andcommitted
Tcp Doc Polishing
Also remove some `this.` method calls.
1 parent f82c716 commit 6391e4b

File tree

3 files changed

+70
-52
lines changed

3 files changed

+70
-52
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNetConnection.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2001-2018 the original author or authors.
2+
* Copyright 2001-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -81,7 +81,7 @@ public boolean isLongLived() {
8181
*/
8282
@Override
8383
public void close() {
84-
this.setNoReadErrorOnClose(true);
84+
setNoReadErrorOnClose(true);
8585
try {
8686
this.socket.close();
8787
}
@@ -111,8 +111,8 @@ public synchronized void send(Message<?> message) throws Exception {
111111
this.socketOutputStream.flush();
112112
}
113113
catch (Exception e) {
114-
this.publishConnectionExceptionEvent(new MessagingException(message, "Failed TCP serialization", e));
115-
this.closeConnection(true);
114+
publishConnectionExceptionEvent(new MessagingException(message, "Failed TCP serialization", e));
115+
closeConnection(true);
116116
throw e;
117117
}
118118
if (logger.isDebugEnabled()) {
@@ -122,7 +122,8 @@ public synchronized void send(Message<?> message) throws Exception {
122122

123123
@Override
124124
public Object getPayload() throws Exception {
125-
return this.getDeserializer().deserialize(inputStream());
125+
return getDeserializer()
126+
.deserialize(inputStream());
126127
}
127128

128129
@Override
@@ -172,16 +173,16 @@ protected InputStream inputStream() throws IOException {
172173
public void run() {
173174
boolean okToRun = true;
174175
if (logger.isDebugEnabled()) {
175-
logger.debug(this.getConnectionId() + " Reading...");
176+
logger.debug(getConnectionId() + " Reading...");
176177
}
177178
while (okToRun) {
178179
Message<?> message = null;
179180
try {
180-
message = this.getMapper().toMessage(this);
181+
message = getMapper().toMessage(this);
181182
this.lastRead = System.currentTimeMillis();
182183
}
183184
catch (Exception e) {
184-
this.publishConnectionExceptionEvent(e);
185+
publishConnectionExceptionEvent(e);
185186
if (handleReadException(e)) {
186187
okToRun = false;
187188
}
@@ -218,55 +219,55 @@ protected boolean handleReadException(Exception e) {
218219
* For client connections, we have to wait for 2 timeouts if the last
219220
* send was within the current timeout.
220221
*/
221-
if (!this.isServer() && e instanceof SocketTimeoutException) {
222+
if (!isServer() && e instanceof SocketTimeoutException) {
222223
long now = System.currentTimeMillis();
223224
try {
224225
int soTimeout = this.socket.getSoTimeout();
225226
if (now - this.lastSend < soTimeout && now - this.lastRead < soTimeout * 2) {
226227
doClose = false;
227228
}
228229
if (!doClose && logger.isDebugEnabled()) {
229-
logger.debug("Skipping a socket timeout because we have a recent send " + this.getConnectionId());
230+
logger.debug("Skipping a socket timeout because we have a recent send " + getConnectionId());
230231
}
231232
}
232233
catch (SocketException e1) {
233234
logger.error("Error accessing soTimeout", e1);
234235
}
235236
}
236237
if (doClose) {
237-
boolean noReadErrorOnClose = this.isNoReadErrorOnClose();
238+
boolean noReadErrorOnClose = isNoReadErrorOnClose();
238239
closeConnection(true);
239240
if (!(e instanceof SoftEndOfStreamException)) {
240241
if (e instanceof SocketTimeoutException) {
241242
if (logger.isDebugEnabled()) {
242-
logger.debug("Closed socket after timeout:" + this.getConnectionId());
243+
logger.debug("Closed socket after timeout:" + getConnectionId());
243244
}
244245
}
245246
else {
246247
if (noReadErrorOnClose) {
247248
if (logger.isTraceEnabled()) {
248249
logger.trace("Read exception " +
249-
this.getConnectionId(), e);
250+
getConnectionId(), e);
250251
}
251252
else if (logger.isDebugEnabled()) {
252253
logger.debug("Read exception " +
253-
this.getConnectionId() + " " +
254+
getConnectionId() + " " +
254255
e.getClass().getSimpleName() +
255256
":" + (e.getCause() != null ? e.getCause() + ":" : "") + e.getMessage());
256257
}
257258
}
258259
else if (logger.isTraceEnabled()) {
259260
logger.error("Read exception " +
260-
this.getConnectionId(), e);
261+
getConnectionId(), e);
261262
}
262263
else {
263264
logger.error("Read exception " +
264-
this.getConnectionId() + " " +
265+
getConnectionId() + " " +
265266
e.getClass().getSimpleName() +
266267
":" + (e.getCause() != null ? e.getCause() + ":" : "") + e.getMessage());
267268
}
268269
}
269-
this.sendExceptionToListener(e);
270+
sendExceptionToListener(e);
270271
}
271272
}
272273
return doClose;

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpNioConnection.java

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2019 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -115,7 +115,7 @@ public void setPipeTimeout(long pipeTimeout) {
115115

116116
@Override
117117
public void close() {
118-
this.setNoReadErrorOnClose(true);
118+
setNoReadErrorOnClose(true);
119119
doClose();
120120
}
121121

@@ -144,19 +144,19 @@ public void send(Message<?> message) throws Exception {
144144
synchronized (this.socketChannel) {
145145
if (this.bufferedOutputStream == null) {
146146
int writeBufferSize = this.socketChannel.socket().getSendBufferSize();
147-
this.bufferedOutputStream = new BufferedOutputStream(this.getChannelOutputStream(),
147+
this.bufferedOutputStream = new BufferedOutputStream(getChannelOutputStream(),
148148
writeBufferSize > 0 ? writeBufferSize : 8192);
149149
}
150-
Object object = this.getMapper().fromMessage(message);
150+
Object object = getMapper().fromMessage(message);
151151
Assert.state(object != null, "Mapper mapped the message to 'null'.");
152152
this.lastSend = System.currentTimeMillis();
153153
try {
154-
((Serializer<Object>) this.getSerializer()).serialize(object, this.bufferedOutputStream);
154+
((Serializer<Object>) getSerializer()).serialize(object, this.bufferedOutputStream);
155155
this.bufferedOutputStream.flush();
156156
}
157157
catch (Exception e) {
158-
this.publishConnectionExceptionEvent(new MessagingException(message, "Failed TCP serialization", e));
159-
this.closeConnection(true);
158+
publishConnectionExceptionEvent(new MessagingException(message, "Failed TCP serialization", e));
159+
closeConnection(true);
160160
throw e;
161161
}
162162
if (logger.isDebugEnabled()) {
@@ -167,7 +167,8 @@ public void send(Message<?> message) throws Exception {
167167

168168
@Override
169169
public Object getPayload() throws Exception {
170-
return this.getDeserializer().deserialize(inputStream());
170+
return getDeserializer()
171+
.deserialize(inputStream());
171172
}
172173

173174
@Override
@@ -224,7 +225,7 @@ protected ByteBuffer allocate(int length) {
224225
@Override
225226
public void run() {
226227
if (logger.isTraceEnabled()) {
227-
logger.trace(this.getConnectionId() + " Nio message assembler running...");
228+
logger.trace(getConnectionId() + " Nio message assembler running...");
228229
}
229230
boolean moreDataAvailable = true;
230231
while (moreDataAvailable) {
@@ -242,8 +243,10 @@ public void run() {
242243
catch (RejectedExecutionException e) {
243244
this.executionControl.decrementAndGet();
244245
if (logger.isInfoEnabled()) {
245-
logger.info(getConnectionId() + " Insufficient threads in the assembler fixed thread pool; consider " +
246-
"increasing this task executor pool size; data avail: " + this.channelInputStream.available());
246+
logger.info(getConnectionId()
247+
+ " Insufficient threads in the assembler fixed thread pool; consider "
248+
+ "increasing this task executor pool size; data avail: "
249+
+ this.channelInputStream.available());
247250
}
248251
}
249252
}
@@ -259,24 +262,24 @@ public void run() {
259262
catch (Exception e) {
260263
if (logger.isTraceEnabled()) {
261264
logger.error("Read exception " +
262-
this.getConnectionId(), e);
265+
getConnectionId(), e);
263266
}
264-
else if (!this.isNoReadErrorOnClose()) {
267+
else if (!isNoReadErrorOnClose()) {
265268
logger.error("Read exception " +
266-
this.getConnectionId() + " " +
269+
getConnectionId() + " " +
267270
e.getClass().getSimpleName() +
268271
":" + e.getCause() + ":" + e.getMessage());
269272
}
270273
else {
271274
if (logger.isDebugEnabled()) {
272275
logger.debug("Read exception " +
273-
this.getConnectionId() + " " +
276+
getConnectionId() + " " +
274277
e.getClass().getSimpleName() +
275278
":" + e.getCause() + ":" + e.getMessage());
276279
}
277280
}
278-
this.closeConnection(true);
279-
this.sendExceptionToListener(e);
281+
closeConnection(true);
282+
sendExceptionToListener(e);
280283
return;
281284
}
282285
}
@@ -301,12 +304,13 @@ else if (!this.isNoReadErrorOnClose()) {
301304
}
302305
if (moreDataAvailable) {
303306
if (logger.isTraceEnabled()) {
304-
logger.trace(this.getConnectionId() + " Nio message assembler continuing...");
307+
logger.trace(getConnectionId() + " Nio message assembler continuing...");
305308
}
306309
}
307310
else {
308311
if (logger.isTraceEnabled()) {
309-
logger.trace(this.getConnectionId() + " Nio message assembler exiting... avail: " + this.channelInputStream.available());
312+
logger.trace(getConnectionId() + " Nio message assembler exiting... avail: "
313+
+ this.channelInputStream.available());
310314
}
311315
}
312316
}
@@ -355,13 +359,13 @@ private synchronized Message<?> convert() throws Exception {
355359
}
356360
Message<?> message = null;
357361
try {
358-
message = this.getMapper().toMessage(this);
362+
message = getMapper().toMessage(this);
359363
}
360364
catch (Exception e) {
361-
this.closeConnection(true);
365+
closeConnection(true);
362366
if (e instanceof SocketTimeoutException) {
363367
if (logger.isDebugEnabled()) {
364-
logger.debug("Closing socket after timeout " + this.getConnectionId());
368+
logger.debug("Closing socket after timeout " + getConnectionId());
365369
}
366370
}
367371
else {
@@ -420,7 +424,7 @@ private void doRead() throws Exception {
420424
int len = this.socketChannel.read(this.rawBuffer);
421425
if (len < 0) {
422426
this.writingToPipe = false;
423-
this.closeConnection(true);
427+
closeConnection(true);
424428
}
425429
if (logger.isTraceEnabled()) {
426430
logger.trace("After read:" + this.rawBuffer.position() + "/" + this.rawBuffer.limit());
@@ -432,13 +436,13 @@ private void doRead() throws Exception {
432436
if (logger.isDebugEnabled()) {
433437
logger.debug("Read " + this.rawBuffer.limit() + " into raw buffer");
434438
}
435-
this.sendToPipe(this.rawBuffer);
439+
sendToPipe(this.rawBuffer);
436440
}
437441
catch (RejectedExecutionException e) {
438442
throw e;
439443
}
440444
catch (Exception e) {
441-
this.publishConnectionExceptionEvent(e);
445+
publishConnectionExceptionEvent(e);
442446
throw e;
443447
}
444448
finally {
@@ -450,7 +454,7 @@ private void doRead() throws Exception {
450454
protected void sendToPipe(ByteBuffer rawBuffer) throws IOException {
451455
Assert.notNull(rawBuffer, "rawBuffer cannot be null");
452456
if (logger.isTraceEnabled()) {
453-
logger.trace(this.getConnectionId() + " Sending " + rawBuffer.limit() + " to pipe");
457+
logger.trace(getConnectionId() + " Sending " + rawBuffer.limit() + " to pipe");
454458
}
455459
this.channelInputStream.write(rawBuffer);
456460
rawBuffer.clear();
@@ -462,7 +466,7 @@ private void checkForAssembler() {
462466
// only execute run() if we don't already have one running
463467
this.executionControl.set(1);
464468
if (logger.isDebugEnabled()) {
465-
logger.debug(this.getConnectionId() + " Running an assembler");
469+
logger.debug(getConnectionId() + " Running an assembler");
466470
}
467471
try {
468472
this.taskExecutor.execute2(this);
@@ -487,25 +491,25 @@ private void checkForAssembler() {
487491
*/
488492
public void readPacket() {
489493
if (logger.isDebugEnabled()) {
490-
logger.debug(this.getConnectionId() + " Reading...");
494+
logger.debug(getConnectionId() + " Reading...");
491495
}
492496
try {
493497
doRead();
494498
}
495499
catch (ClosedChannelException cce) {
496500
if (logger.isDebugEnabled()) {
497-
logger.debug(this.getConnectionId() + " Channel is closed");
501+
logger.debug(getConnectionId() + " Channel is closed");
498502
}
499-
this.closeConnection(true);
503+
closeConnection(true);
500504
}
501505
catch (RejectedExecutionException e) {
502506
throw e;
503507
}
504508
catch (Exception e) {
505509
logger.error("Exception on Read " +
506-
this.getConnectionId() + " " +
510+
getConnectionId() + " " +
507511
e.getMessage(), e);
508-
this.closeConnection(true);
512+
closeConnection(true);
509513
}
510514
}
511515

@@ -514,7 +518,7 @@ public void readPacket() {
514518
*/
515519
void timeout() {
516520
this.timedOut = true;
517-
this.closeConnection(true);
521+
closeConnection(true);
518522
}
519523

520524
/**

src/reference/asciidoc/ip.adoc

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,8 @@ public IntegrationFlow udpEchoUpcaseServer() {
291291
[[tcp-connection-factories]]
292292
=== TCP Connection Factories
293293

294+
==== Overview
295+
294296
For TCP, the configuration of the underlying connection is provided by using a connection factory.
295297
Two types of connection factory are provided: a client connection factory and a server connection factory.
296298
Client connection factories establish outgoing connections.
@@ -371,6 +373,8 @@ The following example shows a client connection factory that uses `java.net.Sock
371373
----
372374
====
373375

376+
==== Message Demarcation (Serializers and Deserializers)
377+
374378
TCP is a streaming protocol.
375379
This means that some structure has to be provided to data transported over TCP so that the receiver can demarcate the data into discrete messages.
376380
Connection factories are configured to use serializers and deserializers to convert between the message payload and the bits that are sent over TCP.
@@ -437,6 +441,8 @@ This is similar to the deserializer side of `ByteArrayRawSerializer` above, exce
437441
Internally, it uses a `ByteArrayOutputStream` that lets the buffer grow as needed.
438442
The client must close the socket in an orderly manner to signal end of message.
439443

444+
WARNING: This deserializer should only be used when the peer is trusted; it is susceptible to a DoS attach due to out of memory conditions.
445+
440446
The `MapJsonSerializer` uses a Jackson `ObjectMapper` to convert between a `Map` and JSON.
441447
You can use this serializer in conjunction with a `MessageConvertingTcpMessageMapper` and a `MapMessageConverter` to transfer selected headers and the payload in JSON.
442448

@@ -447,8 +453,6 @@ By default, a `ByteArrayLfSerializer` is used, resulting in messages with a form
447453
The final standard serializer is `org.springframework.core.serializer.DefaultSerializer`, which you can use to convert serializable objects with Java serialization.
448454
`org.springframework.core.serializer.DefaultDeserializer` is provided for inbound deserialization of streams that contain serializable objects.
449455

450-
To implement a custom serializer and deserializer pair, implement the `org.springframework.core.serializer.Deserializer` and `org.springframework.core.serializer.Serializer` interfaces.
451-
452456
If you do not wish to use the default serializer and deserializer (`ByteArrayCrLfSerializer`), you must set the `serializer` and `deserializer` attributes on the connection factory.
453457
The following example shows how to do so:
454458

@@ -480,6 +484,15 @@ NOTE: You can also modify the attributes of sockets and socket factories.
480484
See <<ssl-tls>>.
481485
As noted there, such modifications are possible whether or not SSL is being used.
482486

487+
==== Custom Serializers and Deserializers
488+
489+
If your data is not in a format supported by one of the standard deserializers, you can implement your own; you can also implement a custom serializer.
490+
491+
To implement a custom serializer and deserializer pair, implement the `org.springframework.core.serializer.Deserializer` and `org.springframework.core.serializer.Serializer` interfaces.
492+
493+
When the deserializer detects a closed input stream between messages, it must throw a `SoftEndOfStreamException`; this is a signal to the framework to indicate that the close was "normal".
494+
If the stream is closed while decoding a message, some other exception should be thrown instead.
495+
483496
[[caching-cf]]
484497
==== TCP Caching Client Connection Factory
485498

0 commit comments

Comments
 (0)