Skip to content

Dask Jobqueue detailed version

Overview

This tutorial describes the following steps to instantiate Dask workers on the IPSL's computer clusters. Each step is developed in its own section.

  1. Dask worker configuration
  2. Job instantiation
  3. Monitoring
  4. Computations
  5. Shutdown

Warning

The tutorial is Dask worker oriented as opposed to the simplified version which is Slurm job oriented.

Dask worker configuration

In this section, we will see how to configure workers that will be instantiated within Slurm jobs using Dask-Jobqueue. From an interactive job, batch job or a Jupyter Notebook, which we shall call the main Slurm job:

  • Load the necessary libraries (installed in a Python module or in your own Python environment)
from dask.distributed import Client
import dask_jobqueue as djq
import numpy as np
import time
import os
from os import path
  • Define the cluster/job factory specifications
### Cluster configuration
# Specify if the cluster adapts the number of workers.
# Read adapt method documentation.
# Should be False for performance testing / configuration optimization <=> determinism.
# Should be True for production, as it play nicely on HPC cluster.
IS_CLUSTER_ADAPTIVE = False
DASK_DIRECTORY_PATH = path.join(os.getcwd(), 'tmp_dask_workspace')
DASK_DASHBOARD_PORT = 9999
  • Define the Dask worker specifications
### Dask worker configuration
# Total number of workers availables for distributed computation.
NB_DASK_WORKERS       = 1
if IS_CLUSTER_ADAPTIVE:
    # The minimum number of worker available in the Dask cluster.
    NB_MIN_WORKERS    = 1
else:
    NB_MIN_WORKERS    = 'not applied'
# Expressed in Gio unit (power of 2 ; not Go (power of 10) ; For Dask Gio == GiB != GB == Go).
DASK_WORKER_MEM       = 1
# Number of threads per worker availables for parallel computation.
# Set more than 1 if your are ** very sure** to take advantage of parallelization (!= distribution).
# Not all the vector computations in Numpy take advantage of parallelization!
NB_THREADS_PER_DASK_WORKER = 1
  • Define the Slurm job specifications
### Slurm job configuration
# Must be consistent with NB_DASK_WORKERS: NB_DASK_WORKERS = NB_DASK_WORKERS_PER_JOB * nb_jobs
# where nb_jobs is an integer.
NB_DASK_WORKERS_PER_JOB   = 1
JOB_WALLTIME              = '00:30:00'
JOB_PARTITION             = 'zen4' # Or zen16 depending of the ratio RAM/number of core.
# Specify the network interface name that jobs use to communicate together (run ip a).
#NET_INTERFACE            = 'ib0' disable!
# Specify where to write worker log files.
DASK_WORKER_LOG_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'logs')
TMP_DASK_WORKER_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'tmp')
  • Infer the rest of the specifications
### Inferred worker configuration
# Insure worker specifications.
# Specifies the number of thread per worker.
worker_extra_args = [f'--nthreads={NB_THREADS_PER_DASK_WORKER}']
### Inferred Slurm job configuration
# Number of cluster job requested.
nb_jobs          = int(np.ceil(NB_DASK_WORKERS/NB_DASK_WORKERS_PER_JOB))
# Number of CPU cores per Slurm job.
nb_cpu_cores_per_job = NB_THREADS_PER_DASK_WORKER * NB_DASK_WORKERS_PER_JOB
# Quantity of memory per Slurm job.
total_worker_mem_needed_per_job = NB_DASK_WORKERS_PER_JOB * DASK_WORKER_MEM

# We compute the quantity of RAM per job and so recompute the RAM per worker!
match JOB_PARTITION:
    case 'zen4':
        ram_cpu_ratio = 4
    case 'zen16':
        ram_cpu_ratio = 16
    case _:
        raise ValueError(f'unsupported partition {JOB_PARTITION}')

while total_worker_mem_needed_per_job > nb_cpu_cores_per_job * ram_cpu_ratio:
    nb_cpu_cores_per_job = nb_cpu_cores_per_job + 1

ram_per_job = nb_cpu_cores_per_job * ram_cpu_ratio

Warning

