Dask is a library for parallel computing in Python. From a programmer’s point of view, it has two components:

  1. A dynamic task scheduler that optimizes computation expressed as a farm of tasks. Once the tasks are formed, Dask can pipeline them like a workflow engine/manager and schedule them on the available resources.

  2. Data structure representations called collections, which are conducive to larger-than-memory computation when dealing with Big Data. When working with these data structures, a Python developer stays on familiar territory, because they resemble NumPy arrays, Pandas DataFrames or Python futures. These collections rely on the task scheduler to run.
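
The interplay between the two components can be sketched in a few lines. This is only a minimal illustration, assuming dask and numpy are installed; the function name chunked_sum and the array sizes are hypothetical choices and not part of the Shaheen setup.

```python
def chunked_sum(n=4000, chunk=1000):
    """Sum an n x n array of ones that is split into chunk x chunk blocks."""
    import dask.array as da  # imported lazily; assumes dask and numpy are installed

    x = da.ones((n, n), chunks=(chunk, chunk))  # a collection: nothing is computed yet
    total = x.sum()                             # builds a task graph, still lazy
    # compute() hands the graph to the task scheduler, which evaluates the
    # per-block sums and combines the partial results.
    return x.numblocks, float(total.compute())
```

Calling chunked_sum() returns the block layout of the array together with the sum. Only the call to compute() triggers any work; everything before it just describes tasks.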

Dask on Shaheen

The following launches a dask-mpi backend and a Jupyter notebook server from which the Dask client can be called to connect to this backend scheduler. The example below shows a multicore, multi-node job in which each Dask worker starts with multiple threads. We also publish the Dask dashboard on a separate port to visualize the computation.

Note that this example demonstrates how one would scale out Dask to obtain more cores and memory than are available on one node. Please downsize or upsize the number of nodes as appropriate to your case.

Jobscript

The following jobscript launches dask-mpi with 8 tasks (i.e. 2 per node on 4 nodes), each with 16 threads. Note that dask-mpi runs the scheduler on MPI rank 0, so 7 of the 8 tasks become workers.

#!/bin/bash -l
#SBATCH --ntasks=8
#SBATCH --ntasks-per-node=2
#SBATCH --cpus-per-task=16
#SBATCH --partition=workq
#SBATCH --time=01:00:00

module load intelpython3
module load dask

export LC_ALL=C.UTF-8
export LANG=C.UTF-8
mkdir workers${SLURM_JOBID}

# get tunneling info 
export XDG_RUNTIME_DIR="" 
node=$(hostname -s) 
user=$(whoami) 
gateway=${EPROXY_LOGIN} 
submit_host=${SLURM_SUBMIT_HOST} 
port=8889
dask_dashboard=9000

srun --hint=nomultithread -n ${SLURM_NTASKS} -c ${SLURM_CPUS_PER_TASK} -N ${SLURM_NNODES} \
dask-mpi --worker-class='distributed.Worker' --nthreads=${SLURM_CPUS_PER_TASK}  \
--memory-limit='57GB' --local-directory=workers${SLURM_JOBID} \
--scheduler-file=scheduler_${SLURM_JOBID}.json \
--interface=ipogif0 --scheduler-port=6192 --dashboard-address=${node}:${dask_dashboard} &

# allow some time for Dask cluster to start in background
sleep 30

echo $node on $gateway pinned to port $port
# print tunneling instructions jupyter-log
echo -e "
To connect to the compute node ${node} on Shaheen running your jupyter notebook server,
you need to run following two commands in a terminal

1. Command to create ssh tunnel from you workstation/laptop to cdlX:
ssh -L ${port}:localhost:${port} -L ${dask_dashboard}:localhost:${dask_dashboard} ${user}@${submit_host}.hpc.kaust.edu.sa

2. Command to create ssh tunnel to run on cdlX:
ssh -L ${port}:${node}:${port} -L ${dask_dashboard}:${node}:${dask_dashboard} ${user}@${gateway}

Copy the link provided below by jupyter-server and replace the nid0XXXX with localhost before pasting it in your browser on your workstation/laptop
"


# Run Jupyter
jupyter lab --no-browser --port=${port} --ip=${node} --notebook-dir=${PWD}
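
The resources requested by the jobscript above can be tallied with a little arithmetic. The sketch below is illustrative only: the function name cluster_layout is hypothetical, and it assumes the dask-mpi behaviour of running the scheduler on rank 0 and workers on all remaining ranks.

```python
import math

def cluster_layout(ntasks, ntasks_per_node, cpus_per_task, mem_per_worker_gb):
    """Estimate what a dask-mpi job provides for the given SLURM settings."""
    nodes = math.ceil(ntasks / ntasks_per_node)
    # Assumption: rank 0 hosts the scheduler, so one task is not a worker.
    workers = ntasks - 1
    return {
        "nodes": nodes,
        "workers": workers,
        "worker_threads": workers * cpus_per_task,
        "worker_memory_gb": workers * mem_per_worker_gb,
    }

# Values taken from the jobscript: 8 tasks, 2 per node, 16 CPUs each, 57GB limit.
layout = cluster_layout(8, 2, 16, 57)
print(layout)
```

Changing --ntasks while keeping --ntasks-per-node fixed scales the node count, and with it the aggregate threads and memory available to the workers.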

Submit the job to a queue

sbatch jobscript.slurm

The expected output looks something like the following:

Loaded dask, dask-mpi and xgboost
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.128.0.178:6192
distributed.scheduler - INFO -   dashboard at:         10.128.0.178:9000
distributed.worker - INFO -       Start worker at:   tcp://10.128.0.178:33793
distributed.worker - INFO -          Listening to:   tcp://10.128.0.178:33793
distributed.worker - INFO -          dashboard at:         10.128.0.178:46679
distributed.worker - INFO - Waiting to connect to:    tcp://10.128.0.178:6192
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         16
......
distributed.worker - INFO -       Start worker at:   tcp://10.128.0.181:45429
distributed.worker - INFO -          Listening to:   tcp://10.128.0.181:45429
distributed.worker - INFO -          dashboard at:         10.128.0.181:41821
distributed.worker - INFO - Waiting to connect to:    tcp://10.128.0.178:6192
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                         16
distributed.worker - INFO -                Memory:                   57.00 GB
distributed.worker - INFO -       Local Directory: /lustre/project/k01/shaima0d/scratch/dask/workers19108843/dask-worker-space/worker-ioudqy7z
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:    tcp://10.128.0.178:6192
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

......

To connect to the compute node nid00177 on Shaheen running your jupyter notebook server,
you need to run following two commands in a terminal

1. Command to create ssh tunnel from you workstation/laptop to cdlX:
ssh -L 8889:localhost:8889 -L 9000:localhost:9000 shaima0d@cdl2.hpc.kaust.edu.sa

2. Command to create ssh tunnel to run on cdlX:
ssh -L 8889:nid00177:8889 -L 9000:nid00177:9000 shaima0d@gateway2

Copy the link provided below by jupyter-server and replace the nid0XXXX with localhost before pasting it in your browser on your workstation/laptop

[I 16:19:32.470 NotebookApp] Loading IPython parallel extension
[I 16:19:33.081 NotebookApp] JupyterLab extension loaded from /sw/xc40cle7/python/3.8.0/cle7_gnu8.3.0/lib/python3.8/site-packages/jupyterlab
[I 16:19:33.081 NotebookApp] JupyterLab application directory is /sw/xc40cle7/python/3.8.0/cle7_gnu8.3.0/share/jupyter/lab
[I 16:19:33.084 NotebookApp] Serving notebooks from local directory: /lustre/project/k01/shaima0d/scratch/dask
[I 16:19:33.084 NotebookApp] The Jupyter Notebook is running at:
[I 16:19:33.085 NotebookApp] http://nid00177:8889/?token=779db7640e145eb2346215e6e948f5b14f1e163a6971b1d0
[I 16:19:33.085 NotebookApp]  or http://127.0.0.1:8889/?token=779db7640e145eb2346215e6e948f5b14f1e163a6971b1d0
[I 16:19:33.085 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 16:19:33.098 NotebookApp] 
    
    To access the notebook, open this file in a browser:
        file:///lustre/scratch/shaima0d/.local/share/jupyter/runtime/nbserver-63036-open.html
    Or copy and paste one of these URLs:
        http://nid00177:8889/?token=779db7640e145eb2346215e6e948f5b14f1e163a6971b1d0
     or http://127.0.0.1:8889/?token=779db7640e145eb2346215e6e948f5b14f1e163a6971b1d0

We are now ready to connect to our Jupyter server. Examine the last line of the SLURM output file and copy the URL starting with http://127.0.0.1:8889/…. Paste it into your local web browser to open the Jupyter client.

SLURM output file:

http://127.0.0.1:8889/?token=aaa9a1dfa46fce19a79c530608c6fb08f5b30071816ae072

To open the Dask dashboard:

http://127.0.0.1:9000
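
If you copy the URL that contains the compute node name instead, the hostname has to be replaced before the link works through the tunnel. A small sketch of that substitution, using only the Python standard library (the function name to_localhost is hypothetical):

```python
from urllib.parse import urlsplit, urlunsplit

def to_localhost(url):
    """Replace the host in a Jupyter URL with localhost, keeping port and token."""
    parts = urlsplit(url)
    netloc = "localhost" if parts.port is None else f"localhost:{parts.port}"
    return urlunsplit(parts._replace(netloc=netloc))

# Example with a placeholder token, not a real one.
print(to_localhost("http://nid00177:8889/?token=abc"))
```

The port is preserved because the ssh tunnels forward the same port number on every hop.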

Connecting Dask client to scheduler backend

The following notebook demonstrates connecting to the scheduler backend:
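
A minimal sketch of such a connection is given here, assuming the notebook runs in the directory where the jobscript wrote scheduler_${SLURM_JOBID}.json and that dask.distributed is available in the kernel. The helper names newest_scheduler_file, scheduler_address and connect are hypothetical.

```python
import glob
import json
import os

def newest_scheduler_file(directory="."):
    """Return the most recently modified scheduler_*.json in a directory."""
    candidates = glob.glob(os.path.join(directory, "scheduler_*.json"))
    if not candidates:
        raise FileNotFoundError(f"no scheduler_*.json found in {directory}")
    return max(candidates, key=os.path.getmtime)

def scheduler_address(path):
    """Read the scheduler address recorded in a scheduler file."""
    with open(path) as fh:
        return json.load(fh)["address"]

def connect(path):
    """Create a Dask client attached to the scheduler described by the file."""
    from dask.distributed import Client  # imported lazily; needs dask.distributed
    return Client(scheduler_file=path)
```

In a notebook cell, client = connect(newest_scheduler_file()) would attach to the running backend, provided the cluster started by the jobscript is still up. Displaying client afterwards shows the workers and threads it can see.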

While the computation is in flight, you can monitor the Dask dashboard to visualize task launches and their cost in terms of compute and memory utilization.