Skip to content

Commit 69a19be

Browse files
committed
Merge branch 'cfredri4-revert-876'
2 parents f47d276 + ca0d142 commit 69a19be

File tree

2 files changed

+13
-27
lines changed

2 files changed

+13
-27
lines changed

src/org/jgroups/protocols/ReliableMulticast.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,6 @@ public abstract class ReliableMulticast extends Protocol implements DiagnosticsH
105105
@ManagedAttribute(description="Number of messages received",type=SCALAR)
106106
protected final LongAdder num_messages_received=new LongAdder();
107107

108-
@ManagedAttribute(description="Number of sends dropped",type=SCALAR)
109-
protected final LongAdder num_sends_dropped=new LongAdder();
110-
111108
protected static final Message DUMMY_OOB_MSG=new EmptyMessage().setFlag(OOB);
112109

113110
// Accepts messages which are (1) non-null, (2) no DUMMY_OOB_MSGs and (3) not OOB_DELIVERED
@@ -242,7 +239,6 @@ public ReliableMulticast clearCachedBatches() {
242239
public ReliableMulticast sendsCanBlock(boolean s) {this.sends_can_block=s; return this;}
243240
public long getNumMessagesSent() {return num_messages_sent.sum();}
244241
public long getNumMessagesReceived() {return num_messages_received.sum();}
245-
public long getNumSendsDropped() {return num_sends_dropped.sum();}
246242
public boolean reuseMessageBatches() {return reuse_message_batches;}
247243
public ReliableMulticast reuseMessageBatches(boolean b) {this.reuse_message_batches=b; return this;}
248244
public boolean sendAtomically() {return send_atomically;}
@@ -356,7 +352,6 @@ protected Entry sendEntry() {
356352
public void resetStats() {
357353
num_messages_sent.reset();
358354
num_messages_received.reset();
359-
num_sends_dropped.reset();
360355
xmit_reqs_received.reset();
361356
xmit_reqs_sent.reset();
362357
xmit_rsps_received.reset();
@@ -514,17 +509,14 @@ public Object down(Message msg) {
514509
msg.setSrc(local_addr); // this needs to be done so we can check whether the message sender is the local_addr
515510
boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK);
516511
Buffer<Message> win=send_entry.buf();
517-
boolean sent=send(msg, win, dont_loopback_set);
518-
last_seqno_resender.skipNext();
519-
if(sent) {
512+
if(send(msg, win, dont_loopback_set)) {
520513
num_messages_sent.increment();
521514
if(dont_loopback_set && needToSendAck(send_entry, 1))
522515
handleAck(local_addr, win.highestDelivered()); // https://issues.redhat.com/browse/JGRP-2829
523516
}
524-
else {
525-
num_sends_dropped.increment();
526-
log.warn("%s: discarded message due to full send buffer, message: %s", local_addr, msg);
527-
}
517+
else
518+
log.trace("%s: dropped message due to closed send buffer, message: %s", local_addr, msg);
519+
last_seqno_resender.skipNext();
528520
return null; // don't pass down the stack
529521
}
530522

@@ -710,10 +702,10 @@ protected boolean send(Message msg, Buffer<Message> win, boolean dont_loopback_s
710702
if(is_trace)
711703
log.trace("%s --> [all]: #%d", local_addr, msg_id);
712704
msg.putHeader(this.id, NakAckHeader.createMessageHeader(msg_id));
713-
boolean added=addToSendBuffer(win, msg_id, msg, dont_loopback_set? remove_filter : null);
714-
if(added)
715-
down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted
716-
return added;
705+
if(!addToSendBuffer(win, msg_id, msg, dont_loopback_set? remove_filter : null))
706+
return false; // e.g. message already present in send buffer, or buffer is closed
707+
down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted
708+
return true;
717709
}
718710
finally {
719711
if(lock != null)

src/org/jgroups/protocols/ReliableUnicast.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,6 @@ public abstract class ReliableUnicast extends Protocol implements AgeOutCache.Ha
101101
protected final LongAdder num_msgs_sent=new LongAdder();
102102
@ManagedAttribute(description="Number of message received",type=SCALAR)
103103
protected final LongAdder num_msgs_received=new LongAdder();
104-
@ManagedAttribute(description="Number of sends dropped",type=SCALAR)
105-
protected final LongAdder num_sends_dropped=new LongAdder();
106104
@ManagedAttribute(description="Number of acks sent",type=SCALAR)
107105
protected final LongAdder num_acks_sent=new LongAdder();
108106
@ManagedAttribute(description="Number of acks received",type=SCALAR)
@@ -287,7 +285,6 @@ public ReliableUnicast trimCachedBatches() {
287285
public long getNumMessagesReceived() {return num_msgs_received.sum();}
288286

289287

290-
public long getNumSendsDropped() {return num_sends_dropped.sum();}
291288
public long getNumAcksSent() {return num_acks_sent.sum();}
292289
public long getNumAcksReceived() {return num_acks_received.sum();}
293290
public long getNumXmits() {return num_xmits.sum();}
@@ -376,7 +373,7 @@ public String printSendWindowMessages() {
376373

377374
public void resetStats() {
378375
avg_delivery_batch_size.clear();
379-
Stream.of(num_msgs_sent, num_msgs_received, num_sends_dropped, num_acks_sent, num_acks_received, num_xmits,
376+
Stream.of(num_msgs_sent, num_msgs_received, num_acks_sent, num_acks_received, num_xmits,
380377
xmit_reqs_received, xmit_reqs_sent, xmit_rsps_sent, num_loopbacks).forEach(LongAdder::reset);
381378
send_table.values().stream().map(e -> e.buf).forEach(Buffer::resetStats);
382379
recv_table.values().stream().map(e -> e.buf).forEach(Buffer::resetStats);
@@ -683,10 +680,8 @@ public Object down(Message msg) {
683680
boolean dont_loopback_set=msg.isFlagSet(DONT_LOOPBACK) && dst.equals(local_addr);
684681
if(send(msg, entry, dont_loopback_set))
685682
num_msgs_sent.increment();
686-
else {
687-
num_sends_dropped.increment();
688-
log.warn("%s: discarded message due to full send buffer, message: %s", local_addr, msg);
689-
}
683+
else
684+
log.trace("%s: dropped message due to closed send buffer, message: %s", local_addr, msg);
690685
return null; // the message was already sent down the stack in send()
691686
}
692687

@@ -1092,9 +1087,8 @@ protected boolean send(Message msg, SenderEntry entry, boolean dont_loopback_set
10921087
try {
10931088
seqno=entry.seqno.getAndIncrement();
10941089
msg.putHeader(this.id,UnicastHeader.createDataHeader(seqno, send_conn_id,seqno == DEFAULT_FIRST_SEQNO));
1095-
boolean added=addToSendBuffer(buf, seqno, msg, dont_loopback_set? remove_filter : null);
1096-
if(!added) // e.g. message already present in send buffer
1097-
return false;
1090+
if(!addToSendBuffer(buf, seqno, msg, dont_loopback_set? remove_filter : null))
1091+
return false; // e.g. message already present in send buffer, or buffer is closed
10981092
down_prot.down(msg); // if this fails, since msg is in sent_msgs, it can be retransmitted
10991093
if(entry.state() == State.CLOSING)
11001094
entry.state(State.OPEN);

0 commit comments

Comments
 (0)