DASK_WORKER_MEM, NB_THREADS_PER_DASK_WORKER and JOB_PARTITION are keys for the computation of the memory and the number of CPU cores of the Slurm jobs. It may be necessary to increment the number of CPU cores of the Slurm jobs if the requested amount of memory per Dask worker is higher than nb_cpu_cores_per_job * ram_cpu_ratio.

  • Pretty print the configuration
print(
f'''
> Configuration:

# Dask cluster configuration
- cluster adaptive: {IS_CLUSTER_ADAPTIVE}
- Dask dashboard port: {DASK_DASHBOARD_PORT}

# Dask Worker configuration
- nb workers: {NB_DASK_WORKERS}
- min workers: {NB_MIN_WORKERS}
- memory per worker: {DASK_WORKER_MEM if DASK_WORKER_MEM >= ram_per_job/NB_DASK_WORKERS_PER_JOB else ram_per_job/NB_DASK_WORKERS_PER_JOB} Gio
- nb threads per worker: {NB_THREADS_PER_DASK_WORKER}

# Slurm job configuration
- nb jobs: {nb_jobs}
- partition: {JOB_PARTITION}
- nb cores per job: {nb_cpu_cores_per_job}
- ram per job: {ram_per_job}
- nb workers per job: {NB_DASK_WORKERS_PER_JOB}
- job walltime: {JOB_WALLTIME}
- log directory path: {DASK_WORKER_LOG_DIRECTORY}
''')

Start the Dask cluster

cluster = djq.SLURMCluster(cores=nb_cpu_cores_per_job, processes=NB_DASK_WORKERS_PER_JOB,
                           queue=JOB_PARTITION, memory=f'{ram_per_job}GiB',
                           # interface=NET_INTERFACE, disable!
                           walltime=JOB_WALLTIME, log_directory=DASK_WORKER_LOG_DIRECTORY,
                           local_directory=TMP_DASK_WORKER_DIRECTORY,
                           scheduler_options={'dashboard_address': f':{DASK_DASHBOARD_PORT}'},
                           job_directives_skip=['--mem'], # skip mem specificaton for spirit and jean zay!
                           worker_extra_args=worker_extra_args)
print(f'> The dashboard link: {cluster.dashboard_link}')

The Slurm job factory enable a web server that provides a useful dashboard interface. This dashboard displays the task computation graph, the workers memory and CPU occupations, etc. The web server opens a network port, which is set to DASK_DASHBOARD_PORT. If the port is already binded, Dask-Jobqueue opens another port and outputs its actual value.

Warning

Don't forget to check the port value as you won't be able to connect to the dashboard without the right port value (cluster.dashboard_link).

Job instantiation

From the main Slurm job:

Info

The scale and adapt methods trigger the submission of the Slurm job requests to the cluster scheduler. The computation starts when at least one worker, and so one cluster job, is ready.

Tips

As matter of debug, you can output the request that is executed to submit a job to the scheduler of the cluster. print(cluster.job_script())

Monitoring

Command line

Execute this following shell line so as to monitor your Slurm job every 2 seconds (states, runing time, etc.).

watch 'slqueue --me'

Dashboard

Dask-Jobqueue comes with a dashboard that provides a meaningful interface to monitor the workers. The dashboard is enable after the instantiation of the Slurm job factory, and its URL can be displayed with this instruction:

print(f'the dashboard link: {cluster.dashboard_link}')

Pasting the URL on your web browser displays the dashboard provided your computer is connected to the IPSL network (at the office or elsewhere but through the IPSL VPN).

Tip

If you cannot get access to the dashboard, you can try SSL tunneling method which is described here.

Computations

The Python instructions right after the worker instantiations are executed in the Dask distributed computing contexte, provided at least one worker is ready. If these instructions are Dask computations involving chunks of data, delayed or submitted function calls, the computations are distributed and their results are merged back to the main job. The latter step is called reduce or aggregation.

Warning

Make sure that the result of your computation can fit into the memory of the main Slurm job!

Shutdown

The following instructions shutdown Dask infrastructure and terminate the Slurm jobs. Don't forget to execute them after your computations.

client.shutdown()
client.close()
cluster.close()

It is highly recommended to use the with statement in batch jobs:

