Skip to content

Commit 5adb0a8

Browse files
authored
Merge pull request #1560 from rabbitmq/lukebakken/misc-changes-from-async-session-shutdown
Misc changes
2 parents d39d79a + 0a06a23 commit 5adb0a8

13 files changed

+396
-213
lines changed

.github/workflows/build-test.yaml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ jobs:
7373
Receive-Job -Job $tx; `
7474
& "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; `
7575
dotnet test `
76-
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
7776
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
77+
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
7878
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' `
7979
--environment 'PASSWORD=grapefruit' `
8080
--environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" `
@@ -114,7 +114,12 @@ jobs:
114114
id: install-start-rabbitmq
115115
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
116116
- name: Sequential Integration Tests
117-
run: dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" "${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
117+
run: dotnet test `
118+
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
119+
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
120+
--environment 'PASSWORD=grapefruit' `
121+
--environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" `
122+
"${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
118123
- name: Maybe upload RabbitMQ logs
119124
if: failure()
120125
uses: actions/upload-artifact@v4
@@ -182,8 +187,8 @@ jobs:
182187
- name: Integration Tests
183188
run: |
184189
dotnet test \
185-
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
186190
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
191+
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
187192
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' \
188193
--environment 'PASSWORD=grapefruit' \
189194
--environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \
@@ -222,7 +227,10 @@ jobs:
222227
- name: Sequential Integration Tests
223228
run: |
224229
dotnet test \
230+
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
225231
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
232+
--environment 'PASSWORD=grapefruit' \
233+
--environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \
226234
"${{ github.workspace }}/projects/Test/SequentialIntegration/SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
227235
- name: Maybe upload RabbitMQ logs
228236
if: failure()

Makefile

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@ build:
2121
test:
2222
dotnet test $(CURDIR)/projects/Test/Unit/Unit.csproj --logger 'console;verbosity=detailed'
2323
dotnet test --environment 'GITHUB_ACTIONS=true' \
24-
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
2524
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
25+
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
2626
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' \
2727
--environment 'PASSWORD=grapefruit' \
2828
--environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \
2929
"$(CURDIR)/projects/Test/Integration/Integration.csproj" --logger 'console;verbosity=detailed'
30-
dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed'
30+
dotnet test --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
31+
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
32+
--environment 'PASSWORD=grapefruit' \
33+
--environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \
34+
$(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed'
3135

3236
# Note:
3337
# You must have the expected OAuth2 environment set up for this target
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
using System;
2+
using System.Linq;
3+
using System.Threading.Tasks;
4+
using RabbitMQ.Client.Events;
5+
6+
namespace RabbitMQ.Client.Impl
7+
{
8+
#nullable enable
9+
internal struct AsyncEventingWrapper<T>
10+
{
11+
private event AsyncEventHandler<T>? _event;
12+
private Delegate[]? _handlers;
13+
private string? _context;
14+
private Func<Exception, string, Task>? _onException;
15+
16+
public readonly bool IsEmpty => _event is null;
17+
18+
public AsyncEventingWrapper(string context, Func<Exception, string, Task> onException)
19+
{
20+
_event = null;
21+
_handlers = null;
22+
_context = context;
23+
_onException = onException;
24+
}
25+
26+
public void AddHandler(AsyncEventHandler<T>? handler)
27+
{
28+
_event += handler;
29+
_handlers = null;
30+
}
31+
32+
public void RemoveHandler(AsyncEventHandler<T>? handler)
33+
{
34+
_event -= handler;
35+
_handlers = null;
36+
}
37+
38+
// Do not make this function async! (This type is a struct that gets copied at the start of an async method => empty _handlers is copied)
39+
public Task InvokeAsync(object sender, T parameter)
40+
{
41+
Delegate[]? handlers = _handlers;
42+
if (handlers is null)
43+
{
44+
handlers = _event?.GetInvocationList();
45+
if (handlers is null)
46+
{
47+
return Task.CompletedTask;
48+
}
49+
50+
_handlers = handlers;
51+
}
52+
53+
return InternalInvoke(handlers, sender, parameter);
54+
}
55+
56+
private readonly async Task InternalInvoke(Delegate[] handlers, object sender, T parameter)
57+
{
58+
foreach (AsyncEventHandler<T> action in handlers.Cast<AsyncEventHandler<T>>())
59+
{
60+
try
61+
{
62+
await action(sender, parameter)
63+
.ConfigureAwait(false);
64+
}
65+
catch (Exception exception)
66+
{
67+
if (_onException != null)
68+
{
69+
await _onException(exception, _context!)
70+
.ConfigureAwait(false);
71+
}
72+
else
73+
{
74+
throw;
75+
}
76+
}
77+
}
78+
}
79+
80+
public void Takeover(in AsyncEventingWrapper<T> other)
81+
{
82+
_event = other._event;
83+
_handlers = other._handlers;
84+
_context = other._context;
85+
_onException = other._onException;
86+
}
87+
}
88+
}

projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,13 @@ await FinishCloseAsync(cts.Token)
130130
_heartbeatReadTimer?.Change((int)Heartbeat.TotalMilliseconds, Timeout.Infinite);
131131
}
132132
}
133+
catch (OperationCanceledException)
134+
{
135+
if (false == _mainLoopCts.IsCancellationRequested)
136+
{
137+
throw;
138+
}
139+
}
133140
catch (ObjectDisposedException)
134141
{
135142
// timer is already disposed,

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
using System.IO;
3636
using System.Runtime.CompilerServices;
3737
using System.Threading;
38+
using System.Threading.Channels;
3839
using System.Threading.Tasks;
3940
using RabbitMQ.Client.Events;
4041
using RabbitMQ.Client.Exceptions;
@@ -335,6 +336,13 @@ await _session0.TransmitAsync(method, cancellationToken)
335336
.ConfigureAwait(false);
336337
}
337338
}
339+
catch (ChannelClosedException)
340+
{
341+
if (false == abort)
342+
{
343+
throw;
344+
}
345+
}
338346
catch (AlreadyClosedException)
339347
{
340348
if (false == abort)

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,39 +16,49 @@ internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency)
1616

