Communicator API

Communicator transfers parameters across computation graphs.

This is an alias to communicator.py.

Communicator interface

class nnabla.communicators.Communicator

Communicator interface class.

Communicator exchanges data (e.g., gradient) using MPI-like collectives. This class is used for the distributed training.

abort(self)

Terminates the MPI execution environment.

add_context_and_parameters(self, ctx_param_dict)

Add context and parameters.

Parameters:ctx_param_dict (tuple of Context, dict) – The dictionary maps parameter names (str) to Variable objects.
all_gather(self, ndarray, ndarray_list, string group='world')

All gather over data.

Parameters:
  • ndarray (NdArray) – Data to be gathered.
  • ndarray_list (list of NdArray) – List of buffers in which the gathered data are saved.
  • group (string) – Name of a group. This group is used when the collective is called.

Example:

In case of the multi-process data parallel distributed training,

# Communicator and Context
extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessDataParalellCommunicator(ctx)
comm.init()

# Data
x = nn.Variable([10, 10])
x.d = np.random.rand(*x.shape)
y_list = [nn.Variable([10, 10]), nn.Variable([10, 10]), nn.Variable([10, 10])]

# AllGather
comm.all_gather(x.data, [y.data for y in y_list])
all_reduce(self, data, bool division=False, bool inplace=False, string group='world')

All reduce over data.

Parameters:
  • data (NdArray or list of NdArray) – Data to be reduced.
  • division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  • inplace (bool) – Flag to use a packed array. Default is false. When true, it is memory-efficient but slow. When false, it is not memory-efficient but fast. In both cases, the result is obtained in the same memory region.
  • group (string) – Name of a group. This group is used when the collective is called.

Example:

In case of the multi-process data parallel distributed training,

# Communicator and Context
extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessDataParalellCommunicator(ctx)
comm.init()

# Data
x_list = [nn.Variable([10, 10]), nn.Variable([10, 10]), nn.Variable([10, 10])]
for x in x_list:
    x.d = np.random.rand(*x.shape)

# AllReduce
comm.all_reduce([x.data for x in x_list], inplace=True)
all_reduce_callback(self, data, size_t pack_size, bool division=False, string group='world')

All reduce over data.

Note

This function does not support shared parameters (such as RNNs) currently.

Parameters:
  • data (NdArray or list of NdArray) – Data to be reduced.
  • pack_size (int) – The number of values contained in the packed data.
  • division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  • group (string) – Name of a group. This group is used when the collective is called.

Example:

In case of the multi-process data parallel distributed training,

# Communicator and Context
extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessDataParalellCommunicator(ctx)
comm.init()

n_class = 2
b, c, h, w = 4, 1, 32, 32

# Data
x = nn.Variable([b, c, h, w])
y = nn.Variable([b, 1])

# Network setting
h = PF.convolution(x, 1, (3, 3), (1, 1), (1, 1))
pred = PF.affine(h, 2)
loss = F.mean(F.softmax_cross_entropy(pred, y))

loss.forward()
# AllReduce during backward
loss.backward(communicator_callbacks=comm.all_reduce_callback([v.grad for v in nn.get_parameters().values()], 1024 * 1024 * 2))
allreduce(self, bool division=False, bool inplace=False)

Deprecated. See all_reduce, instead.

Allreduce over parameters added. Currently, allreduce is applied to gradient regions.

Parameters:
  • division (bool) – Flag to divide the reduce data by the number of contexts added, or the number of devices.
  • inplace (bool) – Flag to use a packed array. Default is false. When true, it is memory-efficient but slow. When false, it is not memory efficient but fast. In both case, one can get the result in the same memory region.
barrier(self)

Blocks until all processes in the communicator have reached this routine.
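barrier follows the standard barrier contract: no participant proceeds past the call until every participant has reached it. The stand-alone sketch below illustrates that contract with Python's threading.Barrier; it is an analogy only, not the MPI-based implementation used by the communicator.

```python
import threading

# Analogy for comm.barrier(): each worker records arrival, then waits at the
# barrier; no worker records "after" until all three have arrived.
events = []
barrier = threading.Barrier(3)

def worker():
    events.append("before")
    barrier.wait()  # blocks until all 3 workers reach this point
    events.append("after")

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every "before" entry precedes every "after" entry.
```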

bcast(self, data, int src, bool inplace=False, string group='world')

Broadcast data from the source rank to all other ranks.

Parameters:
  • data (NdArray or list of NdArray) – Data to be broadcast.
  • src (int) – Source rank from which the data is broadcast.
  • inplace (bool) – Flag to use a packed array. Default is false. When true, it is memory-efficient but slow. When false, it is not memory-efficient but fast. In both cases, the result is obtained in the same memory region.
  • group (string) – Name of a group. This group is used when the collective is called.

Example:

In case of the multi-process data parallel distributed training,

# Communicator and Context
extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessDataParalellCommunicator(ctx)
comm.init()

# Data
x_list = [nn.Variable([10, 10]), nn.Variable([10, 10]), nn.Variable([10, 10])]
for x in x_list:
    x.d = np.random.rand(*x.shape)

# Bcast
comm.bcast([x.data for x in x_list], src=0, inplace=True)
clear_context_parameters(self)

Clear all registered contexts and parameters.

find_group(self, group)

Return the list of ranks in the group. If the group does not exist, an empty list is returned.

Parameters:group (str) – Name of the group.
Returns:List of ranks (int).
Return type:ranks (list)
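The documented lookup semantics can be modeled in plain Python: groups map a name to a list of ranks, and a missing name yields an empty list. The group names below are illustrative only, not part of the API.

```python
# Illustrative model of find_group's contract (not the Communicator internals):
# known group names map to rank lists; unknown names return an empty list.
groups = {"world": [0, 1, 2, 3], "node0": [0, 1]}

def find_group(groups, name):
    return groups.get(name, [])
```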
init(self)

Initialize a communicator.

Performs init-all or init-rank initialization, depending on whether multi-thread or multi-process execution is used. This function MUST be called after all parameters to be communicated are added by add_context_and_parameters.

list_groups(self)
Returns:Dictionary mapping group names (str) to lists of ranks (list).
Return type:groups (dict)
local_rank

Get local rank of communicator.

name

Get communicator name.

new_group(self, name_ranks)
Parameters:name_ranks (tuple) – Tuple of name (str) and ranks (list).
Returns:group name (str)

Example:

In case of the multi-process data parallel distributed training,

# Communicator and Context
extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessDataParalellCommunicator(ctx)
comm.init()

# New group
group = comm.new_group(("node0", [0, 1, 2, 3]))
rank

Get rank of communicator.

reduce(self, data, int dst, bool division=False, bool inplace=False, string group='world')

Reduce over data.

Parameters:
  • data (NdArray or list of NdArray) – Data to be reduced.
  • dst (int) – Destination rank where the result is saved.
  • division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  • inplace (bool) – Flag to use a packed array. Default is false. When true, it is memory-efficient but slow. When false, it is not memory-efficient but fast. In both cases, the result is obtained in the same memory region.
  • group (string) – Name of a group. This group is used when the collective is called.

Example:

In case of the multi-process data parallel distributed training,

# Communicator and Context
extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessDataParalellCommunicator(ctx)
comm.init()

# Data
x_list = [nn.Variable([10, 10]), nn.Variable([10, 10]), nn.Variable([10, 10])]
for x in x_list:
    x.d = np.random.rand(*x.shape)

# Reduce
comm.reduce([x.data for x in x_list], dst=0, inplace=True)
reduce_scatter(self, ndarray_list, ndarray, bool division=False, string group='world')

Reduce scatter over data.

Parameters:
  • ndarray_list (NdArray) – Data to be saved.
  • ndarray (NdArray) – Data to be gathered.
  • group (string) – Name of a group. This groups is used when the collective is called.

Example:

In case of the multi-process data parallel distributed training,

# Communicator and Context
extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessDataParalellCommunicator(ctx)
comm.init()

# Data
y = nn.Variable([10, 10])
x_list = [nn.Variable([10, 10]), nn.Variable([10, 10]), nn.Variable([10, 10])]
for x in x_list:
    x.d = np.random.rand(*x.shape)

# ReduceScatter
comm.reduce_scatter([x.data for x in x_list], y.data)
size

Get size of communicator.
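rank, local_rank, and size are typically used to pin each process to a device and to shard the dataset across processes. The sharding arithmetic itself is plain Python; the helper below is a hypothetical sketch of that pattern, not part of the nnabla API.

```python
# Hypothetical helper: which sample indices a given rank owns when a dataset
# of n_samples is split evenly (ceiling division) across `size` ranks,
# where rank and size would come from comm.rank and comm.size.
def shard_indices(n_samples, rank, size):
    per_rank = (n_samples + size - 1) // size  # ceil(n_samples / size)
    start = rank * per_rank
    return list(range(start, min(start + per_rank, n_samples)))

# With 10 samples over 4 ranks: rank 0 owns [0, 1, 2] and rank 3 owns [9].
```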

List of communicators

nnabla.communicators.DataParalellCommunicator(CContext ctx)

Data Parallel Communicator for Distributed Training.

This class performs collectives within a single process on a machine.

Parameters:context (Context) – context used in this communicator.

Example:

In case of the multi-thread data parallel distributed training,

# Networks and Solvers building comes above
import nnabla.communicators as C
comm = C.DataParalellCommunicator(ctx)

# Add contexts and parameters to the communicator
for i in range(n_devices):
    device_scope_name = "device{}".format(i)
    with nn.parameter_scope(device_scope_name):
        ctx = ctxs[i]
        params = nn.get_parameters()
        comm.add_context_and_parameters((ctx, params))
comm.init()

# Training loop
for itr in range(num_itr):

    # Forward, zerograd, backward
    for i in range(n_devices):
        losses[i].forward()
        solvers[i].zero_grad()
        losses[i].backward()

    # Allreduce
    comm.allreduce()

    # Update
    for i in range(n_devices):
        solvers[i].update()
nnabla.communicators.MultiProcessDataParalellCommunicator(CContext ctx)

Multi Process Data Parallel Communicator for Distributed Training.

Parameters:context (Context) – context used in this communicator.

Example:

In case of the multi-process data parallel distributed training,

# Communicator and Context
extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessDataParalellCommunicator(ctx)
comm.init()
n_devices = comm.size
mpi_rank = comm.rank
device_id = comm.local_rank
ctx.device_id = str(device_id)
nn.set_default_context(ctx)

# Network and Solver created here

...


# Training loop
for itr in range(num_itr):
    # Forward, zerograd, backward
    loss.forward()
    solver.zero_grad()
    loss.backward()

    # Allreduce
    comm.all_reduce([v.grad for v in nn.get_parameters().values()])

    # Update
    solver.update()