# SPDX-License-Identifier: AGPL-3.0-or-later
# pylint: disable=invalid-name
"""Swisscows (general, images, videos)"""

import base64
import codecs
import hashlib
import json
import random
from datetime import datetime
from urllib.parse import urlencode
import typing as t

from searx.result_types import EngineResults, LegacyResult
from searx.utils import humanize_number, html_to_text

if t.TYPE_CHECKING:
    from searx.extended_types import SXNG_Response
    from searx.search.processors import OnlineParams

about = {
    "website": "https://swisscows.com",
    "wikidata_id": "Q22937452",
    "official_api_documentation": None,
    "use_official_api": False,
    "require_api_key": False,
    "results": "JSON",
}

categories = ["general"]
swisscows_category = "web"  # possible: "web", "videos", "images"

# NOTE(review): this default is larger than the "web" limit enforced by init()
# (see maximum_page_size); presumably the engine settings override one of the
# two values -- confirm.
results_per_page = 50

time_range_support = True
paging = True

base_url = "https://api.swisscows.com"

CAESAR_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
NONCE_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~"

time_range_map = {"day": "Day", "week": "Week", "month": "Month", "year": "Year"}


def generate_nonce(length: int = 32) -> str:
    """
    Generate a random char sequence with the given length.

    The characters are drawn from :py:obj:`NONCE_ALPHABET`.
    """
    return "".join(random.choice(NONCE_ALPHABET) for _ in range(length))


def caesar_shift_with_switch_case(s: str, offset: int = 13) -> str:
    """
    Caesar shift by :py:obj:`offset` that additionally inverts the casing of all
    letters (i.e. from lowercase to uppercase and vice versa).

    Only ASCII letters are shifted, every other character is copied unchanged.
    """
    shifted_chars: list[str] = []
    for c in s:
        # Restrict to ASCII letters: testing ``c.upper() in CAESAR_ALPHABET``
        # is a substring test and would also match characters whose uppercase
        # form has several characters (e.g. the ligature U+FB06 -> "ST"), for
        # which ``ord()`` raises a TypeError.
        if c.isascii() and c.isalpha():
            alphabet_index = ord(c.upper()) - ord("A")
            shifted = CAESAR_ALPHABET[(alphabet_index + offset) % len(CAESAR_ALPHABET)]
            # CAESAR_ALPHABET is uppercase, so only an uppercase input needs
            # an explicit conversion to get the inverted casing.
            shifted_chars.append(shifted.lower() if c.isupper() else shifted)
        else:
            shifted_chars.append(c)
    return "".join(shifted_chars)


def sha256_hash_b64_url(s: str) -> str:
    """
    Calculate the SHA256 hash and base64 URL-encodes it.

    The result uses the URL-safe alphabet (``-`` and ``_`` instead of ``+`` and
    ``/``) and carries no ``=`` padding.
    """
    hasher = hashlib.sha256()
    hasher.update(s.encode())
    hashed_bytes = hasher.digest()

    # hashlib generates a byte digest, but since we need to convert it to base64, we
    # need to do that by hand
    hash_base64 = codecs.encode(hashed_bytes, "base64").decode("utf-8").rstrip('\n')
    hash_base64_url_encoded = hash_base64.replace("=", "").replace("+", '-').replace("/", '_')

    return hash_base64_url_encoded


def generate_nonce_and_signature(base_path: str, args: dict[str, t.Any]) -> tuple[str, str]:
    """
    Generate "X-Request-Nonce" and "X-Request-Signature" which are required for
    accessing Swisscows images (reverse engineered from their official website).

    Returns a tuple ``(nonce, signature)``.  The signature is the URL-safe
    base64 SHA256 hash of the request path (with sorted, plain text query
    arguments) followed by the Caesar shifted, case inverted nonce.
    """
    nonce = generate_nonce()
    nonce_shifted = caesar_shift_with_switch_case(nonce, 13)

    # in the path, all keys must be sorted in alphabetic order,
    # otherwise the generated signature won't be accepted!
    # additionally, the values may not be URL encoded, they have to be plain text
    # hence we don't use urlencode here
    args_sorted = sorted(args.items(), key=lambda arg: arg[0])
    query_string = "&".join(f"{key}={value}" for (key, value) in args_sorted)
    full_path = f"{base_path}?{query_string}"

    signature = sha256_hash_b64_url(full_path + nonce_shifted)

    return (nonce, signature)


# upper bound of ``results_per_page`` for each swisscows category
maximum_page_size = {"web": 20, "images": 50, "videos": 10}


