Source code for vcfpy.header

# -*- coding: utf-8 -*-
"""Code for representing the VCF header part

The VCF header class structure is modeled after HTSJDK
"""

import json
import sys

from . import exceptions
from .warn_utils import WarningHelper

try:
    from cyordereddict import OrderedDict
except ImportError:
    from collections import OrderedDict

__author__ = 'Manuel Holtgrewe <manuel.holtgrewe@bihealth.de>'

# Tuples of valid entries -----------------------------------------------------
#
#: valid INFO value types
INFO_TYPES = ('Integer', 'Float', 'Flag', 'Character', 'String')
#: valid FORMAT value types
FORMAT_TYPES = ('Integer', 'Float', 'Character', 'String')
#: valid values for "Number" entries, except for integers
VALID_NUMBERS = ('A', 'R', 'G', '.')
#: header lines that contain an "ID" entry
LINES_WITH_ID = ('ALT', 'contig', 'FILTER', 'FORMAT', 'INFO', 'META',
                 'PEDIGREE', 'SAMPLE')

# Constants for "Number" entries ----------------------------------------------
#
#: number of alleles excluding reference
HEADER_NUMBER_ALLELES = 'A'
#: number of alleles including reference
HEADER_NUMBER_REF = 'R'
#: number of genotypes
HEADER_NUMBER_GENOTYPES = 'G'
#: unbounded number of values
HEADER_NUMBER_UNBOUNDED = '.'


