# coding=utf-8
# Copyright 2024 Cohere Inc. HuggingFace Inc. team. All rights reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from functools import partial
from typing import Callable, Optional, Tuple

import torch
import torch.nn as nn
import torch.utils.checkpoint

from ...cache_utils import Cache, HybridCache
from ...configuration_utils import PretrainedConfig
from ...modeling_flash_attention_utils import FlashAttentionKwargs
from ...modeling_outputs import (
    BaseModelOutputWithPast,
)
from ...modeling_rope_utils import rope_config_validation
from ...modeling_utils import ALL_ATTENTION_FUNCTIONS
from ...processing_utils import Unpack
from ...utils import (
    logging,
)
from ..cohere.modeling_cohere import (
    CohereAttention,
    CohereDecoderLayer,
    CohereForCausalLM,
    CohereLayerNorm,
    CoherePreTrainedModel,
    CohereRotaryEmbedding,
    apply_rotary_pos_emb,
    eager_attention_forward,
)
from ..gemma2.modeling_gemma2 import Gemma2Model


logger = logging.get_logger(__name__)


class Cohere2Config(PretrainedConfig):
    r"""
    This is the configuration class to store the configuration of a [`Cohere2Model`]. It is used to instantiate a
    Cohere2 model according to the specified arguments, defining the model architecture.

    Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the
    documentation from [`PretrainedConfig`] for more information. Instantiating a configuration
    with the defaults will yield a similar configuration to that of the [CohereForAI/c4ai-command-r-v01](https://huggingface.co/CohereForAI/c4ai-command-r-v01) model.


    Args:
        vocab_size (`int`, *optional*, defaults to 256000):
            Vocabulary size of the Cohere model. Defines the number of different tokens that can be represented by the
            `inputs_ids` passed when calling [`Cohere2Model`]
        hidden_size (`int`, *optional*, defaults to 8192):
            Dimension of the hidden representations.
        intermediate_size (`int`, *optional*, defaults to 22528):
            Dimension of the MLP representations.
        logit_scale (`float`, *optional*, defaults to 0.0625):
            The scaling factor for the output logits.
        num_hidden_layers (`int`, *optional*, defaults to 40):
            Number of hidden layers in the Transformer decoder.
        num_attention_heads (`int`, *optional*, defaults to 64):
            Number of attention heads for each attention layer in the Transformer decoder.
        num_key_value_heads (`int`, *optional*):
            This is the number of key_value heads that should be used to implement Grouped Query Attention. If
            `num_key_value_heads=num_attention_heads`, the model will use Multi Head Attention (MHA), if
            `num_key_value_heads=1` the model will use Multi Query Attention (MQA) otherwise GQA is used. When
            converting a multi-head checkpoint to a GQA checkpoint, each group key and value head should be constructed
            by meanpooling all the original heads within that group. For more details checkout [this
            paper](https://arxiv.org/pdf/2305.13245.pdf). If it is not specified, will default to
            `num_attention_heads`.
        hidden_act (`str` or `function`, *optional*, defaults to `"silu"`):
            The non-linear activation function (function or string) in the decoder.
        max_position_embeddings (`int`, *optional*, defaults to 8192):
            The maximum sequence length that this model might ever be used with.
        initializer_range (`float`, *optional*, defaults to 0.02):
            The standard deviation of the truncated_normal_initializer for initializing all weight matrices.
        layer_norm_eps (`float`, *optional*, defaults to 1e-05):
            The epsilon used by the layer normalization.
        use_cache (`bool`, *optional*, defaults to `True`):
            Whether or not the model should return the last key/values attentions (not used by all models). Only
            relevant if `config.is_decoder=True`.
        pad_token_id (`int`, *optional*, defaults to 0):
            Padding token id.
        bos_token_id (`int`, *optional*, defaults to 5):
            Beginning of stream token id.
        eos_token_id (`int`, *optional*, defaults to 255001):
            End of stream token id.
        tie_word_embeddings (`bool`, *optional*, defaults to `True`):
            Whether to tie weight embeddings
        rope_theta (`float`, *optional*, defaults to 10000.0):
            The base period of the RoPE embeddings.
        rope_scaling (`Dict`, *optional*):
            Dictionary containing the scaling configuration for the RoPE embeddings. NOTE: if you apply new rope type
            and you expect the model to work on longer `max_position_embeddings`, we recommend you to update this value
            accordingly.
            Expected contents:
                `rope_type` (`str`):
                    The sub-variant of RoPE to use. Can be one of ['default', 'linear', 'dynamic', 'yarn', 'longrope',
                    'llama3'], with 'default' being the original RoPE implementation.
                `factor` (`float`, *optional*):
                    Used with all rope types except 'default'. The scaling factor to apply to the RoPE embeddings. In
                    most scaling types, a `factor` of x will enable the model to handle sequences of length x *
                    original maximum pre-trained length.
                `original_max_position_embeddings` (`int`, *optional*):
                    Used with 'dynamic', 'longrope' and 'llama3'. The original max position embeddings used during
                    pretraining.
                `attention_factor` (`float`, *optional*):
                    Used with 'yarn' and 'longrope'. The scaling factor to be applied on the attention
                    computation. If unspecified, it defaults to value recommended by the implementation, using the
                    `factor` field to infer the suggested value.
                `beta_fast` (`float`, *optional*):
                    Only used with 'yarn'. Parameter to set the boundary for extrapolation (only) in the linear
                    ramp function. If unspecified, it defaults to 32.
                `beta_slow` (`float`, *optional*):
                    Only used with 'yarn'. Parameter to set the boundary for interpolation (only) in the linear
                    ramp function. If unspecified, it defaults to 1.
                `short_factor` (`List[float]`, *optional*):
                    Only used with 'longrope'. The scaling factor to be applied to short contexts (<
                    `original_max_position_embeddings`). Must be a list of numbers with the same length as the hidden
                    size divided by the number of attention heads divided by 2
                `long_factor` (`List[float]`, *optional*):
                    Only used with 'longrope'. The scaling factor to be applied to long contexts (>
                    `original_max_position_embeddings`). Must be a list of numbers with the same length as the hidden
                    size divided by the number of attention heads divided by 2
                `low_freq_factor` (`float`, *optional*):
                    Only used with 'llama3'. Scaling factor applied to low frequency components of the RoPE
                `high_freq_factor` (`float`, *optional*):
                    Only used with 'llama3'. Scaling factor applied to high frequency components of the RoPE
        attention_bias (`bool`, *optional*, defaults to `False`):
            Whether to use a bias in the query, key, value and output projection layers during self-attention.
        attention_dropout (`float`, *optional*, defaults to 0.0):
            The dropout ratio for the attention probabilities.
        sliding_window (`int`, *optional*, defaults to 4096):
            Size of the sliding window attention context.
        sliding_window_pattern (`int`, *optional*, defaults to 4):
            Pattern for the sliding window attention. Layers whose 1-based index is a multiple of this value do not
            use the sliding window; all other layers do.
        cache_implementation (`str`, *optional*, defaults to `"hybrid"`):
            the cache type to be used with `generate`.

    ```python
    >>> from transformers import Cohere2Model, Cohere2Config

    >>> # Initializing a Cohere2 model configuration
    >>> configuration = Cohere2Config()

    >>> # Initializing a model from the Cohere2 configuration
    >>> model = Cohere2Model(configuration) # doctest: +SKIP

    >>> # Accessing the model configuration
    >>> configuration = model.config # doctest: +SKIP
    ```
    """

    model_type = "cohere2"
    keys_to_ignore_at_inference = ["past_key_values"]
    base_model_tp_plan = {
        "layers.*.self_attn.q_proj": "colwise",
        "layers.*.self_attn.k_proj": "colwise",
        "layers.*.self_attn.v_proj": "colwise",
        "layers.*.self_attn.o_proj": "rowwise",
        "layers.*.mlp.gate_proj": "colwise",
        "layers.*.mlp.up_proj": "colwise",
        "layers.*.mlp.down_proj": "rowwise",
    }
    base_model_pp_plan = {
        "embed_tokens": (["input_ids"], ["inputs_embeds"]),
        "layers": (["hidden_states", "attention_mask"], ["hidden_states"]),
        "norm": (["hidden_states"], ["hidden_states"]),
    }

    def __init__(
        self,
        vocab_size=256000,
        hidden_size=8192,
        intermediate_size=22528,
        logit_scale=0.0625,
        num_hidden_layers=40,
        num_attention_heads=64,
        num_key_value_heads=None,
        hidden_act="silu",
        max_position_embeddings=8192,
        initializer_range=0.02,
        layer_norm_eps=1e-5,
        use_cache=True,
        pad_token_id=0,
        bos_token_id=5,
        eos_token_id=255001,
        tie_word_embeddings=True,
        rope_theta=10000.0,
        rope_scaling=None,
        attention_bias=False,
        attention_dropout=0.0,
        sliding_window=4096,
        sliding_window_pattern=4,
        cache_implementation="hybrid",
        **kwargs,
    ):
        self.vocab_size = vocab_size
        self.max_position_embeddings = max_position_embeddings
        self.hidden_size = hidden_size
        self.logit_scale = logit_scale
        self.intermediate_size = intermediate_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads

        # for backward compatibility
        if num_key_value_heads is None:
            num_key_value_heads = num_attention_heads

        self.num_key_value_heads = num_key_value_heads
        self.hidden_act = hidden_act
        self.initializer_range = initializer_range
        self.layer_norm_eps = layer_norm_eps
        self.use_cache = use_cache
        self.rope_theta = rope_theta
        self.rope_scaling = rope_scaling
        self.attention_bias = attention_bias
        self.attention_dropout = attention_dropout
        self.sliding_window = sliding_window
        self.sliding_window_pattern = sliding_window_pattern
        # Need to specify head_dim in the config so it can be used in the attention forward functions
        self.head_dim = hidden_size // num_attention_heads
        self.cache_implementation = cache_implementation

        # Validate the correctness of rotary position embeddings parameters
        rope_config_validation(self)

        super().__init__(
            pad_token_id=pad_token_id,
            bos_token_id=bos_token_id,
            eos_token_id=eos_token_id,
            tie_word_embeddings=tie_word_embeddings,
            **kwargs,
        )


