IRSA Tutorials

Parallelizing image convolution

Learning Goals

By the end of this tutorial, you will be able to:

- Time a serial baseline for an image convolution task.
- Parallelize the same task with Python's multiprocessing, Ray, and Dask.
- Compare the speedup each library achieves over the serial baseline.

Introduction

This notebook shows how to speed up an image convolution task using these three libraries:

- multiprocessing (Python standard library)
- Ray
- Dask

Imports

# Uncomment the next line to install dependencies if needed.
# !pip install dask[distributed] numpy ray scipy
from multiprocessing import Pool
import time

from dask.distributed import Client
import numpy as np
import psutil
import scipy.signal
import ray

Find the CPUs available

Find and print the number of logical CPUs (adapted from https://towardsdatascience.com/10x-faster-parallel-python-without-python-multiprocessing-e5017c93cce1)

num_cpus = psutil.cpu_count(logical=True)
print(num_cpus)
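If psutil is unavailable, the standard library reports the same logical-core count. This is a small sketch, not part of the original notebook:

```python
import os
from multiprocessing import cpu_count

# Both standard-library calls report logical cores,
# matching psutil.cpu_count(logical=True) above.
n_os = os.cpu_count()
n_mp = cpu_count()
print(n_os, n_mp)
```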

Process serially using a conventional loop

Use scipy.signal to convolve two 2-dimensional arrays, returning the result downsampled by a factor of 5 along each axis.

def fconv(image, random_filter):
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]
filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)]

Process 100 iterations serially, then extrapolate the timing to num_cpus*100 iterations.

start = time.time()
num_iter = 100
image = np.zeros((3000, 3000))
for i in range(num_iter):
    result = fconv(image, filters[i % num_cpus])
duration_conv = time.time() - start
print("(scaled) conventional duration for {:d} iterations = {:.1f} seconds"
      .format(num_cpus*num_iter, duration_conv*num_cpus))

Process in parallel using Ray

Documentation for ray

ray.init may raise a warning about the size of the shared-memory object store. This does not affect this tutorial, though it can degrade performance in workloads that make heavier use of shared objects.

ray.init(num_cpus=num_cpus)

Use scipy.signal to convolve two 2-dimensional arrays, returning the result downsampled by a factor of 5 along each axis. To use Ray, we decorate the function that does the work.

@ray.remote
def fray(image, random_filter):
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]

In the following loop, ray.put places the image into Ray's shared object store and returns a reference to it. The call to ray.get blocks until all num_cpus tasks have returned their results.

start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    image_id = ray.put(image)
    ray.get([fray.remote(image_id, filters[i]) for i in range(num_cpus)])
duration_ray = time.time() - start
print("Ray duration = {:.1f}, speedup = {:.2f}"
      .format(duration_ray, duration_conv*num_cpus / duration_ray))
ray.shutdown()

Process in parallel using multiprocessing

Documentation for multiprocessing

Use scipy.signal to convolve two 2-dimensional arrays, returning the result downsampled by a factor of 5 along each axis. The function takes its image and filter as a single tuple, since pool.map passes one argument per call.

# Note: Mac and Windows users may need to copy the contents of this cell into a separate '.py' file
# and then import it in order to use the `fmp` function with `multiprocessing`. This has to do with
# differences in what does / does not get copied into the child processes in different operating systems.
import scipy.signal

def fmp(args):
    image, random_filter = args
    return scipy.signal.convolve2d(image, random_filter)[::5, ::5]

Use a multiprocessing pool with the number of CPUs we found earlier.

pool = Pool(num_cpus)

Using pool.map is the closest analog in multiprocessing to the Ray API.

start = time.time()
image = np.zeros((3000, 3000))
for _ in range(100):
    pool.map(fmp, zip(num_cpus * [image], filters))
duration_mp = time.time() - start
print("Multiprocessing duration = {:.1f}, speedup = {:.2f}"
      .format(duration_mp, duration_conv*num_cpus / duration_mp))
# Release the worker processes now that the timing is done.
pool.close()
pool.join()
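The pool can also be managed with a context manager, which closes and joins the workers automatically. This sketch uses a toy `scale` function (not from this tutorial) with the same single-tuple calling convention as `fmp`:

```python
from multiprocessing import Pool

def scale(args):
    # Unpack one (value, factor) pair, mirroring the (image, filter) pairs above.
    value, factor = args
    return value * factor

values = [1, 2, 3, 4]
factors = [10, 10, 10, 10]

# The `with` block closes the pool and joins its workers on exit,
# so no explicit pool.close()/pool.join() is needed.
with Pool(2) as pool:
    results = pool.map(scale, zip(values, factors))
```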

Process using Dask

Documentation for Dask

Define a Dask distributed client with the number of workers set to the number of CPUs we found earlier, and with one thread per worker.

client = Client(n_workers=num_cpus, threads_per_worker=1)
print(client)

Dask recommends scattering the large inputs across the workers, though this makes little difference in execution time.

start = time.time()
image = np.zeros((3000, 3000))
futures = []
for _ in range(100):
    for j in range(num_cpus):
        big_future = client.scatter((image, filters[j % num_cpus]))
        futures.append(client.submit(fmp, big_future))
# Block until every task finishes so the timing covers the full computation,
# not just the submissions.
client.gather(futures)
duration_dask = time.time() - start
print("Dask duration = {:.1f}, speedup = {:.2f}"
      .format(duration_dask, duration_conv*num_cpus / duration_dask))
client.close()
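client.map offers a more compact way to submit a batch of tasks than repeated client.submit calls. This sketch uses a threaded local client (processes=False) so it runs standalone, and a toy `pair_sum` function (not from this tutorial) with the same single-argument convention as `fmp`:

```python
import numpy as np
from dask.distributed import Client

def pair_sum(args):
    # Same single-argument calling convention as fmp above.
    image, scale = args
    return float(image.sum()) * scale

client = Client(processes=False, n_workers=2, threads_per_worker=1)
image = np.ones((100, 100))
# Submit the whole batch at once; gather blocks until all results are ready.
futures = client.map(pair_sum, [(image, s) for s in (1.0, 2.0)])
results = client.gather(futures)
client.close()
```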

Conclusions


About this notebook

Author: David Shupe in conjunction with Jessica Krick and the IRSA Science Platform team at IPAC.

Updated: 2024-09-24

Contact: the IRSA Helpdesk with questions or reporting problems.

Citations

If you use these software packages in your work, please use the following citations: