Source code for vcfpy.header

# -*- coding: utf-8 -*-
"""Code for representing the VCF header part

The VCF header class structure is modeled after HTSJDK
"""

import json
import sys

from . import exceptions

try:
    from cyordereddict import OrderedDict
except ImportError:
    from collections import OrderedDict

__author__ = 'Manuel Holtgrewe <manuel.holtgrewe@bihealth.de>'

# Tuples of valid entries -----------------------------------------------------
#
#: valid INFO value types
INFO_TYPES = ('Integer', 'Float', 'Flag', 'Character', 'String')
#: valid FORMAT value types
FORMAT_TYPES = ('Integer', 'Float', 'Character', 'String')
#: valid values for "Number" entries, except for integers
VALID_NUMBERS = ('A', 'R', 'G', '.')
#: header lines that contain an "ID" entry
LINES_WITH_ID = ('ALT', 'contig', 'FILTER', 'FORMAT', 'INFO', 'META',
                 'PEDIGREE', 'SAMPLE')

# Constants for "Number" entries ----------------------------------------------
#
#: number of alleles excluding reference
HEADER_NUMBER_ALLELES = 'A'
#: number of alleles including reference
HEADER_NUMBER_REF = 'R'
#: number of genotypes
HEADER_NUMBER_GENOTYPES = 'G'
#: unbounded number of values
HEADER_NUMBER_UNBOUNDED = '.'


def _warn(msg):
    """Print warning message in case of missing attributes"""
    print('[vcfpy] WARNING: {}'.format(msg), file=sys.stderr)


# header files to enforce double-quoting for
QUOTE_FIELDS = ('Description', 'Source', 'Version')


