Skip to content

Commit d6d5d42

Browse files
authored
feat: split tpa requests (#429)
Signed-off-by: Ruben Romero Montes <[email protected]>
1 parent 73174a9 commit d6d5d42

File tree

16 files changed

+149
-85
lines changed

16 files changed

+149
-85
lines changed

src/main/java/com/redhat/exhort/integration/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ private Constants() {}
5858
public static final MediaType MULTIPART_MIXED_TYPE = new MediaType("multipart", "mixed");
5959
public static final String MULTIPART_MIXED = MULTIPART_MIXED_TYPE.toString();
6060
public static final String SPDX_MEDIATYPE_JSON = "application/vnd.spdx+json";
61+
public static final String CYCLONEDX_MEDIATYPE_JSON = "application/vnd.cyclonedx+json";
6162

6263
public static final String SNYK_PROVIDER = "snyk";
6364
public static final String OSS_INDEX_PROVIDER = "oss-index";

src/main/java/com/redhat/exhort/integration/backend/sbom/SbomParserFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
package com.redhat.exhort.integration.backend.sbom;
2020

21-
import org.cyclonedx.CycloneDxMediaType;
22-
2321
import com.redhat.exhort.integration.Constants;
2422
import com.redhat.exhort.integration.backend.sbom.cyclonedx.CycloneDxParser;
2523
import com.redhat.exhort.integration.backend.sbom.spdx.SpdxParser;
@@ -31,7 +29,7 @@ public class SbomParserFactory {
3129

3230
public static final SbomParser newInstance(String mediaType) {
3331
return switch (mediaType) {
34-
case CycloneDxMediaType.APPLICATION_CYCLONEDX_JSON -> new CycloneDxParser();
32+
case Constants.CYCLONEDX_MEDIATYPE_JSON -> new CycloneDxParser();
3533
case Constants.SPDX_MEDIATYPE_JSON -> new SpdxParser();
3634
default -> throw new ClientErrorException(
3735
"Unsupported Content-Type header: " + mediaType, Response.Status.UNSUPPORTED_MEDIA_TYPE);

src/main/java/com/redhat/exhort/integration/providers/ossindex/OssIndexIntegration.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import com.redhat.exhort.integration.Constants;
3030
import com.redhat.exhort.integration.providers.VulnerabilityProvider;
3131
import com.redhat.exhort.model.DependencyTree;
32-
import com.redhat.exhort.monitoring.MonitoringProcessor;
3332

3433
import jakarta.enterprise.context.ApplicationScoped;
3534
import jakarta.inject.Inject;
@@ -47,8 +46,6 @@ public class OssIndexIntegration extends EndpointRouteBuilder {
4746

4847
@Inject OssIndexResponseHandler responseHandler;
4948

50-
@Inject MonitoringProcessor monitoringProcessor;
51-
5249
@Override
5350
public void configure() {
5451

src/main/java/com/redhat/exhort/integration/providers/ossindex/OssIndexRequestBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class OssIndexRequestBuilder {
4040

4141
private static final int BULK_SIZE = 128;
4242

43-
private ObjectMapper mapper = ObjectMapperProducer.newInstance();
43+
private final ObjectMapper mapper = ObjectMapperProducer.newInstance();
4444

4545
public List<List<PackageRef>> split(DependencyTree tree) {
4646
List<List<PackageRef>> bulks = new ArrayList<>();

src/main/java/com/redhat/exhort/integration/providers/ossindex/OssIndexResponseHandler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class OssIndexResponseHandler extends ProviderResponseHandler {
5656

5757
@Inject ObjectMapper mapper;
5858

59+
@Override
5960
public ProviderResponse responseToIssues(
6061
@Body byte[] response,
6162
@ExchangeProperty(Constants.PROVIDER_PRIVATE_DATA_PROPERTY) String privateProviders,

src/main/java/com/redhat/exhort/integration/providers/tpa/TpaIntegration.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.apache.camel.Exchange;
2424
import org.apache.camel.Message;
25+
import org.apache.camel.builder.AggregationStrategies;
2526
import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
2627
import org.eclipse.microprofile.config.inject.ConfigProperty;
2728

@@ -65,26 +66,27 @@ public void configure() throws Exception {
6566
.transform().method(responseHandler, "buildReport")
6667
.endChoice()
6768
.otherwise()
68-
.to(direct("tpaRequest"))
69-
.transform().method(responseHandler, "buildReport")
70-
.endChoice();
71-
72-
from(direct("tpaRequest"))
73-
.routeId("tpaRequest")
74-
.circuitBreaker()
75-
.faultToleranceConfiguration()
76-
.timeoutEnabled(true)
77-
.timeoutDuration(timeout)
78-
.end()
79-
.transform(method(requestBuilder, "buildRequest"))
80-
.process(this::processRequest)
81-
.process(requestBuilder::addAuthentication)
82-
.to(http("{{api.tpa.host}}"))
83-
.transform().method(responseHandler, "responseToIssues")
84-
.endCircuitBreaker()
85-
.onFallback()
86-
.process(responseHandler::processResponseError)
87-
.end();
69+
.to(direct("tpaSplitRequest"))
70+
.transform().method(responseHandler, "buildReport");
71+
72+
from(direct("tpaSplitRequest"))
73+
.routeId("tpaSplitRequest")
74+
.transform(method(TpaRequestBuilder.class, "split"))
75+
.split(body(), AggregationStrategies.beanAllowNull(responseHandler, "aggregateSplit"))
76+
.parallelProcessing()
77+
.transform().method(requestBuilder, "buildRequest")
78+
.process(this::processRequest)
79+
.process(requestBuilder::addAuthentication)
80+
.circuitBreaker()
81+
.faultToleranceConfiguration()
82+
.timeoutEnabled(true)
83+
.timeoutDuration(timeout)
84+
.end()
85+
.to(http("{{api.tpa.host}}"))
86+
.transform(method(responseHandler, "responseToIssues"))
87+
.onFallback()
88+
.process(responseHandler::processResponseError);
89+
8890

8991
from(direct("tpaHealthCheck"))
9092
.routeId("tpaHealthCheck")

src/main/java/com/redhat/exhort/integration/providers/tpa/TpaRequestBuilder.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package com.redhat.exhort.integration.providers.tpa;
2020

21+
import java.util.ArrayList;
22+
import java.util.List;
2123
import java.util.Optional;
2224

2325
import org.apache.camel.Exchange;
@@ -37,19 +39,37 @@
3739
@RegisterForReflection
3840
public class TpaRequestBuilder {
3941

42+
private static final int BULK_SIZE = 128;
43+
4044
@ConfigProperty(name = "api.tpa.token")
4145
Optional<String> defaultToken;
4246

43-
private ObjectMapper mapper = ObjectMapperProducer.newInstance();
47+
private final ObjectMapper mapper = ObjectMapperProducer.newInstance();
4448

45-
public String buildRequest(DependencyTree tree) throws JsonProcessingException {
49+
public String buildRequest(List<String> refs) throws JsonProcessingException {
4650
var request = mapper.createObjectNode();
4751
var purls = mapper.createArrayNode();
48-
tree.getAll().forEach(dep -> purls.add(dep.ref()));
52+
refs.forEach(dep -> purls.add(dep));
4953
request.set("purls", purls);
5054
return mapper.writeValueAsString(request);
5155
}
5256

57+
public List<List<String>> split(DependencyTree tree) {
58+
List<List<String>> bulks = new ArrayList<>();
59+
List<String> bulk = new ArrayList<>();
60+
for (var pkg : tree.getAll()) {
61+
if (bulk.size() == BULK_SIZE) {
62+
bulks.add(bulk);
63+
bulk = new ArrayList<>();
64+
}
65+
bulk.add(pkg.ref());
66+
}
67+
if (!bulk.isEmpty()) {
68+
bulks.add(bulk);
69+
}
70+
return bulks;
71+
}
72+
5373
public void addAuthentication(Exchange exchange) {
5474
var message = exchange.getMessage();
5575
var userToken = message.getHeader(Constants.TPA_TOKEN_HEADER, String.class);

src/main/java/com/redhat/exhort/integration/trustedcontent/TcResponseAggregation.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.redhat.exhort.integration.trustedcontent;
2020

21+
import java.util.Collections;
2122
import java.util.HashMap;
2223
import java.util.Map;
2324
import java.util.concurrent.ExecutionException;
@@ -27,6 +28,7 @@
2728
import org.apache.camel.ExchangeProperty;
2829

2930
import com.redhat.exhort.api.PackageRef;
31+
import com.redhat.exhort.api.v4.ProviderStatus;
3032
import com.redhat.exhort.integration.Constants;
3133
import com.redhat.exhort.integration.cache.CacheService;
3234
import com.redhat.exhort.model.trustedcontent.IndexedRecommendation;
@@ -37,6 +39,7 @@
3739

3840
import jakarta.inject.Inject;
3941
import jakarta.inject.Singleton;
42+
import jakarta.ws.rs.core.Response.Status;
4043

4144
@Singleton
4245
@RegisterForReflection
@@ -57,6 +60,16 @@ public TrustedContentResponse aggregateCachedResponse(
5760
Exchange exchange)
5861
throws ExecutionException {
5962
var externalResponse = exchange.getIn().getBody(TrustedContentResponse.class);
63+
if (externalResponse == null) {
64+
externalResponse =
65+
new TrustedContentResponse(
66+
Collections.emptyMap(),
67+
new ProviderStatus()
68+
.name(Constants.TRUSTED_CONTENT_PROVIDER)
69+
.code(Status.OK.getStatusCode())
70+
.message(Status.OK.getReasonPhrase())
71+
.ok(Boolean.TRUE));
72+
}
6073
cacheService.cacheRecommendations(externalResponse, cached.miss());
6174
Map<PackageRef, IndexedRecommendation> recommendations =
6275
new HashMap<>(externalResponse.recommendations());

src/main/java/com/redhat/exhort/integration/trustedcontent/TcResponseHandler.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,22 @@ public class TcResponseHandler extends ProviderResponseHandler {
6262

6363
@Inject UBIRecommendation ubiRecommendation;
6464

65-
public TrustedContentResponse parseResponse(
65+
public TrustedContentResponse mergeSplitRecommendations(
66+
TrustedContentResponse oldResponse, TrustedContentResponse newResponse) {
67+
if (oldResponse == null) {
68+
return newResponse;
69+
}
70+
if (!oldResponse.status().getOk()) {
71+
return oldResponse;
72+
}
73+
oldResponse.recommendations().putAll(newResponse.recommendations());
74+
return oldResponse;
75+
}
76+
77+
public TrustedContentResponse processRecommendations(
6678
@Body byte[] tcResponse, @ExchangeProperty(Constants.SBOM_ID_PROPERTY) String sbomId)
6779
throws IOException {
6880
var recommendations = mapper.readValue(tcResponse, Recommendations.class);
69-
7081
var mergedRecommendations = mergeRecommendations(recommendations);
7182
mergedRecommendations.putAll(getUBIRecommendation(sbomId));
7283

@@ -82,6 +93,9 @@ public TrustedContentResponse parseResponse(
8293
private Map<PackageRef, IndexedRecommendation> mergeRecommendations(
8394
Recommendations recommendations) {
8495
Map<PackageRef, IndexedRecommendation> result = new HashMap<>();
96+
if (recommendations == null) {
97+
return result;
98+
}
8599
recommendations.getMatchings().entrySet().stream()
86100
.forEach(
87101
e -> result.put(new PackageRef(e.getKey()), aggregateRecommendations(e.getValue())));

src/main/java/com/redhat/exhort/integration/trustedcontent/TrustedContentIntegration.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.redhat.exhort.integration.trustedcontent;
2020

2121
import org.apache.camel.Exchange;
22+
import org.apache.camel.builder.AggregationStrategies;
2223
import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
2324
import org.eclipse.microprofile.config.inject.ConfigProperty;
2425

@@ -55,19 +56,20 @@ public void configure() {
5556

5657
from(direct("getRemoteTrustedContent"))
5758
.routeId("getRemoteTrustedContent")
58-
.circuitBreaker()
59-
.faultToleranceConfiguration()
60-
.timeoutEnabled(true)
61-
.timeoutDuration(timeout)
62-
.end()
63-
.transform().method(requestBuilder, "buildRequest")
64-
.process(this::handleHeaders)
65-
.to(vertxHttp("{{api.trustedcontent.host}}"))
66-
.transform(method(TcResponseHandler.class, "parseResponse"))
67-
.endCircuitBreaker()
68-
.onFallback()
69-
.process(responseHandler::processResponseError);
70-
59+
.transform(method(requestBuilder, "split"))
60+
.split(body(), AggregationStrategies.bean(TcResponseHandler.class, "mergeSplitRecommendations"))
61+
.parallelProcessing()
62+
.transform().method(requestBuilder, "buildRequest")
63+
.process(this::handleHeaders)
64+
.circuitBreaker()
65+
.faultToleranceConfiguration()
66+
.timeoutEnabled(true)
67+
.timeoutDuration(timeout)
68+
.end()
69+
.to(vertxHttp("{{api.trustedcontent.host}}"))
70+
.transform(method(TcResponseHandler.class, "processRecommendations"))
71+
.onFallback()
72+
.process(responseHandler::processResponseError);
7173
// fmt:on
7274
}
7375

0 commit comments

Comments
 (0)