[docs]class FieldInfo: """Core information for describing field type and number""" def __init__(self, type_, number, description=None): #: The type, one of INFO_TYPES or FORMAT_TYPES self.type = type_ #: Number description, either an int or constant self.number = number #: Description for the header field, optional self.description = description def __str__(self): return 'FieldInfo({}, {}, {})'.format( *map(repr, [self.type, self.number, self.description])) def __repr__(self): return str(self)
# Reserved INFO keys ---------------------------------------------------------- #: Reserved fields for INFO from VCF v4.3 RESERVED_INFO = { # VCF v4.3, Section 1.6.1 'AA': FieldInfo('String', 1, 'Ancestral Allele'), 'AC': FieldInfo('Integer', 'A', 'Allele count in genotypes, for each ALT allele, in the ' 'same order as listed'), 'AD': FieldInfo('Integer', 'R', 'Total read depth for each allele'), 'ADF': FieldInfo('Integer', 'R', 'Forward read depth for each allele'), 'ADR': FieldInfo('Integer', 'R', 'Reverse read depth for each allele'), 'AF': FieldInfo('Float', 'A', 'Allele frequency for each ALT allele in the same order ' 'as listed: used for estimating from primary data not ' 'called genotypes'), 'AN': FieldInfo('Integer', 1, 'Total number of alleles in called genotypes'), 'BQ': FieldInfo('Float', 1, 'RMS base quality at this position'), 'CIGAR': FieldInfo('String', 'A', 'CIGAR string describing how to align each ALT allele ' 'to the reference allele'), 'DB': FieldInfo('Flag', 0, 'dbSNP membership'), 'DP': FieldInfo('Integer', 1, 'Combined depth across samples'), 'H2': FieldInfo('Flag', 0, 'Membership in HapMap 2'), 'H3': FieldInfo('Flag', 0, 'Membership in HapMap 3'), 'MQ': FieldInfo('Integer', 1, 'RMS mapping quality'), 'MQ0': FieldInfo('Integer', 1, 'Number of MAPQ == 0 reads covering this record'), 'NS': FieldInfo('Integer', 1, 'Number of samples with data'), 'SB': FieldInfo('Integer', 4, 'Strand bias at this position'), 'SOMATIC': FieldInfo('Flag', 0, 'Indicates that the record is a somatic mutation, ' 'for cancer genomics'), 'VALIDATED': FieldInfo('Flag', 0, 'Validated by follow-up experiment'), '1000G': FieldInfo('Flag', 0, 'Membership in 1000 Genomes'), # VCF v4.3, Section 3 'IMPRECISE': FieldInfo('Flag', 0, 'Imprecise structural variation'), 'NOVEL': FieldInfo('Flag', 0, 'Indicates a novel structural variation'), 'END': FieldInfo('Integer', 1, 'End position of the variant described in this record ' '(for symbolic alleles)'), 'SVTYPE': FieldInfo('String', 1, 'Type of structural variant'), 'SVLEN': FieldInfo('Integer', 1, 'Difference in length between REF and ALT alleles'), 'CIPOS': FieldInfo('Integer', 2, 'Confidence interval around POS for imprecise ' 'variants'), 'CIEND': FieldInfo('Integer', 2, 'Confidence interval around END for imprecise ' 'variants'), 'HOMLEN': FieldInfo('Integer', '.', 'Length of base pair identical micro-homology at ' 'event breakpoints'), 'HOMSEQ': FieldInfo('String', '.', 'Sequence of base pair identical micro-homology at ' 'event breakpoints'), 'BKPTID': FieldInfo('String', '.', 'ID of the assembled alternate allele in the ' 'assembly file'), 'MEINFO': FieldInfo('String', 4, 'Mobile element info of the form ' 'NAME,START,END,POLARITY'), 'METRANS': FieldInfo('String', 4, 'Mobile element transduction info of the form ' 'CHR,START,END,POLARITY'), 'DGVID': FieldInfo('String', 1, 'ID of this element in Database of Genomic Variation'), 'DBVARID': FieldInfo('String', 1, 'ID of this element in DBVAR'), 'DBRIPID': FieldInfo('String', 1, 'ID of this element in DBRIP'), 'MATEID': FieldInfo('String', '.', 'ID of mate breakends'), 'PARID': FieldInfo('String', 1, 'ID of partner breakend'), 'EVENT': FieldInfo('String', 1, 'ID of event associated to breakend'), 'CILEN': FieldInfo('Integer', 2, 'Confidence interval around the inserted material ' 'between breakends'), 'DP': FieldInfo('Integer', 1, 'Read Depth of segment containing breakend'), 'DPADJ': FieldInfo('Integer', '.', 'Read Depth of adjacency'), 'CN': FieldInfo('Integer', 1, 'Copy number of segment containing breakend'), 'CNADJ': FieldInfo('Integer', '.', 'Copy number of adjacency'), 'CICN': FieldInfo('Integer', 2, 'Confidence interval around copy number for the ' 'segment'), 'CICNADJ': FieldInfo('Integer', '.', 'Confidence interval around copy number for the ' 'adjacency'), } # Reserved FORMAT keys -------------------------------------------------------- RESERVED_FORMAT = { # VCF v 4.3, Section 1.6.2 'AD': FieldInfo('Integer', 'R', 'Total, per-sample read depth'), 'ADF': FieldInfo('Integer', 'R', 'Forward-strand, per-sample read depth'), 'ADR': FieldInfo('Integer', 'R', 'Reverse-strand, per-sample read depth'), 'DP': FieldInfo('Integer', 1, 'Read depth at this position for this sample'), 'EC': FieldInfo('Integer', 'A', 'Expected alternate allele counts for each alternate ' 'allele'), 'FT': FieldInfo('String', '.', 'Filters applied for this sample'), 'GQ': FieldInfo('Integer', 'G', 'Phred-scale, conditional genotype quality'), 'GP': FieldInfo('Float', 'G', 'Genotype posterior probabilities'), 'GT': FieldInfo('String', 1, 'Genotype call'), 'GL': FieldInfo('Float', 'G', 'Log10-scaled likelihoods for genotypes'), 'HQ': FieldInfo('Integer', 2, 'Haplotype qualities'), 'MQ': FieldInfo('Integer', 1, 'RMS mapping quality'), 'PL': FieldInfo('Integer', 'G', 'Phred-scaled genotype likelihoods, rounded to integers'), 'PQ': FieldInfo('Integer', 1, 'Phasing quality'), 'PS': FieldInfo('Integer', 1, 'Non-negative 32 bit integer giving phasing set ' 'for this sample and this chromosome'), # VCF v4.3, Section 4 'CN': FieldInfo('Integer', 1, 'Copy number genotype for imprecise events'), 'CNQ': FieldInfo('Float', 1, 'Copy number genotype quality for imprecise events'), 'CNL': FieldInfo('Float', 'G', 'Copy number genotype likelihood for imprecise events'), 'CNP': FieldInfo('Float', 'G', 'Copy number posterior probabilities'), 'NQ': FieldInfo('Integer', 1, 'Phred style probability score that the variant is novel'), 'HAP': FieldInfo('Integer', 1, 'Unique haplotype identifier'), 'AHAP': FieldInfo('Integer', 1, 'Unique identifier of ancestral haplotype'), } # header files to enforce double-quoting for QUOTE_FIELDS = ('Description', 'Source', 'Version')
[docs]def serialize_for_header(key, value): """Serialize value for the given mapping key for a VCF header line""" if key in QUOTE_FIELDS: return json.dumps(value) elif type(value) is str: if ' ' in value or '\t' in value: return json.dumps(value) else: return value elif type(value) is list: return '[{}]'.format(', '.join(value)) else: return str(value)
[docs]def header_without_lines(header, remove): """Return :py:class:`Header` without lines given in ``remove`` ``remove`` is an iterable of pairs ``key``/``ID`` with the VCF header key and ``ID`` of entry to remove. In the case that a line does not have a ``mapping`` entry, you can give the full value to remove. """ remove = set(remove) # Copy over lines that are not removed lines = [] for line in header.lines: if hasattr(line, 'mapping'): if (line.key, line.mapping.get('ID', None)) in remove: continue # filter out else: if (line.key, line.value) in remove: continue # filter out lines.append(line) return Header(lines, header.samples)
[docs]class HeaderLine: """Base class for VCF header lines """ def __init__(self, key, value, warning_helper=WarningHelper()): #: ``str`` with key of header line self.key = key # ``str`` with raw value of header line self._value = value #: Helper for printing warnings self.warning_helper = warning_helper @property def value(self): return self._value
[docs] def serialize(self): """Return VCF-serialized version of this header line""" return ''.join(('##', self.key, '=', self.value))
def __str__(self): return 'HeaderLine({}, {})'.format( *map(repr, (self.key, self.value))) def __repr__(self): return str(self)
[docs]def mapping_to_str(mapping): """Convert mapping to string""" result = ['<'] for i, (key, value) in enumerate(mapping.items()): if i > 0: result.append(',') result += [key, '=', serialize_for_header(key, value)] result += ['>'] return ''.join(result)
[docs]class SimpleHeaderLine(HeaderLine): """Base class for simple header lines, currently contig and filter header lines :raises: :py:class:`vcfpy.exceptions.InvalidHeaderException` in the case of missing key ``"ID"`` """ def __init__(self, key, value, mapping, warning_helper=WarningHelper()): super().__init__(key, value, warning_helper) # check existence of key "ID" if 'ID' not in mapping: raise exceptions.InvalidHeaderException( 'Missing key "ID" in header line "{}={}"'.format( key, value)) #: ``collections.OrderedDict`` with key/value mapping of the attributes self.mapping = OrderedDict(mapping.items()) @property def value(self): return mapping_to_str(self.mapping)
[docs] def serialize(self): return ''.join(map(str, ['##', self.key, '=', self.value]))
def __str__(self): return 'SimpleHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class AltAlleleHeaderLine(SimpleHeaderLine): """Alternative allele header line Mostly used for defining symbolic alleles for structural variants and IUPAC ambiguity codes """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return AltAlleleHeaderLine('ALT', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping, warning_helper=WarningHelper()): super().__init__(key, value, mapping) #: name of the alternative allele self.id = self.mapping['ID'] def __str__(self): return 'AltAlleleHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class ContigHeaderLine(SimpleHeaderLine): """Contig header line Most importantly, parses the ``'length'`` key into an integer """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return ContigHeaderLine('contig', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping, warning_helper=WarningHelper()): super().__init__(key, value, mapping, warning_helper) # convert 'length' entry to integer if possible if 'length' in self.mapping: mapping['length'] = int(mapping['length']) else: self.warning_helper.warn_once( 'Field "length" not found in header line {}={}'.format( key, value)) #: name of the contig self.id = self.mapping['ID'] #: length of the contig, ``None`` if missing self.length = self.mapping.get('length') def __str__(self): return 'ContigHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class FilterHeaderLine(SimpleHeaderLine): """FILTER header line """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return FilterHeaderLine('FILTER', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping, warning_helper=WarningHelper()): super().__init__(key, value, mapping, warning_helper) # check for "Description" key if 'Description' not in self.mapping: self.warning_helper.warn_once( 'Field "Description" not found in header line {}={}'.format( key, value)) #: token for the filter self.id = self.mapping['ID'] #: description for the filter, ``None`` if missing self.description = self.mapping.get('Description') def __str__(self): return 'FilterHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class MetaHeaderLine(SimpleHeaderLine): """Alternative allele header line Used for defining set of valid values for samples keys """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return MetaHeaderLine('META', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping, warning_helper=WarningHelper()): super().__init__(key, value, mapping) #: name of the alternative allele self.id = self.mapping['ID'] def __str__(self): return 'MetaHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class PedigreeHeaderLine(SimpleHeaderLine): """Header line for defining a pedigree entry """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return PedigreeHeaderLine('PEDIGREE', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping, warning_helper=WarningHelper()): super().__init__(key, value, mapping) #: name of the alternative allele self.id = self.mapping['ID'] def __str__(self): return 'PedigreeHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class SampleHeaderLine(SimpleHeaderLine): """Header line for defining a SAMPLE entry """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return PedigreeHeaderLine('SAMPLE', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping, warning_helper=WarningHelper()): super().__init__(key, value, mapping) #: name of the alternative allele self.id = self.mapping['ID'] def __str__(self): return 'SampleHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class CompoundHeaderLine(HeaderLine): """Base class for compound header lines, currently format and header lines Compound header lines describe fields that can have more than one entry. """ def __init__(self, key, value, mapping, warning_helper=WarningHelper()): super().__init__(key, value) #: OrderedDict with key/value mapping self.mapping = OrderedDict(mapping.items()) # check that 'Number' is given and use "." otherwise if 'Number' not in self.mapping: print(('[vcfpy] WARNING: missing number, using ' 'unbounded/"." instead'), file=sys.stderr) self.mapping['Number'] = '.' try: self.mapping['Number'] = self._parse_number( self.mapping['Number']) except ValueError: print(('[vcfpy] WARNING: invalid number {}, using ' 'unbounded/"." instead').format(self.mapping['Number']), file=sys.stderr) self.mapping['Number'] = '.' def _parse_number(self, number): """Parse ``number`` into an ``int`` or return ``number`` if a valid expression for a INFO/FORMAT "Number". :param str number: ``str`` to parse and check """ try: return int(number) except ValueError as e: if number in VALID_NUMBERS: return number else: raise e @property def value(self): return mapping_to_str(self.mapping)
[docs] def serialize(self): return ''.join(map(str, ['##', self.key, '=', self.value]))
def __str__(self): return 'CompoundHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class InfoHeaderLine(CompoundHeaderLine): """Header line for INFO fields Note that the ``Number`` field will be parsed into an ``int`` if possible. Otherwise, the constants ``HEADER_NUMBER_*`` will be used. """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return InfoHeaderLine('INFO', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping, warning_helper=WarningHelper()): super().__init__(key, value, mapping) #: key in the INFO field self.id = self.mapping['ID'] # check for "Number" field self.number = self.mapping['Number'] # check for "Type" field type_ = self.mapping.get('Type') if 'Type' not in self.mapping: self.warning_helper.warn_once( ('Field "Type" not found in header line, using String ' 'instead {}={}').format(key, value)) type_ = 'String' if 'Type' in self.mapping and type_ not in INFO_TYPES: self.warning_helper.warn_once( ('Invalid INFO value type {} in header line, using String ' 'instead, {}={}').format(self.mapping['Type'], key, value)) type_ = 'String' #: value type self.type = type_ # check for "Description" key if 'Description' not in self.mapping: self.warning_helper.warn_once( 'Field "Description" not found in header line {}={}'.format( key, value)) #: description, should be given, ``None`` if not given self.description = self.mapping.get('Description') #: source of INFO field, ``None`` if not given self.source = self.mapping.get('Source') #: version of INFO field, ``None`` if not given self.version = self.mapping.get('Version') def __str__(self): return 'InfoHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class FormatHeaderLine(CompoundHeaderLine): """Header line for FORMAT fields """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return FormatHeaderLine('FORMAT', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping, warning_helper=WarningHelper()): super().__init__(key, value, mapping) #: key in the INFO field self.id = self.mapping['ID'] # check for "Number" field self.number = self.mapping['Number'] # check for "Type" field type_ = self.mapping.get('Type') if 'Type' not in self.mapping: self.warning_helper.warn_once( ('Field "Type" not found in header line, using String ' 'instead {}={}').format(key, value)) type_ = 'String' if 'Type' in self.mapping and type_ not in FORMAT_TYPES: self.warning_helper.warn_once( ('Invalid INFO value type {} in header line, using String ' 'instead, {}={}').format(self.mapping['Type'], key, value)) type_ = 'String' #: value type self.type = type_ # check for "Description" key if 'Description' not in self.mapping: self.warning_helper.warn_once( 'Field "Description" not found in header line {}={}'.format( key, value)) #: description, should be given, ``None`` if not given self.description = self.mapping.get('Description') #: source of INFO field, ``None`` if not given self.source = self.mapping.get('Source') #: version of INFO field, ``None`` if not given self.version = self.mapping.get('Version') def __str__(self): return 'FormatHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class SamplesInfos: """Helper class for handling and mapping of sample names to numeric indices """ def __init__(self, sample_names): #: list of sample names self.names = list(sample_names) #: mapping from sample name to index self.name_to_idx = dict([ (name, idx) for idx, name in enumerate(self.names)]) def __str__(self): tpl = 'SampleInfo(names={}, name_to_idx={})' return tpl.format(self.names, self.name_to_idx) def __repr__(self): return str(self)