通过动态规划算法,计算打分矩阵,使得两条序列自动对齐,比对分为两种,一种是全局比对,一种是局部比对。
比对过程种可以出现4种情况:
- 碱基 对 空位(gap)
- 空位(gap) 对 碱基
- A碱基 对 A碱基
- A碱基 对 B碱基
全局比对
全局比对又叫Needleman–Wunsch算法,具体算法过程如下:
可以分为三步:
第一步: 初始化打分矩阵和回溯矩阵。打分矩阵和回溯矩阵都是一个二维矩阵,在代码实现上可以采用一维数组实现,也可以采用二维数组实现。打分矩阵初始化过程会将第一列和第一行进行赋值,具体赋值方法为个数乘以gap罚分。
第二步: 计算打分矩阵和回溯矩阵。打分矩矩阵,每个单元格的最终得分是来自于三个方向中的最大值,三个方向分别为上图中的H(对角线)、F(水平)和E(竖直)方向。每计算一个单元格,同时在回溯矩阵中记录一下得分的来源。打分矩阵从左至右,从上到下依次计算,直至计算完整个打分矩阵。在具体实现上可以采用二层的嵌套循环进行实现。
第三步:根据回溯矩阵进行回溯。从回溯矩阵右下角回溯到左上角即为回溯路径。
局部比对
局部比对算法又叫Smith-Waterman算法,具体算法过程如下:
局部比对与全局比对相比,步骤也可以分为三步
第一步:初始化打分矩阵和回溯矩阵。这一步与全局比对相同
第二步:计算打分矩阵与回溯矩阵。与全局比对不同的是,局部比对在计算打分矩阵中的一个单元格时,除了要比较来自HEF方向的值,还要与0进行比较,单元格最终值取max(H, E, F, 0)。通过与0进行比较,可以使得最终的打分矩阵中所有的值均大于等于0,也就是说,只要是比对上就一定是正值,根据这一点,就可以找到一条局部最优的比对路径。
第三步:回溯,与全局比对不同的时,此时不再从右下角回溯到左上角,而是从最大值处回溯到0。这里可以理解为,从最大值处开始回溯,即为局部最优处回溯,而回溯到0,即为当前局部最优值的起始点,一头一尾从而构成了局部最优路径。这里需要注意的是,有可能打分矩阵中存在多个相同的最大值,意味着可能存在多条回溯路径。
下面附有一份Python实现的全局比对和局部比对代码,详细代码可以查看https://github.com/dwpeng/pairwise-alignment
from abc import ABC, abstractmethod from enum import Enum, IntEnum from typing import Mapping import numpy as np class Path(IntEnum): F = 1 H = 2 E = 3 class AlignmentBase(ABC): def __init__( self, target: str, query: str, M: int, X: int, E: int ): self.target = target self.query = query self.M = M self.X = X self.E = E self.target_len = len(target) self.query_len = len(query) # 打分矩阵 self.score_matrix = np.zeros( (self.target_len + 1, self.query_len + 1), dtype=np.int32 ) # 初始化打分矩阵 for i in range(self.target_len+1): self.score_matrix[i, 0] = i * E for i in range(self.query_len+1): self.score_matrix[0, i] = i * E # 回溯路径 self.traceback = np.zeros_like( self.score_matrix, dtype=np.int8 ) # 比对路径 self.path = [] # 最高分数 self.max_score = -10000000 # 最高分数出现的位置 self.max_t, self.max_q = self.target_len, self.query_len def align(self, min_s=-1000000): """ 比对 """ for tindex, t in enumerate(self.target, start=1): for qindex, q in enumerate(self.query, start=1): s = self.M if t == q else self.X f = self.score_matrix[tindex, qindex-1] + self.E e = self.score_matrix[tindex-1, qindex] + self.E h = s + self.score_matrix[tindex-1, qindex-1] s = max(h, f, e, min_s) # 局部比对需要从最高值开始回溯 if s >= self.max_score: self.max_score = s self.max_t, self.max_q = tindex, qindex self.score_matrix[tindex, qindex] = s if h >= f and h >= e: b = Path.H elif f >= e: b = Path.F else: b = Path.E self.traceback[tindex, qindex] = b return self @abstractmethod def do_traceback(self): raise NotImplementedError def _update( self, path_flag, tlen, qlen, ): """ 更新回溯路径 """ update_path_map = { Path.H: (-1, -1), Path.F: (0, -1), Path.E: (-1, 0) } if path_flag == Path.H: self.path.append(( (tlen-1, self.target[tlen-1]), (qlen-1, self.query[qlen-1]) )) elif path_flag == Path.E: self.path.append(( (tlen-1, self.target[tlen-1]), (None, None) )) elif path_flag == Path.F: self.path.append(( (None, None), (qlen-1, self.query[qlen-1]), )) return update_path_map[path_flag] def print(self): print_path = [] for (_, t), (_, q) in self.path: if t is None or q is None: format = '-' elif t == q: format = '|' else: format = '*' if t == None: t = ' ' if q == None: q = ' ' print_path.append((t, q, format)) print(''.join([i[0] for i in print_path])) print(''.join([i[2] for i in print_path])) print(''.join([i[1] for i in print_path])) class AlignmentGlobal(AlignmentBase): def do_traceback(self): """ 回溯路径 """ tlen, qlen = self.target_len, self.query_len while tlen > 0 and qlen > 0: p = self.traceback[tlen, qlen] update_target, update_query = self._update(p, tlen, qlen) tlen, qlen = tlen + update_target, qlen + update_query if tlen > 0 or qlen > 0: if tlen > 0: for t in range(tlen, 0, -1): self._update(Path.E, t, 1) else: for q in range(qlen, 0, -1): self._update(Path.F, 1, q) self.path = self.path[::-1] return self class AlignmentLocal(AlignmentBase): def align(self): return super().align(0) def do_traceback(self): start = self.score_matrix[self.max_t, self.max_q] # 局部比对到0结束 while start != 0: p = self.traceback[self.max_t, self.max_q] update_t, update_q = self._update(p, self.max_t, self.max_q) self.max_t, self.max_q = self.max_t + update_t, self.max_q + update_q start = self.score_matrix[self.max_t, self.max_q] if self.max_t > 0 or self.max_q > 0: self._update(Path.H, self.max_t, self.max_q) self.path = self.path[::-1] return self class AlignMode(Enum): GLOBAL = 'global' LOCAL = 'local' def align(tseq, qseq, M, X, E, mode=AlignMode.GLOBAL) -> AlignmentBase: align_cls: Mapping[AlignMode, AlignmentBase] = { AlignMode.GLOBAL: AlignmentGlobal, AlignMode.LOCAL: AlignmentLocal } ali = align_cls[mode] return ( ali(tseq, qseq, M, X, E) .align() .do_traceback() ) if __name__ == '__main__': tseq = 'GAGCT' qseq = 'GAGCTXXXX' M, X, E = 2, -4, -2 # 全局比对 a = align(tseq, qseq, M, X, E, mode=AlignMode.GLOBAL) a.print() # 局部比对 a = align(tseq, qseq, M, X, E, mode=AlignMode.LOCAL) a.print()