import os
import typing

from . import constants, exceptions

# Alias for readability. Due to import recursion issues we cannot do:
# from .constants import LockFlags
LockFlags = constants.LockFlags


class HasFileno(typing.Protocol):
    """Structural type for any object that exposes ``fileno()``."""

    def fileno(self) -> int: ...


# Platform locking primitive. Stays ``None`` unless the posix branch below
# assigns it.
LOCKER: typing.Optional[
    typing.Callable[[typing.Union[int, HasFileno], int], typing.Any]
] = None


if os.name == 'nt':  # pragma: no cover
    import msvcrt

    import pywintypes
    import win32con
    import win32file
    import winerror

    __overlapped = pywintypes.OVERLAPPED()

    def lock(file_: typing.Union[typing.IO, int], flags: LockFlags):
        """Lock ``file_`` through ``win32file.LockFileEx``.

        The file position is moved to the start while the lock call is made
        and restored afterwards.

        Raises:
            exceptions.AlreadyLocked: when ``LockFileEx`` fails with
                ``ERROR_LOCK_VIOLATION``.
            pywintypes.error: any other ``LockFileEx`` failure is re-raised
                unchanged.
        """
        # Windows locking does not support locking through `fh.fileno()`,
        # so the argument is cast to keep mypy and pyright happy.
        file_ = typing.cast(typing.IO, file_)

        # Translate the portable flags into their win32 equivalents.
        flag_map = (
            (LockFlags.NON_BLOCKING, win32con.LOCKFILE_FAIL_IMMEDIATELY),
            (LockFlags.EXCLUSIVE, win32con.LOCKFILE_EXCLUSIVE_LOCK),
        )
        win_mode = 0
        for portable_flag, win_flag in flag_map:
            if flags & portable_flag:
                win_mode |= win_flag

        # Remember the current position so it can be restored, while still
        # locking from the beginning of the file.
        original_pos = file_.tell()
        if original_pos:
            file_.seek(0)

        handle = msvcrt.get_osfhandle(file_.fileno())  # type: ignore
        try:
            win32file.LockFileEx(handle, win_mode, 0, -0x10000, __overlapped)
        except pywintypes.error as error:
            if error.winerror != winerror.ERROR_LOCK_VIOLATION:
                # Q: Are there exceptions/codes we should be dealing with
                # here?
                raise

            # error: (33, 'LockFileEx', 'The process cannot access the file
            # because another process has locked a portion of the file.')
            raise exceptions.AlreadyLocked(
                exceptions.LockException.LOCK_FAILED,
                error.strerror,
                fh=file_,
            ) from error
        finally:
            if original_pos:
                file_.seek(original_pos)

    def unlock(file_: typing.IO):
        """Unlock ``file_`` through ``win32file.UnlockFileEx``.

        ``ERROR_NOT_LOCKED`` from ``UnlockFileEx`` is ignored. The file
        position is restored after the call.

        Raises:
            exceptions.LockException: when an ``OSError`` occurs anywhere in
                the unlock sequence.
            pywintypes.error: any ``UnlockFileEx`` failure other than
                ``ERROR_NOT_LOCKED`` is re-raised unchanged.
        """
        try:
            original_pos = file_.tell()
            if original_pos:
                file_.seek(0)

            handle = msvcrt.get_osfhandle(file_.fileno())  # type: ignore
            try:
                # Same byte-range arguments as the LockFileEx call in lock().
                win32file.UnlockFileEx(handle, 0, -0x10000, __overlapped)
            except pywintypes.error as error:
                if error.winerror != winerror.ERROR_NOT_LOCKED:
                    # Q: Are there exceptions/codes we should be
                    # dealing with here?
                    raise
            finally:
                if original_pos:
                    file_.seek(original_pos)
        except OSError as os_error:
            raise exceptions.LockException(
                exceptions.LockException.LOCK_FAILED,
                os_error.strerror,
                fh=file_,
            ) from os_error

elif os.name == 'posix':  # pragma: no cover
    import errno
    import fcntl

    # The locking implementation.
    # Expected values are either fcntl.flock() or fcntl.lockf(),
    # but any callable that matches the syntax will be accepted.
    LOCKER = fcntl.flock

    def lock(file_: typing.Union[typing.IO, int], flags: LockFlags):
        """Lock ``file_`` by calling ``LOCKER(file_, flags)``.

        Raises:
            RuntimeError: when ``NON_BLOCKING`` is requested without
                ``SHARED`` or ``EXCLUSIVE``.
            exceptions.AlreadyLocked: when the locker fails with ``EACCES``
                or ``EAGAIN``.
            exceptions.LockException: for any other ``OSError`` and for
                ``EOFError``.
        """
        assert LOCKER is not None, 'We need a locking function in `LOCKER` '

        # Locking with NON_BLOCKING without EXCLUSIVE or SHARED enabled
        # results in an error, so reject that combination up front.
        lock_kinds = LockFlags.SHARED | LockFlags.EXCLUSIVE
        if flags & LockFlags.NON_BLOCKING and not flags & lock_kinds:
            raise RuntimeError(
                'When locking in non-blocking mode the SHARED '
                'or EXCLUSIVE flag must be specified as well',
            )

        try:
            LOCKER(file_, flags)
        except OSError as os_error:
            # Python can use one of several different exception classes to
            # represent timeout (most likely BlockingIOError and IOError),
            # but these errors may also represent other failures. On some
            # systems `IOError is OSError`, so checking for either one can
            # mask other errors. The safest check is to catch OSError (from
            # which the others inherit) and inspect the errno, which should
            # be EACCES or EAGAIN according to the spec.
            if os_error.errno in (errno.EACCES, errno.EAGAIN):
                # A timeout exception, wrap this so the outer code knows to
                # try again (if it wants to).
                raise exceptions.AlreadyLocked(
                    os_error,
                    fh=file_,
                ) from os_error

            # Something else went wrong; report it as a generic
            # LockException rather than AlreadyLocked.
            raise exceptions.LockException(
                os_error,
                fh=file_,
            ) from os_error
        except EOFError as eof_error:
            # On NFS filesystems, flock can raise an EOFError
            raise exceptions.LockException(
                eof_error,
                fh=file_,
            ) from eof_error

    def unlock(file_: typing.IO):
        """Unlock ``file_`` via ``LOCKER`` with ``LockFlags.UNBLOCK``."""
        assert LOCKER is not None, 'We need a locking function in `LOCKER` '
        descriptor = file_.fileno()
        LOCKER(descriptor, LockFlags.UNBLOCK)

else:  # pragma: no cover
    raise RuntimeError('PortaLocker only defined for nt and posix platforms')
Memory