Skip to content

Commit b41bc0d

Browse files
committed
failure in sub workflow will terminate the parent workflow
1 parent de83dc5 commit b41bc0d

File tree

8 files changed

+89
-13
lines changed

8 files changed

+89
-13
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace WorkflowCore.Models.LifeCycleEvents
2+
{
3+
public class SubWorkflowLifeCycleEvent : LifeCycleEvent
4+
{
5+
6+
}
7+
}

src/WorkflowCore/Primitives/SubWorkflowStepBody.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@ public SubWorkflowStepBody(IScopeProvider scopeProvider)
1818

1919
public override ExecutionResult Run(IStepExecutionContext context)
2020
{
21-
var eventKey = context.ExecutionPointer.EventKey;
22-
2321
var scope = _scopeProvider.CreateScope(context);
2422
var workflowController = scope.ServiceProvider.GetRequiredService<IWorkflowController>();
2523
var logger = scope.ServiceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(
@@ -32,19 +30,24 @@ public override ExecutionResult Run(IStepExecutionContext context)
3230
logger.LogDebug("Started sub workflow {Name} with id='{SubId}' from workflow {WorkflowDefinitionId} ({Id})",
3331
SubWorkflowId, result, context.Workflow.WorkflowDefinitionId, context.Workflow.Id);
3432

35-
logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event WorkflowCompleted with key='{EventKey}'",
33+
logger.LogDebug("Workflow {Name} ({SubId}) is waiting for event SubWorkflowLifeCycleEvent with key='{EventKey}'",
3634
SubWorkflowId, result, result);
3735

3836
var effectiveDate = DateTime.MinValue;
39-
return ExecutionResult.WaitForEvent(nameof(WorkflowCompleted), result, effectiveDate);
37+
return ExecutionResult.WaitForEvent(nameof(SubWorkflowLifeCycleEvent), result, effectiveDate);
4038
}
4139

4240
logger.LogDebug("Sub workflow {Name} ({SubId}) completed", SubWorkflowId,
4341
context.ExecutionPointer.EventKey);
4442

4543
var persistenceProvider = scope.ServiceProvider.GetRequiredService<IPersistenceProvider>();
46-
47-
Result = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result.Data;
44+
var workflowInstance = persistenceProvider.GetWorkflowInstance(context.ExecutionPointer.EventKey).Result;
45+
if (workflowInstance.Status == WorkflowStatus.Terminated)
46+
{
47+
throw new NotImplementedException(workflowInstance.Status.ToString());
48+
}
49+
50+
Result = workflowInstance.Data;
4851
return ExecutionResult.Next();
4952
}
5053

src/WorkflowCore/Services/WorkflowExecutor.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,12 @@ private async Task DetermineNextExecutionTime(WorkflowInstance workflow, Workflo
227227
return;
228228
}
229229

230+
if (workflow.Status == WorkflowStatus.Terminated)
231+
{
232+
await OnTerminated(workflow, def);
233+
return;
234+
}
235+
230236
foreach (var pointer in workflow.ExecutionPointers.Where(x => x.Active && (x.Children ?? new List<string>()).Count == 0))
231237
{
232238
if (!pointer.SleepUntil.HasValue)
@@ -284,5 +290,28 @@ private async Task OnComplete(WorkflowInstance workflow, WorkflowDefinition def)
284290
Version = workflow.Version
285291
});
286292
}
293+
294+
private async Task OnTerminated(WorkflowInstance workflow, WorkflowDefinition def)
295+
{
296+
workflow.Status = WorkflowStatus.Terminated;
297+
workflow.CompleteTime = _datetimeProvider.UtcNow;
298+
299+
using (var scope = _serviceProvider.CreateScope())
300+
{
301+
var middlewareRunner = scope.ServiceProvider.GetRequiredService<IWorkflowMiddlewareRunner>();
302+
await middlewareRunner.RunPostMiddleware(workflow, def);
303+
}
304+
305+
_logger.LogDebug("Workflow {WorkflowDefinitionId} ({Id}) terminated", workflow.WorkflowDefinitionId, workflow.Id);
306+
307+
_publisher.PublishNotification(new WorkflowTerminated
308+
{
309+
EventTimeUtc = _datetimeProvider.UtcNow,
310+
Reference = workflow.Reference,
311+
WorkflowInstanceId = workflow.Id,
312+
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
313+
Version = workflow.Version
314+
});
315+
}
287316
}
288317
}

src/WorkflowCore/Services/WorkflowHost.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,10 +165,13 @@ public Task<bool> TerminateWorkflow(string workflowId)
165165

