Skip to content

Commit 20e962c

Browse files
committed
* Add more JavaDocs to ZeroMqProxy and ZeroMqChannel
* Expose `ZeroMqChannel.setZeroMqProxy()` option for easier configuration within the same application context * Make `ZeroMqChannel` sockets configuration and connection dependant on provided `ZeroMqProxy` (if any) * Add `Consumer<ZMQ.Socket>` configuration callbacks to the `ZeroMqChannel` * Expose `ZeroMqChannel.consumeDelay` option
1 parent f221a7f commit 20e962c

File tree

3 files changed

+124
-22
lines changed

3 files changed

+124
-22
lines changed

spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/ZeroMqProxy.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
* to implement any possible patterns for ZeroMQ intermediary. Defaults to @link {@link ZeroMqProxy.Type#PULL_PUSH}.
4747
* <p>
4848
* The control socket is exposed as a {@link SocketType#PAIR} with an inter-thread transport
49-
* on tne {@code "inproc://" + beanName + ".control"} address; it can be obtained via {@link #getControlAddress()}.
49+
* on the {@code "inproc://" + beanName + ".control"} address; it can be obtained via {@link #getControlAddress()}.
5050
* Should be used with the same application from {@link SocketType#PAIR} socket to send
5151
* {@link zmq.ZMQ#PROXY_TERMINATE}, {@link zmq.ZMQ#PROXY_PAUSE} and/or {@link zmq.ZMQ#PROXY_RESUME} commands.
5252
* <p>
@@ -56,6 +56,7 @@
5656
* With an {@link #exposeCaptureSocket} option, an additional capture data socket is bound to inter-thread transport
5757
* as a {@link SocketType#PUB}. There is no specific topic selection, so all the subscribers to this socket
5858
* must subscribe with plain {@link ZMQ#SUBSCRIPTION_ALL}.
59+
* The address for this socket is {@code "inproc://" + beanName + ".capture"}.
5960
*
6061
* @author Artem Bilan
6162
*
@@ -100,28 +101,53 @@ public class ZeroMqProxy implements InitializingBean, SmartLifecycle, BeanNameAw
100101

101102
private int phase;
102103

104+
/**
105+
* Create a {@link ZeroMqProxy} instance based on the provided {@link ZContext}
106+
* and {@link Type#PULL_PUSH} as default mode.
107+
* @param context the {@link ZContext} to use
108+
*/
103109
public ZeroMqProxy(ZContext context) {
104110
this(context, Type.PULL_PUSH);
105111
}
106112

113+
/**
114+
* Create a {@link ZeroMqProxy} instance based on the provided {@link ZContext}
115+
* and {@link Type}.
116+
* @param context the {@link ZContext} to use
117+
* @param type the {@link Type} to use.
118+
*/
107119
public ZeroMqProxy(ZContext context, Type type) {
108120
Assert.notNull(context, "'context' must not be null");
109121
Assert.notNull(type, "'type' must not be null");
110122
this.context = context;
111123
this.type = type;
112124
}
113125

126+
/**
127+
* Configure an executor to perform a a ZeroMQ proxy loop.
128+
* The thread is held until ZeroMQ proxy loop is terminated.
129+
* By default an internal {@link Executors#newSingleThreadExecutor} instance is used.
130+
* @param proxyExecutor the {@link Executor} to use for ZeroMQ proxy loop
131+
*/
114132
public void setProxyExecutor(Executor proxyExecutor) {
115133
Assert.notNull(proxyExecutor, "'proxyExecutor' must not be null");
116134
this.proxyExecutor = proxyExecutor;
117135
this.proxyExecutorExplicitlySet = true;
118136
}
119137

138+
/**
139+
* Specify a fixed port for frontend socket of the proxy.
140+
* @param frontendPort the port to use; must be more than 0
141+
*/
120142
public void setFrontendPort(int frontendPort) {
121143
Assert.isTrue(frontendPort > 0, "'frontendPort' must not be zero or negative");
122144
this.frontendPort.set(frontendPort);
123145
}
124146

147+
/**
148+
* Specify a fixed port for backend socket of the proxy.
149+
* @param backendPort the port to use; must be more than 0
150+
*/
125151
public void setBackendPort(int backendPort) {
126152
Assert.isTrue(backendPort > 0, "'backendPort' must not be zero or negative");
127153
this.backendPort.set(backendPort);

spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/channel/ZeroMqChannel.java

Lines changed: 86 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.HashMap;
2121
import java.util.HashSet;
2222
import java.util.Map;
23+
import java.util.function.Consumer;
2324
import java.util.function.Supplier;
2425

2526
import org.zeromq.SocketType;
@@ -29,6 +30,7 @@
2930
import org.springframework.integration.channel.AbstractMessageChannel;
3031
import org.springframework.integration.mapping.BytesMessageMapper;
3132
import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper;
33+
import org.springframework.integration.zeromq.ZeroMqProxy;
3234
import org.springframework.lang.Nullable;
3335
import org.springframework.messaging.Message;
3436
import org.springframework.messaging.MessageHandler;
@@ -55,7 +57,10 @@
5557
* The {@link #setConnectUrl(String)} has to be as a standard ZeroMQ connect string, but with an extra port
5658
* over the colon - representing a frontend and backend sockets pair on ZeroMQ proxy.
5759
* For example: {@code tcp://localhost:6001:6002}.
58-
* This way a sending and receiving operations on this channel are similar to interaction over a messaging broker.
60+
* Another option is to provide a reference to the {@link ZeroMqProxy} instance managed in the same application:
61+
* frontend and backend ports are evaluated from this proxy and respective connection string is built from them.
62+
* <p>
63+
* This way sending and receiving operations on this channel are similar to interaction over a messaging broker.
5964
* <p>
6065
* An internal logic of this message channel implementation is based on the project Reactor using its
6166
* {@link Mono}, {@link Flux} and {@link Scheduler} API for better thead model and flow control to avoid
@@ -67,6 +72,8 @@
6772
*/
6873
public class ZeroMqChannel extends AbstractMessageChannel implements SubscribableChannel {
6974

75+
public static final Duration DEFAULT_CONSUME_DELAY = Duration.ofSeconds(1);
76+
7077
private final Map<MessageHandler, Disposable> subscribers = new HashMap<>();
7178

7279
private final Scheduler publisherScheduler = Schedulers.newSingle("publisherScheduler");
@@ -83,8 +90,17 @@ public class ZeroMqChannel extends AbstractMessageChannel implements Subscribabl
8390

8491
private final Flux<? extends Message<?>> subscriberData;
8592

93+
private Duration consumeDelay = DEFAULT_CONSUME_DELAY;
94+
8695
private BytesMessageMapper messageMapper = new EmbeddedJsonHeadersMessageMapper();
8796

97+
private Consumer<ZMQ.Socket> sendSocketConfigurer = (socket) -> { };
98+
99+
private Consumer<ZMQ.Socket> subscribeSocketConfigurer = (socket) -> { };
100+
101+
@Nullable
102+
private ZeroMqProxy zeroMqProxy;
103+
88104
@Nullable
89105
private volatile String connectSendUrl;
90106

@@ -107,30 +123,51 @@ public ZeroMqChannel(ZContext context, boolean pubSub) {
107123

108124
Supplier<String> localPairConnection = () -> "inproc://" + getComponentName() + ".pair";
109125

126+
Mono<?> proxyMono =
127+
Mono.defer(() -> {
128+
if (this.zeroMqProxy != null) {
129+
return Mono.just(this.zeroMqProxy.getBackendPort())
130+
.filter((port) -> port > 0)
131+
.repeatWhenEmpty((repeat) -> repeat.delayElements(Duration.ofMillis(100))) // NOSONAR
132+
.doOnNext((port) ->
133+
setConnectUrl("tcp://localhost:" + this.zeroMqProxy.getFrontendPort() +
134+
':' + this.zeroMqProxy.getBackendPort()));
135+
}
136+
else {
137+
return Mono.empty();
138+
}
139+
})
140+
.cache();
141+
110142
this.sendSocket =
111-
Mono.fromCallable(() ->
112-
this.context.createSocket(
113-
this.connectSendUrl == null
114-
? SocketType.PAIR
115-
: (this.pubSub ? SocketType.XPUB : SocketType.PUSH))
116-
)
143+
proxyMono
117144
.publishOn(this.publisherScheduler)
145+
.then(Mono.fromCallable(() ->
146+
this.context.createSocket(
147+
this.connectSendUrl == null
148+
? SocketType.PAIR
149+
: (this.pubSub ? SocketType.XPUB : SocketType.PUSH))
150+
))
151+
.doOnNext(this.sendSocketConfigurer)
118152
.doOnNext((socket) ->
119153
socket.connect(this.connectSendUrl != null
120154
? this.connectSendUrl
121155
: localPairConnection.get()))
122-
.delayUntil((socket) -> (this.pubSub && this.connectSendUrl != null)
123-
? Mono.just(socket).map(ZMQ.Socket::recv)
124-
: Mono.empty())
156+
.delayUntil((socket) ->
157+
(this.pubSub && this.connectSendUrl != null)
158+
? Mono.just(socket).map(ZMQ.Socket::recv)
159+
: Mono.empty())
125160
.cache();
126161

127162
this.subscribeSocket =
128-
Mono.fromCallable(() ->
129-
this.context.createSocket(
130-
this.connectSubscribeUrl == null
131-
? SocketType.PAIR
132-
: (this.pubSub ? SocketType.SUB : SocketType.PULL)))
163+
proxyMono
133164
.publishOn(this.subscriberScheduler)
165+
.then(Mono.fromCallable(() ->
166+
this.context.createSocket(
167+
this.connectSubscribeUrl == null
168+
? SocketType.PAIR
169+
: (this.pubSub ? SocketType.SUB : SocketType.PULL))))
170+
.doOnNext(this.subscribeSocketConfigurer)
134171
.doOnNext((socket) -> {
135172
if (this.connectSubscribeUrl != null) {
136173
socket.connect(this.connectSubscribeUrl);
@@ -160,7 +197,7 @@ public ZeroMqChannel(ZContext context, boolean pubSub) {
160197
.doOnError((error) -> logger.error("Error processing ZeroMQ message", error))
161198
.repeatWhenEmpty((repeat) ->
162199
this.initialized
163-
? repeat.delayElements(Duration.ofMillis(100))
200+
? repeat.delayElements(this.consumeDelay)
164201
: repeat)
165202
.repeat(() -> this.initialized);
166203

@@ -173,6 +210,12 @@ public ZeroMqChannel(ZContext context, boolean pubSub) {
173210

174211
}
175212

213+
/**
214+
* Configure a connection to the ZeroMQ proxy with the pair of ports over colon
215+
* for proxy frontend and backend sockets. Mutually exclusive with the {@link #setZeroMqProxy(ZeroMqProxy)}.
216+
* @param connectUrl the connection string in format {@code PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT},
217+
* e.g. {@code tcp://localhost:6001:6002}
218+
*/
176219
public void setConnectUrl(@Nullable String connectUrl) {
177220
if (connectUrl != null) {
178221
this.connectSendUrl = connectUrl.substring(0, connectUrl.lastIndexOf(':'));
@@ -182,13 +225,40 @@ public void setConnectUrl(@Nullable String connectUrl) {
182225
}
183226
}
184227

228+
/**
229+
* Specify a reference to a {@link ZeroMqProxy} instance in the same application
230+
* to rely on its ports configuration and make a natural lifecycle dependency without guessing
231+
* when the proxy is started. Mutually exclusive with the {@link #setConnectUrl(String)}.
232+
* @param zeroMqProxy the {@link ZeroMqProxy} instance to use
233+
*/
234+
public void setZeroMqProxy(@Nullable ZeroMqProxy zeroMqProxy) {
235+
this.zeroMqProxy = zeroMqProxy;
236+
}
237+
238+
public void setConsumeDelay(Duration consumeDelay) {
239+
Assert.notNull(consumeDelay, "'consumeDelay' must not be null");
240+
this.consumeDelay = consumeDelay;
241+
}
242+
185243
public void setMessageMapper(BytesMessageMapper messageMapper) {
186244
Assert.notNull(messageMapper, "'messageMapper' must not be null");
187245
this.messageMapper = messageMapper;
188246
}
189247

248+
public void setSendSocketConfigurer(Consumer<ZMQ.Socket> sendSocketConfigurer) {
249+
Assert.notNull(sendSocketConfigurer, "'sendSocketConfigurer' must not be null");
250+
this.sendSocketConfigurer = sendSocketConfigurer;
251+
}
252+
253+
public void setSubscribeSocketConfigurer(Consumer<ZMQ.Socket> subscribeSocketConfigurer) {
254+
Assert.notNull(subscribeSocketConfigurer, "'subscribeSocketConfigurer' must not be null");
255+
this.subscribeSocketConfigurer = subscribeSocketConfigurer;
256+
}
257+
190258
@Override
191259
protected void onInit() {
260+
Assert.state(this.zeroMqProxy == null || this.connectSendUrl == null,
261+
"Or 'zeroMqProxy' or 'connectUrl' can be provided (or none), but not both.");
192262
super.onInit();
193263
this.sendSocket.subscribe();
194264
this.initialized = true;

spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/channel/ZeroMqChannelTests.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.awaitility.Awaitility.await;
2121

22+
import java.time.Duration;
2223
import java.util.concurrent.BlockingQueue;
2324
import java.util.concurrent.LinkedBlockingQueue;
2425
import java.util.concurrent.TimeUnit;
@@ -52,6 +53,7 @@ static void tearDown() {
5253
void testSimpleSendAndReceive() throws InterruptedException {
5354
ZeroMqChannel channel = new ZeroMqChannel(CONTEXT);
5455
channel.setBeanName("testChannel1");
56+
channel.setConsumeDelay(Duration.ofMillis(10));
5557
channel.afterPropertiesSet();
5658

5759
BlockingQueue<Message<?>> received = new LinkedBlockingQueue<>();
@@ -82,6 +84,7 @@ void testSimpleSendAndReceive() throws InterruptedException {
8284
void testPubSubLocal() throws InterruptedException {
8385
ZeroMqChannel channel = new ZeroMqChannel(CONTEXT, true);
8486
channel.setBeanName("testChannel2");
87+
channel.setConsumeDelay(Duration.ofMillis(10));
8588
channel.afterPropertiesSet();
8689

8790
BlockingQueue<Message<?>> received = new LinkedBlockingQueue<>();
@@ -115,8 +118,9 @@ void testPushPullBind() throws InterruptedException {
115118
captureSocket.subscribe(ZMQ.SUBSCRIPTION_ALL);
116119

117120
ZeroMqChannel channel = new ZeroMqChannel(CONTEXT);
118-
channel.setConnectUrl("tcp://*:" + proxy.getFrontendPort() + ':' + proxy.getBackendPort());
121+
channel.setConnectUrl("tcp://localhost:" + proxy.getFrontendPort() + ':' + proxy.getBackendPort());
119122
channel.setBeanName("testChannel3");
123+
channel.setConsumeDelay(Duration.ofMillis(10));
120124
channel.afterPropertiesSet();
121125

122126
BlockingQueue<Message<?>> received = new LinkedBlockingQueue<>();
@@ -150,21 +154,23 @@ void testPubSubBind() throws InterruptedException {
150154
proxy.afterPropertiesSet();
151155
proxy.start();
152156

153-
await().until(() -> proxy.getBackendPort() > 0);
154-
155157
ZeroMqChannel channel = new ZeroMqChannel(CONTEXT, true);
156-
channel.setConnectUrl("tcp://*:" + proxy.getFrontendPort() + ':' + proxy.getBackendPort());
158+
channel.setZeroMqProxy(proxy);
157159
channel.setBeanName("testChannel4");
160+
channel.setConsumeDelay(Duration.ofMillis(10));
158161
channel.afterPropertiesSet();
159162

160163
BlockingQueue<Message<?>> received = new LinkedBlockingQueue<>();
161164

162165
channel.subscribe(received::offer);
163166
channel.subscribe(received::offer);
164167

168+
await().until(() -> proxy.getBackendPort() > 0);
169+
165170
ZeroMqChannel channel2 = new ZeroMqChannel(CONTEXT, true);
166-
channel2.setConnectUrl("tcp://*:" + proxy.getFrontendPort() + ':' + proxy.getBackendPort());
171+
channel2.setConnectUrl("tcp://localhost:" + proxy.getFrontendPort() + ':' + proxy.getBackendPort());
167172
channel2.setBeanName("testChannel5");
173+
channel.setConsumeDelay(Duration.ofMillis(10));
168174
channel2.afterPropertiesSet();
169175

170176
channel.subscribe(received::offer);

0 commit comments

Comments
 (0)