Skip to content

Commit 9be84e0

Browse files
authored
Reduce fsspec usage in tests (#776)
* Reduce use of fsspec in tests. Remove the code that uses fsspec to delete data from cloud buckets. This is not needed because the cloud bucket is configured to automatically delete all data more than 24 hours old. * Add references to obstore as well as fsspec in docs.
1 parent 0ebb206 commit 9be84e0

File tree

6 files changed

+117
-153
lines changed

6 files changed

+117
-153
lines changed

cubed/core/array.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -375,7 +375,7 @@ def measure_reserved_mem(
375375
reports peak memory, such as Lithops or Modal.
376376
377377
work_dir : str or None, optional
378-
The directory path (specified as an fsspec URL) used for storing intermediate data.
378+
The directory path (specified as an fsspec or obstore URL) used for storing intermediate data.
379379
This is required when using a cloud runtime.
380380
381381
kwargs

cubed/spec.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,7 @@ def __init__(
2929
Parameters
3030
----------
3131
work_dir : str or None
32-
The directory path (specified as an fsspec URL) used for storing intermediate data.
32+
The directory path (specified as an fsspec or obstore URL) used for storing intermediate data.
3333
allowed_mem : int or str, optional
3434
The total memory available to a worker for running a task, in bytes.
3535
@@ -68,7 +68,7 @@ def __init__(
6868

6969
@property
7070
def work_dir(self) -> Optional[str]:
71-
"""The directory path (specified as an fsspec URL) used for storing intermediate data."""
71+
"""The directory path (specified as an fsspec or obstore URL) used for storing intermediate data."""
7272
return self._work_dir
7373

7474
@property

cubed/tests/runtime/test_modal.py

Lines changed: 59 additions & 81 deletions
Original file line number | Diff line number | Diff line change
@@ -6,13 +6,11 @@
66

77
import asyncio
88

9-
import fsspec
10-
119
from cubed.runtime.asyncio import async_map_unordered
1210
from cubed.runtime.executors.modal import modal_create_futures_func
1311
from cubed.tests.runtime.utils import check_invocation_counts, deterministic_failure
1412

15-
tmp_path = "s3://cubed-unittest/map_unordered"
13+
BASE_PATH = "s3://cubed-unittest/map_unordered"
1614
region = "us-east-1" # S3 region for above bucket
1715

1816
app = modal.App("cubed-test-app", include_source=True)
@@ -104,24 +102,20 @@ async def run_test(app_function, input, use_backups=False, batch_size=None, **kw
104102
# fmt: on
105103
@pytest.mark.parametrize("use_backups", [False, True])
106104
@pytest.mark.cloud
107-
def test_success(timing_map, n_tasks, retries, use_backups):
108-
try:
109-
outputs = asyncio.run(
110-
run_test(
111-
app_function=deterministic_failure_modal,
112-
input=range(n_tasks),
113-
use_backups=use_backups,
114-
path=tmp_path,
115-
timing_map=timing_map,
116-
)
105+
def test_success(tmp_path, timing_map, n_tasks, retries, use_backups):
106+
path = f"{BASE_PATH}/{tmp_path.name}"
107+
outputs = asyncio.run(
108+
run_test(
109+
app_function=deterministic_failure_modal,
110+
input=range(n_tasks),
111+
use_backups=use_backups,
112+
path=path,
113+
timing_map=timing_map,
117114
)
115+
)
118116

119-
assert outputs == set(range(n_tasks))
120-
check_invocation_counts(tmp_path, timing_map, n_tasks, retries)
121-
122-
finally:
123-
fs = fsspec.open(tmp_path).fs
124-
fs.rm(tmp_path, recursive=True)
117+
assert outputs == set(range(n_tasks))
118+
check_invocation_counts(path, timing_map, n_tasks, retries)
125119

126120

127121
# fmt: off
@@ -135,24 +129,20 @@ def test_success(timing_map, n_tasks, retries, use_backups):
135129
# fmt: on
136130
@pytest.mark.parametrize("use_backups", [False, True])
137131
@pytest.mark.cloud
138-
def test_failure(timing_map, n_tasks, retries, use_backups):
139-
try:
140-
with pytest.raises(RuntimeError):
141-
asyncio.run(
142-
run_test(
143-
app_function=deterministic_failure_modal,
144-
input=range(n_tasks),
145-
use_backups=use_backups,
146-
path=tmp_path,
147-
timing_map=timing_map,
148-
)
132+
def test_failure(tmp_path, timing_map, n_tasks, retries, use_backups):
133+
path = f"{BASE_PATH}/{tmp_path.name}"
134+
with pytest.raises(RuntimeError):
135+
asyncio.run(
136+
run_test(
137+
app_function=deterministic_failure_modal,
138+
input=range(n_tasks),
139+
use_backups=use_backups,
140+
path=path,
141+
timing_map=timing_map,
149142
)
143+
)
150144

151-
check_invocation_counts(tmp_path, timing_map, n_tasks, retries)
152-
153-
finally:
154-
fs = fsspec.open(tmp_path).fs
155-
fs.rm(tmp_path, recursive=True)
145+
check_invocation_counts(path, timing_map, n_tasks, retries)
156146

157147

158148
# fmt: off
@@ -165,24 +155,20 @@ def test_failure(timing_map, n_tasks, retries, use_backups):
165155
# fmt: on
166156
@pytest.mark.parametrize("use_backups", [False, True])
167157
@pytest.mark.cloud
168-
def test_large_number_of_tasks(timing_map, n_tasks, retries, use_backups):
169-
try:
170-
outputs = asyncio.run(
171-
run_test(
172-
app_function=deterministic_failure_modal,
173-
input=range(n_tasks),
174-
use_backups=use_backups,
175-
path=tmp_path,
176-
timing_map=timing_map
177-
)
158+
def test_large_number_of_tasks(tmp_path, timing_map, n_tasks, retries, use_backups):
159+
path = f"{BASE_PATH}/{tmp_path.name}"
160+
outputs = asyncio.run(
161+
run_test(
162+
app_function=deterministic_failure_modal,
163+
input=range(n_tasks),
164+
use_backups=use_backups,
165+
path=path,
166+
timing_map=timing_map
178167
)
168+
)
179169

180-
assert outputs == set(range(n_tasks))
181-
check_invocation_counts(tmp_path, timing_map, n_tasks, retries)
182-
183-
finally:
184-
fs = fsspec.open(tmp_path).fs
185-
fs.rm(tmp_path, recursive=True)
170+
assert outputs == set(range(n_tasks))
171+
check_invocation_counts(path, timing_map, n_tasks, retries)
186172

187173

188174
# fmt: off
@@ -195,43 +181,35 @@ def test_large_number_of_tasks(timing_map, n_tasks, retries, use_backups):
195181
)
196182
# fmt: on
197183
@pytest.mark.cloud
198-
def test_stragglers(timing_map, n_tasks, retries, expected_invocation_counts_overrides):
199-
try:
200-
outputs = asyncio.run(
201-
run_test(
202-
app_function=deterministic_failure_modal_long_timeout,
203-
input=range(n_tasks),
204-
path=tmp_path,
205-
timing_map=timing_map,
206-
use_backups=True,
207-
)
184+
def test_stragglers(tmp_path, timing_map, n_tasks, retries, expected_invocation_counts_overrides):
185+
path = f"{BASE_PATH}/{tmp_path.name}"
186+
outputs = asyncio.run(
187+
run_test(
188+
app_function=deterministic_failure_modal_long_timeout,
189+
input=range(n_tasks),
190+
path=path,
191+
timing_map=timing_map,
192+
use_backups=True,
208193
)
194+
)
209195

210-
assert outputs == set(range(n_tasks))
211-
check_invocation_counts(tmp_path, timing_map, n_tasks, retries, expected_invocation_counts_overrides)
212-
213-
finally:
214-
fs = fsspec.open(tmp_path).fs
215-
fs.rm(tmp_path, recursive=True)
196+
assert outputs == set(range(n_tasks))
197+
check_invocation_counts(path, timing_map, n_tasks, retries, expected_invocation_counts_overrides)
216198

217199

218200
@pytest.mark.cloud
219201
def test_batch(tmp_path):
220202
# input is unbounded, so if entire input were consumed and not read
221203
# in batches then it would never return, since it would never
222204
# run the first (failing) input
223-
try:
224-
with pytest.raises(RuntimeError):
225-
asyncio.run(
226-
run_test(
227-
app_function=deterministic_failure_modal_no_retries,
228-
input=itertools.count(),
229-
path=tmp_path,
230-
timing_map={0: [-1]},
231-
batch_size=10,
232-
)
205+
path = f"{BASE_PATH}/{tmp_path.name}"
206+
with pytest.raises(RuntimeError):
207+
asyncio.run(
208+
run_test(
209+
app_function=deterministic_failure_modal_no_retries,
210+
input=itertools.count(),
211+
path=path,
212+
timing_map={0: [-1]},
213+
batch_size=10,
233214
)
234-
235-
finally:
236-
fs = fsspec.open(tmp_path).fs
237-
fs.rm(tmp_path, recursive=True)
215+
)

cubed/tests/test_array_api.py

Lines changed: 35 additions & 42 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,3 @@
1-
import fsspec
21
import numpy as np
32
import pytest
43

@@ -430,52 +429,46 @@ def test_matmul(spec, executor):
430429

431430
@pytest.mark.cloud
432431
def test_matmul_cloud(executor):
433-
tmp_path = "gs://barry-zarr-test/matmul"
434-
spec = cubed.Spec(tmp_path, allowed_mem=100000)
435-
try:
436-
a = xp.asarray(
437-
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]],
438-
chunks=(2, 2),
439-
spec=spec,
440-
)
441-
b = xp.asarray(
442-
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]],
443-
chunks=(2, 2),
444-
spec=spec,
445-
)
446-
c = xp.matmul(a, b)
447-
x = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]])
448-
y = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]])
449-
expected = np.matmul(x, y)
450-
assert_array_equal(c.compute(executor=executor), expected)
451-
finally:
452-
fs = fsspec.open(tmp_path).fs
453-
fs.rm(tmp_path, recursive=True)
432+
tmp_path = "s3://cubed-unittest/matmul"
433+
spec = cubed.Spec(tmp_path, allowed_mem=100000, storage_options=dict(use_obstore=True))
434+
435+
a = xp.asarray(
436+
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]],
437+
chunks=(2, 2),
438+
spec=spec,
439+
)
440+
b = xp.asarray(
441+
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]],
442+
chunks=(2, 2),
443+
spec=spec,
444+
)
445+
c = xp.matmul(a, b)
446+
x = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]])
447+
y = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]])
448+
expected = np.matmul(x, y)
449+
assert_array_equal(c.compute(executor=executor), expected)
454450

