On Sharing Large Arrays When Using Python's Multiprocessing
Date: 3-7 2018
Tags: python, programming
Recently, I was asked about sharing large numpy arrays when using Python's multiprocessing.Pool. While not explicitly documented, this is indeed possible, and this short article describes the trick. Hope it helps :) Note that I am using Python 3.6, so this tutorial may not work on earlier versions of Python.
The multiprocessing.Pool provides an easy way to parallelize CPU-bound tasks in Python. However, unlike with multithreading, any arguments passed to the child processes must be pickled. In most cases this is fine; in some scenarios it is not. Suppose we have a large data matrix, and we want each child process to perform an independent task based on it, needing only read-only access to the matrix. If we send the data matrix to each child process the normal way, it will be pickled every time a new task is scheduled, wasting a lot of memory and bandwidth. To mitigate this problem, we can share the data matrix among the child processes.
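To make the overhead concrete, here is a minimal sketch of the naive approach (the row_sum helper is purely illustrative and not part of this article's example): every submitted task pickles the entire matrix and ships it to a child process.

import numpy as np
from multiprocessing import Pool

def row_sum(data, i):
    # `data` arrives here as a pickled copy of the full matrix.
    return data[i, :].sum()

if __name__ == '__main__':
    data = np.random.randn(16, 1000000)
    with Pool(processes=4) as pool:
        # Each apply_async call pickles the whole 16 x 1000000 matrix again.
        results = [pool.apply_async(row_sum, (data, i)) for i in range(16)]
        print([r.get() for r in results])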
The multiprocessing package provides the following sharable objects: RawValue, RawArray, Value, and Array. The details can be found in the official Python documentation. Basically, RawValue and RawArray do not come with a lock, while Value and Array do. Because we only need read-only access and we want to share a matrix, we will use RawArray. The following code creates a RawArray of doubles:
# Create a 100-element shared array of double precision without a lock.
from multiprocessing import RawArray
X = RawArray('d', 100)
This RawArray is a 1D array, or rather a chunk of memory, that will be used to hold the data matrix. To make its data easier to manipulate, we can wrap it as a numpy array using the frombuffer function. The following code shows how a RawArray is wrapped as a numpy array and how numpy.copyto can be used to fill the RawArray with existing data.
import numpy as np

X_shape = (16, 1000000)
# Randomly generate some data
data = np.random.randn(*X_shape)
X = RawArray('d', X_shape[0] * X_shape[1])
# Wrap X as a numpy array so we can easily manipulate its data.
X_np = np.frombuffer(X, dtype=np.float64).reshape(X_shape)
# Copy data to our shared array.
np.copyto(X_np, data)
Note: you do not need to copy all the data to the newly created RawArray at once. For instance, you can fill it row by row using something like X_np[i, :] = load_row(i).
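As a small sketch of this (load_row is a hypothetical helper standing in for however you load one row of your data, e.g. from disk), incremental filling could look like:

# load_row(i) is a hypothetical helper that returns a 1D array of length
# X_shape[1] for row i; it is not part of numpy or multiprocessing.
for i in range(X_shape[0]):
    X_np[i, :] = load_row(i)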
Important: X_np itself should NOT be shared with the child processes. Only the RawArray X (together with X_shape) is shared; each child process will create its own numpy wrapper around the shared buffer, as shown below.
Now we are ready to share the data matrix with the child processes. Basically, each child process needs access to X and X_shape (X_shape can simply be copied to each process without sharing). Python requires shared objects to be shared by inheritance, so we cannot pass X as an argument to Pool.map or Pool.apply_async. Instead, when creating the pool, we specify an initializer and its initargs. The initargs will contain our X and X_shape. The initializer is called when each child process starts and is responsible for storing X and X_shape as global variables. The simplest implementation of such an initializer looks like the following:
# A global dictionary storing the variables passed from the initializer.
var_dict = {}

def init_worker(X, X_shape):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    var_dict['X'] = X
    var_dict['X_shape'] = X_shape
The pool is then launched via:
from multiprocessing import Pool

with Pool(processes=4, initializer=init_worker, initargs=(X, X_shape)) as pool:
    ...  # Schedule tasks here
Now we can write our worker function. As a toy example, the worker function simply accepts one argument, the row index i, and computes the sum of the i-th row of X. Because both X and X_shape are now stored in var_dict, the worker function is as simple as the following:
def worker_func(i):
    # Simply computes the sum of the i-th row of the input matrix X
    X_np = np.frombuffer(var_dict['X']).reshape(var_dict['X_shape'])
    time.sleep(1)  # Some heavy computations
    return np.asscalar(np.sum(X_np[i, :]))
Note that because X is a RawArray, we need to wrap it again using the shape information X_shape before using numpy functions on it.
With the worker function ready, we can use Pool.map to assign the task of summing each row to the child processes and collect the results. Putting everything together, we have:
import numpy as np
import time
from multiprocessing import Pool, RawArray

# A global dictionary storing the variables passed from the initializer.
var_dict = {}

def init_worker(X, X_shape):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    var_dict['X'] = X
    var_dict['X_shape'] = X_shape

def worker_func(i):
    # Simply computes the sum of the i-th row of the input matrix X
    X_np = np.frombuffer(var_dict['X']).reshape(var_dict['X_shape'])
    time.sleep(1)  # Some heavy computations
    return np.asscalar(np.sum(X_np[i, :]))

# We need this check for Windows to prevent infinitely spawning new child
# processes.
if __name__ == '__main__':
    X_shape = (16, 1000000)
    # Randomly generate some data
    data = np.random.randn(*X_shape)
    X = RawArray('d', X_shape[0] * X_shape[1])
    # Wrap X as a numpy array so we can easily manipulate its data.
    X_np = np.frombuffer(X).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)
    # Start the process pool and do the computation.
    # Here we pass X and X_shape to the initializer of each worker.
    # (Because X_shape is not a shared variable, it will be copied to each
    # child process.)
    with Pool(processes=4, initializer=init_worker, initargs=(X, X_shape)) as pool:
        result = pool.map(worker_func, range(X_shape[0]))
        print('Results (pool):\n', np.array(result))
    # Should print the same results.
    print('Results (numpy):\n', np.sum(X_np, 1))
Writing to Shared Arrays ⚠️
It is also possible to write data to the shared array from the child processes. However, we have to be careful, because unsynchronized writes may lead to unexpected results.
If each child process writes to non-overlapping parts of the array, everything is fine. If not, we need proper locking to avoid race conditions. When locking is required, we can use Array instead of RawArray. Additionally, when wrapping an Array (which carries a lock) as a numpy array, we need to use the following instead:
np.frombuffer(X.get_obj()).reshape(X_shape)
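To illustrate, here is a minimal sketch (not from the original example) that reuses the init_worker pattern from above, but with an Array and a lock-guarded write in each worker; write_worker and its toy write are assumptions for demonstration only.

import numpy as np
from multiprocessing import Pool, Array

var_dict = {}

def init_worker(X, X_shape):
    var_dict['X'] = X
    var_dict['X_shape'] = X_shape

def write_worker(i):
    # Hold the lock while writing so that writes from different workers
    # cannot interleave; get_obj() exposes the underlying ctypes array.
    with var_dict['X'].get_lock():
        X_np = np.frombuffer(var_dict['X'].get_obj()).reshape(var_dict['X_shape'])
        X_np[i, :] = i  # Toy write: fill the i-th row with the value i.

if __name__ == '__main__':
    X_shape = (4, 10)
    # Unlike RawArray, Array comes with a lock.
    X = Array('d', X_shape[0] * X_shape[1])
    with Pool(processes=2, initializer=init_worker, initargs=(X, X_shape)) as pool:
        pool.map(write_worker, range(X_shape[0]))
    print(np.frombuffer(X.get_obj()).reshape(X_shape))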