# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import base64
import inspect
import re
import sys
from pathlib import Path
from types import SimpleNamespace
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
List,
Literal,
Optional,
Pattern,
Sequence,
Union,
cast,
)
from playwright._impl._accessibility import Accessibility
from playwright._impl._api_structures import (
AriaRole,
FilePayload,
FloatRect,
PdfMargins,
Position,
ViewportSize,
)
from playwright._impl._artifact import Artifact
from playwright._impl._clock import Clock
from playwright._impl._connection import (
ChannelOwner,
from_channel,
from_nullable_channel,
)
from playwright._impl._console_message import ConsoleMessage
from playwright._impl._download import Download
from playwright._impl._element_handle import ElementHandle
from playwright._impl._errors import Error, TargetClosedError, is_target_closed_error
from playwright._impl._event_context_manager import EventContextManagerImpl
from playwright._impl._file_chooser import FileChooser
from playwright._impl._frame import Frame
from playwright._impl._greenlets import LocatorHandlerGreenlet
from playwright._impl._har_router import HarRouter
from playwright._impl._helper import (
ColorScheme,
DocumentLoadState,
ForcedColors,
HarMode,
KeyboardModifier,
MouseButton,
ReducedMotion,
RouteFromHarNotFoundPolicy,
RouteHandler,
RouteHandlerCallback,
TimeoutSettings,
URLMatch,
URLMatchRequest,
URLMatchResponse,
WebSocketRouteHandlerCallback,
async_readfile,
async_writefile,
locals_to_params,
make_dirs_for_file,
serialize_error,
url_matches,
)
from playwright._impl._input import Keyboard, Mouse, Touchscreen
from playwright._impl._js_handle import (
JSHandle,
Serializable,
add_source_url_to_script,
parse_result,
serialize_argument,
)
from playwright._impl._network import (
Request,
Response,
Route,
WebSocketRoute,
WebSocketRouteHandler,
serialize_headers,
)
from playwright._impl._video import Video
from playwright._impl._waiter import Waiter
if TYPE_CHECKING: # pragma: no cover
from playwright._impl._browser_context import BrowserContext
from playwright._impl._fetch import APIRequestContext
from playwright._impl._locator import FrameLocator, Locator
from playwright._impl._network import WebSocket
class LocatorHandler:
locator: "Locator"
handler: Union[Callable[["Locator"], Any], Callable[..., Any]]
times: Union[int, None]
def __init__(
self, locator: "Locator", handler: Callable[..., Any], times: Union[int, None]
) -> None:
self.locator = locator
self._handler = handler
self.times = times
def __call__(self) -> Any:
arg_count = len(inspect.signature(self._handler).parameters)
if arg_count == 0:
return self._handler()
return self._handler(self.locator)
class Page(ChannelOwner):
Events = SimpleNamespace(
Close="close",
Crash="crash",
Console="console",
Dialog="dialog",
Download="download",
FileChooser="filechooser",
DOMContentLoaded="domcontentloaded",
PageError="pageerror",
Request="request",
Response="response",
RequestFailed="requestfailed",
RequestFinished="requestfinished",
FrameAttached="frameattached",
FrameDetached="framedetached",
FrameNavigated="framenavigated",
Load="load",
Popup="popup",
WebSocket="websocket",
Worker="worker",
)
accessibility: Accessibility
keyboard: Keyboard
mouse: Mouse
touchscreen: Touchscreen
def __init__(
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
) -> None:
super().__init__(parent, type, guid, initializer)
self._browser_context = cast("BrowserContext", parent)
self.accessibility = Accessibility(self._channel)
self.keyboard = Keyboard(self._channel)
self.mouse = Mouse(self._channel)
self.touchscreen = Touchscreen(self._channel)
self._main_frame: Frame = from_channel(initializer["mainFrame"])
self._main_frame._page = self
self._frames = [self._main_frame]
self._viewport_size: Optional[ViewportSize] = initializer.get("viewportSize")
self._is_closed = False
self._workers: List["Worker"] = []
self._bindings: Dict[str, Any] = {}
self._routes: List[RouteHandler] = []
self._web_socket_routes: List[WebSocketRouteHandler] = []
self._owned_context: Optional["BrowserContext"] = None
self._timeout_settings: TimeoutSettings = TimeoutSettings(
self._browser_context._timeout_settings
)
self._video: Optional[Video] = None
self._opener = cast("Page", from_nullable_channel(initializer.get("opener")))
self._close_reason: Optional[str] = None
self._close_was_called = False
self._har_routers: List[HarRouter] = []
self._locator_handlers: Dict[str, LocatorHandler] = {}
self._channel.on(
"bindingCall",
lambda params: self._on_binding(from_channel(params["binding"])),
)
self._channel.on("close", lambda _: self._on_close())
self._channel.on("crash", lambda _: self._on_crash())
self._channel.on("download", lambda params: self._on_download(params))
self._channel.on(
"fileChooser",
lambda params: self.emit(
Page.Events.FileChooser,
FileChooser(
self, from_channel(params["element"]), params["isMultiple"]
),
),
)
self._channel.on(
"frameAttached",
lambda params: self._on_frame_attached(from_channel(params["frame"])),
)
self._channel.on(
"frameDetached",
lambda params: self._on_frame_detached(from_channel(params["frame"])),
)
self._channel.on(
"locatorHandlerTriggered",
lambda params: self._loop.create_task(
self._on_locator_handler_triggered(params["uid"])
),
)
self._channel.on(
"route",
lambda params: self._loop.create_task(
self._on_route(from_channel(params["route"]))
),
)
self._channel.on(
"webSocketRoute",
lambda params: self._loop.create_task(
self._on_web_socket_route(from_channel(params["webSocketRoute"]))
),
)
self._channel.on("video", lambda params: self._on_video(params))
self._channel.on(
"webSocket",
lambda params: self.emit(
Page.Events.WebSocket, from_channel(params["webSocket"])
),
)
self._channel.on(
"worker", lambda params: self._on_worker(from_channel(params["worker"]))
)
self._closed_or_crashed_future: asyncio.Future = asyncio.Future()
self.on(
Page.Events.Close,
lambda _: (
self._closed_or_crashed_future.set_result(
self._close_error_with_reason()
)
if not self._closed_or_crashed_future.done()
else None
),
)
self.on(
Page.Events.Crash,
lambda _: (
self._closed_or_crashed_future.set_result(TargetClosedError())
if not self._closed_or_crashed_future.done()
else None
),
)
self._set_event_to_subscription_mapping(
{
Page.Events.Console: "console",
Page.Events.Dialog: "dialog",
Page.Events.Request: "request",
Page.Events.Response: "response",
Page.Events.RequestFinished: "requestFinished",
Page.Events.RequestFailed: "requestFailed",
Page.Events.FileChooser: "fileChooser",
}
)
def __repr__(self) -> str:
return f"<Page url={self.url!r}>"
def _on_frame_attached(self, frame: Frame) -> None:
frame._page = self
self._frames.append(frame)
self.emit(Page.Events.FrameAttached, frame)
def _on_frame_detached(self, frame: Frame) -> None:
self._frames.remove(frame)
frame._detached = True
self.emit(Page.Events.FrameDetached, frame)
async def _on_route(self, route: Route) -> None:
route._context = self.context
route_handlers = self._routes.copy()
for route_handler in route_handlers:
# If the page was closed we stall all requests right away.
if self._close_was_called or self.context._close_was_called:
return
if not route_handler.matches(route.request.url):
continue
if route_handler not in self._routes:
continue
if route_handler.will_expire:
self._routes.remove(route_handler)
try:
handled = await route_handler.handle(route)
finally:
if len(self._routes) == 0:
async def _update_interceptor_patterns_ignore_exceptions() -> None:
try:
await self._update_interception_patterns()
except Error:
pass
asyncio.create_task(
self._connection.wrap_api_call(
_update_interceptor_patterns_ignore_exceptions, True
)
)
if handled:
return
await self._browser_context._on_route(route)
async def _on_web_socket_route(self, web_socket_route: WebSocketRoute) -> None:
route_handler = next(
(
route_handler
for route_handler in self._web_socket_routes
if route_handler.matches(web_socket_route.url)
),
None,
)
if route_handler:
await route_handler.handle(web_socket_route)
else:
await self._browser_context._on_web_socket_route(web_socket_route)
def _on_binding(self, binding_call: "BindingCall") -> None:
func = self._bindings.get(binding_call._initializer["name"])
if func:
asyncio.create_task(binding_call.call(func))
self._browser_context._on_binding(binding_call)
def _on_worker(self, worker: "Worker") -> None:
self._workers.append(worker)
worker._page = self
self.emit(Page.Events.Worker, worker)
def _on_close(self) -> None:
self._is_closed = True
if self in self._browser_context._pages:
self._browser_context._pages.remove(self)
if self in self._browser_context._background_pages:
self._browser_context._background_pages.remove(self)
self._dispose_har_routers()
self.emit(Page.Events.Close, self)
def _on_crash(self) -> None:
self.emit(Page.Events.Crash, self)
def _on_download(self, params: Any) -> None:
url = params["url"]
suggested_filename = params["suggestedFilename"]
artifact = cast(Artifact, from_channel(params["artifact"]))
self.emit(
Page.Events.Download, Download(self, url, suggested_filename, artifact)
)
def _on_video(self, params: Any) -> None:
artifact = from_channel(params["artifact"])
self._force_video()._artifact_ready(artifact)
@property
def context(self) -> "BrowserContext":
return self._browser_context
@property
def clock(self) -> Clock:
return self._browser_context.clock
async def opener(self) -> Optional["Page"]:
if self._opener and self._opener.is_closed():
return None
return self._opener
@property
def main_frame(self) -> Frame:
return self._main_frame
def frame(self, name: str = None, url: URLMatch = None) -> Optional[Frame]:
for frame in self._frames:
if name and frame.name == name:
return frame
if url and url_matches(
self._browser_context._options.get("baseURL"), frame.url, url
):
return frame
return None
@property
def frames(self) -> List[Frame]:
return self._frames.copy()
def set_default_navigation_timeout(self, timeout: float) -> None:
self._timeout_settings.set_default_navigation_timeout(timeout)
self._channel.send_no_reply(
"setDefaultNavigationTimeoutNoReply", dict(timeout=timeout)
)
def set_default_timeout(self, timeout: float) -> None:
self._timeout_settings.set_default_timeout(timeout)
self._channel.send_no_reply("setDefaultTimeoutNoReply", dict(timeout=timeout))
async def query_selector(
self,
selector: str,
strict: bool = None,
) -> Optional[ElementHandle]:
return await self._main_frame.query_selector(selector, strict)
async def query_selector_all(self, selector: str) -> List[ElementHandle]:
return await self._main_frame.query_selector_all(selector)
async def wait_for_selector(
self,
selector: str,
timeout: float = None,
state: Literal["attached", "detached", "hidden", "visible"] = None,
strict: bool = None,
) -> Optional[ElementHandle]:
return await self._main_frame.wait_for_selector(**locals_to_params(locals()))
async def is_checked(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._main_frame.is_checked(**locals_to_params(locals()))
async def is_disabled(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._main_frame.is_disabled(**locals_to_params(locals()))
async def is_editable(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._main_frame.is_editable(**locals_to_params(locals()))
async def is_enabled(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._main_frame.is_enabled(**locals_to_params(locals()))
async def is_hidden(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._main_frame.is_hidden(**locals_to_params(locals()))
async def is_visible(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._main_frame.is_visible(**locals_to_params(locals()))
async def dispatch_event(
self,
selector: str,
type: str,
eventInit: Dict = None,
timeout: float = None,
strict: bool = None,
) -> None:
return await self._main_frame.dispatch_event(**locals_to_params(locals()))
async def evaluate(self, expression: str, arg: Serializable = None) -> Any:
return await self._main_frame.evaluate(expression, arg)
async def evaluate_handle(
self, expression: str, arg: Serializable = None
) -> JSHandle:
return await self._main_frame.evaluate_handle(expression, arg)
async def eval_on_selector(
self,
selector: str,
expression: str,
arg: Serializable = None,
strict: bool = None,
) -> Any:
return await self._main_frame.eval_on_selector(
selector, expression, arg, strict
)
async def eval_on_selector_all(
self,
selector: str,
expression: str,
arg: Serializable = None,
) -> Any:
return await self._main_frame.eval_on_selector_all(selector, expression, arg)
async def add_script_tag(
self,
url: str = None,
path: Union[str, Path] = None,
content: str = None,
type: str = None,
) -> ElementHandle:
return await self._main_frame.add_script_tag(**locals_to_params(locals()))
async def add_style_tag(
self, url: str = None, path: Union[str, Path] = None, content: str = None
) -> ElementHandle:
return await self._main_frame.add_style_tag(**locals_to_params(locals()))
async def expose_function(self, name: str, callback: Callable) -> None:
await self.expose_binding(name, lambda source, *args: callback(*args))
async def expose_binding(
self, name: str, callback: Callable, handle: bool = None
) -> None:
if name in self._bindings:
raise Error(f'Function "{name}" has been already registered')
if name in self._browser_context._bindings:
raise Error(
f'Function "{name}" has been already registered in the browser context'
)
self._bindings[name] = callback
await self._channel.send(
"exposeBinding", dict(name=name, needsHandle=handle or False)
)
async def set_extra_http_headers(self, headers: Dict[str, str]) -> None:
await self._channel.send(
"setExtraHTTPHeaders", dict(headers=serialize_headers(headers))
)
@property
def url(self) -> str:
return self._main_frame.url
async def content(self) -> str:
return await self._main_frame.content()
async def set_content(
self,
html: str,
timeout: float = None,
waitUntil: DocumentLoadState = None,
) -> None:
return await self._main_frame.set_content(**locals_to_params(locals()))
async def goto(
self,
url: str,
timeout: float = None,
waitUntil: DocumentLoadState = None,
referer: str = None,
) -> Optional[Response]:
return await self._main_frame.goto(**locals_to_params(locals()))
async def reload(
self,
timeout: float = None,
waitUntil: DocumentLoadState = None,
) -> Optional[Response]:
return from_nullable_channel(
await self._channel.send("reload", locals_to_params(locals()))
)
async def wait_for_load_state(
self,
state: Literal["domcontentloaded", "load", "networkidle"] = None,
timeout: float = None,
) -> None:
return await self._main_frame.wait_for_load_state(**locals_to_params(locals()))
async def wait_for_url(
self,
url: URLMatch,
waitUntil: DocumentLoadState = None,
timeout: float = None,
) -> None:
return await self._main_frame.wait_for_url(**locals_to_params(locals()))
async def wait_for_event(
self, event: str, predicate: Callable = None, timeout: float = None
) -> Any:
async with self.expect_event(event, predicate, timeout) as event_info:
pass
return await event_info
async def go_back(
self,
timeout: float = None,
waitUntil: DocumentLoadState = None,
) -> Optional[Response]:
return from_nullable_channel(
await self._channel.send("goBack", locals_to_params(locals()))
)
async def go_forward(
self,
timeout: float = None,
waitUntil: DocumentLoadState = None,
) -> Optional[Response]:
return from_nullable_channel(
await self._channel.send("goForward", locals_to_params(locals()))
)
async def request_gc(self) -> None:
await self._channel.send("requestGC")
async def emulate_media(
self,
media: Literal["null", "print", "screen"] = None,
colorScheme: ColorScheme = None,
reducedMotion: ReducedMotion = None,
forcedColors: ForcedColors = None,
) -> None:
params = locals_to_params(locals())
if "media" in params:
params["media"] = "no-override" if params["media"] == "null" else media
if "colorScheme" in params:
params["colorScheme"] = (
"no-override" if params["colorScheme"] == "null" else colorScheme
)
if "reducedMotion" in params:
params["reducedMotion"] = (
"no-override" if params["reducedMotion"] == "null" else reducedMotion
)
if "forcedColors" in params:
params["forcedColors"] = (
"no-override" if params["forcedColors"] == "null" else forcedColors
)
await self._channel.send("emulateMedia", params)
async def set_viewport_size(self, viewportSize: ViewportSize) -> None:
self._viewport_size = viewportSize
await self._channel.send("setViewportSize", locals_to_params(locals()))
@property
def viewport_size(self) -> Optional[ViewportSize]:
return self._viewport_size
async def bring_to_front(self) -> None:
await self._channel.send("bringToFront")
async def add_init_script(
self, script: str = None, path: Union[str, Path] = None
) -> None:
if path:
script = add_source_url_to_script(
(await async_readfile(path)).decode(), path
)
if not isinstance(script, str):
raise Error("Either path or script parameter must be specified")
await self._channel.send("addInitScript", dict(source=script))
async def route(
self, url: URLMatch, handler: RouteHandlerCallback, times: int = None
) -> None:
self._routes.insert(
0,
RouteHandler(
self._browser_context._options.get("baseURL"),
url,
handler,
True if self._dispatcher_fiber else False,
times,
),
)
await self._update_interception_patterns()
async def unroute(
self, url: URLMatch, handler: Optional[RouteHandlerCallback] = None
) -> None:
removed = []
remaining = []
for route in self._routes:
if route.url != url or (handler and route.handler != handler):
remaining.append(route)
else:
removed.append(route)
await self._unroute_internal(removed, remaining, "default")
async def _unroute_internal(
self,
removed: List[RouteHandler],
remaining: List[RouteHandler],
behavior: Literal["default", "ignoreErrors", "wait"] = None,
) -> None:
self._routes = remaining
await self._update_interception_patterns()
if behavior is None or behavior == "default":
return
await asyncio.gather(
*map(
lambda route: route.stop(behavior), # type: ignore
removed,
)
)
async def route_web_socket(
self, url: URLMatch, handler: WebSocketRouteHandlerCallback
) -> None:
self._web_socket_routes.insert(
0,
WebSocketRouteHandler(
self._browser_context._options.get("baseURL"), url, handler
),
)
await self._update_web_socket_interception_patterns()
def _dispose_har_routers(self) -> None:
for router in self._har_routers:
router.dispose()
self._har_routers = []
async def unroute_all(
self, behavior: Literal["default", "ignoreErrors", "wait"] = None
) -> None:
await self._unroute_internal(self._routes, [], behavior)
self._dispose_har_routers()
async def route_from_har(
self,
har: Union[Path, str],
url: Union[Pattern[str], str] = None,
notFound: RouteFromHarNotFoundPolicy = None,
update: bool = None,
updateContent: Literal["attach", "embed"] = None,
updateMode: HarMode = None,
) -> None:
if update:
await self._browser_context._record_into_har(
har=har,
page=self,
url=url,
update_content=updateContent,
update_mode=updateMode,
)
return
router = await HarRouter.create(
local_utils=self._connection.local_utils,
file=str(har),
not_found_action=notFound or "abort",
url_matcher=url,
)
self._har_routers.append(router)
await router.add_page_route(self)
async def _update_interception_patterns(self) -> None:
patterns = RouteHandler.prepare_interception_patterns(self._routes)
await self._channel.send(
"setNetworkInterceptionPatterns", {"patterns": patterns}
)
async def _update_web_socket_interception_patterns(self) -> None:
patterns = WebSocketRouteHandler.prepare_interception_patterns(
self._web_socket_routes
)
await self._channel.send(
"setWebSocketInterceptionPatterns", {"patterns": patterns}
)
async def screenshot(
self,
timeout: float = None,
type: Literal["jpeg", "png"] = None,
path: Union[str, Path] = None,
quality: int = None,
omitBackground: bool = None,
fullPage: bool = None,
clip: FloatRect = None,
animations: Literal["allow", "disabled"] = None,
caret: Literal["hide", "initial"] = None,
scale: Literal["css", "device"] = None,
mask: Sequence["Locator"] = None,
maskColor: str = None,
style: str = None,
) -> bytes:
params = locals_to_params(locals())
if "path" in params:
del params["path"]
if "mask" in params:
params["mask"] = list(
map(
lambda locator: (
{
"frame": locator._frame._channel,
"selector": locator._selector,
}
),
params["mask"],
)
)
encoded_binary = await self._channel.send("screenshot", params)
decoded_binary = base64.b64decode(encoded_binary)
if path:
make_dirs_for_file(path)
await async_writefile(path, decoded_binary)
return decoded_binary
async def title(self) -> str:
return await self._main_frame.title()
async def close(self, runBeforeUnload: bool = None, reason: str = None) -> None:
self._close_reason = reason
self._close_was_called = True
try:
await self._channel.send("close", locals_to_params(locals()))
if self._owned_context:
await self._owned_context.close()
except Exception as e:
if not is_target_closed_error(e) and not runBeforeUnload:
raise e
def is_closed(self) -> bool:
return self._is_closed
async def click(
self,
selector: str,
modifiers: Sequence[KeyboardModifier] = None,
position: Position = None,
delay: float = None,
button: MouseButton = None,
clickCount: int = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
trial: bool = None,
strict: bool = None,
) -> None:
return await self._main_frame.click(**locals_to_params(locals()))
async def dblclick(
self,
selector: str,
modifiers: Sequence[KeyboardModifier] = None,
position: Position = None,
delay: float = None,
button: MouseButton = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
return await self._main_frame.dblclick(**locals_to_params(locals()))
async def tap(
self,
selector: str,
modifiers: Sequence[KeyboardModifier] = None,
position: Position = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
return await self._main_frame.tap(**locals_to_params(locals()))
async def fill(
self,
selector: str,
value: str,
timeout: float = None,
noWaitAfter: bool = None,
strict: bool = None,
force: bool = None,
) -> None:
return await self._main_frame.fill(**locals_to_params(locals()))
def locator(
self,
selector: str,
hasText: Union[str, Pattern[str]] = None,
hasNotText: Union[str, Pattern[str]] = None,
has: "Locator" = None,
hasNot: "Locator" = None,
) -> "Locator":
return self._main_frame.locator(
selector,
hasText=hasText,
hasNotText=hasNotText,
has=has,
hasNot=hasNot,
)
def get_by_alt_text(
self, text: Union[str, Pattern[str]], exact: bool = None
) -> "Locator":
return self._main_frame.get_by_alt_text(text, exact=exact)
def get_by_label(
self, text: Union[str, Pattern[str]], exact: bool = None
) -> "Locator":
return self._main_frame.get_by_label(text, exact=exact)
def get_by_placeholder(
self, text: Union[str, Pattern[str]], exact: bool = None
) -> "Locator":
return self._main_frame.get_by_placeholder(text, exact=exact)
def get_by_role(
self,
role: AriaRole,
checked: bool = None,
disabled: bool = None,
expanded: bool = None,
includeHidden: bool = None,
level: int = None,
name: Union[str, Pattern[str]] = None,
pressed: bool = None,
selected: bool = None,
exact: bool = None,
) -> "Locator":
return self._main_frame.get_by_role(
role,
checked=checked,
disabled=disabled,
expanded=expanded,
includeHidden=includeHidden,
level=level,
name=name,
pressed=pressed,
selected=selected,
exact=exact,
)
def get_by_test_id(self, testId: Union[str, Pattern[str]]) -> "Locator":
return self._main_frame.get_by_test_id(testId)
def get_by_text(
self, text: Union[str, Pattern[str]], exact: bool = None
) -> "Locator":
return self._main_frame.get_by_text(text, exact=exact)
def get_by_title(
self, text: Union[str, Pattern[str]], exact: bool = None
) -> "Locator":
return self._main_frame.get_by_title(text, exact=exact)
def frame_locator(self, selector: str) -> "FrameLocator":
return self.main_frame.frame_locator(selector)
async def focus(
self, selector: str, strict: bool = None, timeout: float = None
) -> None:
return await self._main_frame.focus(**locals_to_params(locals()))
async def text_content(
self, selector: str, strict: bool = None, timeout: float = None
) -> Optional[str]:
return await self._main_frame.text_content(**locals_to_params(locals()))
async def inner_text(
self, selector: str, strict: bool = None, timeout: float = None
) -> str:
return await self._main_frame.inner_text(**locals_to_params(locals()))
async def inner_html(
self, selector: str, strict: bool = None, timeout: float = None
) -> str:
return await self._main_frame.inner_html(**locals_to_params(locals()))
async def get_attribute(
self, selector: str, name: str, strict: bool = None, timeout: float = None
) -> Optional[str]:
return await self._main_frame.get_attribute(**locals_to_params(locals()))
async def hover(
self,
selector: str,
modifiers: Sequence[KeyboardModifier] = None,
position: Position = None,
timeout: float = None,
noWaitAfter: bool = None,
force: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
return await self._main_frame.hover(**locals_to_params(locals()))
async def drag_and_drop(
self,
source: str,
target: str,
sourcePosition: Position = None,
targetPosition: Position = None,
force: bool = None,
noWaitAfter: bool = None,
timeout: float = None,
strict: bool = None,
trial: bool = None,
) -> None:
return await self._main_frame.drag_and_drop(**locals_to_params(locals()))
async def select_option(
self,
selector: str,
value: Union[str, Sequence[str]] = None,
index: Union[int, Sequence[int]] = None,
label: Union[str, Sequence[str]] = None,
element: Union["ElementHandle", Sequence["ElementHandle"]] = None,
timeout: float = None,
noWaitAfter: bool = None,
force: bool = None,
strict: bool = None,
) -> List[str]:
params = locals_to_params(locals())
return await self._main_frame.select_option(**params)
async def input_value(
self, selector: str, strict: bool = None, timeout: float = None
) -> str:
params = locals_to_params(locals())
return await self._main_frame.input_value(**params)
async def set_input_files(
self,
selector: str,
files: Union[
str, Path, FilePayload, Sequence[Union[str, Path]], Sequence[FilePayload]
],
timeout: float = None,
strict: bool = None,
noWaitAfter: bool = None,
) -> None:
return await self._main_frame.set_input_files(**locals_to_params(locals()))
async def type(
self,
selector: str,
text: str,
delay: float = None,
timeout: float = None,
noWaitAfter: bool = None,
strict: bool = None,
) -> None:
return await self._main_frame.type(**locals_to_params(locals()))
async def press(
self,
selector: str,
key: str,
delay: float = None,
timeout: float = None,
noWaitAfter: bool = None,
strict: bool = None,
) -> None:
return await self._main_frame.press(**locals_to_params(locals()))
async def check(
self,
selector: str,
position: Position = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
return await self._main_frame.check(**locals_to_params(locals()))
async def uncheck(
self,
selector: str,
position: Position = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
return await self._main_frame.uncheck(**locals_to_params(locals()))
async def wait_for_timeout(self, timeout: float) -> None:
await self._main_frame.wait_for_timeout(timeout)
async def wait_for_function(
self,
expression: str,
arg: Serializable = None,
timeout: float = None,
polling: Union[float, Literal["raf"]] = None,
) -> JSHandle:
return await self._main_frame.wait_for_function(**locals_to_params(locals()))
@property
def workers(self) -> List["Worker"]:
return self._workers.copy()
@property
def request(self) -> "APIRequestContext":
return self.context.request
async def pause(self) -> None:
default_navigation_timeout = (
self._browser_context._timeout_settings.default_navigation_timeout()
)
default_timeout = self._browser_context._timeout_settings.default_timeout()
self._browser_context.set_default_navigation_timeout(0)
self._browser_context.set_default_timeout(0)
try:
await asyncio.wait(
[
asyncio.create_task(self._browser_context._channel.send("pause")),
self._closed_or_crashed_future,
],
return_when=asyncio.FIRST_COMPLETED,
)
finally:
self._browser_context._set_default_navigation_timeout_impl(
default_navigation_timeout
)
self._browser_context._set_default_timeout_impl(default_timeout)
async def pdf(
self,
scale: float = None,
displayHeaderFooter: bool = None,
headerTemplate: str = None,
footerTemplate: str = None,
printBackground: bool = None,
landscape: bool = None,
pageRanges: str = None,
format: str = None,
width: Union[str, float] = None,
height: Union[str, float] = None,
preferCSSPageSize: bool = None,
margin: PdfMargins = None,
path: Union[str, Path] = None,
outline: bool = None,
tagged: bool = None,
) -> bytes:
params = locals_to_params(locals())
if "path" in params:
del params["path"]
encoded_binary = await self._channel.send("pdf", params)
decoded_binary = base64.b64decode(encoded_binary)
if path:
make_dirs_for_file(path)
await async_writefile(path, decoded_binary)
return decoded_binary
def _force_video(self) -> Video:
if not self._video:
self._video = Video(self)
return self._video
@property
def video(
self,
) -> Optional[Video]:
# Note: we are creating Video object lazily, because we do not know
# BrowserContextOptions when constructing the page - it is assigned
# too late during launchPersistentContext.
if not self._browser_context._options.get("recordVideo"):
return None
return self._force_video()
def _close_error_with_reason(self) -> TargetClosedError:
return TargetClosedError(
self._close_reason or self._browser_context._effective_close_reason()
)
def expect_event(
self,
event: str,
predicate: Callable = None,
timeout: float = None,
) -> EventContextManagerImpl:
return self._expect_event(
event, predicate, timeout, f'waiting for event "{event}"'
)
def _expect_event(
self,
event: str,
predicate: Callable = None,
timeout: float = None,
log_line: str = None,
) -> EventContextManagerImpl:
if timeout is None:
timeout = self._timeout_settings.timeout()
waiter = Waiter(self, f"page.expect_event({event})")
waiter.reject_on_timeout(
timeout, f'Timeout {timeout}ms exceeded while waiting for event "{event}"'
)
if log_line:
waiter.log(log_line)
if event != Page.Events.Crash:
waiter.reject_on_event(self, Page.Events.Crash, Error("Page crashed"))
if event != Page.Events.Close:
waiter.reject_on_event(
self, Page.Events.Close, lambda: self._close_error_with_reason()
)
waiter.wait_for_event(self, event, predicate)
return EventContextManagerImpl(waiter.result())
def expect_console_message(
self,
predicate: Callable[[ConsoleMessage], bool] = None,
timeout: float = None,
) -> EventContextManagerImpl[ConsoleMessage]:
return self.expect_event(Page.Events.Console, predicate, timeout)
def expect_download(
self,
predicate: Callable[[Download], bool] = None,
timeout: float = None,
) -> EventContextManagerImpl[Download]:
return self.expect_event(Page.Events.Download, predicate, timeout)
def expect_file_chooser(
self,
predicate: Callable[[FileChooser], bool] = None,
timeout: float = None,
) -> EventContextManagerImpl[FileChooser]:
return self.expect_event(Page.Events.FileChooser, predicate, timeout)
def expect_navigation(
self,
url: URLMatch = None,
waitUntil: DocumentLoadState = None,
timeout: float = None,
) -> EventContextManagerImpl[Response]:
return self.main_frame.expect_navigation(url, waitUntil, timeout)
def expect_popup(
self,
predicate: Callable[["Page"], bool] = None,
timeout: float = None,
) -> EventContextManagerImpl["Page"]:
return self.expect_event(Page.Events.Popup, predicate, timeout)
def expect_request(
self,
urlOrPredicate: URLMatchRequest,
timeout: float = None,
) -> EventContextManagerImpl[Request]:
def my_predicate(request: Request) -> bool:
if not callable(urlOrPredicate):
return url_matches(
self._browser_context._options.get("baseURL"),
request.url,
urlOrPredicate,
)
return urlOrPredicate(request)
trimmed_url = trim_url(urlOrPredicate)
log_line = f"waiting for request {trimmed_url}" if trimmed_url else None
return self._expect_event(
Page.Events.Request,
predicate=my_predicate,
timeout=timeout,
log_line=log_line,
)
def expect_request_finished(
self,
predicate: Callable[["Request"], bool] = None,
timeout: float = None,
) -> EventContextManagerImpl[Request]:
return self.expect_event(
Page.Events.RequestFinished, predicate=predicate, timeout=timeout
)
def expect_response(
self,
urlOrPredicate: URLMatchResponse,
timeout: float = None,
) -> EventContextManagerImpl[Response]:
def my_predicate(request: Response) -> bool:
if not callable(urlOrPredicate):
return url_matches(
self._browser_context._options.get("baseURL"),
request.url,
urlOrPredicate,
)
return urlOrPredicate(request)
trimmed_url = trim_url(urlOrPredicate)
log_line = f"waiting for response {trimmed_url}" if trimmed_url else None
return self._expect_event(
Page.Events.Response,
predicate=my_predicate,
timeout=timeout,
log_line=log_line,
)
def expect_websocket(
self,
predicate: Callable[["WebSocket"], bool] = None,
timeout: float = None,
) -> EventContextManagerImpl["WebSocket"]:
return self.expect_event("websocket", predicate, timeout)
def expect_worker(
self,
predicate: Callable[["Worker"], bool] = None,
timeout: float = None,
) -> EventContextManagerImpl["Worker"]:
return self.expect_event("worker", predicate, timeout)
async def set_checked(
self,
selector: str,
checked: bool,
position: Position = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
if checked:
await self.check(
selector=selector,
position=position,
timeout=timeout,
force=force,
strict=strict,
trial=trial,
)
else:
await self.uncheck(
selector=selector,
position=position,
timeout=timeout,
force=force,
strict=strict,
trial=trial,
)
async def add_locator_handler(
self,
locator: "Locator",
handler: Union[Callable[["Locator"], Any], Callable[[], Any]],
noWaitAfter: bool = None,
times: int = None,
) -> None:
if locator._frame != self._main_frame:
raise Error("Locator must belong to the main frame of this page")
if times == 0:
return
uid = await self._channel.send(
"registerLocatorHandler",
{
"selector": locator._selector,
"noWaitAfter": noWaitAfter,
},
)
self._locator_handlers[uid] = LocatorHandler(
handler=handler, times=times, locator=locator
)
async def _on_locator_handler_triggered(self, uid: str) -> None:
remove = False
try:
handler = self._locator_handlers.get(uid)
if handler and handler.times != 0:
if handler.times is not None:
handler.times -= 1
if self._dispatcher_fiber:
handler_finished_future = self._loop.create_future()
def _handler() -> None:
try:
handler()
handler_finished_future.set_result(None)
except Exception as e:
handler_finished_future.set_exception(e)
g = LocatorHandlerGreenlet(_handler)
g.switch()
await handler_finished_future
else:
coro_or_future = handler()
if coro_or_future:
await coro_or_future
remove = handler.times == 0
finally:
if remove:
del self._locator_handlers[uid]
try:
await self._connection.wrap_api_call(
lambda: self._channel.send(
"resolveLocatorHandlerNoReply", {"uid": uid, "remove": remove}
),
is_internal=True,
)
except Error:
pass
async def remove_locator_handler(self, locator: "Locator") -> None:
for uid, data in self._locator_handlers.copy().items():
if data.locator._equals(locator):
del self._locator_handlers[uid]
self._channel.send_no_reply("unregisterLocatorHandler", {"uid": uid})
class Worker(ChannelOwner):
Events = SimpleNamespace(Close="close")
def __init__(
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
) -> None:
super().__init__(parent, type, guid, initializer)
self._channel.on("close", lambda _: self._on_close())
self._page: Optional[Page] = None
self._context: Optional["BrowserContext"] = None
def __repr__(self) -> str:
return f"<Worker url={self.url!r}>"
def _on_close(self) -> None:
if self._page:
self._page._workers.remove(self)
if self._context:
self._context._service_workers.remove(self)
self.emit(Worker.Events.Close, self)
@property
def url(self) -> str:
return self._initializer["url"]
async def evaluate(self, expression: str, arg: Serializable = None) -> Any:
return parse_result(
await self._channel.send(
"evaluateExpression",
dict(
expression=expression,
arg=serialize_argument(arg),
),
)
)
async def evaluate_handle(
self, expression: str, arg: Serializable = None
) -> JSHandle:
return from_channel(
await self._channel.send(
"evaluateExpressionHandle",
dict(
expression=expression,
arg=serialize_argument(arg),
),
)
)
class BindingCall(ChannelOwner):
def __init__(
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
) -> None:
super().__init__(parent, type, guid, initializer)
async def call(self, func: Callable) -> None:
try:
frame = from_channel(self._initializer["frame"])
source = dict(context=frame._page.context, page=frame._page, frame=frame)
if self._initializer.get("handle"):
result = func(source, from_channel(self._initializer["handle"]))
else:
func_args = list(map(parse_result, self._initializer["args"]))
result = func(source, *func_args)
if inspect.iscoroutine(result):
result = await result
await self._channel.send("resolve", dict(result=serialize_argument(result)))
except Exception as e:
tb = sys.exc_info()[2]
asyncio.create_task(
self._channel.send(
"reject", dict(error=dict(error=serialize_error(e, tb)))
)
)
def trim_url(param: Union[URLMatchRequest, URLMatchResponse]) -> Optional[str]:
if isinstance(param, re.Pattern):
return trim_end(param.pattern)
if isinstance(param, str):
return trim_end(param)
return None
def trim_end(s: str) -> str:
if len(s) > 50:
return s[:50] + "\u2026"
return s