Communicator
Communicator transfers parameters over the compute graphs.
This is an alias to communicator.py.
Communicator interface
- class nnabla.communicators.Communicator
Communicator interface class.
Communicator exchanges data (e.g., gradients) using MPI-like collectives. This class is used for distributed training.
- abort(self)
Terminates the MPI execution environment.
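When one rank fails with an unrecoverable error, the surviving ranks can hang in their next collective. A minimal sketch (not from the original documentation) of using abort to tear down the whole MPI job in that situation; train_step is a hypothetical per-rank function:
try:
    train_step()  # hypothetical per-rank work that may raise
except Exception:
    # Terminate every process in the MPI job, not only this rank.
    comm.abort()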
- add_context_and_parameters(self, ctx_param_dict)
Add context and parameters.
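A minimal sketch of registering parameters before initialization. This is an assumption-laden illustration: the argument is taken to be a (Context, parameter dict) pair, as the parameter name ctx_param_dict suggests, and the network here is arbitrary.
import nnabla as nn
import nnabla.parametric_functions as PF
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

ctx = get_extension_context("cudnn")
comm = C.MultiProcessCommunicator(ctx)

# Build a network so that parameters exist in the global parameter scope.
x = nn.Variable([4, 8])
h = PF.affine(x, 8, name="fc1")

# Register the context together with the parameters to be communicated
# (assumed here to be a (Context, parameter dict) pair).
comm.add_context_and_parameters((ctx, nn.get_parameters()))

# init() must be called only after all parameters have been added.
comm.init()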
- all_gather(self, ndarray, ndarray_list, string group='world')
All-gather over data on different devices.
- Parameters:
  ndarray (NdArray) – Data to be gathered.
  ndarray_list (list of NdArray) – Arrays in which the gathered data is saved.
  group (string) – Name of a group. This group is used when the collective is called.
Example:
# Run like `mpirun -n 2 python <code_snippet.py>`
# Note: the order of the output to stdout is nondeterministic because of the multiple processes.

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# Data
x = nn.Variable([2, 2])
x.d = np.random.rand(*x.shape)
y_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
print("Before the collective ({}-th)".format(comm.rank))
print(x.d)

# AllGather
comm.all_gather(x.data, [y.data for y in y_list])

# Check
print("After the collective ({}-th)".format(comm.rank))
for y in y_list:
    print(y.d)
- all_reduce(self, data, bool division=False, bool inplace=False, string group='world')
All-reduce over data on different devices.
- Parameters:
  data (NdArray or list of NdArray) – Data to be all-reduced.
  division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, the result is obtained in the same memory region.
  group (string) – Name of a group. This group is used when the collective is called.
Example:
# Run like `mpirun -n 2 python <code_snippet.py>`
# Note: the order of the output to stdout is nondeterministic because of the multiple processes.

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# Data
x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
print("Before the collective ({}-th)".format(comm.rank))
for x in x_list:
    x.d = np.random.rand(*x.shape)
    print(x.d)

# AllReduce
comm.all_reduce([x.data for x in x_list], inplace=True)

# Check
print("After the collective ({}-th)".format(comm.rank))
for x in x_list:
    print(x.d)
- all_reduce_callback(self, data, size_t pack_size, bool division=False, string group='world', float scale_grad=1, bool keep_dtype=False)
All-reduce over data on different devices.
Note
This function does not support shared parameters (such as RNNs) currently.
- Parameters:
  data (NdArray or list of NdArray) – Data to be all-reduced.
  pack_size (int) – The number of values contained in the packed data.
  division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  group (string) – Name of a group. This group is used when the collective is called.
  scale_grad (float) – Apply scaling by the specified factor before performing the all-reduce. This is useful when you apply loss scaling in mixed-precision training and want to cancel it for gradient arrays before the all-reduce.
  keep_dtype (bool) – If True, the dtype of arrays is kept the same regardless of the communicator's dtype used in the all-reduce operation. This is useful when you use the all-reduce callback in mixed-precision training and any of the gradient NdArrays is narrowed by NdArray.narrow. In this case, you will get an error unless you specify True, because a narrowed array prohibits dtype casting.
Example:
In the case of multi-process data-parallel distributed training:
# Run like `mpirun -n 2 python <code_snippet.py>`

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.functions as F
import nnabla.parametric_functions as PF
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

n_class = 2
b, c, h, w = 4, 1, 32, 32

# Data
x = nn.Variable([b, c, h, w])
y = nn.Variable([b, 1])

# Network setting
h = PF.convolution(x, 1, (3, 3), (1, 1), (1, 1))
pred = PF.affine(h, 2)
loss = F.mean(F.softmax_cross_entropy(pred, y))

loss.forward()

# AllReduce during backward
loss.backward(communicator_callbacks=comm.all_reduce_callback(
    [v.grad for v in nn.get_parameters().values()], 1024 * 1024 * 2))
- allreduce(self, bool division=False, bool inplace=False)
Deprecated. Use all_reduce instead.
Allreduce over the parameters added. Currently, allreduce is applied to gradient regions.
- Parameters:
  division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, the result is obtained in the same memory region.
- barrier(self)
Blocks until all processes in the communicator have reached this routine.
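A short sketch (not from the original documentation) of using barrier to synchronize ranks, for example so that rank 0 finishes writing parameters before the other ranks read them; the file name is illustrative:
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

ctx = get_extension_context("cudnn")
comm = C.MultiProcessCommunicator(ctx)
comm.init()

if comm.rank == 0:
    nn.save_parameters("params.h5")  # illustrative file name

# No rank proceeds until rank 0 has finished writing.
comm.barrier()

nn.load_parameters("params.h5")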
- bcast(self, data, int src, bool inplace=False, string group='world')
Broadcast data to different devices.
- Parameters:
  data (NdArray or list of NdArray) – Data to be broadcast.
  src (int) – Source rank from which the data is broadcast.
  inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, the result is obtained in the same memory region.
  group (string) – Name of a group. This group is used when the collective is called.
Example:
# Run like `mpirun -n 2 python <code_snippet.py>`
# Note: the order of the output to stdout is nondeterministic because of the multiple processes.

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# Data
x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
print("Before the collective ({}-th)".format(comm.rank))
for x in x_list:
    x.d = np.random.rand(*x.shape)
    print(x.d)

# Bcast
comm.bcast([x.data for x in x_list], src=0, inplace=True)

# Check
print("After the collective ({}-th)".format(comm.rank))
for x in x_list:
    print(x.d)
- clear_context_parameters(self)
Clear all registered contexts and parameters.
- find_group(self, group)
Return the list of ranks in the group. If the group does not exist, an empty list is returned.
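A small sketch of querying a group created with new_group (see the new_group entry below); it assumes a 4-process run and reuses the group name from that example:
# Run like `mpirun -n 4 python <code_snippet.py>`
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

ctx = get_extension_context("cudnn")
comm = C.MultiProcessCommunicator(ctx)
comm.init()

comm.new_group("node0", [0, 1, 2, 3])

print(comm.find_group("node0"))    # [0, 1, 2, 3]
print(comm.find_group("unknown"))  # [] because the group does not exist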
- init(self)
Initialize a communicator.
Performs init-all or init-rank, depending on whether the communicator is multi-threaded or multi-process. This function MUST be called after all parameters to be communicated have been added by add_context_and_parameters.
- local_rank
Get local rank of communicator.
- name
Get communicator name.
- new_group(self, name_ranks)
Create a new group consisting of the given ranks under the given name.
Example:
# Communicator and Context
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# New group
group = comm.new_group("node0", [0, 1, 2, 3])
- rank
Get rank of communicator.
- reduce(self, data, int dst, bool division=False, bool inplace=False, string group='world')
Reduce over data on different devices.
- Parameters:
  data (NdArray or list of NdArray) – Data to be reduced.
  dst (int) – Destination rank where the result is saved.
  division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, the result is obtained in the same memory region.
  group (string) – Name of a group. This group is used when the collective is called.
Example:
# Run like `mpirun -n 2 python <code_snippet.py>`
# Note: the order of the output to stdout is nondeterministic because of the multiple processes.

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# Data
x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
print("Before the collective ({}-th)".format(comm.rank))
for x in x_list:
    x.d = np.random.rand(*x.shape)
    print(x.d)

# Reduce
comm.reduce([x.data for x in x_list], dst=0, inplace=True)

# Check
print("After the collective ({}-th)".format(comm.rank))
for x in x_list:
    print(x.d)
- reduce_scatter(self, ndarray_list, ndarray, bool division=False, string group='world')
Reduce-scatter over data on different devices.
- Parameters:
  ndarray_list (list of NdArray) – List of data to be reduced over different devices.
  ndarray (NdArray) – Array in which the result is saved.
  group (string) – Name of a group. This group is used when the collective is called.
Example:
# Run like `mpirun -n 2 python <code_snippet.py>`
# Note: the order of the output to stdout is nondeterministic because of the multiple processes.

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# Data
x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
y = nn.Variable([2, 2])
print("Before the collective ({}-th)".format(comm.rank))
for x in x_list:
    x.d = np.random.rand(*x.shape)
    print(x.d)

# ReduceScatter
comm.reduce_scatter([x.data for x in x_list], y.data)

# Check
print("After the collective ({}-th)".format(comm.rank))
print(y.d)
- size
Get size of communicator.
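A brief sketch (not from the original documentation) of how rank, local_rank, and size are typically combined: size is the total number of processes, rank identifies the process globally (e.g., for sharding data), and local_rank selects the GPU on the current node. The dataset indices below are illustrative.
import numpy as np
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

ctx = get_extension_context("cudnn")
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# Bind this process to one GPU on its node.
ctx.device_id = str(comm.local_rank)
nn.set_default_context(ctx)

# Shard a dataset by global rank: each process takes every size-th sample.
all_indices = np.arange(1000)  # illustrative dataset indices
my_indices = all_indices[comm.rank::comm.size]
print("rank {}/{} handles {} samples".format(comm.rank, comm.size, len(my_indices)))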
List of communicators
- nnabla.communicators.MultiProcessDataParalellCommunicator(ctx)
MultiProcessDataParallelCommunicator(CContext ctx)
Multi Process Data Parallel Communicator for Distributed Training.
- Parameters:
  context (Context) – Context used in this communicator.
Example:
In the case of multi-process data-parallel distributed training:
# Communicator and Context
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()
n_devices = comm.size
mpi_rank = comm.rank
device_id = comm.local_rank
ctx.device_id = str(device_id)
nn.set_default_context(ctx)

# Network and Solver created here
...

# Training loop
for itr in range(num_itr):
    # Forward, zerograd, backward
    loss.forward()
    solver.zero_grad()
    loss.backward()

    # Allreduce
    comm.all_reduce([v.grad for v in nn.get_parameters().values()])

    # Update
    solver.update()