Intro
I am parallelising some code using dask.distributed (an embarrassingly parallel task).
- I have a list of Paths pointing to different images that I scatter to workers.
- Each worker loads an image (a 3D stack) and runs some 3D filtering with scipy; the filtering creates intermediate outputs. A stripped-down sketch of this workflow follows the setup code below.
- Each filtered image is saved as npy and/or png on disk.
- I am testing locally before running on a cluster and my setup is:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit=8e9)
client = Client(cluster)
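To make the structure concrete, here is a stripped-down sketch of one task and of how I submit the work with the client above. process_image, the gaussian filter, the output naming and the "data" glob are simplified stand-ins for my real code:

from pathlib import Path

import numpy as np
from scipy import ndimage

def process_image(path):
    # Stand-in for the real per-image work: load the 3D stack,
    # run some 3D filtering with scipy, save the result to disk.
    stack = np.load(path)
    filtered = ndimage.gaussian_filter(stack, sigma=2)
    np.save(str(path) + "_filtered", filtered)   # .npy extension is appended automatically
    return str(path)                             # return something small, not the array

image_paths = sorted(Path("data").glob("*.npy"))   # placeholder for my list of Paths
scattered = client.scatter(image_paths)            # scatter the paths to the workers
futures = client.map(process_image, scattered)
results = client.gather(futures)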
Issue:
- When I process only two images (one image per worker) everything is fine.
- When I scatter more than one image per worker, I get warnings like the one below, in which the reported process memory keeps increasing.
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.
Perhaps some other process is leaking memory? Process memory: 6.21 GB -- Worker memory limit: 8.00 GB
suggesting that part of the RAM used by the worker is not freed
between files (I guess these are leftover filtering intermediates...).
Question
Is there a way to free the worker's memory before it starts processing the next image? Do I have to run a garbage-collection cycle between tasks?
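For example, is something like the following what I should be doing between batches? client.run executes a function once on every worker, so I assume this would force a collection on each of them; the batches helper and the batch size of 2 are just for illustration:

import gc

def batches(seq, size):
    # split the list of paths into fixed-size batches
    return [seq[i:i + size] for i in range(0, len(seq), size)]

for batch in batches(image_paths, 2):
    futures = client.map(process_image, batch)
    client.gather(futures)          # wait for this batch to finish
    client.run(gc.collect)          # run a full GC pass on every worker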
Edit
I included a gc.collect() call at the end of the function run by the worker, but it didn't eliminate the warnings.
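Concretely, the end of the task now looks roughly like this (the loading and filtering lines are still just stand-ins for my real code):

import gc

import numpy as np
from scipy import ndimage

def process_image(path):
    stack = np.load(path)
    filtered = ndimage.gaussian_filter(stack, sigma=2)
    np.save(str(path) + "_filtered", filtered)
    del stack, filtered    # drop references to the large intermediates
    gc.collect()           # explicit collection at the end of the task
    return str(path)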
Thanks a lot for the help!