# -*- coding: utf-8 -*-
"""Support code for writing BGZF files
Shamelessly taken from Biopython
"""
# Biopython License Agreement
#
# Permission to use, copy, modify, and distribute this software and its
# documentation with or without modifications and for any purpose and
# without fee is hereby granted, provided that any copyright notices
# appear in all copies and that both those copyright notices and this
# permission notice appear in supporting documentation, and that the
# names of the contributors or copyright holders not be used in
# advertising or publicity pertaining to distribution of the software
# without specific prior permission.
#
# THE CONTRIBUTORS AND COPYRIGHT HOLDERS OF THIS SOFTWARE DISCLAIM ALL
# WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL THE
# CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY SPECIAL, INDIRECT
# OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE
# OR PERFORMANCE OF THIS SOFTWARE.
import codecs
import struct
import typing
import zlib
from typing import Iterable
# For Python 2 can just use: _bgzf_magic = '\x1f\x8b\x08\x04'
# but need to use bytes on Python 3
_bgzf_magic = b"\x1f\x8b\x08\x04"
_bgzf_header = b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00\x42\x43\x02\x00"
_bgzf_eof = (
b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\x06\x00BC\x02\x00\x1b\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
_bytes_BC = b"BC"
[docs]
def make_virtual_offset(block_start_offset: int, within_block_offset: int) -> int:
"""Compute a BGZF virtual offset from block start and within block offsets.
The BAM indexing scheme records read positions using a 64 bit
'virtual offset', comprising in C terms:
block_start_offset << 16 | within_block_offset
Here block_start_offset is the file offset of the BGZF block
start (unsigned integer using up to 64-16 = 48 bits), and
within_block_offset within the (decompressed) block (unsigned
16 bit integer).
>>> make_virtual_offset(0, 0)
0
>>> make_virtual_offset(0, 1)
1
>>> make_virtual_offset(0, 2**16 - 1)
65535
>>> make_virtual_offset(0, 2**16)
Traceback (most recent call last):
...
ValueError: Require 0 <= within_block_offset < 2**16, got 65536
>>> 65536 == make_virtual_offset(1, 0)
True
>>> 65537 == make_virtual_offset(1, 1)
True
>>> 131071 == make_virtual_offset(1, 2**16 - 1)
True
>>> 6553600000 == make_virtual_offset(100000, 0)
True
>>> 6553600001 == make_virtual_offset(100000, 1)
True
>>> 6553600010 == make_virtual_offset(100000, 10)
True
>>> make_virtual_offset(2**48, 0)
Traceback (most recent call last):
...
ValueError: Require 0 <= block_start_offset < 2**48, got 281474976710656
"""
if within_block_offset < 0 or within_block_offset >= 65536:
raise ValueError("Require 0 <= within_block_offset < 2**16, got %i" % within_block_offset)
if block_start_offset < 0 or block_start_offset >= 281474976710656:
raise ValueError("Require 0 <= block_start_offset < 2**48, got %i" % block_start_offset)
return (block_start_offset << 16) | within_block_offset
[docs]
def split_virtual_offset(virtual_offset: int) -> tuple[int, int]:
"""Split a 64-bit BGZF virtual offset into block start and within block offsets.
Returns a tuple of (block_start_offset, within_block_offset).
>>> split_virtual_offset(0)
(0, 0)
>>> split_virtual_offset(1)
(0, 1)
>>> split_virtual_offset(65535)
(0, 65535)
>>> split_virtual_offset(65536)
(1, 0)
>>> split_virtual_offset(65537)
(1, 1)
>>> split_virtual_offset(1195311108)
(18239, 4)
"""
start_offset = virtual_offset >> 16
within_block = virtual_offset ^ (start_offset << 16)
return start_offset, within_block
def _load_bgzf_block(handle: typing.IO[bytes]) -> tuple[int, str]:
"""Load the next BGZF block from the file handle.
Returns a tuple of (block_size, decompressed_data_as_string).
Raises StopIteration if EOF is reached.
"""
# Read the complete gzip header (10 bytes)
# ID1 ID2 CM FLG MTIME(4) XFL OS
header = handle.read(10)
if len(header) < 10: # pragma: no cover
raise StopIteration("EOF")
# Check magic bytes
if header[:4] != _bgzf_magic: # pragma: no cover
raise ValueError(f"Invalid BGZF magic: {header[:4]!r}")
# Check FLG field - should have FEXTRA bit set (0x04)
flg = header[3]
if not (flg & 0x04): # pragma: no cover
raise ValueError("BGZF file missing FEXTRA flag")
# Read XLEN (extra field length)
xlen_bytes = handle.read(2)
if len(xlen_bytes) < 2: # pragma: no cover
raise ValueError("Truncated BGZF extra field length")
xlen = struct.unpack("<H", xlen_bytes)[0]
if xlen < 6: # pragma: no cover
raise ValueError("Invalid BGZF extra field length")
# Read the extra field
extra_data = handle.read(xlen)
if len(extra_data) < xlen: # pragma: no cover
raise ValueError("Truncated BGZF extra field")
# Parse the BC subfield to get BSIZE
if extra_data[:2] != _bytes_BC: # pragma: no cover
raise ValueError("Missing BC subfield in BGZF header")
if len(extra_data) < 6: # pragma: no cover
raise ValueError("BC subfield too short")
# BC subfield: SI1='B' SI2='C' SLEN=2 BSIZE(2 bytes)
slen = struct.unpack("<H", extra_data[2:4])[0]
if slen != 2: # pragma: no cover
raise ValueError(f"Expected BC subfield length 2, got {slen}")
bsize = struct.unpack("<H", extra_data[4:6])[0]
# Calculate compressed data size
# Total block size = BSIZE + 1
# Header size = 10 + 2 + XLEN (header(10) + xlen(2) + extra(XLEN))
# Trailer size = 8 (CRC32 + ISIZE)
header_size = 10 + 2 + xlen
cdata_size = (bsize + 1) - header_size - 8
if cdata_size <= 0: # pragma: no cover
raise ValueError(f"Invalid compressed data size: {cdata_size}")
# Read the compressed data
compressed_data = handle.read(cdata_size)
if len(compressed_data) != cdata_size: # pragma: no cover
raise ValueError(f"Truncated BGZF block data: expected {cdata_size}, got {len(compressed_data)}")
# Read the trailer (CRC32 and ISIZE)
trailer = handle.read(8)
if len(trailer) != 8: # pragma: no cover
raise ValueError("Truncated BGZF trailer")
crc32, isize = struct.unpack("<II", trailer)
# Decompress the data
try:
# Use raw deflate decompression (negative window bits)
data = zlib.decompress(compressed_data, -15)
except zlib.error as e: # pragma: no cover
raise ValueError(f"Failed to decompress BGZF block: {e}")
# Verify the uncompressed size
if len(data) != isize: # pragma: no cover
raise ValueError(f"Uncompressed size mismatch: got {len(data)}, expected {isize}")
# Verify the CRC32
if zlib.crc32(data) & 0xFFFFFFFF != crc32: # pragma: no cover
raise ValueError("CRC32 mismatch in BGZF block")
# Always convert to string using latin-1 encoding
result = data.decode("latin-1")
return bsize + 1, result
[docs]
class BgzfWriter(typing.IO[str]):
def __init__(
self,
filename: str | None = None,
mode: str = "w",
fileobj: typing.IO[bytes] | None = None,
compresslevel: int = 6,
):
if fileobj:
assert filename is None
handle = fileobj
else:
if "w" not in mode.lower() and "a" not in mode.lower(): # pragma: no cover
raise ValueError("Must use write or append mode, not %r" % mode)
if filename is None: # pragma: no cover
raise ValueError("Must give a filename if not passing a file handle")
if "a" in mode.lower(): # pragma: no cover
handle = open(filename, "ab")
else:
handle = open(filename, "wb")
self._text: bool = "b" not in mode.lower()
self._handle: typing.IO[bytes] = handle
self._buffer: bytes = b""
self.compresslevel: int = compresslevel
self._filename = filename
self._mode = mode
self._closed = False
def _write_block(self, block: bytes):
# print("Saving %i bytes" % len(block))
assert len(block) <= 65536
# Giving a negative window bits means no gzip/zlib headers,
# -15 used in samtools
c = zlib.compressobj(self.compresslevel, zlib.DEFLATED, -15, zlib.DEF_MEM_LEVEL, 0)
compressed = c.compress(block) + c.flush()
del c
assert len(compressed) < 65536, "TODO - Didn't compress enough, try less data in this block"
crc = zlib.crc32(block)
# Should cope with a mix of Python platforms...
if crc < 0: # pragma: no cover
crc = struct.pack("<i", crc)
else:
crc = struct.pack("<I", crc)
bsize = struct.pack("<H", len(compressed) + 25) # includes -1
crc = struct.pack("<I", zlib.crc32(block) & 0xFFFFFFFF)
uncompressed_length = struct.pack("<I", len(block))
# Fixed 16 bytes,
# gzip magic bytes (4) mod time (4),
# gzip flag (1), os (1), extra length which is six (2),
# sub field which is BC (2), sub field length of two (2),
# Variable data,
# 2 bytes: block length as BC sub field (2)
# X bytes: the data
# 8 bytes: crc (4), uncompressed data length (4)
data = _bgzf_header + bsize + compressed + crc + uncompressed_length
self._handle.write(data)
[docs]
def write(self, data: str) -> int:
"""Write string data to the BGZF file.
Args:
data: String data to write
Returns:
Number of characters written
"""
if self._closed: # pragma: no cover
raise ValueError("I/O operation on closed file.")
original_len = len(data)
# Convert string to bytes using latin-1 encoding
data_bytes = codecs.latin_1_encode(data)[0]
# block_size = 2**16 = 65536
data_len = len(data_bytes)
if len(self._buffer) + data_len < 65536:
# print("Cached %r" % data)
self._buffer += data_bytes
else: # pragma: no cover
# print("Got %r, writing out some data..." % data)
self._buffer += data_bytes
while len(self._buffer) >= 65536:
self._write_block(self._buffer[:65536])
self._buffer = self._buffer[65536:]
return original_len
def flush(self):
while len(self._buffer) >= 65536: # pragma: no cover
self._write_block(self._buffer[:65535])
self._buffer = self._buffer[65535:]
self._write_block(self._buffer)
self._buffer = b""
self._handle.flush()
[docs]
def close(self) -> None:
"""Flush data, write 28 bytes BGZF EOF marker, and close BGZF file.
samtools will look for a magic EOF marker, just a 28 byte empty BGZF
block, and if it is missing warns the BAM file may be truncated. In
addition to samtools writing this block, so too does bgzip - so this
implementation does too.
"""
if self._closed: # pragma: no cover
return
if self._buffer:
self.flush()
self._handle.write(_bgzf_eof)
self._handle.flush()
self._handle.close()
self._closed = True
[docs]
def tell(self) -> int: # pragma: no cover
"""Returns a BGZF 64-bit virtual offset."""
return make_virtual_offset(self._handle.tell(), len(self._buffer))
def seekable(self) -> bool: # pragma: no cover
# Not seekable, but we do support tell...
return False
[docs]
def isatty(self) -> bool: # pragma: no cover
"""Return False as BGZF files are not TTY."""
return False
@property
def closed(self) -> bool: # pragma: no cover
"""Return True if the file is closed."""
return self._closed
@property
def mode(self) -> str: # pragma: no cover
"""Return the file mode."""
return self._mode
@property
def name(self) -> str: # pragma: no cover
"""Return the file name."""
return self._filename or ""
[docs]
def readable(self) -> bool: # pragma: no cover
"""Return False as this is a write-only file."""
return False
[docs]
def writable(self) -> bool: # pragma: no cover
"""Return True as this is a writable file."""
return not self._closed
[docs]
def read(self, size: int = -1) -> str: # pragma: no cover
"""Read operation not supported for write-only BGZF file."""
raise OSError("not readable")
[docs]
def readline(self, size: int = -1) -> str: # pragma: no cover
"""Readline operation not supported for write-only BGZF file."""
raise OSError("not readable")
[docs]
def readlines(self, hint: int = -1) -> list[str]: # pragma: no cover
"""Readlines operation not supported for write-only BGZF file."""
raise OSError("not readable")
[docs]
def seek(self, offset: int, whence: int = 0) -> int: # pragma: no cover
"""Seek operation not supported for BGZF files."""
raise OSError("seek not supported on BGZF files")
[docs]
def truncate(self, size: int | None = None) -> int: # pragma: no cover
"""Truncate operation not supported for BGZF files."""
raise OSError("truncate not supported on BGZF files")
[docs]
def writelines(self, lines: Iterable[str]) -> None:
"""Write a list of strings to the file."""
for line in lines:
self.write(line)
def fileno(self) -> int:
return self._handle.fileno()
def __enter__(self) -> "BgzfWriter":
return self
def __exit__(self, type_: type[BaseException] | None, value: BaseException | None, traceback: typing.Any) -> None:
self.close()
[docs]
class BgzfReader(typing.IO[str]):
r"""BGZF reader, acts like a read only handle but seek/tell differ."""
def __init__(
self,
filename: str | None = None,
mode: str = "r",
fileobj: typing.IO[bytes] | None = None,
max_cache: int = 100,
):
r"""Initialize the class for reading a BGZF file.
You would typically use the top level ``bgzf.open(...)`` function
which will call this class internally. Direct use is discouraged.
Either the ``filename`` (string) or ``fileobj`` (input file object in
binary mode) arguments must be supplied, but not both.
Argument ``mode`` controls if the data will be returned as strings in
text mode ("rt", "tr", or default "r"), or bytes binary mode ("rb"
or "br"). The argument name matches the built-in ``open(...)`` and
standard library ``gzip.open(...)`` function.
If text mode is requested, in order to avoid multi-byte characters,
this is hard coded to use the "latin1" encoding, and "\r" and "\n"
are passed as is (without implementing universal new line mode). There
is no ``encoding`` argument.
If your data is in UTF-8 or any other incompatible encoding, you must
use binary mode, and decode the appropriate fragments yourself.
Argument ``max_cache`` controls the maximum number of BGZF blocks to
cache in memory. Each can be up to 64kb thus the default of 100 blocks
could take up to 6MB of RAM. This is important for efficient random
access, a small value is fine for reading the file in one pass.
"""
# TODO - Assuming we can seek, check for 28 bytes EOF empty block
# and if missing warn about possible truncation (as in samtools)?
if max_cache < 1: # pragma: no cover
raise ValueError("Use max_cache with a minimum of 1")
# Must open the BGZF file in binary mode, but we may want to
# treat the contents as either text or binary (unicode or
# bytes under Python 3)
if filename and fileobj:
raise ValueError("Supply either filename or fileobj, not both")
# Want to reject output modes like w, a, x, +
if mode.lower() not in ("r", "tr", "rt", "rb", "br"): # pragma: no cover
raise ValueError("Must use a read mode like 'r' (default), 'rt', or 'rb' for binary")
# If an open file was passed, make sure it was opened in binary mode.
if fileobj:
if fileobj.read(0) != b"": # pragma: no cover
raise ValueError("fileobj not opened in binary mode")
handle = fileobj
else:
if filename is None: # pragma: no cover
raise ValueError("Must provide filename if fileobj is None")
handle = open(filename, "rb")
# Always read bytes from disk but return strings to the outside world
self._newline = "\n"
self._handle = handle
self.max_cache = max_cache
self._buffers: dict[int, tuple[str, int]] = {}
self._block_start_offset: int = -1 # Force initial load
self._block_raw_length: int = 0
self._within_block_offset: int = 0
self._buffer: str = ""
self._load_block(0) # Start at offset 0
def _load_block(self, start_offset: int | None = None) -> None:
if start_offset is None:
# If the file is being read sequentially, then _handle.tell()
# should be pointing at the start of the next block.
# However, if seek has been used, we can't assume that.
start_offset = self._block_start_offset + self._block_raw_length
if start_offset == self._block_start_offset: # pragma: no cover
self._within_block_offset = 0
return
elif start_offset in self._buffers:
# Already in cache
self._buffer, self._block_raw_length = self._buffers[start_offset]
self._within_block_offset = 0
self._block_start_offset = start_offset
return
# Must hit the disk... first check cache limits,
while len(self._buffers) >= self.max_cache: # pragma: no cover
# TODO - Implement LRU cache removal?
self._buffers.popitem()
# Now load the block
handle = self._handle
if start_offset is not None:
handle.seek(start_offset)
self._block_start_offset = handle.tell()
try:
block_size, self._buffer = _load_bgzf_block(handle)
except StopIteration:
# EOF
block_size = 0
self._buffer = ""
self._within_block_offset = 0
self._block_raw_length = block_size
# Finally save the block in our cache,
self._buffers[self._block_start_offset] = self._buffer, block_size
[docs]
def tell(self): # pragma: no cover
"""Return a 64-bit unsigned BGZF virtual offset."""
if 0 < self._within_block_offset and self._within_block_offset == len(self._buffer):
# Special case where we're right at the end of a (non empty) block.
# For non-maximal blocks could give two possible virtual offsets,
# but for a maximal block can't use 65536 as the within block
# offset. Therefore for consistency, use the next block and a
# within block offset of zero.
return (self._block_start_offset + self._block_raw_length) << 16
else:
# return make_virtual_offset(self._block_start_offset,
# self._within_block_offset)
# TODO - Include bounds checking as in make_virtual_offset?
return (self._block_start_offset << 16) | self._within_block_offset
[docs]
def seek(self, virtual_offset: int, whence: int = 0) -> int:
"""Seek to a 64-bit unsigned BGZF virtual offset."""
# Do this inline to avoid a function call,
# start_offset, within_block = split_virtual_offset(virtual_offset)
start_offset = virtual_offset >> 16
within_block = virtual_offset ^ (start_offset << 16)
if start_offset != self._block_start_offset:
# Don't need to load the block if already there
# (this avoids a function call since _load_block would do nothing)
self._load_block(start_offset)
if start_offset != self._block_start_offset: # pragma: no cover
raise ValueError("start_offset not loaded correctly")
if within_block > len(self._buffer):
if not (within_block == 0 and len(self._buffer) == 0): # pragma: no cover
raise ValueError("Within offset %i but block size only %i" % (within_block, len(self._buffer)))
self._within_block_offset = within_block
# assert virtual_offset == self.tell(), \
# "Did seek to %i (%i, %i), but tell says %i (%i, %i)" \
# % (virtual_offset, start_offset, within_block,
# self.tell(), self._block_start_offset,
# self._within_block_offset)
return virtual_offset
[docs]
def read(self, size: int = -1) -> str:
"""Read method for the BGZF module."""
if size < 0: # pragma: no cover
raise NotImplementedError("Don't be greedy, that could be massive!")
result = ""
while size and self._block_raw_length:
if self._within_block_offset + size <= len(self._buffer):
# This may leave us right at the end of a block
# (lazy loading, don't load the next block unless we have too)
data = self._buffer[self._within_block_offset : self._within_block_offset + size]
self._within_block_offset += size
if not data: # pragma: no cover
raise ValueError("Must be at least 1 byte")
result += data
break
else: # pragma: no cover
data = self._buffer[self._within_block_offset :]
size -= len(data)
self._load_block() # will reset offsets
result += data
return result
[docs]
def readline(self, size: int = -1) -> str:
"""Read a single line for the BGZF file."""
result = ""
while self._block_raw_length:
i = self._buffer.find(self._newline, self._within_block_offset)
# Three cases to consider,
if i == -1: # pragma: no cover
# No newline, need to read in more data
data = self._buffer[self._within_block_offset :]
self._load_block() # will reset offsets
result += data
elif i + 1 == len(self._buffer):
# Found new line, but right at end of block (SPECIAL)
data = self._buffer[self._within_block_offset :]
# Must now load the next block to ensure tell() works
self._load_block() # will reset offsets
if not data: # pragma: no cover
raise ValueError("Must be at least 1 byte")
result += data
break
else:
# Found new line, not at end of block (easy case, no IO)
data = self._buffer[self._within_block_offset : i + 1]
self._within_block_offset = i + 1
# assert data.endswith(self._newline)
result += data
break
return result
def __next__(self) -> str:
"""Return the next line."""
line = self.readline()
if not line:
raise StopIteration
return line
def __iter__(self) -> "BgzfReader": # pragma: no cover
"""Iterate over the lines in the BGZF file."""
return self
[docs]
def close(self) -> None:
"""Close BGZF file."""
self._handle.close()
self._buffer = ""
self._block_start_offset = 0
self._buffers = {}
[docs]
def seekable(self) -> bool: # pragma: no cover
"""Return True indicating the BGZF supports random access."""
return True
[docs]
def isatty(self) -> bool: # pragma: no cover
"""Return True if connected to a TTY device."""
return False
[docs]
def readable(self) -> bool: # pragma: no cover
"""Return True indicating the BGZF file is readable."""
return True
[docs]
def writable(self) -> bool: # pragma: no cover
"""Return False indicating the BGZF file is not writable."""
return False
[docs]
def fileno(self) -> int: # pragma: no cover
"""Return integer file descriptor."""
return self._handle.fileno()
@property
def closed(self) -> bool: # pragma: no cover
"""Return True if the file is closed."""
return self._handle.closed
@property
def mode(self) -> str: # pragma: no cover
"""Return the file mode."""
return "r"
@property
def name(self) -> str:
"""Return the file name."""
return getattr(self._handle, "name", "")
[docs]
def flush(self) -> None: # pragma: no cover
"""Flush - no-op for read-only file."""
pass
[docs]
def readlines(self, hint: int = -1) -> list[str]:
"""Read all lines from the file."""
lines = []
for line in self:
lines.append(line)
if hint > 0 and len(lines) >= hint:
break
return lines
[docs]
def writelines(self, lines: Iterable[str]) -> None: # pragma: no cover
"""Write lines - not supported for read-only file."""
raise OSError("not writable")
[docs]
def write(self, s: str) -> int: # pragma: no cover
"""Write - not supported for read-only file."""
raise OSError("not writable")
[docs]
def truncate(self, size: int | None = None) -> int: # pragma: no cover
"""Truncate - not supported for read-only file."""
raise OSError("not writable")
def __enter__(self):
"""Open a file operable with WITH statement."""
return self
def __exit__(self, type, value, traceback):
"""Close a file with WITH statement."""
self.close()