@@ -14,6 +14,8 @@ public class MpscBoundedBufferSoakTests
1414 private readonly ITestOutputHelper testOutputHelper ;
1515 private static readonly TimeSpan Timeout = TimeSpan . FromSeconds ( 30 ) ;
1616
17+ private readonly MpscBoundedBuffer < string > buffer = new MpscBoundedBuffer < string > ( 1024 ) ;
18+
1719 public MpscBoundedBufferSoakTests ( ITestOutputHelper testOutputHelper )
1820 {
1921 this . testOutputHelper = testOutputHelper ;
@@ -22,8 +24,6 @@ public MpscBoundedBufferSoakTests(ITestOutputHelper testOutputHelper)
2224 [ Fact ]
2325 public async Task WhenAddIsContendedBufferCanBeFilled ( )
2426 {
25- var buffer = new MpscBoundedBuffer < string > ( 1024 ) ;
26-
2727 await Threaded . Run ( 4 , ( ) =>
2828 {
2929 while ( buffer . TryAdd ( "hello" ) != BufferStatus . Full )
@@ -39,25 +39,7 @@ public async Task WhileBufferIsFilledItemsCanBeTaken()
3939 {
4040 this . testOutputHelper . WriteLine ( $ "ProcessorCount={ Environment . ProcessorCount } .") ;
4141
42- var buffer = new MpscBoundedBuffer < string > ( 1024 ) ;
43-
44- var fill = Threaded . Run ( 4 , ( ) =>
45- {
46- var spin = new SpinWait ( ) ;
47- int count = 0 ;
48- while ( count < 256 )
49- {
50- while ( true )
51- {
52- if ( buffer . TryAdd ( "hello" ) == BufferStatus . Success )
53- {
54- break ;
55- }
56- spin . SpinOnce ( ) ;
57- }
58- count ++ ;
59- }
60- } ) ;
42+ var fill = CreateParallelFill ( buffer , threads : 4 , itemsPerThread : 256 ) ;
6143
6244 var take = Task . Run ( ( ) =>
6345 {
@@ -83,13 +65,53 @@ public async Task WhileBufferIsFilledBufferCanBeDrained()
8365 {
8466 this . testOutputHelper . WriteLine ( $ "ProcessorCount={ Environment . ProcessorCount } .") ;
8567
86- var buffer = new MpscBoundedBuffer < string > ( 1024 ) ;
68+ var fill = CreateParallelFill ( buffer , threads : 4 , itemsPerThread : 256 ) ;
8769
88- var fill = Threaded . Run ( 4 , ( ) =>
70+ var drain = Task . Run ( ( ) =>
71+ {
72+ int drained = 0 ;
73+ var drainBuffer = new ArraySegment < string > ( new string [ 1024 ] ) ;
74+
75+ while ( drained < 1024 )
76+ {
77+ drained += buffer . DrainTo ( drainBuffer ) ;
78+ }
79+ } ) ;
80+
81+ await fill . TimeoutAfter ( Timeout , "fill timed out" ) ;
82+ await drain . TimeoutAfter ( Timeout , "drain timed out" ) ;
83+ }
84+
85+ [ Fact ]
86+ public async Task WhileBufferIsFilledCountCanBeTaken ( )
87+ {
88+ this . testOutputHelper . WriteLine ( $ "ProcessorCount={ Environment . ProcessorCount } .") ;
89+
90+ var fill = CreateParallelFill ( buffer , threads : 4 , itemsPerThread : 256 ) ;
91+
92+ var count = Threaded . Run ( 4 , ( ) =>
93+ {
94+ int count = 0 ;
95+
96+ while ( ! fill . IsCompleted )
97+ {
98+ int newcount = buffer . Count ;
99+ newcount . Should ( ) . BeGreaterThanOrEqualTo ( count ) ;
100+ count = newcount ;
101+ }
102+ } ) ;
103+
104+ await fill . TimeoutAfter ( Timeout , "fill timed out" ) ;
105+ await count . TimeoutAfter ( Timeout , "count timed out" ) ;
106+ }
107+
108+ private Task CreateParallelFill ( MpscBoundedBuffer < string > buffer , int threads , int itemsPerThread )
109+ {
110+ return Threaded . Run ( threads , ( ) =>
89111 {
90112 var spin = new SpinWait ( ) ;
91113 int count = 0 ;
92- while ( count < 256 )
114+ while ( count < itemsPerThread )
93115 {
94116 while ( true )
95117 {
@@ -102,20 +124,6 @@ public async Task WhileBufferIsFilledBufferCanBeDrained()
102124 count ++ ;
103125 }
104126 } ) ;
105-
106- var drain = Task . Run ( ( ) =>
107- {
108- int drained = 0 ;
109- var drainBuffer = new ArraySegment < string > ( new string [ 1024 ] ) ;
110-
111- while ( drained < 1024 )
112- {
113- drained += buffer . DrainTo ( drainBuffer ) ;
114- }
115- } ) ;
116-
117- await fill . TimeoutAfter ( Timeout , "fill timed out" ) ;
118- await drain . TimeoutAfter ( Timeout , "drain timed out" ) ;
119127 }
120128 }
121129}
0 commit comments