# -*- coding: utf-8 -*-
"""Parsing of VCF files from ``str``"""
import ast
import functools
import io
import math
import pathlib
import re
import warnings
from typing import Any, Callable, Iterable, Literal, cast
from vcfpy import exceptions, header, record
__author__ = "Manuel Holtgrewe <manuel.holtgrewe@bihealth.de>"
#: Required leading columns of the ``#CHROM`` header line when the file has
#: sample columns (fixed columns up to and including ``FORMAT``)
REQUIRE_SAMPLE_HEADER = ("#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO", "FORMAT")
#: Required columns of the ``#CHROM`` header line when the file has no samples
REQUIRE_NO_SAMPLE_HEADER = ("#CHROM", "POS", "ID", "REF", "ALT", "QUAL", "FILTER", "INFO")
#: Supported VCF versions, a warning will be issued otherwise
SUPPORTED_VCF_VERSIONS = ("VCFv4.0", "VCFv4.1", "VCFv4.2", "VCFv4.3")
class QuotedStringSplitter:
"""Helper class for splitting quoted strings
Has support for interpreting quoting strings but also brackets. Meant
for splitting the VCF header line dicts
"""
#: state constant for normal
NORMAL = 0
#: state constant for quoted
QUOTED = 1
#: state constant for delimiter
ESCAPED = 2
#: state constant for array
ARRAY = 3
#: state constant for delimiter
DELIM = 4
def __init__(self, delim: str = ",", quote: str = '"', brackets: str = "[]"):
#: string delimiter
self.delim = delim
#: quote character
self.quote = quote
#: two-character string with opening and closing brackets
assert len(brackets) == 2
self.brackets = brackets
def run(self, s: str) -> list[str]:
"""Split string ``s`` at delimiter, correctly interpreting quotes
Further, interprets arrays wrapped in one level of ``[]``. No
recursive brackets are interpreted (as this would make the grammar
non-regular and currently this complexity is not needed). Currently,
quoting inside of braces is not supported either. This is just to
support the example from VCF v4.3.
"""
begins: list[int] = [0]
ends: list[int] = []
# transition table
DISPATCH: dict[Literal[0, 1, 2, 3, 4], Callable[[str, int, list[int], list[int]], Literal[0, 1, 2, 3, 4]]] = {
self.NORMAL: self._handle_normal,
self.QUOTED: self._handle_quoted,
self.ARRAY: self._handle_array,
self.DELIM: self._handle_delim,
self.ESCAPED: self._handle_escaped,
}
# run state automaton
state: Literal[0, 1, 2, 3, 4] = self.NORMAL
for pos, c in enumerate(s):
state = DISPATCH[state](c, pos, begins, ends)
ends.append(len(s))
assert len(begins) == len(ends)
# Build resulting list
return [s[start:end] for start, end in zip(begins, ends, strict=False)]
def _handle_normal(
self, c: str, pos: int, begins: list[int], ends: list[int]
) -> Literal[0, 1, 2, 3, 4]: # pylint: disable=W0613
if c == self.delim:
ends.append(pos)
return self.DELIM
elif c == self.quote:
return self.QUOTED
elif c == self.brackets[0]:
return self.ARRAY
else:
return self.NORMAL
def _handle_quoted(
self, c: str, pos: int, begins: list[int], ends: list[int]
) -> Literal[0, 1, 2, 3, 4]: # pylint: disable=W0613
if c == "\\":
return self.ESCAPED
elif c == self.quote:
return self.NORMAL
else:
return self.QUOTED
def _handle_array(
self, c: str, pos: int, begins: list[int], ends: list[int]
) -> Literal[0, 1, 2, 3, 4]: # pylint: disable=W0613
if c == self.brackets[1]:
return self.NORMAL
else:
return self.ARRAY
def _handle_delim(
self, c: str, pos: int, begins: list[int], ends: list[int]
) -> Literal[0, 1, 2, 3, 4]: # pylint: disable=W0613
begins.append(pos)
return self.NORMAL
def _handle_escaped(
self, c: str, pos: int, begins: list[int], ends: list[int]
) -> Literal[0, 1, 2, 3, 4]: # pylint: disable=W0613
return self.QUOTED
def split_quoted_string(s: str, delim: str = ",", quote: str = '"', brackets: str = "[]") -> list[str]:
    """Split ``s`` at ``delim`` using a freshly built :py:class:`QuotedStringSplitter`

    :param str s: the string to split
    :param str delim: delimiter passed on to the splitter
    :param str quote: quote character passed on to the splitter
    :param str brackets: bracket pair passed on to the splitter
    :returns: the list returned by ``QuotedStringSplitter.run``
    """
    splitter = QuotedStringSplitter(delim, quote, brackets)
    return splitter.run(s)
