0 votes
0 answers
7 views

Initializing a local cluster in Dask takes forever

I'm trying out some things with Dask for the first time, and while I had it running a few weeks ago, I now find that I can't get the LocalCluster initialized. I've cut it off after it ran for 30 minutes at ...
MKJ • 308
0 votes
0 answers
28 views

dask_cuda problem with Local CUDA Cluster

I am trying to get this code to work and then use it to train various models on two GPUs: from dask_cuda import LocalCUDACluster from dask.distributed import Client if __name__ == "__main__"...
Danilo Caputo
0 votes
0 answers
23 views

Processing the same array, dask.array is too slow compared to numpy.array

import BWStest as bws import numpy as np from skimage.measure import label import dask.array from tqdm import tqdm CalWin = [7,25] stack = [] thershold = 0.05 for i in range(5): image = np.random....
user19298695
1 vote
1 answer
28 views

How to nest dask.delayed functions within other dask.delayed functions

I am trying to learn dask, and have created the following toy example of a delayed pipeline. +-----+ +-----+ +-----+ | baz +--+ bar +--+ foo | +-----+ +-----+ +-----+ So baz has a dependency on ...
Steve Lorimer
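
A common way to express such a chain, rather than calling one delayed function inside another's body, is to pass delayed results into other delayed functions so the graph is assembled up front; a minimal sketch (the trivial bodies are placeholders):

    import dask

    @dask.delayed
    def foo():
        return 1

    @dask.delayed
    def bar(x):
        return x + 1

    @dask.delayed
    def baz(x):
        return x * 2

    # baz(bar(foo())) only builds the graph; compute() executes it
    result = baz(bar(foo()))
    print(result.compute())  # 4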
0 votes
1 answer
33 views

Dask distributed pause task to wait for subtask - how-to, or bad practice?

I am running tasks using client.submit thus: from dask.distributed import Client, get_client, wait, as_completed # other imports zip_and_upload_futures = [ client.submit(zip_and_upload, id, path, ...
Dave • 420
0 votes
1 answer
23 views

Use dask with numactl

I am using dask to parallelize an operation that is memory-bound. So, I want to ensure each dask worker has access to a single NUMA node and prevent cross-node memory access. I can do this in the ...
kgully • 662
0 votes
0 answers
75 views

"Sending large graph": what is the significance of this warning?

I have a zarr dataset on disk, which I open with xarray using: import xarray as xr import numpy as np import dask.distributed as dd # setup dask cluster = dd.LocalCluster() client = dd.Client(cluster)...
Colo • 26
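
This warning usually means large objects got embedded in the task graph itself. One standard mitigation, assuming the data really does originate on the client, is to scatter it once and pass the resulting future; a sketch:

    import numpy as np
    from dask.distributed import Client

    client = Client()
    big = np.random.random((5000, 5000))  # data we don't want inlined in the graph

    # scatter ships the array to workers once; tasks then reference a
    # lightweight future instead of a huge serialized graph payload
    big_future = client.scatter(big, broadcast=True)
    futures = [client.submit(lambda a, i: a.sum() + i, big_future, i) for i in range(4)]
    print(client.gather(futures))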
0 votes
0 answers
24 views

How does a parquet read happen in the Modin distributed environment?

I am using Modin to read a parquet file through pandas. I have two clusters with 256 GB of RAM each; duckdb on a single machine takes 50 seconds for the group-by, but Modin runs indefinitely ...
Bhaskar Dabhi
0 votes
0 answers
21 views

Dask worker running long calls

The code running on the dask worker calls asyncio.run() and proceeds to execute a series of async calls (on the event loop running on the worker) that gather data, and then runs a small computation. This ...
Dirich • 442
0 votes
0 answers
22 views

Dask-distributed LocalCluster spinup hangs only in non-interactive execution

I want to start and connect to a dask.distributed LocalCluster from within a python script. However, I get different behavior depending on the way in which I invoke the same lines of code. The ...
Charles Davis
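
A frequent cause of exactly this symptom is a missing main-module guard: when workers are spawned as subprocesses they re-import the script, so unguarded cluster startup can hang or respawn endlessly. A minimal script-safe sketch:

    from dask.distributed import Client, LocalCluster

    if __name__ == "__main__":  # required when run as a plain script
        cluster = LocalCluster()
        client = Client(cluster)
        print(client.dashboard_link)
        client.close()
        cluster.close()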
0 votes
0 answers
25 views

Importing SQL Table from Snowflake into Jupyter using Dask

I have an SQL Table in Snowflake, 100K rows and 15 columns. I want to import this table into my Jupyter notebook using Dask for further analysis. Primarily doing this as a form of practice since I am new ...
OscarOz
0 votes
1 answer
64 views

Understanding if current process is part of a multiprocessing pool with --multiprocessing-fork

I need to find a way for a python process to figure out if it was launched as part of a multiprocessing pool. I am using dask to parallelize calculations, using dask.distributed.LocalCluster. For UX ...
pnjun • 151
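
One portable check (Python 3.8+), assuming "launched as part of a pool" can be approximated by "spawned by multiprocessing at all", is multiprocessing.parent_process(); a sketch:

    import multiprocessing as mp

    def is_spawned_child() -> bool:
        # parent_process() is None only in the original main process;
        # children started by multiprocessing (including
        # --multiprocessing-fork ones) get a handle to their parent
        return mp.parent_process() is not None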
0 votes
0 answers
21 views

ModuleNotFoundError: no module named ... when submitting job to client with an object from that module

I have a weird issue I am not able to solve. I have created a local Dask client, and I have a preload script that imports local modules. The preload script contains a list of paths, and runs sys.path....
Rezzy • 131
0 votes
1 answer
29 views

Common workflow for using Dask on HPC systems

I’m new to Dask. I’m currently working on an HPC system managed by SLURM, with some compute nodes (those that execute the jobs) and the login node (which I access through SSH to submit the SLURM jobs). I’m ...
Joseph Pena
0 votes
0 answers
44 views

How to fix memory errors merging large dask dataframes?

I am trying to read 23 CSV files into dask dataframes, merge them together using dask, and output to parquet. However, it's failing due to memory issues. I used to use pandas to join these together ...
ifightfortheuserz
0 votes
1 answer
58 views

How do I set the timeout parameter in the dask scheduler

I was trying to run dask-distributed to distribute some big computation in a slurm cluster. I was always getting a "TimeoutError: No valid workers found" message (this came from line 6130 in ...
Fidel • 37
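
Timeouts in distributed are driven by the dask config system rather than a single scheduler argument. A sketch of raising the connection timeout; whether this particular key governs the "No valid workers found" path in your version is an assumption worth verifying:

    import dask

    # equivalently: export DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="120s"
    dask.config.set({"distributed.comm.timeouts.connect": "120s"})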
0 votes
0 answers
23 views

Do dask workers heartbeat when loaded with tasks?

When dask workers have been assigned tasks by the scheduler, do they keep sending heartbeats to the scheduler, or do they stop heartbeating while performing tasks? I am thinking that while working dask-...
Chaitanya Kshirsagar
0 votes
0 answers
39 views

Null values in log file from Python logging module when used with Dask

I have been trying to set up logging using the logging module in a Python script, and I have got it working properly. It can now log to both the console and a log file. But it fails when I set up a Dask ...
RogUE • 363
0 votes
1 answer
50 views

How can I keep Dask workers busy when processing large datasets to prevent them from running out of tasks?

I'm trying to process a large dataset (around 1 million tasks) using Dask distributed computing in Python. (I am getting data from a database to process it, and I am retrieving around 1M rows). Here I ...
Polymood • 461
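
A pattern that fits this shape is to keep a bounded backlog of submitted tasks and top it up with as_completed, so workers never starve but the scheduler never holds a million queued tasks at once. A sketch, where process and the row source are stand-ins for the database logic:

    from dask.distributed import Client, as_completed

    client = Client()

    def process(row):                 # hypothetical per-row computation
        return row * 2

    rows = iter(range(1_000_000))     # stand-in for a database cursor

    # seed a bounded backlog, then add one new task per completion
    seed = [client.submit(process, next(rows)) for _ in range(1000)]
    ac = as_completed(seed)
    for future in ac:
        future.result()               # consume or store the result
        try:
            ac.add(client.submit(process, next(rows)))
        except StopIteration:
            pass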
0 votes
0 answers
33 views

Memory issues when using Dask

I have just started using 'dask', and I am running into some memory issues. I would like to share my problem along with a dummy code snippet that illustrates what I’ve done. My goal is to read a large ...
jei L • 33
0 votes
1 answer
64 views

How do I modularize functions that work with dask?

I'm trying to modularize my functions that use Dask, but I keep encountering the error "No module named 'setup'". I can't import any local module that is related to Dask, and currently, ...
Anderson
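
When workers can't import a module that tasks reference, one commonly used remedy is Client.upload_file, which ships a local module to every worker; a sketch with a hypothetical module name:

    from dask.distributed import Client

    client = Client()
    client.upload_file("my_helpers.py")  # hypothetical local module

    import my_helpers                    # still importable locally as usual
    futures = client.map(my_helpers.transform, range(10))
    print(client.gather(futures))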
2 votes
1 answer
204 views

Is there a way to save into a zarr file an xarray, with the possibility of appending in multiple dimensions?

I'm currently doing an internship where I need to create large datasets, often hundreds of GB in size. I'm collecting temporal samples for cartography, where I collect 500 samples for each ...
Allan Delautre
0 votes
1 answer
42 views

Dask - High CPU consumption unloading parallel workers

I’m using dask to parallelize the processing of a simulation. It consists of a series of differential equations that are numerically solved using numpy arrays that are compiled using numba @jit ...
nsantana
0 votes
0 answers
32 views

Efficiently processing large molecular datasets with Dask Distributed, DataFrames, and Prefect

I'm working with a large dataset of molecular structures (approximately 240,000 records) stored in a PostgreSQL database. I need to perform computations on each molecule using RDKit. I'm using Dask ...
Polymood • 461
0 votes
0 answers
36 views

Optimal way to convert an xr.Dataset to a Dask DataFrame?

I am running a function in a Dask cluster to compute the normalized difference between two of my data variables in my xarray Dataset object. However, I need this to be in a Dask DataFrame format first ...
Adriano Matos
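
xarray Datasets have a built-in converter that keeps dask-backed variables lazy; a minimal sketch with made-up variables a and b:

    import numpy as np
    import xarray as xr

    ds = xr.Dataset(
        {"a": ("x", np.arange(1.0, 5.0)), "b": ("x", np.arange(2.0, 6.0))}
    ).chunk({"x": 2})

    ddf = ds.to_dask_dataframe()  # stays lazy; one row per grid point
    ddf["norm_diff"] = (ddf["a"] - ddf["b"]) / (ddf["a"] + ddf["b"])
    print(ddf.compute())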
0 votes
0 answers
83 views

How to speed up interpolation in dask

I have a piece of code that performs interpolation on a large number of arrays. This is extremely quick with numpy, but the data the code will work with in reality will often not fit in memory ...
abinitio • 805
0 votes
0 answers
30 views

How to retrieve a task's progress as percentage done in Dask?

I'd like to query for a task's status in a Dask cluster by retrieving a percentage completed beyond the visual progress bar or dashboard. For example, I'm submitting this task below: from dask....
dmn • 23
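
A single opaque task doesn't expose internal progress, but for a batch of futures you can derive a percentage yourself by counting completions; a sketch:

    from dask.distributed import Client, as_completed

    client = Client()
    futures = client.map(lambda x: x ** 2, range(200))

    # count completed futures to report percent done without the dashboard
    total = len(futures)
    for done, _ in enumerate(as_completed(futures), start=1):
        print(f"{done / total:.0%} done", end="\r")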
0 votes
1 answer
135 views

How to use user-defined fsspec filesystem with dask?

I made my own filesystem in the fsspec library, and I am trying to have dask read dataframe files from this filesystem object. However, I am getting an error when I try to do this. ...
Brian Moths • 1,215
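
For a custom filesystem, the usual missing piece is registering the protocol so fsspec can resolve URLs of that scheme; a sketch with a hypothetical MyFS class and path:

    import fsspec
    import dask.dataframe as dd
    from fsspec.spec import AbstractFileSystem

    class MyFS(AbstractFileSystem):   # hypothetical custom filesystem
        protocol = "myfs"
        # ... _open, ls, info, etc. implemented here ...

    # map the "myfs://" scheme to the class so dask can open such URLs
    fsspec.register_implementation("myfs", MyFS, clobber=True)

    ddf = dd.read_csv("myfs://bucket/table-*.csv")  # hypothetical path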
0 votes
1 answer
69 views

What is the cleanest way to detect whether I'm running in a dask Worker?

Given a dask.distributed cluster, for example a LocalCluster, what is the most robust way to detect if I'm running a python code from within a Worker instance? This can be code that is not strictly ...
Alessio Arena
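
One robust approach is distributed's get_worker, which raises ValueError when no worker is attached to the current thread; a sketch:

    from dask.distributed import get_worker

    def in_worker() -> bool:
        try:
            get_worker()
            return True
        except ValueError:   # not running inside a Worker
            return False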
-1 votes
1 answer
108 views

Understanding task stream and speeding up Distributed Dask

I have implemented some data analysis in Dask using dask-distributed, but the performance is very far from the same analysis implemented in numpy/pandas and I am finding it difficult to understand the ...
abinitio • 805
0 votes
0 answers
22 views

xarray/dask: waiting on drive I/O before continuing processing

The pseudocode looks as follows: def obtain_chunk(da): c = download_chunk() c = process_stuff(c) return c # lazy dask array wrapped into xarray with many chunks da = xr.DataArray(dask....
cmosig • 1,317
0 votes
0 answers
41 views

How to properly use SQLAlchemy with Dask distributed jobs

I'm having a lot of trouble using SQLAlchemy to store the results of distributed jobs. Previously, I had an abstraction layer between the SQLAlchemy objects and the main code, meaning that all ...
Roberto • 151
2 votes
0 answers
16 views

Is there a way to restrict stack traces from emitting dataframe contents?

By default, our system logs stack traces in logs output. Generally, we're careful to not log contents of dataframes we're working with as they may contain sensitive user data. However, when Dask ...
Kenny Leftin
0 votes
0 answers
100 views

Dask can't synchronously read data shared on NFS

Running the Dask scheduler on system A and workers on systems A and B. A volume from system A is shared over the network through NFS with system B and contains the data files. This folder has a symbolic ...
Steffan • 556
1 vote
1 answer
121 views

Default n_workers when creating a Dask cluster?

Simple question. If I create a Dask cluster using the following code: from dask.distributed import Client client = Client() How many workers will it create? I ran this code on one machine, and it ...
Adriano Matos
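
By default Client() starts a LocalCluster sized from the machine's logical CPU count (workers times threads-per-worker roughly equals the core count); you can inspect what was created, or pin the sizing explicitly:

    from dask.distributed import Client

    client = Client()  # LocalCluster sized from the machine's CPU count
    print(len(client.scheduler_info()["workers"]))  # workers actually created

    # to control sizing explicitly instead:
    # client = Client(n_workers=4, threads_per_worker=2)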
1 vote
0 answers
38 views

Dask worker nodes and multiprocessing raising TypeError during object initialization

In a program I wrote, I define a class that is a subclass of a class I import. If I run this code without Dask, it runs successfully. When I plug in Dask, I get an error ...
olivarb • 237
1 vote
1 answer
52 views

How does dask handle datasets larger than memory?

I'm seeking guidance on efficiently profiling data using Dask. I've opted to use Dask to lazily load the DataFrame, either from SQL tables (dask.read_sql_table) or CSV files (dask.read_csv). I am ...
Faizan • 333
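
The short version: dask partitions the data and only materializes partitions as a computation needs them, so the whole dataset never has to fit in RAM. A sketch with hypothetical files and columns:

    import dask.dataframe as dd

    # nothing is read yet: one lazy partition per ~64MB block
    ddf = dd.read_csv("rows-*.csv", blocksize="64MB")  # hypothetical files
    print(ddf.npartitions)

    # compute() streams partitions through memory batch by batch
    print(ddf.groupby("key")["value"].mean().compute())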
0 votes
0 answers
71 views

How to Efficiently Profile Data Using Dask in Parallel with Kubernetes Cluster

I'm seeking guidance on efficiently profiling data using Dask within a Kubernetes cluster. My data sources include data lakes and file systems like S3. Profiling tasks entail generating various ...
Faizan • 333
1 vote
0 answers
50 views

asyncio.exceptions.CancelledError when using Dask LocalCluster with processes=False and progress

This is an example: import numpy as np import zarr from dask.distributed import Client, LocalCluster from dask import array as da from dask.distributed import progress def same(x): return x x = ...
Kang Liang
0 votes
0 answers
26 views

Dask changes divisions when writing a parquet file with to_parquet

Working with a dask dataframe: I read a parquet file with two columns into ddf and set the index (based on an existing column). ddf.compute_current_divisions() returns divisions like [0, 1000000, 2000000, 3000000, ...] ...
Ann • 1
0 votes
0 answers
87 views

Is there a way to analyze why a dask worker was killed?

I have ~30 GB of uncompressed spatial data; it contains id, tags, and coordinates as three columns in a parquet file with a row group size of 64 MB. I used dask read_parquet with block_size 32 MiB and got 118 ...
GUOZHAN SUN
1 vote
0 answers
119 views

Using dask.distributed with rioxarray rio.to_raster results in `ValueError: Lock is not yet acquired`

I am trying to write some code using dask.distributed.Client and rioxarray to_raster that: Concatenates two rasters (dask arrays) Applies a function across all blocks in the concatenated array Writes ...
katieb1 • 11
0 votes
1 answer
80 views

How to convert a datetime string to a timestamp in dask cudf and then sort the dataframe by this column

I would like to convert a datetime string to timestamp in dask cudf and then sort the dataframe by this column. Example: import dask_cudf as ddf import pandas as pd # Sample data (replace with your ...
user3448011 • 1,599
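
dask_cudf generally mirrors the dask.dataframe API; a CPU-side sketch of the parse-then-sort step (swapping in dask_cudf.from_cudf for GPU dataframes is the assumption here):

    import pandas as pd
    import dask.dataframe as dd

    pdf = pd.DataFrame({"ts": ["2024-01-02 03:04:05", "2023-12-31 23:59:59"]})
    ddf = dd.from_pandas(pdf, npartitions=2)

    ddf["ts"] = dd.to_datetime(ddf["ts"])   # string -> datetime64
    print(ddf.sort_values("ts").compute())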
0 votes
0 answers
54 views

Using dask performance report with concurrent.future

I have a workflow that runs with dask and/or with concurrent.futures. Basically, depending on the machine that the code runs, it runs on dask or with the futures executor. Now, inside my code I am ...
Alejandro • 5,206
0 votes
1 answer
46 views

How Dask manages file descriptors

How does Dask manage file descriptors, for example when creating a dask.array from an HDF5 file large enough to be chunked? Do the created tasks inherit the file descriptor created ...
Mitchou • 37
-1 votes
1 answer
62 views

Read a CSV file and do an aggregation with multiple workers (dask.distributed, dask.dataframe)

I have a server with IP 192.168.33.10 where I launch the scheduler (dask scheduler --host 0.0.0.0); this is the master. On this server I have the file "/var/shared/job_skills.csv", and the workers are 192.168.33.11, 192....
Mohamed Amine
1 vote
1 answer
52 views

Why dask shows smaller size than the actual size of the data (numpy array)?

Dask reports a slightly smaller size than the actual size of a numpy array. Here is an example of a numpy array that is exactly 32 MB: import dask as da import dask.array import numpy as np shape = (1000,...
Ress • 780
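
A likely explanation is decimal vs. binary units: dask's repr reports sizes in binary (2**20-byte) units, so a 32,000,000-byte array displays as roughly 30.5 MiB. Worked arithmetic, assuming a float64 array of that size:

    import numpy as np

    a = np.zeros((1000, 4000), dtype=np.float64)  # 1000*4000*8 bytes
    print(a.nbytes)           # 32000000
    print(a.nbytes / 1e6)     # 32.0   megabytes (decimal, 10**6)
    print(a.nbytes / 2**20)   # ~30.52 mebibytes (binary, 2**20)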
0 votes
0 answers
32 views

Training sklearn estimators on large datasets throws aiohttp.client_exceptions.ClientOSError: [Errno 104] Connection reset by peer

I am trying to run a prefect flow on Kubernetes. I have a large dataset and want to train multiple estimators in parallel. See the code below for more details. from sklearn.datasets import ...
maxx • 1
0 votes
0 answers
42 views

Airflow daskexecutor exception: "FileNotFoundError(2, 'No such file or directory')" on dask worker

I deploy 2 services with a Dockerfile + docker-compose: Airflow 2.8.1 (with apache-airflow-providers-daskexecutor==1.1.1) and a Dask executor (pip install "dask[complete]==2023.4.1"). My DAG on airflow: ...
Tai Lu • 43
0 votes
1 answer
77 views

Can't dd.read_sql on jupyter, kernel crashes

I'm coming here because I don't understand my problem. I created a dockerfile + compose which creates 1 dask scheduler and 2 workers: docker-compose.yaml: version: '3.8' services: dask-scheduler: ...
gtnchtb
