Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions connector/elasticapmconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,43 @@ By default, aggregated metrics will be exported without any client metadata. It
propagate client metadata from input to exported metrics by specifying a list of metadata keys
in `elasticapm::aggregation::metadata_keys`.

By default, cardinality for aggregated metrics will be limited.
Each limit defines a `max_cardinality` and a list of `attributes` that will be added to the overflow metric.
There are four limits that can be configured:
- `elasticapm::aggregation::resource_limit`: configures the max cardinality of resources
- `elasticapm::aggregation::scope_limit`: configures the max cardinality of scopes within a resource
- `elasticapm::aggregation::metric_limit`: configures the max cardinality of metrics within a scope
- `elasticapm::aggregation::datapoint_limit`: configures the max cardinality of metrics within a scope

```yaml
elasticapm:
aggregation:
directory: /path/to/aggregation/directory
metadata_keys: [list, of, metadata, keys]
resource_limit:
max_cardinality: 8000
overflow:
attributes:
- key: "overflow"
value: true
scope_limit:
max_cardinality: 4000
overflow:
attributes:
- key: "overflow"
value: true
metric_limit:
max_cardinality: 4000
overflow:
attributes:
- key: "overflow"
value: true
datapoint_limit:
max_cardinality: 4000
overflow:
attributes:
- key: "overflow"
value: true
```

### Metrics produced by the connector
Expand Down
19 changes: 18 additions & 1 deletion connector/elasticapmconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"fmt"
"time"

lsmconfig "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
signaltometricsconfig "github.com/open-telemetry/opentelemetry-collector-contrib/connector/signaltometricsconnector/config"
"go.opentelemetry.io/collector/component"

lsmconfig "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
)

var _ component.Config = (*Config)(nil)
Expand Down Expand Up @@ -64,6 +65,18 @@ type AggregationConfig struct {
// in all other cases -- using this configuration may lead to invalid behavior,
// and will not be supported.
Intervals []time.Duration `mapstructure:"intervals"`

// ResourceLimit defines the max cardinality of resources
ResourceLimit lsmconfig.LimitConfig `mapstructure:"resource_limit"`

// ScopeLimit defines the max cardinality of scopes within a resource
ScopeLimit lsmconfig.LimitConfig `mapstructure:"scope_limit"`

// MetricLimit defines the max cardinality of metrics within a scope
MetricLimit lsmconfig.LimitConfig `mapstructure:"metric_limit"`

// DatapointLimit defines the max cardinality of datapoints within a metric
DatapointLimit lsmconfig.LimitConfig `mapstructure:"datapoint_limit"`
}

func (cfg Config) Validate() error {
Expand Down Expand Up @@ -94,6 +107,10 @@ func (cfg Config) lsmConfig() *lsmconfig.Config {
if cfg.Aggregation != nil {
lsmConfig.Directory = cfg.Aggregation.Directory
lsmConfig.MetadataKeys = cfg.Aggregation.MetadataKeys
lsmConfig.ResourceLimit = cfg.Aggregation.ResourceLimit
lsmConfig.ScopeLimit = cfg.Aggregation.ScopeLimit
lsmConfig.MetricLimit = cfg.Aggregation.MetricLimit
lsmConfig.DatapointLimit = cfg.Aggregation.DatapointLimit
}
return lsmConfig
}
Expand Down
40 changes: 39 additions & 1 deletion connector/elasticapmconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ import (
"testing"
"time"

"github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector/internal/metadata"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/confmap/xconfmap"

"github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector/internal/metadata"
lsmconfig "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
)

func TestConfig(t *testing.T) {
Expand All @@ -47,6 +49,42 @@ func TestConfig(t *testing.T) {
Directory: "/path/to/aggregation/state",
MetadataKeys: []string{"a", "B", "c"},
Intervals: []time.Duration{time.Second, time.Minute},
ResourceLimit: lsmconfig.LimitConfig{
MaxCardinality: 1,
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{
{Key: "overflow", Value: true},
{Key: "other_overflow_label", Value: "overflow"},
},
},
},
ScopeLimit: lsmconfig.LimitConfig{
MaxCardinality: 1,
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{
{Key: "overflow", Value: true},
{Key: "other_overflow_label", Value: "overflow"},
},
},
},
MetricLimit: lsmconfig.LimitConfig{
MaxCardinality: 1,
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{
{Key: "overflow", Value: true},
{Key: "other_overflow_label", Value: "overflow"},
},
},
},
DatapointLimit: lsmconfig.LimitConfig{
MaxCardinality: 1,
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{
{Key: "overflow", Value: true},
{Key: "other_overflow_label", Value: "overflow"},
},
},
},
},
},
},
Expand Down
104 changes: 91 additions & 13 deletions connector/elasticapmconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (
"path/filepath"
"testing"

"github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/client"
Expand All @@ -36,25 +37,45 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector/internal/metadata"
lsmconfig "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
)

