diff --git a/README.md b/README.md index 6a68a725..115958b9 100644 --- a/README.md +++ b/README.md @@ -29,8 +29,8 @@ Latest version [can be installed from Nuget][nuget]. - [Progress `taskSeq` CE](#progress-taskseq-ce) - [Progress and implemented `TaskSeq` module functions](#progress-and-implemented-taskseq-module-functions) - [More information](#more-information) - - [Futher reading `IAsyncEnumerable`](#futher-reading-iasyncenumerable) - - [Futher reading on resumable state machines](#futher-reading-on-resumable-state-machines) + - [Further reading `IAsyncEnumerable`](#further-reading-iasyncenumerable) + - [Further reading on resumable state machines](#further-reading-on-resumable-state-machines) - [Further reading on computation expressions](#further-reading-on-computation-expressions) - [Building & testing](#building--testing) - [Prerequisites](#prerequisites) @@ -109,15 +109,25 @@ As package reference in `fsproj` or `csproj` file: ```f# open System.IO - open FSharp.Control // singleton is fine -let hello = taskSeq { yield "Hello, World!" } +let helloTs = taskSeq { yield "Hello, World!" } + +// cold-started, that is, delay-executed +let f() = task { + // using toList forces execution of whole sequence + let! hello = TaskSeq.toList helloTs // toList returns a Task<'T list> + return List.head hello +} // can be mixed with normal sequences let oneToTen = taskSeq { yield! [1..10] } +// can be used with F#'s task and async in a for-loop +let f() = task { for x in oneToTen do printfn "Number %i" x } +let g() = async { for x in oneToTen do printfn "Number %i" x } + // returns a delayed sequence of IAsyncEnumerable let allFilesAsLines() = taskSeq { let files = Directory.EnumerateFiles(@"c:\temp") @@ -313,14 +323,14 @@ The following is the progress report: ## More information -### Futher reading `IAsyncEnumerable` +### Further reading `IAsyncEnumerable` - A good C#-based introduction [can be found in this blog][8]. - [An MSDN article][9] written shortly after it was introduced. - Converting a `seq` to an `IAsyncEnumerable` [demo gist][10] as an example, though `TaskSeq` contains many more utility functions and uses a slightly different approach. - If you're looking for using `IAsyncEnumerable` with `async` and not `task`, the excellent [`AsyncSeq`][11] library should be used. While `TaskSeq` is intended to consume `async` just like `task` does, it won't create an `AsyncSeq` type (at least not yet). If you want classic Async and parallelism, you should get this library instead. -### Futher reading on resumable state machines +### Further reading on resumable state machines - A state machine from a monadic perspective in F# [can be found here][12], which works with the pre-F# 6.0 non-resumable internals. - The [original RFC for F# 6.0 on resumable state machines][13] diff --git a/assets/nuget-package-readme.md b/assets/nuget-package-readme.md index 19cf16ab..7e106635 100644 --- a/assets/nuget-package-readme.md +++ b/assets/nuget-package-readme.md @@ -19,8 +19,8 @@ An implementation of [`IAsyncEnumerable<'T>`][3] as a computation expression: `t - [Examples](#examples) - [`TaskSeq` module functions](#taskseq-module-functions) - [More information](#more-information) - - [Futher reading `IAsyncEnumerable`](#futher-reading-iasyncenumerable) - - [Futher reading on resumable state machines](#futher-reading-on-resumable-state-machines) + - [Further reading `IAsyncEnumerable`](#further-reading-iasyncenumerable) + - [Further reading on resumable state machines](#further-reading-on-resumable-state-machines) - [Further reading on computation expressions](#further-reading-on-computation-expressions) ----------------------------------------- @@ -47,15 +47,25 @@ for `use` and `use!`, `try-with` and `try-finally` and `while` loops within the ```f# open System.IO - open FSharp.Control // singleton is fine -let hello = taskSeq { yield "Hello, World!" } +let helloTs = taskSeq { yield "Hello, World!" } + +// cold-started, that is, delay-executed +let f() = task { + // using toList forces execution of whole sequence + let! hello = TaskSeq.toList helloTs // toList returns a Task<'T list> + return List.head hello +} // can be mixed with normal sequences let oneToTen = taskSeq { yield! [1..10] } +// can be used with F#'s task and async in a for-loop +let f() = task { for x in oneToTen do printfn "Number %i" x } +let g() = async { for x in oneToTen do printfn "Number %i" x } + // returns a delayed sequence of IAsyncEnumerable let allFilesAsLines() = taskSeq { let files = Directory.EnumerateFiles(@"c:\temp") @@ -237,14 +247,14 @@ _The motivation for `readOnly` in `Seq` is that a cast from a mutable array or l ## More information -### Futher reading `IAsyncEnumerable` +### Further reading `IAsyncEnumerable` - A good C#-based introduction [can be found in this blog][8]. - [An MSDN article][9] written shortly after it was introduced. - Converting a `seq` to an `IAsyncEnumerable` [demo gist][10] as an example, though `TaskSeq` contains many more utility functions and uses a slightly different approach. - If you're looking for using `IAsyncEnumerable` with `async` and not `task`, the excellent [`AsyncSeq`][11] library should be used. While `TaskSeq` is intended to consume `async` just like `task` does, it won't create an `AsyncSeq` type (at least not yet). If you want classic Async and parallelism, you should get this library instead. -### Futher reading on resumable state machines +### Further reading on resumable state machines - A state machine from a monadic perspective in F# [can be found here][12], which works with the pre-F# 6.0 non-resumable internals. - The [original RFC for F# 6.0 on resumable state machines][13] diff --git a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj index fdeb70de..02c3b80d 100644 --- a/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj +++ b/src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj @@ -48,6 +48,8 @@ + + diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs new file mode 100644 index 00000000..1f0b9efd --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.AsyncExtensions.Tests.fs @@ -0,0 +1,144 @@ +module TaskSeq.Tests.AsyncExtensions + +open System +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// Async extensions +// + +module EmptySeq = + [)>] + let ``Async-for CE with empty taskSeq`` variant = async { + let values = Gen.getEmptyVariant variant + + let mutable sum = 42 + + for x in values do + sum <- sum + x + + sum |> should equal 42 + } + + [] + let ``Async-for CE must execute side effect in empty taskseq`` () = async { + let mutable data = 0 + let values = taskSeq { do data <- 42 } + + for x in values do + () + + data |> should equal 42 + } + + +module Immutable = + [)>] + let ``Async-for CE with taskSeq`` variant = async { + let values = Gen.getSeqImmutable variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + sum |> should equal 55 + } + + [)>] + let ``Async-for CE with taskSeq multiple iterations`` variant = async { + let values = Gen.getSeqImmutable variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + // each following iteration should start at the beginning + for x in values do + sum <- sum + x + + for x in values do + sum <- sum + x + + sum |> should equal 165 + } + + [] + let ``Async-for mixing both types of for loops`` () = async { + // this test ensures overload resolution is correct + let ts = TaskSeq.singleton 20 + let sq = Seq.singleton 20 + let mutable sum = 2 + + for x in ts do + sum <- sum + x + + for x in sq do + sum <- sum + x + + sum |> should equal 42 + } + +module SideEffects = + [)>] + let ``Async-for CE with taskSeq`` variant = async { + let values = Gen.getSeqWithSideEffect variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + sum |> should equal 55 + } + + [)>] + let ``Async-for CE with taskSeq multiple iterations`` variant = async { + let values = Gen.getSeqWithSideEffect variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + // each following iteration should start at the beginning + // with the "side effect" tests, the mutable state updates + for x in values do + sum <- sum + x // starts at 11 + + for x in values do + sum <- sum + x // starts at 21 + + sum |> should equal 465 // eq to: List.sum [1..30] + } + +module Other = + [] + let ``Async-for CE must call dispose in empty taskSeq`` () = async { + let disposed = ref 0 + let values = Gen.getEmptyDisposableTaskSeq disposed + + for x in values do + () + + // the DisposeAsync should be called by now + disposed.Value |> should equal 1 + } + + [] + let ``Async-for CE must call dispose on singleton`` () = async { + let disposed = ref 0 + let mutable sum = 0 + let values = Gen.getSingletonDisposableTaskSeq disposed + + for x in values do + sum <- x + + // the DisposeAsync should be called by now + disposed.Value |> should equal 1 + sum |> should equal 42 + } diff --git a/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs b/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs new file mode 100644 index 00000000..5d5316a4 --- /dev/null +++ b/src/FSharp.Control.TaskSeq.Test/TaskSeq.TaskExtensions.Tests.fs @@ -0,0 +1,143 @@ +module TaskSeq.Tests.TaskExtensions + +open System +open Xunit +open FsUnit.Xunit + +open FSharp.Control + +// +// Task extensions +// + +module EmptySeq = + [)>] + let ``Task-for CE with empty taskSeq`` variant = task { + let values = Gen.getEmptyVariant variant + + let mutable sum = 42 + + for x in values do + sum <- sum + x + + sum |> should equal 42 + } + + [] + let ``Task-for CE must execute side effect in empty taskseq`` () = task { + let mutable data = 0 + let values = taskSeq { do data <- 42 } + + for x in values do + () + + data |> should equal 42 + } + +module Immutable = + [)>] + let ``Task-for CE with taskSeq`` variant = task { + let values = Gen.getSeqImmutable variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + sum |> should equal 55 + } + + [)>] + let ``Task-for CE with taskSeq multiple iterations`` variant = task { + let values = Gen.getSeqImmutable variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + // each following iteration should start at the beginning + for x in values do + sum <- sum + x + + for x in values do + sum <- sum + x + + sum |> should equal 165 + } + + [] + let ``Task-for mixing both types of for loops`` () = async { + // this test ensures overload resolution is correct + let ts = TaskSeq.singleton 20 + let sq = Seq.singleton 20 + let mutable sum = 2 + + for x in ts do + sum <- sum + x + + for x in sq do + sum <- sum + x + + sum |> should equal 42 + } + +module SideEffects = + [)>] + let ``Task-for CE with taskSeq`` variant = task { + let values = Gen.getSeqWithSideEffect variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + sum |> should equal 55 + } + + [)>] + let ``Task-for CE with taskSeq multiple iterations`` variant = task { + let values = Gen.getSeqWithSideEffect variant + + let mutable sum = 0 + + for x in values do + sum <- sum + x + + // each following iteration should start at the beginning + // with the "side effect" tests, the mutable state updates + for x in values do + sum <- sum + x // starts at 11 + + for x in values do + sum <- sum + x // starts at 21 + + sum |> should equal 465 // eq to: List.sum [1..30] + } + +module Other = + [] + let ``Task-for CE must call dispose in empty taskSeq`` () = async { + let disposed = ref 0 + let values = Gen.getEmptyDisposableTaskSeq disposed + + for x in values do + () + + // the DisposeAsync should be called by now + disposed.Value |> should equal 1 + } + + [] + let ``Task-for CE must call dispose on singleton`` () = async { + let disposed = ref 0 + let mutable sum = 0 + let values = Gen.getSingletonDisposableTaskSeq disposed + + for x in values do + sum <- x + + // the DisposeAsync should be called by now + disposed.Value |> should equal 1 + sum |> should equal 42 + } diff --git a/src/FSharp.Control.TaskSeq.Test/TestUtils.fs b/src/FSharp.Control.TaskSeq.Test/TestUtils.fs index dd4873b3..28b36031 100644 --- a/src/FSharp.Control.TaskSeq.Test/TestUtils.fs +++ b/src/FSharp.Control.TaskSeq.Test/TestUtils.fs @@ -521,6 +521,44 @@ module TestUtils = } | x -> failwithf "Invalid test variant: %A" x + /// An empty taskSeq that can be used with tests for checking if the dispose method gets called. + /// Will add 1 to the passed integer upon disposing. + let getEmptyDisposableTaskSeq (disposed: int ref) = + { new IAsyncEnumerable<'T> with + member _.GetAsyncEnumerator(_) = + { new IAsyncEnumerator<'T> with + member _.MoveNextAsync() = ValueTask.False + member _.Current = Unchecked.defaultof<'T> + member _.DisposeAsync() = ValueTask(task { do disposed.Value <- disposed.Value + 1 }) + } + } + + /// A singleton taskSeq that can be used with tests for checking if the dispose method gets called + /// The singleton value is '42'. Will add 1 to the passed integer upon disposing. + let getSingletonDisposableTaskSeq (disposed: int ref) = + { new IAsyncEnumerable with + member _.GetAsyncEnumerator(_) = + let mutable status = BeforeAll + + { new IAsyncEnumerator with + member _.MoveNextAsync() = + match status with + | BeforeAll -> + status <- WithCurrent + ValueTask.True + | WithCurrent -> + status <- AfterAll + ValueTask.False + | AfterAll -> ValueTask.False + + member _.Current: int = + match status with + | WithCurrent -> 42 + | _ -> Unchecked.defaultof + + member _.DisposeAsync() = ValueTask(task { do disposed.Value <- disposed.Value + 1 }) + } + } // // following types can be used with Theory & TestData // diff --git a/src/FSharp.Control.TaskSeq/AsyncExtensions.fs b/src/FSharp.Control.TaskSeq/AsyncExtensions.fs new file mode 100644 index 00000000..51c46b4e --- /dev/null +++ b/src/FSharp.Control.TaskSeq/AsyncExtensions.fs @@ -0,0 +1,14 @@ +namespace FSharp.Control + +open FSharp.Control.TaskSeqBuilders + +[] +module AsyncExtensions = + + // Add asynchronous for loop to the 'async' computation builder + type Microsoft.FSharp.Control.AsyncBuilder with + + member _.For(source: taskSeq<'T>, action: 'T -> Async) = + source + |> TaskSeq.iterAsync (action >> Async.StartAsTask) + |> Async.AwaitTask diff --git a/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi b/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi new file mode 100644 index 00000000..7470eafd --- /dev/null +++ b/src/FSharp.Control.TaskSeq/AsyncExtensions.fsi @@ -0,0 +1,12 @@ +namespace FSharp.Control + +[] +module AsyncExtensions = + open FSharp.Control.TaskSeqBuilders + + type AsyncBuilder with + + /// + /// Inside , iterate over all values of a . + /// + member For: source: taskSeq<'T> * action: ('T -> Async) -> Async diff --git a/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj b/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj index ffe66c2f..1575392d 100644 --- a/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj +++ b/src/FSharp.Control.TaskSeq/FSharp.Control.TaskSeq.fsproj @@ -23,10 +23,11 @@ Generates optimized IL code through the new resumable state machines, and comes nuget-package-readme.md Release notes: - 0.2.3 (unreleased) - - add TaskSeq.singleton, #90 (by @gusty) - - improve TaskSeq.empty by not relying on resumable state, #89 (by @gusty) - - do not throw exception for unequal lengths in TaskSeq.zip, fixes #32 + 0.3.0 (unreleased) + - adds support for 'for .. in ..' with task sequences in F# tasks and async, #75, #93 and #99 (with help from @theangrybyrd) + - adds TaskSeq.singleton, #90 (by @gusty) + - improves TaskSeq.empty by not relying on resumable state, #89 (by @gusty) + - does not throw exceptions anymore for unequal lengths in TaskSeq.zip, fixes #32 0.2.2 - removes TaskSeq.toSeqCachedAsync, which was incorrectly named. Use toSeq or toListAsync instead. - renames TaskSeq.toSeqCached to TaskSeq.toSeq, which was its actual operational behavior. @@ -71,13 +72,12 @@ Generates optimized IL code through the new resumable state machines, and comes + + + + - - - - - diff --git a/src/FSharp.Control.TaskSeq/TaskExtensions.fs b/src/FSharp.Control.TaskSeq/TaskExtensions.fs new file mode 100644 index 00000000..aa40599f --- /dev/null +++ b/src/FSharp.Control.TaskSeq/TaskExtensions.fs @@ -0,0 +1,82 @@ +namespace FSharp.Control + +open System.Collections.Generic +open System.Threading +open System.Threading.Tasks + +open Microsoft.FSharp.Core.CompilerServices +open Microsoft.FSharp.Core.LanguagePrimitives.IntrinsicOperators + +open FSharp.Control.TaskSeqBuilders + +#nowarn "57" +#nowarn "1204" +#nowarn "3513" + +[] +module TaskExtensions = + + // Add asynchronous for loop to the 'task' computation builder + type Microsoft.FSharp.Control.TaskBuilder with + + /// Used by `For`. F# currently doesn't support `while!`, so this cannot be called directly from the task CE + /// This code is mostly a copy of TaskSeq.WhileAsync. + member inline _.WhileAsync + ( + [] condition: unit -> ValueTask, + body: TaskCode<_, unit> + ) : TaskCode<_, _> = + let mutable condition_res = true + + // note that this While itself has both a dynamic and static implementation + // so we don't need to add that here (TODO: how to verify?). + ResumableCode.While( + (fun () -> condition_res), + TaskCode<_, _>(fun sm -> + let mutable __stack_condition_fin = true + let __stack_vtask = condition () + + let mutable awaiter = __stack_vtask.GetAwaiter() + + if awaiter.IsCompleted then + Debug.logInfo "at Task.WhileAsync: returning completed task" + + __stack_condition_fin <- true + condition_res <- awaiter.GetResult() + else + Debug.logInfo "at Task.WhileAsync: awaiting non-completed task" + + // This will yield with __stack_fin = false + // This will resume with __stack_fin = true + let __stack_yield_fin = ResumableCode.Yield().Invoke(&sm) + __stack_condition_fin <- __stack_yield_fin + + if __stack_condition_fin then + condition_res <- awaiter.GetResult() + + + if __stack_condition_fin then + if condition_res then body.Invoke(&sm) else true + else + sm.Data.MethodBuilder.AwaitUnsafeOnCompleted(&awaiter, &sm) + false) + ) + + member inline this.For(source: taskSeq<'T>, body: 'T -> TaskCode<_, unit>) : TaskCode<_, unit> = + TaskCode<'TOverall, unit>(fun sm -> + this + .Using( + source.GetAsyncEnumerator(CancellationToken()), + (fun e -> + this.WhileAsync( + // __debugPoint is only available from FSharp.Core 6.0.4 + //(fun () -> + // Microsoft.FSharp.Core.CompilerServices.StateMachineHelpers.__debugPoint + // "ForLoop.InOrToKeyword" + + // e.MoveNextAsync()), + e.MoveNextAsync, + (fun sm -> (body e.Current).Invoke(&sm)) + )) + ) + .Invoke(&sm)) diff --git a/src/FSharp.Control.TaskSeq/TaskExtensions.fsi b/src/FSharp.Control.TaskSeq/TaskExtensions.fsi new file mode 100644 index 00000000..b5a97e7d --- /dev/null +++ b/src/FSharp.Control.TaskSeq/TaskExtensions.fsi @@ -0,0 +1,14 @@ +namespace FSharp.Control + +#nowarn "1204" + +[] +module TaskExtensions = + open FSharp.Control.TaskSeqBuilders + + type TaskBuilder with + + /// + /// Inside , iterate over all values of a . + /// + member inline For: source: taskSeq<'T> * body: ('T -> TaskCode<'TOverall, unit>) -> TaskCode<'TOverall, unit> diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fs b/src/FSharp.Control.TaskSeq/TaskSeq.fs index fb45a1ae..e1a13d1b 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fs @@ -4,6 +4,8 @@ open System.Collections.Generic open System.Threading open System.Threading.Tasks +#nowarn "57" + module TaskSeq = // F# BUG: the following module is 'AutoOpen' and this isn't needed in the Tests project. Why do we need to open it? open FSharp.Control.TaskSeqBuilders diff --git a/src/FSharp.Control.TaskSeq/TaskSeq.fsi b/src/FSharp.Control.TaskSeq/TaskSeq.fsi index 0d9fe0c9..172ab83c 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeq.fsi +++ b/src/FSharp.Control.TaskSeq/TaskSeq.fsi @@ -1,5 +1,7 @@ namespace FSharp.Control +#nowarn "1204" + module TaskSeq = open System.Collections.Generic open System.Threading.Tasks diff --git a/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs b/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs index 2331fc28..195ed87b 100644 --- a/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs +++ b/src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs @@ -36,50 +36,6 @@ module Internal = // cannot be marked with 'internal' scope with _ -> false - type Debug = - - [] - static val mutable private verbose: bool option - - /// Setting from environment variable TASKSEQ_LOG_VERBOSE, which, - /// when set, enables (very) verbose printing of flow and state - static member private getVerboseSetting() = - match Debug.verbose with - | None -> - let verboseEnv = - try - match Environment.GetEnvironmentVariable "TASKSEQ_LOG_VERBOSE" with - | null -> false - | x -> - match x.ToLowerInvariant().Trim() with - | "1" - | "true" - | "on" - | "yes" -> true - | _ -> false - - with _ -> - false - - Debug.verbose <- Some verboseEnv - verboseEnv - - | Some setting -> setting - - [] - static member private print value = - match Debug.getVerboseSetting () with - | false -> () - | true -> - // don't use ksprintf here, because the compiler does not remove all allocations due to - // the way PrintfFormat types are compiled, even if we set the Conditional attribute. - printfn "%i (%b): %s" Thread.CurrentThread.ManagedThreadId Thread.CurrentThread.IsThreadPoolThread value - - [] - static member logInfo(str) = Debug.print str - - [] - static member logInfo(str, data) = Debug.print $"%s{str}{data}" /// Call MoveNext on an IAsyncStateMachine by reference let inline moveNextRef (x: byref<'T> when 'T :> IAsyncStateMachine) = x.MoveNext() @@ -89,8 +45,6 @@ module Internal = // cannot be marked with 'internal' scope NotImplementedException "Abstract Class: method or property not implemented" |> raise -open type Debug - type taskSeq<'T> = IAsyncEnumerable<'T> type IPriority1 = @@ -244,7 +198,7 @@ and [] TaskSeq<'Machine, 'T this // just return 'self' here | _ -> - logInfo "GetAsyncEnumerator, start cloning..." + Debug.logInfo "GetAsyncEnumerator, start cloning..." // We need to reset state, but only to the "initial machine", resetting the _machine to // Unchecked.defaultof<_> is wrong, as the compiler uses this to track state. However, @@ -260,7 +214,7 @@ and [] TaskSeq<'Machine, 'T clone._machine <- this._initialMachine clone._initialMachine <- this._initialMachine // TODO: proof with a test that this is necessary: probably not clone.InitMachineData(ct, &clone._machine) - logInfo "GetAsyncEnumerator, finished cloning..." + Debug.logInfo "GetAsyncEnumerator, finished cloning..." clone interface System.Collections.Generic.IAsyncEnumerator<'T> with @@ -275,39 +229,39 @@ and [] TaskSeq<'Machine, 'T Unchecked.defaultof<'T> member this.MoveNextAsync() = - logInfo "MoveNextAsync..." + Debug.logInfo "MoveNextAsync..." if this._machine.ResumptionPoint = -1 then // can't use as IAsyncEnumerator before IAsyncEnumerable - logInfo "at MoveNextAsync: Resumption point = -1" + Debug.logInfo "at MoveNextAsync: Resumption point = -1" ValueTask.False elif this._machine.Data.completed then - logInfo "at MoveNextAsync: completed = true" + Debug.logInfo "at MoveNextAsync: completed = true" // return False when beyond the last item this._machine.Data.promiseOfValueOrEnd.Reset() ValueTask.False else - logInfo "at MoveNextAsync: normal resumption scenario" + Debug.logInfo "at MoveNextAsync: normal resumption scenario" let data = this._machine.Data data.promiseOfValueOrEnd.Reset() let mutable ts = this - logInfo "at MoveNextAsync: start calling builder.MoveNext()" + Debug.logInfo "at MoveNextAsync: start calling builder.MoveNext()" data.builder.MoveNext(&ts) - logInfo "at MoveNextAsync: finished calling builder.MoveNext()" + Debug.logInfo "at MoveNextAsync: finished calling builder.MoveNext()" this.MoveNextAsyncResult() /// Disposes of the IAsyncEnumerator (*not* the IAsyncEnumerable!!!) member this.DisposeAsync() = task { - logInfo "DisposeAsync..." + Debug.logInfo "DisposeAsync..." match this._machine.Data.disposalStack with | null -> () @@ -335,7 +289,7 @@ and [] TaskSeq<'Machine, 'T match status with | ValueTaskSourceStatus.Succeeded -> - logInfo "at MoveNextAsyncResult: case succeeded..." + Debug.logInfo "at MoveNextAsyncResult: case succeeded..." let result = data.promiseOfValueOrEnd.GetResult(version) @@ -349,11 +303,11 @@ and [] TaskSeq<'Machine, 'T | ValueTaskSourceStatus.Faulted | ValueTaskSourceStatus.Canceled | ValueTaskSourceStatus.Pending as state -> - logInfo ("at MoveNextAsyncResult: case ", state) + Debug.logInfo ("at MoveNextAsyncResult: case ", state) ValueTask.ofIValueTaskSource this version | _ -> - logInfo "at MoveNextAsyncResult: Unexpected state" + Debug.logInfo "at MoveNextAsyncResult: Unexpected state" // assume it's a possibly new, not yet supported case, treat as default ValueTask.ofIValueTaskSource this version @@ -376,12 +330,12 @@ type TaskSeqBuilder() = __resumeAt sm.ResumptionPoint try - logInfo "at Run.MoveNext start" + Debug.logInfo "at Run.MoveNext start" let __stack_code_fin = code.Invoke(&sm) if __stack_code_fin then - logInfo $"at Run.MoveNext, done" + Debug.logInfo $"at Run.MoveNext, done" // Signal we're at the end // NOTE: if we don't do it here, as well as in IValueTaskSource.GetResult @@ -392,14 +346,14 @@ type TaskSeqBuilder() = sm.Data.completed <- true elif sm.Data.current.IsSome then - logInfo $"at Run.MoveNext, still more items in enumerator" + Debug.logInfo $"at Run.MoveNext, still more items in enumerator" // Signal there's more data: sm.Data.promiseOfValueOrEnd.SetResult(true) else // Goto request - logInfo $"at Run.MoveNext, await, MoveNextAsync has not completed yet" + Debug.logInfo $"at Run.MoveNext, await, MoveNextAsync has not completed yet" // don't capture the full object in the next closure (won't work because: byref) // but only a reference to itself. @@ -412,7 +366,7 @@ type TaskSeqBuilder() = ) with exn -> - logInfo ("Setting exception of PromiseOfValueOrEnd to: ", exn.Message) + Debug.logInfo ("Setting exception of PromiseOfValueOrEnd to: ", exn.Message) sm.Data.promiseOfValueOrEnd.SetException(exn) sm.Data.builder.Complete() @@ -420,7 +374,7 @@ type TaskSeqBuilder() = )) (SetStateMachineMethodImpl<_>(fun sm state -> ())) // not used in reference impl (AfterCode<_, _>(fun sm -> - logInfo "at AfterCode<_, _>, after F# inits the sm, and we can attach extra info" + Debug.logInfo "at AfterCode<_, _>, after F# inits the sm, and we can attach extra info" let ts = TaskSeq, 'T>() ts._initialMachine <- sm @@ -440,14 +394,15 @@ type TaskSeqBuilder() = member inline _.Zero() : TaskSeqCode<'T> = - logInfo "at Zero()" + Debug.logInfo "at Zero()" ResumableCode.Zero() member inline _.Combine(task1: TaskSeqCode<'T>, task2: TaskSeqCode<'T>) : TaskSeqCode<'T> = - logInfo "at Combine(.., ..)" + Debug.logInfo "at Combine(.., ..)" ResumableCode.Combine(task1, task2) + /// Used by `For`. F# currently doesn't support `while!`, so this cannot be called directly from the CE member inline _.WhileAsync ( [] condition: unit -> ValueTask, @@ -462,12 +417,12 @@ type TaskSeqBuilder() = let __stack_vtask = condition () if __stack_vtask.IsCompleted then - logInfo "at WhileAsync: returning completed task" + Debug.logInfo "at WhileAsync: returning completed task" __stack_condition_fin <- true condition_res <- __stack_vtask.Result else - logInfo "at WhileAsync: awaiting non-completed task" + Debug.logInfo "at WhileAsync: awaiting non-completed task" let task = __stack_vtask.AsTask() let mutable awaiter = task.GetAwaiter() @@ -489,10 +444,7 @@ type TaskSeqBuilder() = ) member inline b.While([] condition: unit -> bool, body: TaskSeqCode<'T>) : TaskSeqCode<'T> = - logInfo "at While(...)" - - // was this: - // b.WhileAsync((fun () -> ValueTask(condition ())), body) + Debug.logInfo "at While(...)" ResumableCode.While(condition, body) member inline _.TryWith(body: TaskSeqCode<'T>, catch: exn -> TaskSeqCode<'T>) : TaskSeqCode<'T> = @@ -597,7 +549,7 @@ type TaskSeqBuilder() = TaskSeqCode<'T>(fun sm -> // This will yield with __stack_fin = false // This will resume with __stack_fin = true - logInfo "at Yield" + Debug.logInfo "at Yield" let __stack_fin = ResumableCode.Yield().Invoke(&sm) sm.Data.current <- ValueSome v @@ -614,7 +566,7 @@ type TaskSeqBuilder() = let mutable awaiter = task.GetAwaiter() let mutable __stack_fin = true - logInfo "at Bind" + Debug.logInfo "at Bind" if not awaiter.IsCompleted then // This will yield with __stack_fin2 = false @@ -622,15 +574,15 @@ type TaskSeqBuilder() = let __stack_fin2 = ResumableCode.Yield().Invoke(&sm) __stack_fin <- __stack_fin2 - logInfo ("at Bind: with __stack_fin = ", __stack_fin) - logInfo ("at Bind: this.completed = ", sm.Data.completed) + Debug.logInfo ("at Bind: with __stack_fin = ", __stack_fin) + Debug.logInfo ("at Bind: this.completed = ", sm.Data.completed) if __stack_fin then let result = awaiter.GetResult() (continuation result).Invoke(&sm) else - logInfo "at Bind: calling AwaitUnsafeOnCompleted" + Debug.logInfo "at Bind: calling AwaitUnsafeOnCompleted" sm.Data.awaiter <- awaiter sm.Data.current <- ValueNone @@ -641,7 +593,7 @@ type TaskSeqBuilder() = let mutable awaiter = task.GetAwaiter() let mutable __stack_fin = true - logInfo "at BindV" + Debug.logInfo "at BindV" if not awaiter.IsCompleted then // This will yield with __stack_fin2 = false @@ -653,7 +605,7 @@ type TaskSeqBuilder() = let result = awaiter.GetResult() (continuation result).Invoke(&sm) else - logInfo "at BindV: calling AwaitUnsafeOnCompleted" + Debug.logInfo "at BindV: calling AwaitUnsafeOnCompleted" sm.Data.awaiter <- awaiter sm.Data.current <- ValueNone diff --git a/src/FSharp.Control.TaskSeq/Utils.fs b/src/FSharp.Control.TaskSeq/Utils.fs index 09d0e504..15fce9b2 100644 --- a/src/FSharp.Control.TaskSeq/Utils.fs +++ b/src/FSharp.Control.TaskSeq/Utils.fs @@ -1,6 +1,9 @@ namespace FSharp.Control open System.Threading.Tasks +open System +open System.Diagnostics +open System.Threading [] module ValueTaskExtensions = @@ -103,3 +106,52 @@ module Async = /// Bind an Async<'T> let inline bind binder (task: Async<'T>) : Async<'U> = ExtraTopLevelOperators.async { return! binder task } + +type Debug = + + [] + static val mutable private verbose: bool option + + /// Setting from environment variable TASKSEQ_LOG_VERBOSE, which, + /// when set, enables (very) verbose printing of flow and state + static member private getVerboseSetting() = + match Debug.verbose with + | None -> + let verboseEnv = + try + match Environment.GetEnvironmentVariable "TASKSEQ_LOG_VERBOSE" with + | null -> false + | x -> + match x.ToLowerInvariant().Trim() with + | "1" + | "true" + | "on" + | "yes" -> true + | _ -> false + + with _ -> + false + + Debug.verbose <- Some verboseEnv + verboseEnv + + | Some setting -> setting + + /// Private helper to log to stdout in DEBUG builds only + [] + static member private print value = + match Debug.getVerboseSetting () with + | false -> () + | true -> + // don't use ksprintf here, because the compiler does not remove all allocations due to + // the way PrintfFormat types are compiled, even if we set the Conditional attribute. + let ct = Thread.CurrentThread + printfn "%i (%b): %s" ct.ManagedThreadId ct.IsThreadPoolThread value + + /// Log to stdout in DEBUG builds only + [] + static member logInfo(str) = Debug.print str + + /// Log to stdout in DEBUG builds only + [] + static member logInfo(str, data) = Debug.print $"%s{str}{data}"