
Intro

I am parallelising some code using dask.distributed (an embarrassingly parallel task).

  • I have a list of Paths pointing to different images that I scatter to workers.
  • Each worker loads an image (a 3D stack) and runs some 3D filtering with scipy, which produces intermediate outputs (a sketch of this per-image task follows the setup code below).
  • Each filtered image is saved as .npy and/or .png on disk.
  • I am testing locally before running on a cluster and my setup is:


from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit=8e9)
client = Client(cluster)
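
For reference, the per-image task looks roughly like this (process_image, the gaussian filter, and image_paths are simplified stand-ins, not my actual code; client.map is used here for brevity instead of an explicit scatter):

from pathlib import Path
import numpy as np
from scipy import ndimage

def process_image(path):
    """Load a 3D stack, filter it with scipy, and save the result next to the input."""
    stack = np.load(path)                               # load the 3D image stack
    filtered = ndimage.gaussian_filter(stack, sigma=2)  # stand-in for the real 3D filtering
    out_path = path.with_name(path.stem + "_filtered.npy")
    np.save(out_path, filtered)                         # persist to disk instead of returning the array
    return str(out_path)                                # return only a small result to the client

image_paths = sorted(Path("data").glob("*.npy"))        # placeholder input list
futures = client.map(process_image, image_paths)
results = client.gather(futures)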

Issue:

  • When I process only two images (one image per worker), everything is fine.
  • When I scatter more than one image per worker, I get warnings like the one below, in which the reported process memory keeps increasing.


distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.
Perhaps some other process is leaking memory?  Process memory: 6.21 GB -- Worker memory limit: 8.00 GB

This suggests that part of the RAM used by the worker is not freed between files (I guess these are leftover filtering intermediates...).

Question

Is there a way to free the worker's memory before it starts processing the next image? Do I have to run a garbage-collection cycle between tasks?

Edit

I added a gc.collect() call at the end of the function run by the worker, but it didn't eliminate the warnings.
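
For reference, a full collection can also be triggered on every worker from the client side between tasks (a sketch using client.run; whether this actually helps is what I'm asking):

import gc

# Run gc.collect() on every worker and return the per-worker results;
# can be called between batches of images.
client.run(gc.collect)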

Thanks a lot for the help!

  • did you get around this problem?
    – stav
    Commented May 12, 2019 at 21:40
  • no, still an issue
    – s1mc0d3
    Commented Jun 7, 2019 at 6:26
  • I guess there is still no solution?
    – MehmedB
    Commented Feb 20, 2020 at 9:44

2 Answers


As long as a client holds a reference to a distributed value, the cluster won't purge it from memory. This is expounded on in the Managing Memory documentation, specifically the "Clearing data" section.
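
For illustration, a minimal sketch of what "clearing data" looks like in practice (futures, process_image, and image_paths are placeholder names): once the client drops its references, the scheduler is free to release the corresponding memory on the workers.

futures = client.map(process_image, image_paths)   # placeholder task submission
results = client.gather(futures)                   # keep results small (e.g. file paths)

del futures            # drop the client-side references so the cluster can free the data
# or release them explicitly without waiting for Python's reference counting:
# client.cancel(futures)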


The "Memory use is high" error message could be pointing to a few potential culprits. I found this article by one of the core Dask maintainers helpful in diagnosing and fixing the issue.

Quick summary, either:

  1. Break your data into smaller chunks.
  2. Manually trigger garbage collection and/or tweak the gc settings on the workers through a Worker Plugin (which the OP has tried without success; I'll include it anyway for other readers, with a sketch after this list).
  3. Trim memory using malloc_trim, especially if you are working with non-NumPy data or small NumPy chunks (sketched at the end of this answer).
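
Here is a rough sketch of option 2, assuming a client connected to your cluster; the plugin name and threshold values are made up for the example and should be tuned for your workload:

import gc
from dask.distributed import WorkerPlugin

class TuneGC(WorkerPlugin):
    """Illustrative plugin: run a full collection and raise the gen-0 threshold on each worker."""
    def setup(self, worker):
        gc.collect()                         # start the worker from a clean slate
        gc.set_threshold(100_000, 10, 10)    # example thresholds only

client.register_worker_plugin(TuneGC())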

Make sure you can see the Dask Dashboard while your computations are running to figure out which approach is working.
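
And a sketch of option 3, assuming Linux with glibc (libc.so.6), which is the case the article discusses:

import ctypes

def trim_memory() -> int:
    """Ask glibc to return freed memory to the operating system."""
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# Run on every worker; each worker returns 1 if memory was actually released.
client.run(trim_memory)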
