Skip to content
101 changes: 101 additions & 0 deletions sampler/xray/example/xray_sampling_on_rails_demonstration.ru
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# frozen_string_literal: true

# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require 'bundler/inline'

# Declare the example's dependencies inline (bundler/inline) so that no separate
# Gemfile is needed. The `true` argument asks Bundler to install missing gems.
gemfile(true) do
source 'https://rubygems.org'

# NOTE(review): exact pin — presumably a compatibility workaround for Rails 7.0; confirm before relaxing.
gem 'concurrent-ruby', '1.3.4'
gem 'rails', '~> 7.0.4'
gem 'puma'

gem 'opentelemetry-sdk'
gem 'opentelemetry-instrumentation-rails'
gem 'opentelemetry-sampler-xray', path: './../' # Use local version of the X-Ray Sampler
# gem 'opentelemetry-sampler-xray' # Use RubyGems version of the X-Ray Sampler
end

require "action_controller/railtie"
require "action_mailer/railtie"
require "rails/test_unit/railtie"

# Minimal single-file Rails application used to exercise the X-Ray sampler.
class App < Rails::Application
# Use the directory containing this file as the application root.
config.root = __dir__
config.consider_all_requests_local = true

# Expose two endpoints, "/" and "/test", both routed to WelcomeController.
routes.append do
root to: 'welcome#index'
get "/test" => 'welcome#test'
end
end

# Controller behind the demo routes; each action renders a fixed confirmation string.
class WelcomeController < ActionController::Base
# Action for the root route ("/").
def index
render inline: 'Successfully called "/" endpoint'
end

# Action for the "/test" route.
def test
render inline: 'Successfully called "/test" endpoint'
end
end

# Print exported spans to the console and name the service. The same name is
# reused for the sampler's resource below.
service_name = 'xray-sampler-on-rails-service'
ENV['OTEL_TRACES_EXPORTER'] = 'console'
ENV['OTEL_SERVICE_NAME'] = service_name

OpenTelemetry::SDK.configure do |c|
  c.use_all({ 'OpenTelemetry::Instrumentation::ActiveRecord' => { enabled: false } })
end

# Replace the tracer provider's sampler with the X-Ray remote sampler.
sampler_resource = OpenTelemetry::SDK::Resources::Resource.create({ 'service.name' => service_name })
OpenTelemetry.tracer_provider.sampler =
  OpenTelemetry::Sampler::XRay::AWSXRayRemoteSampler.new(resource: sampler_resource)

App.initialize!

run App

#### Running and using the Sample App
# To run this example run the `rackup` command with this file
# Example: rackup xray_sampling_on_rails_demonstration.ru
# Navigate to http://localhost:9292/
# Spans for any requests sampled by the X-Ray Sampler will appear in the console

#### Required configuration in the OpenTelemetry Collector
# In order for sampling rules to be obtained from AWS X-Ray, the awsproxy extension
# must be configured in the OpenTelemetry Collector, which will use your AWS credentials.
# - https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/awsproxy#aws-proxy
# Without the awsproxy extension, the X-Ray Sampler will use a fallback sampler
# with a sampling strategy of "1 request/second, plus 5% of any additional requests"

#### Testing out configurable X-Ray Sampling Rules against the "service.name" resource attribute.
# Create a new Sampling Rule with the following matching criteria in AWS CloudWatch Settings for X-Ray Traces.
# - https://console.aws.amazon.com/cloudwatch/home#xray:settings/sampling-rules
# Matching Criteria
# ServiceName = xray-sampler-on-rails-service
# ServiceType = *
# Host = *
# ResourceARN = *
# HTTPMethod = *
# URLPath = *
# For the above matching criteria, try out the following settings to sample or not sample requests
# - Limit to 0r/sec then 0% fixed rate
# - Limit to 1r/sec then 0% fixed rate (May take 30 seconds for this setting to apply)
# - Limit to 0r/sec then 100% fixed rate