def split_mapping(pair_str: str) -> tuple[str, str]:
    """Split ``pair_str`` at its first ``'='`` into key and value

    The key is returned with surrounding whitespace removed; a
    ``LeadingTrailingSpaceInKey`` warning is issued if stripping changed it.
    The value is returned unchanged.

    :param str pair_str: string of the form ``key=value``
    :returns: ``(key, value)`` tuple
    """
    raw_key, value = pair_str.split("=", 1)
    stripped_key = raw_key.strip()
    if stripped_key != raw_key:
        message = f"Mapping key {raw_key!r} has leading or trailing space"
        warnings.warn(message, exceptions.LeadingTrailingSpaceInKey)
    return stripped_key, value
def parse_mapping(value: str) -> dict[str, bool | str | list[str]]:
    """Parse the given VCF header line mapping

    Such a mapping consists of "key=value" pairs, separated by commas and
    wrapped into angular brackets ("<...>").  Strings are usually quoted,
    for certain known keys, exceptions are made, depending on the tag key.
    This, however, only gets important when serializing.

    Entries without ``=`` are treated as flags and map to ``True``; values
    wrapped in double quotes are unquoted; values wrapped in ``[...]`` become
    a list of stripped strings; everything else is kept as is.

    :param str value: the header line value, including the angular brackets
    :returns: ``dict`` with the parsed entries, in order of appearance
    :raises: :py:class:`vcfpy.exceptions.InvalidHeaderException` if
        there was a problem parsing the file
    """
    if not value.startswith("<") or not value.endswith(">"):
        raise exceptions.InvalidHeaderException("Header mapping value was not wrapped in angular brackets")
    # split the comma-separated list into pairs, ignoring commas in quotes
    pairs = split_quoted_string(value[1:-1], delim=",", quote='"')
    # split these pairs into key/value pairs, converting flags to mappings
    # to True; a repeated key keeps its first position but the last value
    result: dict[str, bool | str | list[str]] = {}
    for pair in pairs:
        if "=" not in pair:
            result[pair] = True
            continue
        # use a dedicated name so that the ``value`` parameter is not rebound
        key, raw = split_mapping(pair)
        if raw.startswith('"') and raw.endswith('"'):
            try:
                result[key] = ast.literal_eval(raw)
            except (SyntaxError, ValueError) as e:
                # e.g. a lone quote or an unescaped quote inside the string;
                # report it as the documented header exception
                raise exceptions.InvalidHeaderException(
                    "Invalid quoted value {!r} for key {!r} in header mapping".format(raw, key)
                ) from e
        elif raw.startswith("[") and raw.endswith("]"):
            result[key] = [v.strip() for v in raw[1:-1].split(",")]
        else:
            result[key] = raw
    return result
