Source code for vcfpy.header
# -*- coding: utf-8 -*-
"""Code for representing the VCF header part
The VCF header class structure is modeled after HTSJDK
"""
import json
import sys
from . import exceptions
try:
from cyordereddict import OrderedDict
except ImportError:
from collections import OrderedDict
__author__ = 'Manuel Holtgrewe <manuel.holtgrewe@bihealth.de>'
# Tuples of valid entries -----------------------------------------------------
#
#: valid INFO value types
INFO_TYPES = ('Integer', 'Float', 'Flag', 'Character', 'String')
#: valid FORMAT value types
FORMAT_TYPES = ('Integer', 'Float', 'Character', 'String')
#: valid values for "Number" entries, except for integers
VALID_NUMBERS = ('A', 'R', 'G', '.')
#: header lines that contain an "ID" entry
LINES_WITH_ID = ('ALT', 'contig', 'FILTER', 'FORMAT', 'INFO', 'META',
'PEDIGREE', 'SAMPLE')
# Constants for "Number" entries ----------------------------------------------
#
#: number of alleles excluding reference
HEADER_NUMBER_ALLELES = 'A'
#: number of alleles including reference
HEADER_NUMBER_REF = 'R'
#: number of genotypes
HEADER_NUMBER_GENOTYPES = 'G'
#: unbounded number of values
HEADER_NUMBER_UNBOUNDED = '.'
def _warn(msg):
"""Print warning message in case of missing attributes"""
print('[vcfpy] WARNING: {}'.format(msg), file=sys.stderr)
# header files to enforce double-quoting for
QUOTE_FIELDS = ('Description', 'Source', 'Version')
[docs]def serialize_for_header(key, value):
"""Serialize value for the given mapping key for a VCF header line"""
if key in QUOTE_FIELDS:
return json.dumps(value)
elif type(value) is str:
if ' ' in value or '\t' in value:
return json.dumps(value)
else:
return value
elif type(value) is list:
return '[{}]'.format(', '.join(value))
else:
return str(value)
[docs]class FieldInfo:
"""Core information for describing field type and number"""
def __init__(self, type_, number):
#: The type, one of INFO_TYPES or FORMAT_TYPES
self.type = type_
#: Number description, either an int or constant
self.number = number
def __str__(self):
return 'FieldInfo({}, {})'.format(*map(repr, [self.type, self.number]))
def __repr__(self):
return str(self)
[docs]def header_without_lines(header, remove):
"""Return :py:class:`Header` without lines given in ``remove``
``remove`` is an iterable of pairs ``key``/``ID`` with the VCF header key
and ``ID`` of entry to remove. In the case that a line does not have
a ``mapping`` entry, you can give the full value to remove.
"""
remove = set(remove)
# Copy over lines that are not removed
lines = []
for line in header.lines:
if hasattr(line, 'mapping'):
if (line.key, line.mapping.get('ID', None)) in remove:
continue # filter out
else:
if (line.key, line.value) in remove:
continue # filter out
lines.append(line)
return Header(lines, header.samples)
[docs]class Header:
"""Represent header of VCF file
While this class allows mutating records, it should not be changed once it
has been assigned to
This class provides function for adding lines to a header and updating the
supporting index data structures. There is no explicit API for removing
header lines, the best way is to reconstruct a new ``Header`` instance with
a filtered list of header lines.
"""
def __init__(self, lines=[], samples=None):
#: ``list`` of :py:HeaderLine objects
self.lines = list(lines)
#: :py:class:`SamplesInfo` object
self.samples = samples
# build indices for the different field types
self._indices = self._build_indices()
def _build_indices(self):
"""Build indices for the different field types"""
result = {key: OrderedDict() for key in LINES_WITH_ID}
for line in self.lines:
if line.key in LINES_WITH_ID:
result.setdefault(line.key, OrderedDict())
if line.mapping['ID'] in result[line.key]:
_warn(('Seen {} header more than once: {}, using first'
'occurence').format(line.key, line.mapping['ID']))
else:
result[line.key][line.mapping['ID']] = line
else:
result.setdefault(line.key, [])
result[line.key].append(line)
return result
[docs] def add_filter_line(self, mapping):
"""Add FILTER header line constructed from the given mapping"""
self.add_line(FilterHeaderLine.from_mapping(mapping))
[docs] def add_contig_line(self, mapping):
"""Add "contig" header line constructed from the given mapping"""
self.add_line(ContigHeaderLine.from_mapping(mapping))
[docs] def add_info_line(self, mapping):
"""Add INFO header line constructed from the given mapping"""
self.add_line(InfoHeaderLine.from_mapping(mapping))
[docs] def add_format_line(self, mapping):
"""Add FORMAT header line constructed from the given mapping"""
self.add_line(FormatHeaderLine.from_mapping(mapping))
[docs] def format_ids(self):
"""Return list of all format IDs"""
return list(self._indices['FORMAT'].keys())
[docs] def filter_ids(self):
"""Return list of all filter IDs"""
return list(self._indices['FILTER'].keys())
[docs] def info_ids(self):
"""Return list of all info IDs"""
return list(self._indices['INFO'].keys())
[docs] def get_lines(self, key):
"""Return header lines having the given ``key`` as their type"""
if key in self._indices:
return self._indices[key].values()
else:
return []
[docs] def add_line(self, header_line):
"""Add header line, updating any necessary support indices"""
self.lines.append(header_line)
self._indices.setdefault(header_line.key, OrderedDict())
if not hasattr(header_line, 'mapping'):
return # no registration required
if header_line.mapping['ID'] in self._indices[header_line.key]:
_warn(('Detected duplicate header line with type {} and ID {}. '
'Ignoring this and subsequent one').format(
header_line.key, header_line.mapping['ID']))
else:
self._indices[header_line.key][
header_line.mapping['ID']] = header_line
[docs] def get_info_field_info(self, key):
"""Return :py:class:`FieldInfo` for the given INFO field"""
return self._get_field_info('INFO', key)
[docs] def get_format_field_info(self, key):
"""Return :py:class:`FieldInfo` for the given INFO field"""
return self._get_field_info('FORMAT', key)
def _get_field_info(self, type_, key):
result = self._indices[type_].get(key)
if result:
return result
_warn('{} {} not found using String/"." instead'.format(
type_, key))
return FieldInfo('String', HEADER_NUMBER_UNBOUNDED)
def __str__(self):
tpl = 'Header(lines={}, samples={})'
return tpl.format(*map(repr, (self.lines, self.samples)))
def __repr__(self):
return str(self)
[docs]class HeaderLine:
"""Base class for VCF header lines
"""
def __init__(self, key, value):
#: ``str`` with key of header line
self.key = key
# ``str`` with raw value of header line
self._value = value
@property
def value(self):
return self._value
[docs] def serialize(self):
"""Return VCF-serialized version of this header line"""
return ''.join(('##', self.key, '=', self.value))
def __str__(self):
return 'HeaderLine({}, {})'.format(
*map(repr, (self.key, self.value)))
def __repr__(self):
return str(self)
[docs]def mapping_to_str(mapping):
"""Convert mapping to string"""
result = ['<']
for i, (key, value) in enumerate(mapping.items()):
if i > 0:
result.append(',')
result += [key, '=', serialize_for_header(key, value)]
result += ['>']
return ''.join(result)
[docs]class SimpleHeaderLine(HeaderLine):
"""Base class for simple header lines, currently contig and filter
header lines
:raises: :py:class:`vcfpy.exceptions.InvalidHeaderException` in
the case of missing key ``"ID"``
"""
def __init__(self, key, value, mapping):
super().__init__(key, value)
# check existence of key "ID"
if 'ID' not in mapping:
raise exceptions.InvalidHeaderException(
'Missing key "ID" in header line "{}={}"'.format(
key, value))
#: ``collections.OrderedDict`` with key/value mapping of the attributes
self.mapping = OrderedDict(mapping.items())
@property
def value(self):
return mapping_to_str(self.mapping)
def __str__(self):
return 'SimpleHeaderLine({}, {}, {})'.format(
*map(repr, (self.key, self.value, self.mapping)))
[docs]class AltAlleleHeaderLine(SimpleHeaderLine):
"""Alternative allele header line
Mostly used for defining symbolic alleles for structural variants and
IUPAC ambiguity codes
"""
@classmethod
[docs] def from_mapping(klass, mapping):
"""Construct from mapping, not requiring the string value"""
return AltAlleleHeaderLine('ALT', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping):
super().__init__(key, value, mapping)
#: name of the alternative allele
self.id = self.mapping['ID']
def __str__(self):
return 'AltAlleleHeaderLine({}, {}, {})'.format(
*map(repr, (self.key, self.value, self.mapping)))
[docs]class ContigHeaderLine(SimpleHeaderLine):
"""Contig header line
Most importantly, parses the ``'length'`` key into an integer
"""
@classmethod
[docs] def from_mapping(klass, mapping):
"""Construct from mapping, not requiring the string value"""
return ContigHeaderLine('contig', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping):
super().__init__(key, value, mapping)
# convert 'length' entry to integer if possible
if 'length' in self.mapping:
mapping['length'] = int(mapping['length'])
else:
_warn(
'Field "length" not found in header line {}={}'.format(
key, value))
#: name of the contig
self.id = self.mapping['ID']
#: length of the contig, ``None`` if missing
self.length = self.mapping.get('length')
def __str__(self):
return 'ContigHeaderLine({}, {}, {})'.format(
*map(repr, (self.key, self.value, self.mapping)))
[docs]class FilterHeaderLine(SimpleHeaderLine):
"""FILTER header line
"""
@classmethod
[docs] def from_mapping(klass, mapping):
"""Construct from mapping, not requiring the string value"""
return FilterHeaderLine('FILTER', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping):
super().__init__(key, value, mapping)
# check for "Description" key
if 'Description' not in self.mapping:
_warn(
'Field "Description" not found in header line {}={}'.format(
key, value))
#: token for the filter
self.id = self.mapping['ID']
#: description for the filter, ``None`` if missing
self.description = self.mapping.get('Description')
def __str__(self):
return 'FilterHeaderLine({}, {}, {})'.format(
*map(repr, (self.key, self.value, self.mapping)))
[docs]class MetaHeaderLine(SimpleHeaderLine):
"""Alternative allele header line
Used for defining set of valid values for samples keys
"""
@classmethod
[docs] def from_mapping(klass, mapping):
"""Construct from mapping, not requiring the string value"""
return MetaHeaderLine('META', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping):
super().__init__(key, value, mapping)
#: name of the alternative allele
self.id = self.mapping['ID']
def __str__(self):
return 'MetaHeaderLine({}, {}, {})'.format(
*map(repr, (self.key, self.value, self.mapping)))
[docs]class PedigreeHeaderLine(SimpleHeaderLine):
"""Header line for defining a pedigree entry
"""
@classmethod
[docs] def from_mapping(klass, mapping):
"""Construct from mapping, not requiring the string value"""
return PedigreeHeaderLine('PEDIGREE', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping):
super().__init__(key, value, mapping)
#: name of the alternative allele
self.id = self.mapping['ID']
def __str__(self):
return 'PedigreeHeaderLine({}, {}, {})'.format(
*map(repr, (self.key, self.value, self.mapping)))
[docs]class SampleHeaderLine(SimpleHeaderLine):
"""Header line for defining a SAMPLE entry
"""
@classmethod
[docs] def from_mapping(klass, mapping):
"""Construct from mapping, not requiring the string value"""
return PedigreeHeaderLine('SAMPLE', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping):
super().__init__(key, value, mapping)
#: name of the alternative allele
self.id = self.mapping['ID']
def __str__(self):
return 'SampleHeaderLine({}, {}, {})'.format(
*map(repr, (self.key, self.value, self.mapping)))
[docs]class CompoundHeaderLine(HeaderLine):
"""Base class for compound header lines, currently format and header lines
Compound header lines describe fields that can have more than one entry.
"""
def __init__(self, key, value, mapping):
super().__init__(key, value)
#: OrderedDict with key/value mapping
self.mapping = OrderedDict(mapping.items())
# check that 'Number' is given and use "." otherwise
if 'Number' not in self.mapping:
print(('[vcfpy] WARNING: missing number, using '
'unbounded/"." instead'), file=sys.stderr)
self.mapping['Number'] = '.'
try:
self.mapping['Number'] = self._parse_number(
self.mapping['Number'])
except ValueError:
print(('[vcfpy] WARNING: invalid number {}, using '
'unbounded/"." instead').format(self.mapping['Number']),
file=sys.stderr)
self.mapping['Number'] = '.'
def _parse_number(self, number):
"""Parse ``number`` into an ``int`` or return ``number`` if a valid
expression for a INFO/FORMAT "Number".
:param str number: ``str`` to parse and check
"""
try:
return int(number)
except ValueError as e:
if number in VALID_NUMBERS:
return number
else:
raise e
@property
def value(self):
return mapping_to_str(self.mapping)
def __str__(self):
return 'CompoundHeaderLine({}, {}, {})'.format(
*map(repr, (self.key, self.value, self.mapping)))
[docs]class InfoHeaderLine(CompoundHeaderLine):
"""Header line for INFO fields
Note that the ``Number`` field will be parsed into an ``int`` if
possible. Otherwise, the constants ``HEADER_NUMBER_*`` will be used.
"""
@classmethod
[docs] def from_mapping(klass, mapping):
"""Construct from mapping, not requiring the string value"""
return InfoHeaderLine('INFO', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping):
super().__init__(key, value, mapping)
#: key in the INFO field
self.id = self.mapping['ID']
# check for "Number" field
self.number = self.mapping['Number']
# check for "Type" field
type_ = self.mapping.get('Type')
if 'Type' not in self.mapping:
_warn(
('Field "Type" not found in header line, using String '
'instead {}={}').format(key, value))
type_ = 'String'
if 'Type' in self.mapping and type_ not in INFO_TYPES:
_warn(
('Invalid INFO value type {} in header line, using String '
'instead, {}={}').format(self.mapping['Type'], key, value))
type_ = 'String'
#: value type
self.type = type_
# check for "Description" key
if 'Description' not in self.mapping:
_warn(
'Field "Description" not found in header line {}={}'.format(
key, value))
#: description, should be given, ``None`` if not given
self.description = self.mapping.get('Description')
#: source of INFO field, ``None`` if not given
self.source = self.mapping.get('Source')
#: version of INFO field, ``None`` if not given
self.version = self.mapping.get('Version')
def __str__(self):
return 'InfoHeaderLine({}, {}, {})'.format(
*map(repr, (self.key, self.value, self.mapping)))
[docs]class FormatHeaderLine(CompoundHeaderLine):
"""Header line for FORMAT fields
"""
@classmethod
[docs] def from_mapping(klass, mapping):
"""Construct from mapping, not requiring the string value"""
return FormatHeaderLine('FORMAT', mapping_to_str(mapping), mapping)
def __init__(self, key, value, mapping):
super().__init__(key, value, mapping)
#: key in the INFO field
self.id = self.mapping['ID']
# check for "Number" field
self.number = self.mapping['Number']
# check for "Type" field
type_ = self.mapping.get('Type')
if 'Type' not in self.mapping:
_warn(
('Field "Type" not found in header line, using String '
'instead {}={}').format(key, value))
type_ = 'String'
if 'Type' in self.mapping and type_ not in FORMAT_TYPES:
_warn(
('Invalid INFO value type {} in header line, using String '
'instead, {}={}').format(self.mapping['Type'], key, value))
type_ = 'String'
#: value type
self.type = type_
# check for "Description" key
if 'Description' not in self.mapping:
_warn(
'Field "Description" not found in header line {}={}'.format(
key, value))
#: description, should be given, ``None`` if not given
self.description = self.mapping.get('Description')
#: source of INFO field, ``None`` if not given
self.source = self.mapping.get('Source')
#: version of INFO field, ``None`` if not given
self.version = self.mapping.get('Version')
def __str__(self):
return 'FormatHeaderLine({}, {}, {})'.format(
*map(repr, (self.key, self.value, self.mapping)))
[docs]class SamplesInfos:
"""Helper class for handling and mapping of sample names to numeric indices
"""
def __init__(self, sample_names):
#: list of sample names
self.names = list(sample_names)
#: mapping from sample name to index
self.name_to_idx = dict([
(name, idx) for idx, name in enumerate(self.names)])
def __str__(self):
tpl = 'SampleInfo(names={}, name_to_idx={})'
return tpl.format(self.names, self.name_to_idx)
def __repr__(self):
return str(self)