Source code for vcfpy.header
# -*- coding: utf-8 -*-
"""Code for representing the VCF header part
The VCF header class structure is modeled after HTSJDK
"""
import json
import sys
from . import exceptions
try:
from cyordereddict import OrderedDict
except ImportError:
from collections import OrderedDict
__author__ = 'Manuel Holtgrewe <manuel.holtgrewe@bihealth.de>'
# Tuples of valid entries -----------------------------------------------------
#
#: valid INFO value types
INFO_TYPES = ('Integer', 'Float', 'Flag', 'Character', 'String')
#: valid FORMAT value types; same as ``INFO_TYPES`` but without ``'Flag'``
FORMAT_TYPES = ('Integer', 'Float', 'Character', 'String')
#: valid values for "Number" entries, except for integers
VALID_NUMBERS = ('A', 'R', 'G', '.')
#: keys of header lines that contain an "ID" entry; ``VCFHeader`` indexes
#: lines with these keys by their ID
LINES_WITH_ID = ('FORMAT', 'INFO', 'FILTER', 'contig')
# Constants for "Number" entries ----------------------------------------------
#
# The values below are the same strings that are listed in ``VALID_NUMBERS``.
#
#: number of alleles excluding reference
HEADER_NUMBER_ALLELES = 'A'
#: number of alleles including reference
HEADER_NUMBER_REF = 'R'
#: number of genotypes
HEADER_NUMBER_GENOTYPES = 'G'
#: unbounded number of values
HEADER_NUMBER_UNBOUNDED = '.'
def _warn(msg):
    """Write ``msg`` to standard error, prefixed with the vcfpy warning tag"""
    text = '[vcfpy] WARNING: {}'.format(msg)
    print(text, file=sys.stderr)
# header fields to enforce double-quoting for
QUOTE_FIELDS = ('Description', 'Source', 'Version')


def serialize_for_header(key, value):
    """Serialize value for the given mapping key for a VCF header line

    Values of the keys listed in ``QUOTE_FIELDS`` are always written as
    double-quoted (JSON-encoded) strings.  For all other keys, string values
    are only quoted if they contain a space or a tab, everything else is
    converted with ``str()``.

    :param key: name of the mapping entry that ``value`` belongs to
    :param value: the value to serialize
    :returns: ``str`` with the serialized value
    """
    if key in QUOTE_FIELDS:
        return json.dumps(value)
    # isinstance() instead of an exact type comparison so that instances of
    # str subclasses containing whitespace are quoted as well
    if isinstance(value, str) and (' ' in value or '\t' in value):
        return json.dumps(value)
    return str(value)
class FieldInfo:
    """Type and number description of a field"""

    def __init__(self, type_, number):
        #: Value type, one of INFO_TYPES or FORMAT_TYPES
        self.type = type_
        #: Number of values, either an int or one of the "Number" constants
        self.number = number

    def __str__(self):
        template = 'FieldInfo({!r}, {!r})'
        return template.format(self.type, self.number)

    def __repr__(self):
        # same text as the ``str()`` representation
        return str(self)
