Dask Jobqueue

Dask is a flexible library for parallel and distributed computing in Python. It enables parallelization thanks to its own scheduling system, backed by threads or processes. It also makes distributed computing possible, backed by an HPC cluster, by submitting jobs. Thus, Dask and Dask-Jobqueue are able to tackle HPC and big data computations, i.e. when a dataset can't be loaded into a single machine's RAM. But even if you don't work on big data or HPC projects, you may still want to submit cluster jobs from Python scripts. This tutorial ends with some examples (e.g., Map-Reduce).

Warning

This tutorial only focuses on the instantiation of jobs/workers using Dask-Jobqueue. It doesn't address parallel and distributed computing techniques themselves.

Info

Distribution is achieved by running many Dask workers within jobs on an HPC cluster, each worker sending messages/data to the others (like OpenMPI). Parallelization is achieved by running many threads within a worker, sharing the same data (like OpenMP).

Info

The Dask-Jobqueue library is installed in the Python and IA modules. You can also install dask-jobqueue via pip following the Anaconda module extension procedure described here.

Knowledge requirements

  • Unix processes and threads.
  • Distributed and parallel computing.
  • Python Global Interpreter Lock (GIL) and why multithreading in Python is inefficient for CPU-bound tasks when using libraries that don't release the GIL, and why multithreading in Python may be useful for IO-bound tasks.
  • Dask:
    • DataFrame, delayed, future concepts in Dask (link).
    • The beginner's guide for distributed and parallel computation (link), especially for the specification of the workers (number of workers, number of threads per worker, etc.).
    • The Dask distributed library documentation (link).
  • Dask Jobqueue documentation (link).

Terminology

  • Dask tasks: a chunk of data (e.g. a DataFrame partition) to be processed, or delayed (lazy) and submitted (immediate) function calls (see the short sketch after this list).

  • Dask workers: computation units that process tasks, one by one. They are Unix processes that can instantiate threads for parallel computing. Workers can be instantiated within cluster jobs.

  • Cluster jobs: usual definition for HPC clusters, nothing fancy.

  • Dask-Jobqueue Cluster: Dask-Jobqueue defines the concept of cluster (*Cluster classes) that is just a job factory. It holds the specifications of cluster jobs, not the specification of a computer cluster.

  • Master job: in this tutorial, the master job refers to the job (interactive, Jupyter Notebook or batch) that instantiates the workers (that runs the Dask scheduler).
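For illustration, here is a minimal sketch of the two kinds of task mentioned above, run with a purely local client (hypothetical example, independent of any HPC cluster):

from dask.distributed import Client
from dask import delayed

def square(x):
    return x * x

# A local, in-process client, for illustration only.
client = Client(processes=False)

lazy = delayed(square)(3)          # delayed (lazy) task: nothing runs yet
print(lazy.compute())              # 9, computed on demand

future = client.submit(square, 4)  # submitted (immediate) task: runs right away
print(future.result())             # 16

client.close()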

Points to consider

Sources

Chunks

"Dask is often used in situations where the data are too big to fit in memory. In these cases the data are split into chunks or partitions. Each task is computed on the chunk and then the results are aggregated."

"The central scheduler spends a few hundred microseconds on every task. For optimal performance, task durations should be greater than 10-100ms."

"Individual tasks should be a comfortable size so as not to overwhelm any particular worker."

Info

You have to choose the specifications of your data chunks wisely, or implement the logic of your delayed and submitted functions wisely.
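For instance, here is a minimal sketch, with arbitrary numbers, showing how to inspect the size and the number of the chunks of a Dask array before launching a computation:

import numpy as np
import dask.array as da

# 2_000 x 2_000 float64 chunks: large enough to amortize the scheduler
# overhead, small enough to fit comfortably in a worker's memory.
x = da.random.random((20_000, 20_000), chunks=(2_000, 2_000))
nb_chunks = int(np.prod(x.numblocks))
chunk_mib = x.blocks[0, 0].nbytes / 2**20
print(f'{nb_chunks} chunks of {chunk_mib:.1f} MiB each')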

Warning

The resources of the master job (memory and cores) must also be chosen wisely: the aggregated computation results must fit in the memory of the master job, and the Dask scheduler, which manages the workers, must not suffer from CPU starvation.

Workers

"If each task contains a non-trivial amount of work, then the fastest way to run dask is to have a worker for each concurrent task. For chunked data, if each worker is able to comfortably hold one data chunk in memory and do some computation on that data, then the number of chunks should be a multiple of the number of workers. This ensures that there is always enough work for a worker to do."

"When setting up a dask cluster you have to decide how many workers to use. It can be tempting to use many workers, but that isn’t always a good idea. If you use too many workers some may not have enough to do and spend much of their time idle."

Info

The number of tasks (chunks, delayed and submitted function calls) should be a multiple of the number of workers, and the number of workers has to be optimized.

Warning

Don't forget that requesting a lot of workers means submitting a lot of cluster job requests, which may take a long time to start running.
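A minimal sketch, with hypothetical numbers, of the kind of sanity check you may run before choosing the number of workers:

# 480 tasks, e.g. 40 years x 12 monthly files as in the Map-Reduce example below.
NB_TASKS   = 480
# 10 workers, i.e. 10 cluster job requests with one worker per job.
NB_WORKERS = 10

print(f'{NB_TASKS / NB_WORKERS} tasks per worker '
      f'(no idle worker: {NB_TASKS % NB_WORKERS == 0})')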

"Dask can not parallelize within individual tasks."

"If your computations are mostly numeric in nature (for example NumPy and Pandas computations) and release the GIL entirely then it is advisable to run dask-worker processes with many threads and one process. This reduces communication costs and generally simplifies deployment."

"If you truly are doing mostly numerical computations, you can specify as many total threads as you have core".

"There are some rules of thumb when to worry about GIL lockages, and thus prefer more workers over heavier individual workers with high nthreads: If your code is mostly pure Python (in non-optimized Python libraries) on non-numerical data. If your code causes computations external to Python that are long running and don’t release the GIL explicitly."

Info

Specify workers with more than one thread only if you are sure to take advantage of parallelization. Dask doesn't magically parallelize your computations: they have to be designed for parallelism and implemented with libraries that support it, e.g. NumPy-based libraries. Remember that NumPy is able to automatically parallelize some, but not all, vector computations.

Warning

Threads share the memory of the worker that instantiates them. This means that for a worker with 10 Gio of RAM and 5 threads, each thread gets 2 Gio. If a computation takes more than 2 Gio, the thread will write onto the disk (memory spilling), which slows down the process considerably, or the worker may die.
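As a quick check, the per-thread memory budget can be computed from the worker specifications (numbers taken from the warning above):

WORKER_MEM            = 10   # Gio, memory of the worker
NB_THREADS_PER_WORKER = 5

mem_per_thread = WORKER_MEM / NB_THREADS_PER_WORKER
print(f'{mem_per_thread} Gio per thread: each chunk plus its intermediate '
      'results should stay well below this value to avoid memory spilling')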

Overview

This tutorial describes the following steps to instantiate workers on the IPSL's computer clusters. Each step is developed in its own section.

  1. Worker configuration
  2. Job instantiation
  3. Monitoring
  4. Computations
  5. Shutdown

Worker configuration

In this section, we will see how to configure workers that will be instantiated within cluster jobs using Dask-Jobqueue. From an interactive job, Jupyter Notebook or a batch job, which we shall call the master job:

  • Load the necessary libraries (installed in a Python module or in your own Python environment)
from dask.distributed import Client
import dask_jobqueue as djq
import numpy as np
import time
import os
from os import path
  • Define the cluster/job factory specifications
### Cluster configuration
# Specify if the cluster adapts the number of workers.
# Read adapt method documentation.
# Should be False for performance testing / configuration optimization <=> determinism.
# Should be True for production, as it plays nicely with the HPC cluster.
IS_CLUSTER_ADAPTIVE = False
DASK_DIRECTORY_PATH = path.join(os.getcwd(), 'tmp_dask_workspace')
  • Define the worker specifications
### Dask worker configuration
# Total number of workers available for distributed computation.
NB_WORKERS            = 1
if IS_CLUSTER_ADAPTIVE:
    NB_MIN_WORKERS    = 1
else:
    NB_MIN_WORKERS    = 'not applied'
# In Gio (power of 2 ; not Go (power of 10) ; For Dask Gio == GiB != GB == Go).
WORKER_MEM            = 1
# Number of threads per worker available for parallel computation.
# Set more than 1 if you are **sure** to take advantage of parallelization.
NB_THREADS_PER_WORKER = 1
  • Define the cluster job specifications
### Cluster job configuration
# Must be consistent with NB_WORKERS: NB_WORKERS = NB_WORKERS_PER_JOB * nb_jobs
# where nb_jobs is an integer.
NB_WORKERS_PER_JOB   = 1
JOB_WALLTIME         = '00:30:00'
# Specify the network interface name that jobs use to communicate together (run ip a),
# e.g. 'ib0'. Leave it to None to let Dask-Jobqueue choose the default interface.
NET_INTERFACE        = None
# Specify where to write worker log files.
WORKER_LOG_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'logs')
TMP_WORKER_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'tmp')
  • Infer the rest of the specifications

### Inferred worker configuration
# Ensure worker specifications.
# Dask Jobqueue 0.7.3 is unable to handle this correctly...
worker_extra_opts = [f'--nthreads={NB_THREADS_PER_WORKER}',
                     f'--nprocs={NB_WORKERS_PER_JOB}',
                     f'--memory-limit={WORKER_MEM}GiB']
### Inferred job configuration
# Number of cluster jobs requested.
nb_jobs          = int(np.ceil(NB_WORKERS/NB_WORKERS_PER_JOB))
# Number of CPU cores per job.
nb_cores_per_job = NB_THREADS_PER_WORKER * NB_WORKERS_PER_JOB
# Quantity of memory for a job.
job_mem          = NB_WORKERS_PER_JOB * WORKER_MEM
job_vmem         = job_mem
# Specific to Ciclad and Climserv.
# Unnecessary for Spirit.
job_extra_opts   = (f'-l mem={job_mem}gb', f'-l vmem={job_vmem}gb',
                    f'-l nodes=1:ppn={nb_cores_per_job}')
  • Pretty print the configuration

print(
f'''
> Configuration:

# Cluster configuration
- cluster adaptive: {IS_CLUSTER_ADAPTIVE}

# Worker configuration
- nb workers: {NB_WORKERS}
- min workers: {NB_MIN_WORKERS}
- memory per worker: {WORKER_MEM} Gio
- nb threads per worker: {NB_THREADS_PER_WORKER}

# Cluster job configuration
- nb jobs: {nb_jobs}
- nb cores per job: {nb_cores_per_job}
- nb workers per job: {NB_WORKERS_PER_JOB}
- memory per job: {job_mem} Gio
- virtual memory per job: {job_vmem} Gio
- job walltime: {JOB_WALLTIME}
- log directory path: {WORKER_LOG_DIRECTORY}
- job network interface: {NET_INTERFACE}
''')

  • Start the Dask cluster

cluster = djq.PBSCluster(cores=nb_cores_per_job, processes=NB_WORKERS_PER_JOB,
                         memory=f'{job_mem}GiB', interface=NET_INTERFACE,
                         walltime=JOB_WALLTIME, log_directory=WORKER_LOG_DIRECTORY,
                         local_directory=TMP_WORKER_DIRECTORY,
                         extra=worker_extra_opts, job_extra=job_extra_opts)
print(f'> The dashboard link: {cluster.dashboard_link}')

The job factory enables a web server that provides a useful dashboard. The dashboard displays the task computation graph, the memory and CPU occupation of the workers, etc. The web server opens a network port, whose default value is 8787. If that port is already bound, Dask-Jobqueue opens another port and outputs its actual value.

Warning

Don't forget to check the port value as you won't be able to connect to the dashboard without the right port value.
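If in doubt, the actual port can be read back from the dashboard link reported by the cluster; a minimal sketch, assuming the cluster object created above:

from urllib.parse import urlparse

# cluster.dashboard_link contains the port actually opened by the web server.
print(f'> Dashboard port: {urlparse(cluster.dashboard_link).port}')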

Job instantiation

From the master job:

Info

The scale and adapt methods trigger the submission of the job requests to the cluster scheduler. The computation starts when at least one worker, and so one cluster job, is ready.
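A minimal sketch of this step, reusing the cluster object and the configuration variables defined in the previous section:

if IS_CLUSTER_ADAPTIVE:
    # The cluster adapts the number of workers between the two bounds.
    cluster.adapt(minimum=NB_MIN_WORKERS, maximum=NB_WORKERS)
else:
    # Better control when scaling on jobs instead of workers.
    cluster.scale(jobs=nb_jobs)

# Connect a client so that the next Dask computations are sent to the workers.
client = Client(cluster)

# Block until at least one worker, and so one cluster job, is ready.
print('> Waiting for at least one worker')
client.wait_for_workers(n_workers=1)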

Info

For debugging purposes, you can output the job script that is submitted to the scheduler of the cluster: print(cluster.job_script())

Monitoring

Command line

Execute the following shell line to monitor your cluster jobs every 2 seconds (state, running time, etc.).

watch -n 2 'qstat -a -u $USER | egrep "Q|R|H"'

Dashboard

Dask-Jobqueue comes with a dashboard that provides a meaningful interface to monitor the workers. The dashboard is enabled after the instantiation of the cluster/job factory, and its URL can be displayed with this instruction:

print(f'the dashboard link: {cluster.dashboard_link}')

Pasting the URL into your web browser displays the dashboard, provided your computer is connected to the IPSL network (at the office, or elsewhere through the IPSL VPN).

Tip

If you cannot get access to the dashboard, you can try the SSH tunneling method which is described here.

Computations

The Python instructions right after the worker instantiation are executed in the Dask distributed computing context, provided at least one worker is ready. If these instructions are Dask computations involving chunks of data, delayed or submitted function calls, the computations are distributed and their results are merged back into the master job.

Warning

Make sure that the result of your computation can fit into the memory of the master job!
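As an illustration, here is a minimal sketch of such a computation, assuming the client and workers instantiated in the previous sections (the array size is arbitrary):

import dask.array as da

# The array is split into chunks that are distributed to the workers.
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
# compute() runs the distributed computation and brings the (small) result
# back into the memory of the master job.
print(f'> mean: {x.mean().compute()}')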

Shutdown

The following instructions shut down the Dask infrastructure and terminate the cluster jobs. Don't forget to execute them after your computations.

client.shutdown()
client.close()
cluster.close()

It is highly recommended to use the with statement in batch jobs:

with djq.PBSCluster(cores=nb_cores_per_job, processes=NB_WORKERS_PER_JOB,
                    memory=f'{job_mem}GiB', interface=NET_INTERFACE,
                    walltime=JOB_WALLTIME, log_directory=WORKER_LOG_DIRECTORY,
                    local_directory=TMP_WORKER_DIRECTORY,
                    extra=worker_extra_opts,
                    job_extra=job_extra_opts) as cluster, Client(cluster) as client:
    if IS_CLUSTER_ADAPTIVE:
        cluster.adapt(minimum=NB_MIN_WORKERS, maximum=NB_WORKERS)
    else:
        # Better control when scaling on jobs instead of workers.
        cluster.scale(jobs=nb_jobs)

    # This instruction blocks until the number of ready workers reaches
    # the specified value. For the sake of distributed computing, the master
    # job should wait for at least one worker.
    print('> Waiting for at least one worker')
    client.wait_for_workers(n_workers=1)
    # ...

Info

You may restart the Dask cluster and keep your allocated cluster jobs with the following instruction: client.restart().

Info

Exiting a Python/iPython interpreter, batch job or interactive job that runs the master job will terminate the instantiated cluster jobs.

Warning

Aborting a Python/iPython interpreter, batch job or interactive job that runs the master job doesn't abort the instantiated cluster jobs! Don't forget to abort them too!

Vector computation example

This example generates about 18.6 GiB of random data. As the memory of each job cannot exceed 8 GiB, this example demonstrates that the data array is chunked and distributed across the 3 jobs by Dask. You can see the distribution on the dashboard (tab 'workers', column 'managed').

  • Connect to HPC cluster
  • Run an interactive job (or a JupyterLab)
qsub -IV -l mem=2gb -l vmem=2gb -l nodes=1:ppn=1
  • Load a Python module and open a Python interpreter
# Or any other module that contains the required libraries.
module load python/meso-3.9
ipython
  • Run the following script

from dask.distributed import Client
import dask_jobqueue as djq
import numpy as np
import time
import os
from os import path

### Cluster configuration
DASK_DIRECTORY_PATH   = path.join(os.getcwd(), 'tmp_dask_workspace')

### Dask worker configuration
# Total number of workers available for distributed computation.
NB_WORKERS            = 3
# In Gio (power of 2 ; not Go (power of 10) ; For Dask Gio == GiB != GB == Go).
WORKER_MEM            = 8
# Number of threads per worker available for parallel computation.
# Set more than 1 if you are sure to take advantage of parallelization.
NB_THREADS_PER_WORKER = 1

### Cluster job configuration
# Must be consistent with NB_WORKERS: NB_WORKERS = NB_WORKERS_PER_JOB * nb_jobs
# where nb_jobs is an integer.
NB_WORKERS_PER_JOB   = 1
JOB_WALLTIME         = '00:30:00'
# Specify the network interface name that jobs use to communicate together (run ip a).
NET_INTERFACE        = 'ib0'
# Specify where to write worker log files.
WORKER_LOG_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'logs')
TMP_WORKER_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'tmp')

### Inferred worker configuration
# Ensure worker specifications.
# Dask Jobqueue 0.7.3 is unable to handle this correctly...
worker_extra_opts = [f'--nthreads={NB_THREADS_PER_WORKER}',
                     f'--nprocs={NB_WORKERS_PER_JOB}',
                     f'--memory-limit={WORKER_MEM}GiB']
### Inferred job configuration
# Number of cluster jobs requested.
nb_jobs          = int(np.ceil(NB_WORKERS/NB_WORKERS_PER_JOB))
# Number of CPU cores per job.
nb_cores_per_job = NB_THREADS_PER_WORKER * NB_WORKERS_PER_JOB
# Quantity of memory for a job.
job_mem          = NB_WORKERS_PER_JOB * WORKER_MEM
job_vmem         = job_mem
# Specific to Ciclad and Climserv.
# Unnecessary for Spirit.
job_extra_opts   = (f'-l mem={job_mem}gb', f'-l vmem={job_vmem}gb',
                    f'-l nodes=1:ppn={nb_cores_per_job}')

cluster = djq.PBSCluster(
                      cores=nb_cores_per_job, processes=NB_WORKERS_PER_JOB,
                      memory=f'{job_mem}GiB', interface=NET_INTERFACE,
                      walltime=JOB_WALLTIME, log_directory=WORKER_LOG_DIRECTORY,
                      local_directory=TMP_WORKER_DIRECTORY,
                      extra=worker_extra_opts, job_extra=job_extra_opts)
print(f'> The dashboard link: {cluster.dashboard_link}')

# Better control when scaling on jobs instead of workers.
cluster.scale(jobs=nb_jobs)
client = Client(cluster)

# It is better to wait for all the workers to be ready,
# as the random data doesn't fit into one worker's memory
# (that would lead to spilling/swapping data to disk).
print(f'> Waiting for {NB_WORKERS} worker(s)')
# This instruction blocks until all workers are ready.
client.wait_for_workers(n_workers=NB_WORKERS)

# For this example only.
import dask.array as da
from datetime import datetime
import time

############ DISTRIBUTED COMPUTING ZONE ############
x = da.random.random((50_000, 50_000), chunks=(1000, 1000))
print(f'size in GiB: {x.nbytes/(1024*1024*1024)}')
y = x + x.T
# persist() triggers the computation and keeps the result on the workers.
y = y.persist()
####################################################
When the computations end, don't forget to shut down Dask:

client.shutdown()
client.close()
cluster.close()

Map-Reduce example

The following code computes the monthly mean temperature from 1980 to 2020 from the ERA5 dataset, for a location near Gazost (French Pyrénées). It is based on the Map-Reduce computing model: it instantiates 10 workers with 1 thread each, as giving more threads doesn't accelerate the computation (the ERA5 NetCDF files are not already chunked; open_mfdataset doesn't seem to parallelize IO). The Map step consists of one worker opening each file and computing the mean temperature of a month, as an ERA5 file represents one month. The Reduce step consists of concatenating the means into the same dataset object so as to plot them or to compute the mean of means.

  • Connect to HPC cluster
  • Run an interactive job (or a JupyterLab)
qsub -IV -l mem=2gb -l vmem=2gb -l nodes=1:ppn=1
  • Load a Python module and open a Python interpreter
# Or any other module that contains the required libraries.
module load python/meso-3.9
ipython
  • Run the following script
from dask.distributed import Client
import dask_jobqueue as djq
import numpy as np
import time
import os
from os import path

### Cluster configuration
DASK_DIRECTORY_PATH   = path.join(os.getcwd(), 'tmp_dask_workspace')

### Dask worker configuration
# Total number of workers available for distributed computation.
NB_WORKERS            = 10
# In Gio (power of 2 ; not Go (power of 10) ; For Dask Gio == GiB != GB == Go).
WORKER_MEM            = 2
# Number of threads per worker available for parallel computation.
# Set more than 1 if you are sure to take advantage of parallelization.
NB_THREADS_PER_WORKER = 1

### Cluster job configuration
# Must be consistent with NB_WORKERS: NB_WORKERS = NB_WORKERS_PER_JOB * nb_jobs
# where nb_jobs is an integer.
NB_WORKERS_PER_JOB   = 1
JOB_WALLTIME         = '00:30:00'
# Specify the network interface name that jobs use to communicate together (run ip a).
NET_INTERFACE        = 'ib0'
# Specify where to write worker log files.
WORKER_LOG_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'logs')
TMP_WORKER_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'tmp')

### Inferred worker configuration
# Ensure worker specifications.
# Dask Jobqueue 0.7.3 is unable to handle this correctly...
worker_extra_opts = [f'--nthreads={NB_THREADS_PER_WORKER}',
                     f'--nprocs={NB_WORKERS_PER_JOB}',
                     f'--memory-limit={WORKER_MEM}GiB']
### Inferred job configuration
# Number of cluster jobs requested.
nb_jobs          = int(np.ceil(NB_WORKERS/NB_WORKERS_PER_JOB))
# Number of CPU cores per job.
nb_cores_per_job = NB_THREADS_PER_WORKER * NB_WORKERS_PER_JOB
# Quantity of memory for a job.
job_mem          = NB_WORKERS_PER_JOB * WORKER_MEM
job_vmem         = job_mem
# Specific to Ciclad and Climserv.
# Unnecessary for Spirit.
job_extra_opts   = (f'-l mem={job_mem}gb', f'-l vmem={job_vmem}gb',
                    f'-l nodes=1:ppn={nb_cores_per_job}')

cluster = djq.PBSCluster(
                      cores=nb_cores_per_job, processes=NB_WORKERS_PER_JOB,
                      memory=f'{job_mem}GiB', interface=NET_INTERFACE,
                      walltime=JOB_WALLTIME, log_directory=WORKER_LOG_DIRECTORY,
                      local_directory=TMP_WORKER_DIRECTORY,
                      extra=worker_extra_opts, job_extra=job_extra_opts)
print(f'> The dashboard link: {cluster.dashboard_link}')

# Better control when scaling on jobs instead of workers.
cluster.scale(jobs=nb_jobs)
client = Client(cluster)

# This instruction blocks until the number of ready workers reaches
# the specified value.
print('> Waiting for at least one worker')
client.wait_for_workers(n_workers=1)

import xarray as xr
from datetime import datetime

MONTHS   = [month for month in range(1, 13)]
YEARS = [year for year in range(1980, 2020)]
ERA5_FILE_PATH_PATTERN = '/bdd/ERA5/NETCDF/GLOBAL_025/4xdaily/AN_PL/{year}/ta.{year}{month2d}.aphe5.GLOBAL_025.nc'
TIME_VARIABLE_NAME = 'time'

def compute_month_temperature(year: int, month: int, lat: float, lon: float, level: int) -> xr.Dataset:
    netcdf_file_path = ERA5_FILE_PATH_PATTERN.format(year=year, month2d=f"{month:02d}")
    dataset = xr.open_dataset(netcdf_file_path, lock=False)
    extracted_point = dataset.sel(latitude=lat, longitude=lon, level=level)
    # The time dimension is lost.
    result = extracted_point.mean()
    # Reconstruct the time dimension.
    date = datetime(year, month, 1)
    result = result.assign_coords({TIME_VARIABLE_NAME: date})
    dataset.close()
    return result

# Near Gazost.
lat = 43
lon = 0
level = 1000

start = time.time()
# The Map step.
futures = list()
for year in YEARS:
    for month in MONTHS:
        future = client.submit(compute_month_temperature, year, month, lat, lon, level)
        futures.append(future)

print('> start gathering results')
# The tasks complete in an arbitrary order.
# This call blocks until all the results are gathered.
results = client.gather(futures)
# The Reduce step. It must involve commutative and associative operations!
temperatures = xr.concat(results, dim=TIME_VARIABLE_NAME).sortby(TIME_VARIABLE_NAME)
print(f'> temperatures: {temperatures}')
print(f'> mean of temperature means is: {temperatures.mean()}')
print(f'> computation time: {time.time() - start}')

When it is over, don't forget to shut down the Dask cluster:

client.shutdown()
client.close()
cluster.close()