1616
1717package org .springframework .integration .handler ;
1818
19- import java .util .Set ;
20- import java .util .concurrent .ConcurrentHashMap ;
21-
2219import org .reactivestreams .Subscription ;
2320
24- import org .springframework .core .Ordered ;
25- import org .springframework .integration .IntegrationPattern ;
26- import org .springframework .integration .IntegrationPatternType ;
27- import org .springframework .integration .context .IntegrationObjectSupport ;
28- import org .springframework .integration .context .Orderable ;
2921import org .springframework .integration .history .MessageHistory ;
3022import org .springframework .integration .support .management .AbstractMessageHandlerMetrics ;
31- import org .springframework .integration .support .management .ConfigurableMetricsAware ;
32- import org .springframework .integration .support .management .DefaultMessageHandlerMetrics ;
33- import org .springframework .integration .support .management .IntegrationManagedResource ;
34- import org .springframework .integration .support .management .TrackableComponent ;
35- import org .springframework .integration .support .management .metrics .MeterFacade ;
3623import org .springframework .integration .support .management .metrics .MetricsCaptor ;
3724import org .springframework .integration .support .management .metrics .SampleFacade ;
38- import org .springframework .integration .support .management .metrics .TimerFacade ;
3925import org .springframework .integration .support .utils .IntegrationUtils ;
40- import org .springframework .lang .Nullable ;
4126import org .springframework .messaging .Message ;
4227import org .springframework .messaging .MessageHandler ;
4328import org .springframework .util .Assert ;
4429
4530import reactor .core .CoreSubscriber ;
4631
4732/**
48- * Base class for MessageHandler implementations that provides basic validation
49- * and error handling capabilities. Asserts that the incoming Message is not
50- * null and that it does not contain a null payload. Converts checked exceptions
51- * into runtime {@link org.springframework.messaging.MessagingException}s.
33+ * Base class for {@link MessageHandler} implementations.
5234 *
53- * @author Mark Fisher
54- * @author Oleg Zhurakousky
55- * @author Gary Russell
35+ * @author David Turanski
5636 * @author Artem Bilan
57- * @author Amit Sadafule
5837 */
59- @ SuppressWarnings ("deprecation" )
60- @ IntegrationManagedResource
61- public abstract class AbstractMessageHandler extends IntegrationObjectSupport
62- implements MessageHandler ,
63- org .springframework .integration .support .management .MessageHandlerMetrics ,
64- ConfigurableMetricsAware <AbstractMessageHandlerMetrics >,
65- TrackableComponent , Orderable , CoreSubscriber <Message <?>>,
66- IntegrationPattern {
67-
68- private final ManagementOverrides managementOverrides = new ManagementOverrides ();
69-
70- private final Set <TimerFacade > timers = ConcurrentHashMap .newKeySet ();
71-
72- private volatile boolean shouldTrack = false ;
73-
74- private volatile int order = Ordered .LOWEST_PRECEDENCE ;
75-
76- private volatile AbstractMessageHandlerMetrics handlerMetrics = new DefaultMessageHandlerMetrics ();
77-
78- private volatile boolean statsEnabled ;
79-
80- private volatile boolean countsEnabled ;
81-
82- private volatile String managedName ;
83-
84- private volatile String managedType ;
85-
86- private volatile boolean loggingEnabled = true ;
87-
88- private MetricsCaptor metricsCaptor ;
89-
90- private TimerFacade successTimer ;
91-
92- @ Override
93- public boolean isLoggingEnabled () {
94- return this .loggingEnabled ;
95- }
96-
97- @ Override
98- public void setLoggingEnabled (boolean loggingEnabled ) {
99- this .loggingEnabled = loggingEnabled ;
100- this .managementOverrides .loggingConfigured = true ;
101- }
102-
103- @ Override
104- public void registerMetricsCaptor (MetricsCaptor metricsCaptorToRegister ) {
105- this .metricsCaptor = metricsCaptorToRegister ;
106- }
107-
108- @ Nullable
109- protected MetricsCaptor getMetricsCaptor () {
110- return this .metricsCaptor ;
111- }
112-
113- @ Override
114- public void setOrder (int order ) {
115- this .order = order ;
116- }
117-
118- @ Override
119- public int getOrder () {
120- return this .order ;
121- }
122-
123- @ Override
124- public String getComponentType () {
125- return "message-handler" ;
126- }
127-
128- @ Override
129- public void setShouldTrack (boolean shouldTrack ) {
130- this .shouldTrack = shouldTrack ;
131- }
132-
133- @ Override
134- public void configureMetrics (AbstractMessageHandlerMetrics metrics ) {
135- Assert .notNull (metrics , "'metrics' must not be null" );
136- this .handlerMetrics = metrics ;
137- this .managementOverrides .metricsConfigured = true ;
138- }
139-
140- @ Override
141- public ManagementOverrides getOverrides () {
142- return this .managementOverrides ;
143- }
144-
145- @ Override
146- public IntegrationPatternType getIntegrationPatternType () {
147- return IntegrationPatternType .outbound_channel_adapter ;
148- }
149-
150- @ Override
151- protected void onInit () {
152- if (this .statsEnabled ) {
153- this .handlerMetrics .setFullStatsEnabled (true );
154- }
155- }
38+ public abstract class AbstractMessageHandler extends MessageHandlerSupport
39+ implements MessageHandler , CoreSubscriber <Message <?>> {
15640
157- @ Override
158- public void handleMessage (Message <?> messageArg ) {
159- Message <?> message = messageArg ;
41+ @ SuppressWarnings ("deprecation" )
42+ public void handleMessage (Message <?> message ) {
16043 Assert .notNull (message , "Message must not be null" );
161- Assert .notNull (message .getPayload (), "Message payload must not be null" ); //NOSONAR - false positive
162- if (this .loggingEnabled && this .logger .isDebugEnabled ()) {
44+ if (isLoggingEnabled () && this .logger .isDebugEnabled ()) {
16345 this .logger .debug (this + " received message: " + message );
16446 }
16547 org .springframework .integration .support .management .MetricsContext start = null ;
166- boolean countsAreEnabled = this .countsEnabled ;
167- AbstractMessageHandlerMetrics metrics = this .handlerMetrics ;
16848 SampleFacade sample = null ;
169- if (countsAreEnabled && this .metricsCaptor != null ) {
170- sample = this .metricsCaptor .start ();
49+ MetricsCaptor metricsCaptor = getMetricsCaptor ();
50+ if (metricsCaptor != null && isCountsEnabled ()) {
51+ sample = metricsCaptor .start ();
17152 }
17253 try {
173- if (this . shouldTrack ) {
54+ if (shouldTrack () ) {
17455 message = MessageHistory .write (message , this , getMessageBuilderFactory ());
17556 }
176- if (countsAreEnabled ) {
177- start = metrics .beforeHandle ();
57+ AbstractMessageHandlerMetrics handlerMetrics = getHandlerMetrics ();
58+ if (isCountsEnabled ()) {
59+ start = handlerMetrics .beforeHandle ();
17860 handleMessageInternal (message );
17961 if (sample != null ) {
18062 sample .stop (sendTimer ());
18163 }
182- metrics .afterHandle (start , true );
64+ handlerMetrics .afterHandle (start , true );
18365 }
18466 else {
18567 handleMessageInternal (message );
@@ -189,44 +71,20 @@ public void handleMessage(Message<?> messageArg) {
18971 if (sample != null ) {
19072 sample .stop (buildSendTimer (false , e .getClass ().getSimpleName ()));
19173 }
192- if (countsAreEnabled ) {
193- metrics .afterHandle (start , false );
74+ if (isCountsEnabled () ) {
75+ getHandlerMetrics () .afterHandle (start , false );
19476 }
19577 throw IntegrationUtils .wrapInHandlingExceptionIfNecessary (message ,
19678 () -> "error occurred in message handler [" + this + "]" , e );
19779 }
19880 }
19981
200- private TimerFacade sendTimer () {
201- if (this .successTimer == null ) {
202- this .successTimer = buildSendTimer (true , "none" );
203- }
204- return this .successTimer ;
205- }
206-
207- private TimerFacade buildSendTimer (boolean success , String exception ) {
208- TimerFacade timer = this .metricsCaptor .timerBuilder (SEND_TIMER_NAME )
209- .tag ("type" , "handler" )
210- .tag ("name" , getComponentName () == null ? "unknown" : getComponentName ())
211- .tag ("result" , success ? "success" : "failure" )
212- .tag ("exception" , exception )
213- .description ("Send processing time" )
214- .build ();
215- this .timers .add (timer );
216- return timer ;
217- }
218-
21982 @ Override
22083 public void onSubscribe (Subscription subscription ) {
22184 Assert .notNull (subscription , "'subscription' must not be null" );
22285 subscription .request (Long .MAX_VALUE );
22386 }
22487
225- @ Override
226- public void onNext (Message <?> message ) {
227- handleMessage (message );
228- }
229-
23088 @ Override
23189 public void onError (Throwable throwable ) {
23290
@@ -237,125 +95,11 @@ public void onComplete() {
23795
23896 }
23997
240- protected abstract void handleMessageInternal (Message <?> message );
241-
24298 @ Override
243- public void reset () {
244- this .handlerMetrics .reset ();
245- }
246-
247- @ Override
248- public long getHandleCountLong () {
249- return this .handlerMetrics .getHandleCountLong ();
250- }
251-
252- @ Override
253- public int getHandleCount () {
254- return this .handlerMetrics .getHandleCount ();
255- }
256-
257- @ Override
258- public int getErrorCount () {
259- return this .handlerMetrics .getErrorCount ();
260- }
261-
262- @ Override
263- public long getErrorCountLong () {
264- return this .handlerMetrics .getErrorCountLong ();
265- }
266-
267- @ Override
268- public double getMeanDuration () {
269- return this .handlerMetrics .getMeanDuration ();
270- }
271-
272- @ Override
273- public double getMinDuration () {
274- return this .handlerMetrics .getMinDuration ();
275- }
276-
277- @ Override
278- public double getMaxDuration () {
279- return this .handlerMetrics .getMaxDuration ();
280- }
281-
282- @ Override
283- public double getStandardDeviationDuration () {
284- return this .handlerMetrics .getStandardDeviationDuration ();
285- }
286-
287- @ Override
288- public int getActiveCount () {
289- return this .handlerMetrics .getActiveCount ();
290- }
291-
292- @ Override
293- public long getActiveCountLong () {
294- return this .handlerMetrics .getActiveCountLong ();
295- }
296-
297- @ Override
298- public org .springframework .integration .support .management .Statistics getDuration () {
299- return this .handlerMetrics .getDuration ();
300- }
301-
302- @ Override
303- public void setStatsEnabled (boolean statsEnabled ) {
304- if (statsEnabled ) {
305- this .countsEnabled = true ;
306- this .managementOverrides .countsConfigured = true ;
307- }
308- this .statsEnabled = statsEnabled ;
309- if (this .handlerMetrics != null ) {
310- this .handlerMetrics .setFullStatsEnabled (statsEnabled );
311- }
312- this .managementOverrides .statsConfigured = true ;
313- }
314-
315- @ Override
316- public boolean isStatsEnabled () {
317- return this .statsEnabled ;
318- }
319-
320- @ Override
321- public void setCountsEnabled (boolean countsEnabled ) {
322- this .countsEnabled = countsEnabled ;
323- this .managementOverrides .countsConfigured = true ;
324- if (!countsEnabled ) {
325- this .statsEnabled = false ;
326- this .managementOverrides .statsConfigured = true ;
327- }
328- }
329-
330- @ Override
331- public boolean isCountsEnabled () {
332- return this .countsEnabled ;
333- }
334-
335- @ Override
336- public void setManagedName (String managedName ) {
337- this .managedName = managedName ;
338- }
339-
340- @ Override
341- public String getManagedName () {
342- return this .managedName ;
343- }
344-
345- @ Override
346- public void setManagedType (String managedType ) {
347- this .managedType = managedType ;
348- }
349-
350- @ Override
351- public String getManagedType () {
352- return this .managedType ;
99+ public void onNext (Message <?> message ) {
100+ handleMessage (message );
353101 }
354102
355- @ Override
356- public void destroy () {
357- this .timers .forEach (MeterFacade ::remove );
358- this .timers .clear ();
359- }
103+ protected abstract void handleMessageInternal (Message <?> message );
360104
361105}
0 commit comments