Skip to content

Commit 0402180

Browse files
provide more granular way to manage embedding cache
1 parent 557c19c commit 0402180

File tree

14 files changed

+658
-100
lines changed

14 files changed

+658
-100
lines changed
Lines changed: 73 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
11
package com.exadel.frs.core.trainservice.cache;
22

3+
import com.exadel.frs.commonservice.entity.Embedding;
4+
import com.exadel.frs.commonservice.projection.EmbeddingProjection;
35
import com.exadel.frs.core.trainservice.dto.CacheActionDto;
6+
import com.exadel.frs.core.trainservice.dto.CacheActionDto.AddEmbeddings;
7+
import com.exadel.frs.core.trainservice.dto.CacheActionDto.CacheAction;
8+
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveEmbeddings;
9+
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveSubjects;
10+
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RenameSubjects;
411
import com.exadel.frs.core.trainservice.service.EmbeddingService;
512
import com.exadel.frs.core.trainservice.service.NotificationSenderService;
613
import com.google.common.cache.Cache;
714
import com.google.common.cache.CacheBuilder;
8-
import lombok.RequiredArgsConstructor;
9-
import lombok.extern.slf4j.Slf4j;
10-
import org.springframework.stereotype.Component;
11-
15+
import java.util.List;
16+
import java.util.Map;
1217
import java.util.Optional;
1318
import java.util.concurrent.TimeUnit;
1419
import java.util.function.Consumer;
20+
import lombok.RequiredArgsConstructor;
21+
import lombok.extern.slf4j.Slf4j;
22+
import org.springframework.stereotype.Component;
1523

1624
import static com.exadel.frs.core.trainservice.system.global.Constants.SERVER_UUID;
1725

