11from __future__ import annotations
22
3+ import asyncio
34import os
45
56import pytest
@@ -24,22 +25,27 @@ def force_spill():
2425
2526 manager = get_global_manager ()
2627
27- # 24 bytes
28+ # Allocate a new dataframe and trigger spilling by setting a 1 byte limit
2829 df = cudf .DataFrame ({"a" : [1 , 2 , 3 ]})
30+ manager .spill_to_device_limit (1 )
2931
30- return manager .spill_to_device_limit (1 )
32+ # Get bytes spilled from GPU to CPU
33+ spill_totals , _ = get_global_manager ().statistics .spill_totals [("gpu" , "cpu" )]
34+ return spill_totals
3135
3236
3337@gen_cluster (
3438 client = True ,
3539 nthreads = [("127.0.0.1" , 1 )],
3640)
37- @pytest .mark .flaky (reruns = 10 , reruns_delay = 5 )
3841async def test_cudf_metrics (c , s , * workers ):
3942 w = list (s .workers .values ())[0 ]
4043 assert "cudf" in w .metrics
4144 assert w .metrics ["cudf" ]["cudf-spilled" ] == 0
4245
43- await c .run (force_spill )
44-
45- assert w .metrics ["cudf" ]["cudf-spilled" ] == 24
46+ spill_totals = (await c .run (force_spill , workers = [w .address ]))[w .address ]
47+ assert spill_totals > 0
48+ # We have to wait for the worker's metrics to update.
49+ # TODO: avoid sleep, is it possible to wait on the next update of metrics?
50+ await asyncio .sleep (1 )
51+ assert w .metrics ["cudf" ]["cudf-spilled" ] == spill_totals
0 commit comments