Skip to content

Commit f0d68fb

Browse files
committed
Remove timeout from TimerTask
The timeout in TimerTask is not ensuring tasks are not allowed to continue processing after the timeout has passed. This can lead to threads leaking since TimerTask will try to run the provided task again before the previous execution has completed. Yet TimerTask will not allow the task to run in parallel so more and more worker threads will be queued up waiting to be scheduled for executing the task. To illustrate, imagine running a TimerTask with an execution interval of 1 and a timeout interval of 1, with the task itself running for 4 seconds. Time 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 Worker1 s> t - - < > t . . . . . - - - < Worker2 s > t . - - - < > t . . . . . Worker3 s > t . . . - - - < > t . Worker4 s > t . . . . . . . Worker5 s > t . . . At t=1 worker 1 is spawned(s) and scheduled(>) and will start executing the task. It will timeout(t) after 1 second and cause the spawning of worker 2. Worker 2 will then wait 1 second to be scheduled and then another second to timeout causing the spawn of worker 3 at t=4. Worker 3 is then scheduled to start at t=5 and will timeout at t=6. At this point worker 1 has completed it's previous task so the task queued by worker 3 will go to worker 1 to be scheduled for t=7. At t=8 worker 1 will timeout and since worker 2 is currently executing(-) and worker 3 is current waiting(.) for worker 2 to completed worker 4 will be spawned. This patterns will continue to repeat with new workers/threads spawned every 4 seconds.
1 parent a9ae6de commit f0d68fb

File tree

1 file changed

+6
-48
lines changed

1 file changed

+6
-48
lines changed

lib/concurrent-ruby/concurrent/timer_task.rb

Lines changed: 6 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ module Concurrent
2525
# Should the task experience an unrecoverable crash only the task thread will
2626
# crash. This makes the `TimerTask` very fault tolerant. Additionally, the
2727
# `TimerTask` thread can respond to the success or failure of the task,
28-
# performing logging or ancillary operations. `TimerTask` can also be
29-
# configured with a timeout value allowing it to kill a task that runs too
30-
# long.
28+
# performing logging or ancillary operations.
3129
#
3230
# One other advantage of `TimerTask` is that it forces the business logic to
3331
# be completely decoupled from the concurrency logic. The business logic can
@@ -48,9 +46,7 @@ module Concurrent
4846
# {http://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html
4947
# Observable} module. On execution the `TimerTask` will notify the observers
5048
# with three arguments: time of execution, the result of the block (or nil on
51-
# failure), and any raised exceptions (or nil on success). If the timeout
52-
# interval is exceeded the observer will receive a `Concurrent::TimeoutError`
53-
# object as the third argument.
49+
# failure), and any raised exceptions (or nil on success).
5450
#
5551
# @!macro copy_options
5652
#
@@ -59,20 +55,18 @@ module Concurrent
5955
# task.execute
6056
#
6157
# task.execution_interval #=> 60 (default)
62-
# task.timeout_interval #=> 30 (default)
6358
#
6459
# # wait 60 seconds...
6560
# #=> 'Boom!'
6661
#
6762
# task.shutdown #=> true
6863
#
69-
# @example Configuring `:execution_interval` and `:timeout_interval`
70-
# task = Concurrent::TimerTask.new(execution_interval: 5, timeout_interval: 5) do
64+
# @example Configuring `:execution_interval`
65+
# task = Concurrent::TimerTask.new(execution_interval: 5) do
7166
# puts 'Boom!'
7267
# end
7368
#
7469
# task.execution_interval #=> 5
75-
# task.timeout_interval #=> 5
7670
#
7771
# @example Immediate execution with `:run_now`
7872
# task = Concurrent::TimerTask.new(run_now: true){ puts 'Boom!' }
@@ -115,15 +109,13 @@ module Concurrent
115109
# def update(time, result, ex)
116110
# if result
117111
# print "(#{time}) Execution successfully returned #{result}\n"
118-
# elsif ex.is_a?(Concurrent::TimeoutError)
119-
# print "(#{time}) Execution timed out\n"
120112
# else
121113
# print "(#{time}) Execution failed with error #{ex}\n"
122114
# end
123115
# end
124116
# end
125117
#
126-
# task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ 42 }
118+
# task = Concurrent::TimerTask.new(execution_interval: 1){ 42 }
127119
# task.add_observer(TaskObserver.new)
128120
# task.execute
129121
# sleep 4
@@ -133,7 +125,7 @@ module Concurrent
133125
# #=> (2013-10-13 19:09:00 -0400) Execution successfully returned 42
134126
# task.shutdown
135127
#
136-
# task = Concurrent::TimerTask.new(execution_interval: 1, timeout_interval: 1){ sleep }
128+
# task = Concurrent::TimerTask.new(execution_interval: 1){ sleep }
137129
# task.add_observer(TaskObserver.new)
138130
# task.execute
139131
#
@@ -160,17 +152,12 @@ class TimerTask < RubyExecutorService
160152
# Default `:execution_interval` in seconds.
161153
EXECUTION_INTERVAL = 60
162154

