Working with numerical data in shared memory (memmapping)
By default, the workers of the pool are real Python processes forked using the multiprocessing module of the Python standard library when n_jobs != 1.
The arguments passed as input to the Parallel call are serialized and reallocated in the memory of each worker process. This can be problematic for large arguments, as they will be reallocated n_jobs times by the workers.
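To make that cost concrete, here is a minimal sketch that uses the standard pickle module as a stand-in for the serialization step (an assumption: it illustrates the principle, not joblib's exact code path). The pickled payload carries the whole data buffer, and unpickling allocates a fresh, independent copy, which is what each worker would end up holding.

```python
import pickle

import numpy as np

# An 8 MB float64 array standing in for a "large argument"
data = np.ones(int(1e6))

# Serializing embeds the whole data buffer in the payload...
payload = pickle.dumps(data)

# ...and deserializing allocates a brand-new buffer, as each worker would
copy = pickle.loads(payload)

print(len(payload) >= data.nbytes)   # True: payload is at least as big as the data
print(np.shares_memory(data, copy))  # False: the copy has its own memory
```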
As this problem often occurs in scientific computing with numpy-based data structures, joblib.Parallel provides special handling for large arrays: it automatically dumps them to the filesystem and passes the worker a reference to that file, which the worker opens as a memory map using numpy.memmap, a subclass of numpy.ndarray.
This makes it possible to share a segment of data between all the
worker processes.
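The principle can be sketched with plain NumPy, assuming a single process plays both roles. The helpers make_reference and open_reference are hypothetical names, not part of joblib: the parent writes the array once to a file-backed numpy.memmap, and only a small tuple (file name, dtype, shape, offset) would need to travel to a worker, which maps the same file again. The tuple only describes a whole memmap opened directly on a file; a sliced memmap would need its offset recomputed.

```python
import os
import pickle
import shutil
import tempfile

import numpy as np


def make_reference(m):
    """Hypothetical helper: describe a whole np.memmap by where it lives on disk."""
    return (m.filename, m.dtype.str, m.shape, m.offset)


def open_reference(ref):
    """Hypothetical helper: what a worker could do to map the same file read-only."""
    filename, dtype, shape, offset = ref
    return np.memmap(filename, dtype=dtype, mode="r", offset=offset, shape=shape)


folder = tempfile.mkdtemp()
path = os.path.join(folder, "shared.mmap")

# Parent: write the data once to a file-backed memory map (8 MB)
shared = np.memmap(path, dtype=np.float64, mode="w+", shape=(1000, 1000))
shared[:] = 1.0
shared.flush()

# Only this small pickled tuple would need to be sent to a worker
ref_payload = pickle.dumps(make_reference(shared))

# "Worker" side: rebuild a view on the same file from the reference
view = open_reference(pickle.loads(ref_payload))
is_memmap = isinstance(view, np.memmap)
total = float(view.sum())
print(len(ref_payload), total)

# Release the mappings before removing the folder
del shared, view
shutil.rmtree(folder, ignore_errors=True)
```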
Note
The following only applies with the default "multiprocessing" backend. If your code can release the GIL, then using backend="threading" is even more efficient.
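As a rough illustration of why threads avoid the copy altogether, the sketch below uses concurrent.futures.ThreadPoolExecutor from the standard library in place of Parallel(n_jobs=2, backend="threading"); this substitution is an assumption made to keep the example dependency-free. All threads read and write the very same arrays, so nothing is pickled. Whether the threads actually run concurrently depends on the work releasing the GIL, which many NumPy routines do for numeric dtypes.

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np

rng = np.random.RandomState(0)
samples = rng.normal(size=(8, 100_000))
sums = np.empty(samples.shape[0])


def sum_row(i):
    # Threads see the very same `samples` and `sums` objects: nothing is
    # serialized or copied, unlike with worker processes.
    sums[i] = samples[i].sum()


with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(sum_row, range(samples.shape[0])))

print(np.allclose(sums, samples.sum(axis=1)))  # True
```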
Automated array to memmap conversion
The automated array to memmap conversion is triggered by a configurable threshold on the size of the array:
>>> import numpy as np
>>> from joblib import Parallel, delayed
>>> from joblib.pool import has_shareable_memory
>>> Parallel(n_jobs=2, max_nbytes=1e6)(
...     delayed(has_shareable_memory)(np.ones(int(i)))
...     for i in [1e2, 1e4, 1e6])
[False, False, True]
By default, the data is dumped to the /dev/shm shared-memory partition if it exists and is writable (typically the case under Linux). Otherwise, the operating system’s temporary folder is used. The location of the temporary data files can be customized by passing a temp_folder argument to the Parallel constructor.
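The folder lookup order described above can be sketched as follows. pick_temp_folder is a hypothetical helper written for illustration, not joblib's implementation, which may check more conditions; the path /scratch/joblib is likewise just an example.

```python
import os
import tempfile


def pick_temp_folder(temp_folder=None):
    """Hypothetical helper mirroring the lookup order described above."""
    if temp_folder is not None:
        # An explicit temp_folder argument always wins
        return temp_folder
    shm = "/dev/shm"
    if os.path.isdir(shm) and os.access(shm, os.W_OK):
        # Shared-memory partition (usually tmpfs on Linux)
        return shm
    # Fallback: the operating system's temporary folder
    return tempfile.gettempdir()


print(pick_temp_folder())                   # e.g. /dev/shm on most Linux hosts
print(pick_temp_folder("/scratch/joblib"))  # /scratch/joblib
```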
Passing max_nbytes=None makes it possible to disable the automated array-to-memmap conversion.
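The decision rule can be approximated with a small hypothetical helper, would_be_memmapped, which is not a joblib function: an array is dumped when its nbytes exceeds max_nbytes, and None turns the conversion off. The sizes reproduce the doctest above: float64 arrays of 1e2, 1e4 and 1e6 elements weigh 800 B, 80 kB and 8 MB, so only the last one crosses the 1e6-byte threshold.

```python
import numpy as np


def would_be_memmapped(a, max_nbytes=1e6):
    """Hypothetical helper approximating the threshold rule: arrays larger
    than max_nbytes are dumped to disk, and None disables the conversion."""
    if max_nbytes is None:
        return False
    return a.nbytes > max_nbytes


sizes = [1e2, 1e4, 1e6]
print([np.ones(int(n)).nbytes for n in sizes])   # [800, 80000, 8000000]
print([would_be_memmapped(np.ones(int(n))) for n in sizes])
# [False, False, True]
print([would_be_memmapped(np.ones(int(n)), max_nbytes=None) for n in sizes])
# [False, False, False]
```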
Manual management of memmapped input data
For even finer tuning of the memory usage, it is also possible to dump the array as a memmap directly from the parent process, freeing the memory before the worker processes are forked. For instance, let’s allocate a large array in the memory of the parent process:
>>> large_array = np.ones(int(1e6))
Dump it to a local file for memmapping:
>>> import tempfile
>>> import os
>>> from joblib import load, dump
>>> temp_folder = tempfile.mkdtemp()
>>> filename = os.path.join(temp_folder, 'joblib_test.mmap')
>>> if os.path.exists(filename): os.unlink(filename)
>>> _ = dump(large_array, filename)
>>> large_memmap = load(filename, mmap_mode='r+')
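For comparison, NumPy's own .npy format supports the same dump-then-map pattern without joblib.dump. The sketch below uses numpy.save and numpy.load(..., mmap_mode="r+"); it is an alternative shown for illustration, not a description of what joblib.load does internally.

```python
import os
import shutil
import tempfile

import numpy as np

large_array = np.ones(int(1e6))

folder = tempfile.mkdtemp()
filename = os.path.join(folder, "large_array.npy")

# Write the array in NumPy's .npy format (small header + raw data)
np.save(filename, large_array)

# Map the file back instead of reading it into memory; "r+" allows writes
mapped = np.load(filename, mmap_mode="r+")

kind, shape = type(mapped).__name__, mapped.shape
same = bool(np.array_equal(mapped, large_array))
print(kind, shape, same)   # memmap (1000000,) True

# Release the mapping before removing the folder
del mapped
shutil.rmtree(folder, ignore_errors=True)
```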
The large_memmap variable now points to a numpy.memmap instance:
>>> large_memmap.__class__.__name__, large_array.nbytes, large_array.shape
('memmap', 8000000, (1000000,))
>>> np.allclose(large_array, large_memmap)
True
We can free the original array from the main process memory:
>>> del large_array
>>> import gc
>>> _ = gc.collect()
It is possible to slice large_memmap into a smaller memmap:
>>> small_memmap = large_memmap[2:5]
>>> small_memmap.__class__.__name__, small_memmap.nbytes, small_memmap.shape
('memmap', 24, (3,))
Finally, we can also take an np.ndarray view backed by that same memory-mapped file:
>>> small_array = np.asarray(small_memmap)
>>> small_array.__class__.__name__, small_array.nbytes, small_array.shape
('ndarray', 24, (3,))
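The claim that these objects share one buffer can be checked with numpy.shares_memory. The self-contained sketch below rebuilds the same three kinds of objects on a small temporary file (the file name is arbitrary) and shows that a write through the plain ndarray view is visible through the memmap.

```python
import os
import shutil
import tempfile

import numpy as np

folder = tempfile.mkdtemp()
big = np.memmap(os.path.join(folder, "data.mmap"), dtype=np.float64,
                mode="w+", shape=(10,))
small = big[2:5]            # slicing keeps the memmap subclass
plain = np.asarray(small)   # plain ndarray view, no copy

# All three objects overlap in memory...
overlaps = (bool(np.shares_memory(big, small)),
            bool(np.shares_memory(big, plain)))

# ...so writing through the ndarray view is visible through the memmap
plain[0] = 42.0
written = float(big[2])
print(overlaps, written)    # (True, True) 42.0

# Drop the mappings before deleting the folder (needed on Windows)
del big, small, plain
shutil.rmtree(folder, ignore_errors=True)
```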
All three data structures point to the same memory buffer, and this same buffer will also be reused directly by the worker processes of a Parallel call:
>>> Parallel(n_jobs=2, max_nbytes=None)(
...     delayed(has_shareable_memory)(a)
...     for a in [large_memmap, small_memmap, small_array])
[True, True, True]
Note that here we used max_nbytes=None to disable the auto-dumping feature of Parallel. small_array is still in shared memory in the worker processes because it was already backed by shared memory in the parent process.
The pickling machinery of the Parallel multiprocessing queues is able to detect this situation and optimize it on the fly to limit the number of memory copies.
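A simplified version of that detection can be sketched by following the .base chain of an array until a memory map is found. is_backed_by_memmap is a hypothetical helper in the spirit of has_shareable_memory; joblib's own check may differ in its details.

```python
import mmap
import os
import shutil
import tempfile

import numpy as np


def is_backed_by_memmap(a):
    """Hypothetical helper: follow the .base chain of views until a
    memory map (or the end of the chain) is reached."""
    b = a
    while b is not None:
        if isinstance(b, (np.memmap, mmap.mmap)):
            return True
        b = getattr(b, "base", None)
    return False


folder = tempfile.mkdtemp()
m = np.memmap(os.path.join(folder, "data.mmap"), dtype=np.float64,
              mode="w+", shape=(100,))

# memmap, sliced memmap, ndarray view of a slice, ordinary in-memory array
results = [is_backed_by_memmap(x)
           for x in (m, m[2:5], np.asarray(m[2:5]), np.ones(100))]
print(results)  # [True, True, True, False]

del m
shutil.rmtree(folder, ignore_errors=True)
```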
Writing parallel computation results in shared memory
If you open your data using the w+ or r+ mode in the main program, the worker will get r+ mode access. Thus the worker will be able to write its results directly to the original data, removing the need to serialize the results and send them back to the parent process.
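The reason workers get r+ rather than w+ is that mode "w+" creates or overwrites the file, so reopening with it would wipe the preallocated results. The sketch below assumes threads from concurrent.futures stand in for worker processes (to stay self-contained); each task opens its own r+ handle on the file and writes one exclusive slot, and the parent's original w+ mapping sees the results.

```python
import os
import shutil
import tempfile
from concurrent.futures import ThreadPoolExecutor

import numpy as np

folder = tempfile.mkdtemp()
path = os.path.join(folder, "results.mmap")

# Parent: "w+" creates (or overwrites) the file and preallocates the output
results = np.memmap(path, dtype=np.float64, mode="w+", shape=(4,))


def task(i):
    # Each task opens its own handle in "r+" mode: reopening with "w+" here
    # would recreate the file and wipe results written by other tasks.
    out = np.memmap(path, dtype=np.float64, mode="r+", shape=(4,))
    out[i] = i * 10.0
    out.flush()


with ThreadPoolExecutor(max_workers=2) as pool:
    list(pool.map(task, range(4)))

values = results.tolist()
print(values)  # [0.0, 10.0, 20.0, 30.0]

del results
shutil.rmtree(folder, ignore_errors=True)
```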
Here is an example script showing parallel processing with preallocated numpy.memmap data structures:
"""Demonstrate the usage of numpy.memmap with joblib.Parallel

This example shows how to preallocate data in memmap arrays both for input and
output of the parallel worker processes.

Sample output for this program::

    [Worker 93486] Sum for row 0 is -1599.756454
    [Worker 93487] Sum for row 1 is -243.253165
    [Worker 93488] Sum for row 3 is 610.201883
    [Worker 93489] Sum for row 2 is 187.982005
    [Worker 93489] Sum for row 7 is 326.381617
    [Worker 93486] Sum for row 4 is 137.324438
    [Worker 93489] Sum for row 8 is -198.225809
    [Worker 93487] Sum for row 5 is -1062.852066
    [Worker 93488] Sum for row 6 is 1666.334107
    [Worker 93486] Sum for row 9 is -463.711714

    Expected sums computed in the parent process:
    [-1599.75645426 -243.25316471 187.98200458 610.20188337 137.32443803
     -1062.85206633 1666.33410715 326.38161713 -198.22580876 -463.71171369]

    Actual sums computed by the worker processes:
    [-1599.75645426 -243.25316471 187.98200458 610.20188337 137.32443803
     -1062.85206633 1666.33410715 326.38161713 -198.22580876 -463.71171369]

"""
import tempfile
import shutil
import os
import numpy as np

from joblib import Parallel, delayed
from joblib import load, dump


def sum_row(input, output, i):
    """Compute the sum of a row in input and store it in output"""
    sum_ = input[i, :].sum()
    print("[Worker %d] Sum for row %d is %f" % (os.getpid(), i, sum_))
    output[i] = sum_


if __name__ == "__main__":
    rng = np.random.RandomState(42)
    folder = tempfile.mkdtemp()
    samples_name = os.path.join(folder, 'samples')
    sums_name = os.path.join(folder, 'sums')
    try:
        # Generate some data and allocate an output buffer
        samples = rng.normal(size=(10, int(1e6)))

        # Pre-allocate a writeable shared memory map as a container for the
        # results of the parallel computation
        sums = np.memmap(sums_name, dtype=samples.dtype,
                         shape=samples.shape[0], mode='w+')

        # Dump the input data to disk to free the memory
        dump(samples, samples_name)

        # Release the reference on the original in memory array and replace it
        # by a reference to the memmap array so that the garbage collector can
        # release the memory before forking. gc.collect() is internally called
        # in Parallel just before forking.
        samples = load(samples_name, mmap_mode='r')

        # Fork the worker processes to perform computation concurrently
        Parallel(n_jobs=4)(delayed(sum_row)(samples, sums, i)
                           for i in range(samples.shape[0]))

        # Compare the results from the output buffer with the ground truth
        print("Expected sums computed in the parent process:")
        expected_result = samples.sum(axis=1)
        print(expected_result)

        print("Actual sums computed by the worker processes:")
        print(sums)

        assert np.allclose(expected_result, sums)
    finally:
        try:
            shutil.rmtree(folder)
        except OSError:
            print("Failed to delete: " + folder)
Warning
Having concurrent workers write to overlapping shared memory data segments, for instance by using in-place operators and assignments on a numpy.memmap instance, can lead to data corruption, as numpy does not offer atomic operations. The previous example does not risk that issue, as each task updates an exclusive segment of the shared result array.
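One simple way to guarantee exclusive segments is to give each task a disjoint block of output indices up front. The sketch below only shows the partitioning step, using numpy.array_split; the tasks that would write each chunk are left out.

```python
import numpy as np

n_rows, n_tasks = 10, 4

# Split the output indices into contiguous, non-overlapping chunks: one per task
chunks = np.array_split(np.arange(n_rows), n_tasks)
print([c.tolist() for c in chunks])
# [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]

# Sanity check: every row is owned by exactly one task
owners = np.zeros(n_rows, dtype=int)
for c in chunks:
    owners[c] += 1  # indices are unique within a chunk, so this counts correctly
print(bool((owners == 1).all()))  # True
```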
Some C/C++ compilers offer lock-free atomic primitives such as add-and-fetch or compare-and-swap that could be exposed to Python, for instance via CFFI. However, providing numpy-aware atomic constructs is outside the scope of the joblib project.
A final note: don’t forget to clean up any temporary folder when you are done with the computation:
>>> import shutil
>>> try:
...     shutil.rmtree(temp_folder)
... except OSError:
...     pass  # this can sometimes fail under Windows
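On Python 3, tempfile.TemporaryDirectory can take care of that cleanup automatically. The sketch below assumes the memory map is only needed inside the with block; deleting the last reference to it before the block exits releases the mapping, which matters on Windows, where an open memory map prevents the file from being deleted.

```python
import os
import tempfile

import numpy as np

with tempfile.TemporaryDirectory() as temp_folder:
    filename = os.path.join(temp_folder, "scratch.mmap")
    scratch = np.memmap(filename, dtype=np.float64, mode="w+", shape=(1000,))
    scratch[:] = 1.0
    total = float(scratch.sum())
    # Release the mapping before the directory is removed on exit
    del scratch

# The folder and its content are gone once the block exits
print(total, os.path.exists(temp_folder))  # 1000.0 False
```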