Source code for vcfpy.parser

# -*- coding: utf-8 -*-
"""Parsing of VCF files from ``str``
"""

import ast
import functools
import math
import re
import warnings

from . import header
from . import record
from . import exceptions
from .exceptions import (
    CannotConvertValue, IncorrectListLength, LeadingTrailingSpaceInKey,
    UnknownFilter, UnknownVCFVersion, SpaceInChromLine)


from .compat import OrderedDict

__author__ = 'Manuel Holtgrewe <manuel.holtgrewe@bihealth.de>'


# expected "#CHROM" header prefix when there are samples
REQUIRE_SAMPLE_HEADER = (
    '#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT')
# expected "#CHROM" header prefix when there are no samples
REQUIRE_NO_SAMPLE_HEADER = (
    '#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO')

#: Supported VCF versions; a warning will be issued for any other version
SUPPORTED_VCF_VERSIONS = (
    'VCFv4.0', 'VCFv4.1', 'VCFv4.2', 'VCFv4.3')


class QuotedStringSplitter:
    """Helper class for splitting quoted strings

    Supports interpreting quoted strings as well as brackets.  Meant for
    splitting the VCF header line mappings.
    """

    #: state constant for normal
    NORMAL = 0
    #: state constant for quoted
    QUOTED = 1
    #: state constant for escaped
    ESCAPED = 2
    #: state constant for array
    ARRAY = 3
    #: state constant for delimiter
    DELIM = 4

    def __init__(self, delim=',', quote='"', brackets='[]'):
        #: string delimiter
        self.delim = delim
        #: quote character
        self.quote = quote
        #: two-character string with opening and closing brackets
        assert len(brackets) == 2
        self.brackets = brackets

    def run(self, s):
        """Split string ``s`` at delimiter, correctly interpreting quotes

        Further, interprets arrays wrapped in one level of ``[]``.  No
        recursive brackets are interpreted (as this would make the grammar
        non-regular and currently this complexity is not needed).
        Currently, quoting inside of brackets is not supported either.
        This is just to support the example from VCF v4.3.
        """
        begins, ends = [0], []
        # transition table
        DISPATCH = {
            self.NORMAL: self._handle_normal,
            self.QUOTED: self._handle_quoted,
            self.ARRAY: self._handle_array,
            self.DELIM: self._handle_delim,
            self.ESCAPED: self._handle_escaped,
        }
        # run state automaton
        state = self.NORMAL
        for pos, c in enumerate(s):
            state = DISPATCH[state](c, pos, begins, ends)
        ends.append(len(s))
        assert len(begins) == len(ends)
        # build resulting list
        return [s[start:end] for start, end in zip(begins, ends)]

    def _handle_normal(self, c, pos, begins, ends):  # pylint: disable=W0613
        if c == self.delim:
            ends.append(pos)
            return self.DELIM
        elif c == self.quote:
            return self.QUOTED
        elif c == self.brackets[0]:
            return self.ARRAY
        else:
            return self.NORMAL

    def _handle_quoted(self, c, pos, begins, ends):  # pylint: disable=W0613
        if c == '\\':
            return self.ESCAPED
        elif c == self.quote:
            return self.NORMAL
        else:
            return self.QUOTED

    def _handle_array(self, c, pos, begins, ends):  # pylint: disable=W0613
        if c == self.brackets[1]:
            return self.NORMAL
        else:
            return self.ARRAY

    def _handle_delim(self, c, pos, begins, ends):  # pylint: disable=W0613
        begins.append(pos)
        return self.NORMAL

    def _handle_escaped(self, c, pos, begins, ends):  # pylint: disable=W0613
        return self.QUOTED

def split_quoted_string(s, delim=',', quote='"', brackets='[]'):
    return QuotedStringSplitter(delim, quote, brackets).run(s)

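# Usage sketch (added for illustration, not part of the original module): the
# splitter must not break at commas inside quotes or inside one level of
# brackets.  The expected result in the comment follows from the state
# machine above.
def _example_split_quoted_string():
    s = 'ID=X,Description="a, b",Values=[1, 2]'
    return split_quoted_string(s)
    # -> ['ID=X', 'Description="a, b"', 'Values=[1, 2]']
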
def split_mapping(pair_str):
    """Split the ``str`` in ``pair_str`` at ``'='``

    Warn if key needs to be stripped
    """
    orig_key, value = pair_str.split('=', 1)
    key = orig_key.strip()
    if key != orig_key:
        warnings.warn(
            'Mapping key {} has leading or trailing space'.format(
                repr(orig_key)),
            LeadingTrailingSpaceInKey)
    return key, value

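# Small sketch (added for illustration): the second call is expected to emit a
# LeadingTrailingSpaceInKey warning because the key carries a trailing space;
# the stripped key is returned either way.
def _example_split_mapping():
    return (
        split_mapping('Number=1'),   # -> ('Number', '1')
        split_mapping('Number =1'),  # -> ('Number', '1'), plus a warning
    )
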
def parse_mapping(value):
    """Parse the given VCF header line mapping

    Such a mapping consists of "key=value" pairs, separated by commas and
    wrapped into angular brackets ("<...>").  Strings are usually quoted;
    for certain known keys, exceptions are made, depending on the tag key.
    This, however, only becomes important when serializing.

    :raises: :py:class:`vcfpy.exceptions.InvalidHeaderException` if
        there was a problem parsing the file
    """
    if not value.startswith('<') or not value.endswith('>'):
        raise exceptions.InvalidHeaderException(
            'Header mapping value was not wrapped in angular brackets')
    # split the comma-separated list into pairs, ignoring commas in quotes
    pairs = split_quoted_string(value[1:-1], delim=',', quote='"')
    # split these pairs into key/value pairs, converting flags to mappings
    # to True
    key_values = []
    for pair in pairs:
        if '=' in pair:
            key, value = split_mapping(pair)
            if value.startswith('"') and value.endswith('"'):
                value = ast.literal_eval(value)
            elif value.startswith('[') and value.endswith(']'):
                value = [v.strip() for v in value[1:-1].split(',')]
        else:
            key, value = pair, True
        key_values.append((key, value))
    # return completely parsed mapping as OrderedDict
    return OrderedDict(key_values)

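# Illustrative sketch (not part of the original source): ``parse_mapping``
# applied to a typical structured header value; the commented result is an
# assumption based on the parsing rules above (quoted strings are unquoted
# via ``ast.literal_eval``).
def _example_parse_mapping():
    value = '<ID=DP,Number=1,Type=Integer,Description="Total Depth">'
    return parse_mapping(value)
    # -> OrderedDict([('ID', 'DP'), ('Number', '1'), ('Type', 'Integer'),
    #                 ('Description', 'Total Depth')])
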
class HeaderLineParserBase:
    """Parse into appropriate HeaderLine"""

    def parse_key_value(self, key, value):
        """Parse the key/value pair

        :param str key: the key to use in parsing
        :param str value: the value to parse
        :returns: :py:class:`vcfpy.header.HeaderLine` object
        """
        raise NotImplementedError('Must be overridden')

class StupidHeaderLineParser(HeaderLineParserBase):
    """Parse into HeaderLine (no particular structure)"""

    def parse_key_value(self, key, value):
        return header.HeaderLine(key, value)

class MappingHeaderLineParser(HeaderLineParserBase):
    """Parse into a header line with mapping value (``<...>``)"""

    def __init__(self, line_class):
        """Initialize the parser"""
        #: the class to use for the VCF header line
        self.line_class = line_class

    def parse_key_value(self, key, value):
        return self.line_class(key, value, parse_mapping(value))

def build_header_parsers():
    """Return mapping for parsers to use for each VCF header type"""
    result = {
        'ALT': MappingHeaderLineParser(header.AltAlleleHeaderLine),
        'contig': MappingHeaderLineParser(header.ContigHeaderLine),
        'FILTER': MappingHeaderLineParser(header.FilterHeaderLine),
        'FORMAT': MappingHeaderLineParser(header.FormatHeaderLine),
        'INFO': MappingHeaderLineParser(header.InfoHeaderLine),
        'META': MappingHeaderLineParser(header.MetaHeaderLine),
        'PEDIGREE': MappingHeaderLineParser(header.PedigreeHeaderLine),
        'SAMPLE': MappingHeaderLineParser(header.SampleHeaderLine),
        '__default__': StupidHeaderLineParser(),  # fallback
    }
    return result

# Field value converters
_CONVERTERS = {
    'Integer': int,
    'Float': float,
    'Flag': lambda x: True,
    'Character': str,
    'String': str,
}

def convert_field_value(type_, value):
    """Convert atomic field value according to the type"""
    if value == '.':
        return None
    elif type_ in ('Character', 'String'):
        if '%' in value:
            for k, v in record.UNESCAPE_MAPPING:
                value = value.replace(k, v)
        return value
    else:
        try:
            return _CONVERTERS[type_](value)
        except ValueError:
            warnings.warn(
                ('{} cannot be converted to {}, keeping as '
                 'string.').format(value, type_),
                CannotConvertValue)
            return value

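# Small sketch (illustrative only): '.' maps to None and values that cannot be
# converted are kept as strings, with a CannotConvertValue warning.
def _example_convert_field_value():
    return (
        convert_field_value('Integer', '42'),   # -> 42
        convert_field_value('Float', '.'),      # -> None
        convert_field_value('Integer', 'abc'),  # -> 'abc' (warning issued)
    )
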
def parse_field_value(field_info, value):
    """Parse ``value`` according to ``field_info``"""
    if field_info.id == 'FT':
        return [x for x in value.split(';') if x != '.']
    elif field_info.type == 'Flag':
        return True
    elif field_info.number == 1:
        return convert_field_value(field_info.type, value)
    else:
        if value == '.':
            return []
        else:
            return [convert_field_value(field_info.type, x)
                    for x in value.split(',')]

# Regular expression for break-end
BREAKEND_PATTERN = re.compile(r'[\[\]]')

def parse_breakend(alt_str):
    """Parse breakend and return tuple with results, parameters for
    BreakEnd constructor
    """
    arr = BREAKEND_PATTERN.split(alt_str)
    mate_chrom, mate_pos = arr[1].split(':', 1)
    mate_pos = int(mate_pos)
    if mate_chrom[0] == '<':
        mate_chrom = mate_chrom[1:-1]
        within_main_assembly = False
    else:
        within_main_assembly = True
    FWD_REV = {True: record.FORWARD, False: record.REVERSE}
    orientation = FWD_REV[alt_str[0] == '[' or alt_str[0] == ']']
    mate_orientation = FWD_REV['[' in alt_str]
    if orientation == record.FORWARD:
        sequence = arr[2]
    else:
        sequence = arr[0]
    return (mate_chrom, mate_pos, orientation, mate_orientation, sequence,
            within_main_assembly)

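# Sketch (added for illustration): parsing the spec-style breakend ALT
# 'G]17:198982]'.  The commented result is derived from the code above and
# uses the record.FORWARD/record.REVERSE constants.
def _example_parse_breakend():
    return parse_breakend('G]17:198982]')
    # -> ('17', 198982, record.REVERSE, record.REVERSE, 'G', True)
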
def process_sub_grow(ref, alt_str):
    """Process substitution where the string grows"""
    if len(alt_str) == 0:
        raise exceptions.InvalidRecordException(
            'Invalid VCF, empty ALT')
    elif len(alt_str) == 1:
        if ref[0] == alt_str[0]:
            return record.Substitution(record.DEL, alt_str)
        else:
            return record.Substitution(record.INDEL, alt_str)
    else:
        return record.Substitution(record.INDEL, alt_str)

def process_sub_shrink(ref, alt_str):
    """Process substitution where the string shrinks"""
    if len(ref) == 0:
        raise exceptions.InvalidRecordException(
            'Invalid VCF, empty REF')
    elif len(ref) == 1:
        if ref[0] == alt_str[0]:
            return record.Substitution(record.INS, alt_str)
        else:
            return record.Substitution(record.INDEL, alt_str)
    else:
        return record.Substitution(record.INDEL, alt_str)

def process_sub(ref, alt_str):
    """Process substitution"""
    if len(ref) == len(alt_str):
        if len(ref) == 1:
            return record.Substitution(record.SNV, alt_str)
        else:
            return record.Substitution(record.MNV, alt_str)
    elif len(ref) > len(alt_str):
        return process_sub_grow(ref, alt_str)
    else:  # len(ref) < len(alt_str)
        return process_sub_shrink(ref, alt_str)

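# Worked sketch (illustrative): classification of plain substitutions by
# REF/ALT length using the helpers above; the commented types follow from the
# branches in process_sub/process_sub_grow/process_sub_shrink.
def _example_process_sub():
    return (
        process_sub('A', 'T'),    # -> Substitution(SNV, 'T')
        process_sub('AT', 'GC'),  # -> Substitution(MNV, 'GC')
        process_sub('AT', 'A'),   # -> Substitution(DEL, 'A')
        process_sub('A', 'AT'),   # -> Substitution(INS, 'AT')
    )
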
def process_alt(header, ref, alt_str):  # pylint: disable=W0613
    """Process alternative value using Header in ``header``"""
    # By its nature, this function contains a large number of case
    # distinctions
    if ']' in alt_str or '[' in alt_str:
        return record.BreakEnd(*parse_breakend(alt_str))
    elif alt_str[0] == '.' and len(alt_str) > 0:
        return record.SingleBreakEnd(record.FORWARD, alt_str[1:])
    elif alt_str[-1] == '.' and len(alt_str) > 0:
        return record.SingleBreakEnd(record.REVERSE, alt_str[:-1])
    elif alt_str[0] == '<' and alt_str[-1] == '>':
        inner = alt_str[1:-1]
        return record.SymbolicAllele(inner)
    else:  # substitution
        return process_sub(ref, alt_str)

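# Sketch (illustrative, not part of the module): symbolic alleles are
# recognized by their angular brackets; the header argument is unused by the
# current code path, so None is passed here.
def _example_process_alt():
    return process_alt(None, 'A', '<DEL>')  # -> SymbolicAllele('DEL')
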
class HeaderParser:
    """Helper class for parsing a VCF header"""

    def __init__(self):
        #: Sub parsers to use for parsing the header lines
        self.sub_parsers = build_header_parsers()

    def parse_line(self, line):
        """Parse VCF header ``line`` (trailing '\r\n' or '\n' is ignored)

        :param str line: ``str`` with line to parse
        :returns: appropriate :py:class:`HeaderLine` parsed from ``line``
        :raises: :py:class:`vcfpy.exceptions.InvalidHeaderException` if
            there was a problem parsing the file
        """
        if not line or not line.startswith('##'):
            raise exceptions.InvalidHeaderException(
                'Invalid VCF header line (must start with "##") {}'.format(
                    line))
        if '=' not in line:
            raise exceptions.InvalidHeaderException(
                'Invalid VCF header line (must contain "=") {}'.format(line))
        line = line[len('##'):].rstrip()  # trim '^##' and trailing whitespace
        # split key/value pair at "="
        key, value = split_mapping(line)
        sub_parser = self.sub_parsers.get(key, self.sub_parsers['__default__'])
        return sub_parser.parse_key_value(key, value)

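# Usage sketch (added for illustration, not part of the module): the header
# parser dispatches on the key before the first '=', falling back to the
# "stupid" parser for keys without a registered structured parser.
def _example_header_parser():
    p = HeaderParser()
    return (
        p.parse_line('##fileformat=VCFv4.3\n'),
        p.parse_line(
            '##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">\n'),
    )
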
class RecordParser:
    """Helper class for parsing VCF records"""

    def __init__(self, header, samples, record_checks=None):
        #: Header with the meta information
        self.header = header
        #: SamplesInfos with sample information
        self.samples = samples
        #: The checks to perform, can contain 'INFO' and 'FORMAT'
        self.record_checks = tuple(record_checks or [])
        # Expected number of fields
        if self.samples.names:
            self.expected_fields = 9 + len(self.samples.names)
        else:
            self.expected_fields = 8
        # Cache of FieldInfo objects by FORMAT string
        self._format_cache = {}
        # Cache of FILTER entries, also applied to FORMAT/FT
        self._filter_ids = set(self.header.filter_ids())
        # Helper for checking INFO fields
        if 'INFO' in self.record_checks:
            self._info_checker = InfoChecker(self.header)
        else:
            self._info_checker = NoopInfoChecker()
        # Helper for checking FORMAT fields
        if 'FORMAT' in self.record_checks:
            self._format_checker = FormatChecker(self.header)
        else:
            self._format_checker = NoopFormatChecker()

    def parse_line(self, line_str):
        """Parse line from file (including trailing line break) and return
        resulting Record
        """
        line_str = line_str.rstrip()
        if not line_str:
            return None  # empty line, EOF
        arr = self._split_line(line_str)
        # CHROM
        chrom = arr[0]
        # POS
        pos = int(arr[1])
        # IDS
        if arr[2] == '.':
            ids = []
        else:
            ids = arr[2].split(';')
        # REF
        ref = arr[3]
        # ALT
        alts = []
        if arr[4] != '.':
            for alt in arr[4].split(','):
                alts.append(process_alt(self.header, ref, alt))
        # QUAL
        if arr[5] == '.':
            qual = None
        else:
            try:
                qual = int(arr[5])
            except ValueError:  # try as float
                qual = float(arr[5])
        # FILTER
        if arr[6] == '.':
            filt = []
        else:
            filt = arr[6].split(';')
        self._check_filters(filt, 'FILTER')
        # INFO
        info = self._parse_info(arr[7], len(alts))
        # FORMAT
        format_ = arr[8].split(':')
        # sample/call columns
        calls = self._handle_calls(alts, format_, arr[8], arr)
        return record.Record(
            chrom, pos, ids, ref, alts, qual, filt, info, format_, calls)

    def _handle_calls(self, alts, format_, format_str, arr):
        """Handle FORMAT and calls columns, factored out of parse_line"""
        if format_str not in self._format_cache:
            self._format_cache[format_str] = list(map(
                self.header.get_format_field_info, format_))
        # per-sample calls
        calls = []
        for sample, raw_data in zip(self.samples.names, arr[9:]):
            if self.samples.is_parsed(sample):
                data = self._parse_calls_data(
                    format_, self._format_cache[format_str], raw_data)
                call = record.Call(sample, data)
                self._format_checker.run(call, len(alts))
                self._check_filters(
                    call.data.get('FT'), 'FORMAT/FT', call.sample)
                calls.append(call)
            else:
                calls.append(record.UnparsedCall(sample, raw_data))
        return calls

    def _check_filters(self, filt, source, sample=None):
        if not filt:
            return
        for f in filt:
            self._check_filter(f, source, sample)

    def _check_filter(self, f, source, sample):
        if f not in self._filter_ids:
            if source == 'FILTER':
                warnings.warn(
                    ('Filter not found in header: {}; problem in FILTER '
                     'column').format(f),
                    UnknownFilter)
            else:
                assert source == 'FORMAT/FT' and sample
                warnings.warn(
                    ('Filter not found in header: {}; problem in '
                     'FORMAT/FT column of sample {}').format(f, sample),
                    UnknownFilter)

    def _split_line(self, line_str):
        """Split line and check number of columns"""
        arr = line_str.rstrip().split('\t')
        if len(arr) != self.expected_fields:
            raise exceptions.InvalidRecordException(
                ('The line contains an invalid number of fields. Was '
                 '{} but expected {}\n{}'.format(
                     len(arr), self.expected_fields, line_str)))
        return arr

    def _parse_info(self, info_str, num_alts):
        """Parse INFO column from string"""
        result = OrderedDict()
        if info_str == '.':
            return result
        # The standard is very nice to parsers, we can simply split at
        # semicolon characters, although I (Manuel) don't know how strictly
        # programs follow this
        for entry in info_str.split(';'):
            if '=' not in entry:  # flag
                key = entry
                result[key] = parse_field_value(
                    self.header.get_info_field_info(key), True)
            else:
                key, value = split_mapping(entry)
                result[key] = parse_field_value(
                    self.header.get_info_field_info(key), value)
            self._info_checker.run(key, result[key], num_alts)
        return result

    @classmethod
    def _parse_calls_data(klass, format_, infos, gt_str):
        """Parse genotype call information from arrays using format array

        :param list format_: list of strings with format names
        :param list infos: list of FieldInfo objects for the FORMAT keys
        :param str gt_str: string with genotype information values
        """
        data = OrderedDict()
        # The standard is very nice to parsers, we can simply split at
        # colon characters, although I (Manuel) don't know how strictly
        # programs follow this
        for key, info, value in zip(format_, infos, gt_str.split(':')):
            data[key] = parse_field_value(info, value)
        return data

class HeaderChecker:
    """Helper class for checking a VCF header"""

    def run(self, header):
        """Check the header

        Warnings will be printed using ``warnings`` while errors will raise
        an exception.

        :raises: ``vcfpy.exceptions.InvalidHeaderException`` in the case
            of severe errors reading the header
        """
        self._check_header_lines(header.lines)

    def _check_header_lines(self, header_lines):
        """Check header lines, in particular the leading ``##fileformat``
        line
        """
        if not header_lines:
            raise exceptions.InvalidHeaderException(
                'The VCF file did not contain any header lines!')
        first = header_lines[0]
        if first.key != 'fileformat':
            raise exceptions.InvalidHeaderException(
                'The VCF file did not start with ##fileformat')
        if first.value not in SUPPORTED_VCF_VERSIONS:
            warnings.warn('Unknown VCF version {}'.format(first.value),
                          UnknownVCFVersion)

@functools.lru_cache(maxsize=32)
def binomial(n, k):
    try:
        res = math.factorial(n) // math.factorial(k) // math.factorial(n - k)
    except ValueError:
        res = 0
    return res

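# Worked numbers (illustrative): for num_alts == 2 the checkers below expect
# 2 values for Number='A', 3 for 'R', and binomial(3, 2) == 3 genotype values
# for 'G' (diploid case).
def _example_expected_counts(num_alts=2):
    return {
        'A': num_alts,                   # one value per ALT allele -> 2
        'R': num_alts + 1,               # REF plus ALTs -> 3
        'G': binomial(num_alts + 1, 2),  # diploid genotypes -> 3
    }
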
class NoopInfoChecker:
    """Helper class that performs no checks"""

    def __init__(self):
        pass

    def run(self, key, value, num_alts):
        pass

class InfoChecker:
    """Helper class for checking an INFO field"""

    def __init__(self, header):
        #: VCFHeader to use for checking
        self.header = header

    def run(self, key, value, num_alts):
        """Check value in INFO[key] of record

        Currently, only checks for consistent counts are implemented

        :param str key: key of INFO entry to check
        :param value: value to check
        :param int num_alts: number of alternative alleles, for length checks
        """
        field_info = self.header.get_info_field_info(key)
        if not isinstance(value, list):
            return
        TABLE = {
            '.': len(value),
            'A': num_alts,
            'R': num_alts + 1,
            'G': binomial(num_alts + 1, 2),  # diploid only at the moment
        }
        expected = TABLE.get(field_info.number, field_info.number)
        if len(value) != expected:
            tpl = 'Number of elements for INFO field {} is {} instead of {}'
            warnings.warn(
                tpl.format(key, len(value), field_info.number),
                IncorrectListLength)

class NoopFormatChecker:
    """Helper class that performs no checks"""

    def __init__(self):
        pass

    def run(self, call, num_alts):
        pass

class FormatChecker:
    """Helper class for checking a FORMAT field"""

    def __init__(self, header):
        #: VCFHeader to use for checking
        self.header = header

    def run(self, call, num_alts):
        """Check ``FORMAT`` of a record.Call

        Currently, only checks for consistent counts are implemented
        """
        for key, value in call.data.items():
            self._check_count(call, key, value, num_alts)

    def _check_count(self, call, key, value, num_alts):
        field_info = self.header.get_format_field_info(key)
        if not isinstance(value, list):
            return
        num_alleles = len(call.gt_alleles or [])
        TABLE = {
            '.': len(value),
            'A': num_alts,
            'R': num_alts + 1,
            'G': binomial(num_alts + num_alleles, num_alleles),
        }
        expected = TABLE.get(field_info.number, field_info.number)
        if len(value) != expected:
            tpl = ('Number of elements for FORMAT field {} is {} instead '
                   'of {} (number specifier {})')
            warnings.warn(
                tpl.format(key, len(value), expected, field_info.number),
                IncorrectListLength)

class Parser:
    """Class for line-wise parsing of VCF files

    In most cases, you want to use :py:class:`vcfpy.reader.Reader` instead.

    :param stream: ``file``-like object to read from
    :param str path: path the VCF is parsed from, for display purposes
        only, optional
    """

    def __init__(self, stream, path=None, record_checks=None):
        self.stream = stream
        self.path = path
        #: checks to perform, can contain 'INFO' and 'FORMAT'
        self.record_checks = tuple(record_checks or [])
        #: header, once it has been read
        self.header = None
        # the currently read line
        self._line = stream.readline()  # trailing '\n'
        #: :py:class:`vcfpy.header.SamplesInfos` with sample information;
        #: set on parsing the header
        self.samples = None
        # helper for parsing the records
        self._record_parser = None
        # helper for checking the header
        self._header_checker = HeaderChecker()

    def _read_next_line(self):
        """Read next line, store in self._line and return old one"""
        prev_line = self._line
        self._line = self.stream.readline()
        return prev_line

    def parse_header(self, parsed_samples=None):
        """Read and parse :py:class:`vcfpy.header.Header` from file, set
        into ``self.header`` and return it

        :param list parsed_samples: ``list`` of ``str`` for subsetting the
            samples to parse
        :returns: ``vcfpy.header.Header``
        :raises: ``vcfpy.exceptions.InvalidHeaderException`` in the case
            of problems reading the header
        """
        # parse header lines
        sub_parser = HeaderParser()
        header_lines = []
        while self._line and self._line.startswith('##'):
            header_lines.append(sub_parser.parse_line(self._line))
            self._read_next_line()
        # parse sample info line
        self.samples = self._handle_sample_line(parsed_samples)
        # construct Header object
        self.header = header.Header(header_lines, self.samples)
        # check header for consistency
        self._header_checker.run(self.header)
        # construct record parser
        self._record_parser = RecordParser(
            self.header, self.samples, self.record_checks)
        # read next line, must not be a header line
        self._read_next_line()
        if self._line and self._line.startswith('#'):
            raise exceptions.IncorrectVCFFormat(
                'Expecting non-header line or EOF after "#CHROM" line')
        return self.header

    def _handle_sample_line(self, parsed_samples=None):
        """Check and interpret the "#CHROM" line and return samples"""
        if not self._line or not self._line.startswith('#CHROM'):
            raise exceptions.IncorrectVCFFormat(
                'Missing line starting with "#CHROM"')
        # check for space before INFO
        line = self._line.rstrip()
        pos = line.find('FORMAT') if ('FORMAT' in line) else line.find('INFO')
        if pos == -1:
            raise exceptions.IncorrectVCFFormat(
                'Ill-formatted line starting with "#CHROM"')
        if ' ' in line[:pos]:
            warnings.warn(
                ('Found space in #CHROM line, splitting at whitespace '
                 'instead of tab; this VCF file is ill-formatted'),
                SpaceInChromLine)
            arr = self._line.rstrip().split()
        else:
            arr = self._line.rstrip().split('\t')
        self._check_samples_line(arr)
        return header.SamplesInfos(
            arr[len(REQUIRE_SAMPLE_HEADER):], parsed_samples)

    @classmethod
    def _check_samples_line(klass, arr):
        """Perform additional checks on samples line"""
        if len(arr) <= len(REQUIRE_NO_SAMPLE_HEADER):
            if tuple(arr) != REQUIRE_NO_SAMPLE_HEADER:
                raise exceptions.IncorrectVCFFormat(
                    'Sample header line indicates no sample but does not '
                    'equal required prefix {}'.format(
                        '\t'.join(REQUIRE_NO_SAMPLE_HEADER)))
        elif tuple(arr[:len(REQUIRE_SAMPLE_HEADER)]) != REQUIRE_SAMPLE_HEADER:
            raise exceptions.IncorrectVCFFormat(
                'Sample header line (starting with "#CHROM") does not '
                'start with required prefix {}'.format(
                    '\t'.join(REQUIRE_SAMPLE_HEADER)))

    def parse_line(self, line):
        """Parse the given line without reading another one from the
        stream
        """
        return self._record_parser.parse_line(line)

    def parse_next_record(self):
        """Read, parse and return next :py:class:`vcfpy.record.Record`

        :returns: next VCF record or ``None`` if at end
        :raises: ``vcfpy.exceptions.InvalidRecordException`` in the case
            of problems reading the record
        """
        return self.parse_line(self._read_next_line())

    def print_warn_summary(self):
        """If there were any warnings, print summary with warnings"""
        # TODO: remove?

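# Minimal end-to-end sketch (added for illustration, not part of the module):
# drives the Parser directly on an in-memory VCF.  The sample name and column
# values below are made up; in normal use, vcfpy.reader.Reader wraps this.
def _example_parse_minimal_vcf():
    import io
    lines = [
        '##fileformat=VCFv4.3',
        '##FILTER=<ID=PASS,Description="All filters passed">',
        '##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">',
        '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">',
        '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tNA00001',
        'chr1\t100\t.\tA\tT\t29\tPASS\tDP=14\tGT\t0/1',
    ]
    parser = Parser(io.StringIO('\n'.join(lines) + '\n'))
    parser.parse_header()
    return parser.parse_next_record()  # -> record.Record for chr1:100 A>T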