1+ using Microsoft . EntityFrameworkCore ;
2+ using System ;
3+ using System . Collections . Generic ;
4+ using System . Data ;
5+ using System . Linq ;
6+ using System . Threading . Tasks ;
7+ using WorkflowCore . Persistence . EntityFramework . Models ;
8+ using WorkflowCore . Models ;
9+ using WorkflowCore . Persistence . EntityFramework . Interfaces ;
10+ using System . Threading ;
11+ using WorkflowCore . Interface ;
12+
13+ namespace WorkflowCore . Persistence . EntityFramework . Services
14+ {
15+ public sealed class LargeDataOptimizedEntityFrameworkPersistenceProvider : EntityFrameworkPersistenceProvider , IPersistenceProvider
16+ {
17+ private readonly IWorkflowDbContextFactory _contextFactory ;
18+
19+ public LargeDataOptimizedEntityFrameworkPersistenceProvider ( IWorkflowDbContextFactory contextFactory , bool canCreateDb , bool canMigrateDb )
20+ : base ( contextFactory , canCreateDb , canMigrateDb )
21+ {
22+ _contextFactory = contextFactory ;
23+ }
24+
25+ /// <inheritdoc/>
26+ public new async Task < IEnumerable < WorkflowInstance > > GetWorkflowInstances ( WorkflowStatus ? status , string type , DateTime ? createdFrom , DateTime ? createdTo , int skip , int take )
27+ {
28+ using ( var db = _contextFactory . Build ( ) )
29+ {
30+ IQueryable < PersistedWorkflow > query = db . Set < PersistedWorkflow > ( )
31+ . Include ( wf => wf . ExecutionPointers )
32+ . ThenInclude ( ep => ep . ExtensionAttributes )
33+ . Include ( wf => wf . ExecutionPointers )
34+ . AsSplitQuery ( )
35+ . AsQueryable ( ) ;
36+
37+ if ( status . HasValue )
38+ {
39+ query = query . Where ( x => x . Status == status . Value ) ;
40+ }
41+
42+ if ( ! string . IsNullOrEmpty ( type ) )
43+ {
44+ query = query . Where ( x => x . WorkflowDefinitionId == type ) ;
45+ }
46+
47+ if ( createdFrom . HasValue )
48+ {
49+ query = query . Where ( x => x . CreateTime >= createdFrom . Value ) ;
50+ }
51+
52+ if ( createdTo . HasValue )
53+ {
54+ query = query . Where ( x => x . CreateTime <= createdTo . Value ) ;
55+ }
56+
57+ var rawResult = await query . OrderBy ( x => x . PersistenceId ) . Skip ( skip ) . Take ( take ) . ToListAsync ( ) ;
58+
59+ var result = new List < WorkflowInstance > ( rawResult . Count ) ;
60+
61+ foreach ( var item in rawResult )
62+ {
63+ result . Add ( item . ToWorkflowInstance ( ) ) ;
64+ }
65+
66+ return result ;
67+ }
68+ }
69+
70+ /// <inheritdoc/>
71+ public new async Task < WorkflowInstance > GetWorkflowInstance ( string id , CancellationToken cancellationToken = default )
72+ {
73+ using ( var db = _contextFactory . Build ( ) )
74+ {
75+ var uid = new Guid ( id ) ;
76+ var raw = await db . Set < PersistedWorkflow > ( )
77+ . Include ( wf => wf . ExecutionPointers )
78+ . ThenInclude ( ep => ep . ExtensionAttributes )
79+ . Include ( wf => wf . ExecutionPointers )
80+ . AsSplitQuery ( )
81+ . FirstAsync ( x => x . InstanceId == uid , cancellationToken ) ;
82+
83+ return raw ? . ToWorkflowInstance ( ) ;
84+ }
85+ }
86+
87+ /// <inheritdoc/>
88+ public new async Task < IEnumerable < WorkflowInstance > > GetWorkflowInstances ( IEnumerable < string > ids , CancellationToken cancellationToken = default )
89+ {
90+ if ( ids == null )
91+ {
92+ return Array . Empty < WorkflowInstance > ( ) ;
93+ }
94+
95+ using ( var db = _contextFactory . Build ( ) )
96+ {
97+ var uids = ids . Select ( i => new Guid ( i ) ) ;
98+ var raw = db . Set < PersistedWorkflow > ( )
99+ . Include ( wf => wf . ExecutionPointers )
100+ . ThenInclude ( ep => ep . ExtensionAttributes )
101+ . Include ( wf => wf . ExecutionPointers )
102+ . AsSplitQuery ( )
103+ . Where ( x => uids . Contains ( x . InstanceId ) ) ;
104+
105+ var persistedWorkflows = await raw . ToListAsync ( cancellationToken ) ;
106+
107+ return persistedWorkflows . Select ( i => i . ToWorkflowInstance ( ) ) ;
108+ }
109+ }
110+
111+ /// <inheritdoc/>
112+ public new async Task PersistWorkflow ( WorkflowInstance workflow , CancellationToken cancellationToken = default )
113+ {
114+ using ( var db = _contextFactory . Build ( ) )
115+ using ( var transaction = await db . Database . BeginTransactionAsync ( IsolationLevel . RepeatableRead , cancellationToken ) )
116+ {
117+ var uid = new Guid ( workflow . Id ) ;
118+ var existingEntity = await db . Set < PersistedWorkflow > ( )
119+ . Where ( x => x . InstanceId == uid )
120+ . Include ( wf => wf . ExecutionPointers )
121+ . ThenInclude ( ep => ep . ExtensionAttributes )
122+ . Include ( wf => wf . ExecutionPointers )
123+ . AsSplitQuery ( )
124+ . AsTracking ( )
125+ . FirstAsync ( cancellationToken ) ;
126+
127+ _ = workflow . ToPersistable ( existingEntity ) ;
128+
129+ await db . SaveChangesAsync ( cancellationToken ) ;
130+
131+ await transaction . CommitAsync ( cancellationToken ) ;
132+ }
133+ }
134+ }
135+ }
0 commit comments