I am trying to apply a simple operation in parallel on a datacube, but I keep running into a TypeError.

This is my input datacube (called dcube):
<xarray.DataArray 'A' (time: 6, pol: 1, y: 823, x: 1296)>
dask.array<where, shape=(6, 1, 823, 1296), dtype=float64, chunksize=(6, 1, 823, 1000), chunktype=numpy.ndarray>
Coordinates:
  * pol      (pol) object 'HH'
  * y        (y) float64 3.653e+06 3.653e+06 3.653e+06 ... 3.652e+06 3.652e+06
  * x        (x) float64 1.047e+06 1.047e+06 1.047e+06 ... 1.049e+06 1.049e+06
  * time     (time) datetime64[ns] 2020-11-05T06:44:35 ... 2020-12-08T16:42:42
    id       (time) <U51 dask.array<chunksize=(6,), meta=np.ndarray>
Attributes:
    orbit:      A
    transform:  (1.1987408006010196, 0.0, 1047472.9120438152, 0.0, -1.198740...
    crs:        {'proj': 'utm', 'zone': 10, 'datum': 'WGS84', 'units': 'm', ...
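(In case it helps to reproduce this without my data: I believe a stand-in with the same shape and chunking could be built like the sketch below; the values are random placeholders and the real coordinates are omitted.)

import dask.array as da
import xarray as xa

# Hypothetical stand-in for my real dcube: same dims, shape, and chunking,
# filled with random values and without the real coordinates
data = da.random.random((6, 1, 823, 1296), chunks=(6, 1, 823, 1000))
dcube = xa.DataArray(data, dims=("time", "pol", "y", "x"), name="A")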
The function I am trying to map over the datacube looks like this:
import numpy as np
import dask.array as da

def compute_something(chunk):
    img1 = chunk[0] + 0.00001  # small offset to avoid log10(0)
    img2 = chunk[1] + 0.00001
    return da.log10(img1 / img2)
When I try running:
da.map_blocks(compute_something, dcube, dtype=np.float32).compute()
This is the error I see:
/usr/local/lib/python3.7/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
/usr/local/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1848 exc = CancelledError(key)
1849 else:
-> 1850 raise exception.with_traceback(traceback)
1851 raise exc
1852 if errors == "skip":
TypeError: tuple indices must be integers or slices, not tuple
Nothing in the traceback seems to indicate what the error is.
In order to understand the problem a bit more, I created this minimal example:
import xarray as xa

arr = da.from_array(xa.DataArray([np.random.random((200, 200))]), chunks=(1, 10, 10))
Then I try to execute this simple function:
def fn(x):
    return x
When I try to distribute it with map_blocks:

AA = da.map_blocks(fn, arr).compute()

I get an assertion error:
/usr/local/lib/python3.7/site-packages/dask/array/core.py in new_da_object(dsk, name, chunks, meta, dtype)
5017 from ..dataframe.core import new_dd_object
5018
-> 5019 assert all(len(c) == 1 for c in chunks[1:])
5020 divisions = [None] * (len(chunks[0]) + 1)
5021 return new_dd_object(dsk, name, meta, divisions)
AssertionError:
This leads me to think that perhaps my understanding of how map_blocks works is lacking. Any help would be appreciated.
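For reference, this is the mental model I am working from: map_blocks hands each chunk to the function as a plain numpy.ndarray, and this minimal sketch behaves exactly as I expect:

import numpy as np
import dask.array as da

# Each block arrives in the function as a plain numpy.ndarray
x = da.from_array(np.arange(16).reshape(4, 4), chunks=(2, 2))
y = da.map_blocks(lambda block: block + 1, x, dtype=x.dtype)
print(y.compute())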
Update / Edit:
After a little more investigation I have discovered that if the chunk type is ndarray everything works out perfectly, but if the chunk type is xarray it doesn't. I am not sure why this is, or how one can convert between the two; but if I initialize arr as:

arr = da.from_array(np.random.random((200, 200)))

map_blocks works perfectly. But if it contains an xarray it doesn't.
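If the DataArray wrapper is the culprit, my guess (untested) is that pulling the raw dask array out with .data before calling map_blocks would sidestep it; the drop_axis=0 below is my own assumption, since compute_something turns each (6, 1, 823, 1000) chunk into a (1, 823, 1000) result:

import numpy as np
import dask.array as da

# Untested sketch: extract the underlying dask array from the DataArray
# and declare that the time axis (axis 0) disappears in the output
raw = dcube.data  # dask array with chunks (6, 1, 823, 1000)
result = da.map_blocks(compute_something, raw, dtype=np.float32, drop_axis=0).compute()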