
Dask Jobqueue introduction

Dask is a flexible library for parallel and distributed computing in Python. It enables parallelization thanks to its own scheduling system, which is backed by threads or processes. It also enables distributed computing on an HPC cluster by submitting jobs. Thus, Dask and Dask-Jobqueue can tackle HPC and big data computations, i.e. cases where a dataset can't be loaded into the RAM of a single machine. But even if you don't work on big data or HPC projects, you may still want to submit cluster jobs from Python scripts. This tutorial ends with some examples (e.g., Map-Reduce).

Warning

This tutorial only focuses on the instantiation of jobs/workers using Dask-Jobqueue. It doesn't address parallel and distributed computing.

Info

Distribution is achieved by running many Dask workers within jobs on an HPC cluster, each worker sending messages/data to the others (like OpenMPI). Parallelization is achieved by running many threads within a worker, all sharing the same data (like OpenMP).

Info

The Dask-Jobqueue library is installed in the Python and AI modules. You can also install dask-jobqueue via pip by following the Anaconda module extension procedure described here.

Knowledge requirements

  • Unix processes and threads.
  • Distributed and parallel computing
  • Python Global Interpreter Lock (GIL) and why multithreading in Python is inefficient for CPU-bound tasks when using libraries that don't release the GIL, and why multithreading in Python may be useful for IO-bound tasks.
  • Dask:
    • DataFrame, delayed, future concepts in Dask (link).
    • The beginner's guide for distributed and parallel computation (link), especially for the specification of the workers (number of workers, number of threads per worker, etc.).
    • The Dask distributed library documentation (link).
  • Dask Jobqueue documentation (link).

Terminology

  • Dask tasks: units of work, i.e. the processing of a chunk of data (e.g. a DataFrame partition), or a delayed (lazy) or submitted (immediate) function call.

  • Dask workers: computation units that process tasks, one by one. They are Unix processes that can instantiate threads for parallel computing. Workers can be instantiated within Slurm jobs.

  • Slurm jobs: usual definition for HPC clusters, nothing fancy.

  • Dask-Jobqueue Cluster: Dask-Jobqueue defines the concept of a cluster (the *Cluster classes), which is just a job factory. It holds the specification of the cluster jobs, not the specification of a computer cluster.

  • Main Slurm job: in this tutorial, the main job refers to the Slurm job (interactive, batch or Jupyter Lab/Notebook) that instantiates the workers (and runs the Dask scheduler).
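The relationship between these terms can be sketched as follows. This is a minimal illustration, not a site-specific recipe: the resource values and the helper names `job_spec` and `start_workers` are hypothetical, and `dask_jobqueue` is imported lazily so that the job specification can be inspected without a Slurm cluster. The calls used (`SLURMCluster`, `job_script`, `scale`, `Client`) come from Dask-Jobqueue and Dask distributed.

```python
def job_spec(cores=4, memory="8GiB", walltime="01:00:00"):
    """Return keyword arguments describing ONE Slurm job (hypothetical values)."""
    return {
        "cores": cores,        # total cores requested per Slurm job
        "memory": memory,      # total memory requested per Slurm job
        "processes": 1,        # one Dask worker process per job
        "walltime": walltime,  # Slurm time limit of each job
    }


def start_workers(n_jobs=2):
    """Meant to run inside the main Slurm job: build the job factory, submit jobs."""
    # Imported here so that job_spec() works even without these libraries.
    from dask.distributed import Client
    from dask_jobqueue import SLURMCluster

    cluster = SLURMCluster(**job_spec())  # the "cluster" is only a job factory
    print(cluster.job_script())           # the Slurm batch script it would submit
    cluster.scale(jobs=n_jobs)            # submit n_jobs Slurm jobs hosting workers
    client = Client(cluster)              # connect to the scheduler run by this process
    return cluster, client


if __name__ == "__main__":
    print(job_spec())
```

Calling `start_workers()` only makes sense on a machine where Slurm is available; `job_spec()` alone can be run anywhere.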

Points to consider

Sources

Chunks

"Dask is often used in situations where the data are too big to fit in memory. In these cases the data are split into chunks or partitions. Each task is computed on the chunk and then the results are aggregated."

"The central scheduler spends a few hundred microseconds on every task. For optimal performance, task durations should be greater than 10-100ms."

"Individual tasks should be a comfortable size so as not to overwhelm any particular worker."

Info

You have to choose the size of the data chunks wisely, or design the logic of your delayed and submitted functions wisely, so that each task is neither too short nor too large.
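To get a feeling for the "10-100ms" recommendation quoted above, the scheduler cost can be compared with the task duration. The sketch below is a rough model only: the function name is hypothetical, and the value of 300 microseconds per task is an assumption standing in for "a few hundred microseconds".

```python
def scheduler_overhead_fraction(task_seconds, overhead_seconds=300e-6):
    """Share of the per-task time spent in the scheduler (rough model).

    overhead_seconds is an assumed value for "a few hundred microseconds".
    """
    return overhead_seconds / (overhead_seconds + task_seconds)


if __name__ == "__main__":
    for task_seconds in (0.001, 0.01, 0.1, 1.0):
        share = scheduler_overhead_fraction(task_seconds)
        print(f"task of {task_seconds * 1000:g} ms: {share:.1%} scheduler overhead")
```

With very short tasks the scheduler cost dominates, which is why larger chunks (and therefore longer tasks) are usually preferable, as long as a chunk still fits comfortably in the memory of a worker.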

Warning

The resources of the main Slurm job (memory and cores) must also be chosen wisely: the aggregated results of the computation must fit in the memory of the main Slurm job, and the Dask scheduler, which takes care of the workers, must not suffer from CPU starvation.

Dask workers

"If each task contains a non-trivial amount of work, then the fastest way to run dask is to have a worker for each concurrent task. For chunked data, if each worker is able to comfortably hold one data chunk in memory and do some computation on that data, then the number of chunks should be a multiple of the number of workers. This ensures that there is always enough work for a worker to do."

"When setting up a dask cluster you have to decide how many workers to use. It can be tempting to use many workers, but that isn’t always a good idea. If you use too many workers some may not have enough to do and spend much of their time idle."

Info

The number of tasks (chunks, delayed and submitted function calls) should be a multiple of the number of workers, and the number of workers has to be optimized.
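The rule on chunks and workers quoted above can be checked with simple arithmetic. The sketch below assumes that every worker processes one task at a time and that all tasks take about the same time; the function names are hypothetical.

```python
import math


def round_up_to_multiple(n_tasks, n_workers):
    """Smallest multiple of n_workers that is greater than or equal to n_tasks."""
    return math.ceil(n_tasks / n_workers) * n_workers


def idle_workers_in_last_wave(n_tasks, n_workers):
    """Workers left without a task during the final round of tasks."""
    return (-n_tasks) % n_workers


if __name__ == "__main__":
    # 10 chunks on 4 workers: the third round keeps only 2 workers busy.
    print(idle_workers_in_last_wave(10, 4))
    # Splitting the data into 12 chunks instead keeps all 4 workers busy.
    print(round_up_to_multiple(10, 4))
```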

Warning

Don't forget that requesting a lot of workers means submitting a lot of Slurm jobs, which may wait a long time in the queue before they start running.

"Dask can not parallelize within individual tasks."

"If your computations are mostly numeric in nature (for example NumPy and Pandas computations) and release the GIL entirely then it is advisable to run dask-worker processes with many threads and one process. This reduces communication costs and generally simplifies deployment."

"If you truly are doing mostly numerical computations, you can specify as many total threads as you have cores."

"There are some rules of thumb when to worry about GIL lockages, and thus prefer more workers over heavier individual workers with high nthreads: If your code is mostly pure Python (in non-optimized Python libraries) on non-numerical data. If your code causes computations external to Python that are long running and don’t release the GIL explicitly."

Info

Specify workers with more than one thread only if you are sure that your computations can take advantage of parallelization. Dask doesn't magically parallelize your computations. They have to be designed for it and implemented with libraries that support it, e.g. NumPy-based libraries. Remember that NumPy is able to automatically parallelize some, but not all, vector computations.
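The rules of thumb quoted above can be turned into a small decision helper. This is only a sketch: the function name and the returned dictionary are hypothetical. `cores` and `processes` are the names of Dask-Jobqueue job parameters; the number of threads per worker is derived here as `cores // processes`, which is, to the best of our knowledge, how Dask-Jobqueue splits the cores of a job between its worker processes.

```python
def worker_layout(cores_per_job, releases_gil):
    """Choose the processes/threads split for one job (illustrative rule of thumb).

    releases_gil=True  -> mostly numeric code (NumPy, Pandas): one process, many threads.
    releases_gil=False -> mostly pure Python code: many single-threaded processes.
    """
    processes = 1 if releases_gil else cores_per_job
    return {
        "cores": cores_per_job,
        "processes": processes,
        "threads_per_worker": cores_per_job // processes,
    }


if __name__ == "__main__":
    print(worker_layout(8, releases_gil=True))
    print(worker_layout(8, releases_gil=False))
```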

Warning

Threads share the memory of the worker that instantiates them. This means that for a worker with 10 GiB of RAM and 5 threads, each thread has about 2 GiB on average when all threads are busy. If the threads together need more than the memory of the worker, the worker starts writing data to disk (memory spilling), which slows the computation down considerably, or the worker may die.
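The memory budget described above is plain arithmetic and can be checked before choosing the worker specification. The helper names below are hypothetical, and the model assumes that all threads of the worker are busy at the same time with tasks of similar size.

```python
def memory_per_thread_gib(worker_memory_gib, n_threads):
    """Average memory available to each thread when all threads are busy."""
    return worker_memory_gib / n_threads


def fits_in_memory(task_memory_gib, worker_memory_gib, n_threads):
    """True if one task per thread fits in the memory of the worker."""
    return task_memory_gib <= memory_per_thread_gib(worker_memory_gib, n_threads)


if __name__ == "__main__":
    # Worker with 10 GiB of RAM and 5 threads, as in the warning above.
    print(memory_per_thread_gib(10, 5))
    print(fits_in_memory(1.5, 10, 5))
    print(fits_in_memory(3.0, 10, 5))
```

If the check fails, either reduce the size of the chunks, reduce the number of threads per worker, or request more memory per job.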