Skip to content

Commit 27fde60

Browse files
committed
Code reorganisation/refactoring for async and task CE extensions
1 parent 6f7eb2f commit 27fde60

File tree

9 files changed

+251
-200
lines changed

9 files changed

+251
-200
lines changed
Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,39 @@
1-
module TaskSeq.Extenions
1+
module TaskSeq.Tests.Extensions
22

33
open System
44
open Xunit
55
open FsUnit.Xunit
6-
open FsToolkit.ErrorHandling
76

87
open FSharp.Control
98

109
//
11-
// TaskSeq.except
12-
// TaskSeq.exceptOfSeq
10+
// Task extensions
11+
// Async extensions
1312
//
1413

1514

16-
module TaskBuilder =
17-
open TaskSeq.Tests
15+
module TaskCE =
16+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
17+
let ``Task-for CE with taskSeq`` variant = task {
18+
let values = Gen.getSeqImmutable variant
19+
20+
let mutable sum = 0
21+
22+
for x in values do
23+
sum <- sum + x
24+
25+
sum |> should equal 55
26+
}
1827

28+
module AsyncCE =
1929
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
20-
let ``TaskSeq-existsAsync happy path last item of seq`` variant =
21-
task {
22-
let values = Gen.getSeqImmutable variant
23-
24-
let mutable sum = 0
25-
for x in values do
26-
sum <- sum + x
27-
28-
// let! expected =
29-
// (0, values)
30-
// ||> TaskSeq.fold((+))
31-
Assert.Equal(55, sum)
32-
}
30+
let ``Async-for CE with taskSeq`` variant = async {
31+
let values = Gen.getSeqImmutable variant
32+
33+
let mutable sum = 0
34+
35+
for x in values do
36+
sum <- sum + x
37+
38+
sum |> should equal 55
39+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
namespace FSharp.Control
2+
3+
open System.Collections.Generic
4+
open System.Threading
5+
open System.Threading.Tasks
6+
7+
#nowarn "57"
8+
#nowarn "1204"
9+
#nowarn "3513"
10+
11+
12+
[<AutoOpen>]
13+
module AsyncExtensions =
14+
15+
// Add asynchronous for loop to the 'async' computation builder
16+
type Microsoft.FSharp.Control.AsyncBuilder with
17+
18+
member x.For(tasksq: IAsyncEnumerable<'T>, action: 'T -> Async<unit>) =
19+
tasksq
20+
|> TaskSeq.iterAsync (action >> Async.StartAsTask)
21+
|> Async.AwaitTask
22+
23+
24+
// temp example
25+
let foo () = async {
26+
let mutable sum = 0
27+
28+
let xs = taskSeq {
29+
1
30+
2
31+
3
32+
}
33+
34+
for x in xs do
35+
sum <- sum + x
36+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace FSharp.Control
2+
3+
#nowarn "1204"
4+
5+
[<AutoOpen>]
6+
module AsyncExtensions =
7+
8+
type AsyncBuilder with
9+
10+
member For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * action: ('T -> Async<unit>) -> Async<unit>

src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,12 @@ Generates optimized IL code through the new resumable state machines, and comes
7171
<Compile Include="TaskSeqInternal.fs" />
7272
<Compile Include="TaskSeq.fsi" />
7373
<Compile Include="TaskSeq.fs" />
74+
<Compile Include="TaskExtensions.fsi" />
75+
<Compile Include="TaskExtensions.fs" />
76+
<Compile Include="AsyncExtensions.fsi" />
77+
<Compile Include="AsyncExtensions.fs" />
7478
</ItemGroup>
7579

76-
<ItemGroup />
77-
78-
79-
<ItemGroup />
80-
8180
<ItemGroup>
8281
<!-- maximal compatibility with minimal required FSharp.Core version for TaskSeq -->
8382
<PackageReference Update="FSharp.Core" Version="6.0.2" />
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
namespace FSharp.Control
2+
3+
open System.Collections.Generic
4+
open System.Threading
5+
open System.Threading.Tasks
6+
7+
#nowarn "57"
8+
#nowarn "1204"
9+
#nowarn "3513"
10+
11+
12+
[<AutoOpen>]
13+
module TaskExtensions =
14+
open Microsoft.FSharp.Core.CompilerServices
15+
open Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers
16+
open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators
17+
18+
let rec WhileDynamic
19+
(
20+
sm: byref<TaskStateMachine<'Data>>,
21+
condition: unit -> ValueTask<bool>,
22+
body: TaskCode<'Data, unit>
23+
) : bool =
24+
let vt = condition ()
25+
26+
TaskBuilderBase.BindDynamic(
27+
&sm,
28+
vt,
29+
fun result ->
30+
TaskCode<_, _>(fun sm ->
31+
if result then
32+
if body.Invoke(&sm) then
33+
WhileDynamic(&sm, condition, body)
34+
else
35+
let rf = sm.ResumptionDynamicInfo.ResumptionFunc
36+
37+
sm.ResumptionDynamicInfo.ResumptionFunc <-
38+
(TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf)))
39+
40+
false
41+
else
42+
true)
43+
)
44+
45+
46+
and WhileBodyDynamicAux
47+
(
48+
sm: byref<TaskStateMachine<'Data>>,
49+
condition: unit -> ValueTask<bool>,
50+
body: TaskCode<'Data, unit>,
51+
rf: TaskResumptionFunc<_>
52+
) : bool =
53+
if rf.Invoke(&sm) then
54+
WhileDynamic(&sm, condition, body)
55+
else
56+
let rf = sm.ResumptionDynamicInfo.ResumptionFunc
57+
58+
sm.ResumptionDynamicInfo.ResumptionFunc <-
59+
(TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf)))
60+
61+
false
62+
63+
// Add asynchronous for loop to the 'task' computation builder
64+
type Microsoft.FSharp.Control.TaskBuilder with
65+
66+
/// Used by `For`. F# currently doesn't support `while!`, so this cannot be called directly from the task CE
67+
/// This code is mostly a copy of TaskSeq.WhileAsync.
68+
member inline _.WhileAsync
69+
(
70+
[<InlineIfLambda>] condition: unit -> ValueTask<bool>,
71+
body: TaskCode<_, unit>
72+
) : TaskCode<_, _> =
73+
let mutable condition_res = true
74+
75+
ResumableCode.While(
76+
(fun () -> condition_res),
77+
TaskCode<_, _>(fun sm ->
78+
let mutable __stack_condition_fin = true
79+
let __stack_vtask = condition ()
80+
81+
let mutable awaiter = __stack_vtask.GetAwaiter()
82+
83+
if awaiter.IsCompleted then
84+
// logInfo "at WhileAsync: returning completed task"
85+
86+
__stack_condition_fin <- true
87+
condition_res <- awaiter.GetResult()
88+
else
89+
// logInfo "at WhileAsync: awaiting non-completed task"
90+
91+
// This will yield with __stack_fin = false
92+
// This will resume with __stack_fin = true
93+
let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm)
94+
__stack_condition_fin <- __stack_yield_fin
95+
96+
if __stack_condition_fin then
97+
condition_res <- awaiter.GetResult()
98+
99+
100+
if __stack_condition_fin then
101+
if condition_res then body.Invoke(&sm) else true
102+
else
103+
sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm)
104+
false)
105+
)
106+
107+
member inline this.For(tasksq: IAsyncEnumerable<'T>, body: 'T -> TaskCode<_, unit>) : TaskCode<_, unit> =
108+
// tasksq
109+
// |> TaskSeq.iterAsync (body >> task.Run)
110+
// |> task.ReturnFrom
111+
112+
// task.ReturnFrom <|
113+
// task {
114+
// let mutable continueWhile = true
115+
// use e = tasksq.GetAsyncEnumerator()
116+
// while continueWhile do
117+
// let! next = e.MoveNextAsync()
118+
// if next then
119+
// do! task.Run(body e.Current)
120+
// else
121+
// continueWhile <- false
122+
// }
123+
124+
TaskCode<'TOverall, unit>(fun sm ->
125+
126+
this
127+
.Using(
128+
tasksq.GetAsyncEnumerator(CancellationToken()),
129+
(fun e -> this.WhileAsync(e.MoveNextAsync, (fun sm -> (body e.Current).Invoke(&sm))))
130+
)
131+
.Invoke(&sm))
132+
133+
// temp example
134+
let foo () = task {
135+
let mutable sum = 0
136+
137+
let xs = taskSeq {
138+
1
139+
2
140+
3
141+
}
142+
143+
for x in xs do
144+
sum <- sum + x
145+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
namespace FSharp.Control
2+
3+
#nowarn "1204"
4+
5+
[<AutoOpen>]
6+
module TaskExtensions =
7+
8+
val WhileDynamic:
9+
sm: byref<TaskStateMachine<'Data>> *
10+
condition: (unit -> System.Threading.Tasks.ValueTask<bool>) *
11+
body: TaskCode<'Data, unit> ->
12+
bool
13+
14+
val WhileBodyDynamicAux:
15+
sm: byref<TaskStateMachine<'Data>> *
16+
condition: (unit -> System.Threading.Tasks.ValueTask<bool>) *
17+
body: TaskCode<'Data, unit> *
18+
rf: TaskResumptionFunc<'Data> ->
19+
bool
20+
21+
type TaskBuilder with
22+
23+
member inline WhileAsync:
24+
condition: (unit -> System.Threading.Tasks.ValueTask<bool>) * body: TaskCode<'TOverall, unit> ->
25+
TaskCode<'TOverall, unit>
26+
27+
member inline For:
28+
tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) ->
29+
TaskCode<'TOverall, unit>

0 commit comments

Comments
 (0)