From fe436732d6c5f98da8a4e260b89d4814d7f73f92 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 11 Jul 2025 13:27:50 -0700 Subject: [PATCH 01/17] first commit --- src/DurableTask.Core/TaskEntityDispatcher.cs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index a91ae97e2..68b8ef62e 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -119,7 +119,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) if (workItem.Session == null) { // Legacy behavior - await this.OnProcessWorkItemAsync(workItem); + await this.OnProcessWorkItemAsync(workItem, true); return; } @@ -133,7 +133,8 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) // While the work item contains messages that need to be processed, execute them. if (workItem.NewMessages?.Count > 0) { - bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem); + workItem.IsExtendedSession = true; + bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem, processCount == 0); if (isCompletedOrInterrupted) { break; @@ -196,7 +197,8 @@ internal class WorkItemEffects /// Method to process a new work item /// /// The work item to process - protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem) + /// + protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem, bool firstExecution) { OrchestrationRuntimeState originalOrchestrationRuntimeState = workItem.OrchestrationRuntimeState; @@ -245,7 +247,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work if (workToDoNow.OperationCount > 0) { // execute the user-defined operations on this entity, via the middleware - var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState); + var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState, workItem.IsExtendedSession, firstExecution); var operationResults = result.Results!; // if we encountered an error, record it as the result of the operations @@ -919,7 +921,7 @@ internal void ProcessSendStartMessage(WorkItemEffects effects, OrchestrationRunt #endregion - async Task ExecuteViaMiddlewareAsync(Work workToDoNow, OrchestrationInstance instance, string serializedEntityState) + async Task ExecuteViaMiddlewareAsync(Work workToDoNow, OrchestrationInstance instance, string serializedEntityState, bool isExtendedSession, bool includeEntityState) { var (operations, traceActivities) = workToDoNow.GetOperationRequestsAndTraceActivities(instance.InstanceId); // the request object that will be passed to the worker @@ -942,6 +944,8 @@ async Task ExecuteViaMiddlewareAsync(Work workToDoNow, Orches var dispatchContext = new DispatchMiddlewareContext(); dispatchContext.SetProperty(request); + dispatchContext.SetProperty("extendedSession", isExtendedSession); + dispatchContext.SetProperty("includeEntityState", includeEntityState); await this.dispatchPipeline.RunAsync(dispatchContext, async _ => { From c21ab15ab99829bd04a7b071040e873315243e1f Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 30 Jul 2025 12:33:07 -0700 Subject: [PATCH 02/17] some small updates --- src/DurableTask.Core/TaskEntityDispatcher.cs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 68b8ef62e..c28ef98dd 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -33,6 +33,17 @@ namespace DurableTask.Core /// public class TaskEntityDispatcher { + /// + /// Whether or not the execution of the work item is within an extended session. + /// + public const string IsExtendedSession = "extendedSession"; + + /// + /// Whether or not to include the entity state when executing the work item via middleware. + /// This assumes that the middleware is able to handle extended sessions and does not require the entity state to execute the batch request. + /// + public const string IncludeEntityState = "includeEntityState"; + readonly INameVersionObjectManager objectManager; readonly IOrchestrationService orchestrationService; readonly IEntityOrchestrationService entityOrchestrationService; @@ -197,7 +208,7 @@ internal class WorkItemEffects /// Method to process a new work item /// /// The work item to process - /// + /// Whether or not this is the first execution of an extended session (if this item is being processed within an extended session). protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem, bool firstExecution) { OrchestrationRuntimeState originalOrchestrationRuntimeState = workItem.OrchestrationRuntimeState; @@ -944,8 +955,8 @@ async Task ExecuteViaMiddlewareAsync(Work workToDoNow, Orches var dispatchContext = new DispatchMiddlewareContext(); dispatchContext.SetProperty(request); - dispatchContext.SetProperty("extendedSession", isExtendedSession); - dispatchContext.SetProperty("includeEntityState", includeEntityState); + dispatchContext.SetProperty(IsExtendedSession, isExtendedSession); + dispatchContext.SetProperty(IncludeEntityState, includeEntityState); await this.dispatchPipeline.RunAsync(dispatchContext, async _ => { From 850aaa112041b5b7196f9b0c745116c888a358c4 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 13 Nov 2025 23:35:33 -0800 Subject: [PATCH 03/17] refactoring --- src/DurableTask.Core/TaskEntityDispatcher.cs | 38 +++++++------------ .../TaskOrchestrationDispatcher.cs | 4 +- src/DurableTask.Core/WorkItemMetadata.cs | 7 ++-- 3 files changed, 20 insertions(+), 29 deletions(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index a04e0dfc3..5493bd3ca 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -33,17 +33,6 @@ namespace DurableTask.Core /// public class TaskEntityDispatcher { - /// - /// Whether or not the execution of the work item is within an extended session. - /// - public const string IsExtendedSession = "extendedSession"; - - /// - /// Whether or not to include the entity state when executing the work item via middleware. - /// This assumes that the middleware is able to handle extended sessions and does not require the entity state to execute the batch request. - /// - public const string IncludeEntityState = "includeEntityState"; - readonly INameVersionObjectManager objectManager; readonly IOrchestrationService orchestrationService; readonly IEntityOrchestrationService entityOrchestrationService; @@ -146,8 +135,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) return; } - var isExtendedSession = false; - + var concurrencyLockAcquired = false; var processCount = 0; try { @@ -156,7 +144,14 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) // While the work item contains messages that need to be processed, execute them. if (workItem.NewMessages?.Count > 0) { - workItem.IsExtendedSession = true; + // We only need to acquire the lock on the first execution within the extended session + if (!concurrencyLockAcquired) + { + concurrencyLockAcquired = this.concurrentSessionLock.Acquire(); + } + workItem.IsExtendedSession = concurrencyLockAcquired; + // Regardless of whether or not we acquired the concurrent session lock, we will make sure to execute this work item. + // If we failed to acquire it, we will end the extended session after this execution. bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem, processCount == 0); if (isCompletedOrInterrupted) { @@ -166,14 +161,10 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) processCount++; } - // Fetches beyond the first require getting an extended session lock, used to prevent starvation. - if (processCount > 0 && !isExtendedSession) + // If we failed to acquire the concurrent session lock, we will end the extended session after the execution of the first work item + if (processCount > 0 && !concurrencyLockAcquired) { - isExtendedSession = this.concurrentSessionLock.Acquire(); - if (!isExtendedSession) - { - break; - } + break; } Stopwatch timer = Stopwatch.StartNew(); @@ -191,7 +182,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) } finally { - if (isExtendedSession) + if (concurrencyLockAcquired) { this.concurrentSessionLock.Release(); } @@ -967,8 +958,7 @@ async Task ExecuteViaMiddlewareAsync(Work workToDoNow, Orches var dispatchContext = new DispatchMiddlewareContext(); dispatchContext.SetProperty(request); - dispatchContext.SetProperty(IsExtendedSession, isExtendedSession); - dispatchContext.SetProperty(IncludeEntityState, includeEntityState); + dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = isExtendedSession, IncludeState = includeEntityState }); await this.dispatchPipeline.RunAsync(dispatchContext, async _ => { diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index b81cbae50..6cab483ba 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -780,7 +780,7 @@ async Task ExecuteOrchestrationAsync(Orchestration dispatchContext.SetProperty(workItem); dispatchContext.SetProperty(GetOrchestrationExecutionContext(runtimeState)); dispatchContext.SetProperty(this.entityParameters); - dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = workItem.IsExtendedSession, IncludePastEvents = true }); + dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = workItem.IsExtendedSession, IncludeState = true }); TaskOrchestrationExecutor? executor = null; @@ -833,7 +833,7 @@ async Task ResumeOrchestrationAsync(TaskOrchestrationWorkItem workItem) dispatchContext.SetProperty(cursor.TaskOrchestration); dispatchContext.SetProperty(cursor.RuntimeState); dispatchContext.SetProperty(workItem); - dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = true, IncludePastEvents = false }); + dispatchContext.SetProperty(new WorkItemMetadata { IsExtendedSession = true, IncludeState = false }); cursor.LatestDecisions = Enumerable.Empty(); await this.dispatchPipeline.RunAsync(dispatchContext, _ => diff --git a/src/DurableTask.Core/WorkItemMetadata.cs b/src/DurableTask.Core/WorkItemMetadata.cs index ae3de4651..df0438b40 100644 --- a/src/DurableTask.Core/WorkItemMetadata.cs +++ b/src/DurableTask.Core/WorkItemMetadata.cs @@ -11,9 +11,10 @@ public class WorkItemMetadata public bool IsExtendedSession { get; set; } /// - /// Gets or sets whether or not to include past events in the orchestration history when executing the work item via middleware. - /// This assumes that the middleware is able to handle extended sessions and does not require history for replays. + /// Gets or sets whether or not to include instance state when executing the work item via middleware. + /// This assumes that the middleware is able to handle extended sessions and does not require the instance + /// state to fulfill the request. /// - public bool IncludePastEvents { get; set; } + public bool IncludeState { get; set; } } } From 9bf8aa8becbff5ebb2f79fcbd95b4ee025809c52 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 14 Nov 2025 19:56:46 -0800 Subject: [PATCH 04/17] added the extended sessions logic to the TaskEntityDispatcher --- src/DurableTask.Core/TaskEntityDispatcher.cs | 38 +++++++++++++------- 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 5493bd3ca..603f9e90c 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -131,12 +131,13 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) if (workItem.Session == null) { // Legacy behavior - await this.OnProcessWorkItemAsync(workItem, true); + await this.OnProcessWorkItemAsync(workItem, null); return; } var concurrencyLockAcquired = false; var processCount = 0; + SchedulerState schedulerState = null; try { while (true) @@ -152,12 +153,18 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) workItem.IsExtendedSession = concurrencyLockAcquired; // Regardless of whether or not we acquired the concurrent session lock, we will make sure to execute this work item. // If we failed to acquire it, we will end the extended session after this execution. - bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem, processCount == 0); - if (isCompletedOrInterrupted) + schedulerState = await this.OnProcessWorkItemAsync(workItem, schedulerState); + + // The entity has been deleted, so we end the extended session. + if (this.EntityIsDeleted(schedulerState)) { break; } + // We do this to avoid keeping the entity state in memory between executions within the extended session. + // The middleware will keep the entity state cached, so it does not need it to fulfill requests within the extended session. + schedulerState.EntityState = null; + processCount++; } @@ -211,8 +218,8 @@ internal class WorkItemEffects /// Method to process a new work item /// /// The work item to process - /// Whether or not this is the first execution of an extended session (if this item is being processed within an extended session). - protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem, bool firstExecution) + /// + private async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem, SchedulerState schedulerState) { OrchestrationRuntimeState originalOrchestrationRuntimeState = workItem.OrchestrationRuntimeState; @@ -249,13 +256,14 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work } else { + bool firstExecution = schedulerState == null; // we start with processing all the requests and figuring out which ones to execute now // results can depend on whether the entity is locked, what the maximum batch size is, // and whether the messages arrived out of order this.DetermineWork(workItem.OrchestrationRuntimeState, - out SchedulerState schedulerState, + schedulerState, out Work workToDoNow); if (workToDoNow.OperationCount > 0) @@ -421,7 +429,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( workItem.OrchestrationRuntimeState = runtimeState; } - return true; + return schedulerState; } void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, RequestMessage request) @@ -449,7 +457,7 @@ void ProcessLockRequest(WorkItemEffects effects, SchedulerState schedulerState, string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState) { - if (this.entityBackendProperties.SupportsImplicitEntityDeletion && schedulerState.IsEmpty && !schedulerState.Suspended) + if (this.EntityIsDeleted(schedulerState)) { // this entity scheduler is idle and the entity is deleted, so the instance and history can be removed from storage // we convey this to the durability provider by issuing a continue-as-new with null input @@ -464,10 +472,9 @@ string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState) #region Preprocess to determine work - void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState schedulerState, out Work batch) + void DetermineWork(OrchestrationRuntimeState runtimeState, SchedulerState schedulerState, out Work batch) { string instanceId = runtimeState.OrchestrationInstance.InstanceId; - schedulerState = new SchedulerState(); batch = new Work(); Queue lockHolderMessages = null; @@ -478,11 +485,13 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState sc { case EventType.ExecutionStarted: - - if (runtimeState.Input != null) + // Only attempt to deserialize the scheduler state if we don't already have it in memory (which can be true + // for extended sessions) + if (runtimeState.Input != null && schedulerState == null) { try { + schedulerState = new SchedulerState(); // restore the scheduler state from the input JsonConvert.PopulateObject(runtimeState.Input, schedulerState, Serializer.InternalSerializerSettings); } @@ -628,6 +637,11 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, out SchedulerState sc } } + bool EntityIsDeleted(SchedulerState schedulerState) + { + return this.entityBackendProperties.SupportsImplicitEntityDeletion && schedulerState.IsEmpty && !schedulerState.Suspended; + } + class Work { List operationBatch; // a (possibly empty) sequence of operations to be executed on the entity From 5a53c890a0981effc4a3468be03cfc85851a24aa Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 14 Nov 2025 22:50:18 -0800 Subject: [PATCH 05/17] fixing a bug --- src/DurableTask.Core/TaskEntityDispatcher.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 603f9e90c..0877a39a5 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -262,7 +262,7 @@ private async Task OnProcessWorkItemAsync(TaskOrchestrationWorkI // results can depend on whether the entity is locked, what the maximum batch size is, // and whether the messages arrived out of order - this.DetermineWork(workItem.OrchestrationRuntimeState, + schedulerState = this.DetermineWork(workItem.OrchestrationRuntimeState, schedulerState, out Work workToDoNow); @@ -472,7 +472,7 @@ string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState) #region Preprocess to determine work - void DetermineWork(OrchestrationRuntimeState runtimeState, SchedulerState schedulerState, out Work batch) + SchedulerState DetermineWork(OrchestrationRuntimeState runtimeState, SchedulerState schedulerState, out Work batch) { string instanceId = runtimeState.OrchestrationInstance.InstanceId; batch = new Work(); @@ -635,6 +635,7 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, SchedulerState schedu } } } + return schedulerState; } bool EntityIsDeleted(SchedulerState schedulerState) From 9dfea22cdbeb11369cb1331337aac4e9b0bc7edd Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 17 Nov 2025 13:01:33 -0800 Subject: [PATCH 06/17] fixing another bug where i wasnt creating the scheduler state if it was null --- src/DurableTask.Core/TaskEntityDispatcher.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 0877a39a5..628913da0 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -475,6 +475,8 @@ string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState) SchedulerState DetermineWork(OrchestrationRuntimeState runtimeState, SchedulerState schedulerState, out Work batch) { string instanceId = runtimeState.OrchestrationInstance.InstanceId; + bool deserializeState = schedulerState == null; + schedulerState ??= new(); batch = new Work(); Queue lockHolderMessages = null; @@ -487,7 +489,7 @@ SchedulerState DetermineWork(OrchestrationRuntimeState runtimeState, SchedulerSt // Only attempt to deserialize the scheduler state if we don't already have it in memory (which can be true // for extended sessions) - if (runtimeState.Input != null && schedulerState == null) + if (runtimeState.Input != null && deserializeState) { try { From 699ff9ca00a581f05f36d57f260151a7f06df60c Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 17 Nov 2025 14:39:13 -0800 Subject: [PATCH 07/17] added a param comment --- src/DurableTask.Core/TaskEntityDispatcher.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 628913da0..1486c7927 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -218,7 +218,8 @@ internal class WorkItemEffects /// Method to process a new work item /// /// The work item to process - /// + /// If extended sessions are enabled, the scheduler state that is being cached across executions. + /// If they are not enabled, or if this is the first execution from within an extended session, this parameter is null. private async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem, SchedulerState schedulerState) { OrchestrationRuntimeState originalOrchestrationRuntimeState = workItem.OrchestrationRuntimeState; From 402e3de45a17a6de634a63fce7e1b01ca0e9f7f6 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 25 Nov 2025 17:02:20 -0800 Subject: [PATCH 08/17] addressing PR comments --- src/DurableTask.Core/TaskEntityDispatcher.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 1486c7927..cef9007d5 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -494,7 +494,6 @@ SchedulerState DetermineWork(OrchestrationRuntimeState runtimeState, SchedulerSt { try { - schedulerState = new SchedulerState(); // restore the scheduler state from the input JsonConvert.PopulateObject(runtimeState.Input, schedulerState, Serializer.InternalSerializerSettings); } From 81d06bc625ccc5da24aa059613f87b4f180dba6a Mon Sep 17 00:00:00 2001 From: sophiatev <38052607+sophiatev@users.noreply.github.com> Date: Mon, 15 Dec 2025 11:48:11 -0800 Subject: [PATCH 09/17] Update src/DurableTask.Core/WorkItemMetadata.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/DurableTask.Core/WorkItemMetadata.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.Core/WorkItemMetadata.cs b/src/DurableTask.Core/WorkItemMetadata.cs index df0438b40..da47d0d55 100644 --- a/src/DurableTask.Core/WorkItemMetadata.cs +++ b/src/DurableTask.Core/WorkItemMetadata.cs @@ -12,8 +12,8 @@ public class WorkItemMetadata /// /// Gets or sets whether or not to include instance state when executing the work item via middleware. - /// This assumes that the middleware is able to handle extended sessions and does not require the instance - /// state to fulfill the request. + /// When false, this assumes that the middleware is able to handle extended sessions and has already cached + /// the instance state from a previous execution, so it does not need to be included again. /// public bool IncludeState { get; set; } } From 0ffeccdacc639966345471c14f4032ad2c53978c Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 15 Dec 2025 11:55:26 -0800 Subject: [PATCH 10/17] addressing copilot PR comment --- src/DurableTask.Core/TaskEntityDispatcher.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index cef9007d5..6dbb04be6 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -642,7 +642,7 @@ SchedulerState DetermineWork(OrchestrationRuntimeState runtimeState, SchedulerSt bool EntityIsDeleted(SchedulerState schedulerState) { - return this.entityBackendProperties.SupportsImplicitEntityDeletion && schedulerState.IsEmpty && !schedulerState.Suspended; + return schedulerState != null && this.entityBackendProperties.SupportsImplicitEntityDeletion && schedulerState.IsEmpty && !schedulerState.Suspended; } class Work From 3ec7f8f98e34ab5934854e5f1bf5f26e328c34db Mon Sep 17 00:00:00 2001 From: sophiatev <38052607+sophiatev@users.noreply.github.com> Date: Wed, 17 Dec 2025 14:46:03 -0800 Subject: [PATCH 11/17] Update src/DurableTask.Core/TaskEntityDispatcher.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/DurableTask.Core/TaskEntityDispatcher.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 6dbb04be6..29535f0ff 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -162,7 +162,7 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) } // We do this to avoid keeping the entity state in memory between executions within the extended session. - // The middleware will keep the entity state cached, so it does not need it to fulfill requests within the extended session. + // The middleware will keep the entity state cached, so the scheduler state does not need to retain the entity state to fulfill requests within the extended session. schedulerState.EntityState = null; processCount++; From 02ec8ea1cd19af30b5b518726305e55916f93b98 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 17 Dec 2025 15:32:28 -0800 Subject: [PATCH 12/17] variable rename --- src/DurableTask.Core/TaskEntityDispatcher.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 29535f0ff..fe6d44692 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -257,7 +257,7 @@ private async Task OnProcessWorkItemAsync(TaskOrchestrationWorkI } else { - bool firstExecution = schedulerState == null; + bool firstExecutionIfExtendedSession = schedulerState == null; // we start with processing all the requests and figuring out which ones to execute now // results can depend on whether the entity is locked, what the maximum batch size is, From 6dcef7e1e4da4faae63c68075e636c0a973da626 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 18 Dec 2025 11:07:50 -0800 Subject: [PATCH 13/17] fixing the compile error --- src/DurableTask.Core/TaskEntityDispatcher.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index fe6d44692..2e8fd5f2c 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -270,7 +270,7 @@ private async Task OnProcessWorkItemAsync(TaskOrchestrationWorkI if (workToDoNow.OperationCount > 0) { // execute the user-defined operations on this entity, via the middleware - var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState, workItem.IsExtendedSession, firstExecution); + var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState, workItem.IsExtendedSession, firstExecutionIfExtendedSession); var operationResults = result.Results!; // if we encountered an error, record it as the result of the operations From 0bce0b1f42593776fcdf7c5e500e42bc6267a844 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 18 Dec 2025 12:15:58 -0800 Subject: [PATCH 14/17] updated scheduler state parameter to a ref --- src/DurableTask.Core/TaskEntityDispatcher.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 2e8fd5f2c..f53f9bf6a 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -263,8 +263,8 @@ private async Task OnProcessWorkItemAsync(TaskOrchestrationWorkI // results can depend on whether the entity is locked, what the maximum batch size is, // and whether the messages arrived out of order - schedulerState = this.DetermineWork(workItem.OrchestrationRuntimeState, - schedulerState, + this.DetermineWork(workItem.OrchestrationRuntimeState, + ref schedulerState, out Work workToDoNow); if (workToDoNow.OperationCount > 0) @@ -473,7 +473,7 @@ string SerializeSchedulerStateForNextExecution(SchedulerState schedulerState) #region Preprocess to determine work - SchedulerState DetermineWork(OrchestrationRuntimeState runtimeState, SchedulerState schedulerState, out Work batch) + void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState schedulerState, out Work batch) { string instanceId = runtimeState.OrchestrationInstance.InstanceId; bool deserializeState = schedulerState == null; @@ -637,7 +637,6 @@ SchedulerState DetermineWork(OrchestrationRuntimeState runtimeState, SchedulerSt } } } - return schedulerState; } bool EntityIsDeleted(SchedulerState schedulerState) From 3fcca42e229b472d402f81bb071da9551bde2e74 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 18 Dec 2025 12:16:45 -0800 Subject: [PATCH 15/17] removed a space --- src/DurableTask.Core/TaskEntityDispatcher.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index f53f9bf6a..7b2809d38 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -263,7 +263,7 @@ private async Task OnProcessWorkItemAsync(TaskOrchestrationWorkI // results can depend on whether the entity is locked, what the maximum batch size is, // and whether the messages arrived out of order - this.DetermineWork(workItem.OrchestrationRuntimeState, + this.DetermineWork(workItem.OrchestrationRuntimeState, ref schedulerState, out Work workToDoNow); From bb217e95fee9cca21e41f25a001e3e907c4ff9a6 Mon Sep 17 00:00:00 2001 From: sophiatev <38052607+sophiatev@users.noreply.github.com> Date: Thu, 18 Dec 2025 12:23:42 -0800 Subject: [PATCH 16/17] Update src/DurableTask.Core/TaskEntityDispatcher.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/DurableTask.Core/TaskEntityDispatcher.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 7b2809d38..3dc891d5e 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -488,8 +488,8 @@ void DetermineWork(OrchestrationRuntimeState runtimeState, ref SchedulerState sc { case EventType.ExecutionStarted: - // Only attempt to deserialize the scheduler state if we don't already have it in memory (which can be true - // for extended sessions) + // Only attempt to deserialize the scheduler state if we don't already have it in memory. + // This occurs on the first execution within an extended session, or when extended sessions are disabled. if (runtimeState.Input != null && deserializeState) { try From cc03f086a5d596b6cbd523a410dc769727b8a3cd Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 18 Dec 2025 14:28:15 -0800 Subject: [PATCH 17/17] comment update --- src/DurableTask.Core/TaskEntityDispatcher.cs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.Core/TaskEntityDispatcher.cs b/src/DurableTask.Core/TaskEntityDispatcher.cs index 3dc891d5e..72e31428a 100644 --- a/src/DurableTask.Core/TaskEntityDispatcher.cs +++ b/src/DurableTask.Core/TaskEntityDispatcher.cs @@ -161,8 +161,9 @@ async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem) break; } - // We do this to avoid keeping the entity state in memory between executions within the extended session. - // The middleware will keep the entity state cached, so the scheduler state does not need to retain the entity state to fulfill requests within the extended session. + // When extended sessions are enabled, the handler caches the entity state after the first execution of the extended session, so there + // is no need to retain a reference to it here. + // We set the local reference to null so that the entity state can be garbage collected while we wait for more messages to arrive. schedulerState.EntityState = null; processCount++;