class VCFHeader:
    """Represent header of VCF file

    While this class allows mutating records, it should not be changed after
    construction: the lookup indices are built once in ``__init__`` and are
    not refreshed when ``lines`` is modified afterwards.

    :param lines: iterable of header line objects, copied into a new list;
        ``None`` (the default) gives an empty header
    :param samples: object describing the samples, stored as-is
    """

    def __init__(self, lines=None, samples=None):
        #: ``list`` of :py:class:`VCFHeaderLine` objects
        self.lines = [] if lines is None else list(lines)
        #: :py:class:`SamplesInfos` object
        self.samples = samples
        # build indices for the different field types
        self._indices = self._build_indices()

    def _build_indices(self):
        """Build indices for the different field types

        :returns: ``dict`` mapping each header line key to either a ``dict``
            from ID to header line (for keys in ``LINES_WITH_ID``) or to a
            ``list`` of header lines (for all other keys)
        """
        result = {}
        for line in self.lines:
            if line.key in LINES_WITH_ID:
                by_id = result.setdefault(line.key, {})
                line_id = line.mapping['ID']
                if line_id in by_id:
                    # the first line seen for an ID wins, later ones are
                    # only reported
                    _warn(('Seen {} header more than once: {}, using first '
                           'occurrence').format(line.key, line_id))
                else:
                    by_id[line_id] = line
            else:
                result.setdefault(line.key, []).append(line)
        return result

    def get_info_field_info(self, key):
        """Return field information for the given INFO field

        See :py:meth:`_get_field_info` for the returned object.
        """
        return self._get_field_info('INFO', key)

    def get_format_field_info(self, key):
        """Return field information for the given FORMAT field

        See :py:meth:`_get_field_info` for the returned object.
        """
        return self._get_field_info('FORMAT', key)

    def _get_field_info(self, type_, key):
        """Look up the header line with ID ``key`` for lines of ``type_``

        :param str type_: header line key, ``'INFO'`` or ``'FORMAT'``
        :param str key: ID of the field to look up
        :returns: the indexed header line if there is one (presumably an
            object exposing ``type`` and ``number`` like
            :py:class:`FieldInfo` -- verify against the line classes);
            otherwise a warning is printed and
            ``FieldInfo('String', HEADER_NUMBER_UNBOUNDED)`` is returned
        """
        # a header without any line of the given type has no index entry at
        # all, this must lead to the fallback below and not to a KeyError
        result = self._indices.get(type_, {}).get(key)
        if result is not None:
            return result
        _warn('{} {} not found using String/"." instead'.format(
            type_, key))
        return FieldInfo('String', HEADER_NUMBER_UNBOUNDED)

    def __str__(self):
        tpl = 'VCFHeader(lines={!r}, samples={!r})'
        return tpl.format(self.lines, self.samples)

    def __repr__(self):
        return str(self)
class VCFHeaderLine:
    """Generic VCF header line consisting of a key and a raw value
    """

    def __init__(self, key, value):
        #: ``str`` with key of header line
        self.key = key
        #: ``str`` with raw value of header line
        self.value = value

    def serialize(self):
        """Return this header line in VCF text form, ``##<key>=<value>``"""
        return '##' + self.key + '=' + self.value

    def __str__(self):
        template = 'VCFHeaderLine({!r}, {!r})'
        return template.format(self.key, self.value)

    def __repr__(self):
        # same text as the ``str()`` representation
        return str(self)
class VCFSimpleHeaderLine(VCFHeaderLine):
    """Header line whose value is a list of attributes including ``"ID"``

    Currently the base class of the contig and filter header lines.

    :raises: :py:class:`vcfpy.exceptions.InvalidHeaderException` in
        the case of missing key ``"ID"``
    """

    def __init__(self, key, value, mapping):
        super().__init__(key, value)
        # the "ID" attribute is mandatory
        if 'ID' not in mapping:
            message = 'Missing key "ID" in header line "{}={}"'.format(
                key, value)
            raise exceptions.InvalidHeaderException(message)
        #: ``collections.OrderedDict`` with key/value mapping of the attributes
        self.mapping = OrderedDict(mapping.items())

    def serialize(self):
        """Return VCF text form, ``##<key>=<K1=V1,K2=V2,...>``"""
        entries = []
        for name, entry in self.mapping.items():
            serialized = serialize_for_header(name, entry)
            entries.append(str(name) + '=' + str(serialized))
        return ''.join(
            ['##', str(self.key), '=<', ','.join(entries), '>'])

    def __str__(self):
        template = 'VCFSimpleHeaderLine({!r}, {!r}, {!r})'
        return template.format(self.key, self.value, self.mapping)
