From 2a4382551d5d5dbad449485356fa60fe6ca40445 Mon Sep 17 00:00:00 2001 From: Ted Date: Fri, 19 Dec 2025 11:34:46 +0200 Subject: [PATCH 1/6] Implemented FiringMode.Serial with standard locks. --- README.md | 10 ++ src/Stateless/StateMachine.Async.cs | 62 +++++++++ src/Stateless/StateMachine.cs | 81 ++++++++++- .../SerialModeThreadSafetyFixture.cs | 126 ++++++++++++++++++ 4 files changed, 278 insertions(+), 1 deletion(-) create mode 100644 test/Stateless.Tests/SerialModeThreadSafetyFixture.cs diff --git a/README.md b/README.md index 0913b092..e19393b7 100644 --- a/README.md +++ b/README.md @@ -358,6 +358,16 @@ var stateMachine = new StateMachine(initialState) Setting this is vital within a Microsoft Orleans Grain for example, which requires the `SynchronizationContext` in order to make calls to other Grains. +### Thread safety +By default, Stateless is **NOT** thread-safe. +`FiringMode.Sequential` ensures thread-safety for Fire(), however reading the State Machine's state from multiple threads may still be unsafe and require aditional locks. + +Stateless processes triggers sequentially, and as a result there can only be one thread "driving" the processing at a time. + +In `FiringMode.Sequential`, if the main processing thread throws an error, unprocessed triggers should be removed from the queue in order to ensure consistency. Otherwise the event queue may still hold unprocessed triggers which would require another Fire() call to resume processing. +Set `DropUnprocessedEventsOnErrorInSerialMode` to true if you need consistent behaviour. +Set `DropUnprocessedEventsOnErrorInSerialMode` to false if you don't want triggers to be dropped (default). + ## Building Stateless runs on .NET runtime version 4+ and practically all modern .NET platforms by targeting .NET Framework 4.6.2, .NET Standard 2.0, and .NET 8.0, 9.0 and 10.0. Visual Studio 2017 or later is required to build the solution. diff --git a/src/Stateless/StateMachine.Async.cs b/src/Stateless/StateMachine.Async.cs index 685c751e..ca230cb5 100644 --- a/src/Stateless/StateMachine.Async.cs +++ b/src/Stateless/StateMachine.Async.cs @@ -162,12 +162,74 @@ async Task InternalFireAsync(TTrigger trigger, params object[] args) case FiringMode.Queued: await InternalFireQueuedAsync(trigger, args); break; + case FiringMode.Serial: + await InternalFireSerialAsync(trigger, args); + break; default: // If something is completely messed up we let the user know ;-) throw new InvalidOperationException("The firing mode has not been configured!"); } } + /// + /// Queue events and then fire in order. + /// If only one event is queued, this behaves identically to the non-queued version. + /// + /// The trigger. + /// A variable-length parameters list containing arguments. + async Task InternalFireSerialAsync(TTrigger trigger, params object[] args) { + + lock (_serialModeLock) + { + // Add trigger to queue + _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args }); + + // If a trigger is already being handled then the trigger will be queued (FIFO) and processed later. + if (_firing) + return; + + _firing = true; + } + + try + { + + // Empty queue for triggers + while (true) + { + + QueuedTrigger queuedEvent; + + lock (_serialModeLock) + { + + if (_eventQueue.Count == 0) + { + _firing = false; + break; + } + + queuedEvent = _eventQueue.Dequeue(); + } + + await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(RetainSynchronizationContext); + } + } + catch + { + + lock (_serialModeLock) + { + if (DropUnprocessedEventsOnErrorInSerialMode) + _eventQueue.Clear(); + + _firing = false; + } + + throw; + } + } + /// /// Queue events and then fire in order. /// If only one event is queued, this behaves identically to the non-queued version. diff --git a/src/Stateless/StateMachine.cs b/src/Stateless/StateMachine.cs index 999297e2..855ba711 100644 --- a/src/Stateless/StateMachine.cs +++ b/src/Stateless/StateMachine.cs @@ -14,7 +14,9 @@ public enum FiringMode /// Use immediate mode when the queuing of trigger events are not needed. Care must be taken when using this mode, as there is no run-to-completion guaranteed. Immediate, /// Use the queued Fireing mode when run-to-completion is required. This is the recommended mode. - Queued + Queued, + /// Equivalent to Queued mode, but thread-safe for Fire. + Serial } /// @@ -40,9 +42,20 @@ private class QueuedTrigger public object[] Args { get; set; } } + private object _serialModeLock = new object(); private readonly Queue _eventQueue = new Queue(); private bool _firing; + /// + /// If the main processing thread throws an error, unprocessed triggers should be removed from + /// the queue in order to ensure consistency. Otherwise the event queue + /// may still hold unprocessed triggers which would require another + /// Fire() call to resume processing. + /// Set this to true if you need consistent behaviour. + /// Set this to false if you don't want triggers to be dropped. + /// + public bool DropUnprocessedEventsOnErrorInSerialMode { get; set; } = false; + /// /// Construct a state machine with external state storage. /// @@ -341,12 +354,78 @@ void InternalFire(TTrigger trigger, params object[] args) case FiringMode.Queued: InternalFireQueued(trigger, args); break; + case FiringMode.Serial: + InternalFireSerial(trigger, args); + break; default: // If something is completely messed up we let the user know ;-) throw new InvalidOperationException("The firing mode has not been configured!"); } } + /// + /// Queue events and then fire in order. + /// If only one event is queued, this behaves identically to the non-queued version. + /// This method is almost equivalent to InternalFireQueued, but it employs simple locks + /// in order to ensure thread-safety. + /// Warning! If processing a trigger throws an unexpected error, unprocessed events will be dropped + /// if DropUnprocessedEventsOnErrorInSerialMode is set to true. + /// + /// The trigger. + /// A variable-length parameters list containing arguments. + private void InternalFireSerial(TTrigger trigger, params object[] args) { + + lock (_serialModeLock) + { + // Add trigger to queue + _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args }); + + // If a trigger is already being handled then the trigger will be queued (FIFO) and processed later. + if (_firing) + return; + + _firing = true; + } + + try + { + + // Empty queue for triggers + while (true) + { + + QueuedTrigger queuedEvent; + + lock (_serialModeLock) + { + + if (_eventQueue.Count == 0) + { + _firing = false; + break; + } + + queuedEvent = _eventQueue.Dequeue(); + } + + InternalFireOne(queuedEvent.Trigger, queuedEvent.Args); + } + } + catch + { + + lock (_serialModeLock) + { + if (DropUnprocessedEventsOnErrorInSerialMode) + _eventQueue.Clear(); + + _firing = false; + } + + throw; + } + } + /// /// Queue events and then fire in order. /// If only one event is queued, this behaves identically to the non-queued version. diff --git a/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs b/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs new file mode 100644 index 00000000..60eafaf9 --- /dev/null +++ b/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs @@ -0,0 +1,126 @@ +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace Stateless.Tests { + /// + /// We disable parallelization to ensure other threads are not occupied by other tests. + /// Some tests regarding DropUnprocessedEventsOnErrorInSerialMode need to be added. + /// + [CollectionDefinition("SerialModeThreadSafetyFixture", DisableParallelization = true)] + public class SerialModeThreadSafetyFixture { + + [Fact] + public async Task IncrementIsThreadSafeUnderContentionSync() { + + var stateMachine = new StateMachine(State.A, FiringMode.Serial); + + int counter = 0; + var startGate = new ManualResetEventSlim(false); + + stateMachine.Configure(State.A) + .OnEntry(() => { + counter = counter + 1; + }) + .PermitReentry(Trigger.X); + + var tasks = Enumerable.Range(0, 16).Select(_ => + Task.Run(() => { + startGate.Wait(); + for (int i = 0; i < 10_000; i++) { + stateMachine.Fire(Trigger.X); + } + }) + ).ToArray(); + + startGate.Set(); + await Task.WhenAll(tasks); + + Assert.Equal(160_000, counter); + } + + [Fact] + public async Task IncrementIsThreadSafeUnderContentionAsync() { + + var stateMachine = new StateMachine(State.A, FiringMode.Serial); + + int counter = 0; + var startGate = new ManualResetEventSlim(false); + + stateMachine.Configure(State.A) + .OnEntryAsync(async () => { + await Task.Yield(); + counter = counter + 1; + }) + .PermitReentry(Trigger.X); + + var tasks = Enumerable.Range(0, 16).Select(_ => + Task.Run(async () => { + startGate.Wait(); + for (int i = 0; i < 1_000; i++) { + await stateMachine.FireAsync(Trigger.X); + } + }) + ).ToArray(); + + startGate.Set(); + await Task.WhenAll(tasks); + + Assert.Equal(16_000, counter); + } + + [Fact] + public async Task RecursiveFireDoesNotDeadlockSync() { + + var stateMachine = new StateMachine(State.A, FiringMode.Serial); + + stateMachine.Configure(State.A) + .OnEntry(() => { + stateMachine.Fire(Trigger.Y); + }) + .Permit(Trigger.Y, State.B) + .PermitReentry(Trigger.X); + + stateMachine.Configure(State.B) + .OnEntry(() => { + stateMachine.Fire(Trigger.Y); + }) + .Permit(Trigger.Y, State.C) + .PermitReentry(Trigger.X); + + stateMachine.Configure(State.C); + + stateMachine.Fire(Trigger.X); + + Assert.Equal(State.C, stateMachine.State); + } + + [Fact] + public async Task RecursiveFireDoesNotDeadlockAsync() { + + var stateMachine = new StateMachine(State.A, FiringMode.Serial); + + stateMachine.Configure(State.A) + .OnEntryAsync(async () => { + await stateMachine.FireAsync(Trigger.Y); + }) + .Permit(Trigger.Y, State.B) + .PermitReentry(Trigger.X); + + stateMachine.Configure(State.B) + .OnEntryAsync(async () => { + await stateMachine.FireAsync(Trigger.Y); + }) + .Permit(Trigger.Y, State.C) + .PermitReentry(Trigger.X); + + stateMachine.Configure(State.C); + + await stateMachine.FireAsync(Trigger.X); + + Assert.Equal(State.C, stateMachine.State); + } + + } +} From 49c1192f5dd4052231341ea11c4bca4e89d995bf Mon Sep 17 00:00:00 2001 From: Ted Chirvasiu Date: Fri, 19 Dec 2025 18:59:57 +0200 Subject: [PATCH 2/6] Fixed typo in documentation regarding FiringMode.Serial. --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e19393b7..f987ffc8 100644 --- a/README.md +++ b/README.md @@ -360,11 +360,11 @@ Setting this is vital within a Microsoft Orleans Grain for example, which requir ### Thread safety By default, Stateless is **NOT** thread-safe. -`FiringMode.Sequential` ensures thread-safety for Fire(), however reading the State Machine's state from multiple threads may still be unsafe and require aditional locks. +`FiringMode.Serial` ensures thread-safety for Fire(), however reading the State Machine's state from multiple threads may still be unsafe and require aditional locks. Stateless processes triggers sequentially, and as a result there can only be one thread "driving" the processing at a time. -In `FiringMode.Sequential`, if the main processing thread throws an error, unprocessed triggers should be removed from the queue in order to ensure consistency. Otherwise the event queue may still hold unprocessed triggers which would require another Fire() call to resume processing. +In `FiringMode.Serial`, if the main processing thread throws an error, unprocessed triggers should be removed from the queue in order to ensure consistency. Otherwise the event queue may still hold unprocessed triggers which would require another Fire() call to resume processing. Set `DropUnprocessedEventsOnErrorInSerialMode` to true if you need consistent behaviour. Set `DropUnprocessedEventsOnErrorInSerialMode` to false if you don't want triggers to be dropped (default). From c336936ecd23865c70c677d53553cb2fb7a3acc9 Mon Sep 17 00:00:00 2001 From: Ted Chirvasiu Date: Sat, 20 Dec 2025 11:06:15 +0200 Subject: [PATCH 3/6] Implemented FiringMode.Serial on a separate worker task. --- src/Stateless/StateMachine.Async.cs | 241 ++++++++++++++---- src/Stateless/StateMachine.cs | 135 +++++----- .../SerialModeThreadSafetyFixture.cs | 23 +- 3 files changed, 267 insertions(+), 132 deletions(-) diff --git a/src/Stateless/StateMachine.Async.cs b/src/Stateless/StateMachine.Async.cs index ca230cb5..ea6a614d 100644 --- a/src/Stateless/StateMachine.Async.cs +++ b/src/Stateless/StateMachine.Async.cs @@ -6,8 +6,20 @@ namespace Stateless { - public partial class StateMachine + public partial class StateMachine : IDisposable { + + private bool _disposing; + + private class QueuedSerialTrigger : QueuedTrigger { + public TaskCompletionSource TaskCompletionSource { get; set; } + } + + private readonly Queue _serialEventQueue = new Queue(); + + private Task _serialEventQueueProcessingTask; + private CancellationTokenSource _serialEventQueueCancellationToken; + /// /// Activates current state in asynchronous fashion. Actions associated with activating the current state /// will be invoked. The activation is idempotent and subsequent activation of the same current state @@ -43,6 +55,16 @@ public async Task> GetPermittedTriggersAsync(params object return await CurrentRepresentation.GetPermittedTriggersAsync(args); } + /// Obtains the curent serial events worker task. + public Task GetSerialEventsWorkerTask() { + return _serialEventQueueProcessingTask ?? Task.CompletedTask; + } + + /// Resumes the execution of the event processing queue if not empty. + public Task FireAsync() { + return InternalFireAsync(); + } + /// /// Transition from the current state via the specified trigger in async fashion. /// The target state is determined by the configuration of the current state. @@ -147,24 +169,58 @@ public Task FireAsync(TriggerWithParameters + /// Fires the trigger and waits for the trigger to be processed. + /// Relevant for FiringMode.Serial. + /// + /// The trigger to fire. + /// A variable-length parameters list containing arguments. + public Task FireAndWaitAsync(TTrigger trigger, params object[] args) { + switch (_firingMode) { + case FiringMode.Immediate: + return InternalFireOneAsync(trigger, args); + case FiringMode.Queued: + return InternalFireQueuedAsync(trigger, args); + case FiringMode.Serial: + return InternalFireSerialAsync(trigger, getTriggerCompletionTask: true, args); + default: + // If something is completely messed up we let the user know ;-) + throw new InvalidOperationException("The firing mode has not been configured!"); + } + } + /// /// Determine how to Fire the trigger /// /// The trigger. /// A variable-length parameters list containing arguments. - async Task InternalFireAsync(TTrigger trigger, params object[] args) + Task InternalFireAsync(TTrigger trigger, params object[] args) { switch (_firingMode) { case FiringMode.Immediate: - await InternalFireOneAsync(trigger, args); - break; + return InternalFireOneAsync(trigger, args); case FiringMode.Queued: - await InternalFireQueuedAsync(trigger, args); - break; + return InternalFireQueuedAsync(trigger, args); case FiringMode.Serial: - await InternalFireSerialAsync(trigger, args); - break; + return InternalFireSerialAsync(trigger, getTriggerCompletionTask: false, args); + default: + // If something is completely messed up we let the user know ;-) + throw new InvalidOperationException("The firing mode has not been configured!"); + } + } + + + /// Resumes the execution of the event processing queue if not empty. + Task InternalFireAsync() { + switch (_firingMode) { + case FiringMode.Immediate: + return Task.CompletedTask; + case FiringMode.Queued: + return InternalFireQueuedAsync(); + case FiringMode.Serial: + return InternalFireSerialAsync(); default: // If something is completely messed up we let the user know ;-) throw new InvalidOperationException("The firing mode has not been configured!"); @@ -172,62 +228,118 @@ async Task InternalFireAsync(TTrigger trigger, params object[] args) } /// - /// Queue events and then fire in order. - /// If only one event is queued, this behaves identically to the non-queued version. + /// Queue events and then fire in order on a separate worker thread. + /// This returns immediately after queueing the trigger. + /// This method is thread-safe. /// /// The trigger. + /// If true, returns the task associated with the processing of the trigger. /// A variable-length parameters list containing arguments. - async Task InternalFireSerialAsync(TTrigger trigger, params object[] args) { + Task InternalFireSerialAsync(TTrigger trigger, bool getTriggerCompletionTask, params object[] args) + { + + if (_disposing) + throw new ObjectDisposedException("State machine"); + + var taskCompletionSource = new TaskCompletionSource(); lock (_serialModeLock) { - // Add trigger to queue - _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args }); + _serialEventQueue.Enqueue(new QueuedSerialTrigger { Trigger = trigger, Args = args, TaskCompletionSource = taskCompletionSource }); - // If a trigger is already being handled then the trigger will be queued (FIFO) and processed later. - if (_firing) - return; + if (_firing) + return getTriggerCompletionTask ? taskCompletionSource.Task : Task.CompletedTask; _firing = true; } - try + _serialEventQueueProcessingTask = InternalFireSerialAsync_RunProcessingQueue(); + + return getTriggerCompletionTask ? taskCompletionSource.Task : Task.CompletedTask; + } + + /// Resumes the execution if the event queue is not empty + Task InternalFireSerialAsync() + { + lock (_serialModeLock) { + if (_firing || _serialEventQueue.Count == 0) + return Task.CompletedTask; + + _firing = true; + } + + _serialEventQueueProcessingTask = InternalFireSerialAsync_RunProcessingQueue(); + + return Task.CompletedTask; + } - // Empty queue for triggers - while (true) + /// + /// Starts a serial event processing thread. + /// Only call if you aquired "_firing = true" inside a lock. + /// + Task InternalFireSerialAsync_RunProcessingQueue() + { + + _serialEventQueueCancellationToken = new CancellationTokenSource(); + + return Task.Run( + async () => { - QueuedTrigger queuedEvent; + QueuedSerialTrigger queuedEvent = null; + Task currentTask = null; - lock (_serialModeLock) + try { + if (_disposing) + throw new ObjectDisposedException("State machine"); - if (_eventQueue.Count == 0) + while (true) { - _firing = false; - break; - } - queuedEvent = _eventQueue.Dequeue(); - } + lock (_serialModeLock) + { - await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(RetainSynchronizationContext); - } - } - catch - { + if (_serialEventQueue.Count == 0) + { + _firing = false; + break; + } - lock (_serialModeLock) - { - if (DropUnprocessedEventsOnErrorInSerialMode) - _eventQueue.Clear(); + queuedEvent = _serialEventQueue.Dequeue(); + } - _firing = false; - } + currentTask = InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args); - throw; - } + await currentTask; + + queuedEvent.TaskCompletionSource.SetResult(true); + + _serialEventQueueCancellationToken.Token.ThrowIfCancellationRequested(); + } + } + catch + { + + lock (_serialModeLock) + { + _firing = false; + } + + if (currentTask?.IsCanceled == true) + { + queuedEvent?.TaskCompletionSource.SetCanceled(); + } + else if (currentTask?.IsFaulted == true) + { + queuedEvent?.TaskCompletionSource.SetException(currentTask.Exception); + } + + throw; + } + } + ); } /// @@ -236,28 +348,29 @@ async Task InternalFireSerialAsync(TTrigger trigger, params object[] args) { /// /// The trigger. /// A variable-length parameters list containing arguments. - async Task InternalFireQueuedAsync(TTrigger trigger, params object[] args) + Task InternalFireQueuedAsync(TTrigger trigger, params object[] args) { - if (_firing) - { - _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args }); + + _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args }); + + return InternalFireQueuedAsync(); + } + + /// Processes the event queue + async Task InternalFireQueuedAsync() { + + if (_firing) { return; } - try - { + try { _firing = true; - await InternalFireOneAsync(trigger, args).ConfigureAwait(RetainSynchronizationContext); - - while (_eventQueue.Count != 0) - { + while (_eventQueue.Count != 0) { var queuedEvent = _eventQueue.Dequeue(); await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(RetainSynchronizationContext); } - } - finally - { + } finally { _firing = false; } } @@ -535,5 +648,27 @@ public void OnTransitionCompletedAsyncUnregister(Func onTransi if (onTransitionAction == null) throw new ArgumentNullException(nameof(onTransitionAction)); _onTransitionCompletedEvent.Unregister(onTransitionAction); } + + /// + /// Dispose the state machine + /// + public void Dispose() + { + _disposing = true; + + if (_firingMode == FiringMode.Serial) + { + _serialEventQueueCancellationToken?.Cancel(); + + if (!_serialEventQueueProcessingTask?.IsCompleted == true) + { + try { + _serialEventQueueProcessingTask.Wait(); + } catch { + //Since we are disposing, ignore any errors + } + } + } + } } } diff --git a/src/Stateless/StateMachine.cs b/src/Stateless/StateMachine.cs index 855ba711..3da3856f 100644 --- a/src/Stateless/StateMachine.cs +++ b/src/Stateless/StateMachine.cs @@ -46,16 +46,6 @@ private class QueuedTrigger private readonly Queue _eventQueue = new Queue(); private bool _firing; - /// - /// If the main processing thread throws an error, unprocessed triggers should be removed from - /// the queue in order to ensure consistency. Otherwise the event queue - /// may still hold unprocessed triggers which would require another - /// Fire() call to resume processing. - /// Set this to true if you need consistent behaviour. - /// Set this to false if you don't want triggers to be dropped. - /// - public bool DropUnprocessedEventsOnErrorInSerialMode { get; set; } = false; - /// /// Construct a state machine with external state storage. /// @@ -216,6 +206,11 @@ public StateConfiguration Configure(TState state) return new StateConfiguration(this, GetRepresentation(state), GetRepresentation); } + /// Resumes the execution of the event processing queue if not empty. + public void Fire() { + InternalFire(); + } + /// /// Transition from the current state via the specified trigger. /// The target state is determined by the configuration of the current state. @@ -363,67 +358,39 @@ void InternalFire(TTrigger trigger, params object[] args) } } + /// Resumes the execution of the event processing queue if not empty. + void InternalFire() { + switch (_firingMode) { + case FiringMode.Immediate: + break; + case FiringMode.Queued: + InternalFireQueued(); + break; + case FiringMode.Serial: + InternalFireSerial(); + break; + default: + // If something is completely messed up we let the user know ;-) + throw new InvalidOperationException("The firing mode has not been configured!"); + } + } + /// - /// Queue events and then fire in order. - /// If only one event is queued, this behaves identically to the non-queued version. - /// This method is almost equivalent to InternalFireQueued, but it employs simple locks - /// in order to ensure thread-safety. - /// Warning! If processing a trigger throws an unexpected error, unprocessed events will be dropped - /// if DropUnprocessedEventsOnErrorInSerialMode is set to true. + /// Queue events and then fire in order on a separate worker thread. + /// This returns immediately after queueing the trigger. + /// This method is thread-safe. /// /// The trigger. /// A variable-length parameters list containing arguments. private void InternalFireSerial(TTrigger trigger, params object[] args) { + // Since the processing happens on a separate task, we re-use the async method + _ = InternalFireSerialAsync(trigger, getTriggerCompletionTask: false, args); + } - lock (_serialModeLock) - { - // Add trigger to queue - _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args }); - - // If a trigger is already being handled then the trigger will be queued (FIFO) and processed later. - if (_firing) - return; - - _firing = true; - } - - try - { - - // Empty queue for triggers - while (true) - { - - QueuedTrigger queuedEvent; - - lock (_serialModeLock) - { - - if (_eventQueue.Count == 0) - { - _firing = false; - break; - } - - queuedEvent = _eventQueue.Dequeue(); - } - - InternalFireOne(queuedEvent.Trigger, queuedEvent.Args); - } - } - catch - { - - lock (_serialModeLock) - { - if (DropUnprocessedEventsOnErrorInSerialMode) - _eventQueue.Clear(); - - _firing = false; - } - - throw; - } + /// Resumes the execution if the event queue is not empty + private void InternalFireSerial() { + // Since the processing happens on a separate task, we re-use the async method + _ = InternalFireSerialAsync(); } /// @@ -437,26 +404,48 @@ private void InternalFireQueued(TTrigger trigger, params object[] args) // Add trigger to queue _eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args }); + InternalFireQueued(); + } + + /// Processes the event queue + private void InternalFireQueued() { + // If a trigger is already being handled then the trigger will be queued (FIFO) and processed later. - if (_firing) - { + if (_firing) { return; } - try - { + try { _firing = true; // Empty queue for triggers - while (_eventQueue.Any()) - { + while (_eventQueue.Any()) { var queuedEvent = _eventQueue.Dequeue(); InternalFireOne(queuedEvent.Trigger, queuedEvent.Args); } + } finally { + _firing = false; + } + } + + /// + /// Cancel unprocessed triggers that are still waiting in the queue. + /// Applies only to FiringMode.Queued and FiringMode.Serial. + /// + public void CancelPendingTriggers() { + if (_firingMode == FiringMode.Queued) + { + _eventQueue.Clear(); } - finally + else if (_firingMode == FiringMode.Serial) { - _firing = false; + lock (_serialModeLock) + { + while (_serialEventQueue.Count > 0) { + var queuedEvent = _serialEventQueue.Dequeue(); + queuedEvent.TaskCompletionSource.SetCanceled(); + } + } } } diff --git a/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs b/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs index 60eafaf9..bf264f26 100644 --- a/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs +++ b/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs @@ -11,7 +11,7 @@ namespace Stateless.Tests { [CollectionDefinition("SerialModeThreadSafetyFixture", DisableParallelization = true)] public class SerialModeThreadSafetyFixture { - [Fact] + [Fact(Timeout = 60 * 1000)] public async Task IncrementIsThreadSafeUnderContentionSync() { var stateMachine = new StateMachine(State.A, FiringMode.Serial); @@ -37,10 +37,13 @@ public async Task IncrementIsThreadSafeUnderContentionSync() { startGate.Set(); await Task.WhenAll(tasks); + // Wait for the tasks to be done processing + await stateMachine.GetSerialEventsWorkerTask(); + Assert.Equal(160_000, counter); } - [Fact] + [Fact(Timeout = 60 * 1000)] public async Task IncrementIsThreadSafeUnderContentionAsync() { var stateMachine = new StateMachine(State.A, FiringMode.Serial); @@ -50,7 +53,6 @@ public async Task IncrementIsThreadSafeUnderContentionAsync() { stateMachine.Configure(State.A) .OnEntryAsync(async () => { - await Task.Yield(); counter = counter + 1; }) .PermitReentry(Trigger.X); @@ -59,7 +61,7 @@ public async Task IncrementIsThreadSafeUnderContentionAsync() { Task.Run(async () => { startGate.Wait(); for (int i = 0; i < 1_000; i++) { - await stateMachine.FireAsync(Trigger.X); + var forget = stateMachine.FireAsync(Trigger.X); } }) ).ToArray(); @@ -67,10 +69,13 @@ public async Task IncrementIsThreadSafeUnderContentionAsync() { startGate.Set(); await Task.WhenAll(tasks); + // Wait for the tasks to be done processing + await stateMachine.GetSerialEventsWorkerTask(); + Assert.Equal(16_000, counter); } - [Fact] + [Fact(Timeout = 60 * 1000)] public async Task RecursiveFireDoesNotDeadlockSync() { var stateMachine = new StateMachine(State.A, FiringMode.Serial); @@ -93,10 +98,13 @@ public async Task RecursiveFireDoesNotDeadlockSync() { stateMachine.Fire(Trigger.X); + // Wait for the tasks to be done processing + await stateMachine.GetSerialEventsWorkerTask(); + Assert.Equal(State.C, stateMachine.State); } - [Fact] + [Fact(Timeout = 60*1000)] public async Task RecursiveFireDoesNotDeadlockAsync() { var stateMachine = new StateMachine(State.A, FiringMode.Serial); @@ -119,6 +127,9 @@ public async Task RecursiveFireDoesNotDeadlockAsync() { await stateMachine.FireAsync(Trigger.X); + // Wait for the tasks to be done processing + await stateMachine.GetSerialEventsWorkerTask(); + Assert.Equal(State.C, stateMachine.State); } From 3034bb28690c52c5dbba7ed5c7cf2553da275d31 Mon Sep 17 00:00:00 2001 From: Ted Chirvasiu Date: Sat, 20 Dec 2025 11:26:51 +0200 Subject: [PATCH 4/6] Updated docs for FiringMode.Serial. --- README.md | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index f987ffc8..6460d1a7 100644 --- a/README.md +++ b/README.md @@ -358,15 +358,19 @@ var stateMachine = new StateMachine(initialState) Setting this is vital within a Microsoft Orleans Grain for example, which requires the `SynchronizationContext` in order to make calls to other Grains. -### Thread safety +### FiringMode.Serial By default, Stateless is **NOT** thread-safe. -`FiringMode.Serial` ensures thread-safety for Fire(), however reading the State Machine's state from multiple threads may still be unsafe and require aditional locks. +`FiringMode.Serial` ensures thread-safety for Fire()/FireAsync(), however reading the State Machine's state from multiple threads may still be unsafe and require aditional locks. -Stateless processes triggers sequentially, and as a result there can only be one thread "driving" the processing at a time. +In `FiringMode.Serial` triggers are sequentially ran on a separate worker task. So all calls to Fire / FireAsync return as soon as the trigger is enqueued. -In `FiringMode.Serial`, if the main processing thread throws an error, unprocessed triggers should be removed from the queue in order to ensure consistency. Otherwise the event queue may still hold unprocessed triggers which would require another Fire() call to resume processing. -Set `DropUnprocessedEventsOnErrorInSerialMode` to true if you need consistent behaviour. -Set `DropUnprocessedEventsOnErrorInSerialMode` to false if you don't want triggers to be dropped (default). +If you need to wait for the trigger to be ran or catch its specific exceptions, use `await FireAndWaitAsync(trigger)`. Warning, if you call and await `FireAndWaitAsync` inside a state transition event, you risk deadlocking (because the current transition now waits for a future transition to be completed). + +If you need to await the triggers processing task or catch exceptions, you can use `await GetSerialEventsWorkerTask()`. + +**Important**: If an unexpected exception is thrown durring the processing of triggers, the faulting trigger is dequeued and the worker task halts (possibly locking tasks waiting via `await FireAndWaitAsync(trigger)`). +If you need to resume execution, you can call the parameterless Fire method. +If you need to cancel all the unexecuted triggers still pending, you can call `CancelPendingTriggers()`. ## Building From 5da9bc514991e73a9fea0584bda8eba96d93cbed Mon Sep 17 00:00:00 2001 From: Ted Chirvasiu Date: Sat, 20 Dec 2025 11:30:25 +0200 Subject: [PATCH 5/6] Improved tests for SerialModeThreadSafetyFixture. --- test/Stateless.Tests/SerialModeThreadSafetyFixture.cs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs b/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs index bf264f26..908c4ded 100644 --- a/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs +++ b/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs @@ -41,6 +41,8 @@ public async Task IncrementIsThreadSafeUnderContentionSync() { await stateMachine.GetSerialEventsWorkerTask(); Assert.Equal(160_000, counter); + + stateMachine.Dispose(); } [Fact(Timeout = 60 * 1000)] @@ -61,7 +63,7 @@ public async Task IncrementIsThreadSafeUnderContentionAsync() { Task.Run(async () => { startGate.Wait(); for (int i = 0; i < 1_000; i++) { - var forget = stateMachine.FireAsync(Trigger.X); + await stateMachine.FireAsync(Trigger.X); } }) ).ToArray(); @@ -73,6 +75,8 @@ public async Task IncrementIsThreadSafeUnderContentionAsync() { await stateMachine.GetSerialEventsWorkerTask(); Assert.Equal(16_000, counter); + + stateMachine.Dispose(); } [Fact(Timeout = 60 * 1000)] @@ -102,6 +106,8 @@ public async Task RecursiveFireDoesNotDeadlockSync() { await stateMachine.GetSerialEventsWorkerTask(); Assert.Equal(State.C, stateMachine.State); + + stateMachine.Dispose(); } [Fact(Timeout = 60*1000)] @@ -131,6 +137,8 @@ public async Task RecursiveFireDoesNotDeadlockAsync() { await stateMachine.GetSerialEventsWorkerTask(); Assert.Equal(State.C, stateMachine.State); + + stateMachine.Dispose(); } } From 240c151e05aef89456afdcfc8386f4b44b4ec6d5 Mon Sep 17 00:00:00 2001 From: Ted Chirvasiu Date: Sat, 20 Dec 2025 23:31:41 +0200 Subject: [PATCH 6/6] Added await Task.Yield() in IncrementIsThreadSafeUnderContentionAsync in SerialModeThreadSafetyFixture. --- test/Stateless.Tests/SerialModeThreadSafetyFixture.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs b/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs index 908c4ded..1b07346d 100644 --- a/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs +++ b/test/Stateless.Tests/SerialModeThreadSafetyFixture.cs @@ -55,6 +55,7 @@ public async Task IncrementIsThreadSafeUnderContentionAsync() { stateMachine.Configure(State.A) .OnEntryAsync(async () => { + await Task.Yield(); counter = counter + 1; }) .PermitReentry(Trigger.X);