class HeaderLineParserBase:
    """Base class for parsers turning a header key/value into a HeaderLine"""

    def parse_key_value(self, key: str, value: str) -> header.HeaderLine:
        """Parse the key/value pair; must be implemented by sub classes

        :param str key: the key to use in parsing
        :param str value: the value to parse
        :returns: :py:class:`vcfpy.header.HeaderLine` object
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError("Must be overridden")
class StupidHeaderLineParser(HeaderLineParserBase):
    """Parse into HeaderLine (no particular structure)"""

    def parse_key_value(self, key: str, value: str) -> header.HeaderLine:
        """Wrap ``key`` and ``value`` unchanged into a plain ``header.HeaderLine``"""
        line = header.HeaderLine(key, value)
        return line
class MappingHeaderLineParser(HeaderLineParserBase):
    """Parse header lines whose value is a ``<key=value,...>`` mapping"""

    def __init__(self, line_class: Callable[[str, str, dict[str, bool | str | list[str]]], header.HeaderLine]):
        """Initialize the parser

        :param line_class: callable invoked as ``line_class(key, value, mapping)``
            to build the header line
        """
        #: the class to use for the VCF header line
        self.line_class = line_class

    def parse_key_value(self, key: str, value: str) -> header.HeaderLine:
        """Parse ``value`` with :py:func:`parse_mapping` and build the header line"""
        mapping = parse_mapping(value)
        return self.line_class(key, value, mapping)
def build_header_parsers() -> dict[str, HeaderLineParserBase]:
    """Return mapping for parsers to use for each VCF header type

    Known structured header keys get a :py:class:`MappingHeaderLineParser`
    for their header line class; the ``"__default__"`` entry is the fallback.
    """
    mapping_line_classes = (
        ("ALT", header.AltAlleleHeaderLine),
        ("contig", header.ContigHeaderLine),
        ("FILTER", header.FilterHeaderLine),
        ("FORMAT", header.FormatHeaderLine),
        ("INFO", header.InfoHeaderLine),
        ("META", header.MetaHeaderLine),
        ("PEDIGREE", header.PedigreeHeaderLine),
        ("SAMPLE", header.SampleHeaderLine),
    )
    parsers: dict[str, HeaderLineParserBase] = {
        key: MappingHeaderLineParser(line_class) for key, line_class in mapping_line_classes
    }
    parsers["__default__"] = StupidHeaderLineParser()  # fallback
    return parsers
# Field value converters
_CONVERTERS: dict[
Literal["Integer", "Float", "Flag", "Character", "String"], Callable[[str], bool | int | float | str]
] = {
"Integer": int,
"Float": float,
"Flag": lambda x: True,
"Character": str,
"String": str,
}
[docs]
def convert_field_value(
type_: Literal["Integer", "Float", "Flag", "Character", "String"], value: str
) -> bool | int | float | str | None:
"""Convert atomic field value according to the type"""
if value == ".":
return None
elif type_ in ("Character", "String"):
if "%" in value:
for k, v in record.UNESCAPE_MAPPING:
value = value.replace(k, v)
return value
else:
try:
return _CONVERTERS[type_](value)
except ValueError: # pragma: no cover
warnings.warn(
("{} cannot be converted to {}, keeping as string.").format(value, type_),
exceptions.CannotConvertValue,
)
return value
def parse_field_value(
    field_info: header.FieldInfo, value: str | bool
) -> bool | int | float | str | list[bool | int | float | str | None] | None:
    """Parse ``value`` according to ``field_info``

    * ``bool`` values and fields of type ``Flag`` yield ``True``,
    * the ``FT`` field is split at ``;`` with ``"."`` entries dropped,
    * fields with ``number == 1`` are converted as a single value,
    * all other fields are split at ``,`` and converted element-wise, with
      ``"."`` yielding the empty list.
    """
    if isinstance(value, bool) or field_info.type == "Flag":
        return True
    if field_info.id in ("FORMAT/FT", "FT"):
        return [flt for flt in value.split(";") if flt != "."]
    if field_info.number == 1:
        return convert_field_value(field_info.type, value)
    if value == ".":
        return []
    return [convert_field_value(field_info.type, item) for item in value.split(",")]
# Pattern splitting a break-end ALT string at its square brackets
BREAKEND_PATTERN = re.compile(r"[\[\]]")


def parse_breakend(alt_str: str) -> tuple[str, int, str, Literal["+", "-"], str, bool]:
    """Parse breakend and return tuple with results, parameters for BreakEnd
    constructor

    The string is split at ``[`` and ``]``; the middle piece is taken as
    ``chrom:pos`` of the mate, where a chromosome wrapped in ``<...>`` marks
    a mate outside of the main assembly.

    :param str alt_str: the ALT allele string with the brackets
    :returns: tuple ``(mate_chrom, mate_pos, orientation, mate_orientation,
        sequence, within_main_assembly)``
    """
    parts = BREAKEND_PATTERN.split(alt_str)
    assert isinstance(parts[1], str)
    mate_chrom, mate_pos_str = parts[1].split(":", 1)
    mate_pos = int(mate_pos_str)
    within_main_assembly = mate_chrom[0] != "<"
    if not within_main_assembly:
        mate_chrom = mate_chrom[1:-1]  # strip the wrapping "<" and ">"
    # forward if the string starts with a bracket, reverse otherwise
    if alt_str[0] in ("[", "]"):
        orientation: Literal["+", "-"] = record.FORWARD
    else:
        orientation = record.REVERSE
    # forward if an opening bracket is used anywhere, reverse otherwise
    mate_orientation: Literal["+", "-"] = record.FORWARD if "[" in alt_str else record.REVERSE
    assert isinstance(parts[2], str) and isinstance(parts[0], str)
    # the sequence is the piece after the brackets for forward, before otherwise
    sequence = parts[2] if orientation == record.FORWARD else parts[0]
    return (mate_chrom, mate_pos, orientation, mate_orientation, sequence, within_main_assembly)
def process_sub_grow(ref: str, alt_str: str) -> record.Substitution:
    """Process substitution for which ``process_sub`` found REF longer than ALT

    Yields a ``DEL`` if ALT is a single character equal to the first
    character of REF and an ``INDEL`` otherwise.

    :raises: :py:class:`vcfpy.exceptions.InvalidRecordException` if
        ``alt_str`` is empty
    """
    if not alt_str:
        raise exceptions.InvalidRecordException("Invalid VCF, empty ALT")
    is_deletion = len(alt_str) == 1 and ref[0] == alt_str[0]
    kind = record.DEL if is_deletion else record.INDEL
    return record.Substitution(kind, alt_str)
def process_sub_shrink(ref: str, alt_str: str) -> record.Substitution:
    """Process substitution for which ``process_sub`` found REF shorter than ALT

    Yields an ``INS`` if REF is a single character equal to the first
    character of ALT and an ``INDEL`` otherwise.

    :raises: :py:class:`vcfpy.exceptions.InvalidRecordException` if
        ``ref`` is empty
    """
    if not ref:  # pragma: no cover
        raise exceptions.InvalidRecordException("Invalid VCF, empty REF")
    is_insertion = len(ref) == 1 and ref[0] == alt_str[0]
    kind = record.INS if is_insertion else record.INDEL
    return record.Substitution(kind, alt_str)
def process_sub(ref: str, alt_str: str) -> record.Substitution:
    """Process substitution, dispatching on the lengths of REF and ALT

    Equal lengths yield ``SNV`` (length one) or ``MNV``; otherwise the work
    is delegated to ``process_sub_grow`` (REF longer) or
    ``process_sub_shrink`` (REF shorter).
    """
    ref_len, alt_len = len(ref), len(alt_str)
    if ref_len > alt_len:
        return process_sub_grow(ref, alt_str)
    if ref_len < alt_len:
        return process_sub_shrink(ref, alt_str)
    kind = record.SNV if ref_len == 1 else record.MNV
    return record.Substitution(kind, alt_str)
def process_alt(header: header.Header, ref: str, alt_str: str) -> record.AltRecord:
    """Process alternative value using Header in ``header``

    :param header: the VCF header (currently not used by the implementation)
    :param str ref: the REF allele
    :param str alt_str: one ALT allele as string
    :returns: ``BreakEnd`` if the string contains square brackets,
        ``SingleBreakEnd`` if it starts or ends with ``"."``,
        ``SymbolicAllele`` if it is wrapped in ``<...>`` and the result of
        :py:func:`process_sub` otherwise
    :raises: :py:class:`vcfpy.exceptions.InvalidRecordException` if
        ``alt_str`` is empty
    """
    # By its nature, this function contains a large number of case distinctions
    if not alt_str:
        # must be checked before indexing below; happens e.g. for "A,,C"
        raise exceptions.InvalidRecordException("Invalid VCF, empty ALT")
    if "]" in alt_str or "[" in alt_str:
        return record.BreakEnd(*parse_breakend(alt_str))
    if alt_str[0] == ".":
        return record.SingleBreakEnd(record.FORWARD, alt_str[1:])
    if alt_str[-1] == ".":
        return record.SingleBreakEnd(record.REVERSE, alt_str[:-1])
    if alt_str[0] == "<" and alt_str[-1] == ">":
        return record.SymbolicAllele(alt_str[1:-1])
    # substitution
    return process_sub(ref, alt_str)
class RecordParser:
    """Helper class for parsing VCF records"""

    def __init__(
        self,
        header: header.Header,
        samples: header.SamplesInfos,
        record_checks: Iterable[Literal["FORMAT", "INFO"]] | None = None,
    ):
        """Set up the parser

        :param header: header with the meta information
        :param samples: sample information, determines the expected number
            of columns
        :param record_checks: the checks to perform, can contain ``'INFO'``
            and ``'FORMAT'``
        """
        #: Header with the meta information
        self.header = header
        #: SamplesInfos with sample information
        self.samples = samples
        #: The checks to perform, can contain 'INFO' and 'FORMAT'
        self.record_checks = tuple(record_checks or [])
        # Expected number of fields: 8 fixed columns, or FORMAT plus one
        # column per sample on top of that
        self.expected_fields = (9 + len(self.samples.names)) if self.samples.names else 8
        # Cache of FieldInfo objects by FORMAT string
        self._format_cache: dict[str, list["header.FieldInfo"]] = {}
        # Cache of FILTER entries, also applied to FORMAT/FT
        self._filter_ids = set(self.header.filter_ids())
        # Helper for checking INFO fields
        self._info_checker = InfoChecker(self.header) if "INFO" in self.record_checks else NoopInfoChecker()
        # Helper for checking FORMAT fields
        self._format_checker = (
            FormatChecker(self.header) if "FORMAT" in self.record_checks else NoopFormatChecker()
        )

    def parse_line(self, line_str: str) -> "record.Record | None":
        """Parse line from file (including trailing line break) and return
        resulting Record

        :param str line_str: the line to parse
        :returns: the parsed record or ``None`` if the line is empty after
            stripping trailing whitespace
        """
        line_str = line_str.rstrip()
        if not line_str:
            return None  # empty line, EOF
        arr = self._split_line(line_str)
        chrom = arr[0]
        pos = int(arr[1])
        ids = [] if arr[2] == "." else arr[2].split(";")
        ref = arr[3]
        alts: list[record.AltRecord] = (
            [] if arr[4] == "." else [process_alt(self.header, ref, alt) for alt in arr[4].split(",")]
        )
        # QUAL is kept as int if possible and parsed as float otherwise
        if arr[5] == ".":
            qual = None
        else:
            try:
                qual = int(arr[5])
            except ValueError:  # try as float  # pragma: no cover
                qual = float(arr[5])
        filt = [] if arr[6] == "." else arr[6].split(";")
        self._check_filters(filt, "FILTER")
        info = self._parse_info(arr[7], len(alts))
        if len(arr) == 9:
            raise exceptions.IncorrectVCFFormat("Expected 8 or 10+ columns, got 9!")  # pragma: no cover
        if len(arr) == 8:
            format_, calls = None, None
        else:
            format_ = arr[8].split(":")
            # sample/call columns
            calls = self._handle_calls(alts, format_, arr[8], arr)
        return record.Record(chrom, pos, ids, ref, alts, qual, filt, info, format_, calls)

    def _handle_calls(
        self, alts: list[record.AltRecord], format_: list[str], format_str: str, arr: list[str]
    ) -> list["record.Call | record.UnparsedCall"]:
        """Handle FORMAT and calls columns, factored out of parse_line"""
        # look up the FieldInfo objects once per distinct FORMAT string
        field_infos = self._format_cache.get(format_str)
        if field_infos is None:
            field_infos = [self.header.get_format_field_info(name) for name in format_]
            self._format_cache[format_str] = field_infos
        num_alts = len(alts)
        # per-sample calls
        calls: list["record.Call | record.UnparsedCall"] = []
        for sample, raw_data in zip(self.samples.names, arr[9:], strict=False):
            if not self.samples.is_parsed(sample):
                calls.append(record.UnparsedCall(sample, raw_data))
                continue
            call = record.Call(sample, self._parse_calls_data(format_, field_infos, raw_data))
            self._format_checker.run(call, num_alts)
            ft_value = call.data.get("FT") or []
            if not isinstance(ft_value, list):  # pragma: no cover
                raise ValueError(f"FORMAT/FT field must be a list of strings but was {ft_value!r}")
            ft_value_ = cast(list[str], ft_value)
            if not all(isinstance(x, str) for x in ft_value_):  # pragma: no cover
                raise ValueError(f"FORMAT/FT field must be a list of strings but was {ft_value_!r}")
            self._check_filters(ft_value_, "FORMAT/FT", call.sample)
            calls.append(call)
        return calls

    def _check_filters(self, filt: list[str], source: str, sample: str | None = None):
        """Check each entry of ``filt`` with ``_check_filter``"""
        for entry in filt or ():
            self._check_filter(entry, source, sample)

    def _check_filter(self, f: str, source: str, sample: str | None):
        """Warn with ``UnknownFilter`` if ``f`` is neither ``PASS`` nor in the header"""
        # the PASS filter is implicitely defined
        if f == "PASS" or f in self._filter_ids:
            return
        if source == "FILTER":  # pragma: no cover
            warnings.warn(
                f"Filter not found in header: {f}; problem in FILTER column",
                exceptions.UnknownFilter,
            )
        else:  # pragma: no cover
            assert source == "FORMAT/FT" and sample
            warnings.warn(
                f"Filter not found in header: {f}; problem in FORMAT/FT column of sample {sample}",
                exceptions.UnknownFilter,
            )

    def _split_line(self, line_str: str) -> list[str]:
        """Split line and check number of columns

        :raises: :py:class:`vcfpy.exceptions.InvalidRecordException` if the
            number of columns differs from ``self.expected_fields``
        """
        arr = line_str.rstrip().split("\t")
        num_fields = len(arr)
        if num_fields != self.expected_fields:
            raise exceptions.InvalidRecordException(
                "The line contains an invalid number of fields. "
                f"Was {num_fields} but expected {self.expected_fields}\n{line_str}"
            )
        return arr

    def _parse_info(self, info_str: str, num_alts: int) -> dict[str, Any]:
        """Parse INFO column from string"""
        result: dict[str, Any] = {}
        if info_str == ".":
            return result
        # The standard is very nice to parsers, we can simply split at
        # semicolon characters, although I (Manuel) don't know how strict
        # programs follow this
        for entry in info_str.split(";"):
            if "=" in entry:
                key, raw = split_mapping(entry)
            else:  # flag
                key, raw = entry, True
            result[key] = parse_field_value(self.header.get_info_field_info(key), raw)
            self._info_checker.run(key, result[key], num_alts)
        return result

    @classmethod
    def _parse_calls_data(
        cls, format_: list[str], infos: list["header.FieldInfo"], gt_str: str
    ) -> dict[str, bool | int | float | str | list[bool | int | float | str | None] | None]:
        """Parse genotype call information from arrays using format array

        :param list format_: List of strings with format names
        :param list infos: FieldInfo objects, parallel to ``format_``
        :param str gt_str: string with genotype information values
        """
        # The standard is very nice to parsers, we can simply split at
        # colon characters, although I (Manuel) don't know how strict
        # programs follow this
        return {
            key: parse_field_value(info, value)
            for key, info, value in zip(format_, infos, gt_str.split(":"), strict=False)
        }
class HeaderChecker:
    """Helper class for checking a VCF header"""

    def run(self, header: header.Header) -> None:
        """Check the header

        Warnings will be printed using ``warnings`` while errors will raise
        an exception.

        :raises: ``vcfpy.exceptions.InvalidHeaderException`` in the case of
            severe errors reading the header
        """
        self._check_header_lines(header.lines)

    def _check_header_lines(self, header_lines: list[header.HeaderLine]) -> None:
        """Check header lines, in particular for starting with "##fileformat"

        Raises if there are no lines or the first line's key is not
        ``fileformat``; warns with ``UnknownVCFVersion`` if its value is not
        in ``SUPPORTED_VCF_VERSIONS``.
        """
        if not header_lines:
            raise exceptions.InvalidHeaderException(
                "The VCF file did not contain any header lines!"
            )  # pragma: no cover
        first_line = header_lines[0]
        if first_line.key != "fileformat":
            raise exceptions.InvalidHeaderException("The VCF file did not start with ##fileformat")
        version = first_line.value
        if version not in SUPPORTED_VCF_VERSIONS:
            warnings.warn(f"Unknown VCF version {version}", exceptions.UnknownVCFVersion)
@functools.lru_cache(maxsize=32)
def binomial(n: int, k: int):
    """Return the binomial coefficient "n choose k"

    Yields ``0`` if ``k`` exceeds ``n`` or if any argument is negative.
    Results are memoized with ``functools.lru_cache``.
    """
    try:
        return math.comb(n, k)
    except ValueError:
        # negative arguments
        return 0
class AbstractInfoChecker:
    """Interface of the INFO field checkers; sub classes implement ``run``"""

    def run(self, key: str, value: str, num_alts: int) -> None:
        """Run the checker for the INFO entry ``key`` with the given ``value``

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError  # pragma: no cover
class NoopInfoChecker(AbstractInfoChecker):
    """INFO checker that accepts everything, used when INFO checks are disabled"""

    def run(self, key: str, value: str, num_alts: int) -> None:
        """Do nothing"""
        return None
