Using IPython for parallel computing on an MPI cluster with SLURM

I assume that you are already familiar with IPython.parallel; otherwise, have a look at the documentation. One point of caution: if you move code that you want to run in parallel into a package or module, you might stumble upon the problem that Python cannot find the function you imported. The reason is that IPython.parallel only looks in the global (interactive) namespace. The solution is to use @interactive from IPython.parallel, as described in this Stack Overflow post.
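
Below is a minimal sketch of that workaround; mymodule.py and work are hypothetical names, and depending on your IPython version the decorator may live in IPython.parallel.util instead:

# mymodule.py -- hypothetical module with code that should run on the engines
from IPython.parallel import interactive  # in some versions: IPython.parallel.util


@interactive
def work(x):
    # The decorator makes the function appear as if it were defined
    # interactively (in __main__), so its globals are resolved in the
    # engines' user namespace instead of trying to import mymodule there.
    return x * x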

Although IPython.parallel comes with built-in support for distributing work via MPI, it expects to create the MPI tasks itself. However, compute clusters usually come with a job scheduler such as SLURM that manages all resources, and consequently the scheduler determines how many resources you have access to. In the case of SLURM, you have to specify the number of tasks you want to run in parallel and the maximum time your job will require when adding it to the queue.

#!/bin/bash
#SBATCH -J ipython-parallel-test
#SBATCH --ntasks=112
#SBATCH --time=00:10:00

The above header gives the job a name, requests resources for 112 tasks, and sets the maximum run time to 10 minutes.
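
Once the pieces below are assembled into a single job script, you would add it to the queue with sbatch and can monitor it with squeue (the file name is just an assumption):

sbatch ipython-parallel-test.sh   # submit the job script (assumed file name)
squeue -u $USER                   # check the state of your queued and running jobs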

Normally, you would use ipcluster start -n 112, but since we are not allowed to create MPI tasks ourselves, we have to start the individual pieces manually via ipcontroller and ipengine. The controller provides the single point of contact that the engines connect to; the engines receive commands from the controller and execute them.

profile=job_${SLURM_JOB_ID}_$(hostname)

echo "Creating profile ${profile}"
ipython profile create ${profile}

echo "Launching controller"
ipcontroller --ip="*" --profile=${profile} --log-to-file &
sleep 10

echo "Launching engines"
srun ipengine --profile=${profile} --location=$(hostname) --log-to-file &
sleep 45

First of all, we create a new IPython profile, which will contain log files and temporary files that are necessary to establish the communication between controller and engines. To avoid clashes with other jobs, the name of the profile contains the job’s ID and the hostname of the machine it is executed on. This will create a folder named profile_job_XYZ_hostname in the ~/.ipython folder.

Next, we start the controller and instruct it to listen on all available interfaces, use the newly created profile, and write its output to log files residing in the profile's directory. Note that this command is executed only on a single node; thus, we have only a single controller per job.

Then, we create the engines, one for each task, and instruct them to connect to the correct controller. Explicitly specifying the location of the controller is necessary if engines are spread across multiple physical machines and those machines have multiple Ethernet interfaces. Without this option, engines running on other machines are likely to fail to connect to the controller because they might look for it at the wrong location (usually localhost). You can easily find out whether this is the case by looking at the ipengine log files in the ~/.ipython/profile_job_XYZ_hostname/log directory.
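
For example, a quick way to spot such connection problems is to search the engine logs for errors; the exact file names vary, so adjust the profile name and pattern to your job:

grep -i error ~/.ipython/profile_job_XYZ_hostname/log/ipengine*.log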

Finally, we can start our Python script that uses IPython.parallel to distribute work across multiple nodes.

echo "Launching job"
python ipython-parallel-test.py --profile ${profile}

To make things more interesting, I created an actual script that approximates the number Pi in parallel.

import argparse
from IPython.parallel import Client
import numpy as np
import os


def compute_pi(n_samples):
    # Monte Carlo estimate of pi: sample points uniformly from the unit
    # square and count how many fall inside the quarter circle.
    # `random` is imported on the engines via sync_imports() in main().
    s = 0
    for i in range(n_samples):
        x = random()
        y = random()
        if x * x + y * y <= 1:
            s += 1
    return 4. * s / n_samples


def main(profile):
    # Connect to the controller of this job's profile and obtain a view
    # on all registered engines.
    rc = Client(profile=profile)
    views = rc[:]
    # Make compute_pi's dependency available on every engine.
    with views.sync_imports():
        from random import random

    # Run compute_pi on every engine and average the individual estimates.
    results = views.apply_sync(compute_pi, int(1e9))
    my_pi = np.sum(results) / len(results)
    filename = "result-job-{0}.txt".format(os.environ["SLURM_JOB_ID"])
    with open(filename, "w") as fp:
        fp.write("%.20f\n" % my_pi)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--profile", required=True,
                        help="Name of IPython profile to use")

    args = parser.parse_args()

    main(args.profile)
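
One more remark: the sleep 45 in the SLURM script is only a heuristic. If the Python script connects before all engines have registered, work is distributed over fewer engines than requested. A small sketch of an explicit wait could look like the following; wait_for_engines is a hypothetical helper and not part of the script above:

import os
import time

from IPython.parallel import Client


def wait_for_engines(profile, n_expected, timeout=300):
    # Poll the controller until the expected number of engines has
    # registered, or give up after `timeout` seconds.
    rc = Client(profile=profile)
    deadline = time.time() + timeout
    while len(rc.ids) < n_expected:
        if time.time() > deadline:
            raise RuntimeError("only %d of %d engines registered"
                               % (len(rc.ids), n_expected))
        time.sleep(5)
    return rc


# For example: rc = wait_for_engines(args.profile, int(os.environ["SLURM_NTASKS"]))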

I ran this on the Linux cluster of the Leibniz Supercomputing Centre. Using 112 tasks, it took a little more than 8 minutes, and the result I got was 3.14159637160714266813. You can download the SLURM script and Python code from above here.
