I am trying to apply a simple operation in parallel on a datacube, but I keep running into a TypeError.

This is my input datacube (called dcube):

<xarray.DataArray 'A' (time: 6, pol: 1, y: 823, x: 1296)>
dask.array<where, shape=(6, 1, 823, 1296), dtype=float64, chunksize=(6, 1, 823, 1000), chunktype=numpy.ndarray>
  * pol      (pol) object 'HH'
  * y        (y) float64 3.653e+06 3.653e+06 3.653e+06 ... 3.652e+06 3.652e+06
  * x        (x) float64 1.047e+06 1.047e+06 1.047e+06 ... 1.049e+06 1.049e+06
  * time     (time) datetime64[ns] 2020-11-05T06:44:35 ... 2020-12-08T16:42:42
    id       (time) <U51 dask.array<chunksize=(6,), meta=np.ndarray>
    orbit:       A
    transform:   (1.1987408006010196, 0.0, 1047472.9120438152, 0.0, -1.198740...
    crs:         {'proj': 'utm', 'zone': 10, 'datum': 'WGS84', 'units': 'm', ...

The function I am trying to apply to each block looks like this:

def compute_something(chunk):
    img1 = chunk[0] + 0.00001
    img2 = chunk[1] + 0.00001
    return da.log10(img1 / img2)

When I try running:

da.map_blocks(compute_something, dcube, dtype=np.float32).compute()

This is the error I see:

/usr/local/lib/python3.7/site-packages/tornado/gen.py in run(self)
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/usr/local/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1848                             exc = CancelledError(key)
   1849                         else:
-> 1850                             raise exception.with_traceback(traceback)
   1851                         raise exc
   1852                     if errors == "skip":

TypeError: tuple indices must be integers or slices, not tuple

Nothing in the traceback points to where in my code the error comes from.

In order to understand the problem a bit more, I created this minimal example:

arr = da.from_array(xa.DataArray([np.random.random((200,200))]),(1,10,10))


Then I try to execute this simple function:

def fn(x):
    return x

When I try to distribute with map_blocks I get an assertion error:

AA = da.map_blocks(fn, arr).compute()

/usr/local/lib/python3.7/site-packages/dask/array/core.py in new_da_object(dsk, name, chunks, meta, dtype)
   5017         from ..dataframe.core import new_dd_object
-> 5019         assert all(len(c) == 1 for c in chunks[1:])
   5020         divisions = [None] * (len(chunks[0]) + 1)
   5021         return new_dd_object(dsk, name, meta, divisions)


This leads me to think that my understanding of how map_blocks works is lacking. Any help would be appreciated.

Update / Edit:

After a little more investigation I have discovered that if the chunk type is a NumPy ndarray everything works perfectly, but if the chunks come from an xarray object it does not. I am not sure why this is, or how to convert between the two. If I initialize arr as:

arr = da.from_array(np.random.random((200,200)))

map_blocks works perfectly, but if arr contains an xarray object it does not.
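
One thing that may be worth trying, given the observation above, is to hand da.map_blocks the dask array that backs the DataArray (available through its .data attribute) instead of the xarray object itself. The following is only a minimal sketch under some assumptions: the small random cube is a hypothetical stand-in for dcube, np.log10 is used because each block arrives as a plain ndarray, and drop_axis=0 is my guess at how to declare that the function removes the time axis. It has not been run against the real data.

```python
import numpy as np
import dask.array as da
import xarray as xr

# Hypothetical stand-in for dcube: (time, pol, y, x), chunked along x only,
# so every block holds the full time axis.
rng = np.random.default_rng(0)
values = rng.random((6, 1, 20, 30))
dcube = xr.DataArray(
    da.from_array(values, chunks=(6, 1, 20, 10)),
    dims=("time", "pol", "y", "x"),
    name="A",
)

def compute_something(chunk):
    # Each chunk is a plain numpy.ndarray, so NumPy functions are enough here.
    img1 = chunk[0] + 0.00001
    img2 = chunk[1] + 0.00001
    return np.log10(img1 / img2)

# dcube.data is the underlying dask array; map_blocks operates on that.
result = da.map_blocks(
    compute_something,
    dcube.data,
    dtype=np.float64,
    drop_axis=0,  # the function indexes away the leading (time) axis
).compute()

print(result.shape)
```

With this layout the result keeps the (pol, y, x) axes of the input. If the cube were chunked along time, chunk[0] and chunk[1] would no longer refer to the first two acquisitions of the whole cube, so the single time chunk is an assumption the sketch relies on.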