455451

456452
@pytest.mark.cloud
457453
def test_matmul_modal(modal_executor):
458454
tmp_path = "s3://cubed-unittest/matmul"
459-
spec = cubed.Spec(tmp_path, allowed_mem=100000)
460-
try:
461-
a = xp.asarray(
462-
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]],
463-
chunks=(2, 2),
464-
spec=spec,
465-
)
466-
b = xp.asarray(
467-
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]],
468-
chunks=(2, 2),
469-
spec=spec,
470-
)
471-
c = xp.matmul(a, b)
472-
x = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]])
473-
y = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]])
474-
expected = np.matmul(x, y)
475-
assert_array_equal(c.compute(executor=modal_executor), expected)
476-
finally:
477-
fs = fsspec.open(tmp_path).fs
478-
fs.rm(tmp_path, recursive=True)
455+
spec = cubed.Spec(tmp_path, allowed_mem=100000, storage_options=dict(use_obstore=True))
456+
457+
a = xp.asarray(
458+
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]],
459+
chunks=(2, 2),
460+
spec=spec,
461+
)
462+
b = xp.asarray(
463+
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]],
464+
chunks=(2, 2),
465+
spec=spec,
466+
)
467+
c = xp.matmul(a, b)
468+
x = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]])
469+
y = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]])
470+
expected = np.matmul(x, y)
471+
assert_array_equal(c.compute(executor=modal_executor), expected)
479472

