Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
require 'json'
require 'opentelemetry/sdk'
require_relative 'sampling_rule'
require_relative 'fallback_sampler'
require_relative 'sampling_rule_applier'
require_relative 'rule_cache'
require_relative 'aws_xray_sampling_client'

module OpenTelemetry
Expand Down Expand Up @@ -57,7 +59,9 @@ def initialize(endpoint: '127.0.0.1:2000', polling_interval: DEFAULT_RULES_POLLI
@target_polling_jitter_millis = (rand / 10) * 1000

@aws_proxy_endpoint = endpoint || DEFAULT_AWS_PROXY_ENDPOINT
@fallback_sampler = OpenTelemetry::Sampler::XRay::FallbackSampler.new
@client_id = self.class.generate_client_id
@rule_cache = OpenTelemetry::Sampler::XRay::RuleCache.new(resource)

@sampling_client = OpenTelemetry::Sampler::XRay::AWSXRaySamplingClient.new(@aws_proxy_endpoint)

Expand All @@ -68,10 +72,25 @@ def initialize(endpoint: '127.0.0.1:2000', polling_interval: DEFAULT_RULES_POLLI
end

# Decides how a span is sampled using the cached sampling rules.
#
# The decision is delegated, with all keyword arguments forwarded unchanged:
# * to the fallback sampler when the rule cache has expired,
# * to the first cached rule returned by the rule cache for +attributes+,
# * to the fallback sampler again when no rule matched at all.
#
# @param trace_id forwarded to the delegated sampler
# @param parent_context forwarded to the delegated sampler
# @param links forwarded to the delegated sampler
# @param name forwarded to the delegated sampler
# @param kind forwarded to the delegated sampler
# @param attributes span attributes; also used to look up the matching rule
# @return whatever the delegated sampler's +should_sample?+ returns
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
  sampling_args = {
    trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
  }

  if @rule_cache.expired?
    OpenTelemetry.logger.debug('Rule cache is expired, so using fallback sampling strategy')
    return @fallback_sampler.should_sample?(**sampling_args)
  end

  matched_rule = @rule_cache.get_matched_rule(attributes)
  return matched_rule.should_sample?(**sampling_args) if matched_rule

  # Reaching this point means not even a catch-all rule was cached.
  OpenTelemetry.logger.debug(
    'Using fallback sampler as no rule match was found. This is likely due to a bug, since default rule should always match'
  )
  @fallback_sampler.should_sample?(**sampling_args)
end

Expand Down Expand Up @@ -113,7 +132,7 @@ def update_sampling_rules(response_object)
sampling_rules << SamplingRuleApplier.new(sampling_rule)
end
end
# TODO: Add Sampling Rules to a Rule Cache
@rule_cache.update_rules(sampling_rules)
else
OpenTelemetry.logger.error('SamplingRuleRecords from GetSamplingRules request is not defined')
end
Expand Down
28 changes: 28 additions & 0 deletions sampler/xray/lib/opentelemetry/sampler/xray/fallback_sampler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# frozen_string_literal: true

# Copyright OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Sampler
module XRay
# FallbackSampler is meant to sample 1 req/sec plus an additional 5% of requests.
# Only the 5% part is applied so far, through a TraceIdRatioBased sampler; the
# 1 req/sec rate limiting is still a TODO.
class FallbackSampler
  def initialize
    @fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(0.05)
  end

  # Forwards every keyword argument unchanged to the fixed-rate (5%) sampler
  # and returns its result.
  def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
    # TODO: implement and use Rate Limiting Sampler

    sampling_args = {
      trace_id: trace_id,
      parent_context: parent_context,
      links: links,
      name: name,
      kind: kind,
      attributes: attributes
    }
    @fixed_rate_sampler.should_sample?(**sampling_args)
  end

  # Human-readable description of this sampler.
  def description
    'FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests}'
  end
end
end
end
end
70 changes: 70 additions & 0 deletions sampler/xray/lib/opentelemetry/sampler/xray/rule_cache.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# frozen_string_literal: true

# Copyright OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Sampler
module XRay
# RuleCache stores all the Sampling Rule Appliers, each corresponding
# to the user's Sampling Rules that were retrieved from AWS X-Ray
class RuleCache
  # The cache expires 1 hour after the last refresh time.
  RULE_CACHE_TTL_MILLIS = 60 * 60 * 1000

  # @param sampler_resource resource handed to each rule applier's +matches?+
  def initialize(sampler_resource)
    @rule_appliers = []
    @sampler_resource = sampler_resource
    @last_updated_epoch_millis = current_epoch_millis
    @cache_lock = Mutex.new
  end

  # @return [Boolean] true once more than RULE_CACHE_TTL_MILLIS has passed
  #   since the cache was created or last updated
  def expired?
    current_epoch_millis > @last_updated_epoch_millis + RULE_CACHE_TTL_MILLIS
  end

  # Returns the first cached applier (in priority order) that either matches
  # the given attributes or is the rule named 'Default'; nil when none does.
  def get_matched_rule(attributes)
    @rule_appliers.find do |rule|
      rule.matches?(attributes, @sampler_resource) || rule.sampling_rule.rule_name == 'Default'
    end
  end

  # Replaces the cached appliers with +new_rule_appliers+.
  #
  # An already cached applier is kept in place of a new one when both carry
  # the same rule name and their sampling rules compare equal via +equals?+.
  # The caller's array is left untouched, and the cache only ever exposes a
  # fully sorted list: the new list is built and sorted first, then assigned
  # in a single step, because +get_matched_rule+ reads without the lock.
  #
  # @param new_rule_appliers [Array] appliers built from the latest rules
  def update_rules(new_rule_appliers)
    @cache_lock.synchronize do
      cached_by_name = @rule_appliers.each_with_object({}) do |applier, by_name|
        by_name[applier.sampling_rule.rule_name] = applier
      end

      merged_appliers = new_rule_appliers.map do |new_applier|
        cached = cached_by_name[new_applier.sampling_rule.rule_name]
        if cached && new_applier.sampling_rule.equals?(cached.sampling_rule)
          cached
        else
          new_applier
        end
      end

      @rule_appliers = sorted_by_priority(merged_appliers)
      @last_updated_epoch_millis = current_epoch_millis
    end
  end

  private

  # Returns a new array ordered by ascending priority, ties broken by rule name.
  def sorted_by_priority(rule_appliers)
    rule_appliers.sort_by do |applier|
      [applier.sampling_rule.priority, applier.sampling_rule.rule_name]
    end
  end

  # Current wall-clock time in epoch milliseconds, at whole-second resolution.
  def current_epoch_millis
    Time.now.to_i * 1000
  end
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'opentelemetry/sdk'
require 'opentelemetry-semantic_conventions'
require 'date'
require_relative 'sampling_rule'
require_relative 'statistics'
require_relative 'utils'

Expand All @@ -22,13 +23,88 @@ class SamplingRuleApplier

# Builds an applier around a single sampling rule.
#
# @param sampling_rule rule whose +fixed_rate+ configures the ratio-based sampler
# @param statistics statistics object kept on the applier (a fresh one by default)
# @param target accepted but not used by this constructor yet
def initialize(sampling_rule, statistics = OpenTelemetry::Sampler::XRay::Statistics.new, target = nil)
  @sampling_rule = sampling_rule
  @statistics = statistics
  @reservoir_expiry_time = MAX_DATE_TIME_SECONDS

  # TODO: Add Reservoir Sampler (Rate Limiting Sampler)

  @fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(sampling_rule.fixed_rate)
end

# Checks whether this applier's sampling rule applies to a span.
#
# Every rule field (attributes, host, HTTP method, service name, URL path,
# service type, resource ARN) must match the value derived from the span
# attributes and the resource.
#
# @param attributes [Hash, nil] span attributes; nil is treated as "no attributes"
# @param resource [#attribute_enumerator, nil] resource of the sampler; when nil,
#   service name, service type and resource ARN are all matched as nil
# @return [Boolean] result of combining all rule matchers with &&
def matches?(attributes, resource)
  http_target = nil
  http_url = nil
  http_method = nil
  http_host = nil

  unless attributes.nil?
    http_target = attributes[OpenTelemetry::SemanticConventions::Trace::HTTP_TARGET]
    http_url = attributes[OpenTelemetry::SemanticConventions::Trace::HTTP_URL]
    http_method = attributes[OpenTelemetry::SemanticConventions::Trace::HTTP_METHOD]
    http_host = attributes[OpenTelemetry::SemanticConventions::Trace::HTTP_HOST]
  end

  service_name = nil
  service_type = nil
  resource_arn = nil

  if resource
    # Only touch the resource inside the guard so a nil resource cannot raise.
    resource_hash = resource.attribute_enumerator.to_h
    service_name = resource_hash[OpenTelemetry::SemanticConventions::Resource::SERVICE_NAME] || ''
    cloud_platform = resource_hash[OpenTelemetry::SemanticConventions::Resource::CLOUD_PLATFORM]
    service_type = OpenTelemetry::Sampler::XRay::Utils::CLOUD_PLATFORM_MAPPING[cloud_platform] if cloud_platform.is_a?(String)
    resource_arn = get_arn(resource, attributes)
  end

  if http_target.nil? && http_url.is_a?(String)
    # Derive the target from the URL path; a missing, empty or unparsable path means '/'.
    begin
      url_path = URI(http_url).path.to_s
      http_target = url_path.empty? ? '/' : url_path
    rescue URI::InvalidURIError
      http_target = '/'
    end
  elsif http_target.nil? && http_url.nil?
    http_target = '/'
  end

  OpenTelemetry::Sampler::XRay::Utils.attribute_match(attributes, @sampling_rule.attributes) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.host, http_host) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.http_method, http_method) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.service_name, service_name) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.url_path, http_target) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.service_type, service_type) &&
    OpenTelemetry::Sampler::XRay::Utils.wildcard_match(@sampling_rule.resource_arn, resource_arn)
