Communicator API

Communicator transfers parameters over the compute graphs.

This is an alias to communicator.py.

Communicator interface

class nnabla.communicators.Communicator

Communicator interface class.

Communicator exchanges data (e.g., gradients) using MPI-like collectives. This class is used for distributed training.

abort(self)

Terminates the MPI execution environment.

add_context_and_parameters(self, ctx_param_dict)

Add context and parameters.

Parameters:ctx_param_dict (tuple of Context, dict) – The keys of the dictionary are parameter names (str) and the values are Variables.
all_gather(self, ndarray, ndarray_list, string group='world')

All-gather over data on different devices.

Parameters:
  • ndarray (NdArray) – Data to be gathered.
  • ndarray_list (list of NdArray) – Buffers in which the gathered data are saved.
  • group (string) – Name of a group. This group is used when the collective is called.

Example:

# Run like `mpirun -n 2 python <code_snippet.py>`
# Note: the order of output to stdout is nondeterministic because multiple processes print concurrently.

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# Data
x = nn.Variable([2, 2])
x.d = np.random.rand(*x.shape)
y_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
print("Before the collective ({}-th)".format(comm.rank))
print(x.d)

# AllGather
comm.all_gather(x.data, [y.data for y in y_list])

# Check
print("After the collective ({}-th)".format(comm.rank))
for y in y_list:
    print(y.d)
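Because each process prints its own random data, the example's output does not show the numeric effect directly. As an illustration only (not the nnabla API), the semantics of all-gather can be sketched in plain NumPy: after the collective, every rank holds the full, rank-ordered list of every rank's contribution.

```python
# A pure-NumPy sketch of all-gather semantics (illustration only; the real
# collective runs across separate processes/devices).
import numpy as np

n_ranks = 2
# Each simulated rank contributes one array.
contributions = [np.full((2, 2), float(rank)) for rank in range(n_ranks)]

# After all_gather, every rank holds the same gathered list of all
# contributions, ordered by rank.
gathered_per_rank = {rank: list(contributions) for rank in range(n_ranks)}

print(gathered_per_rank[0][1])  # rank 0 now also holds rank 1's data (all 1.0)
```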
all_reduce(self, data, bool division=False, bool inplace=False, string group='world')

All-reduce over data on different devices.

Parameters:
  • data (NdArray or list of NdArray) – Data to be all-reduced.
  • division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  • inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, the result is available in the same memory region.
  • group (string) – Name of a group. This group is used when the collective is called.

Example:

# Run like `mpirun -n 2 python <code_snippet.py>`
# Note: the order of output to stdout is nondeterministic because multiple processes print concurrently.

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# Data
x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
print("Before the collective ({}-th)".format(comm.rank))
for x in x_list:
    x.d = np.random.rand(*x.shape)
    print(x.d)

# AllReduce
comm.all_reduce([x.data for x in x_list], inplace=True)

# Check
print("After the collective ({}-th)".format(comm.rank))
for x in x_list:
    print(x.d)
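The numeric effect, including the division flag, can be sketched in plain NumPy (an illustration of the semantics only, not the nnabla API):

```python
# A pure-NumPy sketch of all-reduce semantics (illustration only; the real
# collective runs across separate processes/devices).
import numpy as np

n_ranks = 2
contributions = [np.full((2, 2), rank + 1.0) for rank in range(n_ranks)]  # all 1.0 and all 2.0

# division=False: elementwise sum across ranks; every rank receives it.
reduced_sum = sum(contributions)

# division=True: the sum is divided by the number of devices, which is the
# usual way to average gradients in data-parallel training.
reduced_mean = reduced_sum / n_ranks

print(reduced_sum)   # every element is 3.0
print(reduced_mean)  # every element is 1.5
```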
all_reduce_callback(self, data, size_t pack_size, bool division=False, string group='world')

All-reduce over data on different devices. This method returns callbacks that perform the reduction when invoked during backward computation (see the example below).

Note

This function currently does not support shared parameters (such as those in RNNs).

Parameters:
  • data (NdArray or list of NdArray) – Data to be all-reduced.
  • pack_size (int) – The number of values contained in the packed data.
  • division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  • group (string) – Name of a group. This group is used when the collective is called.

Example:

In the case of multi-process data-parallel distributed training:

# Run like `mpirun -n 2 python <code_snippet.py>`

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.functions as F
import nnabla.parametric_functions as PF
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

n_class = 2
b, c, h, w = 4, 1, 32, 32

# Data
x = nn.Variable([b, c, h, w])
y = nn.Variable([b, 1])

# Network setting
h = PF.convolution(x, 1, (3, 3), (1, 1), (1, 1))
pred = PF.affine(h, 2)
loss = F.mean(F.softmax_cross_entropy(pred, y))

loss.forward()
# AllReduce during backward
loss.backward(
    communicator_callbacks=comm.all_reduce_callback(
        [v.grad for v in nn.get_parameters().values()], 1024 * 1024 * 2
    )
)
allreduce(self, bool division=False, bool inplace=False)

Deprecated. Use all_reduce instead.

All-reduce over the parameters added. Currently, all-reduce is applied to gradient regions.

Parameters:
  • division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  • inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, the result is available in the same memory region.
barrier(self)

Blocks until all processes in the communicator have reached this routine.
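The blocking behavior can be illustrated with a pure-Python analogy that uses threads instead of MPI processes (threading.Barrier stands in for the MPI barrier here; this is not the nnabla API):

```python
# Analogy for barrier semantics: no participant proceeds past the barrier
# until all participants have reached it.
import threading

n_workers = 4
barrier = threading.Barrier(n_workers)
reached = []
passed = []
lock = threading.Lock()

def worker(rank):
    with lock:
        reached.append(rank)
    barrier.wait()  # blocks until all n_workers have called wait()
    with lock:
        # By the time any worker gets here, every worker has reached the barrier.
        passed.append((rank, len(reached)))

threads = [threading.Thread(target=worker, args=(r,)) for r in range(n_workers)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every worker observed all n_workers at the barrier before proceeding.
print(all(count == n_workers for _, count in passed))
```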

bcast(self, data, int src, bool inplace=False, string group='world')

Broadcast data to different devices.

Parameters:
  • data (NdArray or list of NdArray) – Data to be broadcast.
  • src (int) – Source rank from which the data is broadcast.
  • inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, the result is available in the same memory region.
  • group (string) – Name of a group. This group is used when the collective is called.

Example:

# Run like `mpirun -n 2 python <code_snippet.py>`
# Note: the order of output to stdout is nondeterministic because multiple processes print concurrently.

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# Data
x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
print("Before the collective ({}-th)".format(comm.rank))
for x in x_list:
    x.d = np.random.rand(*x.shape)
    print(x.d)

# Bcast
comm.bcast([x.data for x in x_list], src=0, inplace=True)

# Check
print("After the collective ({}-th)".format(comm.rank))
for x in x_list:
    print(x.d)
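The numeric effect can be sketched in plain NumPy (illustration of the semantics only, not the nnabla API): after the broadcast, every rank's buffer is a copy of the source rank's data.

```python
# A pure-NumPy sketch of broadcast semantics (illustration only; the real
# collective runs across separate processes/devices).
import numpy as np

n_ranks = 2
src = 0
buffers = {0: np.full((2, 2), 7.0), 1: np.zeros((2, 2))}

# Broadcast: overwrite every rank's buffer with the source rank's data.
for rank in range(n_ranks):
    buffers[rank] = buffers[src].copy()

print(buffers[1])  # now a copy of rank 0's data (all 7.0)
```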
clear_context_parameters(self)

Clear all registered contexts and parameters.

find_group(self, group)

Return the list of ranks in the group. If the group does not exist, an empty list is returned.

Parameters:group (str) – Name of the group.
Returns:List of ranks (int).
Return type:ranks (list)
init(self)

Initialize a communicator.

Performs init-all or init-rank initialization, depending on whether execution is multi-threaded or multi-process. This function MUST be called after all parameters to be communicated have been added by add_context_and_parameters.

list_groups(self)
Returns:Dictionary mapping group names (str) to lists of ranks.
Return type:groups (dict)
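The relationship between new_group, find_group, and list_groups can be sketched as plain-Python bookkeeping: a mapping from group names to rank lists. This mirrors the documented semantics only; the real Communicator manages groups internally.

```python
# Pure-Python sketch of group bookkeeping (illustration only, not the
# nnabla implementation). The "world" group contains all ranks.
groups = {"world": [0, 1, 2, 3]}

def new_group(name_ranks):
    # name_ranks is a tuple of a group name (str) and a rank list.
    name, ranks = name_ranks
    groups[name] = list(ranks)
    return name

def find_group(name):
    # Returns the list of ranks, or an empty list if the group does not exist.
    return list(groups.get(name, []))

new_group(("node0", [0, 1]))
print(find_group("node0"))  # [0, 1]
print(find_group("nodeX"))  # []
print(sorted(groups))       # ['node0', 'world']
```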
local_rank

Get local rank of communicator.

name

Get communicator name.

new_group(self, name_ranks)
Parameters:name_ranks (tuple) – Tuple of name (str) and ranks (list).
Returns:group name (str)

Example:

# Communicator and Context
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# New group
group = comm.new_group(("node0", [0, 1, 2, 3]))
rank

Get rank of communicator.

reduce(self, data, int dst, bool division=False, bool inplace=False, string group='world')

Reduce over data on different devices.

Parameters:
  • data (NdArray or list of NdArray) – Data to be reduced.
  • dst (int) – Destination rank where the result is saved.
  • division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  • inplace (bool) – Flag to use a packed array. Default is False. When True, it is memory-efficient but slow. When False, it is not memory-efficient but fast. In both cases, the result is available in the same memory region.
  • group (string) – Name of a group. This group is used when the collective is called.

Example:

# Run like `mpirun -n 2 python <code_snippet.py>`
# Note: the order of output to stdout is nondeterministic because multiple processes print concurrently.

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# Data
x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
print("Before the collective ({}-th)".format(comm.rank))
for x in x_list:
    x.d = np.random.rand(*x.shape)
    print(x.d)

# Reduce
comm.reduce([x.data for x in x_list], dst=0, inplace=True)

# Check
print("After the collective ({}-th)".format(comm.rank))
for x in x_list:
    print(x.d)
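The numeric effect can be sketched in plain NumPy (illustration of the semantics only, not the nnabla API): like all-reduce, contributions are summed elementwise, but only the destination rank receives the result.

```python
# A pure-NumPy sketch of reduce semantics (illustration only; the real
# collective runs across separate processes/devices).
import numpy as np

n_ranks = 2
dst = 0
contributions = {rank: np.full((2, 2), rank + 1.0) for rank in range(n_ranks)}

# Reduce: only rank `dst` receives the elementwise sum of all contributions.
received = {dst: sum(contributions.values())}

print(received[dst])  # every element is 3.0 (1.0 + 2.0)
```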
reduce_scatter(self, ndarray_list, ndarray, bool division=False, string group='world')

Reduce-scatter over data on different devices.

Parameters:
  • ndarray_list (list of NdArray) – List of data to be reduced over different devices.
  • ndarray (NdArray) – Buffer in which the result is saved.
  • division (bool) – Flag to divide the reduced data by the number of contexts added, or the number of devices.
  • group (string) – Name of a group. This group is used when the collective is called.

Example:

# Run like `mpirun -n 2 python <code_snippet.py>`
# Note: the order of output to stdout is nondeterministic because multiple processes print concurrently.

# Communicator and Context
import numpy as np
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()

# Data
x_list = [nn.Variable([2, 2]), nn.Variable([2, 2])]
y = nn.Variable([2, 2])
print("Before the collective ({}-th)".format(comm.rank))
for x in x_list:
    x.d = np.random.rand(*x.shape)
    print(x.d)

# ReduceScatter
comm.reduce_scatter([x.data for x in x_list], y.data)

# Check
print("After the collective ({}-th)".format(comm.rank))
print(y.d)
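The numeric effect can be sketched in plain NumPy (illustration of the semantics only, not the nnabla API): each chunk is summed elementwise across ranks, and rank i keeps only the i-th reduced chunk.

```python
# A pure-NumPy sketch of reduce-scatter semantics (illustration only; the
# real collective runs across separate processes/devices).
import numpy as np

n_ranks = 2
# Each simulated rank contributes a list of two arrays (one chunk per rank).
contributions = [
    [np.full((2, 2), rank + 1.0), np.full((2, 2), (rank + 1.0) * 10)]
    for rank in range(n_ranks)
]

# Reduce: elementwise sum of each chunk across ranks.
reduced = [sum(chunks) for chunks in zip(*contributions)]

# Scatter: rank i receives reduced chunk i.
received = {rank: reduced[rank] for rank in range(n_ranks)}

print(received[0])  # chunk 0 summed over ranks: every element is 3.0
print(received[1])  # chunk 1 summed over ranks: every element is 30.0
```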
size

Get size of communicator.

List of communicators

nnabla.communicators.MultiProcessDataParallelCommunicator()

MultiProcessDataParallelCommunicator(CContext ctx)

Multi Process Data Parallel Communicator for Distributed Training.

Parameters:context (Context) – Context used in this communicator.

Example:

In the case of multi-process data-parallel distributed training:

# Communicator and Context
import nnabla as nn
import nnabla.communicators as C
from nnabla.ext_utils import get_extension_context

extension_module = "cudnn"
ctx = get_extension_context(extension_module)
comm = C.MultiProcessCommunicator(ctx)
comm.init()
n_devices = comm.size
mpi_rank = comm.rank
device_id = comm.local_rank
ctx.device_id = str(device_id)
nn.set_default_context(ctx)

# Network and Solver created here

...


# Training loop
for itr in range(num_itr):
    # Forward, zerograd, backward
    loss.forward()
    solver.zero_grad()
    loss.backward()

    # Allreduce
    comm.all_reduce([v.grad for v in nn.get_parameters().values()])

    # Update
    solver.update()