class VCFContigHeaderLine(VCFSimpleHeaderLine):
    """Contig header line

    Most importantly, parses the ``'length'`` key into an integer

    :raises ValueError: if a ``'length'`` entry is present but cannot be
        converted with ``int()``
    """

    def __init__(self, key, value, mapping):
        super().__init__(key, value, mapping)
        # Convert 'length' entry to integer.  The result has to be stored in
        # ``self.mapping`` (the copy made by the base class) and not in the
        # ``mapping`` argument: otherwise ``self.length`` below would keep
        # the unconverted value and the caller's mapping would be modified.
        if 'length' in self.mapping:
            self.mapping['length'] = int(self.mapping['length'])
        else:
            _warn(
                'Field "length" not found in header line {}={}'.format(
                    key, value))
        #: name of the contig
        self.id = self.mapping['ID']
        #: length of the contig as ``int``, ``None`` if missing
        self.length = self.mapping.get('length')

    def __str__(self):
        return 'VCFContigHeaderLine({}, {}, {})'.format(
            *map(repr, (self.key, self.value, self.mapping)))
class VCFFilterHeaderLine(VCFSimpleHeaderLine):
    """Header line describing a FILTER entry
    """

    def __init__(self, key, value, mapping):
        super().__init__(key, value, mapping)
        # a "Description" is expected, its absence is only reported
        has_description = 'Description' in self.mapping
        if not has_description:
            message = 'Field "Description" not found in header line {}={}'
            _warn(message.format(key, value))
        #: token for the filter
        self.id = self.mapping['ID']
        #: description for the filter, ``None`` if missing
        self.description = self.mapping.get('Description')

    def __str__(self):
        template = 'VCFFilterHeaderLine({!r}, {!r}, {!r})'
        return template.format(self.key, self.value, self.mapping)
class VCFCompoundHeaderLine(VCFHeaderLine):
    """Base class for compound header lines, currently INFO and FORMAT lines

    Compound header lines describe fields that can have more than one entry.

    The ``"Number"`` entry of the mapping is normalized on construction: it
    is parsed into an ``int`` if possible, kept if it is one of
    ``VALID_NUMBERS`` and replaced by ``'.'`` (with a warning) if it is
    missing or invalid.
    """

    def __init__(self, key, value, mapping):
        super().__init__(key, value)
        #: OrderedDict with key/value mapping
        self.mapping = OrderedDict(mapping.items())
        # check that 'Number' is given and use "." otherwise
        if 'Number' not in self.mapping:
            _warn('missing number, using unbounded/"." instead')
            self.mapping['Number'] = '.'
        try:
            self.mapping['Number'] = self._parse_number(
                self.mapping['Number'])
        except ValueError:
            # the mapping still holds the rejected value at this point
            _warn('invalid number {}, using unbounded/"." instead'.format(
                self.mapping['Number']))
            self.mapping['Number'] = '.'

    def _parse_number(self, number):
        """Parse ``number`` into an ``int`` or return ``number`` if a valid
        expression for a INFO/FORMAT "Number".

        :param str number: ``str`` to parse and check
        :returns: ``int`` value of ``number`` or ``number`` itself if it is
            one of ``VALID_NUMBERS``
        :raises ValueError: if ``number`` is neither an integer nor one of
            ``VALID_NUMBERS``
        """
        try:
            return int(number)
        except ValueError:
            if number in VALID_NUMBERS:
                return number
            # re-raise the original conversion error unchanged
            raise

    def serialize(self):
        """Return VCF text form, ``##<key>=<K1=V1,K2=V2,...>``"""
        result = ['##', self.key, '=<']
        for i, (key, value) in enumerate(self.mapping.items()):
            if i > 0:
                result.append(',')
            result += [key, '=', serialize_for_header(key, value)]
        result += ['>']
        # entries such as a parsed "Number" may be non-strings
        return ''.join(map(str, result))

    def __str__(self):
        return 'VCFCompoundHeaderLine({}, {}, {})'.format(
            *map(repr, (self.key, self.value, self.mapping)))
