@@ -17,7 +17,11 @@ def initialize(options = nil, &block)
17
17
18
18
def increment ( key )
19
19
compute ( key ) do |old_value |
20
- if old_value == nil ; 1 else old_value + 1 end
20
+ if old_value == nil ;
21
+ 1
22
+ else
23
+ old_value + 1
24
+ end
21
25
end
22
26
end
23
27
@@ -39,7 +43,6 @@ def drain_to_map
39
43
end
40
44
end
41
45
42
-
43
46
def init ( connector , config , callback )
44
47
45
48
unless connector . kind_of? ( Connector )
@@ -59,14 +62,12 @@ def init(connector, config, callback)
59
62
@connector = connector
60
63
61
64
@sdk_type = "SDK_TYPE"
62
- @global_target_set = Set [ ]
63
- @staging_target_set = Set [ ]
64
65
@target_attribute = "target"
65
- @global_target = "__global__cf_target" # <--- This target identifier is used to aggregate and send data for all
66
+ @global_target_identifier = "__global__cf_target" # <--- This target identifier is used to aggregate and send data for all
66
67
# targets as a summary
67
-
68
+ @global_target = Target . new ( "RubySDK1" , identifier = @global_target_identifier , name = @global_target_name )
68
69
@ready = false
69
- @jar_version = ""
70
+ @jar_version = Ff :: Ruby :: Server :: Sdk :: VERSION
70
71
@server = "server"
71
72
@sdk_version = "SDK_VERSION"
72
73
@sdk_language = "SDK_LANGUAGE"
@@ -76,10 +77,20 @@ def init(connector, config, callback)
76
77
77
78
@executor = Concurrent ::FixedThreadPool . new ( 10 )
78
79
79
- @frequency_map = FrequencyMap . new
80
+ @evaluation_metrics = FrequencyMap . new
81
+ @target_metrics = Concurrent ::Map . new
82
+
83
+ # Keep track of targets that have already been sent to avoid sending them again
84
+ @seen_targets = Concurrent ::Map . new
80
85
81
86
@max_buffer_size = config . buffer_size - 1
82
87
88
+ # Max 100k targets per interval
89
+ @max_targets_buffer_size = 100000
90
+
91
+ @evaluation_warning_issued = Concurrent ::AtomicBoolean . new
92
+ @target_warning_issued = Concurrent ::AtomicBoolean . new
93
+
83
94
@callback . on_metrics_ready
84
95
end
85
96
@@ -99,65 +110,102 @@ def close
99
110
end
100
111
101
112
def register_evaluation ( target , feature_config , variation )
113
+ register_evaluation_metric ( feature_config , variation )
114
+ register_target_metric ( target )
115
+ end
102
116
103
- if @frequency_map . size > @max_buffer_size
104
- @config . logger . warn "metrics buffer is full #{ @frequency_map . size } - flushing metrics"
105
- @executor . post do
106
- run_one_iteration
117
+ private
118
+
119
+ def register_evaluation_metric ( feature_config , variation )
120
+ if @evaluation_metrics . size > @max_buffer_size
121
+ unless @evaluation_warning_issued . true?
122
+ SdkCodes . warn_metrics_evaluations_max_size_exceeded ( @config . logger )
123
+ @evaluation_warning_issued . make_true
107
124
end
125
+ return
108
126
end
109
127
110
- event = MetricsEvent . new ( feature_config , target , variation )
111
- @frequency_map . increment event
128
+ event = MetricsEvent . new ( feature_config , @global_target , variation )
129
+ @evaluation_metrics . increment event
112
130
end
113
131
114
- private
132
+ def register_target_metric ( target )
133
+ if @target_metrics . size > @max_targets_buffer_size
134
+ unless @target_warning_issued . true?
135
+ SdkCodes . warn_metrics_targets_max_size_exceeded ( @config . logger )
136
+ @target_warning_issued . make_true
137
+ end
138
+ return
139
+ end
140
+
141
+ if target . is_private
142
+ return
143
+ end
144
+
145
+ already_seen = @seen_targets . put_if_absent ( target . identifier , true )
146
+
147
+ if already_seen
148
+ return
149
+ end
150
+
151
+ @target_metrics . put ( target . identifier , target )
152
+ end
115
153
116
154
def run_one_iteration
117
- send_data_and_reset_cache @frequency_map . drain_to_map
155
+ send_data_and_reset_cache ( @evaluation_metrics , @target_metrics )
118
156
119
- @config . logger . debug "metrics: frequency map size #{ @frequency_map . size } . global target size #{ @global_target_set . size } "
157
+ @config . logger . debug "metrics: frequency map size #{ @evaluation_metrics . size } . targets map size #{ @target_metrics . size } global target size #{ @seen_targets . size } "
120
158
end
121
159
122
- def send_data_and_reset_cache ( map )
123
- metrics = prepare_summary_metrics_body ( map )
160
+ def send_data_and_reset_cache ( evaluation_metrics_map , target_metrics_map )
161
+ evaluation_metrics_map_clone = evaluation_metrics_map . drain_to_map
162
+
163
+ target_metrics_map_clone = Concurrent ::Map . new
164
+
165
+ target_metrics_map . each_pair do |key , value |
166
+ target_metrics_map_clone [ key ] = value
167
+ end
168
+
169
+ target_metrics_map . clear
170
+
171
+ @evaluation_warning_issued . make_false
172
+ @target_warning_issued . make_false
173
+
174
+ metrics = prepare_summary_metrics_body ( evaluation_metrics_map_clone , target_metrics_map_clone )
124
175
125
- if ! metrics . metrics_data . empty? && ! metrics . target_data . empty?
176
+ unless metrics . metrics_data . empty?
126
177
start_time = ( Time . now . to_f * 1000 ) . to_i
127
178
@connector . post_metrics ( metrics )
128
179
end_time = ( Time . now . to_f * 1000 ) . to_i
129
180
if end_time - start_time > @config . metrics_service_acceptable_duration
130
181
@config . logger . debug "Metrics service API duration=[" + ( end_time - start_time ) . to_s + "]"
131
182
end
132
183
end
133
-
134
- @global_target_set . merge ( @staging_target_set )
135
- @staging_target_set . clear
136
-
137
184
end
138
185
139
- def prepare_summary_metrics_body ( freq_map )
186
+ def prepare_summary_metrics_body ( evaluation_metrics_map , target_metrics_map )
140
187
metrics = OpenapiClient ::Metrics . new ( { :target_data => [ ] , :metrics_data => [ ] } )
141
- add_target_data ( metrics , Target . new ( name = @global_target_name , identifier = @global_target ) )
142
- freq_map . each_key do |key |
143
- add_target_data ( metrics , key . target )
144
- end
188
+
145
189
total_count = 0
146
- freq_map . each do |key , value |
190
+ evaluation_metrics_map . each do |key , value |
147
191
total_count += value
148
192
metrics_data = OpenapiClient ::MetricsData . new ( { :attributes => [ ] } )
149
193
metrics_data . timestamp = ( Time . now . to_f * 1000 ) . to_i
150
194
metrics_data . count = value
151
195
metrics_data . metrics_type = "FFMETRICS"
152
196
metrics_data . attributes . push ( OpenapiClient ::KeyValue . new ( { :key => @feature_name_attribute , :value => key . feature_config . feature } ) )
153
197
metrics_data . attributes . push ( OpenapiClient ::KeyValue . new ( { :key => @variation_identifier_attribute , :value => key . variation . identifier } ) )
154
- metrics_data . attributes . push ( OpenapiClient ::KeyValue . new ( { :key => @target_attribute , :value => @global_target } ) )
198
+ metrics_data . attributes . push ( OpenapiClient ::KeyValue . new ( { :key => @target_attribute , :value => @global_target_identifier } ) )
155
199
metrics_data . attributes . push ( OpenapiClient ::KeyValue . new ( { :key => @sdk_type , :value => @server } ) )
156
200
metrics_data . attributes . push ( OpenapiClient ::KeyValue . new ( { :key => @sdk_language , :value => "ruby" } ) )
157
201
metrics_data . attributes . push ( OpenapiClient ::KeyValue . new ( { :key => @sdk_version , :value => @jar_version } ) )
158
202
metrics . metrics_data . push ( metrics_data )
159
203
end
160
- @config . logger . debug "Pushed #{ total_count } metric evaluations to server. metrics_data count is #{ freq_map . size } "
204
+ @config . logger . debug "Pushed #{ total_count } metric evaluations to server. metrics_data count is #{ evaluation_metrics_map . size } . target_data count is #{ target_metrics_map . size } "
205
+
206
+ target_metrics_map . each_pair do |_ , value |
207
+ add_target_data ( metrics , value )
208
+ end
161
209
162
210
metrics
163
211
end
@@ -167,28 +215,25 @@ def add_target_data(metrics, target)
167
215
target_data = OpenapiClient ::TargetData . new ( { :attributes => [ ] } )
168
216
private_attributes = target . private_attributes
169
217
170
- if !@staging_target_set . include? ( target ) && !@global_target_set . include? ( target ) && !target . is_private
171
- @staging_target_set . add ( target )
172
- attributes = target . attributes
173
- attributes . each do |k , v |
174
- key_value = OpenapiClient ::KeyValue . new
175
- if !private_attributes . empty?
176
- unless private_attributes . include? ( k )
177
- key_value = OpenapiClient ::KeyValue . new ( { :key => k , :value => v . to_s } )
178
- end
179
- else
218
+ attributes = target . attributes
219
+ attributes . each do |k , v |
220
+ key_value = OpenapiClient ::KeyValue . new
221
+ if !private_attributes . empty?
222
+ unless private_attributes . include? ( k )
180
223
key_value = OpenapiClient ::KeyValue . new ( { :key => k , :value => v . to_s } )
181
224
end
182
- target_data . attributes . push ( key_value )
183
- end
184
- target_data . identifier = target . identifier
185
- if target . name == nil || target . name == ""
186
- target_data . name = target . identifier
187
225
else
188
- target_data . name = target . name
226
+ key_value = OpenapiClient :: KeyValue . new ( { :key => k , :value => v . to_s } )
189
227
end
190
- metrics . target_data . push ( target_data )
228
+ target_data . attributes . push ( key_value )
229
+ end
230
+ target_data . identifier = target . identifier
231
+ if target . name == nil || target . name == ""
232
+ target_data . name = target . identifier
233
+ else
234
+ target_data . name = target . name
191
235
end
236
+ metrics . target_data . push ( target_data )
192
237
end
193
238
194
239
def start_async
@@ -218,7 +263,7 @@ def get_version
218
263
end
219
264
220
265
def get_frequency_map
221
- @frequency_map
266
+ @evaluation_metrics
222
267
end
223
268
224
269
end
0 commit comments