Dask is an interesting pythonic framework, primarily targeted to run larger than memory workloads on multiple cores and nodes of a cluster of resources.

In this case, we want to demonstrate how Dask can be used to run a swarm of thin tasks which have load-imbalance on a compute resource which is fewer than the tasks, but can dynamically be increased if the task-farm is running at slower pace than tolerable.

Problem statement

As an example, we have a few steps in a task that needs to be done multiple times. One essential condition that the task must fulfill is that it is independent, that is, it can run exclusively without any dependency on another task of the farm.

Below is a pseudo-code with step of a representative task:

  1. Create a directory

  2. Copy task specific input files from a common source directory

  3. Load software environment with the target application installed

  4. Launch the command and its arguments and Log progress to a log file in Present Working Directory

  5. When finished, copy the output in a common output director

  6. Delete the task specific directory

Consider that you must do this a million times. Its would be useful to have an execution framework which:

As a by-product, we get the benefit of:

Implementation

We are leveraging Dask’s execution engine on Shaheen’s compute nodes for this purpose.

We break our workflow into three components:

User workflow script

The script below

#!/usr/bin/env python

from dask.distributed import Client,as_completed
import os,time, subprocess as sb
import numpy as np
#client = Client(scheduler_file='scheduler_%s.json'%jobid,direct_to_workers=True)  # start local workers as processes
client = Client(scheduler_file='scheduler.json')  # start local workers as processes


def params(filename='foo.txt'):
    f = open(filename,'r+')
    files = f.read().splitlines() # List with stripped line-breaks 
    f.close()
    return files

def func(x,out_dir):
    fo=open(os.path.join(out_dir,'out.log'),'w+')
    fe=open(os.path.join(out_dir,'err.log'),'w+')
    
    srcdir='/scratch/shaima0d/tickets/39404/user_case'
    EXE='FreeFem++-nw'

    # Pre-processing steps --  before launching the application
    o = sb.run(['rsync','-r','%s'%(os.path.join(srcdir,'pv_LIR_etau_IVCurve_SF.edp')),
        'pv_LIR_etau_IVCurve_SF.edp']
        ,cwd=out_dir)
    o = sb.run(['rsync','-r','%s'%(os.path.join(srcdir,'BF_RefMeshLIR_100x100x95x100.msh')),
        'BF_RefMeshLIR_100x100x95x100.msh']
        ,cwd=out_dir)
    o = sb.run(['rsync','-r','%s'%(os.path.join(os.environ['PWD'],'wrapper.sh')),
        'wrapper.sh']
        ,cwd=out_dir)

    # Launch the application along with its optinos as command line arugument to wrapper script 
    o = sb.run(['./wrapper.sh','%s'%(EXE),'pv_LIR_etau_IVCurve_SF.edp','%s'%(x)],
                stdout=fo,stderr=fe,
                shell=False,cwd=out_dir)
    # Post-processing steps
        # - Copy the output file and rename it to index according to the task
        # - Delete the task directory if the processing was successful
    
    return True

base_dir=os.environ['EXP_NAME']
os.makedirs(base_dir,exist_ok=True)

# the logic of parameter setting is encapsulated in params function:
x = params()

outdirs=list()
for i in range(len(x)):
    sample_dir=os.path.join(base_dir,'%s'%(str(i+1)))
    os.makedirs(sample_dir,exist_ok=True)
    outdirs.append(sample_dir)
print('outdirs[%d]:: '%(len(outdirs)),outdirs)


futures=client.map(func,x,outdirs)

for future in as_completed(futures):
    print('result: ',future.result())

client.close()

Wrapper script

Here you can set the environment to run the target application and invoke it via the arguments passed in from the user_workflow.py script where the wrapper.sh was invoked.

#!/bin/bash

module swap PrgEnv-cray PrgEnv-gnu
module load freefem/4.7
module list
echo "running $@"
$@

In the above example, the software environment is set by loading some installed modules on Shaheen. However, this can be replaced by sourcing a conda environment, if your software was installed in your project directory using conda package manager.

SLURM jobscripts

There are two scripts needed in this workflow

Scheduler script

The scheduler script will, e.g. look as below. Until line 59, its a boiler plate code, which remains pretty much same for any workload. You can control and modify the ports dask_dashboard port and the job’s wall time if you like. Lines 59-64 invoke the user_workflow.py script on the head node which will spawn work on the available worker nodes. There is a sleep command to wait until all worker nodes are up and running.

#!/bin/bash -l
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=32
#SBATCH --partition=workq
#SBATCH --hint=nomultithread
#SBATCH --time=01:00:00

module load dask

NUM_WORKERS=4
WORKER_JOB_PREFIX=test_workers
export EXP_NAME=experiment_${SLURM_JOBID}

export LC_ALL=C.UTF-8
export LANG=C.UTF-8

# get tunneling info 
export XDG_RUNTIME_DIR="" 
node=$(hostname -s) 
user=$(whoami) 
gateway=${EPROXY_LOGIN} 
submit_host=${SLURM_SUBMIT_HOST} 
port=8889
dask_dashboard=9000

if [ -f 'scheduler.json' ]; then
	rm scheduler.json
fi

srun -u --hint=nomultithread dask-scheduler --scheduler-file=scheduler.json --dashboard-address=${node}:${dask_dashboard} --port=6192 --interface=ipogif0 &


echo $node on $gateway pinned to port $port
# print tunneling instructions jupyter-log
echo -e "
To connect to the compute node ${node} on Shaheen running your jupyter notebook server,
you need to run following two commands in a terminal

1. Command to create ssh tunnel from you workstation/laptop to cdlX:
ssh -L ${dask_dashboard}:localhost:${dask_dashboard} ${user}@${submit_host}.hpc.kaust.edu.sa

2. Command to create ssh tunnel to run on cdlX:
ssh -L ${dask_dashboard}:${node}:${dask_dashboard} ${user}@${gateway}

Copy the link provided below by jupyter-server and replace the nid0XXXX with localhost before pasting it in your browser on your workstation/laptop
"

while [ ! -f 'scheduler.json' ] ; 
do
	sleep 2
	echo "Waiting for dask scheduler to start"
done

for ((i=1; i< $((NUM_WORKERS + 1)); i++))
	do
		sbatch -J ${WORKER_JOB_PREFIX}  worker.slurm
done

sleep 180
echo "Starting workload"
python -u user_workflow.py
scancel -n ${WORKER_JOB_PREFIX} 
exit 0
wait

Worker script

The worker.slurm script is submitted automatically by the sched.slurm script. However, if you feel that the task farm requires more resource, you can submit additional worker jobs manually to extend the compute resources

#!/bin/bash -l
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=4
#SBATCH --partition=workq
#SBATCH --hint=nomultithread
#SBATCH --time=01:00:00

module load dask

export LC_ALL=C.UTF-8
export LANG=C.UTF-8
mkdir workers${SLURM_JOBID}


srun -n 1 -c ${SLURM_CPUS_PER_TASK} -u --hint=nomultithread --cpu-bind=cores dask-worker --scheduler-file=scheduler.json --interface=ipogif0 --nprocs=${SLURM_CPUS_PER_TASK}

Execution

When all scripts are ready, you can simply submit the sched.slurm job to start the workflow:

sbatch sched.slurm

As on optional step, for additional workers more than that set in the sched.slurm script as parameter NUM_WORKERS a job can manually be submitted to SLURM queue to extend the compute resource.

sbatch worker.slurm

For instructions on how to connect to the Dask Dashboard and monitor the progress, please following:

Using Dask on Shaheen