with djq.SLURMCluster(cores=nb_cpu_cores_per_job, processes=NB_DASK_WORKERS_PER_JOB,
                      queue=JOB_PARTITION, memory=f'{ram_per_job}GiB',
                      # interface=NET_INTERFACE, disable!
                      walltime=JOB_WALLTIME, log_directory=DASK_WORKER_LOG_DIRECTORY,
                      local_directory=TMP_DASK_WORKER_DIRECTORY,
                      scheduler_options={'dashboard_address': f':{DASK_DASHBOARD_PORT}'},
                      job_directives_skip=['--mem'], # skip mem specificaton for spirit and jean zay!
                      worker_extra_args=worker_extra_args) as cluster, Client(cluster) as client:
    if IS_CLUSTER_ADAPTIVE:
        cluster.adapt(minimum=NB_MIN_WORKERS, maximum=NB_DASK_WORKERS)
    else:
        # Better control when scaling on jobs instead of workers.
        cluster.scale(jobs=nb_jobs)

    # This instruction blocks until the number of ready workers reaches
    # the specified value. For the sake of the distributed computing, you should
    # make the main job to wait at least one worker.
    print(f'> Waiting for one worker at least')
    client.wait_for_workers(n_workers=1)
    # ...

Info

You may restart The Dask cluster and keep your allocated Slurm jobs with the following instruction: client.restart().

Info

Exiting a Python/iPython interpreter, batch job or interactive job that runs the main Slurm job will terminate the instantiated Slurm jobs.

Warning

Aborting a Python/iPython interpreter, batch job or interactive job that runs the main Slurm job doesn't abort the instantiated Slurm jobs! Don't forget to abort them too!

Vector computation example

This example loads about 18.6 GiB of random data. As each Slurm job memory cannot exceed 8 GiB, this example demonstrates that the data array is chunked and distributed across the 3 Slurm jobs by Dask. You can see the distribution on the dashboard (tab 'workers', column 'managed').

  • Connect to HPC cluster
  • Run an interactive Slurm job (or a JupyterLab)

srun --pty bash
* Load a Python module and open a Python interpreter

# e.g. pangeo-meso or any other module that contains the required libraries.
module load pangeo-meso/2024.01.22
ipython
* Run the following script

from dask.distributed import Client
import dask_jobqueue as djq
import numpy as np
import time
import os
from os import path

### Cluster configuration
DASK_DIRECTORY_PATH = path.join(os.getcwd(), 'tmp_dask_workspace')
DASK_DASHBOARD_PORT = 9999

### Dask worker configuration
# Total number of workers availables for distributed computation.
NB_DASK_WORKERS     = 3
# In Gio (power of 2 ; not Go (power of 10) ; For Dask Gio == GiB != GB == Go).
DASK_WORKER_MEM     = 8
# Number of threads per worker availables for parallel computation.
# Set more than 1 if your are sure to take advantage of parallelization.
NB_THREADS_PER_DASK_WORKER = 1

### Slurm job configuration
# Must be consistent with NB_DASK_WORKERS: NB_DASK_WORKERS = NB_DASK_WORKERS_PER_JOB * nb_jobs
# where nb_jobs is an integer.
NB_DASK_WORKERS_PER_JOB = 1
JOB_WALLTIME            = '00:30:00'
JOB_PARTITION           = 'zen4' # Or zen16
# Specify the network interface name that jobs use to communicate together (run ip a).
#NET_INTERFACE        = 'ib0' disable!
# Specify where to write worker log files.
DASK_WORKER_LOG_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'logs')
TMP_DASK_WORKER_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'tmp')

### Inferred worker configuration
# Insure worker specifications.
# Specifies the number of thread per worker.
worker_extra_args = [f'--nthreads={NB_THREADS_PER_DASK_WORKER}']
### Inferred Slurm job configuration
# Number of Slurm job requested.
nb_jobs          = int(np.ceil(NB_DASK_WORKERS/NB_DASK_WORKERS_PER_JOB))
# Number of CPU cores per Slurm job.
nb_cpu_cores_per_job = NB_THREADS_PER_DASK_WORKER * NB_DASK_WORKERS_PER_JOB
# Quantity of memory per Slurm job.
total_worker_mem_needed_per_job = NB_DASK_WORKERS_PER_JOB * DASK_WORKER_MEM