class VCFInfoHeaderLine(VCFCompoundHeaderLine):
    """Header line describing an INFO field

    The ``Number`` entry is taken from the mapping after the base class
    constructor has processed it, so it is an ``int`` where possible and
    one of the ``HEADER_NUMBER_*`` values otherwise.
    """

    def __init__(self, key, value, mapping):
        super().__init__(key, value, mapping)
        #: key in the INFO field
        self.id = self.mapping['ID']
        #: number of values of the field
        self.number = self.mapping['Number']
        # determine the value type, falling back to "String" if the "Type"
        # entry is missing or not one of INFO_TYPES
        if 'Type' not in self.mapping:
            _warn(
                ('Field "Type" not found in header line, using String '
                 'instead {}={}').format(key, value))
            type_ = 'String'
        elif self.mapping['Type'] not in INFO_TYPES:
            _warn(
                ('Invalid INFO value type {} in header line, using String '
                 'instead, {}={}').format(self.mapping['Type'], key, value))
            type_ = 'String'
        else:
            type_ = self.mapping['Type']
        #: value type
        self.type = type_
        # a missing "Description" is only reported
        if 'Description' not in self.mapping:
            message = 'Field "Description" not found in header line {}={}'
            _warn(message.format(key, value))
        #: description, should be given, ``None`` if not given
        self.description = self.mapping.get('Description')
        #: source of INFO field, ``None`` if not given
        self.source = self.mapping.get('Source')
        #: version of INFO field, ``None`` if not given
        self.version = self.mapping.get('Version')

    def __str__(self):
        template = 'VCFInfoHeaderLine({!r}, {!r}, {!r})'
        return template.format(self.key, self.value, self.mapping)
class VCFFormatHeaderLine(VCFCompoundHeaderLine):
    """Header line for FORMAT fields

    The value type falls back to ``"String"`` (with a warning) if the
    ``"Type"`` entry is missing or is not one of ``FORMAT_TYPES``.
    """

    def __init__(self, key, value, mapping):
        super().__init__(key, value, mapping)
        #: key in the FORMAT field
        self.id = self.mapping['ID']
        # "Number" entry as left in the mapping by the base class
        self.number = self.mapping['Number']
        # check for "Type" field
        type_ = self.mapping.get('Type')
        if 'Type' not in self.mapping:
            _warn(
                ('Field "Type" not found in header line, using String '
                 'instead {}={}').format(key, value))
            type_ = 'String'
        if 'Type' in self.mapping and type_ not in FORMAT_TYPES:
            # the message names FORMAT (not INFO) as this is a FORMAT line
            _warn(
                ('Invalid FORMAT value type {} in header line, using String '
                 'instead, {}={}').format(self.mapping['Type'], key, value))
            type_ = 'String'
        #: value type
        self.type = type_
        # check for "Description" key
        if 'Description' not in self.mapping:
            _warn(
                'Field "Description" not found in header line {}={}'.format(
                    key, value))
        #: description, should be given, ``None`` if not given
        self.description = self.mapping.get('Description')
        #: source of FORMAT field, ``None`` if not given
        self.source = self.mapping.get('Source')
        #: version of FORMAT field, ``None`` if not given
        self.version = self.mapping.get('Version')

    def __str__(self):
        return 'VCFFormatHeaderLine({}, {}, {})'.format(
            *map(repr, (self.key, self.value, self.mapping)))
class SamplesInfos:
    """Helper class for handling and mapping of sample names to numeric indices

    :param sample_names: iterable of sample names, copied into a list
    """

    def __init__(self, sample_names):
        #: list of sample names
        self.names = list(sample_names)
        #: mapping from sample name to index; if a name occurs more than
        #: once, the index of its last occurrence is kept
        self.name_to_idx = {
            name: idx for idx, name in enumerate(self.names)}

    def __str__(self):
        # the label matches the class name, as for the other classes here
        tpl = 'SamplesInfos(names={}, name_to_idx={})'
        return tpl.format(self.names, self.name_to_idx)

    def __repr__(self):
        return str(self)