diff --git a/searx/cache.py b/searx/cache.py index ea58c9328..f7c256078 100644 --- a/searx/cache.py +++ b/searx/cache.py @@ -114,7 +114,13 @@ class ExpireCacheStats: if expire: valid_until = datetime.datetime.fromtimestamp(expire).strftime("%Y-%m-%d %H:%M:%S") c_kv += 1 - lines.append(f"[{ctx_name:20s}] {valid_until} {key:12}" f" --> ({type(value).__name__}) {value} ") + value_str = str(value) + if len(value_str) > 120: + value_str = f"{value_str[:120]} ..." + lines.append( + f"[{ctx_name:20s}] {valid_until} {key:12}" + f" --> ({type(value).__name__}:{len(value)}) {value_str} " + ) lines.append(f"Number of contexts: {c_ctx}") lines.append(f"number of key/value pairs: {c_kv}") diff --git a/searx/enginelib/__init__.py b/searx/enginelib/__init__.py index ad112df24..8eba15894 100644 --- a/searx/enginelib/__init__.py +++ b/searx/enginelib/__init__.py @@ -8,8 +8,8 @@ There is a command line for developer purposes and for deeper analysis. Here is an example in which the command line is called in the development environment:: - $ ./manage pyenv.cmd bash --norc --noprofile - (py3) python -m searx.enginelib --help + $ ./manage dev.env + (dev.env)$ python -m searx.enginelib --help .. 
hint:: @@ -46,6 +46,7 @@ ENGINES_CACHE: ExpireCacheSQLite = ExpireCacheSQLite.build_cache( name="ENGINES_CACHE", MAXHOLD_TIME=60 * 60 * 24 * 7, # 7 days MAINTENANCE_PERIOD=60 * 60, # 2h + MAX_VALUE_LEN=1024 * 1024, # 1MB ) ) """Global :py:obj:`searx.cache.ExpireCacheSQLite` instance where the cached @@ -71,9 +72,9 @@ def state(): @app.command() -def maintenance(force: bool = True): +def maintenance(force: bool = True, truncate: bool = False): """Carry out maintenance on cache of the engines.""" - ENGINES_CACHE.maintenance(force=force) + ENGINES_CACHE.maintenance(force=force, truncate=truncate) class EngineCache: @@ -111,8 +112,8 @@ class EngineCache: For introspection of the DB, jump into developer environment and run command to show cache state:: - $ ./manage pyenv.cmd bash --norc --noprofile - (py3) python -m searx.enginelib cache state + $ ./manage dev.env + (dev.env)$ python -m searx.enginelib cache state cache tables and key/values =========================== diff --git a/searx/engines/wikidata.py b/searx/engines/wikidata.py index a5d65cb7d..3902f79e0 100644 --- a/searx/engines/wikidata.py +++ b/searx/engines/wikidata.py @@ -1,7 +1,7 @@ # SPDX-License-Identifier: AGPL-3.0-or-later -"""This module implements the Wikidata engine. Some implementations are shared -from :ref:`wikipedia engine`. +"""This module implements the Wikidata engine. +Some implementations are shared from :ref:`wikipedia engine`. """ # pylint: disable=missing-class-docstring @@ -14,6 +14,7 @@ from json import loads from dateutil.parser import isoparse from babel.dates import format_datetime, format_date, format_time, get_datetime_format +from searx.enginelib import EngineCache from searx.data import WIKIDATA_UNITS from searx.network import post, get from searx.utils import searxng_useragent, get_string_replaces_function @@ -44,11 +45,15 @@ display_type = ["infobox"] one will add a hit to the result list. The first one will show a hit in the info box.
Both values can be set, or one of the two can be set.""" +CACHE: EngineCache +"""Persistent (SQLite) key/value cache that deletes its values after ``expire`` +seconds.""" # SPARQL SPARQL_ENDPOINT_URL = "https://query.wikidata.org/sparql" SPARQL_EXPLAIN_URL = "https://query.wikidata.org/bigdata/namespace/wdq/sparql?explain" -WIKIDATA_PROPERTIES: dict[str | tuple[str, str], str] = { +WDPType = dict[str | tuple[str, str], str] +WIKIDATA_PROPERTIES: WDPType = { "P434": "MusicBrainz", "P435": "MusicBrainz", "P436": "MusicBrainz", @@ -140,7 +145,6 @@ replace_http_by_https = get_string_replaces_function({"http:": "https:"}) class WDAttribute: - __slots__ = ("name",) def __init__(self, name: str): self.name: str = name @@ -154,13 +158,13 @@ class WDAttribute: def get_where(self): return "OPTIONAL { ?item wdt:{name} ?{name} . }".replace("{name}", self.name) - def get_wikibase_label(self): + def get_wikibase_label(self) -> str: return "" - def get_group_by(self): + def get_group_by(self) -> str: return "" - def get_str(self, result: dict[str, t.Any], language: str): # pylint: disable=unused-argument + def get_str(self, result: dict[str, t.Any], language: str) -> str | None: # pylint: disable=unused-argument return result.get(self.name + "s") def __repr__(self): @@ -168,7 +172,7 @@ class WDAttribute: class WDAmountAttribute(WDAttribute): - def get_select(self): + def get_select(self) -> str: return "?{name} ?{name}Unit".replace("{name}", self.name) def get_where(self): @@ -178,21 +182,21 @@ class WDAmountAttribute(WDAttribute): '{name}', self.name ) - def get_group_by(self): + def get_group_by(self) -> str: return self.get_select() - def get_str(self, result: dict[str, t.Any], language: str): - value = result.get(self.name) - unit = result.get(self.name + "Unit") + def get_str(self, result: dict[str, t.Any], language: str) -> str | None: + value: str | None = result.get(self.name) + unit: str | None = result.get(self.name + "Unit") if unit is not None: unit = 
unit.replace("http://www.wikidata.org/entity/", "") - return value + " " + get_label_for_entity(unit, language) + return str(value) + " " + get_label_for_entity(unit, language) return value class WDArticle(WDAttribute): - def __init__(self, language: str, kwargs=None): + def __init__(self, language: str, kwargs: dict[str, t.Any] | None = None): super().__init__("wikipedia") self.language: str = language self.kwargs: dict[str, t.Any] = kwargs or {} @@ -215,7 +219,7 @@ class WDArticle(WDAttribute): def get_group_by(self): return self.get_select() - def get_str(self, result, language: str): + def get_str(self, result: dict[str, t.Any], language: str) -> str | None: key = "article{language}".replace("{language}", self.language) return result.get(key) @@ -227,16 +231,16 @@ class WDLabelAttribute(WDAttribute): def get_where(self): return "OPTIONAL { ?item wdt:{name} ?{name} . }".replace("{name}", self.name) - def get_wikibase_label(self): + def get_wikibase_label(self) -> str: return "?{name} rdfs:label ?{name}Label .".replace("{name}", self.name) - def get_str(self, result, language): + def get_str(self, result: dict[str, t.Any], language: str) -> str | None: return result.get(self.name + "Labels") class WDURLAttribute(WDAttribute): - HTTP_WIKIMEDIA_IMAGE = "http://commons.wikimedia.org/wiki/Special:FilePath/" + HTTP_WIKIMEDIA_IMAGE: str = "http://commons.wikimedia.org/wiki/Special:FilePath/" def __init__( self, @@ -265,12 +269,12 @@ class WDURLAttribute(WDAttribute): """ super().__init__(name) - self.url_id = url_id - self.url_path_prefix = url_path_prefix - self.kwargs = kwargs + self.url_id: str | None = url_id + self.url_path_prefix: str | None = url_path_prefix + self.kwargs: dict[str, t.Any] = kwargs or {} - def get_str(self, result, language: str): - value = result.get(self.name + "s") + def get_str(self, result: dict[str, t.Any], language: str) -> str | None: + value: str | None = result.get(self.name + "s") if not value: return None @@ -306,16 +310,16 @@ class 
WDGeoAttribute(WDAttribute): def get_group_by(self): return self.get_select() - def get_str(self, result, language: str): - latitude = result.get(self.name + "Lat") - longitude = result.get(self.name + "Long") + def get_str(self, result: dict[str, t.Any], language: str) -> str | None: + latitude: str | None = result.get(self.name + "Lat") + longitude: str | None = result.get(self.name + "Long") if latitude and longitude: return latitude + " " + longitude return None - def get_geo_url(self, result, osm_zoom=19): - latitude = result.get(self.name + "Lat") - longitude = result.get(self.name + "Long") + def get_geo_url(self, result: dict[str, t.Any], osm_zoom: int = 19) -> str | None: + latitude: str | None = result.get(self.name + "Lat") + longitude: str | None = result.get(self.name + "Long") if latitude and longitude: return get_earth_coordinates_url(latitude, longitude, osm_zoom) return None @@ -323,9 +327,9 @@ class WDGeoAttribute(WDAttribute): class WDImageAttribute(WDURLAttribute): - def __init__(self, name, url_id=None, priority=100): + def __init__(self, name: str, url_id: str | None = None, priority: int = 100): super().__init__(name, url_id) - self.priority = priority + self.priority: int = priority class WDDateAttribute(WDAttribute): @@ -349,11 +353,11 @@ class WDDateAttribute(WDAttribute): def get_group_by(self): return self.get_select() - def format_8(self, value, locale: str): # pylint: disable=unused-argument + def format_8(self, value: str, locale: str) -> str: # pylint: disable=unused-argument # precision: less than a year return value - def format_9(self, value, locale: str): + def format_9(self, value: str, locale: str) -> str: year = int(value) # precision: year if year < 1584: @@ -363,17 +367,17 @@ class WDDateAttribute(WDAttribute): timestamp = isoparse(value) return format_date(timestamp, format="yyyy", locale=locale) - def format_10(self, value, locale: str): + def format_10(self, value: str, locale: str) -> str: # precision: month timestamp = 
isoparse(value) return format_date(timestamp, format="MMMM y", locale=locale) - def format_11(self, value, locale: str): + def format_11(self, value: str, locale: str) -> str: # precision: day timestamp = isoparse(value) return format_date(timestamp, format="full", locale=locale) - def format_13(self, value, locale: str): + def format_13(self, value: str, locale: str) -> str: timestamp = isoparse(value) # precision: minute return ( @@ -383,11 +387,11 @@ class WDDateAttribute(WDAttribute): .replace("{1}", format_date(timestamp, "short", locale=locale)) ) - def format_14(self, value, locale): + def format_14(self, value: str, locale: str) -> str: # precision: second. return format_datetime(isoparse(value), format="full", locale=locale) - DATE_FORMAT = { + DATE_FORMAT: dict[str, tuple[str, int]] = { "0": ("format_8", 1000000000), "1": ("format_8", 100000000), "2": ("format_8", 10000000), @@ -405,15 +409,15 @@ class WDDateAttribute(WDAttribute): "14": ("format_14", 0), # second } - def get_str(self, result, language): - value = result.get(self.name) + def get_str(self, result: dict[str, t.Any], language: str) -> str | None: + value: str | None = result.get(self.name) if value == "" or value is None: return None - precision = result.get(self.name + "timePrecision") - date_format = WDDateAttribute.DATE_FORMAT.get(precision) + _p: str = result.get(self.name + "timePrecision") or "1" + date_format = WDDateAttribute.DATE_FORMAT.get(_p) if date_format is not None: format_method = getattr(self, date_format[0]) - precision = date_format[1] + precision: int = date_format[1] try: if precision >= 1: _t = value.split("-") @@ -427,9 +431,25 @@ class WDDateAttribute(WDAttribute): return value +WDAttrType = ( + WDAttribute + | WDAmountAttribute + | WDArticle + | WDLabelAttribute + | WDURLAttribute + | WDGeoAttribute + | WDImageAttribute + | WDDateAttribute +) +WDAttrList = list[WDAttrType] + + def get_headers() -> dict[str, str]: # user agent: 
https://www.mediawiki.org/wiki/Wikidata_Query_Service/User_Manual#Query_limits - return {"Accept": "application/sparql-results+json", "User-Agent": searxng_useragent()} + return { + "Accept": "application/sparql-results+json", + "User-Agent": f"wikidata engine - {searxng_useragent()}", + } def get_label_for_entity(entity_id: str, language: str) -> str: @@ -445,7 +465,7 @@ def get_label_for_entity(entity_id: str, language: str) -> str: return name -def send_wikidata_query(query: str, method="GET", **kwargs) -> dict[str, t.Any]: +def send_wikidata_query(query: str, method: str = "GET", **kwargs: dict[str, t.Any]) -> dict[str, t.Any]: if method == "GET": # query will be cached by wikidata http_response = get(SPARQL_ENDPOINT_URL + "?" + urlencode({"query": query}), headers=get_headers(), **kwargs) @@ -461,15 +481,17 @@ def send_wikidata_query(query: str, method="GET", **kwargs) -> dict[str, t.Any]: def request(query: str, params: "OnlineParams") -> None: - attributes: tuple[str, list[WDAttribute | WDAmountAttribute | WDLabelAttribute | WDImageAttribute]] + attributes: WDAttrList eng_tag, _wiki_netloc = get_wiki_params(params["searxng_locale"], traits) - query, attributes = get_query(query, eng_tag) + query, attributes = get_query(query, eng_tag or "en") logger.debug("request --> language %s // len(attributes): %s", eng_tag, len(attributes)) params["method"] = "POST" params["url"] = SPARQL_ENDPOINT_URL params["data"] = {"query": query} params["headers"] = get_headers() + + # additional parameters (not a part of OnlineParams) params["language"] = eng_tag # type: ignore params["attributes"] = attributes # type: ignore @@ -479,14 +501,16 @@ def response(resp: "SXNG_Response") -> list[dict[str, t.Any]]: results: list[dict[str, t.Any]] = [] jsonresponse = loads(resp.content.decode()) + # additional parameters .. 
language: str = resp.search_params["language"] # type: ignore - attributes = resp.search_params["attributes"] # type: ignore + attributes: WDAttrList = resp.search_params["attributes"] # type: ignore + logger.debug("request --> language %s // len(attributes): %s", language, len(attributes)) seen_entities: set[str] = set() for result in jsonresponse.get("results", {}).get("bindings", []): attribute_result = {key: value["value"] for key, value in result.items()} - entity_url = attribute_result["item"] + entity_url: str = attribute_result["item"] if entity_url not in seen_entities and entity_url not in DUMMY_ENTITY_URLS: seen_entities.add(entity_url) results += get_results(attribute_result, attributes, language) @@ -500,7 +524,7 @@ _IMG_SRC_DEFAULT_URL_PREFIX = "https://commons.wikimedia.org/wiki/Special:FilePa _IMG_SRC_NEW_URL_PREFIX = "https://upload.wikimedia.org/wikipedia/commons/thumb/" -def get_thumbnail(img_src: str) -> str: +def get_thumbnail(img_src: str | None) -> str | None: """Get Thumbnail image from wikimedia commons Images from commons.wikimedia.org are (HTTP) redirected to @@ -539,53 +563,58 @@ def get_thumbnail(img_src: str) -> str: return img_src -def get_results(attribute_result: dict[str, t.Any], attributes, language): +def get_results( + attribute_result: dict[str, t.Any], + attributes: WDAttrList, + language: str, +): # pylint: disable=too-many-branches - results = [] - infobox_title = attribute_result.get("itemLabel") + results: list[dict[str, t.Any]] = [] + infobox_title: str = attribute_result.get("itemLabel") # pyright: ignore[reportAssignmentType] infobox_id = attribute_result["item"] - infobox_id_lang = None - infobox_urls = [] - infobox_attributes = [] + infobox_id_lang: str | None = None + infobox_urls: list[dict[str, str]] = [] + infobox_attributes: list[dict[str, str]] = [] infobox_content = attribute_result.get("itemDescription", []) - img_src = None + img_src: str | None = None img_src_priority = 0 for attribute in attributes: - value 
= attribute.get_str(attribute_result, language) + value: str | None = attribute.get_str(attribute_result, language) if value is not None and value != "": - attribute_type = type(attribute) - if attribute_type in (WDURLAttribute, WDArticle): + if isinstance(attribute, (WDURLAttribute, WDArticle)): # get_select() method : there is group_concat(distinct ...;separator=", ") # split the value here for url in value.split(", "): infobox_urls.append({"title": attribute.get_label(language), "url": url, **attribute.kwargs}) # "normal" results (not infobox) include official website and Wikipedia links. - if "list" in display_type and (attribute.kwargs.get("official") or attribute_type == WDArticle): + if "list" in display_type and ( + attribute.kwargs.get("official") or isinstance(attribute, WDArticle) + ): results.append({"title": infobox_title, "url": url, "content": infobox_content}) # update the infobox_id with the wikipedia URL # first the local wikipedia URL, and as fallback the english wikipedia URL - if attribute_type == WDArticle and ( + if isinstance(attribute, WDArticle) and ( (attribute.language == "en" and infobox_id_lang is None) or attribute.language != "en" ): infobox_id_lang = attribute.language infobox_id = url - elif attribute_type == WDImageAttribute: + elif isinstance(attribute, WDImageAttribute): # this attribute is an image. # replace the current image only the priority is lower # (the infobox contain only one image). 
if attribute.priority > img_src_priority: img_src = get_thumbnail(value) img_src_priority = attribute.priority - elif attribute_type == WDGeoAttribute: + elif isinstance(attribute, WDGeoAttribute): # geocoordinate link # use the area to get the OSM zoom # Note: ignore the unit (must be km² otherwise the calculation is wrong) # Should use normalized value p:P2046/psn:P2046/wikibase:quantityAmount area = attribute_result.get("P2046") - osm_zoom = area_to_osm_zoom(area) if area else 19 + osm_zoom: int = area_to_osm_zoom(area) if area else 19 url = attribute.get_geo_url(attribute_result, osm_zoom=osm_zoom) if url: infobox_urls.append({"title": attribute.get_label(language), "url": url, "entity": attribute.name}) @@ -622,9 +651,7 @@ def get_results(attribute_result: dict[str, t.Any], attributes, language): return results -def get_query( - query: str, language: str -) -> tuple[str, list[WDAttribute | WDAmountAttribute | WDLabelAttribute | WDImageAttribute]]: +def get_query(query: str, language: str) -> tuple[str, WDAttrList]: attributes = get_attributes(language) select = [a.get_select() for a in attributes] where = list(filter(lambda s: len(s) > 0, [a.get_where() for a in attributes])) @@ -643,7 +670,7 @@ def get_query( def get_attributes(language: str): # pylint: disable=too-many-statements - attributes: list[WDAttribute | WDAmountAttribute | WDLabelAttribute | WDImageAttribute] = [] + attributes: WDAttrList = [] def add_value(name: str): attributes.append(WDAttribute(name)) @@ -654,7 +681,7 @@ def get_attributes(language: str): def add_label(name: str): attributes.append(WDLabelAttribute(name)) - def add_url(name: str, url_id: str | None = None, url_path_prefix: str | None = None, **kwargs): + def add_url(name: str, url_id: str | None = None, url_path_prefix: str | None = None, **kwargs: dict[str, t.Any]): attributes.append(WDURLAttribute(name, url_id, url_path_prefix, kwargs)) def add_image(name: str, url_id: str | None = None, priority: int = 1): @@ -749,7 +776,8 @@ 
def get_attributes(language: str): add_value("P498") # currency code (ISO 4217) # URL - add_url("P856", official=True) # official website + kwargs: dict[str, t.Any] = {"official": True} + add_url("P856", **kwargs) # official website attributes.append(WDArticle(language)) # wikipedia (user language) if not language.startswith("en"): attributes.append(WDArticle("en")) # wikipedia (english) @@ -796,7 +824,19 @@ def debug_explain_wikidata_query(query: str, method: str = "GET"): return http_response.content -def init(engine_settings=None): # pylint: disable=unused-argument +def init(_): + global CACHE # pylint: disable=global-statement + CACHE = EngineCache("wikidata") + init_wikidata_properties() + + +def init_wikidata_properties(): + global WIKIDATA_PROPERTIES # pylint: disable=global-statement + p: WDPType = CACHE.get(key="WIKIDATA_PROPERTIES") + if p: + WIKIDATA_PROPERTIES = p + return + # WIKIDATA_PROPERTIES : add unit symbols for k, v in WIKIDATA_UNITS.items(): WIKIDATA_PROPERTIES[k] = v["symbol"] @@ -808,7 +848,8 @@ def init(engine_settings=None): # pylint: disable=unused-argument if attribute.name not in WIKIDATA_PROPERTIES: wikidata_property_names.append("wd:" + attribute.name) query = QUERY_PROPERTY_NAMES.replace("%ATTRIBUTES%", " ".join(wikidata_property_names)) - jsonresponse = send_wikidata_query(query, timeout=20) + kwargs: dict[str, t.Any] = {"timeout": 20} + jsonresponse = send_wikidata_query(query, **kwargs) for result in jsonresponse.get("results", {}).get("bindings", {}): name_field = result.get("name") if not name_field: @@ -818,6 +859,8 @@ def init(engine_settings=None): # pylint: disable=unused-argument entity_id = result["item"]["value"].replace("http://www.wikidata.org/entity/", "") WIKIDATA_PROPERTIES[(entity_id, lang)] = name.capitalize() + CACHE.set(key="WIKIDATA_PROPERTIES", value=WIKIDATA_PROPERTIES) + def fetch_traits(engine_traits: EngineTraits): """Uses languages evaluated from :py:obj:`wikipedia.fetch_wikimedia_traits diff --git 
a/searx/external_urls.py b/searx/external_urls.py index 8e243ec47..d834ac5bb 100644 --- a/searx/external_urls.py +++ b/searx/external_urls.py @@ -16,12 +16,12 @@ IMDB_PREFIX_TO_URL_ID = { HTTP_WIKIMEDIA_IMAGE = 'http://commons.wikimedia.org/wiki/Special:FilePath/' -def get_imdb_url_id(imdb_item_id): +def get_imdb_url_id(imdb_item_id: str): id_prefix = imdb_item_id[:2] return IMDB_PREFIX_TO_URL_ID.get(id_prefix) -def get_wikimedia_image_id(url): +def get_wikimedia_image_id(url: str): if url.startswith(HTTP_WIKIMEDIA_IMAGE): return url[len(HTTP_WIKIMEDIA_IMAGE) :] if url.startswith('File:'): @@ -29,7 +29,7 @@ def get_wikimedia_image_id(url): return url -def get_external_url(url_id, item_id, alternative="default"): +def get_external_url(url_id: str, item_id: str | None, alternative: str = "default") -> str | None: """Return an external URL or None if url_id is not found. url_id can take value from data/external_urls.json