end

# Produces the sampling result for a span that matched this rule.
#
# The result starts out as DROP; until reservoir sampling is implemented it
# is therefore always replaced by the decision of the fixed-rate sampler,
# which receives all keyword arguments unchanged.
#
# @return the sampling result (currently always the fixed-rate sampler's)
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
  # TODO: Record Sampling Statistics

  result = OpenTelemetry::SDK::Trace::Samplers::Result.new(
    decision: OpenTelemetry::SDK::Trace::Samplers::Decision::DROP,
    tracestate: OpenTelemetry::Trace::Tracestate::DEFAULT
  )

  # TODO: Apply Reservoir Sampling

  # A result that is not recording is a DROP decision: fall through to the fixed rate.
  unless result.recording?
    result = @fixed_rate_sampler.should_sample?(
      trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
    )
  end

  result
end

private

# Picks the ARN used for rule matching.
#
# Lookup order: ECS container ARN, ECS cluster ARN and EKS cluster ARN from
# the resource, then the invoked Lambda ARN from the span attributes, and
# finally the FaaS id from the resource.
#
# @param resource [#attribute_enumerator] resource to read ARN attributes from
# @param attributes [Hash, nil] span attributes; nil is treated as "no attributes"
# @return the first value found in the order above, or nil
def get_arn(resource, attributes)
  resource_hash = resource.attribute_enumerator.to_h
  arn = resource_hash[OpenTelemetry::SemanticConventions::Resource::AWS_ECS_CONTAINER_ARN] ||
        resource_hash[OpenTelemetry::SemanticConventions::Resource::AWS_ECS_CLUSTER_ARN] ||
        resource_hash[OpenTelemetry::SemanticConventions::Resource::AWS_EKS_CLUSTER_ARN]
  return arn unless arn.nil?

  # Span attributes may legitimately be nil, so guard before indexing them.
  invoked_arn = attributes.nil? ? nil : attributes[OpenTelemetry::SemanticConventions::Trace::AWS_LAMBDA_INVOKED_ARN]
  invoked_arn || resource_hash[OpenTelemetry::SemanticConventions::Resource::FAAS_ID]
