From 3202e672b1798d956fabbdd3b0354c8432614b61 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Mon, 14 Nov 2022 04:28:43 +0100 Subject: [PATCH 01/14] Initial attempt at implementing resumable code aware version of `For` in `Task` with `TaskSeq<'T>` --- src/FSharp.Control.TaskSeq/TaskSeq.fs | 32 +++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index fb45a1ae..7f6cb51a 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -4,6 +4,8 @@ open System.Collections.Generic open System.Threading open System.Threading.Tasks +#nowarn "57" + module TaskSeq = // F# BUG: the following module is 'AutoOpen' and this isn't needed in the Tests project. Why do we need to open it? open FSharp.Control.TaskSeqBuilders @@ -322,3 +324,33 @@ module TaskSeq = let fold folder state source = Internal.fold (FolderAction folder) state source let foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source + +[] +module AsyncSeqExtensions = + open Microsoft.FSharp.Core.CompilerServices + + // Add asynchronous for loop to the 'async' computation builder + type Microsoft.FSharp.Control.AsyncBuilder with + + member x.For(tasksq: IAsyncEnumerable<'T>, action: 'T -> Async) = + tasksq + |> TaskSeq.iterAsync (action >> Async.StartAsTask) + |> Async.AwaitTask + + // Add asynchronous for loop to the 'task' computation builder + type Microsoft.FSharp.Control.TaskBuilder with + + member inline this.For + ( + tasksq: IAsyncEnumerable<'T>, + body: 'T -> TaskCode<'TOverall, unit> + ) : TaskCode<'TOverall, unit> = + TaskCode<'TOverall, unit>(fun sm -> + this + .Using( + tasksq.GetAsyncEnumerator(CancellationToken()), + (fun e -> + // TODO: fix 'true' with e.MoveNextAsync() + this.While((fun () -> true), (fun sm -> (body e.Current).Invoke(&sm)))) + ) + .Invoke(&sm)) From 4481e79b5b7c2723cff4fbe7d15616661bfef010 Mon Sep 17 00:00:00 2001 From: Jimmy Byrd Date: Mon, 21 Nov 2022 22:34:31 -0500 Subject: [PATCH 02/14] WhileDynamic for task files --- .../FSharp.Control.TaskSeq.Test.fsproj | 6 +- .../TaskSeq.Extensions.Tests.fs | 28 ++++++++ src/FSharp.Control.TaskSeq/TaskSeq.fs | 68 ++++++++++++++++++- src/FSharp.Control.TaskSeq/TaskSeq.fsi | 35 ++++++++++ 4 files changed, 130 insertions(+), 7 deletions(-) create mode 100644 src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index fdeb70de..d7c3e65e 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -2,12 +2,10 @@ net6.0 - false false ..\..\assets\TaskSeq.ico - @@ -48,9 +46,9 @@ + - @@ -68,9 +66,7 @@ all - - diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs new file mode 100644 index 00000000..0b30e575 --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs @@ -0,0 +1,28 @@ +module TaskSeq.Extenions + +open System +open Xunit +open FsUnit.Xunit +open FsToolkit.ErrorHandling + +open FSharp.Control + +// +// TaskSeq.except +// TaskSeq.exceptOfSeq +// + + +// module TaskBuilder = +// open TaskSeq.Tests + +// [)>] +// let ``TaskSeq-existsAsync happy path last item of seq`` variant = +// task { +// let values = Gen.getSeqImmutable variant +// let mutable sum = 0 +// for x in values do +// sum <- sum + x +// } + // |> TaskSeq.existsAsync (fun x -> task { return x = 10 }) + // |> Task.map (should be True) diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 7f6cb51a..bf9dbf1f 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -325,8 +325,54 @@ module TaskSeq = let foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source +#nowarn "1204" +#nowarn "3513" + + [] module AsyncSeqExtensions = + + let rec WhileDynamic + ( + sm: byref>, + condition: unit -> ValueTask, + body: TaskCode<'Data, unit> + ) : bool = + let vt = condition () + TaskBuilderBase.BindDynamic(&sm, vt, fun result -> + TaskCode<_,_>(fun sm -> + if result then + if body.Invoke(&sm) then + WhileDynamic(&sm, condition, body) + else + let rf = sm.ResumptionDynamicInfo.ResumptionFunc + + sm.ResumptionDynamicInfo.ResumptionFunc <- + (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) + + false + else + true + ) + ) + + + and WhileBodyDynamicAux + ( + sm: byref>, + condition: unit -> ValueTask, + body: TaskCode<'Data, unit>, + rf: TaskResumptionFunc<_> + ) : bool = + if rf.Invoke(&sm) then + WhileDynamic(&sm, condition, body) + else + let rf = sm.ResumptionDynamicInfo.ResumptionFunc + + sm.ResumptionDynamicInfo.ResumptionFunc <- + (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) + + false open Microsoft.FSharp.Core.CompilerServices // Add asynchronous for loop to the 'async' computation builder @@ -340,17 +386,35 @@ module AsyncSeqExtensions = // Add asynchronous for loop to the 'task' computation builder type Microsoft.FSharp.Control.TaskBuilder with + + member inline this.While(condition : unit -> ValueTask, body : TaskCode<'TOverall,unit>) = + TaskCode<_,_>(fun sm -> + WhileDynamic(&sm, condition, body) + + ) + + + member inline this.For ( tasksq: IAsyncEnumerable<'T>, body: 'T -> TaskCode<'TOverall, unit> ) : TaskCode<'TOverall, unit> = TaskCode<'TOverall, unit>(fun sm -> + this .Using( tasksq.GetAsyncEnumerator(CancellationToken()), (fun e -> - // TODO: fix 'true' with e.MoveNextAsync() - this.While((fun () -> true), (fun sm -> (body e.Current).Invoke(&sm)))) + let next () = e.MoveNextAsync() + this.While(next, (fun sm -> (body e.Current).Invoke(&sm)))) ) .Invoke(&sm)) + + let foo () = + task { + let mutable sum = 0 + let xs = taskSeq { 1; 2; 3} + for x in xs do + sum <- sum + x + } diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index 0d9fe0c9..63fdb00d 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -1,5 +1,7 @@ namespace FSharp.Control +#nowarn "1204" + module TaskSeq = open System.Collections.Generic open System.Threading.Tasks @@ -567,3 +569,36 @@ module TaskSeq = /// If the accumulator function does not need to be asynchronous, consider using . /// val foldAsync: folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: taskSeq<'T> -> Task<'State> + + + +[] +module AsyncSeqExtensions = + + val WhileDynamic: + sm: byref> * + condition: (unit -> System.Threading.Tasks.ValueTask) * + body: TaskCode<'Data, unit> -> + bool + + val WhileBodyDynamicAux: + sm: byref> * + condition: (unit -> System.Threading.Tasks.ValueTask) * + body: TaskCode<'Data, unit> * + rf: TaskResumptionFunc<'Data> -> + bool + + type AsyncBuilder with + + member For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * action: ('T -> Async) -> Async + + type TaskBuilder with + + member inline While: + condition: (unit -> System.Threading.Tasks.ValueTask) * body: TaskCode<'TOverall, unit> -> + TaskCode<'TOverall, 'a> + + member inline For: + tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> + TaskCode<'TOverall, unit> + From 57e7ceea78ff010982d9af91cdbcd2b75dc2d516 Mon Sep 17 00:00:00 2001 From: Jimmy Byrd Date: Tue, 22 Nov 2022 00:27:30 -0500 Subject: [PATCH 03/14] WhileAsync statically compiled --- .../TaskSeq.Extensions.Tests.fs | 31 ++++++++------ src/FSharp.Control.TaskSeq/TaskSeq.fs | 42 ++++++++++++++++--- src/FSharp.Control.TaskSeq/TaskSeq.fsi | 4 +- 3 files changed, 57 insertions(+), 20 deletions(-) diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs index 0b30e575..6fc7ad6a 100644 --- a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs @@ -13,16 +13,21 @@ open FSharp.Control // -// module TaskBuilder = -// open TaskSeq.Tests - -// [)>] -// let ``TaskSeq-existsAsync happy path last item of seq`` variant = -// task { -// let values = Gen.getSeqImmutable variant -// let mutable sum = 0 -// for x in values do -// sum <- sum + x -// } - // |> TaskSeq.existsAsync (fun x -> task { return x = 10 }) - // |> Task.map (should be True) +module TaskBuilder = + open TaskSeq.Tests + + [)>] + let ``TaskSeq-existsAsync happy path last item of seq`` variant = + task { + let values = Gen.getSeqImmutable variant + + let mutable sum = 0 + for x in values do + sum <- sum + x + + // let! expected = + // (0, values) + // ||> TaskSeq.fold((+)) + Assert.Equal(55, sum) + } + diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index bf9dbf1f..7831e436 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -374,6 +374,8 @@ module AsyncSeqExtensions = false open Microsoft.FSharp.Core.CompilerServices + open Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers + open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators // Add asynchronous for loop to the 'async' computation builder type Microsoft.FSharp.Control.AsyncBuilder with @@ -387,14 +389,44 @@ module AsyncSeqExtensions = type Microsoft.FSharp.Control.TaskBuilder with - member inline this.While(condition : unit -> ValueTask, body : TaskCode<'TOverall,unit>) = - TaskCode<_,_>(fun sm -> - WhileDynamic(&sm, condition, body) + member inline _.WhileAsync + ( + [] condition: unit -> ValueTask, + body: TaskCode<_,unit> + ) : TaskCode<_,_> = + let mutable condition_res = true + + ResumableCode.While( + (fun () -> condition_res), + ResumableCode<_, _>(fun sm -> + let mutable __stack_condition_fin = true + let __stack_vtask = condition () + + let mutable awaiter = __stack_vtask.GetAwaiter() + if awaiter.IsCompleted then + // logInfo "at WhileAsync: returning completed task" + + __stack_condition_fin <- true + condition_res <- __stack_vtask.Result + else + // logInfo "at WhileAsync: awaiting non-completed task" - ) + // This will yield with __stack_fin = false + // This will resume with __stack_fin = true + let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm) + __stack_condition_fin <- __stack_yield_fin + if __stack_condition_fin then + condition_res <- awaiter.GetResult() + if __stack_condition_fin then + if condition_res then body.Invoke(&sm) else true + else + sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm) + false) + ) + member inline this.For ( tasksq: IAsyncEnumerable<'T>, @@ -407,7 +439,7 @@ module AsyncSeqExtensions = tasksq.GetAsyncEnumerator(CancellationToken()), (fun e -> let next () = e.MoveNextAsync() - this.While(next, (fun sm -> (body e.Current).Invoke(&sm)))) + this.WhileAsync(next, (fun sm -> (body e.Current).Invoke(&sm)))) ) .Invoke(&sm)) diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index 63fdb00d..7df46d46 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -594,9 +594,9 @@ module AsyncSeqExtensions = type TaskBuilder with - member inline While: + member inline WhileAsync: condition: (unit -> System.Threading.Tasks.ValueTask) * body: TaskCode<'TOverall, unit> -> - TaskCode<'TOverall, 'a> + TaskCode<'TOverall, unit> member inline For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> From 247e430cc8c4d1d68d2547a9add5db7bae4faa55 Mon Sep 17 00:00:00 2001 From: Jimmy Byrd Date: Tue, 22 Nov 2022 20:43:36 -0500 Subject: [PATCH 04/14] 3 different ways to possibly implement WhileAsync --- src/FSharp.Control.TaskSeq/TaskSeq.fs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index 7831e436..a328ded2 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -398,7 +398,7 @@ module AsyncSeqExtensions = ResumableCode.While( (fun () -> condition_res), - ResumableCode<_, _>(fun sm -> + TaskCode<_, _>(fun sm -> let mutable __stack_condition_fin = true let __stack_vtask = condition () @@ -407,7 +407,7 @@ module AsyncSeqExtensions = // logInfo "at WhileAsync: returning completed task" __stack_condition_fin <- true - condition_res <- __stack_vtask.Result + condition_res <- awaiter.GetResult() else // logInfo "at WhileAsync: awaiting non-completed task" @@ -430,16 +430,31 @@ module AsyncSeqExtensions = member inline this.For ( tasksq: IAsyncEnumerable<'T>, - body: 'T -> TaskCode<'TOverall, unit> - ) : TaskCode<'TOverall, unit> = + body: 'T -> TaskCode<_, unit> + ) : TaskCode<_, unit> = + // tasksq + // |> TaskSeq.iterAsync (body >> task.Run) + // |> task.ReturnFrom + + // task.ReturnFrom <| + // task { + // let mutable continueWhile = true + // use e = tasksq.GetAsyncEnumerator() + // while continueWhile do + // let! next = e.MoveNextAsync() + // if next then + // do! task.Run(body e.Current) + // else + // continueWhile <- false + // } + TaskCode<'TOverall, unit>(fun sm -> this .Using( tasksq.GetAsyncEnumerator(CancellationToken()), (fun e -> - let next () = e.MoveNextAsync() - this.WhileAsync(next, (fun sm -> (body e.Current).Invoke(&sm)))) + this.WhileAsync(e.MoveNextAsync, (fun sm -> (body e.Current).Invoke(&sm)))) ) .Invoke(&sm)) From 785af2a180099516b67c180d875e0d1628bbb145 Mon Sep 17 00:00:00 2001 From: Jimmy Byrd Date: Mon, 21 Nov 2022 22:34:31 -0500 Subject: [PATCH 05/14] WhileDynamic for task files --- src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs | 1 - src/FSharp.Control.TaskSeq/TaskSeq.fsi | 1 - 2 files changed, 2 deletions(-) diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs index 6fc7ad6a..7cbd1b08 100644 --- a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs @@ -30,4 +30,3 @@ module TaskBuilder = // ||> TaskSeq.fold((+)) Assert.Equal(55, sum) } - diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index 7df46d46..fe556101 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -601,4 +601,3 @@ module AsyncSeqExtensions = member inline For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> TaskCode<'TOverall, unit> - From 6f7eb2f1697e9e6c5133d1d25369cd33edf1643d Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Fri, 25 Nov 2022 16:06:07 +0100 Subject: [PATCH 06/14] Simplify diff --- .../FSharp.Control.TaskSeq.Test.fsproj | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index d7c3e65e..38215053 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -2,10 +2,12 @@ net6.0 + false false ..\..\assets\TaskSeq.ico + @@ -49,6 +51,7 @@ + @@ -66,7 +69,9 @@ all + + From 27fde605f5ff30adfe801cfb3a2877c0f3ff6ca3 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Fri, 25 Nov 2022 16:24:30 +0100 Subject: [PATCH 07/14] Code reorganisation/refactoring for `async` and `task` CE extensions --- .../TaskSeq.Extensions.Tests.fs | 45 +++--- src/FSharp.Control.TaskSeq/AsyncExtensions.fs | 36 +++++ .../AsyncExtensions.fsi | 10 ++ .../FSharp.Control.TaskSeq.fsproj | 9 +- src/FSharp.Control.TaskSeq/TaskExtensions.fs | 145 ++++++++++++++++++ src/FSharp.Control.TaskSeq/TaskExtensions.fsi | 29 ++++ src/FSharp.Control.TaskSeq/TaskSeq.fs | 141 ----------------- src/FSharp.Control.TaskSeq/TaskSeq.fsi | 32 ---- src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs | 4 +- 9 files changed, 251 insertions(+), 200 deletions(-) create mode 100644 src/FSharp.Control.TaskSeq/AsyncExtensions.fs create mode 100644 src/FSharp.Control.TaskSeq/AsyncExtensions.fsi create mode 100644 src/FSharp.Control.TaskSeq/TaskExtensions.fs create mode 100644 src/FSharp.Control.TaskSeq/TaskExtensions.fsi diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs index 7cbd1b08..682fb18e 100644 --- a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs @@ -1,32 +1,39 @@ -module TaskSeq.Extenions +module TaskSeq.Tests.Extensions open System open Xunit open FsUnit.Xunit -open FsToolkit.ErrorHandling open FSharp.Control // -// TaskSeq.except -// TaskSeq.exceptOfSeq +// Task extensions +// Async extensions // -module TaskBuilder = - open TaskSeq.Tests +module TaskCE = + [)>] + let ``Task-for CE with taskSeq`` variant = task { + let values = Gen.getSeqImmutable variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + sum |> should equal 55 + } +module AsyncCE = [)>] - let ``TaskSeq-existsAsync happy path last item of seq`` variant = - task { - let values = Gen.getSeqImmutable variant - - let mutable sum = 0 - for x in values do - sum <- sum + x - - // let! expected = - // (0, values) - // ||> TaskSeq.fold((+)) - Assert.Equal(55, sum) - } + let ``Async-for CE with taskSeq`` variant = async { + let values = Gen.getSeqImmutable variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + sum |> should equal 55 + } diff --git a/src/FSharp.Control.TaskSeq/AsyncExtensions.fs b/src/FSharp.Control.TaskSeq/AsyncExtensions.fs new file mode 100644 index 00000000..b7e86654 --- /dev/null +++ b/src/FSharp.Control.TaskSeq/AsyncExtensions.fs @@ -0,0 +1,36 @@ +namespace FSharp.Control + +open System.Collections.Generic +open System.Threading +open System.Threading.Tasks + +#nowarn "57" +#nowarn "1204" +#nowarn "3513" + + +[] +module AsyncExtensions = + + // Add asynchronous for loop to the 'async' computation builder + type Microsoft.FSharp.Control.AsyncBuilder with + + member x.For(tasksq: IAsyncEnumerable<'T>, action: 'T -> Async) = + tasksq + |> TaskSeq.iterAsync (action >> Async.StartAsTask) + |> Async.AwaitTask + + + // temp example + let foo () = async { + let mutable sum = 0 + + let xs = taskSeq { + 1 + 2 + 3 + } + + for x in xs do + sum <- sum + x + } diff --git a/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi b/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi new file mode 100644 index 00000000..7e5a7228 --- /dev/null +++ b/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi @@ -0,0 +1,10 @@ +namespace FSharp.Control + +#nowarn "1204" + +[] +module AsyncExtensions = + + type AsyncBuilder with + + member For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * action: ('T -> Async) -> Async diff --git a/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj b/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj index ffe66c2f..f6d1d6c1 100644 --- a/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj +++ b/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj @@ -71,13 +71,12 @@ Generates optimized IL code through the new resumable state machines, and comes + + + + - - - - - diff --git a/src/FSharp.Control.TaskSeq/TaskExtensions.fs b/src/FSharp.Control.TaskSeq/TaskExtensions.fs new file mode 100644 index 00000000..0a2ace5d --- /dev/null +++ b/src/FSharp.Control.TaskSeq/TaskExtensions.fs @@ -0,0 +1,145 @@ +namespace FSharp.Control + +open System.Collections.Generic +open System.Threading +open System.Threading.Tasks + +#nowarn "57" +#nowarn "1204" +#nowarn "3513" + + +[] +module TaskExtensions = + open Microsoft.FSharp.Core.CompilerServices + open Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers + open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators + + let rec WhileDynamic + ( + sm: byref>, + condition: unit -> ValueTask, + body: TaskCode<'Data, unit> + ) : bool = + let vt = condition () + + TaskBuilderBase.BindDynamic( + &sm, + vt, + fun result -> + TaskCode<_, _>(fun sm -> + if result then + if body.Invoke(&sm) then + WhileDynamic(&sm, condition, body) + else + let rf = sm.ResumptionDynamicInfo.ResumptionFunc + + sm.ResumptionDynamicInfo.ResumptionFunc <- + (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) + + false + else + true) + ) + + + and WhileBodyDynamicAux + ( + sm: byref>, + condition: unit -> ValueTask, + body: TaskCode<'Data, unit>, + rf: TaskResumptionFunc<_> + ) : bool = + if rf.Invoke(&sm) then + WhileDynamic(&sm, condition, body) + else + let rf = sm.ResumptionDynamicInfo.ResumptionFunc + + sm.ResumptionDynamicInfo.ResumptionFunc <- + (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) + + false + + // Add asynchronous for loop to the 'task' computation builder + type Microsoft.FSharp.Control.TaskBuilder with + + /// Used by `For`. F# currently doesn't support `while!`, so this cannot be called directly from the task CE + /// This code is mostly a copy of TaskSeq.WhileAsync. + member inline _.WhileAsync + ( + [] condition: unit -> ValueTask, + body: TaskCode<_, unit> + ) : TaskCode<_, _> = + let mutable condition_res = true + + ResumableCode.While( + (fun () -> condition_res), + TaskCode<_, _>(fun sm -> + let mutable __stack_condition_fin = true + let __stack_vtask = condition () + + let mutable awaiter = __stack_vtask.GetAwaiter() + + if awaiter.IsCompleted then + // logInfo "at WhileAsync: returning completed task" + + __stack_condition_fin <- true + condition_res <- awaiter.GetResult() + else + // logInfo "at WhileAsync: awaiting non-completed task" + + // This will yield with __stack_fin = false + // This will resume with __stack_fin = true + let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm) + __stack_condition_fin <- __stack_yield_fin + + if __stack_condition_fin then + condition_res <- awaiter.GetResult() + + + if __stack_condition_fin then + if condition_res then body.Invoke(&sm) else true + else + sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm) + false) + ) + + member inline this.For(tasksq: IAsyncEnumerable<'T>, body: 'T -> TaskCode<_, unit>) : TaskCode<_, unit> = + // tasksq + // |> TaskSeq.iterAsync (body >> task.Run) + // |> task.ReturnFrom + + // task.ReturnFrom <| + // task { + // let mutable continueWhile = true + // use e = tasksq.GetAsyncEnumerator() + // while continueWhile do + // let! next = e.MoveNextAsync() + // if next then + // do! task.Run(body e.Current) + // else + // continueWhile <- false + // } + + TaskCode<'TOverall, unit>(fun sm -> + + this + .Using( + tasksq.GetAsyncEnumerator(CancellationToken()), + (fun e -> this.WhileAsync(e.MoveNextAsync, (fun sm -> (body e.Current).Invoke(&sm)))) + ) + .Invoke(&sm)) + + // temp example + let foo () = task { + let mutable sum = 0 + + let xs = taskSeq { + 1 + 2 + 3 + } + + for x in xs do + sum <- sum + x + } diff --git a/src/FSharp.Control.TaskSeq/TaskExtensions.fsi b/src/FSharp.Control.TaskSeq/TaskExtensions.fsi new file mode 100644 index 00000000..2fbc9930 --- /dev/null +++ b/src/FSharp.Control.TaskSeq/TaskExtensions.fsi @@ -0,0 +1,29 @@ +namespace FSharp.Control + +#nowarn "1204" + +[] +module TaskExtensions = + + val WhileDynamic: + sm: byref> * + condition: (unit -> System.Threading.Tasks.ValueTask) * + body: TaskCode<'Data, unit> -> + bool + + val WhileBodyDynamicAux: + sm: byref> * + condition: (unit -> System.Threading.Tasks.ValueTask) * + body: TaskCode<'Data, unit> * + rf: TaskResumptionFunc<'Data> -> + bool + + type TaskBuilder with + + member inline WhileAsync: + condition: (unit -> System.Threading.Tasks.ValueTask) * body: TaskCode<'TOverall, unit> -> + TaskCode<'TOverall, unit> + + member inline For: + tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> + TaskCode<'TOverall, unit> diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index a328ded2..e1a13d1b 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -324,144 +324,3 @@ module TaskSeq = let fold folder state source = Internal.fold (FolderAction folder) state source let foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source - -#nowarn "1204" -#nowarn "3513" - - -[] -module AsyncSeqExtensions = - - let rec WhileDynamic - ( - sm: byref>, - condition: unit -> ValueTask, - body: TaskCode<'Data, unit> - ) : bool = - let vt = condition () - TaskBuilderBase.BindDynamic(&sm, vt, fun result -> - TaskCode<_,_>(fun sm -> - if result then - if body.Invoke(&sm) then - WhileDynamic(&sm, condition, body) - else - let rf = sm.ResumptionDynamicInfo.ResumptionFunc - - sm.ResumptionDynamicInfo.ResumptionFunc <- - (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) - - false - else - true - ) - ) - - - and WhileBodyDynamicAux - ( - sm: byref>, - condition: unit -> ValueTask, - body: TaskCode<'Data, unit>, - rf: TaskResumptionFunc<_> - ) : bool = - if rf.Invoke(&sm) then - WhileDynamic(&sm, condition, body) - else - let rf = sm.ResumptionDynamicInfo.ResumptionFunc - - sm.ResumptionDynamicInfo.ResumptionFunc <- - (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) - - false - open Microsoft.FSharp.Core.CompilerServices - open Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers - open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators - - // Add asynchronous for loop to the 'async' computation builder - type Microsoft.FSharp.Control.AsyncBuilder with - - member x.For(tasksq: IAsyncEnumerable<'T>, action: 'T -> Async) = - tasksq - |> TaskSeq.iterAsync (action >> Async.StartAsTask) - |> Async.AwaitTask - - // Add asynchronous for loop to the 'task' computation builder - type Microsoft.FSharp.Control.TaskBuilder with - - - member inline _.WhileAsync - ( - [] condition: unit -> ValueTask, - body: TaskCode<_,unit> - ) : TaskCode<_,_> = - let mutable condition_res = true - - ResumableCode.While( - (fun () -> condition_res), - TaskCode<_, _>(fun sm -> - let mutable __stack_condition_fin = true - let __stack_vtask = condition () - - let mutable awaiter = __stack_vtask.GetAwaiter() - if awaiter.IsCompleted then - // logInfo "at WhileAsync: returning completed task" - - __stack_condition_fin <- true - condition_res <- awaiter.GetResult() - else - // logInfo "at WhileAsync: awaiting non-completed task" - - // This will yield with __stack_fin = false - // This will resume with __stack_fin = true - let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm) - __stack_condition_fin <- __stack_yield_fin - - if __stack_condition_fin then - condition_res <- awaiter.GetResult() - - - if __stack_condition_fin then - if condition_res then body.Invoke(&sm) else true - else - sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm) - false) - ) - - member inline this.For - ( - tasksq: IAsyncEnumerable<'T>, - body: 'T -> TaskCode<_, unit> - ) : TaskCode<_, unit> = - // tasksq - // |> TaskSeq.iterAsync (body >> task.Run) - // |> task.ReturnFrom - - // task.ReturnFrom <| - // task { - // let mutable continueWhile = true - // use e = tasksq.GetAsyncEnumerator() - // while continueWhile do - // let! next = e.MoveNextAsync() - // if next then - // do! task.Run(body e.Current) - // else - // continueWhile <- false - // } - - TaskCode<'TOverall, unit>(fun sm -> - - this - .Using( - tasksq.GetAsyncEnumerator(CancellationToken()), - (fun e -> - this.WhileAsync(e.MoveNextAsync, (fun sm -> (body e.Current).Invoke(&sm)))) - ) - .Invoke(&sm)) - - let foo () = - task { - let mutable sum = 0 - let xs = taskSeq { 1; 2; 3} - for x in xs do - sum <- sum + x - } diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index fe556101..172ab83c 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -569,35 +569,3 @@ module TaskSeq = /// If the accumulator function does not need to be asynchronous, consider using . /// val foldAsync: folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: taskSeq<'T> -> Task<'State> - - - -[] -module AsyncSeqExtensions = - - val WhileDynamic: - sm: byref> * - condition: (unit -> System.Threading.Tasks.ValueTask) * - body: TaskCode<'Data, unit> -> - bool - - val WhileBodyDynamicAux: - sm: byref> * - condition: (unit -> System.Threading.Tasks.ValueTask) * - body: TaskCode<'Data, unit> * - rf: TaskResumptionFunc<'Data> -> - bool - - type AsyncBuilder with - - member For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * action: ('T -> Async) -> Async - - type TaskBuilder with - - member inline WhileAsync: - condition: (unit -> System.Threading.Tasks.ValueTask) * body: TaskCode<'TOverall, unit> -> - TaskCode<'TOverall, unit> - - member inline For: - tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> - TaskCode<'TOverall, unit> diff --git a/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs b/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs index 2331fc28..a1709bb4 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs @@ -448,6 +448,7 @@ type TaskSeqBuilder() = ResumableCode.Combine(task1, task2) + /// Used by `For`. F# currently doesn't support `while!`, so this cannot be called directly from the CE member inline _.WhileAsync ( [] condition: unit -> ValueTask, @@ -490,9 +491,6 @@ type TaskSeqBuilder() = member inline b.While([] condition: unit -> bool, body: TaskSeqCode<'T>) : TaskSeqCode<'T> = logInfo "at While(...)" - - // was this: - // b.WhileAsync((fun () -> ValueTask(condition ())), body) ResumableCode.While(condition, body) member inline _.TryWith(body: TaskSeqCode<'T>, catch: exn -> TaskSeqCode<'T>) : TaskSeqCode<'T> = From 18dd2fdafe1bdfbd85ebeec1e0061ece4688953e Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 26 Nov 2022 17:51:34 +0100 Subject: [PATCH 08/14] Rename and split extension tests file between async and task --- .../FSharp.Control.TaskSeq.Test.fsproj | 3 +- ...ts.fs => TaskSeq.AsyncExtensions.Tests.fs} | 0 .../TaskSeq.TaskExtensions.Tests.fs | 39 +++++++++++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) rename src/FSharp.Control.TaskSeq.Test/{TaskSeq.Extensions.Tests.fs => TaskSeq.AsyncExtensions.Tests.fs} (100%) create mode 100644 src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index 38215053..02c3b80d 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -48,7 +48,8 @@ - + + diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs similarity index 100% rename from src/FSharp.Control.TaskSeq.Test/TaskSeq.Extensions.Tests.fs rename to src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs new file mode 100644 index 00000000..a3934086 --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs @@ -0,0 +1,39 @@ +module TaskSeq.Tests.TaskExtensions + +open System +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// Task extensions +// Async extensions +// + + +module TaskCE = + [)>] + let ``Task-for CE with taskSeq`` variant = task { + let values = Gen.getSeqImmutable variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + sum |> should equal 55 + } + +module AsyncCE = + [)>] + let ``Async-for CE with taskSeq`` variant = async { + let values = Gen.getSeqImmutable variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + sum |> should equal 55 + } From e6cf49bb2b1736141643afd3012b09e9a87a9767 Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 26 Nov 2022 18:07:56 +0100 Subject: [PATCH 09/14] Add more tests for corner cases of async-for and task-for --- .../TaskSeq.AsyncExtensions.Tests.fs | 90 +++++++++++++++++-- .../TaskSeq.TaskExtensions.Tests.fs | 85 +++++++++++++++++- 2 files changed, 165 insertions(+), 10 deletions(-) diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs index 682fb18e..608085de 100644 --- a/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs @@ -1,4 +1,4 @@ -module TaskSeq.Tests.Extensions +module TaskSeq.Tests.AsyncExtensions open System open Xunit @@ -7,14 +7,37 @@ open FsUnit.Xunit open FSharp.Control // -// Task extensions // Async extensions // +module EmptySeq = + [)>] + let ``Async-for CE with empty taskSeq`` variant = async { + let values = Gen.getEmptyVariant variant -module TaskCE = + let mutable sum = 42 + + for x in values do + sum <- sum + x + + sum |> should equal 42 + } + + [] + let ``Async-for CE must execute side effect in empty taskseq`` () = async { + let mutable data = 0 + let values = taskSeq { do data <- 42 } + + for x in values do + () + + data |> should equal 42 + } + + +module Immutable = [)>] - let ``Task-for CE with taskSeq`` variant = task { + let ``Async-for CE with taskSeq`` variant = async { let values = Gen.getSeqImmutable variant let mutable sum = 0 @@ -25,15 +48,70 @@ module TaskCE = sum |> should equal 55 } -module AsyncCE = [)>] - let ``Async-for CE with taskSeq`` variant = async { + let ``Async-for CE with taskSeq multiple iterations`` variant = async { let values = Gen.getSeqImmutable variant let mutable sum = 0 + for x in values do + sum <- sum + x + + // each following iteration should start at the beginning + for x in values do + sum <- sum + x + + for x in values do + sum <- sum + x + + sum |> should equal 165 + } + + [] + let ``Async-for mixing both types of for loops`` () = async { + // this test ensures overload resolution is correct + let ts = TaskSeq.singleton 20 + let sq = Seq.singleton 20 + let mutable sum = 2 + + for x in ts do + sum <- sum + x + + for x in sq do + sum <- sum + x + + sum |> should equal 42 + } + +module SideEffects = + [)>] + let ``Async-for CE with taskSeq`` variant = async { + let values = Gen.getSeqWithSideEffect variant + + let mutable sum = 0 + for x in values do sum <- sum + x sum |> should equal 55 } + + [)>] + let ``Async-for CE with taskSeq multiple iterations`` variant = async { + let values = Gen.getSeqWithSideEffect variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + // each following iteration should start at the beginning + // with the "side effect" tests, the mutable state updates + for x in values do + sum <- sum + x // starts at 11 + + for x in values do + sum <- sum + x // starts at 21 + + sum |> should equal 465 // eq to: List.sum [1..30] + } diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs index a3934086..9441f1fb 100644 --- a/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs @@ -8,11 +8,33 @@ open FSharp.Control // // Task extensions -// Async extensions // +module EmptySeq = + [)>] + let ``Task-for CE with empty taskSeq`` variant = task { + let values = Gen.getEmptyVariant variant -module TaskCE = + let mutable sum = 42 + + for x in values do + sum <- sum + x + + sum |> should equal 42 + } + + [] + let ``Task-for CE must execute side effect in empty taskseq`` () = task { + let mutable data = 0 + let values = taskSeq { do data <- 42 } + + for x in values do + () + + data |> should equal 42 + } + +module Immutable = [)>] let ``Task-for CE with taskSeq`` variant = task { let values = Gen.getSeqImmutable variant @@ -25,15 +47,70 @@ module TaskCE = sum |> should equal 55 } -module AsyncCE = [)>] - let ``Async-for CE with taskSeq`` variant = async { + let ``Task-for CE with taskSeq multiple iterations`` variant = task { let values = Gen.getSeqImmutable variant let mutable sum = 0 + for x in values do + sum <- sum + x + + // each following iteration should start at the beginning + for x in values do + sum <- sum + x + + for x in values do + sum <- sum + x + + sum |> should equal 165 + } + + [] + let ``Task-for mixing both types of for loops`` () = async { + // this test ensures overload resolution is correct + let ts = TaskSeq.singleton 20 + let sq = Seq.singleton 20 + let mutable sum = 2 + + for x in ts do + sum <- sum + x + + for x in sq do + sum <- sum + x + + sum |> should equal 42 + } + +module SideEffects = + [)>] + let ``Task-for CE with taskSeq`` variant = task { + let values = Gen.getSeqWithSideEffect variant + + let mutable sum = 0 + for x in values do sum <- sum + x sum |> should equal 55 } + + [)>] + let ``Task-for CE with taskSeq multiple iterations`` variant = task { + let values = Gen.getSeqWithSideEffect variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + // each following iteration should start at the beginning + // with the "side effect" tests, the mutable state updates + for x in values do + sum <- sum + x // starts at 11 + + for x in values do + sum <- sum + x // starts at 21 + + sum |> should equal 465 // eq to: List.sum [1..30] + } From 5ad78fdb9a0740289aad9cb8b596bd66e1fe52cb Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 26 Nov 2022 18:28:22 +0100 Subject: [PATCH 10/14] Add tests to ensure DisposeAsync is called in task-for and async-for --- .../TaskSeq.AsyncExtensions.Tests.fs | 27 +++++++++++++ .../TaskSeq.TaskExtensions.Tests.fs | 27 +++++++++++++ src/FSharp.Control.TaskSeq.Test/TestUtils.fs | 38 +++++++++++++++++++ 3 files changed, 92 insertions(+) diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs index 608085de..1f0b9efd 100644 --- a/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs @@ -115,3 +115,30 @@ module SideEffects = sum |> should equal 465 // eq to: List.sum [1..30] } + +module Other = + [] + let ``Async-for CE must call dispose in empty taskSeq`` () = async { + let disposed = ref 0 + let values = Gen.getEmptyDisposableTaskSeq disposed + + for x in values do + () + + // the DisposeAsync should be called by now + disposed.Value |> should equal 1 + } + + [] + let ``Async-for CE must call dispose on singleton`` () = async { + let disposed = ref 0 + let mutable sum = 0 + let values = Gen.getSingletonDisposableTaskSeq disposed + + for x in values do + sum <- x + + // the DisposeAsync should be called by now + disposed.Value |> should equal 1 + sum |> should equal 42 + } diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs index 9441f1fb..5d5316a4 100644 --- a/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs @@ -114,3 +114,30 @@ module SideEffects = sum |> should equal 465 // eq to: List.sum [1..30] } + +module Other = + [] + let ``Task-for CE must call dispose in empty taskSeq`` () = async { + let disposed = ref 0 + let values = Gen.getEmptyDisposableTaskSeq disposed + + for x in values do + () + + // the DisposeAsync should be called by now + disposed.Value |> should equal 1 + } + + [] + let ``Task-for CE must call dispose on singleton`` () = async { + let disposed = ref 0 + let mutable sum = 0 + let values = Gen.getSingletonDisposableTaskSeq disposed + + for x in values do + sum <- x + + // the DisposeAsync should be called by now + disposed.Value |> should equal 1 + sum |> should equal 42 + } diff --git a/src/FSharp.Control.TaskSeq.Test/TestUtils.fs b/src/FSharp.Control.TaskSeq.Test/TestUtils.fs index dd4873b3..28b36031 100644 --- a/src/FSharp.Control.TaskSeq.Test/TestUtils.fs +++ b/src/FSharp.Control.TaskSeq.Test/TestUtils.fs @@ -521,6 +521,44 @@ module TestUtils = } | x -> failwithf "Invalid test variant: %A" x + /// An empty taskSeq that can be used with tests for checking if the dispose method gets called. + /// Will add 1 to the passed integer upon disposing. + let getEmptyDisposableTaskSeq (disposed: int ref) = + { new IAsyncEnumerable<'T> with + member _.GetAsyncEnumerator(_) = + { new IAsyncEnumerator<'T> with + member _.MoveNextAsync() = ValueTask.False + member _.Current = Unchecked.defaultof<'T> + member _.DisposeAsync() = ValueTask(task { do disposed.Value <- disposed.Value + 1 }) + } + } + + /// A singleton taskSeq that can be used with tests for checking if the dispose method gets called + /// The singleton value is '42'. Will add 1 to the passed integer upon disposing. + let getSingletonDisposableTaskSeq (disposed: int ref) = + { new IAsyncEnumerable with + member _.GetAsyncEnumerator(_) = + let mutable status = BeforeAll + + { new IAsyncEnumerator with + member _.MoveNextAsync() = + match status with + | BeforeAll -> + status <- WithCurrent + ValueTask.True + | WithCurrent -> + status <- AfterAll + ValueTask.False + | AfterAll -> ValueTask.False + + member _.Current: int = + match status with + | WithCurrent -> 42 + | _ -> Unchecked.defaultof + + member _.DisposeAsync() = ValueTask(task { do disposed.Value <- disposed.Value + 1 }) + } + } // // following types can be used with Theory & TestData // From 13adeae5cd53a551e66db0238279aed2554eeceb Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 26 Nov 2022 18:45:56 +0100 Subject: [PATCH 11/14] Renames, cleanup and overall fixup of task-for and async-for CE code --- src/FSharp.Control.TaskSeq/AsyncExtensions.fs | 28 ++---------- .../AsyncExtensions.fsi | 6 +-- src/FSharp.Control.TaskSeq/TaskExtensions.fs | 44 +++---------------- src/FSharp.Control.TaskSeq/TaskExtensions.fsi | 22 +--------- 4 files changed, 15 insertions(+), 85 deletions(-) diff --git a/src/FSharp.Control.TaskSeq/AsyncExtensions.fs b/src/FSharp.Control.TaskSeq/AsyncExtensions.fs index b7e86654..51c46b4e 100644 --- a/src/FSharp.Control.TaskSeq/AsyncExtensions.fs +++ b/src/FSharp.Control.TaskSeq/AsyncExtensions.fs @@ -1,13 +1,6 @@ namespace FSharp.Control -open System.Collections.Generic -open System.Threading -open System.Threading.Tasks - -#nowarn "57" -#nowarn "1204" -#nowarn "3513" - +open FSharp.Control.TaskSeqBuilders [] module AsyncExtensions = @@ -15,22 +8,7 @@ module AsyncExtensions = // Add asynchronous for loop to the 'async' computation builder type Microsoft.FSharp.Control.AsyncBuilder with - member x.For(tasksq: IAsyncEnumerable<'T>, action: 'T -> Async) = - tasksq + member _.For(source: taskSeq<'T>, action: 'T -> Async) = + source |> TaskSeq.iterAsync (action >> Async.StartAsTask) |> Async.AwaitTask - - - // temp example - let foo () = async { - let mutable sum = 0 - - let xs = taskSeq { - 1 - 2 - 3 - } - - for x in xs do - sum <- sum + x - } diff --git a/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi b/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi index 7e5a7228..d4432756 100644 --- a/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi +++ b/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi @@ -1,10 +1,10 @@ namespace FSharp.Control -#nowarn "1204" - [] module AsyncExtensions = + open FSharp.Control.TaskSeqBuilders type AsyncBuilder with - member For: tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * action: ('T -> Async) -> Async + /// Iterate over all values of a taskSeq. + member For: source: taskSeq<'T> * action: ('T -> Async) -> Async diff --git a/src/FSharp.Control.TaskSeq/TaskExtensions.fs b/src/FSharp.Control.TaskSeq/TaskExtensions.fs index 0a2ace5d..adb9b109 100644 --- a/src/FSharp.Control.TaskSeq/TaskExtensions.fs +++ b/src/FSharp.Control.TaskSeq/TaskExtensions.fs @@ -4,16 +4,17 @@ open System.Collections.Generic open System.Threading open System.Threading.Tasks +open Microsoft.FSharp.Core.CompilerServices +open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators + +open FSharp.Control.TaskSeqBuilders + #nowarn "57" #nowarn "1204" #nowarn "3513" - [] module TaskExtensions = - open Microsoft.FSharp.Core.CompilerServices - open Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers - open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators let rec WhileDynamic ( @@ -104,42 +105,11 @@ module TaskExtensions = false) ) - member inline this.For(tasksq: IAsyncEnumerable<'T>, body: 'T -> TaskCode<_, unit>) : TaskCode<_, unit> = - // tasksq - // |> TaskSeq.iterAsync (body >> task.Run) - // |> task.ReturnFrom - - // task.ReturnFrom <| - // task { - // let mutable continueWhile = true - // use e = tasksq.GetAsyncEnumerator() - // while continueWhile do - // let! next = e.MoveNextAsync() - // if next then - // do! task.Run(body e.Current) - // else - // continueWhile <- false - // } - + member inline this.For(source: taskSeq<'T>, body: 'T -> TaskCode<_, unit>) : TaskCode<_, unit> = TaskCode<'TOverall, unit>(fun sm -> - this .Using( - tasksq.GetAsyncEnumerator(CancellationToken()), + source.GetAsyncEnumerator(CancellationToken()), (fun e -> this.WhileAsync(e.MoveNextAsync, (fun sm -> (body e.Current).Invoke(&sm)))) ) .Invoke(&sm)) - - // temp example - let foo () = task { - let mutable sum = 0 - - let xs = taskSeq { - 1 - 2 - 3 - } - - for x in xs do - sum <- sum + x - } diff --git a/src/FSharp.Control.TaskSeq/TaskExtensions.fsi b/src/FSharp.Control.TaskSeq/TaskExtensions.fsi index 2fbc9930..81a441a3 100644 --- a/src/FSharp.Control.TaskSeq/TaskExtensions.fsi +++ b/src/FSharp.Control.TaskSeq/TaskExtensions.fsi @@ -4,26 +4,8 @@ namespace FSharp.Control [] module TaskExtensions = - - val WhileDynamic: - sm: byref> * - condition: (unit -> System.Threading.Tasks.ValueTask) * - body: TaskCode<'Data, unit> -> - bool - - val WhileBodyDynamicAux: - sm: byref> * - condition: (unit -> System.Threading.Tasks.ValueTask) * - body: TaskCode<'Data, unit> * - rf: TaskResumptionFunc<'Data> -> - bool + open FSharp.Control.TaskSeqBuilders type TaskBuilder with - member inline WhileAsync: - condition: (unit -> System.Threading.Tasks.ValueTask) * body: TaskCode<'TOverall, unit> -> - TaskCode<'TOverall, unit> - - member inline For: - tasksq: System.Collections.Generic.IAsyncEnumerable<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> - TaskCode<'TOverall, unit> + member inline For: source: taskSeq<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> TaskCode<'TOverall, unit> From 4046eb8d6af19370c27713ecf796f07a0fcf150d Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 26 Nov 2022 20:05:04 +0100 Subject: [PATCH 12/14] Remove WhileDynamic, it's already implicitly present, and add debug logging to the new code --- src/FSharp.Control.TaskSeq/TaskExtensions.fs | 51 +-------- src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs | 106 ++++++------------- src/FSharp.Control.TaskSeq/Utils.fs | 52 +++++++++ 3 files changed, 86 insertions(+), 123 deletions(-) diff --git a/src/FSharp.Control.TaskSeq/TaskExtensions.fs b/src/FSharp.Control.TaskSeq/TaskExtensions.fs index adb9b109..a5fac864 100644 --- a/src/FSharp.Control.TaskSeq/TaskExtensions.fs +++ b/src/FSharp.Control.TaskSeq/TaskExtensions.fs @@ -16,51 +16,6 @@ open FSharp.Control.TaskSeqBuilders [] module TaskExtensions = - let rec WhileDynamic - ( - sm: byref>, - condition: unit -> ValueTask, - body: TaskCode<'Data, unit> - ) : bool = - let vt = condition () - - TaskBuilderBase.BindDynamic( - &sm, - vt, - fun result -> - TaskCode<_, _>(fun sm -> - if result then - if body.Invoke(&sm) then - WhileDynamic(&sm, condition, body) - else - let rf = sm.ResumptionDynamicInfo.ResumptionFunc - - sm.ResumptionDynamicInfo.ResumptionFunc <- - (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) - - false - else - true) - ) - - - and WhileBodyDynamicAux - ( - sm: byref>, - condition: unit -> ValueTask, - body: TaskCode<'Data, unit>, - rf: TaskResumptionFunc<_> - ) : bool = - if rf.Invoke(&sm) then - WhileDynamic(&sm, condition, body) - else - let rf = sm.ResumptionDynamicInfo.ResumptionFunc - - sm.ResumptionDynamicInfo.ResumptionFunc <- - (TaskResumptionFunc<'Data>(fun sm -> WhileBodyDynamicAux(&sm, condition, body, rf))) - - false - // Add asynchronous for loop to the 'task' computation builder type Microsoft.FSharp.Control.TaskBuilder with @@ -73,6 +28,8 @@ module TaskExtensions = ) : TaskCode<_, _> = let mutable condition_res = true + // note that this While itself has both a dynamic and static implementation + // so we don't need to add that here (TODO: how to verify?). ResumableCode.While( (fun () -> condition_res), TaskCode<_, _>(fun sm -> @@ -82,12 +39,12 @@ module TaskExtensions = let mutable awaiter = __stack_vtask.GetAwaiter() if awaiter.IsCompleted then - // logInfo "at WhileAsync: returning completed task" + Debug.logInfo "at Task.WhileAsync: returning completed task" __stack_condition_fin <- true condition_res <- awaiter.GetResult() else - // logInfo "at WhileAsync: awaiting non-completed task" + Debug.logInfo "at Task.WhileAsync: awaiting non-completed task" // This will yield with __stack_fin = false // This will resume with __stack_fin = true diff --git a/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs b/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs index a1709bb4..195ed87b 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs @@ -36,50 +36,6 @@ module Internal = // cannot be marked with 'internal' scope with _ -> false - type Debug = - - [] - static val mutable private verbose: bool option - - /// Setting from environment variable TASKSEQ_LOG_VERBOSE, which, - /// when set, enables (very) verbose printing of flow and state - static member private getVerboseSetting() = - match Debug.verbose with - | None -> - let verboseEnv = - try - match Environment.GetEnvironmentVariable "TASKSEQ_LOG_VERBOSE" with - | null -> false - | x -> - match x.ToLowerInvariant().Trim() with - | "1" - | "true" - | "on" - | "yes" -> true - | _ -> false - - with _ -> - false - - Debug.verbose <- Some verboseEnv - verboseEnv - - | Some setting -> setting - - [] - static member private print value = - match Debug.getVerboseSetting () with - | false -> () - | true -> - // don't use ksprintf here, because the compiler does not remove all allocations due to - // the way PrintfFormat types are compiled, even if we set the Conditional attribute. - printfn "%i (%b): %s" Thread.CurrentThread.ManagedThreadId Thread.CurrentThread.IsThreadPoolThread value - - [] - static member logInfo(str) = Debug.print str - - [] - static member logInfo(str, data) = Debug.print $"%s{str}{data}" /// Call MoveNext on an IAsyncStateMachine by reference let inline moveNextRef (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext() @@ -89,8 +45,6 @@ module Internal = // cannot be marked with 'internal' scope NotImplementedException "Abstract Class: method or property not implemented" |> raise -open type Debug - type taskSeq<'T> = IAsyncEnumerable<'T> type IPriority1 = @@ -244,7 +198,7 @@ and [] TaskSeq<'Machine, 'T this // just return 'self' here | _ -> - logInfo "GetAsyncEnumerator, start cloning..." + Debug.logInfo "GetAsyncEnumerator, start cloning..." // We need to reset state, but only to the "initial machine", resetting the _machine to // Unchecked.defaultof<_> is wrong, as the compiler uses this to track state. However, @@ -260,7 +214,7 @@ and [] TaskSeq<'Machine, 'T clone._machine <- this._initialMachine clone._initialMachine <- this._initialMachine // TODO: proof with a test that this is necessary: probably not clone.InitMachineData(ct, &clone._machine) - logInfo "GetAsyncEnumerator, finished cloning..." + Debug.logInfo "GetAsyncEnumerator, finished cloning..." clone interface System.Collections.Generic.IAsyncEnumerator<'T> with @@ -275,39 +229,39 @@ and [] TaskSeq<'Machine, 'T Unchecked.defaultof<'T> member this.MoveNextAsync() = - logInfo "MoveNextAsync..." + Debug.logInfo "MoveNextAsync..." if this._machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable - logInfo "at MoveNextAsync: Resumption point = -1" + Debug.logInfo "at MoveNextAsync: Resumption point = -1" ValueTask.False elif this._machine.Data.completed then - logInfo "at MoveNextAsync: completed = true" + Debug.logInfo "at MoveNextAsync: completed = true" // return False when beyond the last item this._machine.Data.promiseOfValueOrEnd.Reset() ValueTask.False else - logInfo "at MoveNextAsync: normal resumption scenario" + Debug.logInfo "at MoveNextAsync: normal resumption scenario" let data = this._machine.Data data.promiseOfValueOrEnd.Reset() let mutable ts = this - logInfo "at MoveNextAsync: start calling builder.MoveNext()" + Debug.logInfo "at MoveNextAsync: start calling builder.MoveNext()" data.builder.MoveNext(&ts) - logInfo "at MoveNextAsync: finished calling builder.MoveNext()" + Debug.logInfo "at MoveNextAsync: finished calling builder.MoveNext()" this.MoveNextAsyncResult() /// Disposes of the IAsyncEnumerator (*not* the IAsyncEnumerable!!!) member this.DisposeAsync() = task { - logInfo "DisposeAsync..." + Debug.logInfo "DisposeAsync..." match this._machine.Data.disposalStack with | null -> () @@ -335,7 +289,7 @@ and [] TaskSeq<'Machine, 'T match status with | ValueTaskSourceStatus.Succeeded -> - logInfo "at MoveNextAsyncResult: case succeeded..." + Debug.logInfo "at MoveNextAsyncResult: case succeeded..." let result = data.promiseOfValueOrEnd.GetResult(version) @@ -349,11 +303,11 @@ and [] TaskSeq<'Machine, 'T | ValueTaskSourceStatus.Faulted | ValueTaskSourceStatus.Canceled | ValueTaskSourceStatus.Pending as state -> - logInfo ("at MoveNextAsyncResult: case ", state) + Debug.logInfo ("at MoveNextAsyncResult: case ", state) ValueTask.ofIValueTaskSource this version | _ -> - logInfo "at MoveNextAsyncResult: Unexpected state" + Debug.logInfo "at MoveNextAsyncResult: Unexpected state" // assume it's a possibly new, not yet supported case, treat as default ValueTask.ofIValueTaskSource this version @@ -376,12 +330,12 @@ type TaskSeqBuilder() = __resumeAt sm.ResumptionPoint try - logInfo "at Run.MoveNext start" + Debug.logInfo "at Run.MoveNext start" let __stack_code_fin = code.Invoke(&sm) if __stack_code_fin then - logInfo $"at Run.MoveNext, done" + Debug.logInfo $"at Run.MoveNext, done" // Signal we're at the end // NOTE: if we don't do it here, as well as in IValueTaskSource.GetResult @@ -392,14 +346,14 @@ type TaskSeqBuilder() = sm.Data.completed <- true elif sm.Data.current.IsSome then - logInfo $"at Run.MoveNext, still more items in enumerator" + Debug.logInfo $"at Run.MoveNext, still more items in enumerator" // Signal there's more data: sm.Data.promiseOfValueOrEnd.SetResult(true) else // Goto request - logInfo $"at Run.MoveNext, await, MoveNextAsync has not completed yet" + Debug.logInfo $"at Run.MoveNext, await, MoveNextAsync has not completed yet" // don't capture the full object in the next closure (won't work because: byref) // but only a reference to itself. @@ -412,7 +366,7 @@ type TaskSeqBuilder() = ) with exn -> - logInfo ("Setting exception of PromiseOfValueOrEnd to: ", exn.Message) + Debug.logInfo ("Setting exception of PromiseOfValueOrEnd to: ", exn.Message) sm.Data.promiseOfValueOrEnd.SetException(exn) sm.Data.builder.Complete() @@ -420,7 +374,7 @@ type TaskSeqBuilder() = )) (SetStateMachineMethodImpl<_>(fun sm state -> ())) // not used in reference impl (AfterCode<_, _>(fun sm -> - logInfo "at AfterCode<_, _>, after F# inits the sm, and we can attach extra info" + Debug.logInfo "at AfterCode<_, _>, after F# inits the sm, and we can attach extra info" let ts = TaskSeq, 'T>() ts._initialMachine <- sm @@ -440,11 +394,11 @@ type TaskSeqBuilder() = member inline _.Zero() : TaskSeqCode<'T> = - logInfo "at Zero()" + Debug.logInfo "at Zero()" ResumableCode.Zero() member inline _.Combine(task1: TaskSeqCode<'T>, task2: TaskSeqCode<'T>) : TaskSeqCode<'T> = - logInfo "at Combine(.., ..)" + Debug.logInfo "at Combine(.., ..)" ResumableCode.Combine(task1, task2) @@ -463,12 +417,12 @@ type TaskSeqBuilder() = let __stack_vtask = condition () if __stack_vtask.IsCompleted then - logInfo "at WhileAsync: returning completed task" + Debug.logInfo "at WhileAsync: returning completed task" __stack_condition_fin <- true condition_res <- __stack_vtask.Result else - logInfo "at WhileAsync: awaiting non-completed task" + Debug.logInfo "at WhileAsync: awaiting non-completed task" let task = __stack_vtask.AsTask() let mutable awaiter = task.GetAwaiter() @@ -490,7 +444,7 @@ type TaskSeqBuilder() = ) member inline b.While([] condition: unit -> bool, body: TaskSeqCode<'T>) : TaskSeqCode<'T> = - logInfo "at While(...)" + Debug.logInfo "at While(...)" ResumableCode.While(condition, body) member inline _.TryWith(body: TaskSeqCode<'T>, catch: exn -> TaskSeqCode<'T>) : TaskSeqCode<'T> = @@ -595,7 +549,7 @@ type TaskSeqBuilder() = TaskSeqCode<'T>(fun sm -> // This will yield with __stack_fin = false // This will resume with __stack_fin = true - logInfo "at Yield" + Debug.logInfo "at Yield" let __stack_fin = ResumableCode.Yield().Invoke(&sm) sm.Data.current <- ValueSome v @@ -612,7 +566,7 @@ type TaskSeqBuilder() = let mutable awaiter = task.GetAwaiter() let mutable __stack_fin = true - logInfo "at Bind" + Debug.logInfo "at Bind" if not awaiter.IsCompleted then // This will yield with __stack_fin2 = false @@ -620,15 +574,15 @@ type TaskSeqBuilder() = let __stack_fin2 = ResumableCode.Yield().Invoke(&sm) __stack_fin <- __stack_fin2 - logInfo ("at Bind: with __stack_fin = ", __stack_fin) - logInfo ("at Bind: this.completed = ", sm.Data.completed) + Debug.logInfo ("at Bind: with __stack_fin = ", __stack_fin) + Debug.logInfo ("at Bind: this.completed = ", sm.Data.completed) if __stack_fin then let result = awaiter.GetResult() (continuation result).Invoke(&sm) else - logInfo "at Bind: calling AwaitUnsafeOnCompleted" + Debug.logInfo "at Bind: calling AwaitUnsafeOnCompleted" sm.Data.awaiter <- awaiter sm.Data.current <- ValueNone @@ -639,7 +593,7 @@ type TaskSeqBuilder() = let mutable awaiter = task.GetAwaiter() let mutable __stack_fin = true - logInfo "at BindV" + Debug.logInfo "at BindV" if not awaiter.IsCompleted then // This will yield with __stack_fin2 = false @@ -651,7 +605,7 @@ type TaskSeqBuilder() = let result = awaiter.GetResult() (continuation result).Invoke(&sm) else - logInfo "at BindV: calling AwaitUnsafeOnCompleted" + Debug.logInfo "at BindV: calling AwaitUnsafeOnCompleted" sm.Data.awaiter <- awaiter sm.Data.current <- ValueNone diff --git a/src/FSharp.Control.TaskSeq/Utils.fs b/src/FSharp.Control.TaskSeq/Utils.fs index 09d0e504..15fce9b2 100644 --- a/src/FSharp.Control.TaskSeq/Utils.fs +++ b/src/FSharp.Control.TaskSeq/Utils.fs @@ -1,6 +1,9 @@ namespace FSharp.Control open System.Threading.Tasks +open System +open System.Diagnostics +open System.Threading [] module ValueTaskExtensions = @@ -103,3 +106,52 @@ module Async = /// Bind an Async<'T> let inline bind binder (task: Async<'T>) : Async<'U> = ExtraTopLevelOperators.async { return! binder task } + +type Debug = + + [] + static val mutable private verbose: bool option + + /// Setting from environment variable TASKSEQ_LOG_VERBOSE, which, + /// when set, enables (very) verbose printing of flow and state + static member private getVerboseSetting() = + match Debug.verbose with + | None -> + let verboseEnv = + try + match Environment.GetEnvironmentVariable "TASKSEQ_LOG_VERBOSE" with + | null -> false + | x -> + match x.ToLowerInvariant().Trim() with + | "1" + | "true" + | "on" + | "yes" -> true + | _ -> false + + with _ -> + false + + Debug.verbose <- Some verboseEnv + verboseEnv + + | Some setting -> setting + + /// Private helper to log to stdout in DEBUG builds only + [] + static member private print value = + match Debug.getVerboseSetting () with + | false -> () + | true -> + // don't use ksprintf here, because the compiler does not remove all allocations due to + // the way PrintfFormat types are compiled, even if we set the Conditional attribute. + let ct = Thread.CurrentThread + printfn "%i (%b): %s" ct.ManagedThreadId ct.IsThreadPoolThread value + + /// Log to stdout in DEBUG builds only + [] + static member logInfo(str) = Debug.print str + + /// Log to stdout in DEBUG builds only + [] + static member logInfo(str, data) = Debug.print $"%s{str}{data}" From 5053ba27729b2e78f5d9fd18c53d53a559bf6f5f Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 26 Nov 2022 20:13:12 +0100 Subject: [PATCH 13/14] Add note on future usage of `__debugPoint` --- src/FSharp.Control.TaskSeq/TaskExtensions.fs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/FSharp.Control.TaskSeq/TaskExtensions.fs b/src/FSharp.Control.TaskSeq/TaskExtensions.fs index a5fac864..aa40599f 100644 --- a/src/FSharp.Control.TaskSeq/TaskExtensions.fs +++ b/src/FSharp.Control.TaskSeq/TaskExtensions.fs @@ -67,6 +67,16 @@ module TaskExtensions = this .Using( source.GetAsyncEnumerator(CancellationToken()), - (fun e -> this.WhileAsync(e.MoveNextAsync, (fun sm -> (body e.Current).Invoke(&sm)))) + (fun e -> + this.WhileAsync( + // __debugPoint is only available from FSharp.Core 6.0.4 + //(fun () -> + // Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers.__debugPoint + // "ForLoop.InOrToKeyword" + + // e.MoveNextAsync()), + e.MoveNextAsync, + (fun sm -> (body e.Current).Invoke(&sm)) + )) ) .Invoke(&sm)) From 82f149bbf548971b9e0b81ce05cf97eacaad703c Mon Sep 17 00:00:00 2001 From: Abel Braaksma Date: Sat, 26 Nov 2022 20:30:37 +0100 Subject: [PATCH 14/14] Add examples to readme.md and add doc comments to the fsi files and update release notes --- README.md | 22 ++++++++++++++----- assets/nuget-package-readme.md | 22 ++++++++++++++----- .../AsyncExtensions.fsi | 4 +++- .../FSharp.Control.TaskSeq.fsproj | 9 ++++---- src/FSharp.Control.TaskSeq/TaskExtensions.fsi | 3 +++ 5 files changed, 43 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 6a68a725..115958b9 100644 --- a/README.md +++ b/README.md @@ -29,8 +29,8 @@ Latest version [can be installed from Nuget][nuget]. - [Progress `taskSeq` CE](#progress-taskseq-ce) - [Progress and implemented `TaskSeq` module functions](#progress-and-implemented-taskseq-module-functions) - [More information](#more-information) - - [Futher reading `IAsyncEnumerable`](#futher-reading-iasyncenumerable) - - [Futher reading on resumable state machines](#futher-reading-on-resumable-state-machines) + - [Further reading `IAsyncEnumerable`](#further-reading-iasyncenumerable) + - [Further reading on resumable state machines](#further-reading-on-resumable-state-machines) - [Further reading on computation expressions](#further-reading-on-computation-expressions) - [Building & testing](#building--testing) - [Prerequisites](#prerequisites) @@ -109,15 +109,25 @@ As package reference in `fsproj` or `csproj` file: ```f# open System.IO - open FSharp.Control // singleton is fine -let hello = taskSeq { yield "Hello, World!" } +let helloTs = taskSeq { yield "Hello, World!" } + +// cold-started, that is, delay-executed +let f() = task { + // using toList forces execution of whole sequence + let! hello = TaskSeq.toList helloTs // toList returns a Task<'T list> + return List.head hello +} // can be mixed with normal sequences let oneToTen = taskSeq { yield! [1..10] } +// can be used with F#'s task and async in a for-loop +let f() = task { for x in oneToTen do printfn "Number %i" x } +let g() = async { for x in oneToTen do printfn "Number %i" x } + // returns a delayed sequence of IAsyncEnumerable let allFilesAsLines() = taskSeq { let files = Directory.EnumerateFiles(@"c:\temp") @@ -313,14 +323,14 @@ The following is the progress report: ## More information -### Futher reading `IAsyncEnumerable` +### Further reading `IAsyncEnumerable` - A good C#-based introduction [can be found in this blog][8]. - [An MSDN article][9] written shortly after it was introduced. - Converting a `seq` to an `IAsyncEnumerable` [demo gist][10] as an example, though `TaskSeq` contains many more utility functions and uses a slightly different approach. - If you're looking for using `IAsyncEnumerable` with `async` and not `task`, the excellent [`AsyncSeq`][11] library should be used. While `TaskSeq` is intended to consume `async` just like `task` does, it won't create an `AsyncSeq` type (at least not yet). If you want classic Async and parallelism, you should get this library instead. -### Futher reading on resumable state machines +### Further reading on resumable state machines - A state machine from a monadic perspective in F# [can be found here][12], which works with the pre-F# 6.0 non-resumable internals. - The [original RFC for F# 6.0 on resumable state machines][13] diff --git a/assets/nuget-package-readme.md b/assets/nuget-package-readme.md index 19cf16ab..7e106635 100644 --- a/assets/nuget-package-readme.md +++ b/assets/nuget-package-readme.md @@ -19,8 +19,8 @@ An implementation of [`IAsyncEnumerable<'T>`][3] as a computation expression: `t - [Examples](#examples) - [`TaskSeq` module functions](#taskseq-module-functions) - [More information](#more-information) - - [Futher reading `IAsyncEnumerable`](#futher-reading-iasyncenumerable) - - [Futher reading on resumable state machines](#futher-reading-on-resumable-state-machines) + - [Further reading `IAsyncEnumerable`](#further-reading-iasyncenumerable) + - [Further reading on resumable state machines](#further-reading-on-resumable-state-machines) - [Further reading on computation expressions](#further-reading-on-computation-expressions) ----------------------------------------- @@ -47,15 +47,25 @@ for `use` and `use!`, `try-with` and `try-finally` and `while` loops within the ```f# open System.IO - open FSharp.Control // singleton is fine -let hello = taskSeq { yield "Hello, World!" } +let helloTs = taskSeq { yield "Hello, World!" } + +// cold-started, that is, delay-executed +let f() = task { + // using toList forces execution of whole sequence + let! hello = TaskSeq.toList helloTs // toList returns a Task<'T list> + return List.head hello +} // can be mixed with normal sequences let oneToTen = taskSeq { yield! [1..10] } +// can be used with F#'s task and async in a for-loop +let f() = task { for x in oneToTen do printfn "Number %i" x } +let g() = async { for x in oneToTen do printfn "Number %i" x } + // returns a delayed sequence of IAsyncEnumerable let allFilesAsLines() = taskSeq { let files = Directory.EnumerateFiles(@"c:\temp") @@ -237,14 +247,14 @@ _The motivation for `readOnly` in `Seq` is that a cast from a mutable array or l ## More information -### Futher reading `IAsyncEnumerable` +### Further reading `IAsyncEnumerable` - A good C#-based introduction [can be found in this blog][8]. - [An MSDN article][9] written shortly after it was introduced. - Converting a `seq` to an `IAsyncEnumerable` [demo gist][10] as an example, though `TaskSeq` contains many more utility functions and uses a slightly different approach. - If you're looking for using `IAsyncEnumerable` with `async` and not `task`, the excellent [`AsyncSeq`][11] library should be used. While `TaskSeq` is intended to consume `async` just like `task` does, it won't create an `AsyncSeq` type (at least not yet). If you want classic Async and parallelism, you should get this library instead. -### Futher reading on resumable state machines +### Further reading on resumable state machines - A state machine from a monadic perspective in F# [can be found here][12], which works with the pre-F# 6.0 non-resumable internals. - The [original RFC for F# 6.0 on resumable state machines][13] diff --git a/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi b/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi index d4432756..7470eafd 100644 --- a/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi +++ b/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi @@ -6,5 +6,7 @@ module AsyncExtensions = type AsyncBuilder with - /// Iterate over all values of a taskSeq. + /// + /// Inside , iterate over all values of a . + /// member For: source: taskSeq<'T> * action: ('T -> Async) -> Async diff --git a/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj b/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj index f6d1d6c1..1575392d 100644 --- a/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj +++ b/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj @@ -23,10 +23,11 @@ Generates optimized IL code through the new resumable state machines, and comes nuget-package-readme.md Release notes: - 0.2.3 (unreleased) - - add TaskSeq.singleton, #90 (by @gusty) - - improve TaskSeq.empty by not relying on resumable state, #89 (by @gusty) - - do not throw exception for unequal lengths in TaskSeq.zip, fixes #32 + 0.3.0 (unreleased) + - adds support for 'for .. in ..' with task sequences in F# tasks and async, #75, #93 and #99 (with help from @theangrybyrd) + - adds TaskSeq.singleton, #90 (by @gusty) + - improves TaskSeq.empty by not relying on resumable state, #89 (by @gusty) + - does not throw exceptions anymore for unequal lengths in TaskSeq.zip, fixes #32 0.2.2 - removes TaskSeq.toSeqCachedAsync, which was incorrectly named. Use toSeq or toListAsync instead. - renames TaskSeq.toSeqCached to TaskSeq.toSeq, which was its actual operational behavior. diff --git a/src/FSharp.Control.TaskSeq/TaskExtensions.fsi b/src/FSharp.Control.TaskSeq/TaskExtensions.fsi index 81a441a3..b5a97e7d 100644 --- a/src/FSharp.Control.TaskSeq/TaskExtensions.fsi +++ b/src/FSharp.Control.TaskSeq/TaskExtensions.fsi @@ -8,4 +8,7 @@ module TaskExtensions = type TaskBuilder with + /// + /// Inside , iterate over all values of a . + /// member inline For: source: taskSeq<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> TaskCode<'TOverall, unit>