...
allows you to express the workflow
provides a way to map tasks to resources (e.g. the number of cores for each task)
gives monitoring capability to keep track of, e.g., the progress of the task farm, CPU load and memory consumption
As a by-product, we get the benefit of:
packing thin tasks (e.g. 1-core tasks) on a single Shaheen node, maximizing node utilization
introducing a hook to dynamically allocate more worker nodes as SLURM jobs and extend the resources available to the scheduler to run the task farm
the possibility of resuming the task farm as a new set of jobs (provided you have added the necessary logic), thereby allowing checkpoint-restart
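The resume logic mentioned in the last point is left to the user. One possible approach, shown here only as a minimal sketch, is to have each task drop a marker file in its own output directory when it finishes, and to skip those directories on the next run. The marker name DONE and both helper functions are hypothetical and not part of the workflow described on this page.

```python
import os

DONE_MARKER = "DONE"  # hypothetical marker file name, chosen for illustration


def mark_done(out_dir):
    """Record that the task owning out_dir finished successfully."""
    with open(os.path.join(out_dir, DONE_MARKER), "w") as f:
        f.write("ok\n")


def pending_tasks(params, out_dirs):
    """Return the (param, out_dir) pairs whose directory has no marker file."""
    return [
        (p, d)
        for p, d in zip(params, out_dirs)
        if not os.path.exists(os.path.join(d, DONE_MARKER))
    ]
```

A resumed job could then map its task function over the pairs returned by pending_tasks only, instead of over the full parameter list.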
Implementation
We are leveraging Dask’s execution engine on Shaheen’s compute nodes for this purpose.
...
A user_workflow.py script that expresses the steps in the workflow. It also defines the parameters and passes each task and its corresponding parameter to the Dask cluster via the client API.
A wrapper.sh executable bash script that performs the steps common to every task, e.g. setting the environment. The command line (including its options and arguments) is passed as arguments when invoking the wrapper script.
Two jobscripts to interact with SLURM:
sched.slurm is a jobscript that starts the Dask scheduler and invokes user_workflow.py when the worker nodes are ready. It also submits the worker.slurm script to SLURM as many times as the value of NUM_WORKERS set in the script.
worker.slurm allocates resources for a worker node, which is a Shaheen compute node running as a SLURM job, where Dask workers start and connect to the existing Dask scheduler. This jobscript also allows tuning the resources available to each Dask worker, i.e. CPUs or memory.
User workflow script
The script below is an example user_workflow.py:
Code Block | ||
---|---|---|
| ||
#!/usr/bin/env python
import os
import subprocess as sb

from dask.distributed import Client, as_completed

# Connect to the Dask scheduler started by sched.slurm
client = Client(scheduler_file='scheduler.json')


def params(filename='foo.txt'):
    """Return one task parameter per line of the file, line breaks stripped."""
    with open(filename) as f:
        return f.read().splitlines()


def func(x, out_dir):
    srcdir = '/scratch/shaima0d/tickets/39404/user_case'
    EXE = 'FreeFem++-nw'
    # Pre-processing steps -- before launching the application
    sb.run(['rsync', '-r', os.path.join(srcdir, 'pv_LIR_etau_IVCurve_SF.edp'),
            'pv_LIR_etau_IVCurve_SF.edp'], cwd=out_dir)
    sb.run(['rsync', '-r', os.path.join(srcdir, 'BF_RefMeshLIR_100x100x95x100.msh'),
            'BF_RefMeshLIR_100x100x95x100.msh'], cwd=out_dir)
    sb.run(['rsync', '-r', os.path.join(os.environ['PWD'], 'wrapper.sh'),
            'wrapper.sh'], cwd=out_dir)
    # Launch the application, passing it and its options as command line
    # arguments to the wrapper script; the log files are closed on exit
    with open(os.path.join(out_dir, 'out.log'), 'w') as fo, \
         open(os.path.join(out_dir, 'err.log'), 'w') as fe:
        sb.run(['./wrapper.sh', EXE, 'pv_LIR_etau_IVCurve_SF.edp', str(x)],
               stdout=fo, stderr=fe, shell=False, cwd=out_dir)
    # Post-processing steps
    # - Copy the output file and rename it to index according to the task
    # - Delete the task directory if the processing was successful
    return True


base_dir = os.environ['EXP_NAME']
os.makedirs(base_dir, exist_ok=True)

# The logic of parameter setting is encapsulated in the params function
x = params()

# One output directory per task, numbered from 1
outdirs = []
for i in range(len(x)):
    sample_dir = os.path.join(base_dir, str(i + 1))
    os.makedirs(sample_dir, exist_ok=True)
    outdirs.append(sample_dir)
print('outdirs[%d]:: ' % len(outdirs), outdirs)

# Submit one task per (parameter, output directory) pair
futures = client.map(func, x, outdirs)
for future in as_completed(futures):
    print('result: ', future.result())
client.close() |
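The post-processing steps in func are left as comments. A minimal sketch of what they might look like follows, assuming the application writes a single output file into the task directory. The file name result.txt, the results directory and the postprocess helper are hypothetical placeholders, not names produced by the application above.

```python
import os
import shutil


def postprocess(out_dir, results_dir, output_name="result.txt"):
    """Copy the task output to results_dir, prefixed with the task index,
    then delete the task directory. Returns False (and keeps the task
    directory for inspection) if the expected output file is missing."""
    src = os.path.join(out_dir, output_name)
    if not os.path.isfile(src):
        return False
    # The task index is the name of the task directory, e.g. ".../3" -> "3"
    index = os.path.basename(os.path.normpath(out_dir))
    os.makedirs(results_dir, exist_ok=True)
    shutil.copy2(src, os.path.join(results_dir, f"{index}_{output_name}"))
    shutil.rmtree(out_dir)
    return True
```

Returning the outcome from func instead of a constant True would let the loop over as_completed report which tasks actually produced output.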
Wrapper script
Here you can set the environment to run the target application and invoke it via the arguments passed in from the user_workflow.py script, where wrapper.sh was invoked.
Code Block |
---|
#!/bin/bash
module swap PrgEnv-cray PrgEnv-gnu
module load freefem/4.7
module list
echo "running $*"
# run the command exactly as passed, preserving argument boundaries
"$@"
|
SLURM jobscripts
There are two scripts needed in this workflow.
Scheduler script
The scheduler script will look, for example, as below. Everything up to and including the loop that submits the worker jobs is boilerplate, which remains pretty much the same for any workload. You can change the dask_dashboard port and the job's wall time if you like. The lines after that loop invoke the user_workflow.py script on the head node, which spawns work on the available worker nodes. There is a sleep command before it to wait until all worker nodes are up and running.
Code Block | ||
---|---|---|
| ||
#!/bin/bash -l
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=32
#SBATCH --partition=workq
#SBATCH --hint=nomultithread
#SBATCH --time=01:00:00
module load dask
NUM_WORKERS=4
WORKER_JOB_PREFIX=test_workers
export EXP_NAME=experiment_${SLURM_JOBID}
export LC_ALL=C.UTF-8
export LANG=C.UTF-8
# get tunneling info
export XDG_RUNTIME_DIR=""
node=$(hostname -s)
user=$(whoami)
gateway=${EPROXY_LOGIN}
submit_host=${SLURM_SUBMIT_HOST}
port=8889
dask_dashboard=9000
if [ -f 'scheduler.json' ]; then
rm scheduler.json
fi
srun -u --hint=nomultithread dask-scheduler --scheduler-file=scheduler.json --dashboard-address=${node}:${dask_dashboard} --port=6192 --interface=ipogif0 &
echo $node on $gateway pinned to port $port
# print tunneling instructions for the Dask dashboard
echo -e "
To connect to the Dask dashboard served from compute node ${node} on Shaheen,
you need to run the following two commands in a terminal
1. Command to create an ssh tunnel from your workstation/laptop to cdlX:
ssh -L ${dask_dashboard}:localhost:${dask_dashboard} ${user}@${submit_host}.hpc.kaust.edu.sa
2. Command to create an ssh tunnel to run on cdlX:
ssh -L ${dask_dashboard}:${node}:${dask_dashboard} ${user}@${gateway}
Then open http://localhost:${dask_dashboard} in a browser on your workstation/laptop
"
while [ ! -f 'scheduler.json' ] ;
do
sleep 2
echo "Waiting for dask scheduler to start"
done
for ((i=1; i< $((NUM_WORKERS + 1)); i++))
do
sbatch -J ${WORKER_JOB_PREFIX} worker.slurm
done
sleep 180
echo "Starting workload"
python -u user_workflow.py
# cancel the worker jobs; the scheduler started in the background ends with this job
scancel -n ${WORKER_JOB_PREFIX}
exit 0 |
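The fixed sleep in the scheduler script is a simple way to wait for the worker jobs, but they may start sooner or later than that depending on the queue. As a hedged alternative, the workflow script could poll until enough workers have connected. The helper below is a generic sketch: wait_until and its count_workers argument are hypothetical names, and count_workers stands for any callable that reports how many workers are currently connected.

```python
import time


def wait_until(count_workers, expected, timeout=600.0, poll=2.0):
    """Poll count_workers() until it reports at least `expected` workers.

    Returns the observed worker count, or raises TimeoutError if the
    timeout (in seconds) expires first."""
    deadline = time.monotonic() + timeout
    while True:
        n = count_workers()
        if n >= expected:
            return n
        if time.monotonic() >= deadline:
            raise TimeoutError(f"only {n} of {expected} workers connected")
        time.sleep(poll)
```

The Client class in dask.distributed also has a wait_for_workers method that may serve the same purpose without a custom helper. Note that each worker job above starts several Dask worker processes, so the expected count would be the number of worker jobs multiplied by the number of processes per job.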
Worker script
The worker.slurm script is submitted automatically by the sched.slurm script. However, if you feel that the task farm requires more resources, you can submit additional worker jobs manually to extend the compute resources.
Code Block |
---|
#!/bin/bash -l
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=4
#SBATCH --partition=workq
#SBATCH --hint=nomultithread
#SBATCH --time=01:00:00
module load dask
export LC_ALL=C.UTF-8
export LANG=C.UTF-8
mkdir workers${SLURM_JOBID}
srun -n 1 -c ${SLURM_CPUS_PER_TASK} -u --hint=nomultithread --cpu-bind=cores dask-worker --scheduler-file=scheduler.json --interface=ipogif0 --nprocs=${SLURM_CPUS_PER_TASK} |
Execution
When all scripts are ready, you can simply submit the sched.slurm job to start the workflow:
Code Block |
---|
sbatch sched.slurm |
As an optional step, if you need more workers than the NUM_WORKERS value set in the sched.slurm script, you can manually submit additional worker jobs to the SLURM queue to extend the compute resources:
Code Block |
---|
sbatch worker.slurm |
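If extra workers are added often, the submission can be scripted. The sketch below builds the same sbatch command that sched.slurm uses, including the job name, so that the scancel -n call at the end of sched.slurm would also clean up the extra workers. Both function names and the dry_run flag are hypothetical, and the default job name assumes WORKER_JOB_PREFIX is left at test_workers.

```python
import subprocess


def worker_submit_commands(n_extra, job_name="test_workers", script="worker.slurm"):
    """Build one sbatch command (as an argument list) per extra worker job."""
    return [["sbatch", "-J", job_name, script] for _ in range(n_extra)]


def submit_workers(n_extra, dry_run=True, **kwargs):
    """Print the commands (dry_run=True) or actually submit them to SLURM."""
    cmds = worker_submit_commands(n_extra, **kwargs)
    for cmd in cmds:
        if dry_run:
            print(" ".join(cmd))
        else:
            subprocess.run(cmd, check=True)
    return cmds
```

Calling submit_workers(2) only prints the two commands; passing dry_run=False would submit them, which requires running on a host where sbatch is available.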
For instructions on how to connect to the Dask Dashboard and monitor the progress, please see the following: