# -*- coding: utf-8 -*-
"""Parsing of VCF files from ``str``
"""
import ast
import collections
import itertools
import sys
from . import header
from . import record
from . import exceptions
try:
from cyordereddict import OrderedDict
except ImportError:
from collections import OrderedDict
__author__ = 'Manuel Holtgrewe <manuel.holtgrewe@bihealth.de>'
# expected "#CHROM" header prefix when there are samples
REQUIRE_SAMPLE_HEADER = (
'#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT')
# expected "#CHROM" header prefix when there are no samples
REQUIRE_NO_SAMPLE_HEADER = (
'#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO')
#: Supported VCF versions, a warning will be issued otherwise
SUPPORTED_VCF_VERSIONS = (
'VCFv4.0', 'VCFv4.1', 'VCFv4.2', 'VCFv4.3')
def _warn(msg):
"""Print warning message"""
print('[vcfpy] WARNING: {}'.format(msg), file=sys.stderr)
[docs]def split_quoted_string(s, delim=',', quote='"'):
"""Split string ``s`` at delimiter, correctly interpreting quotes"""
# collect positions
begins, ends = [0], []
# run state automaton
NORMAL, QUOTED, ESCAPED, DELIM = 0, 1, 2, 3
state = NORMAL
for pos, c in enumerate(s):
if state == NORMAL:
if c == delim:
ends.append(pos)
state = DELIM
elif c == quote:
state = QUOTED
else:
pass # noop
elif state == QUOTED:
if c == '\\':
state = ESCAPED
elif c == quote:
state = NORMAL
else:
pass # noop
elif state == DELIM:
begins.append(pos)
state = NORMAL
else: # state == ESCAPED
state = QUOTED
ends.append(len(s))
assert len(begins) == len(ends)
# Build resulting list
return [s[start:end] for start, end in zip(begins, ends)]
[docs]def parse_mapping(value):
"""Parse the given VCF header line mapping
Such a mapping consists of "key=value" pairs, separated by commas and
wrapped into angular brackets ("<...>"). Strings are usually quoted,
for certain known keys, exceptions are made, depending on the tag key.
this, however, only gets important when serializing.
:raises: :py:class:`vcfpy.exceptions.InvalidHeaderException` if
there was a problem parsing the file
"""
if not value.startswith('<') or not value.endswith('>'):
raise exceptions.InvalidHeaderException(
'Header mapping value was not wrapped in angular brackets')
# split the comma-separated list into pairs, ignoring commas in quotes
pairs = split_quoted_string(value[1:-1], delim=',', quote='"')
# split these pairs into key/value pairs, converting flags to mappings
# to True
key_values = []
for pair in pairs:
if '=' in pair:
key, value = pair.split('=', 1)
key = key.strip() # XXX lenient parsing
if value.startswith('"') and value.endswith('"'):
value = ast.literal_eval(value)
else:
key, value = pair, True
key_values.append((key, value))
# return completely parsed mapping as OrderedDict
return OrderedDict(key_values)
# Parsers to use for each VCF header type (given left of '=')
HEADER_PARSERS = {
'FILTER': MappingHeaderLineParser(header.FilterHeaderLine),
'FORMAT': MappingHeaderLineParser(header.FormatHeaderLine),
'INFO': MappingHeaderLineParser(header.InfoHeaderLine),
'contig': MappingHeaderLineParser(header.ContigHeaderLine),
'__default__': StupidHeaderLineParser(), # fallback
# 'ALT': None,
# 'assembly': None,
# 'META': None,
# 'SAMPLE': None,
# 'PEDIGREE': None,
# 'pedigreeDB': None,
}
# Field value converters
_CONVERTERS = {
'Integer': int,
'Float': float,
'Flag': lambda x: True,
'Character': str,
'String': str,
}
[docs]def convert_field_value(key, type_, value):
"""Convert atomic field value according to the type"""
if value == '.':
return None
else:
return _CONVERTERS[type_](value)
[docs]def parse_field_value(key, field_info, value):
"""Parse ``value`` according to ``field_info``
"""
# XXX lenient parsing, ignoring if counts differ from field_info
if field_info.type == 'Flag':
assert value is True
return True
elif field_info.number == 1:
return convert_field_value(key, field_info.type, value)
else:
if value == '.':
return []
else:
return [convert_field_value(key, field_info.type, x)
for x in value.split(',')]
[docs]def process_alt(header, ref, alt_str):
"""Process alternative value using Header in ``header``"""
# By its nature, this function contains a large number of case distinctions
if ']' in alt_str or '[' in alt_str:
return record.BreakEnd(record.BND, alt_str)
elif alt_str.startswith('<') and alt_str.endswith('>'):
inner = alt_str[1:-1]
if any([inner.startswith(code) for code in record.SV_CODES]):
return record.SV(record.SV, alt_str)
else:
return record.SymbolicAllele(record.SYMBOLIC, alt_str)
else: # substitution
if len(ref) == len(alt_str):
if len(ref) == 1:
return record.Substitution(record.SNV, alt_str)
else:
return record.Substitution(record.MNV, alt_str)
elif len(ref) > len(alt_str):
if len(alt_str) == 0:
raise exceptions.InvalidRecordException(
'Invalid VCF, empty ALT')
elif len(alt_str) == 1:
if ref[0] == alt_str[0]:
return record.Substitution(record.DEL, alt_str)
else:
return record.Substitution(record.INDEL, alt_str)
else:
return record.Substitution(record.INDEL, alt_str)
else: # len(ref) < len(alt_str):
if len(ref) == 0:
raise exceptions.InvalidRecordException(
'Invalid VCF, empty REF')
elif len(ref) == 1:
if ref[0] == alt_str[0]:
return record.Substitution(record.INS, alt_str)
else:
return record.Substitution(record.INDEL, alt_str)
else:
return record.Substitution(record.INDEL, alt_str)
[docs]class RecordParser:
"""Helper class for parsing VCF records"""
def __init__(self, header, samples):
#: Header with the meta information
self.header = header
#: SamplesInfos with sample information
self.samples = samples
# Expected number of fields
if self.samples.names:
self.expected_fields = 9 + len(self.samples.names)
else:
self.expected_fields = 8
# Cache of FieldInfo objects by FORMAT string
self._format_cache = {}
[docs] def parse_line(self, line_str):
"""Parse line from file (including trailing line break) and return
resulting Record
"""
line_str = line_str.rstrip()
if not line_str:
return None # empty line, EOF
arr = self._split_line(line_str)
# CHROM
chrom = arr[0]
# POS
pos = int(arr[1])
# IDS
if arr[2] == '.':
ids = []
else:
ids = arr[2].split(';')
# REF
ref = arr[3]
# ALT
alts = []
if arr[4] != '.':
for alt in arr[4].split(','):
alts.append(process_alt(self.header, ref, alt))
# QUAL
if arr[5] == '.':
qual = None
else:
try:
qual = int(arr[5])
except ValueError: # try as float
qual = float(arr[5])
# FILTER
if arr[6] == '.':
filt = []
else:
filt = arr[6].split(';')
# INFO
info = self._parse_info(arr[7])
if not self.samples.names:
format, calls = [], []
else:
# FORMAT
format_str = arr[8]
format = format_str.split(':')
if format_str not in self._format_cache:
self._format_cache[format_str] = list(map(
self.header.get_format_field_info,
format))
# per-sample calls
calls = [record.Call(sample, data) for sample, data in
zip(self.samples.names,
self._parse_calls_data(
format, self._format_cache[format_str], arr[9:]))]
return record.Record(
chrom, pos, ids, ref, alts, qual, filt, info, format, calls)
def _split_line(self, line_str):
"""Split line and check number of columns"""
arr = line_str.rstrip().split('\t')
if len(arr) != self.expected_fields:
raise exceptions.InvalidRecordException(
('The line contains an invalid number of fields. Was '
'{} but expected {}\n{}'.format(
len(arr), 9 + len(self.samples.names),
line_str)))
return arr
def _parse_info(self, info_str):
"""Parse INFO column from string"""
result = OrderedDict()
if info_str == '.':
return result
# The standard is very nice to parsers, we can simply split at
# semicolon characters, although I (Manuel) don't know how strict
# programs follow this
for entry in info_str.split(';'):
if '=' not in entry: # flag
result[entry] = parse_field_value(
entry, self.header.get_info_field_info(entry), True)
else:
key, value = entry.split('=', 1)
key = key.strip() # XXX lenient parsing
result[key] = parse_field_value(
key, self.header.get_info_field_info(key), value)
return result
def _parse_calls_data(self, format, infos, arr):
"""Parse genotype call information from arrays using format array
:param list format: List of strings with format names
:param list arr: List of strings with genotype information values
for each sample
"""
result = []
for entry in arr:
data = OrderedDict()
# The standard is very nice to parsers, we can simply split at
# colon characters, although I (Manuel) don't know how strict
# programs follow this
for key, info, value in zip(format, infos, entry.split(':')):
data[key] = parse_field_value(
key, info, value)
result.append(data)
return result
[docs]class Parser:
"""Class for line-wise parsing of VCF files
In most cases, you want to use :py:class:`vcfpy.reader.Reader` instead.
:param stream: ``file``-like object to read from
:param str path: path the VCF is parsed from, for display purposes
only, optional
"""
def __init__(self, stream, path=None):
self.stream = stream
self.path = path
#: header, once it has been read
self.header = None
# the currently read line
self._line = stream.readline() # trailing '\n'
#: :py:class:`vcfpy.header.SamplesInfos` with sample information;
#: set on parsing the header
self.samples = None
# helper for parsing the records
self._record_parser = None
def _read_next_line(self):
"""Read next line store in self._line and return old one"""
prev_line = self._line
self._line = self.stream.readline()
return prev_line
def _check_header_lines(self, header_lines):
"""Check header lines, in particular for starting file "##fileformat"
:raises: ``vcfpy.exceptions.InvalidHeaderException`` in the case of
problems reading the header
"""
if not header_lines:
raise exceptions.InvalidHeaderException(
'The VCF file did not contain any header lines!')
first = header_lines[0]
if first.key != 'fileformat':
raise exceptions.InvalidHeaderException(
'The VCF file did not start with ##fileformat')
if first.value not in SUPPORTED_VCF_VERSIONS:
print(('[vcfpy] WARNING: The VCF version {} is not known, '
'going on regardlessly').format(first.value),
file=sys.stderr)
[docs] def parse_line(self, line):
"""Pare the given line without reading another one from the stream"""
return self._record_parser.parse_line(line)
[docs] def parse_next_record(self):
"""Read, parse and return next :py:class:`vcfpy.record.Record`
:returns: next VCF record or ``None`` if at end
:raises: ``vcfpy.exceptions.InvalidRecordException`` in the case of
problems reading the record
"""
return self.parse_line(self._read_next_line())