双序列比对

简介: 双序列比对

通过动态规划算法,计算打分矩阵,使得两条序列自动对齐,比对分为两种,一种是全局比对,一种是局部比对。

1f81caf6e3c3b999d479e39a73763782_640_wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1.png

比对过程种可以出现4种情况:

  • 碱基 对 空位(gap)
  • 空位(gap) 对 碱基
  • A碱基 对 A碱基
  • A碱基 对 B碱基

全局比对

全局比对又叫Needleman–Wunsch算法,具体算法过程如下:

3aed3f6cdd6ae17df22e6da43bf4f388_640_wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1.png

可以分为三步:

第一步: 初始化打分矩阵和回溯矩阵。打分矩阵和回溯矩阵都是一个二维矩阵,在代码实现上可以采用一维数组实现,也可以采用二维数组实现。打分矩阵初始化过程会将第一列和第一行进行赋值,具体赋值方法为个数乘以gap罚分。

第二步: 计算打分矩阵和回溯矩阵。打分矩矩阵,每个单元格的最终得分是来自于三个方向中的最大值,三个方向分别为上图中的H(对角线)、F(水平)和E(竖直)方向。每计算一个单元格,同时在回溯矩阵中记录一下得分的来源。打分矩阵从左至右,从上到下依次计算,直至计算完整个打分矩阵。在具体实现上可以采用二层的嵌套循环进行实现。

第三步:根据回溯矩阵进行回溯。从回溯矩阵右下角回溯到左上角即为回溯路径。

局部比对

局部比对算法又叫Smith-Waterman算法,具体算法过程如下:

247926ee7a865e6b35d2e8aaf96e17eb_640_wx_fmt=png&wxfrom=5&wx_lazy=1&wx_co=1.png

局部比对与全局比对相比,步骤也可以分为三步

第一步:初始化打分矩阵和回溯矩阵。这一步与全局比对相同

第二步:计算打分矩阵与回溯矩阵。与全局比对不同的是,局部比对在计算打分矩阵中的一个单元格时,除了要比较来自HEF方向的值,还要与0进行比较,单元格最终值取max(H, E, F, 0)。通过与0进行比较,可以使得最终的打分矩阵中所有的值均大于等于0,也就是说,只要是比对上就一定是正值,根据这一点,就可以找到一条局部最优的比对路径。

第三步:回溯,与全局比对不同的时,此时不再从右下角回溯到左上角,而是从最大值处回溯到0。这里可以理解为,从最大值处开始回溯,即为局部最优处回溯,而回溯到0,即为当前局部最优值的起始点,一头一尾从而构成了局部最优路径。这里需要注意的是,有可能打分矩阵中存在多个相同的最大值,意味着可能存在多条回溯路径。

下面附有一份Python实现的全局比对和局部比对代码,详细代码可以查看https://github.com/dwpeng/pairwise-alignment

from abc import ABC, abstractmethod
from enum import Enum, IntEnum
from typing import Mapping
import numpy as np
class Path(IntEnum):
    F = 1
    H = 2
    E = 3
class AlignmentBase(ABC):
    def __init__(
        self,
        target: str,
        query: str,
        M: int,
        X: int,
        E: int
    ):
        self.target = target
        self.query = query
        self.M = M
        self.X = X
        self.E = E
        self.target_len = len(target)
        self.query_len = len(query)
        # 打分矩阵
        self.score_matrix = np.zeros(
            (self.target_len + 1, self.query_len + 1),
            dtype=np.int32
        )
        # 初始化打分矩阵
        for i in range(self.target_len+1):
            self.score_matrix[i, 0] = i * E
        for i in range(self.query_len+1):
            self.score_matrix[0, i] = i * E
        # 回溯路径
        self.traceback = np.zeros_like(
            self.score_matrix,
            dtype=np.int8
        )
        # 比对路径
        self.path = []
        # 最高分数
        self.max_score = -10000000
        # 最高分数出现的位置
        self.max_t, self.max_q = self.target_len, self.query_len
    def align(self, min_s=-1000000):
        """ 比对 """
        for tindex, t in enumerate(self.target, start=1):
            for qindex, q in enumerate(self.query, start=1):
                s = self.M if t == q else self.X
                f = self.score_matrix[tindex, qindex-1] + self.E
                e = self.score_matrix[tindex-1, qindex] + self.E
                h = s + self.score_matrix[tindex-1, qindex-1]
                s = max(h, f, e, min_s)
                # 局部比对需要从最高值开始回溯
                if s >= self.max_score:
                    self.max_score = s
                    self.max_t, self.max_q = tindex, qindex
                self.score_matrix[tindex, qindex] = s
                if h >= f and h >= e:
                    b = Path.H
                elif f >= e:
                    b = Path.F
                else:
                    b = Path.E
                self.traceback[tindex, qindex] = b
        return self
    @abstractmethod
    def do_traceback(self):
        raise NotImplementedError
    def _update(
        self,
        path_flag,
        tlen,
        qlen,
    ):
        """ 更新回溯路径 """
        update_path_map = {
            Path.H: (-1, -1),
            Path.F: (0, -1),
            Path.E: (-1, 0)
        }
        if path_flag == Path.H:
            self.path.append((
                (tlen-1, self.target[tlen-1]),
                (qlen-1, self.query[qlen-1])
            ))
        elif path_flag == Path.E:
            self.path.append((
                (tlen-1, self.target[tlen-1]),
                (None, None)
            ))
        elif path_flag == Path.F:
            self.path.append((
                (None, None),
                (qlen-1, self.query[qlen-1]),
            ))
        return update_path_map[path_flag]
    def print(self):
        print_path = []
        for (_, t), (_, q) in self.path:
            if t is None or q is None:
                format = '-'
            elif t == q:
                format = '|'
            else:
                format = '*'
            if t == None:
                t = ' '
            if q == None:
                q = ' '
            print_path.append((t, q, format))
        print(''.join([i[0] for i in print_path]))
        print(''.join([i[2] for i in print_path]))
        print(''.join([i[1] for i in print_path]))
