From 22531fece49b05a702e126e2dae778ab3f0e3e04 Mon Sep 17 00:00:00 2001 From: David Justo Date: Fri, 12 Apr 2024 16:46:07 -0700 Subject: [PATCH 01/39] naive poison message handler --- .../Messaging/ControlQueue.cs | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 4484c3ee9..c66b298b4 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -19,9 +19,12 @@ namespace DurableTask.AzureStorage.Messaging using System.Linq; using System.Threading; using System.Threading.Tasks; + using Azure; + using Azure.Data.Tables; using DurableTask.AzureStorage.Monitoring; using DurableTask.AzureStorage.Partitioning; using DurableTask.AzureStorage.Storage; + using Microsoft.WindowsAzure.Storage.Table; class ControlQueue : TaskHubQueue, IDisposable { @@ -102,6 +105,30 @@ public async Task> GetMessagesAsync(CancellationToken var batchMessages = new ConcurrentBag(); await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) { + // if deuque count is large, just flag it as poison. Don't even deserialize it! + if (queueMessage.DequeueCount > 5) // TODO: make configurable + { + var poisonMessage = new DynamicTableEntity(this.Name, "") + { + Properties = + { + ["TheFullMessage"] = new EntityProperty(queueMessage.Message) + } + }; + + // add to poison table + var poisonMessagesTable = this.azureStorageClient.GetTableReference("PoisonMessagesTable"); + await poisonMessagesTable.CreateIfNotExistsAsync(); + await poisonMessagesTable.InsertAsync(poisonMessage); + + // delete from queue so it doesn't get processed again. + await this.storageQueue.DeleteMessageAsync(queueMessage); + + // TODO: here we should do a forceful update in the instance table. The code doesn't seem to make that easy today :( + return; // ignore this message completely + } + + MessageData messageData; try { From 748b27960b6bb26d271a64d6a61f71192c1fdbab Mon Sep 17 00:00:00 2001 From: David Justo Date: Fri, 12 Apr 2024 17:30:47 -0700 Subject: [PATCH 02/39] incorporate feedback --- src/DurableTask.AzureStorage/MessageManager.cs | 12 +++++++++++- .../Messaging/ControlQueue.cs | 8 ++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index ea042a27d..834c6c769 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -22,6 +22,7 @@ namespace DurableTask.AzureStorage using System.Text; using System.Threading.Tasks; using DurableTask.AzureStorage.Storage; + using DurableTask.Core.History; using Newtonsoft.Json; using Newtonsoft.Json.Serialization; @@ -139,10 +140,19 @@ internal static bool TryGetLargeMessageReference(string messagePayload, out Uri return false; } - public async Task DeserializeQueueMessageAsync(QueueMessage queueMessage, string queueName) + public async Task DeserializeQueueMessageAsync(QueueMessage queueMessage, string queueName, bool isPoison = false) { + // TODO: if it's poison, we should consider the possibility that deserialization fails. In that case, it may be impossible + // to properly transition an orchestrator to the failed state :( . But we can move the poison message to a poison queue. MessageData envelope = this.DeserializeMessageData(queueMessage.Message); + // override HistoryEvent w/ a suspend event if it's a poisonMessage + if (isPoison) + { + ExecutionSuspendedEvent suspendEvent = new ExecutionSuspendedEvent(-1, "poison message detected"); + envelope.TaskMessage.Event = suspendEvent; + } + if (!string.IsNullOrEmpty(envelope.CompressedBlobName)) { string decompressedMessage = await this.DownloadAndDecompressAsBytesAsync(envelope.CompressedBlobName); diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index c66b298b4..96674b35b 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -24,6 +24,7 @@ namespace DurableTask.AzureStorage.Messaging using DurableTask.AzureStorage.Monitoring; using DurableTask.AzureStorage.Partitioning; using DurableTask.AzureStorage.Storage; + using DurableTask.Core; using Microsoft.WindowsAzure.Storage.Table; class ControlQueue : TaskHubQueue, IDisposable @@ -105,6 +106,7 @@ public async Task> GetMessagesAsync(CancellationToken var batchMessages = new ConcurrentBag(); await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) { + bool isPoison = false; // if deuque count is large, just flag it as poison. Don't even deserialize it! if (queueMessage.DequeueCount > 5) // TODO: make configurable { @@ -124,17 +126,19 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) // delete from queue so it doesn't get processed again. await this.storageQueue.DeleteMessageAsync(queueMessage); - // TODO: here we should do a forceful update in the instance table. The code doesn't seem to make that easy today :( + // since isPoison is `true`, we'll override the deserialized message w/ a suspend event + isPoison = true; return; // ignore this message completely } + MessageData messageData; try { messageData = await this.messageManager.DeserializeQueueMessageAsync( queueMessage, - this.storageQueue.Name); + this.storageQueue.Name,isPoison); } catch (Exception e) { From 40a00dd4ecd8d17716a16dc65f2630f96638bc39 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 15 Apr 2024 09:24:06 -0700 Subject: [PATCH 03/39] add suffix, change to terminated --- .../DurableTask.AzureStorage.csproj | 6 ++++-- src/DurableTask.AzureStorage/MessageManager.cs | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index be5b6e2bb..1673bd51f 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -22,14 +22,16 @@ 1 17 1 - $(MajorVersion).$(MinorVersion).$(PatchVersion) + poisonHandler.1 + + $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) $(MajorVersion).$(MinorVersion).0.0 - + $(VersionPrefix) diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index 834c6c769..4dedfd687 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -149,7 +149,7 @@ public async Task DeserializeQueueMessageAsync(QueueMessage queueMe // override HistoryEvent w/ a suspend event if it's a poisonMessage if (isPoison) { - ExecutionSuspendedEvent suspendEvent = new ExecutionSuspendedEvent(-1, "poison message detected"); + ExecutionTerminatedEvent suspendEvent = new ExecutionTerminatedEvent(-1, "poison message detected"); envelope.TaskMessage.Event = suspendEvent; } From b1b7fba00216acd5e51eb31eb332438bfbc4a104 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 15 Apr 2024 21:22:24 -0700 Subject: [PATCH 04/39] more changes to get poison message handling working E2E. It's hackier than expected --- .../AzureStorageOrchestrationService.cs | 8 ++++++++ .../Messaging/ControlQueue.cs | 3 +-- .../TaskOrchestrationDispatcher.cs | 16 +++++++++++++++- 3 files changed, 24 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index ebe7c219a..5fb65dda9 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1046,6 +1046,14 @@ bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList 5) // TODO: make configurable { - var poisonMessage = new DynamicTableEntity(this.Name, "") + var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) { Properties = { @@ -128,7 +128,6 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) // since isPoison is `true`, we'll override the deserialized message w/ a suspend event isPoison = true; - return; // ignore this message completely } diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index fc051994f..d7bcc1659 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -342,8 +342,21 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work try { + // it appears the dispatcher demands that an orchestrator have an executionStarted event. Here, we compensate for the case where the + // execution started event was poisonous. It seems wrong that the poison message handling is 'leaking' into DTFx.Core. I wonder if there's + // a way to make this compensantion occur strictly at the DTFx.AS level. What we want to avoid though is the orchestrator from replaying, that would + // cause the poison condition to trigger. + if (workItem.NewMessages.Count == 1 && workItem.NewMessages[0].Event is ExecutionTerminatedEvent poisonCandidate && poisonCandidate.Input.Contains("poison")) + { + isCompleted = true; + var executionStartedEvent = new ExecutionStartedEvent(-1, ""); + workItem.OrchestrationRuntimeState.AddEvent(executionStartedEvent); + workItem.OrchestrationRuntimeState.AddEvent(poisonCandidate); + + workItem.OrchestrationRuntimeState.AddEvent(poisonCandidate); + } // Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch. - if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper)) + else if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper)) { // TODO : mark an orchestration as faulted if there is data corruption this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration"); @@ -355,6 +368,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work isCompleted = true; traceActivity?.Dispose(); } + else { do From b1808a1b668a3b3a3acbdd419d59dd7b855c62db Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 09:40:46 -0700 Subject: [PATCH 05/39] simplify implementation --- .../AzureStorageOrchestrationService.cs | 8 --- .../MessageManager.cs | 13 +--- .../Messaging/ControlQueue.cs | 61 +++++++++++-------- src/DurableTask.Core/History/HistoryEvent.cs | 5 ++ .../TaskOrchestrationDispatcher.cs | 21 +++---- .../TaskOrchestrationExecutor.cs | 11 ++++ 6 files changed, 61 insertions(+), 58 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 5fb65dda9..ebe7c219a 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1046,14 +1046,6 @@ bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList DeserializeQueueMessageAsync(QueueMessage queueMessage, string queueName, bool isPoison = false) + public async Task DeserializeQueueMessageAsync(QueueMessage queueMessage, string queueName) { - // TODO: if it's poison, we should consider the possibility that deserialization fails. In that case, it may be impossible - // to properly transition an orchestrator to the failed state :( . But we can move the poison message to a poison queue. MessageData envelope = this.DeserializeMessageData(queueMessage.Message); - // override HistoryEvent w/ a suspend event if it's a poisonMessage - if (isPoison) - { - ExecutionTerminatedEvent suspendEvent = new ExecutionTerminatedEvent(-1, "poison message detected"); - envelope.TaskMessage.Event = suspendEvent; - } - if (!string.IsNullOrEmpty(envelope.CompressedBlobName)) { string decompressedMessage = await this.DownloadAndDecompressAsBytesAsync(envelope.CompressedBlobName); diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 03469e51b..45ab70f7a 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -50,6 +50,36 @@ public ControlQueue( protected override TimeSpan MessageVisibilityTimeout => this.settings.ControlQueueVisibilityTimeout; + private async Task HandleIfPoisonMessageAsync(MessageData messageData) + { + var isPoison = false; + var queueMessage = messageData.OriginalQueueMessage; + + // if deuque count is large, just flag it as poison. Don't even deserialize it! + if (queueMessage.DequeueCount > 0) // TODO: make configurable + { + var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) + { + Properties = + { + ["TheFullMessage"] = new EntityProperty(queueMessage.Message) + } + }; + + // add to poison table + var poisonMessagesTable = this.azureStorageClient.GetTableReference("PoisonMessagesTable"); + await poisonMessagesTable.CreateIfNotExistsAsync(); + await poisonMessagesTable.InsertAsync(poisonMessage); + + // delete from queue so it doesn't get processed again. + await this.storageQueue.DeleteMessageAsync(queueMessage); + + // since isPoison is `true`, we'll override the deserialized message w/ a suspend event + isPoison = true; + } + messageData.TaskMessage.Event.IsPoison = isPoison; + } + public async Task> GetMessagesAsync(CancellationToken cancellationToken) { using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(this.releaseCancellationToken, cancellationToken)) @@ -106,38 +136,15 @@ public async Task> GetMessagesAsync(CancellationToken var batchMessages = new ConcurrentBag(); await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) { - bool isPoison = false; - // if deuque count is large, just flag it as poison. Don't even deserialize it! - if (queueMessage.DequeueCount > 5) // TODO: make configurable - { - var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) - { - Properties = - { - ["TheFullMessage"] = new EntityProperty(queueMessage.Message) - } - }; - - // add to poison table - var poisonMessagesTable = this.azureStorageClient.GetTableReference("PoisonMessagesTable"); - await poisonMessagesTable.CreateIfNotExistsAsync(); - await poisonMessagesTable.InsertAsync(poisonMessage); - - // delete from queue so it doesn't get processed again. - await this.storageQueue.DeleteMessageAsync(queueMessage); - - // since isPoison is `true`, we'll override the deserialized message w/ a suspend event - isPoison = true; - } - - - MessageData messageData; try { messageData = await this.messageManager.DeserializeQueueMessageAsync( queueMessage, - this.storageQueue.Name,isPoison); + this.storageQueue.Name); + + await this.HandleIfPoisonMessageAsync(messageData); + } catch (Exception e) { diff --git a/src/DurableTask.Core/History/HistoryEvent.cs b/src/DurableTask.Core/History/HistoryEvent.cs index 8e6a8f316..022ebd87b 100644 --- a/src/DurableTask.Core/History/HistoryEvent.cs +++ b/src/DurableTask.Core/History/HistoryEvent.cs @@ -93,5 +93,10 @@ protected HistoryEvent(int eventId) /// Implementation for . /// public ExtensionDataObject? ExtensionData { get; set; } + + /// + /// Gets or sets whether this is a poison message. + /// + public bool IsPoison { get; set; } = false; } } \ No newline at end of file diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index d7bcc1659..189aa66c9 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -346,17 +346,17 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work // execution started event was poisonous. It seems wrong that the poison message handling is 'leaking' into DTFx.Core. I wonder if there's // a way to make this compensantion occur strictly at the DTFx.AS level. What we want to avoid though is the orchestrator from replaying, that would // cause the poison condition to trigger. - if (workItem.NewMessages.Count == 1 && workItem.NewMessages[0].Event is ExecutionTerminatedEvent poisonCandidate && poisonCandidate.Input.Contains("poison")) - { - isCompleted = true; - var executionStartedEvent = new ExecutionStartedEvent(-1, ""); - workItem.OrchestrationRuntimeState.AddEvent(executionStartedEvent); - workItem.OrchestrationRuntimeState.AddEvent(poisonCandidate); - - workItem.OrchestrationRuntimeState.AddEvent(poisonCandidate); - } + //if (workItem.NewMessages.Count == 1 && workItem.NewMessages[0].Event is ExecutionTerminatedEvent poisonCandidate && poisonCandidate.Input.Contains("poison")) + //{ + // isCompleted = true; + // var executionStartedEvent = new ExecutionStartedEvent(-1, ""); + // workItem.OrchestrationRuntimeState.AddEvent(executionStartedEvent); + // workItem.OrchestrationRuntimeState.AddEvent(poisonCandidate); + // + // workItem.OrchestrationRuntimeState.AddEvent(poisonCandidate); + //} // Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch. - else if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper)) + if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper)) { // TODO : mark an orchestration as faulted if there is data corruption this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration"); @@ -368,7 +368,6 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work isCompleted = true; traceActivity?.Dispose(); } - else { do diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs index af8b850bf..32f5a1d77 100644 --- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs +++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs @@ -197,6 +197,17 @@ void ProcessEvents(IEnumerable events) void ProcessEvent(HistoryEvent historyEvent) { + if (historyEvent.IsPoison) + { + var terminationEvent = new ExecutionTerminatedEvent(-1, "detected poison!"); + historyEvent = terminationEvent; + + var taskCompletionSource = new TaskCompletionSource(); + taskCompletionSource.SetResult(""); + + this.result = taskCompletionSource.Task; + } + bool overrideSuspension = historyEvent.EventType == EventType.ExecutionResumed || historyEvent.EventType == EventType.ExecutionTerminated; if (this.context.IsSuspended && !overrideSuspension) { From 45d523bc5c07944bb9f8bbffa209f8ed46cc4c2d Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 09:41:43 -0700 Subject: [PATCH 06/39] remove commented out code --- src/DurableTask.Core/TaskOrchestrationDispatcher.cs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 189aa66c9..fc051994f 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -342,19 +342,6 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work try { - // it appears the dispatcher demands that an orchestrator have an executionStarted event. Here, we compensate for the case where the - // execution started event was poisonous. It seems wrong that the poison message handling is 'leaking' into DTFx.Core. I wonder if there's - // a way to make this compensantion occur strictly at the DTFx.AS level. What we want to avoid though is the orchestrator from replaying, that would - // cause the poison condition to trigger. - //if (workItem.NewMessages.Count == 1 && workItem.NewMessages[0].Event is ExecutionTerminatedEvent poisonCandidate && poisonCandidate.Input.Contains("poison")) - //{ - // isCompleted = true; - // var executionStartedEvent = new ExecutionStartedEvent(-1, ""); - // workItem.OrchestrationRuntimeState.AddEvent(executionStartedEvent); - // workItem.OrchestrationRuntimeState.AddEvent(poisonCandidate); - // - // workItem.OrchestrationRuntimeState.AddEvent(poisonCandidate); - //} // Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch. if (!ReconcileMessagesWithState(workItem, nameof(TaskOrchestrationDispatcher), this.errorPropagationMode, logHelper)) { From 82e3531d7a53c2343d11a7f75abfb294db1b0280 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 09:44:28 -0700 Subject: [PATCH 07/39] remove csproj changes --- src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index 1673bd51f..f528afce6 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -23,15 +23,13 @@ 17 1 poisonHandler.1 - - $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) $(MajorVersion).$(MinorVersion).0.0 - + $(VersionPrefix) From adf4579c7c1affa6144b52085dc3457dc84ed7b4 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 09:45:41 -0700 Subject: [PATCH 08/39] undo change in message manager deps --- src/DurableTask.AzureStorage/MessageManager.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index df052a5b8..ea042a27d 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -18,6 +18,7 @@ namespace DurableTask.AzureStorage using System.IO; using System.IO.Compression; using System.Reflection; + using System.Runtime.Serialization; using System.Text; using System.Threading.Tasks; using DurableTask.AzureStorage.Storage; From d20bb7e65d55bfeda9a722ec045fe59a14163c55 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 09:46:19 -0700 Subject: [PATCH 09/39] undo csproj changeS --- src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index f528afce6..044308b8c 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -22,7 +22,7 @@ 1 17 1 - poisonHandler.1 + $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) From 40baca0806d12c80cf0ec677d3aede8c4670343c Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 10:08:27 -0700 Subject: [PATCH 10/39] add activity pmh as well --- .../DurableTask.AzureStorage.csproj | 3 +- .../Messaging/ControlQueue.cs | 2 +- .../Messaging/WorkItemQueue.cs | 34 +++++++++++++++++++ src/DurableTask.Core/DurableTask.Core.csproj | 1 + .../TaskActivityDispatcher.cs | 6 ++++ 5 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index 044308b8c..8aafa49aa 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -24,7 +24,8 @@ 1 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 - + pmh.2 + $(VersionPrefix).$(FileVersionRevision) $(MajorVersion).$(MinorVersion).0.0 diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 45ab70f7a..e257d2152 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -56,7 +56,7 @@ private async Task HandleIfPoisonMessageAsync(MessageData messageData) var queueMessage = messageData.OriginalQueueMessage; // if deuque count is large, just flag it as poison. Don't even deserialize it! - if (queueMessage.DequeueCount > 0) // TODO: make configurable + if (queueMessage.DequeueCount > 5) // TODO: make configurable { var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) { diff --git a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs index a420c87bf..f42bf0b9e 100644 --- a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs @@ -17,6 +17,7 @@ namespace DurableTask.AzureStorage.Messaging using System.Threading; using System.Threading.Tasks; using DurableTask.AzureStorage.Storage; + using Microsoft.WindowsAzure.Storage.Table; class WorkItemQueue : TaskHubQueue { @@ -30,6 +31,37 @@ public WorkItemQueue( protected override TimeSpan MessageVisibilityTimeout => this.settings.WorkItemQueueVisibilityTimeout; + private async Task HandleIfPoisonMessageAsync(MessageData messageData) + { + // TODO: put in superclass? + var isPoison = false; + var queueMessage = messageData.OriginalQueueMessage; + + // if deuque count is large, just flag it as poison. Don't even deserialize it! + if (queueMessage.DequeueCount > 5) // TODO: make configurable + { + var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) + { + Properties = + { + ["TheFullMessage"] = new EntityProperty(queueMessage.Message) + } + }; + + // add to poison table + var poisonMessagesTable = this.azureStorageClient.GetTableReference("PoisonMessagesTable"); + await poisonMessagesTable.CreateIfNotExistsAsync(); + await poisonMessagesTable.InsertAsync(poisonMessage); + + // delete from queue so it doesn't get processed again. + await this.storageQueue.DeleteMessageAsync(queueMessage); + + // since isPoison is `true`, we'll override the deserialized message w/ a suspend event + isPoison = true; + } + messageData.TaskMessage.Event.IsPoison = isPoison; + } + public async Task GetMessageAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) @@ -44,9 +76,11 @@ public async Task GetMessageAsync(CancellationToken cancellationTok continue; } + // TODO: maybe the message manager should handle the poison? MessageData data = await this.messageManager.DeserializeQueueMessageAsync( queueMessage, this.storageQueue.Name); + await this.HandleIfPoisonMessageAsync(data); this.backoffHelper.Reset(); return data; diff --git a/src/DurableTask.Core/DurableTask.Core.csproj b/src/DurableTask.Core/DurableTask.Core.csproj index 536dfc22b..045d358ae 100644 --- a/src/DurableTask.Core/DurableTask.Core.csproj +++ b/src/DurableTask.Core/DurableTask.Core.csproj @@ -20,6 +20,7 @@ 16 2 $(MajorVersion).$(MinorVersion).$(PatchVersion) + pmh.2 $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs index 1c0b8464f..5dc9fb3a2 100644 --- a/src/DurableTask.Core/TaskActivityDispatcher.cs +++ b/src/DurableTask.Core/TaskActivityDispatcher.cs @@ -192,6 +192,12 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ => try { + if (scheduledEvent.IsPoison) + { + var exception = new TaskFailureException("poison activity message detected!", details: "poison activity message detected!"); + throw exception; + } + string? output = await taskActivity.RunAsync(context, scheduledEvent.Input); responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output); } From f8963642cf7e77cf451926aa3dda68bec70959cb Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 11:40:02 -0700 Subject: [PATCH 11/39] make configurable --- .../AzureStorageOrchestrationServiceSettings.cs | 5 +++++ src/DurableTask.AzureStorage/Messaging/ControlQueue.cs | 8 ++++---- src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs | 8 ++++---- src/DurableTask.Core/TaskActivityDispatcher.cs | 4 +++- src/DurableTask.Core/TaskOrchestrationExecutor.cs | 4 +++- 5 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs index 609a8b35d..886098e26 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationServiceSettings.cs @@ -117,6 +117,11 @@ public class AzureStorageOrchestrationServiceSettings /// public int MaxConcurrentTaskEntityWorkItems { get; set; } = 100; + /// + /// Gets or sets the maximum dequeue count of any message before it is flagged as a "poison message". + /// The default value is 20. + /// + public int PoisonMessageDeuqueCountThreshold { get; set; } = 20; /// /// Gets or sets the maximum number of concurrent storage operations that can be executed in the context /// of a single orchestration instance. diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index e257d2152..cde8d17cb 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -55,19 +55,19 @@ private async Task HandleIfPoisonMessageAsync(MessageData messageData) var isPoison = false; var queueMessage = messageData.OriginalQueueMessage; - // if deuque count is large, just flag it as poison. Don't even deserialize it! - if (queueMessage.DequeueCount > 5) // TODO: make configurable + if (queueMessage.DequeueCount > this.settings.PoisonMessageDeuqueCountThreshold) { var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) { Properties = { - ["TheFullMessage"] = new EntityProperty(queueMessage.Message) + ["RawMessage"] = new EntityProperty(queueMessage.Message) } }; // add to poison table - var poisonMessagesTable = this.azureStorageClient.GetTableReference("PoisonMessagesTable"); + var poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "-poison"; + var poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName); await poisonMessagesTable.CreateIfNotExistsAsync(); await poisonMessagesTable.InsertAsync(poisonMessage); diff --git a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs index f42bf0b9e..3421a4325 100644 --- a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs @@ -38,19 +38,19 @@ private async Task HandleIfPoisonMessageAsync(MessageData messageData) var queueMessage = messageData.OriginalQueueMessage; // if deuque count is large, just flag it as poison. Don't even deserialize it! - if (queueMessage.DequeueCount > 5) // TODO: make configurable + if (queueMessage.DequeueCount > this.settings.PoisonMessageDeuqueCountThreshold) // TODO: make configurable { var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) { Properties = { - ["TheFullMessage"] = new EntityProperty(queueMessage.Message) + ["RawMessage"] = new EntityProperty(queueMessage.Message) } }; // add to poison table - var poisonMessagesTable = this.azureStorageClient.GetTableReference("PoisonMessagesTable"); - await poisonMessagesTable.CreateIfNotExistsAsync(); + var poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "-poison"; + var poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName); await poisonMessagesTable.CreateIfNotExistsAsync(); await poisonMessagesTable.InsertAsync(poisonMessage); // delete from queue so it doesn't get processed again. diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs index 5dc9fb3a2..a03fdf65d 100644 --- a/src/DurableTask.Core/TaskActivityDispatcher.cs +++ b/src/DurableTask.Core/TaskActivityDispatcher.cs @@ -194,7 +194,9 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ => { if (scheduledEvent.IsPoison) { - var exception = new TaskFailureException("poison activity message detected!", details: "poison activity message detected!"); + // TODO: improve wording here + var errorMessage = $"The activity invocation request for '{scheduledEvent.Name}' was labeled a 'poison message'."; + var exception = new TaskFailureException(errorMessage, details: errorMessage); throw exception; } diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs index 32f5a1d77..25e89504c 100644 --- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs +++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs @@ -199,7 +199,9 @@ void ProcessEvent(HistoryEvent historyEvent) { if (historyEvent.IsPoison) { - var terminationEvent = new ExecutionTerminatedEvent(-1, "detected poison!"); + // TODO: improve wording here + var errorMessage = $"The 'XXXX' was detected as a poison message."; + var terminationEvent = new ExecutionTerminatedEvent(-1, errorMessage); historyEvent = terminationEvent; var taskCompletionSource = new TaskCompletionSource(); From cef1410c08dfec2bf35550c3ddb0f4f861289dfc Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 11:43:42 -0700 Subject: [PATCH 12/39] move poison message handler to superclass --- .../Messaging/ControlQueue.cs | 30 ------------------ .../Messaging/TaskHubQueue.cs | 31 +++++++++++++++++++ .../Messaging/WorkItemQueue.cs | 31 ------------------- 3 files changed, 31 insertions(+), 61 deletions(-) diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index cde8d17cb..4680821d5 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -50,36 +50,6 @@ public ControlQueue( protected override TimeSpan MessageVisibilityTimeout => this.settings.ControlQueueVisibilityTimeout; - private async Task HandleIfPoisonMessageAsync(MessageData messageData) - { - var isPoison = false; - var queueMessage = messageData.OriginalQueueMessage; - - if (queueMessage.DequeueCount > this.settings.PoisonMessageDeuqueCountThreshold) - { - var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) - { - Properties = - { - ["RawMessage"] = new EntityProperty(queueMessage.Message) - } - }; - - // add to poison table - var poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "-poison"; - var poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName); - await poisonMessagesTable.CreateIfNotExistsAsync(); - await poisonMessagesTable.InsertAsync(poisonMessage); - - // delete from queue so it doesn't get processed again. - await this.storageQueue.DeleteMessageAsync(queueMessage); - - // since isPoison is `true`, we'll override the deserialized message w/ a suspend event - isPoison = true; - } - messageData.TaskMessage.Event.IsPoison = isPoison; - } - public async Task> GetMessagesAsync(CancellationToken cancellationToken) { using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(this.releaseCancellationToken, cancellationToken)) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 9dba81579..3dd0ea159 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -22,6 +22,7 @@ namespace DurableTask.AzureStorage.Messaging using DurableTask.AzureStorage.Storage; using DurableTask.Core; using DurableTask.Core.History; + using Microsoft.WindowsAzure.Storage.Table; abstract class TaskHubQueue { @@ -57,6 +58,36 @@ public TaskHubQueue( this.backoffHelper = new BackoffPollingHelper(minPollingDelay, maxPollingDelay); } + public async Task HandleIfPoisonMessageAsync(MessageData messageData) + { + var isPoison = false; + var queueMessage = messageData.OriginalQueueMessage; + + if (queueMessage.DequeueCount > this.settings.PoisonMessageDeuqueCountThreshold) + { + var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) + { + Properties = + { + ["RawMessage"] = new EntityProperty(queueMessage.Message) + } + }; + + // add to poison table + var poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "-poison"; + var poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName); + await poisonMessagesTable.CreateIfNotExistsAsync(); + await poisonMessagesTable.InsertAsync(poisonMessage); + + // delete from queue so it doesn't get processed again. + await this.storageQueue.DeleteMessageAsync(queueMessage); + + // since isPoison is `true`, we'll override the deserialized message w/ a suspend event + isPoison = true; + } + messageData.TaskMessage.Event.IsPoison = isPoison; + } + public string Name => this.storageQueue.Name; public Uri Uri => this.storageQueue.Uri; diff --git a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs index 3421a4325..eccbc7b6f 100644 --- a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs @@ -31,37 +31,6 @@ public WorkItemQueue( protected override TimeSpan MessageVisibilityTimeout => this.settings.WorkItemQueueVisibilityTimeout; - private async Task HandleIfPoisonMessageAsync(MessageData messageData) - { - // TODO: put in superclass? - var isPoison = false; - var queueMessage = messageData.OriginalQueueMessage; - - // if deuque count is large, just flag it as poison. Don't even deserialize it! - if (queueMessage.DequeueCount > this.settings.PoisonMessageDeuqueCountThreshold) // TODO: make configurable - { - var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) - { - Properties = - { - ["RawMessage"] = new EntityProperty(queueMessage.Message) - } - }; - - // add to poison table - var poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "-poison"; - var poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName); await poisonMessagesTable.CreateIfNotExistsAsync(); - await poisonMessagesTable.InsertAsync(poisonMessage); - - // delete from queue so it doesn't get processed again. - await this.storageQueue.DeleteMessageAsync(queueMessage); - - // since isPoison is `true`, we'll override the deserialized message w/ a suspend event - isPoison = true; - } - messageData.TaskMessage.Event.IsPoison = isPoison; - } - public async Task GetMessageAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) From eeea1598f3970ae0f518be653f0718a657fb9b56 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 11:45:24 -0700 Subject: [PATCH 13/39] remove unecessary imports --- src/DurableTask.AzureStorage/Messaging/ControlQueue.cs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 4680821d5..e0744b3ee 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -19,13 +19,9 @@ namespace DurableTask.AzureStorage.Messaging using System.Linq; using System.Threading; using System.Threading.Tasks; - using Azure; - using Azure.Data.Tables; using DurableTask.AzureStorage.Monitoring; using DurableTask.AzureStorage.Partitioning; using DurableTask.AzureStorage.Storage; - using DurableTask.Core; - using Microsoft.WindowsAzure.Storage.Table; class ControlQueue : TaskHubQueue, IDisposable { From 961d64b9b969a9f8e98cf703e6f09a50dd618c8c Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 11:46:26 -0700 Subject: [PATCH 14/39] remove unecessary import --- src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs index eccbc7b6f..b83265201 100644 --- a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs @@ -17,7 +17,6 @@ namespace DurableTask.AzureStorage.Messaging using System.Threading; using System.Threading.Tasks; using DurableTask.AzureStorage.Storage; - using Microsoft.WindowsAzure.Storage.Table; class WorkItemQueue : TaskHubQueue { From 5dfe896e9a3aa11b6c4b9336020ba2927e926e0d Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 11:56:39 -0700 Subject: [PATCH 15/39] simplify code a bit --- src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 3dd0ea159..7b52f6927 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -83,9 +83,8 @@ public async Task HandleIfPoisonMessageAsync(MessageData messageData) await this.storageQueue.DeleteMessageAsync(queueMessage); // since isPoison is `true`, we'll override the deserialized message w/ a suspend event - isPoison = true; + messageData.TaskMessage.Event.IsPoison = true; } - messageData.TaskMessage.Event.IsPoison = isPoison; } public string Name => this.storageQueue.Name; From 4a25c5bf278d17d65ea54241cc9bb9769bf46a9e Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 11:58:19 -0700 Subject: [PATCH 16/39] remove unused variable --- src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 7b52f6927..63f94e46f 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -60,7 +60,6 @@ public TaskHubQueue( public async Task HandleIfPoisonMessageAsync(MessageData messageData) { - var isPoison = false; var queueMessage = messageData.OriginalQueueMessage; if (queueMessage.DequeueCount > this.settings.PoisonMessageDeuqueCountThreshold) From 8afbfc2707c5befbf1d35b75beda7010c06d6a42 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 12:12:53 -0700 Subject: [PATCH 17/39] simplify and unify guidance --- .../Messaging/TaskHubQueue.cs | 15 +++++++++++---- src/DurableTask.Core/History/HistoryEvent.cs | 5 +++++ src/DurableTask.Core/TaskActivityDispatcher.cs | 4 +--- src/DurableTask.Core/TaskOrchestrationExecutor.cs | 4 +--- 4 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 63f94e46f..e7a99d20f 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -22,6 +22,7 @@ namespace DurableTask.AzureStorage.Messaging using DurableTask.AzureStorage.Storage; using DurableTask.Core; using DurableTask.Core.History; + using Microsoft.WindowsAzure.Storage.Queue.Protocol; using Microsoft.WindowsAzure.Storage.Table; abstract class TaskHubQueue @@ -61,8 +62,8 @@ public TaskHubQueue( public async Task HandleIfPoisonMessageAsync(MessageData messageData) { var queueMessage = messageData.OriginalQueueMessage; - - if (queueMessage.DequeueCount > this.settings.PoisonMessageDeuqueCountThreshold) + var maxThreshold = this.settings.PoisonMessageDeuqueCountThreshold; + if (queueMessage.DequeueCount > maxThreshold) { var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) { @@ -73,8 +74,8 @@ public async Task HandleIfPoisonMessageAsync(MessageData messageData) }; // add to poison table - var poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "-poison"; - var poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName); + string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "-poison"; + Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName); await poisonMessagesTable.CreateIfNotExistsAsync(); await poisonMessagesTable.InsertAsync(poisonMessage); @@ -83,6 +84,12 @@ public async Task HandleIfPoisonMessageAsync(MessageData messageData) // since isPoison is `true`, we'll override the deserialized message w/ a suspend event messageData.TaskMessage.Event.IsPoison = true; + + string guidance = $"Queue message ID '{queueMessage.Id}' was dequeued {queueMessage.DequeueCount} times," + + $" which is greater than the threshold poison message threshold ({maxThreshold}). " + + $"The message has been moved to {poisonMessageTableName} for manual review. " + + $"This will fail the consuming orchestrator, activity, or entity"; + messageData.TaskMessage.Event.PoisonGuidance = guidance; } } diff --git a/src/DurableTask.Core/History/HistoryEvent.cs b/src/DurableTask.Core/History/HistoryEvent.cs index 022ebd87b..8b32d2ef1 100644 --- a/src/DurableTask.Core/History/HistoryEvent.cs +++ b/src/DurableTask.Core/History/HistoryEvent.cs @@ -98,5 +98,10 @@ protected HistoryEvent(int eventId) /// Gets or sets whether this is a poison message. /// public bool IsPoison { get; set; } = false; + + /// + /// Gets or sets user-facing details for why a message was labeled as poison. + /// + public string PoisonGuidance { get; set; } = ""; } } \ No newline at end of file diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs index a03fdf65d..e4cc29352 100644 --- a/src/DurableTask.Core/TaskActivityDispatcher.cs +++ b/src/DurableTask.Core/TaskActivityDispatcher.cs @@ -194,9 +194,7 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ => { if (scheduledEvent.IsPoison) { - // TODO: improve wording here - var errorMessage = $"The activity invocation request for '{scheduledEvent.Name}' was labeled a 'poison message'."; - var exception = new TaskFailureException(errorMessage, details: errorMessage); + var exception = new TaskFailureException(scheduledEvent.PoisonGuidance, details: scheduledEvent.PoisonGuidance); throw exception; } diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs index 25e89504c..e9acef920 100644 --- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs +++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs @@ -199,9 +199,7 @@ void ProcessEvent(HistoryEvent historyEvent) { if (historyEvent.IsPoison) { - // TODO: improve wording here - var errorMessage = $"The 'XXXX' was detected as a poison message."; - var terminationEvent = new ExecutionTerminatedEvent(-1, errorMessage); + var terminationEvent = new ExecutionTerminatedEvent(-1, historyEvent.PoisonGuidance); historyEvent = terminationEvent; var taskCompletionSource = new TaskCompletionSource(); From 9057bfd2b647fdad1c8eecd0d8d7741ae12b6072 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 12:13:37 -0700 Subject: [PATCH 18/39] improve guidance --- src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index e7a99d20f..d5b63a036 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -87,7 +87,7 @@ public async Task HandleIfPoisonMessageAsync(MessageData messageData) string guidance = $"Queue message ID '{queueMessage.Id}' was dequeued {queueMessage.DequeueCount} times," + $" which is greater than the threshold poison message threshold ({maxThreshold}). " + - $"The message has been moved to {poisonMessageTableName} for manual review. " + + $"The message has been moved to the '{poisonMessageTableName}' table for manual review. " + $"This will fail the consuming orchestrator, activity, or entity"; messageData.TaskMessage.Event.PoisonGuidance = guidance; } From 686682840cc6134101c8f8b98785cded23b179d2 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 12:14:09 -0700 Subject: [PATCH 19/39] call out backend-specificness --- src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index d5b63a036..5a9511b20 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -85,6 +85,7 @@ public async Task HandleIfPoisonMessageAsync(MessageData messageData) // since isPoison is `true`, we'll override the deserialized message w/ a suspend event messageData.TaskMessage.Event.IsPoison = true; + // provide guidance, which is backend-specific string guidance = $"Queue message ID '{queueMessage.Id}' was dequeued {queueMessage.DequeueCount} times," + $" which is greater than the threshold poison message threshold ({maxThreshold}). " + $"The message has been moved to the '{poisonMessageTableName}' table for manual review. " + From b0d739cd782a3f0672102bde3270d673fb41ea49 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 15:57:01 -0700 Subject: [PATCH 20/39] clean up PR --- src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj | 1 - src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs | 2 +- src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs | 1 - src/DurableTask.Core/DurableTask.Core.csproj | 1 - 4 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index 8aafa49aa..05e7f8d8a 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -24,7 +24,6 @@ 1 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 - pmh.2 $(VersionPrefix).$(FileVersionRevision) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 5a9511b20..1202ee86f 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -82,7 +82,7 @@ public async Task HandleIfPoisonMessageAsync(MessageData messageData) // delete from queue so it doesn't get processed again. await this.storageQueue.DeleteMessageAsync(queueMessage); - // since isPoison is `true`, we'll override the deserialized message w/ a suspend event + // since isPoison is `true`, we'll override the deserialized message messageData.TaskMessage.Event.IsPoison = true; // provide guidance, which is backend-specific diff --git a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs index b83265201..cf1563b0c 100644 --- a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs @@ -44,7 +44,6 @@ public async Task GetMessageAsync(CancellationToken cancellationTok continue; } - // TODO: maybe the message manager should handle the poison? MessageData data = await this.messageManager.DeserializeQueueMessageAsync( queueMessage, this.storageQueue.Name); diff --git a/src/DurableTask.Core/DurableTask.Core.csproj b/src/DurableTask.Core/DurableTask.Core.csproj index 045d358ae..536dfc22b 100644 --- a/src/DurableTask.Core/DurableTask.Core.csproj +++ b/src/DurableTask.Core/DurableTask.Core.csproj @@ -20,7 +20,6 @@ 16 2 $(MajorVersion).$(MinorVersion).$(PatchVersion) - pmh.2 $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) From 71e0b36a5732db1d35357413b2a34bcdc0d2a8d2 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 15:58:02 -0700 Subject: [PATCH 21/39] clean up csproj --- src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index 05e7f8d8a..98f7631c0 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -22,9 +22,9 @@ 1 17 1 - $(MajorVersion).$(MinorVersion).$(PatchVersion) + $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 - + $(VersionPrefix).$(FileVersionRevision) $(MajorVersion).$(MinorVersion).0.0 From 5934076650e4c93fadd7bbc153ff1ab7f199cc1b Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 16:03:02 -0700 Subject: [PATCH 22/39] indent csproj comment --- src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index 98f7631c0..be5b6e2bb 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -24,7 +24,7 @@ 1 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 - + $(VersionPrefix).$(FileVersionRevision) $(MajorVersion).$(MinorVersion).0.0 From a94cc4e088f1d106f0477d451859cb7064dfb264 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 16 Apr 2024 16:37:05 -0700 Subject: [PATCH 23/39] remove unused import --- src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 1202ee86f..64b57ce04 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -22,7 +22,6 @@ namespace DurableTask.AzureStorage.Messaging using DurableTask.AzureStorage.Storage; using DurableTask.Core; using DurableTask.Core.History; - using Microsoft.WindowsAzure.Storage.Queue.Protocol; using Microsoft.WindowsAzure.Storage.Table; abstract class TaskHubQueue From 37dbac47cbd9f2eb7f5af772d141625757a369d0 Mon Sep 17 00:00:00 2001 From: David Justo Date: Wed, 17 Apr 2024 17:36:57 -0700 Subject: [PATCH 24/39] have valid table-naming scheme --- src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 64b57ce04..8ea4d9f64 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -73,7 +73,7 @@ public async Task HandleIfPoisonMessageAsync(MessageData messageData) }; // add to poison table - string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "-poison"; + string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison"; Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName); await poisonMessagesTable.CreateIfNotExistsAsync(); await poisonMessagesTable.InsertAsync(poisonMessage); From 865aa208f59156f0d517205e4170b4f83969cedf Mon Sep 17 00:00:00 2001 From: David Justo Date: Wed, 17 Apr 2024 17:43:33 -0700 Subject: [PATCH 25/39] add log --- .../Messaging/TaskHubQueue.cs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 8ea4d9f64..a5b6dcde7 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -90,6 +90,17 @@ public async Task HandleIfPoisonMessageAsync(MessageData messageData) $"The message has been moved to the '{poisonMessageTableName}' table for manual review. " + $"This will fail the consuming orchestrator, activity, or entity"; messageData.TaskMessage.Event.PoisonGuidance = guidance; + + this.settings.Logger.PoisonMessageDetected( + this.storageAccountName, + this.settings.TaskHubName, + messageData.TaskMessage.Event.EventType.ToString(), + messageData.TaskMessage.Event.EventId, + messageData.OriginalQueueMessage.Id, + messageData.TaskMessage.OrchestrationInstance.InstanceId, + messageData.TaskMessage.OrchestrationInstance.ExecutionId, + this.Name, + messageData.OriginalQueueMessage.DequeueCount); } } From 57bb966180e6222a5a543d1a9c8432d378c016f7 Mon Sep 17 00:00:00 2001 From: David Justo Date: Wed, 17 Apr 2024 17:50:02 -0700 Subject: [PATCH 26/39] add comments --- src/DurableTask.Core/History/HistoryEvent.cs | 1 + src/DurableTask.Core/TaskActivityDispatcher.cs | 4 ++++ src/DurableTask.Core/TaskOrchestrationExecutor.cs | 7 ++++++- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.Core/History/HistoryEvent.cs b/src/DurableTask.Core/History/HistoryEvent.cs index 8b32d2ef1..29f799b57 100644 --- a/src/DurableTask.Core/History/HistoryEvent.cs +++ b/src/DurableTask.Core/History/HistoryEvent.cs @@ -101,6 +101,7 @@ protected HistoryEvent(int eventId) /// /// Gets or sets user-facing details for why a message was labeled as poison. + /// This is to be set by each storage provider. /// public string PoisonGuidance { get; set; } = ""; } diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs index e4cc29352..308e7dedc 100644 --- a/src/DurableTask.Core/TaskActivityDispatcher.cs +++ b/src/DurableTask.Core/TaskActivityDispatcher.cs @@ -194,6 +194,10 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ => { if (scheduledEvent.IsPoison) { + // if the activity is "poison", then we should not executed again. Instead, we'll manually fail the activity + // by throwing an exception on behalf of the user-code. In the exception, we provide the storage-provider's guidance + // on how to deal with the poison message. + // TODO: this exception seems to not be getting de-serialized correctly on the orchestrator end. Why? var exception = new TaskFailureException(scheduledEvent.PoisonGuidance, details: scheduledEvent.PoisonGuidance); throw exception; } diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs index e9acef920..9d6039169 100644 --- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs +++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs @@ -199,12 +199,17 @@ void ProcessEvent(HistoryEvent historyEvent) { if (historyEvent.IsPoison) { + // If the message is labeled as "poison", then we should avoid processing it again. + // Therefore, we replace the event "in place" with an "ExecutionTerminatedEvent", so the + // orchestrator stops immediately. + var terminationEvent = new ExecutionTerminatedEvent(-1, historyEvent.PoisonGuidance); historyEvent = terminationEvent; + // since replay is not guaranteed, we need to populate `this.result` + // with a completed task var taskCompletionSource = new TaskCompletionSource(); taskCompletionSource.SetResult(""); - this.result = taskCompletionSource.Task; } From 6c3bb7980c0bd8b6b2d559996193d7f9624b783a Mon Sep 17 00:00:00 2001 From: David Justo Date: Wed, 17 Apr 2024 18:45:41 -0700 Subject: [PATCH 27/39] create valid serializable activity failure --- src/DurableTask.Core/TaskActivityDispatcher.cs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.Core/TaskActivityDispatcher.cs b/src/DurableTask.Core/TaskActivityDispatcher.cs index 308e7dedc..22782e8d4 100644 --- a/src/DurableTask.Core/TaskActivityDispatcher.cs +++ b/src/DurableTask.Core/TaskActivityDispatcher.cs @@ -22,6 +22,7 @@ namespace DurableTask.Core using DurableTask.Core.History; using DurableTask.Core.Logging; using DurableTask.Core.Middleware; + using DurableTask.Core.Serializing; using DurableTask.Core.Tracing; /// @@ -197,9 +198,14 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ => // if the activity is "poison", then we should not executed again. Instead, we'll manually fail the activity // by throwing an exception on behalf of the user-code. In the exception, we provide the storage-provider's guidance // on how to deal with the poison message. - // TODO: this exception seems to not be getting de-serialized correctly on the orchestrator end. Why? - var exception = new TaskFailureException(scheduledEvent.PoisonGuidance, details: scheduledEvent.PoisonGuidance); - throw exception; + + // We need to account for all possible deserialization modes, so we construct an exception valid in all modes. + // TODO: revise - this is clunky + var exception = new Exception(scheduledEvent.PoisonGuidance); + var failureDetails = new FailureDetails(exception); + var details = Utils.SerializeCause(exception, JsonDataConverter.Default); + var taskFailure = new TaskFailureException(details, exception, details).WithFailureDetails(failureDetails); + throw taskFailure; } string? output = await taskActivity.RunAsync(context, scheduledEvent.Input); From b15dbb5b9d32a1463743e24d87b12a075918c652 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 13 Jun 2024 20:46:10 -0700 Subject: [PATCH 28/39] handle de-serialization errors as well --- .../EntityTrackingStoreQueries.cs | 5 +- .../MessageManager.cs | 10 +-- .../Messaging/ControlQueue.cs | 47 ++++++++----- .../Messaging/TaskHubQueue.cs | 70 +++++++++++++++---- .../Messaging/WorkItemQueue.cs | 35 +++++++--- 5 files changed, 122 insertions(+), 45 deletions(-) diff --git a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs index 3ba243908..b37cc8e6c 100644 --- a/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs +++ b/src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs @@ -235,9 +235,10 @@ bool OrchestrationIsRunning(OrchestrationStatus? status) { // first, retrieve the entity scheduler state (= input of the orchestration state), possibly from blob storage. string serializedSchedulerState; - if (MessageManager.TryGetLargeMessageReference(state.Input, out Uri blobUrl)) + if (MessageManager.TryGetLargeMessageReference(state.Input, out Uri? blobUrl)) { - serializedSchedulerState = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl); + // we know blobUrl is not null because TryGetLargeMessageReference returned true + serializedSchedulerState = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl!); } else { diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index ea042a27d..64225814e 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -10,7 +10,7 @@ // See the License for the specific language governing permissions and // limitations under the License. // ---------------------------------------------------------------------------------- - +#nullable enable namespace DurableTask.AzureStorage { using System; @@ -118,9 +118,10 @@ public async Task SerializeMessageDataAsync(MessageData messageData) /// Actual string representation of message. public async Task FetchLargeMessageIfNecessary(string message) { - if (TryGetLargeMessageReference(message, out Uri blobUrl)) + if (TryGetLargeMessageReference(message, out Uri? blobUrl)) { - return await this.DownloadAndDecompressAsBytesAsync(blobUrl); + // we know blobUrl is not null because TryGetLargeMessageReference returned true + return await this.DownloadAndDecompressAsBytesAsync(blobUrl!); } else { @@ -128,7 +129,7 @@ public async Task FetchLargeMessageIfNecessary(string message) } } - internal static bool TryGetLargeMessageReference(string messagePayload, out Uri blobUrl) + internal static bool TryGetLargeMessageReference(string messagePayload, out Uri? blobUrl) { if (Uri.IsWellFormedUriString(messagePayload, UriKind.Absolute)) { @@ -314,6 +315,7 @@ public async Task DeleteLargeMessageBlobs(string sanitizedInstanceId) return storageOperationCount; } } +#nullable disable #if NETSTANDARD2_0 class TypeNameSerializationBinder : ISerializationBinder diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index e0744b3ee..681d39b2a 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -105,30 +105,27 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) MessageData messageData; try { + // try to de-serialize message messageData = await this.messageManager.DeserializeQueueMessageAsync( queueMessage, this.storageQueue.Name); + // if successful, check if it's a poison message. If so, we handle it + // and log metadata about it as the de-serialization succeeded. await this.HandleIfPoisonMessageAsync(messageData); - } - catch (Exception e) + catch (Exception exception) { - // We have limited information about the details of the message - // since we failed to deserialize it. - this.settings.Logger.MessageFailure( - this.storageAccountName, - this.settings.TaskHubName, - queueMessage.Id /* MessageId */, - string.Empty /* InstanceId */, - string.Empty /* ExecutionId */, - this.storageQueue.Name, - string.Empty /* EventType */, - 0 /* TaskEventId */, - e.ToString()); - - // Abandon the message so we can try it again later. - await this.AbandonMessageAsync(queueMessage); + // Deserialization errors can be persistent, so we check if this is a poison message. + bool isPoisonMessage = await this.TryHandlingDeserializationPoisonMessage(queueMessage, exception); + if (isPoisonMessage) + { + // we have already handled the poison message, so we move on. + return; + } + + // This is not a poison message (at least not yet), so we abandon it to retry later. + await this.AbandonMessageAsync(queueMessage, exception); return; } @@ -193,8 +190,22 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) } // This overload is intended for cases where we aren't able to deserialize an instance of MessageData. - public Task AbandonMessageAsync(QueueMessage queueMessage) + public Task AbandonMessageAsync(QueueMessage queueMessage, Exception exception) { + + // We have limited information about the details of the message + // since we failed to deserialize it. + this.settings.Logger.MessageFailure( + this.storageAccountName, + this.settings.TaskHubName, + queueMessage.Id /* MessageId */, + string.Empty /* InstanceId */, + string.Empty /* ExecutionId */, + this.storageQueue.Name, + string.Empty /* EventType */, + 0 /* TaskEventId */, + exception.ToString()); + this.stats.PendingOrchestratorMessages.TryRemove(queueMessage.Id, out _); return base.AbandonMessageAsync( queueMessage, diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index a5b6dcde7..1d9644cb0 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -62,20 +62,30 @@ public async Task HandleIfPoisonMessageAsync(MessageData messageData) { var queueMessage = messageData.OriginalQueueMessage; var maxThreshold = this.settings.PoisonMessageDeuqueCountThreshold; + if (queueMessage.DequeueCount > maxThreshold) { + string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison"; + Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName); + await poisonMessagesTable.CreateIfNotExistsAsync(); + + // provide guidance, which is backend-specific + string guidance = $"Queue message ID '{queueMessage.Id}' was dequeued {queueMessage.DequeueCount} times," + + $" which is greater than the threshold poison message threshold ({maxThreshold}). " + + $"The message has been moved to the '{poisonMessageTableName}' table for manual review. " + + $"This will fail the consuming orchestrator, activity, or entity"; + messageData.TaskMessage.Event.PoisonGuidance = guidance; + + // add to poison table var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) { Properties = { - ["RawMessage"] = new EntityProperty(queueMessage.Message) + ["RawMessage"] = new EntityProperty(queueMessage.Message), + ["Reason"] = new EntityProperty(guidance) } }; - // add to poison table - string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison"; - Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName); - await poisonMessagesTable.CreateIfNotExistsAsync(); await poisonMessagesTable.InsertAsync(poisonMessage); // delete from queue so it doesn't get processed again. @@ -84,13 +94,6 @@ public async Task HandleIfPoisonMessageAsync(MessageData messageData) // since isPoison is `true`, we'll override the deserialized message messageData.TaskMessage.Event.IsPoison = true; - // provide guidance, which is backend-specific - string guidance = $"Queue message ID '{queueMessage.Id}' was dequeued {queueMessage.DequeueCount} times," + - $" which is greater than the threshold poison message threshold ({maxThreshold}). " + - $"The message has been moved to the '{poisonMessageTableName}' table for manual review. " + - $"This will fail the consuming orchestrator, activity, or entity"; - messageData.TaskMessage.Event.PoisonGuidance = guidance; - this.settings.Logger.PoisonMessageDetected( this.storageAccountName, this.settings.TaskHubName, @@ -104,6 +107,49 @@ public async Task HandleIfPoisonMessageAsync(MessageData messageData) } } + public async Task TryHandlingDeserializationPoisonMessage(QueueMessage queueMessage, Exception deserializationException) + { + + var maxThreshold = this.settings.PoisonMessageDeuqueCountThreshold; + bool isPoisonMessage = queueMessage.DequeueCount > maxThreshold; + if (isPoisonMessage) + { + isPoisonMessage = true; + string guidance = $"Queue message ID '{queueMessage.Id}' was dequeued {queueMessage.DequeueCount} times," + + $" which is greater than the threshold poison message threshold ({maxThreshold}). " + + $"A de-serialization error ocurred: \n {deserializationException}"; + var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name) + { + Properties = + { + ["RawMessage"] = new EntityProperty(queueMessage.Message), + ["Reason"] = new EntityProperty(guidance) + } + }; + + // add to poison table + string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison"; + Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName); + await poisonMessagesTable.CreateIfNotExistsAsync(); + await poisonMessagesTable.InsertAsync(poisonMessage); + + // delete from queue so it doesn't get processed again. + await this.storageQueue.DeleteMessageAsync(queueMessage); + + this.settings.Logger.PoisonMessageDetected( + this.storageAccountName, + this.settings.TaskHubName, + string.Empty, + 0, + string.Empty, + string.Empty, + string.Empty, + this.Name, + queueMessage.DequeueCount); + } + return isPoisonMessage; + } + public string Name => this.storageQueue.Name; public Uri Uri => this.storageQueue.Uri; diff --git a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs index cf1563b0c..eaf01402b 100644 --- a/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs @@ -10,7 +10,7 @@ // See the License for the specific language governing permissions and // limitations under the License. // ---------------------------------------------------------------------------------- - +#nullable enable namespace DurableTask.AzureStorage.Messaging { using System; @@ -30,13 +30,13 @@ public WorkItemQueue( protected override TimeSpan MessageVisibilityTimeout => this.settings.WorkItemQueueVisibilityTimeout; - public async Task GetMessageAsync(CancellationToken cancellationToken) + public async Task GetMessageAsync(CancellationToken cancellationToken) { while (!cancellationToken.IsCancellationRequested) { try { - QueueMessage queueMessage = await this.storageQueue.GetMessageAsync(this.settings.WorkItemQueueVisibilityTimeout, cancellationToken); + QueueMessage? queueMessage = await this.storageQueue.GetMessageAsync(this.settings.WorkItemQueueVisibilityTimeout, cancellationToken); if (queueMessage == null) { @@ -44,13 +44,30 @@ public async Task GetMessageAsync(CancellationToken cancellationTok continue; } - MessageData data = await this.messageManager.DeserializeQueueMessageAsync( - queueMessage, - this.storageQueue.Name); - await this.HandleIfPoisonMessageAsync(data); + try + { + MessageData data = await this.messageManager.DeserializeQueueMessageAsync( + queueMessage, + this.storageQueue.Name); + + // if successful, check if it's a poison message. If so, we handle it + // and log metadata about it as the de-serialization succeeded. + await this.HandleIfPoisonMessageAsync(data); + this.backoffHelper.Reset(); + return data; + } + catch (Exception exception) + { + // Deserialization errors can be persistent, so we check if this is a poison message. + bool isPoisonMessage = await this.TryHandlingDeserializationPoisonMessage(queueMessage, exception); + if (isPoisonMessage) + { + // we have already handled the poison message, so we move on. + continue; + } + } + - this.backoffHelper.Reset(); - return data; } catch (Exception e) { From cbb827490ac58289af773cd7bc9aa049977a4fd3 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 24 Jun 2024 19:44:56 -0700 Subject: [PATCH 29/39] add version suffix --- src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index be5b6e2bb..99c068124 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -21,8 +21,9 @@ 1 17 - 1 + 2 $(MajorVersion).$(MinorVersion).$(PatchVersion) + poisonmessagehandler.1 $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) From 16f38f16c6d781c8cb0b38c18bc9e8849a30f463 Mon Sep 17 00:00:00 2001 From: David Justo Date: Mon, 24 Jun 2024 20:00:24 -0700 Subject: [PATCH 30/39] rev patch --- src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index f2ea59a4b..ac910e61a 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -21,7 +21,7 @@ 1 17 - 3 + 4 $(MajorVersion).$(MinorVersion).$(PatchVersion) poisonmessagehandler.1 $(VersionPrefix).0 From 74dc0f7491cc03327c29a246872bca734c1f6d86 Mon Sep 17 00:00:00 2001 From: David Justo Date: Tue, 25 Jun 2024 08:00:26 -0700 Subject: [PATCH 31/39] add dtfx.core --- src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj | 2 +- src/DurableTask.Core/DurableTask.Core.csproj | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index ac910e61a..79be953de 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -23,7 +23,7 @@ 17 4 $(MajorVersion).$(MinorVersion).$(PatchVersion) - poisonmessagehandler.1 + poisonmessagehandler.3 $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) diff --git a/src/DurableTask.Core/DurableTask.Core.csproj b/src/DurableTask.Core/DurableTask.Core.csproj index 25c247e1b..39cbc748f 100644 --- a/src/DurableTask.Core/DurableTask.Core.csproj +++ b/src/DurableTask.Core/DurableTask.Core.csproj @@ -18,8 +18,9 @@ 2 17 - 1 + 3 $(MajorVersion).$(MinorVersion).$(PatchVersion) + poisonmessagehandler.3 $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) From 584cf8da732af3fb7f7a7a6c985e7759086a6e27 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 27 Jun 2024 10:35:23 -0700 Subject: [PATCH 32/39] merge mixed deserializtion hotfix --- .../DataContractJsonConverter.cs | 57 ++--- .../DurableTask.AzureStorage.csproj | 2 +- .../MessageManager.cs | 10 +- .../Messaging/ControlQueue.cs | 205 ++++++++++++++++++ .../DataContractJsonConverterTests.cs | 20 ++ 5 files changed, 254 insertions(+), 40 deletions(-) diff --git a/src/DurableTask.AzureStorage/DataContractJsonConverter.cs b/src/DurableTask.AzureStorage/DataContractJsonConverter.cs index bd0ac265c..e6727d861 100644 --- a/src/DurableTask.AzureStorage/DataContractJsonConverter.cs +++ b/src/DurableTask.AzureStorage/DataContractJsonConverter.cs @@ -29,6 +29,8 @@ namespace DurableTask.AzureStorage /// internal class DataContractJsonConverter : JsonConverter { + public JsonSerializer alternativeSerializer = null; + public override bool CanConvert(Type objectType) { if (objectType == null) @@ -59,15 +61,24 @@ public override object ReadJson( throw new ArgumentNullException(nameof(serializer)); } - using (var stream = new MemoryStream()) - using (var writer = new StreamWriter(stream)) - using (var jsonWriter = new JsonTextWriter(writer)) + // JsonReader is forward only, need to make a copy so we can read it twice. + using var stream = new MemoryStream(); + using var writer = new StreamWriter(stream); + using var jsonWriter = new JsonTextWriter(writer); + jsonWriter.WriteToken(reader, writeChildren: true); + jsonWriter.Flush(); + stream.Position = 0; + + try + { + using var reader2 = new JsonTextReader(new StreamReader(stream)); + reader2.CloseInput = false; + return this.alternativeSerializer.Deserialize(reader2, objectType); + } + catch { - jsonWriter.WriteToken(reader, writeChildren: true); - jsonWriter.Flush(); stream.Position = 0; - - var contractSerializer = CreateSerializer(objectType, serializer); + DataContractJsonSerializer contractSerializer = CreateSerializer(objectType, serializer); return contractSerializer.ReadObject(stream); } } @@ -75,34 +86,8 @@ public override object ReadJson( /// public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) { - if (writer == null) - { - throw new ArgumentNullException(nameof(writer)); - } - - if (value == null) - { - writer.WriteNull(); - return; - } - - if (serializer == null) - { - throw new ArgumentNullException(nameof(serializer)); - } - - using (var memoryStream = new MemoryStream()) - { - var contractSerializer = CreateSerializer(value.GetType(), serializer); - contractSerializer.WriteObject(memoryStream, value); - memoryStream.Position = 0; - - using (var streamReader = new StreamReader(memoryStream)) - using (var jsonReader = new JsonTextReader(streamReader)) - { - writer.WriteToken(jsonReader, writeChildren: true); - } - } + // Ignore data contract, use Newtonsoft + this.alternativeSerializer.Serialize(writer, value); } private static DataContractJsonSerializer CreateSerializer(Type type, JsonSerializer serializer) @@ -115,4 +100,4 @@ private static DataContractJsonSerializer CreateSerializer(Type type, JsonSerial }); } } -} +} \ No newline at end of file diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index 79be953de..c0a81b6d9 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -23,7 +23,7 @@ 17 4 $(MajorVersion).$(MinorVersion).$(PatchVersion) - poisonmessagehandler.3 + poisonmessagehandler.4 $(VersionPrefix).0 $(VersionPrefix).$(FileVersionRevision) diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index eda03dc59..1345f6121 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -50,7 +50,7 @@ public MessageManager( { this.settings = settings; this.azureStorageClient = azureStorageClient; - this.blobContainer = this.azureStorageClient.GetBlobContainerReference(blobContainerName); + this.blobContainer = this.azureStorageClient?.GetBlobContainerReference(blobContainerName); this.taskMessageSerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects, @@ -61,9 +61,13 @@ public MessageManager( #endif }; - if (this.settings.UseDataContractSerialization) + JsonSerializer newtonSoftSerializer = JsonSerializer.Create(taskMessageSerializerSettings); + + if (this.settings.UseDataContractSerialization) // for hotfix to work, set setting to `true` { - this.taskMessageSerializerSettings.Converters.Add(new DataContractJsonConverter()); + var dataConverter = new DataContractJsonConverter(); + dataConverter.alternativeSerializer = newtonSoftSerializer; + this.taskMessageSerializerSettings.Converters.Add(dataConverter); } // We _need_ to create the Json serializer after providing the data converter, diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index 681d39b2a..d68efee19 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -46,6 +46,199 @@ public ControlQueue( protected override TimeSpan MessageVisibilityTimeout => this.settings.ControlQueueVisibilityTimeout; + internal async Task FetchInstanceStatusInternalAsync(string instanceId, bool fetchInput) + { + if (instanceId == null) + { + throw new ArgumentNullException(nameof(instanceId)); + } + + var queryCondition = new OrchestrationInstanceStatusQueryCondition + { + InstanceId = instanceId, + FetchInput = fetchInput, + }; + + string instancesTableName = settings.InstanceTableName; + + var instancesTable = this.azureStorageClient.GetTableReference(instancesTableName); + + var tableEntitiesResponseInfo = await instancesTable.ExecuteQueryAsync(queryCondition.ToTableQuery()); + + var tableEntity = tableEntitiesResponseInfo.ReturnedEntities.FirstOrDefault(); + + OrchestrationState? orchestrationState = null; + if (tableEntity != null) + { + orchestrationState = await this.ConvertFromAsync(tableEntity); + } + + this.settings.Logger.FetchedInstanceStatus( + this.storageAccountName, + this.settings.TaskHubName, + instanceId, + orchestrationState?.OrchestrationInstance.ExecutionId ?? string.Empty, + orchestrationState?.OrchestrationStatus.ToString() ?? "NotFound", + tableEntitiesResponseInfo.ElapsedMilliseconds); + + if (tableEntity == null || orchestrationState == null) + { + return null; + } + + return new InstanceStatus(orchestrationState, tableEntity.ETag); + } + + async Task ConvertFromAsync(DynamicTableEntity tableEntity) + { + var orchestrationInstanceStatus = await CreateOrchestrationInstanceStatusAsync(tableEntity.Properties); + var instanceId = KeySanitation.UnescapePartitionKey(tableEntity.PartitionKey); + return await ConvertFromAsync(orchestrationInstanceStatus, instanceId); + } + + private async Task CreateOrchestrationInstanceStatusAsync(IDictionary properties) + { + var instance = new OrchestrationInstanceStatus(); + EntityProperty property; + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.ExecutionId), out property)) + { + instance.ExecutionId = property.StringValue; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Name), out property)) + { + instance.Name = property.StringValue; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Version), out property)) + { + instance.Version = property.StringValue; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Input), out property)) + { + instance.Input = property.StringValue; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Output), out property)) + { + instance.Output = property.StringValue; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.CustomStatus), out property)) + { + instance.CustomStatus = property.StringValue; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.CreatedTime), out property) && property.DateTime is { } createdTime) + { + instance.CreatedTime = createdTime; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.LastUpdatedTime), out property) && property.DateTime is { } lastUpdatedTime) + { + instance.LastUpdatedTime = lastUpdatedTime; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.CompletedTime), out property)) + { + instance.CompletedTime = property.DateTime; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.RuntimeStatus), out property)) + { + instance.RuntimeStatus = property.StringValue; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.ScheduledStartTime), out property)) + { + instance.ScheduledStartTime = property.DateTime; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Generation), out property) && property.Int32Value is { } generation) + { + instance.Generation = generation; + } + + if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Tags), out property) && property.StringValue is { Length: > 0 } tags) + { + instance.Tags = TagsSerializer.Deserialize(tags); + } + else if (properties.TryGetValue(nameof(OrchestrationInstanceStatus.Tags) + "BlobName", out property) && property.StringValue is { Length: > 0 } blob) + { + var blobContents = await messageManager.DownloadAndDecompressAsBytesAsync(blob); + instance.Tags = TagsSerializer.Deserialize(blobContents); + } + + return instance; + } + + async Task ConvertFromAsync(OrchestrationInstanceStatus orchestrationInstanceStatus, string instanceId) + { + var orchestrationState = new OrchestrationState(); + if (!Enum.TryParse(orchestrationInstanceStatus.RuntimeStatus, out orchestrationState.OrchestrationStatus)) + { + // This is not expected, but could happen if there is invalid data in the Instances table. + orchestrationState.OrchestrationStatus = (OrchestrationStatus)(-1); + } + + orchestrationState.OrchestrationInstance = new OrchestrationInstance + { + InstanceId = instanceId, + ExecutionId = orchestrationInstanceStatus.ExecutionId, + }; + + orchestrationState.Name = orchestrationInstanceStatus.Name; + orchestrationState.Version = orchestrationInstanceStatus.Version; + orchestrationState.Status = orchestrationInstanceStatus.CustomStatus; + orchestrationState.CreatedTime = orchestrationInstanceStatus.CreatedTime; + orchestrationState.CompletedTime = orchestrationInstanceStatus.CompletedTime.GetValueOrDefault(); + orchestrationState.LastUpdatedTime = orchestrationInstanceStatus.LastUpdatedTime; + orchestrationState.Input = orchestrationInstanceStatus.Input; + orchestrationState.Output = orchestrationInstanceStatus.Output; + orchestrationState.ScheduledStartTime = orchestrationInstanceStatus.ScheduledStartTime; + orchestrationState.Generation = orchestrationInstanceStatus.Generation; + orchestrationState.Tags = orchestrationInstanceStatus.Tags; + + if (this.settings.FetchLargeMessageDataEnabled) + { + if (MessageManager.TryGetLargeMessageReference(orchestrationState.Input, out Uri blobUrl)) + { + string json = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl); + + // Depending on which blob this is, we interpret it differently. + if (blobUrl.AbsolutePath.EndsWith("ExecutionStarted.json.gz")) + { + // The downloaded content is an ExecutedStarted message payload that + // was created when the orchestration was started. + MessageData msg = this.messageManager.DeserializeMessageData(json); + if (msg?.TaskMessage?.Event is ExecutionStartedEvent startEvent) + { + orchestrationState.Input = startEvent.Input; + } + else + { + this.settings.Logger.GeneralWarning( + this.storageAccountName, + this.settings.TaskHubName, + $"Orchestration input blob URL '{blobUrl}' contained unrecognized data.", + instanceId); + } + } + else + { + // The downloaded content is the raw input JSON + orchestrationState.Input = json; + } + } + + orchestrationState.Output = await this.messageManager.FetchLargeMessageIfNecessary(orchestrationState.Output); + } + + return orchestrationState; + } + public async Task> GetMessagesAsync(CancellationToken cancellationToken) { using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(this.releaseCancellationToken, cancellationToken)) @@ -113,6 +306,18 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage) // if successful, check if it's a poison message. If so, we handle it // and log metadata about it as the de-serialization succeeded. await this.HandleIfPoisonMessageAsync(messageData); + + var instanceId = messageData.TaskMessage.OrchestrationInstance.InstanceId; + InstanceStatus? status = await FetchInstanceStatusInternalAsync(instanceId, false); + if (status != null) + { + if (status.State.OrchestrationStatus == OrchestrationStatus.Terminated) + { + // delete the message, the orchestration is terminated, we won't load it's history + // TODO: possibly do not delete, but move to poison message table? + await this.InnerQueue.DeleteMessageAsync(queueMessage); + } + } } catch (Exception exception) { diff --git a/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs b/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs index fa28622f6..998d9195b 100644 --- a/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs +++ b/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs @@ -12,12 +12,32 @@ public class DataContractJsonConverterTests { private static readonly JsonSerializerSettings serializerSettings = new JsonSerializerSettings { + TypeNameHandling = TypeNameHandling.Objects, Converters = { new DataContractJsonConverter(), + new DataContractJsonConverter() + { + alternativeSerializer = JsonSerializer.Create(new JsonSerializerSettings + { + TypeNameHandling = TypeNameHandling.Objects, + }), + }, } }; + [TestMethod] + public void Fallback_Succeeds() + { + var messageManager = new MessageManager( + new AzureStorageOrchestrationServiceSettings() { UseDataContractSerialization = true }, + null, + "test"); + string json = @"{""$type"":""DurableTask.AzureStorage.MessageData"",""ActivityId"":""03d10904-8a30-4bc5-9659-3c2c46b9435c"",""TaskMessage"":{""Event"":{""__type"":""TimerFiredEvent:#DurableTask.Core.History"",""EventId"":-1,""EventType"":11,""IsPlayed"":false,""Timestamp"":""2024-03-14T16:00:05.1285428Z"",""FireAt"":""2024-03-14T19:00:05.0563363Z"",""TimerId"":1},""OrchestrationInstance"":{""ExecutionId"":""8c0384066f414e1e9f4f8af94f9cc03f"",""InstanceId"":""8f30203236d34f8ea5eeff5428eb5ee7:20""},""SequenceNumber"":0},""SequenceNumber"":147825,""Episode"":1,""Sender"":{""ExecutionId"":""8c0384066f414e1e9f4f8af94f9cc03f"",""InstanceId"":""8f30203236d34f8ea5eeff5428eb5ee7:20""},""SerializableTraceContext"":""{\""$id\"":\""1\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-264d84ac14ac0835-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""abcd4f1b232271e8\"",\""StartTime\"":\""2024-03-14T16:00:05.1286681+00:00\"",\""TelemetryType\"":\""Dependency\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""1\""},{\""$id\"":\""2\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-abcd4f1b232271e8-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""19d9c9a2ac642d79\"",\""StartTime\"":\""2024-03-14T16:00:04.7358608+00:00\"",\""TelemetryType\"":\""Request\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""2\""},{\""$id\"":\""3\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-19d9c9a2ac642d79-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""586d2cfbe8ad7fb6\"",\""StartTime\"":\""2024-03-14T16:00:03.8475412+00:00\"",\""TelemetryType\"":\""Dependency\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""3\""},{\""$id\"":\""4\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-586d2cfbe8ad7fb6-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""1402edc2597b9617\"",\""StartTime\"":\""2024-03-14T16:00:01.7224413+00:00\"",\""TelemetryType\"":\""Request\"",\""OrchestrationTraceContexts\"":[{\""$id\"":\""5\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-22a551c6fd5d3acd-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""9a882fa89395da4f\"",\""StartTime\"":\""2024-03-14T15:56:00.8004397+00:00\"",\""TelemetryType\"":\""Request\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""5\""}],\""OperationName\"":\""DtOrchestrator\""},{\""$id\"":\""6\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-e331e54d39cad3c4-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""22a551c6fd5d3acd\"",\""StartTime\"":\""2024-03-14T15:56:01.3294411+00:00\"",\""TelemetryType\"":\""Dependency\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""5\""},{\""$ref\"":\""6\""}],\""OperationName\"":\""DtOrchestrator Microsoft.LabServices.ManagedLabs.Application.Schedules.RunScheduleActionOrchestration+Handler\""},{\""$id\"":\""7\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-179a6ba5961635d7-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""e331e54d39cad3c4\"",\""StartTime\"":\""2024-03-14T15:56:01.4212406+00:00\"",\""TelemetryType\"":\""Request\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""7\""},{\""$ref\"":\""6\""},{\""$ref\"":\""5\""}],\""OperationName\"":\""DtOrchestrator\""},{\""$id\"":\""8\"",\""$type\"":\""DurableTask.Core.W3CTraceContext, DurableTask.Core\"",\""TraceParent\"":\""00-8fdaec3721ee2f7e0e43b5be5b22e54f-1402edc2597b9617-00\"",\""TraceState\"":\""corId=8bb00592-7083-4190-8a20-98703183e8ca\"",\""ParentSpanId\"":\""179a6ba5961635d7\"",\""StartTime\"":\""2024-03-14T16:00:01.3381712+00:00\"",\""TelemetryType\"":\""Dependency\"",\""OrchestrationTraceContexts\"":[{\""$ref\"":\""5\""},{\""$ref\"":\""6\""},{\""$ref\"":\""7\""},{\""$ref\"":\""8\""}],\""OperationName\"":\""DtOrchestrator Microsoft.LabServices.ManagedLabs.Application.Labs.NotifyLabVmTrackersOrchestration+Handler\""},{\""$ref\"":\""4\""}],\""OperationName\"":\""DtOrchestrator\""},{\""$ref\"":\""8\""},{\""$ref\"":\""7\""},{\""$ref\"":\""6\""},{\""$ref\"":\""5\""}],\""OperationName\"":\""DtOrchestrator Microsoft.LabServices.ManagedLabs.Application.VirtualMachines.NotifyVmTrackerOrchestration+Handler\""},{\""$ref\"":\""4\""},{\""$ref\"":\""8\""},{\""$ref\"":\""7\""},{\""$ref\"":\""6\""},{\""$ref\"":\""5\""}],\""OperationName\"":\""DtOrchestrator\""},{\""$ref\"":\""3\""},{\""$ref\"":\""4\""},{\""$ref\"":\""8\""},{\""$ref\"":\""7\""},{\""$ref\"":\""6\""},{\""$ref\"":\""5\""}],\""OperationName\"":\""DtOrchestrator outbound\""}""}"; + + object obj = messageManager.DeserializeMessageData(json); + } + [TestMethod] public void ReadWrite_Null_Succeeds() { From 51978a05fecf2bde6af50463a59bf620caf684e1 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 27 Jun 2024 10:44:43 -0700 Subject: [PATCH 33/39] add imports --- src/DurableTask.AzureStorage/Messaging/ControlQueue.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index d68efee19..d6fad4eab 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -22,6 +22,10 @@ namespace DurableTask.AzureStorage.Messaging using DurableTask.AzureStorage.Monitoring; using DurableTask.AzureStorage.Partitioning; using DurableTask.AzureStorage.Storage; + using DurableTask.AzureStorage.Tracking; + using DurableTask.Core; + using DurableTask.Core.History; + using Microsoft.WindowsAzure.Storage.Table; class ControlQueue : TaskHubQueue, IDisposable { From 65c29c407ea00c3f617687133e145c3f22b83029 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 27 Jun 2024 11:09:16 -0700 Subject: [PATCH 34/39] pass nullable analysis --- src/DurableTask.AzureStorage/MessageManager.cs | 2 +- src/DurableTask.AzureStorage/Messaging/ControlQueue.cs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index 1345f6121..df1e00982 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -50,7 +50,7 @@ public MessageManager( { this.settings = settings; this.azureStorageClient = azureStorageClient; - this.blobContainer = this.azureStorageClient?.GetBlobContainerReference(blobContainerName); + this.blobContainer = this.azureStorageClient.GetBlobContainerReference(blobContainerName); this.taskMessageSerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects, diff --git a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs index d6fad4eab..71119b701 100644 --- a/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/ControlQueue.cs @@ -207,12 +207,12 @@ async Task ConvertFromAsync(OrchestrationInstanceStatus orch if (this.settings.FetchLargeMessageDataEnabled) { - if (MessageManager.TryGetLargeMessageReference(orchestrationState.Input, out Uri blobUrl)) + if (MessageManager.TryGetLargeMessageReference(orchestrationState.Input, out Uri? blobUrl)) { - string json = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl); + string json = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl!); // Depending on which blob this is, we interpret it differently. - if (blobUrl.AbsolutePath.EndsWith("ExecutionStarted.json.gz")) + if (blobUrl!.AbsolutePath.EndsWith("ExecutionStarted.json.gz")) { // The downloaded content is an ExecutedStarted message payload that // was created when the orchestration was started. From de7e46b1456ee52c500f21dbd1b469a5c3ead59d Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 27 Jun 2024 11:22:40 -0700 Subject: [PATCH 35/39] make hotfix always occur --- src/DurableTask.AzureStorage/MessageManager.cs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index df1e00982..0caca357c 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -63,12 +63,10 @@ public MessageManager( JsonSerializer newtonSoftSerializer = JsonSerializer.Create(taskMessageSerializerSettings); - if (this.settings.UseDataContractSerialization) // for hotfix to work, set setting to `true` - { - var dataConverter = new DataContractJsonConverter(); - dataConverter.alternativeSerializer = newtonSoftSerializer; - this.taskMessageSerializerSettings.Converters.Add(dataConverter); - } + //hotfix is always allowed + var dataConverter = new DataContractJsonConverter(); + dataConverter.alternativeSerializer = newtonSoftSerializer; + this.taskMessageSerializerSettings.Converters.Add(dataConverter); // We _need_ to create the Json serializer after providing the data converter, // otherwise the converters will be ignored. From a8b24e576b6cd2ec312eb7631818f81a651ca2a9 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 27 Jun 2024 15:08:35 -0700 Subject: [PATCH 36/39] move nullable analysis --- src/DurableTask.AzureStorage/MessageManager.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index 0caca357c..eba67ee9e 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -10,7 +10,6 @@ // See the License for the specific language governing permissions and // limitations under the License. // ---------------------------------------------------------------------------------- -#nullable enable namespace DurableTask.AzureStorage { using System; @@ -50,7 +49,7 @@ public MessageManager( { this.settings = settings; this.azureStorageClient = azureStorageClient; - this.blobContainer = this.azureStorageClient.GetBlobContainerReference(blobContainerName); + this.blobContainer = this.azureStorageClient?.GetBlobContainerReference(blobContainerName); this.taskMessageSerializerSettings = new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.Objects, @@ -73,7 +72,7 @@ public MessageManager( this.serializer = JsonSerializer.Create(taskMessageSerializerSettings); } - +#nullable enable public async Task EnsureContainerAsync() { bool created = false; From a746b1e7469667ee0157ea17fbe3bceb2d4b3397 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 27 Jun 2024 16:11:12 -0700 Subject: [PATCH 37/39] make hotfix conditional on setting --- src/DurableTask.AzureStorage/MessageManager.cs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index eba67ee9e..82b6293ad 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -62,10 +62,12 @@ public MessageManager( JsonSerializer newtonSoftSerializer = JsonSerializer.Create(taskMessageSerializerSettings); - //hotfix is always allowed - var dataConverter = new DataContractJsonConverter(); - dataConverter.alternativeSerializer = newtonSoftSerializer; - this.taskMessageSerializerSettings.Converters.Add(dataConverter); + if (this.settings.UseDataContractSerialization) // for hotfix to work, set setting to `true` + { + var dataConverter = new DataContractJsonConverter(); + dataConverter.alternativeSerializer = newtonSoftSerializer; + this.taskMessageSerializerSettings.Converters.Add(dataConverter); + } // We _need_ to create the Json serializer after providing the data converter, // otherwise the converters will be ignored. From d219ffa13c5470ae5ab5a65ccc68801952add780 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 27 Jun 2024 17:10:01 -0700 Subject: [PATCH 38/39] match diffs --- .../DataContractJsonConverterTests.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs b/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs index 998d9195b..1fab2b512 100644 --- a/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs +++ b/test/DurableTask.AzureStorage.Tests/DataContractJsonConverterTests.cs @@ -15,7 +15,6 @@ public class DataContractJsonConverterTests TypeNameHandling = TypeNameHandling.Objects, Converters = { - new DataContractJsonConverter(), new DataContractJsonConverter() { alternativeSerializer = JsonSerializer.Create(new JsonSerializerSettings From b2e1f0c193796611bab7484039020163b49d18a1 Mon Sep 17 00:00:00 2001 From: David Justo Date: Thu, 27 Jun 2024 17:28:26 -0700 Subject: [PATCH 39/39] make hotfix always run --- src/DurableTask.AzureStorage/MessageManager.cs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/DurableTask.AzureStorage/MessageManager.cs b/src/DurableTask.AzureStorage/MessageManager.cs index 82b6293ad..bcbae912f 100644 --- a/src/DurableTask.AzureStorage/MessageManager.cs +++ b/src/DurableTask.AzureStorage/MessageManager.cs @@ -62,12 +62,10 @@ public MessageManager( JsonSerializer newtonSoftSerializer = JsonSerializer.Create(taskMessageSerializerSettings); - if (this.settings.UseDataContractSerialization) // for hotfix to work, set setting to `true` - { - var dataConverter = new DataContractJsonConverter(); - dataConverter.alternativeSerializer = newtonSoftSerializer; - this.taskMessageSerializerSettings.Converters.Add(dataConverter); - } + // make hotfix always present + var dataConverter = new DataContractJsonConverter(); + dataConverter.alternativeSerializer = newtonSoftSerializer; + this.taskMessageSerializerSettings.Converters.Add(dataConverter); // We _need_ to create the Json serializer after providing the data converter, // otherwise the converters will be ignored.