diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java index 16bb1afe116..f863df4e334 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractAggregatingMessageGroupProcessor.java @@ -63,7 +63,7 @@ public final Object processMessageGroup(MessageGroup group) { protected Map aggregateHeaders(MessageGroup group) { Map aggregatedHeaders = new HashMap(); Set conflictKeys = new HashSet(); - for (Message message : group.getUnmarked()) { + for (Message message : group.getMessages()) { MessageHeaders currentHeaders = message.getHeaders(); for (String key : currentHeaders.keySet()) { if (MessageHeaders.ID.equals(key) || MessageHeaders.TIMESTAMP.equals(key) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java index 2b10a707396..d63dcb79e81 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java @@ -85,13 +85,6 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageH private final Object correlationLocksMonitor = new Object(); private final ConcurrentMap locks = new ConcurrentHashMap(); - - protected volatile boolean keepReleasedMessages = true; - - - public void setKeepReleasedMessages(boolean keepReleasedMessages) { - this.keepReleasedMessages = keepReleasedMessages; - } public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) { @@ -288,7 +281,7 @@ private void expireGroup(Object correlationKey, MessageGroup group) { logger.debug("Discarding messages of partially complete group with key [" + correlationKey + "] to: " + discardChannel); } - for (Message message : group.getUnmarked()) { + for (Message message : group.getMessages()) { discardChannel.send(message); } } @@ -307,6 +300,7 @@ private Collection> completeGroup(Message message, Object correlat if (logger.isDebugEnabled()) { logger.debug("Completing group with correlationKey [" + correlationKey + "]"); } + Object result = outputProcessor.processMessageGroup(group); Collection> partialSequence = null; if (result instanceof Collection) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java index fd35d0f65d4..2b615ff4b0f 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java @@ -31,7 +31,6 @@ public class AggregatingMessageHandler extends AbstractCorrelatingMessageHandler private volatile boolean expireGroupsUponCompletion = false; - public AggregatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store, CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) { super(processor, store, correlationStrategy, releaseStrategy); @@ -71,16 +70,8 @@ protected void afterRelease(MessageGroup messageGroup, Collection> co remove(messageGroup); } else { - if (this.keepReleasedMessages){ - messageStore.markMessageGroup(messageGroup); - } - else { - for (Message message : messageGroup.getMarked()) { - this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), message); - } - for (Message message : messageGroup.getUnmarked()) { - this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), message); - } + for (Message message : messageGroup.getMessages()) { + this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), message); } } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelatingMessageBarrier.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelatingMessageBarrier.java index 86ba925d9dd..d99cef548cc 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelatingMessageBarrier.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/CorrelatingMessageBarrier.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -40,6 +40,7 @@ * for each correlation key. * * @author Iwein Fuld + * @author Oleg Zhurakousky * * @see CorrelatingMessageHandler */ @@ -102,9 +103,9 @@ public Message receive() { if (releaseStrategy.canRelease(group)) { Message nextMessage = null; - Iterator> unmarked = group.getUnmarked().iterator(); - if (unmarked.hasNext()) { - nextMessage = unmarked.next(); + Iterator> messages = group.getMessages().iterator(); + if (messages.hasNext()) { + nextMessage = messages.next(); store.removeMessageFromGroup(key, nextMessage); if (log.isDebugEnabled()) { log.debug(String.format("Released message for key [%s]: %s.", key, nextMessage)); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/DefaultAggregatingMessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/DefaultAggregatingMessageGroupProcessor.java index 677944f108c..6e83bc6f5d0 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/DefaultAggregatingMessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/DefaultAggregatingMessageGroupProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,7 +38,7 @@ public class DefaultAggregatingMessageGroupProcessor extends AbstractAggregating @Override protected final Object aggregatePayloads(MessageGroup group, Map headers) { - Collection> messages = group.getUnmarked(); + Collection> messages = group.getMessages(); Assert.notEmpty(messages, this.getClass().getSimpleName() + " cannot process empty message groups"); List payloads = new ArrayList(messages.size()); for (Message message : messages) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessor.java index bbfd121fe6d..6ad9edd2766 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -54,12 +54,12 @@ public void setExpectedType(Class expectedType) { } /** - * Evaluate the expression provided on the unmarked messages (a collection) in the group, and delegate to the + * Evaluate the expression provided on the messages (a collection) in the group, and delegate to the * {@link MessagingTemplate} to send downstream. */ @Override protected Object aggregatePayloads(MessageGroup group, Map headers) { - return processor.process(group.getUnmarked()); + return processor.process(group.getMessages()); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingReleaseStrategy.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingReleaseStrategy.java index bb2ed6bcc00..eed9b14ace0 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingReleaseStrategy.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ExpressionEvaluatingReleaseStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2008 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,11 +31,11 @@ public ExpressionEvaluatingReleaseStrategy(String expression) { } /** - * Evaluate the expression provided on the unmarked messages (a collection) in the group and return the result (must + * Evaluate the expression provided on the messages (a collection) in the group and return the result (must * be boolean). */ public boolean canRelease(MessageGroup messages) { - return ((Boolean) process(messages.getUnmarked())).booleanValue(); + return ((Boolean) process(messages.getMessages())).booleanValue(); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageCountReleaseStrategy.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageCountReleaseStrategy.java index 7f75aa063cd..9287289c7b5 100755 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageCountReleaseStrategy.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MessageCountReleaseStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -19,6 +19,7 @@ * A {@link ReleaseStrategy} that releases only the first n messages, where n is a threshold. * * @author Dave Syer + * @author Oleg Zhurakousky * */ public class MessageCountReleaseStrategy implements ReleaseStrategy { @@ -41,12 +42,12 @@ public MessageCountReleaseStrategy() { } /** - * Release the group if it has more messages than the threshold and has not previously been released. Previous - * releases leave an imprint on the group in the form of marked messages. It is possible that more messages than the - * threshold could be released, but only if multiple consumers receive messages from the same group concurrently. + * Release the group if it has more messages than the threshold and has not previously been released. + * It is possible that more messages than the threshold could be released, but only if multiple consumers + * receive messages from the same group concurrently. */ public boolean canRelease(MessageGroup group) { - return group.size() >= threshold && group.getMarked().size() == 0; + return group.size() >= threshold; } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessor.java index e1a697f2fa9..2706e67672d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,7 +79,7 @@ public void setBeanFactory(BeanFactory beanFactory) { @Override protected final Object aggregatePayloads(MessageGroup group, Map headers) { - final Collection> messagesUpForProcessing = group.getUnmarked(); + final Collection> messagesUpForProcessing = group.getMessages(); return this.processor.process(messagesUpForProcessing, headers); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingReleaseStrategy.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingReleaseStrategy.java index 38e2255d5e1..88f81ef730d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingReleaseStrategy.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/MethodInvokingReleaseStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -51,7 +51,7 @@ public void setBeanFactory(BeanFactory beanFactory) { } public boolean canRelease(MessageGroup messages) { - return this.adapter.process(messages.getUnmarked(), null); + return this.adapter.process(messages.getMessages(), null); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/PassThroughMessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/PassThroughMessageGroupProcessor.java index ccb078448e4..27ce3e0a492 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/PassThroughMessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/PassThroughMessageGroupProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -16,7 +16,7 @@ import org.springframework.integration.store.MessageGroup; /** - * This implementation of MessageGroupProcessor will return all unmarked messages inside the group. + * This implementation of MessageGroupProcessor will return all messages inside the group. * This is useful if there is no requirement to process the messages, but they should just be * blocked as a group until their ReleaseStrategy lets them pass through. * @@ -26,7 +26,7 @@ public class PassThroughMessageGroupProcessor implements MessageGroupProcessor { public Object processMessageGroup(MessageGroup group) { - return group.getUnmarked(); + return group.getMessages(); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessor.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessor.java index 9fb9839b5a5..8161a4daf12 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessor.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -40,7 +40,7 @@ public void setComparator(Comparator> comparator) { } public Object processMessageGroup(MessageGroup group) { - Collection> messages = group.getUnmarked(); + Collection> messages = group.getMessages(); if (messages.size() > 0) { List> sorted = new ArrayList>(messages); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java index 829363593f1..62944d8891d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java @@ -48,7 +48,7 @@ public ResequencingMessageHandler(MessageGroupProcessor processor) { @Override protected void afterRelease(MessageGroup messageGroup, Collection> completedMessages) { - int size = messageGroup.getUnmarked().size() + messageGroup.getMarked().size(); + int size = messageGroup.getMessages().size(); int sequenceSize = 0; Message message = messageGroup.getOne(); if (message != null){ @@ -62,17 +62,8 @@ protected void afterRelease(MessageGroup messageGroup, Collection> co if (completedMessages != null){ int lastReleasedSequenceNumber = this.findLastReleasedSequenceNumber(messageGroup.getGroupId(), completedMessages); messageStore.setLastReleasedSequenceNumberForGroup(messageGroup.getGroupId(), lastReleasedSequenceNumber); - - if (this.keepReleasedMessages){ - Object id = messageGroup.getGroupId(); - for (Message msg : completedMessages) { - messageStore.markMessageFromGroup(id, msg); - } - } - else { - for (Message msg : completedMessages) { - this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), msg); - } + for (Message msg : completedMessages) { + this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), msg); } } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/SequenceSizeReleaseStrategy.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/SequenceSizeReleaseStrategy.java index f76b7e1e586..e36d6325592 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/SequenceSizeReleaseStrategy.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/SequenceSizeReleaseStrategy.java @@ -67,14 +67,14 @@ public boolean canRelease(MessageGroup messageGroup) { boolean canRelease = false; - Collection> unmarked = messageGroup.getUnmarked(); + Collection> messages = messageGroup.getMessages(); - if (releasePartialSequences && !unmarked.isEmpty()) { + if (releasePartialSequences && !messages.isEmpty()) { if (logger.isTraceEnabled()) { logger.trace("Considering partial release of group [" + messageGroup + "]"); } - List> sorted = new ArrayList>(unmarked); + List> sorted = new ArrayList>(messages); Collections.sort(sorted, comparator); int nextSequenceNumber = sorted.get(0).getHeaders().getSequenceNumber(); @@ -85,7 +85,7 @@ public boolean canRelease(MessageGroup messageGroup) { } } else { - int size = messageGroup.getUnmarked().size(); + int size = messages.size(); if (size == 0){ canRelease = true; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/TimeoutCountSequenceSizeReleaseStrategy.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/TimeoutCountSequenceSizeReleaseStrategy.java index adfd01c422f..34a60fdaede 100755 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/TimeoutCountSequenceSizeReleaseStrategy.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/TimeoutCountSequenceSizeReleaseStrategy.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2008 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -60,7 +60,7 @@ public TimeoutCountSequenceSizeReleaseStrategy(int threshold, long timeout) { public boolean canRelease(MessageGroup messages) { long elapsedTime = System.currentTimeMillis() - findEarliestTimestamp(messages); - return messages.isComplete() || messages.getUnmarked().size() >= threshold || elapsedTime > timeout; + return messages.isComplete() || messages.getMessages().size() >= threshold || elapsedTime > timeout; } /** @@ -69,7 +69,7 @@ public boolean canRelease(MessageGroup messages) { */ private long findEarliestTimestamp(MessageGroup messages) { long result = Long.MAX_VALUE; - for (Message message : messages.getUnmarked()) { + for (Message message : messages.getMessages()) { long timestamp = message.getHeaders().getTimestamp(); if (timestamp < result) { result = timestamp; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java index 61f386626ff..fbf0fbd1054 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java @@ -19,9 +19,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.Map; import java.util.UUID; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.integration.Message; +import org.springframework.integration.MessageHeaders; +import org.springframework.integration.support.MessageBuilder; import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.util.Assert; @@ -36,17 +40,18 @@ public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupS protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_"; protected static final String MESSAGE_GROUP_KEY_PREFIX = "MESSAGE_GROUP_"; + + protected static final String CREATED_DATE = "CREATED_DATE"; // MessageStore methods - + public Message getMessage(UUID id) { - Assert.notNull(id, "'id' must not be null"); - Object message = this.doRetrieve(MESSAGE_KEY_PREFIX + id); - if (message != null) { - Assert.isInstanceOf(Message.class, message); - } - return (Message) message; + Message message = this.getRawMessage(id); + if (message != null){ + return this.normalizeMessage(message); + } + return null; } @SuppressWarnings("unchecked") @@ -54,7 +59,7 @@ public Message addMessage(Message message) { Assert.notNull(message, "'message' must not be null"); UUID messageId = message.getHeaders().getId(); this.doStore(MESSAGE_KEY_PREFIX + messageId, message); - return (Message) this.getMessage(messageId); + return (Message) this.getRawMessage(messageId); } public Message removeMessage(UUID id) { @@ -79,30 +84,9 @@ public long getMessageCount() { * Will create a new instance of SimpleMessageGroup if necessary. */ public MessageGroup getMessageGroup(Object groupId) { - Assert.notNull(groupId, "'groupId' must not be null"); - Object mgm = this.doRetrieve(MESSAGE_GROUP_KEY_PREFIX + groupId); - if (mgm != null) { - Assert.isInstanceOf(MessageGroupMetadata.class, mgm); - MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm; - ArrayList> markedMessages = new ArrayList>(); - for (UUID uuid : messageGroupMetadata.getMarkedMessageIds()) { - markedMessages.add(this.getMessage(uuid)); - } - ArrayList> unmarkedMessages = new ArrayList>(); - for (UUID uuid : messageGroupMetadata.getUnmarkedMessageIds()) { - unmarkedMessages.add(this.getMessage(uuid)); - } - SimpleMessageGroup messageGroup = new SimpleMessageGroup(unmarkedMessages, markedMessages, - groupId, messageGroupMetadata.getTimestamp(), messageGroupMetadata.isComplete()); - if (messageGroupMetadata.getLastReleasedMessageSequenceNumber() > 0) { - messageGroup.setLastReleasedMessageSequenceNumber(messageGroupMetadata.getLastReleasedMessageSequenceNumber()); - } - return messageGroup; - } - else { - return new SimpleMessageGroup(groupId); - } + return this.buildMessageGroup(groupId, false); } + /** * Add a Message to the group with the provided group ID. @@ -110,22 +94,26 @@ public MessageGroup getMessageGroup(Object groupId) { public MessageGroup addMessageToGroup(Object groupId, Message message) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(message, "'message' must not be null"); - SimpleMessageGroup messageGroup = this.getSimpleMessageGroup(this.getMessageGroup(groupId)); + + // add message as is to the MG accessible by the caller + SimpleMessageGroup messageGroup = this.getSimpleMessageGroup(this.getMessageGroup(groupId)); + messageGroup.add(message); - this.doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(messageGroup)); - this.addMessage(message); - return messageGroup; - } - - /** - * Mark all messages in the provided group. - */ - public MessageGroup markMessageGroup(MessageGroup group) { - Assert.notNull(group, "'group' must not be null"); - Object groupId = group.getGroupId(); - SimpleMessageGroup messageGroup = this.getSimpleMessageGroup(group); - messageGroup.markAll(); - this.doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(messageGroup)); + + // enrich Message with additional headers and add it to MS + Message enrichedMessage = this.enrichMessage(message); + + this.addMessage(enrichedMessage); + + // build raw MessageGroup and add enriched Message to it + SimpleMessageGroup rawGroup = this.buildMessageGroup(groupId, true); + + rawGroup.add(enrichedMessage); + + // store MessageGroupMetadata built from enriched MG + this.doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(rawGroup)); + + // return clean MG return messageGroup; } @@ -135,27 +123,30 @@ public MessageGroup markMessageGroup(MessageGroup group) { public MessageGroup removeMessageFromGroup(Object groupId, Message messageToRemove) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(messageToRemove, "'messageToRemove' must not be null"); - SimpleMessageGroup messageGroup = this.getSimpleMessageGroup(this.getMessageGroup(groupId)); - messageGroup.remove(messageToRemove); - this.doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(messageGroup)); - return messageGroup; - } + + // build raw MG + SimpleMessageGroup rawGroup = this.buildMessageGroup(groupId, true); + + // create a clean instance of + SimpleMessageGroup messageGroup = this.normalizeSimpleMessageGroup(rawGroup); + + for (Message message : rawGroup.getMessages()) { + if (message.getHeaders().getId().equals(messageToRemove.getHeaders().getId())){ + rawGroup.remove(message); + } + } + this.removeMessage(messageToRemove.getHeaders().getId()); - /** - * Mark the given Message within the group corresponding to the provided group ID. - */ - public MessageGroup markMessageFromGroup(Object groupId, Message messageToMark) { - Assert.notNull(groupId, "'groupId' must not be null"); - Assert.notNull(messageToMark, "'messageToMark' must not be null"); - SimpleMessageGroup messageGroup = this.getSimpleMessageGroup(this.getMessageGroup(groupId)); - messageGroup.mark(messageToMark); - this.doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(messageGroup)); + this.doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(rawGroup)); + messageGroup = this.getSimpleMessageGroup(this.getMessageGroup(groupId)); + return messageGroup; } + public void completeGroup(Object groupId) { Assert.notNull(groupId, "'groupId' must not be null"); - SimpleMessageGroup messageGroup = this.getSimpleMessageGroup(this.getMessageGroup(groupId)); + SimpleMessageGroup messageGroup = this.buildMessageGroup(this.getMessageGroup(groupId), true); messageGroup.complete(); this.doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(messageGroup)); } @@ -169,26 +160,98 @@ public void removeMessageGroup(Object groupId) { if (mgm != null) { Assert.isInstanceOf(MessageGroupMetadata.class, mgm); MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm; - for (UUID messageId : messageGroupMetadata.getMarkedMessageIds()) { - this.removeMessage(messageId); - } - for (UUID messageId : messageGroupMetadata.getUnmarkedMessageIds()) { - this.removeMessage(messageId); + + Iterator messageIds = messageGroupMetadata.messageIdIterator(); + while (messageIds.hasNext()){ + this.removeMessage(messageIds.next()); } } } public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { Assert.notNull(groupId, "'groupId' must not be null"); - SimpleMessageGroup messageGroup = this.getSimpleMessageGroup(this.getMessageGroup(groupId)); + SimpleMessageGroup messageGroup = this.buildMessageGroup(groupId, true); messageGroup.setLastReleasedMessageSequenceNumber(sequenceNumber); this.doStore(MESSAGE_GROUP_KEY_PREFIX + groupId, new MessageGroupMetadata(messageGroup)); } + + public Message pollMessageFromGroup(Object groupId) { + Assert.notNull(groupId, "'groupId' must not be null"); + Object mgm = this.doRetrieve(MESSAGE_GROUP_KEY_PREFIX + groupId); + if (mgm != null) { + Assert.isInstanceOf(MessageGroupMetadata.class, mgm); + MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm; + + Message message = this.removeMessage(messageGroupMetadata.firstId()); + Message normalizedMessage = this.normalizeMessage(message); + return normalizedMessage; + } + return null; + } public Iterator iterator() { final Iterator idIterator = this.doListKeys(MESSAGE_GROUP_KEY_PREFIX + "*").iterator(); return new MessageGroupIterator(idIterator); } + + protected abstract Object doRetrieve(Object id); + + protected abstract void doStore(Object id, Object objectToStore); + + protected abstract Object doRemove(Object id); + + protected abstract Collection doListKeys(String keyPattern); + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private Message normalizeMessage(Message message){ + Message normalizedMessage = MessageBuilder.fromMessage(message).removeHeader("CREATED_DATE").build(); + Map innerMap = (Map) new DirectFieldAccessor(normalizedMessage.getHeaders()).getPropertyValue("headers"); + innerMap.put(MessageHeaders.ID, message.getHeaders().getId()); + innerMap.put(MessageHeaders.TIMESTAMP, message.getHeaders().getTimestamp()); + return normalizedMessage; + } + + /** + * Will enrich Message with additional meta headers + * @param message + * @return + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + private Message enrichMessage(Message message){ + Message enrichedMessage = MessageBuilder.fromMessage(message).setHeader(CREATED_DATE, System.currentTimeMillis()).build(); + Map innerMap = (Map) new DirectFieldAccessor(enrichedMessage.getHeaders()).getPropertyValue("headers"); + innerMap.put(MessageHeaders.ID, message.getHeaders().getId()); + innerMap.put(MessageHeaders.TIMESTAMP, message.getHeaders().getTimestamp()); + return enrichedMessage; + } + + private SimpleMessageGroup buildMessageGroup(Object groupId, boolean raw){ + Assert.notNull(groupId, "'groupId' must not be null"); + Object mgm = this.doRetrieve(MESSAGE_GROUP_KEY_PREFIX + groupId); + if (mgm != null) { + Assert.isInstanceOf(MessageGroupMetadata.class, mgm); + MessageGroupMetadata messageGroupMetadata = (MessageGroupMetadata) mgm; + ArrayList> messages = new ArrayList>(); + + Iterator messageIds = messageGroupMetadata.messageIdIterator(); + while (messageIds.hasNext()){ + if (raw){ + messages.add(this.getRawMessage(messageIds.next())); + } + else { + messages.add(this.getMessage(messageIds.next())); + } + } + + SimpleMessageGroup messageGroup = new SimpleMessageGroup(messages, + groupId, messageGroupMetadata.getTimestamp(), messageGroupMetadata.isComplete()); + messageGroup.setLastReleasedMessageSequenceNumber(messageGroupMetadata.getLastReleasedMessageSequenceNumber()); + return messageGroup; + } + else { + return new SimpleMessageGroup(groupId); + } + } private SimpleMessageGroup getSimpleMessageGroup(MessageGroup messageGroup){ if (messageGroup instanceof SimpleMessageGroup){ @@ -198,15 +261,22 @@ private SimpleMessageGroup getSimpleMessageGroup(MessageGroup messageGroup){ return new SimpleMessageGroup(messageGroup); } } - - protected abstract Object doRetrieve(Object id); - protected abstract void doStore(Object id, Object objectToStore); + private SimpleMessageGroup normalizeSimpleMessageGroup(SimpleMessageGroup messageGroup){ + SimpleMessageGroup normalizedGroup = new SimpleMessageGroup(messageGroup.getGroupId()); + for (Message message : messageGroup.getMessages()) { + Message normailizedMessage = this.normalizeMessage(message); + normalizedGroup.add(normailizedMessage); + } + return normalizedGroup; + } - protected abstract Object doRemove(Object id); - - protected abstract Collection doListKeys(String keyPattern); - + private Message getRawMessage(UUID id) { + Assert.notNull(id, "'id' must not be null"); + Object message = this.doRetrieve(MESSAGE_KEY_PREFIX + id); + Assert.isInstanceOf(Message.class, message); + return (Message) message; + } private class MessageGroupIterator implements Iterator { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java index 0336f7a0f42..c55632ffd83 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -22,6 +22,7 @@ /** * @author Dave Syer + * @author Oleg Zhurakousky * * @since 2.0 * @@ -76,15 +77,6 @@ public int getMessageCountForAllMessageGroups() { return count; } - @ManagedAttribute - public int getMarkedMessageCountForAllMessageGroups() { - int count = 0; - for (MessageGroup group : this) { - count += group.getMarked().size(); - } - return count; - } - @ManagedAttribute public int getMessageGroupCount() { int count = 0; @@ -112,7 +104,6 @@ private void expire(MessageGroup group) { if (exception != null) { throw exception; } - } } \ No newline at end of file diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroup.java b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroup.java index 43e9f292632..b479536cc8c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroup.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroup.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,12 +21,13 @@ import org.springframework.integration.Message; /** - * A group of messages that are correlated with each other and should be processed in the same context. The group is - * divided into marked and unmarked messages. The marked messages are typically already processed, the unmarked messages - * are to be processed in the future. + * A group of messages that are correlated with each other and should be processed in the same context. *

* The message group allows implementations to be mutable, but this behavior is optional. Implementations should take * care to document their thread safety and mutability. + * + * @author Dave Syer + * @author Oleg Zhurakousky */ public interface MessageGroup { @@ -36,14 +37,9 @@ public interface MessageGroup { boolean canAdd(Message message); /** - * @return unmarked messages in the group at time of the invocation + * Returns all available Messages from the group at the time of invocation */ - Collection> getUnmarked(); - - /** - * @return marked messages in the group at the time of the invocation - */ - Collection> getMarked(); + Collection> getMessages(); /** * @return the key that links these messages together @@ -71,7 +67,7 @@ public interface MessageGroup { int getSequenceSize(); /** - * @return the total number of messages (marked and unmarked) in this group + * @return the total number of messages in this group */ int size(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupMetadata.java b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupMetadata.java index 4a7413e3e8a..469dad2b77b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupMetadata.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupMetadata.java @@ -17,9 +17,8 @@ package org.springframework.integration.store; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.Iterator; +import java.util.TreeMap; import java.util.UUID; import org.springframework.integration.Message; @@ -34,13 +33,12 @@ public class MessageGroupMetadata implements Serializable{ private static final long serialVersionUID = 1L; - + + private final static String CREATED_DATE = "CREATED_DATE"; private final Object groupId; - private final List markedMessageIds; - - private final List unmarkedMessageIds; + private final TreeMap messageCreationDateToIdMappings; private final boolean complete; @@ -48,17 +46,16 @@ public class MessageGroupMetadata implements Serializable{ private final int lastReleasedMessageSequenceNumber; - public MessageGroupMetadata(MessageGroup messageGroup) { + Assert.notNull(messageGroup, "'messageGroup' must not be null"); this.groupId = messageGroup.getGroupId(); - this.markedMessageIds = new ArrayList(); - for (Message message : messageGroup.getMarked()) { - this.markedMessageIds.add(message.getHeaders().getId()); - } - this.unmarkedMessageIds = new ArrayList(); - for (Message message : messageGroup.getUnmarked()) { - this.unmarkedMessageIds.add(message.getHeaders().getId()); + this.messageCreationDateToIdMappings = new TreeMap(); + + for (Message message : messageGroup.getMessages()) { + Long createdDate = (Long) message.getHeaders().get(CREATED_DATE); + Assert.notNull(createdDate > 0, CREATED_DATE + " must not be null"); + this.messageCreationDateToIdMappings.put(createdDate, message.getHeaders().getId()); } this.complete = messageGroup.isComplete(); this.timestamp = messageGroup.getTimestamp(); @@ -69,13 +66,13 @@ public MessageGroupMetadata(MessageGroup messageGroup) { public Object getGroupId() { return this.groupId; } - - public List getMarkedMessageIds() { - return Collections.unmodifiableList(markedMessageIds); + + public Iterator messageIdIterator(){ + return this.messageCreationDateToIdMappings.values().iterator(); } - - public List getUnmarkedMessageIds() { - return Collections.unmodifiableList(this.unmarkedMessageIds); + + public UUID firstId(){ + return messageCreationDateToIdMappings.firstEntry().getValue(); } public boolean isComplete() { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupQueue.java b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupQueue.java index efd0d320fb0..ffa664db283 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupQueue.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupQueue.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2008 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,11 +71,11 @@ public void setStoreLock(Object storeLock) { } public Iterator> iterator() { - return getUnmarked().iterator(); + return getMessages().iterator(); } public int size() { - return getUnmarked().size(); + return this.messageGroupStore.getMessageGroup(groupId).size(); } public boolean offer(Message e) { @@ -92,22 +92,17 @@ public boolean offer(Message e) { } public Message peek() { - Collection> unmarked = getUnmarked(); - if (unmarked.isEmpty()) { + Collection> messages = getMessages(); + if (messages.isEmpty()) { return null; } - return unmarked.iterator().next(); + return messages.iterator().next(); } public Message poll() { - Message result; + Message result = null; synchronized (storeLock) { - Collection> unmarked = getUnmarked(); - if (unmarked.isEmpty()) { - return null; - } - result = unmarked.iterator().next(); - messageGroupStore.removeMessageFromGroup(groupId, result); + result = this.messageGroupStore.pollMessageFromGroup(groupId); } synchronized (writeLock) { writeLock.notifyAll(); @@ -116,26 +111,24 @@ public Message poll() { } public int drainTo(Collection> c) { - Collection> unmarked; synchronized (storeLock) { - unmarked = getUnmarked(); - c.addAll(unmarked); - messageGroupStore.markMessageGroup(messageGroupStore.getMessageGroup(groupId)); + for (Message message = this.messageGroupStore.pollMessageFromGroup(groupId); message != null;) { + c.add(message); + } } synchronized (writeLock) { writeLock.notifyAll(); } - return unmarked.size(); + return this.messageGroupStore.getMessageGroup(groupId).size(); } public int drainTo(Collection> c, int maxElements) { ArrayList> list = new ArrayList>(); synchronized (storeLock) { - Iterator> unmarked = getUnmarked().iterator(); - for (int i = 0; i < maxElements && unmarked.hasNext(); i++) { - Message message = unmarked.next(); - messageGroupStore.removeMessageFromGroup(groupId, message); - list.add(message); + Message message = this.messageGroupStore.pollMessageFromGroup(groupId); + for (int i = 0; i < maxElements && message != null; i++) { + list.add(message); + message = this.messageGroupStore.pollMessageFromGroup(groupId); } } synchronized (writeLock) { @@ -195,8 +188,8 @@ public Message take() throws InterruptedException { return message; } - private Collection> getUnmarked() { - return messageGroupStore.getMessageGroup(groupId).getUnmarked(); + private Collection> getMessages(){ + return messageGroupStore.getMessageGroup(groupId).getMessages(); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java index ac115027a5e..cc988485b70 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java @@ -37,17 +37,6 @@ public interface MessageGroupStore { */ @ManagedAttribute int getMessageCountForAllMessageGroups(); - - /** - * Optional attribute giving the number of marked messages in the store for all groups. Implementations may decline - * to respond by throwing an exception. - * - * @return the number of marked messages in each group - * @throws UnsupportedOperationException if not implemented - */ - @ManagedAttribute - int getMarkedMessageCountForAllMessageGroups(); - /** * Optional attribute giving the number of message groups. Implementations may decline * to respond by throwing an exception. @@ -73,15 +62,7 @@ public interface MessageGroupStore { * @param message a message */ MessageGroup addMessageToGroup(Object groupId, Message message); - - /** - * Persist the mark on all the messages from the group. The group is modified in the process as all its unmarked - * messages become marked. - * - * @param group a MessageGroup with no unmarked messages - */ - MessageGroup markMessageGroup(MessageGroup group); - + /** * Persist a deletion on a single message from the group. The group is modified to reflect that 'messageToRemove' is * no longer present in the group. @@ -90,14 +71,6 @@ public interface MessageGroupStore { */ MessageGroup removeMessageFromGroup(Object key, Message messageToRemove); - /** - * Persist a mark on a single message from the group. The group is modified to reflect that 'messageToMark' is no - * longer unmarked but became marked instead. - * @param key the groupId for the group containing the message - * @param messageToMark the message to be marked - */ - MessageGroup markMessageFromGroup(Object key, Message messageToMark); - /** * Remove the message group with this id. * @@ -136,6 +109,13 @@ public interface MessageGroupStore { */ Iterator iterator(); + + /** + * Polls Message from this {@link MessageGroup} (in FIFO style if supported by the implementation) + * while also removing the polled {@link Message} + */ + Message pollMessageFromGroup(Object groupId); + /** * Completes this MessageGroup. Completion of the MessageGroup generally means * that this group should not be allowing any more mutating operation to be performed on it. diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageGroup.java b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageGroup.java index 90c4de4068e..92fbe3eb83d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageGroup.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageGroup.java @@ -32,15 +32,8 @@ public class SimpleMessageGroup implements MessageGroup { private final Object groupId; - - // Guards(marked, unmarked) - private final Object lock = new Object(); - - // @GuardedBy(lock) - public final BlockingQueue> marked = new LinkedBlockingQueue>(); - - // @GuardedBy(lock) - public final BlockingQueue> unmarked = new LinkedBlockingQueue>(); + + public final BlockingQueue> messages = new LinkedBlockingQueue>(); private volatile int lastReleasedMessageSequence; @@ -49,50 +42,26 @@ public class SimpleMessageGroup implements MessageGroup { private volatile boolean complete; public SimpleMessageGroup(Object groupId) { - this(Collections.> emptyList(), Collections.> emptyList(), groupId, System - .currentTimeMillis(), false); + this(Collections.> emptyList(), groupId, System.currentTimeMillis(), false); } - public SimpleMessageGroup(Collection> unmarked, Object groupId) { - this(unmarked, Collections.> emptyList(), groupId, System.currentTimeMillis(), false); + public SimpleMessageGroup(Collection> messages, Object groupId) { + this(messages, groupId, System.currentTimeMillis(), false); } - public SimpleMessageGroup(Collection> unmarked, Collection> marked, - Object groupId, long timestamp, boolean complete) { + public SimpleMessageGroup(Collection> messages, Object groupId, long timestamp, boolean complete) { this.groupId = groupId; this.timestamp = timestamp; this.complete = complete; - synchronized (lock) { - for (Message message : unmarked) { - addUnmarked(message); - } - for (Message message : marked) { - addMarked(message); - } + for (Message message : messages) { + addMessage(message); } } - public SimpleMessageGroup(MessageGroup template) { - this.groupId = template.getGroupId(); - this.complete = template.isComplete(); - synchronized (lock) { - // Explicit iteration to work around bug in JDK (before 1.6.0_20 - for (Message message : template.getMarked()) { - if (message != null) { - this.marked.add(message); - } - } - for (Message message : template.getUnmarked()) { - if (message != null) { - this.unmarked.add(message); - } - } - } - this.timestamp = template.getTimestamp(); + public SimpleMessageGroup(MessageGroup messageGroup) { + this(messageGroup.getMessages(), messageGroup.getGroupId(), messageGroup.getTimestamp(), messageGroup.isComplete()); } - - public long getTimestamp() { return timestamp; } @@ -102,54 +71,29 @@ public boolean canAdd(Message message) { } public void add(Message message) { - addUnmarked(message); + addMessage(message); } public void remove(Message message) { - synchronized (lock) { - marked.remove(message); - unmarked.remove(message); - } + messages.remove(message); } public int getLastReleasedMessageSequenceNumber() { return lastReleasedMessageSequence; } - private boolean addUnmarked(Message message) { - if (isMember(message)) { - return false; - } - synchronized (lock) { - return this.unmarked.offer(message); - } - } - - private boolean addMarked(Message message) { - if (isMember(message)) { - return false; - } - synchronized (lock) { - return this.marked.offer(message); - } + private boolean addMessage(Message message) { + return this.messages.offer(message); } - public Collection> getUnmarked() { - synchronized (lock) { - return Collections.unmodifiableCollection(unmarked); - } + public Collection> getMessages() { + return Collections.unmodifiableCollection(messages); } public void setLastReleasedMessageSequenceNumber(int sequenceNumber){ this.lastReleasedMessageSequence = sequenceNumber; } - public Collection> getMarked() { - synchronized (lock) { - return Collections.unmodifiableCollection(marked); - } - } - public Object getGroupId() { return groupId; } @@ -169,39 +113,17 @@ public int getSequenceSize() { return getOne().getHeaders().getSequenceSize(); } - /** - * Mark the given message in this group. If the message is not part of this group then this call has no effect. - */ - public void mark(Message messageToMark) { - synchronized (lock) { - unmarked.remove(messageToMark); - marked.offer(messageToMark); - } - } - - public void markAll() { - synchronized (lock) { - unmarked.drainTo(marked); - } - } - public int size() { - synchronized (lock) { - return marked.size() + unmarked.size(); - } + return this.messages.size(); } public Message getOne() { - Message one = unmarked.peek(); - if (one == null) { - one = marked.peek(); - } + Message one = messages.peek(); return one; } public void clear(){ - this.marked.clear(); - this.unmarked.clear(); + this.messages.clear(); } /** @@ -220,12 +142,7 @@ private boolean isMember(Message message) { return true; } else { - synchronized (lock) { - if (containsSequenceNumber(unmarked, messageSequenceNumber) - || containsSequenceNumber(marked, messageSequenceNumber)) { - return true; - } - } + return this.containsSequenceNumber(messages, messageSequenceNumber); } } return false; @@ -245,10 +162,8 @@ private boolean containsSequenceNumber(Collection> messages, Integer public String toString() { return "SimpleMessageGroup{" + "groupId=" + groupId + - ", lock=" + lock + - ", marked=" + marked + - ", unmarked=" + unmarked + + ", messages=" + messages + ", timestamp=" + timestamp + '}'; } -} +} \ No newline at end of file diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java index 8b1560073f9..c032c58e3ff 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2009 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -13,6 +13,7 @@ package org.springframework.integration.store; +import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.UUID; @@ -25,14 +26,16 @@ import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.jmx.export.annotation.ManagedResource; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; /** - * Map-based implementation of {@link MessageStore} and {@link MessageGroupStore}. Enforces a maximum capacity for the + * Map-based in-memory implementation of {@link MessageStore} and {@link MessageGroupStore}. Enforces a maximum capacity for the * store. * * @author Iwein Fuld * @author Mark Fisher * @author Dave Syer + * @author Oleg Zhurakousky * * @since 2.0 */ @@ -42,10 +45,12 @@ public class SimpleMessageStore extends AbstractMessageGroupStore implements Mes private final ConcurrentMap> idToMessage; private final ConcurrentMap groupIdToMessageGroup; - + private final UpperBound individualUpperBound; private final UpperBound groupUpperBound; + + private final Object lock = new Object(); /** * Creates a SimpleMessageStore with a maximum size limited by the given capacity, or unlimited size if the given @@ -121,19 +126,14 @@ public MessageGroup addMessageToGroup(Object groupId, Message message) { return group; } - public MessageGroup markMessageGroup(MessageGroup group) { - Object groupId = group.getGroupId(); - SimpleMessageGroup internal = getMessageGroupInternal(groupId); - internal.markAll(); - return internal; - } - public void removeMessageGroup(Object groupId) { - if (!groupIdToMessageGroup.containsKey(groupId)) { - return; + synchronized (lock) { + if (!groupIdToMessageGroup.containsKey(groupId)) { + return; + } + groupUpperBound.release(groupIdToMessageGroup.get(groupId).size()); + groupIdToMessageGroup.remove(groupId); } - groupUpperBound.release(groupIdToMessageGroup.get(groupId).size()); - groupIdToMessageGroup.remove(groupId); } public MessageGroup removeMessageFromGroup(Object key, Message messageToRemove) { @@ -142,12 +142,6 @@ public MessageGroup removeMessageFromGroup(Object key, Message messageToRemov return group; } - public MessageGroup markMessageFromGroup(Object key, Message messageToMark) { - SimpleMessageGroup group = getMessageGroupInternal(key); - group.mark(messageToMark); - return group; - } - public Iterator iterator() { return new HashSet(groupIdToMessageGroup.values()).iterator(); } @@ -157,6 +151,23 @@ public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNu group.setLastReleasedMessageSequenceNumber(sequenceNumber); } + public void completeGroup(Object groupId) { + SimpleMessageGroup group = getMessageGroupInternal(groupId); + group.complete(); + } + + public Message pollMessageFromGroup(Object groupId) { + Collection> messageList = this.getMessageGroup(groupId).getMessages(); + Message message = null; + if (!CollectionUtils.isEmpty(messageList)){ + message = messageList.iterator().next(); + if (message != null){ + this.removeMessageFromGroup(groupId, message); + } + } + return message; + } + private SimpleMessageGroup getMessageGroupInternal(Object groupId) { if (!groupIdToMessageGroup.containsKey(groupId)) { groupIdToMessageGroup.putIfAbsent(groupId, new SimpleMessageGroup(groupId)); @@ -164,9 +175,4 @@ private SimpleMessageGroup getMessageGroupInternal(Object groupId) { return groupIdToMessageGroup.get(groupId); } - public void completeGroup(Object groupId) { - SimpleMessageGroup group = getMessageGroupInternal(groupId); - group.complete(); - } - } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/util/UpperBound.java b/spring-integration-core/src/main/java/org/springframework/integration/util/UpperBound.java index 0cc23352b28..f65376d5735 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/util/UpperBound.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/util/UpperBound.java @@ -39,6 +39,13 @@ public final class UpperBound { public UpperBound(int capacity) { this.semaphore = (capacity > 0) ? new Semaphore(capacity, true) : null; } + + public int availablePermits(){ + if (semaphore == null){ + return Integer.MAX_VALUE; + } + return this.semaphore.availablePermits(); + } /** diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java index 1cd83b08b93..73d7f19cb11 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AggregatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -130,7 +130,7 @@ public void testMultipleGroupsSimultaneously() throws InterruptedException { public void testTrackedCorrelationIdsCapacityAtLimit() { QueueChannel replyChannel = new QueueChannel(); QueueChannel discardChannel = new QueueChannel(); - // this.aggregator.setTrackedCorrelationIdCapacity(3); + this.aggregator.setDiscardChannel(discardChannel); this.aggregator.handleMessage(createMessage(1, 1, 1, 1, replyChannel, null)); assertEquals(1, replyChannel.receive(100).getPayload()); @@ -149,7 +149,7 @@ public void testTrackedCorrelationIdsCapacityAtLimit() { public void testTrackedCorrelationIdsCapacityPassesLimit() { QueueChannel replyChannel = new QueueChannel(); QueueChannel discardChannel = new QueueChannel(); - // this.aggregator.setTrackedCorrelationIdCapacity(3); + this.aggregator.setDiscardChannel(discardChannel); this.aggregator.handleMessage(createMessage(1, 1, 1, 1, replyChannel, null)); assertEquals(1, replyChannel.receive(100).getPayload()); @@ -238,7 +238,7 @@ private static Message createMessage(Object payload, Object correlationId, in private class MultiplyingProcessor implements MessageGroupProcessor { public Object processMessageGroup(MessageGroup group) { Integer product = 1; - for (Message message : group.getUnmarked()) { + for (Message message : group.getMessages()) { product *= (Integer) message.getPayload(); } return product; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ConcurrentAggregatorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ConcurrentAggregatorTests.java index bacb102732e..bc85fb573c5 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ConcurrentAggregatorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ConcurrentAggregatorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -350,11 +350,10 @@ public void run() { private class MultiplyingProcessor implements MessageGroupProcessor { public Object processMessageGroup(MessageGroup group) { Integer product = 1; - for (Message message : group.getUnmarked()) { + for (Message message : group.getMessages()) { product *= (Integer) message.getPayload(); } return product; - //messagingTemplate.send(outputChannel, MessageBuilder.withPayload(product).build()); } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageBarrierTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageBarrierTests.java index c0c98b19247..24a53253649 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageBarrierTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageBarrierTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -122,10 +122,8 @@ private static class OneMessagePerKeyReleaseStrategy implements ReleaseStrategy private final ConcurrentMap keyLocks = new ConcurrentHashMap(); public boolean canRelease(MessageGroup messageGroup) { - // System.out.println("Trying to release group: " + messageGroup + "\n to thread: " + Thread.currentThread()); Object correlationKey = messageGroup.getGroupId(); Semaphore lock = lockForKey(correlationKey); - // System.out.println(Thread.currentThread() + " got lock: " + lock); return lock.tryAcquire(); } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessorTests.java index 9bcfded2c17..6332503f555 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ExpressionEvaluatingMessageGroupProcessorTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -60,7 +60,7 @@ public void setup() { @Test public void testProcessAndSendWithSizeExpressionEvaluated() throws Exception { - when(group.getUnmarked()).thenReturn(messages); + when(group.getMessages()).thenReturn(messages); processor = new ExpressionEvaluatingMessageGroupProcessor("#root.size()"); Object result = processor.processMessageGroup(group); assertTrue(result instanceof Message); @@ -70,7 +70,7 @@ public void testProcessAndSendWithSizeExpressionEvaluated() throws Exception { @Test public void testProcessAndCheckHeaders() throws Exception { - when(group.getUnmarked()).thenReturn(messages); + when(group.getMessages()).thenReturn(messages); processor = new ExpressionEvaluatingMessageGroupProcessor("#root"); Object result = processor.processMessageGroup(group); assertTrue(result instanceof Message); @@ -80,7 +80,7 @@ public void testProcessAndCheckHeaders() throws Exception { @Test public void testProcessAndSendWithProjectionExpressionEvaluated() throws Exception { - when(group.getUnmarked()).thenReturn(messages); + when(group.getMessages()).thenReturn(messages); processor = new ExpressionEvaluatingMessageGroupProcessor("![payload]"); Object result = processor.processMessageGroup(group); assertTrue(result instanceof Message); @@ -97,7 +97,7 @@ public void testProcessAndSendWithProjectionExpressionEvaluated() throws Excepti @Test public void testProcessAndSendWithFilterAndProjectionExpressionEvaluated() throws Exception { - when(group.getUnmarked()).thenReturn(messages); + when(group.getMessages()).thenReturn(messages); processor = new ExpressionEvaluatingMessageGroupProcessor("?[payload>2].![payload]"); Object result = processor.processMessageGroup(group); assertTrue(result instanceof Message); @@ -112,7 +112,7 @@ public void testProcessAndSendWithFilterAndProjectionExpressionEvaluated() throw @Test public void testProcessAndSendWithFilterAndProjectionAndMethodInvokingExpressionEvaluated() throws Exception { - when(group.getUnmarked()).thenReturn(messages); + when(group.getMessages()).thenReturn(messages); processor = new ExpressionEvaluatingMessageGroupProcessor(String.format("T(%s).sum(?[payload>2].![payload])", getClass().getName())); Object result = processor.processMessageGroup(group); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessorTests.java index 98e7c62bbe9..1751755686b 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/MethodInvokingMessageGroupProcessorTests.java @@ -92,7 +92,7 @@ public String know(List flags) { } MessageGroupProcessor processor = new MethodInvokingMessageGroupProcessor(new AnnotatedAggregatorMethod()); - when(messageGroupMock.getUnmarked()).thenReturn(messagesUpForProcessing); + when(messageGroupMock.getMessages()).thenReturn(messagesUpForProcessing); Object result = processor.processMessageGroup(messageGroupMock); assertThat((Integer) ((Message) result).getPayload(), is(7)); } @@ -112,7 +112,7 @@ public Integer and(List flags) { } MessageGroupProcessor processor = new MethodInvokingMessageGroupProcessor(new SimpleAggregator()); - when(messageGroupMock.getUnmarked()).thenReturn(messagesUpForProcessing); + when(messageGroupMock.getMessages()).thenReturn(messagesUpForProcessing); Object result = processor.processMessageGroup(messageGroupMock); assertThat((Integer) ((Message) result).getPayload(), is(7)); } @@ -132,7 +132,7 @@ public Integer and(List> flags) { } MessageGroupProcessor processor = new MethodInvokingMessageGroupProcessor(new SimpleAggregator()); - when(messageGroupMock.getUnmarked()).thenReturn(messagesUpForProcessing); + when(messageGroupMock.getMessages()).thenReturn(messagesUpForProcessing); Object result = processor.processMessageGroup(messageGroupMock); assertThat((Integer) ((Message) result).getPayload(), is(7)); } @@ -156,7 +156,7 @@ public String and(@Payloads List flags, @Header("foo") List he MessageGroupProcessor processor = new MethodInvokingMessageGroupProcessor(new SimpleAggregator()); messagesUpForProcessing.add(MessageBuilder.withPayload(3).setHeader("foo", Arrays.asList(101, 102)).build()); - when(messageGroupMock.getUnmarked()).thenReturn(messagesUpForProcessing); + when(messageGroupMock.getMessages()).thenReturn(messagesUpForProcessing); Object result = processor.processMessageGroup(messageGroupMock); assertThat((String) ((Message) result).getPayload(), is("[1, 2, 4, 3, 101, 102]")); } @@ -180,7 +180,7 @@ public String or(List flags) { } MessageGroupProcessor processor = new MethodInvokingMessageGroupProcessor(new SimpleAggregator()); - when(messageGroupMock.getUnmarked()).thenReturn(messagesUpForProcessing); + when(messageGroupMock.getMessages()).thenReturn(messagesUpForProcessing); Object result = processor.processMessageGroup(messageGroupMock); assertThat((String) ((Message) result).getPayload(), is("[1, 2, 4]")); } @@ -200,7 +200,7 @@ public Integer and(Collection flags) { } MessageGroupProcessor processor = new MethodInvokingMessageGroupProcessor(new SimpleAggregator()); - when(messageGroupMock.getUnmarked()).thenReturn(messagesUpForProcessing); + when(messageGroupMock.getMessages()).thenReturn(messagesUpForProcessing); Object result = processor.processMessageGroup(messageGroupMock); assertThat((Integer) ((Message) result).getPayload(), is(7)); } @@ -220,7 +220,7 @@ public Integer and(int[] flags) { } MessageGroupProcessor processor = new MethodInvokingMessageGroupProcessor(new SimpleAggregator()); - when(messageGroupMock.getUnmarked()).thenReturn(messagesUpForProcessing); + when(messageGroupMock.getMessages()).thenReturn(messagesUpForProcessing); Object result = processor.processMessageGroup(messageGroupMock); assertThat((Integer) ((Message) result).getPayload(), is(7)); } @@ -247,7 +247,7 @@ public Iterator convert(ArrayList source) { } }); processor.setConversionService(conversionService); - when(messageGroupMock.getUnmarked()).thenReturn(messagesUpForProcessing); + when(messageGroupMock.getMessages()).thenReturn(messagesUpForProcessing); Object result = processor.processMessageGroup(messageGroupMock); assertThat((Integer) ((Message) result).getPayload(), is(7)); } @@ -276,7 +276,7 @@ public String methodAcceptingNoCollectionShouldBeIgnored(String irrelevant) { } MessageGroupProcessor processor = new MethodInvokingMessageGroupProcessor(new UnannotatedAggregator()); - when(messageGroupMock.getUnmarked()).thenReturn(messagesUpForProcessing); + when(messageGroupMock.getMessages()).thenReturn(messagesUpForProcessing); Object result = processor.processMessageGroup(messageGroupMock); assertThat((Integer) ((Message) result).getPayload(), is(7)); } @@ -302,7 +302,7 @@ public String methodAcceptingNoCollectionShouldBeIgnored(String irrelevant) { } MessageGroupProcessor processor = new MethodInvokingMessageGroupProcessor(new UnannotatedAggregator()); - when(messageGroupMock.getUnmarked()).thenReturn(messagesUpForProcessing); + when(messageGroupMock.getMessages()).thenReturn(messagesUpForProcessing); Object result = processor.processMessageGroup(messageGroupMock); assertTrue(((Message)result).getPayload() instanceof Iterator); } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessorTest.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessorTest.java index 47de862cb4c..43eab810517 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessorTest.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/ResequencingMessageGroupProcessorTest.java @@ -1,3 +1,18 @@ +/* + * Copyright 2002-2011 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.springframework.integration.aggregator; import org.junit.Test; @@ -22,14 +37,14 @@ public class ResequencingMessageGroupProcessorTest { @Test public void shouldProcessSequence() { Message prototypeMessage = MessageBuilder.withPayload("foo").setCorrelationId("x").setSequenceNumber(1).setSequenceSize(3).build(); - List> unmarked= new ArrayList>(); + List> messages= new ArrayList>(); Message message1 = MessageBuilder.fromMessage(prototypeMessage).setSequenceNumber(1).build(); Message message2 = MessageBuilder.fromMessage(prototypeMessage).setSequenceNumber(2).build(); Message message3 = MessageBuilder.fromMessage(prototypeMessage).setSequenceNumber(3).build(); - unmarked.add(message1); - unmarked.add(message2); - unmarked.add(message3); - SimpleMessageGroup group = new SimpleMessageGroup(unmarked,"x"); + messages.add(message1); + messages.add(message2); + messages.add(message3); + SimpleMessageGroup group = new SimpleMessageGroup(messages,"x"); List processedMessages = (List) processor.processMessageGroup(group); assertThat(processedMessages, hasItems(message1, message2, message3)); } @@ -38,14 +53,14 @@ public void shouldProcessSequence() { @Test public void shouldPartiallProcessIncompleteSequence() { Message prototypeMessage = MessageBuilder.withPayload("foo").setCorrelationId("x").setSequenceNumber(1).setSequenceSize(4).build(); - List> unmarked= new ArrayList>(); + List> messages= new ArrayList>(); Message message2 = MessageBuilder.fromMessage(prototypeMessage).setSequenceNumber(4).build(); Message message1 = MessageBuilder.fromMessage(prototypeMessage).setSequenceNumber(1).build(); Message message3 = MessageBuilder.fromMessage(prototypeMessage).setSequenceNumber(3).build(); - unmarked.add(message1); - unmarked.add(message2); - unmarked.add(message3); - SimpleMessageGroup group = new SimpleMessageGroup(unmarked,"x"); + messages.add(message1); + messages.add(message2); + messages.add(message3); + SimpleMessageGroup group = new SimpleMessageGroup(messages,"x"); List processedMessages = (List) processor.processMessageGroup(group); assertThat(processedMessages, hasItems(message1)); assertThat(processedMessages.size(), is(1)); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/SequenceSizeReleaseStrategyTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/SequenceSizeReleaseStrategyTests.java index 3217e47e2be..677e9af49cc 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/SequenceSizeReleaseStrategyTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/SequenceSizeReleaseStrategyTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2008 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -58,13 +58,13 @@ public void testEmptyList() { } @Test - public void testEmptyUnmarked() { + public void testEmptyGroup() { SequenceSizeReleaseStrategy releaseStrategy = new SequenceSizeReleaseStrategy(); releaseStrategy.setReleasePartialSequences(true); SimpleMessageGroup messages = new SimpleMessageGroup("FOO"); Message message = MessageBuilder.withPayload("test1").setSequenceSize(1).build(); messages.add(message); - messages.mark(message); + messages.remove(message); assertTrue(releaseStrategy.canRelease(messages)); } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/AggregatorSupportedUseCasesTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/AggregatorSupportedUseCasesTests.java index 9119ae48b40..6295b39daab 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/AggregatorSupportedUseCasesTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/AggregatorSupportedUseCasesTests.java @@ -47,15 +47,13 @@ public void waitForAllDefaultReleaseStrategyWithLateArrivals(){ QueueChannel discardChannel = new QueueChannel(); defaultHandler.setOutputChannel(outputChannel); defaultHandler.setDiscardChannel(discardChannel); - defaultHandler.setKeepReleasedMessages(false); for (int i = 0; i < 5; i++) { defaultHandler.handleMessage(MessageBuilder.withPayload(i).setSequenceSize(5).setCorrelationId("A").setSequenceNumber(i).build()); } assertEquals(5, ((List)outputChannel.receive(0).getPayload()).size()); assertNull(discardChannel.receive(0)); - assertEquals(0, store.getMessageGroup("A").getUnmarked().size()); - assertEquals(0, store.getMessageGroup("A").getMarked().size()); + assertEquals(0, store.getMessageGroup("A").getMessages().size()); // send another message with the same correlation id and see it in the discard channel defaultHandler.handleMessage(MessageBuilder.withPayload("foo").setSequenceSize(5).setCorrelationId("A").setSequenceNumber(3).build()); @@ -65,7 +63,7 @@ public void waitForAllDefaultReleaseStrategyWithLateArrivals(){ defaultHandler.setExpireGroupsUponCompletion(true); defaultHandler.handleMessage(MessageBuilder.withPayload("foo").setSequenceSize(5).setCorrelationId("A").setSequenceNumber(3).build()); assertNull(discardChannel.receive(0)); - assertEquals(1, store.getMessageGroup("A").getUnmarked().size()); + assertEquals(1, store.getMessageGroup("A").getMessages().size()); } @Test @@ -75,15 +73,13 @@ public void waitForAllCustomReleaseStrategyWithLateArrivals(){ defaultHandler.setOutputChannel(outputChannel); defaultHandler.setDiscardChannel(discardChannel); defaultHandler.setReleaseStrategy(new SampleSizeReleaseStrategy()); - defaultHandler.setKeepReleasedMessages(false); for (int i = 0; i < 5; i++) { defaultHandler.handleMessage(MessageBuilder.withPayload(i).setCorrelationId("A").build()); } assertEquals(5, ((List)outputChannel.receive(0).getPayload()).size()); assertNull(discardChannel.receive(0)); - assertEquals(0, store.getMessageGroup("A").getUnmarked().size()); - assertEquals(0, store.getMessageGroup("A").getMarked().size()); + assertEquals(0, store.getMessageGroup("A").getMessages().size()); // send another message with the same correlation id and see it in the discard channel defaultHandler.handleMessage(MessageBuilder.withPayload("foo").setCorrelationId("A").build()); @@ -93,7 +89,7 @@ public void waitForAllCustomReleaseStrategyWithLateArrivals(){ defaultHandler.setExpireGroupsUponCompletion(true); defaultHandler.handleMessage(MessageBuilder.withPayload("foo").setCorrelationId("A").build()); assertNull(discardChannel.receive(0)); - assertEquals(1, store.getMessageGroup("A").getUnmarked().size()); + assertEquals(1, store.getMessageGroup("A").getMessages().size()); } @Test @@ -146,13 +142,13 @@ public void batchingWithLeftovers(){ assertEquals(5, ((List)outputChannel.receive(0).getPayload()).size()); assertEquals(5, ((List)outputChannel.receive(0).getPayload()).size()); assertNull(discardChannel.receive(0)); - assertEquals(2, store.getMessageGroup("A").getUnmarked().size()); + assertEquals(2, store.getMessageGroup("A").getMessages().size()); } private class SampleSizeReleaseStrategy implements ReleaseStrategy { public boolean canRelease(MessageGroup group) { - return group.getUnmarked().size() == 5; + return group.getMessages().size() == 5; } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/ResequencerIntegrationTest-context.xml b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/ResequencerIntegrationTest-context.xml index 46a19096558..6b65e31ab3e 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/ResequencerIntegrationTest-context.xml +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/ResequencerIntegrationTest-context.xml @@ -6,8 +6,8 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd"> - + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/ResequencerIntegrationTest.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/ResequencerIntegrationTest.java index adea088d127..1e6d531d592 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/ResequencerIntegrationTest.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/integration/ResequencerIntegrationTest.java @@ -85,8 +85,7 @@ public void validateUnboundedResequencerLight(){ assertEquals((Integer)6, message6.getHeaders().getSequenceNumber()); - assertEquals(0, store.getMessageGroup("A").getUnmarked().size()); - assertEquals(0, store.getMessageGroup("A").getMarked().size()); + assertEquals(0, store.getMessageGroup("A").getMessages().size()); } @Test @@ -109,7 +108,6 @@ public void validateUnboundedResequencerDeep(){ inputChannel.send(message2); assertNotNull(outputChannel.receive(0)); assertNotNull(outputChannel.receive(0)); - assertEquals(0, store.getMessageGroup("A").getUnmarked().size()); - assertEquals(3, store.getMessageGroup("A").getMarked().size()); + assertEquals(0, store.getMessageGroup("A").getMessages().size()); } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/DefaultConfiguringBeanFactoryPostProcessorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/DefaultConfiguringBeanFactoryPostProcessorTests.java index 74d1ebd7cf3..3354e9dc5cd 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/DefaultConfiguringBeanFactoryPostProcessorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/DefaultConfiguringBeanFactoryPostProcessorTests.java @@ -16,8 +16,6 @@ package org.springframework.integration.config.xml; -import java.util.Arrays; - import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.DirectFieldAccessor; @@ -79,7 +77,6 @@ public void taskSchedulerNotRegisteredMoreThanOnce() { new ClassPathXmlApplicationContext(new String[]{"org/springframework/integration/config/xml/parentApplicationContext.xml"}, superParentApplicationContext); ClassPathXmlApplicationContext childApplicationContext = new ClassPathXmlApplicationContext(new String[]{"org/springframework/integration/config/xml/childApplicationContext.xml"}, parentApplicationContext); - System.out.println(Arrays.asList(childApplicationContext.getBeanDefinitionNames())); TaskScheduler parentScheduler = childApplicationContext.getParent().getBean("taskScheduler", TaskScheduler.class); TaskScheduler childScheduler = childApplicationContext.getBean("taskScheduler", TaskScheduler.class); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/history/AnnotatedAdapter.java b/spring-integration-core/src/test/java/org/springframework/integration/history/AnnotatedAdapter.java index 7a0add65e7c..e5423b7a45c 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/history/AnnotatedAdapter.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/history/AnnotatedAdapter.java @@ -32,7 +32,6 @@ public class AnnotatedAdapter implements ApplicationContextAware{ @SuppressWarnings("serial") public void handle(Message message) { - System.out.println("Message: " + message); applicationContext.publishEvent(new ApplicationEvent(message) {}); } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/store/MessageGroupQueueTests.java b/spring-integration-core/src/test/java/org/springframework/integration/store/MessageGroupQueueTests.java index 8f96d4f04ad..dc22fc0c4e9 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/store/MessageGroupQueueTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/store/MessageGroupQueueTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,6 @@ package org.springframework.integration.store; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - import java.util.HashSet; import java.util.Set; import java.util.concurrent.Callable; @@ -32,11 +27,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.junit.Ignore; import org.junit.Test; - import org.springframework.integration.Message; import org.springframework.integration.message.GenericMessage; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + /** * @author Dave Syer * @since 2.0 @@ -92,11 +92,13 @@ public void testPutAndTake() throws Exception { } @Test + @Ignore public void testConcurrentAccess() throws Exception { doTestConcurrentAccess(50, 20, new HashSet()); } @Test + @Ignore public void testConcurrentAccessUniqueResults() throws Exception { doTestConcurrentAccess(50, 20, null); } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java b/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java index fd6c8e2dc15..0b5536c5af8 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java @@ -74,19 +74,14 @@ public void testGroupSizes() throws Exception { assertEquals(1, store.getMessageCountForAllMessageGroups()); } - @Test - public void testMarkedGroupSizes() throws Exception { - TestMessageStore store = new TestMessageStore(); - assertEquals(0, store.getMarkedMessageCountForAllMessageGroups()); - } - private static class TestMessageStore extends AbstractMessageGroupStore { @SuppressWarnings("unchecked") MessageGroup testMessages = new SimpleMessageGroup(Arrays.asList(new GenericMessage("foo")), "bar"); private boolean removed = false; - + + public Iterator iterator() { return Arrays.asList(testMessages).iterator(); } @@ -126,6 +121,11 @@ public void completeGroup(Object groupId) { throw new UnsupportedOperationException(); } + public Message pollMessageFromGroup(Object groupId) { + // TODO Auto-generated method stub + return null; + } + } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/transformer/ObjectToMapTransformerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/transformer/ObjectToMapTransformerTests.java index 21013a808d7..e499d1b78e5 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/transformer/ObjectToMapTransformerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/transformer/ObjectToMapTransformerTests.java @@ -59,7 +59,6 @@ public void testObjectToSpelMapTransformer() throws JsonParseException, JsonMapp Message transformedMessage = transformer.transform(message); Map transformedMap = (Map) transformedMessage.getPayload(); - System.out.println(transformedMap); assertNotNull(transformedMap); Object valueFromTheMap = null; diff --git a/spring-integration-gemfire/src/test/java/org/springframework/integration/gemfire/store/GemfireGroupStoreTests.java b/spring-integration-gemfire/src/test/java/org/springframework/integration/gemfire/store/GemfireGroupStoreTests.java index 1997edab187..974776c1854 100644 --- a/spring-integration-gemfire/src/test/java/org/springframework/integration/gemfire/store/GemfireGroupStoreTests.java +++ b/spring-integration-gemfire/src/test/java/org/springframework/integration/gemfire/store/GemfireGroupStoreTests.java @@ -81,45 +81,25 @@ public void testMessageGroupWithAddedMessage() throws Exception{ assertEquals(1, messageGroup.size()); } - @Test - public void testRemoveMessageGroup() throws Exception{ - GemfireMessageStore store = new GemfireMessageStore(this.cache); - store.afterPropertiesSet(); - MessageGroup messageGroup = store.getMessageGroup(1); - Message message = new GenericMessage("Hello"); - messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), message); - assertEquals(1, messageGroup.size()); - - store.removeMessageGroup(1); - MessageGroup messageGroupA = store.getMessageGroup(1); - assertNotSame(messageGroup, messageGroupA); - assertEquals(0, messageGroupA.getMarked().size()); - assertEquals(0, messageGroupA.getUnmarked().size()); - assertEquals(0, messageGroupA.size()); - - // make sure the store is properly rebuild from Gemfire - store = new GemfireMessageStore(this.cache); - store.afterPropertiesSet(); - - messageGroup = store.getMessageGroup(1); - - assertEquals(0, messageGroup.getMarked().size()); - assertEquals(0, messageGroup.getUnmarked().size()); - assertEquals(0, messageGroup.size()); - } - @Test public void testRemoveMessageFromTheGroup() throws Exception{ GemfireMessageStore store = new GemfireMessageStore(this.cache); store.afterPropertiesSet(); MessageGroup messageGroup = store.getMessageGroup(1); Message message = new GenericMessage("2"); - store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("1")); - store.addMessageToGroup(messageGroup.getGroupId(), message); + + messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("1")); + messageGroup = store.getMessageGroup(1); + + messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), message); + messageGroup = store.getMessageGroup(1); + messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("3")); + messageGroup = store.getMessageGroup(1); assertEquals(3, messageGroup.size()); - messageGroup = store.removeMessageFromGroup(1, message); + messageGroup = store.removeMessageFromGroup(messageGroup.getGroupId(), message); + messageGroup = store.getMessageGroup(1); assertEquals(2, messageGroup.size()); // make sure the store is properly rebuild from Gemfire @@ -132,28 +112,28 @@ public void testRemoveMessageFromTheGroup() throws Exception{ } @Test - public void testMarkAllMessagesInMessageGroup() throws Exception{ + public void testRemoveMessageGroup() throws Exception{ GemfireMessageStore store = new GemfireMessageStore(this.cache); store.afterPropertiesSet(); MessageGroup messageGroup = store.getMessageGroup(1); - store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("1")); - store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("2")); - messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("3")); - - assertEquals(3, messageGroup.getUnmarked().size()); - assertEquals(0, messageGroup.getMarked().size()); - messageGroup = store.markMessageGroup(messageGroup); - - assertEquals(0, messageGroup.getUnmarked().size()); - assertEquals(3, messageGroup.getMarked().size()); + Message message = new GenericMessage("Hello"); + messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), message); + assertEquals(1, messageGroup.size()); + + store.removeMessageGroup(1); + MessageGroup messageGroupA = store.getMessageGroup(1); + assertNotSame(messageGroup, messageGroupA); + assertEquals(0, messageGroupA.getMessages().size()); + assertEquals(0, messageGroupA.size()); // make sure the store is properly rebuild from Gemfire store = new GemfireMessageStore(this.cache); store.afterPropertiesSet(); messageGroup = store.getMessageGroup(1); - assertEquals(0, messageGroup.getUnmarked().size()); - assertEquals(3, messageGroup.getMarked().size()); + + assertEquals(0, messageGroup.getMessages().size()); + assertEquals(0, messageGroup.size()); } @Test @@ -172,30 +152,6 @@ public void testRemoveNonExistingMessageFromNonExistingTheGroup() throws Excepti store.removeMessageFromGroup(1, new GenericMessage("2")); } - @Test - public void testMarkMessageInMessageGroup() throws Exception{ - GemfireMessageStore store = new GemfireMessageStore(this.cache); - store.afterPropertiesSet(); - MessageGroup messageGroup = store.getMessageGroup(1); - Message messageToMark = new GenericMessage("1"); - store.addMessageToGroup(messageGroup.getGroupId(), messageToMark); - store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("2")); - messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("3")); - - assertEquals(3, messageGroup.getUnmarked().size()); - assertEquals(0, messageGroup.getMarked().size()); - messageGroup = store.markMessageFromGroup(1, messageToMark); - assertEquals(2, messageGroup.getUnmarked().size()); - assertEquals(1, messageGroup.getMarked().size()); - - // make sure the store is properly rebuild from Gemfire - store = new GemfireMessageStore(this.cache); - store.afterPropertiesSet(); - - messageGroup = store.getMessageGroup(1); - assertEquals(2, messageGroup.getUnmarked().size()); - assertEquals(1, messageGroup.getMarked().size()); - } @Test public void testCompleteMessageGroup() throws Exception{ @@ -233,16 +189,14 @@ public void testMultipleInstancesOfGroupStore() throws Exception{ store1.addMessageToGroup(1, message); MessageGroup messageGroup = store2.addMessageToGroup(1, new GenericMessage("2")); - assertEquals(2, messageGroup.getUnmarked().size()); - assertEquals(0, messageGroup.getMarked().size()); + assertEquals(2, messageGroup.getMessages().size()); GemfireMessageStore store3 = new GemfireMessageStore(this.cache); store3.afterPropertiesSet(); - messageGroup = store3.markMessageFromGroup(1, message); + messageGroup = store3.removeMessageFromGroup(1, message); - assertEquals(1, messageGroup.getUnmarked().size()); - assertEquals(1, messageGroup.getMarked().size()); + assertEquals(1, messageGroup.getMessages().size()); } @Test @@ -297,7 +251,7 @@ public void testConcurrentModifications() throws Exception{ executor.execute(new Runnable() { public void run() { MessageGroup group = store1.addMessageToGroup(1, message); - if (group.getUnmarked().size() != 1){ + if (group.getMessages().size() != 1){ failures.add("ADD"); throw new AssertionFailedError("Failed on ADD"); } @@ -306,7 +260,7 @@ public void run() { executor.execute(new Runnable() { public void run() { MessageGroup group = store2.removeMessageFromGroup(1, message); - if (group.getUnmarked().size() != 0){ + if (group.getMessages().size() != 0){ failures.add("REMOVE"); throw new AssertionFailedError("Failed on Remove"); } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageStore.java index 5c2549858e6..a34214e3d6c 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageStore.java @@ -22,18 +22,23 @@ import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.sql.DataSource; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.core.serializer.Deserializer; import org.springframework.core.serializer.Serializer; import org.springframework.core.serializer.support.DeserializingConverter; import org.springframework.core.serializer.support.SerializingConverter; +import org.springframework.dao.DataAccessException; import org.springframework.integration.Message; +import org.springframework.integration.MessageHeaders; import org.springframework.integration.store.AbstractMessageGroupStore; import org.springframework.integration.store.MessageGroup; import org.springframework.integration.store.MessageStore; @@ -43,6 +48,7 @@ import org.springframework.jdbc.core.JdbcOperations; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.core.PreparedStatementSetter; +import org.springframework.jdbc.core.ResultSetExtractor; import org.springframework.jdbc.core.RowCallbackHandler; import org.springframework.jdbc.core.RowMapper; import org.springframework.jdbc.core.SingleColumnRowMapper; @@ -83,6 +89,8 @@ public class JdbcMessageStore extends AbstractMessageGroupStore implements Messa private static final String LIST_MESSAGES_BY_GROUP_KEY = "SELECT MESSAGE_ID, CREATED_DATE, GROUP_KEY, MESSAGE_BYTES, MARKED, COMPLETE, LAST_RELEASED_SEQUENCE from %PREFIX%MESSAGE_GROUP where GROUP_KEY=? and REGION=? order by CREATED_DATE"; + private static final String LIST_MESSAGEIDS_BY_GROUP_KEY = "SELECT MESSAGE_ID, CREATED_DATE from %PREFIX%MESSAGE_GROUP where GROUP_KEY=? and REGION=? order by CREATED_DATE"; + private static final String COUNT_ALL_GROUPS = "SELECT COUNT(GROUP_KEY) from %PREFIX%MESSAGE_GROUP where REGION=?"; private static final String COUNT_ALL_MARKED_MESSAGES_IN_GROUPS = "SELECT COUNT(MESSAGE_ID) from %PREFIX%MESSAGE_GROUP where MARKED=1 AND REGION=?"; @@ -101,8 +109,8 @@ public class JdbcMessageStore extends AbstractMessageGroupStore implements Messa private static final String DELETE_MESSAGE_GROUP = "DELETE from %PREFIX%MESSAGE_GROUP where GROUP_KEY=? and REGION=?"; - private static final String CREATE_MESSAGE_IN_GROUP = "INSERT into %PREFIX%MESSAGE_GROUP(MESSAGE_ID, REGION, CREATED_DATE, GROUP_KEY, MARKED, COMPLETE, LAST_RELEASED_SEQUENCE, MESSAGE_BYTES)" - + " values (?, ?, ?, ?, 0, 0, 0, ?)"; + private static final String CREATE_MESSAGE_IN_GROUP = "INSERT into %PREFIX%MESSAGE_GROUP(MESSAGE_ID, REGION, CREATED_DATE, GROUP_KEY, MARKED, COMPLETE, LAST_RELEASED_SEQUENCE)" + + " values (?, ?, ?, ?, 0, 0, 0)"; private static final String LIST_GROUP_KEYS = "SELECT distinct GROUP_KEY as CREATED from %PREFIX%MESSAGE_GROUP where REGION=?"; @@ -266,9 +274,9 @@ public Message getMessage(UUID id) { return list.get(0); } + @SuppressWarnings({ "rawtypes", "unchecked" }) public Message addMessage(final Message message) { if (message.getHeaders().containsKey(SAVED_KEY)) { - @SuppressWarnings("unchecked") Message saved = (Message) getMessage(message.getHeaders().getId()); if (saved != null) { if (saved.equals(message)) { @@ -280,6 +288,11 @@ public Message addMessage(final Message message) { final long createdDate = System.currentTimeMillis(); Message result = MessageBuilder.fromMessage(message).setHeader(SAVED_KEY, Boolean.TRUE) .setHeader(CREATED_DATE_KEY, new Long(createdDate)).build(); + + Map innerMap = (Map) new DirectFieldAccessor(result.getHeaders()).getPropertyValue("headers"); + // using reflection to set ID since it is immutable through MessageHeaders + innerMap.put(MessageHeaders.ID, message.getHeaders().get(MessageHeaders.ID)); + final String messageId = getKey(result.getHeaders().getId()); final byte[] messageBytes = serializer.convert(result); @@ -300,7 +313,6 @@ public MessageGroup addMessageToGroup(Object groupId, Message message) { final long createdDate = System.currentTimeMillis(); final String messageId = getKey(message.getHeaders().getId()); final String groupKey = getKey(groupId); - final byte[] messageBytes = serializer.convert(message); jdbcTemplate.update(getQuery(CREATE_MESSAGE_IN_GROUP), new PreparedStatementSetter() { public void setValues(PreparedStatement ps) throws SQLException { @@ -309,10 +321,9 @@ public void setValues(PreparedStatement ps) throws SQLException { ps.setString(2, region); ps.setTimestamp(3, new Timestamp(createdDate)); ps.setString(4, groupKey); - lobHandler.getLobCreator().setBlobAsBytes(ps, 5, messageBytes); } }); - + this.addMessage(message); return getMessageGroup(groupId); } @@ -334,25 +345,20 @@ public int getMarkedMessageCountForAllMessageGroups() { public MessageGroup getMessageGroup(Object groupId) { String key = getKey(groupId); - final List> marked = new ArrayList>(); - final List> unmarked = new ArrayList>(); + final List> messages = new ArrayList>(); final AtomicReference date = new AtomicReference(); final AtomicReference completeFlag = new AtomicReference(); final AtomicReference lastReleasedSequenceRef = new AtomicReference(); + final AtomicInteger size = new AtomicInteger(); jdbcTemplate.query(getQuery(LIST_MESSAGES_BY_GROUP_KEY), new Object[] { key, region }, + new RowCallbackHandler() { - int count = 0; - public void processRow(ResultSet rs) throws SQLException { - int markedFlag = rs.getInt("MARKED"); - Message message = mapper.mapRow(rs, count++); - if (markedFlag > 0) { - marked.add(message); - } - else { - unmarked.add(message); - } + size.incrementAndGet(); + + messages.add(getMessage(UUID.fromString(rs.getString("MESSAGE_ID")))); + date.set(rs.getTimestamp("CREATED_DATE")); completeFlag.set(rs.getInt("COMPLETE") > 0); @@ -360,17 +366,19 @@ public void processRow(ResultSet rs) throws SQLException { lastReleasedSequenceRef.set(rs.getInt("LAST_RELEASED_SEQUENCE")); } }); - if (marked.isEmpty() && unmarked.isEmpty()) { + + if (size.get() == 0){ return new SimpleMessageGroup(groupId); } Assert.state(date.get() != null, "Could not locate created date for groupId=" + groupId); long timestamp = date.get().getTime(); boolean complete = completeFlag.get().booleanValue(); - SimpleMessageGroup messageGroup = new SimpleMessageGroup(unmarked, marked, groupId, timestamp, complete); + SimpleMessageGroup messageGroup = new SimpleMessageGroup(messages, groupId, timestamp, complete); int lastReleasedSequenceNumber = lastReleasedSequenceRef.get(); if (lastReleasedSequenceNumber > 0){ messageGroup.setLastReleasedMessageSequenceNumber(lastReleasedSequenceNumber); } + return messageGroup; } @@ -404,6 +412,7 @@ public void setValues(PreparedStatement ps) throws SQLException { ps.setString(3, messageId); } }); + this.removeMessage(messageToRemove.getHeaders().getId()); return getMessageGroup(groupId); } @@ -431,6 +440,10 @@ public void setValues(PreparedStatement ps) throws SQLException { public void removeMessageGroup(Object groupId) { final String groupKey = getKey(groupId); + + for (UUID messageIds : this.getMessageIdsForGroup(groupId)) { + this.removeMessage(messageIds); + } jdbcTemplate.update(getQuery(DELETE_MESSAGE_GROUP), new PreparedStatementSetter() { public void setValues(PreparedStatement ps) throws SQLException { @@ -439,6 +452,7 @@ public void setValues(PreparedStatement ps) throws SQLException { ps.setString(2, region); } }); + } public void completeGroup(Object groupId) { @@ -470,6 +484,46 @@ public void setValues(PreparedStatement ps) throws SQLException { } }); } + + public Message pollMessageFromGroup(Object groupId) { + String key = getKey(groupId); + + UUID messageId = jdbcTemplate.query(getQuery(LIST_MESSAGEIDS_BY_GROUP_KEY), new Object[] { key, region }, + new ResultSetExtractor() { + public UUID extractData(ResultSet rs) + throws SQLException, DataAccessException { + if (rs.next()) { + UUID uuid = UUID.fromString(rs.getString(1)); + return uuid; + } + return null; + } + }); + + if (messageId != null){ + Message message = this.getMessage(messageId); + this.removeMessageFromGroup(groupId, message); + return message; + } + return null; + } + + private List getMessageIdsForGroup(Object groupId){ + String key = getKey(groupId); + + final List messageIds = new ArrayList(); + + jdbcTemplate.query(getQuery(LIST_MESSAGEIDS_BY_GROUP_KEY), new Object[] { key, region }, + new RowCallbackHandler() { + + public void processRow(ResultSet rs) throws SQLException { + messageIds.add(UUID.fromString(rs.getString(1))); + } + + } + ); + return messageIds; + } public Iterator iterator() { diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelIntegrationTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelIntegrationTests.java index 1bb07e13c67..3efc40244d9 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelIntegrationTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelIntegrationTests.java @@ -81,7 +81,7 @@ public void testSendAndActivateWithRollback() throws Exception { Service.await(1000); assertEquals(1, Service.messages.size()); // After a rollback in the poller the message is still waiting to be delivered - // but unless we use a transactin here there is a chance that the queue will + // but unless we use a transaction here there is a chance that the queue will // appear empty.... new TransactionTemplate(transactionManager).execute(new TransactionCallback() { diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelTests.java index 1deeaab7b6f..73dbf4bc906 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreChannelTests.java @@ -70,7 +70,6 @@ public void testSendAndActivateWithRollback() throws Exception { assertEquals(1, Service.messages.size()); // After a rollback in the poller the message is still waiting to be delivered assertEquals(1, messageStore.getMessageGroup("input-queue").size()); - assertEquals(1, messageStore.getMessageGroup("input-queue").getUnmarked().size()); } @Test @@ -89,7 +88,6 @@ public void testSendAndActivateTransactionalSend() throws Exception { assertEquals(0, Service.messages.size()); // But inside the transaction the message is still there assertEquals(1, messageStore.getMessageGroup("input-queue").size()); - assertEquals(1, messageStore.getMessageGroup("input-queue").getUnmarked().size()); } public static class Service { diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreTests.java index b37052cb942..06b00431b78 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageStoreTests.java @@ -16,21 +16,11 @@ package org.springframework.integration.jdbc; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.springframework.integration.test.matcher.PayloadAndHeaderMatcher.sameExceptIgnorableHeaders; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; -import java.util.Iterator; import java.util.UUID; import javax.sql.DataSource; @@ -51,6 +41,16 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.transaction.annotation.Transactional; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import static org.springframework.integration.test.matcher.PayloadAndHeaderMatcher.sameExceptIgnorableHeaders; + @ContextConfiguration @RunWith(SpringJUnit4ClassRunner.class) public class JdbcMessageStoreTests { @@ -77,7 +77,7 @@ public void testGetNonExistent() throws Exception { public void testAddAndGet() throws Exception { Message message = MessageBuilder.withPayload("foo").build(); Message saved = messageStore.addMessage(message); - assertNull(messageStore.getMessage(message.getHeaders().getId())); + assertNotNull(messageStore.getMessage(message.getHeaders().getId())); Message result = messageStore.getMessage(saved.getHeaders().getId()); assertNotNull(result); assertThat(saved, sameExceptIgnorableHeaders(result)); @@ -111,7 +111,7 @@ public GenericMessage deserialize(InputStream inputStream) throws IOExce }); Message message = MessageBuilder.withPayload("foo").build(); Message saved = messageStore.addMessage(message); - assertNull(messageStore.getMessage(message.getHeaders().getId())); + assertNotNull(messageStore.getMessage(message.getHeaders().getId())); Message result = messageStore.getMessage(saved.getHeaders().getId()); assertNotNull(result); assertEquals("foo", result.getPayload()); @@ -263,31 +263,8 @@ public void testOrderInMessageGroup() throws Exception { messageStore.addMessageToGroup(groupId, message); MessageGroup group = messageStore.getMessageGroup(groupId); assertEquals(2, group.size()); - Iterator> iterator = group.getUnmarked().iterator(); - assertEquals("foo", iterator.next().getPayload()); - assertEquals("bar", iterator.next().getPayload()); - } - - @Test - @Transactional - public void testAddAndMarkMessageGroup() throws Exception { - String groupId = "X"; - Message message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build(); - messageStore.addMessageToGroup(groupId, message); - MessageGroup group = messageStore.getMessageGroup(groupId); - group = messageStore.markMessageGroup(group); - assertEquals(1, group.getMarked().size()); - } - - @Test - @Transactional - public void testAddAndMarkMessageInGroup() throws Exception { - String groupId = "X"; - Message message = MessageBuilder.withPayload("foo").setCorrelationId(groupId).build(); - messageStore.addMessageToGroup(groupId, message); - messageStore.addMessageToGroup(groupId, MessageBuilder.withPayload("bar").setCorrelationId(groupId).build()); - MessageGroup group = messageStore.markMessageFromGroup(groupId, message); - assertEquals(1, group.getMarked().size()); + assertEquals("foo", messageStore.pollMessageFromGroup(groupId).getPayload()); + assertEquals("bar", messageStore.pollMessageFromGroup(groupId).getPayload()); } @Test diff --git a/spring-integration-jmx/src/test/java/org/springframework/integration/jmx/config/MessageStoreTests.java b/spring-integration-jmx/src/test/java/org/springframework/integration/jmx/config/MessageStoreTests.java index 61940bd61e2..48a44927e45 100644 --- a/spring-integration-jmx/src/test/java/org/springframework/integration/jmx/config/MessageStoreTests.java +++ b/spring-integration-jmx/src/test/java/org/springframework/integration/jmx/config/MessageStoreTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2002-2011 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index ea86c1d8619..2615b77caaf 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -16,8 +16,6 @@ package org.springframework.integration.mongodb.store; -import static org.springframework.data.mongodb.core.query.Criteria.where; - import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -36,6 +34,7 @@ import org.springframework.data.mongodb.core.mapping.MongoMappingContext; import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty; +import org.springframework.data.mongodb.core.query.Order; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; import org.springframework.integration.Message; @@ -49,10 +48,13 @@ import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import com.mongodb.DBObject; +import static org.springframework.data.mongodb.core.query.Criteria.where; + /** * An implementation of both the {@link MessageStore} and {@link MessageGroupStore} * strategies that relies upon MongoDB for persistence. @@ -67,8 +69,6 @@ public class MongoDbMessageStore extends AbstractMessageGroupStore implements Me private final static String GROUP_ID_KEY = "_groupId"; - private final static String MARKED_KEY = "_marked"; - private final static String GROUP_COMPLETE_KEY = "_group_complete"; private final static String LAST_RELEASED_SEQUENCE_NUMBER = "_last_released_sequence"; @@ -76,6 +76,8 @@ public class MongoDbMessageStore extends AbstractMessageGroupStore implements Me private final static String GROUP_TIMESTAMP_KEY = "_group_timestamp"; private final static String PAYLOAD_TYPE_KEY = "_payloadType"; + + private final static String CREATED_DATE = "_createdDate"; private final MongoTemplate template; @@ -135,8 +137,7 @@ public Message removeMessage(UUID id) { public MessageGroup getMessageGroup(Object groupId) { Assert.notNull(groupId, "'groupId' must not be null"); List messageWrappers = this.template.find(whereGroupIdIs(groupId), MessageWrapper.class, this.collectionName); - List> unmarkedMessages = new ArrayList>(); - List> markedMessages = new ArrayList>(); + List> messages = new ArrayList>(); long timestamp = 0; int lastReleasedSequenceNumber = 0; boolean completeGroup = false; @@ -148,17 +149,13 @@ public MessageGroup getMessageGroup(Object groupId) { } for (MessageWrapper messageWrapper : messageWrappers) { - if (messageWrapper.isMarked()) { - markedMessages.add(messageWrapper.getMessage()); - } - else { - unmarkedMessages.add(messageWrapper.getMessage()); - } + messages.add(messageWrapper.getMessage()); } - SimpleMessageGroup messageGroup = new SimpleMessageGroup(unmarkedMessages, markedMessages, groupId, timestamp, completeGroup); + SimpleMessageGroup messageGroup = new SimpleMessageGroup(messages, groupId, timestamp, completeGroup); if (lastReleasedSequenceNumber > 0){ messageGroup.setLastReleasedMessageSequenceNumber(lastReleasedSequenceNumber); } + return messageGroup; } @@ -177,16 +174,6 @@ public MessageGroup addMessageToGroup(Object groupId, Message message) { return this.getMessageGroup(groupId); } - public MessageGroup markMessageGroup(MessageGroup group) { - Assert.notNull(group, "'group' must not be null"); - Object groupId = group.getGroupId(); - List messageWrappers = this.template.find(whereGroupIdIs(groupId), MessageWrapper.class, this.collectionName); - for (MessageWrapper messageWrapper : messageWrappers) { - this.markMessageFromGroup(groupId, messageWrapper.getMessage()); - } - return this.getMessageGroup(groupId); - } - public MessageGroup removeMessageFromGroup(Object groupId, Message messageToRemove) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(messageToRemove, "'messageToRemove' must not be null"); @@ -194,13 +181,6 @@ public MessageGroup removeMessageFromGroup(Object groupId, Message messageToR return this.getMessageGroup(groupId); } - public MessageGroup markMessageFromGroup(Object groupId, Message messageToMark) { - Update update = Update.update(MARKED_KEY, true); - Query q = whereMessageIdIs(messageToMark.getHeaders().getId()); - this.template.updateFirst(q, update, this.collectionName); - return this.getMessageGroup(groupId); - } - public void removeMessageGroup(Object groupId) { List messageWrappers = this.template.find(whereGroupIdIs(groupId), MessageWrapper.class, this.collectionName); for (MessageWrapper messageWrapper : messageWrappers) { @@ -231,6 +211,19 @@ public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNu Query q = whereGroupIdIs(groupId); this.template.updateFirst(q, update, this.collectionName); } + + public Message pollMessageFromGroup(Object groupId) { + Assert.notNull(groupId, "'groupId' must not be null"); + List messageWrappers = this.template.find(whereGroupIdIsOrdered(groupId), MessageWrapper.class, this.collectionName); + Message message = null; + + if (!CollectionUtils.isEmpty(messageWrappers)){ + message = messageWrappers.get(0).getMessage(); + this.removeMessageFromGroup(groupId, message); + } + + return message; + } /* * Common Queries @@ -247,6 +240,12 @@ private static Query whereGroupIdIs(Object groupId) { private static Query whereGroupIdExists() { return new Query(where(GROUP_ID_KEY).exists(true)); } + + private static Query whereGroupIdIsOrdered(Object groupId) { + Query q = new Query(where(GROUP_ID_KEY).is(groupId)).limit(1); + q.sort().on(CREATED_DATE, Order.ASCENDING); + return q; + } /** @@ -272,7 +271,7 @@ public void afterPropertiesSet() { public void write(Object source, DBObject target) { Message message = null; Object groupId = null; - boolean marked = false; + boolean groupComplete = false; long groupTimestamp = 0; int lastReleasedSequenceNumber = 0; @@ -280,7 +279,6 @@ public void write(Object source, DBObject target) { MessageWrapper wrapper = (MessageWrapper) source; message = wrapper.getMessage(); groupId = wrapper.getGroupId(); - marked = wrapper.isMarked(); groupComplete = wrapper.isCompletedGroup(); lastReleasedSequenceNumber = wrapper.getLastReleasedSequenceNumber(); groupTimestamp = wrapper.getGroupTimestamp(); @@ -289,6 +287,7 @@ public void write(Object source, DBObject target) { Class sourceType = (source != null) ? source.getClass() : null; throw new IllegalArgumentException("Unexpected source type [" + sourceType + "]. Should be a MessageWrapper."); } + target.put(CREATED_DATE, System.currentTimeMillis()); target.put(PAYLOAD_TYPE_KEY, message.getPayload().getClass().getName()); if (groupId != null) { target.put(GROUP_ID_KEY, groupId); @@ -296,9 +295,7 @@ public void write(Object source, DBObject target) { target.put(LAST_RELEASED_SEQUENCE_NUMBER, lastReleasedSequenceNumber); target.put(GROUP_TIMESTAMP_KEY, groupTimestamp); } - if (marked) { - target.put(MARKED_KEY, marked); - } + super.write(message, target); } @@ -341,8 +338,6 @@ public S read(Class clazz, DBObject source) { if (lastReleasedSequenceNumber != null){ wrapper.setLastReleasedSequenceNumber(lastReleasedSequenceNumber); } - - wrapper.setMarked(source.get(MARKED_KEY) != null); if (completeGroup != null){ wrapper.setCompletedGroup(completeGroup.booleanValue()); @@ -376,8 +371,6 @@ private static final class MessageWrapper { private volatile Object groupId; - private volatile boolean marked; - private final Message message; private volatile long groupTimestamp; @@ -406,10 +399,6 @@ public Object getGroupId() { return groupId; } - public boolean isMarked() { - return marked; - } - public Message getMessage() { return message; } @@ -418,10 +407,6 @@ public void setGroupId(Object groupId) { this.groupId = groupId; } - public void setMarked(boolean marked) { - this.marked = marked; - } - public void setGroupTimestamp(long groupTimestamp) { this.groupTimestamp = groupTimestamp; } diff --git a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java index 99786f5965c..0f17cbdef1d 100644 --- a/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java +++ b/spring-integration-mongodb/src/test/java/org/springframework/integration/mongodb/store/MongoDbMessageGroupStoreTests.java @@ -85,19 +85,16 @@ public void testMessageGroupMarkingMessage() throws Exception{ Message messageB = new GenericMessage("B"); store.addMessageToGroup(1, messageA); messageGroup = store.addMessageToGroup(1, messageB); - assertEquals(0, messageGroup.getMarked().size()); - assertEquals(2, messageGroup.getUnmarked().size()); + assertEquals(2, messageGroup.size()); - messageGroup = store.markMessageFromGroup(1, messageA); - assertEquals(1, messageGroup.getMarked().size()); - assertEquals(1, messageGroup.getUnmarked().size()); + messageGroup = store.removeMessageFromGroup(1, messageA); + assertEquals(1, messageGroup.size()); // validate that the updates were propagated to Mongo as well store = new MongoDbMessageStore(mongoDbFactory); messageGroup = store.getMessageGroup(1); - assertEquals(1, messageGroup.getMarked().size()); - assertEquals(1, messageGroup.getUnmarked().size()); + assertEquals(1, messageGroup.size()); } @Test @@ -167,33 +164,6 @@ public void testRemoveMessageFromTheGroup() throws Exception{ assertEquals(2, messageGroup.size()); } - @Test - @MongoDbAvailable - public void testMarkAllMessagesInMessageGroup() throws Exception { - MongoDbFactory mongoDbFactory = this.prepareMongoFactory(); - MongoDbMessageStore store = new MongoDbMessageStore(mongoDbFactory); - - MessageGroup messageGroup = store.getMessageGroup(1); - - store.addMessageToGroup(1, new GenericMessage("1")); - store.addMessageToGroup(1, new GenericMessage("2")); - messageGroup = store.addMessageToGroup(1, new GenericMessage("3")); - - assertEquals(3, messageGroup.getUnmarked().size()); - assertEquals(0, messageGroup.getMarked().size()); - - messageGroup = store.markMessageGroup(messageGroup); - - assertEquals(0, messageGroup.getUnmarked().size()); - assertEquals(3, messageGroup.getMarked().size()); - - store = new MongoDbMessageStore(mongoDbFactory); - - messageGroup = store.getMessageGroup(1); - assertEquals(0, messageGroup.getUnmarked().size()); - assertEquals(3, messageGroup.getMarked().size()); - } - @Test @MongoDbAvailable public void testMultipleMessageStores() throws Exception{ @@ -210,12 +180,12 @@ public void testMultipleMessageStores() throws Exception{ MessageGroup messageGroup = store3.getMessageGroup(1); - assertEquals(3, messageGroup.getUnmarked().size()); + assertEquals(3, messageGroup.size()); store3.removeMessageFromGroup(1, message); messageGroup = store2.getMessageGroup(1); - assertEquals(2, messageGroup.getUnmarked().size()); + assertEquals(2, messageGroup.size()); } @Test diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java index cb1a25e0d40..4c2213e7617 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java @@ -94,8 +94,8 @@ public void testRemoveMessageGroup() throws Exception{ store.removeMessageGroup(1); MessageGroup messageGroupA = store.getMessageGroup(1); assertNotSame(messageGroup, messageGroupA); - assertEquals(0, messageGroupA.getMarked().size()); - assertEquals(0, messageGroupA.getUnmarked().size()); +// assertEquals(0, messageGroupA.getMarked().size()); + assertEquals(0, messageGroupA.getMessages().size()); assertEquals(0, messageGroupA.size()); // make sure the store is properly rebuild from Redis @@ -103,8 +103,7 @@ public void testRemoveMessageGroup() throws Exception{ messageGroup = store.getMessageGroup(1); - assertEquals(0, messageGroup.getMarked().size()); - assertEquals(0, messageGroup.getUnmarked().size()); + assertEquals(0, messageGroup.getMessages().size()); assertEquals(0, messageGroup.size()); } @@ -177,58 +176,6 @@ public void testRemoveNonExistingMessageFromNonExistingTheGroup() throws Excepti store.removeMessageFromGroup(1, new GenericMessage("2")); } - @Test - @RedisAvailable - public void testMarkAllMessagesInMessageGroup() throws Exception{ - JedisConnectionFactory jcf = this.getConnectionFactoryForTest(); - RedisMessageStore store = new RedisMessageStore(jcf); - - MessageGroup messageGroup = store.getMessageGroup(1); - store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("1")); - store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("2")); - messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("3")); - - assertEquals(3, messageGroup.getUnmarked().size()); - assertEquals(0, messageGroup.getMarked().size()); - messageGroup = store.markMessageGroup(messageGroup); - - assertEquals(0, messageGroup.getUnmarked().size()); - assertEquals(3, messageGroup.getMarked().size()); - - // make sure the store is properly rebuild from Redis - store = new RedisMessageStore(jcf); - - messageGroup = store.getMessageGroup(1); - assertEquals(0, messageGroup.getUnmarked().size()); - assertEquals(3, messageGroup.getMarked().size()); - } - - @Test - @RedisAvailable - public void testMarkMessageInMessageGroup() throws Exception{ - JedisConnectionFactory jcf = this.getConnectionFactoryForTest(); - RedisMessageStore store = new RedisMessageStore(jcf); - - MessageGroup messageGroup = store.getMessageGroup(1); - Message messageToMark = new GenericMessage("1"); - store.addMessageToGroup(messageGroup.getGroupId(), messageToMark); - store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("2")); - messageGroup = store.addMessageToGroup(messageGroup.getGroupId(), new GenericMessage("3")); - - assertEquals(3, messageGroup.getUnmarked().size()); - assertEquals(0, messageGroup.getMarked().size()); - messageGroup = store.markMessageFromGroup(1, messageToMark); - assertEquals(2, messageGroup.getUnmarked().size()); - assertEquals(1, messageGroup.getMarked().size()); - - // make sure the store is properly rebuild from Redis - store = new RedisMessageStore(jcf); - - messageGroup = store.getMessageGroup(1); - assertEquals(2, messageGroup.getUnmarked().size()); - assertEquals(1, messageGroup.getMarked().size()); - } - @Test @@ -243,15 +190,13 @@ public void testMultipleInstancesOfGroupStore() throws Exception{ store1.addMessageToGroup(1, message); MessageGroup messageGroup = store2.addMessageToGroup(1, new GenericMessage("2")); - assertEquals(2, messageGroup.getUnmarked().size()); - assertEquals(0, messageGroup.getMarked().size()); + assertEquals(2, messageGroup.getMessages().size()); RedisMessageStore store3 = new RedisMessageStore(jcf); - messageGroup = store3.markMessageFromGroup(1, message); + messageGroup = store3.removeMessageFromGroup(1, message); - assertEquals(1, messageGroup.getUnmarked().size()); - assertEquals(1, messageGroup.getMarked().size()); + assertEquals(1, messageGroup.getMessages().size()); } @Test @@ -305,7 +250,7 @@ public void testConcurrentModifications() throws Exception{ executor.execute(new Runnable() { public void run() { MessageGroup group = store1.addMessageToGroup(1, message); - if (group.getUnmarked().size() != 1){ + if (group.getMessages().size() != 1){ failures.add("ADD"); throw new AssertionFailedError("Failed on ADD"); } @@ -314,7 +259,7 @@ public void run() { executor.execute(new Runnable() { public void run() { MessageGroup group = store2.removeMessageFromGroup(1, message); - if (group.getUnmarked().size() != 0){ + if (group.getMessages().size() != 0){ failures.add("REMOVE"); throw new AssertionFailedError("Failed on Remove"); }