Skip to content

Commit 47dedbd

Browse files
committed
Add implementation of TaskSeq.except/exceptOfSeq with tests and xml documentation
1 parent c236b1a commit 47dedbd

File tree

5 files changed

+282
-23
lines changed

5 files changed

+282
-23
lines changed

src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
<Compile Include="TaskSeq.Delay.Tests.fs" />
2222
<Compile Include="TaskSeq.Empty.Tests.fs" />
2323
<Compile Include="TaskSeq.ExactlyOne.Tests.fs" />
24+
<Compile Include="TaskSeq.Except.Tests.fs" />
2425
<Compile Include="TaskSeq.Exists.Tests.fs" />
2526
<Compile Include="TaskSeq.Filter.Tests.fs" />
2627
<Compile Include="TaskSeq.FindIndex.Tests.fs" />
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
module TaskSeq.Tests.Except
2+
3+
open System
4+
open Xunit
5+
open FsUnit.Xunit
6+
open FsToolkit.ErrorHandling
7+
8+
open FSharp.Control
9+
10+
//
11+
// TaskSeq.except
12+
// TaskSeq.exceptOfSeq
13+
//
14+
15+
16+
module EmptySeq =
17+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
18+
let ``TaskSeq-except`` variant =
19+
Gen.getEmptyVariant variant
20+
|> TaskSeq.except (Gen.getEmptyVariant variant)
21+
|> verifyEmpty
22+
23+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
24+
let ``TaskSeq-exceptOfSeq`` variant =
25+
Gen.getEmptyVariant variant
26+
|> TaskSeq.exceptOfSeq Seq.empty
27+
|> verifyEmpty
28+
29+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
30+
let ``TaskSeq-except v2`` variant =
31+
Gen.getEmptyVariant variant
32+
|> TaskSeq.except TaskSeq.empty
33+
|> verifyEmpty
34+
35+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
36+
let ``TaskSeq-except v3`` variant =
37+
TaskSeq.empty
38+
|> TaskSeq.except (Gen.getEmptyVariant variant)
39+
|> verifyEmpty
40+
41+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
42+
let ``TaskSeq-except no side effect in exclude seq if source seq is empty`` variant =
43+
let mutable i = 0
44+
45+
let exclude = taskSeq {
46+
i <- i + 1
47+
yield 12
48+
}
49+
50+
TaskSeq.empty
51+
|> TaskSeq.except exclude
52+
|> verifyEmpty
53+
|> Task.map (fun () -> i |> should equal 0) // exclude seq is only enumerated after first item in source
54+
55+
module Immutable =
56+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
57+
let ``TaskSeq-except removes duplicates`` variant =
58+
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
59+
|> TaskSeq.except (Gen.getSeqImmutable variant)
60+
|> TaskSeq.toArrayAsync
61+
|> Task.map (should equal [| 12; 13; 99 |])
62+
63+
[<Fact>]
64+
let ``TaskSeq-except removes duplicates with empty itemsToExcept`` () =
65+
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
66+
|> TaskSeq.except TaskSeq.empty
67+
|> TaskSeq.toArrayAsync
68+
|> Task.map (should equal [| 1; 2; 3; 4; 12; 13; 99 |])
69+
70+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
71+
let ``TaskSeq-except removes everything`` variant =
72+
Gen.getSeqImmutable variant
73+
|> TaskSeq.except (Gen.getSeqImmutable variant)
74+
|> verifyEmpty
75+
76+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
77+
let ``TaskSeq-except removes everything with duplicates`` variant =
78+
taskSeq {
79+
yield! Gen.getSeqImmutable variant
80+
yield! Gen.getSeqImmutable variant
81+
yield! Gen.getSeqImmutable variant
82+
yield! Gen.getSeqImmutable variant
83+
}
84+
|> TaskSeq.except (Gen.getSeqImmutable variant)
85+
|> verifyEmpty
86+
87+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
88+
let ``TaskSeq-exceptOfSeq removes duplicates`` variant =
89+
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
90+
|> TaskSeq.exceptOfSeq [ 1..10 ]
91+
|> TaskSeq.toArrayAsync
92+
|> Task.map (should equal [| 12; 13; 99 |])
93+
94+
[<Fact>]
95+
let ``TaskSeq-exceptOfSeq removes duplicates with empty itemsToExcept`` () =
96+
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
97+
|> TaskSeq.exceptOfSeq Seq.empty
98+
|> TaskSeq.toArrayAsync
99+
|> Task.map (should equal [| 1; 2; 3; 4; 12; 13; 99 |])
100+
101+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
102+
let ``TaskSeq-exceptOfSeq removes everything`` variant =
103+
Gen.getSeqImmutable variant
104+
|> TaskSeq.exceptOfSeq [ 1..10 ]
105+
|> verifyEmpty
106+
107+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
108+
let ``TaskSeq-exceptOfSeq removes everything with duplicates`` variant =
109+
taskSeq {
110+
yield! Gen.getSeqImmutable variant
111+
yield! Gen.getSeqImmutable variant
112+
yield! Gen.getSeqImmutable variant
113+
yield! Gen.getSeqImmutable variant
114+
}
115+
|> TaskSeq.exceptOfSeq [ 1..10 ]
116+
|> verifyEmpty
117+
118+
module SideEffects =
119+
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
120+
let ``TaskSeq-except removes duplicates`` variant =
121+
TaskSeq.ofList [ 1; 1; 2; 3; 4; 12; 12; 12; 13; 13; 13; 13; 13; 99 ]
122+
|> TaskSeq.except (Gen.getSeqWithSideEffect variant)
123+
|> TaskSeq.toArrayAsync
124+
|> Task.map (should equal [| 12; 13; 99 |])
125+
126+
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
127+
let ``TaskSeq-except removes everything`` variant =
128+
Gen.getSeqWithSideEffect variant
129+
|> TaskSeq.except (Gen.getSeqWithSideEffect variant)
130+
|> verifyEmpty
131+
132+
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
133+
let ``TaskSeq-except removes everything with duplicates`` variant =
134+
taskSeq {
135+
yield! Gen.getSeqWithSideEffect variant
136+
yield! Gen.getSeqWithSideEffect variant
137+
yield! Gen.getSeqWithSideEffect variant
138+
yield! Gen.getSeqWithSideEffect variant
139+
}
140+
|> TaskSeq.except (Gen.getSeqWithSideEffect variant)
141+
|> verifyEmpty