1717
protected override async Task ProcessChannelAsync(CancellationToken token)
1818
{
19-
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
19+
try
2020
{
21-
while (_reader.TryRead(out WorkStruct work))
21+
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
2222
{
23-
using (work)
23+
while (_reader.TryRead(out WorkStruct work))
2424
{
25-
try
25+
using (work)
2626
{
27-
Task task = work.WorkType switch
27+
try
2828
{
29-
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
30-
work.ConsumerTag, work.DeliveryTag, work.Redelivered,
31-
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory),
29+
Task task = work.WorkType switch
30+
{
31+
WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver(
32+
work.ConsumerTag, work.DeliveryTag, work.Redelivered,
33+
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory),
3234

33-
WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag),
35+
WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag),
3436

35-
WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag),
37+
WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag),
3638

37-
WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag),
39+
WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag),
3840

39-
WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason),
41+
WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason),
4042

41-
_ => Task.CompletedTask
42-
};
43-
await task.ConfigureAwait(false);
44-
}
45-
catch (Exception e)
46-
{
47-
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
43+
_ => Task.CompletedTask
44+
};
45+
await task.ConfigureAwait(false);
46+
}
47+
catch (Exception e)
48+
{
49+
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
50+
}
4851
}
4952
}
5053
}
5154
}
55+
catch (OperationCanceledException)
56+
{
57+
if (false == token.IsCancellationRequested)
58+
{
59+
throw;
60+
}
61+
}
5262
}
5363
}
5464
}

projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,45 +16,55 @@ internal ConsumerDispatcher(ChannelBase channel, int concurrency)
1616

1717
protected override async Task ProcessChannelAsync(CancellationToken token)
1818
{
19-
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
19+
try
2020
{
21-
while (_reader.TryRead(out var work))
21+
while (await _reader.WaitToReadAsync(token).ConfigureAwait(false))
2222
{
23-
using (work)
23+
while (_reader.TryRead(out WorkStruct work))
2424
{
25-
try
25+
using (work)
2626
{
27-
IBasicConsumer consumer = work.Consumer;
28-
string? consumerTag = work.ConsumerTag;
29-
switch (work.WorkType)
27+
try
3028
{
31-
case WorkType.Deliver:
32-
await consumer.HandleBasicDeliverAsync(
33-
consumerTag, work.DeliveryTag, work.Redelivered,
34-
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory)
35-
.ConfigureAwait(false);
36-
break;
37-
case WorkType.Cancel:
38-
consumer.HandleBasicCancel(consumerTag);
39-
break;
40-
case WorkType.CancelOk:
41-
consumer.HandleBasicCancelOk(consumerTag);
42-
break;
43-
case WorkType.ConsumeOk:
44-
consumer.HandleBasicConsumeOk(consumerTag);
45-
break;
46-
case WorkType.Shutdown:
47-
consumer.HandleChannelShutdown(_channel, work.Reason);
48-
break;
29+
IBasicConsumer consumer = work.Consumer;
30+
string? consumerTag = work.ConsumerTag;
31+
switch (work.WorkType)
32+
{
33+
case WorkType.Deliver:
34+
await consumer.HandleBasicDeliverAsync(
35+
consumerTag, work.DeliveryTag, work.Redelivered,
36+
work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory)
37+
.ConfigureAwait(false);
38+
break;
39+
case WorkType.Cancel:
40+
consumer.HandleBasicCancel(consumerTag);
41+
break;
42+
case WorkType.CancelOk:
43+
consumer.HandleBasicCancelOk(consumerTag);
44+
break;
45+
case WorkType.ConsumeOk:
46+
consumer.HandleBasicConsumeOk(consumerTag);
47+
break;
48+
case WorkType.Shutdown:
49+
consumer.HandleChannelShutdown(_channel, work.Reason);
50+
break;
51+
}
52+
}
53+
catch (Exception e)
54+
{
55+
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
4956
}
50-
}
51-
catch (Exception e)
52-
{
53-
_channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer));
5457
}
5558
}
5659
}
5760
}
61+
catch (OperationCanceledException)
62+
{
63+
if (false == token.IsCancellationRequested)
64+
{
65+
throw;
66+
}
67+
}
5868
}
5969
}
6070
}

0 commit comments

Comments
 (0)