class InfoChecker(AbstractInfoChecker):
    """Helper class for checking an INFO field"""

    def __init__(self, header: header.Header):
        #: VCFHeader to use for checking
        self.header = header

    def run(self, key: str, value: str, num_alts: int) -> None:
        """Check value in INFO[key] of record

        Currently, only checks for consistent counts are implemented.  Only
        list values are checked; an ``IncorrectListLength`` warning is issued
        if the length does not match the number declared in the header.

        :param str key: key of INFO entry to check
        :param value: value to check
        :param int num_alts: number of alternative alleles
        """
        field_info = self.header.get_info_field_info(key)
        if not isinstance(value, list):
            return
        TABLE = {
            ".": len(value),
            "A": num_alts,
            "R": num_alts + 1,
            # One value per genotype, diploid only at the moment: with
            # ``num_alts + 1`` alleles there are "(num_alts + 2) choose 2"
            # unordered pairs (3 for a bi-allelic site); this is the same as
            # ``FormatChecker``'s formula for two alleles per genotype.
            "G": binomial(num_alts + 2, 2),
        }
        expected = TABLE.get(str(field_info.number), field_info.number)
        if len(value) != expected:
            tpl = "Number of elements for INFO field {} is {} instead of {}"
            warnings.warn(tpl.format(key, len(value), field_info.number), exceptions.IncorrectListLength)
