Parallelizing image convolution#

Learning Goals#

By the end of this tutorial, you will be able to:

  • Employ three parallelization libraries to speed up a serial process.

  • Calculate the speedup of the different approaches shown.

  • Evaluate which library is suited to your task.

Introduction#

This notebook shows how to speed up an image convolution task using these three libraries:

  • Ray: an open-source unified compute framework that makes it easy to scale AI and Python workloads.

  • Multiprocessing: part of the standard library; supports spawning processes using an API similar to the threading module; offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.

  • Dask: developed to natively scale computational packages like numpy, pandas and scikit-learn, and the surrounding ecosystem, to multi-core machines and distributed clusters when datasets exceed memory.

Imports#

  • multiprocessing.Pool for multiprocessing using the standard library

  • time for timing the processes

  • dask.distributed.Client for making a local Dask cluster

  • numpy and scipy.signal for numerical work

  • psutil for finding the available processors on your machine

  • ray for scaling up Python tasks

# Uncomment the next line to install dependencies if needed.
# !pip install dask[distributed] numpy ray scipy
from multiprocessing import Pool
import time

from dask.distributed import Client
import numpy as np
import psutil
import scipy.signal
import ray

Find the CPUs available#

Find and print the number of CPUs (taken from https://towardsdatascience.com/10x-faster-parallel-python-without-python-multiprocessing-e5017c93cce1).

num_cpus = psutil.cpu_count(logical=True)
print(num_cpus)
4

Process serially using a conventional loop#

Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by a factor of 5 along each axis.

def fconv(image, random_filter):
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]

Process num_iter = 100 iterations serially, then scale the measured time by num_cpus to extrapolate to num_cpus*100 iterations, since each parallel run below performs num_cpus convolutions per iteration.

start = time.time()
num_iter = 100
image = np.zeros((3000, 3000))
for i in range(num_iter):
    result = fconv(image, filters[i % num_cpus])
duration_conv = time.time() - start
print("(scaled) conventional duration for {:d} iterations = {:.1f} seconds"
      .format(num_cpus*num_iter, duration_conv*num_cpus))
(scaled) conventional duration for 400 iterations = 149.6 seconds

Process in parallel using Ray#

Documentation for ray

The warning that ray.init may raise only affects shared-object usage, which is not an issue for this tutorial; the condition it describes may harm performance in other scenarios.

ray.init(num_cpus=num_cpus)
2024-10-28 17:01:18,467	INFO worker.py:1816 -- Started a local Ray instance.

Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by a factor of 5 along each axis. To use Ray, we decorate the function that does the work.

@ray.remote
def fray(image, random_filter):
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]

In the following loop, ray.put places the image into shared memory once per iteration, and ray.get blocks until all of the remote tasks in the batch have returned their results.

start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    image_id = ray.put(image)
    ray.get([fray.remote(image_id, filters[i]) for i in range(num_cpus)])
duration_ray = time.time() - start
print("Ray duration = {:.1f}, speedup = {:.2f}"
      .format(duration_ray, duration_conv*num_cpus / duration_ray))
Ray duration = 85.9, speedup = 1.74
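Because the image never changes between iterations, the call to ray.put could also be hoisted out of the loop so the array is copied into shared memory only once. A minimal sketch of that variant (run before calling ray.shutdown below):

image_id = ray.put(image)  # copy the image to the object store once
start = time.time()
for _ in range(100):
    # every task reuses the same object reference instead of a fresh copy
    ray.get([fray.remote(image_id, filters[i]) for i in range(num_cpus)])
print("Ray duration (image put once) = {:.1f} seconds".format(time.time() - start))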
ray.shutdown()

Process in parallel using multiprocessing#

Documentation for multiprocessing

Use scipy.signal to convolve two 2-dimensional arrays and return the result downsampled by a factor of 5 along each axis. The call has a slightly different form than in the serial loop: pool.map passes a single argument, so the image and the filter are bundled into one tuple.

# Note: Mac and Windows users may need to copy the contents of this cell into a separate '.py' file
# and then import it in order to use the `fmp` function with `multiprocessing`. This has to do with
# differences in what does / does not get copied into the child processes in different operating systems.
import scipy.signal

def fmp(args):
    image, random_filter = args
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
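For example, if the cell above were saved to a hypothetical file named convolve_worker.py (the name is only illustrative), this cell would reduce to an import that the spawned child processes can also resolve:

# convolve_worker.py is a hypothetical module containing the fmp definition above
from convolve_worker import fmp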

Use a multiprocessing pool with the number of CPUs we found earlier.

pool = Pool(num_cpus)

Using pool.map is the closest analog in multiprocessing to the Ray API.

start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    pool.map(fmp, zip(num_cpus * [image], filters))
duration_mp = time.time() - start
print("Multiprocessing duration = {:.1f}, speedup = {:.2f}"
      .format(duration_mp, duration_conv*num_cpus / duration_mp))
Multiprocessing duration = 134.8, speedup = 1.11
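The pool is left open above; if you prefer the worker processes to be released deterministically, the same loop can be written with a context-managed pool (a sketch using the same fmp, image, and filters):

with Pool(num_cpus) as pool:
    for _ in range(100):
        pool.map(fmp, zip(num_cpus * [image], filters))
# leaving the with-block terminates the pool's worker processes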

Process using Dask#

Documentation for Dask

Define a Dask distributed client with the number of workers set to the number of CPUs we found earlier, and with one thread per worker.

client = Client(n_workers=num_cpus, threads_per_worker=1)
print(client)
<Client: 'tcp://127.0.0.1:45175' processes=4 threads=4, memory=15.61 GiB>

Dask recommends scattering the large inputs across the workers, though this makes little difference in execution time.

start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    for j in range(num_cpus):
        big_future = client.scatter((image, filters[j % num_cpus]))
        future = client.submit(fmp, big_future)
duration_dask = time.time() - start
print("Dask duration = {:.1f}, speedup = {:.2f}"
      .format(duration_dask, duration_conv*num_cpus / duration_dask))
Dask duration = 105.9, speedup = 1.41
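Note that the loop above only scatters the inputs and submits the tasks; the futures are never gathered, so some of the work may still be in flight when the timer stops. A variant that waits for each batch of results, assuming the same fmp and filters and run before closing the client, could look like this:

start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    # scatter the inputs, submit one task per worker, then block on the batch
    futures = [client.submit(fmp, client.scatter((image, filters[j % num_cpus])))
               for j in range(num_cpus)]
    client.gather(futures)
print("Dask duration (gathered) = {:.1f} seconds".format(time.time() - start))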
client.close()
(When the client closes, any fmp tasks still pending are cancelled, and the workers log CancelledError warnings here.)

Conclusions#

  • Ray is the most effective at speeding up this convolution workload, making the best use of all available processes.

  • Dask delivers the second-largest speedup in this run.

  • Multiprocessing shows the smallest speedup here.
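For reference, each speedup quoted above is simply the (scaled) serial duration divided by the parallel duration; a minimal recap, assuming the timing variables from the cells above are still defined:

for name, duration in [("Ray", duration_ray),
                       ("multiprocessing", duration_mp),
                       ("Dask", duration_dask)]:
    # speedup = scaled serial time / parallel time
    print("{:s} speedup = {:.2f}".format(name, duration_conv * num_cpus / duration))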

About this notebook#

This notebook was developed by David Shupe (shupe@ipac.caltech.edu) in conjunction with Jessica Krick and the IRSA Science Platform team at IPAC.

Citations#

If you use these software packages in your work, please use the following citations:

  • Dask: Dask Development Team (2016). Dask: Library for dynamic task scheduling. URL https://dask.org

  • Ray: The Ray Development Team. URL https://docs.ray.io