1717package org .springframework .integration .jdbc .store ;
1818
1919import java .sql .Types ;
20- import java .util .HashMap ;
2120import java .util .HashSet ;
2221import java .util .List ;
2322import java .util .Map ;
2423import java .util .Set ;
2524import java .util .UUID ;
25+ import java .util .concurrent .ConcurrentHashMap ;
2626import java .util .concurrent .locks .Lock ;
2727import java .util .concurrent .locks .ReadWriteLock ;
2828import java .util .concurrent .locks .ReentrantReadWriteLock ;
29+ import java .util .function .Supplier ;
2930
3031import javax .sql .DataSource ;
3132
@@ -105,6 +106,18 @@ public class JdbcChannelMessageStore implements PriorityCapableChannelMessageSto
105106 */
106107 public static final String DEFAULT_TABLE_PREFIX = "INT_" ;
107108
109+ private enum Query {
110+ CREATE_MESSAGE ,
111+ COUNT_GROUPS ,
112+ GROUP_SIZE ,
113+ DELETE_GROUP ,
114+ POLL ,
115+ POLL_WITH_EXCLUSIONS ,
116+ PRIORITY ,
117+ PRIORITY_WITH_EXCLUSIONS ,
118+ DELETE_MESSAGE
119+ }
120+
108121 /**
109122 * The name of the message header that stores a flag to indicate that the message has been saved. This is an
110123 * optimization for the put method.
@@ -146,7 +159,7 @@ public class JdbcChannelMessageStore implements PriorityCapableChannelMessageSto
146159
147160 private ChannelMessageStorePreparedStatementSetter preparedStatementSetter ;
148161
149- private Map <String , String > queryCache = new HashMap <>();
162+ private final Map <Query , String > queryCache = new ConcurrentHashMap <>();
150163
151164 private MessageGroupFactory messageGroupFactory = new SimpleMessageGroupFactory ();
152165
@@ -409,7 +422,8 @@ public void afterPropertiesSet() throws Exception {
409422 @ Override
410423 public MessageGroup addMessageToGroup (Object groupId , final Message <?> message ) {
411424 try {
412- this .jdbcTemplate .update (getQuery (this .channelMessageStoreQueryProvider .getCreateMessageQuery ()),
425+ this .jdbcTemplate .update (getQuery (Query .CREATE_MESSAGE ,
426+ () -> this .channelMessageStoreQueryProvider .getCreateMessageQuery ()),
413427 ps -> this .preparedStatementSetter .setValues (ps , message , groupId , this .region ,
414428 this .priorityEnabled ));
415429 }
@@ -448,26 +462,22 @@ public MessageGroup getMessageGroup(Object groupId) {
448462 @ ManagedAttribute
449463 public int getMessageGroupCount () {
450464 return this .jdbcTemplate .queryForObject (
451- getQuery ("SELECT COUNT(DISTINCT GROUP_KEY) from %PREFIX%CHANNEL_MESSAGE where REGION = ?" ),
465+ getQuery (Query .COUNT_GROUPS ,
466+ () -> "SELECT COUNT(DISTINCT GROUP_KEY) from %PREFIX%CHANNEL_MESSAGE where REGION = ?" ),
452467 Integer .class , this .region );
453468 }
454469
455470 /**
456471 * Replace patterns in the input to produce a valid SQL query. This implementation lazily initializes a
457472 * simple map-based cache, only replacing the table prefix on the first access to a named query. Further
458473 * accesses will be resolved from the cache.
459- * @param sqlQuery The SQL query to be transformed.
474+ * @param queryName The {@link Query} to be transformed.
475+ * @param queryProvider a supplier to provide the query template.
460476 * @return A transformed query with replacements.
461477 */
462- protected String getQuery (String sqlQuery ) {
463- String query = this .queryCache .get (sqlQuery );
464-
465- if (query == null ) {
466- query = StringUtils .replace (sqlQuery , "%PREFIX%" , this .tablePrefix );
467- this .queryCache .put (sqlQuery , query );
468- }
469-
470- return query ;
478+ protected String getQuery (Query queryName , Supplier <String > queryProvider ) {
479+ return this .queryCache .computeIfAbsent (queryName ,
480+ k -> StringUtils .replace (queryProvider .get (), "%PREFIX%" , this .tablePrefix ));
471481 }
472482
473483 /**
@@ -479,13 +489,17 @@ protected String getQuery(String sqlQuery) {
479489 @ ManagedAttribute
480490 public int messageGroupSize (Object groupId ) {
481491 final String key = getKey (groupId );
482- return this .jdbcTemplate .queryForObject (getQuery (this .channelMessageStoreQueryProvider .getCountAllMessagesInGroupQuery ()),
492+ return this .jdbcTemplate .queryForObject (
493+ getQuery (Query .GROUP_SIZE ,
494+ () -> this .channelMessageStoreQueryProvider .getCountAllMessagesInGroupQuery ()),
483495 Integer .class , key , this .region );
484496 }
485497
486498 @ Override
487499 public void removeMessageGroup (Object groupId ) {
488- this .jdbcTemplate .update (this .getQuery (this .channelMessageStoreQueryProvider .getDeleteMessageGroupQuery ()),
500+ this .jdbcTemplate .update (
501+ this .getQuery (Query .DELETE_GROUP ,
502+ () -> this .channelMessageStoreQueryProvider .getDeleteMessageGroupQuery ()),
489503 this .getKey (groupId ), this .region );
490504 }
491505
@@ -531,19 +545,22 @@ protected Message<?> doPollForMessage(String groupIdKey) {
531545 try {
532546 if (this .usingIdCache && !this .idCache .isEmpty ()) {
533547 if (this .priorityEnabled ) {
534- query = getQuery (this .channelMessageStoreQueryProvider .getPriorityPollFromGroupExcludeIdsQuery ());
548+ query = getQuery (Query .PRIORITY_WITH_EXCLUSIONS ,
549+ () -> this .channelMessageStoreQueryProvider .getPriorityPollFromGroupExcludeIdsQuery ());
535550 }
536551 else {
537- query = getQuery (this .channelMessageStoreQueryProvider .getPollFromGroupExcludeIdsQuery ());
552+ query = getQuery (Query .POLL_WITH_EXCLUSIONS ,
553+ () -> this .channelMessageStoreQueryProvider .getPollFromGroupExcludeIdsQuery ());
538554 }
539555 parameters .addValue ("message_ids" , this .idCache );
540556 }
541557 else {
542558 if (this .priorityEnabled ) {
543- query = getQuery (this .channelMessageStoreQueryProvider .getPriorityPollFromGroupQuery ());
559+ query = getQuery (Query .PRIORITY ,
560+ () -> this .channelMessageStoreQueryProvider .getPriorityPollFromGroupQuery ());
544561 }
545562 else {
546- query = getQuery (this .channelMessageStoreQueryProvider .getPollFromGroupQuery ());
563+ query = getQuery (Query . POLL , () -> this .channelMessageStoreQueryProvider .getPollFromGroupQuery ());
547564 }
548565 }
549566 messages = namedParameterJdbcTemplate .query (query , parameters , this .messageRowMapper );
@@ -582,7 +599,8 @@ protected Message<?> doPollForMessage(String groupIdKey) {
582599 private boolean doRemoveMessageFromGroup (Object groupId , Message <?> messageToRemove ) {
583600 final UUID id = messageToRemove .getHeaders ().getId ();
584601
585- int updated = this .jdbcTemplate .update (getQuery (this .channelMessageStoreQueryProvider .getDeleteMessageQuery ()),
602+ int updated = this .jdbcTemplate .update (
603+ getQuery (Query .DELETE_MESSAGE , () -> this .channelMessageStoreQueryProvider .getDeleteMessageQuery ()),
586604 new Object [] { getKey (id ), getKey (groupId ), this .region },
587605 new int [] { Types .VARCHAR , Types .VARCHAR , Types .VARCHAR });
588606
0 commit comments