class AbstractNoopFormatChecker:
    """Interface of the FORMAT field checkers; sub classes implement ``run``"""

    def run(self, call: "record.Call", num_alts: int) -> None:
        """Run the checker for ``call``

        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError  # pragma: no cover
class NoopFormatChecker(AbstractNoopFormatChecker):
    """FORMAT checker that accepts everything, used when FORMAT checks are disabled"""

    def run(self, call: "record.Call", num_alts: int) -> None:
        """Do nothing"""
        return None
class FormatChecker(AbstractNoopFormatChecker):
    """Helper class for checking a FORMAT field"""

    def __init__(self, header: header.Header):
        #: VCFHeader to use for checking
        self.header = header

    def run(self, call: "record.Call", num_alts: int) -> None:
        """Check ``FORMAT`` of a record.Call

        Currently, only checks for consistent counts are implemented

        :param call: the call whose ``data`` entries are checked
        :param int num_alts: number of alternative alleles of the record
        """
        for key, value in call.data.items():
            self._check_count(call, key, value, num_alts)

    def _check_count(self, call: "record.Call", key: str, value: str, num_alts: int) -> None:
        """Warn with ``IncorrectListLength`` if ``value`` has the wrong length

        Only non-empty list values are checked, mirroring ``InfoChecker``.
        """
        field_info = self.header.get_format_field_info(key)
        # GT is not a list of values; FT is always parsed into a list of
        # filter names by ``parse_field_value``, whatever its declared number
        if field_info.id in ("GT", "FT", "FORMAT/FT"):
            return
        # Scalars have no count to check.  The empty list is what
        # ``parse_field_value`` yields for the missing value ".".
        if not isinstance(value, list) or not value:
            return
        num_alleles = len(call.gt_alleles or [])
        TABLE = {
            ".": len(value),
            "A": num_alts,
            "R": num_alts + 1,
            "G": binomial(num_alts + num_alleles, num_alleles),
        }
        expected = TABLE.get(str(field_info.number), field_info.number)
        if len(value) != expected:
            tpl = "Number of elements for FORMAT field {} is {} instead of {} (number specifier {})"
            warnings.warn(
                tpl.format(key, len(value), expected, field_info.number),
                exceptions.IncorrectListLength,
            )
class Parser:
    """Class for line-wise parsing of VCF files

    In most cases, you want to use :py:class:`vcfpy.reader.Reader` instead.

    :param stream: ``file``-like object to read from
    :param str path: path the VCF is parsed from, for display purposes
        only, optional
    :param record_checks: checks to perform on the records, can contain
        ``'INFO'`` and ``'FORMAT'``
    """

    def __init__(
        self,
        stream: "io.TextIOWrapper",
        path: pathlib.Path | str | None = None,
        record_checks: Iterable[Literal["FORMAT", "INFO"]] | None = None,
    ):
        self.stream = stream
        self.path = str(path) if path is not None else None
        #: checks to perform, can contain 'INFO' and 'FORMAT'; ``None`` if empty
        self.record_checks = tuple(record_checks or []) or None
        #: header, once it has been read
        self.header = None
        # the currently read line, the first one is read right away
        self._line = stream.readline()  # trailing '\n'
        #: :py:class:`vcfpy.header.SamplesInfos` with sample information;
        #: set on parsing the header
        self.samples = None
        # helper for parsing the records
        self._record_parser = None
        # helper for checking the header
        self._header_checker = HeaderChecker()

    def _read_next_line(self):
        """Read next line store in self._line and return old one"""
        current, self._line = self._line, self.stream.readline()
        return current

    def _handle_sample_line(self, parsed_samples: list[str] | None = None):
        """Check and interpret the "#CHROM" line and return samples

        :param parsed_samples: passed on to ``header.SamplesInfos``
        :returns: ``header.SamplesInfos`` built from the columns after the
            required ones
        :raises: ``vcfpy.exceptions.IncorrectVCFFormat`` if the current line
            is not a proper "#CHROM" line
        """
        if not (self._line and self._line.startswith("#CHROM")):  # pragma: no cover
            raise exceptions.IncorrectVCFFormat('Missing line starting with "#CHROM"')
        # check for space before FORMAT (or INFO if there is no FORMAT)
        line = self._line.rstrip()
        anchor = "FORMAT" if "FORMAT" in line else "INFO"
        pos = line.find(anchor)
        if pos == -1:  # pragma: no cover
            raise exceptions.IncorrectVCFFormat('Ill-formatted line starting with "#CHROM"')
        if " " in line[:pos]:
            warnings.warn(
                "Found space in #CHROM line, splitting at whitespace instead of tab; this VCF file is ill-formatted",
                exceptions.SpaceInChromLine,
            )
            arr = line.split()
        else:
            arr = line.split("\t")
        self._check_samples_line(arr)
        sample_names = arr[len(REQUIRE_SAMPLE_HEADER) :]
        return header.SamplesInfos(sample_names, parsed_samples)

    @classmethod
    def _check_samples_line(cls, arr: list[str]):
        """Perform additional check on samples line

        :raises: ``vcfpy.exceptions.IncorrectVCFFormat`` if the columns do
            not match the required header columns
        """
        if len(arr) <= len(REQUIRE_NO_SAMPLE_HEADER):
            if tuple(arr) == REQUIRE_NO_SAMPLE_HEADER:
                return
            required = "\t".join(REQUIRE_NO_SAMPLE_HEADER)
            raise exceptions.IncorrectVCFFormat(  # pragma: no cover
                f"Sample header line indicates no sample but does not equal required prefix {required}"
            )
        prefix = tuple(arr[: len(REQUIRE_SAMPLE_HEADER)])
        if prefix != REQUIRE_SAMPLE_HEADER:
            required = "\t".join(REQUIRE_SAMPLE_HEADER)
            raise exceptions.IncorrectVCFFormat(  # pragma: no cover
                f'Sample header line (starting with "#CHROM") does not start with required prefix {required}'
            )

    def parse_line(self, line: str):
        """Parse the given line without reading another one from the stream

        :raises: ``vcfpy.exceptions.InvalidRecordException`` if no record
            parser has been set up yet
        """
        parser = self._record_parser
        if parser is None:
            raise exceptions.InvalidRecordException("Cannot parse record before parsing header")  # pragma: no cover
        return parser.parse_line(line)

    def parse_next_record(self):
        """Read, parse and return next :py:class:`vcfpy.record.Record`

        :returns: next VCF record or ``None`` if at end
        :raises: ``vcfpy.exceptions.InvalidRecordException`` in the case of
            problems reading the record
        """
        line = self._read_next_line()
        return self.parse_line(line)

    def print_warn_summary(self):
        """If there were any warnings, print summary with warnings"""
        # TODO: remove?