Skip to content

Commit a0b2477

Browse files
feat: AWS X-Ray Remote Sampler Part 2 - Add Rules Caching and Rules Matching Logic (#1498)
* Add Rules Caching and Rules Matching Logic * allow sampling_rules_poller to persist in event of an error * cleanup tests, logs, and semconv --------- Co-authored-by: Kayla Reopelle <[email protected]>
1 parent 6654c6e commit a0b2477

9 files changed

+577
-11
lines changed

sampler/xray/lib/opentelemetry/sampler/xray/aws_xray_remote_sampler.rb

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
require 'json'
99
require 'opentelemetry/sdk'
1010
require_relative 'sampling_rule'
11+
require_relative 'fallback_sampler'
1112
require_relative 'sampling_rule_applier'
13+
require_relative 'rule_cache'
1214
require_relative 'aws_xray_sampling_client'
1315

1416
module OpenTelemetry
@@ -57,7 +59,9 @@ def initialize(endpoint: '127.0.0.1:2000', polling_interval: DEFAULT_RULES_POLLI
5759
@target_polling_jitter_millis = (rand / 10) * 1000
5860

5961
@aws_proxy_endpoint = endpoint || DEFAULT_AWS_PROXY_ENDPOINT
62+
@fallback_sampler = OpenTelemetry::Sampler::XRay::FallbackSampler.new
6063
@client_id = self.class.generate_client_id
64+
@rule_cache = OpenTelemetry::Sampler::XRay::RuleCache.new(resource)
6165

6266
@sampling_client = OpenTelemetry::Sampler::XRay::AWSXRaySamplingClient.new(@aws_proxy_endpoint)
6367

@@ -68,10 +72,25 @@ def initialize(endpoint: '127.0.0.1:2000', polling_interval: DEFAULT_RULES_POLLI
6872
end
6973

7074
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
71-
OpenTelemetry::SDK::Trace::Samplers::Result.new(
72-
decision: OpenTelemetry::SDK::Trace::Samplers::Decision::DROP,
73-
tracestate: tracestate,
74-
attributes: attributes
75+
if @rule_cache.expired?
76+
OpenTelemetry.logger.debug('Rule cache is expired, so using fallback sampling strategy')
77+
return @fallback_sampler.should_sample?(
78+
trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
79+
)
80+
end
81+
82+
matched_rule = @rule_cache.get_matched_rule(attributes)
83+
if matched_rule
84+
return matched_rule.should_sample?(
85+
trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
86+
)
87+
end
88+
89+
OpenTelemetry.logger.debug(
90+
'Using fallback sampler as no rule match was found. This is likely due to a bug, since default rule should always match'
91+
)
92+
@fallback_sampler.should_sample?(
93+
trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
7594
)
7695
end
7796

@@ -102,6 +121,8 @@ def retrieve_and_update_sampling_rules
102121
else
103122
OpenTelemetry.logger.error('GetSamplingRules Response is falsy')
104123
end
124+
rescue StandardError => e
125+
OpenTelemetry.handle_error(exception: e, message: 'Error occurred when retrieving or updating Sampling Rules')
105126
end
106127

107128
def update_sampling_rules(response_object)
@@ -113,7 +134,7 @@ def update_sampling_rules(response_object)
113134
sampling_rules << SamplingRuleApplier.new(sampling_rule)
114135
end
115136
end
116-
# TODO: Add Sampling Rules to a Rule Cache
137+
@rule_cache.update_rules(sampling_rules)
117138
else
118139
OpenTelemetry.logger.error('SamplingRuleRecords from GetSamplingRules request is not defined')
119140
end

sampler/xray/lib/opentelemetry/sampler/xray/aws_xray_sampling_client.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ def parse_endpoint(endpoint)
5151
host, port = endpoint.split(':')
5252
[host, port.to_i]
5353
rescue StandardError => e
54-
OpenTelemetry.logger.error("Invalid endpoint: #{endpoint}")
54+
OpenTelemetry.handle_error(exception: e, message: "Invalid endpoint: #{endpoint}")
5555
raise e
5656
end
5757
end
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module Sampler
9+
module XRay
10+
# FallbackSampler samples 1 req/sec and additional 5% of requests using TraceIdRatioBasedSampler.
11+
class FallbackSampler
12+
def initialize
13+
@fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(0.05)
14+
end
15+
16+
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
17+
# TODO: implement and use Rate Limiting Sampler
18+
19+
@fixed_rate_sampler.should_sample?(trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes)
20+
end
21+
22+
def description
23+
'FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests}'
24+
end
25+
end
26+
end
27+
end
28+
end
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module Sampler
9+
module XRay
10+
# RuleCache stores all the Sampling Rule Appliers, each corresponding
11+
# to the user's Sampling Rules that were retrieved from AWS X-Ray
12+
class RuleCache
13+
# The cache expires 1 hour after the last refresh time.
14+
RULE_CACHE_TTL_MILLIS = 60 * 60 * 1000
15+
16+
def initialize(sampler_resource)
17+
@rule_appliers = []
18+
@sampler_resource = sampler_resource
19+
@last_updated_epoch_millis = Time.now.to_i * 1000
20+
@cache_lock = Mutex.new
21+
end
22+
23+
def expired?
24+
now_in_millis = Time.now.to_i * 1000
25+
now_in_millis > @last_updated_epoch_millis + RULE_CACHE_TTL_MILLIS
26+
end
27+
28+
def get_matched_rule(attributes)
29+
@rule_appliers.find do |rule|
30+
rule.matches?(attributes, @sampler_resource) || rule.sampling_rule.rule_name == 'Default'
31+
end
32+
end
33+
34+
def update_rules(new_rule_appliers)
35+
old_rule_appliers_map = {}
36+
37+
@cache_lock.synchronize do
38+
@rule_appliers.each do |rule|
39+
old_rule_appliers_map[rule.sampling_rule.rule_name] = rule
40+
end
41+
42+
new_rule_appliers.each_with_index do |new_rule, index|
43+
rule_name_to_check = new_rule.sampling_rule.rule_name
44+
next unless old_rule_appliers_map.key?(rule_name_to_check)
45+
46+
old_rule = old_rule_appliers_map[rule_name_to_check]
47+
new_rule_appliers[index] = old_rule if new_rule.sampling_rule.equals?(old_rule.sampling_rule)
48+
end
49+
50+
@rule_appliers = new_rule_appliers
51+
sort_rules_by_priority
52+
@last_updated_epoch_millis = Time.now.to_i * 1000
53+
end
54+
end
55+
56+
private
57+
58+
def sort_rules_by_priority
59+
@rule_appliers.sort! do |rule1, rule2|
60+
if rule1.sampling_rule.priority == rule2.sampling_rule.priority
61+
rule1.sampling_rule.rule_name < rule2.sampling_rule.rule_name ? -1 : 1
62+
else
63+
rule1.sampling_rule.priority - rule2.sampling_rule.priority
64+
end
65+
end
66+
end
67+
end
68+
end
69+
end
70+
end

sampler/xray/lib/opentelemetry/sampler/xray/sampling_rule_applier.rb

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require 'opentelemetry/sdk'
88
require 'opentelemetry-semantic_conventions'
99
require 'date'
10+
require_relative 'sampling_rule'
1011
require_relative 'statistics'
1112
require_relative 'utils'
1213

@@ -19,16 +20,92 @@ class SamplingRuleApplier
1920
attr_reader :sampling_rule
2021

2122
MAX_DATE_TIME_SECONDS = Time.at(8_640_000_000_000)
23+
SEMCONV = OpenTelemetry::SemanticConventions
2224

2325
def initialize(sampling_rule, statistics = OpenTelemetry::Sampler::XRay::Statistics.new, target = nil)
2426
@sampling_rule = sampling_rule
27+
@fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(@sampling_rule.fixed_rate)
28+
29+
# TODO: Add Reservoir Sampler (Rate Limiting Sampler)
30+
31+
@reservoir_expiry_time = MAX_DATE_TIME_SECONDS
32+
@statistics = statistics
33+
end
34+
35+
def matches?(attributes, resource)
36+
http_target = nil
37+
http_url = nil
38+
http_method = nil
39+
http_host = nil
40+
41+
unless attributes.nil?
42+
http_target = attributes[SEMCONV::Trace::HTTP_TARGET]
43+
http_url = attributes[SEMCONV::Trace::HTTP_URL]
44+
http_method = attributes[SEMCONV::Trace::HTTP_METHOD]
45+
http_host = attributes[SEMCONV::Trace::HTTP_HOST]
46+
end
47+
48+
service_type = nil
49+
resource_arn = nil
50+
51+
resource_hash = resource.attribute_enumerator.to_h
52+
53+
if resource
54+
service_name = resource_hash[SEMCONV::Resource::SERVICE_NAME] || ''
55+
cloud_platform = resource_hash[SEMCONV::Resource::CLOUD_PLATFORM]
56+
service_type = OpenTelemetry::Sampler::XRay::Utils::CLOUD_PLATFORM_MAPPING[cloud_platform] if cloud_platform.is_a?(String)
57+
resource_arn = get_arn(resource, attributes)
58+
end
59+
60+
if http_target.nil? && http_url.is_a?(String)
61+
begin
62+
uri = URI(http_url)
63+
http_target = uri.path.empty? ? '/' : uri.path
64+
rescue URI::InvalidURIError
65+
http_target = '/'
66+
end
67+
elsif http_target.nil? && http_url.nil?
68+
http_target = '/'
69+
end
70+
71+
OpenTelemetry::Sampler::XRay::Utils.attribute_match(attributes, @sampling_rule.attributes) &&
72+
OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.host, http_host) &&
73+
OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.http_method, http_method) &&
74+
OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.service_name, service_name) &&
75+
OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.url_path, http_target) &&
76+
OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.service_type, service_type) &&
77+
OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.resource_arn, resource_arn)
2578
end
2679

