# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Optional,
Pattern,
Sequence,
Set,
Union,
cast,
)
from pyee import EventEmitter
from playwright._impl._api_structures import AriaRole, FilePayload, Position
from playwright._impl._connection import (
ChannelOwner,
from_channel,
from_nullable_channel,
)
from playwright._impl._element_handle import ElementHandle, convert_select_option_values
from playwright._impl._errors import Error
from playwright._impl._event_context_manager import EventContextManagerImpl
from playwright._impl._helper import (
DocumentLoadState,
FrameNavigatedEvent,
KeyboardModifier,
Literal,
MouseButton,
URLMatch,
async_readfile,
locals_to_params,
monotonic_time,
url_matches,
)
from playwright._impl._js_handle import (
JSHandle,
Serializable,
add_source_url_to_script,
parse_result,
serialize_argument,
)
from playwright._impl._locator import (
FrameLocator,
Locator,
get_by_alt_text_selector,
get_by_label_selector,
get_by_placeholder_selector,
get_by_role_selector,
get_by_test_id_selector,
get_by_text_selector,
get_by_title_selector,
test_id_attribute_name,
)
from playwright._impl._network import Response
from playwright._impl._set_input_files_helpers import convert_input_files
from playwright._impl._waiter import Waiter
if TYPE_CHECKING: # pragma: no cover
from playwright._impl._page import Page
class Frame(ChannelOwner):
def __init__(
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
) -> None:
super().__init__(parent, type, guid, initializer)
self._parent_frame = from_nullable_channel(initializer.get("parentFrame"))
if self._parent_frame:
self._parent_frame._child_frames.append(self)
self._name = initializer["name"]
self._url = initializer["url"]
self._detached = False
self._child_frames: List[Frame] = []
self._page: Optional[Page] = None
self._load_states: Set[str] = set(initializer["loadStates"])
self._event_emitter = EventEmitter()
self._channel.on(
"loadstate",
lambda params: self._on_load_state(params.get("add"), params.get("remove")),
)
self._channel.on(
"navigated",
lambda params: self._on_frame_navigated(params),
)
def __repr__(self) -> str:
return f"<Frame name={self.name} url={self.url!r}>"
def _on_load_state(
self, add: DocumentLoadState = None, remove: DocumentLoadState = None
) -> None:
if add:
self._load_states.add(add)
self._event_emitter.emit("loadstate", add)
elif remove and remove in self._load_states:
self._load_states.remove(remove)
if not self._parent_frame and add == "load" and self._page:
self._page.emit("load", self._page)
if not self._parent_frame and add == "domcontentloaded" and self._page:
self._page.emit("domcontentloaded", self._page)
def _on_frame_navigated(self, event: FrameNavigatedEvent) -> None:
self._url = event["url"]
self._name = event["name"]
self._event_emitter.emit("navigated", event)
if "error" not in event and self._page:
self._page.emit("framenavigated", self)
async def _query_count(self, selector: str) -> int:
return await self._channel.send("queryCount", {"selector": selector})
@property
def page(self) -> "Page":
assert self._page
return self._page
async def goto(
self,
url: str,
timeout: float = None,
waitUntil: DocumentLoadState = None,
referer: str = None,
) -> Optional[Response]:
return cast(
Optional[Response],
from_nullable_channel(
await self._channel.send("goto", locals_to_params(locals()))
),
)
def _setup_navigation_waiter(self, wait_name: str, timeout: float = None) -> Waiter:
assert self._page
waiter = Waiter(self._page, f"frame.{wait_name}")
waiter.reject_on_event(
self._page,
"close",
lambda: cast("Page", self._page)._close_error_with_reason(),
)
waiter.reject_on_event(
self._page, "crash", Error("Navigation failed because page crashed!")
)
waiter.reject_on_event(
self._page,
"framedetached",
Error("Navigating frame was detached!"),
lambda frame: frame == self,
)
if timeout is None:
timeout = self._page._timeout_settings.navigation_timeout()
waiter.reject_on_timeout(timeout, f"Timeout {timeout}ms exceeded.")
return waiter
def expect_navigation(
self,
url: URLMatch = None,
waitUntil: DocumentLoadState = None,
timeout: float = None,
) -> EventContextManagerImpl[Response]:
assert self._page
if not waitUntil:
waitUntil = "load"
if timeout is None:
timeout = self._page._timeout_settings.navigation_timeout()
deadline = monotonic_time() + timeout
waiter = self._setup_navigation_waiter("expect_navigation", timeout)
to_url = f' to "{url}"' if url else ""
waiter.log(f"waiting for navigation{to_url} until '{waitUntil}'")
def predicate(event: Any) -> bool:
# Any failed navigation results in a rejection.
if event.get("error"):
return True
waiter.log(f' navigated to "{event["url"]}"')
return url_matches(
cast("Page", self._page)._browser_context._options.get("baseURL"),
event["url"],
url,
)
waiter.wait_for_event(
self._event_emitter,
"navigated",
predicate=predicate,
)
async def continuation() -> Optional[Response]:
event = await waiter.result()
if "error" in event:
raise Error(event["error"])
if waitUntil not in self._load_states:
t = deadline - monotonic_time()
if t > 0:
await self._wait_for_load_state_impl(state=waitUntil, timeout=t)
if "newDocument" in event and "request" in event["newDocument"]:
request = from_channel(event["newDocument"]["request"])
return await request.response()
return None
return EventContextManagerImpl(asyncio.create_task(continuation()))
async def wait_for_url(
self,
url: URLMatch,
waitUntil: DocumentLoadState = None,
timeout: float = None,
) -> None:
assert self._page
if url_matches(
self._page._browser_context._options.get("baseURL"), self.url, url
):
await self._wait_for_load_state_impl(state=waitUntil, timeout=timeout)
return
async with self.expect_navigation(
url=url, waitUntil=waitUntil, timeout=timeout
):
pass
async def wait_for_load_state(
self,
state: Literal["domcontentloaded", "load", "networkidle"] = None,
timeout: float = None,
) -> None:
return await self._wait_for_load_state_impl(state, timeout)
async def _wait_for_load_state_impl(
self, state: DocumentLoadState = None, timeout: float = None
) -> None:
if not state:
state = "load"
if state not in ("load", "domcontentloaded", "networkidle", "commit"):
raise Error(
"state: expected one of (load|domcontentloaded|networkidle|commit)"
)
waiter = self._setup_navigation_waiter("wait_for_load_state", timeout)
if state in self._load_states:
waiter.log(f' not waiting, "{state}" event already fired')
# TODO: align with upstream
waiter._fulfill(None)
else:
def handle_load_state_event(actual_state: str) -> bool:
waiter.log(f'"{actual_state}" event fired')
return actual_state == state
waiter.wait_for_event(
self._event_emitter,
"loadstate",
handle_load_state_event,
)
await waiter.result()
async def frame_element(self) -> ElementHandle:
return from_channel(await self._channel.send("frameElement"))
async def evaluate(self, expression: str, arg: Serializable = None) -> Any:
return parse_result(
await self._channel.send(
"evaluateExpression",
dict(
expression=expression,
arg=serialize_argument(arg),
),
)
)
async def evaluate_handle(
self, expression: str, arg: Serializable = None
) -> JSHandle:
return from_channel(
await self._channel.send(
"evaluateExpressionHandle",
dict(
expression=expression,
arg=serialize_argument(arg),
),
)
)
async def query_selector(
self, selector: str, strict: bool = None
) -> Optional[ElementHandle]:
return from_nullable_channel(
await self._channel.send("querySelector", locals_to_params(locals()))
)
async def query_selector_all(self, selector: str) -> List[ElementHandle]:
return list(
map(
from_channel,
await self._channel.send("querySelectorAll", dict(selector=selector)),
)
)
async def wait_for_selector(
self,
selector: str,
strict: bool = None,
timeout: float = None,
state: Literal["attached", "detached", "hidden", "visible"] = None,
) -> Optional[ElementHandle]:
return from_nullable_channel(
await self._channel.send("waitForSelector", locals_to_params(locals()))
)
async def is_checked(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._channel.send("isChecked", locals_to_params(locals()))
async def is_disabled(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._channel.send("isDisabled", locals_to_params(locals()))
async def is_editable(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._channel.send("isEditable", locals_to_params(locals()))
async def is_enabled(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._channel.send("isEnabled", locals_to_params(locals()))
async def is_hidden(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._channel.send("isHidden", locals_to_params(locals()))
async def is_visible(
self, selector: str, strict: bool = None, timeout: float = None
) -> bool:
return await self._channel.send("isVisible", locals_to_params(locals()))
async def dispatch_event(
self,
selector: str,
type: str,
eventInit: Dict = None,
strict: bool = None,
timeout: float = None,
) -> None:
await self._channel.send(
"dispatchEvent",
locals_to_params(
dict(
selector=selector,
type=type,
eventInit=serialize_argument(eventInit),
strict=strict,
timeout=timeout,
),
),
)
async def eval_on_selector(
self,
selector: str,
expression: str,
arg: Serializable = None,
strict: bool = None,
) -> Any:
return parse_result(
await self._channel.send(
"evalOnSelector",
locals_to_params(
dict(
selector=selector,
expression=expression,
arg=serialize_argument(arg),
strict=strict,
)
),
)
)
async def eval_on_selector_all(
self,
selector: str,
expression: str,
arg: Serializable = None,
) -> Any:
return parse_result(
await self._channel.send(
"evalOnSelectorAll",
dict(
selector=selector,
expression=expression,
arg=serialize_argument(arg),
),
)
)
async def content(self) -> str:
return await self._channel.send("content")
async def set_content(
self,
html: str,
timeout: float = None,
waitUntil: DocumentLoadState = None,
) -> None:
await self._channel.send("setContent", locals_to_params(locals()))
@property
def name(self) -> str:
return self._name or ""
@property
def url(self) -> str:
return self._url or ""
@property
def parent_frame(self) -> Optional["Frame"]:
return self._parent_frame
@property
def child_frames(self) -> List["Frame"]:
return self._child_frames.copy()
def is_detached(self) -> bool:
return self._detached
async def add_script_tag(
self,
url: str = None,
path: Union[str, Path] = None,
content: str = None,
type: str = None,
) -> ElementHandle:
params = locals_to_params(locals())
if path:
params["content"] = add_source_url_to_script(
(await async_readfile(path)).decode(), path
)
del params["path"]
return from_channel(await self._channel.send("addScriptTag", params))
async def add_style_tag(
self, url: str = None, path: Union[str, Path] = None, content: str = None
) -> ElementHandle:
params = locals_to_params(locals())
if path:
params["content"] = (
(await async_readfile(path)).decode()
+ "\n/*# sourceURL="
+ str(Path(path))
+ "*/"
)
del params["path"]
return from_channel(await self._channel.send("addStyleTag", params))
async def click(
self,
selector: str,
modifiers: Sequence[KeyboardModifier] = None,
position: Position = None,
delay: float = None,
button: MouseButton = None,
clickCount: int = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
await self._channel.send("click", locals_to_params(locals()))
async def dblclick(
self,
selector: str,
modifiers: Sequence[KeyboardModifier] = None,
position: Position = None,
delay: float = None,
button: MouseButton = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
await self._channel.send("dblclick", locals_to_params(locals()))
async def tap(
self,
selector: str,
modifiers: Sequence[KeyboardModifier] = None,
position: Position = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
await self._channel.send("tap", locals_to_params(locals()))
async def fill(
self,
selector: str,
value: str,
timeout: float = None,
noWaitAfter: bool = None,
strict: bool = None,
force: bool = None,
) -> None:
await self._channel.send("fill", locals_to_params(locals()))
def locator(
self,
selector: str,
hasText: Union[str, Pattern[str]] = None,
hasNotText: Union[str, Pattern[str]] = None,
has: Locator = None,
hasNot: Locator = None,
) -> Locator:
return Locator(
self,
selector,
has_text=hasText,
has_not_text=hasNotText,
has=has,
has_not=hasNot,
)
def get_by_alt_text(
self, text: Union[str, Pattern[str]], exact: bool = None
) -> "Locator":
return self.locator(get_by_alt_text_selector(text, exact=exact))
def get_by_label(
self, text: Union[str, Pattern[str]], exact: bool = None
) -> "Locator":
return self.locator(get_by_label_selector(text, exact=exact))
def get_by_placeholder(
self, text: Union[str, Pattern[str]], exact: bool = None
) -> "Locator":
return self.locator(get_by_placeholder_selector(text, exact=exact))
def get_by_role(
self,
role: AriaRole,
checked: bool = None,
disabled: bool = None,
expanded: bool = None,
includeHidden: bool = None,
level: int = None,
name: Union[str, Pattern[str]] = None,
pressed: bool = None,
selected: bool = None,
exact: bool = None,
) -> "Locator":
return self.locator(
get_by_role_selector(
role,
checked=checked,
disabled=disabled,
expanded=expanded,
includeHidden=includeHidden,
level=level,
name=name,
pressed=pressed,
selected=selected,
exact=exact,
)
)
def get_by_test_id(self, testId: Union[str, Pattern[str]]) -> "Locator":
return self.locator(get_by_test_id_selector(test_id_attribute_name(), testId))
def get_by_text(
self, text: Union[str, Pattern[str]], exact: bool = None
) -> "Locator":
return self.locator(get_by_text_selector(text, exact=exact))
def get_by_title(
self, text: Union[str, Pattern[str]], exact: bool = None
) -> "Locator":
return self.locator(get_by_title_selector(text, exact=exact))
def frame_locator(self, selector: str) -> FrameLocator:
return FrameLocator(self, selector)
async def focus(
self, selector: str, strict: bool = None, timeout: float = None
) -> None:
await self._channel.send("focus", locals_to_params(locals()))
async def text_content(
self, selector: str, strict: bool = None, timeout: float = None
) -> Optional[str]:
return await self._channel.send("textContent", locals_to_params(locals()))
async def inner_text(
self, selector: str, strict: bool = None, timeout: float = None
) -> str:
return await self._channel.send("innerText", locals_to_params(locals()))
async def inner_html(
self, selector: str, strict: bool = None, timeout: float = None
) -> str:
return await self._channel.send("innerHTML", locals_to_params(locals()))
async def get_attribute(
self, selector: str, name: str, strict: bool = None, timeout: float = None
) -> Optional[str]:
return await self._channel.send("getAttribute", locals_to_params(locals()))
async def hover(
self,
selector: str,
modifiers: Sequence[KeyboardModifier] = None,
position: Position = None,
timeout: float = None,
noWaitAfter: bool = None,
force: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
await self._channel.send("hover", locals_to_params(locals()))
async def drag_and_drop(
self,
source: str,
target: str,
sourcePosition: Position = None,
targetPosition: Position = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
timeout: float = None,
trial: bool = None,
) -> None:
await self._channel.send("dragAndDrop", locals_to_params(locals()))
async def select_option(
self,
selector: str,
value: Union[str, Sequence[str]] = None,
index: Union[int, Sequence[int]] = None,
label: Union[str, Sequence[str]] = None,
element: Union["ElementHandle", Sequence["ElementHandle"]] = None,
timeout: float = None,
noWaitAfter: bool = None,
strict: bool = None,
force: bool = None,
) -> List[str]:
params = locals_to_params(
dict(
selector=selector,
timeout=timeout,
strict=strict,
force=force,
**convert_select_option_values(value, index, label, element),
)
)
return await self._channel.send("selectOption", params)
async def input_value(
self,
selector: str,
strict: bool = None,
timeout: float = None,
) -> str:
return await self._channel.send("inputValue", locals_to_params(locals()))
async def set_input_files(
self,
selector: str,
files: Union[
str, Path, FilePayload, Sequence[Union[str, Path]], Sequence[FilePayload]
],
strict: bool = None,
timeout: float = None,
noWaitAfter: bool = None,
) -> None:
converted = await convert_input_files(files, self.page.context)
await self._channel.send(
"setInputFiles",
{
"selector": selector,
"strict": strict,
"timeout": timeout,
**converted,
},
)
async def type(
self,
selector: str,
text: str,
delay: float = None,
strict: bool = None,
timeout: float = None,
noWaitAfter: bool = None,
) -> None:
await self._channel.send("type", locals_to_params(locals()))
async def press(
self,
selector: str,
key: str,
delay: float = None,
strict: bool = None,
timeout: float = None,
noWaitAfter: bool = None,
) -> None:
await self._channel.send("press", locals_to_params(locals()))
async def check(
self,
selector: str,
position: Position = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
await self._channel.send("check", locals_to_params(locals()))
async def uncheck(
self,
selector: str,
position: Position = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
await self._channel.send("uncheck", locals_to_params(locals()))
async def wait_for_timeout(self, timeout: float) -> None:
await self._channel.send("waitForTimeout", locals_to_params(locals()))
async def wait_for_function(
self,
expression: str,
arg: Serializable = None,
timeout: float = None,
polling: Union[float, Literal["raf"]] = None,
) -> JSHandle:
if isinstance(polling, str) and polling != "raf":
raise Error(f"Unknown polling option: {polling}")
params = locals_to_params(locals())
params["arg"] = serialize_argument(arg)
if polling is not None and polling != "raf":
params["pollingInterval"] = polling
return from_channel(await self._channel.send("waitForFunction", params))
async def title(self) -> str:
return await self._channel.send("title")
async def set_checked(
self,
selector: str,
checked: bool,
position: Position = None,
timeout: float = None,
force: bool = None,
noWaitAfter: bool = None,
strict: bool = None,
trial: bool = None,
) -> None:
if checked:
await self.check(
selector=selector,
position=position,
timeout=timeout,
force=force,
strict=strict,
trial=trial,
)
else:
await self.uncheck(
selector=selector,
position=position,
timeout=timeout,
force=force,
strict=strict,
trial=trial,
)
async def _highlight(self, selector: str) -> None:
await self._channel.send("highlight", {"selector": selector})