File tree Expand file tree Collapse file tree 5 files changed +96
-11
lines changed
Expand file tree Collapse file tree 5 files changed +96
-11
lines changed Original file line number Diff line number Diff line change 3232require 'concurrent/tvar'
3333require 'concurrent/utilities'
3434
35+ require 'concurrent/channel/probe'
3536require 'concurrent/channel/unbuffered_channel'
3637
3738require 'concurrent/cached_thread_pool'
Original file line number Diff line number Diff line change 1+ module Concurrent
2+ class Probe < IVar
3+
4+ def initialize ( value = NO_VALUE , opts = { } )
5+ super ( value , opts )
6+ end
7+
8+ def set_unless_assigned ( value )
9+ mutex . synchronize do
10+ return false if [ :fulfilled , :rejected ] . include? @state
11+
12+ set_state ( true , value , nil )
13+ event . set
14+ true
15+ end
16+
17+ end
18+ end
19+ end
Original file line number Diff line number Diff line change @@ -9,30 +9,33 @@ def initialize
99 end
1010
1111 def push ( value )
12- probe = @mutex . synchronize do
13- @condition . wait ( @mutex ) while @wait_set . empty?
14- @wait_set . shift
12+ until first_waiting_probe . set_unless_assigned ( value )
1513 end
16-
17- probe . set ( value )
1814 end
1915
2016 def pop
21- probe = IVar . new
17+ probe = Probe . new
18+ select ( probe )
19+ probe . value
20+ end
2221
22+ def select ( probe )
2323 @mutex . synchronize do
2424 @wait_set << probe
2525 @condition . signal
2626 end
27-
28- probe . value
29- end
30-
31- def select ( probe )
3227 end
3328
3429 def remove_probe ( probe )
3530 end
3631
32+ private
33+ def first_waiting_probe
34+ @mutex . synchronize do
35+ @condition . wait ( @mutex ) while @wait_set . empty?
36+ @wait_set . shift
37+ end
38+ end
39+
3740 end
3841end
Original file line number Diff line number Diff line change 1+ require 'spec_helper'
2+
3+ module Concurrent
4+
5+ describe Probe do
6+ it 'should be written'
7+ end
8+ end
Original file line number Diff line number Diff line change @@ -31,6 +31,7 @@ module Concurrent
3131 end
3232
3333 context 'cooperating threads' do
34+
3435 it 'passes the pushed value to thread waiting on pop' do
3536 result = nil
3637
@@ -41,7 +42,60 @@ module Concurrent
4142
4243 result . should eq 42
4344 end
45+
46+ it 'passes the pushed value to only one thread' do
47+ result = [ ]
48+
49+ Thread . new { channel . push 37 }
50+ Thread . new { result << channel . pop }
51+ Thread . new { result << channel . pop }
52+
53+ sleep ( 0.05 )
54+
55+ result . should have ( 1 ) . items
56+ end
4457 end
4558
59+ describe 'select' do
60+
61+ let ( :probe ) { Probe . new }
62+
63+ it 'does not block' do
64+ t = Thread . new { channel . select ( probe ) }
65+
66+ sleep ( 0.05 )
67+
68+ t . status . should eq false
69+ end
70+
71+ it 'gets notified by writer thread' do
72+ channel . select ( probe )
73+
74+ Thread . new { channel . push 82 }
75+
76+ probe . value . should eq 82
77+ end
78+
79+ it 'ignores already set probes and waits for a new one' do
80+ probe . set ( 27 )
81+
82+ channel . select ( probe )
83+
84+ t = Thread . new { channel . push 72 }
85+
86+ sleep ( 0.05 )
87+
88+ t . status . should eq 'sleep'
89+
90+ new_probe = Probe . new
91+
92+ channel . select ( new_probe )
93+
94+ sleep ( 0.05 )
95+
96+ new_probe . value . should eq 72
97+ end
98+
99+ end
46100 end
47101end
You can’t perform that action at this time.
0 commit comments