Skip to content

Commit 71eff1d

Browse files
Return an error in the leader is not ready yet (#98)
* Return an error in the leader is not ready yet, add also a small fix in the aggregation Co-authored-by: Victor Elias <[email protected]>
1 parent e191752 commit 71eff1d

File tree

5 files changed

+34
-10
lines changed

5 files changed

+34
-10
lines changed

pkg/stream/aggregation.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,15 @@ func (compression Compression) Gzip() Compression {
4040
}
4141

4242
func (compression Compression) Snappy() Compression {
43-
return Compression{value: SNAPPY}
43+
return Compression{value: SNAPPY, enabled: true}
4444
}
4545

4646
func (compression Compression) Zstd() Compression {
47-
return Compression{value: ZSTD}
47+
return Compression{value: ZSTD, enabled: true}
4848
}
4949

5050
func (compression Compression) Lz4() Compression {
51-
return Compression{value: LZ4}
51+
return Compression{value: LZ4, enabled: true}
5252
}
5353

5454
type subEntry struct {

pkg/stream/client.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,7 @@ func (c *Client) BrokerLeader(stream string) (*Broker, error) {
590590
return nil, lookErrorCode(streamMetadata.responseCode)
591591
}
592592
if streamMetadata.Leader == nil {
593-
return nil, fmt.Errorf("leader error for stream for stream: %s", stream)
593+
return nil, LeaderNotReady
594594
}
595595

596596
streamMetadata.Leader.advPort = streamMetadata.Leader.Port
@@ -610,16 +610,28 @@ func (c *Client) StreamExists(stream string) bool {
610610
func (c *Client) BrokerForConsumer(stream string) (*Broker, error) {
611611
streamsMetadata := c.metaData(stream)
612612
if streamsMetadata == nil {
613-
return nil, fmt.Errorf("leader error for stream for stream: %s", stream)
613+
return nil, fmt.Errorf("leader error for stream: %s", stream)
614614
}
615615

616616
streamMetadata := streamsMetadata.Get(stream)
617617
if streamMetadata.responseCode != responseCodeOk {
618618
return nil, lookErrorCode(streamMetadata.responseCode)
619619
}
620-
var brokers []*Broker
620+
621+
if streamMetadata.Leader == nil {
622+
return nil, LeaderNotReady
623+
}
624+
625+
brokers := make([]*Broker, 0, 1+len(streamMetadata.Replicas))
621626
brokers = append(brokers, streamMetadata.Leader)
622-
brokers = append(brokers, streamMetadata.Replicas...)
627+
for idx, replica := range streamMetadata.Replicas {
628+
if replica == nil {
629+
logs.LogWarn("Stream %s replica not ready: %d", stream, idx)
630+
continue
631+
}
632+
brokers = append(brokers, replica)
633+
}
634+
623635
rand.Seed(time.Now().UnixNano())
624636
n := rand.Intn(len(brokers))
625637
return brokers[n], nil

pkg/stream/constants.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ package stream
22

33
import (
44
"fmt"
5+
"time"
6+
57
"github.com/pkg/errors"
68
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
7-
"time"
89
)
910

1011
// is needed to indicate the general status
@@ -112,6 +113,11 @@ var PublisherDoesNotExist = errors.New("Publisher Does Not Exist")
112113
var FrameTooLarge = errors.New("Frame Too Large, the buffer is too big")
113114
var CodeAccessRefused = errors.New("Resources Access Refused")
114115
var ConnectionClosed = errors.New("Can't send the message, connection closed")
116+
var StreamNotAvailable = errors.New("Stream Not Available")
117+
var UnknownFrame = errors.New("Unknown Frame")
118+
var InternalError = errors.New("Internal Error")
119+
120+
var LeaderNotReady = errors.New("Leader not Ready yet")
115121

116122
func lookErrorCode(errorCode uint16) error {
117123
switch errorCode {
@@ -135,6 +141,12 @@ func lookErrorCode(errorCode uint16) error {
135141
return FrameTooLarge
136142
case responseCodeAccessRefused:
137143
return CodeAccessRefused
144+
case responseCodeStreamNotAvailable:
145+
return StreamNotAvailable
146+
case responseCodeUnknownFrame:
147+
return UnknownFrame
148+
case responseCodeInternalError:
149+
return InternalError
138150
default:
139151
{
140152
logs.LogWarn("Error not handled %d", errorCode)

pkg/stream/consumer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ var _ = Describe("Streaming Consumers", func() {
426426

427427
Eventually(func() int32 {
428428
return atomic.LoadInt32(&messagesReceived)
429-
}, 9*time.Second).Should(Equal(int32((2000*5)+(50*len(batchMessages)))),
429+
}, 15*time.Second).Should(Equal(int32((2000*5)+(50*len(batchMessages)))),
430430
"consumer should be the same sent from different publishers settings")
431431

432432
Expect(producer1.Close()).NotTo(HaveOccurred())

pkg/stream/enviroment.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, erro
174174

175175
if streamMetadata.Leader == nil {
176176

177-
return nil, errors.New("can't find leader for stream: " + streamName)
177+
return nil, LeaderNotReady
178178
}
179179

180180
return streamMetadata, nil

0 commit comments

Comments
 (0)