2780
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
28-
OpenTelemetry::SDK::Trace::Samplers::Result.new(
81+
# TODO: Record Sampling Statistics
82+
83+
result = OpenTelemetry::SDK::Trace::Samplers::Result.new(
2984
decision: OpenTelemetry::SDK::Trace::Samplers::Decision::DROP,
3085
tracestate: OpenTelemetry::Trace::Tracestate::DEFAULT
3186
)
87+
88+
# TODO: Apply Reservoir Sampling
89+
90+
if result.instance_variable_get(:@decision) == OpenTelemetry::SDK::Trace::Samplers::Decision::DROP
91+
result = @fixed_rate_sampler.should_sample?(
92+
trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
93+
)
94+
end
95+
96+
result
97+
end
98+
99+
private
100+
101+
def get_arn(resource, attributes)
102+
resource_hash = resource.attribute_enumerator.to_h
103+
arn = resource_hash[SEMCONV::Resource::AWS_ECS_CONTAINER_ARN] ||
104+
resource_hash[SEMCONV::Resource::AWS_ECS_CLUSTER_ARN] ||
105+
resource_hash[SEMCONV::Resource::AWS_EKS_CLUSTER_ARN]
106+
107+
arn = attributes[SEMCONV::Trace::AWS_LAMBDA_INVOKED_ARN] || resource_hash[SEMCONV::Resource::FAAS_ID] if arn.nil?
108+
arn
32109
end
33110
end
34111
end

sampler/xray/test/aws_xray_remote_sampler_test.rb

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
DATA_DIR_SAMPLING_RULES = File.join(__dir__, 'data/test-remote-sampler_sampling-rules-response-sample.json')
1010
DATA_DIR_SAMPLING_TARGETS = File.join(__dir__, 'data/test-remote-sampler_sampling-targets-response-sample.json')
1111
TEST_URL = 'localhost:2000'
12+
SEMCONV = OpenTelemetry::SemanticConventions
1213

1314
describe OpenTelemetry::Sampler::XRay::AWSXRayRemoteSampler do
1415
it 'creates remote sampler with empty resource' do
@@ -22,6 +23,7 @@
2223
assert !sampler.instance_variable_get(:@rule_poller).nil?
2324
assert_equal(sampler.instance_variable_get(:@rule_polling_interval_millis), 300 * 1000)
2425
assert !sampler.instance_variable_get(:@sampling_client).nil?
26+
assert !sampler.instance_variable_get(:@rule_cache).nil?
2527
assert_match(/[a-f0-9]{24}/, sampler.instance_variable_get(:@client_id))
2628
end
2729

@@ -32,14 +34,16 @@
3234
.to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_TARGETS))
3335

