1+ require 'thread'
12require 'concurrent/configuration'
23require 'concurrent/ivar'
3- require 'concurrent/executor/immediate_executor'
44
55module Concurrent
66
@@ -42,10 +42,9 @@ module Concurrent
4242 # ## Basic Usage
4343 #
4444 # When this module is mixed into a class, objects of the class become inherently
45- # asynchronous. Each object gets its own background thread (specifically,
46- # `SingleThreadExecutor`) on which to post asynchronous method calls.
47- # Asynchronous method calls are executed in the background one at a time in
48- # the order they are received.
45+ # asynchronous. Each object gets its own background thread on which to post
46+ # asynchronous method calls. Asynchronous method calls are executed in the
47+ # background one at a time in the order they are received.
4948 #
5049 # To create an asynchronous class, simply mix in the `Concurrent::Async` module:
5150 #
@@ -232,7 +231,6 @@ module Concurrent
232231 # # returns an IVar in the :complete state
233232 #
234233 # @see Concurrent::Actor
235- # @see Concurrent::SingleThreadExecutor
236234 # @see https://en.wikipedia.org/wiki/Actor_model "Actor Model" at Wikipedia
237235 # @see http://www.erlang.org/doc/man/gen_server.html Erlang gen_server
238236 # @see http://c2.com/cgi/wiki?LetItCrash "Let It Crash" at http://c2.com/
@@ -307,13 +305,15 @@ class AsyncDelegator
307305 # given executor. Block if necessary.
308306 #
309307 # @param [Object] delegate the object to wrap and delegate method calls to
310- # @param [Concurrent::ExecutorService] executor the executor on which to execute delegated method calls
308+ # @param [Array] job queue which guarantees serialization of method calls
309+ # @param [Mutex] mutex which synchronizes queue operations
311310 # @param [Boolean] blocking will block awaiting result when `true`
312- def initialize ( delegate , executor , serializer , blocking )
311+ def initialize ( delegate , queue , mutex , blocking )
313312 @delegate = delegate
314- @executor = executor
315- @serializer = serializer
313+ @queue = queue
314+ @mutex = mutex
316315 @blocking = blocking
316+ @executor = Concurrent . global_io_executor
317317 end
318318
319319 # Delegates method calls to the wrapped object. For performance,
@@ -332,15 +332,28 @@ def method_missing(method, *args, &block)
332332 Async ::validate_argc ( @delegate , method , *args )
333333
334334 ivar = Concurrent ::IVar . new
335- @serializer . post ( @executor , args ) do |arguments |
335+ @mutex . synchronize do
336+ @queue . push [ ivar , method , args , block ]
337+ @executor . post { perform } if @queue . length == 1
338+ end
339+
340+ ivar . wait if @blocking
341+ ivar
342+ end
343+
344+ def perform
345+ loop do
346+ ivar , method , args , block = @mutex . synchronize { @queue . first }
347+ break unless ivar # queue is empty
348+
336349 begin
337- ivar . set ( @delegate . send ( method , *arguments , &block ) )
350+ ivar . set ( @delegate . send ( method , *args , &block ) )
338351 rescue => error
339352 ivar . fail ( error )
340353 end
354+
355+ @mutex . synchronize { @queue . shift }
341356 end
342- ivar . wait if @blocking
343- ivar
344357 end
345358 end
346359 private_constant :AsyncDelegator
@@ -387,8 +400,6 @@ def await
387400 end
388401 alias_method :call , :await
389402
390- private
391-
392403 # Initialize the internal serializer and other stnchronization mechanisms.
393404 #
394405 # @note This method *must* be called immediately upon object construction.
@@ -398,9 +409,10 @@ def await
398409 def init_synchronization
399410 return self if @__async_initialized__
400411 @__async_initialized__ = true
401- serializer = Concurrent ::SerializedExecution . new
402- @__await_delegator__ = AsyncDelegator . new ( self , Concurrent ::ImmediateExecutor . new , serializer , true )
403- @__async_delegator__ = AsyncDelegator . new ( self , Concurrent . global_io_executor , serializer , false )
412+ queue = [ ]
413+ mutex = Mutex . new
414+ @__await_delegator__ = AsyncDelegator . new ( self , queue , mutex , true )
415+ @__async_delegator__ = AsyncDelegator . new ( self , queue , mutex , false )
404416 self
405417 end
406418 end
0 commit comments