class Cohere2RotaryEmbedding(CohereRotaryEmbedding):
    pass


class Cohere2LayerNorm(CohereLayerNorm):
    pass


class Cohere2Attention(CohereAttention, nn.Module):
    """Multi-headed attention from 'Attention Is All You Need' paper"""

    def __init__(self, config: Cohere2Config, layer_idx: Optional[int] = None):
        # `nn.Module.__init__` is invoked explicitly (bypassing `CohereAttention.__init__`), so the
        # instance has to be passed by hand: calling it without `self` raises a `TypeError`.
        nn.Module.__init__(self)
        self.config = config
        self.layer_idx = layer_idx
        self.head_dim = getattr(config, "head_dim", config.hidden_size // config.num_attention_heads)
        self.num_key_value_groups = config.num_attention_heads // config.num_key_value_heads
        self.scaling = self.head_dim**-0.5
        self.attention_dropout = config.attention_dropout
        self.is_causal = True

        self.q_proj = nn.Linear(
            config.hidden_size, config.num_attention_heads * self.head_dim, bias=config.attention_bias
        )
        self.k_proj = nn.Linear(
            config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias
        )
        self.v_proj = nn.Linear(
            config.hidden_size, config.num_key_value_heads * self.head_dim, bias=config.attention_bias
        )
        self.o_proj = nn.Linear(
            config.num_attention_heads * self.head_dim, config.hidden_size, bias=config.attention_bias
        )
        # Layers whose 1-based index is a multiple of `sliding_window_pattern` get `None` here
        # (no sliding window); every other layer uses `config.sliding_window`.
        self.sliding_window = (
            config.sliding_window if (self.layer_idx + 1) % self.config.sliding_window_pattern != 0 else None
        )

    def forward(
        self,
        hidden_states: torch.Tensor,
        position_embeddings: Tuple[torch.Tensor, torch.Tensor],
        attention_mask: Optional[torch.Tensor],
        past_key_value: Optional[Cache] = None,
        cache_position: Optional[torch.LongTensor] = None,
        **kwargs: Unpack[FlashAttentionKwargs],
    ) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """
        Project `hidden_states` to queries/keys/values, optionally apply RoPE and update the cache, run the
        configured attention implementation, and apply the output projection.

        Returns:
            A tuple `(attn_output, attn_weights)` as produced by the selected attention function, with
            `attn_output` passed through `o_proj`.
        """
        input_shape = hidden_states.shape[:-1]
        hidden_shape = (*input_shape, -1, self.head_dim)

        query_states = self.q_proj(hidden_states).view(hidden_shape).transpose(1, 2)
        key_states = self.k_proj(hidden_states).view(hidden_shape).transpose(1, 2)
        value_states = self.v_proj(hidden_states).view(hidden_shape).transpose(1, 2)

        cos, sin = position_embeddings
        # Rotary embeddings are only applied on layers that use a sliding window
        if self.sliding_window is not None:
            query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin)

        if past_key_value is not None:
            cache_kwargs = {
                "sin": sin,
                "cos": cos,
                "sliding_window": self.sliding_window,
                "cache_position": cache_position,
            }
            key_states, value_states = past_key_value.update(key_states, value_states, self.layer_idx, cache_kwargs)

            # Here we need to slice as we use a static cache by default, but FA2 does not support it
            if attention_mask is not None and self.config._attn_implementation == "flash_attention_2":
                seq_len = attention_mask.shape[-1]
                key_states, value_states = key_states[:, :, :seq_len, :], value_states[:, :, :seq_len, :]

        attention_interface: Callable = eager_attention_forward
        if self.config._attn_implementation != "eager":
            if self.config._attn_implementation == "sdpa" and kwargs.get("output_attentions", False):
                logger.warning_once(
                    "`torch.nn.functional.scaled_dot_product_attention` does not support `output_attentions=True`. Falling back to "
                    'eager attention. This warning can be removed using the argument `attn_implementation="eager"` when loading the model.'
                )
            else:
                attention_interface = ALL_ATTENTION_FUNCTIONS[self.config._attn_implementation]

        attn_output, attn_weights = attention_interface(
            self,
            query_states,
            key_states,
            value_states,
            attention_mask,
            dropout=0.0 if not self.training else self.attention_dropout,
            scaling=self.scaling,
            sliding_window=self.sliding_window,
            **kwargs,
        )

        attn_output = attn_output.reshape(*input_shape, -1).contiguous()
        attn_output = self.o_proj(attn_output)
        return attn_output, attn_weights


