Dask Jobqueue simplified

Overview

This tutorial describes the following steps to instantiate Dask workers on the IPSL's computer clusters. Each step is developed in its own section.

  1. Dask worker configuration
  2. Job instantiation
  3. Monitoring
  4. Computations
  5. Shutdown

Warning

This tutorial is Slurm-job oriented: one Slurm job equals one Dask worker. The detailed version, by contrast, is Dask-worker oriented.

Dask cluster configuration

In this section, we will see how to configure Slurm jobs using Dask-Jobqueue. From an interactive job, batch job or a Jupyter Notebook, which we shall call the main Slurm job:

  • Load the necessary libraries (installed in a Python module or in your own Python environment)
from dask.distributed import Client
import dask_jobqueue as djq
import numpy as np
import time
import os
from os import path
  • Define the Slurm job factory specifications
### Cluster configuration
DASK_DIRECTORY_PATH  = path.join(os.getcwd(), 'tmp_dask_workspace')
DASK_DASHBOARD_PORT  = 9999
  • Define the Slurm job specifications
### Slurm job specifications
# Total number of slurm jobs.
NB_SLURM_JOB         = 1
# Number of CPU cores for each Slurm job.
NB_CPU_CORES_PER_JOB = 1
JOB_PARTITION        = 'zen4'
JOB_WALLTIME         = '00:30:00'
# Specify where to write worker log files.
DASK_WORKER_LOG_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'logs')
TMP_DASK_WORKER_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'tmp')

Tip

For zen4: RAM per Slurm job/Dask worker = NB_CPU_CORES_PER_JOB * 4 GiB. For zen16: RAM per Slurm job/Dask worker = NB_CPU_CORES_PER_JOB * 16 GiB
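
The rule in this tip can also be written as a small lookup table. The sketch below is one possible way to do it; the names GIB_PER_CORE and ram_per_job_gib are illustrative and not part of Dask-Jobqueue. Unlike the match statement used in this tutorial, which requires Python 3.10 or later, a dictionary lookup also works on older interpreters.

```python
# GiB of RAM per CPU core for each partition, as stated in the tip above.
GIB_PER_CORE = {'zen4': 4, 'zen16': 16}


def ram_per_job_gib(partition: str, nb_cores: int) -> int:
    """Return the RAM (in GiB) of one Slurm job, i.e. of one Dask worker."""
    try:
        return GIB_PER_CORE[partition] * nb_cores
    except KeyError:
        # Same behaviour as the default case of the match statement.
        raise ValueError(f'unsupported partition {partition}') from None


print(ram_per_job_gib('zen4', 1))
print(ram_per_job_gib('zen16', 2))
```

The returned value can then be passed to the memory argument as f'{ram_per_job_gib(JOB_PARTITION, NB_CPU_CORES_PER_JOB)}GiB'.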

  • Compute the memory per Slurm job from the partition (the match statement requires Python 3.10 or later)
match JOB_PARTITION:
    case 'zen4':
        ram_per_job = 4 * NB_CPU_CORES_PER_JOB
    case 'zen16':
        ram_per_job = 16 * NB_CPU_CORES_PER_JOB
    case _:
        raise ValueError(f'unsupported partition {JOB_PARTITION}')
  • Pretty print the configuration
print(
f'''
> Configuration:

# Dask cluster configuration
- Dask dashboard port: {DASK_DASHBOARD_PORT}

# Slurm job configuration
- nb jobs: {NB_SLURM_JOB}
- nb cores per job: {NB_CPU_CORES_PER_JOB}
- ram per job: {ram_per_job} GiB
- partition: {JOB_PARTITION}
- job walltime: {JOB_WALLTIME}
- log directory path: {DASK_WORKER_LOG_DIRECTORY}
''')

Start the Dask cluster

From the main Slurm job, instantiate the Slurm job factory:


cluster = djq.SLURMCluster(cores=NB_CPU_CORES_PER_JOB, processes=1,
                           queue=JOB_PARTITION, memory=f'{ram_per_job}GiB',
                           walltime=JOB_WALLTIME, log_directory=DASK_WORKER_LOG_DIRECTORY,
                           local_directory=TMP_DASK_WORKER_DIRECTORY,
                           job_directives_skip=['--mem'], # Skip the memory specification for Spirit and Jean Zay.
                           scheduler_options={'dashboard_address': f':{DASK_DASHBOARD_PORT}'})
print(f'> The dashboard link: {cluster.dashboard_link}')

The Slurm job factory enables a web server that provides a useful dashboard interface. This dashboard displays the task computation graph, the memory and CPU usage of the workers, etc. The web server opens a network port whose value is set to DASK_DASHBOARD_PORT. If the port is already bound, Dask-Jobqueue opens another port and outputs its actual value.

Warning

Don't forget to check the port value as you won't be able to connect to the dashboard without the right port value (cluster.dashboard_link).
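
As a minimal sketch of that check, the port can be extracted from the link with the standard library and compared with the requested one. The helper name dashboard_port and the example URL are hypothetical; the sketch assumes the link has the usual http://host:port/status form, and in practice the string would come from cluster.dashboard_link.

```python
from urllib.parse import urlparse


def dashboard_port(dashboard_link: str) -> int:
    """Return the port number embedded in a dashboard URL."""
    port = urlparse(dashboard_link).port
    if port is None:
        raise ValueError(f'no port found in {dashboard_link}')
    return port


# Hypothetical link, standing in for cluster.dashboard_link.
example_link = 'http://10.0.0.1:8787/status'
requested_port = 9999

actual_port = dashboard_port(example_link)
if actual_port != requested_port:
    print(f'> Port {requested_port} was not available, use {actual_port} instead')
```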

Job instantiation

From the main Slurm job:

# Better control when scaling on jobs instead of workers.
cluster.scale(jobs=NB_SLURM_JOB)

client = Client(cluster)

# This instruction blocks until the number of ready workers reaches
# the specified value. For distributed computing to take place, the
# main Slurm job should wait for at least one worker.
print('> Waiting for at least one worker')
client.wait_for_workers(n_workers=1)

Tip

For debugging purposes, you can print the script that is submitted to the cluster scheduler for each job: print(cluster.job_script())

Monitoring

Command line

Execute the following shell command to monitor your Slurm jobs every 2 seconds (states, running time, etc.).

watch 'slqueue --me'

Dashboard

Dask-Jobqueue comes with a dashboard that provides a meaningful interface to monitor the workers. The dashboard is enabled after the instantiation of the Slurm job factory, and its URL can be displayed with this instruction:

print(f'the dashboard link: {cluster.dashboard_link}')

Pasting the URL into your web browser displays the dashboard, provided your computer is connected to the IPSL network (at the office, or elsewhere through the IPSL VPN).

Tip

If you cannot access the dashboard, you can try the SSL tunneling method, which is described here.

Computations

The Python instructions that follow the worker instantiation are executed in the Dask distributed computing context, provided at least one worker is ready. If these instructions are Dask computations involving chunks of data, or delayed or submitted function calls, the computations are distributed and their results are merged back into the main Slurm job. The latter step is called reduce or aggregation.
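
The idea of distributing work on chunks and then merging small results can be illustrated without Dask. The sketch below is plain Python running in a single process, with illustrative function names; each chunk is reduced to a small (sum, count) pair, and only these pairs are aggregated, which is what keeps the merged result small.

```python
from math import fsum


def chunk_partial(chunk):
    """Per-chunk step: reduce one chunk to a small (sum, count) pair."""
    return fsum(chunk), len(chunk)


def combine(partials):
    """Aggregation step: merge the partial results into a global mean."""
    total = fsum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


data = list(range(1, 11))
# Split the data into chunks of at most 3 elements.
chunks = [data[i:i + 3] for i in range(0, len(data), 3)]
partials = [chunk_partial(chunk) for chunk in chunks]
mean = combine(partials)
print(f'{len(chunks)} chunks, mean = {mean}')
```

With Dask, the per-chunk step runs on the workers and only the aggregated value travels back to the main Slurm job.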

Warning

Make sure that the result of your computation can fit into the memory of the main Slurm job!

Shutdown

The following instructions shut down the Dask infrastructure and terminate the Slurm jobs. Don't forget to execute them after your computations.

client.shutdown()
client.close()
cluster.close()

It is highly recommended to use the with statement in batch jobs:

with djq.SLURMCluster(cores=NB_CPU_CORES_PER_JOB, processes=1,
                      queue=JOB_PARTITION, memory=f'{ram_per_job}GiB',
                      walltime=JOB_WALLTIME, log_directory=DASK_WORKER_LOG_DIRECTORY,
                      local_directory=TMP_DASK_WORKER_DIRECTORY,
                      job_directives_skip=['--mem'], # Skip the memory specification for Spirit and Jean Zay.
                      scheduler_options={'dashboard_address': f':{DASK_DASHBOARD_PORT}'}) as cluster, Client(cluster) as client:
    # Better control when scaling on jobs instead of workers.
    cluster.scale(jobs=NB_SLURM_JOB)

    # This instruction blocks until the number of ready workers reaches
    # the specified value. For distributed computing to take place, the
    # main job should wait for at least one worker.
    print('> Waiting for at least one worker')
    client.wait_for_workers(n_workers=1)
    # ...
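
The reason for this recommendation is that a with block runs the cleanup code even when the computation raises an exception. The sketch below demonstrates the mechanism with FakeCluster, a hypothetical stand-in class that is not part of Dask; it only mimics an object that has a close() method and supports the with statement.

```python
class FakeCluster:
    """Hypothetical stand-in for a cluster object (not a Dask class)."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Always release the resources, whatever happened in the block.
        self.close()
        return False  # Do not swallow the exception.


def run_failing_computation():
    """Run a computation that fails inside a with block."""
    cluster_ref = None
    try:
        with FakeCluster() as cluster:
            cluster_ref = cluster
            raise RuntimeError('computation failed')
    except RuntimeError:
        pass
    return cluster_ref


cluster = run_failing_computation()
print(f'closed after the failure: {cluster.closed}')
```

Without the with statement, the exception would skip the explicit close calls unless they are placed in a try/finally block.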

Info

You may restart the Dask cluster and keep your allocated Slurm jobs with the following instruction: client.restart().

Info

Exiting a Python/iPython interpreter, batch job or interactive job that runs the main Slurm job will terminate the instantiated Slurm jobs.

Warning

Aborting a Python/iPython interpreter, batch job or interactive job that runs the main job doesn't abort the instantiated Slurm jobs! Don't forget to abort them too!

Vector computation example

This example loads about 18.6 GiB of random data. As each Slurm job memory cannot exceed 4 GiB, this example demonstrates that the data array is chunked and distributed across the 5 Slurm jobs by Dask. You can see the distribution on the dashboard (tab 'workers', column 'managed').
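
The figures above can be checked with a few lines of arithmetic. This sketch assumes 8-byte float64 elements, an array of 50,000 x 50,000 elements and chunks of 1000 x 1000 elements, as in the script of this example; the helper names are illustrative.

```python
from math import prod


def array_size_gib(shape, itemsize=8):
    """Size in GiB of an array of the given shape (float64 by default)."""
    return prod(shape) * itemsize / 1024**3


def chunk_count(shape, chunks):
    """Number of chunks, rounding up along each dimension."""
    return prod(-(-dim // chunk) for dim, chunk in zip(shape, chunks))


shape = (50_000, 50_000)
chunks = (1000, 1000)

total_gib = array_size_gib(shape)
n_chunks = chunk_count(shape, chunks)
chunk_mib = prod(chunks) * 8 / 1024**2

print(f'total size: {total_gib:.1f} GiB')
print(f'number of chunks: {n_chunks}')
print(f'size of one chunk: {chunk_mib:.2f} MiB')
```

The array is therefore larger than the 4 GiB of a single job, while each chunk is small enough to be placed on any worker.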

  • Connect to HPC cluster
  • Run an interactive Slurm job (or a JupyterLab)

srun --pty bash
  • Load a Python module and open a Python interpreter

# e.g. pangeo-meso or any other module that contains the required libraries.
module load pangeo-meso/2024.01.22
ipython
  • Run the following script

from dask.distributed import Client
import dask_jobqueue as djq
import numpy as np
import time
import os
from os import path

### Cluster configuration
DASK_DIRECTORY_PATH  = path.join(os.getcwd(), 'tmp_dask_workspace')
DASK_DASHBOARD_PORT  = 9999

# Total number of slurm jobs.
NB_SLURM_JOB         = 5
# Number of CPU cores for each Slurm job.
NB_CPU_CORES_PER_JOB = 1
JOB_PARTITION        = 'zen4'
JOB_WALLTIME         = '00:30:00'
# Specify where to write worker log files.
DASK_WORKER_LOG_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'logs')
TMP_DASK_WORKER_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'tmp')

match JOB_PARTITION:
    case 'zen4':
        ram_per_job = 4 * NB_CPU_CORES_PER_JOB
    case 'zen16':
        ram_per_job = 16 * NB_CPU_CORES_PER_JOB
    case _:
        raise ValueError(f'unsupported partition {JOB_PARTITION}')

cluster = djq.SLURMCluster(
                      cores=NB_CPU_CORES_PER_JOB, processes=1,
                      queue=JOB_PARTITION, memory=f'{ram_per_job}GiB',
                      walltime=JOB_WALLTIME, log_directory=DASK_WORKER_LOG_DIRECTORY,
                      local_directory=TMP_DASK_WORKER_DIRECTORY,
                      job_directives_skip=['--mem'], # Skip the memory specification for Spirit and Jean Zay.
                      scheduler_options={'dashboard_address': f':{DASK_DASHBOARD_PORT}'})
print(f'> The dashboard link: {cluster.dashboard_link}')

# Better control when scaling on jobs instead of workers.
cluster.scale(jobs=NB_SLURM_JOB)
client = Client(cluster)

# It is better to wait for all the workers to be ready,
# as the random data doesn't fit into the memory of one worker
# (it would lead to spilling/swapping data to disk).
print(f'> Waiting for {NB_SLURM_JOB} worker(s)')
# This instruction blocks until all workers are ready.
client.wait_for_workers(n_workers=NB_SLURM_JOB)

# For this example only.
import dask.array as da

############ DISTRIBUTED COMPUTING ZONE ############
# The shape must be made of integers (5e4 is a float).
x = da.random.random((50_000, 50_000), chunks=(1000, 1000))
print(f'size in GiB: {x.nbytes / 1024**3}')
y = x + x.T
# persist() returns a new collection: keep a reference to it,
# otherwise the computed chunks are released.
y = y.persist()
####################################################

When the computations end, don't forget to shut down Dask:

client.shutdown()
client.close()
cluster.close()

Map-Reduce example

The following code computes the monthly mean temperature from 1980 to 1990 in the ERA5 dataset for a location near Gazost (French Pyrénées). It is based on the Map-Reduce computing model and instantiates 5 workers/jobs of 1 thread each, as more threads do not accelerate the computation (the ERA5 NetCDF files are not chunked, and open_mfdataset does not seem to parallelize I/O). The Map step consists of having a worker open each file and compute the mean temperature of the month, as each ERA5 file covers one month. The Reduce step consists of concatenating the means into a single dataset object so as to plot them or to compute the mean of the means.
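
The submit/gather pattern described above can be tried locally before running it on the cluster. The sketch below uses concurrent.futures from the standard library instead of Dask, and monthly_mean is a hypothetical stand-in that fabricates values instead of reading ERA5 files; only the structure of the Map and Reduce steps is meant to carry over.

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import fmean


def monthly_mean(year, month):
    """Hypothetical Map task: stands in for reading one monthly file."""
    samples = [year % 100 + month + k for k in range(4)]
    return (year, month), fmean(samples)


keys = [(year, month) for year in (1980, 1981) for month in range(1, 13)]

# The Map step: one task per (year, month) pair, run by 5 local threads.
with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(monthly_mean, year, month) for year, month in keys]
    # Tasks may finish in any order; reading the futures in list order
    # returns the results in submission order.
    results = [future.result() for future in futures]

# The Reduce step: sort by date, then aggregate the monthly means.
series = sorted(results)
mean_of_means = fmean(value for _, value in series)
print(f'{len(series)} monthly means, mean of means = {mean_of_means}')
```

In the Dask version, pool.submit corresponds to client.submit and the collection of results to client.gather.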

  • Connect to HPC cluster
  • Run an interactive Slurm job (or JupyterLab)

srun --pty bash
  • Load a Python module and open a Python interpreter

# e.g. pangeo-meso or any other module that contains the required libraries.
module load pangeo-meso/2024.01.22
ipython
  • Run the following script

from dask.distributed import Client
import dask_jobqueue as djq
import numpy as np
import time
import os
from os import path

### Cluster configuration
DASK_DIRECTORY_PATH  = path.join(os.getcwd(), 'tmp_dask_workspace')
DASK_DASHBOARD_PORT  = 9999

# Total number of slurm jobs.
NB_SLURM_JOB         = 5
# Number of CPU cores for each Slurm job.
NB_CPU_CORES_PER_JOB = 1
JOB_PARTITION        = 'zen4'
JOB_WALLTIME         = '00:30:00'
# Specify where to write worker log files.
DASK_WORKER_LOG_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'logs')
TMP_DASK_WORKER_DIRECTORY = path.join(DASK_DIRECTORY_PATH, 'tmp')

match JOB_PARTITION:
    case 'zen4':
        ram_per_job = 4 * NB_CPU_CORES_PER_JOB
    case 'zen16':
        ram_per_job = 16 * NB_CPU_CORES_PER_JOB
    case _:
        raise ValueError(f'unsupported partition {JOB_PARTITION}')

cluster = djq.SLURMCluster(
                      cores=NB_CPU_CORES_PER_JOB, processes=1,
                      queue=JOB_PARTITION, memory=f'{ram_per_job}GiB',
                      walltime=JOB_WALLTIME, log_directory=DASK_WORKER_LOG_DIRECTORY,
                      local_directory=TMP_DASK_WORKER_DIRECTORY,
                      job_directives_skip=['--mem'], # Skip the memory specification for Spirit and Jean Zay.
                      scheduler_options={'dashboard_address': f':{DASK_DASHBOARD_PORT}'})
print(f'> The dashboard link: {cluster.dashboard_link}')

# Better control when scaling on Slurm jobs instead of workers.
cluster.scale(jobs=NB_SLURM_JOB)
client = Client(cluster)

# This instruction blocks until the number of ready workers reaches
# the specified value.
print(f'> Waiting for one worker at least')
client.wait_for_workers(n_workers=1)

import xarray as xr
from datetime import datetime

# range() excludes its end value: 1 to 12 and 1980 to 1990, both inclusive.
MONTHS = list(range(1, 13))
YEARS  = list(range(1980, 1991))
ERA5_FILE_PATH_PATTERN = '/bdd/ERA5/NETCDF/GLOBAL_025/4xdaily/AN_PL/{year}/ta.{year}{month2d}.aphe5.GLOBAL_025.nc'
TIME_VARIABLE_NAME = 'time'

def compute_month_temperature(year: int, month: int, lat: float, lon: float, level: int) -> xr.Dataset:
    netcdf_file_path = ERA5_FILE_PATH_PATTERN.format(year=year, month2d=f"{month:02d}")
    dataset = xr.open_dataset(netcdf_file_path, lock=False)
    extracted_point = dataset.sel(latitude=lat, longitude=lon, level=level)
    # The time dimension is lost.
    result = extracted_point.mean()
    # Reconstruct the time dimension.
    date = datetime(year, month, 1)
    result = result.assign_coords({TIME_VARIABLE_NAME: date})
    dataset.close()
    return result

# Near Gazost.
lat = 43
lon = 0
level = 1000

start = time.time()
# The Map step.
futures = list()
for year in YEARS:
    for month in MONTHS:
        future = client.submit(compute_month_temperature, year, month, lat, lon, level)
        futures.append(future)

print('> waiting for results')
# The tasks complete in an arbitrary order, but gather returns
# the results in the order of the futures list.
# This call blocks until all the results are gathered.
results = client.gather(futures)
# The Reduce step. It must involve commutative and associative instructions!
temperatures = xr.concat(results, dim=TIME_VARIABLE_NAME).sortby(TIME_VARIABLE_NAME)
print(f'> temperatures: {temperatures}')
print(f'> mean of temperature means is: {temperatures.mean()}')
print(f'> computation time: {time.time() - start}')

When it is over, don't forget to shut down the Dask cluster:

client.shutdown()
client.close()
cluster.close()