match JOB_PARTITION:
    case 'zen4':
        ram_cpu_ratio = 4
    case 'zen16':
        ram_cpu_ratio = 16
    case _:
        raise ValueError(f'unsupported partition {JOB_PARTITION}')

while total_worker_mem_needed_per_job > nb_cpu_cores_per_job * ram_cpu_ratio:
    nb_cpu_cores_per_job = nb_cpu_cores_per_job + 1

ram_per_job = nb_cpu_cores_per_job * ram_cpu_ratio

cluster = djq.SLURMCluster(
                      cores=nb_cpu_cores_per_job, processes=NB_DASK_WORKERS_PER_JOB,
                      queue=JOB_PARTITION, memory=f'{ram_per_job}GiB',
                      # interface=NET_INTERFACE, disable!
                      walltime=JOB_WALLTIME, log_directory=DASK_WORKER_LOG_DIRECTORY,
                      local_directory=TMP_DASK_WORKER_DIRECTORY,
                      scheduler_options={'dashboard_address': f':{DASK_DASHBOARD_PORT}'},
                      job_directives_skip=['--mem'], # skip mem specificaton for spirit and jean zay!
                      worker_extra_args=worker_extra_args)
print(f'> The dashboard link: {cluster.dashboard_link}')

# Better control when scaling on jobs instead of workers.
cluster.scale(jobs=nb_jobs)
client = Client(cluster)

# It is better to wait all the workers to be ready
# as the random data doesn't fit into one worker memory
# (it will lead to spilling/swapping data on disk).
print(f'> Waiting for {NB_DASK_WORKERS} worker(s)')
# This instruction blocks until all workers are ready.
client.wait_for_workers(n_workers=NB_DASK_WORKERS)

# For this example only.
import dask.array as da
from datetime import datetime
import time

############ DISTRIBUTED COMPUTING ZONE ############
x = da.random.random((5e4, 5e4), chunks=(1000, 1000))
print(f'size in Gib: {x.nbytes/(1024*1024*1024)}')
y = x + x.T
y.persist()
####################################################

When the computations end, don't forget to shutdown Dask:

client.shutdown()
client.close()
cluster.close()

Map-Reduce example

The following code compute the mean of temperature monthly from 1980 to 1990 in ERA5 dataset for a location near Gazost (French Pyrénées). It is based on the Map-Reduce computing model, it instantiates 10 workers of 1 thread each, as giving more thread doesn't accelarate the computation (ERA5 NetCDF files are not already chunked ; open_mfdataset doesn't seem to parallize IO). The Map step consists of openning each file by one worker than compute the mean temperature of a month as an ERA5 file represents a month. The Reduce step consists of concatenating the means into the same dataset object so as to plot them or to compute the mean of means.

  • Connect to HPC cluster
  • Run an interactive Slurm job (or JupyterLab)

srun --pty bash
* Load a Python module and open a Python interpreter

# e.g. pangeo-meso or any other module that contains the required libraries.
module load pangeo-meso/2024.01.22
ipython
* Run the following script

from dask.distributed import Client
import dask_jobqueue as djq
import numpy as np
import time
import os
from os import path

### Cluster configuration
DASK_DIRECTORY_PATH = path.join(os.getcwd(), 'tmp_dask_workspace')
DASK_DASHBOARD_PORT = 9999

### Dask worker configuration
# Total number of workers availables for distributed computation.
NB_DASK_WORKERS     = 10
# In Gio (power of 2 ; not Go (power of 10) ; For Dask Gio == GiB != GB == Go).
DASK_WORKER_MEM     = 2
# Number of threads per worker availables for parallel computation.
# Set more than 1 if your are sure to take advantage of parallelization.
NB_THREADS_PER_DASK_WORKER = 1

### Slurm job configuration
# Must be consistent with NB_DASK_WORKERS: NB_DASK_WORKERS = NB_DASK_WORKERS_PER_JOB * nb_jobs
# where nb_jobs is an integer.
NB_DASK_WORKERS_PER_JOB = 1
JOB_WALLTIME            = '00:30:00'
JOB_PARTITION           = 'zen4' # Or zen16
# Specify the network interface name that Slurm jobs use to communicate together (run ip a).
#NET_INTERFACE        = 'ib0' disable!
# Specify where to write worker log files.
DASK_WORKER_LOG_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'logs')
TMP_DASK_WORKER_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'tmp')

