Skip to content

Commit 07a9ef1

Browse files
authored
Refactor search isolate control + minimal test. (#8954)
1 parent ba49b79 commit 07a9ef1

File tree

9 files changed

+185
-54
lines changed

9 files changed

+185
-54
lines changed

app/bin/tools/search_benchmark.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ Future<void> main(List<String> args) async {
1616
'max memory: ${ProcessInfo.maxRss ~/ 1024} KiB',
1717
);
1818
// Assumes that the first argument is a search snapshot file.
19-
final index = await loadInMemoryPackageIndexFromFile(args.first);
19+
final index = await loadInMemoryPackageIndexFromUrl(args.first);
2020
print(
2121
'Loaded. Current memory: ${ProcessInfo.currentRss ~/ 1024} KiB, '
2222
'max memory: ${ProcessInfo.maxRss ~/ 1024} KiB',

app/lib/fake/server/fake_search_service.dart

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@ import 'package:fake_gcloud/mem_datastore.dart';
99
import 'package:fake_gcloud/mem_storage.dart';
1010
import 'package:gcloud/service_scope.dart' as ss;
1111
import 'package:logging/logging.dart';
12+
import 'package:meta/meta.dart';
1213
import 'package:pub_dev/fake/backend/fake_download_counts.dart';
1314
import 'package:pub_dev/search/handlers.dart';
15+
import 'package:pub_dev/search/models.dart';
1416
import 'package:pub_dev/search/sdk_mem_index.dart';
1517
import 'package:pub_dev/search/updater.dart';
1618
import 'package:pub_dev/service/services.dart';
1719
import 'package:pub_dev/shared/configuration.dart';
1820
import 'package:pub_dev/shared/handler_helpers.dart';
21+
import 'package:pub_dev/shared/handlers.dart';
1922
import 'package:pub_dev/task/cloudcompute/fakecloudcompute.dart';
2023
import 'package:shelf/shelf.dart' as shelf;
2124
import 'package:shelf/shelf_io.dart';
@@ -71,3 +74,17 @@ class FakeSearchService {
7174
_logger.info('closed');
7275
}
7376
}
77+
78+
@visibleForTesting
79+
Future<IOServer> setupLocalSnapshotServer() async {
80+
final snapshotServer = await IOServer.bind('localhost', 0);
81+
serveRequests(snapshotServer.server, (request) async {
82+
await generateFakeDownloadCountsInDatastore();
83+
final snapshot = SearchSnapshot.fromDocuments(
84+
// ignore: invalid_use_of_visible_for_testing_member
85+
await indexUpdater.loadAllPackageDocuments(),
86+
);
87+
return jsonResponse(snapshot.toJson());
88+
});
89+
return snapshotServer;
90+
}

app/lib/search/models.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,14 @@ class SearchSnapshot {
1919

2020
factory SearchSnapshot() => SearchSnapshot._(clock.now().toUtc(), {});
2121

22+
factory SearchSnapshot.fromDocuments(Iterable<PackageDocument> documents) {
23+
final snapshot = SearchSnapshot();
24+
for (final doc in documents) {
25+
snapshot.add(doc);
26+
}
27+
return snapshot;
28+
}
29+
2230
factory SearchSnapshot.fromJson(Map<String, dynamic> json) =>
2331
_$SearchSnapshotFromJson(json);
2432

app/lib/search/updater.dart

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import 'dart:async';
66
import 'dart:convert';
77
import 'dart:io';
88

9+
import 'package:_pub_shared/utils/http.dart';
910
import 'package:gcloud/service_scope.dart' as ss;
1011
import 'package:logging/logging.dart';
1112
import 'package:meta/meta.dart';
@@ -28,14 +29,20 @@ void registerIndexUpdater(IndexUpdater updater) =>
2829
/// The active index updater.
2930
IndexUpdater get indexUpdater => ss.lookup(#_indexUpdater) as IndexUpdater;
3031

31-
/// Loads a local search snapshot file and builds an in-memory package index from it.
32-
Future<InMemoryPackageIndex> loadInMemoryPackageIndexFromFile(
33-
String path,
34-
) async {
35-
final file = File(path);
36-
final content =
37-
json.decode(utf8.decode(gzip.decode(await file.readAsBytes())))
38-
as Map<String, Object?>;
32+
/// Loads a search snapshot from an URL or from a local file and builds
33+
/// an in-memory package index from it.
34+
Future<InMemoryPackageIndex> loadInMemoryPackageIndexFromUrl(String url) async {
35+
late String textContent;
36+
if (url.startsWith('http://') || url.startsWith('https://')) {
37+
textContent = await httpGetWithRetry(
38+
Uri.parse(url),
39+
responseFn: (rs) => rs.body,
40+
);
41+
} else {
42+
final file = File(url);
43+
textContent = utf8.decode(gzip.decode(await file.readAsBytes()));
44+
}
45+
final content = json.decode(textContent) as Map<String, Object?>;
3946
final snapshot = SearchSnapshot.fromJson(content);
4047
return InMemoryPackageIndex(
4148
documents: snapshot.documents!.values.where(
@@ -83,12 +90,19 @@ class IndexUpdater {
8390
/// complete document for the index.
8491
@visibleForTesting
8592
Future<void> updateAllPackages() async {
93+
final documents = await loadAllPackageDocuments();
94+
updatePackageIndex(InMemoryPackageIndex(documents: documents));
95+
}
96+
97+
/// Loads all packages as PackageDocuments.
98+
@visibleForTesting
99+
Future<List<PackageDocument>> loadAllPackageDocuments() async {
86100
final documents = <PackageDocument>[];
87101
await for (final p in _db.query<Package>().run()) {
88102
final doc = await searchBackend.loadDocument(p.name!);
89103
documents.add(doc);
90104
}
91-
updatePackageIndex(InMemoryPackageIndex(documents: documents));
105+
return documents;
92106
}
93107

94108
/// Returns whether the snapshot was initialized and loaded properly.

app/lib/service/entrypoint/search.dart

Lines changed: 71 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import 'package:logging/logging.dart';
1111
import 'package:pub_dev/search/result_combiner.dart';
1212
import 'package:pub_dev/service/entrypoint/sdk_isolate_index.dart';
1313
import 'package:pub_dev/service/entrypoint/search_index.dart';
14+
import 'package:shelf/shelf.dart';
1415

1516
import '../../search/backend.dart';
1617
import '../../search/handlers.dart';
@@ -38,47 +39,77 @@ class SearchCommand extends Command {
3839

3940
envConfig.checkServiceEnvironment(name);
4041
await withServices(() async {
41-
final packageIsolate = await startSearchIsolate(logger: _logger);
42-
registerScopeExitCallback(packageIsolate.close);
43-
44-
final sdkIsolate = await startQueryIsolate(
45-
logger: _logger,
46-
kind: 'sdk',
47-
spawnUri: Uri.parse(
48-
'package:pub_dev/service/entrypoint/sdk_isolate_index.dart',
49-
),
50-
);
51-
registerScopeExitCallback(sdkIsolate.close);
52-
53-
registerSearchIndex(
54-
SearchResultCombiner(
55-
primaryIndex: LatencyAwareSearchIndex(
56-
IsolateSearchIndex(packageIsolate),
57-
),
58-
sdkIndex: SdkIsolateIndex(sdkIsolate),
59-
),
42+
await runSearchInstanceController(
43+
port: 8080,
44+
renewPackageIndex: _createRenewStream(delayDrift: delayDrift),
6045
);
61-
62-
void scheduleRenew() {
63-
scheduleMicrotask(() async {
64-
// 12 - 17 minutes delay
65-
final delay = Duration(
66-
minutes: 12,
67-
seconds: delayDrift + _random.nextInt(240),
68-
);
69-
await Future.delayed(delay);
70-
71-
// create a new index and handover with a 2-minute maximum wait
72-
await packageIsolate.renew(count: 1, wait: Duration(minutes: 2));
73-
74-
// schedule the renewal again
75-
scheduleRenew();
76-
});
77-
}
78-
79-
scheduleRenew();
80-
81-
await runHandler(_logger, searchServiceHandler);
8246
});
8347
}
8448
}
49+
50+
/// Creates a stream with events separated by 12 - 17 minutes
51+
Stream<Completer> _createRenewStream({required int delayDrift}) {
52+
return Stream.periodic(Duration(minutes: 12), (_) => Completer()).asyncMap(
53+
(c) => Future.delayed(
54+
Duration(seconds: delayDrift + _random.nextInt(240)),
55+
() => c,
56+
),
57+
);
58+
}
59+
60+
/// Runs the search instance main controller, which creates separate isolates
61+
/// for the package and the SDK indexes.
62+
///
63+
/// When the [renewPackageIndex] has a new event, it will trigger the renewal of the
64+
/// package index isolate, updating the search index.
65+
Future<void> runSearchInstanceController({
66+
required int port,
67+
required Stream<Completer> renewPackageIndex,
68+
Duration renewWait = const Duration(minutes: 2),
69+
String? snapshot,
70+
Handler? handler,
71+
Future<void> Function()? processTerminationSignal,
72+
}) async {
73+
final packageIsolate = await startSearchIsolate(
74+
logger: _logger,
75+
snapshot: snapshot,
76+
);
77+
registerScopeExitCallback(packageIsolate.close);
78+
79+
final sdkIsolate = await startQueryIsolate(
80+
logger: _logger,
81+
kind: 'sdk',
82+
spawnUri: Uri.parse(
83+
'package:pub_dev/service/entrypoint/sdk_isolate_index.dart',
84+
),
85+
);
86+
registerScopeExitCallback(sdkIsolate.close);
87+
88+
registerSearchIndex(
89+
SearchResultCombiner(
90+
primaryIndex: LatencyAwareSearchIndex(IsolateSearchIndex(packageIsolate)),
91+
sdkIndex: SdkIsolateIndex(sdkIsolate),
92+
),
93+
);
94+
95+
final updateStream = renewPackageIndex.asyncMap((c) async {
96+
try {
97+
// create a new index and handover with a 2-minute maximum wait
98+
await packageIsolate.renew(count: 1, wait: renewWait);
99+
c.complete(null);
100+
} catch (e, st) {
101+
c.completeError(e, st);
102+
}
103+
});
104+
final updateListener = updateStream.listen((_) {
105+
_logger.info('Package SDK isolate renewed.');
106+
});
107+
108+
await runHandler(
109+
_logger,
110+
handler ?? searchServiceHandler,
111+
port: port,
112+
processTerminationSignal: processTerminationSignal,
113+
);
114+
await updateListener.cancel();
115+
}

app/lib/service/entrypoint/search_index.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ Future<void> main(List<String> args, var message) async {
4949
if (snapshot == null) {
5050
await indexUpdater.init();
5151
} else {
52-
updatePackageIndex(await loadInMemoryPackageIndexFromFile(snapshot));
52+
updatePackageIndex(await loadInMemoryPackageIndexFromUrl(snapshot));
5353
}
5454

5555
await runIsolateFunctions(

app/lib/shared/handler_helpers.dart

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Future<void> runHandler(
3838
shelf.Handler handler, {
3939
bool sanitize = false,
4040
int port = 8080,
41+
Future<void> Function()? processTerminationSignal,
4142
}) async {
4243
handler = wrapHandler(logger, handler, sanitize: sanitize);
4344
if (envConfig.isRunningInAppengine) {
@@ -60,7 +61,8 @@ Future<void> runHandler(
6061
port,
6162
shared: true,
6263
);
63-
await waitForProcessSignalTermination();
64+
processTerminationSignal ??= waitForProcessSignalTermination;
65+
await processTerminationSignal();
6466
await server.close();
6567
}
6668
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
7+
import 'package:pub_dev/fake/server/fake_search_service.dart';
8+
import 'package:pub_dev/service/entrypoint/search.dart';
9+
import 'package:shelf/shelf_io.dart';
10+
import 'package:test/test.dart';
11+
12+
import '../../shared/test_services.dart';
13+
14+
void main() {
15+
group('Main search isolate controller', () {
16+
testWithProfile(
17+
'update the package index isolate',
18+
fn: () async {
19+
final snapshotServer = await setupLocalSnapshotServer();
20+
final renewController = StreamController<Completer>.broadcast();
21+
final processTerminationCompleter = Completer();
22+
final handlerStartedCompleter = Completer();
23+
try {
24+
final port = await _detectFreePort();
25+
final doneFuture = runSearchInstanceController(
26+
port: port,
27+
renewPackageIndex: renewController.stream,
28+
processTerminationSignal: () async {
29+
handlerStartedCompleter.complete();
30+
return await processTerminationCompleter.future;
31+
},
32+
renewWait: Duration.zero,
33+
snapshot: 'http://localhost:${snapshotServer.server.port}/',
34+
);
35+
await handlerStartedCompleter.future;
36+
37+
// force renew
38+
final c = Completer();
39+
renewController.add(c);
40+
await c.future;
41+
42+
processTerminationCompleter.complete();
43+
await doneFuture;
44+
} finally {
45+
await snapshotServer.close();
46+
await renewController.close();
47+
}
48+
},
49+
timeout: Timeout.factor(8),
50+
);
51+
});
52+
}
53+
54+
Future<int> _detectFreePort() async {
55+
final server = await IOServer.bind('localhost', 0);
56+
final port = server.server.port;
57+
await server.close();
58+
return port;
59+
}

pkg/pub_integration/lib/src/fake_pub_server_process.dart

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,13 @@ class FakePubServerProcess {
151151
Future<void> get started => _startedCompleter.future;
152152

153153
Future<void> kill() async {
154-
// First try SIGINT, and after 10 seconds do SIGTERM.
154+
// First try SIGINT, and after 10 seconds do SIGKILL.
155155
print('Sending INT signal to ${_process.pid}...');
156156
_process.kill(ProcessSignal.sigint);
157157
await _coverageConfig?.waitForCollect();
158158
final timer = Timer(Duration(seconds: 10), () {
159159
print('Sending TERM signal to ${_process.pid}...');
160-
_process.kill();
160+
_process.kill(ProcessSignal.sigkill);
161161
});
162162
final exitCode = await _process.exitCode;
163163
print('Exit code: $exitCode');

0 commit comments

Comments
 (0)