# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import io
import json
import os
import subprocess
import sys
from abc import ABC, abstractmethod
from typing import Callable, Dict, Optional, Union
from playwright._impl._driver import compute_driver_executable, get_driver_env
from playwright._impl._helper import ParsedMessagePayload
# Sourced from: https://github.com/pytest-dev/pytest/blob/da01ee0a4bb0af780167ecd228ab3ad249511302/src/_pytest/faulthandler.py#L69-L77
def _get_stderr_fileno() -> Optional[int]:
try:
# when using pythonw, sys.stderr is None.
# when Pyinstaller is used, there is no closed attribute because Pyinstaller monkey-patches it with a NullWriter class
if sys.stderr is None or not hasattr(sys.stderr, "closed"):
return None
if sys.stderr.closed:
return None
return sys.stderr.fileno()
except (NotImplementedError, AttributeError, io.UnsupportedOperation):
# pytest-xdist monkeypatches sys.stderr with an object that is not an actual file.
# https://docs.python.org/3/library/faulthandler.html#issue-with-file-descriptors
# This is potentially dangerous, but the best we can do.
if not hasattr(sys, "__stderr__") or not sys.__stderr__:
return None
return sys.__stderr__.fileno()
class Transport(ABC):
def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
self._loop = loop
self.on_message: Callable[[ParsedMessagePayload], None] = lambda _: None
self.on_error_future: asyncio.Future = loop.create_future()
@abstractmethod
def request_stop(self) -> None:
pass
def dispose(self) -> None:
pass
@abstractmethod
async def wait_until_stopped(self) -> None:
pass
@abstractmethod
async def connect(self) -> None:
pass
@abstractmethod
async def run(self) -> None:
pass
@abstractmethod
def send(self, message: Dict) -> None:
pass
def serialize_message(self, message: Dict) -> bytes:
msg = json.dumps(message)
if "DEBUGP" in os.environ: # pragma: no cover
print("\x1b[32mSEND>\x1b[0m", json.dumps(message, indent=2))
return msg.encode()
def deserialize_message(self, data: Union[str, bytes]) -> ParsedMessagePayload:
obj = json.loads(data)
if "DEBUGP" in os.environ: # pragma: no cover
print("\x1b[33mRECV>\x1b[0m", json.dumps(obj, indent=2))
return obj
class PipeTransport(Transport):
def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
super().__init__(loop)
self._stopped = False
def request_stop(self) -> None:
assert self._output
self._stopped = True
self._output.close()
async def wait_until_stopped(self) -> None:
await self._stopped_future
async def connect(self) -> None:
self._stopped_future: asyncio.Future = asyncio.Future()
try:
# For pyinstaller and Nuitka
env = get_driver_env()
if getattr(sys, "frozen", False) or globals().get("__compiled__"):
env.setdefault("PLAYWRIGHT_BROWSERS_PATH", "0")
startupinfo = None
if sys.platform == "win32":
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
startupinfo.wShowWindow = subprocess.SW_HIDE
executable_path, entrypoint_path = compute_driver_executable()
self._proc = await asyncio.create_subprocess_exec(
executable_path,
entrypoint_path,
"run-driver",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=_get_stderr_fileno(),
limit=32768,
env=env,
startupinfo=startupinfo,
)
except Exception as exc:
self.on_error_future.set_exception(exc)
raise exc
self._output = self._proc.stdin
async def run(self) -> None:
assert self._proc.stdout
assert self._proc.stdin
while not self._stopped:
try:
buffer = await self._proc.stdout.readexactly(4)
if self._stopped:
break
length = int.from_bytes(buffer, byteorder="little", signed=False)
buffer = bytes(0)
while length:
to_read = min(length, 32768)
data = await self._proc.stdout.readexactly(to_read)
if self._stopped:
break
length -= to_read
if len(buffer):
buffer = buffer + data
else:
buffer = data
if self._stopped:
break
obj = self.deserialize_message(buffer)
self.on_message(obj)
except asyncio.IncompleteReadError:
if not self._stopped:
self.on_error_future.set_exception(
Exception("Connection closed while reading from the driver")
)
break
await asyncio.sleep(0)
await self._proc.communicate()
self._stopped_future.set_result(None)
def send(self, message: Dict) -> None:
assert self._output
data = self.serialize_message(message)
self._output.write(
len(data).to_bytes(4, byteorder="little", signed=False) + data
)