### Infered worker configuration
# Specifies the number of thread per worker.
worker_extra_args = [f'--nthreads={NB_THREADS_PER_DASK_WORKER}']
### Infered Slurm job configuration
# Number of Slurm job requested.
nb_jobs           = int(np.ceil(NB_DASK_WORKERS/NB_DASK_WORKERS_PER_JOB))
# Number of CPU cores per Slurm job.
nb_cpu_cores_per_job = NB_THREADS_PER_DASK_WORKER * NB_DASK_WORKERS_PER_JOB
# Quantity of memory per Slurm job.
total_worker_mem_needed_per_job = NB_DASK_WORKERS_PER_JOB * DASK_WORKER_MEM

match JOB_PARTITION:
    case 'zen4':
        ram_cpu_ratio = 4
    case 'zen16':
        ram_cpu_ratio = 16
    case _:
        raise ValueError(f'unsupported partition {JOB_PARTITION}')

while total_worker_mem_needed_per_job > nb_cpu_cores_per_job * ram_cpu_ratio:
    nb_cpu_cores_per_job = nb_cpu_cores_per_job + 1

ram_per_job = nb_cpu_cores_per_job * ram_cpu_ratio

cluster = djq.SLURMCluster(
                      cores=nb_cpu_cores_per_job, processes=NB_DASK_WORKERS_PER_JOB,
                      queue=JOB_PARTITION, memory=f'{ram_per_job}GiB',
                      # interface=NET_INTERFACE, disable!
                      walltime=JOB_WALLTIME, log_directory=DASK_WORKER_LOG_DIRECTORY,
                      local_directory=TMP_DASK_WORKER_DIRECTORY,
                      scheduler_options={'dashboard_address': f':{DASK_DASHBOARD_PORT}'},
                      job_directives_skip=['--mem'], # skip mem specificaton for spirit and jean zay!
                      worker_extra_args=worker_extra_args)
print(f'> The dashboard link: {cluster.dashboard_link}')

# Better control when scaling on Slurm jobs instead of workers.
cluster.scale(jobs=nb_jobs)
client = Client(cluster)

# This instruction blocks until the number of ready workers reaches
# the specified value.
print(f'> Waiting for one worker at least')
client.wait_for_workers(n_workers=1)

import xarray as xr
from datetime import datetime

MONTHS   = [month for month in range(1, 12)]
YEARS = [year for year in range(1980, 1990)]
ERA5_FILE_PATH_PATTERN = '/bdd/ERA5/NETCDF/GLOBAL_025/4xdaily/AN_PL/{year}/ta.{year}{month2d}.aphe5.GLOBAL_025.nc'
TIME_VARIABLE_NAME = 'time'

def compute_month_temperature(year:int, month: int, lat: float, lon: float, level:int)-> xr.Dataset:
    netcdf_file_path = ERA5_FILE_PATH_PATTERN.format(year=year, month2d=f"{month:02d}")
    dataset = xr.open_dataset(netcdf_file_path, lock=False)
    extracted_point = dataset.sel(latitude=lat, longitude=lon, level=level)
    # The time dimension is lost.
    result = extracted_point.mean()
    # Reconstruct the time dimension.
    date = datetime(year, month, 1)
    result = result.assign_coords({TIME_VARIABLE_NAME: date})
    dataset.close()
    return result

# Near Gazost.
lat = 43
lon = 0
level = 1000

start = time.time()
# The Map step.
futures = list()
for year in YEARS:
    for month in MONTHS:
        future = client.submit(compute_month_temperature, year, month, lat, lon, level)
        futures.append(future)

print('> waiting for results')
# Results are gathered in a random order!
# This call blocks until all the results are gathered.
results = client.gather(futures)
# The Reduce step. It must involves commutative and associative instructions!
temperatures = xr.concat(results, dim=TIME_VARIABLE_NAME).sortby(TIME_VARIABLE_NAME)
print(f'> temperatures: {temperatures}')
print(f'> mean of temperature means is: {temperatures.mean()}')
print(f'> computation time: {time.time() - start}')

When it is over, don't forget to shutdown Dask cluster:

client.shutdown()
client.close()
cluster.close()