163-
# Default `:timeout_interval` in seconds.
164-
TIMEOUT_INTERVAL = 30
165-
166155
# Create a new TimerTask with the given task and configuration.
167156
#
168157
# @!macro timer_task_initialize
169158
# @param [Hash] opts the options defining task execution.
170159
# @option opts [Integer] :execution_interval number of seconds between
171160
# task executions (default: EXECUTION_INTERVAL)
172-
# @option opts [Integer] :timeout_interval number of seconds a task can
173-
# run before it is considered to have failed (default: TIMEOUT_INTERVAL)
174161
# @option opts [Boolean] :run_now Whether to run the task immediately
175162
# upon instantiation or to wait until the first # execution_interval
176163
# has passed (default: false)
@@ -252,24 +239,6 @@ def execution_interval=(value)
252239
end
253240
end
254241

255-
# @!attribute [rw] timeout_interval
256-
# @return [Fixnum] Number of seconds the task can run before it is
257-
# considered to have failed.
258-
def timeout_interval
259-
synchronize { @timeout_interval }
260-
end
261-
262-
# @!attribute [rw] timeout_interval
263-
# @return [Fixnum] Number of seconds the task can run before it is
264-
# considered to have failed.
265-
def timeout_interval=(value)
266-
if (value = value.to_f) <= 0.0
267-
raise ArgumentError.new('must be greater than zero')
268-
else
269-
synchronize { @timeout_interval = value }
270-
end
271-
end
272-
273242
private :post, :<<
274243

275244
private
@@ -278,7 +247,6 @@ def ns_initialize(opts, &task)
278247
set_deref_options(opts)
279248

280249
self.execution_interval = opts[:execution] || opts[:execution_interval] || EXECUTION_INTERVAL
281-
self.timeout_interval = opts[:timeout] || opts[:timeout_interval] || TIMEOUT_INTERVAL
282250
@run_now = opts[:now] || opts[:run_now]
283251
@executor = Concurrent::SafeTaskExecutor.new(task)
284252
@running = Concurrent::AtomicBoolean.new(false)
@@ -308,7 +276,6 @@ def schedule_next_task(interval = execution_interval)
308276
# @!visibility private
309277
def execute_task(completion)
310278
return nil unless @running.true?
311-
ScheduledTask.execute(timeout_interval, args: [completion], &method(:timeout_task))
312279
_success, value, reason = @executor.execute(self)
313280
if completion.try?
314281
self.value = value
@@ -320,14 +287,5 @@ def execute_task(completion)
320287
end
321288
nil
322289
end
323-
324-
# @!visibility private
325-
def timeout_task(completion)
326-
return unless @running.true?
327-
if completion.try?
328-
schedule_next_task
329-
observers.notify_observers(Time.now, nil, Concurrent::TimeoutError.new)
330-
end
331-
end
332290
end
333291
end

0 commit comments

Comments
 (0)