Skip to content

Commit 3efea70

Browse files
committed
Revert back to shared implementation of "locals" storage array.
1 parent 21cea96 commit 3efea70

File tree

3 files changed

+191
-18
lines changed

3 files changed

+191
-18
lines changed

lib/concurrent-ruby/concurrent/atomic/fiber_local_var.rb

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
require 'concurrent/constants'
2+
require_relative 'locals'
23

34
module Concurrent
45
class FiberLocalVar
6+
LOCALS = FiberLocals.new(:concurrent_fiber_local_var)
7+
58
def initialize(default = nil, &default_block)
69
if default && block_given?
710
raise ArgumentError, "Cannot use both value and block as default value"
@@ -15,17 +18,17 @@ def initialize(default = nil, &default_block)
1518
@default = default
1619
end
1720

18-
@name = :"concurrent_variable_#{object_id}"
21+
@index = LOCALS.next_index(self)
1922
end
2023

2124
# @!macro thread_local_var_method_get
2225
def value
23-
Thread.current.fetch(@name) {default}
26+
LOCALS.fetch(@index) {default}
2427
end
2528

2629
# @!macro thread_local_var_method_set
2730
def value=(value)
28-
Thread.current[@name] = value
31+
LOCALS.set(@index, value)
2932
end
3033

3134
# @!macro thread_local_var_method_bind
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
require 'concurrent/constants'
2+
3+
module Concurrent
4+
# @!visibility private
5+
# @!macro internal_implementation_note
6+
#
7+
# An abstract implementation of local storage, with sub-classes for
8+
# per-thread and per-fiber locals.
9+
#
10+
# Each execution context (EC, thread or fiber) has a lazily initialized array
11+
# of local variable values. Each time a new local variable is created, we
12+
# allocate an "index" for it.
13+
#
14+
# For example, if the allocated index is 1, that means slot #1 in EVERY EC's
15+
# locals array will be used for the value of that variable.
16+
#
17+
# The good thing about using a per-EC structure to hold values, rather than
18+
# a global, is that no synchronization is needed when reading and writing
19+
# those values (since the structure is only ever accessed by a single
20+
# thread).
21+
#
22+
# Of course, when a local variable is GC'd, 1) we need to recover its index
23+
# for use by other new local variables (otherwise the locals arrays could
24+
# get bigger and bigger with time), and 2) we need to null out all the
25+
# references held in the now-unused slots (both to avoid blocking GC of those
26+
# objects, and also to prevent "stale" values from being passed on to a new
27+
# local when the index is reused).
28+
#
29+
# Because we need to null out freed slots, we need to keep references to
30+
# ALL the locals arrays, so we can null out the appropriate slots in all of
31+
# them. This is why we need to use a finalizer to clean up the locals array
32+
# when the EC goes out of scope.
33+
class AbstractLocals
34+
def initialize(name_prefix = :concurrent_locals)
35+
@free = []
36+
@lock = Mutex.new
37+
@all_locals = {}
38+
@next = 0
39+
40+
@name = :"#{name_prefix}_#{object_id}"
41+
end
42+
43+
def synchronize
44+
@lock.synchronize { yield }
45+
end
46+
47+
if Concurrent.on_cruby?
48+
def weak_synchronize
49+
yield
50+
end
51+
else
52+
alias_method :weak_synchronize, :synchronize
53+
end
54+
55+
def next_index(target)
56+
index = synchronize do
57+
if @free.empty?
58+
@next += 1
59+
else
60+
@free.pop
61+
end
62+
end
63+
64+
# When the target goes out of scope, we should free the associated index and all values stored into it.
65+
ObjectSpace.define_finalizer(target, target_finalizer(index))
66+
67+
return index
68+
end
69+
70+
def free_index(index)
71+
weak_synchronize do
72+
@all_locals.values.each do |locals|
73+
locals[index] = nil
74+
end
75+
76+
@free << index
77+
end
78+
end
79+
80+
def fetch(index, default = nil)
81+
if locals = self.locals
82+
value = locals[index]
83+
end
84+
85+
if value.nil?
86+
if block_given?
87+
yield
88+
else
89+
default
90+
end
91+
elsif value.equal?(NULL)
92+
nil
93+
else
94+
value
95+
end
96+
end
97+
98+
def set(index, value)
99+
locals = self.locals!
100+
locals[index] = (value.nil? ? NULL : value)
101+
102+
value
103+
end
104+
105+
private
106+
107+
# When the target index goes out of scope, clean up that slot across all locals currently assigned.
108+
def target_finalizer(index)
109+
proc do
110+
free_index(index)
111+
end
112+
end
113+
114+
# When a target (locals) goes out of scope, delete the locals from all known locals.
115+
def locals_finalizer(locals_object_id)
116+
proc do |locals_id|
117+
weak_synchronize do
118+
@all_locals.delete(locals_object_id)
119+
end
120+
end
121+
end
122+
123+
# Returns the locals for the current scope, or nil if none exist.
124+
def locals
125+
raise NotImplementedError
126+
end
127+
128+
# Returns the locals for the current scope, creating them if necessary.
129+
def locals!
130+
raise NotImplementedError
131+
end
132+
end
133+
134+
# An array-backed storage of indexed variables per thread.
135+
class ThreadLocals < AbstractLocals
136+
def locals
137+
Thread.current.thread_variable_get(@name)
138+
end
139+
140+
def locals!
141+
thread = Thread.current
142+
locals = thread.thread_variable_get(@name)
143+
144+
unless locals
145+
locals = thread.thread_variable_set(@name, [])
146+
weak_synchronize do
147+
@all_locals[locals.object_id] = locals
148+
# When the thread goes out of scope, we should delete the associated locals:
149+
ObjectSpace.define_finalizer(thread, locals_finalizer(locals.object_id))
150+
end
151+
end
152+
153+
return locals
154+
end
155+
end
156+
157+
# An array-backed storage of indexed variables per fiber.
158+
class FiberLocals < AbstractLocals
159+
def locals
160+
Thread.current[@name]
161+
end
162+
163+
def locals!
164+
thread = Thread.current
165+
locals = thread[@name]
166+
167+
unless locals
168+
locals = thread[@name] = []
169+
weak_synchronize do
170+
@all_locals[locals.object_id] = locals
171+
# When the thread goes out of scope, we should delete the associated locals:
172+
ObjectSpace.define_finalizer(Fiber.current, locals_finalizer(locals.object_id))
173+
end
174+
end
175+
176+
return locals
177+
end
178+
end
179+
end

