Skip to content

Commit 6cd598b

Browse files
committed
Adjust locks to fix sync problem
1 parent 3c3e587 commit 6cd598b

File tree

1 file changed

+21
-13
lines changed

1 file changed

+21
-13
lines changed

src/KubernetesClient/Controllers/ResourceEventDeltaBlock.cs

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using k8s.Informers.Notifications;
88
using Microsoft.Extensions.Logging;
99
using Microsoft.Extensions.Logging.Abstractions;
10+
using Org.BouncyCastle.Asn1.X509;
1011

1112
namespace k8s.Controllers
1213
{
@@ -104,7 +105,8 @@ public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, R
104105
_log.LogTrace($"Queuing up {resourceEvent}");
105106
QueueActionLocked(resourceEvent);
106107
}
107-
108+
if (_queue.Count == 1) // we've just added to empty queue, kick off processing
109+
OfferMessagesToLinks();
108110
return DataflowMessageStatus.Accepted;
109111
}
110112

@@ -139,8 +141,7 @@ private void QueueActionLocked(ResourceEvent<TResource> obj)
139141

140142
deltas.Add(obj);
141143
CombineDeltas(deltas);
142-
if (_queue.Count == 1) // we've just added to empty queue, kick off processing
143-
OfferMessagesToLinks();
144+
144145

145146
}
146147

@@ -210,27 +211,32 @@ public List<ResourceEvent<TResource>> ConsumeMessage(DataflowMessageHeader messa
210211
private void OfferMessagesToLinks()
211212
{
212213
_log.LogTrace("Offering messages to links");
213-
lock (_lock)
214-
{
215-
foreach (var link in _targets.ToList().Where(x => x.LastOfferedMessageReply != DataflowMessageStatus.Postponed))
214+
List<TargetLink> linksToOfferTo;
215+
lock (_lock)
216+
{
217+
linksToOfferTo = _targets.Where(x => x.LastOfferedMessageReply != DataflowMessageStatus.Postponed).ToList();
218+
}
219+
220+
foreach (var link in linksToOfferTo)
221+
{
222+
DataflowMessageStatus reply;
223+
do // keep feeding the link messages until queue is either empty or it tells us that it can't handle any more
216224
{
217-
do // keep feeding the link messages until queue is either empty or it tells us that it can't handle any more
218-
{
219-
OfferMessageToLink(link);
220-
} while (_queue.Count > 0 && link.LastOfferedMessageReply == DataflowMessageStatus.Accepted);
221-
}
225+
reply = OfferMessageToLink(link);
226+
} while (reply == DataflowMessageStatus.Accepted);
222227
}
228+
223229
}
224230

225-
private void OfferMessageToLink(TargetLink link)
231+
private DataflowMessageStatus OfferMessageToLink(TargetLink link)
226232
{
227233
List<ResourceEvent<TResource>> msg;
228234
lock (_lock)
229235
{
230236
if (!_queue.TryPeek(out msg))
231237
{
232238
_log.LogTrace("Nothing to offer - queue is empty");
233-
return; // queue is empty
239+
return DataflowMessageStatus.NotAvailable; // queue is empty
234240
}
235241
}
236242

@@ -242,6 +248,8 @@ private void OfferMessageToLink(TargetLink link)
242248
_targets.Remove(link);
243249
_log.LogTrace("Link is no longer accepting messages - removing link");
244250
}
251+
252+
return link.LastOfferedMessageReply;
245253
}
246254

247255
public IDisposable LinkTo(ITargetBlock<List<ResourceEvent<TResource>>> target, DataflowLinkOptions linkOptions)

0 commit comments

Comments
 (0)