# ------------------------------------------------------------------------- # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. See License.txt in the project root for # license information. # -------------------------------------------------------------------------- from __future__ import annotations import logging from dataclasses import dataclass from enum import Enum from typing import Any import numpy as np import onnx import onnx.numpy_helper from onnx import TensorProto from onnx import onnx_pb as onnx_proto from .base_quantizer import BaseQuantizer, QuantizationParams from .calibrate import TensorData from .quant_utils import ( DEQUANT_OP_NAME, ONNX_TYPE_TO_NP_TYPE, QUANT_OP_NAME, QuantizedValue, QuantizedValueType, __producer__, __version__, add_dequant_output_suffix, add_dequant_suffix, add_quant_input_suffix, add_quant_output_suffix, add_quant_suffix, compute_data_quant_params, compute_scale_zp, compute_scale_zp_float8, find_by_name, get_qmin_qmax_for_qType, ms_domain, normalize_axis, quantize_onnx_initializer, tensor_proto_to_array, ) from .registry import CreateQDQQuantizer class QDQQuantTensorType(Enum): ACTIVATION = 0 WEIGHT = 1 BIAS = 2 # Holds the name of the node input from which a node output will share the # same quantization param initializers (zero-point and scale initializers). # Ex: A Transpose node's output will use the same quant param initializers used at the input. @dataclass class QDQQuantParamProvider: input_name: str node_name: str # Holds information for tensors that have been marked for quantization by operator quantizers. # Does not hold information for bias tensors. class QDQTensorQuantInfo: def __init__(self, tensor_type=QDQQuantTensorType.ACTIVATION, quant_para_provider=None, axis=None, data_type=None): self.tensor_type = tensor_type self.quant_para_provider = quant_para_provider self.axis = axis self.is_shared = quant_para_provider is not None assert data_type is not None self.data_type = data_type # Holds information for bias tensors that have been marked for quantization by operator quantizers. @dataclass class QDQBiasQuantInfo: node_name: str input_name: str weight_name: str beta: float # Holds quantization parameter values (scale, zp) for a tensor. # A tensor typically has a one set of quantization parameters, unless the tensor is # at a "mixed-precision" boundary where the activation quantization type changes (e.g., from uint8 to uint16). @dataclass class QDQTensorQuantParams: original: QuantizationParams # Generated by producer node. converted: QuantizationParams | None # Converted type consumed by some (or all/none) consumer nodes. converted_recv_nodes: set[str] | None # The name of nodes that consume the converted type. def get_for_consumer(self, consumer_node_name) -> QuantizationParams: if self.converted is None: # Quantized value is not converted, return original return self.original if self.converted_recv_nodes is None: # All consumers receive the converted value return self.converted # Check if consumer node name is in the list of nodes that # receive the converted quantization value. If not, return the original value generated # by the tensor's producer. return self.converted if (consumer_node_name in self.converted_recv_nodes) else self.original # Holds scale and zero_point initializer TensorProtos. @dataclass class QDQScaleZpInitializers: scale: TensorProto zero_point: TensorProto # Holds all scale and zero-point initializers for a tensor. # A tensor typically has a one set of quantization parameters, unless the tensor is # at a "mixed-precision" boundary where the activation quantization type changes (e.g., from uint8 to uint16). @dataclass class QDQTensorScaleZpInitializers: original: QDQScaleZpInitializers converted: QDQScaleZpInitializers | None converted_recv_nodes: set[str] | None # Holds cached information of a tensor's quantized values (types, zp/scale initializer names, etc.). # A tensor typically has a one set of quantization parameters, unless the tensor is # at a "mixed-precision" boundary where the activation quantization type changes (e.g., from uint8 to uint16). @dataclass class QDQTensorQuantizedValue: original: QuantizedValue converted: QuantizedValue | None converted_recv_nodes: set[str] | None def get_for_consumer(self, consumer_node_name) -> QuantizedValue: if self.converted is None: # Quantized value is not converted, return original return self.original if self.converted_recv_nodes is None: # All consumers receive the converted value return self.converted # Check if consumer node name is in the list of nodes that # receive the converted quantization value. If not, return the original value generated # by the tensor's producer. return self.converted if (consumer_node_name in self.converted_recv_nodes) else self.original class QDQQuantizer(BaseQuantizer): def __init__( self, model, per_channel, reduce_range, weight_qType, activation_qType, tensors_range, nodes_to_quantize, nodes_to_exclude, op_types_to_quantize, extra_options=None, ): BaseQuantizer.__init__( self, model, per_channel, reduce_range, weight_qType, activation_qType, tensors_range, nodes_to_quantize, nodes_to_exclude, op_types_to_quantize, extra_options, ) self.tensors_to_quantize: dict[str, QDQTensorQuantInfo] = {} self.bias_to_quantize: dict[str, QDQBiasQuantInfo] = {} self.nodes_to_remove = [] # Specific op types to exclude qdq quantization for their outputs. # In TRT, it's not recommended to quantize outputs for weighted ops such as Conv, Matmul, Gemm # because those ops may be followed by nodes that require high resolution inputs. # Adding QDQ for those ops' output may end up with worse accuracy. # So, we don't recommend to add QDQ to node's output under such condition. self.op_types_to_exclude_output_quantization = extra_options.get("OpTypesToExcludeOutputQuantization", []) # We do quantization on Dequantizelinear's input to remove Quantizelinear for weight as an optimization. # In some cases, for example QDQ BERT model for TensorRT, QDQ should always appear as a pair. # Therefore, we need to disable this optimization and add qdq pair to weight. self.add_qdq_pair_to_weight = extra_options.get("AddQDQPairToWeight", False) # Some scenarios do not need the bias quantized. For example, in the case of Quantization Aware Training, # quantizing the bias is not needed. This is because in QAT, all model parameters are expected to be in # floating point format. To that end, we can use the FakeQuant operator for weights and activations that # can always have QDQ pairs (by using AddQDQPairToWeight). But for biases in a quantized model, we can't use # FakeQuant because it only ever appears before a DQ (since it is quantized as int32). self.quantize_bias = extra_options.get("QuantizeBias", True) # The default behavior is that multiple nodes can share a QDQ pair as their inputs. # In TRT, QDQ pair can`t be shared between nodes, so it will create dedicated QDQ pairs for each node. self.dedicated_qdq_pair = extra_options.get("DedicatedQDQPair", False) self.tensor_to_its_receiving_nodes = {} # Let user set channel axis for specific op type and it's effective only when per channel quantization is supported and per_channel is True. self.qdq_op_type_per_channel_support_to_axis = extra_options.get("QDQOpTypePerChannelSupportToAxis", {}) self.qdq_op_domain = ms_domain if extra_options.get("UseQDQContribOps", False) else None # User can specify if removable activations, like Clip/Relu, should be kept in the graph. # Used in the QDQRemovableActivation class. self.qdq_keep_removable_activations = extra_options.get("QDQKeepRemovableActivations", False) # Let user disable adjustment of weight scales for bias inputs that are quantized to int32. self.qdq_disable_weight_adjust_for_int32_bias = extra_options.get("QDQDisableWeightAdjustForInt32Bias", False) # The ONNX spec did not support 16-bit Q/DQ ops before opset 21. # So, may have to override the Q/DQ op domain to 'com.microsoft' if the activation or weight types # are 16-bit or 4-bit integers. if self.opset_version < 21: opset21_types = (TensorProto.UINT16, TensorProto.INT16, TensorProto.UINT4, TensorProto.INT4) overrides_have_opset21_types = any( t.tensor_type in opset21_types for t in self.tensor_quant_override_qtypes ) if not self.qdq_op_domain and ( self.activation_qType in opset21_types or self.weight_qType in opset21_types or overrides_have_opset21_types ): logging.warning( "ONNX QuantizeLinear and DequantizeLinear operators do not support " "16-bit/4-bit integer quantization types prior to opset 21. " f"The domain of QuantizeLinear and DequantizeLinear operators will be set to '{ms_domain}' to " "enable support." ) self.qdq_op_domain = ms_domain self.quantization_params = self.calc_graph_quant_params() self.initializer_quant_params: dict[str, QuantizationParams] = {} # Map of all original value names to quantized value names self.quantized_value_map = {} def _get_tensor_type(self, tensor_name): """ Check if tensor can be quantized """ weight = find_by_name(tensor_name, self.model.initializer()) if weight is not None: return weight.data_type elif tensor_name in self.value_infos: vi = self.value_infos[tensor_name] if vi.type.HasField("tensor_type"): return vi.type.tensor_type.elem_type return None def _is_tensor_quantizable(self, tensor_name): """ Check if tensor can be quantized """ weight = find_by_name(tensor_name, self.model.initializer()) if weight is not None: if weight.data_type in (onnx_proto.TensorProto.FLOAT, onnx_proto.TensorProto.FLOAT16): return True elif tensor_name in self.value_infos: vi = self.value_infos[tensor_name] if vi.type.HasField("tensor_type") and vi.type.tensor_type.elem_type in ( TensorProto.FLOAT, TensorProto.FLOAT16, ): return True else: logging.warning( f"failed to infer the type of tensor: {tensor_name}. Skip to quantize it. Please check if it is expected." ) return False def __quantize_tensor(self, tensor_name, quant_sharing_provider=None, tensor_type=QDQQuantTensorType.ACTIVATION): """ Adds a tensor to the list (actually a dict) of tensors to quantize. Called indirectly by op quantizers that want to quantize a tensor (i.e., "mark" a tensor for quantization). If quant_sharing_provider is not None, tensor with name tensor_name will be quantized with the same quantization parameters as the node input specified in quant_sharing_provider. Ex: A Tranpose node's output will typically use the same quantization parameter initializers used at the Transpose node's input. Args: tensor_name: name of the tensor to quantize quant_sharing_provider: name of the tensor and node that provides quantization parameter tensor_type: QDQQuantTensorType default ACTIVATION """ if self._is_tensor_quantizable(tensor_name): if quant_sharing_provider: if not isinstance(quant_sharing_provider, QDQQuantParamProvider): raise TypeError( f"quant_sharing_provider must be of type QDQQuantParamProvider, not {type(quant_sharing_provider)}." ) data_type = self._get_tensor_type(tensor_name) self.tensors_to_quantize[tensor_name] = QDQTensorQuantInfo( tensor_type=tensor_type, quant_para_provider=quant_sharing_provider, data_type=data_type ) elif tensor_name not in self.tensors_to_quantize: data_type = self._get_tensor_type(tensor_name) self.tensors_to_quantize[tensor_name] = QDQTensorQuantInfo(tensor_type=tensor_type, data_type=data_type) def quantize_activation_tensor(self, tensor_name: str): """ Adds a tensor to the list of tensors to quantize. Called by op quantizers that want to quantize a tensor (i.e., "mark" a tensor for quantization). Args: tensor_name: name of the tensor to quantize """ return self.__quantize_tensor(tensor_name, None, QDQQuantTensorType.ACTIVATION) def quantize_output_same_as_input(self, output_name: str, input_name: str, node_name: str): """ Adds a tensor to the list of tensors to quantize. Called by op quantizers that want to quantize an output tensor using the same quantization parameters as one of the node's inputs. Ex: A Tranpose node's output will typically use the same quantization parameter initializers used at the Transpose node's input. Args: output_name: name of the node output to quantize so that it uses the same quantization params as an input. input_name: name of the node input from which the output tensor will get its quantization params. node_name: name of the node that consumes `input_name`. """ return self.__quantize_tensor( output_name, QDQQuantParamProvider(input_name, node_name), QDQQuantTensorType.ACTIVATION ) def quantize_weight_tensor(self, tensor_name: str): """ Adds a tensor to the list of weight tensors to quantize. Called by op quantizers that want to quantize a weight (i.e., "mark" a weight for quantization). Args: tensor_name: name of the weight to quantize """ return self.__quantize_tensor(tensor_name, None, QDQQuantTensorType.WEIGHT) def quantize_weight_tensor_per_channel(self, tensor_name, axis): weight = find_by_name(tensor_name, self.model.initializer()) if weight: if weight.data_type in (onnx_proto.TensorProto.FLOAT, onnx_proto.TensorProto.FLOAT16): self.tensors_to_quantize[tensor_name] = QDQTensorQuantInfo( tensor_type=QDQQuantTensorType.WEIGHT, axis=axis, data_type=weight.data_type ) else: logging.warning(f"only support per-channel quantization on weight. Tensor: {tensor_name} is not quantized.") def _dup_initializer(self, initializer: onnx.TensorProto) -> onnx.TensorProto: """ Duplicates an existing initializer and adds it to the model. Returns the new initializer. """ name_suffix: int = self.model.get_largest_initializer_name_suffix(initializer.name) + 1 new_initializer_name = f"{initializer.name}{name_suffix}" new_initializer = onnx.TensorProto() new_initializer.CopyFrom(initializer) new_initializer.name = new_initializer_name self.model.add_initializer(new_initializer) return new_initializer def quantize_bias_tensor(self, node_name, bias_name, input_name, weight_name, beta=1.0): """ Adds a bias tensor to the list of bias tensors to quantize. Called by op quantizers that want to quantize a bias with bias_zero_point = 0 and bias_scale = input_scale * weight_scale * beta. TODO: Explain the reasoning for using this formula. Args: node_name: name of the node that consumes the bias, input, and weight tensors. bias_name: name of the bias tensor to quantize. input_name: name of the input tensor whose scale is used to compute the bias's scale. weight_name: name of the weight tensor whose scale is used to compute the bias's scale. beta: Multiplier used to compute the bias's scale. """ # If the user provided quantization overrides for this tensor, treat it as a regular weight. if self.tensor_quant_overrides.get(bias_name): logging.info( f"Quantizing bias tensor '{bias_name}' as a weight due to the presence of user-specified overrides" ) is_per_channel, axis = self.is_tensor_per_channel(bias_name, default_axis=0) if is_per_channel: self.quantize_weight_tensor_per_channel(bias_name, axis) else: self.quantize_weight_tensor(bias_name) return bias_initializer = find_by_name(bias_name, self.model.initializer()) if bias_initializer is None: logging.warning(f"Expected bias '{bias_name}' to be an initializer") return if bias_initializer.data_type not in (onnx_proto.TensorProto.FLOAT, onnx_proto.TensorProto.FLOAT16): logging.info(f"Expected bias '{bias_name}' to be an floating-point initializer") return actual_bias_name = bias_name if bias_name in self.bias_to_quantize: # This bias input is consumed by two different nodes. We need to duplicate the bias so that # each node has its own bias input. This is necessary because the bias's scale is computed # from the node's other input scales. new_bias_initializer = self._dup_initializer(bias_initializer) actual_bias_name = new_bias_initializer.name # Replace this node's bias input self.model.replace_input_of_nodes(bias_name, actual_bias_name, {node_name}) logging.info(f"Created a copy of bias input '{bias_name}' called '{actual_bias_name}'") # Add this to our list of biases to quantize. self.bias_to_quantize[actual_bias_name] = QDQBiasQuantInfo(node_name, input_name, weight_name, beta) def _adjust_weight_scale_for_int32_bias( self, input_scale: np.ndarray, weight_scale: np.ndarray, weight_name: str, bias_tp: onnx.TensorProto, is_per_channel: bool, ) -> tuple[bool, np.ndarray | None]: """ Checks if the bias scale (input_scale * weight_scale) that we intend to use is too small. A bias scale that is too small leads to quantized bias values that fall outside the range of a int32 and have to be clipped, which decreases accuracy. If this function detects such a scenario, the weight_scale value will be increased to prevent this from happening. Although the adjustment method and amount differs, the idea to adjust the weight's scale came from the following reference: https://github.com/tensorflow/tensorflow/blob/master/tensorflow/lite/tools/optimize/quantization_utils.cc#L252 :param input_scale: The input's scale. :param weight_scale: The weight scale to potentially adjust. :param weight_name: The weight initializer's name. Used for logging. :param bias_tp: The bias ONNX initializer. :param is_per_channel: True if the bias and weight are quantized per-channel. :return: A tuple with a bool indicating if the weight's scale was adjusted and the new weight scale. """ if not weight_scale.size: return False, None bias_float_data = tensor_proto_to_array(bias_tp) int32_info = np.iinfo(np.int32) multiplicative_epsilon = 1.0001 qrange = np.array(int32_info.max, dtype=np.float64) - np.array(int32_info.min + 1, dtype=np.float64) weight_scale_dtype = weight_scale.dtype updated_an_elem = False if not is_per_channel: rmin = np.minimum(bias_float_data.min(), np.array(0, dtype=np.float64)) rmax = np.maximum(bias_float_data.max(), np.array(0, dtype=np.float64)) absmax = np.maximum(np.abs(rmin), np.abs(rmax)) bias_smallest_valid_scale = multiplicative_epsilon * (2.0 * absmax) / qrange input_scale_fp64 = np.array(input_scale.item(), dtype=np.float64) weight_scale_fp64 = np.array(weight_scale.item(), dtype=np.float64) bias_candidate_scale = input_scale_fp64 * weight_scale_fp64 if (bias_candidate_scale < bias_smallest_valid_scale) and (bias_candidate_scale > 0.0): # The candidate bias scale would be too small, so increase the weight_scale by the necessary ratio. ratio = bias_smallest_valid_scale / bias_candidate_scale logging.info( f"Increasing scale for weight `{weight_name}` by the ratio {ratio} to " f"ensure bias input `{bias_tp.name}` has a valid scale." ) new_scale = weight_scale_fp64 * ratio weight_scale = new_scale.astype(weight_scale_dtype) updated_an_elem = True elif weight_scale.shape and len(weight_scale.shape) == 1: # per-channel case num_elems = weight_scale.shape[0] for i in range(num_elems): bias_rmax = np.abs(bias_float_data[i]) bias_smallest_valid_scale = multiplicative_epsilon * (2.0 * bias_rmax) / qrange input_scale_fp64 = np.array(input_scale.item(), dtype=np.float64) weight_scale_fp64 = np.array(weight_scale[i].item(), dtype=np.float64) bias_candidate_scale = input_scale_fp64 * weight_scale_fp64 if (bias_candidate_scale < bias_smallest_valid_scale) and (bias_candidate_scale > 0.0): # The candidate bias scale would be too small, so increase the weight_scale by the necessary ratio. ratio = bias_smallest_valid_scale / bias_candidate_scale logging.info( f"Increased scale[{i}] for weight `{weight_name}` by ratio {ratio} " f"to ensure bias input `{bias_tp.name}` has a valid scale." ) new_scale = weight_scale_fp64 * ratio weight_scale[i] = new_scale.astype(weight_scale_dtype) updated_an_elem = True return updated_an_elem, weight_scale def _adjust_weight_quant_params_for_bias_tensors(self): """ Iterates through all bias inputs that should be quantized to int32. If the intended bias scale (equal to input_scale * weight_scale) is too small, this function will increase the associated weight's scale to ensure the bias does not overflow the int32 range when quantized. """ if self.qdq_disable_weight_adjust_for_int32_bias: # User passed an extra_option to disable this adjustment. return for bias_name, bias_info in self.bias_to_quantize.items(): if ( bias_info.input_name not in self.quantization_params or bias_info.input_name not in self.tensors_to_quantize or bias_info.weight_name not in self.initializer_quant_params ): continue # Get the associated input's scale. input_qparams = self.quantization_params[bias_info.input_name].get_for_consumer(bias_info.node_name) input_info = self.tensors_to_quantize[bias_info.input_name] input_scale = np.asarray( input_qparams["scale"], dtype=onnx.helper.tensor_dtype_to_np_dtype(input_info.data_type) ) weight_quant_params = self.initializer_quant_params[bias_info.weight_name] weight_quant_type = weight_quant_params["quant_type"] if weight_quant_type not in (onnx.TensorProto.INT8, onnx.TensorProto.INT16): continue weight_zero_point: np.ndarray = weight_quant_params["zero_point"] if weight_zero_point.any(): # Skip if zero_point(s) are not all zero (i.e., symmetric quant) continue weight_scale: np.ndarray = weight_quant_params["scale"] is_per_channel = weight_quant_params.get("axis", None) is not None # Get adjusted weight scales. did_update_weight_scale, new_weight_scale = self._adjust_weight_scale_for_int32_bias( input_scale, weight_scale, bias_info.weight_name, find_by_name(bias_name, self.model.initializer()), is_per_channel, ) if did_update_weight_scale: weight_quant_params["scale"] = new_weight_scale def remove_node(self, node): self.nodes_to_remove.append(node) def remove_nodes(self): self.model.remove_nodes(self.nodes_to_remove) def quantize_model(self): for node in self.model.nodes(): if self.should_quantize_node(node): op_quantizer = CreateQDQQuantizer(self, node) op_quantizer.quantize() for tensor_name in node.input: if tensor_name not in self.tensor_to_its_receiving_nodes: self.tensor_to_its_receiving_nodes[tensor_name] = [] self.tensor_to_its_receiving_nodes[tensor_name].append(node) self.initializer_quant_params = self._calc_initializer_quant_params() self._adjust_weight_quant_params_for_bias_tensors() self._quantize_normal_tensors() self._quantize_sharing_param_tensors() if self.quantize_bias: self._quantize_bias_tensors() self.remove_nodes() if not self.add_qdq_pair_to_weight: self.model.clean_initializers() self.model.model.producer_name = __producer__ self.model.model.producer_version = __version__ if self.qdq_op_domain == ms_domain: self.model.set_opset_import(ms_domain, 1) return self.model.model def try_replacing_upstream_output(self, upstream_output_name, output_name): if ( output_name in self.quantization_params and self.quantization_params[output_name].converted is None and self.quantization_params[upstream_output_name].converted is None and len(self.model.input_name_to_nodes()[upstream_output_name]) == 1 and not self.model.is_graph_output(upstream_output_name) and not self.model.is_graph_input(upstream_output_name) ): self.model.replace_output_of_all_nodes(upstream_output_name, output_name) if upstream_output_name in self.tensors_to_quantize: del self.tensors_to_quantize[upstream_output_name] return True return False def _create_q_node( self, q_input: str, q_output: str, quant_node_name: str, scale_name: str, zp_name: str, axis: int | None = None, ): """ Creates a QuantizeLinear node and adds it to the model. """ qlinear_node = onnx.helper.make_node( QUANT_OP_NAME, [q_input, scale_name, zp_name], [q_output], quant_node_name, axis=axis, domain=self.qdq_op_domain, ) self.model.add_nodes([qlinear_node]) def _create_dq_node( self, dq_input: str, dq_output: str, dequant_node_name: str, scale_name: str, zp_name: str, axis: int | None = None, ): """ Creates a DequantizeLinear node and adds it to the model. """ dequant_node = onnx.helper.make_node( DEQUANT_OP_NAME, [dq_input, scale_name, zp_name], [dq_output], dequant_node_name, axis=axis, domain=self.qdq_op_domain, ) self.model.add_nodes([dequant_node]) def _create_qdq_nodes( self, q_input, q_output, quant_node_name, dq_input, dq_output, dequant_node_name, scale_name, zp_name, axis=None ): qlinear_node = onnx.helper.make_node( QUANT_OP_NAME, [q_input, scale_name, zp_name], [q_output], quant_node_name, axis=axis, domain=self.qdq_op_domain, ) dequant_node = onnx.helper.make_node( DEQUANT_OP_NAME, [dq_input, scale_name, zp_name], [dq_output], dequant_node_name, axis=axis, domain=self.qdq_op_domain, ) self.model.add_nodes([qlinear_node, dequant_node]) def _add_qdq_nodes_for_initializer(self, weight_proto: onnx.TensorProto): """ Adds Q/DQ nodes for an initializer. If `self.add_qdq_pair_to_weight` is true, creates the sequence (weight_f32 -> Q -> DQ -> ). Otherwise, this function quantizes the initializer and adds the sequence (weight_quant -> DQ ->). """ weight_name = weight_proto.name if weight_name in self.quantized_value_map: return quant_params: QuantizationParams = self.initializer_quant_params[weight_name] axis: int = quant_params.get("axis") scale_zp_initializers = self._make_scale_zp_initializers(weight_name, quant_params) q_weight_name: str | None = None weight_dequant_output = add_dequant_output_suffix(weight_name) self.model.replace_input_of_all_nodes(weight_name, weight_dequant_output) if self.add_qdq_pair_to_weight: # Don't actually quantize the weight. Instead, keep floating-point weight and create the node # sequence (weight_f32 -> Q -> DQ -> weight_dequant) weight_quant_output = add_quant_output_suffix(weight_name) self._create_qdq_nodes( weight_name, weight_quant_output, add_quant_suffix(weight_name), weight_quant_output, weight_dequant_output, add_dequant_suffix(weight_name), scale_zp_initializers.scale.name, scale_zp_initializers.zero_point.name, axis, ) else: # Quantize the weight and create the node sequence: # (weight_quantized -> DQ -> weight_dequant) quant_weight = quantize_onnx_initializer( weight_proto, quant_params["quant_type"], quant_params["zero_point"], quant_params["scale"], axis, ) self.model.add_initializer(quant_weight) q_weight_name = quant_weight.name dequant_node = onnx.helper.make_node( DEQUANT_OP_NAME, [quant_weight.name, scale_zp_initializers.scale.name, scale_zp_initializers.zero_point.name], [weight_dequant_output], add_dequant_suffix(weight_name), axis=axis, domain=self.qdq_op_domain, ) self.model.add_node(dequant_node) # Log entry for this quantized weight quantized_value = QuantizedValue( weight_name, q_weight_name, scale_zp_initializers.scale.name, scale_zp_initializers.zero_point.name, QuantizedValueType.Initializer, axis=axis, ) self.quantized_value_map[weight_name] = QDQTensorQuantizedValue(quantized_value, None, None) def _add_qdq_pair_for_activation(self, tensor_name, scale_name, zp_name, data_type=None): if ( self.dedicated_qdq_pair and tensor_name in self.tensor_to_its_receiving_nodes and len(self.tensor_to_its_receiving_nodes[tensor_name]) > 1 ): num_dedicated_qdq_pair = len(self.tensor_to_its_receiving_nodes[tensor_name]) for i in range(num_dedicated_qdq_pair): postfix = f"_{i + 1}" tensor_name_quant_output_postfix = add_quant_output_suffix(tensor_name) + postfix tensor_name_dequant_output_postfix = add_dequant_output_suffix(tensor_name) + postfix quant_node_name_postfix = add_quant_suffix(tensor_name) + postfix dequant_node_name_postfix = add_dequant_suffix(tensor_name) + postfix self._create_qdq_nodes( tensor_name, tensor_name_quant_output_postfix, quant_node_name_postfix, tensor_name_quant_output_postfix, tensor_name_dequant_output_postfix, dequant_node_name_postfix, scale_name, zp_name, ) node = self.tensor_to_its_receiving_nodes[tensor_name][i] self.model.replace_node_input(node, tensor_name, tensor_name_dequant_output_postfix) if i == 0: quantized_value = QuantizedValue( tensor_name, tensor_name_dequant_output_postfix, scale_name, zp_name, QuantizedValueType.Input, scale_type=data_type, ) self.quantized_value_map[tensor_name] = QDQTensorQuantizedValue(quantized_value, None, None) else: q_input = tensor_name dq_output = add_dequant_output_suffix(tensor_name) if self.model.is_graph_output(tensor_name): q_input = add_quant_input_suffix(tensor_name) dq_output = tensor_name self.model.replace_output_of_all_nodes(tensor_name, q_input) else: self.model.replace_input_of_all_nodes(tensor_name, dq_output) self._create_qdq_nodes( q_input, add_quant_output_suffix(tensor_name), add_quant_suffix(tensor_name), add_quant_output_suffix(tensor_name), dq_output, add_dequant_suffix(tensor_name), scale_name, zp_name, ) quantized_value = QuantizedValue( tensor_name, dq_output, scale_name, zp_name, QuantizedValueType.Input, scale_type=data_type, ) self.quantized_value_map[tensor_name] = QDQTensorQuantizedValue(quantized_value, None, None) def _add_qdq_ops_for_converted_activation( self, tensor_name, first_scale_name, first_zp_name, scale_data_type, convert_scale_name, convert_zp_name, convert_recv_nodes, ): """ Adds Q and DQ ops to a tensor whose quantized data type is converted. That is, some consumers may use the original data type from the producer, while other consumers use the converted data type. This is generally done by adding a sequence of ops that convert from one data type (e.g., uint8) to another (e.g., uint16). T_float ---> Quant(to u8) ---> Convert(to u16) ---> Dequant(to float) ---> T_float' where Convert(to u16) is equivalent to: ---> Dequant(to float) ---> Quant(to u16) ---> This function handles the following scenarios: 1) Tensor T is not a graph output; all consumers use the converted type <Producer> ---> Q1 ---> DQ1 ---> Q2 ---> DQ2 ---> <Consumers> 2) Tensor T is not a graph output; some consumers use the original type, others use the converted type <Producer> ---> Q1 -+-> DQ1 ---> <Consumers of original type> | +-> DQ1' ---> Q2 ---> DQ2 ---> <Consumers of converted type> 3) Tensor T is a graph output; all consumers use the converted type <Producer> ---> Q1 ---> DQ1 ---> Q2 ---> DQ2 -+-> <Consumers> | +-> <Graph output> 4) Tensor T is a graph output; some consumers use the original type, others use the converted type <Producer> ---> Q1 -+-> DQ1 -+-> <Consumers of original type> | | | +-> <Graph output> | +-> DQ1' ---> Q2 ---> DQ2 ---> <Consumers of converted type> 5) Tensor T is a graph output that is not consumed by any other nodes. <Producer> ---> Q1 ---> DQ1 ---> Q2 ---> DQ2 ---> <Graph output> """ tensor_recv_nodes = set([node.name for node in self.tensor_to_its_receiving_nodes.get(tensor_name, [])]) if ( self.dedicated_qdq_pair and tensor_name in self.tensor_to_its_receiving_nodes and len(self.tensor_to_its_receiving_nodes[tensor_name]) > 1 ): # TODO: Add support for dedicated_qdq_pair if/when needed. raise ValueError( "Do not currently support converted quant_types in TensorQuantOverrides when the `dedicated_qdq_pair` extra_option is enabled" ) # Determine which nodes consume the original quantized type and which nodes # consume the converted quantized type. original_recv_nodes = tensor_recv_nodes if convert_recv_nodes is None: # In this case, all consumers receive the converted type. convert_recv_nodes = tensor_recv_nodes original_recv_nodes = set() else: original_recv_nodes = original_recv_nodes - convert_recv_nodes all_use_converted = len(convert_recv_nodes) == len(tensor_recv_nodes) is_graph_output = self.model.is_graph_output(tensor_name) # Create first Q op. first_q_input = tensor_name if is_graph_output: first_q_input = add_quant_input_suffix(tensor_name) self.model.replace_output_of_all_nodes(tensor_name, first_q_input) first_q_output = add_quant_output_suffix(tensor_name) self._create_q_node( first_q_input, first_q_output, add_quant_suffix(tensor_name), first_scale_name, first_zp_name ) # Create first DQ op. first_dq_output = add_dequant_output_suffix(tensor_name) if is_graph_output and not all_use_converted: first_dq_output = tensor_name if original_recv_nodes and first_dq_output != tensor_name: self.model.replace_input_of_nodes(tensor_name, first_dq_output, original_recv_nodes) self._create_dq_node( first_q_output, first_dq_output, add_dequant_suffix(tensor_name), first_scale_name, first_zp_name ) # Create parallel clone of first DQ op if _not all_ consumers use the converted type. # --> DQ1' --> Q2 --> DQ2 --> <Consumers of converted type> # # This DQ clone would only have one consumer Q node (Q2) and could be potentially fused with # it by some EPs (e.g., QNN) without breaking other "node units". # Ex QNN fusion: # --> Convert (fused) --> DQ2 --> <Consumers of converted type> second_q_input = first_dq_output if not all_use_converted: second_q_input = add_quant_input_suffix(f"{tensor_name}_convert") self._create_dq_node( first_q_output, second_q_input, add_dequant_suffix(f"{tensor_name}_convert_clone"), first_scale_name, first_zp_name, ) # Create second Q op. second_q_output = add_quant_output_suffix(f"{tensor_name}_convert") self._create_q_node( second_q_input, second_q_output, add_quant_suffix(f"{tensor_name}_convert"), convert_scale_name, convert_zp_name, ) # Create second DQ op. second_dq_output = add_dequant_output_suffix(f"{tensor_name}_convert") if is_graph_output and all_use_converted: second_dq_output = tensor_name if convert_recv_nodes and second_dq_output != tensor_name: self.model.replace_input_of_nodes(tensor_name, second_dq_output, convert_recv_nodes) self._create_dq_node( second_q_output, second_dq_output, add_dequant_suffix(f"{tensor_name}_convert"), convert_scale_name, convert_zp_name, ) # Store in quantized_value_map original_quantized_value = QuantizedValue( tensor_name, first_dq_output, first_scale_name, first_zp_name, QuantizedValueType.Input, scale_type=scale_data_type, ) converted_quantized_value = QuantizedValue( tensor_name, second_dq_output, convert_scale_name, convert_zp_name, QuantizedValueType.Input, scale_type=scale_data_type, ) self.quantized_value_map[tensor_name] = QDQTensorQuantizedValue( original_quantized_value, converted_quantized_value, convert_recv_nodes ) def _quantize_normal_tensors(self): """ Adds Q/DQ ops to tensors (activations and weights) that have been marked for quantization by op quantizers. """ for tensor_name, tensor_info in self.tensors_to_quantize.copy().items(): if tensor_name in self.quantized_value_map: continue if not tensor_info.is_shared: # Quantize the input initializer = find_by_name(tensor_name, self.model.initializer()) if initializer: self._add_qdq_nodes_for_initializer(initializer) else: tensor_qparam_initializers = self._make_tensor_scale_zp_initializers(tensor_name) if not tensor_qparam_initializers: raise ValueError( f"Quantization parameters are not specified for param {tensor_name}. " "In static mode quantization params for inputs and outputs of nodes to be quantized are required." ) if tensor_qparam_initializers.converted is None: # Normal case: <producer> --> Q --> DQ --> <consumers> self._add_qdq_pair_for_activation( tensor_name, tensor_qparam_initializers.original.scale.name, tensor_qparam_initializers.original.zero_point.name, data_type=tensor_info.data_type, ) else: # Conversion case: <producer> ---> Q1 -+-> DQ1 --> <consumers of original type> # | # +-> DQ1' --> Q2 --> DQ2 --> <consumers of converted type> assert tensor_info.data_type == tensor_qparam_initializers.original.scale.data_type self._add_qdq_ops_for_converted_activation( tensor_name, tensor_qparam_initializers.original.scale.name, tensor_qparam_initializers.original.zero_point.name, tensor_info.data_type, tensor_qparam_initializers.converted.scale.name, tensor_qparam_initializers.converted.zero_point.name, tensor_qparam_initializers.converted_recv_nodes, ) del self.tensors_to_quantize[tensor_name] def _quantize_sharing_param_tensors(self): """ Adds Q/DQ ops to tensors that have been marked for quantization by op quantizers. Only operates on tensors that want to use the quantization parameter initializers from an upstream tensor. For example, a Transpose node's output tensor will typically want to use the same quantization parameter initializers as the Transpose node's input. """ while self.tensors_to_quantize: for tensor_name, tensor_info in self.tensors_to_quantize.copy().items(): quant_provider = tensor_info.quant_para_provider if quant_provider and quant_provider.input_name in self.quantized_value_map: del self.tensors_to_quantize[tensor_name] quantized_value = self.quantized_value_map[quant_provider.input_name].get_for_consumer( quant_provider.node_name ) if self.is_input_a_initializer(tensor_name): raise ValueError("Quantization parameter shared mode is not supported for weight yet") # Need to check if this tensor's quant_type is converted for some consumers. # If so, create new scale/zp initializers for these consumers. converted_qparam_inits = None converted_recv_nodes = None if tensor_name in self.quantization_params: tensor_params = self.quantization_params[tensor_name] if tensor_params.converted: converted_qparam_inits = self._make_scale_zp_initializers( tensor_name, tensor_params.converted, "_convert" ) converted_recv_nodes = tensor_params.converted_recv_nodes if converted_qparam_inits is None: # Normal case: <producer> --> Q_shared --> DQ_shared --> <consumers> self._add_qdq_pair_for_activation( tensor_name, quantized_value.scale_name, quantized_value.zp_name ) else: # Conversion case: <producer> ---> Q_shared -+-> DQ_shared --> <consumers of original type> # | # +-> DQ_shared' --> Q2 --> DQ2 --> <consumers of converted type> self._add_qdq_ops_for_converted_activation( tensor_name, quantized_value.scale_name, quantized_value.zp_name, converted_qparam_inits.scale.data_type, converted_qparam_inits.scale.name, converted_qparam_inits.zero_point.name, converted_recv_nodes, ) def _quantize_bias_tensors(self): """ Adds DQ ops (or Cast) for bias tensors that have been marked for quantization by op quantizers. """ for bias_name, bias_info in self.bias_to_quantize.items(): if bias_name in self.quantized_value_map: continue # Quantize the input self.quantize_bias_static(bias_name, bias_info) init = find_by_name(bias_name, self.model.initializer()) self.model.remove_initializer(init) quant_value = self.quantized_value_map[bias_name].original if quant_value.node_type == "Cast": # simple cast to float 16 and not DequantizeLinear # cublasLtMatmul only supports (b)float16, float bias. if not isinstance(init.data_type, int): raise TypeError(f"Unexpected type {type(init.data_type)} for input={bias_info.input_name!r}") node_name = add_dequant_suffix(bias_name) dequant_node = onnx.helper.make_node( "Cast", [quant_value.q_name], [bias_name], name=node_name, to=init.data_type, ) elif quant_value.node_type in (None, "DequantizeLinear"): if quant_value.node_qtype in { onnx.TensorProto.FLOAT16, onnx.TensorProto.BFLOAT16, onnx.TensorProto.FLOAT, }: raise RuntimeError(f"Unexpected quantize type {quant_value.node_qtype} for DequantizeLinear.") inputs = [quant_value.q_name, quant_value.scale_name, quant_value.zp_name] node_name = add_dequant_suffix(bias_name) if quant_value.axis is not None: dequant_node = onnx.helper.make_node( "DequantizeLinear", inputs, [bias_name], node_name, axis=quant_value.axis, domain=self.qdq_op_domain, ) else: dequant_node = onnx.helper.make_node( "DequantizeLinear", inputs, [bias_name], node_name, domain=self.qdq_op_domain, ) else: raise RuntimeError(f"Unexpected operator type {quant_value.node_type!r}.") self.model.add_node(dequant_node) def is_tensor_quantized(self, tensor_name: str): return tensor_name in self.tensors_to_quantize or tensor_name in self.bias_to_quantize def is_tensor_per_channel( self, tensor_name: str, default_axis: int, op_type: str | None = None, ) -> tuple[bool, int | None]: """ Checks if a given tensor is configured to be quantized per-channel. If so, also returns the channel axis. ORT only supports per-channel quantization on static weights (i.e., ONNX initializers). If the user did not provide tensor quantization overrides for this tensor, then the value of self.per_channel determines if the weight is to be quantized per-channel. Params: tensor_name: The name of the tensor to check. default_axis: The default channel axis. This method checks if the normalized axis is within bounds. Can be overridden via the extra_options 'QDQOpTypePerChannelSupportToAxis' and 'TensorQuantOverrides'. op_type: Optional, defaults to None. The operator type that is the only consumer of this weight. Used to access the extra option 'QDQOpTypePerChannelSupportToAxis'. Returns: A tuple (is_per_channel, axis) in which the first element indicates whether the tensor is quantized per-channel and the second element is the channel axis. The returned axis is only None if the tensor is not per-channel or the axis is out of bounds. """ weight_initializer = self.initializers.get(tensor_name) if weight_initializer is None: return False, None # Only support per-channel weights if self.tensor_quant_overrides.has_per_tensor_overrides(tensor_name): return False, None # User provided per-tensor overrides for this initializer has_per_chan_overrides = self.tensor_quant_overrides.has_per_channel_overrides(tensor_name) if not self.per_channel and not has_per_chan_overrides: return False, None # global self.per_channel is off and user did not provide per-channel overrides. axis = self.qdq_op_type_per_channel_support_to_axis.get(op_type, default_axis) if op_type else default_axis if has_per_chan_overrides: per_chan_overrides = self.tensor_quant_overrides.get_per_channel_overrides(tensor_name) axis = per_chan_overrides[0]["axis"] # Prefer axis from user-specified tensor-level overrides if available weight_rank = len(weight_initializer.dims) axis_valid, axis = normalize_axis(axis, weight_rank) if not axis_valid: logging.warning(f"Axis {axis} is out-of-range for weight '{tensor_name}' with rank {weight_rank}") return False, None return True, axis def quantize_bias_static(self, bias_name: str, bias_info: QDQBiasQuantInfo) -> str: """ Quantized the bias. Zero Point == 0 and Scale == Input_Scale * Weight_Scale """ # Handle case where bias already in quantization map if bias_name in self.quantized_value_map: return self.quantized_value_map[bias_name].original.q_name # get scale for weight weight_scale_name = self.quantized_value_map[bias_info.weight_name].original.scale_name weight_scale_initializer = find_by_name(weight_scale_name, self.model.initializer()) weight_scale = tensor_proto_to_array(weight_scale_initializer) # get scale for input input_scale_name = ( self.quantized_value_map[bias_info.input_name].get_for_consumer(bias_info.node_name).scale_name ) input_scale_initializer = find_by_name(input_scale_name, self.model.initializer()) input_scale = tensor_proto_to_array(input_scale_initializer) ( quantized_bias_name, quantized_bias_scale_name, quantized_bias_zp_name, bias_scale_data, node_type, node_qtype, ) = self.quantize_bias_static_impl(bias_name, input_scale, weight_scale, bias_info.beta) quantized_value = QuantizedValue( bias_name, quantized_bias_name, quantized_bias_scale_name, quantized_bias_zp_name, QuantizedValueType.Initializer, 0 if bias_scale_data.size > 1 else None, node_type=node_type, node_qtype=node_qtype, ) self.quantized_value_map[bias_name] = QDQTensorQuantizedValue(quantized_value, None, None) return quantized_bias_name def _make_scale_zp_initializers( self, param_name: str, quant_params: QuantizationParams, init_name_suffix: str = "" ) -> QDQScaleZpInitializers: """ Creates and returns scale and zero-point initializers for the given quantization params. The initializers are named: - {param_name}_zero_point{init_name_suffix} - {param_name}_scale{init_name_suffix} """ zero_point = quant_params["zero_point"] scale = quant_params["scale"] zero_point_type = quant_params["quant_type"] axis: int | None = quant_params.get("axis") assert (axis is not None and len(scale.shape) == 1) or ( axis is None and len(scale.shape) == 0 ), "Wrong scale/zp shapes" assert len(scale.shape) == len(zero_point.shape), "Scale and zero-point must have the same rank" zero_point_name = param_name + "_zero_point" + init_name_suffix scale_name = param_name + "_scale" + init_name_suffix # Add initializers to model init_zp = onnx.helper.make_tensor( zero_point_name, zero_point_type, zero_point.shape, zero_point.ravel().tolist() ) self.model.add_initializer(init_zp) if scale.dtype == np.float32: scale_type = onnx_proto.TensorProto.FLOAT elif scale.dtype == np.float16: scale_type = onnx_proto.TensorProto.FLOAT16 else: raise ValueError(f"Unexpected dtype={scale.dtype} for param_name={param_name!r}") init_scale = onnx.helper.make_tensor(scale_name, scale_type, scale.shape, scale.ravel().tolist()) self.model.add_initializer(init_scale) return QDQScaleZpInitializers(init_scale, init_zp) def _make_tensor_scale_zp_initializers(self, tensor_name: str) -> QDQTensorScaleZpInitializers | None: """ Create and returns all scale/zero_point initializers for a given tensor. If the tensor is converted to a different quantization type, this function creates two pairs of zp/scale initializers. Otherwise, only one pair of zp/scale initializers is created. """ if self.quantization_params is None or tensor_name not in self.quantization_params: logging.info(f'Quantization parameters for tensor:"{tensor_name}" not specified') return None tensor_params = self.quantization_params[tensor_name] if not isinstance(tensor_params, QDQTensorQuantParams): raise TypeError(f"Unexpected type {type(tensor_params)} for {tensor_name!r}.") original_inits = self._make_scale_zp_initializers(tensor_name, tensor_params.original) converted_inits = ( self._make_scale_zp_initializers(tensor_name, tensor_params.converted, "_convert") if tensor_params.converted else None ) return QDQTensorScaleZpInitializers(original_inits, converted_inits, tensor_params.converted_recv_nodes) def calc_quant_params(self, tensor_data: TensorData, quant_overrides: dict[str, Any]) -> QuantizationParams: """ Calculates quantization parameters (scale/zero-point) given a tensor's min/max range and optional user-provided overrides. """ quant_type = self.activation_qType if "quant_type" in quant_overrides: quant_type = quant_overrides["quant_type"].tensor_type if "scale" in quant_overrides and "zero_point" in quant_overrides: zero, scale = quant_overrides["zero_point"], quant_overrides["scale"] elif quant_type == onnx.TensorProto.FLOAT8E4M3FN: zero, scale = compute_scale_zp_float8(quant_type, tensor_data.avg_std[1]) else: rmin = quant_overrides.get("rmin", tensor_data.range_value[0]) rmax = quant_overrides.get("rmax", tensor_data.range_value[1]) symmetric = quant_overrides.get("symmetric", self.is_activation_symmetric) reduce_range = quant_overrides.get("reduce_range", False) qmin, qmax = get_qmin_qmax_for_qType(quant_type, reduce_range=reduce_range, symmetric=symmetric) zero, scale = compute_scale_zp(rmin, rmax, qmin, qmax, symmetric, self.min_real_range) return QuantizationParams(zero_point=zero.squeeze(), scale=scale.squeeze(), quant_type=quant_type) def calc_graph_quant_params(self) -> dict[str, QDQTensorQuantParams]: """ Calculates quantization parameters (scale/zero-point) for all tensors in the graph using each tensor's min/max range and optional user-provided overrides. """ if self.tensors_range is None: return {} self.adjust_tensor_ranges() quantization_params = {} for tensor_name in self.tensors_range: td = self.tensors_range[tensor_name] if not isinstance(td, TensorData): raise TypeError(f"Unexpected type {type(td)} for {tensor_name!r}.") quant_overrides = self.tensor_quant_overrides.get_per_tensor_overrides(tensor_name, default_val={}) original = self.calc_quant_params(td, quant_overrides) converted = None converted_recv_nodes = None if "convert" in quant_overrides: converted = self.calc_quant_params(td, quant_overrides["convert"]) converted_recv_nodes = quant_overrides["convert"].get("recv_nodes") quantization_params[tensor_name] = QDQTensorQuantParams(original, converted, converted_recv_nodes) return quantization_params def _calc_initializer_quant_params(self) -> dict[str, QuantizationParams]: """ Returns quantization parameters (scale/zero_point/quant_type) for all initializers. """ quantization_params: dict[str, QuantizationParams] = {} for tensor_name, tensor_info in self.tensors_to_quantize.items(): initializer = find_by_name(tensor_name, self.model.initializer()) if not initializer: continue initializer_data = tensor_proto_to_array(initializer) initializer_rank = len(initializer_data.shape) # initializers for elementwise ops use the quant_type for activations. is_weight = tensor_info.tensor_type is QDQQuantTensorType.WEIGHT quant_type = self.weight_qType if is_weight else self.activation_qType # Try to get scale/zp directly from user's overrides and avoid computation. if self.tensor_quant_overrides.overrides_scale_zp(tensor_name): overrides = self.tensor_quant_overrides[tensor_name] if "quant_type" in overrides[0]: quant_type = overrides[0]["quant_type"].tensor_type zp_dtype = ONNX_TYPE_TO_NP_TYPE[quant_type] is_per_channel = "axis" in overrides[0] if not is_per_channel: quantization_params[tensor_name] = QuantizationParams( zero_point=np.array(overrides[0]["zero_point"], dtype=zp_dtype), scale=np.array(overrides[0]["scale"], initializer_data.dtype), quant_type=quant_type, ) else: zero_points_list = [] scales_list = [] for chan_overrides in overrides: zero_points_list.append(np.array(chan_overrides["zero_point"], zp_dtype)) scales_list.append(np.array(chan_overrides["scale"], dtype=initializer_data.dtype)) channel_axis = overrides[0]["axis"] is_axis_valid, norm_channel_axis = normalize_axis(channel_axis, initializer_rank) if not is_axis_valid: raise ValueError( f"Weight {initializer.name} has a per-channel axis with value {channel_axis} that is " f"out-of-bounds for rank {initializer_rank}" ) quantization_params[tensor_name] = QuantizationParams( zero_point=np.array(zero_points_list), scale=np.array(scales_list), quant_type=quant_type, axis=norm_channel_axis, ) continue # Compute scale/zp normally. User's overrides may still override parameters # used to compute the scale/zp (e.g., rmin, rmax, symmetric, etc.) overrides = self.tensor_quant_overrides.get(tensor_name, [{}]) if "quant_type" in overrides[0]: quant_type = overrides[0]["quant_type"].tensor_type channel_axis = overrides[0].get("axis", tensor_info.axis) is_per_channel = channel_axis is not None # Note: always quantize per-channel initializers as symmetric because QLinear* ops require the # same zero-point in every channel, which is necessarily the case for symmetric quantization. is_symmetric_default = is_per_channel or ( self.is_weight_symmetric(quant_type) if is_weight else self.is_activation_symmetric ) is_symmetric = overrides[0].get("symmetric", is_symmetric_default) reduce_range = overrides[0].get("reduce_range", self.reduce_range) zero_point: np.ndarray | None = None scale: np.ndarray | None = None if not is_per_channel: zero_point, scale = compute_data_quant_params( initializer_data.flatten(), quant_type, is_symmetric, reduce_range=reduce_range, min_real_range=self.min_real_range, rmin_override=overrides[0].get("rmin"), rmax_override=overrides[0].get("rmax"), ) else: is_axis_valid, norm_channel_axis = normalize_axis(channel_axis, initializer_rank) if not is_axis_valid: raise ValueError( f"Weight {initializer.name} has a per-channel axis with value {channel_axis} that is " f"out-of-bounds for rank {initializer_rank}" ) channel_axis = norm_channel_axis channel_count = initializer_data.shape[channel_axis] zero_points_list = [] scales_list = [] for i in range(channel_count): per_channel_data = initializer_data.take(i, channel_axis) channel_overrides = overrides[i] if overrides and i < len(overrides) else {} channel_zero_point, channel_scale = compute_data_quant_params( per_channel_data.ravel(), quant_type, is_symmetric, reduce_range=reduce_range, min_real_range=self.min_real_range, rmin_override=channel_overrides.get("rmin"), rmax_override=channel_overrides.get("rmax"), ) zero_points_list.append(channel_zero_point) scales_list.append(channel_scale) zero_point = np.asarray(zero_points_list) scale = np.asarray(scales_list) quantization_params[tensor_name] = QuantizationParams( zero_point=zero_point, scale=scale, quant_type=quant_type, axis=channel_axis, ) return quantization_params
Memory