from __future__ import annotations import csv import logging import os from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path from urllib.parse import unquote import click import primp from .duckduckgo_search import DDGS from .utils import _expand_proxy_tb_alias, json_dumps from .version import __version__ logger = logging.getLogger(__name__) COLORS = { 0: "black", 1: "red", 2: "green", 3: "yellow", 4: "blue", 5: "magenta", 6: "cyan", 7: "bright_black", 8: "bright_red", 9: "bright_green", 10: "bright_yellow", 11: "bright_blue", 12: "bright_magenta", 13: "bright_cyan", 14: "white", 15: "bright_white", } CHAT_MODEL_CHOICES = {f"{i}": k for i, k in enumerate(DDGS._chat_models, start=1)} CHAT_MODEL_CHOICES_PROMPT = ( "DuckDuckGo AI chat. Choose a model:\n" + "\n".join([f"[{key}]: {value}" for key, value in CHAT_MODEL_CHOICES.items()]) + "\n" ) def _save_data(keywords: str, data: list[dict[str, str]], function_name: str, filename: str | None) -> None: filename, ext = filename.rsplit(".", 1) if filename and filename.endswith((".csv", ".json")) else (None, filename) filename = filename if filename else f"{function_name}_{keywords}_{datetime.now():%Y%m%d_%H%M%S}" if ext == "csv": _save_csv(f"{filename}.{ext}", data) elif ext == "json": _save_json(f"{filename}.{ext}", data) def _save_json(jsonfile: str | Path, data: list[dict[str, str]]) -> None: with open(jsonfile, "w", encoding="utf-8") as file: file.write(json_dumps(data)) def _save_csv(csvfile: str | Path, data: list[dict[str, str]]) -> None: with open(csvfile, "w", newline="", encoding="utf-8") as file: if data: headers = data[0].keys() writer = csv.DictWriter(file, fieldnames=headers, quoting=csv.QUOTE_MINIMAL) writer.writeheader() writer.writerows(data) def _print_data(data: list[dict[str, str]]) -> None: if data: for i, e in enumerate(data, start=1): click.secho(f"{i}.\t {'=' * 78}", bg="black", fg="white") for j, (k, v) in enumerate(e.items(), 
start=1): if v: width = 300 if k in ("content", "href", "image", "source", "thumbnail", "url") else 78 k = "language" if k == "detected_language" else k text = click.wrap_text( f"{v}", width=width, initial_indent="", subsequent_indent=" " * 12, preserve_paragraphs=True ) else: text = v click.secho(f"{k:<12}{text}", bg="black", fg=COLORS[j], overline=True) input() def _sanitize_keywords(keywords: str) -> str: keywords = ( keywords.replace("filetype", "") .replace(":", "") .replace('"', "'") .replace("site", "") .replace(" ", "_") .replace("/", "_") .replace("\\", "_") .replace(" ", "") ) return keywords def _download_file(url: str, dir_path: str, filename: str, proxy: str | None, verify: bool) -> None: try: resp = primp.Client(proxy=proxy, impersonate="chrome_131", timeout=10, verify=verify).get(url) if resp.status_code == 200: with open(os.path.join(dir_path, filename[:200]), "wb") as file: file.write(resp.content) except Exception as ex: logger.debug(f"download_file url={url} {type(ex).__name__} {ex}") def _download_results( keywords: str, results: list[dict[str, str]], function_name: str, proxy: str | None = None, threads: int | None = None, verify: bool = True, pathname: str | None = None, ) -> None: path = pathname if pathname else f"{function_name}_{keywords}_{datetime.now():%Y%m%d_%H%M%S}" os.makedirs(path, exist_ok=True) threads = 10 if threads is None else threads with ThreadPoolExecutor(max_workers=threads) as executor: futures = [] for i, res in enumerate(results, start=1): url = res["image"] if function_name == "images" else res["href"] filename = unquote(url.split("/")[-1].split("?")[0]) f = executor.submit(_download_file, url, path, f"{i}_{filename}", proxy, verify) futures.append(f) with click.progressbar( # type: ignore length=len(futures), label="Downloading", show_percent=True, show_pos=True, width=50 ) as bar: for future in as_completed(futures): future.result() bar.update(1) @click.group(chain=True) def cli() -> None: """duckduckgo_search CLI 
tool""" pass def safe_entry_point() -> None: try: cli() except Exception as ex: click.echo(f"{type(ex).__name__}: {ex}") @cli.command() def version() -> str: print(__version__) return __version__ @cli.command() @click.option("-k", "--keywords", required=True, help="text search, keywords for query") @click.option("-r", "--region", default="wt-wt", help="wt-wt, us-en, ru-ru, etc. -region https://duckduckgo.com/params") @click.option("-s", "--safesearch", default="moderate", type=click.Choice(["on", "moderate", "off"])) @click.option("-t", "--timelimit", type=click.Choice(["d", "w", "m", "y"]), help="day, week, month, year") @click.option("-m", "--max_results", type=int, help="maximum number of results") @click.option("-o", "--output", help="csv, json or filename.csv|json (save the results to a csv or json file)") @click.option("-d", "--download", is_flag=True, default=False, help="download results. -dd to set custom directory") @click.option("-dd", "--download-directory", help="Specify custom download directory") @click.option("-b", "--backend", default="auto", type=click.Choice(["auto", "html", "lite"])) @click.option("-th", "--threads", default=10, help="download threads, default=10") @click.option("-p", "--proxy", help="the proxy to send requests, example: socks5://127.0.0.1:9150") @click.option("-v", "--verify", default=True, help="verify SSL when making the request") def text( keywords: str, region: str, safesearch: str, timelimit: str | None, backend: str, output: str | None, download: bool, download_directory: str | None, threads: int, max_results: int | None, proxy: str | None, verify: bool, ) -> None: """CLI function to perform a text search using DuckDuckGo API.""" data = DDGS(proxy=_expand_proxy_tb_alias(proxy), verify=verify).text( keywords=keywords, region=region, safesearch=safesearch, timelimit=timelimit, backend=backend, max_results=max_results, ) keywords = _sanitize_keywords(keywords) if output: _save_data(keywords, data, "text", filename=output) if 
download: _download_results( keywords, data, function_name="text", proxy=proxy, threads=threads, verify=verify, pathname=download_directory, ) if not output and not download: _print_data(data) @cli.command() @click.option("-k", "--keywords", required=True, help="keywords for query") @click.option("-r", "--region", default="wt-wt", help="wt-wt, us-en, ru-ru, etc. -region https://duckduckgo.com/params") @click.option("-s", "--safesearch", default="moderate", type=click.Choice(["on", "moderate", "off"])) @click.option("-t", "--timelimit", type=click.Choice(["Day", "Week", "Month", "Year"])) @click.option("-size", "--size", type=click.Choice(["Small", "Medium", "Large", "Wallpaper"])) @click.option( "-c", "--color", type=click.Choice( [ "color", "Monochrome", "Red", "Orange", "Yellow", "Green", "Blue", "Purple", "Pink", "Brown", "Black", "Gray", "Teal", "White", ] ), ) @click.option("-type", "--type_image", type=click.Choice(["photo", "clipart", "gif", "transparent", "line"])) @click.option("-l", "--layout", type=click.Choice(["Square", "Tall", "Wide"])) @click.option( "-lic", "--license_image", type=click.Choice(["any", "Public", "Share", "ShareCommercially", "Modify", "ModifyCommercially"]), ) @click.option("-m", "--max_results", type=int, help="maximum number of results") @click.option("-o", "--output", help="csv, json or filename.csv|json (save the results to a csv or json file)") @click.option("-d", "--download", is_flag=True, default=False, help="download results. 
-dd to set custom directory") @click.option("-dd", "--download-directory", help="Specify custom download directory") @click.option("-th", "--threads", default=10, help="download threads, default=10") @click.option("-p", "--proxy", help="the proxy to send requests, example: socks5://127.0.0.1:9150") @click.option("-v", "--verify", default=True, help="verify SSL when making the request") def images( keywords: str, region: str, safesearch: str, timelimit: str | None, size: str | None, color: str | None, type_image: str | None, layout: str | None, license_image: str | None, download: bool, download_directory: str | None, threads: int, max_results: int | None, output: str | None, proxy: str | None, verify: bool, ) -> None: """CLI function to perform a images search using DuckDuckGo API.""" data = DDGS(proxy=_expand_proxy_tb_alias(proxy), verify=verify).images( keywords=keywords, region=region, safesearch=safesearch, timelimit=timelimit, size=size, color=color, type_image=type_image, layout=layout, license_image=license_image, max_results=max_results, ) keywords = _sanitize_keywords(keywords) if output: _save_data(keywords, data, function_name="images", filename=output) if download: _download_results( keywords, data, function_name="images", proxy=proxy, threads=threads, verify=verify, pathname=download_directory, ) if not output and not download: _print_data(data) @cli.command() @click.option("-k", "--keywords", required=True, help="keywords for query") @click.option("-r", "--region", default="wt-wt", help="wt-wt, us-en, ru-ru, etc. 
-region https://duckduckgo.com/params") @click.option("-s", "--safesearch", default="moderate", type=click.Choice(["on", "moderate", "off"])) @click.option("-t", "--timelimit", type=click.Choice(["d", "w", "m"]), help="day, week, month") @click.option("-res", "--resolution", type=click.Choice(["high", "standart"])) @click.option("-d", "--duration", type=click.Choice(["short", "medium", "long"])) @click.option("-lic", "--license_videos", type=click.Choice(["creativeCommon", "youtube"])) @click.option("-m", "--max_results", type=int, help="maximum number of results") @click.option("-o", "--output", help="csv, json or filename.csv|json (save the results to a csv or json file)") @click.option("-p", "--proxy", help="the proxy to send requests, example: socks5://127.0.0.1:9150") @click.option("-v", "--verify", default=True, help="verify SSL when making the request") def videos( keywords: str, region: str, safesearch: str, timelimit: str | None, resolution: str | None, duration: str | None, license_videos: str | None, max_results: int | None, output: str | None, proxy: str | None, verify: bool, ) -> None: """CLI function to perform a videos search using DuckDuckGo API.""" data = DDGS(proxy=_expand_proxy_tb_alias(proxy), verify=verify).videos( keywords=keywords, region=region, safesearch=safesearch, timelimit=timelimit, resolution=resolution, duration=duration, license_videos=license_videos, max_results=max_results, ) keywords = _sanitize_keywords(keywords) if output: _save_data(keywords, data, function_name="videos", filename=output) else: _print_data(data) @cli.command() @click.option("-k", "--keywords", required=True, help="keywords for query") @click.option("-r", "--region", default="wt-wt", help="wt-wt, us-en, ru-ru, etc. 
-region https://duckduckgo.com/params") @click.option("-s", "--safesearch", default="moderate", type=click.Choice(["on", "moderate", "off"])) @click.option("-t", "--timelimit", type=click.Choice(["d", "w", "m", "y"]), help="day, week, month, year") @click.option("-m", "--max_results", type=int, help="maximum number of results") @click.option("-o", "--output", help="csv, json or filename.csv|json (save the results to a csv or json file)") @click.option("-p", "--proxy", help="the proxy to send requests, example: socks5://127.0.0.1:9150") @click.option("-v", "--verify", default=True, help="verify SSL when making the request") def news( keywords: str, region: str, safesearch: str, timelimit: str | None, max_results: int | None, output: str | None, proxy: str | None, verify: bool, ) -> None: """CLI function to perform a news search using DuckDuckGo API.""" data = DDGS(proxy=_expand_proxy_tb_alias(proxy), verify=verify).news( keywords=keywords, region=region, safesearch=safesearch, timelimit=timelimit, max_results=max_results ) keywords = _sanitize_keywords(keywords) if output: _save_data(keywords, data, function_name="news", filename=output) else: _print_data(data) if __name__ == "__main__": cli(prog_name="ddgs")
# Memory