end
end
end
Expand Down
39 changes: 39 additions & 0 deletions sampler/xray/test/aws_xray_remote_sampler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
assert !sampler.instance_variable_get(:@rule_poller).nil?
assert_equal(sampler.instance_variable_get(:@rule_polling_interval_millis), 300 * 1000)
assert !sampler.instance_variable_get(:@sampling_client).nil?
assert !sampler.instance_variable_get(:@rule_cache).nil?
assert_match(/[a-f0-9]{24}/, sampler.instance_variable_get(:@client_id))
end

Expand All @@ -40,6 +41,8 @@
assert !sampler.instance_variable_get(:@rule_poller).nil?
assert_equal(sampler.instance_variable_get(:@rule_polling_interval_millis), 300 * 1000)
assert !sampler.instance_variable_get(:@sampling_client).nil?
assert !sampler.instance_variable_get(:@rule_cache).nil?
assert_equal(sampler.instance_variable_get(:@rule_cache).instance_variable_get(:@sampler_resource), resource)
assert_match(/[a-f0-9]{24}/, sampler.instance_variable_get(:@client_id))
end

Expand All @@ -62,10 +65,35 @@
assert !sampler.instance_variable_get(:@rule_poller).nil?
assert_equal(sampler.instance_variable_get(:@rule_polling_interval_millis), 120 * 1000)
assert !sampler.instance_variable_get(:@sampling_client).nil?
assert !sampler.instance_variable_get(:@rule_cache).nil?
assert_equal(sampler.instance_variable_get(:@rule_cache).instance_variable_get(:@sampler_resource), resource)
assert_equal(sampler.instance_variable_get(:@aws_proxy_endpoint), 'abc.com')
assert_match(/[a-f0-9]{24}/, sampler.instance_variable_get(:@client_id))
end

