Mianzhi Wang

Ph.D. in Electrical Engineering

On Sharing Large Arrays When Using Python's Multiprocessing


Recently, I was asked about sharing large numpy arrays when using Python's multiprocessing.Pool. While not explicitly documented, this is indeed possible. I will write about this small trick in this short article. Hope it helps :) It should be noted that I am using Python 3.6. Therefore this tutorial may not work on earlier versions of Python.

multiprocessing.Pool provides an easy way to parallelize CPU-bound tasks in Python. However, unlike with multithreading, any arguments passed to the child processes must be pickled. In most cases this is fine, but in some scenarios it is not. Suppose that we have a big data matrix. We want each child process to perform independent tasks based on this data matrix, and the child processes only need read-only access to it. If we send the data matrix to each child process in the usual way, it will be pickled every time we schedule a new task, wasting a lot of memory and bandwidth. To mitigate this problem, we can share the data matrix among the child processes.
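To get a feel for the cost described above, here is a minimal sketch that measures how many bytes pickle produces for a small matrix. The 16 x 1000 shape is an arbitrary choice for illustration; the matrix used later in this article is a thousand times larger.

```python
import pickle

import numpy as np

# A small 16 x 1000 matrix of float64 values (8 bytes per element).
data = np.zeros((16, 1000))

# Serialize the matrix the way it would be serialized as a task argument.
payload = pickle.dumps(data)

raw_bytes = data.nbytes        # size of the raw data
pickled_bytes = len(payload)   # raw data plus a small pickle header
print(raw_bytes, pickled_bytes)
```

The pickled payload is at least as large as the raw data, so sending the matrix along with every task multiplies that cost by the number of tasks.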

The multiprocessing package provides the following shareable objects: RawValue, RawArray, Value, Array. The details can be found in the multiprocessing documentation. Basically, RawValue and RawArray do not come with a lock, while Value and Array do. Because we only need read-only access and we want to share a matrix, we will use RawArray. The following code will create a RawArray of doubles:

# Create a 100-element shared array of double precision without a lock.
from multiprocessing import RawArray
X = RawArray('d', 100)
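The difference between the locked and lock-free variants can be seen directly. The sketch below is only an illustration (the names raw and locked are mine): it creates one array of each kind and checks which of them exposes get_lock.

```python
from multiprocessing import Array, RawArray

raw = RawArray('d', 4)    # plain ctypes array in shared memory, no lock
locked = Array('d', 4)    # the same kind of storage, wrapped with a lock

# Only the Array wrapper exposes get_lock() (and get_obj()).
has_lock_raw = hasattr(raw, 'get_lock')
has_lock_locked = hasattr(locked, 'get_lock')

# Writes to the locked array can be grouped under its lock.
with locked.get_lock():
    locked[0] = 1.5

# The raw array is written directly, with no synchronization at all.
raw[0] = 2.5

print(has_lock_raw, has_lock_locked, list(raw), list(locked))
```

Both arrays start out filled with zeros, so only the first element of each differs after the writes.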

This RawArray is a 1D array, or a chunk of memory, that will be used to hold the data matrix. To make its data easier to manipulate, we can wrap it as a numpy array using the frombuffer function. The following code shows how a RawArray is wrapped as a numpy array and how numpy.copyto can be used to fill the RawArray with existing data.

import numpy as np
from multiprocessing import RawArray

X_shape = (16, 1000000)
# Randomly generate some data
data = np.random.randn(*X_shape)
X = RawArray('d', X_shape[0] * X_shape[1])
# Wrap X as a numpy array so we can easily manipulate its data.
X_np = np.frombuffer(X, dtype=np.float64).reshape(X_shape)
# Copy data to our shared array.
np.copyto(X_np, data)

Note: you do not need to copy all the data to the newly created RawArray at once. For instance, you can fill it row by row using something like X_np[i,:] = load_row(i).
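As a sketch of the row-by-row approach, the snippet below fills a small shared array one row at a time. Here load_row is a hypothetical stand-in for whatever actually produces a row (reading from disk, for example), and the 4 x 5 shape is chosen only to keep the example small.

```python
import numpy as np
from multiprocessing import RawArray

X_shape = (4, 5)

def load_row(i):
    # Hypothetical loader: stands in for reading row i from some source.
    return np.full(X_shape[1], float(i))

X = RawArray('d', X_shape[0] * X_shape[1])
X_np = np.frombuffer(X, dtype=np.float64).reshape(X_shape)

# Fill the shared array one row at a time.
for i in range(X_shape[0]):
    X_np[i, :] = load_row(i)

# X_np is a view onto X, so the writes landed in the shared buffer itself.
first_of_last_row = X[3 * X_shape[1]]
print(first_of_last_row)
```

Because only one row exists outside the shared array at any time, the full matrix never has to be held in memory twice.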

Important: X_np should NOT be shared with child processes.
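One plausible reason for this rule (my reading, not something stated above) is that a numpy array which gets pickled on its way to another process arrives as an independent copy, no longer backed by the shared memory. The sketch below simulates that with a pickle round trip in a single process.

```python
import pickle

import numpy as np
from multiprocessing import RawArray

X = RawArray('d', 6)
X_np = np.frombuffer(X, dtype=np.float64).reshape((2, 3))

# Round-trip X_np through pickle, which is how a numpy array travels
# when it is sent to another process as a task argument.
X_copy = pickle.loads(pickle.dumps(X_np))

# A write through the wrapper reaches the shared buffer, but not the copy.
X_np[0, 0] = 42.0
print(X[0], X_copy[0, 0], np.shares_memory(X_np, X_copy))
```

Passing X itself and re-wrapping it inside each child process avoids this silent copy.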

Now we are ready to share the data matrix with the child processes. Basically, each child process needs to have access to X and X_shape (X_shape can be copied to each process without sharing). Python requires the shared object to be shared by inheritance. Therefore, we cannot pass X as an argument when using Pool.map or Pool.apply_async. Instead, when creating the pool, we specify an initializer and its initargs. The initargs will contain our X and X_shape. The initializer is called when each child process starts and is responsible for storing X and X_shape as global variables. The simplest implementation of this initializer looks like the following:

# A global dictionary storing the variables passed from the initializer.
var_dict = {}

def init_worker(X, X_shape):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    var_dict['X'] = X
    var_dict['X_shape'] = X_shape

The pool is then launched via:

from multiprocessing import Pool
with Pool(processes=4, initializer=init_worker, initargs=(X, X_shape)) as pool:
    # Schedule tasks here, e.g. with pool.map or pool.apply_async.
    pass
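The inheritance requirement mentioned earlier can be observed without starting any process. The sketch below pickles a RawArray with ForkingPickler from multiprocessing.reduction, which is the pickler multiprocessing uses for the data it sends between processes. Outside of process start-up this is refused, which is why X goes through initargs rather than through task arguments.

```python
from multiprocessing import RawArray
from multiprocessing.reduction import ForkingPickler

X = RawArray('d', 4)

try:
    # Pickling a shared array outside of process start-up is refused.
    ForkingPickler.dumps(X)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(error_message)
```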

Now we can write our worker function. As a toy example, our worker function will simply accept one argument, the row index i, and compute the sum of the i-th row of X. Because both X and X_shape are stored in var_dict now, the worker function is as simple as follows:

def worker_func(i):
    # Simply computes the sum of the i-th row of the input matrix X
    X_np = np.frombuffer(var_dict['X']).reshape(var_dict['X_shape'])
    time.sleep(1) # Some heavy computations
    return float(np.sum(X_np[i,:]))

Note that because X is a RawArray, we need to wrap it again using the shape information X_shape before using numpy functions.

With the worker function ready, we can use Pool.map to assign the tasks of summing each row to the child processes and collect the results. Putting everything together, we have:

import numpy as np
import time
from multiprocessing import Pool, RawArray

# A global dictionary storing the variables passed from the initializer.
var_dict = {}

def init_worker(X, X_shape):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    var_dict['X'] = X
    var_dict['X_shape'] = X_shape

def worker_func(i):
    # Simply computes the sum of the i-th row of the input matrix X
    X_np = np.frombuffer(var_dict['X']).reshape(var_dict['X_shape'])
    time.sleep(1) # Some heavy computations
    return float(np.sum(X_np[i,:]))

# We need this check for Windows to prevent infinitely spawning new child
# processes.
if __name__ == '__main__':
    X_shape = (16, 1000000)
    # Randomly generate some data
    data = np.random.randn(*X_shape)
    X = RawArray('d', X_shape[0] * X_shape[1])
    # Wrap X as a numpy array so we can easily manipulate its data.
    X_np = np.frombuffer(X).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)
    # Start the process pool and do the computation.
    # Here we pass X and X_shape to the initializer of each worker.
    # (Because X_shape is not a shared variable, it will be copied to each
    # child process.)
    with Pool(processes=4, initializer=init_worker, initargs=(X, X_shape)) as pool:
        result = pool.map(worker_func, range(X_shape[0]))
        print('Results (pool):\n', np.array(result))
    # Should print the same results.
    print('Results (numpy):\n', np.sum(X_np, 1))

Writing to Shared Arrays ⚠️

It is also possible to write data to a shared array from the child process. However, we have to be careful because unsynchronized writes may lead to unexpected results.

If each child process writes to a non-overlapping part of the array, everything will be fine. If not, we need proper locking to avoid race conditions. When locking is required, we can use Array instead of RawArray. Additionally, when wrapping an Array with a lock as a numpy array, we need to use the following instead:

np.frombuffer(X.get_obj()).reshape(X_shape)
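Putting the locking advice together, here is a minimal sketch in which several tasks update the same rows of a shared Array. It reuses the initializer pattern from earlier; the names add_one_to_row and run, the 2 x 3 shape, and the task count are assumptions made for illustration.

```python
import numpy as np
from multiprocessing import Array, Pool

# A global dictionary storing the variables passed from the initializer.
var_dict = {}

def init_worker(X, X_shape):
    var_dict['X'] = X
    var_dict['X_shape'] = X_shape

def add_one_to_row(i):
    X = var_dict['X']
    # get_obj() returns the underlying ctypes array that numpy can wrap.
    X_np = np.frombuffer(X.get_obj(), dtype=np.float64).reshape(var_dict['X_shape'])
    # Hold the lock for the whole read-modify-write so that concurrent
    # updates to the same row cannot interleave.
    with X.get_lock():
        X_np[i, :] += 1.0

def run(n_tasks=8):
    X_shape = (2, 3)
    X = Array('d', X_shape[0] * X_shape[1])  # zero-initialized, with a lock
    with Pool(processes=2, initializer=init_worker, initargs=(X, X_shape)) as pool:
        # Tasks alternate between the two rows, so their writes overlap.
        pool.map(add_one_to_row, [t % X_shape[0] for t in range(n_tasks)])
    return np.frombuffer(X.get_obj(), dtype=np.float64).reshape(X_shape).tolist()

if __name__ == '__main__':
    # Each row receives n_tasks / 2 increments.
    print(run())
```

Holding the lock serializes the updates, so this only pays off when the locked section is short compared with the rest of the work done by each task.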