3436
resource = OpenTelemetry::SDK::Resources::Resource.create(
35-
OpenTelemetry::SemanticConventions::Resource::SERVICE_NAME => 'test-service-name',
36-
OpenTelemetry::SemanticConventions::Resource::CLOUD_PLATFORM => 'test-cloud-platform'
37+
SEMCONV::Resource::SERVICE_NAME => 'test-service-name',
38+
SEMCONV::Resource::CLOUD_PLATFORM => 'test-cloud-platform'
3739
)
3840
sampler = OpenTelemetry::Sampler::XRay::InternalAWSXRayRemoteSampler.new(resource: resource)
3941

4042
assert !sampler.instance_variable_get(:@rule_poller).nil?
4143
assert_equal(sampler.instance_variable_get(:@rule_polling_interval_millis), 300 * 1000)
4244
assert !sampler.instance_variable_get(:@sampling_client).nil?
45+
assert !sampler.instance_variable_get(:@rule_cache).nil?
46+
assert_equal(sampler.instance_variable_get(:@rule_cache).instance_variable_get(:@sampler_resource), resource)
4347
assert_match(/[a-f0-9]{24}/, sampler.instance_variable_get(:@client_id))
4448
end
4549

@@ -50,8 +54,8 @@
5054
.to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_TARGETS))
5155