class Cohere2DecoderLayer(CohereDecoderLayer):
    def __init__(self, config: Cohere2Config, layer_idx: int):
        super().__init__(config, layer_idx)
        # Replace the attention module created by the parent with the Cohere2 variant
        self.self_attn = Cohere2Attention(config, layer_idx)
        self.config = config
        # Same layer selection rule as `Cohere2Attention.sliding_window`
        self.is_sliding = (layer_idx + 1) % self.config.sliding_window_pattern != 0
        self.sliding_window = config.sliding_window

    def forward(
        self,
        hidden_states: torch.Tensor,
        position_embeddings: Tuple[torch.Tensor, torch.Tensor],
        attention_mask: Optional[torch.Tensor] = None,
        past_key_value: Optional[Cache] = None,
        output_attentions: Optional[bool] = False,
        use_cache: Optional[bool] = False,
        cache_position: Optional[torch.LongTensor] = None,
        last_cache_position: int = 0,
        **kwargs: Unpack[FlashAttentionKwargs],
    ) -> Tuple[torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]]]:
        """
        Args:
            hidden_states (`torch.FloatTensor`): input to the layer of shape `(batch, seq_len, embed_dim)`
            position_embeddings (`Tuple[torch.FloatTensor, torch.FloatTensor]`):
                Tuple containing the cosine and sine positional embeddings of shape `(batch_size, seq_len, head_dim)`,
                with `head_dim` being the embedding dimension of each attention head.
            attention_mask (`torch.FloatTensor`, *optional*):
                attention mask of size `(batch_size, sequence_length)` if flash attention is used or `(batch_size, 1,
                query_sequence_length, key_sequence_length)` if default attention is used.
            past_key_value (`Tuple(torch.FloatTensor)`, *optional*): cached past key and value projection states
            output_attentions (`bool`, *optional*):
                Whether or not to return the attentions tensors of all attention layers. See `attentions` under
                returned tensors for more detail.
            use_cache (`bool`, *optional*):
                If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding
                (see `past_key_values`).
            cache_position (`torch.LongTensor` of shape `(sequence_length)`, *optional*):
                Indices depicting the position of the input sequence tokens in the sequence
            last_cache_position (`int`): equivalent to `cache_position[-1]` but allow indexing without breaking dynamo tracing

        Returns:
            A tuple whose first element is the layer output; the attention weights are appended as a second
            element when `output_attentions` is truthy.
        """

        if self.is_sliding and attention_mask is not None:  # efficient SDPA and no padding
            # In prefill, we may be larger than sliding window
            effective_seq_len = max(cache_position.shape[0], self.sliding_window)
            # For FA2, the mask is 2D and is of shape [bs, processed_tokens] (not [bs, max_cache_len]),
            # thus we must slice from the right (at most `effective_seq_len` elements)
            if self.config._attn_implementation == "flash_attention_2":
                attention_mask = attention_mask[:, -effective_seq_len:]
            # Otherwise, the mask is 4D of shape [bs, 1, query_len, max_cache_len] thus we must slice
            # from the left, with an offset if we are beyond the sliding window
            else:
                min_dtype = torch.finfo(hidden_states.dtype).min
                sliding_window_mask = torch.tril(
                    torch.ones_like(attention_mask, dtype=torch.bool), diagonal=-self.sliding_window
                )
                attention_mask = torch.where(sliding_window_mask, min_dtype, attention_mask)
                # In case we are beyond the sliding window, we need to correctly offset the mask slicing
                # `last_cache_position` is equivalent to `cache_position[-1]` but without breaking dynamo
                offset = last_cache_position - effective_seq_len
                # Should only be used when beyond the sliding window (i.e. offset > 0)
                offset = max(0, offset)
                attention_mask = attention_mask[:, :, :, offset : offset + effective_seq_len]

        residual = hidden_states

        hidden_states = self.input_layernorm(hidden_states)

        # Self Attention
        hidden_states_attention, self_attn_weights = self.self_attn(
            hidden_states=hidden_states,
            position_embeddings=position_embeddings,
            attention_mask=attention_mask,
            past_key_value=past_key_value,
            output_attentions=output_attentions,
            use_cache=use_cache,
            cache_position=cache_position,
            **kwargs,
        )

        # Fully Connected (fed with the same normalized input as the attention block)
        hidden_states_mlp = self.mlp(hidden_states)

        # Add everything together
        hidden_states = residual + hidden_states_attention + hidden_states_mlp

        outputs = (hidden_states,)

        if output_attentions:
            outputs += (self_attn_weights,)

        return outputs


