Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,20 @@ var stateMachine = new StateMachine<State, Trigger>(initialState)

Setting this is vital within a Microsoft Orleans Grain for example, which requires the `SynchronizationContext` in order to make calls to other Grains.

### FiringMode.Serial
By default, Stateless is **NOT** thread-safe.
`FiringMode.Serial` ensures thread-safety for Fire()/FireAsync(), however reading the State Machine's state from multiple threads may still be unsafe and require aditional locks.

In `FiringMode.Serial` triggers are sequentially ran on a separate worker task. So all calls to Fire / FireAsync return as soon as the trigger is enqueued.

If you need to wait for the trigger to be ran or catch its specific exceptions, use `await FireAndWaitAsync(trigger)`. Warning, if you call and await `FireAndWaitAsync` inside a state transition event, you risk deadlocking (because the current transition now waits for a future transition to be completed).

If you need to await the triggers processing task or catch exceptions, you can use `await GetSerialEventsWorkerTask()`.

**Important**: If an unexpected exception is thrown durring the processing of triggers, the faulting trigger is dequeued and the worker task halts (possibly locking tasks waiting via `await FireAndWaitAsync(trigger)`).
If you need to resume execution, you can call the parameterless Fire method.
If you need to cancel all the unexecuted triggers still pending, you can call `CancelPendingTriggers()`.

## Building

Stateless runs on .NET runtime version 4+ and practically all modern .NET platforms by targeting .NET Framework 4.6.2, .NET Standard 2.0, and .NET 8.0, 9.0 and 10.0. Visual Studio 2017 or later is required to build the solution.
Expand Down
233 changes: 215 additions & 18 deletions src/Stateless/StateMachine.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,20 @@

namespace Stateless
{
public partial class StateMachine<TState, TTrigger>
public partial class StateMachine<TState, TTrigger> : IDisposable
{

private bool _disposing;

private class QueuedSerialTrigger : QueuedTrigger {
public TaskCompletionSource<bool> TaskCompletionSource { get; set; }
}

private readonly Queue<QueuedSerialTrigger> _serialEventQueue = new Queue<QueuedSerialTrigger>();

private Task _serialEventQueueProcessingTask;
private CancellationTokenSource _serialEventQueueCancellationToken;

/// <summary>
/// Activates current state in asynchronous fashion. Actions associated with activating the current state
/// will be invoked. The activation is idempotent and subsequent activation of the same current state
Expand Down Expand Up @@ -43,6 +55,16 @@ public async Task<IEnumerable<TTrigger>> GetPermittedTriggersAsync(params object
return await CurrentRepresentation.GetPermittedTriggersAsync(args);
}

/// <summary> Obtains the curent serial events worker task. </summary>
public Task GetSerialEventsWorkerTask() {
return _serialEventQueueProcessingTask ?? Task.CompletedTask;
}

/// <summary> Resumes the execution of the event processing queue if not empty. </summary>
public Task FireAsync() {
return InternalFireAsync();
}

/// <summary>
/// Transition from the current state via the specified trigger in async fashion.
/// The target state is determined by the configuration of the current state.
Expand Down Expand Up @@ -147,55 +169,208 @@ public Task FireAsync<TArg0, TArg1, TArg2>(TriggerWithParameters<TArg0, TArg1, T
return InternalFireAsync(trigger.Trigger, arg0, arg1, arg2);
}


/// <summary>
/// Fires the trigger and waits for the trigger to be processed.
/// Relevant for FiringMode.Serial.
/// </summary>
/// <param name="trigger">The trigger to fire.</param>
/// <param name="args">A variable-length parameters list containing arguments. </param>
public Task FireAndWaitAsync(TTrigger trigger, params object[] args) {
switch (_firingMode) {
case FiringMode.Immediate:
return InternalFireOneAsync(trigger, args);
case FiringMode.Queued:
return InternalFireQueuedAsync(trigger, args);
case FiringMode.Serial:
return InternalFireSerialAsync(trigger, getTriggerCompletionTask: true, args);
default:
// If something is completely messed up we let the user know ;-)
throw new InvalidOperationException("The firing mode has not been configured!");
}
}

/// <summary>
/// Determine how to Fire the trigger
/// </summary>
/// <param name="trigger">The trigger. </param>
/// <param name="args">A variable-length parameters list containing arguments. </param>
async Task InternalFireAsync(TTrigger trigger, params object[] args)
Task InternalFireAsync(TTrigger trigger, params object[] args)
{
switch (_firingMode)
{
case FiringMode.Immediate:
await InternalFireOneAsync(trigger, args);
break;
return InternalFireOneAsync(trigger, args);
case FiringMode.Queued:
await InternalFireQueuedAsync(trigger, args);
break;
return InternalFireQueuedAsync(trigger, args);
case FiringMode.Serial:
return InternalFireSerialAsync(trigger, getTriggerCompletionTask: false, args);
default:
// If something is completely messed up we let the user know ;-)
throw new InvalidOperationException("The firing mode has not been configured!");
}
}


/// <summary> Resumes the execution of the event processing queue if not empty. </summary>
Task InternalFireAsync() {
switch (_firingMode) {
case FiringMode.Immediate:
return Task.CompletedTask;
case FiringMode.Queued:
return InternalFireQueuedAsync();
case FiringMode.Serial:
return InternalFireSerialAsync();
default:
// If something is completely messed up we let the user know ;-)
throw new InvalidOperationException("The firing mode has not been configured!");
}
}

/// <summary>
/// Queue events and then fire in order.
/// If only one event is queued, this behaves identically to the non-queued version.
/// Queue events and then fire in order on a separate worker thread.
/// This returns immediately after queueing the trigger.
/// This method is thread-safe.
/// </summary>
/// <param name="trigger"> The trigger. </param>
/// <param name="getTriggerCompletionTask"> If true, returns the task associated with the processing of the trigger. </param>
/// <param name="args"> A variable-length parameters list containing arguments. </param>
async Task InternalFireQueuedAsync(TTrigger trigger, params object[] args)
Task InternalFireSerialAsync(TTrigger trigger, bool getTriggerCompletionTask, params object[] args)
{
if (_firing)

if (_disposing)
throw new ObjectDisposedException("State machine");

var taskCompletionSource = new TaskCompletionSource<bool>();

lock (_serialModeLock)
{
_eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });
return;
_serialEventQueue.Enqueue(new QueuedSerialTrigger { Trigger = trigger, Args = args, TaskCompletionSource = taskCompletionSource });

if (_firing)
return getTriggerCompletionTask ? taskCompletionSource.Task : Task.CompletedTask;

_firing = true;
}

try
_serialEventQueueProcessingTask = InternalFireSerialAsync_RunProcessingQueue();

return getTriggerCompletionTask ? taskCompletionSource.Task : Task.CompletedTask;
}