480473

481474
def test_outer(spec, executor):

cubed/tests/test_executor_features.py

Lines changed: 19 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,6 @@
22
import platform
33
import re
44

5-
import fsspec
65
import numpy as np
76
import psutil
87
import pytest
@@ -152,20 +151,17 @@ def test_callbacks_modal(spec, modal_executor):
152151
task_counter = TaskCounter(check_timestamps=False)
153152
tmp_path = "s3://cubed-unittest/callbacks"
154153
spec = cubed.Spec(tmp_path, allowed_mem=100000)
155-
try:
156-
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
157-
b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
158-
c = xp.add(a, b)
159-
assert_array_equal(
160-
c.compute(executor=modal_executor, callbacks=[task_counter]),
161-
np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]]),
162-
)
163154

164-
num_created_arrays = 1
165-
assert task_counter.value == num_created_arrays + 4
166-
finally:
167-
fs = fsspec.open(tmp_path).fs
168-
fs.rm(tmp_path, recursive=True)
155+
a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
156+
b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
157+
c = xp.add(a, b)
158+
assert_array_equal(
159+
c.compute(executor=modal_executor, callbacks=[task_counter]),
160+
np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]]),
161+
)
162+
163+
num_created_arrays = 1
164+
assert task_counter.value == num_created_arrays + 4
169165