class Cohere2PreTrainedModel(CoherePreTrainedModel):
    config_class = Cohere2Config


class Cohere2Model(Gemma2Model):
    """
    Transformer decoder consisting of *config.num_hidden_layers* layers. Each layer is a [`Cohere2DecoderLayer`]
    Args:
        config: Cohere2Config
    """

    def __init__(self, config: Cohere2Config):
        super().__init__(config)
        self.norm = Cohere2LayerNorm(hidden_size=(config.hidden_size), eps=config.layer_norm_eps)
        self.rotary_emb = Cohere2RotaryEmbedding(config=config)

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        past_key_values: Optional[HybridCache] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        cache_position: Optional[torch.LongTensor] = None,
        last_cache_position: Optional[int] = None,
        **flash_attn_kwargs: Unpack[FlashAttentionKwargs],
    ) -> BaseModelOutputWithPast:
        """
        Run the decoder stack on `input_ids` or `inputs_embeds` (exactly one must be given).

        Unset `use_cache`, `output_attentions` and `output_hidden_states` fall back to the config values. When
        caching is requested outside of training and no cache is passed, a `HybridCache` sized to the current
        input is created.

        Raises:
            ValueError: if both or neither of `input_ids` and `inputs_embeds` are provided.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        use_cache = use_cache if use_cache is not None else self.config.use_cache

        if (input_ids is None) ^ (inputs_embeds is not None):
            raise ValueError("You must specify exactly one of input_ids or inputs_embeds")

        if self.gradient_checkpointing and self.training and use_cache:
            logger.warning_once(
                "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`."
            )
            use_cache = False

        if inputs_embeds is None:
            inputs_embeds = self.embed_tokens(input_ids)

        if use_cache and past_key_values is None and not self.training:
            batch_size, seq_len, _ = inputs_embeds.shape
            # NOTE: ideally, `HybridCache` should be initialized outside the model with `layer_device_map`
            past_key_values = HybridCache(
                self.config,
                max_batch_size=batch_size,
                max_cache_len=seq_len,
                dtype=inputs_embeds.dtype,
                device=self.device,
            )

        if cache_position is None:
            past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0
            cache_position = torch.arange(
                past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device
            )
        if position_ids is None:
            position_ids = cache_position.unsqueeze(0)

        # This is needed to correctly slice the mask without data-dependent slicing later on if using dynamo tracing
        # (retrieving the same value from `cache_position` later on would crash dynamo)
        if last_cache_position is None:
            last_cache_position = 0
            if attention_mask is not None:
                # In case a 4d mask is passed directly without using `generate`, we have to rely on cache_position
                # It will break dynamo tracing but there are no way around it (and it should never happen in practice)
                last_cache_position = (
                    attention_mask.shape[-1] if attention_mask.dim() == 2 else cache_position[-1].item()
                )
        causal_mask = self._update_causal_mask(
            attention_mask, inputs_embeds, cache_position, past_key_values, output_attentions
        )
        hidden_states = inputs_embeds

        # create position embeddings to be shared across the decoder layers
        position_embeddings = self.rotary_emb(hidden_states, position_ids)

        # decoder layers
        all_hidden_states = () if output_hidden_states else None
        all_self_attns = () if output_attentions else None

        for decoder_layer in self.layers:
            if output_hidden_states:
                all_hidden_states += (hidden_states,)

            if self.gradient_checkpointing and self.training:
                # Positional order must match `Cohere2DecoderLayer.forward`'s parameter order
                layer_outputs = self._gradient_checkpointing_func(
                    partial(decoder_layer.__call__, **flash_attn_kwargs),
                    hidden_states,
                    position_embeddings,
                    causal_mask,
                    past_key_values,
                    output_attentions,
                    use_cache,
                    cache_position,
                    last_cache_position,
                )
            else:
                layer_outputs = decoder_layer(
                    hidden_states,
                    position_embeddings=position_embeddings,
                    attention_mask=causal_mask,
                    past_key_value=past_key_values,
                    output_attentions=output_attentions,
                    use_cache=use_cache,
                    cache_position=cache_position,
                    last_cache_position=last_cache_position,
                    **flash_attn_kwargs,
                )

            hidden_states = layer_outputs[0]

            if output_attentions:
                all_self_attns += (layer_outputs[1],)

        hidden_states = self.norm(hidden_states)

        # add hidden states from the last decoder layer
        if output_hidden_states:
            all_hidden_states += (hidden_states,)

        return BaseModelOutputWithPast(
            last_hidden_state=hidden_states,
            past_key_values=past_key_values,
            hidden_states=all_hidden_states,
            attentions=all_self_attns,
        )


class Cohere2ForCausalLM(CohereForCausalLM):
    def __init__(self, config: Cohere2Config):
        super().__init__(config)

    def prepare_inputs_for_generation(
        self,
        input_ids,
        past_key_values=None,
        attention_mask=None,
        inputs_embeds=None,
        cache_position=None,
        position_ids=None,
        use_cache=True,
        logits_to_keep=None,
        **kwargs,
    ):
        """
        Build the keyword arguments for one generation step.

        Slices `input_ids` / `position_ids` down to the unprocessed tokens when a cache is present, records
        `last_cache_position`, and, for a `HybridCache` with a 2D mask on non-FA2 attention, expands the mask to
        4D.

        Returns:
            A dict with `input_ids` or `inputs_embeds`, `last_cache_position`, `position_ids`, `cache_position`,
            `past_key_values`, `use_cache`, `attention_mask`, and `logits_to_keep` when it was provided.
        """
        # Overwritten: has a special cache type, `HybridCache`

        # If we have cache: let's slice `input_ids` through `cache_position`, to keep only the unprocessed tokens
        # Exception 1: when passing input_embeds, input_ids may be missing entries
        # Exception 2: some generation methods do special slicing of input_ids, so we don't need to do it here
        # Exception 3: with synced GPUs cache_position may go out of bounds, but we only want dummy token in that case.
        #              (we can't check exception 3 while compiling)
        if past_key_values is not None:
            if (
                inputs_embeds is not None  # Exception 1
                or cache_position[-1] >= input_ids.shape[1]  # Exception 3
            ):
                input_ids = input_ids[:, -cache_position.shape[0] :]
            elif input_ids.shape[1] != cache_position.shape[0]:  # Default case (the "else", a no op, is Exception 2)
                input_ids = input_ids[:, cache_position]
        if attention_mask is not None and position_ids is None:
            # create position_ids on the fly for batch generation
            position_ids = attention_mask.long().cumsum(-1) - 1
            position_ids.masked_fill_(attention_mask == 0, 1)
            if past_key_values:
                position_ids = position_ids[:, -input_ids.shape[1] :]
                # This `clone` call is needed to avoid recapturing cuda graphs with `torch.compile`'s
                # `mode="reduce-overhead`, as otherwise the input `position_ids` would have various stride
                # during the decoding. Here, simply using `.contiguous()` is not sufficient as in the
                # batch size = 1 case, `position_ids` is already contiguous but with varying stride
                # which retriggers a capture.
                position_ids = position_ids.clone(memory_format=torch.contiguous_format)

        # if `inputs_embeds` are passed, we only want to use them in the 1st generation step
        if inputs_embeds is not None and cache_position[0] == 0:
            model_inputs = {"inputs_embeds": inputs_embeds, "input_ids": None}
        else:
            # The clone here is for the same reason as for `position_ids`.
            model_inputs = {"input_ids": input_ids.clone(memory_format=torch.contiguous_format), "inputs_embeds": None}

        # This is needed to correctly slice the mask without data-dependent slicing later on if using dynamo tracing
        # (retrieving the same value from `cache_position` later on would crash dynamo)
        model_inputs["last_cache_position"] = attention_mask.shape[-1] if attention_mask is not None else 0

        # `attention_mask` may legitimately be `None` (handled just above), so guard before reading `.ndim`
        if (
            isinstance(past_key_values, HybridCache)
            and attention_mask is not None
            and attention_mask.ndim == 2
            and not self.config._attn_implementation == "flash_attention_2"
        ):
            if model_inputs["inputs_embeds"] is not None:
                batch_size, sequence_length, _ = model_inputs["inputs_embeds"].shape
                device = model_inputs["inputs_embeds"].device
            else:
                batch_size, sequence_length = model_inputs["input_ids"].shape
                device = model_inputs["input_ids"].device

            attention_mask = self.model._prepare_4d_causal_attention_mask_with_cache_position(
                attention_mask,
                sequence_length=sequence_length,
                target_length=past_key_values.get_max_cache_shape(),
                dtype=self.lm_head.weight.dtype,
                device=device,
                cache_position=cache_position,
                batch_size=batch_size,
            )

        if logits_to_keep is not None:
            model_inputs["logits_to_keep"] = logits_to_keep

        model_inputs.update(
            {
                "position_ids": position_ids,
                "cache_position": cache_position,
                "past_key_values": past_key_values,
                "use_cache": use_cache,
                "attention_mask": attention_mask,
            }
        )
        return model_inputs


__all__ = ["Cohere2Config", "Cohere2ForCausalLM", "Cohere2Model", "Cohere2PreTrainedModel"]
Memory