/// <summary> Resumes the execution if the event queue is not empty </summary>
Task InternalFireSerialAsync()
{
lock (_serialModeLock)
{
if (_firing || _serialEventQueue.Count == 0)
return Task.CompletedTask;

_firing = true;
}

_serialEventQueueProcessingTask = InternalFireSerialAsync_RunProcessingQueue();

return Task.CompletedTask;
}

await InternalFireOneAsync(trigger, args).ConfigureAwait(RetainSynchronizationContext);
/// <summary>
/// Starts a serial event processing thread.
/// Only call if you aquired "_firing = true" inside a lock.
/// </summary>
Task InternalFireSerialAsync_RunProcessingQueue()
{

_serialEventQueueCancellationToken = new CancellationTokenSource();

while (_eventQueue.Count != 0)
return Task.Run(
async () =>
{

QueuedSerialTrigger queuedEvent = null;
Task currentTask = null;

try
{
if (_disposing)
throw new ObjectDisposedException("State machine");

while (true)
{

lock (_serialModeLock)
{

if (_serialEventQueue.Count == 0)
{
_firing = false;
break;
}

queuedEvent = _serialEventQueue.Dequeue();
}

currentTask = InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args);

await currentTask;

queuedEvent.TaskCompletionSource.SetResult(true);

_serialEventQueueCancellationToken.Token.ThrowIfCancellationRequested();
}
}
catch
{

lock (_serialModeLock)
{
_firing = false;
}

if (currentTask?.IsCanceled == true)
{
queuedEvent?.TaskCompletionSource.SetCanceled();
}
else if (currentTask?.IsFaulted == true)
{
queuedEvent?.TaskCompletionSource.SetException(currentTask.Exception);
}

throw;
}
}
);
}

/// <summary>
/// Queue events and then fire in order.
/// If only one event is queued, this behaves identically to the non-queued version.
/// </summary>
/// <param name="trigger"> The trigger. </param>
/// <param name="args"> A variable-length parameters list containing arguments. </param>
Task InternalFireQueuedAsync(TTrigger trigger, params object[] args)
{

_eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });

return InternalFireQueuedAsync();
}

/// <summary> Processes the event queue </summary>
async Task InternalFireQueuedAsync() {

if (_firing) {
return;
}

try {
_firing = true;

while (_eventQueue.Count != 0) {
var queuedEvent = _eventQueue.Dequeue();
await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(RetainSynchronizationContext);
}
}
finally
{
} finally {
_firing = false;
}
}
Expand Down Expand Up @@ -473,5 +648,27 @@ public void OnTransitionCompletedAsyncUnregister(Func<Transition, Task> onTransi
if (onTransitionAction == null) throw new ArgumentNullException(nameof(onTransitionAction));
_onTransitionCompletedEvent.Unregister(onTransitionAction);
}

/// <summary>
/// Dispose the state machine
/// </summary>
public void Dispose()
{
_disposing = true;

if (_firingMode == FiringMode.Serial)
{
_serialEventQueueCancellationToken?.Cancel();

if (!_serialEventQueueProcessingTask?.IsCompleted == true)
{
try {
_serialEventQueueProcessingTask.Wait();
} catch {
//Since we are disposing, ignore any errors
}
}
}
}
}
}
Loading