170166

171167
@pytest.mark.skipif(
@@ -246,19 +242,16 @@ def test_compute_arrays_in_parallel(spec, any_executor, compute_arrays_in_parall
246242
def test_compute_arrays_in_parallel_modal(modal_executor, compute_arrays_in_parallel):
247243
tmp_path = "s3://cubed-unittest/parallel_pipelines"
248244
spec = cubed.Spec(tmp_path, allowed_mem=100000)
249-
try:
250-
a = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
251-
b = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
252-
c = xp.add(a, b)
253245

254-
# note that this merely checks that compute_arrays_in_parallel is accepted
255-
c.compute(
256-
executor=modal_executor,
257-
compute_arrays_in_parallel=compute_arrays_in_parallel,
258-
)
259-
finally:
260-
fs = fsspec.open(tmp_path).fs
261-
fs.rm(tmp_path, recursive=True)
246+
a = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
247+
b = cubed.random.random((10, 10), chunks=(5, 5), spec=spec)
248+
c = xp.add(a, b)
249+
250+
# note that this merely checks that compute_arrays_in_parallel is accepted
251+
c.compute(
252+
executor=modal_executor,
253+
compute_arrays_in_parallel=compute_arrays_in_parallel,
254+
)
262255

263256

264257
def test_check_runtime_memory_dask(spec, executor):

docs/user-guide/storage.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@ Cubed will delete intermediate data only when the main Python process running th
88

99
## Cloud storage
1010

11-
When using a cloud service, the working directory should be set to a cloud storage directory in the same cloud region that the executor runtimes are in. In this case the directory is specified as a [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/) URL, such as `s3://cubed-tomwhite-temp`. This is how you would set it using a {py:class}`Spec <cubed.Spec>` object:
11+
When using a cloud service, the working directory should be set to a cloud storage directory in the same cloud region that the executor runtimes are in. In this case the directory is specified as a [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/) or [`obstore`](https://developmentseed.org/obstore/latest/) URL, such as `s3://cubed-tomwhite-temp`. This is how you would set it using a {py:class}`Spec <cubed.Spec>` object:
1212

1313
```python
1414
import cubed

0 commit comments

Comments (0)