Skip to content

Commit 56274c4

Browse files
authored
Merge pull request #828 from AlexanderZagaynov/threads_naming
Threads naming
2 parents c85e099 + ced8496 commit 56274c4

File tree

7 files changed

+69
-9
lines changed

7 files changed

+69
-9
lines changed

lib/concurrent/executor/abstract_executor_service.rb

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,19 @@ class AbstractExecutorService < Synchronization::LockableObject
1616
# @!macro executor_service_attr_reader_fallback_policy
1717
attr_reader :fallback_policy
1818

19+
attr_reader :name
20+
1921
# Create a new thread pool.
20-
def initialize(*args, &block)
22+
def initialize(opts = {}, &block)
2123
super(&nil)
22-
synchronize { ns_initialize(*args, &block) }
24+
synchronize do
25+
ns_initialize(opts, &block)
26+
@name = opts.fetch(:name) if opts.key?(:name)
27+
end
28+
end
29+
30+
def to_s
31+
name ? "#{super[0..-2]} name: #{name}>" : super
2332
end
2433

2534
# @!macro executor_service_method_shutdown

lib/concurrent/executor/fixed_thread_pool.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ module Concurrent
115115
# Thread pools support several configuration options:
116116
#
117117
# * `idletime`: The number of seconds that a thread may be idle before being reclaimed.
118+
# * `name`: The name of the executor (optional). Printed in the executor's `#to_s` output and
119+
# a `<name>-worker-<id>` name is given to its threads if supported by used Ruby
120+
# implementation. `<id>` is uniq for each thread.
118121
# * `max_queue`: The maximum number of tasks that may be waiting in the work queue at
119122
# any one time. When the queue size reaches `max_queue` and no new threads can be created,
120123
# subsequent tasks will be rejected in accordance with the configured `fallback_policy`.

lib/concurrent/executor/java_executor_service.rb

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,6 @@ class JavaExecutorService < AbstractExecutorService
1818
}.freeze
1919
private_constant :FALLBACK_POLICY_CLASSES
2020

21-
def initialize(*args, &block)
22-
super
23-
end
24-
2521
def post(*args, &task)
2622
raise ArgumentError.new('no block given') unless block_given?
2723
return handle_fallback(*args, &task) unless running?

lib/concurrent/executor/ruby_thread_pool_executor.rb

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ def ns_initialize(opts)
131131
@scheduled_task_count = 0
132132
@completed_task_count = 0
133133
@largest_length = 0
134+
@workers_counter = 0
134135
@ruby_pid = $$ # detects if Ruby has forked
135136

136137
@gc_interval = opts.fetch(:gc_interval, @idletime / 2.0).to_i # undocumented
@@ -224,7 +225,8 @@ def ns_worker_died(worker)
224225
def ns_add_busy_worker
225226
return if @pool.size >= @max_length
226227

227-
@pool << (worker = Worker.new(self))
228+
@workers_counter += 1
229+
@pool << (worker = Worker.new(self, @workers_counter))
228230
@largest_length = @pool.length if @pool.length > @largest_length
229231
worker
230232
end
@@ -284,6 +286,7 @@ def ns_reset_if_forked
284286
@scheduled_task_count = 0
285287
@completed_task_count = 0
286288
@largest_length = 0
289+
@workers_counter = 0
287290
@ruby_pid = $$
288291
end
289292
end
@@ -292,11 +295,15 @@ def ns_reset_if_forked
292295
class Worker
293296
include Concern::Logging
294297

295-
def initialize(pool)
298+
def initialize(pool, id)
296299
# instance variables accessed only under pool's lock so no need to sync here again
297300
@queue = Queue.new
298301
@pool = pool
299302
@thread = create_worker @queue, pool, pool.idletime
303+
304+
if @thread.respond_to?(:name=)
305+
@thread.name = [pool.name, 'worker', id].compact.join('-')
306+
end
300307
end
301308

302309
def <<(message)

lib/concurrent/executor/simple_executor_service.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def wait_for_termination(timeout = nil)
9191

9292
private
9393

94-
def ns_initialize
94+
def ns_initialize(*args)
9595
@running = Concurrent::AtomicBoolean.new(true)
9696
@stopped = Concurrent::Event.new
9797
@count = Concurrent::AtomicFixnum.new(0)

spec/concurrent/executor/ruby_thread_pool_executor_spec.rb

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,5 +54,39 @@ module Concurrent
5454
block.count_down
5555
end
5656
end
57+
58+
if Concurrent.on_cruby? && Concurrent.ruby_version(:>=, 2, 3, 0)
59+
context 'threads naming' do
60+
subject do
61+
opts = { min_threads: 2 }
62+
opts[:name] = pool_name if pool_name
63+
described_class.new(opts)
64+
end
65+
66+
let(:names) { Concurrent::Set.new }
67+
68+
before do
69+
subject.post(names) { |names| names << Thread.current.name }
70+
subject.post(names) { |names| names << Thread.current.name }
71+
subject.shutdown
72+
subject.wait_for_termination(pool_termination_timeout)
73+
expect(names.size).to eq 2
74+
end
75+
76+
context 'without pool name' do
77+
let(:pool_name) { }
78+
it 'sets counted name' do
79+
expect(names.all? { |name| name =~ /^worker-\d+$/ }).to be true
80+
end
81+
end
82+
83+
context 'with pool name' do
84+
let(:pool_name) { 'MyExecutor' }
85+
it 'sets counted name' do
86+
expect(names.all? { |name| name =~ /^MyExecutor-worker-\d+$/ }).to be true
87+
end
88+
end
89+
end
90+
end
5791
end
5892
end

spec/concurrent/executor/thread_pool_executor_shared.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@
3131
it 'defaults :fallback_policy to :abort' do
3232
expect(subject.fallback_policy).to eq :abort
3333
end
34+
35+
it 'defaults :name to nil' do
36+
expect(subject.name).to be_nil
37+
end
3438
end
3539

3640
context "#initialize explicit values" do
@@ -100,6 +104,13 @@
100104
described_class.new(fallback_policy: :bogus)
101105
}.to raise_error(ArgumentError)
102106
end
107+
108+
it 'sets :name' do
109+
pool = described_class.new(name: 'MyPool')
110+
expect(pool.name).to eq 'MyPool'
111+
pool.shutdown
112+
pool.wait_for_termination(pool_termination_timeout)
113+
end
103114
end
104115

105116
context '#max_queue' do

0 commit comments

Comments
 (0)