Communicator API
Communicator transfers parameters across compute graphs (for example, between processes or devices in distributed training).
This is an alias to communicator.py.
Communicator interface

- class nnabla.communicators.Communicator
  Communicator interface class.
  Communicator exchanges data (e.g., gradients) using MPI-like collectives. This class is used for distributed training.
- abort(self)
  Terminates the MPI execution environment.
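  A minimal usage sketch (assuming comm is an initialized communicator as in the examples below, and train_one_iteration is a hypothetical user-defined function):

      # If one rank hits an unrecoverable error, tear down every MPI process
      # instead of leaving the other ranks blocked inside a collective call.
      try:
          train_one_iteration()  # hypothetical user code
      except Exception:
          comm.abort()
          raise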
- add_context_and_parameters(self, ctx_param_dict)
  Add context and parameters.
  Parameters: ctx_param_dict (tuple of Context and dict) – Key of the dictionary is string and value of the dictionary is Variable.
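  A minimal registration sketch (assuming the documented (Context, dict of str to Variable) form of ctx_param_dict; nn.get_parameters() provides such a dict once the network is built):

      import nnabla as nn
      import nnabla.communicators as C
      from nnabla.ext_utils import get_extension_context

      ctx = get_extension_context("cudnn")
      comm = C.MultiProcessDataParalellCommunicator(ctx)

      # ... build the network and loss here so that parameters exist ...

      # Register the context together with the parameters to be communicated,
      # then initialize the communicator (init() must be called last).
      comm.add_context_and_parameters((ctx, nn.get_parameters()))
      comm.init()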
- all_gather(self, ndarray, ndarray_list, string group='world')
  All gather over data.
  Parameters:
  - ndarray (NdArray) – Data to be gathered.
  - ndarray_list (list of NdArray) – Data to be saved.
  - group (string) – Name of a group. This group is used when the collective is called.
  Example:
  In case of multi-process data parallel distributed training:

      import numpy as np
      import nnabla as nn
      import nnabla.communicators as C
      from nnabla.ext_utils import get_extension_context

      # Communicator and Context
      extension_module = "cudnn"
      ctx = get_extension_context(extension_module)
      comm = C.MultiProcessDataParalellCommunicator(ctx)
      comm.init()

      # Data
      x = nn.Variable([10, 10])
      x.d = np.random.rand(*x.shape)
      y_list = [nn.Variable([10, 10]), nn.Variable([10, 10]), nn.Variable([10, 10])]

      # AllGather
      comm.all_gather(x.data, [y.data for y in y_list])
- all_reduce(self, data, bool division=False, bool inplace=False, string group='world')
  All reduce over data.
  Parameters:
  - data (NdArray or list of NdArray) – Data to be reduced.
  - division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  - inplace (bool) – Flag to use a packed array. Default is false. When true, it is memory-efficient but slow. When false, it is not memory-efficient but fast. In either case, the result is obtained in the same memory region.
  - group (string) – Name of a group. This group is used when the collective is called.
  Example:
  In case of multi-process data parallel distributed training:

      import numpy as np
      import nnabla as nn
      import nnabla.communicators as C
      from nnabla.ext_utils import get_extension_context

      # Communicator and Context
      extension_module = "cudnn"
      ctx = get_extension_context(extension_module)
      comm = C.MultiProcessDataParalellCommunicator(ctx)
      comm.init()

      # Data
      x_list = [nn.Variable([10, 10]), nn.Variable([10, 10]), nn.Variable([10, 10])]
      for x in x_list:
          x.d = np.random.rand(*x.shape)

      # AllReduce
      comm.all_reduce([x.data for x in x_list], inplace=True)
- all_reduce_callback(self, data, size_t pack_size, bool division=False, string group='world')
  All reduce over data.
  Note
  This function currently does not support shared parameters (such as RNNs).
  Parameters:
  - data (NdArray or list of NdArray) – Data to be reduced.
  - pack_size (int) – The number of values contained in the packed data.
  - division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  - group (string) – Name of a group. This group is used when the collective is called.
  Example:
  In case of multi-process data parallel distributed training:

      import nnabla as nn
      import nnabla.communicators as C
      import nnabla.functions as F
      import nnabla.parametric_functions as PF
      from nnabla.ext_utils import get_extension_context

      # Communicator and Context
      extension_module = "cudnn"
      ctx = get_extension_context(extension_module)
      comm = C.MultiProcessDataParalellCommunicator(ctx)
      comm.init()

      n_class = 2
      b, c, h, w = 4, 1, 32, 32

      # Data
      x = nn.Variable([b, c, h, w])
      y = nn.Variable([b, 1])

      # Network setting
      h = PF.convolution(x, 1, (3, 3), (1, 1), (1, 1))
      pred = PF.affine(h, n_class)
      loss = F.mean(F.softmax_cross_entropy(pred, y))

      loss.forward()

      # AllReduce during backward
      loss.backward(communicator_callbacks=comm.all_reduce_callback(
          [v.grad for v in nn.get_parameters().values()], 1024 * 1024 * 2))
- allreduce(self, bool division=False, bool inplace=False)
  Deprecated. Use all_reduce instead.
  Allreduce over the parameters added. Currently, allreduce is applied to gradient regions.
  Parameters:
  - division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  - inplace (bool) – Flag to use a packed array. Default is false. When true, it is memory-efficient but slow. When false, it is not memory-efficient but fast. In either case, the result is obtained in the same memory region.
- barrier(self)
  Blocks until all processes in the communicator have reached this routine.
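  A small sketch of synchronizing ranks around a rank-0-only step (assuming an initialized comm; the file name is illustrative):

      import nnabla as nn

      # Only rank 0 writes the checkpoint; every rank then waits at the
      # barrier so nobody proceeds before the file is fully written.
      if comm.rank == 0:
          nn.save_parameters("params.h5")
      comm.barrier()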
- bcast(self, data, int src, bool inplace=False, string group='world')
  Broadcast data from the source rank to all other ranks.
  Parameters:
  - data (NdArray or list of NdArray) – Data to be broadcast.
  - src (int) – Source rank from which the data is broadcast.
  - inplace (bool) – Flag to use a packed array. Default is false. When true, it is memory-efficient but slow. When false, it is not memory-efficient but fast. In either case, the result is obtained in the same memory region.
  - group (string) – Name of a group. This group is used when the collective is called.
  Example:
  In case of multi-process data parallel distributed training:

      import numpy as np
      import nnabla as nn
      import nnabla.communicators as C
      from nnabla.ext_utils import get_extension_context

      # Communicator and Context
      extension_module = "cudnn"
      ctx = get_extension_context(extension_module)
      comm = C.MultiProcessDataParalellCommunicator(ctx)
      comm.init()

      # Data
      x_list = [nn.Variable([10, 10]), nn.Variable([10, 10]), nn.Variable([10, 10])]
      for x in x_list:
          x.d = np.random.rand(*x.shape)

      # Bcast
      comm.bcast([x.data for x in x_list], src=0, inplace=True)
- clear_context_parameters(self)
  Clear all registered contexts and parameters.
- find_group(self, group)
  Return the list of ranks in the group. If the group does not exist, an empty list is returned.
  Parameters: group (str) – Name of the group.
  Returns: List of ranks (int).
  Return type: ranks (list)
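  A small sketch combining new_group and find_group (assuming a 4-process run and an initialized comm; the group name is illustrative):

      # new_group takes a (name, ranks) tuple, as described below.
      comm.new_group(("node0", [0, 1, 2, 3]))

      comm.find_group("node0")  # -> [0, 1, 2, 3]
      comm.find_group("nodeX")  # -> [] for an unknown group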
- init(self)
  Initialize a communicator.
  Runs init-all or init-rank, depending on whether multi-threads or multi-processes are used. This function MUST be called after all parameters to be communicated are added by add_context_and_parameters.
- local_rank
  Get local rank of communicator.
- name
  Get communicator name.
- new_group(self, name_ranks)
  Create a new group from a name and a list of ranks.
  Parameters: name_ranks (tuple) – Tuple of name (str) and ranks (list).
  Returns: group name (str)
  Example:
  In case of multi-process data parallel distributed training:

      import nnabla.communicators as C
      from nnabla.ext_utils import get_extension_context

      # Communicator and Context
      extension_module = "cudnn"
      ctx = get_extension_context(extension_module)
      comm = C.MultiProcessDataParalellCommunicator(ctx)
      comm.init()

      # New group, passed as a (name, ranks) tuple to match the signature above
      group = comm.new_group(("node0", [0, 1, 2, 3]))
- rank
  Get rank of communicator.
- reduce(self, data, int dst, bool division=False, bool inplace=False, string group='world')
  Reduce over data.
  Parameters:
  - data (NdArray or list of NdArray) – Data to be reduced.
  - dst (int) – Destination rank where the result is saved.
  - division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  - inplace (bool) – Flag to use a packed array. Default is false. When true, it is memory-efficient but slow. When false, it is not memory-efficient but fast. In either case, the result is obtained in the same memory region.
  - group (string) – Name of a group. This group is used when the collective is called.
  Example:
  In case of multi-process data parallel distributed training:

      import numpy as np
      import nnabla as nn
      import nnabla.communicators as C
      from nnabla.ext_utils import get_extension_context

      # Communicator and Context
      extension_module = "cudnn"
      ctx = get_extension_context(extension_module)
      comm = C.MultiProcessDataParalellCommunicator(ctx)
      comm.init()

      # Data
      x_list = [nn.Variable([10, 10]), nn.Variable([10, 10]), nn.Variable([10, 10])]
      for x in x_list:
          x.d = np.random.rand(*x.shape)

      # Reduce
      comm.reduce([x.data for x in x_list], dst=0, inplace=True)
- reduce_scatter(self, ndarray_list, ndarray, bool division=False, string group='world')
  Reduce scatter over data.
  Parameters:
  - ndarray_list (list of NdArray) – Data to be reduced and scattered.
  - ndarray (NdArray) – Data in which the result is saved.
  - division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  - group (string) – Name of a group. This group is used when the collective is called.
  Example:
  In case of multi-process data parallel distributed training:

      import numpy as np
      import nnabla as nn
      import nnabla.communicators as C
      from nnabla.ext_utils import get_extension_context

      # Communicator and Context
      extension_module = "cudnn"
      ctx = get_extension_context(extension_module)
      comm = C.MultiProcessDataParalellCommunicator(ctx)
      comm.init()

      # Data
      y = nn.Variable([10, 10])
      x_list = [nn.Variable([10, 10]), nn.Variable([10, 10]), nn.Variable([10, 10])]
      for x in x_list:
          x.d = np.random.rand(*x.shape)

      # ReduceScatter
      comm.reduce_scatter([x.data for x in x_list], y.data)
- size
  Get size of communicator.
List of communicators

- nnabla.communicators.MultiProcessDataParalellCommunicator()
  MultiProcessDataParallelCommunicator(CContext ctx)
  Multi Process Data Parallel Communicator for Distributed Training.
  Parameters: context (Context) – Context used in this communicator.
  Example:
  In case of multi-process data parallel distributed training:

      import nnabla as nn
      import nnabla.communicators as C
      from nnabla.ext_utils import get_extension_context

      # Communicator and Context
      extension_module = "cudnn"
      ctx = get_extension_context(extension_module)
      comm = C.MultiProcessDataParalellCommunicator(ctx)
      comm.init()
      n_devices = comm.size
      mpi_rank = comm.rank
      device_id = comm.local_rank
      ctx.device_id = str(device_id)
      nn.set_default_context(ctx)

      # Network and Solver created here
      ...

      # Training loop
      for itr in range(num_itr):
          # Forward, zerograd, backward
          loss.forward()
          solver.zero_grad()
          loss.backward()

          # AllReduce gradients across devices
          # (all_reduce supersedes the deprecated allreduce)
          comm.all_reduce([v.grad for v in nn.get_parameters().values()])

          # Update
          solver.update()