# ===== micropip/__init__.py =====
from .package_manager import PackageManager
try:
from ._version import __version__
except ImportError:
pass
_package_manager_singleton = PackageManager()
install = _package_manager_singleton.install
set_index_urls = _package_manager_singleton.set_index_urls
list = _package_manager_singleton.list
freeze = _package_manager_singleton.freeze
add_mock_package = _package_manager_singleton.add_mock_package
list_mock_packages = _package_manager_singleton.list_mock_packages
remove_mock_package = _package_manager_singleton.remove_mock_package
uninstall = _package_manager_singleton.uninstall
__all__ = [
"install",
"list",
"freeze",
"add_mock_package",
"list_mock_packages",
"remove_mock_package",
"uninstall",
"set_index_urls",
"__version__",
]
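# Usage sketch (illustrative, not part of the module itself; assumes a running
# Pyodide interpreter where top-level await is available):
#
#     import micropip
#     await micropip.install("snowballstemmer")
#     import snowballstemmer
#     print(micropip.list())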
# ===== micropip/_mock_package.py =====
import importlib
import importlib.abc
import importlib.metadata
import importlib.util
import shutil
import site
import sys
from collections.abc import Callable
from pathlib import Path
from textwrap import dedent
MOCK_INSTALL_NAME_MEMORY = "micropip in-memory mock package"
MOCK_INSTALL_NAME_PERSISTENT = "micropip mock package"
class MockDistribution(importlib.metadata.Distribution):
def __init__(self, file_dict, modules):
self.file_dict = file_dict
self.modules = modules
def read_text(self, filename):
"""Attempt to load metadata file given by the name.
:param filename: The name of the file in the distribution info.
:return: The text if found, otherwise None.
"""
if filename in self.file_dict:
return self.file_dict[filename]
else:
return None
def locate_file(self, path):
"""
Given a path to a file in this distribution, return a path
to it.
"""
return None
_mock_modules: dict[str, str | Callable] = {}
_mock_distributions: dict[str, MockDistribution] = {}
class _MockModuleFinder(importlib.abc.MetaPathFinder, importlib.abc.Loader):
def __init__(self):
pass
def find_distributions(self, context):
if context.name in _mock_distributions:
return [_mock_distributions[context.name]]
elif context.name is None:
return _mock_distributions.values()
else:
return []
def find_module(self, fullname, path=None):
spec = self.find_spec(fullname, path)
if spec is None:
return None
return spec
def create_module(self, spec):
if spec.name in _mock_modules:
from types import ModuleType
module = ModuleType(spec.name)
module.__path__ = "/micropip_mocks/" + module.__name__.replace(".", "/")
return module
def exec_module(self, module):
init_object = _mock_modules[module.__name__]
if isinstance(init_object, str):
# run module init code in the module
exec(dedent(init_object), module.__dict__)
elif callable(init_object):
# run module init function
init_object(module)
def find_spec(self, fullname, path=None, target=None):
if fullname not in _mock_modules.keys():
return None
spec = importlib.util.spec_from_loader(fullname, self)
return spec
_finder = _MockModuleFinder()
def _add_in_memory_distribution(name, metafiles, modules):
if _finder not in sys.meta_path:
sys.meta_path = [_finder] + sys.meta_path
_mock_distributions[name] = MockDistribution(metafiles, modules)
for name, obj in modules.items():
_add_mock_module(name, obj)
def _add_mock_module(name, obj):
_mock_modules[name] = obj
def _remove_in_memory_distribution(name):
if name in _mock_distributions:
for module in _mock_distributions[name].modules.keys():
if module in sys.modules:
del sys.modules[module]
del _mock_modules[module]
del _mock_distributions[name]
def add_mock_package(
name: str,
version: str,
*,
modules: dict[str, str | None] | None = None,
persistent: bool = False,
) -> None:
if modules is None:
# make a single mock module with this name
modules = {name: ""}
# make the metadata
METADATA = f"""Metadata-Version: 1.1
Name: {name}
Version: {version}
Summary: {name} mock package generated by micropip
Author-email: {name}@micro.pip.non-working-fake-host
"""
for module_name in modules.keys():
METADATA += f"Provides: {module_name}\n"
if persistent:
# make empty mock modules with the requested names in user site packages
site_packages = Path(site.getsitepackages()[0])
# should exist already, but just in case
site_packages.mkdir(parents=True, exist_ok=True)
dist_dir = site_packages / f"{name}-{version}.dist-info"
dist_dir.mkdir(parents=True, exist_ok=False)
metadata_file = dist_dir / "METADATA"
record_file = dist_dir / "RECORD"
installer_file = dist_dir / "INSTALLER"
file_list = [metadata_file, installer_file]
metadata_file.write_text(METADATA)
installer_file.write_text(MOCK_INSTALL_NAME_PERSISTENT)
for module_name, content in modules.items():
if not content:
content = ""
content = dedent(content)
path_parts = module_name.split(".")
dir_path = Path(site_packages, *path_parts)
dir_path.mkdir(exist_ok=True, parents=True)
init_file = dir_path / "__init__.py"
file_list.append(init_file)
init_file.write_text(content)
with open(record_file, "w") as f:
for file in file_list:
f.write(f"{file},,{file.stat().st_size}\n")
f.write(f"{record_file},,\n")
else:
# make memory mocks of files
INSTALLER = MOCK_INSTALL_NAME_MEMORY
metafiles = {"METADATA": METADATA, "INSTALLER": INSTALLER}
_add_in_memory_distribution(name, metafiles, modules)
importlib.invalidate_caches()
def list_mock_packages() -> list[str]:
mock_packages = [
dist.name
for dist in importlib.metadata.distributions()
if dist.read_text("INSTALLER")
in (MOCK_INSTALL_NAME_PERSISTENT, MOCK_INSTALL_NAME_MEMORY)
]
return mock_packages
def remove_mock_package(name: str) -> None:
d = importlib.metadata.distribution(name)
installer = d.read_text("INSTALLER")
if installer == MOCK_INSTALL_NAME_MEMORY:
_remove_in_memory_distribution(name)
return
elif installer != MOCK_INSTALL_NAME_PERSISTENT:
raise ValueError(
f"Package {name} doesn't seem to be a micropip mock.\n"
"Are you sure it was installed with micropip?"
)
# a real mock package - kill it
# remove all files
folders: set[Path] = set()
if d.files is not None:
for file in d.files:
p = Path(file.locate())
p.unlink()
folders.add(p.parent)
# delete all folders except site_packages
# (that check is just to avoid killing
# undesirable things in case of weird micropip errors)
site_packages = Path(site.getsitepackages()[0])
for f in folders:
if f != site_packages:
shutil.rmtree(f)
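# Minimal sketch of the in-memory mock flow above (hypothetical package name;
# runs in plain CPython because persistent=False only touches sys.meta_path):
if __name__ == "__main__":
    add_mock_package("fakepkg", "1.0", modules={"fakepkg": "greeting = 'hi'"})
    import fakepkg  # resolved by _MockModuleFinder, not the file system

    assert fakepkg.greeting == "hi"
    assert "fakepkg" in list_mock_packages()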
# ===== micropip/_utils.py =====
import functools
import json
from importlib.metadata import Distribution
from pathlib import Path
from sysconfig import get_config_var, get_platform
from packaging.requirements import Requirement
from packaging.tags import Tag
from packaging.tags import sys_tags as sys_tags_orig
from packaging.utils import BuildTag, InvalidWheelFilename, canonicalize_name
from packaging.utils import parse_wheel_filename as parse_wheel_filename_orig
from packaging.version import InvalidVersion, Version
from ._compat import REPODATA_PACKAGES
def get_dist_info(dist: Distribution) -> Path:
"""
Get the .dist-info directory of a distribution.
"""
return dist._path # type: ignore[attr-defined]
def get_root(dist: Distribution) -> Path:
"""
Get the root directory where a package is installed.
This is normally the site-packages directory.
"""
return get_dist_info(dist).parent
def get_files_in_distribution(dist: Distribution) -> set[Path]:
"""
Get a list of files in a distribution, using the metadata.
Parameters
----------
dist
Distribution to get files from.
Returns
-------
A list of files in the distribution.
"""
root = get_root(dist)
dist_info = get_dist_info(dist)
files_to_remove = set()
pkg_files = dist.files or []
metadata_files = dist_info.glob("*")
for file in pkg_files:
abspath = (root / file).resolve()
files_to_remove.add(abspath)
# Also add all files in the .dist-info directory.
# Since micropip adds some extra files there, we need to remove them too.
files_to_remove.update(metadata_files)
return files_to_remove
@functools.cache
def sys_tags() -> tuple[Tag, ...]:
new_tags = []
abi_version = get_config_var("PYODIDE_ABI_VERSION")
pyodide_platform_tag = f"pyodide_{abi_version}_wasm32"
for tag in sys_tags_orig():
if "emscripten" in tag.platform:
new_tags.append(Tag(tag.interpreter, tag.abi, pyodide_platform_tag))
new_tags.append(tag)
return tuple(new_tags)
@functools.cache
def parse_wheel_filename(
filename: str,
) -> tuple[str, Version, BuildTag, frozenset[Tag]]:
return parse_wheel_filename_orig(filename)
# TODO: Move these helper functions back to WheelInfo
def parse_version(filename: str) -> Version:
return parse_wheel_filename(filename)[1]
def parse_tags(filename: str) -> frozenset[Tag]:
return parse_wheel_filename(filename)[3]
def best_compatible_tag_index(tags: frozenset[Tag]) -> int | None:
"""Get the index of the first tag in ``packaging.tags.sys_tags()`` that a wheel has.
Since ``packaging.tags.sys_tags()`` is sorted from most specific ("best") to most
general ("worst") compatibility, this index douples as a priority rank: given two
compatible wheels, the one whose best index is closer to zero should be installed.
Parameters
----------
tags
The tags to check.
Returns
-------
The index, or ``None`` if this wheel has no compatible tags.
"""
for index, tag in enumerate(sys_tags()):
if tag in tags:
return index
return None
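# Example (illustrative, version numbers made up): on a Pyodide runtime a wheel
# tagged "cp312-cp312-emscripten_3_1_58_wasm32" gets a smaller index than one
# tagged "py3-none-any", so the more specific binary wheel is preferred over a
# pure Python wheel of the same version.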
def is_package_compatible(filename: str) -> bool:
"""
Check if a package is compatible with the current platform.
Parameters
----------
filename
Filename of the package to check.
"""
if not filename.endswith(".whl"):
return False
if filename.endswith("py3-none-any.whl"):
return True
try:
tags = parse_tags(filename)
except (InvalidVersion, InvalidWheelFilename):
return False
return best_compatible_tag_index(tags) is not None
def check_compatible(filename: str) -> None:
"""
Check if a package is compatible with the current platform.
If not, raise an exception with an error message that explains why.
"""
compatible = is_package_compatible(filename)
if compatible:
return
# Not compatible, now we need to figure out why.
try:
tags = parse_tags(filename)
except InvalidWheelFilename:
raise ValueError(f"Wheel filename is invalid: {filename!r}") from None
except InvalidVersion:
raise ValueError(f"Wheel version is invalid: {filename!r}") from None
tag: Tag = next(iter(tags))
if "emscripten" not in tag.platform:
raise ValueError(
f"Wheel platform '{tag.platform}' is not compatible with "
f"Pyodide's platform '{get_platform()}'"
)
def platform_to_version(platform: str) -> str:
return (
platform.replace("-", "_")
.removeprefix("emscripten_")
.removesuffix("_wasm32")
.replace("_", ".")
)
wheel_emscripten_version = platform_to_version(tag.platform)
pyodide_emscripten_version = platform_to_version(get_platform())
if wheel_emscripten_version != pyodide_emscripten_version:
raise ValueError(
f"Wheel was built with Emscripten v{wheel_emscripten_version} but "
f"Pyodide was built with Emscripten v{pyodide_emscripten_version}"
)
abi_incompatible = True
from sys import version_info
version = f"{version_info.major}{version_info.minor}"
abis = ["abi3", f"cp{version}"]
for tag in tags:
if tag.abi in abis:
abi_incompatible = False
break
if abi_incompatible:
abis_string = ",".join({tag.abi for tag in tags})
raise ValueError(
f"Wheel abi '{abis_string}' is not supported. Supported abis are 'abi3' and 'cp{version}'."
)
raise ValueError(f"Wheel interpreter version '{tag.interpreter}' is not supported.")
def fix_package_dependencies(
package_name: str, *, extras: list[str | None] | None = None
) -> None:
"""Check and fix the list of dependencies for this package
If you have manually installed a package and dependencies from wheels,
the dependencies will not be correctly set up in the package list
or the pyodide lockfile generated by freezing. This method checks
if the dependencies are correctly set in the package list and will
add missing dependencies.
Parameters
----------
package_name
The name of the package to check.
extras
List of extras for this package.
"""
if package_name in REPODATA_PACKAGES:
# don't check things that are in original repository
return
dist = Distribution.from_name(package_name)
package_requires = dist.requires
if package_requires is None:
# no dependencies - we're good to go
return
url = dist.read_text("PYODIDE_URL")
# If it wasn't installed with micropip / pyodide, then we
# can't do anything with it.
if url is None:
return
# Get current list of pyodide requirements
requires = dist.read_text("PYODIDE_REQUIRES")
if requires:
depends = json.loads(requires)
else:
depends = []
if extras is None:
extras = [None]
else:
extras = extras + [None]
for r in package_requires:
req = Requirement(r)
req_extras = req.extras
req_marker = req.marker
req_name = canonicalize_name(req.name)
needs_requirement = False
if req_marker is not None:
for e in extras:
if req_marker.evaluate(None if e is None else {"extra": e}):
needs_requirement = True
break
else:
needs_requirement = True
if needs_requirement:
fix_package_dependencies(req_name, extras=list(req_extras))
if req_name not in depends:
depends.append(req_name)
# write updated depends to PYODIDE_DEPENDS
(get_dist_info(dist) / "PYODIDE_REQUIRES").write_text(
json.dumps(sorted(x for x in depends))
)
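# Usage sketch (hypothetical package name; assumes the wheel was installed by
# micropip or pyodide so a PYODIDE_URL metadata file exists):
#
#     fix_package_dependencies("my-manually-installed-pkg")
#
# After this call, PYODIDE_REQUIRES in the .dist-info directory lists the
# canonicalized names of the package's dependencies, so freeze() can emit a
# correct "depends" entry.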
# ===== micropip/_version.py =====
# file generated by setuptools_scm
# don't change, don't track in version control
TYPE_CHECKING = False
if TYPE_CHECKING:
from typing import Tuple, Union
VERSION_TUPLE = Tuple[Union[int, str], ...]
else:
VERSION_TUPLE = object
version: str
__version__: str
__version_tuple__: VERSION_TUPLE
version_tuple: VERSION_TUPLE
__version__ = version = '0.8.0'
__version_tuple__ = version_tuple = (0, 8, 0)
# ===== micropip/constants.py =====
FAQ_URLS = {
"cant_find_wheel": "https://pyodide.org/en/stable/usage/faq.html#why-can-t-micropip-find-a-pure-python-wheel-for-a-package"
}
# ===== micropip/freeze.py =====
import importlib.metadata
import itertools
import json
from collections.abc import Iterator
from copy import deepcopy
from typing import Any
from packaging.utils import canonicalize_name
from ._utils import fix_package_dependencies
def freeze_lockfile(
lockfile_packages: dict[str, dict[str, Any]], lockfile_info: dict[str, str]
) -> str:
return json.dumps(freeze_data(lockfile_packages, lockfile_info))
def freeze_data(
lockfile_packages: dict[str, dict[str, Any]], lockfile_info: dict[str, str]
) -> dict[str, Any]:
pyodide_packages = deepcopy(lockfile_packages)
pip_packages = load_pip_packages()
package_items = itertools.chain(pyodide_packages.items(), pip_packages)
# Sort
packages = dict(sorted(package_items))
return {
"info": lockfile_info,
"packages": packages,
}
def load_pip_packages() -> Iterator[tuple[str, dict[str, Any]]]:
return map(
package_item,
filter(is_valid, map(load_pip_package, importlib.metadata.distributions())),
)
def package_item(entry: dict[str, Any]) -> tuple[str, dict[str, Any]]:
return canonicalize_name(entry["name"]), entry
def is_valid(entry: dict[str, Any]) -> bool:
return entry["file_name"] is not None
def load_pip_package(dist: importlib.metadata.Distribution) -> dict[str, Any]:
name = dist.name
version = dist.version
url = dist.read_text("PYODIDE_URL")
sha256 = dist.read_text("PYODIDE_SHA256")
imports = (dist.read_text("top_level.txt") or "").split()
requires = dist.read_text("PYODIDE_REQUIRES")
if not requires:
fix_package_dependencies(name)
requires = dist.read_text("PYODIDE_REQUIRES")
depends = json.loads(requires or "[]")
return dict(
name=name,
version=version,
file_name=url,
install_dir="site",
sha256=sha256,
imports=imports,
depends=depends,
)
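# Shape of the data produced by freeze_data() (illustrative values):
#
#     {
#         "info": {...lockfile_info...},
#         "packages": {
#             "regex": {"name": "regex", "version": "...", "file_name": "...",
#                       "install_dir": "site", "sha256": "...",
#                       "imports": [...], "depends": [...]},
#         },
#     }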
# ===== micropip/install.py =====
import asyncio
import importlib
from collections.abc import Coroutine
from pathlib import Path
from typing import Any
from packaging.markers import default_environment
from ._compat import loadPackage, to_js
from .constants import FAQ_URLS
from .logging import setup_logging
from .transaction import Transaction
async def install(
requirements: str | list[str],
index_urls: list[str] | str,
keep_going: bool = False,
deps: bool = True,
credentials: str | None = None,
pre: bool = False,
*,
verbose: bool | int | None = None,
) -> None:
with setup_logging().ctx_level(verbose) as logger:
ctx = default_environment()
if isinstance(requirements, str):
requirements = [requirements]
fetch_kwargs = dict()
if credentials:
fetch_kwargs["credentials"] = credentials
# Note: getsitepackages is not available in a virtual environment...
# See https://github.com/pypa/virtualenv/issues/228 (issue is closed but
# problem is not fixed)
from site import getsitepackages
wheel_base = Path(getsitepackages()[0])
transaction = Transaction(
ctx=ctx, # type: ignore[arg-type]
ctx_extras=[],
keep_going=keep_going,
deps=deps,
pre=pre,
fetch_kwargs=fetch_kwargs,
verbose=verbose,
index_urls=index_urls,
)
await transaction.gather_requirements(requirements)
if transaction.failed:
failed_requirements = ", ".join([f"'{req}'" for req in transaction.failed])
raise ValueError(
f"Can't find a pure Python 3 wheel for: {failed_requirements}\n"
f"See: {FAQ_URLS['cant_find_wheel']}\n"
)
package_names = [pkg.name for pkg in transaction.pyodide_packages] + [
pkg.name for pkg in transaction.wheels
]
logger.debug(
"Installing packages %r and wheels %r ",
transaction.pyodide_packages,
[w.filename for w in transaction.wheels],
)
if package_names:
logger.info("Installing collected packages: %s", ", ".join(package_names))
wheel_promises: list[Coroutine[Any, Any, None] | asyncio.Task[Any]] = []
# Install built-in packages
pyodide_packages = transaction.pyodide_packages
if len(pyodide_packages):
# Note: branch never happens in out-of-browser testing because in
# that case REPODATA_PACKAGES is empty.
wheel_promises.append(
asyncio.ensure_future(
loadPackage(to_js([name for [name, _, _] in pyodide_packages]))
)
)
# Now install PyPI packages
for wheel in transaction.wheels:
# detect whether the wheel metadata is from PyPI or from custom location
# wheel metadata from PyPI has a SHA256 checksum digest.
wheel_promises.append(wheel.install(wheel_base))
await asyncio.gather(*wheel_promises)
packages = [
f"{pkg.name}-{pkg.version}" for pkg in transaction.pyodide_packages
] + [f"{pkg.name}-{pkg.version}" for pkg in transaction.wheels]
if packages:
logger.info("Successfully installed %s", ", ".join(packages))
importlib.invalidate_caches()
# ===== micropip/list.py =====
import importlib.metadata
from typing import Any
from ._compat import loadedPackages
from .package import PackageDict, PackageMetadata
def list_installed_packages(
lockfile_packages: dict[str, dict[str, Any]]
) -> PackageDict:
# Add packages that are loaded through pyodide.loadPackage
packages = PackageDict()
for dist in importlib.metadata.distributions():
name = dist.name
version = dist.version
source = dist.read_text("PYODIDE_SOURCE")
if source is None:
# source is None if PYODIDE_SOURCE does not exist. In this case the
# wheel was installed manually, not via `pyodide.loadPackage` or
# `micropip`.
continue
packages[name] = PackageMetadata(
name=name,
version=version,
source=source,
)
for name, pkg_source in loadedPackages.to_py().items():
if name in packages:
continue
if name in lockfile_packages:
version = lockfile_packages[name]["version"]
source_ = "pyodide"
if pkg_source != "default channel":
# Pyodide package loaded from a custom URL
source_ = pkg_source
else:
# TODO: calculate version from wheel metadata
version = "unknown"
source_ = pkg_source
packages[name] = PackageMetadata(name=name, version=version, source=source_)
return packages
# ===== micropip/logging.py =====
import logging
import sys
from collections.abc import Generator
from contextlib import contextmanager
from typing import Any
_logger: logging.Logger | None = None
_indentation: int = 0
@contextmanager
def indent_log(num: int = 2) -> Generator[None, None, None]:
"""
A context manager which will cause the log output to be indented for any
log messages emitted inside it.
"""
global _indentation
_indentation += num
try:
yield
finally:
_indentation -= num
# borrowed from pip._internal.utils.logging
class IndentingFormatter(logging.Formatter):
default_time_format = "%Y-%m-%dT%H:%M:%S"
def __init__(
self,
*args: Any,
add_timestamp: bool = False,
**kwargs: Any,
) -> None:
"""
A logging.Formatter that obeys the indent_log() context manager.
:param add_timestamp: A bool indicating output lines should be prefixed
with their record's timestamp.
"""
self.add_timestamp = add_timestamp
super().__init__(*args, **kwargs)
def get_message_start(self, formatted: str, levelno: int) -> str:
"""
Return the start of the formatted log message (not counting the
prefix to add to each line).
"""
if levelno < logging.WARNING:
return ""
if levelno < logging.ERROR:
return "WARNING: "
return "ERROR: "
def format(self, record: logging.LogRecord) -> str:
"""
Calls the standard formatter, but will indent all of the log message
lines by our current indentation level.
"""
global _indentation
formatted = super().format(record)
message_start = self.get_message_start(formatted, record.levelno)
formatted = message_start + formatted
prefix = ""
if self.add_timestamp:
prefix = f"{self.formatTime(record)} "
prefix += " " * _indentation
formatted = "".join([prefix + line for line in formatted.splitlines(True)])
return formatted
def _set_formatter_once() -> None:
global _logger
if _logger is not None:
return
_logger = logging.getLogger("micropip")
ch = logging.StreamHandler(sys.stdout)
ch.setLevel(logging.NOTSET)
ch.setFormatter(IndentingFormatter())
_logger.addHandler(ch)
class LoggerWrapper:
# need a default value because of __getattr__/__setattr__
logger: logging.Logger = None # type: ignore[assignment]
def __init__(self, logger: logging.Logger):
# Bypassing __setattr__ by setting attributes directly in __dict__
self.__dict__["logger"] = logger
def __getattr__(self, attr):
return getattr(self.logger, attr)
def __setattr__(self, attr, value):
return setattr(self.logger, attr, value)
@contextmanager
def ctx_level(self, verbosity: int | bool | None = None):
cur_level = self.logger.level
if verbosity is not None:
if verbosity > 2:
raise ValueError(
"verbosity should be in 0,1,2, False, True, if you are "
"directly setting level using logging.LEVEL, please "
"directly call `setLevel` on the logger."
)
elif verbosity >= 2:
level_number = logging.DEBUG
elif verbosity == 1: # True == 1
level_number = logging.INFO
else:
level_number = logging.WARNING
self.logger.setLevel(level_number)
try:
yield self.logger
finally:
self.logger.setLevel(cur_level)
def setup_logging() -> LoggerWrapper:
_set_formatter_once()
assert _logger
return LoggerWrapper(_logger)
# TODO: expose this somehow
def set_log_level(verbosity: int | bool):
if verbosity >= 2:
level_number = logging.DEBUG
elif verbosity == 1: # True == 1
level_number = logging.INFO
else:
level_number = logging.WARNING
assert _logger
_logger.setLevel(level_number)
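# Minimal sketch of how indent_log() and IndentingFormatter interact (not part
# of the library; safe to run standalone):
if __name__ == "__main__":
    demo = logging.getLogger("micropip.demo")
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(IndentingFormatter())
    demo.addHandler(handler)
    demo.setLevel(logging.INFO)

    demo.info("Collecting example")
    with indent_log():
        demo.info("this line is indented by two spaces")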
# ===== micropip/metadata.py =====
"""
This is a stripped down version of pip._vendor.pkg_resources.DistInfoDistribution
"""
import re
import zipfile
from collections.abc import Iterable
from pathlib import Path
from packaging.requirements import Requirement
from packaging.utils import canonicalize_name
def safe_name(name):
"""Convert an arbitrary string to a standard distribution name
Any runs of non-alphanumeric/. characters are replaced with a single '-'.
"""
return re.sub("[^A-Za-z0-9.]+", "-", name)
def safe_extra(extra):
"""Convert an arbitrary string to a standard 'extra' name
Any runs of non-alphanumeric characters are replaced with a single '_',
and the result is always lowercased.
"""
return re.sub("[^A-Za-z0-9.-]+", "_", extra).lower()
# Vendored from pip
class UnsupportedWheel(Exception):
"""Unsupported wheel."""
def wheel_dist_info_dir(source: zipfile.ZipFile, name: str) -> str:
"""Returns the name of the contained .dist-info directory.
Raises UnsupportedWheel if not found, >1 found, or it doesn't match the
provided name.
"""
# Zip file path separators must be /
subdirs = {p.split("/", 1)[0] for p in source.namelist()}
info_dirs = [s for s in subdirs if s.endswith(".dist-info")]
if not info_dirs:
raise UnsupportedWheel(f".dist-info directory not found in wheel {name!r}")
if len(info_dirs) > 1:
raise UnsupportedWheel(
"multiple .dist-info directories found in wheel {!r}: {}".format(
name, ", ".join(info_dirs)
)
)
info_dir = info_dirs[0]
info_dir_name = canonicalize_name(info_dir)
canonical_name = canonicalize_name(name)
if not info_dir_name.startswith(canonical_name):
raise UnsupportedWheel(
f".dist-info directory {info_dir!r} does not start with {canonical_name!r}"
)
return info_dir
class Metadata:
"""
Represents a metadata file in a wheel
"""
PKG_INFO = "METADATA"
REQUIRES_DIST = "Requires-Dist:"
PROVIDES_EXTRA = "Provides-Extra:"
def __init__(self, metadata: Path | zipfile.Path | bytes):
self.metadata: list[str] = []
if isinstance(metadata, Path | zipfile.Path):
self.metadata = metadata.read_text(encoding="utf-8").splitlines()
elif isinstance(metadata, bytes):
self.metadata = metadata.decode("utf-8").splitlines()
self.deps = self._compute_dependencies()
def _parse_requirement(self, line: str) -> Requirement:
line = line[len(self.REQUIRES_DIST) :]
if " #" in line:
line = line[: line.find(" #")]
return Requirement(line.strip())
def _compute_dependencies(self) -> dict[str | None, frozenset[Requirement]]:
"""
Compute the dependencies of the metadata file
"""
deps: dict[str | None, frozenset[Requirement]] = {}
reqs: list[Requirement] = []
extras: list[str] = []
def reqs_for_extra(extra: str | None) -> Iterable[Requirement]:
environment = {"extra": extra} if extra else None
for req in reqs:
if not req.marker or req.marker.evaluate(environment):
yield req
for line in self.metadata:
if line.startswith(self.REQUIRES_DIST):
reqs.append(self._parse_requirement(line))
elif line.startswith(self.PROVIDES_EXTRA):
extras.append(line[len(self.PROVIDES_EXTRA) :].strip())
deps[None] = frozenset(reqs_for_extra(None))
for extra in extras:
deps[safe_extra(extra)] = frozenset(reqs_for_extra(extra)) - deps[None]
return deps
def requires(self, extras: Iterable[str] = ()) -> list[Requirement]:
"""List of Requirements needed for this distro if `extras` are used"""
deps: list[Requirement] = []
deps.extend(self.deps.get(None, ()))
for ext in extras:
try:
deps.extend(self.deps[safe_extra(ext)])
except KeyError:
raise KeyError(f"Unknown extra {ext!r}") from None
return deps
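# Minimal sketch of Metadata parsing (not part of the library; the METADATA
# text below is made up for illustration; requires the `packaging` package):
if __name__ == "__main__":
    raw = b"\n".join(
        [
            b"Metadata-Version: 2.1",
            b"Name: demo",
            b"Requires-Dist: packaging",
            b"Requires-Dist: pytest; extra == 'test'",
            b"Provides-Extra: test",
        ]
    )
    meta = Metadata(raw)
    print(meta.requires())          # [<Requirement('packaging')>]
    print(meta.requires(["test"]))  # also includes the 'test' extra's pytest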
# ===== micropip/package.py =====
from collections import UserDict
from collections.abc import Iterable
from dataclasses import astuple, dataclass
from typing import Any
from packaging.utils import canonicalize_name
__all__ = ["PackageDict"]
def _format_table(headers: list[str], table: Iterable[Iterable[Any]]) -> str:
"""
Returns a minimal formatted table
>>> print(_format_table(["Header1", "Header2"], [["val1", "val2"], ["val3", "val4"]]))
Header1 | Header2
------- | -------
val1 | val2
val3 | val4
"""
def format_row(values, widths, filler=""):
row = " | ".join(
f"{x:{filler}<{w}}" for x, w in zip(values, widths, strict=True)
)
return row.rstrip()
col_width = [max(len(x) for x in col) for col in zip(headers, *table, strict=True)]
rows = []
rows.append(format_row(headers, col_width))
rows.append(format_row([""] * len(col_width), col_width, filler="-"))
for line in table:
rows.append(format_row(line, col_width))
return "\n".join(rows)
@dataclass
class PackageMetadata:
name: str
version: str = ""
source: str = ""
def __iter__(self):
return iter(astuple(self))
@staticmethod
def keys():
return PackageMetadata.__dataclass_fields__.keys()
class PackageDict(UserDict[str, PackageMetadata]):
"""
A dictionary that holds list of metadata on packages.
This class is used in micropip to keep the list of installed packages.
"""
def __repr__(self) -> str:
return self._tabularize()
def __getitem__(self, key):
normalized_key = canonicalize_name(key)
return super().__getitem__(normalized_key)
def __setitem__(self, key, val):
normalized_key = canonicalize_name(key)
return super().__setitem__(normalized_key, val)
def __contains__(self, key: str) -> bool: # type: ignore[override]
normalized_key = canonicalize_name(key)
return super().__contains__(normalized_key)
def _tabularize(self) -> str:
headers = [key.capitalize() for key in PackageMetadata.keys()]
table = list(self.values())
return _format_table(headers, table)
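# Minimal sketch of the name-normalization behaviour (illustrative package
# name; safe to run standalone):
if __name__ == "__main__":
    pkgs = PackageDict()
    pkgs["ruamel.yaml"] = PackageMetadata(name="ruamel.yaml", version="0.18.6")
    # Keys are canonicalized on insert and lookup, so spelling variants match.
    assert "ruamel-yaml" in pkgs
    print(pkgs)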
# ===== micropip/package_index.py =====
import json
import logging
import string
import sys
from collections import defaultdict
from collections.abc import Callable, Generator
from dataclasses import dataclass
from functools import partial
from typing import Any
from urllib.parse import urlparse, urlunparse
from packaging.utils import InvalidWheelFilename
from packaging.version import InvalidVersion, Version
from ._compat import HttpStatusError, fetch_string_and_headers
from ._utils import is_package_compatible, parse_version
from .externals.mousebender.simple import from_project_details_html
from .types import DistributionMetadata
from .wheelinfo import WheelInfo
PYPI = "PYPI"
PYPI_URL = "https://pypi.org/simple"
DEFAULT_INDEX_URLS = [PYPI_URL]
_formatter = string.Formatter()
logger = logging.getLogger("micropip")
@dataclass
class ProjectInfo:
"""
This class stores common metadata that can be obtained from different APIs (JSON, Simple)
provided by PyPI. Responses received from PyPI or other package indexes that support the
same APIs must be converted to this class before being processed by micropip.
"""
name: str # Name of the package
# List of releases available for the package, sorted in ascending order by version.
# For each version, list of wheels compatible with the current platform are stored.
# If no such wheel is available, the list is empty.
releases: dict[Version, Generator[WheelInfo, None, None]]
@staticmethod
def from_json_api(data: str | bytes | dict[str, Any]) -> "ProjectInfo":
"""
Parse JSON API response
https://warehouse.pypa.io/api-reference/json.html
"""
data_dict = json.loads(data) if isinstance(data, str | bytes) else data
name: str = data_dict.get("info", {}).get("name", "UNKNOWN")
releases_raw: dict[str, list[Any]] = data_dict["releases"]
# Filter out non PEP 440 compliant versions
releases: dict[Version, list[Any]] = {}
for version_str, fileinfo in releases_raw.items():
version, ok = _is_valid_pep440_version(version_str)
if not ok or not version:
continue
# Skip empty releases
if not fileinfo:
continue
releases[version] = fileinfo
return ProjectInfo._compatible_only(name, releases)
@staticmethod
def from_simple_json_api(data: str | bytes | dict[str, Any]) -> "ProjectInfo":
"""
Parse Simple JSON API response
https://peps.python.org/pep-0691/
"""
data_dict = json.loads(data) if isinstance(data, str | bytes) else data
name, releases = ProjectInfo._parse_pep691_response(
data_dict, index_base_url=""
)
return ProjectInfo._compatible_only(name, releases)
@staticmethod
def from_simple_html_api(
data: str, pkgname: str, index_base_url: str
) -> "ProjectInfo":
"""
Parse Simple HTML API response
https://peps.python.org/pep-0503
"""
project_detail = from_project_details_html(data, pkgname)
name, releases = ProjectInfo._parse_pep691_response(project_detail, index_base_url) # type: ignore[arg-type]
return ProjectInfo._compatible_only(name, releases)
@staticmethod
def _parse_pep691_response(
resp: dict[str, Any], index_base_url: str
) -> tuple[str, dict[Version, list[Any]]]:
name = resp["name"]
# List of versions (PEP 700), this key is not critical to find packages
# but it is required to ensure that the same class instance is returned
# from JSON and Simple JSON APIs.
# Note that Simple HTML API does not have this key.
versions = resp.get("versions", [])
# Group files by version
releases: dict[Version, list[Any]] = defaultdict(list)
for version_str in versions:
version, ok = _is_valid_pep440_version(version_str)
if not ok or not version:
continue
releases[version] = []
for file in resp["files"]:
filename = file["filename"]
if not _fast_check_incompatibility(filename):
# parsing a wheel filename is expensive, so we do a quick check first
continue
try:
version = parse_version(filename)
except (InvalidVersion, InvalidWheelFilename):
continue
if file["url"].startswith("/"):
file["url"] = index_base_url + file["url"]
releases[version].append(file)
return name, releases
@classmethod
def _compatible_wheels(
cls, files: list[dict[str, Any]], version: Version, name: str
) -> Generator[WheelInfo, None, None]:
for file in files:
filename = file["filename"]
# Checking compatibility takes a bit of time,
# so we use a generator to avoid doing it for all files.
compatible = is_package_compatible(filename)
if not compatible:
continue
# JSON API has a "digests" key, while Simple API has a "hashes" key.
hashes = file["digests"] if "digests" in file else file["hashes"]
sha256 = hashes.get("sha256")
# Check if the metadata file is available (PEP 658 / PEP-714)
core_metadata: DistributionMetadata = file.get("core-metadata") or file.get(
"data-dist-info-metadata"
)
# Size of the file in bytes, if available (PEP 700)
# This key is not available in the Simple API HTML response, so this field may be None
size = file.get("size")
yield WheelInfo.from_package_index(
name=name,
filename=filename,
url=file["url"],
version=version,
sha256=sha256,
size=size,
core_metadata=core_metadata,
)
@classmethod
def _compatible_only(
cls, name: str, releases: dict[Version, list[dict[str, Any]]]
) -> "ProjectInfo":
"""
Return a generator of wheels compatible with the current platform.
Checking compatibility takes a bit of time, so we use a generator to avoid doing it if not needed.
"""
releases_compatible = {
version: cls._compatible_wheels(files, version, name=name)
for version, files in releases.items()
}
# Unfortunately, the JSON API seems to compare versions as strings...
# For example, pytest 3.10.0 is considered newer than 3.2.0.
# So we need to sort the releases by version again here.
releases_compatible = dict(sorted(releases_compatible.items()))
return cls(
name=name,
releases=releases_compatible,
)
def _is_valid_pep440_version(version_str: str) -> tuple[Version | None, bool]:
"""
Check if the given string is a valid PEP 440 version.
Since parsing a version is expensive, we return the parsed version as well,
so that it can be reused if needed.
"""
try:
version = Version(version_str)
return version, True
except InvalidVersion:
return None, False
def _fast_check_incompatibility(filename: str) -> bool:
"""
This function returns False if the package is certainly incompatible with the
current platform. It can be used to quickly filter out incompatible packages
before running heavy checks. Note that it may return True for some packages
that are actually incompatible, so it should only be used as a quick check.
"""
if not filename.endswith(".whl"):
return False
if filename.endswith("wasm32.whl") and sys.platform == "emscripten":
return True
if sys.platform not in filename and not filename.endswith("-none-any.whl"):
return False
return True
def _contain_placeholder(url: str, placeholder: str = "package_name") -> bool:
fields = [parsed[1] for parsed in _formatter.parse(url)]
return placeholder in fields
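# Example (illustrative URLs): a templated index URL versus a plain one.
#
#     _contain_placeholder("https://example.org/pypi/{package_name}/json")  # True
#     _contain_placeholder("https://example.org/simple")                    # False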
def _select_parser(
content_type: str, pkgname: str, index_base_url: str
) -> Callable[[str], ProjectInfo]:
"""
Select the function to parse the response based on the content type.
"""
match content_type:
case "application/vnd.pypi.simple.v1+json":
return ProjectInfo.from_simple_json_api
case "application/json":
return ProjectInfo.from_json_api
case (
"application/vnd.pypi.simple.v1+html"
| "text/html"
| "text/html; charset=utf-8"
):
return partial(
ProjectInfo.from_simple_html_api,
pkgname=pkgname,
index_base_url=index_base_url,
)
case _:
raise ValueError(f"Unsupported content type: {content_type}")
async def query_package(
name: str,
index_urls: list[str] | str,
fetch_kwargs: dict[str, Any] | None = None,
) -> ProjectInfo:
"""
Query for a package from package indexes.
Parameters
----------
name
Name of the package to search for.
index_urls
A list of URLs or a single URL to use as the package index.
If a list of URLs is provided, it will be tried in order until
it finds a package. If no package is found, an error will be raised.
fetch_kwargs
Keyword arguments to pass to the fetch function.
"""
_fetch_kwargs = fetch_kwargs.copy() if fetch_kwargs else {}
if "headers" not in _fetch_kwargs:
_fetch_kwargs["headers"] = {}
# If not specified, prefer Simple JSON API over Simple HTML API or JSON API
_fetch_kwargs["headers"].setdefault(
"accept", "application/vnd.pypi.simple.v1+json, */*;q=0.01"
)
if isinstance(index_urls, str):
index_urls = [index_urls]
index_urls = [PYPI_URL if url == PYPI else url for url in index_urls]
for url in index_urls:
logger.debug("Looping through index urls: %r", url)
if _contain_placeholder(url):
url = url.format(package_name=name)
logger.debug("Formatting url with package name : %r", url)
else:
url = f"{url}/{name}/"
logger.debug("Url has no placeholder, appending package name : %r", url)
try:
metadata, headers = await fetch_string_and_headers(url, _fetch_kwargs)
except HttpStatusError as e:
if e.status_code == 404:
logger.debug("NotFound (404) for %r, trying next index.", url)
continue
logger.debug(
"Error fetching %r (%s), trying next index.", url, e.status_code
)
raise
content_type = headers.get("content-type", "").lower()
try:
base_url = urlunparse(urlparse(url)._replace(path=""))
parser = _select_parser(content_type, name, index_base_url=base_url)
except ValueError as e:
raise ValueError(f"Error trying to decode url: {url}") from e
return parser(metadata)
else:
raise ValueError(
f"Can't fetch metadata for '{name}'. "
"Please make sure you have entered a correct package name "
"and correctly specified index_urls (if you changed them)."
)
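# Usage sketch (illustrative; requires a network-enabled Pyodide runtime):
#
#     info = await query_package("snowballstemmer", DEFAULT_INDEX_URLS)
#     for version, wheels in info.releases.items():
#         ...  # wheels is a generator of WheelInfo compatible with this platform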
# ===== micropip/package_manager.py =====
import builtins
from typing import ( # noqa: UP035 List import is necessary due to the `list` method
Any,
List,
)
from . import _mock_package, package_index
from ._compat import REPODATA_INFO, REPODATA_PACKAGES
from .freeze import freeze_lockfile
from .install import install
from .list import list_installed_packages
from .package import PackageDict
from .uninstall import uninstall
class PackageManager:
"""
PackageManager provides an extensible interface for customizing micropip's behavior.
Each Manager instance holds its own local state that is
independent of other instances.
"""
def __init__(self) -> None:
self.index_urls = package_index.DEFAULT_INDEX_URLS[:]
self.repodata_packages: dict[str, dict[str, Any]] = REPODATA_PACKAGES
self.repodata_info: dict[str, str] = REPODATA_INFO
async def install(
self,
requirements: str | list[str],
keep_going: bool = False,
deps: bool = True,
credentials: str | None = None,
pre: bool = False,
index_urls: list[str] | str | None = None,
*,
verbose: bool | int | None = None,
):
"""Install the given package and all of its dependencies.
If a package is not found in the Pyodide repository it will be loaded from
PyPI. Micropip can only load pure Python wheels or wasm32/emscripten wheels
built by Pyodide.
When used in web browsers, downloads from PyPI will be cached. When run in
Node.js, packages are currently not cached, and will be re-downloaded each
time ``micropip.install`` is run.
Parameters
----------
requirements :
A requirement or list of requirements to install. Each requirement is a
string, which should be either a package name or a wheel URI:
- If the requirement does not end in ``.whl``, it will be interpreted as
a package name. A package with this name must either be present
in the Pyodide lock file or on PyPI.
- If the requirement ends in ``.whl``, it is a wheel URI. The part of
the requirement after the last ``/`` must be a valid wheel name in
compliance with the `PEP 427 naming convention
<https://www.python.org/dev/peps/pep-0427/#file-format>`_.
- If a wheel URI starts with ``emfs:``, it will be interpreted as a path
in the Emscripten file system (Pyodide's file system). E.g.,
``emfs:../relative/path/wheel.whl`` or ``emfs:/absolute/path/wheel.whl``.
In this case, only .whl files are supported.
- If a wheel URI requirement starts with ``http:`` or ``https:`` it will
be interpreted as a URL.
- In Node.js, you can access the native file system using a URI that starts
with ``file:``. In the browser this will not work.
keep_going :
This parameter decides micropip's behavior when it encounters a
Python package without a pure Python wheel while doing dependency
resolution:
- If ``False``, an error will be raised on first package with a missing
wheel.
- If ``True``, micropip will keep going after the first error, and
report a list of errors at the end.
deps :
If ``True``, install dependencies specified in METADATA file for each
package. Otherwise do not install dependencies.
credentials :
This parameter specifies the value of ``credentials`` when calling the
`fetch() <https://developer.mozilla.org/en-US/docs/Web/API/fetch>`__
function which is used to download the package.
When not specified, ``fetch()`` is called without ``credentials``.
pre :
If ``True``, include pre-release and development versions. By default,
micropip only finds stable versions.
index_urls :
A list of URLs or a single URL to use as the package index when looking
up packages. If None, the default index URL, *https://pypi.org/simple*, is used.
- The index URL should support the
`JSON API <https://warehouse.pypa.io/api-reference/json/>`__ .
- The index URL may contain the placeholder {package_name} which will be
replaced with the package name when looking up a package. If it does not
contain the placeholder, the package name will be appended to the URL.
- If a list of URLs is provided, micropip will try each URL in order until
it finds a package. If no package is found, an error will be raised.
verbose :
Print more information about the process. By default, micropip does not
change logger level. Setting ``verbose=True`` will print similar
information as pip.
"""
if index_urls is None:
index_urls = self.index_urls
return await install(
requirements,
index_urls,
keep_going,
deps,
credentials,
pre,
verbose=verbose,
)
def list(self) -> PackageDict:
"""Get the dictionary of installed packages.
Returns
-------
``PackageDict``
A dictionary of installed packages.
>>> import micropip
>>> await micropip.install('regex') # doctest: +SKIP
>>> package_list = micropip.list()
>>> print(package_list) # doctest: +SKIP
Name | Version | Source
----------------- | -------- | -------
regex | 2021.7.6 | pyodide
>>> "regex" in package_list # doctest: +SKIP
True
"""
return list_installed_packages(self.repodata_packages)
def freeze(self) -> str:
"""Produce a json string which can be used as the contents of the
``pyodide-lock.json`` lock file.
If you later load Pyodide with this lock file, you can use
:js:func:`pyodide.loadPackage` to load packages that were loaded with :py:mod:`micropip`
this time. Loading packages with :js:func:`~pyodide.loadPackage` is much faster
and you will always get consistent versions of all your dependencies.
You can use your custom lock file by passing an appropriate url to the
``lockFileURL`` of :js:func:`~globalThis.loadPyodide`.
"""
return freeze_lockfile(self.repodata_packages, self.repodata_info)
def add_mock_package(
self,
name: str,
version: str,
*,
modules: dict[str, str | None] | None = None,
persistent: bool = False,
):
"""
Add a mock version of a package to the package dictionary.
This means that if it is a dependency, it is skipped on install.
By default a single empty module is installed with the same
name as the package. You can alternatively give one or more modules to make a
set of named modules.
The modules parameter is usually a dictionary mapping module name to module text.
.. code-block:: python
{
"mylovely_module":'''
def module_method(an_argument):
print("This becomes a module level argument")
module_value = "this value becomes a module level variable"
print("This is run on import of module")
'''
}
If you are adding the module in non-persistent mode, you can also pass functions
which are used to initialize the module on loading (as in `importlib.abc.Loader.exec_module`).
This allows you to do things like use `unittest.mock.MagicMock` classes for modules.
.. code-block:: python
def init_fn(module):
module.dict["WOO"]="hello"
print("Initing the module now!")
...
{
"mylovely_module": init_fn
}
Parameters
----------
name :
Package name to add
version :
Version of the package. This should be a semantic version string,
e.g. 1.2.3
modules :
Dictionary of module_name:string pairs.
The string contains the source of the mock module or is blank for
an empty module.
persistent :
If this is True, modules will be written to the file system, so they
persist between runs of python (assuming the file system persists).
If it is False, modules will be stored inside micropip in memory only.
"""
return _mock_package.add_mock_package(
name, version, modules=modules, persistent=persistent
)
def list_mock_packages(self):
"""
List all mock packages currently installed.
"""
return _mock_package.list_mock_packages()
def remove_mock_package(self, name: str):
"""
Remove a mock package.
"""
return _mock_package.remove_mock_package(name)
def uninstall(
self, packages: str | builtins.list[str], *, verbose: bool | int = False
) -> None:
"""Uninstall the given packages.
This function only supports uninstalling packages that are installed
using a wheel file, i.e. packages that have distribution metadata.
It is possible to reinstall a package after uninstalling it, but
note that modules / functions that are already imported will not be
automatically removed from the namespace. So make sure to reload
the module after reinstalling by e.g. running `importlib.reload(module)`.
Parameters
----------
packages
Packages to uninstall.
verbose
Print more information about the process.
By default, micropip is silent. Setting ``verbose=True`` will print
similar information as pip.
"""
return uninstall(packages, verbose=verbose)
def set_index_urls(self, urls: List[str] | str): # noqa: UP006
"""
Set the index URLs to use when looking up packages.
- The index URL should support the
`JSON API <https://warehouse.pypa.io/api-reference/json/>`__ .
- The index URL may contain the placeholder {package_name} which will be
replaced with the package name when looking up a package. If it does not
contain the placeholder, the package name will be appended to the URL.
- If a list of URLs is provided, micropip will try each URL in order until
it finds a package. If no package is found, an error will be raised.
Parameters
----------
urls
A list of URLs or a single URL to use as the package index.
"""
if isinstance(urls, str):
urls = [urls]
self.index_urls = urls[:]
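# Usage sketch (illustrative URLs and package name):
#
#     import micropip
#     micropip.set_index_urls([
#         "https://example.org/simple/{package_name}/",
#         "https://pypi.org/simple",
#     ])
#     await micropip.install("pkg-on-private-index")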
# ===== micropip/py.typed ===== (empty marker file)

# ===== micropip/transaction.py =====
import asyncio
import importlib.metadata
import logging
import warnings
from dataclasses import dataclass, field
from importlib.metadata import PackageNotFoundError
from urllib.parse import urlparse
from packaging.requirements import Requirement
from packaging.utils import canonicalize_name
from . import package_index
from ._compat import REPODATA_PACKAGES
from ._utils import best_compatible_tag_index, check_compatible
from .constants import FAQ_URLS
from .package import PackageMetadata
from .package_index import ProjectInfo
from .wheelinfo import WheelInfo
logger = logging.getLogger("micropip")
@dataclass
class Transaction:
ctx: dict[str, str]
ctx_extras: list[dict[str, str]]
keep_going: bool
deps: bool
pre: bool
fetch_kwargs: dict[str, str]
index_urls: list[str] | str
locked: dict[str, PackageMetadata] = field(default_factory=dict)
wheels: list[WheelInfo] = field(default_factory=list)
pyodide_packages: list[PackageMetadata] = field(default_factory=list)
failed: list[Requirement] = field(default_factory=list)
verbose: bool | int | None = None
def __post_init__(self):
# If index_urls is the default, pyodide-lock.json has to be searched first.
# TODO: when PyPI starts to support hosting WASM wheels, this might be deleted.
self.search_pyodide_lock_first = (
self.index_urls == package_index.DEFAULT_INDEX_URLS
)
async def gather_requirements(
self,
requirements: list[str] | list[Requirement],
) -> None:
requirement_promises = []
for requirement in requirements:
requirement_promises.append(self.add_requirement(requirement))
await asyncio.gather(*requirement_promises)
async def add_requirement(self, req: str | Requirement) -> None:
if isinstance(req, Requirement):
return await self.add_requirement_inner(req)
if not urlparse(req).path.endswith(".whl"):
return await self.add_requirement_inner(Requirement(req))
# custom download location
wheel = WheelInfo.from_url(req)
check_compatible(wheel.filename)
await self.add_wheel(wheel, extras=set(), specifier="")
def check_version_satisfied(self, req: Requirement) -> tuple[bool, str]:
ver = None
try:
ver = importlib.metadata.version(req.name)
except PackageNotFoundError:
pass
if req.name in self.locked:
ver = self.locked[req.name].version
if not ver:
return False, ""
if req.specifier.contains(ver, prereleases=True):
# installed version matches, nothing to do
return True, ver
raise ValueError(
f"Requested '{req}', " f"but {req.name}=={ver} is already installed"
)
async def add_requirement_inner(
self,
req: Requirement,
) -> None:
"""Add a requirement to the transaction.
See PEP 508 for a description of the requirements.
https://www.python.org/dev/peps/pep-0508
"""
for e in req.extras:
self.ctx_extras.append({"extra": e})
if self.pre:
req.specifier.prereleases = True
if req.marker:
# handle environment markers
# https://www.python.org/dev/peps/pep-0508/#environment-markers
# For a requirement being installed as part of an optional feature
# via the extra specifier, the evaluation of the marker requires
# the extra key in self.ctx to have the value specified in the
# primary requirement.
# The req.extras attribute is only set for the primary requirement
# and hence has to be available during the evaluation of the
# dependencies. Thus, we use the self.ctx_extras attribute above to
# store all the extra values we come across during the transaction and
# attempt the marker evaluation for all of these values. If any of the
# evaluations return true we include the dependency.
def eval_marker(e: dict[str, str]) -> bool:
self.ctx.update(e)
# need the assertion here to make mypy happy:
# https://github.com/python/mypy/issues/4805
assert req.marker is not None
return req.marker.evaluate(self.ctx)
self.ctx.update({"extra": ""})
# The current package may have been brought into the transaction
# without any of the optional requirement specification, but has
# another marker, such as implementation_name. In this scenario,
# self.ctx_extras is empty and hence the eval_marker() function
# will not be called at all.
if not req.marker.evaluate(self.ctx) and not any(
[eval_marker(e) for e in self.ctx_extras]
):
return
# Is some version of this package already installed?
req.name = canonicalize_name(req.name)
satisfied, ver = self.check_version_satisfied(req)
if satisfied:
logger.info("Requirement already satisfied: %s (%s)", req, ver)
return
try:
if self.search_pyodide_lock_first:
if self._add_requirement_from_pyodide_lock(req):
logger.debug("Transaction: package found in lock file: %r", req)
return
await self._add_requirement_from_package_index(req)
else:
try:
await self._add_requirement_from_package_index(req)
except ValueError:
logger.debug(
"Transaction: package %r not found in index, will search lock file",
req,
)
# If the requirement is not found in package index,
# we still have a chance to find it from pyodide lockfile.
if not self._add_requirement_from_pyodide_lock(req):
logger.debug(
"Transaction: package %r not found in lock file", req
)
raise
except ValueError:
self.failed.append(req)
if not self.keep_going:
raise
def _add_requirement_from_pyodide_lock(self, req: Requirement) -> bool:
"""
Find requirement from pyodide-lock.json. If the requirement is found,
add it to the package list and return True. Otherwise, return False.
"""
if req.name in REPODATA_PACKAGES and req.specifier.contains(
REPODATA_PACKAGES[req.name]["version"], prereleases=True
):
version = REPODATA_PACKAGES[req.name]["version"]
self.pyodide_packages.append(
PackageMetadata(name=req.name, version=str(version), source="pyodide")
)
return True
return False
async def _add_requirement_from_package_index(self, req: Requirement):
"""
Find requirement from package index. If the requirement is found,
add it to the package list. Otherwise, raise ValueError.
"""
metadata = await package_index.query_package(
req.name,
self.index_urls,
self.fetch_kwargs,
)
logger.debug("Transaction: got metadata %r for requirement %r", metadata, req)
wheel = find_wheel(metadata, req)
logger.debug("Transaction: Selected wheel: %r", wheel)
# Maybe while we were downloading pypi_json some other branch
# installed the wheel?
satisfied, ver = self.check_version_satisfied(req)
if satisfied:
logger.info("Requirement already satisfied: %s (%s)", req, ver)
await self.add_wheel(wheel, req.extras, specifier=str(req.specifier))
async def add_wheel(
self,
wheel: WheelInfo,
extras: set[str],
*,
specifier: str = "",
) -> None:
"""
Download a wheel, and add its dependencies to the transaction.
Parameters
----------
wheel
The wheel to add.
extras
Markers for optional dependencies.
For example, `micropip.install("pkg[test]")`
will pass `{"test"}` as the extras argument.
specifier
Requirement specifier, used only for logging.
For example, `micropip.install("pkg>=1.0.0,!=2.0.0")`
will pass `>=1.0.0,!=2.0.0` as the specifier argument.
"""
normalized_name = canonicalize_name(wheel.name)
self.locked[normalized_name] = PackageMetadata(
name=wheel.name,
version=str(wheel.version),
)
logger.info("Collecting %s%s", wheel.name, specifier)
logger.info(" Downloading %s", wheel.url.split("/")[-1])
wheel_download_task = asyncio.create_task(wheel.download(self.fetch_kwargs))
if self.deps:
# Case 1) If metadata file is available,
# we can gather requirements without waiting for the wheel to be downloaded.
if wheel.pep658_metadata_available():
try:
await wheel.download_pep658_metadata(self.fetch_kwargs)
except OSError:
# If something goes wrong while downloading the metadata,
# we have to wait for the wheel to be downloaded.
await wheel_download_task
await asyncio.gather(
self.gather_requirements(wheel.requires(extras)),
wheel_download_task,
)
# Case 2) If metadata file is not available,
# we have to wait for the wheel to be downloaded.
else:
await wheel_download_task
await self.gather_requirements(wheel.requires(extras))
self.wheels.append(wheel)
def find_wheel(metadata: ProjectInfo, req: Requirement) -> WheelInfo:
"""Parse metadata to find the latest version of pure python wheel.
Parameters
----------
metadata : ProjectInfo
req : Requirement
Returns
-------
wheel : WheelInfo
"""
releases = metadata.releases
candidate_versions = sorted(
req.specifier.filter(releases),
reverse=True,
)
for ver in candidate_versions:
if ver not in releases:
warnings.warn(
f"The package '{metadata.name}' contains an invalid version: '{ver}'. This version will be skipped",
stacklevel=1,
)
continue
best_wheel = None
best_tag_index = float("infinity")
wheels = releases[ver]
for wheel in wheels:
tag_index = best_compatible_tag_index(wheel.tags)
if tag_index is not None and tag_index < best_tag_index:
best_wheel = wheel
best_tag_index = tag_index
if best_wheel is not None:
return best_wheel
raise ValueError(
f"Can't find a pure Python 3 wheel for '{req}'.\n"
f"See: {FAQ_URLS['cant_find_wheel']}\n"
"You can use `await micropip.install(..., keep_going=True)` "
"to get a list of all packages with missing wheels."
)
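# Example (illustrative): given releases {2.0: [no compatible wheels],
# 1.5: [a py3-none-any wheel]}, find_wheel() tries versions newest-first,
# skips 2.0 because no compatible wheel survives filtering, and returns the
# 1.5 wheel.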
# ===== micropip/types.py =====
# Distribution Metadata type (PEP 658)
# None = metadata not available
# bool = metadata available, but no checksum
# dict[str, str] = metadata available with checksum
DistributionMetadata = bool | dict[str, str] | None
# ===== micropip/uninstall.py =====
import importlib
import importlib.metadata
from importlib.metadata import Distribution
from ._compat import loadedPackages
from ._utils import get_files_in_distribution, get_root
from .logging import setup_logging
def uninstall(packages: str | list[str], *, verbose: bool | int = False) -> None:
with setup_logging().ctx_level(verbose) as logger:
if isinstance(packages, str):
packages = [packages]
distributions: list[Distribution] = []
for package in packages:
try:
dist = importlib.metadata.distribution(package)
distributions.append(dist)
except importlib.metadata.PackageNotFoundError:
logger.warning("Skipping '%s' as it is not installed.", package)
for dist in distributions:
# Note: this value needs to be retrieved before removing files, as
# dist.name uses metadata file to get the name
name = dist.name
version = dist.version
logger.info("Found existing installation: %s %s", name, version)
root = get_root(dist)
files = get_files_in_distribution(dist)
directories = set()
for file in files:
if not file.is_file():
if not file.is_relative_to(root):
# This file is not in the site-packages directory. Probably one of:
# - data_files
# - scripts
# - entry_points
# Since we don't support these, we can ignore them (except for data_files (TODO))
logger.warning(
"skipping file '%s' that is not relative to the root",
file,
)
continue
# see PR 130; this branch is likely never triggered since Python 3.12,
# as non-existent files are no longer listed by get_files_in_distribution.
logger.warning(
"A file '%s' listed in the metadata of '%s' does not exist.",
file,
name,
)
continue
file.unlink()
if file.parent != root:
directories.add(file.parent)
# Remove directories in reverse hierarchical order
for directory in sorted(
directories, key=lambda x: len(x.parts), reverse=True
):
try:
directory.rmdir()
except OSError:
logger.warning(
"A directory '%s' is not empty after uninstallation of '%s'. "
"This might cause problems when installing a new version of the package. ",
directory,
name,
)
if hasattr(loadedPackages, name):
delattr(loadedPackages, name)
else:
# This should not happen, but just in case
logger.warning("a package '%s' was not found in loadedPackages.", name)
logger.info("Successfully uninstalled %s-%s", name, version)
importlib.invalidate_caches()
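# Usage sketch (run inside a Pyodide console; the package name is illustrative):
#     import micropip
#     await micropip.install("snowballstemmer")
#     micropip.uninstall("snowballstemmer", verbose=True)
#     # logs: Found existing installation: snowballstemmer <version>
#     # logs: Successfully uninstalled snowballstemmer-<version>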
PK micropip/wheelinfo.py
import hashlib
import io
import json
import zipfile
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Literal
from urllib.parse import ParseResult, urlparse
from packaging.requirements import Requirement
from packaging.tags import Tag
from packaging.version import Version
from ._compat import (
fetch_bytes,
get_dynlibs,
loadDynlibsFromPackage,
loadedPackages,
)
from ._utils import parse_wheel_filename
from .metadata import Metadata, safe_name, wheel_dist_info_dir
from .types import DistributionMetadata
@dataclass
class PackageData:
file_name: str
package_type: Literal["shared_library", "package"]
shared_library: bool
@dataclass
class WheelInfo:
"""
WheelInfo represents a wheel file and its metadata (e.g. URL and hash)
"""
name: str
version: Version
filename: str
build: tuple[int, str] | tuple[()]
tags: frozenset[Tag]
url: str
parsed_url: ParseResult
sha256: str | None = None
size: int | None = None # Size in bytes, if available (PEP 700)
core_metadata: DistributionMetadata = None  # Wheel's metadata (PEP 658 / PEP 714)
# Fields below are only available after downloading the wheel, i.e. after calling `download()`.
_data: bytes | None = field(default=None, repr=False) # Wheel file contents.
_metadata: Metadata | None = None # Wheel metadata.
_requires: list[Requirement] | None = None # List of requirements.
# Path to the .dist-info directory. This is only available after extracting the wheel, i.e. after calling `extract()`.
_dist_info: Path | None = None
def __post_init__(self):
assert any(
self.url.startswith(p) for p in ("http:", "https:", "emfs:", "file:")
), self.url
self._project_name = safe_name(self.name)
self.metadata_url = self.url + ".metadata"
@classmethod
def from_url(cls, url: str) -> "WheelInfo":
"""Parse wheels URL and extract available metadata
See https://www.python.org/dev/peps/pep-0427/#file-name-convention
"""
parsed_url = urlparse(url)
if parsed_url.scheme == "":
url = "file:///" + url
parsed_url = urlparse(url)
file_name = Path(parsed_url.path).name
name, version, build, tags = parse_wheel_filename(file_name)
return WheelInfo(
name=name,
version=version,
filename=file_name,
build=build,
tags=tags,
url=url,
parsed_url=parsed_url,
)
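# Illustrative call (hypothetical URL; the filename must follow the PEP 427
# naming convention):
#     wheel = WheelInfo.from_url("https://example.com/foo-1.0-py3-none-any.whl")
#     # wheel.name == "foo", str(wheel.version) == "1.0"
#     # wheel.metadata_url == wheel.url + ".metadata"  (PEP 658)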
@classmethod
def from_package_index(
cls,
name: str,
filename: str,
url: str,
version: Version,
sha256: str | None,
size: int | None,
core_metadata: DistributionMetadata = None,
) -> "WheelInfo":
"""Extract available metadata from response received from package index"""
parsed_url = urlparse(url)
_, _, build, tags = parse_wheel_filename(filename)
return WheelInfo(
name=name,
version=version,
filename=filename,
build=build,
tags=tags,
url=url,
parsed_url=parsed_url,
sha256=sha256,
size=size,
core_metadata=core_metadata,
)
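# Sketch: a PEP 691 JSON file entry maps onto this constructor roughly as
# follows (key names per the Simple API spec; `entry` is hypothetical):
#     WheelInfo.from_package_index(
#         name="foo",
#         filename=entry["filename"],
#         url=entry["url"],
#         version=Version("1.0"),
#         sha256=entry["hashes"].get("sha256"),
#         size=entry.get("size"),
#         core_metadata=entry.get("core-metadata"),
#     )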
async def install(self, target: Path) -> None:
"""
Install the wheel to the target directory.
The installation process is as follows:
0. A wheel needs to be downloaded before it can be installed. This is done by calling `download()`.
1. The wheel is validated by comparing its hash with the one provided by the package index.
2. The wheel is extracted to the target directory.
3. The wheel's shared libraries are loaded.
4. The wheel's metadata is set.
"""
if not self._data:
raise RuntimeError(
"Micropip internal error: attempted to install wheel before downloading it?"
)
_validate_sha256_checksum(self._data, self.sha256)
self._extract(target)
await self._load_libraries(target)
self._set_installer()
async def download(self, fetch_kwargs: dict[str, Any]):
if self._data is not None:
return
self._data = await self._fetch_bytes(self.url, fetch_kwargs)
# The wheel's metadata might be downloaded separately from the wheel itself.
# If it is not downloaded yet or if the metadata is not available, extract it from the wheel.
if self._metadata is None:
with zipfile.ZipFile(io.BytesIO(self._data)) as zf:
metadata_path = (
Path(wheel_dist_info_dir(zf, self.name)) / Metadata.PKG_INFO
)
self._metadata = Metadata(zipfile.Path(zf, str(metadata_path)))
def pep658_metadata_available(self) -> bool:
"""
Check if the wheel's metadata is exposed via PEP 658.
"""
return self.core_metadata is not None
async def download_pep658_metadata(
self,
fetch_kwargs: dict[str, Any],
) -> None:
"""
Download the wheel's PEP 658 metadata. If the metadata is not available, this is a no-op.
"""
if self.core_metadata is None:
return None
data = await self._fetch_bytes(self.metadata_url, fetch_kwargs)
match self.core_metadata:
case {"sha256": checksum}: # sha256 checksum available
_validate_sha256_checksum(data, checksum)
case _: # no checksum available
pass
self._metadata = Metadata(data)
def requires(self, extras: set[str]) -> list[Requirement]:
"""
Get a list of requirements for the wheel.
"""
if self._metadata is None:
raise RuntimeError(
"Micropip internal error: attempted to get requirements before downloading the wheel?"
)
requires = self._metadata.requires(extras)
self._requires = requires
return requires
async def _fetch_bytes(self, url: str, fetch_kwargs: dict[str, Any]):
if self.parsed_url.scheme not in ("https", "http", "emfs", "file"):
# Don't raise ValueError here; it would get swallowed further up the stack.
raise TypeError(
f"Cannot download from a non-remote location: {url!r} ({self.parsed_url!r})"
)
try:
return await fetch_bytes(url, fetch_kwargs)
except OSError as e:
if self.parsed_url.hostname in [
"files.pythonhosted.org",
"cdn.jsdelivr.net",
]:
raise e
else:
raise ValueError(
f"Can't fetch wheel from {url!r}. "
"One common reason for this is when the server blocks "
"Cross-Origin Resource Sharing (CORS). "
"Check if the server is sending the correct 'Access-Control-Allow-Origin' header."
) from e
def _extract(self, target: Path) -> None:
assert self._data
with zipfile.ZipFile(io.BytesIO(self._data)) as zf:
zf.extractall(target)
self._dist_info = target / wheel_dist_info_dir(zf, self.name)
def _set_installer(self) -> None:
"""
Set the installer metadata in the wheel's .dist-info directory.
"""
assert self._data
wheel_source = "pypi" if self.sha256 is not None else self.url
self._write_dist_info("PYODIDE_SOURCE", wheel_source)
self._write_dist_info("PYODIDE_URL", self.url)
self._write_dist_info("PYODIDE_SHA256", _generate_package_hash(self._data))
self._write_dist_info("INSTALLER", "micropip")
if self._requires:
self._write_dist_info(
"PYODIDE_REQUIRES", json.dumps(sorted(x.name for x in self._requires))
)
setattr(loadedPackages, self._project_name, wheel_source)
def _write_dist_info(self, file: str, content: str) -> None:
assert self._dist_info
(self._dist_info / file).write_text(content)
async def _load_libraries(self, target: Path) -> None:
"""
Compiles shared libraries (WASM modules) in the wheel and loads them.
"""
assert self._data
pkg = PackageData(
file_name=self.filename,
package_type="package",
shared_library=False,
)
dynlibs = get_dynlibs(io.BytesIO(self._data), ".whl", target)
await loadDynlibsFromPackage(pkg, dynlibs)
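# End-to-end sketch (hypothetical URL; requires the Pyodide compatibility
# layer for fetching and for loading shared libraries):
#     import site
#     from pathlib import Path
#
#     async def _demo() -> None:
#         wheel = WheelInfo.from_url("https://example.com/foo-1.0-py3-none-any.whl")
#         await wheel.download({})  # fetch bytes and parse METADATA
#         await wheel.install(Path(site.getsitepackages()[0]))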
def _validate_sha256_checksum(data: bytes, expected: str | None = None) -> None:
if expected is None:
# No checksums available, e.g. because installing
# from a different location than PyPI.
return
actual = _generate_package_hash(data)
if actual != expected:
raise RuntimeError(f"Invalid checksum: expected {expected}, got {actual}")
def _generate_package_hash(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
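# Minimal self-check of the two helpers above (illustrative; runs only when
# this module is executed directly, never on import):
if __name__ == "__main__":
    _digest = _generate_package_hash(b"hello")  # 64-char hex sha256 digest
    _validate_sha256_checksum(b"hello", _digest)  # matching digest: silent
    _validate_sha256_checksum(b"hello", None)  # no checksum known: no-op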
PK micropip/_compat/__init__.py
import sys
from .compatibility_layer import CompatibilityLayer
compatibility_layer: type[CompatibilityLayer] | None = None
IN_BROWSER = "_pyodide_core" in sys.modules
if IN_BROWSER:
from ._compat_in_pyodide import CompatibilityInPyodide
compatibility_layer = CompatibilityInPyodide
else:
from ._compat_not_in_pyodide import CompatibilityNotInPyodide
compatibility_layer = CompatibilityNotInPyodide
REPODATA_INFO = compatibility_layer.repodata_info()
REPODATA_PACKAGES = compatibility_layer.repodata_packages()
fetch_bytes = compatibility_layer.fetch_bytes
fetch_string_and_headers = compatibility_layer.fetch_string_and_headers
loadedPackages = compatibility_layer.loadedPackages
loadDynlibsFromPackage = compatibility_layer.loadDynlibsFromPackage
loadPackage = compatibility_layer.loadPackage
get_dynlibs = compatibility_layer.get_dynlibs
to_js = compatibility_layer.to_js
HttpStatusError = compatibility_layer.HttpStatusError
__all__ = [
"REPODATA_INFO",
"REPODATA_PACKAGES",
"fetch_bytes",
"fetch_string_and_headers",
"loadedPackages",
"loadDynlibsFromPackage",
"loadPackage",
"get_dynlibs",
"to_js",
"HttpStatusError",
]
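# Sketch: consumers import through this facade rather than choosing a backend
# themselves (illustrative):
#     from micropip._compat import IN_BROWSER, fetch_bytes
#     if not IN_BROWSER:
#         ...  # plain CPython, e.g. in the test suite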