class AlignmentGlobal(AlignmentBase):
    def do_traceback(self):
        """ 回溯路径 """
        tlen, qlen = self.target_len, self.query_len
        while tlen > 0 and qlen > 0:
            p = self.traceback[tlen, qlen]
            update_target, update_query = self._update(p, tlen, qlen)
            tlen, qlen = tlen + update_target, qlen + update_query
        if tlen > 0 or qlen > 0:
            if tlen > 0:
                for t in range(tlen, 0, -1):
                    self._update(Path.E, t, 1)
            else:
                for q in range(qlen, 0, -1):
                    self._update(Path.F, 1, q)
        self.path = self.path[::-1]
        return self
class AlignmentLocal(AlignmentBase):
    def align(self):
        return super().align(0)
    def do_traceback(self):
        start = self.score_matrix[self.max_t, self.max_q]
        # 局部比对到0结束
        while start != 0:
            p = self.traceback[self.max_t, self.max_q]
            update_t, update_q = self._update(p, self.max_t, self.max_q)
            self.max_t, self.max_q = self.max_t + update_t, self.max_q + update_q
            start = self.score_matrix[self.max_t, self.max_q]
        if self.max_t > 0 or self.max_q > 0:
            self._update(Path.H, self.max_t, self.max_q)
        self.path = self.path[::-1]
        return self
class AlignMode(Enum):
    GLOBAL = 'global'
    LOCAL = 'local'
def align(tseq, qseq, M, X, E, mode=AlignMode.GLOBAL) -> AlignmentBase:
    align_cls: Mapping[AlignMode, AlignmentBase] = {
        AlignMode.GLOBAL: AlignmentGlobal,
        AlignMode.LOCAL: AlignmentLocal
    }
    ali = align_cls[mode]
    return (
        ali(tseq, qseq, M, X, E)
        .align()
        .do_traceback()
    )
if __name__ == '__main__':
    tseq = 'GAGCT'
    qseq = 'GAGCTXXXX'
    M, X, E = 2, -4, -2
    # 全局比对
    a = align(tseq, qseq, M, X, E, mode=AlignMode.GLOBAL)
    a.print()
    # 局部比对
    a = align(tseq, qseq, M, X, E, mode=AlignMode.LOCAL)
    a.print()


相关文章
|
6月前
|
安全
基因序列比对的注意点
基因序列比对的注意点
|
6月前
|
算法 C++
C++哈希表企业级运用----DNA序列的检测
C++哈希表企业级运用----DNA序列的检测
|
6月前
leetcode-187:重复的DNA序列
leetcode-187:重复的DNA序列
50 0
|
机器人 Java 开发工具
生成指定长度的随机数字,用对方法精准提效数10倍!
生成指定长度的随机数字这一函数功能可能在以下情况下被使用:
|
存储 算法 索引
【每日挠头算法题(3)】字符串解码|数组中重复的数字
【每日挠头算法题(3)】字符串解码|数组中重复的数字
|
监控 算法
算法训练Day37|738.单调递增的数字 ● 968.监控二叉树
算法训练Day37|738.单调递增的数字 ● 968.监控二叉树
|
机器学习/深度学习 存储 算法
算法训练Day25|216.组合总和III● 17.电话号码的字母组合
算法训练Day25|216.组合总和III● 17.电话号码的字母组合
算法训练Day27|39. 组合总和● 40.组合总和II● 131.分割回文串
算法训练Day27|39. 组合总和● 40.组合总和II● 131.分割回文串
|
算法 固态存储
分别使用SAD匹配,NCC匹配,SSD匹配三种算法提取双目图像的深度信息
分别使用SAD匹配,NCC匹配,SSD匹配三种算法提取双目图像的深度信息
170 0
分别使用SAD匹配,NCC匹配,SSD匹配三种算法提取双目图像的深度信息
m 序列(最长线性反馈移位寄存器序列)详解
m 序列(最长线性反馈移位寄存器序列)详解
516 0