Learning Goals¶
By the end of this tutorial, you will be able to:
- Employ three parallelization libraries to speed up a serial process.
- Calculate the speedup of the different approaches shown.
- Evaluate which library is suited to your task.
Introduction¶
This notebook shows how to speed up an image convolution task using these three libraries:
- Ray: an open-source unified compute framework that makes it easy to scale AI and Python workloads.
- Multiprocessing: part of the standard library; supports spawning processes using an API similar to the threading module; offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
- Dask: developed to natively scale computational packages like numpy, pandas and scikit-learn, and the surrounding ecosystem, to multi-core machines and distributed clusters when datasets exceed memory.
Imports¶
- multiprocessing.Pool for multiprocessing using the standard library
- time for timing the processes
- dask.distributed.Client for making a local Dask cluster
- numpy and scipy.signal for numerical work
- psutil for finding the available processors on your machine
- ray for scaling up Python tasks
# Uncomment the next line to install dependencies if needed.
# !pip install dask[distributed] numpy ray scipy
from multiprocessing import Pool
import time
from dask.distributed import Client
import numpy as np
import psutil
import scipy.signal
import ray
Find the cpus available¶
Find and print the number of cpus available on this machine.
num_cpus = psutil.cpu_count(logical=True)
print(num_cpus)
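With logical=True, hyperthreaded cores are counted as well. If you prefer to size the worker pool by physical cores, psutil can report both; a minimal check (the printed values are machine-dependent):
# logical cores include hyperthreads; physical cores are the hardware cores
print(psutil.cpu_count(logical=True))   # e.g., 8 on a 4-core hyperthreaded machine
print(psutil.cpu_count(logical=False))  # e.g., 4 physical cores on the same machine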
Process serially using a conventional loop¶
Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by a factor of 5 in each dimension.
def fconv(image, random_filter):
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
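To see what the [::5, ::5] slice does, here is a toy example with arbitrarily chosen small shapes:
# the full 10x10 * 4x4 convolution is 13x13; keeping every 5th row/column gives 3x3
tiny = scipy.signal.convolve2d(np.ones((10, 10)), np.ones((4, 4)))
print(tiny.shape, tiny[::5, ::5].shape)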
filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]
Process 100 iterations serially, then extrapolate to num_cpus*100. Each parallel version below performs num_cpus convolutions per iteration, so the serial time is scaled by num_cpus for a fair comparison.
start = time.time()
num_iter = 100
image = np.zeros((3000, 3000))
for i in range(num_iter):
    result = fconv(image, filters[i % num_cpus])
duration_conv = time.time() - start
print("(scaled) conventional duration for {:d} iterations = {:.1f} seconds"
      .format(num_cpus*num_iter, duration_conv*num_cpus))
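Each parallel section below computes speedup as this scaled serial duration divided by the parallel wall-clock duration. Written as a hypothetical helper (the cells below simply inline the same formula):
def speedup(serial_seconds, parallel_seconds):
    # values above 1 mean the parallel run beat the (scaled) serial loop
    return serial_seconds / parallel_seconds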
Process in parallel using Ray¶
Start Ray, capping it at the number of cpus found earlier. The warning raised by ray.init only affects shared-object usage, which is not an issue for this tutorial, although it may hurt performance in other scenarios.
ray.init(num_cpus=num_cpus)
Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by a factor of 5 in each dimension. To use Ray, we decorate the function that does the work.
@ray.remote
def fray(image, random_filter):
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
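For reference, the object-store round trip on its own looks like this; a minimal sketch, assuming ray.init has already run:
small = np.arange(4)
obj_ref = ray.put(small)   # store the array once in Ray's shared object store
print(ray.get(obj_ref))    # [0 1 2 3]; remote tasks can take obj_ref directly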
In the following loop, ray.put places the image into shared memory once per iteration, and ray.get blocks until all num_cpus remote tasks have finished before retrieving the results.
start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    image_id = ray.put(image)
    ray.get([fray.remote(image_id, filters[i]) for i in range(num_cpus)])
duration_ray = time.time() - start
print("Ray duration = {:.1f}, speedup = {:.2f}"
      .format(duration_ray, duration_conv*num_cpus / duration_ray))
ray.shutdown()
Process in parallel using multiprocessing¶
Documentation for multiprocessing
Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by a factor of 5 in each dimension. The call takes a slightly different form than in the serial loop: the two inputs are packed into a single argument so the function can be used with pool.map.
# Note: Mac and Windows users may need to copy the contents of this cell into a separate '.py' file
# and then import it in order to use the `fmp` function with `multiprocessing`. This has to do with
# differences in what does / does not get copied into the child processes in different operating systems.
import scipy.signal
def fmp(args):
    image, random_filter = args
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
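As an alternative to the separate-file workaround mentioned above, a standalone script can guard the pool setup with __main__; a minimal sketch, using a hypothetical file name and not used elsewhere in this notebook:
# convolve_mp.py -- spawn-safe layout for macOS/Windows
from multiprocessing import Pool

import numpy as np
import psutil
import scipy.signal

def fmp(args):
    image, random_filter = args
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]

if __name__ == "__main__":  # children re-import this module without re-running the pool setup
    num_cpus = psutil.cpu_count(logical=True)
    image = np.zeros((3000, 3000))
    filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]
    with Pool(num_cpus) as pool:
        results = pool.map(fmp, zip(num_cpus * [image], filters))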
Use a multiprocessing pool with the number of cpus we found earlier.
pool = Pool(num_cpus)
Using pool.map is the closest analog in multiprocessing to the Ray API.
start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    pool.map(fmp, zip(num_cpus * [image], filters))
duration_mp = time.time() - start
print("Multiprocessing duration = {:.1f}, speedup = {:.2f}"
      .format(duration_mp, duration_conv*num_cpus / duration_mp))
pool.close()
Process using Dask¶
Define a Dask distributed client with the number of workers set to the number of cpus we found earlier, and one thread per worker.
client = Client(n_workers=num_cpus, threads_per_worker=1)
print(client)
Dask recommends scattering the large inputs across the workers, though this makes little difference in execution time.
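Because the image itself never changes, another option (shown for reference, not timed here) is to scatter it once with broadcast=True and reuse the resulting future in every task; a minimal sketch, assuming the client above:
# send the image to all workers once, then submit tasks against the shared future
image_future = client.scatter(image, broadcast=True)
futures = [client.submit(fmp, (image_future, f)) for f in filters]
results = client.gather(futures)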
start = time.time()
image = np.zeros((3000, 3000))
futures = []
for _ in range(100):
    for j in range(num_cpus):
        big_future = client.scatter((image, filters[j % num_cpus]))
        futures.append(client.submit(fmp, big_future))
client.gather(futures)  # block until all tasks finish so the timing reflects completed work
duration_dask = time.time() - start
print("Dask duration = {:.1f}, speedup = {:.2f}"
      .format(duration_dask, duration_conv*num_cpus / duration_dask))
client.close()
Conclusions¶
- Ray is the most effective at speeding up this convolution workload, fully utilizing all available processors.
- Multiprocessing is second in effectiveness.
- Dask delivers the least speedup, perhaps because of the per-iteration overhead of scattering inputs to the dask.distributed workers.
About this notebook¶
Author: David Shupe in conjunction with Jessica Krick and the IRSA Science Platform team at IPAC.
Updated: 2024-09-24
Contact: the IRSA Helpdesk with questions or to report problems.
Citations¶
If you use these software packages in your work, please use the following citations:
- Dask: Dask Development Team (2016). Dask: Library for dynamic task scheduling. URL https://dask.org
- Ray: The Ray Development Team. URL https://docs.ray.io