@@ -78,6 +78,8 @@ def init(connector, config, callback)
78
78
79
79
@executor = Concurrent ::FixedThreadPool . new ( 10 )
80
80
81
+ # Used for locking the evalution and target metrics maps before we clone them
82
+ @metric_maps_mutex = Mutex . new
81
83
@evaluation_metrics = FrequencyMap . new
82
84
@target_metrics = Concurrent ::Map . new
83
85
@@ -181,21 +183,27 @@ def run_one_iteration
181
183
end
182
184
183
185
def send_data_and_reset_cache ( evaluation_metrics_map , target_metrics_map )
184
- # Clone and clear evaluation metrics map
186
+
185
187
evaluation_metrics_map_clone = Concurrent ::Map . new
188
+ target_metrics_map_clone = Concurrent ::Map . new
186
189
187
- evaluation_metrics_map . each_pair do |key , value |
188
- evaluation_metrics_map_clone [ key ] = value
189
- end
190
+ # A single lock is used to synchronise access to both the evaluation and target metrics maps.
191
+ # While separate locks could be applied to each map individually, we want an interval's eval/target
192
+ # metrics to be processed in an atomic unit.
193
+ @metric_maps_mutex . synchronize do
194
+ # Clone and clear evaluation metrics map
195
+ evaluation_metrics_map . each_pair do |key , value |
196
+ evaluation_metrics_map_clone [ key ] = value
197
+ end
190
198
191
- evaluation_metrics_map . clear
192
- target_metrics_map_clone = Concurrent ::Map . new
199
+ evaluation_metrics_map . clear
193
200
194
- target_metrics_map . each_pair do |key , value |
195
- target_metrics_map_clone [ key ] = value
196
- end
201
+ target_metrics_map . each_pair do |key , value |
202
+ target_metrics_map_clone [ key ] = value
203
+ end
197
204
198
- target_metrics_map . clear
205
+ target_metrics_map . clear
206
+ end
199
207
200
208
@evaluation_warning_issued . make_false
201
209
@target_warning_issued . make_false
0 commit comments