Communicator API¶
Communicator transfers parameters across compute graphs.
This is an alias to communicator.py.
Communicator interface¶
class nnabla.communicators.Communicator¶
Communicator interface class.

Communicator exchanges data (e.g., gradients) using MPI-like collectives. This class is used for distributed training.
abort(self)¶
Terminates the MPI execution environment.
add_context_and_parameters(self, ctx_param_dict)¶
Add a context and parameters.
all_gather(self, ndarray, ndarray_list, string group='world')¶
All gather over data on different devices.

Parameters
    ndarray (NdArray) – Data to be gathered.
    ndarray_list (list of NdArray) – Data to be saved.
    group (string) – Name of a group. This group is used when the collective is called.
Example:

    # Run like `mpirun -n 2 python <code_snippet.py>`
    # Note: the order of the output to stdout is stochastic because of multiprocessing.

    # Communicator and Context
    import numpy as np
    import nnabla as nn
    import nnabla.communicators as C
    from nnabla.ext_utils import get_extension_context

    extension_module = "cudnn"
    ctx = get_extension_context(extension_module)
    comm = C.MultiProcessCommunicator(ctx)
    comm.init()

    # Data
    x = nn.Variable([2, 2])
    x.d = np.random.rand(*x.shape)
    y_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
    print("Before the collective ({}-th)".format(comm.rank))
    print(x.d)

    # AllGather
    comm.all_gather(x.data, [y.data for y in y_list])

    # Check
    print("After the collective ({}-th)".format(comm.rank))
    for y in y_list:
        print(y.d)
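To make the data movement concrete, the following plain-NumPy sketch shows what all_gather computes, without requiring MPI or GPUs; `simulated_all_gather` and the array values are illustrative only and not part of the nnabla API:

```python
import numpy as np

# Rank r contributes one array; after all_gather, every rank holds the
# full list of contributed arrays, ordered by rank.
def simulated_all_gather(contributions):
    return [[a.copy() for a in contributions] for _ in contributions]

contributions = [np.full((2, 2), float(r)) for r in range(2)]
gathered = simulated_all_gather(contributions)

# Every rank ends up with the same gathered list.
for per_rank in gathered:
    for got, expected in zip(per_rank, contributions):
        assert np.array_equal(got, expected)
```

In the real API, rank r passes its own array as `ndarray` and receives everyone's arrays into `ndarray_list`.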
all_reduce(self, data, bool division=False, bool inplace=False, string group='world')¶
All reduce over data on different devices.

Parameters
    data (NdArray or list of NdArray) – Data to be reduced.
    division (bool) – Flag to divide the reduced data by the number of contexts added, i.e., the number of devices.
    inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, one can get the result in the same memory region.
    group (string) – Name of a group. This group is used when the collective is called.
Example:

    # Run like `mpirun -n 2 python <code_snippet.py>`
    # Note: the order of the output to stdout is stochastic because of multiprocessing.

    # Communicator and Context
    import numpy as np
    import nnabla as nn
    import nnabla.communicators as C
    from nnabla.ext_utils import get_extension_context

    extension_module = "cudnn"
    ctx = get_extension_context(extension_module)
    comm = C.MultiProcessCommunicator(ctx)
    comm.init()

    # Data
    x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
    print("Before the collective ({}-th)".format(comm.rank))
    for x in x_list:
        x.d = np.random.rand(*x.shape)
        print(x.d)

    # AllReduce
    comm.all_reduce([x.data for x in x_list], inplace=True)

    # Check
    print("After the collective ({}-th)".format(comm.rank))
    for x in x_list:
        print(x.d)
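The reduction itself can be sketched in plain NumPy; `simulated_all_reduce` and the sample arrays are illustrative only (assuming a sum reduction, which is what gradient averaging uses):

```python
import numpy as np

# Each rank holds one array; all_reduce sums them element-wise and leaves
# the same result on every rank. With division=True, the sum is divided
# by the number of devices, i.e., an average.
def simulated_all_reduce(per_rank, division=False):
    total = np.sum(per_rank, axis=0)
    if division:
        total = total / len(per_rank)
    return [total.copy() for _ in per_rank]  # every rank gets the result

per_rank = [np.array([[1., 2.], [3., 4.]]),
            np.array([[10., 20.], [30., 40.]])]
summed = simulated_all_reduce(per_rank)
averaged = simulated_all_reduce(per_rank, division=True)

assert np.array_equal(summed[0], np.array([[11., 22.], [33., 44.]]))
assert np.array_equal(averaged[1], np.array([[5.5, 11.], [16.5, 22.]]))
```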
all_reduce_callback(self, data, size_t pack_size, bool division=False, string group='world')¶
All reduce over data on different devices.

Note
This function currently does not support shared parameters (such as in RNNs).

Parameters
    data (NdArray or list of NdArray) – Data to be reduced.
    pack_size (int) – The number of values contained in the packed data.
    division (bool) – Flag to divide the reduced data by the number of contexts added, i.e., the number of devices.
    group (string) – Name of a group. This group is used when the collective is called.
Example:

In case of multi-process data parallel distributed training:

    # Run like `mpirun -n 2 python <code_snippet.py>`

    # Communicator and Context
    import numpy as np
    import nnabla as nn
    import nnabla.functions as F
    import nnabla.parametric_functions as PF
    import nnabla.communicators as C
    from nnabla.ext_utils import get_extension_context

    extension_module = "cudnn"
    ctx = get_extension_context(extension_module)
    comm = C.MultiProcessCommunicator(ctx)
    comm.init()

    n_class = 2
    b, c, h, w = 4, 1, 32, 32

    # Data
    x = nn.Variable([b, c, h, w])
    y = nn.Variable([b, 1])

    # Network setting
    h = PF.convolution(x, 1, (3, 3), (1, 1), (1, 1))
    pred = PF.affine(h, n_class)
    loss = F.mean(F.softmax_cross_entropy(pred, y))

    loss.forward()

    # AllReduce during backward
    loss.backward(
        communicator_callbacks=comm.all_reduce_callback(
            [v.grad for v in nn.get_parameters().values()],
            1024 * 1024 * 2))
allreduce(self, bool division=False, bool inplace=False)¶
Deprecated. Use all_reduce instead.

Allreduce over the parameters added. Currently, allreduce is applied to gradient regions.

Parameters
    division (bool) – Flag to divide the reduced data by the number of contexts added, i.e., the number of devices.
    inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, one can get the result in the same memory region.
barrier(self)¶
Blocks until all processes in the communicator have reached this routine.
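A barrier's ordering guarantee can be illustrated in a single process with threading.Barrier; this is only an analogy for the multi-process MPI barrier, and all names below are illustrative:

```python
import threading

# No thread logs "after" until every thread has logged "before",
# mirroring how no process passes comm.barrier() until all have reached it.
n_workers = 4
barrier = threading.Barrier(n_workers)
lock = threading.Lock()
log = []

def worker(rank):
    with lock:
        log.append("before")
    barrier.wait()  # corresponds to comm.barrier()
    with lock:
        log.append("after")

threads = [threading.Thread(target=worker, args=(r,)) for r in range(n_workers)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All "before" entries precede all "after" entries.
assert log[:n_workers] == ["before"] * n_workers
assert log[n_workers:] == ["after"] * n_workers
```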
bcast(self, data, int src, bool inplace=False, string group='world')¶
Broadcast data to different devices.

Parameters
    data (NdArray or list of NdArray) – Data to be broadcast.
    src (int) – Source rank from which the data is broadcast.
    inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, one can get the result in the same memory region.
    group (string) – Name of a group. This group is used when the collective is called.
Example:

    # Run like `mpirun -n 2 python <code_snippet.py>`
    # Note: the order of the output to stdout is stochastic because of multiprocessing.

    # Communicator and Context
    import numpy as np
    import nnabla as nn
    import nnabla.communicators as C
    from nnabla.ext_utils import get_extension_context

    extension_module = "cudnn"
    ctx = get_extension_context(extension_module)
    comm = C.MultiProcessCommunicator(ctx)
    comm.init()

    # Data
    x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
    print("Before the collective ({}-th)".format(comm.rank))
    for x in x_list:
        x.d = np.random.rand(*x.shape)
        print(x.d)

    # Bcast
    comm.bcast([x.data for x in x_list], src=0, inplace=True)

    # Check
    print("After the collective ({}-th)".format(comm.rank))
    for x in x_list:
        print(x.d)
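Broadcast semantics are simple to sketch in NumPy; `simulated_bcast` and the sample arrays are illustrative only, not part of the nnabla API:

```python
import numpy as np

# bcast copies the data held by rank `src` to every other rank.
def simulated_bcast(per_rank, src):
    return [per_rank[src].copy() for _ in per_rank]

per_rank = [np.array([1., 2.]), np.array([3., 4.]), np.array([5., 6.])]
result = simulated_bcast(per_rank, src=0)

# After the collective, every rank holds rank 0's data.
assert all(np.array_equal(r, np.array([1., 2.])) for r in result)
```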
clear_context_parameters(self)¶
Clear all registered contexts and parameters.
find_group(self, group)¶
Return the list of ranks in the group. If the group does not exist, an empty list is returned.
init(self)¶
Initialize a communicator.

Initall or initrank, depending on multi-thread or multi-process mode. This function MUST be called after all parameters to be communicated are added by add_context_and_parameters.
local_rank¶
Get the local rank of the communicator.
name¶
Get the communicator name.
new_group(self, name_ranks)¶
Create a new group from a group name and a list of ranks.

Example:

    # Communicator and Context
    import nnabla.communicators as C
    from nnabla.ext_utils import get_extension_context

    extension_module = "cudnn"
    ctx = get_extension_context(extension_module)
    comm = C.MultiProcessCommunicator(ctx)
    comm.init()

    # New group
    group = comm.new_group("node0", [0, 1, 2, 3])
rank¶
Get the rank of the communicator.
reduce(self, data, int dst, bool division=False, bool inplace=False, string group='world')¶
Reduce over data on different devices.

Parameters
    data (NdArray or list of NdArray) – Data to be reduced.
    dst (int) – Destination rank where the result is saved.
    division (bool) – Flag to divide the reduced data by the number of contexts added, i.e., the number of devices.
    inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, one can get the result in the same memory region.
    group (string) – Name of a group. This group is used when the collective is called.
Example:

    # Run like `mpirun -n 2 python <code_snippet.py>`
    # Note: the order of the output to stdout is stochastic because of multiprocessing.

    # Communicator and Context
    import numpy as np
    import nnabla as nn
    import nnabla.communicators as C
    from nnabla.ext_utils import get_extension_context

    extension_module = "cudnn"
    ctx = get_extension_context(extension_module)
    comm = C.MultiProcessCommunicator(ctx)
    comm.init()

    # Data
    x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
    print("Before the collective ({}-th)".format(comm.rank))
    for x in x_list:
        x.d = np.random.rand(*x.shape)
        print(x.d)

    # Reduce
    comm.reduce([x.data for x in x_list], dst=0, inplace=True)

    # Check
    print("After the collective ({}-th)".format(comm.rank))
    for x in x_list:
        print(x.d)
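Unlike all_reduce, only the destination rank receives the result. A NumPy sketch of this, with `simulated_reduce` and the sample arrays being illustrative only (assuming a sum reduction):

```python
import numpy as np

# reduce sums the per-rank arrays element-wise, but only rank `dst`
# receives the result; the other ranks keep their local data unchanged.
def simulated_reduce(per_rank, dst, division=False):
    total = np.sum(per_rank, axis=0)
    if division:
        total = total / len(per_rank)
    out = [a.copy() for a in per_rank]
    out[dst] = total
    return out

per_rank = [np.array([1., 2.]), np.array([3., 4.])]
result = simulated_reduce(per_rank, dst=0)

assert np.array_equal(result[0], np.array([4., 6.]))  # rank 0: the sum
assert np.array_equal(result[1], np.array([3., 4.]))  # rank 1: unchanged
```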
reduce_scatter(self, ndarray_list, ndarray, bool division=False, string group='world')¶
Reduce scatter over data on different devices.

Parameters
    ndarray_list (list of NdArray) – List of data to be reduced over different devices.
    ndarray (NdArray) – Data in which the result is saved.
    division (bool) – Flag to divide the reduced data by the number of contexts added, i.e., the number of devices.
    group (string) – Name of a group. This group is used when the collective is called.
Example:

    # Run like `mpirun -n 2 python <code_snippet.py>`
    # Note: the order of the output to stdout is stochastic because of multiprocessing.

    # Communicator and Context
    import numpy as np
    import nnabla as nn
    import nnabla.communicators as C
    from nnabla.ext_utils import get_extension_context

    extension_module = "cudnn"
    ctx = get_extension_context(extension_module)
    comm = C.MultiProcessCommunicator(ctx)
    comm.init()

    # Data
    x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
    y = nn.Variable([2, 2])
    print("Before the collective ({}-th)".format(comm.rank))
    for x in x_list:
        x.d = np.random.rand(*x.shape)
        print(x.d)

    # ReduceScatter
    comm.reduce_scatter([x.data for x in x_list], y.data)

    # Check
    print("After the collective ({}-th)".format(comm.rank))
    print(y.d)
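The combined reduce-and-scatter step can be sketched in NumPy; `simulated_reduce_scatter` and the sample arrays are illustrative only (assuming a sum reduction):

```python
import numpy as np

# Each rank contributes a list of n arrays (n = number of ranks).
# The lists are summed element-wise across ranks, and rank i receives
# only the i-th entry of the summed list.
def simulated_reduce_scatter(per_rank_lists):
    n = len(per_rank_lists)
    summed = [np.sum([lst[i] for lst in per_rank_lists], axis=0)
              for i in range(n)]
    return [summed[i] for i in range(n)]  # rank i gets summed[i]

rank0 = [np.array([1., 1.]), np.array([2., 2.])]
rank1 = [np.array([10., 10.]), np.array([20., 20.])]
received = simulated_reduce_scatter([rank0, rank1])

assert np.array_equal(received[0], np.array([11., 11.]))  # on rank 0
assert np.array_equal(received[1], np.array([22., 22.]))  # on rank 1
```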
size¶
Get the size of the communicator.
List of communicators¶
nnabla.communicators.MultiProcessDataParalellCommunicator()¶
MultiProcessDataParallelCommunicator(CContext ctx)

Multi Process Data Parallel Communicator for Distributed Training.

Parameters
    context (Context) – Context used in this communicator.
Example:

In case of multi-process data parallel distributed training:

    # Communicator and Context
    import nnabla as nn
    import nnabla.communicators as C
    from nnabla.ext_utils import get_extension_context

    extension_module = "cudnn"
    ctx = get_extension_context(extension_module)
    comm = C.MultiProcessCommunicator(ctx)
    comm.init()
    n_devices = comm.size
    mpi_rank = comm.rank
    device_id = comm.local_rank
    ctx.device_id = str(device_id)
    nn.set_default_context(ctx)

    # Network and Solver created here
    ...

    # Training loop
    for itr in range(num_itr):
        # Forward, zerograd, backward
        loss.forward()
        solver.zero_grad()
        loss.backward()

        # Allreduce
        comm.all_reduce([v.grad for v in nn.get_parameters().values()])

        # Update
        solver.update()