src/FSharp.Control.TaskSeq/TaskSeq.fs

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -56,29 +56,6 @@ module TaskSeq =
5656
e.DisposeAsync().AsTask().Wait()
5757
}
5858

59-
// FIXME: incomplete and incorrect code!!! TODO: still needed?
60-
let toSeqOfTasks (source: taskSeq<'T>) = seq {
61-
let e = source.GetAsyncEnumerator(CancellationToken())
62-
63-
// TODO: check this!
64-
try
65-
let mutable go = false
66-
67-
while go do
68-
yield task {
69-
let! step = e.MoveNextAsync()
70-
go <- step
71-
72-
if step then
73-
return e.Current
74-
else
75-
return Unchecked.defaultof<_> // FIXME!
76-
}
77-
78-
finally
79-
e.DisposeAsync().AsTask().Wait()
80-
}
81-
8259
let toArrayAsync source =
8360
Internal.toResizeArrayAsync source
8461
|> Task.map (fun a -> a.ToArray())
@@ -281,6 +258,8 @@ module TaskSeq =
281258
let tryFindAsync predicate source = Internal.tryFind (PredicateAsync predicate) source
282259
let tryFindIndex predicate source = Internal.tryFindIndex (Predicate predicate) source
283260
let tryFindIndexAsync predicate source = Internal.tryFindIndex (PredicateAsync predicate) source
261+
let except itemsToExclude source = Internal.except itemsToExclude source
262+
let exceptOfSeq itemsToExclude source = Internal.exceptOfSeq itemsToExclude source
284263

285264
let exists predicate source =
286265
Internal.tryFind (Predicate predicate) source

src/FSharp.Control.TaskSeq/TaskSeq.fsi

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,44 @@ module TaskSeq =
500500
/// <exception cref="T:ArgumentNullException">Thrown when the input sequence is null.</exception>
501501
val existsAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> Task<bool>
502502

503+
/// <summary>
504+
/// Returns a new task sequence with the distinct elements of the second task sequence which do not appear in the
505+
/// <paramref name="itemsToExclude" />, using generic hash and equality comparisons to compare values.
506+
/// </summary>
507+
///
508+
/// <remarks>
509+
/// Note that this function returns a task sequence that digests the whole of the first input task sequence as soon as
510+
/// the result sequence first gets awaited or iterated. As a result this function should not be used with
511+
/// large or infinite sequences in the first parameter. The function makes no assumption on the ordering of the first input
512+
/// sequence.
513+
/// </remarks>
514+
///
515+
/// <param name="itemsToExclude">A task sequence whose elements that also occur in the second sequence will cause those elements to be removed from the returned sequence.</param>
516+
/// <param name="source">A sequence whose elements that are not also in first will be returned.</param>
517+
/// <returns>A sequence that contains the set difference of the elements of two sequences.</returns>
518+
///
519+
/// <exception cref="T:ArgumentNullException">Thrown when either of the two input sequences is null.</exception>
520+
val except<'T when 'T: equality> : itemsToExclude: taskSeq<'T> -> source: taskSeq<'T> -> taskSeq<'T>
521+
522+
/// <summary>
523+
/// Returns a new task sequence with the distinct elements of the second task sequence which do not appear in the
524+
/// <paramref name="itemsToExclude" />, using generic hash and equality comparisons to compare values.
525+
/// </summary>
526+
///
527+
/// <remarks>
528+
/// Note that this function returns a task sequence that digests the whole of the first input task sequence as soon as
529+
/// the result sequence first gets awaited or iterated. As a result this function should not be used with
530+
/// large or infinite sequences in the first parameter. The function makes no assumption on the ordering of the first input
531+
/// sequence.
532+
/// </remarks>
533+
///
534+
/// <param name="itemsToExclude">A task sequence whose elements that also occur in the second sequence will cause those elements to be removed from the returned sequence.</param>
535+
/// <param name="source">A sequence whose elements that are not also in first will be returned.</param>
536+
/// <returns>A sequence that contains the set difference of the elements of two sequences.</returns>
537+
///
538+
/// <exception cref="T:ArgumentNullException">Thrown when either of the two input sequences is null.</exception>
539+
val exceptOfSeq<'T when 'T: equality> : itemsToExclude: seq<'T> -> source: taskSeq<'T> -> taskSeq<'T>
540+
503541
/// <summary>
504542
/// Zips two task sequences, returning a taskSeq of the tuples of each sequence, in order. May raise ArgumentException
505543
/// if the sequences are or unequal length.