@@ -34,34 +42,81 @@ public class EmbeddingCacheProvider {
3442
.build();
3543

3644
public EmbeddingCollection getOrLoad(final String apiKey) {
37-
3845
var result = cache.getIfPresent(apiKey);
39-
4046
if (result == null) {
4147
result = embeddingService.doWithEnhancedEmbeddingProjectionStream(apiKey, EmbeddingCollection::from);
42-
4348
cache.put(apiKey, result);
44-
45-
notifyCacheEvent("UPDATE", apiKey);
4649
}
47-
4850
return result;
4951
}
5052

51-
public void ifPresent(String apiKey, Consumer<EmbeddingCollection> consumer) {
53+
public void removeEmbedding(String apiKey, EmbeddingProjection embedding) {
5254
Optional.ofNullable(cache.getIfPresent(apiKey))
53-
.ifPresent(consumer);
55+
.ifPresent(
56+
ec -> {
57+
ec.removeEmbedding(embedding);
58+
notifyCacheEvent(
59+
CacheAction.REMOVE_EMBEDDINGS,
60+
apiKey,
61+
new RemoveEmbeddings(Map.of(embedding.subjectName(), List.of(embedding.embeddingId())))
62+
);
63+
}
64+
);
65+
}
5466

55-
cache.getIfPresent(apiKey);
56-
notifyCacheEvent("UPDATE", apiKey);
67+
public void updateSubjectName(String apiKey, String oldSubjectName, String newSubjectName) {
68+
Optional.ofNullable(cache.getIfPresent(apiKey))
69+
.ifPresent(
70+
ec -> {
71+
ec.updateSubjectName(oldSubjectName, newSubjectName);
72+
notifyCacheEvent(CacheAction.RENAME_SUBJECTS, apiKey, new RenameSubjects(Map.of(oldSubjectName, newSubjectName)));
73+
}
74+
);
75+
}
76+
77+
public void removeBySubjectName(String apiKey, String subjectName) {
78+
Optional.ofNullable(cache.getIfPresent(apiKey))
79+
.ifPresent(
80+
ec -> {
81+
ec.removeEmbeddingsBySubjectName(subjectName);
82+
notifyCacheEvent(CacheAction.REMOVE_SUBJECTS, apiKey, new RemoveSubjects(List.of(subjectName)));
83+
}
84+
);
85+
}
86+
87+
88+
public void addEmbedding(String apiKey, Embedding embedding) {
89+
Optional.ofNullable(cache.getIfPresent(apiKey))
90+
.ifPresent(
91+
ec -> {
92+
ec.addEmbedding(embedding);
93+
notifyCacheEvent(CacheAction.ADD_EMBEDDINGS, apiKey, new AddEmbeddings(List.of(embedding.getId())));
94+
}
95+
);
96+
}
97+
98+
/**
99+
* Method can be used to make changes in cache without sending notification.
100+
* Use it carefully, because changes you do will not be visible for other compreface-api instances
101+
*
102+
* @param apiKey domain
103+
* @param action what to do with {@link EmbeddingCollection}
104+
*/
105+
public void expose(String apiKey, Consumer<EmbeddingCollection> action) {
106+
Optional.ofNullable(cache.getIfPresent(apiKey))
107+
.ifPresent(action);
57108
}
58109

59110
public void invalidate(final String apiKey) {
60111
cache.invalidate(apiKey);
61-
notifyCacheEvent("DELETE", apiKey);
112+
notifyCacheEvent(CacheAction.INVALIDATE, apiKey, null);
62113
}
63114

64-
115+
/**
116+
* @deprecated
117+
* See {@link com.exadel.frs.core.trainservice.service.NotificationHandler#handleUpdate(CacheActionDto)}
118+
*/
119+
@Deprecated(forRemoval = true)
65120
public void receivePutOnCache(String apiKey) {
66121
var result = embeddingService.doWithEnhancedEmbeddingProjectionStream(apiKey, EmbeddingCollection::from);
67122
cache.put(apiKey, result);
@@ -71,8 +126,8 @@ public void receiveInvalidateCache(final String apiKey) {
71126
cache.invalidate(apiKey);
72127
}
73128

74-
private void notifyCacheEvent(String event, String apiKey) {
75-
CacheActionDto cacheActionDto = new CacheActionDto(event, apiKey, SERVER_UUID);
129+
private <T> void notifyCacheEvent(CacheAction event, String apiKey, T action) {
130+
CacheActionDto<T> cacheActionDto = new CacheActionDto<>(event, apiKey, SERVER_UUID, action);
76131
notificationSenderService.notifyCacheChange(cacheActionDto);
77132
}
78133
}

java/api/src/main/java/com/exadel/frs/core/trainservice/dao/SubjectDao.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ public Collection<String> getSubjectNames(final String apiKey) {
3232
return subjectRepository.getSubjectNames(apiKey);
3333
}
3434

35+
public List<Embedding> loadAllEmbeddingsByIds(Iterable<UUID> ids) {
36+
return embeddingRepository.findAllById(ids);
37+
}
38+
3539
@Transactional
3640
public Subject deleteSubjectByName(final String apiKey, final String subjectName) {
3741
final Optional<Subject> subjectOptional = subjectRepository.findByApiKeyAndSubjectNameIgnoreCase(apiKey, subjectName);
Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,62 @@
11
package com.exadel.frs.core.trainservice.dto;
22

3+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
34
import com.fasterxml.jackson.annotation.JsonProperty;
4-
import lombok.AllArgsConstructor;
5-
import lombok.Data;
6-
import lombok.NoArgsConstructor;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.UUID;
78

8-
@Data
9-
@AllArgsConstructor
10-
@NoArgsConstructor
11-
public class CacheActionDto {
9+
@JsonIgnoreProperties(ignoreUnknown = true) // here and below "ignoreUnknown = true" for backward compatibility
10+
public record CacheActionDto<T>(
11+
CacheAction cacheAction,
12+
String apiKey,
13+
@JsonProperty("uuid")
14+
UUID serverUUID,
15+
T payload
16+
) {
17+
public <S> CacheActionDto<S> withPayload(S payload) {
18+
return new CacheActionDto<>(
19+
cacheAction,
20+
apiKey,
21+
serverUUID,
22+
payload
23+
);
24+
}
1225

13-
@JsonProperty("cacheAction")
14-
private String cacheAction;
26+
public enum CacheAction {
27+
// UPDATE and DELETE stays here to support rolling update
28+
@Deprecated
29+
UPDATE,
30+
@Deprecated
31+
DELETE,
32+
REMOVE_EMBEDDINGS,
33+
REMOVE_SUBJECTS,
34+
ADD_EMBEDDINGS,
35+
RENAME_SUBJECTS,
36+
INVALIDATE
37+
}
1538

16-
@JsonProperty("apiKey")
17-
private String apiKey;
39+
@JsonIgnoreProperties(ignoreUnknown = true)
40+
public record RemoveEmbeddings(
41+
Map<String, List<UUID>> embeddings
42+
) {
43+
}
1844

19-
@JsonProperty("uuid")
20-
private String serverUUID;
45+
@JsonIgnoreProperties(ignoreUnknown = true)
46+
public record RemoveSubjects(
47+
List<String> subjects
48+
) {
49+
}
50+
51+
@JsonIgnoreProperties(ignoreUnknown = true)
52+
public record AddEmbeddings(
53+
List<UUID> embeddings
54+
) {
55+
}
56+
57+
@JsonIgnoreProperties(ignoreUnknown = true)
58+
public record RenameSubjects(
59+
Map<String, String> subjectsNamesMapping
60+
) {
61+
}
2162
}

java/api/src/main/java/com/exadel/frs/core/trainservice/service/EmbeddingService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import com.exadel.frs.core.trainservice.system.global.Constants;
1010
import java.util.stream.Stream;
1111
import lombok.RequiredArgsConstructor;
12-
import lombok.val;
1312
import org.springframework.data.domain.Page;
1413
import org.springframework.data.domain.Pageable;
1514
import org.springframework.stereotype.Service;
@@ -32,9 +31,9 @@ public int updateEmbedding(UUID embeddingId, double[] embedding, String calculat
3231
return embeddingRepository.updateEmbedding(embeddingId, embedding, calculator);
3332
}
3433

35-
@Transactional
34+
@org.springframework.transaction.annotation.Transactional(readOnly = true)
3635
public <T> T doWithEnhancedEmbeddingProjectionStream(String apiKey, Function<Stream<EnhancedEmbeddingProjection>, T> func) {
37-
try (val stream = embeddingRepository.findBySubjectApiKey(apiKey)) {
36+
try (var stream = embeddingRepository.findBySubjectApiKey(apiKey)) {
3837
return func.apply(stream);
3938
}
4039
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package com.exadel.frs.core.trainservice.service;
2+
3+
import com.exadel.frs.commonservice.projection.EmbeddingProjection;
4+
import com.exadel.frs.core.trainservice.cache.EmbeddingCacheProvider;
5+
import com.exadel.frs.core.trainservice.dto.CacheActionDto;
6+
import com.exadel.frs.core.trainservice.dto.CacheActionDto.AddEmbeddings;
7+
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveEmbeddings;
8+
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RemoveSubjects;
9+
import com.exadel.frs.core.trainservice.dto.CacheActionDto.RenameSubjects;
10+
import java.util.Objects;
11+
import lombok.RequiredArgsConstructor;
12+
import lombok.extern.slf4j.Slf4j;
13+
import org.apache.commons.lang3.StringUtils;
14+
import org.springframework.stereotype.Service;
15+
16+
@Slf4j
17+
@Service
18+
@RequiredArgsConstructor
19+
public class NotificationHandler {
20+
private final EmbeddingCacheProvider cacheProvider;
21+
private final SubjectService subjectService;
22+
23+
public void removeEmbeddings(CacheActionDto<RemoveEmbeddings> action) {
24+
action.payload().embeddings()
25+
.entrySet()
26+
.stream()
27+
.filter(e -> StringUtils.isNotBlank(e.getKey()))
28+
.filter(e -> Objects.nonNull(e.getValue()))
29+
.filter(e -> !e.getValue().isEmpty())
30+
.flatMap(e -> e.getValue().stream().filter(Objects::nonNull).map(id -> new EmbeddingProjection(id, e.getKey())))
31+
.forEach(
32+
em -> cacheProvider.expose(
33+
action.apiKey(),
34+
c -> c.removeEmbedding(em)
35+
)
36+
);
37+
}
38+
39+
public void removeSubjects(CacheActionDto<RemoveSubjects> action) {
40+
action.payload().subjects()
41+
.stream()
42+
.filter(StringUtils::isNotBlank)
43+
.forEach(
44+
s -> cacheProvider.expose(
45+
action.apiKey(),
46+
c -> c.removeEmbeddingsBySubjectName(s)
47+
)
48+
);
49+
}
50+
51+
52+
public void addEmbeddings(CacheActionDto<AddEmbeddings> action) {
53+
var filtered = action.payload().embeddings()
54+
.stream()
55+
.filter(Objects::nonNull)
56+
.toList();
57+
subjectService.loadEmbeddingsById(filtered)
58+
.forEach(
59+
em -> cacheProvider.expose(
60+
action.apiKey(),
61+
c -> c.addEmbedding(em)
62+
)
63+
);
64+
}
65+
66+
public void renameSubjects(CacheActionDto<RenameSubjects> action) {
67+
action.payload().subjectsNamesMapping()
68+
.entrySet()
69+
.stream()
70+
.filter(e -> StringUtils.isNotBlank(e.getKey()))
71+
.filter(e -> StringUtils.isNotBlank(e.getValue()))
72+
.forEach(
73+
e -> cacheProvider.expose(
74+
action.apiKey(),
75+
c -> c.updateSubjectName(e.getKey(), e.getValue())
76+
)
77+
);
78+
}
79+
80+
public <T> void invalidate(CacheActionDto<T> action) {
81+
cacheProvider.expose(
82+
action.apiKey(),
83+
e -> cacheProvider.receiveInvalidateCache(action.apiKey())
84+
);
85+
}
86+
87+
/**
88+
* @param action cacheAction
89+
* @deprecated in favour more granular cache managing.
90+
* See {@link CacheActionDto}.
91+
* Stays here to support rolling update
92+
*/
93+
@Deprecated(forRemoval = true)
94+
public <T> void handleDelete(CacheActionDto<T> action) {
95+
cacheProvider.receiveInvalidateCache(action.apiKey());
96+
}
97+
98+
/**
99+
* @param action cacheAction
100+
* @deprecated in favour more granular cache managing.
101+
* See {@link CacheActionDto}.
102+
* Stays here to support rolling update
103+
*/
104+
@Deprecated(forRemoval = true)
105+
public <T> void handleUpdate(CacheActionDto<T> action) {
106+
cacheProvider.receivePutOnCache(action.apiKey());
107+
}
108+
}

0 commit comments

Comments
 (0)