Skip to content

Commit d679936

Browse files
committed
INT-4397: Fix headers filtering for @transformer
JIRA: https://jira.spring.io/browse/INT-4397 The `AbstractMessageProcessingTransformer` doesn't honor a configured `notPropagatedHeaders` and copies all the request headers to the message to return * Add `setNotPropagatedHeaders()` into the `AbstractMessageProcessingTransformer` and implement there a logic to filter headers, similar to what we have in the `AbstractMessageProducingHandler` * Overrider `updateNotPropagatedHeaders()` in the `MessageTransformingHandler` to propagate `notPropagatedHeaders` to the `AbstractMessageProcessingTransformer` delegate
1 parent 1b46224 commit d679936

File tree

4 files changed

+94
-9
lines changed

4 files changed

+94
-9
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public void setNotPropagatedHeaders(String... headers) {
139139
* @param merge true to merge with existing patterns; false to replace.
140140
* @since 5.0.2
141141
*/
142-
protected final void updateNotPropagatedHeaders(String[] headers, boolean merge) {
142+
protected void updateNotPropagatedHeaders(String[] headers, boolean merge) {
143143
Set<String> headerPatterns = new HashSet<>();
144144

145145
if (merge && this.notPropagatedHeaders != null) {
@@ -151,7 +151,7 @@ protected final void updateNotPropagatedHeaders(String[] headers, boolean merge)
151151

152152
headerPatterns.addAll(Arrays.asList(headers));
153153

154-
this.notPropagatedHeaders = headerPatterns.toArray(new String[headerPatterns.size()]);
154+
this.notPropagatedHeaders = headerPatterns.toArray(new String[0]);
155155
}
156156

157157
boolean hasAsterisk = headerPatterns.contains("*");

spring-integration-core/src/main/java/org/springframework/integration/transformer/AbstractMessageProcessingTransformer.java

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,15 +16,23 @@
1616

1717
package org.springframework.integration.transformer;
1818

19+
import java.util.Arrays;
20+
import java.util.HashMap;
21+
import java.util.Map;
22+
1923
import org.springframework.beans.factory.BeanFactory;
2024
import org.springframework.beans.factory.BeanFactoryAware;
2125
import org.springframework.context.Lifecycle;
2226
import org.springframework.integration.handler.MessageProcessor;
27+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
2328
import org.springframework.integration.support.DefaultMessageBuilderFactory;
2429
import org.springframework.integration.support.MessageBuilderFactory;
2530
import org.springframework.integration.support.utils.IntegrationUtils;
2631
import org.springframework.messaging.Message;
32+
import org.springframework.messaging.MessageHeaders;
2733
import org.springframework.util.Assert;
34+
import org.springframework.util.ObjectUtils;
35+
import org.springframework.util.PatternMatchUtils;
2836

2937
/**
3038
* Base class for Message Transformers that delegate to a {@link MessageProcessor}.
@@ -37,11 +45,15 @@ public abstract class AbstractMessageProcessingTransformer
3745

3846
private final MessageProcessor<?> messageProcessor;
3947

40-
private volatile MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();
48+
private BeanFactory beanFactory;
4149

42-
private volatile boolean messageBuilderFactorySet;
50+
private MessageBuilderFactory messageBuilderFactory = new DefaultMessageBuilderFactory();
4351

44-
private BeanFactory beanFactory;
52+
private boolean messageBuilderFactorySet;
53+
54+
private String[] notPropagatedHeaders;
55+
56+
private boolean selectiveHeaderPropagation;
4557

4658
protected AbstractMessageProcessingTransformer(MessageProcessor<?> messageProcessor) {
4759
Assert.notNull(messageProcessor, "messageProcessor must not be null");
@@ -85,6 +97,22 @@ public boolean isRunning() {
8597
return !(this.messageProcessor instanceof Lifecycle) || ((Lifecycle) this.messageProcessor).isRunning();
8698
}
8799

100+
/**
101+
* Set headers that will NOT be copied from the inbound message if
102+
* the handler is configured to copy headers.
103+
* @param headers the headers to not propagate from the inbound message.
104+
* @since 5.1
105+
*/
106+
public void setNotPropagatedHeaders(String... headers) {
107+
if (!ObjectUtils.isEmpty(headers)) {
108+
Assert.noNullElements(headers, "null elements are not allowed in 'headers'");
109+
110+
this.notPropagatedHeaders = Arrays.copyOf(headers, headers.length);
111+
}
112+
113+
this.selectiveHeaderPropagation = !ObjectUtils.isEmpty(this.notPropagatedHeaders);
114+
}
115+
88116
@Override
89117
public final Message<?> transform(Message<?> message) {
90118
Object result = this.messageProcessor.processMessage(message);
@@ -94,7 +122,26 @@ public final Message<?> transform(Message<?> message) {
94122
if (result instanceof Message<?>) {
95123
return (Message<?>) result;
96124
}
97-
return getMessageBuilderFactory().withPayload(result).copyHeaders(message.getHeaders()).build();
125+
126+
MessageHeaders requestHeaders = message.getHeaders();
127+
128+
AbstractIntegrationMessageBuilder<?> messageBuilder =
129+
getMessageBuilderFactory()
130+
.withPayload(result);
131+
132+
if (this.selectiveHeaderPropagation) {
133+
Map<String, Object> headersToCopy = new HashMap<>(requestHeaders);
134+
135+
headersToCopy.entrySet()
136+
.removeIf(entry -> PatternMatchUtils.simpleMatch(this.notPropagatedHeaders, entry.getKey()));
137+
138+
messageBuilder.copyHeadersIfAbsent(headersToCopy);
139+
}
140+
else {
141+
messageBuilder.copyHeadersIfAbsent(requestHeaders);
142+
}
143+
144+
return messageBuilder.build();
98145
}
99146

100147
}

spring-integration-core/src/main/java/org/springframework/integration/transformer/MessageTransformingHandler.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.transformer;
1818

19+
import java.util.Collection;
20+
1921
import org.springframework.beans.factory.BeanFactoryAware;
2022
import org.springframework.context.Lifecycle;
2123
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
@@ -64,6 +66,18 @@ protected void doInit() {
6466
}
6567
}
6668

69+
@Override
70+
protected void updateNotPropagatedHeaders(String[] headers, boolean merge) {
71+
super.updateNotPropagatedHeaders(headers, merge);
72+
73+
Collection<String> notPropagatedHeaders = getNotPropagatedHeaders();
74+
75+
if (this.transformer instanceof AbstractMessageProcessingTransformer && !notPropagatedHeaders.isEmpty()) {
76+
((AbstractMessageProcessingTransformer) this.transformer)
77+
.setNotPropagatedHeaders(notPropagatedHeaders.toArray(new String[0]));
78+
}
79+
}
80+
6781
@Override
6882
public void start() {
6983
if (this.transformer instanceof Lifecycle) {

spring-integration-core/src/test/java/org/springframework/integration/handler/HeaderAnnotationTransformerTests.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2014 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.handler;
1818

1919
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertFalse;
2021
import static org.junit.Assert.assertNotNull;
2122
import static org.mockito.Mockito.mock;
2223

@@ -34,6 +35,8 @@
3435
/**
3536
* @author Mark Fisher
3637
* @author Gary Russell
38+
* @author Artem Bilan
39+
*
3740
* @since 2.0
3841
*/
3942
public class HeaderAnnotationTransformerTests {
@@ -103,6 +106,27 @@ public void headerAnnotationWithUnprefixedHeaderAndRelativeExpression() {
103106
}
104107

105108

109+
@Test
110+
public void testNotPropagatedHeaders() {
111+
Object target = new TestTransformer();
112+
MethodInvokingTransformer transformer = new MethodInvokingTransformer(target, "evalFoo");
113+
MessageTransformingHandler handler = new MessageTransformingHandler(transformer);
114+
handler.setBeanFactory(mock(BeanFactory.class));
115+
handler.setNotPropagatedHeaders(IntegrationMessageHeaderAccessor.CORRELATION_ID);
116+
handler.afterPropertiesSet();
117+
QueueChannel outputChannel = new QueueChannel();
118+
handler.setOutputChannel(outputChannel);
119+
handler.handleMessage(
120+
MessageBuilder.withPayload("test")
121+
.setCorrelationId("abc")
122+
.setHeader("foo", "bar")
123+
.build());
124+
Message<?> result = outputChannel.receive(0);
125+
assertNotNull(result);
126+
assertEquals("BAR", result.getPayload());
127+
assertFalse(result.getHeaders().containsKey(IntegrationMessageHeaderAccessor.CORRELATION_ID));
128+
}
129+
106130
public static class TestTransformer {
107131

108132
public String appendCorrelationId(Object payload,

0 commit comments

Comments
 (0)