var update = flag.Bool("update", false, "Update golden files")

func TestConnector_LogsToMetrics(t *testing.T) {
oneCardinalityLimitConfig := lsmconfig.LimitConfig{
MaxCardinality: 1,
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{{Key: "test_overflow", Value: any(true)}},
},
}
oneCardinalityAggregationConfig := Config{
Aggregation: &AggregationConfig{
ResourceLimit: oneCardinalityLimitConfig,
ScopeLimit: oneCardinalityLimitConfig,
MetricLimit: oneCardinalityLimitConfig,
DatapointLimit: oneCardinalityLimitConfig,
},
}

testCases := []struct {
name string
cfg *Config
}{
{name: "logs/service_summary"},
// output should remain the same for all provided configs
{name: "logs/service_summary", cfg: &Config{}},
{name: "logs/service_summary", cfg: &oneCardinalityAggregationConfig},

// output should show overflow behavior
{name: "logs/service_summary_overflow", cfg: &oneCardinalityAggregationConfig},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
nextMetrics := &consumertest.MetricsSink{}

cfg := &Config{}
l2m := newLogsToMetrics(t, connectortest.NewNopSettings(metadata.Type), cfg, nextMetrics)
l2m := newLogsToMetrics(t, connectortest.NewNopSettings(metadata.Type), tc.cfg, nextMetrics)

dir := filepath.Join("testdata", tc.name)
input, err := golden.ReadLogs(filepath.Join(dir, "input.yaml"))
Expand All @@ -77,18 +98,38 @@ func TestConnector_LogsToMetrics(t *testing.T) {
}

func TestConnector_MetricsToMetrics(t *testing.T) {
oneCardinalityLimitConfig := lsmconfig.LimitConfig{
MaxCardinality: 1,
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{{Key: "test_overflow", Value: any(true)}},
},
}
oneCardinalityAggregationConfig := Config{
Aggregation: &AggregationConfig{
ResourceLimit: oneCardinalityLimitConfig,
ScopeLimit: oneCardinalityLimitConfig,
MetricLimit: oneCardinalityLimitConfig,
DatapointLimit: oneCardinalityLimitConfig,
},
}

testCases := []struct {
name string
cfg *Config
}{
{name: "metrics/service_summary"},
// output should remain the same for all provided configs
{name: "metrics/service_summary", cfg: &Config{}},
{name: "metrics/service_summary", cfg: &oneCardinalityAggregationConfig},

// output should show overflow
{name: "metrics/service_summary_overflow", cfg: &oneCardinalityAggregationConfig},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
nextMetrics := &consumertest.MetricsSink{}

cfg := &Config{}
m2m := newMetricsConnector(t, connectortest.NewNopSettings(metadata.Type), cfg, nextMetrics)
m2m := newMetricsConnector(t, connectortest.NewNopSettings(metadata.Type), tc.cfg, nextMetrics)

dir := filepath.Join("testdata", tc.name)
input, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml"))
Expand All @@ -111,19 +152,55 @@ func TestConnector_MetricsToMetrics(t *testing.T) {
}

func TestConnector_TracesToMetrics(t *testing.T) {
fourCardinalityLimitConfig := lsmconfig.LimitConfig{
MaxCardinality: 4, // min limit to prevent overflow behavior
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{{Key: "test_overflow", Value: any(true)}},
},
}
fourCardinalityAggregationConfig := Config{
Aggregation: &AggregationConfig{
ResourceLimit: fourCardinalityLimitConfig,
ScopeLimit: fourCardinalityLimitConfig,
MetricLimit: fourCardinalityLimitConfig,
DatapointLimit: fourCardinalityLimitConfig,
},
}

oneCardinalityLimitConfig := lsmconfig.LimitConfig{
MaxCardinality: 1,
Overflow: lsmconfig.OverflowConfig{
Attributes: []lsmconfig.Attribute{{Key: "test_overflow", Value: any(true)}},
},
}
oneCardinalityAggregationConfig := Config{
Aggregation: &AggregationConfig{
ResourceLimit: oneCardinalityLimitConfig,
ScopeLimit: oneCardinalityLimitConfig,
MetricLimit: oneCardinalityLimitConfig,
DatapointLimit: oneCardinalityLimitConfig,
},
}

testCases := []struct {
name string
cfg *Config
}{
{name: "traces/transaction_metrics"},
{name: "traces/span_metrics"},
// output should remain the same for all provided configs
{name: "traces/transaction_metrics", cfg: &Config{}},
{name: "traces/transaction_metrics", cfg: &fourCardinalityAggregationConfig},
{name: "traces/span_metrics", cfg: &Config{}},
{name: "traces/span_metrics", cfg: &fourCardinalityAggregationConfig},

// output should show overflow
{name: "traces/span_metrics_overflow", cfg: &oneCardinalityAggregationConfig},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
nextMetrics := &consumertest.MetricsSink{}

cfg := &Config{}
t2m := newTracesConnector(t, connectortest.NewNopSettings(metadata.Type), cfg, nextMetrics)
t2m := newTracesConnector(t, connectortest.NewNopSettings(metadata.Type), tc.cfg, nextMetrics)

dir := filepath.Join("testdata", tc.name)
input, err := golden.ReadTraces(filepath.Join(dir, "input.yaml"))
Expand Down Expand Up @@ -162,6 +239,7 @@ func TestConnector_AggregationDirectory(t *testing.T) {
require.NoError(t, err)
require.NotEmpty(t, entries)
}

func TestConnector_AggregationMetadataKeys(t *testing.T) {
cfg := &Config{Aggregation: &AggregationConfig{MetadataKeys: []string{"k"}}}

Expand Down
22 changes: 21 additions & 1 deletion connector/elasticapmconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/opentelemetry-collector-components/connector/elasticapmconnector/internal/metadata"
"github.com/elastic/opentelemetry-collector-components/internal/sharedcomponent"
lsmconfig "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/config"
)

type sharedcomponentKey struct {
Expand All @@ -48,7 +49,26 @@ func NewFactory() connector.Factory {

// createDefaultConfig creates the default configuration.
func createDefaultConfig() component.Config {
return &Config{}
return &Config{
Aggregation: &AggregationConfig{
ResourceLimit: lsmconfig.LimitConfig{
MaxCardinality: 8000,
Overflow: lsmconfig.OverflowConfig{},
},
ScopeLimit: lsmconfig.LimitConfig{
MaxCardinality: 4000,
Overflow: lsmconfig.OverflowConfig{},
},
MetricLimit: lsmconfig.LimitConfig{
MaxCardinality: 4000,
Overflow: lsmconfig.OverflowConfig{},
},
DatapointLimit: lsmconfig.LimitConfig{
MaxCardinality: 4000,
Overflow: lsmconfig.OverflowConfig{},
},
},
}
}

func createLogsToMetrics(
Expand Down
33 changes: 33 additions & 0 deletions connector/elasticapmconnector/testdata/config/full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,36 @@ elasticapm:
directory: /path/to/aggregation/state
metadata_keys: [a, B, c]
intervals: [1s, 1m]
resource_limit:
max_cardinality: 1
overflow:
attributes:
- key: "overflow"
value: true
- key: "other_overflow_label"
value: "overflow"
scope_limit:
max_cardinality: 1
overflow:
attributes:
- key: "overflow"
value: true
- key: "other_overflow_label"
value: "overflow"
metric_limit:
max_cardinality: 1
overflow:
attributes:
- key: "overflow"
value: true
- key: "other_overflow_label"
value: "overflow"
datapoint_limit:
max_cardinality: 1
overflow:
attributes:
- key: "overflow"
value: true
- key: "other_overflow_label"
value: "overflow"

Loading