# End-to-end check: the sampler is built against stubbed HTTP endpoints, the
# first cached rule is inspected, and one sampling decision is asserted.
it 'updates sampling rules and targets with pollers and should sample' do
# Both endpoints answer with canned fixture files (presumably via WebMock's stub_request).
stub_request(:post, "#{TEST_URL}/GetSamplingRules")
.to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_RULES))
stub_request(:post, "#{TEST_URL}/SamplingTargets")
.to_return(status: 200, body: File.read(DATA_DIR_SAMPLING_TARGETS))

resource = OpenTelemetry::SDK::Resources::Resource.create(
OpenTelemetry::SemanticConventions::Resource::SERVICE_NAME => 'test-service-name',
OpenTelemetry::SemanticConventions::Resource::CLOUD_PLATFORM => 'test-cloud-platform'
)
rs = OpenTelemetry::Sampler::XRay::AWSXRayRemoteSampler.new(resource: resource)

attributes = { 'abc' => '1234' }

# NOTE(review): the sleep presumably gives the background rule poller time to
# fetch and cache the stubbed rules before they are inspected - confirm.
sleep(1.0)
# Reaches through two nested @root wrappers to the rule cache and takes its first applier.
test_rule_applier = rs.instance_variable_get(:@root).instance_variable_get(:@root).instance_variable_get(:@rule_cache).instance_variable_get(:@rule_appliers)[0]
assert_equal 'test', test_rule_applier.instance_variable_get(:@sampling_rule).instance_variable_get(:@rule_name)
# For this trace id and these attributes the expected decision is DROP.
assert_equal OpenTelemetry::SDK::Trace::Samplers::Decision::DROP,
rs.should_sample?(parent_context: nil, trace_id: '3759e988bd862e3fe1be46a994272793', name: 'name', kind: OpenTelemetry::Trace::SpanKind::SERVER, attributes: attributes, links: []).instance_variable_get(:@decision)

# TODO: Run more tests after updating Sampling Targets
end

it 'generates valid client id' do
client_id = OpenTelemetry::Sampler::XRay::InternalAWSXRayRemoteSampler.generate_client_id
assert_match(/[0-9a-z]{24}/, client_id)
Expand All @@ -81,4 +109,15 @@
expected_string = 'InternalAWSXRayRemoteSampler{aws_proxy_endpoint=127.0.0.1:2000, rule_polling_interval_millis=300000}'
assert_equal(sampler.description, expected_string)
end

# Test helper: asks +remote_sampler+ for +number_of_spans+ sampling decisions
# (always with the same trace id, name and kind) and stores the number of
# decisions that were not DROP in +sampled_array+ under +thread_id+.
def create_spans(sampled_array, thread_id, span_attributes, remote_sampler, number_of_spans)
  kept = number_of_spans.times.count do
    sampling_result = remote_sampler.should_sample?(
      parent_context: nil,
      trace_id: '3759e988bd862e3fe1be46a994272793',
      name: 'name',
      kind: OpenTelemetry::Trace::SpanKind::SERVER,
      attributes: span_attributes,
      links: []
    )
    sampling_result.instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP
  end
  sampled_array[thread_id] = kept
end

# TODO: Run tests for Reservoir Sampling
end
18 changes: 18 additions & 0 deletions sampler/xray/test/fallback_sampler_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

# Copyright OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'test_helper'

describe OpenTelemetry::Sampler::XRay::FallbackSampler do
  # TODO: Add tests for Fallback sampler when Rate Limiter is implemented

  # Pins the exact description string reported by the sampler.
  it 'test_to_string' do
    expected_description = 'FallbackSampler{fallback sampling with sampling config of 1 req/sec and 5% of additional requests}'
    sampler = OpenTelemetry::Sampler::XRay::FallbackSampler.new

    assert_equal(expected_description, sampler.description)
  end
end
end
Loading
Loading