Description
What is your issue?
I am trying to scatter a large array and pass it as a keyword argument to a function applied with apply_ufunc,
but that currently does not work.
The function works when I pass the actual array, but when I pass the Future returned by scatter, the task fails.
Here is a minimal example that reproduces the issue:
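import dask.array as da
import xarray as xr
import numpy as np
from dask.distributed import Client

client = Client()  # assumed: any dask.distributed client; a local cluster here
data = xr.DataArray(data=da.random.random((15, 15, 20)), coords={'x': range(15), 'y': range(15), 'z': range(20)}, dims=('x', 'y', 'z'))
test = np.full((20,), 30)
test_future = client.scatter(test, broadcast=True)

def _copy_test(d, test=None):
    # Ignore the input slice and return the keyword argument unchanged
    return test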
new_data_actual = xr.apply_ufunc(
    _copy_test,
    data,
    input_core_dims=[['z']],
    output_core_dims=[['new_z']],
    vectorize=True,
    dask='parallelized',
    output_dtypes="float64",
    kwargs={'test': test},
    dask_gufunc_kwargs={'output_sizes': {'new_z': 20}},
)
new_data_future = xr.apply_ufunc(
    _copy_test,
    data,
    input_core_dims=[['z']],
    output_core_dims=[['new_z']],
    vectorize=True,
    dask='parallelized',
    output_dtypes="float64",
    kwargs={'test': test_future},
    dask_gufunc_kwargs={'output_sizes': {'new_z': 20}},
)
data[0, 0].compute()
# [0.3034994 , 0.08172002, 0.34731092, ...]
new_data_actual[0, 0].compute()
# [30.0, 30.0, 30.0, ...]
new_data_future[0, 0].compute()
# KilledWorker
I tried several variations of this, from explicitly calling test.result()
to changing the way the Future is passed, but nothing worked.
I also tried raising exceptions inside the function and various ways of printing information, but none of that produced any output. This makes me think that when a Future is passed, execution never actually reaches the body of the function.
Am I trying to do something completely silly, or is this unexpected behavior?