@@ -27,6 +27,7 @@ import (
2727	conventions "go.opentelemetry.io/collector/semconv/v1.9.0" 
2828
2929	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" 
30+ 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry" 
3031)
3132
3233func  TestNewWithDefaultConfig (t  * testing.T ) {
@@ -45,10 +46,31 @@ func TestConsumeMetricsNoServer(t *testing.T) {
4546		exportertest .NewNopCreateSettings ())
4647	require .NoError (t , err )
4748	require .NoError (t , exp .Start (context .Background (), componenttest .NewNopHost ()))
48- 	require .Error (t , exp .ConsumeMetrics (context .Background (), generateLargeBatch ()))
49+ 	require .Error (t , exp .ConsumeMetrics (context .Background (), generateSmallBatch ()))
4950	require .NoError (t , exp .Shutdown (context .Background ()))
5051}
5152
53+ func  TestConsumeMetricsWithResourceToTelemetry (t  * testing.T ) {
54+ 	addr  :=  testutil .GetAvailableLocalAddress (t )
55+ 	cs  :=  newCarbonServer (t , addr , "test_0;k0=v0;k1=v1;service.name=test_carbon 0" )
56+ 	// Each metric point will generate one Carbon line, set up the wait 
57+ 	// for all of them. 
58+ 	cs .start (t , 1 )
59+ 
60+ 	exp , err  :=  newCarbonExporter (
61+ 		& Config {
62+ 			TCPAddr :                   confignet.TCPAddr {Endpoint : addr },
63+ 			TimeoutSettings :           exporterhelper.TimeoutSettings {Timeout : 5  *  time .Second },
64+ 			ResourceToTelemetryConfig : resourcetotelemetry.Settings {Enabled : true },
65+ 		},
66+ 		exportertest .NewNopCreateSettings ())
67+ 	require .NoError (t , err )
68+ 	require .NoError (t , exp .Start (context .Background (), componenttest .NewNopHost ()))
69+ 	require .NoError (t , exp .ConsumeMetrics (context .Background (), generateSmallBatch ()))
70+ 	assert .NoError (t , exp .Shutdown (context .Background ()))
71+ 	cs .shutdownAndVerify (t )
72+ }
73+ 
5274func  TestConsumeMetrics (t  * testing.T ) {
5375	if  runtime .GOOS  ==  "windows"  {
5476		t .Skip ("skipping test on windows, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/10147" )
@@ -94,7 +116,7 @@ func TestConsumeMetrics(t *testing.T) {
94116	for  _ , tt  :=  range  tests  {
95117		t .Run (tt .name , func (t  * testing.T ) {
96118			addr  :=  testutil .GetAvailableLocalAddress (t )
97- 			cs  :=  newCarbonServer (t , addr )
119+ 			cs  :=  newCarbonServer (t , addr ,  "" )
98120			// Each metric point will generate one Carbon line, set up the wait 
99121			// for all of them. 
100122			cs .start (t , tt .numProducers * tt .writesPerProducer * tt .md .DataPointCount ())
@@ -133,25 +155,21 @@ func TestConsumeMetrics(t *testing.T) {
133155}
134156
135157func  generateSmallBatch () pmetric.Metrics  {
136- 	metrics  :=  pmetric .NewMetrics ()
137- 	m  :=  metrics .ResourceMetrics ().AppendEmpty ().ScopeMetrics ().AppendEmpty ().Metrics ().AppendEmpty ()
138- 	m .SetName ("test_gauge" )
139- 	dp  :=  m .SetEmptyGauge ().DataPoints ().AppendEmpty ()
140- 	dp .Attributes ().PutStr ("k0" , "v0" )
141- 	dp .Attributes ().PutStr ("k1" , "v1" )
142- 	dp .SetTimestamp (pcommon .NewTimestampFromTime (time .Now ()))
143- 	dp .SetDoubleValue (123 )
144- 	return  metrics 
158+ 	return  generateMetricsBatch (1 )
145159}
146160
147161func  generateLargeBatch () pmetric.Metrics  {
162+ 	return  generateMetricsBatch (1024 )
163+ }
164+ 
165+ func  generateMetricsBatch (size  int ) pmetric.Metrics  {
148166	ts  :=  time .Now ()
149167	metrics  :=  pmetric .NewMetrics ()
150168	rm  :=  metrics .ResourceMetrics ().AppendEmpty ()
151169	rm .Resource ().Attributes ().PutStr (conventions .AttributeServiceName , "test_carbon" )
152170	ms  :=  rm .ScopeMetrics ().AppendEmpty ().Metrics ()
153171
154- 	for  i  :=  0 ; i  <  1028 ; i ++  {
172+ 	for  i  :=  0 ; i  <  size ; i ++  {
155173		m  :=  ms .AppendEmpty ()
156174		m .SetName ("test_"  +  strconv .Itoa (i ))
157175		dp  :=  m .SetEmptyGauge ().DataPoints ().AppendEmpty ()
@@ -165,19 +183,21 @@ func generateLargeBatch() pmetric.Metrics {
165183}
166184
167185type  carbonServer  struct  {
168- 	ln          * net.TCPListener 
169- 	doneServer  * atomic.Bool 
170- 	wg          sync.WaitGroup 
186+ 	ln                     * net.TCPListener 
187+ 	doneServer             * atomic.Bool 
188+ 	wg                     sync.WaitGroup 
189+ 	expectedContainsValue  string 
171190}
172191
173- func  newCarbonServer (t  * testing.T , addr  string ) * carbonServer  {
192+ func  newCarbonServer (t  * testing.T , addr  string ,  expectedContainsValue   string ) * carbonServer  {
174193	laddr , err  :=  net .ResolveTCPAddr ("tcp" , addr )
175194	require .NoError (t , err )
176195	ln , err  :=  net .ListenTCP ("tcp" , laddr )
177196	require .NoError (t , err )
178197	return  & carbonServer {
179- 		ln :         ln ,
180- 		doneServer : & atomic.Bool {},
198+ 		ln :                    ln ,
199+ 		doneServer :            & atomic.Bool {},
200+ 		expectedContainsValue : expectedContainsValue ,
181201	}
182202}
183203
@@ -198,14 +218,16 @@ func (cs *carbonServer) start(t *testing.T, numExpectedReq int) {
198218
199219				reader  :=  bufio .NewReader (conn )
200220				for  {
201- 					// Actual metric validation is done by other tests, here it 
202- 					// is just flow. 
203- 					_ , err  :=  reader .ReadBytes (byte ('\n' ))
221+ 					buf , err  :=  reader .ReadBytes (byte ('\n' ))
204222					if  errors .Is (err , io .EOF ) {
205223						return 
206224					}
207225					require .NoError (t , err )
208226
227+ 					if  cs .expectedContainsValue  !=  ""  {
228+ 						assert .Contains (t , string (buf ), cs .expectedContainsValue )
229+ 					}
230+ 
209231					cs .wg .Done ()
210232				}
211233			}(conn )
0 commit comments