166166
public void HandleLifeCycleEvent(LifeCycleEvent evt)
167167
{
168-
if (evt is WorkflowCompleted completed)
168+
switch (evt)
169169
{
170-
_workflowController.PublishEvent(nameof(WorkflowCompleted), completed.WorkflowInstanceId,
171-
completed.Reference);
170+
// publish the event as sub workflow lifecycle event
171+
case WorkflowCompleted _:
172+
case WorkflowTerminated _:
173+
_workflowController.PublishEvent(nameof(SubWorkflowLifeCycleEvent), evt.WorkflowInstanceId, evt.Reference);
174+
break;
172175
}
173176

174177
OnLifeCycleEvent?.Invoke(evt);

src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id, CancellationT
100100
{
101101
using (var db = ConstructDbContext())
102102
{
103+
if (!Guid.TryParse(Id, out _))
104+
{
105+
106+
}
103107
var uid = new Guid(Id);
104108
var raw = await db.Set<PersistedWorkflow>()
105109
.Include(wf => wf.ExecutionPointers)

test/WorkflowCore.IntegrationTests/Scenarios/ApprovalScenario.cs renamed to test/WorkflowCore.IntegrationTests/Scenarios/SubWorkflowScenario.cs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
namespace WorkflowCore.IntegrationTests.Scenarios;
1111

12-
public class ApprovalScenario : WorkflowTest<ApprovalScenario.ParentWorkflow, ApprovalScenario.ApprovalInput>
12+
public class SubWorkflowScenario : WorkflowTest<SubWorkflowScenario.ParentWorkflow, SubWorkflowScenario.ApprovalInput>
1313
{
1414
public class ApprovalInput
1515
{
@@ -28,6 +28,7 @@ public class ParentWorkflow : IWorkflow<ApprovalInput>
2828
public void Build(IWorkflowBuilder<ApprovalInput> builder)
2929
{
3030
builder
31+
.UseDefaultErrorBehavior(WorkflowErrorHandling.Terminate)
3132
.StartWith(context => ExecutionResult.Next())
3233
.SubWorkflow(nameof(ChildWorkflow))
3334
.Output(i => i.Approved, step => ((ApprovalInput)step.Result).Approved)
@@ -48,6 +49,7 @@ public class ChildWorkflow : IWorkflow<ApprovalInput>
4849
public void Build(IWorkflowBuilder<ApprovalInput> builder)
4950
{
5051
builder
52+
.UseDefaultErrorBehavior(WorkflowErrorHandling.Terminate)
5153
.StartWith(context => ExecutionResult.Next())
5254
.Parallel()
5355
.Do(then
@@ -71,7 +73,7 @@ public void Build(IWorkflowBuilder<ApprovalInput> builder)
7173
}
7274
}
7375

74-
public ApprovalScenario()
76+
public SubWorkflowScenario()
7577
{
7678
Setup();
7779
}
@@ -114,6 +116,34 @@ public void Scenario(bool approved)
114116
});
115117
}
116118

119+
[Fact]
120+
public void Failure()
121+
{
122+
Host.Registry.RegisterWorkflow(new ChildWorkflow());
123+
124+
var eventKey = Guid.NewGuid().ToString();
125+
var workflowId = StartWorkflow(new ApprovalInput
126+
{
127+
Id = eventKey,
128+
TimeSpan = TimeSpan.FromMinutes(10)
129+
});
130+
131+
WaitForEventSubscription("Approved", workflowId, TimeSpan.FromSeconds(5));
132+
UnhandledStepErrors.Should().BeEmpty();
133+
134+
Host.PublishEvent("Approved", workflowId, new
135+
{
136+
Approved = "string"
137+
});
138+
139+
WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(20));
140+
141+
System.Threading.Thread.Sleep(2000);
142+
143+
UnhandledStepErrors.Should().NotBeEmpty();
144+
GetStatus(workflowId).Should().Be(WorkflowStatus.Terminated);
145+
}
146+
117147
[Fact]
118148
public void Timeout()
119149
{

test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerApprovalScenario.cs renamed to test/WorkflowCore.Tests.SqlServer/Scenarios/SqlServerSubWorkflowScenario.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
namespace WorkflowCore.Tests.SqlServer.Scenarios
77
{
88
[Collection("SqlServer collection")]
9-
public class SqlServerApprovalScenario() : ApprovalScenario()
9+
public class SqlServerSubWorkflowScenario() : SubWorkflowScenario()
1010
{
1111
protected override void ConfigureServices(IServiceCollection services)
1212
{

test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteApprovalScenario.cs renamed to test/WorkflowCore.Tests.Sqlite/Scenarios/SqliteSubWorkflowScenario.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
namespace WorkflowCore.Tests.Sqlite.Scenarios
88
{
99
[Collection("Sqlite collection")]
10-
public class SqliteApprovalScenario : ApprovalScenario
10+
public class SqliteSubWorkflowScenario : SubWorkflowScenario
1111
{
1212
protected override void ConfigureServices(IServiceCollection services)
1313
{

0 commit comments

Comments
 (0)