Source code for nnabla.experimental.graph_converters.quantize

# Copyright (c) 2020 Sony Corporation. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import nnabla as nn
import nnabla.functions as F
import numpy as np
from collections import defaultdict

from .graph_converter import FunctionModifier
from .batch_normalization_folding import BatchNormalizationFoldingModifier
from .batch_normalization_self_folding import BatchNormalizationSelfFoldingModifier
from .remove_function import RemoveFunctionModifier

[docs]class QuantizeNonQNNToRecordingModifier(FunctionModifier): def __init__(self, functions_ranks, config=None, training=True): super(QuantizeNonQNNToRecordingModifier, self).__init__() self._config = config self._fct_bin_set = { 'Add2': F.add2, 'Sub2': F.sub2, 'Mul2': F.mul2, 'Div2': F.div2, 'Pow2': F.pow2 } self._training = training # Dict to record the rank of each function self.functions_ranks = functions_ranks def get_function_rank(self, f): rank = self.functions_ranks.get(f, -1) return rank def check(self, f): def backward_traverse(f, l): # list functions between input and f for inp in f.inputs: if inp.parent is not None: l.append(inp.parent) backward_traverse(inp.parent, l) def forward_traverse(f, l): # list functions between f and output for fref in f.outputs[0].function_references: l.append(fref) forward_traverse(fref, l) def is_skip_layer(f): skip_inputs_layers = self._config.skip_inputs_layers skip_outputs_layers = self._config.skip_outputs_layers if not skip_inputs_layers and not skip_outputs_layers: return False fs = [] if skip_outputs_layers: forward_traverse(f, fs) fs = list(set([ for func in fs])) is_output_layer = True if skip_outputs_layers else False for skl in skip_outputs_layers: if skl in fs: is_output_layer = False break fs = [] if skip_inputs_layers: backward_traverse(f, fs) is_input_layer = True if skip_inputs_layers else False fs = list(set([ for func in fs])) for skl in skip_inputs_layers: if skl in fs: is_input_layer = False break for skl in skip_inputs_layers: if == skl and is_input_layer: return True for skl in skip_outputs_layers: if == skl and is_output_layer: return True return False fn = cfg = self._config # Do not record sink and bn if fn == 'Sink' or fn == 'BatchNormalization': return False # Only add recorder to specific layers # If record_layers empty, add to all layers record_layers = cfg.record_layers if record_layers and (fn not in record_layers): return False if is_skip_layer(f): return False return True def share_recorder(self, f, inputs, new_inputs, cfg): # share quantization parameters for Add2 and Concat fn = recorder_activation = cfg.recorder_activation recorder_weight = cfg.recorder_weight axes = [3] if cfg.channel_last else [1] if fn in ['Add2', 'Concatenate']: idx = 0 min_rank = inputs[0].rank for i, input_var in enumerate(new_inputs[1:]): if input_var.rank < min_rank: idx = i + 1 min_rank = input_var.rank shared_name = 'x0' scope = self.get_parameter_scope(new_inputs[idx].parent.inputs[1]) for i, input_var in enumerate(new_inputs): if i == idx: continue input_var = input_var.parent.inputs[0] with nn.parameter_scope(scope): input_var = recorder_activation()( input_var, axes=axes, training=self._training, name=shared_name) new_inputs[i] = input_var return new_inputs def add_recorder(self, f, inputs, cfg): fn = function_rank = self.get_function_rank(f) scope = '{}-{}'.format(fn, function_rank) axes = [3] if cfg.channel_last else [1] recorder_activation = cfg.recorder_activation recorder_weight = cfg.recorder_weight params_idx = 1 if fn in ['Concatenate', 'Stack']: params_idx = len(inputs) if fn in self._fct_bin_set: params_idx = 2 new_inputs = [] # Add recorder for variable(activation) for i, input_var in enumerate(inputs[:params_idx]): fref = input_var.function_references if fref and fref[0].info.type_name == cfg.recorder_activation().name(): input_var = fref[0].outputs[0] else: with nn.parameter_scope(scope): parent = input_var.parent if parent and == recorder_activation().name(): input_var = input_var else: input_var = recorder_activation()(input_var, axes=axes, training=self._training, name='x{}'.format(i)) new_inputs.append(input_var) # Add recorder for parameters(weight and bias) for i, input_parameter in enumerate(inputs[params_idx:]): with nn.parameter_scope(scope): input_parameter = recorder_weight()(input_parameter, axes=axes, training=self._training, name='w{}'.format(i)) new_inputs.append(input_parameter) return new_inputs def modify(self, f, inputs): if not self.check(f): return # Skip modify this function fn = cfg = self._config axes = [3] if cfg.channel_last else [1] recorder_activation = cfg.recorder_activation # Add recorder for each input new_inputs = self.add_recorder(f, inputs, cfg) new_inputs = self.share_recorder(f, inputs, new_inputs, cfg) h = self._modify_as_same(f, new_inputs) # Add recorder before/after a function next_func = f.outputs[0].function_references[0] next_func_rank = self.get_function_rank(next_func) scope = '{}-{}'.format(, next_func_rank) with nn.parameter_scope(scope): if cfg.recorder_position == cfg.RecorderPosition.BOTH: h = recorder_activation()(h, axes=axes, training=self._training, name='x0') return h
[docs]class QuantizeRecordingToTrainingModifier(FunctionModifier): class SimulatedQNN(object): """ Doc here. """ def __init__(self, functions_ranks, modifier=None, config=None): self._config = config self._modifier = modifier # input: [tuple(scale, zeropoint)] self._map_input_scale_zeropoint = defaultdict(list) # Dict to record the rank of each function self.functions_ranks = functions_ranks def get_function_rank(self, f): rank = self.functions_ranks.get(f, -1) return rank def _quantize_outputs(self, f, output, axes, cfg): # TODO: multi-outputs rm = cfg.round_mode nr = cfg.narrow_range dt = cfg.dtype pow2 = cfg.pow2.value h = output for next_func in f.outputs[0].function_references: next_fn = next_func_rank = self.get_function_rank(next_func) if next_fn == 'Sink': return h name = 'x0' if next_fn in self._modifier._fct_bin_set: for i, elm in enumerate(next_func.inputs): if f.outputs[0] == elm: name = 'x{}'.format(i) scope = '{}-{}'.format(next_fn, next_func_rank) with nn.parameter_scope(scope): sy, zpy = cfg.recorder_activation.get_scale_zeropoint( h, axes=axes, narrow_range=nr, round_method=pow2, name=name) if sy is not None: break # D->Q h = self.try_to_quantize(h, sy, zpy, rm, nr, dt) self._map_input_scale_zeropoint[h] = (sy, zpy) return h def try_to_quantize(self, x, scale, zero_point, round_mode, narrow_range, dtype): x = F.quantize_linear(x, scale, zero_point, round_mode, narrow_range, dtype) if scale is not None else x return x def try_to_dequantize(self, x, scale, zero_point): x = F.dequantize_linear( x, scale, zero_point) if scale is not None else x return x def get_quantization_params(self, variable): scale, zero_point = None, None if variable.parent is not None and == 'QuantizeLinear': scale = variable.parent.inputs[1].d zero_point = variable.parent.inputs[2].d return scale, zero_point def requantize_bias(self, f, inputs, scope, rm, nr, dt, skip_bias=False): def with_bias(): return True if len(inputs) == 3 else False functions_with_bias = ['Affine', 'Convolution', 'Deconvolution', 'DepthwiseConvolution', 'DepthwiseDeconvolution'] fn = # handle bias. recalculate scale and zero point of bias if fn in functions_with_bias and with_bias(): x, w, b = inputs sx, zpx, sw = None, None, None # Get scale and zero_point of x sx, zpx = self.get_quantization_params(x) # Get scale and zero_point of w sw, _ = self.get_quantization_params(w) if sx is not None and sw is not None: sbd = np.reshape(sx.copy(), (1,)) * \ np.reshape(sw.copy(), (1,)) with nn.parameter_scope(scope): sb = nn.parameter.get_parameter_or_create( 'scale-b', (1,), sbd, False) zpbd = np.reshape(zpx.copy(), (1,)) zpb = nn.parameter.get_parameter_or_create( 'zeropoint-b', (1,), zpbd, False) if b.parent is not None and == 'QuantizeLinear': b = b.parent.inputs[0] # Quantize/Dequantize Bias only when skip bias is False b = F.quantize_linear( b, sb, zpb, rm, nr, dt) if not skip_bias else b inputs[2] = b return inputs def quantize_inputs(self, f, inputs, scope, cfg, axes, rm, nr, dt, pow2): fn = params_idx = 1 if fn in ['Concatenate', 'Stack']: params_idx = len(inputs) if fn in self._modifier._fct_bin_set: params_idx = 2 # Quantize input inps = [] for i, input_var in enumerate(inputs[:params_idx]): if input_var.rank == 0: with nn.parameter_scope(scope): sx, zpx = cfg.recorder_activation.get_scale_zeropoint( input_var, axes=axes, narrow_range=nr, round_method=pow2, name='x{}'.format(i)) input_var = self.try_to_quantize( input_var, sx, zpx, rm, nr, dt) self._map_input_scale_zeropoint[input_var] = (sx, zpx) inps.append(input_var) for i, input_parameter in enumerate(inputs[params_idx:]): with nn.parameter_scope(scope): sw, zpw = cfg.recorder_weight.get_scale_zeropoint( input_parameter, axes=axes, narrow_range=nr, round_method=pow2, name='w{}'.format(i)) input_parameter = self.try_to_quantize( input_parameter, sw, zpw, rm, nr, dt) inps.append(input_parameter) # handle bias. recalculate scale and zero point of bias inps = self.requantize_bias( f, inps, scope, rm, nr, dt, cfg.skip_bias) return inps def dequantize_inputs(self, inps): # Dequantize input for i, var in enumerate(inps): s, zp = None, None frefs = var.function_references for rf in frefs: if == 'DequantizeLinear': var = rf.outputs[0] break if var.parent and == 'QuantizeLinear': s, zp = var.parent.inputs[1:3] var = self.try_to_dequantize(var, s, zp) inps[i] = var return inps def shared_quantization(self, f, inps, cfg): # share quantization parameter for Add2 and Concat fn = if fn in ['Add2', 'Concatenate']: idx = 0 min_rank = inps[0].rank s, zp = None, None for i, x in enumerate(inps[1:]): if x.parent is not None and == 'DequantizeLinear': if x.rank < min_rank: idx = i + 1 min_rank = x.rank if inps[idx].parent is not None and inps[idx] == 'DequantizeLinear': s, zp = inps[idx].parent.inputs[1:3] for i, x in enumerate(inps): if i == idx: continue if inps[i].parent is not None and inps[i] == 'DequantizeLinear': inps[i] = inps[i].parent.inputs[0].parent.inputs[0] inps[i] = self.try_to_quantize(inps[i], s, zp, cfg.round_mode, cfg.narrow_range, cfg.dtype) inps[i] = self.try_to_dequantize(inps[i], s, zp) return inps def modify(self, f, inputs): fn = cfg = self._config function_rank = self.get_function_rank(f) scope = '{}-{}'.format(fn, function_rank) rm = cfg.round_mode nr = cfg.narrow_range dt = cfg.dtype pow2 = cfg.pow2.value axes = [3] if cfg.channel_last else [1] # inputs -> Q -> DQ -> F inps = self.quantize_inputs( f, inputs, scope, cfg, axes, rm, nr, dt, pow2) inps = self.dequantize_inputs(inps) inps = self.shared_quantization(f, inps, cfg) h = self._modifier._modify_as_same(f, inps) if fn == 'Sink': return h # F -> output -> Q -> DQ h = self._quantize_outputs(f, h, axes, cfg) return h def __finish__(self): # input: [tuple(scale, zeropoint)] self._map_input_scale_zeropoint = defaultdict(list) """ Doc here. """ def __init__(self, functions_ranks, config=None): super(QuantizeRecordingToTrainingModifier, self).__init__() self._fct_bin_set = { 'Add2': F.add2, 'Sub2': F.sub2, 'Mul2': F.mul2, 'Div2': F.div2, 'Pow2': F.pow2 } from nnabla.utils.qnn import PrecisionMode self._mode = self._precision_mode_set = { PrecisionMode.QNN: None, PrecisionMode.SIM_QNN: self.SimulatedQNN, PrecisionMode.MIXED_QNN: None }[config.precision_mode](functions_ranks, self, config) def modify(self, f, inputs): return self._mode.modify(f, inputs) def __finish__(self): self._mode.__finish__()