Source code for hgvs.alignmentmapper

# -*- coding: utf-8 -*-
"""Mapping positions between pairs of sequence alignments

The AlignmentMapper class is at the heart of mapping between aligned sequences.

"""

from __future__ import absolute_import, division, print_function, unicode_literals

import re
from six.moves import range
from bioutils.coordinates import strand_int_to_pm

import hgvs.location

from hgvs.exceptions import HGVSError, HGVSUsageError, HGVSDataNotAvailableError, HGVSInvalidIntervalError
from hgvs.utils import build_tx_cigar
from hgvs.enums import Datum

cigar_re = re.compile(r"(?P<len>\d+)(?P<op>[=DIMNX])")


[docs]class AlignmentMapper(object): """Provides coordinate (not variant) mapping operations between genomic (g), non-coding (n) and cds (c) coordinates according to a CIGAR. :param hdp: HGVS Data Provider Interface-compliant instance (see :class:`hgvs.dataproviders.interface.Interface`) :param str tx_ac: string representing transcript accession (e.g., NM_000551.2) :param str alt_ac: string representing the reference sequence accession (e.g., NC_000019.10) :param str alt_aln_method: string representing the alignment method; valid values depend on data source """ __slots__ = ("tx_ac", "alt_ac", "alt_aln_method", "strand", "gc_offset", "cds_start_i", "cds_end_i", "tgt_len", "cigar", "ref_pos", "tgt_pos", "cigar_op") def __init__(self, hdp, tx_ac, alt_ac, alt_aln_method): self.tx_ac = tx_ac self.alt_ac = alt_ac self.alt_aln_method = alt_aln_method if self.alt_aln_method != "transcript": tx_info = hdp.get_tx_info(self.tx_ac, self.alt_ac, self.alt_aln_method) if tx_info is None: raise HGVSDataNotAvailableError( "AlignmentMapper(tx_ac={self.tx_ac}, " "alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): " "No transcript info".format(self=self)) tx_exons = hdp.get_tx_exons(self.tx_ac, self.alt_ac, self.alt_aln_method) if tx_exons is None: raise HGVSDataNotAvailableError( "AlignmentMapper(tx_ac={self.tx_ac}, " "alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): " "No transcript exons".format(self=self)) # hgvs-386: An assumption when building the cigar string # is that exons are adjacent. Assert that here. sorted_tx_exons = sorted(tx_exons, key=lambda e: e["ord"]) for i in range(1, len(sorted_tx_exons)): if sorted_tx_exons[i - 1]["tx_end_i"] != sorted_tx_exons[i]["tx_start_i"]: raise HGVSDataNotAvailableError( "AlignmentMapper(tx_ac={self.tx_ac}, " "alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): " "Exons {a} and {b} are not adjacent".format(self=self, a=i, b=i + 1)) self.strand = tx_exons[0]["alt_strand"] self.gc_offset = tx_exons[0]["alt_start_i"] self.cds_start_i = tx_info["cds_start_i"] self.cds_end_i = tx_info["cds_end_i"] self.cigar = build_tx_cigar(tx_exons, self.strand) self.ref_pos, self.tgt_pos, self.cigar_op = self._parse_cigar(self.cigar) self.tgt_len = self.tgt_pos[-1] else: # this covers the identity cases n <-> c tx_identity_info = hdp.get_tx_identity_info(self.tx_ac) if tx_identity_info is None: raise HGVSDataNotAvailableError( "AlignmentMapper(tx_ac={self.tx_ac}, " "alt_ac={self.alt_ac}, alt_aln_method={self.alt_aln_method}): " "No transcript identity info".format(self=self)) self.cds_start_i = tx_identity_info["cds_start_i"] self.cds_end_i = tx_identity_info["cds_end_i"] self.tgt_len = sum(tx_identity_info["lengths"]) assert not ( (self.cds_start_i is None) ^ (self.cds_end_i is None)), "CDS start and end must both be defined or neither defined" def __str__(self): return "{self.__class__.__name__}: {self.tx_ac} ~ {self.alt_ac} ~ {self.alt_aln_method}; " \ "{strand_pm} strand; offset={self.gc_offset}".format( self=self, strand_pm=strand_int_to_pm(self.strand)) def _parse_cigar(self, cigar): """For a given CIGAR string, return the start positions of each aligned segment in ref and tgt, and a list of CIGAR operators. """ ces = [m.groupdict() for m in cigar_re.finditer(cigar)] ref_pos = [None] * len(ces) tgt_pos = [None] * len(ces) cigar_op = [None] * len(ces) ref_cur = tgt_cur = 0 for i, ce in enumerate(ces): ref_pos[i] = ref_cur tgt_pos[i] = tgt_cur cigar_op[i] = ce["op"] step = int(ce["len"]) if ce["op"] in "=MINX": ref_cur += step if ce["op"] in "=MDX": tgt_cur += step ref_pos.append(ref_cur) tgt_pos.append(tgt_cur) return ref_pos, tgt_pos, cigar_op def _map(self, from_pos, to_pos, pos, base): """Map position between aligned sequences Positions in this function are 0-based. """ pos_i = -1 while pos_i < len(self.cigar_op) and pos >= from_pos[pos_i + 1]: pos_i += 1 if pos_i == -1 or pos_i == len(self.cigar_op): raise HGVSInvalidIntervalError("Position is beyond the bounds of transcript record") if self.cigar_op[pos_i] in "=MX": mapped_pos = to_pos[pos_i] + (pos - from_pos[pos_i]) mapped_pos_offset = 0 elif self.cigar_op[pos_i] in "DI": if base == "start": mapped_pos = to_pos[pos_i] - 1 elif base == "end": mapped_pos = to_pos[pos_i] mapped_pos_offset = 0 elif self.cigar_op[pos_i] == "N": if pos - from_pos[pos_i] + 1 <= from_pos[pos_i + 1] - pos: mapped_pos = to_pos[pos_i] - 1 mapped_pos_offset = pos - from_pos[pos_i] + 1 else: mapped_pos = to_pos[pos_i] mapped_pos_offset = -(from_pos[pos_i + 1] - pos) return mapped_pos, mapped_pos_offset, self.cigar_op[pos_i]
[docs] def g_to_n(self, g_interval): """convert a genomic (g.) interval to a transcript cDNA (n.) interval""" grs, gre = g_interval.start.base - 1 - self.gc_offset, g_interval.end.base - 1 - self.gc_offset # frs, fre = (f)orward (r)na (s)tart & (e)nd; forward w.r.t. genome frs, frs_offset, frs_cigar = self._map( from_pos=self.ref_pos, to_pos=self.tgt_pos, pos=grs, base="start") fre, fre_offset, fre_cigar = self._map( from_pos=self.ref_pos, to_pos=self.tgt_pos, pos=gre, base="end") if self.strand == -1: frs, fre = self.tgt_len - fre - 1, self.tgt_len - frs - 1 frs_offset, fre_offset = -fre_offset, -frs_offset # The returned interval would be uncertain when locating at alignment gaps return hgvs.location.BaseOffsetInterval( start=hgvs.location.BaseOffsetPosition( base=frs + 1, offset=frs_offset, datum=Datum.SEQ_START), end=hgvs.location.BaseOffsetPosition( base=fre + 1, offset=fre_offset, datum=Datum.SEQ_START), uncertain=frs_cigar in 'DI' or fre_cigar in 'DI')
[docs] def n_to_g(self, n_interval): """convert a transcript (n.) interval to a genomic (g.) interval""" frs, fre = n_interval.start.base - 1, n_interval.end.base - 1 start_offset, end_offset = n_interval.start.offset, n_interval.end.offset if self.strand == -1: fre, frs = self.tgt_len - frs - 1, self.tgt_len - fre - 1 start_offset, end_offset = -end_offset, -start_offset # returns the genomic range start (grs) and end (gre) grs, _, grs_cigar = self._map( from_pos=self.tgt_pos, to_pos=self.ref_pos, pos=frs, base="start") gre, _, gre_cigar = self._map( from_pos=self.tgt_pos, to_pos=self.ref_pos, pos=fre, base="end") grs, gre = grs + self.gc_offset + 1, gre + self.gc_offset + 1 gs, ge = grs + start_offset, gre + end_offset # The returned interval would be uncertain when locating at alignment gaps return hgvs.location.Interval( start=hgvs.location.SimplePosition(gs, uncertain=n_interval.start.uncertain), end=hgvs.location.SimplePosition(ge, uncertain=n_interval.end.uncertain), uncertain=grs_cigar in 'DI' or gre_cigar in 'DI')
[docs] def n_to_c(self, n_interval): """convert a transcript cDNA (n.) interval to a transcript CDS (c.) interval""" if self.cds_start_i is None: # cds_start_i defined iff cds_end_i defined; see assertion above raise HGVSUsageError( "CDS is undefined for {self.tx_ac}; cannot map to c. coordinate (non-coding transcript?)" .format(self=self)) if n_interval.start.base <= 0 or n_interval.end.base > self.tgt_len: raise HGVSInvalidIntervalError( "The given coordinate is outside the bounds of the reference sequence.") # start if n_interval.start.base <= self.cds_start_i: cs = n_interval.start.base - (self.cds_start_i + 1) cs_datum = Datum.CDS_START elif n_interval.start.base > self.cds_start_i and n_interval.start.base <= self.cds_end_i: cs = n_interval.start.base - self.cds_start_i cs_datum = Datum.CDS_START else: cs = n_interval.start.base - self.cds_end_i cs_datum = Datum.CDS_END # end if n_interval.end.base <= self.cds_start_i: ce = n_interval.end.base - (self.cds_start_i + 1) ce_datum = Datum.CDS_START elif n_interval.end.base > self.cds_start_i and n_interval.end.base <= self.cds_end_i: ce = n_interval.end.base - self.cds_start_i ce_datum = Datum.CDS_START else: ce = n_interval.end.base - self.cds_end_i ce_datum = Datum.CDS_END c_interval = hgvs.location.BaseOffsetInterval( start=hgvs.location.BaseOffsetPosition( base=cs, offset=n_interval.start.offset, datum=cs_datum), end=hgvs.location.BaseOffsetPosition( base=ce, offset=n_interval.end.offset, datum=ce_datum), uncertain=n_interval.uncertain) return c_interval
[docs] def c_to_n(self, c_interval): """convert a transcript CDS (c.) interval to a transcript cDNA (n.) interval""" if self.cds_start_i is None: # cds_start_i defined iff cds_end_i defined; see assertion above raise HGVSUsageError( "CDS is undefined for {self.tx_ac}; cannot map from c. coordinate (non-coding transcript?)" .format(self=self)) # start if c_interval.start.datum == Datum.CDS_START and c_interval.start.base < 0: r_start = c_interval.start.base + self.cds_start_i + 1 elif c_interval.start.datum == Datum.CDS_START and c_interval.start.base > 0: r_start = c_interval.start.base + self.cds_start_i elif c_interval.start.datum == Datum.CDS_END: r_start = c_interval.start.base + self.cds_end_i # end if c_interval.end.datum == Datum.CDS_START and c_interval.end.base < 0: r_end = c_interval.end.base + self.cds_start_i + 1 elif c_interval.end.datum == Datum.CDS_START and c_interval.end.base > 0: r_end = c_interval.end.base + self.cds_start_i elif c_interval.end.datum == Datum.CDS_END: r_end = c_interval.end.base + self.cds_end_i if r_start <= 0 or r_end > self.tgt_len: raise HGVSInvalidIntervalError( "The given coordinate is outside the bounds of the reference sequence.") n_interval = hgvs.location.BaseOffsetInterval( start=hgvs.location.BaseOffsetPosition( base=r_start, offset=c_interval.start.offset, datum=Datum.SEQ_START), end=hgvs.location.BaseOffsetPosition( base=r_end, offset=c_interval.end.offset, datum=Datum.SEQ_START), uncertain=c_interval.uncertain) return n_interval
[docs] def g_to_c(self, g_interval): """convert a genomic (g.) interval to a transcript CDS (c.) interval""" return self.n_to_c(self.g_to_n(g_interval))
[docs] def c_to_g(self, c_interval): """convert a transcript CDS (c.) interval to a genomic (g.) interval""" return self.n_to_g(self.c_to_n(c_interval))
@property def is_coding_transcript(self): if ((self.cds_start_i is not None) ^ (self.cds_end_i is not None)): raise HGVSError("{self.tx_ac}: CDS start_i and end_i" " must be both defined or both undefined".format(self=self)) return self.cds_start_i is not None
# <LICENSE> # Copyright 2018 HGVS Contributors (https://github.com/biocommons/hgvs) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # </LICENSE>