RotaryEmbedding
# Rotary position embedding, applied to Q and K in every layer
class RotaryEmbedding(nn.Module):
    def __init__(self, dim, rope_ratio=1, original_impl=False, device=None, dtype=None):
        super().__init__()
        # Inverse frequencies (the division term of RoPE)
        inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2, device=device).to(dtype=dtype) / dim))
        self.register_buffer("inv_freq", inv_freq)
        # d, the embedding dimension
        self.dim = dim
        # Flag for the original implementation (not referenced in this snippet)
        self.original_impl = original_impl
        # RoPE scaling ratio
        self.rope_ratio = rope_ratio

    def forward_impl(
            self, seq_len: int, n_elem: int, dtype: torch.dtype, device: torch.device, base: int = 10000
    ):
        """Enhanced Transformer with Rotary Position Embedding.

        Derived from: https://github.com/labmlai/annotated_deep_learning_paper_implementations/blob/master/labml_nn/
        transformers/rope/__init__.py. MIT License:
        https://github.com/labmlai/annotated_deep_learning_paper_implementations/blob/master/license.
        """
        # $\Theta = {\theta_i = 10000^{\frac{2(i-1)}{d}}, i \in [1, 2, ..., \frac{d}{2}]}$
        base = base * self.rope_ratio
        # Rotation angle for each 2D subspace of the embedding space
        theta = 1.0 / (base ** (torch.arange(0, n_elem, 2, dtype=torch.float, device=device) / n_elem))

        # Sequence IDs, a 1D array of 0 ~ SeqLen - 1
        seq_idx = torch.arange(seq_len, dtype=torch.float, device=device)

        # Outer product of the two, giving the per-position angles
        # Shape: [SeqLen, HeadSize // 2]
        # idx_theta[i, d] == i * theta[d]
        idx_theta = torch.outer(seq_idx, theta).float()

        # Cosine and sine of the angles, stacked along a new last dimension
        # Shape: [SeqLen, HeadSize // 2, 2]
        # cache[i, d] holds the cosine and sine for the d-th subspace of the i-th position
        cache = torch.stack([torch.cos(idx_theta), torch.sin(idx_theta)], dim=-1)

        # this is to mimic the behaviour of complex32, else we will get different results
        if dtype in (torch.float16, torch.bfloat16, torch.int8):
            cache = cache.bfloat16() if dtype == torch.bfloat16 else cache.half()
        return cache

    def forward(self, max_seq_len, offset=0):
        return self.forward_impl(
            max_seq_len, self.dim, dtype=self.inv_freq.dtype, device=self.inv_freq.device
        )


@torch.jit.script
def apply_rotary_pos_emb(x: torch.Tensor, rope_cache: torch.Tensor) -> torch.Tensor:
    # Input x: [SeqLen, BatchSize, NHead, HeadSize]
    # rope_cache: [MaxSeqLen, HeadSize // 2, 2]
    sq, b, np, hn = x.size(0), x.size(1), x.size(2), x.size(3)
    # HeadSize
    rot_dim = rope_cache.shape[-2] * 2
    # If the embedding dim of x exceeds HeadSize, split it in two and only rotate the part within HeadSize
    x, x_pass = x[..., :rot_dim], x[..., rot_dim:]
    # Truncate the rope cache to SeqLen
    rope_cache = rope_cache[:sq]
    # Split the last dim of x into pairs: [SeqLen, BatchSize, NHead, HeadSize // 2, 2]
    xshaped = x.reshape(sq, -1, np, rot_dim // 2, 2)
    # Insert two singleton dims into rope_cache: [SeqLen, 1, 1, HeadSize // 2, 2]
    rope_cache = rope_cache.view(sq, -1, 1, xshaped.size(3), 2)
    # Apply the rotary encoding
    # xshaped[..., 0]: x0 of the 2D subspace
    # xshaped[..., 1]: y0 of the 2D subspace
    # rope_cache[..., 0]: cosθ
    # rope_cache[..., 1]: sinθ
    # x = cosθ * x0 - sinθ * y0
    # y = sinθ * x0 + cosθ * y0
    x_out2 = torch.stack(
        [
            xshaped[..., 0] * rope_cache[..., 0] - xshaped[..., 1] * rope_cache[..., 1],
            xshaped[..., 1] * rope_cache[..., 0] + xshaped[..., 0] * rope_cache[..., 1],
        ],
        -1,
    )
    # Reshape back to [SeqLen, BatchSize, NHead, HeadSize]
    x_out2 = x_out2.flatten(3)
    # Concatenate the part beyond HeadSize back on
    return torch.cat((x_out2, x_pass), dim=-1)
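To make the shape contract concrete, here is a minimal sketch, assuming the `RotaryEmbedding` class and `apply_rotary_pos_emb` function above are in scope together with `import torch`. All sizes are toy values, and the cache is built to cover the whole head (the real model may rotate only part of each head, depending on the config). It checks two standard RoPE properties: the rotation preserves vector norms, and with identical content at every position the Q·K score depends only on the relative distance.

import torch

head_size, seq_len = 8, 6
rope = RotaryEmbedding(head_size, dtype=torch.float, device="cpu")
cache = rope(seq_len)                                 # [SeqLen, HeadSize // 2, 2]

# Random Q of shape [SeqLen, BatchSize, NHead, HeadSize]; rotation keeps norms
q = torch.randn(seq_len, 1, 1, head_size)
q_rot = apply_rotary_pos_emb(q, cache)
assert torch.allclose(q.norm(dim=-1), q_rot.norm(dim=-1), atol=1e-5)

# With the same content at every position, the score between position i and j
# depends only on i - j
q0 = torch.randn(1, 1, 1, head_size).expand(seq_len, 1, 1, head_size).contiguous()
k0 = torch.randn(1, 1, 1, head_size).expand(seq_len, 1, 1, head_size).contiguous()
qr = apply_rotary_pos_emb(q0, cache).squeeze()        # [SeqLen, HeadSize]
kr = apply_rotary_pos_emb(k0, cache).squeeze()
scores = qr @ kr.T                                    # [SeqLen, SeqLen]
assert torch.allclose(scores[1, 0], scores[2, 1], atol=1e-5)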
Embedding
class Embedding(torch.nn.Module):
    """Language model embeddings."""

    def __init__(self, config: ChatGLMConfig, device=None):
        super(Embedding, self).__init__()

        # HidSize: dimension of each hidden-state vector
        self.hidden_size = config.hidden_size
        # Embedding layer mapping token IDs to vectors, weight shape [VocabSize, HidSize]
        self.word_embeddings = nn.Embedding(
            config.padded_vocab_size,
            self.hidden_size,
            dtype=config.torch_dtype,
            device=device
        )
        # Whether the residual connection is kept in FP32
        self.fp32_residual_connection = config.fp32_residual_connection

    def forward(self, input_ids):
        # Input is token IDs, [BatchSize, SeqLen]
        # Pass the token IDs through the embedding layer to get token vectors,
        # used as the initial hidden states: [BatchSize, SeqLen, HidSize]
        words_embeddings = self.word_embeddings(input_ids)
        embeddings = words_embeddings
        # Swap the first two dims of the initial hidden states: [SeqLen, BatchSize, HidSize]
        embeddings = embeddings.transpose(0, 1).contiguous()
        # If FP32 residual connections are enabled, cast to FP32
        if self.fp32_residual_connection:
            embeddings = embeddings.float()
        return embeddings
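A minimal usage sketch of the shape and dtype flow, assuming the `Embedding` class above is importable; the `SimpleNamespace` config here is a hypothetical stand-in for `ChatGLMConfig`, carrying only the four fields the module actually reads.

import torch
from types import SimpleNamespace

config = SimpleNamespace(
    hidden_size=16,                  # HidSize
    padded_vocab_size=100,           # VocabSize
    torch_dtype=torch.float16,
    fp32_residual_connection=True,
)
emb = Embedding(config)
input_ids = torch.randint(0, 100, (2, 5))   # [BatchSize=2, SeqLen=5]
out = emb(input_ids)
print(out.shape, out.dtype)                  # torch.Size([5, 2, 16]) torch.float32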
ChatGLMForConditionalGeneration
class ChatGLMForConditionalGeneration(ChatGLMPreTrainedModel):
    def __init__(self, config: ChatGLMConfig, empty_init=True, device=None):
        super().__init__(config)

        # MaxSeqLen
        self.max_sequence_length = config.max_length
        # The Transformer (ChatGLMModel) described earlier
        self.transformer = ChatGLMModel(config, empty_init=empty_init, device=device)
        self.config = config
        self.quantized = False

        # If a quantization bit width is specified, quantize the model
        if self.config.quantization_bit:
            self.quantize(self.config.quantization_bit, empty_init=True)

    def forward(
            self,
            input_ids: Optional[torch.Tensor] = None,
            position_ids: Optional[torch.Tensor] = None,
            attention_mask: Optional[torch.Tensor] = None,
            past_key_values: Optional[Tuple[torch.FloatTensor]] = None,
            inputs_embeds: Optional[torch.Tensor] = None,
            labels: Optional[torch.Tensor] = None,
            use_cache: Optional[bool] = None,
            output_attentions: Optional[bool] = None,
            output_hidden_states: Optional[bool] = None,
            return_dict: Optional[bool] = None,
            return_last_logit: Optional[bool] = False,
    ):
        # Initialize `use_cache`, which controls whether the KV cache is returned
        use_cache = use_cache if use_cache is not None else self.config.use_cache
        # Initialize `return_dict`, which controls whether a dict or a tuple is returned
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Token IDs: [BatchSize, SeqLen]
        # Pass the token IDs and the other inputs into the Transformer
        transformer_outputs = self.transformer(
            input_ids=input_ids,
            position_ids=position_ids,
            attention_mask=attention_mask,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=use_cache,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Final hidden states, [SeqLen, BatchSize, HidSize]
        hidden_states = transformer_outputs[0]
        # If only the last logit is needed, keep only the last position of the hidden states
        if return_last_logit:
            hidden_states = hidden_states[-1:]
        # Pass the hidden states through the output layer to get logits, [SeqLen, BatchSize, VocabSize]
        lm_logits = self.transformer.output_layer(hidden_states)
        # Swap the first two dims: [BatchSize, SeqLen, VocabSize]
        lm_logits = lm_logits.transpose(0, 1).contiguous()

        loss = None
        # If labels are provided, compute the loss
        if labels is not None:
            lm_logits = lm_logits.to(torch.float32)

            # Drop the last logit and the first label,
            # because the logits of token #1 must fit label #2
            # logits: A B C D E (F)
            # labels: (A) B C D E F
            shift_logits = lm_logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            # Cross entropy, ignoring label -100
            loss_fct = CrossEntropyLoss(ignore_index=-100)
            # Logits reshaped to [BatchSize * (SeqLen - 1), VocabSize]
            # Labels reshaped to [BatchSize * (SeqLen - 1)]
            loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1))
            '''
            This is roughly equivalent to:

            mask = shift_labels != -100
            shift_labels = shift_labels[mask]
            shift_logits = shift_logits[mask]
            shift_onehot = torch.nn.functional.one_hot(shift_labels, shift_logits.size(-1))
            shift_probs = torch.softmax(shift_logits, -1)
            loss = - (shift_onehot * torch.log(shift_probs)).sum(-1).mean()
            '''

            lm_logits = lm_logits.to(hidden_states.dtype)
            loss = loss.to(hidden_states.dtype)

        # If a dict is not requested, pack the loss, the logits and the rest into a tuple
        if not return_dict:
            output = (lm_logits,) + transformer_outputs[1:]
            return ((loss,) + output) if loss is not None else output

        # Otherwise return a dict
        return CausalLMOutputWithPast(
            loss=loss,
            logits=lm_logits,
            past_key_values=transformer_outputs.past_key_values,
            hidden_states=transformer_outputs.hidden_states,
            attentions=transformer_outputs.attentions,
        )
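The block comment inside `forward` claims that `CrossEntropyLoss(ignore_index=-100)` on the shifted logits and labels matches a manual mask + one-hot + log-softmax computation. A minimal sketch that checks this equivalence on toy tensors (the shapes and the masked prompt prefix are made-up values, not taken from the model):

import torch
from torch.nn import CrossEntropyLoss

batch, seq_len, vocab = 2, 6, 11
lm_logits = torch.randn(batch, seq_len, vocab)
labels = torch.randint(0, vocab, (batch, seq_len))
labels[:, :3] = -100                       # e.g. the prompt part is masked out

shift_logits = lm_logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()

# Built-in version, as used in the model
loss_fct = CrossEntropyLoss(ignore_index=-100)
loss = loss_fct(shift_logits.view(-1, vocab), shift_labels.view(-1))

# Manual version from the comment
mask = shift_labels != -100
sel_labels = shift_labels[mask]
sel_logits = shift_logits[mask]
onehot = torch.nn.functional.one_hot(sel_labels, vocab)
probs = torch.softmax(sel_logits, -1)
manual = -(onehot * torch.log(probs)).sum(-1).mean()

assert torch.allclose(loss, manual, atol=1e-5)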