[docs]def serialize_for_header(key, value): """Serialize value for the given mapping key for a VCF header line""" if key in QUOTE_FIELDS: return json.dumps(value) elif type(value) is str: if ' ' in value or '\t' in value: return json.dumps(value) else: return value elif type(value) is list: return '[{}]'.format(', '.join(value)) else: return str(value)
[docs]class FieldInfo: """Core information for describing field type and number""" def __init__(self, type_, number): #: The type, one of INFO_TYPES or FORMAT_TYPES self.type = type_ #: Number description, either an int or constant self.number = number def __str__(self): return 'FieldInfo({}, {})'.format(*map(repr, [self.type, self.number])) def __repr__(self): return str(self)
[docs]def header_without_lines(header, remove): """Return :py:class:`Header` without lines given in ``remove`` ``remove`` is an iterable of pairs ``key``/``ID`` with the VCF header key and ``ID`` of entry to remove. In the case that a line does not have a ``mapping`` entry, you can give the full value to remove. """ remove = set(remove) # Copy over lines that are not removed lines = [] for line in header.lines: if hasattr(line, 'mapping'): if (line.key, line.mapping.get('ID', None)) in remove: continue # filter out else: if (line.key, line.value) in remove: continue # filter out lines.append(line) return Header(lines, header.samples)
[docs]class HeaderLine: """Base class for VCF header lines """ def __init__(self, key, value): #: ``str`` with key of header line self.key = key # ``str`` with raw value of header line self._value = value @property def value(self): return self._value
[docs] def serialize(self): """Return VCF-serialized version of this header line""" return ''.join(('##', self.key, '=', self.value))
def __str__(self): return 'HeaderLine({}, {})'.format( *map(repr, (self.key, self.value))) def __repr__(self): return str(self)
[docs]def mapping_to_str(mapping): """Convert mapping to string""" result = ['<'] for i, (key, value) in enumerate(mapping.items()): if i > 0: result.append(',') result += [key, '=', serialize_for_header(key, value)] result += ['>'] return ''.join(result)
[docs]class SimpleHeaderLine(HeaderLine): """Base class for simple header lines, currently contig and filter header lines :raises: :py:class:`vcfpy.exceptions.InvalidHeaderException` in the case of missing key ``"ID"`` """ def __init__(self, key, value, mapping): super().__init__(key, value) # check existence of key "ID" if 'ID' not in mapping: raise exceptions.InvalidHeaderException( 'Missing key "ID" in header line "{}={}"'.format( key, value)) #: ``collections.OrderedDict`` with key/value mapping of the attributes self.mapping = OrderedDict(mapping.items()) @property def value(self): return mapping_to_str(self.mapping)
[docs] def serialize(self): return ''.join(map(str, ['##', self.key, '=', self.value]))
def __str__(self): return 'SimpleHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class AltAlleleHeaderLine(SimpleHeaderLine): """Alternative allele header line Mostly used for defining symbolic alleles for structural variants and IUPAC ambiguity codes """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return AltAlleleHeaderLine('ALT', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping): super().__init__(key, value, mapping) #: name of the alternative allele self.id = self.mapping['ID'] def __str__(self): return 'AltAlleleHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class ContigHeaderLine(SimpleHeaderLine): """Contig header line Most importantly, parses the ``'length'`` key into an integer """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return ContigHeaderLine('contig', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping): super().__init__(key, value, mapping) # convert 'length' entry to integer if possible if 'length' in self.mapping: mapping['length'] = int(mapping['length']) else: _warn( 'Field "length" not found in header line {}={}'.format( key, value)) #: name of the contig self.id = self.mapping['ID'] #: length of the contig, ``None`` if missing self.length = self.mapping.get('length') def __str__(self): return 'ContigHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class FilterHeaderLine(SimpleHeaderLine): """FILTER header line """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return FilterHeaderLine('FILTER', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping): super().__init__(key, value, mapping) # check for "Description" key if 'Description' not in self.mapping: _warn( 'Field "Description" not found in header line {}={}'.format( key, value)) #: token for the filter self.id = self.mapping['ID'] #: description for the filter, ``None`` if missing self.description = self.mapping.get('Description') def __str__(self): return 'FilterHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class MetaHeaderLine(SimpleHeaderLine): """Alternative allele header line Used for defining set of valid values for samples keys """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return MetaHeaderLine('META', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping): super().__init__(key, value, mapping) #: name of the alternative allele self.id = self.mapping['ID'] def __str__(self): return 'MetaHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class PedigreeHeaderLine(SimpleHeaderLine): """Header line for defining a pedigree entry """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return PedigreeHeaderLine('PEDIGREE', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping): super().__init__(key, value, mapping) #: name of the alternative allele self.id = self.mapping['ID'] def __str__(self): return 'PedigreeHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class SampleHeaderLine(SimpleHeaderLine): """Header line for defining a SAMPLE entry """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return PedigreeHeaderLine('SAMPLE', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping): super().__init__(key, value, mapping) #: name of the alternative allele self.id = self.mapping['ID'] def __str__(self): return 'SampleHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class CompoundHeaderLine(HeaderLine): """Base class for compound header lines, currently format and header lines Compound header lines describe fields that can have more than one entry. """ def __init__(self, key, value, mapping): super().__init__(key, value) #: OrderedDict with key/value mapping self.mapping = OrderedDict(mapping.items()) # check that 'Number' is given and use "." otherwise if 'Number' not in self.mapping: print(('[vcfpy] WARNING: missing number, using ' 'unbounded/"." instead'), file=sys.stderr) self.mapping['Number'] = '.' try: self.mapping['Number'] = self._parse_number( self.mapping['Number']) except ValueError: print(('[vcfpy] WARNING: invalid number {}, using ' 'unbounded/"." instead').format(self.mapping['Number']), file=sys.stderr) self.mapping['Number'] = '.' def _parse_number(self, number): """Parse ``number`` into an ``int`` or return ``number`` if a valid expression for a INFO/FORMAT "Number". :param str number: ``str`` to parse and check """ try: return int(number) except ValueError as e: if number in VALID_NUMBERS: return number else: raise e @property def value(self): return mapping_to_str(self.mapping)
[docs] def serialize(self): return ''.join(map(str, ['##', self.key, '=', self.value]))
def __str__(self): return 'CompoundHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class InfoHeaderLine(CompoundHeaderLine): """Header line for INFO fields Note that the ``Number`` field will be parsed into an ``int`` if possible. Otherwise, the constants ``HEADER_NUMBER_*`` will be used. """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return InfoHeaderLine('INFO', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping): super().__init__(key, value, mapping) #: key in the INFO field self.id = self.mapping['ID'] # check for "Number" field self.number = self.mapping['Number'] # check for "Type" field type_ = self.mapping.get('Type') if 'Type' not in self.mapping: _warn( ('Field "Type" not found in header line, using String ' 'instead {}={}').format(key, value)) type_ = 'String' if 'Type' in self.mapping and type_ not in INFO_TYPES: _warn( ('Invalid INFO value type {} in header line, using String ' 'instead, {}={}').format(self.mapping['Type'], key, value)) type_ = 'String' #: value type self.type = type_ # check for "Description" key if 'Description' not in self.mapping: _warn( 'Field "Description" not found in header line {}={}'.format( key, value)) #: description, should be given, ``None`` if not given self.description = self.mapping.get('Description') #: source of INFO field, ``None`` if not given self.source = self.mapping.get('Source') #: version of INFO field, ``None`` if not given self.version = self.mapping.get('Version') def __str__(self): return 'InfoHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class FormatHeaderLine(CompoundHeaderLine): """Header line for FORMAT fields """ @classmethod
[docs] def from_mapping(klass, mapping): """Construct from mapping, not requiring the string value""" return FormatHeaderLine('FORMAT', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping): super().__init__(key, value, mapping) #: key in the INFO field self.id = self.mapping['ID'] # check for "Number" field self.number = self.mapping['Number'] # check for "Type" field type_ = self.mapping.get('Type') if 'Type' not in self.mapping: _warn( ('Field "Type" not found in header line, using String ' 'instead {}={}').format(key, value)) type_ = 'String' if 'Type' in self.mapping and type_ not in FORMAT_TYPES: _warn( ('Invalid INFO value type {} in header line, using String ' 'instead, {}={}').format(self.mapping['Type'], key, value)) type_ = 'String' #: value type self.type = type_ # check for "Description" key if 'Description' not in self.mapping: _warn( 'Field "Description" not found in header line {}={}'.format( key, value)) #: description, should be given, ``None`` if not given self.description = self.mapping.get('Description') #: source of INFO field, ``None`` if not given self.source = self.mapping.get('Source') #: version of INFO field, ``None`` if not given self.version = self.mapping.get('Version') def __str__(self): return 'FormatHeaderLine({}, {}, {})'.format( *map(repr, (self.key, self.value, self.mapping)))
[docs]class SamplesInfos: """Helper class for handling and mapping of sample names to numeric indices """ def __init__(self, sample_names): #: list of sample names self.names = list(sample_names) #: mapping from sample name to index self.name_to_idx = dict([ (name, idx) for idx, name in enumerate(self.names)]) def __str__(self): tpl = 'SampleInfo(names={}, name_to_idx={})' return tpl.format(self.names, self.name_to_idx) def __repr__(self): return str(self)