@@ -13,6 +13,7 @@ import (
1313 "github.com/cloudquery/plugin-sdk/v4/scheduler/metrics"
1414 "github.com/cloudquery/plugin-sdk/v4/scheduler/resolvers"
1515 "github.com/cloudquery/plugin-sdk/v4/schema"
16+ "github.com/samber/lo"
1617 "go.opentelemetry.io/otel"
1718 "go.opentelemetry.io/otel/attribute"
1819 "go.opentelemetry.io/otel/trace"
@@ -157,8 +158,11 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
157158 go func () {
158159 defer close (resourcesChan )
159160 var wg sync.WaitGroup
160- for i := range resourcesSlice {
161- i := i
161+ chunks := [][]any {resourcesSlice }
162+ if table .PreResourceChunkResolver != nil {
163+ chunks = lo .Chunk (resourcesSlice , table .PreResourceChunkResolver .ChunkSize )
164+ }
165+ for i := range chunks {
162166 resourceConcurrencyKey := table .Name + "-" + client .ID () + "-" + "resource"
163167 resourceSemVal , _ := s .scheduler .singleTableConcurrency .LoadOrStore (resourceConcurrencyKey , semaphore .NewWeighted (s .scheduler .singleResourceMaxConcurrency ))
164168 resourceSem := resourceSemVal .(* semaphore.Weighted )
@@ -183,33 +187,34 @@ func (s *syncClient) resolveResourcesDfs(ctx context.Context, table *schema.Tabl
183187 defer resourceSem .Release (1 )
184188 defer s .scheduler .resourceSem .Release (1 )
185189 defer wg .Done ()
186- //nolint:all
187- resolvedResource := resolvers .ResolveSingleResource (ctx , s .logger , s .metrics , table , client , parent , resourcesSlice [i ], s .scheduler .caser )
188- if resolvedResource == nil {
190+ resolvedResources := resolvers .ResolveResourcesChunk (ctx , s .logger , s .metrics , table , client , parent , chunks [i ], s .scheduler .caser )
191+ if len (resolvedResources ) == 0 {
189192 return
190193 }
191194
192- if err := resolvedResource .CalculateCQID (s .deterministicCQID ); err != nil {
193- s .logger .Error ().Err (err ).Str ("table" , table .Name ).Str ("client" , client .ID ()).Msg ("resource resolver finished with primary key calculation error" )
194- s .metrics .AddErrors (ctx , 1 , selector )
195- return
196- }
197- if err := resolvedResource .StoreCQClientID (client .ID ()); err != nil {
198- s .logger .Error ().Err (err ).Str ("table" , table .Name ).Str ("client" , client .ID ()).Msg ("failed to store _cq_client_id" )
199- }
200- if err := resolvedResource .Validate (); err != nil {
201- switch err .(type ) {
202- case * schema.PKError :
203- s .logger .Error ().Err (err ).Str ("table" , table .Name ).Str ("client" , client .ID ()).Msg ("resource resolver finished with validation error" )
195+ for _ , resolvedResource := range resolvedResources {
196+ if err := resolvedResource .CalculateCQID (s .deterministicCQID ); err != nil {
197+ s .logger .Error ().Err (err ).Str ("table" , table .Name ).Str ("client" , client .ID ()).Msg ("resource resolver finished with primary key calculation error" )
204198 s .metrics .AddErrors (ctx , 1 , selector )
205199 return
206- case * schema.PKComponentError :
207- s .logger .Warn ().Err (err ).Str ("table" , table .Name ).Str ("client" , client .ID ()).Msg ("resource resolver finished with validation warning" )
208200 }
209- }
210- select {
211- case resourcesChan <- resolvedResource :
212- case <- ctx .Done ():
201+ if err := resolvedResource .StoreCQClientID (client .ID ()); err != nil {
202+ s .logger .Error ().Err (err ).Str ("table" , table .Name ).Str ("client" , client .ID ()).Msg ("failed to store _cq_client_id" )
203+ }
204+ if err := resolvedResource .Validate (); err != nil {
205+ switch err .(type ) {
206+ case * schema.PKError :
207+ s .logger .Error ().Err (err ).Str ("table" , table .Name ).Str ("client" , client .ID ()).Msg ("resource resolver finished with validation error" )
208+ s .metrics .AddErrors (ctx , 1 , selector )
209+ return
210+ case * schema.PKComponentError :
211+ s .logger .Warn ().Err (err ).Str ("table" , table .Name ).Str ("client" , client .ID ()).Msg ("resource resolver finished with validation warning" )
212+ }
213+ }
214+ select {
215+ case resourcesChan <- resolvedResource :
216+ case <- ctx .Done ():
217+ }
213218 }
214219 }()
215220 }
0 commit comments