Source code for hgvs.normalizer

# -*- coding: utf-8 -*-
"""hgvs.normalizer
"""

from __future__ import absolute_import, division, print_function, unicode_literals

import copy

from bioutils.sequences import reverse_complement

import hgvs
import hgvs.validator
import hgvs.variantmapper
from hgvs.utils.norm import normalize_alleles
from hgvs.exceptions import HGVSDataNotAvailableError, HGVSUnsupportedOperationError, HGVSInvalidVariantError


[docs]class Normalizer(object): """Perform variant normalization """ def __init__(self, hdp, cross_boundaries=hgvs.global_config.normalizer.cross_boundaries, shuffle_direction=hgvs.global_config.normalizer.shuffle_direction, alt_aln_method=hgvs.global_config.mapping.alt_aln_method, validate=hgvs.global_config.normalizer.validate): """Initialize and configure the normalizer :param hdp: HGVS Data Provider Interface-compliant instance (see :class:`hgvs.dataproviders.interface.Interface`) :param cross_boundaries: whether allow the shuffling to cross the exon-intron boundary :param shuffle_direction: shuffling direction :param alt_aln_method: sequence alignment method (e.g., splign, blat) :param validate: whether validating the input variant before normalizing """ assert shuffle_direction == 3 or shuffle_direction == 5, \ "The shuffling direction should be 3 (3' most) or 5 (5' most)." self.hdp = hdp self.shuffle_direction = shuffle_direction self.cross_boundaries = cross_boundaries self.alt_aln_method = alt_aln_method self.validator = None if validate: self.validator = hgvs.validator.IntrinsicValidator(strict=False) self.hm = hgvs.variantmapper.VariantMapper(self.hdp)
[docs] def normalize(self, var): """Perform sequence variants normalization for single variant """ assert isinstance(var, hgvs.sequencevariant.SequenceVariant ), "variant must be a parsed HGVS sequence variant object" if self.validator: self.validator.validate(var) init_met = False if var.posedit is not None and isinstance(var.posedit, hgvs.edit.AARefAlt): init_met = var.posedit.init_met if var.posedit is None or var.posedit.uncertain or init_met or var.posedit.pos is None: return var type = var.type if type == "p": raise HGVSUnsupportedOperationError( "Unsupported normalization of protein level variants: {0}".format(var)) if var.posedit.edit.type == "con": raise HGVSUnsupportedOperationError( "Unsupported normalization of conversion variants: {0}", format(var)) var.fill_ref(self.hdp) if var.posedit.edit.type == "identity": var_norm = copy.deepcopy(var) return var_norm # For c. variants normalization, first convert to n. variant # and perform normalization at the n. level, then convert the # normalized n. variant back to c. variant. if type == "c": var = self.hm.c_to_n(var) if var.type in "nr": if var.posedit.pos.start.offset != 0 or var.posedit.pos.end.offset != 0: raise HGVSUnsupportedOperationError( "Normalization of intronic variants is not supported") # g, m, n, r sequences all use sequence start as the datum # That"s an essential assumption herein # (this is why we may have converted from c to n above) assert var.type in "gmnr", "Internal Error: variant must be of type g, m, n, r" bound_s, bound_e = self._get_boundary(var) boundary = (bound_s, bound_e) start, end, (ref, alt) = self._normalize_alleles(var, boundary) ref_len = len(ref) alt_len = len(alt) # Generate normalized variant if alt_len == ref_len: ref_start = start ref_end = end - 1 # inversion if ref_len > 1 and ref == reverse_complement(alt): edit = hgvs.edit.Inv(ref=ref) # ident elif ref_len == 0 and alt_len == 0: ref_start = ref_end edit = hgvs.edit.NARefAlt(ref=ref, alt=alt) # substitution or delins else: edit = hgvs.edit.NARefAlt(ref=ref, alt=alt) if alt_len < ref_len: # del or delins ref_start = start ref_end = end - 1 edit = hgvs.edit.NARefAlt(ref=ref, alt=None if alt_len == 0 else alt) elif alt_len > ref_len: # ins or dup if ref_len == 0: if self.shuffle_direction == 3: adj_seq = self._fetch_bounded_seq(var, start - alt_len - 1, end - 1, 0, boundary) else: adj_seq = self._fetch_bounded_seq(var, start - 1, start + alt_len - 1, 0, boundary) # ins if alt != adj_seq: ref_start = start - 1 ref_end = end edit = hgvs.edit.NARefAlt(ref=None, alt=alt) # dup else: if self.shuffle_direction == 3: ref_start = start - alt_len ref_end = end - 1 edit = hgvs.edit.Dup(ref=alt) else: ref_start = start ref_end = start + alt_len - 1 edit = hgvs.edit.Dup(ref=alt) # delins else: ref_start = start ref_end = end - 1 edit = hgvs.edit.NARefAlt(ref=ref, alt=alt) # ensure the start is not 0 if ref_start == 0: ref = self._fetch_bounded_seq(var, 0, 1, 0, boundary) alt = alt + ref edit = hgvs.edit.NARefAlt(ref=ref, alt=alt) ref_start = 1 ref_end = 1 # ensure the end is not outside of reference sequence tgt_len = self._get_tgt_length(var) if ref_end == tgt_len + 1: ref = self._fetch_bounded_seq(var, tgt_len - 1, tgt_len, 0, boundary) alt = ref + alt edit = hgvs.edit.NARefAlt(ref=ref, alt=alt) ref_start = tgt_len ref_end = tgt_len var_norm = copy.deepcopy(var) var_norm.posedit.edit = edit var_norm.posedit.pos.start.base = ref_start var_norm.posedit.pos.end.base = ref_end if type == "c": var_norm = self.hm.n_to_c(var_norm) return var_norm
def _get_boundary(self, var): """Get the position of exon-intron boundary for current variant """ if var.type == "r" or var.type == "n": if self.cross_boundaries: return 0, float("inf") else: # Get genomic sequence access number for this transcript map_info = self.hdp.get_tx_mapping_options(var.ac) if not map_info: raise HGVSDataNotAvailableError( "No mapping info available for {ac}".format(ac=var.ac)) map_info = [ item for item in map_info if item["alt_aln_method"] == self.alt_aln_method ] alt_ac = map_info[0]["alt_ac"] # Get tx info tx_info = self.hdp.get_tx_info(var.ac, alt_ac, self.alt_aln_method) cds_start = tx_info["cds_start_i"] cds_end = tx_info["cds_end_i"] # Get exon info exon_info = self.hdp.get_tx_exons(var.ac, alt_ac, self.alt_aln_method) exon_starts = [exon["tx_start_i"] for exon in exon_info] exon_ends = [exon["tx_end_i"] for exon in exon_info] exon_starts.sort() exon_ends.sort() exon_starts.append(exon_ends[-1]) exon_ends.append(float("inf")) # Find the end pos of the exon where the var locates left = 0 right = float("inf") # TODO: #242: implement methods to find tx regions for i, _ in enumerate(exon_starts): if (var.posedit.pos.start.base - 1 >= exon_starts[i] and var.posedit.pos.start.base - 1 < exon_ends[i]): break for j, _ in enumerate(exon_starts): if (var.posedit.pos.end.base - 1 >= exon_starts[j] and var.posedit.pos.end.base - 1 < exon_ends[j]): break if i != j: raise HGVSUnsupportedOperationError( "Unsupported normalization of variants spanning the exon-intron boundary ({var})" .format(var=var)) left = exon_starts[i] right = exon_ends[i] if cds_start is None: pass elif var.posedit.pos.end.base - 1 < cds_start: right = min(right, cds_start) elif var.posedit.pos.start.base - 1 >= cds_start: left = max(left, cds_start) else: raise HGVSUnsupportedOperationError( "Unsupported normalization of variants spanning the UTR-exon boundary ({var})" .format(var=var)) if cds_end is None: pass elif var.posedit.pos.start.base - 1 >= cds_end: left = max(left, cds_end) elif var.posedit.pos.end.base - 1 < cds_end: right = min(right, cds_end) else: raise HGVSUnsupportedOperationError( "Unsupported normalization of variants spanning the exon-UTR boundary ({var})" .format(var=var)) return left, right else: # For variant type of g and m etc. return 0, float("inf") def _get_tgt_length(self, var): """Get the total length of the whole reference sequence """ if var.type == "g" or var.type == "m": return float("inf") else: # Get genomic sequence access number for this transcript identity_info = self.hdp.get_tx_identity_info(var.ac) if not identity_info: raise HGVSDataNotAvailableError( "No identity info available for {ac}".format(ac=var.ac)) tgt_len = sum(identity_info["lengths"]) return tgt_len def _fetch_bounded_seq(self, var, start, end, window_size, boundary): """Fetch reference sequence from hgvs data provider. The start position is 0 and the interval is half open """ var_len = end - start - window_size start = start if start >= boundary[0] else boundary[0] end = end if end <= boundary[1] else boundary[1] if start >= end: return "" seq = self.hdp.get_seq(var.ac, start, end) if len(seq) < end - start and len(seq) < var_len: raise HGVSInvalidVariantError( "Variant span is outside sequence bounds ({var})".format(var=var)) return seq def _get_ref_alt(self, var, boundary): """Get reference allele and alternative allele of the variant """ # Get reference allele if var.posedit.edit.type == "ins" or var.posedit.edit.type == "dup": ref = "" else: # For NARefAlt and Inv if var.posedit.edit.ref_s is None or var.posedit.edit.ref == "": ref = self._fetch_bounded_seq(var, var.posedit.pos.start.base - 1, var.posedit.pos.end.base, 0, boundary) else: ref = var.posedit.edit.ref # Get alternative allele if var.posedit.edit.type == "sub" or var.posedit.edit.type == "delins" or var.posedit.edit.type == "ins": alt = var.posedit.edit.alt elif var.posedit.edit.type == "del": alt = "" elif var.posedit.edit.type == "dup": alt = var.posedit.edit.ref or self._fetch_bounded_seq( var, var.posedit.pos.start.base - 1, var.posedit.pos.end.base, 0, boundary) elif var.posedit.edit.type == "inv": alt = reverse_complement(ref) elif var.posedit.edit.type == "identity": alt = ref return ref, alt def _normalize_alleles(self, var, boundary): """Normalize the variant until it could not be shuffled """ ref, alt = self._get_ref_alt(var, boundary) win_size = hgvs.global_config.normalizer.window_size if self.shuffle_direction == 3: if var.posedit.edit.type == "ins": base = var.posedit.pos.start.base start = 1 stop = 1 elif var.posedit.edit.type == "dup": base = var.posedit.pos.end.base start = 1 stop = 1 else: base = var.posedit.pos.start.base start = 0 stop = var.posedit.pos.end.base - base + 1 while True: ref_seq = self._fetch_bounded_seq(var, base - 1, base + stop - 1 + win_size, win_size, boundary) if ref_seq == "": break orig_start, orig_stop = start, stop start, stop, (ref, alt) = normalize_alleles(ref_seq, start, stop, (ref, alt), len(ref_seq), win_size, False) if stop < len(ref_seq) or start == orig_start: break # if stop at the end of the window, try to extend the shuffling to the right base += start - orig_start stop -= start - orig_start start = orig_start elif self.shuffle_direction == 5: if var.posedit.edit.type == "ins": base = max(var.posedit.pos.start.base - win_size, 1) start = var.posedit.pos.end.base - base stop = var.posedit.pos.end.base - base elif var.posedit.edit.type == "dup": base = max(var.posedit.pos.start.base - win_size, 1) start = var.posedit.pos.end.base - base + 1 stop = var.posedit.pos.end.base - base + 1 else: base = max(var.posedit.pos.start.base - win_size, 1) start = var.posedit.pos.start.base - base stop = var.posedit.pos.end.base - base + 1 while True: if base < boundary[0] + 1: start -= boundary[0] + 1 - base stop -= boundary[0] + 1 - base base = boundary[0] + 1 ref_seq = self._fetch_bounded_seq(var, base - 1, base + stop - 1, start, boundary) if ref_seq == "": break orig_start, orig_stop = start, stop start, stop, (ref, alt) = normalize_alleles(ref_seq, start, stop, (ref, alt), 0, win_size, True) if start > 0 or stop == orig_stop: break # if stop at the end of the window, try to extend the shuffling to the left base -= orig_stop - stop start += orig_stop - stop stop = orig_stop return base + start, base + stop, (ref, alt)
if __name__ == "__main__": hgvsparser = Parser() var = hgvsparser.parse_hgvs_variant("NM_001166478.1:c.61delG") hdp = connect() norm = Normalizer(hdp, shuffle_direction=5, cross_boundaries=False) res = norm.normalize(var) print(str(var) + " => " + str(res)) # <LICENSE> # Copyright 2018 HGVS Contributors (https://github.com/biocommons/hgvs) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # </LICENSE>