#### Testing out configurable X-Ray Sampling Rules against the "/test" endpoint in this sample app.
# Create a new Sampling Rule with the following matching criteria in AWS CloudWatch Settings for X-Ray Traces.
# - https://console.aws.amazon.com/cloudwatch/home#xray:settings/sampling-rules
# Matching Criteria
# ServiceName = *
# ServiceType = *
# Host = *
# ResourceARN = *
# HTTPMethod = *
# URLPath = /test
# For the above matching criteria, try out the following settings to sample or not sample requests
# - Limit to 0r/sec then 0% fixed rate
# - Limit to 1r/sec then 0% fixed rate (May take 30 seconds for this setting to apply)
# - Limit to 0r/sec then 100% fixed rate
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
#
# SPDX-License-Identifier: Apache-2.0

require 'net/http'
require 'json'
require 'net/http'
require 'opentelemetry/sdk'
require_relative 'sampling_rule'
require_relative 'aws_xray_sampling_client'
require_relative 'fallback_sampler'
require_relative 'sampling_rule_applier'
require_relative 'rule_cache'
require_relative 'aws_xray_sampling_client'
require_relative 'sampling_rule'
require_relative 'sampling_rule_applier'

module OpenTelemetry
module Sampler
Expand Down Expand Up @@ -68,7 +68,8 @@ def initialize(endpoint: '127.0.0.1:2000', polling_interval: DEFAULT_RULES_POLLI
# Start the Sampling Rules poller
start_sampling_rules_poller

# TODO: Start the Sampling Targets poller
# Start the Sampling Targets poller
start_sampling_targets_poller
end

def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
Expand Down Expand Up @@ -113,6 +114,15 @@ def start_sampling_rules_poller
end
end

# Spawns the background thread that repeatedly sleeps for the target polling
# interval (plus jitter) and then retrieves and applies sampling targets.
# The thread is stored in @target_poller.
def start_sampling_targets_poller
  @target_poller = Thread.new do
    loop do
      # Recomputed on every pass so the current value of @target_polling_interval is used.
      delay_millis = (@target_polling_interval * 1000) + @target_polling_jitter_millis
      sleep(delay_millis / 1000.0)
      retrieve_and_update_sampling_targets
    end
  end
end

def retrieve_and_update_sampling_rules
sampling_rules_response = @sampling_client.fetch_sampling_rules
if sampling_rules_response&.body && sampling_rules_response.body != ''
Expand All @@ -125,6 +135,19 @@ def retrieve_and_update_sampling_rules
OpenTelemetry.handle_error(exception: e, message: 'Error occurred when retrieving or updating Sampling Rules')
end

# Reports the current sampling statistics to the sampling client and applies
# the sampling targets found in the response.
#
# This runs inside the targets poller loop, so any error raised while building
# the request, calling the client, or parsing the response is reported through
# OpenTelemetry.handle_error instead of propagating and ending the poller thread.
def retrieve_and_update_sampling_targets
  request_body = {
    SamplingStatisticsDocuments: @rule_cache.create_sampling_statistics_documents(@client_id)
  }
  sampling_targets_response = @sampling_client.fetch_sampling_targets(request_body)
  if sampling_targets_response&.body && sampling_targets_response.body != ''
    response_body = JSON.parse(sampling_targets_response.body)
    update_sampling_targets(response_body)
  else
    OpenTelemetry.logger.debug('SamplingTargets Response is falsy')
  end
rescue StandardError => e
  OpenTelemetry.handle_error(exception: e, message: 'Error occurred when retrieving or updating Sampling Targets')
end

def update_sampling_rules(response_object)
sampling_rules = []
if response_object && response_object['SamplingRuleRecords']
Expand All @@ -140,6 +163,33 @@ def update_sampling_rules(response_object)
end
end

# Applies a parsed SamplingTargets response.
#
# The target documents are indexed by their 'RuleName' and handed to the rule
# cache together with the response's 'LastRuleModification' value. The polling
# interval returned by the cache becomes the new @target_polling_interval, and
# when the cache asks for a rules refresh the rules poller is killed and restarted.
# Any StandardError raised along the way is logged at debug level.
def update_sampling_targets(response_object)
  documents = response_object && response_object['SamplingTargetDocuments']
  unless documents
    return OpenTelemetry.logger.debug('SamplingTargetDocuments from SamplingTargets request is not defined')
  end

  targets_by_rule_name = documents.each_with_object({}) do |document, by_name|
    by_name[document['RuleName']] = document
  end

  rules_outdated, interval = @rule_cache.update_targets(targets_by_rule_name, response_object['LastRuleModification'])
  @target_polling_interval = interval
  return unless rules_outdated

  OpenTelemetry.logger.debug('Performing out-of-band sampling rule polling to fetch updated rules.')
  @rule_poller&.kill
  start_sampling_rules_poller
rescue StandardError => e
  OpenTelemetry.logger.debug("Error occurred when updating Sampling Targets: #{e}")
end

class << self
def generate_client_id
hex_chars = ('0'..'9').to_a + ('a'..'f').to_a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,24 @@
#
# SPDX-License-Identifier: Apache-2.0

require_relative 'rate_limiting_sampler'

module OpenTelemetry
module Sampler
module XRay
# FallbackSampler samples 1 req/sec and additional 5% of requests using TraceIdRatioBasedSampler.
class FallbackSampler
# Builds the two samplers this fallback combines: a rate limiting sampler
# constructed with a quota of 1 and a trace-id-ratio sampler constructed with 0.05.
def initialize
  @rate_limiting_sampler = RateLimitingSampler.new(1)
  @fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(0.05)
end

# Asks the rate limiting sampler first and falls back to the fixed-rate
# sampler only when the rate limiter's result is a drop.
#
# Accepts the sampler keyword arguments unchanged and forwards them to both
# samplers; returns the result of whichever sampler made the final decision.
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
  sampling_result = @rate_limiting_sampler.should_sample?(
    trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
  )

  # Result#recording? is false only for a DROP decision, so this keeps the
  # rate limiter's result unless it dropped, without reading Result's private state.
  return sampling_result if sampling_result.recording?

  @fixed_rate_sampler.should_sample?(trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes)
end
Expand Down
48 changes: 48 additions & 0 deletions sampler/xray/lib/opentelemetry/sampler/xray/rate_limiter.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# frozen_string_literal: true

# Copyright OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

module OpenTelemetry
module Sampler
module XRay
# RateLimiter keeps track of the current reservoir quota balance available (measured via available time)
# If enough time has elapsed, the RateLimiter will allow quota balance to be consumed/taken (decrease available time)
# A RateLimitingSampler uses this RateLimiter to determine if it should sample or not based on the quota balance available.
class RateLimiter
  # quota is the number of units that may be taken per second;
  # max_balance_in_seconds caps how much unused time may accumulate.
  def initialize(quota, max_balance_in_seconds = 1)
    @quota = quota
    @max_balance_millis = max_balance_in_seconds * 1000.0
    # The balance is the time elapsed since this floor, so a new limiter starts empty.
    @wallet_floor_millis = Time.now.to_f * 1000
    @lock = Mutex.new
  end

  # Tries to spend `cost` units of quota.
  # Returns true when the accumulated balance covered the cost (the balance is
  # then reduced accordingly) and false otherwise, including when quota is zero.
  def take(cost = 1)
    return false if @quota.zero?

    # Milliseconds of balance needed to afford `cost`; quota is non-zero here.
    cost_in_millis = cost / (@quota / 1000.0)

    @lock.synchronize do
      now_millis = Time.now.to_f * 1000
      available_millis = [now_millis - @wallet_floor_millis, @max_balance_millis].min
      leftover_millis = available_millis - cost_in_millis

      if leftover_millis >= 0
        # Move the floor so that exactly the leftover balance remains.
        @wallet_floor_millis = now_millis - leftover_millis
        true
      else
        # Not enough balance: the wallet state is left untouched.
        false
      end
    end
  end
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# frozen_string_literal: true

# Copyright OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0

require_relative 'rate_limiter'

module OpenTelemetry
module Sampler
module XRay
# RateLimitingSampler is a Sampler that uses a RateLimiter to determine
# if it should sample or not based on the quota balance available.
class RateLimitingSampler
  # quota is passed to the RateLimiter and echoed in #to_s.
  def initialize(quota)
    @quota = quota
    @reservoir = RateLimiter.new(quota)
  end

  # Records and samples when one unit can be taken from the reservoir, drops otherwise.
  # The parent span's tracestate and the given attributes are passed through to the result.
  def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
    parent_tracestate = OpenTelemetry::Trace.current_span(parent_context).context.tracestate
    decisions = OpenTelemetry::SDK::Trace::Samplers::Decision
    decision = @reservoir.take(1) ? decisions::RECORD_AND_SAMPLE : decisions::DROP

    OpenTelemetry::SDK::Trace::Samplers::Result.new(
      decision: decision,
      tracestate: parent_tracestate,
      attributes: attributes
    )
  end

  def to_s
    "RateLimitingSampler{rate limiting sampling with sampling config of #{@quota} req/sec and 0% of additional requests}"
  end
end
end
end
end
46 changes: 46 additions & 0 deletions sampler/xray/lib/opentelemetry/sampler/xray/rule_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,52 @@ def update_rules(new_rule_appliers)
end
end

# Builds one sampling statistics document per cached rule applier, each tagged
# with the given client id and the current epoch time in seconds. Statistics
# are obtained through snapshot_statistics while the cache lock is held.
def create_sampling_statistics_documents(client_id)
  @cache_lock.synchronize do
    @rule_appliers.map do |applier|
      snapshot = applier.snapshot_statistics
      timestamp = Time.now.to_i

      {
        ClientID: client_id,
        RuleName: applier.sampling_rule.rule_name,
        Timestamp: timestamp,
        RequestCount: snapshot.request_count,
        BorrowCount: snapshot.borrow_count,
        SampledCount: snapshot.sample_count
      }
    end
  end
end

# Applies sampling targets to the cached rule appliers.
#
# target_documents is indexed by rule name. Every cached applier whose rule
# name has a target is replaced by applier.with_target(target); appliers
# without a target are left as they are and a debug message is logged.
#
# last_rule_modification is multiplied by 1000 before being compared with
# @last_updated_epoch_millis. It may be nil (e.g. absent from the response),
# in which case no rules refresh is requested.
#
# Returns [refresh_sampling_rules, next_polling_interval], where the interval
# is the smallest 'Interval' among the matched targets, or
# DEFAULT_TARGET_POLLING_INTERVAL_SECONDS when no matched target carries one.
def update_targets(target_documents, last_rule_modification)
  min_polling_interval = nil

  @cache_lock.synchronize do
    @rule_appliers.each_with_index do |rule, index|
      target = target_documents[rule.sampling_rule.rule_name]
      if target
        @rule_appliers[index] = rule.with_target(target)
        interval = target['Interval']
        min_polling_interval = interval if interval && (min_polling_interval.nil? || interval < min_polling_interval)
      else
        OpenTelemetry.logger.debug('Invalid sampling target: missing rule name')
      end
    end

    # Without a modification time there is no evidence the rules changed. Report
    # "no refresh" instead of raising on nil * 1000 after the appliers have
    # already been updated, which would make the caller lose the new interval.
    refresh_sampling_rules = !last_rule_modification.nil? &&
                             last_rule_modification * 1000 > @last_updated_epoch_millis

    [refresh_sampling_rules, min_polling_interval || DEFAULT_TARGET_POLLING_INTERVAL_SECONDS]
  end
end

private

def sort_rules_by_priority
Expand Down
Loading
Loading