diff --git a/src/Interprocess.Tests/QueueTests.cs b/src/Interprocess.Tests/QueueTests.cs index 160541a..cccdfde 100644 --- a/src/Interprocess.Tests/QueueTests.cs +++ b/src/Interprocess.Tests/QueueTests.cs @@ -1,6 +1,7 @@ using FluentAssertions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; using Xunit; using Xunit.Abstractions; @@ -238,11 +239,47 @@ public void CanRejectLargeMessages() p.TryEnqueue(ByteArray50).Should().BeFalse(); // failed here } + [Fact] + [TestBeforeAfter] + public void CanRecoverIfPublisherCrashes() + { + // This is very complicated test that is trying to replicate a crash scenario when the publisher + // crashes after indicating that it is writing the message but before completing the operation. + + using var dp = new DeadlockCausingPublisher(new("qn", fixture.Path, 1024), NullLoggerFactory.Instance); + dp.TryEnqueue(ByteArray3).Should().BeTrue(); + + using var p = CreatePublisher(1024); + p.TryEnqueue(ByteArray1).Should().BeTrue(); + using var s = CreateSubscriber(1024); + + // This line should take 10 seconds to return (that is how long the timeout is set in the code) + // After the 10 seconds expires, we should have lost all other messages that were in the queue when we started the dequeue process. + s.TryDequeue(default, out _).Should().BeFalse(); + + // But then, after this 10 seconds delay, system should fully recover and continue with new messages + p.TryEnqueue(ByteArray1).Should().BeTrue(); + s.TryDequeue(default, out var message).Should().BeTrue(); + message.ToArray().Should().BeEquivalentTo(ByteArray1); + } + private IPublisher CreatePublisher(long capacity) => - queueFactory.CreatePublisher( - new QueueOptions("qn", fixture.Path, capacity)); + queueFactory.CreatePublisher(new("qn", fixture.Path, capacity)); private ISubscriber CreateSubscriber(long capacity) => - queueFactory.CreateSubscriber( - new QueueOptions("qn", fixture.Path, capacity)); + queueFactory.CreateSubscriber(new("qn", fixture.Path, capacity)); + + private sealed class DeadlockCausingPublisher(QueueOptions options, ILoggerFactory loggerFactory) : + Queue(options, loggerFactory), + IPublisher + { + public unsafe bool TryEnqueue(ReadOnlySpan message) + { + var bodyLength = message.Length; + var messageLength = GetPaddedMessageLength(bodyLength); + var header = *Header; + Header->WriteOffset = SafeIncrementMessageOffset(header.WriteOffset, messageLength); + return true; + } + } } \ No newline at end of file diff --git a/src/Interprocess/Queue/Subscriber.cs b/src/Interprocess/Queue/Subscriber.cs index 2947969..31bf604 100644 --- a/src/Interprocess/Queue/Subscriber.cs +++ b/src/Interprocess/Queue/Subscriber.cs @@ -107,14 +107,14 @@ private unsafe bool TryDequeueImpl( return false; var readLockTimestamp = header.ReadLockTimestamp; - var now = DateTime.UtcNow.Ticks; + var start = DateTime.UtcNow.Ticks; // is there already a read-lock or has the previous lock timed out meaning that a subscriber crashed? - if (now - readLockTimestamp < TicksForTenSeconds) + if (start - readLockTimestamp < TicksForTenSeconds) return false; // take a read-lock so no other thread can read a message - if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, now, readLockTimestamp) != readLockTimestamp) + if (Interlocked.CompareExchange(ref Header->ReadLockTimestamp, start, readLockTimestamp) != readLockTimestamp) return false; try @@ -125,14 +125,29 @@ private unsafe bool TryDequeueImpl( // now finally have a read-lock and the queue is not empty var readOffset = Header->ReadOffset; + var writeOffset = Header->WriteOffset; var messageHeader = (MessageHeader*)Buffer.GetPointer(readOffset); - // was this message fully written by the publisher? if not, wait for the publisher to finish writing it - while (Interlocked.CompareExchange( - ref messageHeader->State, - MessageHeader.LockedToBeConsumedState, - MessageHeader.ReadyToBeConsumedState) != MessageHeader.ReadyToBeConsumedState) + while (true) { + // was this message fully written by the publisher? if not, wait for the publisher to finish writing it + var state = Interlocked.CompareExchange( + ref messageHeader->State, + MessageHeader.LockedToBeConsumedState, + MessageHeader.ReadyToBeConsumedState); + + if (state == MessageHeader.ReadyToBeConsumedState) + break; + + // but if the publisher crashed, we will never get the message, so we need to handle that case by timing out + if (DateTime.UtcNow.Ticks - start > TicksForTenSeconds) + { + // the publisher crashed and we will never get the message + // so we need to release the read-lock and advance the queue for everyone. + // some messages might be lost in this case but this is the best we can do. + Interlocked.Exchange(ref Header->ReadOffset, writeOffset); + return false; + } Thread.Yield(); }