From 5b5b53d741e708117a25942c58c4c028ee1afcb8 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 22 Jan 2023 09:24:10 +1300 Subject: [PATCH 1/6] Introduce FiberLocalVar and LockLocalVar for alignment with `Mutex` scope. # Conflicts: # lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb # spec/concurrent/atomic/reentrant_read_write_lock_spec.rb --- ...thread_local_var.rb => fiber_local_var.rb} | 19 +- .../atomic/java_thread_local_var.rb | 37 - .../concurrent/atomic/lock_local_var.rb | 20 + .../atomic/reentrant_read_write_lock.rb | 4 +- .../atomic/ruby_thread_local_var.rb | 181 ---- .../concurrent/atomic/thread_local_var.rb | 146 ++-- .../atomic/reentrant_read_write_lock_spec.rb | 824 ++++++++++-------- .../atomic/thread_local_var_spec.rb | 12 +- 8 files changed, 530 insertions(+), 713 deletions(-) rename lib/concurrent-ruby/concurrent/atomic/{abstract_thread_local_var.rb => fiber_local_var.rb} (73%) delete mode 100644 lib/concurrent-ruby/concurrent/atomic/java_thread_local_var.rb create mode 100644 lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb delete mode 100644 lib/concurrent-ruby/concurrent/atomic/ruby_thread_local_var.rb diff --git a/lib/concurrent-ruby/concurrent/atomic/abstract_thread_local_var.rb b/lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb similarity index 73% rename from lib/concurrent-ruby/concurrent/atomic/abstract_thread_local_var.rb rename to lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb index fcdeed7f8..8f13eea55 100644 --- a/lib/concurrent-ruby/concurrent/atomic/abstract_thread_local_var.rb +++ b/lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb @@ -1,13 +1,7 @@ require 'concurrent/constants' module Concurrent - - # @!macro thread_local_var - # @!macro internal_implementation_note - # @!visibility private - class AbstractThreadLocalVar - - # @!macro thread_local_var_method_initialize + class FiberLocalVar def initialize(default = nil, &default_block) if default && block_given? raise ArgumentError, "Cannot use both value and block as default value" @@ -21,17 +15,17 @@ def initialize(default = nil, &default_block) @default = default end - allocate_storage + @name = :"concurrent_variable_#{object_id}" end # @!macro thread_local_var_method_get def value - raise NotImplementedError + Thread.current.fetch(@name) {default} end # @!macro thread_local_var_method_set def value=(value) - raise NotImplementedError + Thread.current[@name] = value end # @!macro thread_local_var_method_bind @@ -49,11 +43,6 @@ def bind(value, &block) protected - # @!visibility private - def allocate_storage - raise NotImplementedError - end - # @!visibility private def default if @default_block diff --git a/lib/concurrent-ruby/concurrent/atomic/java_thread_local_var.rb b/lib/concurrent-ruby/concurrent/atomic/java_thread_local_var.rb deleted file mode 100644 index b41018ffe..000000000 --- a/lib/concurrent-ruby/concurrent/atomic/java_thread_local_var.rb +++ /dev/null @@ -1,37 +0,0 @@ -require 'concurrent/atomic/abstract_thread_local_var' - -if Concurrent.on_jruby? - - module Concurrent - - # @!visibility private - # @!macro internal_implementation_note - class JavaThreadLocalVar < AbstractThreadLocalVar - - # @!macro thread_local_var_method_get - def value - value = @var.get - - if value.nil? 
- default - elsif value == NULL - nil - else - value - end - end - - # @!macro thread_local_var_method_set - def value=(value) - @var.set(value) - end - - protected - - # @!visibility private - def allocate_storage - @var = java.lang.ThreadLocal.new - end - end - end -end diff --git a/lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb b/lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb new file mode 100644 index 000000000..9be85f9f3 --- /dev/null +++ b/lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb @@ -0,0 +1,20 @@ +require_relative 'fiber_local_var' +require_relative 'thread_local_var' + +module Concurrent + def self.mutex_owned_per_thread? + mutex = Mutex.new + + # Lock the mutex: + mutex.synchronize do + # Check if the mutex is still owned in a child fiber: + Fiber.new{mutex.owned?}.resume + end + end + + if mutex_owned_per_thread? + LockLocalVar = ThreadLocalVar + else + LockLocalVar = FiberLocalVar + end +end diff --git a/lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb b/lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb index 3c17acd1b..6b425a3c9 100644 --- a/lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb +++ b/lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb @@ -4,7 +4,7 @@ require 'concurrent/errors' require 'concurrent/synchronization/object' require 'concurrent/synchronization/lock' -require 'concurrent/atomic/thread_local_var' +require 'concurrent/atomic/lock_local_var' module Concurrent @@ -111,7 +111,7 @@ def initialize @Counter = AtomicFixnum.new(0) # single integer which represents lock state @ReadQueue = Synchronization::Lock.new # used to queue waiting readers @WriteQueue = Synchronization::Lock.new # used to queue waiting writers - @HeldCount = ThreadLocalVar.new(0) # indicates # of R & W locks held by this thread + @HeldCount = LockLocalVar.new(0) # indicates # of R & W locks held by this thread end # Execute a block operation within a read lock. 
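A minimal usage sketch (not part of the patch) of the `mutex_owned_per_thread?` probe and the `LockLocalVar` alias added in lock_local_var.rb above. It assumes the patched concurrent-ruby is on the load path; on Rubies where `Mutex` ownership is per fiber (e.g. CRuby 3.0+), the child fiber reports `owned?` as false and `LockLocalVar` resolves to `FiberLocalVar`, otherwise it resolves to `ThreadLocalVar`:

    require 'concurrent/atomic/lock_local_var'

    mutex = Mutex.new
    mutex.synchronize do
      # On per-fiber Rubies, ownership is not visible from a child fiber:
      puts Fiber.new { mutex.owned? }.resume   #=> false on CRuby 3.x
    end

    puts Concurrent::LockLocalVar              #=> Concurrent::FiberLocalVar here
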
diff --git a/lib/concurrent-ruby/concurrent/atomic/ruby_thread_local_var.rb b/lib/concurrent-ruby/concurrent/atomic/ruby_thread_local_var.rb deleted file mode 100644 index 9a51eb288..000000000 --- a/lib/concurrent-ruby/concurrent/atomic/ruby_thread_local_var.rb +++ /dev/null @@ -1,181 +0,0 @@ -require 'thread' -require 'concurrent/atomic/abstract_thread_local_var' - -module Concurrent - - # @!visibility private - # @!macro internal_implementation_note - class RubyThreadLocalVar < AbstractThreadLocalVar - - # Each thread has a (lazily initialized) array of thread-local variable values - # Each time a new thread-local var is created, we allocate an "index" for it - # For example, if the allocated index is 1, that means slot #1 in EVERY - # thread's thread-local array will be used for the value of that TLV - # - # The good thing about using a per-THREAD structure to hold values, rather - # than a per-TLV structure, is that no synchronization is needed when - # reading and writing those values (since the structure is only ever - # accessed by a single thread) - # - # Of course, when a TLV is GC'd, 1) we need to recover its index for use - # by other new TLVs (otherwise the thread-local arrays could get bigger - # and bigger with time), and 2) we need to null out all the references - # held in the now-unused slots (both to avoid blocking GC of those objects, - # and also to prevent "stale" values from being passed on to a new TLV - # when the index is reused) - # Because we need to null out freed slots, we need to keep references to - # ALL the thread-local arrays -- ARRAYS is for that - # But when a Thread is GC'd, we need to drop the reference to its thread-local - # array, so we don't leak memory - - FREE = [] - LOCK = Mutex.new - THREAD_LOCAL_ARRAYS = {} # used as a hash set - - # synchronize when not on MRI - # on MRI using lock in finalizer leads to "can't be called from trap context" error - # so the code is carefully written to be tread-safe on MRI relying on GIL - - if Concurrent.on_cruby? - # @!visibility private - def self.semi_sync(&block) - block.call - end - else - # @!visibility private - def self.semi_sync(&block) - LOCK.synchronize(&block) - end - end - - private_constant :FREE, :LOCK, :THREAD_LOCAL_ARRAYS - - # @!macro thread_local_var_method_get - def value - if (array = get_threadlocal_array) - value = array[@index] - if value.nil? - default - elsif value.equal?(NULL) - nil - else - value - end - else - default - end - end - - # @!macro thread_local_var_method_set - def value=(value) - me = Thread.current - # We could keep the thread-local arrays in a hash, keyed by Thread - # But why? That would require locking - # Using Ruby's built-in thread-local storage is faster - unless (array = get_threadlocal_array(me)) - array = set_threadlocal_array([], me) - self.class.semi_sync { THREAD_LOCAL_ARRAYS[array.object_id] = array } - ObjectSpace.define_finalizer(me, self.class.thread_finalizer(array.object_id)) - end - array[@index] = (value.nil? ? NULL : value) - value - end - - protected - - # @!visibility private - def allocate_storage - @index = FREE.pop || next_index - - ObjectSpace.define_finalizer(self, self.class.thread_local_finalizer(@index)) - end - - # @!visibility private - def self.thread_local_finalizer(index) - proc do - semi_sync do - # The cost of GC'ing a TLV is linear in the number of threads using TLVs - # But that is natural! 
More threads means more storage is used per TLV - # So naturally more CPU time is required to free more storage - # - # DO NOT use each_value which might conflict with new pair assignment - # into the hash in #value= method - THREAD_LOCAL_ARRAYS.values.each { |array| array[index] = nil } - # free index has to be published after the arrays are cleared - FREE.push(index) - end - end - end - - # @!visibility private - def self.thread_finalizer(id) - proc do - semi_sync do - # The thread which used this thread-local array is now gone - # So don't hold onto a reference to the array (thus blocking GC) - THREAD_LOCAL_ARRAYS.delete(id) - end - end - end - - private - - # noinspection RubyClassVariableUsageInspection - @@next = 0 - # noinspection RubyClassVariableUsageInspection - def next_index - LOCK.synchronize do - result = @@next - @@next += 1 - result - end - end - - if Thread.instance_methods.include?(:thread_variable_get) - - def get_threadlocal_array(thread = Thread.current) - thread.thread_variable_get(:__threadlocal_array__) - end - - def set_threadlocal_array(array, thread = Thread.current) - thread.thread_variable_set(:__threadlocal_array__, array) - end - - else - - def get_threadlocal_array(thread = Thread.current) - thread[:__threadlocal_array__] - end - - def set_threadlocal_array(array, thread = Thread.current) - thread[:__threadlocal_array__] = array - end - end - - # This exists only for use in testing - # @!visibility private - def value_for(thread) - if (array = get_threadlocal_array(thread)) - value = array[@index] - if value.nil? - get_default - elsif value.equal?(NULL) - nil - else - value - end - else - get_default - end - end - - # @!visibility private - def get_default - if @default_block - raise "Cannot use default_for with default block" - else - @default - end - end - end -end diff --git a/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb b/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb index 100cc8de8..80b21e1e4 100644 --- a/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb +++ b/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb @@ -1,104 +1,70 @@ -require 'concurrent/utility/engine' -require 'concurrent/atomic/ruby_thread_local_var' -require 'concurrent/atomic/java_thread_local_var' +require 'concurrent/constants' module Concurrent - ################################################################### - - # @!macro thread_local_var_method_initialize - # - # Creates a thread local variable. - # - # @param [Object] default the default value when otherwise unset - # @param [Proc] default_block Optional block that gets called to obtain the - # default value for each thread + # @!macro thread_local_var + class ThreadLocalVar + # @!macro thread_local_var_method_initialize + def initialize(default = nil, &default_block) + if default && block_given? + raise ArgumentError, "Cannot use both value and block as default value" + end - # @!macro thread_local_var_method_get - # - # Returns the value in the current thread's copy of this thread-local variable. - # - # @return [Object] the current value + if block_given? + @default_block = default_block + @default = nil + else + @default_block = nil + @default = default + end - # @!macro thread_local_var_method_set - # - # Sets the current thread's copy of this thread-local variable to the specified value. 
- # - # @param [Object] value the value to set - # @return [Object] the new value + @name = :"concurrent_variable_#{object_id}" + end - # @!macro thread_local_var_method_bind - # - # Bind the given value to thread local storage during - # execution of the given block. - # - # @param [Object] value the value to bind - # @yield the operation to be performed with the bound variable - # @return [Object] the value + # @!macro thread_local_var_method_get + def value + value = Thread.current.thread_variable_get(@name) + if value.nil? + default + elsif value.equal?(NULL) + nil + else + value + end + end - ################################################################### + # @!macro thread_local_var_method_set + def value=(value) + if value.nil? + value = NULL + end - # @!macro thread_local_var_public_api - # - # @!method initialize(default = nil, &default_block) - # @!macro thread_local_var_method_initialize - # - # @!method value - # @!macro thread_local_var_method_get - # - # @!method value=(value) - # @!macro thread_local_var_method_set - # - # @!method bind(value, &block) - # @!macro thread_local_var_method_bind + Thread.current.thread_variable_set(@name, value) + end - ################################################################### + # @!macro thread_local_var_method_bind + def bind(value, &block) + if block_given? + old_value = self.value + begin + self.value = value + yield + ensure + self.value = old_value + end + end + end - # @!visibility private - # @!macro internal_implementation_note - ThreadLocalVarImplementation = case - when Concurrent.on_jruby? - JavaThreadLocalVar - else - RubyThreadLocalVar - end - private_constant :ThreadLocalVarImplementation + protected - # @!macro thread_local_var - # - # A `ThreadLocalVar` is a variable where the value is different for each thread. - # Each variable may have a default value, but when you modify the variable only - # the current thread will ever see that change. - # - # @!macro thread_safe_variable_comparison - # - # @example - # v = ThreadLocalVar.new(14) - # v.value #=> 14 - # v.value = 2 - # v.value #=> 2 - # - # @example - # v = ThreadLocalVar.new(14) - # - # t1 = Thread.new do - # v.value #=> 14 - # v.value = 1 - # v.value #=> 1 - # end - # - # t2 = Thread.new do - # v.value #=> 14 - # v.value = 2 - # v.value #=> 2 - # end - # - # v.value #=> 14 - # - # @see https://docs.oracle.com/javase/7/docs/api/java/lang/ThreadLocal.html Java ThreadLocal - # - # @!macro thread_local_var_public_api - class ThreadLocalVar < ThreadLocalVarImplementation + # @!visibility private + def default + if @default_block + self.value = @default_block.call + else + @default + end + end end end diff --git a/spec/concurrent/atomic/reentrant_read_write_lock_spec.rb b/spec/concurrent/atomic/reentrant_read_write_lock_spec.rb index 1ef054267..53283dbc8 100644 --- a/spec/concurrent/atomic/reentrant_read_write_lock_spec.rb +++ b/spec/concurrent/atomic/reentrant_read_write_lock_spec.rb @@ -2,485 +2,555 @@ require 'concurrent/atomic/reentrant_read_write_lock' require 'concurrent/atomic/count_down_latch' require 'concurrent/atomic/atomic_boolean' +require 'timeout' +require 'fiber' -unless Concurrent.on_jruby? +module Concurrent + BaseMatcher = RSpec::Matchers::BuiltIn::BaseMatcher - # NOTE: These tests depend heavily on the private/undocumented - # `ThreadLocalVar#value_for` method. This method does not, and cannot work - # on JRuby at this time. Therefore, these tests cannot be run successfully on - # JRuby. 
The initial implementation used a variation of `ThreadLocalVar` - # which was pure-Ruby on all platforms and exhibited behavior that allowed - # these tests to work. Functionality on JRuby was verified at that time. So - # long as `ReentrantReadWriteLock` is not changed (with respect to - # `ThreadLocalVar`) we can safely assume proper functionality on JRuby. + class TrackedReentrantReadWriteLock < ReentrantReadWriteLock + def initialize(scope = Thread) + super() - require 'timeout' - - module Concurrent - - BaseMatcher = RSpec::Matchers::BuiltIn::BaseMatcher - RRWL = ReentrantReadWriteLock # no way I'm typing that 50 times + @scope = scope + @read_acquired = Concurrent::Set.new + @write_acquired = Concurrent::Set.new + end - # **************************************************************** - # First some custom matchers to make our tests all nice and pretty - # **************************************************************** + attr :read_acquired + attr :write_acquired - class HoldLock - def initialize(lock) - @lock = lock + def try_read_lock + if super + @read_acquired.add(@scope.current) + true + else + false end + end - def for_read - HoldReadLock.new(@lock) + def try_write_lock + if super + @write_acquired.add(@scope.current) + true + else + false end + end - def for_write - HoldWriteLock.new(@lock) + def acquire_read_lock + if super + @read_acquired.add(@scope.current) + true + else + false end + end - def for_both - HoldBoth.new(@lock) + def acquire_write_lock + if super + @write_acquired.add(@scope.current) + true + else + false end end - class HoldReadLock < BaseMatcher - def match(lock, thread) - ((lock.instance_eval { @Counter.value } & RRWL::MAX_READERS) != 0) && - ((lock.instance_eval { @HeldCount.send(:value_for, thread) } & RRWL::READ_LOCK_MASK) > 0) + def release_read_lock + super.tap do + @read_acquired.delete(@scope.current) end end - class HoldWriteLock < BaseMatcher - def match(lock, thread) - ((lock.instance_eval { @Counter.value } & RRWL::RUNNING_WRITER) != 0) && - ((lock.instance_eval { @HeldCount.send(:value_for, thread) } & RRWL::WRITE_LOCK_MASK) > 0) + def release_write_lock + super.tap do + @write_acquired.delete(@scope.current) end end + end - class HoldBoth < BaseMatcher - def match(lock, thread) - HoldReadLock.new(lock).matches?(thread) && HoldWriteLock.new(lock).matches?(thread) - end + # **************************************************************** + # First some custom matchers to make our tests all nice and pretty + # **************************************************************** + + class HoldLock + def initialize(lock) + @lock = lock end - class BeFree < BaseMatcher - MASK = RRWL::MAX_READERS + RRWL::RUNNING_WRITER + def for_read + HoldReadLock.new(@lock) + end - def matches?(lock) - (lock.instance_eval { @Counter.value } & MASK) == 0 - end + def for_write + HoldWriteLock.new(@lock) + end - def failure_message - "expected lock to be free" - end + def for_both + HoldBoth.new(@lock) + end + end + + class HoldReadLock < BaseMatcher + def match(lock, scope) + ((lock.instance_eval { @Counter.value } & ReentrantReadWriteLock::MAX_READERS) != 0) && lock.read_acquired.include?(scope) + end + end + + class HoldWriteLock < BaseMatcher + def match(lock, scope) + ((lock.instance_eval { @Counter.value } & ReentrantReadWriteLock::RUNNING_WRITER) != 0) && lock.write_acquired.include?(scope) end + end - # ******************************************************* + class HoldBoth < BaseMatcher + def match(lock, scope) + HoldReadLock.new(lock).matches?(scope) && 
HoldWriteLock.new(lock).matches?(scope) + end + end - RSpec.describe ReentrantReadWriteLock do + class BeFree < BaseMatcher + MASK = ReentrantReadWriteLock::MAX_READERS + ReentrantReadWriteLock::RUNNING_WRITER - let(:lock) { RRWL.new } + def matches?(lock) + (lock.instance_eval { @Counter.value } & MASK) == 0 + end - def hold(lock) - HoldLock.new(lock) - end + def failure_message + "expected lock to be free" + end + end - def be_free - BeFree.new - end + # ******************************************************* - def wait_up_to(secs, &condition) - _end = Time.now + secs - while !condition.call && Time.now < _end - sleep(0.001) - end + RSpec.shared_context TrackedReentrantReadWriteLock do + def hold(lock) + HoldLock.new(lock) + end + + def be_free + BeFree.new + end + + def wait_up_to(secs, &condition) + _end = Time.now + secs + while !condition.call && Time.now < _end + sleep(0.001) end + end + end - context "read lock" do - - it "allows other read locks to be acquired at the same time" do - lock # stupid RSpec 'let' is not thread-safe! - Timeout.timeout(3) do - got_lock = 10.times.collect { CountDownLatch.new } - threads = 10.times.collect do |n| - in_thread do - # Each thread takes the read lock and then waits for another one - # They will only finish if ALL of them get their read lock - expect(lock.acquire_read_lock).to be true - expect(Thread.current).to hold(lock).for_read - got_lock[n].count_down - got_lock[(n+1) % 10].wait - end + RSpec.describe ReentrantReadWriteLock do + include_context TrackedReentrantReadWriteLock + + let(:lock) { TrackedReentrantReadWriteLock.new } + + context "read lock" do + + it "allows other read locks to be acquired at the same time" do + lock # stupid RSpec 'let' is not thread-safe! + Timeout.timeout(3) do + got_lock = 10.times.collect { CountDownLatch.new } + threads = 10.times.collect do |n| + in_thread do + # Each thread takes the read lock and then waits for another one + # They will only finish if ALL of them get their read lock + expect(lock.acquire_read_lock).to be true + expect(Thread.current).to hold(lock).for_read + got_lock[n].count_down + got_lock[(n+1) % 10].wait end - threads.each(&:join) end + threads.each(&:join) end + end - it "can be acquired more than once" do - Timeout.timeout(3) do - 10.times { expect(lock.acquire_read_lock).to be true } - expect(Thread.current).to hold(lock).for_read - 10.times { expect(lock.release_read_lock).to be true } - expect(Thread.current).not_to hold(lock).for_read - expect(lock).to be_free - end + it "can be acquired more than once" do + Timeout.timeout(3) do + 10.times { expect(lock.acquire_read_lock).to be true } + expect(Thread.current).to hold(lock).for_read + 10.times { expect(lock.release_read_lock).to be true } + expect(Thread.current).not_to hold(lock).for_read + expect(lock).to be_free end + end - it "can be acquired while holding a write lock" do - Timeout.timeout(3) do - expect(lock.acquire_write_lock).to be true - expect(Thread.current).to hold(lock).for_write - expect(lock.acquire_read_lock).to be true - expect(Thread.current).to hold(lock).for_both - expect(lock.release_read_lock).to be true - expect(Thread.current).to hold(lock).for_write - expect(Thread.current).not_to hold(lock).for_read - expect(lock.release_write_lock).to be true - expect(lock).to be_free - end + it "can be acquired while holding a write lock" do + Timeout.timeout(3) do + expect(lock.acquire_write_lock).to be true + expect(Thread.current).to hold(lock).for_write + expect(lock.acquire_read_lock).to be true + 
expect(Thread.current).to hold(lock).for_both + expect(lock.release_read_lock).to be true + expect(Thread.current).to hold(lock).for_write + expect(Thread.current).not_to hold(lock).for_read + expect(lock.release_write_lock).to be true + expect(lock).to be_free end + end - it "can be upgraded to a write lock" do - Timeout.timeout(3) do - expect(lock.acquire_read_lock).to be true - expect(Thread.current).to hold(lock).for_read - # now we want to upgrade... - expect(lock.acquire_write_lock).to be true - expect(lock.release_read_lock).to be true - expect(Thread.current).to hold(lock).for_write - - expect(lock.release_write_lock).to be true - expect(lock).to be_free - end + it "can be upgraded to a write lock" do + Timeout.timeout(3) do + expect(lock.acquire_read_lock).to be true + expect(Thread.current).to hold(lock).for_read + # now we want to upgrade... + expect(lock.acquire_write_lock).to be true + expect(lock.release_read_lock).to be true + expect(Thread.current).to hold(lock).for_write + + expect(lock.release_write_lock).to be true + expect(lock).to be_free end + end - it "can be upgraded to a write lock when read lock acquired more than once" do - Timeout.timeout(3) do - expect(lock.acquire_read_lock).to be true - expect(lock.acquire_read_lock).to be true - expect(Thread.current).to hold(lock).for_read + it "can be upgraded to a write lock when read lock acquired more than once" do + Timeout.timeout(3) do + expect(lock.acquire_read_lock).to be true + expect(lock.acquire_read_lock).to be true + expect(Thread.current).to hold(lock).for_read - # now we want to upgrade... - expect(lock.acquire_write_lock).to be true - expect(lock.release_read_lock).to be true - expect(lock.release_read_lock).to be true - expect(Thread.current).to hold(lock).for_write + # now we want to upgrade... + expect(lock.acquire_write_lock).to be true + expect(lock.release_read_lock).to be true + expect(lock.release_read_lock).to be true + expect(Thread.current).to hold(lock).for_write - expect(lock.release_write_lock).to be true - expect(lock).to be_free - end + expect(lock.release_write_lock).to be true + expect(lock).to be_free end + end - it "cannot be released when not held" do + it "cannot be released when not held" do + expect { lock.release_read_lock }.to raise_error(IllegalOperationError) + end + + it "cannot be released more times than it was taken" do + Timeout.timeout(3) do + 2.times { lock.acquire_read_lock } + 2.times { lock.release_read_lock } expect { lock.release_read_lock }.to raise_error(IllegalOperationError) end + end - it "cannot be released more times than it was taken" do - Timeout.timeout(3) do - 2.times { lock.acquire_read_lock } - 2.times { lock.release_read_lock } - expect { lock.release_read_lock }.to raise_error(IllegalOperationError) - end - end + it "wakes up waiting writers when the last read lock is released" do + latch1,latch2 = CountDownLatch.new(3),CountDownLatch.new + good = AtomicBoolean.new(false) + threads = [ + in_thread { lock.acquire_read_lock; latch1.count_down; latch2.wait; lock.release_read_lock }, + in_thread { lock.acquire_read_lock; latch1.count_down; latch2.wait; lock.release_read_lock }, + in_thread { lock.acquire_read_lock; latch1.count_down; latch2.wait; lock.release_read_lock }, + in_thread { latch1.wait; lock.acquire_write_lock; good.value = true } + ] + wait_up_to(0.2) { threads[3].status == 'sleep' } + # The last thread should be waiting to acquire a write lock now... 
+ expect(threads[3].status).to eql "sleep" + expect(threads[3]).not_to hold(lock).for_write + expect(good.value).to be false + # Throw latch2 and the 3 readers will wake up and all release their read locks... + latch2.count_down + wait_up_to(0.2) { good.value } + expect(threads[3]).to hold(lock).for_write + expect(good.value).to be true + end + end - it "wakes up waiting writers when the last read lock is released" do - latch1,latch2 = CountDownLatch.new(3),CountDownLatch.new - good = AtomicBoolean.new(false) - threads = [ - in_thread { lock.acquire_read_lock; latch1.count_down; latch2.wait; lock.release_read_lock }, - in_thread { lock.acquire_read_lock; latch1.count_down; latch2.wait; lock.release_read_lock }, - in_thread { lock.acquire_read_lock; latch1.count_down; latch2.wait; lock.release_read_lock }, - in_thread { latch1.wait; lock.acquire_write_lock; good.value = true } - ] - wait_up_to(0.2) { threads[3].status == 'sleep' } - # The last thread should be waiting to acquire a write lock now... - expect(threads[3].status).to eql "sleep" - expect(threads[3]).not_to hold(lock).for_write - expect(good.value).to be false - # Throw latch2 and the 3 readers will wake up and all release their read locks... - latch2.count_down - wait_up_to(0.2) { good.value } - expect(threads[3]).to hold(lock).for_write - expect(good.value).to be true - end + context "write lock" do + it "cannot be acquired when another thread holds a write lock" do + latch = CountDownLatch.new + threads = [ + in_thread { lock.acquire_write_lock; latch.count_down }, + in_thread { latch.wait; lock.acquire_write_lock } + ] + expect { Timeout.timeout(1) { threads[0].join }}.not_to raise_error + expect(threads[0]).to hold(lock).for_write + expect(threads[1]).not_to hold(lock).for_write + wait_up_to(0.2) { threads[1].status == 'sleep' } + expect(threads[1].status).to eql "sleep" end - context "write lock" do - it "cannot be acquired when another thread holds a write lock" do - latch = CountDownLatch.new - threads = [ - in_thread { lock.acquire_write_lock; latch.count_down }, - in_thread { latch.wait; lock.acquire_write_lock } - ] - expect { Timeout.timeout(1) { threads[0].join }}.not_to raise_error - expect(threads[0]).to hold(lock).for_write - expect(threads[1]).not_to hold(lock).for_write - wait_up_to(0.2) { threads[1].status == 'sleep' } - expect(threads[1].status).to eql "sleep" - end + it "cannot be acquired when another thread holds a read lock" do + latch = CountDownLatch.new + threads = [ + in_thread { lock.acquire_read_lock; latch.count_down }, + in_thread { latch.wait; lock.acquire_write_lock } + ] + expect { Timeout.timeout(1) { threads[0].join }}.not_to raise_error + expect(threads[0]).to hold(lock).for_read + expect(threads[1]).not_to hold(lock).for_write + wait_up_to(0.2) { threads[1].status == 'sleep' } + expect(threads[1].status).to eql "sleep" + end - it "cannot be acquired when another thread holds a read lock" do - latch = CountDownLatch.new - threads = [ - in_thread { lock.acquire_read_lock; latch.count_down }, - in_thread { latch.wait; lock.acquire_write_lock } - ] - expect { Timeout.timeout(1) { threads[0].join }}.not_to raise_error - expect(threads[0]).to hold(lock).for_read - expect(threads[1]).not_to hold(lock).for_write - wait_up_to(0.2) { threads[1].status == 'sleep' } - expect(threads[1].status).to eql "sleep" + it "can be acquired more than once" do + Timeout.timeout(3) do + 10.times { expect(lock.acquire_write_lock).to be true } + expect(Thread.current).to hold(lock).for_write + 10.times { 
expect(lock.release_write_lock).to be true } + expect(Thread.current).not_to hold(lock).for_write + expect(lock).to be_free end + end - it "can be acquired more than once" do - Timeout.timeout(3) do - 10.times { expect(lock.acquire_write_lock).to be true } - expect(Thread.current).to hold(lock).for_write - 10.times { expect(lock.release_write_lock).to be true } - expect(Thread.current).not_to hold(lock).for_write - expect(lock).to be_free - end + it "can be acquired while holding a read lock" do + Timeout.timeout(3) do + expect(lock.acquire_read_lock).to be true + expect(Thread.current).to hold(lock).for_read + expect(lock.acquire_write_lock).to be true + expect(Thread.current).to hold(lock).for_both + expect(lock.release_write_lock).to be true + expect(Thread.current).to hold(lock).for_read + expect(Thread.current).not_to hold(lock).for_write + expect(lock.release_read_lock).to be true + expect(lock).to be_free end + end - it "can be acquired while holding a read lock" do - Timeout.timeout(3) do - expect(lock.acquire_read_lock).to be true - expect(Thread.current).to hold(lock).for_read - expect(lock.acquire_write_lock).to be true - expect(Thread.current).to hold(lock).for_both - expect(lock.release_write_lock).to be true - expect(Thread.current).to hold(lock).for_read - expect(Thread.current).not_to hold(lock).for_write - expect(lock.release_read_lock).to be true - expect(lock).to be_free - end - end + it "can be downgraded to a read lock" do + Timeout.timeout(3) do + expect(lock.acquire_write_lock).to be true + expect(Thread.current).to hold(lock).for_write + # now we want to downgrade... + expect(lock.acquire_read_lock).to be true + expect(lock.release_write_lock).to be true + expect(Thread.current).to hold(lock).for_read - it "can be downgraded to a read lock" do - Timeout.timeout(3) do - expect(lock.acquire_write_lock).to be true - expect(Thread.current).to hold(lock).for_write - # now we want to downgrade... 
- expect(lock.acquire_read_lock).to be true - expect(lock.release_write_lock).to be true - expect(Thread.current).to hold(lock).for_read - - expect(lock.release_read_lock).to be true - expect(lock).to be_free - end + expect(lock.release_read_lock).to be true + expect(lock).to be_free end + end - it "cannot be released when not held" do - expect { lock.release_write_lock }.to raise_error(IllegalOperationError) - end + it "cannot be released when not held" do + expect { lock.release_write_lock }.to raise_error(IllegalOperationError) + end - it "cannot be released more times than it was taken" do - Timeout.timeout(3) do - 2.times { lock.acquire_write_lock } - 2.times { lock.release_write_lock } - expect { lock.release_write_lock }.to raise_error(IllegalOperationError) - end + it "cannot be released more times than it was taken" do + Timeout.timeout(3) do + 2.times { lock.acquire_write_lock } + 2.times { lock.release_write_lock } + expect { lock.release_write_lock }.to raise_error(IllegalOperationError) end + end - it "wakes up waiting readers when the write lock is released" do - latch1,latch2 = CountDownLatch.new,CountDownLatch.new - good = AtomicFixnum.new(0) - threads = [ - in_thread { lock.acquire_write_lock; latch1.count_down; latch2.wait; lock.release_write_lock }, - in_thread { latch1.wait; lock.acquire_read_lock; good.update { |n| n+1 }}, - in_thread { latch1.wait; lock.acquire_read_lock; good.update { |n| n+1 }}, - in_thread { latch1.wait; lock.acquire_read_lock; good.update { |n| n+1 }} - ] - wait_up_to(0.2) { threads[3].status == 'sleep' } - # The last 3 threads should be waiting to acquire read locks now... - # TODO (pitr-ch 15-Oct-2016): https://travis-ci.org/ruby-concurrency/concurrent-ruby/jobs/166777543 - (1..3).each { |n| expect(threads[n].status).to eql "sleep" } - (1..3).each { |n| expect(threads[n]).not_to hold(lock).for_read } - # Throw latch2 and the writer will wake up and release its write lock... - latch2.count_down - wait_up_to(0.2) { good.value == 3 } - (1..3).each { |n| expect(threads[n]).to hold(lock).for_read } - end + it "wakes up waiting readers when the write lock is released" do + latch1,latch2 = CountDownLatch.new,CountDownLatch.new + good = AtomicFixnum.new(0) + threads = [ + in_thread { lock.acquire_write_lock; latch1.count_down; latch2.wait; lock.release_write_lock }, + in_thread { latch1.wait; lock.acquire_read_lock; good.update { |n| n+1 }}, + in_thread { latch1.wait; lock.acquire_read_lock; good.update { |n| n+1 }}, + in_thread { latch1.wait; lock.acquire_read_lock; good.update { |n| n+1 }} + ] + wait_up_to(0.2) { threads[3].status == 'sleep' } + # The last 3 threads should be waiting to acquire read locks now... + # TODO (pitr-ch 15-Oct-2016): https://travis-ci.org/ruby-concurrency/concurrent-ruby/jobs/166777543 + (1..3).each { |n| expect(threads[n].status).to eql "sleep" } + (1..3).each { |n| expect(threads[n]).not_to hold(lock).for_read } + # Throw latch2 and the writer will wake up and release its write lock... 
+ latch2.count_down + wait_up_to(0.2) { good.value == 3 } + (1..3).each { |n| expect(threads[n]).to hold(lock).for_read } + end - it "wakes up waiting writers when the write lock is released" do - latch1,latch2 = CountDownLatch.new,CountDownLatch.new - good = AtomicBoolean.new(false) - threads = [ - in_thread { lock.acquire_write_lock; latch1.count_down; latch2.wait; lock.release_write_lock }, - in_thread { latch1.wait; lock.acquire_write_lock; good.value = true }, - ] - wait_up_to(0.2) { threads[1].status == 'sleep' } - # The last thread should be waiting to acquire a write lock now... - expect(threads[1].status).to eql "sleep" - expect(threads[1]).not_to hold(lock).for_write - # Throw latch2 and the writer will wake up and release its write lock... - latch2.count_down - wait_up_to(0.2) { good.value } - expect(threads[1]).to hold(lock).for_write - end + it "wakes up waiting writers when the write lock is released" do + latch1,latch2 = CountDownLatch.new,CountDownLatch.new + good = AtomicBoolean.new(false) + threads = [ + in_thread { lock.acquire_write_lock; latch1.count_down; latch2.wait; lock.release_write_lock }, + in_thread { latch1.wait; lock.acquire_write_lock; good.value = true }, + ] + wait_up_to(0.2) { threads[1].status == 'sleep' } + # The last thread should be waiting to acquire a write lock now... + expect(threads[1].status).to eql "sleep" + expect(threads[1]).not_to hold(lock).for_write + # Throw latch2 and the writer will wake up and release its write lock... + latch2.count_down + wait_up_to(0.2) { good.value } + expect(threads[1]).to hold(lock).for_write end + end - context "#with_read_lock" do + context "#with_read_lock" do - it "acquires read block before yielding, then releases it" do - expect(lock).to be_free - lock.with_read_lock { expect(Thread.current).to hold(lock).for_read } - expect(lock).to be_free - end + it "acquires read block before yielding, then releases it" do + expect(lock).to be_free + lock.with_read_lock { expect(Thread.current).to hold(lock).for_read } + expect(lock).to be_free + end - it "releases read lock if an exception is raised in block" do - expect { - lock.with_read_lock { raise "Bad" } - }.to raise_error(RuntimeError, 'Bad') - expect(lock).to be_free - expect(Thread.current).not_to hold(lock).for_read - end + it "releases read lock if an exception is raised in block" do + expect { + lock.with_read_lock { raise "Bad" } + }.to raise_error(RuntimeError, 'Bad') + expect(lock).to be_free + expect(Thread.current).not_to hold(lock).for_read end + end - context "#with_write_lock" do + context "#with_write_lock" do - it "acquires write block before yielding, then releases it" do - expect(lock).to be_free - lock.with_write_lock { expect(Thread.current).to hold(lock).for_write } - expect(lock).to be_free - end + it "acquires write block before yielding, then releases it" do + expect(lock).to be_free + lock.with_write_lock { expect(Thread.current).to hold(lock).for_write } + expect(lock).to be_free + end - it "releases write lock if an exception is raised in block" do - expect { - lock.with_write_lock { raise "Bad" } - }.to raise_error(RuntimeError, 'Bad') - expect(lock).to be_free - expect(Thread.current).not_to hold(lock).for_write - end + it "releases write lock if an exception is raised in block" do + expect { + lock.with_write_lock { raise "Bad" } + }.to raise_error(RuntimeError, 'Bad') + expect(lock).to be_free + expect(Thread.current).not_to hold(lock).for_write end + end - context "#try_read_lock" do + context "#try_read_lock" do - it "returns 
false immediately if read lock cannot be obtained" do - Timeout.timeout(3) do - latch = CountDownLatch.new - in_thread { lock.acquire_write_lock; latch.count_down } + it "returns false immediately if read lock cannot be obtained" do + Timeout.timeout(3) do + latch = CountDownLatch.new + in_thread { lock.acquire_write_lock; latch.count_down } - latch.wait - expect { - Timeout.timeout(0.01) { expect(lock.try_read_lock).to be false } - }.not_to raise_error - expect(Thread.current).not_to hold(lock).for_read - end + latch.wait + expect { + Timeout.timeout(0.01) { expect(lock.try_read_lock).to be false } + }.not_to raise_error + expect(Thread.current).not_to hold(lock).for_read end + end - it "acquires read lock and returns true if it can do so without blocking" do - Timeout.timeout(3) do - latch = CountDownLatch.new - in_thread { lock.acquire_read_lock; latch.count_down } - - latch.wait - expect { - Timeout.timeout(0.01) { expect(lock.try_read_lock).to be true } - }.not_to raise_error - expect(lock).not_to be_free - expect(Thread.current).to hold(lock).for_read - end + it "acquires read lock and returns true if it can do so without blocking" do + Timeout.timeout(3) do + latch = CountDownLatch.new + in_thread { lock.acquire_read_lock; latch.count_down } + + latch.wait + expect { + Timeout.timeout(0.01) { expect(lock.try_read_lock).to be true } + }.not_to raise_error + expect(lock).not_to be_free + expect(Thread.current).to hold(lock).for_read end + end - it "can acquire a read lock if a read lock is already held" do - Timeout.timeout(3) do - expect(lock.acquire_read_lock).to be true - expect(lock.try_read_lock).to be true - expect(Thread.current).to hold(lock).for_read - expect(lock.release_read_lock).to be true - expect(lock.release_read_lock).to be true - expect(Thread.current).not_to hold(lock).for_read - expect(lock).to be_free - end + it "can acquire a read lock if a read lock is already held" do + Timeout.timeout(3) do + expect(lock.acquire_read_lock).to be true + expect(lock.try_read_lock).to be true + expect(Thread.current).to hold(lock).for_read + expect(lock.release_read_lock).to be true + expect(lock.release_read_lock).to be true + expect(Thread.current).not_to hold(lock).for_read + expect(lock).to be_free end + end - it "can acquire a read lock if a write lock is already held" do - Timeout.timeout(3) do - expect(lock.acquire_write_lock).to be true - expect(lock.try_read_lock).to be true - expect(Thread.current).to hold(lock).for_read - expect(lock.release_read_lock).to be true - expect(lock.release_write_lock).to be true - expect(Thread.current).not_to hold(lock).for_read - expect(lock).to be_free - end + it "can acquire a read lock if a write lock is already held" do + Timeout.timeout(3) do + expect(lock.acquire_write_lock).to be true + expect(lock.try_read_lock).to be true + expect(Thread.current).to hold(lock).for_read + expect(lock.release_read_lock).to be true + expect(lock.release_write_lock).to be true + expect(Thread.current).not_to hold(lock).for_read + expect(lock).to be_free end end + end - context "#try_write_lock" do + context "#try_write_lock" do - it "returns false immediately if write lock cannot be obtained" do - Timeout.timeout(3) do - latch = CountDownLatch.new - in_thread { lock.acquire_write_lock; latch.count_down } + it "returns false immediately if write lock cannot be obtained" do + Timeout.timeout(3) do + latch = CountDownLatch.new + in_thread { lock.acquire_write_lock; latch.count_down } - latch.wait - expect { - Timeout.timeout(0.02) { 
expect(lock.try_write_lock).to be false } - }.not_to raise_error - expect(Thread.current).not_to hold(lock).for_write - end + latch.wait + expect { + Timeout.timeout(0.02) { expect(lock.try_write_lock).to be false } + }.not_to raise_error + expect(Thread.current).not_to hold(lock).for_write end + end - it "acquires write lock and returns true if it can do so without blocking" do - Timeout.timeout(3) do - expect { - Timeout.timeout(0.02) { expect(lock.try_write_lock).to be true } - }.not_to raise_error - expect(lock).not_to be_free - expect(Thread.current).to hold(lock).for_write - end + it "acquires write lock and returns true if it can do so without blocking" do + Timeout.timeout(3) do + expect { + Timeout.timeout(0.02) { expect(lock.try_write_lock).to be true } + }.not_to raise_error + expect(lock).not_to be_free + expect(Thread.current).to hold(lock).for_write end + end - it "can acquire a write lock if a read lock is already held" do - Timeout.timeout(3) do - expect(lock.acquire_read_lock).to be true - expect(lock.try_write_lock).to be true - expect(Thread.current).to hold(lock).for_write - expect(lock.release_write_lock).to be true - expect(lock.release_read_lock).to be true - expect(Thread.current).not_to hold(lock).for_write - expect(lock).to be_free - end + it "can acquire a write lock if a read lock is already held" do + Timeout.timeout(3) do + expect(lock.acquire_read_lock).to be true + expect(lock.try_write_lock).to be true + expect(Thread.current).to hold(lock).for_write + expect(lock.release_write_lock).to be true + expect(lock.release_read_lock).to be true + expect(Thread.current).not_to hold(lock).for_write + expect(lock).to be_free end + end - it "can acquire a write lock if a write lock is already held" do - Timeout.timeout(3) do - expect(lock.acquire_write_lock).to be true - expect(lock.try_write_lock).to be true - expect(Thread.current).to hold(lock).for_write - expect(lock.release_write_lock).to be true - expect(lock.release_write_lock).to be true - expect(Thread.current).not_to hold(lock).for_write - expect(lock).to be_free - end + it "can acquire a write lock if a write lock is already held" do + Timeout.timeout(3) do + expect(lock.acquire_write_lock).to be true + expect(lock.try_write_lock).to be true + expect(Thread.current).to hold(lock).for_write + expect(lock.release_write_lock).to be true + expect(lock.release_write_lock).to be true + expect(Thread.current).not_to hold(lock).for_write + expect(lock).to be_free end end + end - it "can survive a torture test" do - count = 0 - writers = 5.times.collect do - in_thread do - 500.times do - lock.with_write_lock do - value = (count += 1) - sleep(0.0001) - count = value+1 - end + it "can survive a torture test" do + count = 0 + writers = 5.times.collect do + in_thread do + 500.times do + lock.with_write_lock do + value = (count += 1) + sleep(0.0001) + count = value+1 end end end - readers = 15.times.collect do - in_thread do - 500.times do - lock.with_read_lock { expect(count % 2).to eq 0 } - end + end + readers = 15.times.collect do + in_thread do + 500.times do + lock.with_read_lock { expect(count % 2).to eq 0 } end end - writers.each(&:join) - readers.each(&:join) - expect(count).to eq 5000 + end + writers.each(&:join) + readers.each(&:join) + expect(count).to eq 5000 + end + end + + RSpec.describe Concurrent::ReentrantReadWriteLock, if: (Concurrent::LockLocalVar == Concurrent::FiberLocalVar) do + include_context TrackedReentrantReadWriteLock + + let(:lock) { Concurrent::TrackedReentrantReadWriteLock.new(Fiber) } + + 
it "can acquire locks in separate fibers" do + lock.with_read_lock do + expect(Fiber.current).to hold(lock).for_read + Fiber.new do + expect(Fiber.current).to_not hold(lock).for_read + + lock.with_read_lock do + expect(Fiber.current).to hold(lock).for_read + end + + end.resume + + expect(Fiber.current).to hold(lock).for_read end end end diff --git a/spec/concurrent/atomic/thread_local_var_spec.rb b/spec/concurrent/atomic/thread_local_var_spec.rb index bb960049c..1616f23d6 100644 --- a/spec/concurrent/atomic/thread_local_var_spec.rb +++ b/spec/concurrent/atomic/thread_local_var_spec.rb @@ -26,16 +26,6 @@ module Concurrent expect(t2.value).to eq 14 end - if Concurrent.on_jruby? - it 'extends JavaThreadLocalVar' do - expect(described_class.ancestors).to include(Concurrent::JavaThreadLocalVar) - end - else - it 'extends RubyThreadLocalVar' do - expect(described_class.ancestors).to include(Concurrent::RubyThreadLocalVar) - end - end - it 'can set a block to be called to get the initial value' do v = described_class.new { 14 } expect(v.value).to eq 14 @@ -110,7 +100,7 @@ module Concurrent end it 'does not modify the value for other threads' do - v.value = 2 + v.value = 3 b1 = CountDownLatch.new(2) b2 = CountDownLatch.new(2) From 6827b55a8cd598c7c3887d43b6aec684d7ad2b8b Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Thu, 12 Jan 2023 14:42:12 +1300 Subject: [PATCH 2/6] Revert back to shared implementation of "locals" storage array. --- .../concurrent/atomic/fiber_local_var.rb | 45 +++- .../concurrent/atomic/locals.rb | 192 ++++++++++++++++++ .../concurrent/atomic/thread_local_var.rb | 50 +++-- 3 files changed, 266 insertions(+), 21 deletions(-) create mode 100644 lib/concurrent-ruby/concurrent/atomic/locals.rb diff --git a/lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb b/lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb index 8f13eea55..f3b3076c7 100644 --- a/lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb +++ b/lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb @@ -1,7 +1,40 @@ require 'concurrent/constants' +require_relative 'locals' module Concurrent class FiberLocalVar + LOCALS = FiberLocals.new(:concurrent_fiber_local_var) + + # @!macro fiber_local_var_method_initialize + # + # Creates a fiber local variable. + # + # @param [Object] default the default value when otherwise unset + # @param [Proc] default_block Optional block that gets called to obtain the + # default value for each fiber + + # @!macro fiber_local_var_method_get + # + # Returns the value in the current fiber's copy of this fiber-local variable. + # + # @return [Object] the current value + + # @!macro fiber_local_var_method_set + # + # Sets the current fiber's copy of this fiber-local variable to the specified value. + # + # @param [Object] value the value to set + # @return [Object] the new value + + # @!macro fiber_local_var_method_bind + # + # Bind the given value to fiber local storage during + # execution of the given block. + # + # @param [Object] value the value to bind + # @yield the operation to be performed with the bound variable + # @return [Object] the value + def initialize(default = nil, &default_block) if default && block_given? 
raise ArgumentError, "Cannot use both value and block as default value" @@ -15,20 +48,20 @@ def initialize(default = nil, &default_block) @default = default end - @name = :"concurrent_variable_#{object_id}" + @index = LOCALS.next_index(self) end - # @!macro thread_local_var_method_get + # @!macro fiber_local_var_method_get def value - Thread.current.fetch(@name) {default} + LOCALS.fetch(@index) {default} end - # @!macro thread_local_var_method_set + # @!macro fiber_local_var_method_set def value=(value) - Thread.current[@name] = value + LOCALS.set(@index, value) end - # @!macro thread_local_var_method_bind + # @!macro fiber_local_var_method_bind def bind(value, &block) if block_given? old_value = self.value diff --git a/lib/concurrent-ruby/concurrent/atomic/locals.rb b/lib/concurrent-ruby/concurrent/atomic/locals.rb new file mode 100644 index 000000000..b4261af44 --- /dev/null +++ b/lib/concurrent-ruby/concurrent/atomic/locals.rb @@ -0,0 +1,192 @@ +require 'concurrent/constants' + +module Concurrent + # @!visibility private + # @!macro internal_implementation_note + # + # An abstract implementation of local storage, with sub-classes for + # per-thread and per-fiber locals. + # + # Each execution context (EC, thread or fiber) has a lazily initialized array + # of local variable values. Each time a new local variable is created, we + # allocate an "index" for it. + # + # For example, if the allocated index is 1, that means slot #1 in EVERY EC's + # locals array will be used for the value of that variable. + # + # The good thing about using a per-EC structure to hold values, rather than + # a global, is that no synchronization is needed when reading and writing + # those values (since the structure is only ever accessed by a single + # thread). + # + # Of course, when a local variable is GC'd, 1) we need to recover its index + # for use by other new local variables (otherwise the locals arrays could + # get bigger and bigger with time), and 2) we need to null out all the + # references held in the now-unused slots (both to avoid blocking GC of those + # objects, and also to prevent "stale" values from being passed on to a new + # local when the index is reused). + # + # Because we need to null out freed slots, we need to keep references to + # ALL the locals arrays, so we can null out the appropriate slots in all of + # them. This is why we need to use a finalizer to clean up the locals array + # when the EC goes out of scope. + class AbstractLocals + def initialize(name_prefix = :concurrent_locals) + @free = [] + @lock = Mutex.new + @all_locals = {} + @next = 0 + + @name = :"#{name_prefix}_#{object_id}" + end + + def synchronize + @lock.synchronize { yield } + end + + if Concurrent.on_cruby? + def weak_synchronize + yield + end + else + alias_method :weak_synchronize, :synchronize + end + + def next_index(target) + index = synchronize do + if @free.empty? + @next += 1 + else + @free.pop + end + end + + # When the target goes out of scope, we should free the associated index + # and all values stored into it. + ObjectSpace.define_finalizer(target, target_finalizer(index)) + + return index + end + + def free_index(index) + weak_synchronize do + # The cost of GC'ing a TLV is linear in the number of ECs using local + # variables. But that is natural! More ECs means more storage is used + # per local variable. So naturally more CPU time is required to free + # more storage. + # + # DO NOT use each_value which might conflict with new pair assignment + # into the hash in #set method. 
+ @all_locals.values.each do |locals| + locals[index] = nil + end + + # free index has to be published after the arrays are cleared: + @free << index + end + end + + def fetch(index, default = nil) + if locals = self.locals + value = locals[index] + end + + if value.nil? + if block_given? + yield + else + default + end + elsif value.equal?(NULL) + nil + else + value + end + end + + def set(index, value) + locals = self.locals! + locals[index] = (value.nil? ? NULL : value) + + value + end + + private + + # When the target index goes out of scope, clean up that slot across all locals currently assigned. + def target_finalizer(index) + proc do + free_index(index) + end + end + + # When a target (locals) goes out of scope, delete the locals from all known locals. + def locals_finalizer(locals_object_id) + proc do |locals_id| + weak_synchronize do + @all_locals.delete(locals_object_id) + end + end + end + + # Returns the locals for the current scope, or nil if none exist. + def locals + raise NotImplementedError + end + + # Returns the locals for the current scope, creating them if necessary. + def locals! + raise NotImplementedError + end + end + + # @!visibility private + # @!macro internal_implementation_note + # An array-backed storage of indexed variables per thread. + class ThreadLocals < AbstractLocals + def locals + Thread.current.thread_variable_get(@name) + end + + def locals! + thread = Thread.current + locals = thread.thread_variable_get(@name) + + unless locals + locals = thread.thread_variable_set(@name, []) + weak_synchronize do + @all_locals[locals.object_id] = locals + # When the thread goes out of scope, we should delete the associated locals: + ObjectSpace.define_finalizer(thread, locals_finalizer(locals.object_id)) + end + end + + return locals + end + end + + # @!visibility private + # @!macro internal_implementation_note + # An array-backed storage of indexed variables per fiber. + class FiberLocals < AbstractLocals + def locals + Thread.current[@name] + end + + def locals! + thread = Thread.current + locals = thread[@name] + + unless locals + locals = thread[@name] = [] + weak_synchronize do + @all_locals[locals.object_id] = locals + # When the thread goes out of scope, we should delete the associated locals: + ObjectSpace.define_finalizer(Fiber.current, locals_finalizer(locals.object_id)) + end + end + + return locals + end + end +end diff --git a/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb b/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb index 80b21e1e4..f3c20dfc5 100644 --- a/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb +++ b/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb @@ -1,10 +1,42 @@ require 'concurrent/constants' +require_relative 'locals' module Concurrent # @!macro thread_local_var class ThreadLocalVar + LOCALS = ThreadLocals.new(:concurrent_fiber_local_var) + # @!macro thread_local_var_method_initialize + # + # Creates a thread local variable. + # + # @param [Object] default the default value when otherwise unset + # @param [Proc] default_block Optional block that gets called to obtain the + # default value for each thread + + # @!macro thread_local_var_method_get + # + # Returns the value in the current thread's copy of this thread-local variable. + # + # @return [Object] the current value + + # @!macro thread_local_var_method_set + # + # Sets the current thread's copy of this thread-local variable to the specified value. 
+ # + # @param [Object] value the value to set + # @return [Object] the new value + + # @!macro thread_local_var_method_bind + # + # Bind the given value to thread local storage during + # execution of the given block. + # + # @param [Object] value the value to bind + # @yield the operation to be performed with the bound variable + # @return [Object] the value + def initialize(default = nil, &default_block) if default && block_given? raise ArgumentError, "Cannot use both value and block as default value" @@ -18,29 +50,17 @@ def initialize(default = nil, &default_block) @default = default end - @name = :"concurrent_variable_#{object_id}" + @index = LOCALS.next_index(self) end # @!macro thread_local_var_method_get def value - value = Thread.current.thread_variable_get(@name) - - if value.nil? - default - elsif value.equal?(NULL) - nil - else - value - end + LOCALS.fetch(@index) {default} end # @!macro thread_local_var_method_set def value=(value) - if value.nil? - value = NULL - end - - Thread.current.thread_variable_set(@name, value) + LOCALS.set(@index, value) end # @!macro thread_local_var_method_bind From b1f4fee6d4066d5f9dbde488430f185ba823a7a7 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sat, 14 Jan 2023 12:39:35 +1300 Subject: [PATCH 3/6] Mark the `mutex_owned_per_thread?` method as private. --- lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb b/lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb index 9be85f9f3..3f954d624 100644 --- a/lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb +++ b/lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb @@ -2,6 +2,7 @@ require_relative 'thread_local_var' module Concurrent + # @!visibility private def self.mutex_owned_per_thread? mutex = Mutex.new From 50b538c434365ed4c5edba638b3c6f942156ab81 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 22 Jan 2023 09:27:23 +1300 Subject: [PATCH 4/6] Add missing `require "concurrent/set.rb"`. --- spec/concurrent/atomic/reentrant_read_write_lock_spec.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/spec/concurrent/atomic/reentrant_read_write_lock_spec.rb b/spec/concurrent/atomic/reentrant_read_write_lock_spec.rb index 53283dbc8..05a713b2f 100644 --- a/spec/concurrent/atomic/reentrant_read_write_lock_spec.rb +++ b/spec/concurrent/atomic/reentrant_read_write_lock_spec.rb @@ -2,6 +2,7 @@ require 'concurrent/atomic/reentrant_read_write_lock' require 'concurrent/atomic/count_down_latch' require 'concurrent/atomic/atomic_boolean' +require 'concurrent/set' require 'timeout' require 'fiber' From 4959d7618372a7d8b77bf65ae2639d318c0522e0 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Sun, 22 Jan 2023 09:54:02 +1300 Subject: [PATCH 5/6] Fix missing method `on_cruby?`. 
--- lib/concurrent-ruby/concurrent/atomic/locals.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/concurrent-ruby/concurrent/atomic/locals.rb b/lib/concurrent-ruby/concurrent/atomic/locals.rb index b4261af44..d29a3abc8 100644 --- a/lib/concurrent-ruby/concurrent/atomic/locals.rb +++ b/lib/concurrent-ruby/concurrent/atomic/locals.rb @@ -1,3 +1,4 @@ +require 'concurrent/utility/engine' require 'concurrent/constants' module Concurrent From 9c09274ed09ba62da906b7f54ce8f637d77f2a80 Mon Sep 17 00:00:00 2001 From: Benoit Daloze Date: Mon, 23 Jan 2023 13:11:35 +0100 Subject: [PATCH 6/6] Various fixes for ThreadLocalVar and FiberLocalVar --- .../concurrent/atomic/fiber_local_var.rb | 93 ++++++++----- .../concurrent/atomic/locals.rb | 77 +++++------ .../concurrent/atomic/lock_local_var.rb | 11 +- .../atomic/reentrant_read_write_lock.rb | 2 +- .../concurrent/atomic/thread_local_var.rb | 95 ++++++++------ .../concurrent/atomic/fiber_local_var_spec.rb | 123 ++++++++++++++++++ spec/concurrent/atomic/lock_local_var_spec.rb | 20 +++ .../atomic/thread_local_var_spec.rb | 1 - spec/support/example_group_extensions.rb | 8 +- 9 files changed, 310 insertions(+), 120 deletions(-) create mode 100644 spec/concurrent/atomic/fiber_local_var_spec.rb create mode 100644 spec/concurrent/atomic/lock_local_var_spec.rb diff --git a/lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb b/lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb index f3b3076c7..e90fc24f9 100644 --- a/lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb +++ b/lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb @@ -2,39 +2,50 @@ require_relative 'locals' module Concurrent - class FiberLocalVar - LOCALS = FiberLocals.new(:concurrent_fiber_local_var) - - # @!macro fiber_local_var_method_initialize - # - # Creates a fiber local variable. - # - # @param [Object] default the default value when otherwise unset - # @param [Proc] default_block Optional block that gets called to obtain the - # default value for each fiber - # @!macro fiber_local_var_method_get - # - # Returns the value in the current fiber's copy of this fiber-local variable. - # - # @return [Object] the current value - - # @!macro fiber_local_var_method_set - # - # Sets the current fiber's copy of this fiber-local variable to the specified value. - # - # @param [Object] value the value to set - # @return [Object] the new value + # A `FiberLocalVar` is a variable where the value is different for each fiber. + # Each variable may have a default value, but when you modify the variable only + # the current fiber will ever see that change. + # + # This is similar to Ruby's built-in fiber-local variables (`Thread.current[:name]`), + # but with these major advantages: + # * `FiberLocalVar` has its own identity, it doesn't need a Symbol. + # * Each Ruby's built-in fiber-local variable leaks some memory forever (it's a Symbol held forever on the fiber), + # so it's only OK to create a small amount of them. + # `FiberLocalVar` has no such issue and it is fine to create many of them. + # * Ruby's built-in fiber-local variables leak forever the value set on each fiber (unless set to nil explicitly). + # `FiberLocalVar` automatically removes the mapping for each fiber once the `FiberLocalVar` instance is GC'd. 
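+  #
+  # An additional example of the default block; its call-once, per-fiber behaviour
+  # is what the new fiber_local_var_spec.rb asserts:
+  #
+  # @example Lazily computed per-fiber default
+  #   v = FiberLocalVar.new { Hash.new(0) }  # the block result is cached per fiber
+  #   v.value[:reads] += 1                   # the first read in a fiber runs the block once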
+ # + # @example + # v = FiberLocalVar.new(14) + # v.value #=> 14 + # v.value = 2 + # v.value #=> 2 + # + # @example + # v = FiberLocalVar.new(14) + # + # Fiber.new do + # v.value #=> 14 + # v.value = 1 + # v.value #=> 1 + # end.resume + # + # Fiber.new do + # v.value #=> 14 + # v.value = 2 + # v.value #=> 2 + # end.resume + # + # v.value #=> 14 + class FiberLocalVar + LOCALS = FiberLocals.new - # @!macro fiber_local_var_method_bind + # Creates a fiber local variable. # - # Bind the given value to fiber local storage during - # execution of the given block. - # - # @param [Object] value the value to bind - # @yield the operation to be performed with the bound variable - # @return [Object] the value - + # @param [Object] default the default value when otherwise unset + # @param [Proc] default_block Optional block that gets called to obtain the + # default value for each fiber def initialize(default = nil, &default_block) if default && block_given? raise ArgumentError, "Cannot use both value and block as default value" @@ -51,22 +62,32 @@ def initialize(default = nil, &default_block) @index = LOCALS.next_index(self) end - # @!macro fiber_local_var_method_get + # Returns the value in the current fiber's copy of this fiber-local variable. + # + # @return [Object] the current value def value - LOCALS.fetch(@index) {default} + LOCALS.fetch(@index) { default } end - # @!macro fiber_local_var_method_set + # Sets the current fiber's copy of this fiber-local variable to the specified value. + # + # @param [Object] value the value to set + # @return [Object] the new value def value=(value) LOCALS.set(@index, value) end - # @!macro fiber_local_var_method_bind - def bind(value, &block) + # Bind the given value to fiber local storage during + # execution of the given block. + # + # @param [Object] value the value to bind + # @yield the operation to be performed with the bound variable + # @return [Object] the value + def bind(value) if block_given? old_value = self.value + self.value = value begin - self.value = value yield ensure self.value = old_value diff --git a/lib/concurrent-ruby/concurrent/atomic/locals.rb b/lib/concurrent-ruby/concurrent/atomic/locals.rb index d29a3abc8..6d268a335 100644 --- a/lib/concurrent-ruby/concurrent/atomic/locals.rb +++ b/lib/concurrent-ruby/concurrent/atomic/locals.rb @@ -32,13 +32,11 @@ module Concurrent # them. This is why we need to use a finalizer to clean up the locals array # when the EC goes out of scope. class AbstractLocals - def initialize(name_prefix = :concurrent_locals) + def initialize @free = [] @lock = Mutex.new - @all_locals = {} + @all_arrays = {} @next = 0 - - @name = :"#{name_prefix}_#{object_id}" end def synchronize @@ -53,7 +51,7 @@ def weak_synchronize alias_method :weak_synchronize, :synchronize end - def next_index(target) + def next_index(local) index = synchronize do if @free.empty? @next += 1 @@ -62,11 +60,11 @@ def next_index(target) end end - # When the target goes out of scope, we should free the associated index + # When the local goes out of scope, we should free the associated index # and all values stored into it. - ObjectSpace.define_finalizer(target, target_finalizer(index)) + ObjectSpace.define_finalizer(local, local_finalizer(index)) - return index + index end def free_index(index) @@ -78,7 +76,7 @@ def free_index(index) # # DO NOT use each_value which might conflict with new pair assignment # into the hash in #set method. 
- @all_locals.values.each do |locals| + @all_arrays.values.each do |locals| locals[index] = nil end @@ -87,18 +85,13 @@ def free_index(index) end end - def fetch(index, default = nil) - if locals = self.locals - value = locals[index] - end + def fetch(index) + locals = self.locals + value = locals ? locals[index] : nil - if value.nil? - if block_given? - yield - else - default - end - elsif value.equal?(NULL) + if nil == value + yield + elsif NULL.equal?(value) nil else value @@ -107,25 +100,25 @@ def fetch(index, default = nil) def set(index, value) locals = self.locals! - locals[index] = (value.nil? ? NULL : value) + locals[index] = (nil == value ? NULL : value) value end private - # When the target index goes out of scope, clean up that slot across all locals currently assigned. - def target_finalizer(index) + # When the local goes out of scope, clean up that slot across all locals currently assigned. + def local_finalizer(index) proc do free_index(index) end end - # When a target (locals) goes out of scope, delete the locals from all known locals. - def locals_finalizer(locals_object_id) - proc do |locals_id| + # When a thread/fiber goes out of scope, remove the array from @all_arrays. + def thread_fiber_finalizer(array_object_id) + proc do weak_synchronize do - @all_locals.delete(locals_object_id) + @all_arrays.delete(array_object_id) end end end @@ -146,23 +139,23 @@ def locals! # An array-backed storage of indexed variables per thread. class ThreadLocals < AbstractLocals def locals - Thread.current.thread_variable_get(@name) + Thread.current.thread_variable_get(:concurrent_thread_locals) end def locals! thread = Thread.current - locals = thread.thread_variable_get(@name) + locals = thread.thread_variable_get(:concurrent_thread_locals) unless locals - locals = thread.thread_variable_set(@name, []) + locals = thread.thread_variable_set(:concurrent_thread_locals, []) weak_synchronize do - @all_locals[locals.object_id] = locals - # When the thread goes out of scope, we should delete the associated locals: - ObjectSpace.define_finalizer(thread, locals_finalizer(locals.object_id)) + @all_arrays[locals.object_id] = locals end + # When the thread goes out of scope, we should delete the associated locals: + ObjectSpace.define_finalizer(thread, thread_fiber_finalizer(locals.object_id)) end - return locals + locals end end @@ -171,23 +164,25 @@ def locals! # An array-backed storage of indexed variables per fiber. class FiberLocals < AbstractLocals def locals - Thread.current[@name] + Thread.current[:concurrent_fiber_locals] end def locals! 
thread = Thread.current - locals = thread[@name] + locals = thread[:concurrent_fiber_locals] unless locals - locals = thread[@name] = [] + locals = thread[:concurrent_fiber_locals] = [] weak_synchronize do - @all_locals[locals.object_id] = locals - # When the thread goes out of scope, we should delete the associated locals: - ObjectSpace.define_finalizer(Fiber.current, locals_finalizer(locals.object_id)) + @all_arrays[locals.object_id] = locals end + # When the fiber goes out of scope, we should delete the associated locals: + ObjectSpace.define_finalizer(Fiber.current, thread_fiber_finalizer(locals.object_id)) end - return locals + locals end end + + private_constant :AbstractLocals, :ThreadLocals, :FiberLocals end diff --git a/lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb b/lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb index 3f954d624..ebf23a241 100644 --- a/lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb +++ b/lib/concurrent-ruby/concurrent/atomic/lock_local_var.rb @@ -1,15 +1,17 @@ +require 'concurrent/utility/engine' require_relative 'fiber_local_var' require_relative 'thread_local_var' module Concurrent # @!visibility private def self.mutex_owned_per_thread? - mutex = Mutex.new + return false if Concurrent.on_jruby? || Concurrent.on_truffleruby? + mutex = Mutex.new # Lock the mutex: mutex.synchronize do # Check if the mutex is still owned in a child fiber: - Fiber.new{mutex.owned?}.resume + Fiber.new { mutex.owned? }.resume end end @@ -18,4 +20,9 @@ def self.mutex_owned_per_thread? else LockLocalVar = FiberLocalVar end + + # Either {FiberLocalVar} or {ThreadLocalVar} depending on whether Mutex (and Monitor) + # are held, respectively, per Fiber or per Thread. + class LockLocalVar + end end diff --git a/lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb b/lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb index 6b425a3c9..6d72a3a09 100644 --- a/lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb +++ b/lib/concurrent-ruby/concurrent/atomic/reentrant_read_write_lock.rb @@ -111,7 +111,7 @@ def initialize @Counter = AtomicFixnum.new(0) # single integer which represents lock state @ReadQueue = Synchronization::Lock.new # used to queue waiting readers @WriteQueue = Synchronization::Lock.new # used to queue waiting writers - @HeldCount = LockLocalVar.new(0) # indicates # of R & W locks held by this thread + @HeldCount = LockLocalVar.new(0) # indicates # of R & W locks held by this thread end # Execute a block operation within a read lock. diff --git a/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb b/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb index f3c20dfc5..3b7e12b5b 100644 --- a/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb +++ b/lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb @@ -3,40 +3,51 @@ module Concurrent - # @!macro thread_local_var + # A `ThreadLocalVar` is a variable where the value is different for each thread. + # Each variable may have a default value, but when you modify the variable only + # the current thread will ever see that change. + # + # This is similar to Ruby's built-in thread-local variables (`Thread#thread_variable_get`), + # but with these major advantages: + # * `ThreadLocalVar` has its own identity, it doesn't need a Symbol. + # * Each Ruby's built-in thread-local variable leaks some memory forever (it's a Symbol held forever on the thread), + # so it's only OK to create a small amount of them. 
+ # `ThreadLocalVar` has no such issue and it is fine to create many of them. + # * Ruby's built-in thread-local variables leak forever the value set on each thread (unless set to nil explicitly). + # `ThreadLocalVar` automatically removes the mapping for each thread once the `ThreadLocalVar` instance is GC'd. + # + # @!macro thread_safe_variable_comparison + # + # @example + # v = ThreadLocalVar.new(14) + # v.value #=> 14 + # v.value = 2 + # v.value #=> 2 + # + # @example + # v = ThreadLocalVar.new(14) + # + # t1 = Thread.new do + # v.value #=> 14 + # v.value = 1 + # v.value #=> 1 + # end + # + # t2 = Thread.new do + # v.value #=> 14 + # v.value = 2 + # v.value #=> 2 + # end + # + # v.value #=> 14 class ThreadLocalVar - LOCALS = ThreadLocals.new(:concurrent_fiber_local_var) + LOCALS = ThreadLocals.new - # @!macro thread_local_var_method_initialize + # Creates a thread local variable. # - # Creates a thread local variable. - # - # @param [Object] default the default value when otherwise unset - # @param [Proc] default_block Optional block that gets called to obtain the - # default value for each thread - - # @!macro thread_local_var_method_get - # - # Returns the value in the current thread's copy of this thread-local variable. - # - # @return [Object] the current value - - # @!macro thread_local_var_method_set - # - # Sets the current thread's copy of this thread-local variable to the specified value. - # - # @param [Object] value the value to set - # @return [Object] the new value - - # @!macro thread_local_var_method_bind - # - # Bind the given value to thread local storage during - # execution of the given block. - # - # @param [Object] value the value to bind - # @yield the operation to be performed with the bound variable - # @return [Object] the value - + # @param [Object] default the default value when otherwise unset + # @param [Proc] default_block Optional block that gets called to obtain the + # default value for each thread def initialize(default = nil, &default_block) if default && block_given? raise ArgumentError, "Cannot use both value and block as default value" @@ -53,22 +64,32 @@ def initialize(default = nil, &default_block) @index = LOCALS.next_index(self) end - # @!macro thread_local_var_method_get + # Returns the value in the current thread's copy of this thread-local variable. + # + # @return [Object] the current value def value - LOCALS.fetch(@index) {default} + LOCALS.fetch(@index) { default } end - # @!macro thread_local_var_method_set + # Sets the current thread's copy of this thread-local variable to the specified value. + # + # @param [Object] value the value to set + # @return [Object] the new value def value=(value) LOCALS.set(@index, value) end - # @!macro thread_local_var_method_bind - def bind(value, &block) + # Bind the given value to thread local storage during + # execution of the given block. + # + # @param [Object] value the value to bind + # @yield the operation to be performed with the bound variable + # @return [Object] the value + def bind(value) if block_given? 
old_value = self.value + self.value = value begin - self.value = value yield ensure self.value = old_value diff --git a/spec/concurrent/atomic/fiber_local_var_spec.rb b/spec/concurrent/atomic/fiber_local_var_spec.rb new file mode 100644 index 000000000..8c485fdba --- /dev/null +++ b/spec/concurrent/atomic/fiber_local_var_spec.rb @@ -0,0 +1,123 @@ +require 'concurrent/atomic/fiber_local_var' + +module Concurrent + + RSpec.describe FiberLocalVar do + + context '#initialize' do + + it 'can set an initial value' do + v = described_class.new(14) + expect(v.value).to eq 14 + end + + it 'sets nil as a default initial value' do + v = described_class.new + expect(v.value).to be_nil + end + + it 'sets the same initial value for all fibers' do + v = described_class.new(14) + f1 = in_fiber { v.value } + f2 = in_fiber { v.value } + expect(f1.resume).to eq 14 + expect(f2.resume).to eq 14 + end + + it 'can set a block to be called to get the initial value' do + v = described_class.new { 14 } + expect(v.value).to eq 14 + end + + context 'when attempting to set both an initial value and a block' do + it do + expect { described_class.new(14) { 14 } }.to raise_error(ArgumentError) + end + end + end + + context '#value' do + let(:v) { described_class.new(14) } + + it 'returns the current value' do + expect(v.value).to eq 14 + end + + it 'returns the value after modification' do + v.value = 2 + expect(v.value).to eq 2 + end + + context 'when using a block to initialize the value' do + it 'calls the block to initialize the value' do + block = proc { } + + expect(block).to receive(:call) + + v = described_class.new(&block) + v.value + end + + it 'sets the block return value as the current value' do + value = 13 + + v = described_class.new { value += 1 } + + v.value + expect(v.value).to be 14 + end + + it 'calls the block to initialize the value for each fiber' do + block = proc { } + + expect(block).to receive(:call).twice + + v = described_class.new(&block) + in_fiber { v.value }.resume + in_fiber { v.value }.resume + end + end + end + + context '#value=' do + let(:v) { described_class.new(14) } + + it 'sets a new value' do + v.value = 2 + expect(v.value).to eq 2 + end + + it 'returns the new value' do + expect(v.value = 2).to eq 2 + end + + it 'does not modify the initial value for other fibers' do + v.value = 2 + f = in_fiber { v.value } + expect(f.resume).to eq 14 + end + + it 'does not modify the value for other fibers' do + v.value = 3 + + f1 = in_fiber do + v.value = 1 + Fiber.yield + v.value + end + + f2 = in_fiber do + v.value = 2 + Fiber.yield + v.value + end + + f1.resume + f2.resume + + expect(f1.resume).to eq 1 + expect(f2.resume).to eq 2 + end + end + end +end diff --git a/spec/concurrent/atomic/lock_local_var_spec.rb b/spec/concurrent/atomic/lock_local_var_spec.rb new file mode 100644 index 000000000..f13400047 --- /dev/null +++ b/spec/concurrent/atomic/lock_local_var_spec.rb @@ -0,0 +1,20 @@ +require 'concurrent/atomic/lock_local_var' + +module Concurrent + + RSpec.describe LockLocalVar do + mutex = Mutex.new + mutex_owned_per_thread = mutex.synchronize do + Fiber.new { mutex.owned? 
}.resume + end + + it "uses FiberLocalVar if Mutex is per Fiber", if: !mutex_owned_per_thread do + expect(LockLocalVar).to be(FiberLocalVar) + end + + it "uses ThreadLocalVar if Mutex is per Thread", if: mutex_owned_per_thread do + expect(LockLocalVar).to be(ThreadLocalVar) + end + end + +end diff --git a/spec/concurrent/atomic/thread_local_var_spec.rb b/spec/concurrent/atomic/thread_local_var_spec.rb index 1616f23d6..906dba028 100644 --- a/spec/concurrent/atomic/thread_local_var_spec.rb +++ b/spec/concurrent/atomic/thread_local_var_spec.rb @@ -1,4 +1,3 @@ -require 'rbconfig' require 'concurrent/atomic/thread_local_var' require 'concurrent/atomic/count_down_latch' diff --git a/spec/support/example_group_extensions.rb b/spec/support/example_group_extensions.rb index 069639933..25d9739e9 100644 --- a/spec/support/example_group_extensions.rb +++ b/spec/support/example_group_extensions.rb @@ -23,13 +23,17 @@ def monotonic_interval Concurrent.monotonic_time - start_time end + def in_fiber(&block) + Fiber.new(&block) + end + def in_thread(*arguments, &block) @created_threads ||= Queue.new - new_thread = Thread.new(*arguments) do |*args, &b| + new_thread = Thread.new(*arguments) do |*args, &b| Thread.abort_on_exception = true block.call(*args, &b) end - @created_threads.push new_thread + @created_threads << new_thread new_thread end
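
A short end-to-end sketch of the distinction this series is built around, using only
the public API introduced above (informal, not part of the patch):

    require 'concurrent/atomic/thread_local_var'
    require 'concurrent/atomic/fiber_local_var'

    tl = Concurrent::ThreadLocalVar.new(0)
    fl = Concurrent::FiberLocalVar.new(0)

    Fiber.new do
      tl.value = 1  # thread variables are shared by every fiber of the thread
      fl.value = 1  # fiber variables are private to this fiber
    end.resume

    tl.value  #=> 1
    fl.value  #=> 0

On engines where `Mutex#owned?` answers per thread, a lock taken before resuming a
fiber is still reported as owned inside that fiber, so the reentrant lock's held
count must live in a `ThreadLocalVar`; where ownership is per fiber, it must live in
a `FiberLocalVar`. That is the choice `LockLocalVar` makes and what the new
`lock_local_var_spec.rb` pins down.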