Skip to content

INT-2182 refactored MessageGroupQueue and related classes #152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 25, 2011
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public final Object processMessageGroup(MessageGroup group) {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
Map<String, Object> aggregatedHeaders = new HashMap<String, Object>();
Set<String> conflictKeys = new HashSet<String>();
for (Message<?> message : group.getUnmarked()) {
for (Message<?> message : group.getMessages()) {
MessageHeaders currentHeaders = message.getHeaders();
for (String key : currentHeaders.keySet()) {
if (MessageHeaders.ID.equals(key) || MessageHeaders.TIMESTAMP.equals(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,6 @@ public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageH
private final Object correlationLocksMonitor = new Object();

private final ConcurrentMap<Object, Object> locks = new ConcurrentHashMap<Object, Object>();

protected volatile boolean keepReleasedMessages = true;


public void setKeepReleasedMessages(boolean keepReleasedMessages) {
this.keepReleasedMessages = keepReleasedMessages;
}

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
Expand Down Expand Up @@ -288,7 +281,7 @@ private void expireGroup(Object correlationKey, MessageGroup group) {
logger.debug("Discarding messages of partially complete group with key ["
+ correlationKey + "] to: " + discardChannel);
}
for (Message<?> message : group.getUnmarked()) {
for (Message<?> message : group.getMessages()) {
discardChannel.send(message);
}
}
Expand All @@ -307,6 +300,7 @@ private Collection<Message<?>> completeGroup(Message<?> message, Object correlat
if (logger.isDebugEnabled()) {
logger.debug("Completing group with correlationKey [" + correlationKey + "]");
}

Object result = outputProcessor.processMessageGroup(group);
Collection<Message<?>> partialSequence = null;
if (result instanceof Collection<?>) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class AggregatingMessageHandler extends AbstractCorrelatingMessageHandler

private volatile boolean expireGroupsUponCompletion = false;


public AggregatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
super(processor, store, correlationStrategy, releaseStrategy);
Expand Down Expand Up @@ -71,16 +70,8 @@ protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> co
remove(messageGroup);
}
else {
if (this.keepReleasedMessages){
messageStore.markMessageGroup(messageGroup);
}
else {
for (Message<?> message : messageGroup.getMarked()) {
this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), message);
}
for (Message<?> message : messageGroup.getUnmarked()) {
this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), message);
}
for (Message<?> message : messageGroup.getMessages()) {
this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), message);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -40,6 +40,7 @@
* for each correlation key.
*
* @author Iwein Fuld
* @author Oleg Zhurakousky
*
* @see CorrelatingMessageHandler
*/
Expand Down Expand Up @@ -102,9 +103,9 @@ public Message<Object> receive() {
if (releaseStrategy.canRelease(group)) {
Message<?> nextMessage = null;

Iterator<Message<?>> unmarked = group.getUnmarked().iterator();
if (unmarked.hasNext()) {
nextMessage = unmarked.next();
Iterator<Message<?>> messages = group.getMessages().iterator();
if (messages.hasNext()) {
nextMessage = messages.next();
store.removeMessageFromGroup(key, nextMessage);
if (log.isDebugEnabled()) {
log.debug(String.format("Released message for key [%s]: %s.", key, nextMessage));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -38,7 +38,7 @@ public class DefaultAggregatingMessageGroupProcessor extends AbstractAggregating

@Override
protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
Collection<Message<?>> messages = group.getUnmarked();
Collection<Message<?>> messages = group.getMessages();
Assert.notEmpty(messages, this.getClass().getSimpleName() + " cannot process empty message groups");
List<Object> payloads = new ArrayList<Object>(messages.size());
for (Message<?> message : messages) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,12 +54,12 @@ public void setExpectedType(Class<?> expectedType) {
}

/**
* Evaluate the expression provided on the unmarked messages (a collection) in the group, and delegate to the
* Evaluate the expression provided on the messages (a collection) in the group, and delegate to the
* {@link MessagingTemplate} to send downstream.
*/
@Override
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
return processor.process(group.getUnmarked());
return processor.process(group.getMessages());
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2008 the original author or authors.
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,11 +31,11 @@ public ExpressionEvaluatingReleaseStrategy(String expression) {
}

/**
* Evaluate the expression provided on the unmarked messages (a collection) in the group and return the result (must
* Evaluate the expression provided on the messages (a collection) in the group and return the result (must
* be boolean).
*/
public boolean canRelease(MessageGroup messages) {
return ((Boolean) process(messages.getUnmarked())).booleanValue();
return ((Boolean) process(messages.getMessages())).booleanValue();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -19,6 +19,7 @@
* A {@link ReleaseStrategy} that releases only the first <code>n</code> messages, where <code>n</code> is a threshold.
*
* @author Dave Syer
* @author Oleg Zhurakousky
*
*/
public class MessageCountReleaseStrategy implements ReleaseStrategy {
Expand All @@ -41,12 +42,12 @@ public MessageCountReleaseStrategy() {
}

/**
* Release the group if it has more messages than the threshold and has not previously been released. Previous
* releases leave an imprint on the group in the form of marked messages. It is possible that more messages than the
* threshold could be released, but only if multiple consumers receive messages from the same group concurrently.
* Release the group if it has more messages than the threshold and has not previously been released.
* It is possible that more messages than the threshold could be released, but only if multiple consumers
* receive messages from the same group concurrently.
*/
public boolean canRelease(MessageGroup group) {
return group.size() >= threshold && group.getMarked().size() == 0;
return group.size() >= threshold;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -79,7 +79,7 @@ public void setBeanFactory(BeanFactory beanFactory) {

@Override
protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
final Collection<Message<?>> messagesUpForProcessing = group.getUnmarked();
final Collection<Message<?>> messagesUpForProcessing = group.getMessages();
return this.processor.process(messagesUpForProcessing, headers);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,7 +51,7 @@ public void setBeanFactory(BeanFactory beanFactory) {
}

public boolean canRelease(MessageGroup messages) {
return this.adapter.process(messages.getUnmarked(), null);
return this.adapter.process(messages.getMessages(), null);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -16,7 +16,7 @@
import org.springframework.integration.store.MessageGroup;

/**
* This implementation of MessageGroupProcessor will return all unmarked messages inside the group.
* This implementation of MessageGroupProcessor will return all messages inside the group.
* This is useful if there is no requirement to process the messages, but they should just be
* blocked as a group until their ReleaseStrategy lets them pass through.
*
Expand All @@ -26,7 +26,7 @@
public class PassThroughMessageGroupProcessor implements MessageGroupProcessor {

public Object processMessageGroup(MessageGroup group) {
return group.getUnmarked();
return group.getMessages();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2010 the original author or authors.
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -40,7 +40,7 @@ public void setComparator(Comparator<Message<?>> comparator) {
}

public Object processMessageGroup(MessageGroup group) {
Collection<Message<?>> messages = group.getUnmarked();
Collection<Message<?>> messages = group.getMessages();

if (messages.size() > 0) {
List<Message<?>> sorted = new ArrayList<Message<?>>(messages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public ResequencingMessageHandler(MessageGroupProcessor processor) {
@Override
protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> completedMessages) {

int size = messageGroup.getUnmarked().size() + messageGroup.getMarked().size();
int size = messageGroup.getMessages().size();
int sequenceSize = 0;
Message<?> message = messageGroup.getOne();
if (message != null){
Expand All @@ -62,17 +62,8 @@ protected void afterRelease(MessageGroup messageGroup, Collection<Message<?>> co
if (completedMessages != null){
int lastReleasedSequenceNumber = this.findLastReleasedSequenceNumber(messageGroup.getGroupId(), completedMessages);
messageStore.setLastReleasedSequenceNumberForGroup(messageGroup.getGroupId(), lastReleasedSequenceNumber);

if (this.keepReleasedMessages){
Object id = messageGroup.getGroupId();
for (Message<?> msg : completedMessages) {
messageStore.markMessageFromGroup(id, msg);
}
}
else {
for (Message<?> msg : completedMessages) {
this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), msg);
}
for (Message<?> msg : completedMessages) {
this.messageStore.removeMessageFromGroup(messageGroup.getGroupId(), msg);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ public boolean canRelease(MessageGroup messageGroup) {

boolean canRelease = false;

Collection<Message<?>> unmarked = messageGroup.getUnmarked();
Collection<Message<?>> messages = messageGroup.getMessages();

if (releasePartialSequences && !unmarked.isEmpty()) {
if (releasePartialSequences && !messages.isEmpty()) {

if (logger.isTraceEnabled()) {
logger.trace("Considering partial release of group [" + messageGroup + "]");
}
List<Message<?>> sorted = new ArrayList<Message<?>>(unmarked);
List<Message<?>> sorted = new ArrayList<Message<?>>(messages);
Collections.sort(sorted, comparator);

int nextSequenceNumber = sorted.get(0).getHeaders().getSequenceNumber();
Expand All @@ -85,7 +85,7 @@ public boolean canRelease(MessageGroup messageGroup) {
}
}
else {
int size = messageGroup.getUnmarked().size();
int size = messages.size();

if (size == 0){
canRelease = true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2008 the original author or authors.
* Copyright 2002-2011 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -60,7 +60,7 @@ public TimeoutCountSequenceSizeReleaseStrategy(int threshold, long timeout) {

public boolean canRelease(MessageGroup messages) {
long elapsedTime = System.currentTimeMillis() - findEarliestTimestamp(messages);
return messages.isComplete() || messages.getUnmarked().size() >= threshold || elapsedTime > timeout;
return messages.isComplete() || messages.getMessages().size() >= threshold || elapsedTime > timeout;
}

/**
Expand All @@ -69,7 +69,7 @@ public boolean canRelease(MessageGroup messages) {
*/
private long findEarliestTimestamp(MessageGroup messages) {
long result = Long.MAX_VALUE;
for (Message<?> message : messages.getUnmarked()) {
for (Message<?> message : messages.getMessages()) {
long timestamp = message.getHeaders().getTimestamp();
if (timestamp < result) {
result = timestamp;
Expand Down
Loading