5256
resource = OpenTelemetry::SDK::Resources::Resource.create(
53-
OpenTelemetry::SemanticConventions::Resource::SERVICE_NAME => 'test-service-name',
54-
OpenTelemetry::SemanticConventions::Resource::CLOUD_PLATFORM => 'test-cloud-platform'
57+
SEMCONV::Resource::SERVICE_NAME => 'test-service-name',
58+
SEMCONV::Resource::CLOUD_PLATFORM => 'test-cloud-platform'
5559
)
5660
sampler = OpenTelemetry::Sampler::XRay::InternalAWSXRayRemoteSampler.new(
5761
resource: resource,
@@ -62,10 +66,34 @@
6266
assert !sampler.instance_variable_get(:@rule_poller).nil?
6367
assert_equal(sampler.instance_variable_get(:@rule_polling_interval_millis), 120 * 1000)
6468
assert !sampler.instance_variable_get(:@sampling_client).nil?
69+
assert !sampler.instance_variable_get(:@rule_cache).nil?
70+
assert_equal(sampler.instance_variable_get(:@rule_cache).instance_variable_get(:@sampler_resource), resource)
6571
assert_equal(sampler.instance_variable_get(:@aws_proxy_endpoint), 'abc.com')
6672
assert_match(/[a-f0-9]{24}/, sampler.instance_variable_get(:@client_id))
6773
end
6874

75+
it 'updates sampling rules and targets with pollers and should sample' do
76+
stub_request(:post, "#{TEST_URL}/GetSamplingRules")
77+
.to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_RULES))
78+
stub_request(:post, "#{TEST_URL}/SamplingTargets")
79+
.to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_TARGETS))
80+
81+
resource = OpenTelemetry::SDK::Resources::Resource.create(
82+
SEMCONV::Resource::SERVICE_NAME => 'test-service-name',
83+
SEMCONV::Resource::CLOUD_PLATFORM => 'test-cloud-platform'
84+
)
85+
rs = OpenTelemetry::Sampler::XRay::AWSXRayRemoteSampler.new(resource: resource)
86+
87+
attributes = { 'abc' => '1234' }
88+
89+
test_rule_applier = rs.instance_variable_get(:@root).instance_variable_get(:@root).instance_variable_get(:@rule_cache).instance_variable_get(:@rule_appliers)[0]
90+
assert_equal 'test', test_rule_applier.instance_variable_get(:@sampling_rule).instance_variable_get(:@rule_name)
91+
assert_equal OpenTelemetry::SDK::Trace::Samplers::Decision::DROP,
92+
rs.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: attributes, links: []).instance_variable_get(:@decision)
93+
94+
# TODO: Run more tests after updating Sampling Targets
95+
end
96+
6997
it 'generates valid client id' do
7098
client_id = OpenTelemetry::Sampler::XRay::InternalAWSXRayRemoteSampler.generate_client_id
7199
assert_match(/[0-9a-z]{24}/, client_id)
@@ -81,4 +109,15 @@
81109
expected_string = 'InternalAWSXRayRemoteSampler{aws_proxy_endpoint=127.0.0.1:2000, rule_polling_interval_millis=300000}'
82110
assert_equal(sampler.description, expected_string)
83111
end
112+
113+
def create_spans(sampled_array, thread_id, span_attributes, remote_sampler, number_of_spans)
114+
sampled = 0
115+
number_of_spans.times do
116+
sampled += 1 if remote_sampler.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: span_attributes,
117+
links: []).instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP
118+
end
119+
sampled_array[thread_id] = sampled
120+
end
121+
122+
# TODO: Run tests for Reservoir Sampling
84123
end
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
require 'test_helper'
8+
9+
describe OpenTelemetry::Sampler::XRay::FallbackSampler do
10+
# TODO: Add tests for Fallback sampler when Rate Limiter is implemented
11+
12+
it 'test_to_string' do
13+
assert_equal(
14+
'FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests}',
15+
OpenTelemetry::Sampler::XRay::FallbackSampler.new.description
16+
)
17+
end
18+
end

0 commit comments

Comments
 (0)