1,150 questions
0
votes
0
answers
7
views
Initializing a local cluster in Dask takes forever
I'm trying out some things with Dask for the first time, and while I had it running a few weeks ago, I now find that I can't get the LocalCluster initiated. I've cut if off after running 30 minutes at ...
0
votes
0
answers
28
views
dask_cuda problem with Local CUDA Cluster
I am trying to get this code to work and then use it to train various models on two gpu's:
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
if __name__ == "__main__&...
0
votes
0
answers
23
views
Processing the same array, dask.array is too slow compared to numpy.array
import BWStest as bws
import numpy as np
from skimage.measure import label
import dask.array
from tqdm import tqdm
CalWin = [7,25]
stack = []
thershold = 0.05
for i in range(5):
image = np.random....
1
vote
1
answer
28
views
How to nest dask.delayed functions within other dask.delayed functions
I am trying to learn dask, and have created the following toy example of a delayed pipeline.
+-----+ +-----+ +-----+
| baz +--+ bar +--+ foo |
+-----+ +-----+ +-----+
So baz has a dependency on ...
0
votes
1
answer
33
views
Dask distributed pause task to wait for subtask - how-to, or bad practice?
I am running tasks using client.submit thus:
from dask.distributed import Client, get_client, wait, as_completed
# other imports
zip_and_upload_futures = [ client.submit(zip_and_upload, id, path, ...
0
votes
1
answer
23
views
Use dask with numactl
I am using dask to parallelize an operation that is memory-bound. So, I want to ensure each dask worker has access to a single NUMA node and prevent cross-node memory access. I can do this in the ...
0
votes
0
answers
75
views
"Sending large graph": what is the significance of this warning?
I have a zarr dataset on disk, which I open with xarray using:
import xarray as xr
import numpy as np
import dask.distributed as dd
# setup dask
cluster = dd.LocalCluster()
client = dd.Client(cluster)...
0
votes
0
answers
24
views
How does parquet read happens in the Modin distributed environment?
I am using Modin for reading the parquet file through panda. I have two clusters of 256 GB of RAM but even though duckdb on the single machine takes 50 seconds to go group by. Modin is going infinite ...
0
votes
0
answers
21
views
Dask worker running long calls
The code running on the dask worker calls asyncio.run() and proceeds to exectue a series of async calls (on the worker running event_loop) that gather data, and then run a small computation.
This ...
0
votes
0
answers
22
views
Dask-distributed LocalCluster spinup hangs only in non-interactive execution
I want to start and connect to a dask.distributed LocalCluster from within a python script. However, I get different behavior depending on the way in which I invoke the same lines of code. The ...
0
votes
0
answers
25
views
Importing SQL Table from Snowflake into Jupyter using Dask
I have an SQL Table in Snowflake,100K rows and 15 Columns. I want to import this table into my Jupyter notebook using Dask for further analysis. Primarily doing this a form of practice since I am new ...
0
votes
1
answer
64
views
Understanding if current process is part of a multiprocessing pool with --multiprocessing-fork
I need to find a way for a python process to figure out if it was launched as part of a multiprocessing pool.
I am using dask to parallelize calculations, using dask.distributed.LocalCluster. For UX ...
0
votes
0
answers
21
views
ModuleNotFoundError: no module named ... when submitting job to client with an object from that module
I have a weird issue I am not able to solve.
I have created a Local dask client, and I have a preload script that imports local modules. The preload script contains a list of paths, and runs sys.path....
0
votes
1
answer
29
views
Common workflow for using Dask on HPC systems
I’m new to Dask. I’m currently working in an HPC managed by SLURM with some compute nodes (those that execute the jobs) and the login node (which I access through SSH to send the SLURM jobs). I’m ...
0
votes
0
answers
44
views
How to fix memory errors merging large dask dataframes?
I am trying to read 23 CSV files into dask dataframes, merge them together using dask, and ouptut to parquet. However, it's failing due to memory issues.
I used to use pandas to join these together ...
0
votes
1
answer
58
views
How do I set the timeout parameter in dask scheduler
I was trying to run dask-distributed to distribute some big computation in a slurm cluster.
I was always getting a "TimeoutError: No valid workers found" message (this came from line 6130 in ...
0
votes
0
answers
23
views
Does dask worker heartbeat when loaded with tasks?
When dask workers have been assigned some tasks by schedulers, do the workers send heartbeats to scheduler or while performing tasks then don't heart beat. I am thinking that while working dask-...
0
votes
0
answers
39
views
Null values in log file from Python logging module when used with Dask
I have been trying to setup logging using the logging module in a Python script, and I have got it working properly. It can now log to both the console and a log file. But if fails when I setup a Dask ...
0
votes
1
answer
50
views
How can I keep Dask workers busy when processing large datasets to prevent them from running out of tasks?
I'm trying to process a large dataset (around 1 million tasks) using Dask distributed computing in Python. (I am getting data from a database to process it, and I am retriving around 1M rows). Here I ...
0
votes
0
answers
33
views
Memory issues when using Dask
I have just started using 'dask', and I am running into some memory issues. I would like to share my problem along with a dummy code snippet that illustrates what I’ve done.
My goal is to read a large ...
0
votes
1
answer
64
views
How do I modularize functions that work with dask?
I'm trying to modularize my functions that use Dask, but I keep encountering the error "No module named 'setup'". I can't import any local module that is related to Dask, and currently, ...
2
votes
1
answer
204
views
Is there a way to save into a zarr file an xarray, with the possibility of appending in multiple dimensions?
I'm currently doing an internship where I need to create large datasets, often hundreds of GB in size. I'm collecting temporal samples for cartography, where I collect 500 samples for each ...
0
votes
1
answer
42
views
Dask - High CPU consumption unloading parallel workers
I’m using dask to make parallel processing of a simulation. It consists of a series of differential equations that are numerically solved using numpy arrays that are compiled using numba @jit ...
0
votes
0
answers
32
views
Efficiently processing large molecular datasets with Dask Disctributed, DataFrames and Prefect,
I'm working with a large dataset of molecular structures (approximately 240,000 records) stored in a PostgreSQL database. I need to perform computations on each molecule using RDKit. I'm using Dask ...
0
votes
0
answers
36
views
Optimal way to convert an xr.Dataset to a Dask DataFrame?
I am running a function in a Dask cluster to compute the normalized difference between two of my data variables in my xarray Dataset object. However, I need this to be in a Dask DataFrame format first ...
0
votes
0
answers
83
views
How to speed up interpolation in dask
I have a piece of data code that performs interpolation on a large number of arrays.
This is extremely quick with numpy, but:
The data the code will work with in reality will often not fit in memory
...
0
votes
0
answers
30
views
How to retrieve a task's progress as percentage done in Dask?
I'd like to query for a task's status in a Dask cluster by retrieving a percentage completed beyond the visual progress bar or dashboard. For example, I'm submitting this task below:
from dask....
0
votes
1
answer
135
views
How to use user-defined fsspec filesystem with dask?
I made my own filesystem in the fsspec library and I am trying to read in dask dataframes from this filesystem object to open the dataframe file. However I am getting an error when I try to do this. ...
0
votes
1
answer
69
views
What is the cleanest way to detect whether I'm running in a dask Worker
Given a dask.distributed cluster, for example a LocalCluster, what is the most robust way to detect if I'm running a python code from within a Worker instance?
This can be code that is not strictly ...
-1
votes
1
answer
108
views
Understanding task stream and speeding up Distributed Dask
I have implemented some data analysis in Dask using dask-distributed, but the performance is very far from the same analysis implemented in numpy/pandas and I am finding it difficult to understand the ...
0
votes
0
answers
22
views
xarray/dask wait drive I/O before continuing processing
The pseudocode looks as follows:
def obtain_chunk(da):
c = download_chunk()
c = process_stuff(c)
return c
# lazy dask array wrapped into xarray with many chunks
da = xr.DataArray(dask....
0
votes
0
answers
41
views
How to properly use SQLAlchemy with Dask distributed jobs
I'm having a lot of trouble using SQLAlchemy to store the results of distributed jobs. Previously, I had an abstraction layer between the SQLAlchemy objects and the main code, meaning that all ...
2
votes
0
answers
16
views
Is there a way to restrict stack traces from emitting dataframe contents?
By default, our system logs stack traces in logs output. Generally, we're careful to not log contents of dataframes we're working with as they may contain sensitive user data. However, when Dask ...
0
votes
0
answers
100
views
Dask Can't synchronously read data shared on NFS
Running Dask Scheduler on system A and workers on system A and B. NFS volume from system A is shared on the network through NFS with system B, and contains the data files. This folder has a symbolic ...
1
vote
1
answer
121
views
Default n_workers when creating a Dask cluster?
Simple question. If I create a Dask cluster using the following code:
from dask.distributed import Client
client = Client()
How many workers will it create? I ran this code on one machine, and it ...
1
vote
0
answers
38
views
Dask worker nodes and multiprocessing raising TypeError during object initialization
I have a program that I wrote. I define a class in this program that is a subclass of a class I import. If I run this code without Dask, I successfully run it. When I plug in Dask, I get an error ...
1
vote
1
answer
52
views
How does dask handles the datasets larger than the memory?
I'm seeking guidance on efficiently profiling data using Dask.
I've opted to use Dask to lazily load the DataFrame, either from SQL tables (dask.read_sql_table) or CSV files (dask.read_csv).
I am ...
0
votes
0
answers
71
views
How to Efficiently Profile Data Using Dask in Parallel with Kubernetes Cluster
I'm seeking guidance on efficiently profiling data using Dask within a Kubernetes cluster. My data sources include data lakes and file systems like S3. Profiling tasks entail generating various ...
1
vote
0
answers
50
views
asyncio.exceptions.CancelledError when using Dask LocalCluster with processes=False and progress
This is an example:
import numpy as np
import zarr
from dask.distributed import Client, LocalCluster
from dask import array as da
from dask.distributed import progress
def same(x):
return x
x = ...
0
votes
0
answers
26
views
Dask change divisions when write to_parquet file
Work with dask dataframe:
Read parquet file with two column - ddf.
Set index (based on existing column).
ddf.compute_current_divisions() return divisions like [0, 1000000, 2000000, 3000000, ...]
...
0
votes
0
answers
87
views
Is there a way to analyze the dask worker killed?
I have ~30GB uncompressed spatial data, it contains id, tags, and coordinates as three columns in parquet file with row group size 64MB.
I used dask read_parquet with block_size 32MiB got 118 ...
1
vote
0
answers
119
views
Using dask.distributed with rioxarray rio.to_raster results in `ValueError: Lock is not yet acquired`
I am trying to write some code using dask.distributed.Client and rioxarray to_raster that:
Concatenates two rasters (dask arrays)
Applies a function across all blocks in the concatenated array
Writes ...
0
votes
1
answer
80
views
How to convert convert a datetime string to timestamp in dask cudf and then sort the dataframe by this column
I would like to convert a datetime string to timestamp in dask cudf and then sort the dataframe by this column.
Example:
import dask_cudf as ddf
import pandas as pd
# Sample data (replace with your ...
0
votes
0
answers
54
views
Using dask performance report with concurrent.future
I have a workflow that runs with dask and/or with concurrent.futures. Basically, depending on the machine that the code runs, it runs on dask or with the futures executor. Now, inside my code I am ...
0
votes
1
answer
46
views
How Dask manages file descriptors
How does Dask manage file descriptors?
For example when creating a dask.array from an hdf5 file. When the array is large enough to be chunked.
Do the created tasks inherit the file descriptor created ...
-1
votes
1
answer
62
views
read file csv and do the aggregation with multiple workers , dask.distributed , dask.dataframe
i have server ip:192.168.33.10 launche the schudeler dask scheduler --host 0.0.0.0 this is master in this server i have file "/var/shared/job_skills.csv" and the workers is
192.168.33.11,192....
1
vote
1
answer
52
views
Why dask shows smaller size than the actual size of the data (numpy array)?
Dask shows slightly smaller size than the actual size of a numpy array. Here is an example of a numpy array that is exactly 32 Mb:
import dask as da
import dask.array
import numpy as np
shape = (1000,...
0
votes
0
answers
32
views
Training sklearn estimators on large datasets throws aiohttp.client_exceptions.ClientOSError: [Errno 104] Connection reset by peer
I am trying to run prefect flow on Kubernetes. I have bigger dataset and want to train multiple estimators in parallel. See the below code for more in details.
from sklearn.datasets import ...
0
votes
0
answers
42
views
Airflow daskexecutor exception: "FileNotFoundError(2, 'No such file or directory')" on dask worker
I deploy 2 services by Dockerfile+Docker-compose:
Airflow 2.8.1 (with apache-airflow-providers-daskexecutor==1.1.1)
Daskexecutor (pip install "dask[complete]==2023.4.1")
My dag on airflow:
...
0
votes
1
answer
77
views
Can't dd.read_sql on jupyter, kernel crashes
I'm coming here because I don't understand my problem.
I created a dockerfile + compose which creates 1 dask scheduler and 2 workers:
docker-compose.yaml:
version: '3.8'
services:
dask-scheduler:
...