def init(_) -> None:
    """
    Validate the engine configuration.

    Raises a :py:obj:`ValueError` if :py:obj:`swisscows_category` is unknown or
    if :py:obj:`results_per_page` exceeds the maximum of the category.
    """
    if swisscows_category not in maximum_page_size:
        raise ValueError(f"illegal swisscows category: {swisscows_category}")

    max_items = maximum_page_size[swisscows_category]
    if results_per_page > max_items:
        raise ValueError(f"results_per_page for swisscows {swisscows_category} can be at most {max_items}")


def request(query: str, params: "OnlineParams") -> None:
    """
    Build the API request for the configured :py:obj:`swisscows_category`.

    Sets ``params["url"]`` and adds the "X-Request-Nonce" and
    "X-Request-Signature" headers.  For image requests beyond page 2 the URL is
    set to ``None`` and no headers are added.
    """
    # swisscows images only supports 2 pages
    if swisscows_category == "images" and params["pageno"] > 2:
        params["url"] = None
        return

    offset = (params["pageno"] - 1) * results_per_page

    base_path: str
    args: dict[str, t.Any]

    if swisscows_category == "web":
        freshness = "All"
        if params["time_range"]:
            freshness = time_range_map[params["time_range"]]

        args = {
            "freshness": freshness,
            "itemsCount": results_per_page,
            "locale": "en-US",
            "offset": offset,
            "query": query,
            "spellcheck": True,
        }
        base_path = "/v5/web/search"
    elif swisscows_category == "images":
        args = {
            "itemsCount": results_per_page,
            "locale": "en-US",
            "offset": offset,
            "query": query,
            "spellcheck": True,
        }
        base_path = "/v5/images/search"
    else:
        args = {
            "itemsCount": results_per_page,
            "offset": offset,
            "query": query,
            "region": "en-US",
            "spellcheck": True,
        }
        base_path = "/v2/videos/search"

    nonce, signature = generate_nonce_and_signature(base_path, args)

    params["headers"].update(
        {
            "X-Request-Nonce": nonce,
            "X-Request-Signature": signature,
        }
    )
    params["url"] = f"{base_url}{base_path}?{urlencode(args)}"


def _video_result(result: dict[str, t.Any]) -> LegacyResult:
    """
    Convert one video item of the API response into a ``videos.html`` result.
    """
    published_date = None
    if result.get("datePublished"):
        published_date = datetime.fromisoformat(result["datePublished"])

    view_count = None
    if result.get("viewCount"):
        view_count = humanize_number(result["viewCount"])

    # ``or {}`` also covers an explicit ``"thumbnail": null`` in the JSON data
    thumbnail = result.get("thumbnailUrl") or (result.get("thumbnail") or {}).get("url")

    return LegacyResult(
        {
            "template": "videos.html",
            "url": result["url"],
            "title": html_to_text(result.get("title") or result["name"]),
            "content": result["description"],
            "thumbnail": thumbnail,
            "length": result.get("duration"),
            "iframe_src": result.get("embedUrl"),
            "publishedDate": published_date,
            "views": view_count,
        }
    )


def response(resp: "SXNG_Response") -> EngineResults:
    """
    Parse the JSON response and return the results of all known item types
    (``WebPage``, ``VideoCollection``, ``ImageObject`` and ``video``); items of
    any other type are skipped.
    """
    res = EngineResults()
    json_data = resp.json()

    # the payload encoding is only used for general and images,
    # for videos the data gets returned directly as a normal JSON response

    # payload is encoded as a JSON web token -> 3 parts, separated by "."
    # the actual data is in the center of the encoded string
    if "payload" in json_data:
        payload = json_data["payload"].split(".")[1]
        # pad with '=' to be valid base64; ``-len % 4`` adds nothing when the
        # length already is a multiple of 4
        payload += "=" * (-len(payload) % 4)

        decoded = base64.urlsafe_b64decode(payload)
        json_data = json.loads(decoded.decode())

    for result in json_data["items"]:
        result_type = result["type"]

        if result_type == "WebPage":
            res.add(
                res.types.MainResult(
                    url=result["url"],
                    title=result["name"],
                    content=html_to_text(result["description"]),
                    # ``or {}`` also covers an explicit ``"thumbnail": null``
                    thumbnail=(result.get("thumbnail") or {}).get("url"),
                )
            )
        elif result_type == "VideoCollection":
            for video in result["hasPart"]:
                res.add(_video_result(video))
        elif result_type == "ImageObject":
            res.add(
                res.types.LegacyResult(
                    {
                        "template": "images.html",
                        "url": result["url"],
                        "thumbnail_src": result["thumbnail"]["url"],
                        "img_src": result["contentUrl"],
                        "title": result["name"],
                    }
                )
            )
        elif result_type == "video":
            res.add(_video_result(result))

    return res