# On Sharing Large Arrays When Using Python's Multiprocessing

Date: 3-7 2018

Tags: python, programming

Recently, I was asked about sharing large numpy arrays when using Python's `multiprocessing.Pool`. While not explicitly documented, this is indeed possible, and this short article describes the small trick that makes it work. Hope it helps :) Note that I am using Python 3.6, so this tutorial may not work on earlier versions of Python.

`multiprocessing.Pool` provides an easy way to parallelize CPU-bound tasks in Python. However, unlike with multithreading, any data passed as arguments to the child processes must be pickled. In most cases this is fine; in some scenarios, it is not. Suppose we have a big data matrix, and we want each child process to perform independent tasks that need only **read-only access** to it. If we send the matrix to each child process normally, it will be pickled each time we schedule a new task, wasting a lot of memory and bandwidth. To mitigate this problem, we can share the data matrix among the child processes.

The `multiprocessing` package provides the following sharable objects: `RawValue`, `RawArray`, `Value`, and `Array`. The details can be found in the official `multiprocessing` documentation. Basically, `RawValue` and `RawArray` do not come with a lock, while `Value` and `Array` do. Because we only need read-only access and we want to share a matrix, we will use `RawArray`. The following code creates a `RawArray` of doubles:

```
# Create a 100-element shared array of double precision without a lock.
from multiprocessing import RawArray
X = RawArray('d', 100)
```

This `RawArray` is a 1D array, or rather a chunk of memory, that will be used to hold the data matrix. To make its data easier to manipulate, we can wrap it as a `numpy` array using the `frombuffer` function. The following code shows how a `RawArray` is wrapped as a `numpy` array and how `numpy.copyto` can be used to fill the `RawArray` with existing data:

```
import numpy as np
from multiprocessing import RawArray

X_shape = (16, 1000000)
# Randomly generate some data.
data = np.random.randn(*X_shape)
X = RawArray('d', X_shape[0] * X_shape[1])
# Wrap X as a numpy array so we can easily manipulate its data.
X_np = np.frombuffer(X, dtype=np.float64).reshape(X_shape)
# Copy data to our shared array.
np.copyto(X_np, data)
```

Note: you do not need to copy all the data into the newly created `RawArray` at once. For instance, you can fill it row by row using something like `X_np[i,:] = load_row(i)`.

Important: `X_np` itself should NOT be passed to the child processes. Pickling a `numpy` array copies its data, which defeats the purpose of sharing; only the underlying `RawArray` should be shared.
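To make the row-by-row idea concrete, here is a small sketch. The `load_row` function is hypothetical (it stands in for whatever reads one row from disk or elsewhere); everything else follows the setup above.

```python
import numpy as np
from multiprocessing import RawArray

X_shape = (16, 1000000)
X = RawArray('d', X_shape[0] * X_shape[1])
X_np = np.frombuffer(X, dtype=np.float64).reshape(X_shape)

def load_row(i):
    # Hypothetical loader; replace with e.g. reading one row from disk.
    return np.full(X_shape[1], float(i))

# Fill the shared array incrementally instead of all at once.
for i in range(X_shape[0]):
    X_np[i, :] = load_row(i)
```

This avoids ever holding a second full copy of the matrix in memory, since each row is written directly into the shared buffer.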

Now we are ready to share the data matrix with the child processes. Basically, each child process needs access to `X` and `X_shape` (`X_shape` can simply be copied to each process without sharing). Python requires shared objects to be shared by inheritance, so we cannot pass `X` as an argument when using `Pool.map` or `Pool.apply_async`. Instead, when creating the pool, we specify an `initializer` and its `initargs`. The `initargs` tuple will contain `X` and `X_shape`. The `initializer` is called once in each child process as it starts, and is responsible for storing `X` and `X_shape` as globals. The simplest implementation of this `initializer` looks like the following:

```
# A global dictionary storing the variables passed from the initializer.
var_dict = {}

def init_worker(X, X_shape):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    var_dict['X'] = X
    var_dict['X_shape'] = X_shape
```

The pool is then launched via:

```
from multiprocessing import Pool

with Pool(processes=4, initializer=init_worker, initargs=(X, X_shape)) as pool:
    ...  # Schedule tasks here
```

Now we can write our worker function. As a toy example, our worker function will simply accept one argument, the row index `i`, and compute the sum of the i-th row of `X`. Because both `X` and `X_shape` are stored in `var_dict`, the worker function is as simple as:

```
def worker_func(i):
    # Simply computes the sum of the i-th row of the input matrix X.
    X_np = np.frombuffer(var_dict['X']).reshape(var_dict['X_shape'])
    time.sleep(1)  # Stand-in for some heavy computation.
    return np.sum(X_np[i, :]).item()
```

Note that because `X` is a `RawArray`, we need to wrap it again using the shape information in `X_shape` before using `numpy` functions on it.
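Re-wrapping on every call is cheap, since `np.frombuffer` creates a view without copying the data. Still, one possible refinement is to do the wrapping once in the initializer (which runs inside each child process) and store the resulting `numpy` view instead of the raw buffer. A minimal sketch of this variant, using the same setup as above:

```python
import numpy as np
from multiprocessing import Pool, RawArray

# A global dictionary storing the variables passed from the initializer.
var_dict = {}

def init_worker(X, X_shape):
    # Wrap once per child process; np.frombuffer does not copy the data.
    var_dict['X_np'] = np.frombuffer(X, dtype=np.float64).reshape(X_shape)

def worker_func(i):
    # The wrapped view is already available; no per-task wrapping needed.
    return var_dict['X_np'][i, :].sum().item()

if __name__ == '__main__':
    X_shape = (4, 1000)
    X = RawArray('d', X_shape[0] * X_shape[1])
    np.frombuffer(X, dtype=np.float64).reshape(X_shape)[:] = 1.0
    with Pool(processes=2, initializer=init_worker, initargs=(X, X_shape)) as pool:
        result = pool.map(worker_func, range(X_shape[0]))
    print(result)  # Each row of ones sums to 1000.0.
```

This is safe because `init_worker` executes in the child process, so the view it creates refers to that child's (inherited) shared memory.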

With the worker function ready, we can use `Pool.map` to assign the row-summing tasks to the child processes and collect the results. Putting everything together, we have:

```
import time

import numpy as np
from multiprocessing import Pool, RawArray

# A global dictionary storing the variables passed from the initializer.
var_dict = {}

def init_worker(X, X_shape):
    # Using a dictionary is not strictly necessary. You can also
    # use global variables.
    var_dict['X'] = X
    var_dict['X_shape'] = X_shape

def worker_func(i):
    # Simply computes the sum of the i-th row of the input matrix X.
    X_np = np.frombuffer(var_dict['X']).reshape(var_dict['X_shape'])
    time.sleep(1)  # Stand-in for some heavy computation.
    return np.sum(X_np[i, :]).item()

# We need this check for Windows to prevent infinitely spawning new child
# processes.
if __name__ == '__main__':
    X_shape = (16, 1000000)
    # Randomly generate some data.
    data = np.random.randn(*X_shape)
    X = RawArray('d', X_shape[0] * X_shape[1])
    # Wrap X as a numpy array so we can easily manipulate its data.
    X_np = np.frombuffer(X).reshape(X_shape)
    # Copy data to our shared array.
    np.copyto(X_np, data)
    # Start the process pool and do the computation.
    # Here we pass X and X_shape to the initializer of each worker.
    # (Because X_shape is not a shared variable, it will be copied to each
    # child process.)
    with Pool(processes=4, initializer=init_worker, initargs=(X, X_shape)) as pool:
        result = pool.map(worker_func, range(X_shape[0]))
        print('Results (pool):\n', np.array(result))
    # Should print the same results.
    print('Results (numpy):\n', np.sum(X_np, 1))
```

## Writing to Shared Arrays ⚠️

It is also possible to write data to a shared array from the child processes. However, we have to be careful, because unsynchronized writes may lead to unexpected results.

If each child process writes only to non-overlapping parts of the array, everything will be fine. If not, we need proper locking to avoid race conditions. When locking is required, we can use `Array` instead of `RawArray`. Additionally, when wrapping an `Array` (which carries a lock) as a `numpy` array, we need to use the following instead:

```
np.frombuffer(X.get_obj()).reshape(X_shape)
```