From 81a39e1273ca06144fd28c9a2c42b9a971ae79ab Mon Sep 17 00:00:00 2001 From: Pedram Rezaei Date: Sun, 12 Jan 2025 15:27:41 -0800 Subject: [PATCH 1/3] solve a possible deadlock --- src/Interprocess.Tests/QueueTests.cs | 45 +++++++++++++++++++++++++--- src/Interprocess/Queue/Subscriber.cs | 31 ++++++++++++++----- 2 files changed, 64 insertions(+), 12 deletions(-) diff --git a/src/Interprocess.Tests/QueueTests.cs b/src/Interprocess.Tests/QueueTests.cs index 160541a..022e8f6 100644 --- a/src/Interprocess.Tests/QueueTests.cs +++ b/src/Interprocess.Tests/QueueTests.cs @@ -1,6 +1,7 @@ using FluentAssertions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Xunit; using Xunit.Abstractions; @@ -238,11 +239,47 @@ public void CanRejectLargeMessages() p.TryEnqueue(ByteArray50).Should().BeFalse(); // failed here } + [Fact] + [TestBeforeAfter] + public void CanRecoverIfPublisherCrashes() + { + // This is very complicated test that is trying to replicate a crash scenario when the publisher + // crashes after indicating that it is writing the message but before completing the operation. + + using var dp = new DeadlockCausingPublisher(new("qn", fixture.Path, 1024), NullLoggerFactory.Instance); + dp.TryEnqueue(ByteArray3).Should().BeTrue(); + + using var p = CreatePublisher(1024); + p.TryEnqueue(ByteArray1).Should().BeTrue(); + using var s = CreateSubscriber(1024); + + // This line should take 10 seconds to return (that is how long the timeout is set in teh code) + // After the 10 seconds expires, we should have lost all other messages that were in teh queue when we started the dequeue process. + s.TryDequeue(default, out _).Should().BeFalse(); + + // But then, after this 10 seconds delay, system should fully recover and continue with new messages + p.TryEnqueue(ByteArray1).Should().BeTrue(); + s.TryDequeue(default, out var message).Should().BeTrue(); + message.ToArray().Should().BeEquivalentTo(ByteArray1); + } + private IPublisher CreatePublisher(long capacity) => - queueFactory.CreatePublisher( - new QueueOptions("qn", fixture.Path, capacity)); + queueFactory.CreatePublisher(new("qn", fixture.Path, capacity)); private ISubscriber CreateSubscriber(long capacity) => - queueFactory.CreateSubscriber( - new QueueOptions("qn", fixture.Path, capacity)); + queueFactory.CreateSubscriber(new("qn", fixture.Path, capacity)); + + private sealed class DeadlockCausingPublisher(QueueOptions options, ILoggerFactory loggerFactory) : + Queue(options, loggerFactory), + IPublisher + { + public unsafe bool TryEnqueue(ReadOnlySpan message) + { + var bodyLength = message.Length; + var messageLength = GetPaddedMessageLength(bodyLength); + var header = *Header; + Header->WriteOffset = SafeIncrementMessageOffset(header.WriteOffset, messageLength); + return true; + } + } } \ No newline at end of file diff --git a/src/Interprocess/Queue/Subscriber.cs b/src/Interprocess/Queue/Subscriber.cs index 2947969..31bf604 100644 --- a/src/Interprocess/Queue/Subscriber.cs +++ b/src/Interprocess/Queue/Subscriber.cs @@ -107,14 +107,14 @@ private unsafe bool TryDequeueImpl( return false; var readLockTimestamp = header.ReadLockTimestamp; - var now = DateTime.UtcNow.Ticks; + var start = DateTime.UtcNow.Ticks; // is there already a read-lock or has the previous lock timed out meaning that a subscriber crashed? - if (now - readLockTimestamp < TicksForTenSeconds) + if (start - readLockTimestamp < TicksForTenSeconds) return false; // take a read-lock so no other thread can read a message - if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, now, readLockTimestamp) != readLockTimestamp) + if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, start, readLockTimestamp) != readLockTimestamp) return false; try @@ -125,14 +125,29 @@ private unsafe bool TryDequeueImpl( // now finally have a read-lock and the queue is not empty var readOffset = Header->ReadOffset; + var writeOffset = Header->WriteOffset; var messageHeader = (MessageHeader*)Buffer.GetPointer(readOffset); - // was this message fully written by the publisher? if not, wait for the publisher to finish writing it - while (Interlocked.CompareExchange( - ref messageHeader->State, - MessageHeader.LockedToBeConsumedState, - MessageHeader.ReadyToBeConsumedState) != MessageHeader.ReadyToBeConsumedState) + while (true) { + // was this message fully written by the publisher? if not, wait for the publisher to finish writing it + var state = Interlocked.CompareExchange( + ref messageHeader->State, + MessageHeader.LockedToBeConsumedState, + MessageHeader.ReadyToBeConsumedState); + + if (state == MessageHeader.ReadyToBeConsumedState) + break; + + // but if the publisher crashed, we will never get the message, so we need to handle that case by timing out + if (DateTime.UtcNow.Ticks - start > TicksForTenSeconds) + { + // the publisher crashed and we will never get the message + // so we need to release the read-lock and advance the queue for everyone. + // some messages might be lost in this case but this is the best we can do. + Interlocked.Exchange(ref Header->ReadOffset, writeOffset); + return false; + } Thread.Yield(); } From 644f9ccc6387ecac1eda4574c0b627eaf4ab1fd0 Mon Sep 17 00:00:00 2001 From: Pedram Rezaei Date: Sun, 12 Jan 2025 15:30:12 -0800 Subject: [PATCH 2/3] Update src/Interprocess.Tests/QueueTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Interprocess.Tests/QueueTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Interprocess.Tests/QueueTests.cs b/src/Interprocess.Tests/QueueTests.cs index 022e8f6..b6751ce 100644 --- a/src/Interprocess.Tests/QueueTests.cs +++ b/src/Interprocess.Tests/QueueTests.cs @@ -253,7 +253,7 @@ public void CanRecoverIfPublisherCrashes() p.TryEnqueue(ByteArray1).Should().BeTrue(); using var s = CreateSubscriber(1024); - // This line should take 10 seconds to return (that is how long the timeout is set in teh code) + // This line should take 10 seconds to return (that is how long the timeout is set in the code) // After the 10 seconds expires, we should have lost all other messages that were in teh queue when we started the dequeue process. s.TryDequeue(default, out _).Should().BeFalse(); From 6dfa9b38c682ab3d7dca07736d15bb11aa9e9b63 Mon Sep 17 00:00:00 2001 From: Pedram Rezaei Date: Sun, 12 Jan 2025 15:30:17 -0800 Subject: [PATCH 3/3] Update src/Interprocess.Tests/QueueTests.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/Interprocess.Tests/QueueTests.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Interprocess.Tests/QueueTests.cs b/src/Interprocess.Tests/QueueTests.cs index b6751ce..cccdfde 100644 --- a/src/Interprocess.Tests/QueueTests.cs +++ b/src/Interprocess.Tests/QueueTests.cs @@ -254,7 +254,7 @@ public void CanRecoverIfPublisherCrashes() using var s = CreateSubscriber(1024); // This line should take 10 seconds to return (that is how long the timeout is set in the code) - // After the 10 seconds expires, we should have lost all other messages that were in teh queue when we started the dequeue process. + // After the 10 seconds expires, we should have lost all other messages that were in the queue when we started the dequeue process. s.TryDequeue(default, out _).Should().BeFalse(); // But then, after this 10 seconds delay, system should fully recover and continue with new messages