"""Manage the filesystem in a Zip archive. """ from __future__ import print_function, unicode_literals import sys import typing import six import zipfile from datetime import datetime from . import errors from ._url_tools import url_quote from .base import FS from .compress import write_zip from .enums import ResourceType, Seek from .info import Info from .iotools import RawWrapper from .memoryfs import MemoryFS from .opener import open_fs from .path import dirname, forcedir, normpath, relpath from .permissions import Permissions from .time import datetime_to_epoch from .wrapfs import WrapFS if typing.TYPE_CHECKING: from typing import ( Any, BinaryIO, Collection, Dict, List, Optional, SupportsInt, Text, Tuple, Union, ) from .info import RawInfo from .subfs import SubFS R = typing.TypeVar("R", bound="ReadZipFS") class _ZipExtFile(RawWrapper): def __init__(self, fs, name): # noqa: D107 # type: (ReadZipFS, Text) -> None self._zip = _zip = fs._zip self._end = _zip.getinfo(name).file_size self._pos = 0 super(_ZipExtFile, self).__init__(_zip.open(name), "r", name) # NOTE(@althonos): Starting from Python 3.7, files inside a Zip archive are # seekable provided they were opened from a seekable file # handle. Before that, we can emulate a seek using the # read method, although it adds a ton of overhead and is # way less efficient than extracting once to a BytesIO. if sys.version_info < (3, 7): def read(self, size=-1): # type: (int) -> bytes buf = self._f.read(-1 if size is None else size) self._pos += len(buf) return buf def read1(self, size=-1): # type: (int) -> bytes buf = self._f.read1(-1 if size is None else size) # type: ignore self._pos += len(buf) return buf def tell(self): # type: () -> int return self._pos def seekable(self): return True def seek(self, offset, whence=Seek.set): # type: (int, SupportsInt) -> int """Change stream position. Change the stream position to the given byte offset. The offset is interpreted relative to the position indicated by ``whence``. Arguments: offset (int): the offset to the new position, in bytes. whence (int): the position reference. Possible values are: * `Seek.set`: start of stream (the default). * `Seek.current`: current position; offset may be negative. * `Seek.end`: end of stream; offset must be negative. Returns: int: the new absolute position. Raises: ValueError: when ``whence`` is not known, or ``offset`` is invalid. Note: Zip compression does not support seeking, so the seeking is emulated. Seeking somewhere else than the current position will need to either: * reopen the file and restart decompression * read and discard data to advance in the file """ _whence = int(whence) if _whence == Seek.current: offset += self._pos if _whence == Seek.current or _whence == Seek.set: if offset < 0: raise ValueError("Negative seek position {}".format(offset)) elif _whence == Seek.end: if offset > 0: raise ValueError("Positive seek position {}".format(offset)) offset += self._end else: raise ValueError( "Invalid whence ({}, should be {}, {} or {})".format( _whence, Seek.set, Seek.current, Seek.end ) ) if offset < self._pos: self._f = self._zip.open(self.name) # type: ignore self._pos = 0 self.read(offset - self._pos) return self._pos else: def seek(self, offset, whence=Seek.set): # type: (int, SupportsInt) -> int """Change stream position. Change the stream position to the given byte offset. The offset is interpreted relative to the position indicated by ``whence``. Arguments: offset (int): the offset to the new position, in bytes. whence (int): the position reference. Possible values are: * `Seek.set`: start of stream (the default). * `Seek.current`: current position; offset may be negative. * `Seek.end`: end of stream; offset must be negative. Returns: int: the new absolute position. Raises: ValueError: when ``whence`` is not known, or ``offset`` is invalid. """ _whence = int(whence) _pos = self.tell() if _whence == Seek.set: if offset < 0: raise ValueError("Negative seek position {}".format(offset)) elif _whence == Seek.current: if _pos + offset < 0: raise ValueError("Negative seek position {}".format(offset)) elif _whence == Seek.end: if offset > 0: raise ValueError("Positive seek position {}".format(offset)) else: raise ValueError( "Invalid whence ({}, should be {}, {} or {})".format( _whence, Seek.set, Seek.current, Seek.end ) ) return self._f.seek(offset, _whence) class ZipFS(WrapFS): """Read and write zip files. There are two ways to open a `ZipFS` for the use cases of reading a zip file, and creating a new one. If you open the `ZipFS` with ``write`` set to `False` (the default) then the filesystem will be a read-only filesystem which maps to the files and directories within the zip file. Files are decompressed on the fly when you open them. Here's how you might extract and print a readme from a zip file:: with ZipFS('foo.zip') as zip_fs: readme = zip_fs.readtext('readme.txt') If you open the `ZipFS` with ``write`` set to `True`, then the `ZipFS` will be an empty temporary filesystem. Any files / directories you create in the `ZipFS` will be written in to a zip file when the `ZipFS` is closed. Here's how you might write a new zip file containing a ``readme.txt`` file:: with ZipFS('foo.zip', write=True) as new_zip: new_zip.writetext( 'readme.txt', 'This zip file was written by PyFilesystem' ) Arguments: file (str or io.IOBase): An OS filename, or an open file object. write (bool): Set to `True` to write a new zip file, or `False` (default) to read an existing zip file. compression (int): Compression to use (one of the constants defined in the `zipfile` module in the stdlib). temp_fs (str or FS): An FS URL or an FS instance to use to store data prior to zipping. Defaults to creating a new `~fs.tempfs.TempFS`. """ # TODO: __new__ returning different types may be too 'magical' def __new__( # type: ignore cls, file, # type: Union[Text, BinaryIO] write=False, # type: bool compression=zipfile.ZIP_DEFLATED, # type: int encoding="utf-8", # type: Text temp_fs="temp://__ziptemp__", # type: Union[Text, FS] ): # type: (...) -> FS # This magic returns a different class instance based on the # value of the ``write`` parameter. if write: return WriteZipFS( file, compression=compression, encoding=encoding, temp_fs=temp_fs ) else: return ReadZipFS(file, encoding=encoding) if typing.TYPE_CHECKING: def __init__( self, file, # type: Union[Text, BinaryIO] write=False, # type: bool compression=zipfile.ZIP_DEFLATED, # type: int encoding="utf-8", # type: Text temp_fs="temp://__ziptemp__", # type: Text ): # noqa: D107 # type: (...) -> None pass @six.python_2_unicode_compatible class WriteZipFS(WrapFS): """A writable zip file.""" def __init__( self, file, # type: Union[Text, BinaryIO] compression=zipfile.ZIP_DEFLATED, # type: int encoding="utf-8", # type: Text temp_fs="temp://__ziptemp__", # type: Union[Text, FS] ): # noqa: D107 # type: (...) -> None self._file = file self.compression = compression self.encoding = encoding self._temp_fs_url = temp_fs self._temp_fs = open_fs(temp_fs) self._meta = dict(self._temp_fs.getmeta()) # type: ignore super(WriteZipFS, self).__init__(self._temp_fs) def __repr__(self): # type: () -> Text t = "WriteZipFS({!r}, compression={!r}, encoding={!r}, temp_fs={!r})" return t.format(self._file, self.compression, self.encoding, self._temp_fs_url) def __str__(self): # type: () -> Text return "<zipfs-write '{}'>".format(self._file) def delegate_path(self, path): # type: (Text) -> Tuple[FS, Text] return self._temp_fs, path def delegate_fs(self): # type: () -> FS return self._temp_fs def close(self): # type: () -> None if not self.isclosed(): try: self.write_zip() finally: self._temp_fs.close() super(WriteZipFS, self).close() def write_zip( self, file=None, # type: Union[Text, BinaryIO, None] compression=None, # type: Optional[int] encoding=None, # type: Optional[Text] ): # type: (...) -> None """Write zip to a file. Arguments: file (str or io.IOBase, optional): Destination file, may be a file name or an open file handle. compression (int, optional): Compression to use (one of the constants defined in the `zipfile` module in the stdlib). encoding (str, optional): The character encoding to use (default uses the encoding defined in `~WriteZipFS.__init__`). Note: This is called automatically when the ZipFS is closed. """ if not self.isclosed(): write_zip( self._temp_fs, file or self._file, compression=compression or self.compression, encoding=encoding or self.encoding, ) @six.python_2_unicode_compatible class ReadZipFS(FS): """A readable zip file.""" _meta = { "case_insensitive": False, "network": False, "read_only": True, "supports_rename": False, "thread_safe": True, "unicode_paths": True, "virtual": False, } @errors.CreateFailed.catch_all def __init__(self, file, encoding="utf-8"): # noqa: D107 # type: (Union[BinaryIO, Text], Text) -> None super(ReadZipFS, self).__init__() self._file = file self.encoding = encoding self._zip = zipfile.ZipFile(file, "r") self._directory_fs = None # type: Optional[MemoryFS] def __repr__(self): # type: () -> Text return "ReadZipFS({!r})".format(self._file) def __str__(self): # type: () -> Text return "<zipfs '{}'>".format(self._file) def _path_to_zip_name(self, path): # type: (Text) -> str """Convert a path to a zip file name.""" path = relpath(normpath(path)) if self._directory.isdir(path): path = forcedir(path) if six.PY2: return path.encode(self.encoding) return path @property def _directory(self): # type: () -> MemoryFS """`MemoryFS`: a filesystem with the same folder hierarchy as the zip.""" self.check() with self._lock: if self._directory_fs is None: self._directory_fs = _fs = MemoryFS() for zip_name in self._zip.namelist(): resource_name = zip_name if six.PY2: resource_name = resource_name.decode(self.encoding, "replace") if resource_name.endswith("/"): _fs.makedirs(resource_name, recreate=True) else: _fs.makedirs(dirname(resource_name), recreate=True) _fs.create(resource_name) return self._directory_fs def getinfo(self, path, namespaces=None): # type: (Text, Optional[Collection[Text]]) -> Info _path = self.validatepath(path) namespaces = namespaces or () raw_info = {} # type: Dict[Text, Dict[Text, object]] if _path == "/": raw_info["basic"] = {"name": "", "is_dir": True} if "details" in namespaces: raw_info["details"] = {"type": int(ResourceType.directory)} else: basic_info = self._directory.getinfo(_path) raw_info["basic"] = {"name": basic_info.name, "is_dir": basic_info.is_dir} if not {"details", "access", "zip"}.isdisjoint(namespaces): zip_name = self._path_to_zip_name(path) try: zip_info = self._zip.getinfo(zip_name) except KeyError: # Can occur if there is an implied directory in the zip pass else: if "details" in namespaces: raw_info["details"] = { "size": zip_info.file_size, "type": int( ResourceType.directory if basic_info.is_dir else ResourceType.file ), "modified": datetime_to_epoch( datetime(*zip_info.date_time) ), } if "zip" in namespaces: raw_info["zip"] = { k: getattr(zip_info, k) for k in zip_info.__slots__ # type: ignore if not k.startswith("_") } if "access" in namespaces: # check the zip was created on UNIX to get permissions if zip_info.external_attr and zip_info.create_system == 3: raw_info["access"] = { "permissions": Permissions( mode=zip_info.external_attr >> 16 & 0xFFF ).dump() } return Info(raw_info) def setinfo(self, path, info): # type: (Text, RawInfo) -> None self.check() raise errors.ResourceReadOnly(path) def listdir(self, path): # type: (Text) -> List[Text] self.check() return self._directory.listdir(path) def makedir( self, # type: R path, # type: Text permissions=None, # type: Optional[Permissions] recreate=False, # type: bool ): # type: (...) -> SubFS[R] self.check() raise errors.ResourceReadOnly(path) def openbin(self, path, mode="r", buffering=-1, **kwargs): # type: (Text, Text, int, **Any) -> BinaryIO self.check() if "w" in mode or "+" in mode or "a" in mode: raise errors.ResourceReadOnly(path) if not self._directory.exists(path): raise errors.ResourceNotFound(path) elif self._directory.isdir(path): raise errors.FileExpected(path) zip_name = self._path_to_zip_name(path) return _ZipExtFile(self, zip_name) # type: ignore def remove(self, path): # type: (Text) -> None self.check() raise errors.ResourceReadOnly(path) def removedir(self, path): # type: (Text) -> None self.check() raise errors.ResourceReadOnly(path) def close(self): # type: () -> None super(ReadZipFS, self).close() if hasattr(self, "_zip"): self._zip.close() def readbytes(self, path): # type: (Text) -> bytes self.check() if not self._directory.isfile(path): raise errors.ResourceNotFound(path) zip_name = self._path_to_zip_name(path) zip_bytes = self._zip.read(zip_name) return zip_bytes def geturl(self, path, purpose="download"): # type: (Text, Text) -> Text if purpose == "fs" and isinstance(self._file, six.string_types): quoted_file = url_quote(self._file) quoted_path = url_quote(path) return "zip://{}!/{}".format(quoted_file, quoted_path) else: raise errors.NoURL(path, purpose)
Memory