Parallelizing image convolution#

Learning Goals#

By the end of this tutorial, you will be able to:

  • Employ three parallelization libraries to speed up a serial process.

  • Calculate the speedup of the different approaches shown.

  • Evaluate which library is suited to your task.

Introduction#

This notebook shows how to speed up an image convolution task using these three libraries:

  • Ray: an open-source unified compute framework that makes it easy to scale AI and Python workloads.

  • Multiprocessing: part of the standard library; supports spawning processes using an API similar to the threading module; offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.

  • Dask: developed to natively scale computational packages like numpy, pandas and scikit-learn, and the surrounding ecosystem, to multi-core machines and distributed clusters when datasets exceed memory.

Imports#

  • multiprocessing.Pool for multiprocessing using the standard library

  • time for timing the processes

  • dask.distributed.Client for making a local Dask cluster

  • numpy and scipy.signal for numerical work

  • psutil for finding the available processors on your machine

  • ray for scaling up Python tasks

# Uncomment the next line to install dependencies if needed.
# !pip install dask[distributed] numpy psutil ray scipy
from multiprocessing import Pool
import time

from dask.distributed import Client
import numpy as np
import psutil
import scipy.signal
import ray

Find the cpus available#

Find and print the number of cpus (taken from https://towardsdatascience.com/10x-faster-parallel-python-without-python-multiprocessing-e5017c93cce1)

num_cpus = psutil.cpu_count(logical=True)
print(num_cpus)
4

Process serially using a conventional loop#

Use scipy.signal to convolve two 2-dimensional arrays and return a result downsampled by a factor of 5 in each dimension.

def fconv(image, random_filter):
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]
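
As a quick aside (not part of the original timing), the slicing can be checked directly: the full 2-D convolution of a 3000x3000 image with a 4x4 filter is 3003x3003, and keeping every fifth row and column leaves a 601x601 array.

# Sanity check of the downsampling: convolve a blank image and inspect the shape.
print(fconv(np.zeros((3000, 3000)), filters[0]).shape)  # expected: (601, 601)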

Process 100 iterations serially, then multiply the measured duration by num_cpus to extrapolate to num_cpus*100 iterations.

start = time.time()
num_iter = 100
image = np.zeros((3000, 3000))
for i in range(num_iter):
    result = fconv(image, filters[i % num_cpus])
duration_conv = time.time() - start
print("(scaled) conventional duration for {:d} iterations = {:.1f} seconds"
      .format(num_cpus*num_iter, duration_conv*num_cpus))
(scaled) conventional duration for 400 iterations = 148.5 seconds
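
Each parallel run below is compared against this scaled serial baseline: the speedup printed in each section is the scaled conventional duration divided by the parallel duration, so values above 1 mean the parallel approach was faster. A small helper expressing that formula (a hypothetical convenience, not part of the original notebook; the sections below simply compute it inline):

def compute_speedup(duration_serial_scaled, duration_parallel):
    # Ratio of the scaled serial time to the parallel time; > 1 means the
    # parallel run beat the extrapolated serial run.
    return duration_serial_scaled / duration_parallel

# e.g. compute_speedup(duration_conv * num_cpus, duration_ray) for the Ray run below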

Process in parallel using Ray#

Documentation for ray

The warning raised by ray.init affects only shared-object usage, which is not an issue for this tutorial, though it can hurt performance in other scenarios.

ray.init(num_cpus=num_cpus)
2025-02-19 23:14:04,436	INFO worker.py:1841 -- Started a local Ray instance.

Use scipy.signal to convolve two 2-dimensional arrays and return a result downsampled by a factor of 5 in each dimension. To use Ray, we decorate the function that does the work with @ray.remote.

@ray.remote
def fray(image, random_filter):
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]

In the following loop, ray.put places the image in Ray's shared-memory object store so that every task reads the same copy instead of receiving its own. The call to ray.get blocks until the remote tasks finish and retrieves their results.

start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    image_id = ray.put(image)
    ray.get([fray.remote(image_id, filters[i]) for i in range(num_cpus)])
duration_ray = time.time() - start
print("Ray duration = {:.1f}, speedup = {:.2f}"
      .format(duration_ray, duration_conv*num_cpus / duration_ray))
Ray duration = 86.4, speedup = 1.72
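
Because the image never changes between iterations, ray.put could also be called once outside the loop, so the array is copied into the object store a single time rather than on every pass. A minimal variant (an aside, not timed here):

image_id = ray.put(image)  # place the image in the object store once
results = ray.get([fray.remote(image_id, filters[i]) for i in range(num_cpus)])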
ray.shutdown()

Process in parallel using multiprocessing#

Documentation for multiprocessing

Use scipy.signal to convolve two 2-dimensional arrays and return a result downsampled by a factor of 5 in each dimension. The call has a slightly different form than in the serial loop: pool.map passes a single argument, so the image and the filter are packed into one tuple.

# Note: Mac and Windows users may need to copy the contents of this cell into a separate '.py' file
# and then import it in order to use the `fmp` function with `multiprocessing`. This has to do with
# differences in what does / does not get copied into the child processes in different operating systems.
import scipy.signal

def fmp(args):
    image, random_filter = args
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]

Use a multiprocessing pool with the number of cpus we found earlier.

pool = Pool(num_cpus)

Using pool.map is the closest analog in multiprocessing to the Ray API.

start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    pool.map(fmp, zip(num_cpus * [image], filters))
duration_mp = time.time() - start
print("Multiprocessing duration = {:.1f}, speedup = {:.2f}"
      .format(duration_mp, duration_conv*num_cpus / duration_mp))
Multiprocessing duration = 162.1, speedup = 0.92
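
As with ray.shutdown() above (and client.close() in the Dask section below), the pool's worker processes can be released explicitly once the timing is done; a small addition not present in the original notebook:

pool.close()  # stop accepting new work
pool.join()   # wait for the worker processes to exit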

Process using Dask#

Documentation for Dask

Define a Dask distributed client with number of workers set to the number of cpus we found earlier, and with one thread per worker.

client = Client(n_workers=num_cpus, threads_per_worker=1)
print(client)
<Client: 'tcp://127.0.0.1:37643' processes=4 threads=4, memory=15.62 GiB>

Dask recommends scattering large inputs to the workers before submitting tasks, though in this example it makes little difference to the execution time.

start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    for j in range(num_cpus):
        big_future = client.scatter((image, filters[j % num_cpus]))
        future = client.submit(fmp, big_future)
duration_dask = time.time() - start
print("Dask duration = {:.1f}, speedup = {:.2f}"
      .format(duration_dask, duration_conv*num_cpus / duration_dask))
Dask duration = 104.0, speedup = 1.43
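
Note that the loop above only submits the tasks; the futures are never gathered, so the timer measures scattering and submission rather than completion, and work still in flight may be cancelled when the client closes. A variant that also waits for each batch to finish (a sketch, not timed in the original run) would look like this:

start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    futures = [client.submit(fmp, client.scatter((image, filters[j])))
               for j in range(num_cpus)]
    client.gather(futures)  # block until this batch of convolutions finishes
print("Dask duration incl. gather = {:.1f} seconds".format(time.time() - start))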
client.close()

Conclusions#

  • Ray is the most effective at speeding up this convolution workload, fully utilizing all of the available processes

  • Dask is second in effectiveness, perhaps because each submitted task scatters its own copy of the inputs to the workers

  • Multiprocessing delivers the least speedup (here it is slightly slower than the serial loop), likely because pool.map pickles the full image and sends it to the worker processes on every call instead of sharing it in memory


About this notebook#

Author: David Shupe in conjunction with Jessica Krick and the IRSA Science Platform team at IPAC.

Updated: 2024-09-24

Contact: the IRSA Helpdesk with questions or to report problems.

Citations#

If you use these software packages in your work, please use the following citations:

  • Dask: Dask Development Team (2016). Dask: Library for dynamic task scheduling. URL https://dask.org

  • Ray: The Ray Development Team. URL https://docs.ray.io