# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections.abc
import datetime
import math
import traceback
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from urllib.parse import ParseResult, urlparse, urlunparse
from playwright._impl._connection import Channel, ChannelOwner, from_channel
from playwright._impl._errors import Error, is_target_closed_error
from playwright._impl._map import Map
if TYPE_CHECKING: # pragma: no cover
from playwright._impl._element_handle import ElementHandle
Serializable = Any
class VisitorInfo:
visited: Map[Any, int]
last_id: int
def __init__(self) -> None:
self.visited = Map()
self.last_id = 0
def visit(self, obj: Any) -> int:
assert obj not in self.visited
self.last_id += 1
self.visited[obj] = self.last_id
return self.last_id
class JSHandle(ChannelOwner):
def __init__(
self, parent: ChannelOwner, type: str, guid: str, initializer: Dict
) -> None:
super().__init__(parent, type, guid, initializer)
self._preview = self._initializer["preview"]
self._channel.on(
"previewUpdated", lambda params: self._on_preview_updated(params["preview"])
)
def __repr__(self) -> str:
return f"<JSHandle preview={self._preview}>"
def __str__(self) -> str:
return self._preview
def _on_preview_updated(self, preview: str) -> None:
self._preview = preview
async def evaluate(self, expression: str, arg: Serializable = None) -> Any:
return parse_result(
await self._channel.send(
"evaluateExpression",
dict(
expression=expression,
arg=serialize_argument(arg),
),
)
)
async def evaluate_handle(
self, expression: str, arg: Serializable = None
) -> "JSHandle":
return from_channel(
await self._channel.send(
"evaluateExpressionHandle",
dict(
expression=expression,
arg=serialize_argument(arg),
),
)
)
async def get_property(self, propertyName: str) -> "JSHandle":
return from_channel(
await self._channel.send("getProperty", dict(name=propertyName))
)
async def get_properties(self) -> Dict[str, "JSHandle"]:
return {
prop["name"]: from_channel(prop["value"])
for prop in await self._channel.send("getPropertyList")
}
def as_element(self) -> Optional["ElementHandle"]:
return None
async def dispose(self) -> None:
try:
await self._channel.send("dispose")
except Exception as e:
if not is_target_closed_error(e):
raise e
async def json_value(self) -> Any:
return parse_result(await self._channel.send("jsonValue"))
def serialize_value(
value: Any, handles: List[Channel], visitor_info: Optional[VisitorInfo] = None
) -> Any:
if visitor_info is None:
visitor_info = VisitorInfo()
if isinstance(value, JSHandle):
h = len(handles)
handles.append(value._channel)
return dict(h=h)
if value is None:
return dict(v="null")
if isinstance(value, float):
if value == float("inf"):
return dict(v="Infinity")
if value == float("-inf"):
return dict(v="-Infinity")
if value == float("-0"):
return dict(v="-0")
if math.isnan(value):
return dict(v="NaN")
if isinstance(value, datetime.datetime):
# Node.js Date objects are always in UTC.
return {
"d": datetime.datetime.strftime(
value.astimezone(datetime.timezone.utc), "%Y-%m-%dT%H:%M:%S.%fZ"
)
}
if isinstance(value, Exception):
return {
"e": {
"m": str(value),
"n": (
(value.name or "")
if isinstance(value, Error)
else value.__class__.__name__
),
"s": (
(value.stack or "")
if isinstance(value, Error)
else "".join(
traceback.format_exception(type(value), value=value, tb=None)
)
),
}
}
if isinstance(value, bool):
return {"b": value}
if isinstance(value, (int, float)):
return {"n": value}
if isinstance(value, str):
return {"s": value}
if isinstance(value, ParseResult):
return {"u": urlunparse(value)}
if value in visitor_info.visited:
return dict(ref=visitor_info.visited[value])
if isinstance(value, collections.abc.Sequence) and not isinstance(value, str):
id = visitor_info.visit(value)
a = []
for e in value:
a.append(serialize_value(e, handles, visitor_info))
return dict(a=a, id=id)
if isinstance(value, dict):
id = visitor_info.visit(value)
o = []
for name in value:
o.append(
{"k": name, "v": serialize_value(value[name], handles, visitor_info)}
)
return dict(o=o, id=id)
return dict(v="undefined")
def serialize_argument(arg: Serializable = None) -> Any:
handles: List[Channel] = []
value = serialize_value(arg, handles)
return dict(value=value, handles=handles)
def parse_value(value: Any, refs: Optional[Dict[int, Any]] = None) -> Any:
if refs is None:
refs = {}
if value is None:
return None
if isinstance(value, dict):
if "ref" in value:
return refs[value["ref"]]
if "v" in value:
v = value["v"]
if v == "Infinity":
return float("inf")
if v == "-Infinity":
return float("-inf")
if v == "-0":
return float("-0")
if v == "NaN":
return float("nan")
if v == "undefined":
return None
if v == "null":
return None
return v
if "u" in value:
return urlparse(value["u"])
if "bi" in value:
return int(value["bi"])
if "e" in value:
error = Error(value["e"]["m"])
error._name = value["e"]["n"]
error._stack = value["e"]["s"]
return error
if "a" in value:
a: List = []
refs[value["id"]] = a
for e in value["a"]:
a.append(parse_value(e, refs))
return a
if "d" in value:
# Node.js Date objects are always in UTC.
return datetime.datetime.strptime(
value["d"], "%Y-%m-%dT%H:%M:%S.%fZ"
).replace(tzinfo=datetime.timezone.utc)
if "o" in value:
o: Dict = {}
refs[value["id"]] = o
for e in value["o"]:
o[e["k"]] = parse_value(e["v"], refs)
return o
if "n" in value:
return value["n"]
if "s" in value:
return value["s"]
if "b" in value:
return value["b"]
return value
def parse_result(result: Any) -> Any:
return parse_value(result)
def add_source_url_to_script(source: str, path: Union[str, Path]) -> str:
return source + "\n//# sourceURL=" + str(path).replace("\n", "")