src/FSharp.Control.TaskSeq/TaskSeqInternal.fs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,3 +517,103 @@ module internal TaskSeqInternal =
517517
| true -> yield item
518518
| false -> ()
519519
}
520+
// Consider turning using an F# version of this instead?
521+
// https://github.com/i3arnon/ConcurrentHashSet
522+
type ConcurrentHashSet<'T when 'T: equality>(ct) =
523+
let _rwLock = new ReaderWriterLockSlim()
524+
let hashSet = HashSet<'T>(Array.empty, HashIdentity.Structural)
525+
526+
member _.Add item =
527+
_rwLock.EnterWriteLock()
528+
529+
try
530+
hashSet.Add item
531+
finally
532+
_rwLock.ExitWriteLock()
533+
534+
member _.AddMany items =
535+
_rwLock.EnterWriteLock()
536+
537+
try
538+
for item in items do
539+
hashSet.Add item |> ignore
540+
541+
finally
542+
_rwLock.ExitWriteLock()
543+
544+
member _.AddManyAsync(source: taskSeq<'T>) = task {
545+
use e = source.GetAsyncEnumerator(ct)
546+
let mutable go = true
547+
let! step = e.MoveNextAsync()
548+
go <- step
549+
550+
while go do
551+
// NOTE: r/w lock cannot cross thread boundaries. Should we use SemaphoreSlim instead?
552+
// or alternatively, something like this: https://github.com/StephenCleary/AsyncEx/blob/8a73d0467d40ca41f9f9cf827c7a35702243abb8/src/Nito.AsyncEx.Coordination/AsyncReaderWriterLock.cs#L16
553+
// not sure how they compare.
554+
555+
_rwLock.EnterWriteLock()
556+
557+
try
558+
hashSet.Add e.Current |> ignore
559+
finally
560+
_rwLock.ExitWriteLock()
561+
562+
let! step = e.MoveNextAsync()
563+
go <- step
564+
}
565+
566+
interface IAsyncDisposable with
567+
override _.DisposeAsync() =
568+
if not (isNull _rwLock) then
569+
_rwLock.Dispose()
570+
571+
ValueTask.CompletedTask
572+
573+
let except itemsToExclude (source: taskSeq<_>) = taskSeq {
574+
use e = source.GetAsyncEnumerator(CancellationToken())
575+
let mutable go = true
576+
let! step = e.MoveNextAsync()
577+
go <- step
578+
579+
if step then
580+
// only create hashset by the time we actually start iterating
581+
use hashSet = new ConcurrentHashSet<_>(CancellationToken())
582+
do! hashSet.AddManyAsync itemsToExclude
583+
584+
while go do
585+
let current = e.Current
586+
587+
// if true, it was added, and therefore unique, so we return it
588+
// if false, it existed, and therefore a duplicate, and we skip
589+
if hashSet.Add current then
590+
yield current
591+
592+
let! step = e.MoveNextAsync()
593+
go <- step
594+
595+
}
596+
597+
let exceptOfSeq itemsToExclude (source: taskSeq<_>) = taskSeq {
598+
use e = source.GetAsyncEnumerator(CancellationToken())
599+
let mutable go = true
600+
let! step = e.MoveNextAsync()
601+
go <- step
602+
603+
if step then
604+
// only create hashset by the time we actually start iterating
605+
use hashSet = new ConcurrentHashSet<_>(CancellationToken())
606+
do hashSet.AddMany itemsToExclude
607+
608+
while go do
609+
let current = e.Current
610+
611+
// if true, it was added, and therefore unique, so we return it
612+
// if false, it existed, and therefore a duplicate, and we skip
613+
if hashSet.Add current then
614+
yield current
615+
616+
let! step = e.MoveNextAsync()
617+
go <- step
618+
619+
}

0 commit comments

Comments
 (0)