Parallelizing image convolution#
Learning Goals#
By the end of this tutorial, you will be able to:
Employ three parallelization libraries to speed up a serial process.
Calculate the speedup of the different approaches shown.
Evaluate which library is suited to your task.
Introduction#
This notebook shows how to speed up an image convolution task using these three libraries:
Ray: an open-source unified compute framework that makes it easy to scale AI and Python workloads.
Multiprocessing: part of the standard library; supports spawning processes using an API similar to the threading module; offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
Dask: developed to natively scale computational packages like numpy, pandas and scikit-learn, and the surrounding ecosystem, to multi-core machines and distributed clusters when datasets exceed memory.
Imports#
multiprocessing.Pool for multiprocessing using the standard library
time for timing the processes
dask.distributed.Client for making a local Dask cluster
numpy and scipy.signal for numerical work
psutil for finding the available processors on your machine
ray for scaling up Python tasks
# Uncomment the next line to install dependencies if needed.
# !pip install dask[distributed] numpy ray scipy
from multiprocessing import Pool
import time
from dask.distributed import Client
import numpy as np
import psutil
import scipy.signal
import ray
Find the CPUs available#
Find and print the number of CPUs (adapted from https://towardsdatascience.com/10x-faster-parallel-python-without-python-multiprocessing-e5017c93cce1).
num_cpus = psutil.cpu_count(logical=True)
print(num_cpus)
4
Process serially using a conventional loop#
Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by taking every fifth element along each axis.
def fconv(image, random_filter):
return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
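As a quick sanity check on the shapes (a sketch using the fconv defined above): a "full"-mode convolution of a (3000, 3000) image with a (4, 4) kernel yields a (3003, 3003) array, and keeping every fifth row and column then gives a (601, 601) result.
# Check the output shape of a single convolution.
check = fconv(np.zeros((3000, 3000)), np.random.normal(size=(4, 4)))
print(check.shape)  # (601, 601)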
filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]
Process 100 iterations serially, then multiply the measured duration by num_cpus to estimate the serial time for num_cpus*100 iterations, which is the amount of work each parallel version performs below.
start = time.time()
num_iter = 100
image = np.zeros((3000, 3000))
for i in range(num_iter):
result = fconv(image, filters[i % num_cpus])
duration_conv = time.time() - start
print("(scaled) conventional duration for {:d} iterations = {:.1f} seconds"
.format(num_cpus*num_iter, duration_conv*num_cpus))
(scaled) conventional duration for 400 iterations = 149.6 seconds
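In the sections below, speedup is the scaled serial duration divided by the parallel duration. A small helper function (hypothetical, shown only to make the formula explicit) would be:
def speedup(serial_seconds, parallel_seconds):
    # Ratio of serial to parallel run time; values above 1 mean
    # the parallel version is faster.
    return serial_seconds / parallel_seconds

# For example, with the durations printed in this notebook:
# speedup(149.6, 85.9) gives roughly 1.74, the Ray speedup reported below.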
Process in parallel using Ray#
The warning raised by ray.init affects only shared-object usage, which is not an issue for this tutorial; in other scenarios it may indicate degraded performance.
ray.init(num_cpus=num_cpus)
2024-10-28 17:01:18,467 INFO worker.py:1816 -- Started a local Ray instance.
Python version: 3.11.10
Ray version: 2.38.0
Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by taking every fifth element along each axis. To use Ray, we decorate the function that does the work.
@ray.remote
def fray(image, random_filter):
return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
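As a minimal sketch of the Ray calling convention (the add function here is purely illustrative): a @ray.remote function is invoked with .remote(), which returns an object reference immediately, and ray.get blocks until the result is available.
@ray.remote
def add(a, b):
    # Runs in a Ray worker process, not in the driver.
    return a + b

ref = add.remote(1, 2)  # returns an ObjectRef right away
print(ray.get(ref))     # blocks until the task completes; prints 3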
In the following loop, ray.put places the image in Ray's shared-memory object store, and ray.get retrieves the results of the remote tasks.
start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
image_id = ray.put(image)
ray.get([fray.remote(image_id, filters[i]) for i in range(num_cpus)])
duration_ray = time.time() - start
print("Ray duration = {:.1f}, speedup = {:.2f}"
.format(duration_ray, duration_conv*num_cpus / duration_ray))
Ray duration = 85.9, speedup = 1.74
ray.shutdown()
Process in parallel using multiprocessing#
Documentation for multiprocessing
Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by taking every fifth element along each axis. The function signature differs slightly from the serial version: pool.map supplies a single argument, so the image and filter are packed into one tuple.
# Note: Mac and Windows users may need to copy the contents of this cell into a separate '.py' file
# and then import it in order to use the `fmp` function with `multiprocessing`. This has to do with
# differences in what does / does not get copied into the child processes in different operating systems.
import scipy.signal
def fmp(args):
image, random_filter = args
return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
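Because macOS and Windows start child processes with the "spawn" method, the worker function must live in an importable module. A minimal sketch of that workaround, using a hypothetical file name convolve_worker.py:
# convolve_worker.py  (hypothetical module; the name is illustrative)
import scipy.signal

def fmp(args):
    image, random_filter = args
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
The notebook would then use from convolve_worker import fmp so that the spawned worker processes can import the same function.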
Use a multiprocessing pool with the number of CPUs we found earlier.
pool = Pool(num_cpus)
Using pool.map
is the closest analog in multiprocessing to the Ray API.
start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
pool.map(fmp, zip(num_cpus * [image], filters))
duration_mp = time.time() - start
print("Multiprocessing duration = {:.1f}, speedup = {:.2f}"
.format(duration_mp, duration_conv*num_cpus / duration_mp))
Multiprocessing duration = 134.8, speedup = 1.11
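Unlike the Ray and Dask sections, which call ray.shutdown and client.close, the pool above is never released. Doing so once the timing is complete is good practice (a short sketch, not part of the original run):
pool.close()  # stop accepting new work
pool.join()   # wait for the worker processes to exit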
Process in parallel using Dask#
Define a Dask distributed client with the number of workers set to the number of CPUs we found earlier, and with one thread per worker.
client = Client(n_workers=num_cpus, threads_per_worker=1)
print(client)
<Client: 'tcp://127.0.0.1:45175' processes=4 threads=4, memory=15.61 GiB>
The Dask documentation recommends scattering large inputs to the workers, though in this example it makes little difference in execution time.
start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
for j in range(num_cpus):
big_future = client.scatter((image, filters[j % num_cpus]))
future = client.submit(fmp, big_future)
duration_dask = time.time() - start
print("Dask duration = {:.1f}, speedup = {:.2f}"
.format(duration_dask, duration_conv*num_cpus / duration_dask))
Dask duration = 105.9, speedup = 1.41
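Note that client.submit returns immediately, so the loop above does not block on task completion; the timer may stop while some tasks are still running. To time through completion of every task, collect the futures and gather them (a sketch, not how the numbers above were produced):
futures = []
start = time.time()
for _ in range(100):
    for j in range(num_cpus):
        big_future = client.scatter((image, filters[j % num_cpus]))
        futures.append(client.submit(fmp, big_future))
client.gather(futures)  # block until every task has finished
print("Dask duration (gathered) = {:.1f}".format(time.time() - start))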
client.close()
Conclusions#
Ray is the most effective at speeding up this convolution workload (speedup of about 1.7), making the fullest use of the available processes.
Dask is second (speedup of about 1.4), even though the tasks here are submitted without gathering their results.
Multiprocessing delivers the least speedup (about 1.1), likely because each pool.map call pickles the full image and ships it to the workers, whereas Ray places it in shared memory.
About this notebook#
This notebook was developed by David Shupe (shupe@ipac.caltech.edu) in conjunction with Jessica Krick and the IRSA Science Platform team at IPAC.
Citations#
If you use these software packages in your work, please use the following citations:
Dask: Dask Development Team (2016). Dask: Library for dynamic task scheduling. URL https://dask.org
Ray: The Ray Development Team. URL https://docs.ray.io