Skip to content

Commit 7cba383

Browse files
authored
Merge pull request #74 from Gsantomaggio/e2e-behaviour
E2E test for consuming 100 messages.
2 parents 8d79480 + d5ba3f1 commit 7cba383

File tree

5 files changed

+112
-5
lines changed

5 files changed

+112
-5
lines changed

internal/command_types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ const (
118118
CommandSubscribeResponse uint16 = 0x8007
119119
CommandCreditResponse uint16 = 0x8009
120120
CommandQueryOffsetResponse uint16 = 0x800b
121+
CommandUnsubscribeResponse uint16 = 0x800c
121122
CommandCreateResponse uint16 = 0x800d
122123
CommandDeleteResponse uint16 = 0x800e
123124
CommandMetadataResponse uint16 = 0x800f

pkg/e2e/end_to_end_test.go

Lines changed: 92 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ var _ = Describe("E2E", Serial, Label("e2e"), func() {
7575
for {
7676
select {
7777
case <-ctx.Done():
78-
Fail(fmt.Sprintf("context timed out: expected to receive 1_000_000 confirmations: received %d", len(confirmedIds)))
78+
Fail(fmt.Sprintf("context timed out: expected to receive 100_000 confirmations: received %d", len(confirmedIds)))
7979
case confirm, ok := <-c:
8080
if !ok {
8181
return
@@ -113,7 +113,98 @@ var _ = Describe("E2E", Serial, Label("e2e"), func() {
113113

114114
By("closing the connection")
115115
Expect(streamClient.Close(itCtx)).To(Succeed())
116+
116117
}, SpecTimeout(120*time.Second))
118+
119+
// Send and Recveive Messages, assert messages received are valid.
120+
It("sends, and receives messages", Label("behaviour"), func(ctx SpecContext) {
121+
h := slog.HandlerOptions{Level: slog.LevelDebug}.NewTextHandler(GinkgoWriter)
122+
debugLogger := slog.New(h)
123+
itCtx := raw.NewContextWithLogger(ctx, *debugLogger)
124+
streamClientConfiguration, err := raw.NewClientConfiguration("rabbitmq-stream://guest:guest@localhost/%2F")
125+
Expect(err).ToNot(HaveOccurred())
126+
127+
By("preparing the environment")
128+
streamClient, err := raw.DialConfig(itCtx, streamClientConfiguration)
129+
Expect(err).ToNot(HaveOccurred())
130+
131+
const stream = "e2e-consume-test"
132+
// Ensure we don't leak open connection on test failures
133+
DeferCleanup(func(ctx SpecContext) error {
134+
if streamClient.IsOpen() {
135+
_ = streamClient.DeleteStream(ctx, stream)
136+
return streamClient.Close(ctx)
137+
}
138+
return nil
139+
})
140+
141+
Expect(streamClient.IsOpen()).To(BeTrue(), "expected stream client to be open")
142+
Expect(streamClient.ExchangeCommandVersions(ctx)).To(Succeed())
143+
144+
Expect(streamClient.DeclareStream(itCtx, stream, constants.StreamConfiguration{})).To(Succeed())
145+
146+
const publisherId = 2
147+
Expect(
148+
streamClient.DeclarePublisher(itCtx, publisherId, "e2e-send-and-receive", stream),
149+
).To(Succeed())
150+
151+
c := streamClient.NotifyPublish(make(chan *raw.PublishConfirm, 100))
152+
153+
const numMessages = 100
154+
for i := 0; i < numMessages; i++ {
155+
Expect(
156+
streamClient.Send(itCtx, publisherId, wrap[common.PublishingMessager](raw.NewPublishingMessage(uint64(i), &plainTextMessage{messageBody}))),
157+
).To(Succeed())
158+
}
159+
160+
var countOfPublishingIds int
161+
for confirm := range c {
162+
Expect(confirm.PublisherID()).To(BeNumerically("==", publisherId))
163+
countOfPublishingIds += len(confirm.PublishingIds())
164+
if countOfPublishingIds >= numMessages {
165+
break
166+
}
167+
}
168+
169+
// Assert number of PublishConfirms matches the number of messages sent
170+
Expect(countOfPublishingIds).To(Equal(numMessages))
171+
172+
By("receiving the expected number of messages")
173+
chunks := streamClient.NotifyChunk(make(chan *raw.Chunk, 10))
174+
var subscriptionId uint8 = 2
175+
Expect(
176+
streamClient.Subscribe(itCtx, stream, constants.OffsetTypeFirst, subscriptionId, 10, nil, 0),
177+
).To(Succeed())
178+
179+
var numOfEntries uint16 = 0
180+
for chunk := range chunks {
181+
numOfEntries += chunk.NumEntries
182+
debugLogger.Info("chunk received", "chunk", chunk)
183+
184+
m := &plainTextMessage{}
185+
for i := uint16(0); i < chunk.NumEntries; i++ {
186+
x, z := i*104, (i+1)*104
187+
Expect(m.UnmarshalBinary(chunk.Messages[x:z])).To(Succeed())
188+
Expect(m.body).To(Equal(messageBody))
189+
}
190+
191+
if numOfEntries >= numMessages {
192+
break
193+
}
194+
195+
Expect(streamClient.Credit(ctx, subscriptionId, 10)).To(Succeed())
196+
}
197+
198+
By("unsubscribing")
199+
Expect(streamClient.Unsubscribe(ctx, subscriptionId)).To(Succeed())
200+
//Expect(chunks).To(BeClosed())
201+
202+
By("cleaning up")
203+
Expect(streamClient.DeletePublisher(ctx, publisherId)).To(Succeed())
204+
Expect(streamClient.DeleteStream(itCtx, stream)).To(Succeed())
205+
Expect(streamClient.Close(itCtx)).To(Succeed())
206+
}, SpecTimeout(15*time.Second))
207+
117208
})
118209

119210
func wrap[T any](v T) []T {

pkg/e2e/types_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package e2e_test
44

55
import (
6+
"bytes"
67
"encoding/binary"
78
"io"
89
)
@@ -11,13 +12,25 @@ type plainTextMessage struct {
1112
body string
1213
}
1314

15+
func (p *plainTextMessage) UnmarshalBinary(data []byte) error {
16+
var dataLen uint32
17+
err := binary.Read(bytes.NewReader(data), binary.BigEndian, &dataLen)
18+
if err != nil {
19+
return err
20+
}
21+
22+
p.body = string(data[4 : dataLen+4])
23+
24+
return nil
25+
}
26+
1427
func (p *plainTextMessage) WriteTo(w io.Writer) (n int64, err error) {
1528
n = 0
1629
err = binary.Write(w, binary.BigEndian, uint32(len(p.body)))
1730
if err != nil {
1831
return
1932
}
20-
n += 2
33+
n += 4
2134

2235
n32, err := w.Write([]byte(p.body))
2336
n += int64(n32)

pkg/raw/client.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,8 @@ func (tc *Client) handleIncoming(ctx context.Context) error {
286286
internal.CommandDeclarePublisherResponse,
287287
internal.CommandDeletePublisherResponse,
288288
internal.CommandCloseResponse,
289-
internal.CommandSubscribeResponse:
289+
internal.CommandSubscribeResponse,
290+
internal.CommandUnsubscribeResponse:
290291
createResp := new(internal.SimpleResponse)
291292
err = createResp.Read(buffer)
292293
if err != nil {
@@ -366,7 +367,7 @@ func (tc *Client) handleIncoming(ctx context.Context) error {
366367
case <-ctx.Done():
367368
return ctx.Err()
368369
case tc.chunkCh <- chunkResponse:
369-
log.Debug("sent a subscription chunk", "subscriptionId", chunkResponse.SubscriptionId)
370+
log.Debug("received a chunk", "subscriptionId", chunkResponse.SubscriptionId)
370371
}
371372
case internal.CommandExchangeCommandVersionsResponse:
372373
exchangeResponse := new(internal.ExchangeCommandVersionsResponse)
@@ -1071,6 +1072,7 @@ func (tc *Client) Unsubscribe(ctx context.Context, subscriptionId uint8) error {
10711072
if err != nil {
10721073
return err
10731074
}
1075+
// FIXME: close subscription channel
10741076
return streamErrorOrNil(unSubscribeResponse.ResponseCode())
10751077
}
10761078

pkg/raw/stream_suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ func (rmq *fakeRabbitMQServer) fakeRabbitMQUnsubscribe(ctx context.Context, subs
589589

590590
/// there server says ok! :)
591591
/// writing the response to the client
592-
writeResponse(ctx, rmq, bufio.NewWriter(rmq.connection), internal.CommandCreate)
592+
writeResponse(ctx, rmq, bufio.NewWriter(rmq.connection), internal.CommandUnsubscribeResponse)
593593
}
594594

595595
func (rmq *fakeRabbitMQServer) fakeRabbitMQServerClosesConnection() {

0 commit comments

Comments
 (0)