lib/concurrent-ruby/concurrent/atomic/thread_local_var.rb

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
require 'concurrent/constants'
2+
require_relative 'locals'
23

34
module Concurrent
45

56
# @!macro thread_local_var
67
class ThreadLocalVar
8+
LOCALS = ThreadLocals.new(:concurrent_fiber_local_var)
9+
710
# @!macro thread_local_var_method_initialize
811
def initialize(default = nil, &default_block)
912
if default && block_given?
@@ -18,29 +21,17 @@ def initialize(default = nil, &default_block)
1821
@default = default
1922
end
2023

21-
@name = :"concurrent_variable_#{object_id}"
24+
@index = LOCALS.next_index(self)
2225
end
2326

2427
# @!macro thread_local_var_method_get
2528
def value
26-
value = Thread.current.thread_variable_get(@name)
27-
28-
if value.nil?
29-
default
30-
elsif value.equal?(NULL)
31-
nil
32-
else
33-
value
34-
end
29+
LOCALS.fetch(@index) {default}
3530
end
3631

3732
# @!macro thread_local_var_method_set
3833
def value=(value)
39-
if value.nil?
40-
value = NULL
41-
end
42-
43-
Thread.current.thread_variable_set(@name, value)
34+
LOCALS.set(@index, value)
4435
end
4536

4637
# @!macro thread_local_var_method_bind

0 commit comments

Comments
 (0)