mirror of
https://github.com/searxng/searxng.git
synced 2026-06-11 12:27:52 +02:00
26801e92af
The initialization of the DB schema ("base schema") has so far been done on
demand, which causes race conditions with competing threads and processes.
The DDL statements for creating the "base schema" are now executed as part of
the initialization of the app.
Further improvements were made to harden the database applications:
- Wikidata & Radio-Browser engine perform their initialization only once (so far
the initialization was carried out in each thread/process).
- If multiple processes try to set DB's WAL mode when opening the DB at the same
time, this usually leads to another race condition, which is now also caught.
Related:
- https://github.com/searxng/searxng/issues/6181#issuecomment-4586705
Closes: #6181
Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
892 lines
32 KiB
Python
892 lines
32 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""This module implements the Wikidata engine.
|
|
|
|
Some implementations are shared from :ref:`wikipedia engine`.
|
|
"""
|
|
# pylint: disable=missing-class-docstring
|
|
|
|
import typing as t
|
|
|
|
import os
|
|
from hashlib import md5
|
|
from urllib.parse import urlencode, unquote
|
|
from json import loads
|
|
|
|
from dateutil.parser import isoparse
|
|
from babel.dates import format_datetime, format_date, format_time, get_datetime_format
|
|
|
|
from searx.enginelib import EngineCache
|
|
from searx.data import WIKIDATA_UNITS
|
|
from searx.network import post, get
|
|
from searx.utils import searxng_useragent, get_string_replaces_function
|
|
from searx.external_urls import get_external_url, get_earth_coordinates_url, area_to_osm_zoom
|
|
from searx.engines.wikipedia import (
|
|
fetch_wikimedia_traits,
|
|
get_wiki_params,
|
|
)
|
|
from searx.enginelib.traits import EngineTraits
|
|
|
|
if t.TYPE_CHECKING:
|
|
from searx.extended_types import SXNG_Response
|
|
from searx.search.processors import OnlineParams
|
|
|
|
|
|
# about
# Static engine metadata: the project website, the Wikidata item describing
# the service itself and how this engine talks to it (official API, no API
# key, JSON results).
about = {
    "website": 'https://wikidata.org/',
    "wikidata_id": 'Q2013',
    "official_api_documentation": 'https://query.wikidata.org/',
    "use_official_api": True,
    "require_api_key": False,
    "results": 'JSON',
}
|
|
|
|
display_type = ["infobox"]
|
|
"""A list of display types composed from ``infobox`` and ``list``. The latter
|
|
one will add a hit to the result list. The first one will show a hit in the
|
|
info box. Both values can be set, or one of the two can be set."""
|
|
|
|
CACHE: EngineCache
|
|
"""Persistent (SQLite) key/value cache that deletes its values after ``expire``
|
|
seconds."""
|
|
|
|
# SPARQL
# Endpoint that receives the SPARQL queries built by this module.
SPARQL_ENDPOINT_URL = "https://query.wikidata.org/sparql"
# Endpoint used by debug_explain_wikidata_query().
SPARQL_EXPLAIN_URL = "https://query.wikidata.org/bigdata/namespace/wdq/sparql?explain"
# Keys are either a plain entity ID or an ``(entity ID, language)`` tuple, the
# values are the labels shown to the user (see get_label_for_entity()).
WDPType = dict[str | tuple[str, str], str]
# Hard coded labels; init_wikidata_properties() extends this mapping with unit
# symbols and with the property names fetched from Wikidata.
WIKIDATA_PROPERTIES: WDPType = {
    "P434": "MusicBrainz",
    "P435": "MusicBrainz",
    "P436": "MusicBrainz",
    "P966": "MusicBrainz",
    "P345": "IMDb",
    "P2397": "YouTube",
    "P1651": "YouTube",
    "P2002": "Twitter",
    "P2013": "Facebook",
    "P2003": "Instagram",
    "P4033": "Mastodon",
    "P11947": "Lemmy",
    "P12622": "PeerTube",
}
|
|
|
|
# SERVICE wikibase:mwapi : https://www.mediawiki.org/wiki/Wikidata_Query_Service/User_Manual/MWAPI
|
|
# SERVICE wikibase:label: https://en.wikibooks.org/wiki/SPARQL/SERVICE_-_Label#Manual_Label_SERVICE
|
|
# https://en.wikibooks.org/wiki/SPARQL/WIKIDATA_Precision,_Units_and_Coordinates
|
|
# https://www.mediawiki.org/wiki/Wikibase/Indexing/RDF_Dump_Format#Data_model
|
|
# optimization:
|
|
# * https://www.wikidata.org/wiki/Wikidata:SPARQL_query_service/query_optimization
|
|
# * https://github.com/blazegraph/database/wiki/QueryHints
|
|
QUERY_TEMPLATE = """
|
|
SELECT ?item ?itemLabel ?itemDescription ?lat ?long %SELECT%
|
|
WHERE
|
|
{
|
|
SERVICE wikibase:mwapi {
|
|
bd:serviceParam wikibase:endpoint "www.wikidata.org";
|
|
wikibase:api "EntitySearch";
|
|
wikibase:limit 1;
|
|
mwapi:search "%QUERY%";
|
|
mwapi:language "%LANGUAGE%".
|
|
?item wikibase:apiOutputItem mwapi:item.
|
|
}
|
|
hint:Prior hint:runFirst "true".
|
|
|
|
%WHERE%
|
|
|
|
SERVICE wikibase:label {
|
|
bd:serviceParam wikibase:language "%LANGUAGE%,en".
|
|
?item rdfs:label ?itemLabel .
|
|
?item schema:description ?itemDescription .
|
|
%WIKIBASE_LABELS%
|
|
}
|
|
|
|
}
|
|
GROUP BY ?item ?itemLabel ?itemDescription ?lat ?long %GROUP_BY%
|
|
"""
|
|
|
|
# Get the calendar names and the property names
|
|
QUERY_PROPERTY_NAMES = """
|
|
SELECT ?item ?name
|
|
WHERE {
|
|
{
|
|
SELECT ?item
|
|
WHERE { ?item wdt:P279* wd:Q12132 }
|
|
} UNION {
|
|
VALUES ?item { %ATTRIBUTES% }
|
|
}
|
|
OPTIONAL { ?item rdfs:label ?name. }
|
|
}
|
|
"""
|
|
|
|
# Entity URLs of the "dummy value" property of https://www.wikidata.org/wiki/Q2013
# (Wikidata).  They are hard coded here to avoid an additional SPARQL request
# when the server starts; response() drops results pointing at these entities.
DUMMY_ENTITY_URLS = {
    "http://www.wikidata.org/entity/" + entity_id
    for entity_id in ("Q4115189", "Q13406268", "Q15397819", "Q17339402")
}
|
|
|
|
|
|
# Escaping of user input that is embedded into a SPARQL string literal.
#
# https://www.w3.org/TR/sparql11-query/#rSTRING_LITERAL1
# https://lists.w3.org/Archives/Public/public-rdf-dawg/2011OctDec/0175.html
#
# The SPARQL grammar only allows ``ECHAR ::= '\' [tbnrf\"']`` inside a string
# literal: a control character has to be written as a backslash followed by
# its *letter* (``\t``, ``\n``, ...).  The previous mapping emitted a backslash
# followed by the raw control character (e.g. backslash + TAB), which is not a
# valid escape sequence and leaves raw line breaks inside the literal.
sparql_string_escape = get_string_replaces_function(
    # fmt: off
    {
        "\t": "\\t",
        "\n": "\\n",
        "\r": "\\r",
        "\b": "\\b",
        "\f": "\\f",
        "\"": "\\\"",
        "\'": "\\\'",
        "\\": "\\\\"
    }
    # fmt: on
)
|
|
|
|
# Used by get_results() to turn the "http:" scheme of the infobox ID into "https:".
replace_http_by_https = get_string_replaces_function({"http:": "https:"})
|
|
|
|
|
|
class WDAttribute:
|
|
|
|
def __init__(self, name: str):
|
|
self.name: str = name
|
|
|
|
def get_select(self):
|
|
return "(group_concat(distinct ?{name};separator=', ') as ?{name}s)".replace("{name}", self.name)
|
|
|
|
def get_label(self, language: str):
|
|
return get_label_for_entity(self.name, language)
|
|
|
|
def get_where(self):
|
|
return "OPTIONAL { ?item wdt:{name} ?{name} . }".replace("{name}", self.name)
|
|
|
|
def get_wikibase_label(self) -> str:
|
|
return ""
|
|
|
|
def get_group_by(self) -> str:
|
|
return ""
|
|
|
|
def get_str(self, result: dict[str, t.Any], language: str) -> str | None: # pylint: disable=unused-argument
|
|
return result.get(self.name + "s")
|
|
|
|
def __repr__(self):
|
|
return "<" + str(type(self).__name__) + ":" + self.name + ">"
|
|
|
|
|
|
class WDAmountAttribute(WDAttribute):
    """Quantity property: the best ranked amount plus its (optional) unit."""

    def get_select(self) -> str:
        """Select the amount ``?<name>`` and its unit ``?<name>Unit``."""
        return f"?{self.name} ?{self.name}Unit"

    def get_where(self):
        """WHERE clause binding the best ranked amount and, if present, its unit."""
        template = (
            " OPTIONAL { ?item p:{name} ?{name}Node .\n"
            "?{name}Node rdf:type wikibase:BestRank ; ps:{name} ?{name} .\n"
            "OPTIONAL { ?{name}Node psv:{name}/wikibase:quantityUnit ?{name}Unit. } }"
        )
        return template.replace('{name}', self.name)

    def get_group_by(self) -> str:
        """The selected variables are not aggregated, so they are grouped by."""
        return self.get_select()

    def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
        """Return ``"<amount> <unit label>"`` or just the amount when there is no unit."""
        amount: str | None = result.get(self.name)
        unit_url: str | None = result.get(self.name + "Unit")
        if unit_url is None:
            return amount
        # the unit is delivered as entity URL, the label lookup needs the bare ID
        unit_id = unit_url.replace("http://www.wikidata.org/entity/", "")
        return f"{amount} {get_label_for_entity(unit_id, language)}"
|
|
|
|
|
|
class WDArticle(WDAttribute):
    """Link to the Wikipedia article of the item in one specific language."""

    def __init__(self, language: str, kwargs: dict[str, t.Any] | None = None):
        super().__init__("wikipedia")
        self.language: str = language
        # extra key/values that get_results() merges into the URL entry
        self.kwargs: dict[str, t.Any] = kwargs or {}

    def get_label(self, language: str):
        """Label of the link; the *language* argument is ignored, the
        language of the article is shown instead."""
        return f"Wikipedia ({self.language})"

    def get_select(self):
        """Select the article URL and the article name of this language."""
        lang = self.language
        return f"?article{lang} ?articleName{lang}"

    def get_where(self):
        """WHERE clause binding article URL and name on ``<language>.wikipedia.org``."""
        template = (
            'OPTIONAL { ?article{language} schema:about ?item ;\n'
            'schema:inLanguage "{language}" ;\n'
            'schema:isPartOf <https://{language}.wikipedia.org/> ;\n'
            'schema:name ?articleName{language} . }'
        )
        return template.replace('{language}', self.language)

    def get_group_by(self):
        """The selected variables are not aggregated, so they are grouped by."""
        return self.get_select()

    def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
        """Return the article URL from *result* or ``None`` if there is no article."""
        return result.get("article" + self.language)
|
|
|
|
|
|
class WDLabelAttribute(WDAttribute):
    """Property whose values are entities; the labels of these entities are
    concatenated into ``<name>Labels``."""

    def get_select(self):
        """SELECT expression that aggregates the labels into ``?<name>Labels``."""
        prop = self.name
        return f"(group_concat(distinct ?{prop}Label;separator=', ') as ?{prop}Labels)"

    def get_where(self):
        """WHERE clause that optionally binds ``?<name>`` to the property value."""
        prop = self.name
        return f"OPTIONAL {{ ?item wdt:{prop} ?{prop} . }}"

    def get_wikibase_label(self) -> str:
        """Statement asking the label service for the label of ``?<name>``."""
        prop = self.name
        return f"?{prop} rdfs:label ?{prop}Label ."

    def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
        """Return the aggregated labels from *result* or ``None`` if missing."""
        key = self.name + "Labels"
        return result.get(key)
|
|
|
|
|
|
class WDURLAttribute(WDAttribute):
    """Property whose value is a URL or can be converted into one."""

    HTTP_WIKIMEDIA_IMAGE: str = "http://commons.wikimedia.org/wiki/Special:FilePath/"

    def __init__(
        self,
        name: str,
        url_id: str | None = None,
        url_path_prefix: str | None = None,
        kwargs: dict[str, t.Any] | None = None,
    ):
        """
        :param name: ID of the Wikidata property.

        :param url_id: ID matching one key in ``external_urls.json`` for
            converting IDs to full URLs.

        :param url_path_prefix: Path prefix if the values are of format
            ``account@domain``.  If provided, values are rewritten to
            ``https://<domain><url_path_prefix><account>``.  For example::

                WDURLAttribute('P4033', url_path_prefix='/@')

            Adds Property `P4033 <https://www.wikidata.org/wiki/Property:P4033>`_
            to the wikidata query.  This field might return for example
            ``libreoffice@fosstodon.org`` and the URL built from this is then:

            - account: ``libreoffice``
            - domain: ``fosstodon.org``
            - result url: https://fosstodon.org/@libreoffice

        :param kwargs: Extra key/values that get_results() merges into the URL
            entry of the infobox.
        """

        super().__init__(name)
        self.url_id: str | None = url_id
        self.url_path_prefix: str | None = url_path_prefix
        self.kwargs: dict[str, t.Any] = kwargs or {}

    def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
        """Return the URL built from the first value of the property.

        ``None`` is returned when the property has no value or when a
        ``account@domain`` value (``url_path_prefix`` is set) is malformed.
        """
        value: str | None = result.get(self.name + "s")
        if not value:
            return None

        # only the first of the aggregated values is used
        value = value.split(",")[0]
        if self.url_id:
            url_id = self.url_id
            if value.startswith(WDURLAttribute.HTTP_WIKIMEDIA_IMAGE):
                # the value is already a commons URL: reduce it to the file
                # name and build the URL from the "wikimedia_image" template
                value = value[len(WDURLAttribute.HTTP_WIKIMEDIA_IMAGE) :]
                url_id = "wikimedia_image"
            return get_external_url(url_id, value)

        if self.url_path_prefix:
            # Split at the last "@".  A value without "@" (or with an empty
            # account / domain) used to raise a ValueError on unpacking or to
            # produce a URL without host; such values are skipped instead.
            account, separator, domain = value.rpartition("@")
            account = account.strip("@ ")
            domain = domain.strip("@ ")
            if not separator or not account or not domain:
                return None
            return f"https://{domain}{self.url_path_prefix}{account}"

        return value
|
|
|
|
|
|
class WDGeoAttribute(WDAttribute):
    """Geo coordinate property (latitude / longitude)."""

    def get_label(self, language: str):
        """The label is always ``OpenStreetMap``, whatever *language* is."""
        return "OpenStreetMap"

    def get_select(self):
        """Select ``?<name>Lat`` and ``?<name>Long``."""
        return f"?{self.name}Lat ?{self.name}Long"

    def get_where(self):
        """WHERE clause binding latitude and longitude of the coordinate."""
        template = (
            "OPTIONAL { ?item p:{name}/psv:{name} [\n"
            "wikibase:geoLatitude ?{name}Lat ;\n"
            "wikibase:geoLongitude ?{name}Long ] }"
        )
        return template.replace('{name}', self.name)

    def get_group_by(self):
        """The selected variables are not aggregated, so they are grouped by."""
        return self.get_select()

    def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
        """Return ``"<latitude> <longitude>"`` or ``None`` if one of them is missing."""
        lat: str | None = result.get(self.name + "Lat")
        lon: str | None = result.get(self.name + "Long")
        if not (lat and lon):
            return None
        return lat + " " + lon

    def get_geo_url(self, result: dict[str, t.Any], osm_zoom: int = 19) -> str | None:
        """Return the map URL of the coordinate (zoom level *osm_zoom*) or
        ``None`` if latitude or longitude is missing."""
        lat: str | None = result.get(self.name + "Lat")
        lon: str | None = result.get(self.name + "Long")
        if not (lat and lon):
            return None
        return get_earth_coordinates_url(lat, lon, osm_zoom)
|
|
|
|
|
|
class WDImageAttribute(WDURLAttribute):
    """URL attribute that points to an image.

    get_results() shows only one image per infobox: the one whose attribute
    has the highest ``priority``.
    """

    def __init__(self, name: str, url_id: str | None = None, priority: int = 100):
        super().__init__(name, url_id=url_id)
        self.priority: int = priority
|
|
|
|
|
|
class WDDateAttribute(WDAttribute):
    """Time property; the value is formatted according to its precision."""

    def get_select(self):
        """Select the time value together with precision, time zone and calendar."""
        return "?{name} ?{name}timePrecision ?{name}timeZone ?{name}timeCalendar".replace("{name}", self.name)

    def get_where(self):
        """WHERE clause binding the time value and its meta data."""
        # To remove duplicate, add
        # FILTER NOT EXISTS { ?item p:{name}/psv:{name}/wikibase:timeValue ?{name}bis FILTER (?{name}bis < ?{name}) }
        # this filter is too slow, so the response function ignore duplicate results
        # (see the seen_entities variable)
        template = (
            "OPTIONAL { ?item p:{name}/psv:{name} [\n"
            "wikibase:timeValue ?{name} ;\n"
            "wikibase:timePrecision ?{name}timePrecision ;\n"
            "wikibase:timeTimezone ?{name}timeZone ;\n"
            "wikibase:timeCalendarModel ?{name}timeCalendar ] . }\n"
            "hint:Prior hint:rangeSafe true;"
        )
        return template.replace('{name}', self.name)

    def get_group_by(self):
        """The selected variables are not aggregated, so they are grouped by."""
        return self.get_select()

    def format_8(self, value: str, locale: str) -> str:  # pylint: disable=unused-argument
        """Precision codes 0 - 8 (see DATE_FORMAT): the value is returned as is."""
        return value

    def format_9(self, value: str, locale: str) -> str:
        """Precision: year.  *value* is the year as string."""
        year = int(value)
        if year < 1584:
            # such years are not handed over to the date formatter
            if year < 0:
                return str(year - 1)
            return str(year)
        timestamp = isoparse(value)
        return format_date(timestamp, format="yyyy", locale=locale)

    def format_10(self, value: str, locale: str) -> str:
        """Precision: month."""
        timestamp = isoparse(value)
        return format_date(timestamp, format="MMMM y", locale=locale)

    def format_11(self, value: str, locale: str) -> str:
        """Precision: day."""
        timestamp = isoparse(value)
        return format_date(timestamp, format="full", locale=locale)

    def format_13(self, value: str, locale: str) -> str:
        """Precision: minute (also used for precision hour, see DATE_FORMAT)."""
        timestamp = isoparse(value)
        # The pattern of the locale combines time ({0}) and date ({1}).  The
        # name of the pattern has to be a string; the builtin function
        # ``format`` was passed here before, which is not a format name.
        # "medium" is the default of ``get_datetime_format``.
        return (
            get_datetime_format("medium", locale=locale)
            .replace("'", "")
            .replace("{0}", format_time(timestamp, "full", tzinfo=None, locale=locale))
            .replace("{1}", format_date(timestamp, "short", locale=locale))
        )

    def format_14(self, value: str, locale: str) -> str:
        """Precision: second."""
        return format_datetime(isoparse(value), format="full", locale=locale)

    # Maps the precision code delivered by Wikidata to the name of the format
    # method and a number: if the number is >= 1, get_str() reduces the value
    # to its year before the format method is called.
    DATE_FORMAT: dict[str, tuple[str, int]] = {
        "0": ("format_8", 1000000000),
        "1": ("format_8", 100000000),
        "2": ("format_8", 10000000),
        "3": ("format_8", 1000000),
        "4": ("format_8", 100000),
        "5": ("format_8", 10000),
        "6": ("format_8", 1000),
        "7": ("format_8", 100),
        "8": ("format_8", 10),
        "9": ("format_9", 1),  # year
        "10": ("format_10", 1),  # month
        "11": ("format_11", 0),  # day
        "12": ("format_13", 0),  # hour (not supported by babel, display minute)
        "13": ("format_13", 0),  # minute
        "14": ("format_14", 0),  # second
    }

    def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
        """Return the formatted date or ``None`` if *result* has no value.

        Formatting is best effort: if the precision is unknown or the format
        method fails, the (possibly reduced) raw value is returned.
        """
        value: str | None = result.get(self.name)
        if value == "" or value is None:
            return None
        _p: str = result.get(self.name + "timePrecision") or "1"
        date_format = WDDateAttribute.DATE_FORMAT.get(_p)
        if date_format is not None:
            format_method = getattr(self, date_format[0])
            precision: int = date_format[1]
            try:
                if precision >= 1:
                    # keep only the year; a leading "-" marks a negative year
                    _t = value.split("-")
                    if value.startswith("-"):
                        value = "-" + _t[1]
                    else:
                        value = _t[0]
                return format_method(value, language)
            except Exception:  # pylint: disable=broad-except
                return value
        return value
|
|
|
|
|
|
# Union of the attribute classes of this module.  WDAttrList is the list type
# built by get_attributes() and passed on to get_query() / get_results().
WDAttrType = (
    WDAttribute
    | WDAmountAttribute
    | WDArticle
    | WDLabelAttribute
    | WDURLAttribute
    | WDGeoAttribute
    | WDImageAttribute
    | WDDateAttribute
)
WDAttrList = list[WDAttrType]
|
|
|
|
|
|
def get_headers() -> dict[str, str]:
    """Return the HTTP headers sent with every request to the SPARQL endpoint:
    JSON results are asked for and the engine identifies itself."""
    # user agent: https://www.mediawiki.org/wiki/Wikidata_Query_Service/User_Manual#Query_limits
    headers: dict[str, str] = {"Accept": "application/sparql-results+json"}
    headers["User-Agent"] = f"wikidata engine - {searxng_useragent()}"
    return headers
|
|
|
|
|
|
def get_label_for_entity(entity_id: str, language: str) -> str:
    """Return the label of *entity_id* from ``WIKIDATA_PROPERTIES``.

    The lookups are tried in this order, the first hit wins:

    1. ``entity_id`` (language independent label)
    2. ``(entity_id, language)``
    3. ``(entity_id, <language without region>)``
    4. ``(entity_id, "en")``

    If there is no label at all, *entity_id* itself is returned.
    """

    def lookup_keys():
        # generator: a key is only built when the previous lookups failed
        yield entity_id
        yield (entity_id, language)
        yield (entity_id, language.split("-")[0])
        yield (entity_id, "en")

    for key in lookup_keys():
        label = WIKIDATA_PROPERTIES.get(key)
        if label is not None:
            return label
    return entity_id
|
|
|
|
|
|
def send_wikidata_query(query: str, method: str = "GET", **kwargs: dict[str, t.Any]) -> dict[str, t.Any]:
    """Send the SPARQL *query* to the endpoint and return the decoded JSON.

    :param method: ``"GET"`` puts the query into the URL, every other value
        sends it as POST data.
    :param kwargs: passed on to the HTTP function (e.g. ``timeout``).
    :raises: whatever ``raise_for_status()`` of the HTTP response raises.
    """
    use_get = method == "GET"
    if use_get:
        # query will be cached by wikidata
        url = SPARQL_ENDPOINT_URL + "?" + urlencode({"query": query})
        http_response = get(url, headers=get_headers(), **kwargs)
    else:
        # query won't be cached by wikidata
        http_response = post(SPARQL_ENDPOINT_URL, data={"query": query}, headers=get_headers(), **kwargs)

    body: str = http_response.content.decode()
    if http_response.status_code != 200:
        logger.debug("SPARQL endpoint error %s", body)
    logger.debug("request time %s", str(http_response.elapsed))
    http_response.raise_for_status()
    return loads(body)
|
|
|
|
|
|
def request(query: str, params: "OnlineParams") -> None:
    """Build the SPARQL request for *query* and store it in *params*.

    Besides the HTTP parameters (``method``, ``url``, ``data``, ``headers``)
    two additional keys are stored for :py:obj:`response`: ``language`` and
    ``attributes``.
    """

    attributes: WDAttrList
    eng_tag, _wiki_netloc = get_wiki_params(params["searxng_locale"], traits)
    # The language the query is built for is also the language the response
    # has to be rendered with.  Before, the raw ``eng_tag`` was stored, which
    # differs from the language of the query whenever ``eng_tag`` is empty.
    language: str = eng_tag or "en"
    query, attributes = get_query(query, language)
    logger.debug("request --> language %s // len(attributes): %s", language, len(attributes))

    params["method"] = "POST"
    params["url"] = SPARQL_ENDPOINT_URL
    params["data"] = {"query": query}
    params["headers"] = get_headers()

    # additional parameters (not a part of OnlineParams)
    params["language"] = language  # type: ignore
    params["attributes"] = attributes  # type: ignore
|
|
|
|
|
|
def response(resp: "SXNG_Response") -> list[dict[str, t.Any]]:
    """Convert the JSON answer of the SPARQL endpoint into result items.

    ``language`` and ``attributes`` are read from ``resp.search_params``,
    where :py:obj:`request` stored them.  Every entity is converted only once
    and the entities of ``DUMMY_ENTITY_URLS`` are dropped.
    """

    results: list[dict[str, t.Any]] = []
    jsonresponse = loads(resp.content.decode())

    # additional parameters ..
    language: str = resp.search_params["language"]  # type: ignore
    attributes: WDAttrList = resp.search_params["attributes"]  # type: ignore

    # the message was labelled "request -->" before (copied from request())
    logger.debug("response --> language %s // len(attributes): %s", language, len(attributes))

    seen_entities: set[str] = set()
    for result in jsonresponse.get("results", {}).get("bindings", []):
        # reduce each binding {"value": ..., ...} to its plain value
        attribute_result = {key: value["value"] for key, value in result.items()}
        entity_url: str = attribute_result["item"]
        if entity_url not in seen_entities and entity_url not in DUMMY_ENTITY_URLS:
            seen_entities.add(entity_url)
            results += get_results(attribute_result, attributes, language)
        else:
            logger.debug("The SPARQL request returns duplicate entities: %s", str(attribute_result))

    return results
|
|
|
|
|
|
_IMG_SRC_DEFAULT_URL_PREFIX = "https://commons.wikimedia.org/wiki/Special:FilePath/"
|
|
_IMG_SRC_NEW_URL_PREFIX = "https://upload.wikimedia.org/wikipedia/commons/thumb/"
|
|
|
|
|
|
def get_thumbnail(img_src: str | None) -> str | None:
|
|
"""Get Thumbnail image from wikimedia commons
|
|
|
|
Images from commons.wikimedia.org are (HTTP) redirected to
|
|
upload.wikimedia.org. The redirected URL can be calculated by this
|
|
function.
|
|
|
|
- https://stackoverflow.com/a/33691240
|
|
|
|
"""
|
|
logger.debug("get_thumbnail(): %s", img_src)
|
|
if not img_src is None and _IMG_SRC_DEFAULT_URL_PREFIX in img_src.split()[0]:
|
|
img_src_name = unquote(img_src.replace(_IMG_SRC_DEFAULT_URL_PREFIX, "").split("?", 1)[0].replace("%20", "_"))
|
|
img_src_name_first = img_src_name
|
|
img_src_name_second = img_src_name
|
|
|
|
if ".svg" in img_src_name.split()[0]:
|
|
img_src_name_second = img_src_name + ".png"
|
|
|
|
img_src_size = img_src.replace(_IMG_SRC_DEFAULT_URL_PREFIX, "").split("?", 1)[1]
|
|
img_src_size = img_src_size[img_src_size.index("=") + 1 : img_src_size.index("&")]
|
|
img_src_name_md5 = md5(img_src_name.encode("utf-8")).hexdigest()
|
|
img_src = (
|
|
_IMG_SRC_NEW_URL_PREFIX
|
|
+ img_src_name_md5[0]
|
|
+ "/"
|
|
+ img_src_name_md5[0:2]
|
|
+ "/"
|
|
+ img_src_name_first
|
|
+ "/"
|
|
+ img_src_size
|
|
+ "px-"
|
|
+ img_src_name_second
|
|
)
|
|
logger.debug("get_thumbnail() redirected: %s", img_src)
|
|
|
|
return img_src
|
|
|
|
|
|
def get_results(
    attribute_result: dict[str, t.Any],
    attributes: WDAttrList,
    language: str,
):
    """Convert one row of the SPARQL answer into result items.

    :param attribute_result: the row, variable name mapped to plain value.
    :param attributes: the attributes the query was built from.
    :param language: language used for labels and for formatting values.
    :return: list with the infobox and / or the "normal" results, depending
        on ``display_type``.
    """
    # pylint: disable=too-many-branches
    results: list[dict[str, t.Any]] = []
    infobox_title: str = attribute_result.get("itemLabel")  # pyright: ignore[reportAssignmentType]
    infobox_id = attribute_result["item"]
    infobox_id_lang: str | None = None
    infobox_urls: list[dict[str, str]] = []
    infobox_attributes: list[dict[str, str]] = []
    infobox_content = attribute_result.get("itemDescription", [])
    img_src: str | None = None
    img_src_priority = 0

    for attribute in attributes:
        value: str | None = attribute.get_str(attribute_result, language)
        if value is not None and value != "":

            # WDImageAttribute is a subclass of WDURLAttribute: it has to be
            # tested first, otherwise images are handled like ordinary URLs
            # and the infobox never gets an image.
            if isinstance(attribute, WDImageAttribute):
                # this attribute is an image.
                # replace the current image only if the priority is higher
                # (the infobox contain only one image).
                if attribute.priority > img_src_priority:
                    img_src = get_thumbnail(value)
                    img_src_priority = attribute.priority
            elif isinstance(attribute, (WDURLAttribute, WDArticle)):
                # get_select() method : there is group_concat(distinct ...;separator=", ")
                # split the value here
                for url in value.split(", "):
                    infobox_urls.append({"title": attribute.get_label(language), "url": url, **attribute.kwargs})
                    # "normal" results (not infobox) include official website and Wikipedia links.
                    if "list" in display_type and (
                        attribute.kwargs.get("official") or isinstance(attribute, WDArticle)
                    ):
                        results.append({"title": infobox_title, "url": url, "content": infobox_content})

                    # update the infobox_id with the wikipedia URL
                    # first the local wikipedia URL, and as fallback the english wikipedia URL
                    if isinstance(attribute, WDArticle) and (
                        (attribute.language == "en" and infobox_id_lang is None) or attribute.language != "en"
                    ):
                        infobox_id_lang = attribute.language
                        infobox_id = url
            elif isinstance(attribute, WDGeoAttribute):
                # geocoordinate link
                # use the area to get the OSM zoom
                # Note: ignore the unit (must be km² otherwise the calculation is wrong)
                # Should use normalized value p:P2046/psn:P2046/wikibase:quantityAmount
                area = attribute_result.get("P2046")
                osm_zoom: int = area_to_osm_zoom(area) if area else 19
                url = attribute.get_geo_url(attribute_result, osm_zoom=osm_zoom)
                if url:
                    infobox_urls.append({"title": attribute.get_label(language), "url": url, "entity": attribute.name})
            else:
                infobox_attributes.append(
                    {"label": attribute.get_label(language), "value": value, "entity": attribute.name}
                )

    if infobox_id:
        infobox_id = replace_http_by_https(infobox_id)

    # add the wikidata URL at the end
    infobox_urls.append({"title": "Wikidata", "url": attribute_result["item"]})

    if (
        "list" in display_type
        and img_src is None
        and len(infobox_attributes) == 0
        and len(infobox_urls) == 1
        and len(infobox_content) == 0
    ):
        # nothing but the wikidata URL is known: a plain result is sufficient
        results.append({"url": infobox_urls[0]["url"], "title": infobox_title, "content": infobox_content})
    elif "infobox" in display_type:
        results.append(
            {
                "infobox": infobox_title,
                "id": infobox_id,
                "content": infobox_content,
                "img_src": img_src,
                "urls": infobox_urls,
                "attributes": infobox_attributes,
            }
        )
    return results
|
|
|
|
|
|
def get_query(query: str, language: str) -> tuple[str, WDAttrList]:
    """Build the SPARQL query for the search term *query*.

    :param query: search term of the user (escaped before it is inserted).
    :param language: language of the labels and of the entity search.
    :return: the SPARQL query and the attributes it was built from.
    """
    attributes = get_attributes(language)
    select = [a.get_select() for a in attributes]
    # attributes may contribute nothing to a section (empty string)
    where = [s for s in (a.get_where() for a in attributes) if len(s) > 0]
    wikibase_label = [s for s in (a.get_wikibase_label() for a in attributes) if len(s) > 0]
    group_by = [s for s in (a.get_group_by() for a in attributes) if len(s) > 0]
    query = (
        QUERY_TEMPLATE.replace("%SELECT%", " ".join(select))
        .replace("%WHERE%", "\n ".join(where))
        .replace("%WIKIBASE_LABELS%", "\n ".join(wikibase_label))
        .replace("%GROUP_BY%", " ".join(group_by))
        .replace("%LANGUAGE%", language)
        # The search term is inserted last: text of the user that looks like
        # a placeholder (e.g. "%WHERE%") must not be expanded by one of the
        # replacements above.
        .replace("%QUERY%", sparql_string_escape(query))
    )
    return query, attributes
|
|
|
|
|
|
def get_attributes(language: str):
|
|
# pylint: disable=too-many-statements
|
|
attributes: WDAttrList = []
|
|
|
|
def add_value(name: str):
|
|
attributes.append(WDAttribute(name))
|
|
|
|
def add_amount(name: str):
|
|
attributes.append(WDAmountAttribute(name))
|
|
|
|
def add_label(name: str):
|
|
attributes.append(WDLabelAttribute(name))
|
|
|
|
def add_url(name: str, url_id: str | None = None, url_path_prefix: str | None = None, **kwargs: dict[str, t.Any]):
|
|
attributes.append(WDURLAttribute(name, url_id, url_path_prefix, kwargs))
|
|
|
|
def add_image(name: str, url_id: str | None = None, priority: int = 1):
|
|
attributes.append(WDImageAttribute(name, url_id, priority))
|
|
|
|
def add_date(name: str):
|
|
attributes.append(WDDateAttribute(name))
|
|
|
|
# Dates
|
|
for p in [
|
|
"P571", # inception date
|
|
"P576", # dissolution date
|
|
"P580", # start date
|
|
"P582", # end date
|
|
"P569", # date of birth
|
|
"P570", # date of death
|
|
"P619", # date of spacecraft launch
|
|
"P620",
|
|
]: # date of spacecraft landing
|
|
add_date(p)
|
|
|
|
for p in [
|
|
"P27", # country of citizenship
|
|
"P495", # country of origin
|
|
"P17", # country
|
|
"P159",
|
|
]: # headquarters location
|
|
add_label(p)
|
|
|
|
# Places
|
|
for p in [
|
|
"P36", # capital
|
|
"P35", # head of state
|
|
"P6", # head of government
|
|
"P122", # basic form of government
|
|
"P37",
|
|
]: # official language
|
|
add_label(p)
|
|
|
|
add_value("P1082") # population
|
|
add_amount("P2046") # area
|
|
add_amount("P281") # postal code
|
|
add_label("P38") # currency
|
|
add_amount("P2048") # height (building)
|
|
|
|
# Media
|
|
for p in [
|
|
"P400", # platform (videogames, computing)
|
|
"P50", # author
|
|
"P170", # creator
|
|
"P57", # director
|
|
"P175", # performer
|
|
"P178", # developer
|
|
"P162", # producer
|
|
"P176", # manufacturer
|
|
"P58", # screenwriter
|
|
"P272", # production company
|
|
"P264", # record label
|
|
"P123", # publisher
|
|
"P449", # original network
|
|
"P750", # distributed by
|
|
"P86",
|
|
]: # composer
|
|
add_label(p)
|
|
|
|
add_date("P577") # publication date
|
|
add_label("P136") # genre (music, film, artistic...)
|
|
add_label("P364") # original language
|
|
add_value("P212") # ISBN-13
|
|
add_value("P957") # ISBN-10
|
|
add_label("P275") # copyright license
|
|
add_label("P277") # programming language
|
|
add_value("P348") # version
|
|
add_label("P840") # narrative location
|
|
|
|
# Languages
|
|
add_value("P1098") # number of speakers
|
|
add_label("P282") # writing system
|
|
add_label("P1018") # language regulatory body
|
|
add_value("P218") # language code (ISO 639-1)
|
|
|
|
# Other
|
|
add_label("P169") # ceo
|
|
add_label("P112") # founded by
|
|
add_label("P1454") # legal form (company, organization)
|
|
add_label("P137") # operator (service, facility, ...)
|
|
add_label("P1029") # crew members (tripulation)
|
|
add_label("P225") # taxon name
|
|
add_value("P274") # chemical formula
|
|
add_label("P1346") # winner (sports, contests, ...)
|
|
add_value("P1120") # number of deaths
|
|
add_value("P498") # currency code (ISO 4217)
|
|
|
|
# URL
|
|
kwargs: dict[str, t.Any] = {"official": True}
|
|
add_url("P856", **kwargs) # official website
|
|
attributes.append(WDArticle(language)) # wikipedia (user language)
|
|
if not language.startswith("en"):
|
|
attributes.append(WDArticle("en")) # wikipedia (english)
|
|
|
|
add_url("P1324") # source code repository
|
|
add_url("P1581") # blog
|
|
add_url("P434", url_id="musicbrainz_artist")
|
|
add_url("P435", url_id="musicbrainz_work")
|
|
add_url("P436", url_id="musicbrainz_release_group")
|
|
add_url("P966", url_id="musicbrainz_label")
|
|
add_url("P345", url_id="imdb_id")
|
|
add_url("P2397", url_id="youtube_channel")
|
|
add_url("P1651", url_id="youtube_video")
|
|
add_url("P2002", url_id="twitter_profile")
|
|
add_url("P2013", url_id="facebook_profile")
|
|
add_url("P2003", url_id="instagram_profile")
|
|
|
|
# Fediverse
|
|
add_url("P4033", url_path_prefix="/@") # Mastodon user
|
|
add_url("P11947", url_path_prefix="/c/") # Lemmy community
|
|
add_url("P12622", url_path_prefix="/c/") # PeerTube channel
|
|
|
|
# Map
|
|
attributes.append(WDGeoAttribute("P625"))
|
|
|
|
# Image
|
|
add_image("P15", priority=1, url_id="wikimedia_image") # route map
|
|
add_image("P242", priority=2, url_id="wikimedia_image") # locator map
|
|
add_image("P154", priority=3, url_id="wikimedia_image") # logo
|
|
add_image("P18", priority=4, url_id="wikimedia_image") # image
|
|
add_image("P41", priority=5, url_id="wikimedia_image") # flag
|
|
add_image("P2716", priority=6, url_id="wikimedia_image") # collage
|
|
add_image("P2910", priority=7, url_id="wikimedia_image") # icon
|
|
|
|
return attributes
|
|
|
|
|
|
def debug_explain_wikidata_query(query: str, method: str = "GET"):
    """Send *query* to the "explain" endpoint and return the raw content of
    the answer.

    :param method: ``"GET"`` puts the query into the URL, every other value
        sends it as POST data.
    :raises: whatever ``raise_for_status()`` of the HTTP response raises.
    """
    if method != "GET":
        http_response = post(SPARQL_EXPLAIN_URL, data={"query": query}, headers=get_headers())
    else:
        # the explain URL already has a query string, hence the "&"
        explain_url = f"{SPARQL_EXPLAIN_URL}&{urlencode({'query': query})}"
        http_response = get(explain_url, headers=get_headers())
    http_response.raise_for_status()
    return http_response.content
|
|
|
|
|
|
def init(_):
    """Set up the cache of the engine and the property labels.

    The labels are fetched from Wikidata only by the process that finds no
    ``STATE:`` marker in the cache.  Every other process takes over the
    labels that are already stored in the cache.
    """
    global CACHE, WIKIDATA_PROPERTIES  # pylint: disable=global-statement
    CACHE = EngineCache("wikidata")

    # In an environment with competing processes, the initial loading of the
    # cache is required only once.
    eng_state: str | None = CACHE.get("eng_state")
    if not eng_state or not eng_state.startswith("STATE:"):
        CACHE.set("eng_state", f"STATE: being initialized by PID {os.getpid()}")
        try:
            init_wikidata_properties()
        except Exception:
            # mark the failure, so that the next process tries again
            CACHE.set("eng_state", f"ERROR: initialization by PID {os.getpid()} failed.")
            raise
        return

    logger.debug(eng_state)
    # WIKIDATA_PROPERTIES is a variable of *this* process: without loading the
    # cached labels here, a process that skips the initialization would only
    # know the hard coded labels.
    # NOTE(review): if the initializing process has not finished yet, the
    # cache is still empty and the hard coded labels remain in use.
    cached_properties = CACHE.get("WIKIDATA_PROPERTIES")
    if cached_properties:
        WIKIDATA_PROPERTIES = cached_properties
|
|
|
|
|
|
def init_wikidata_properties():
    """Fill ``WIKIDATA_PROPERTIES`` with unit symbols and property labels.

    If the cache already holds the labels, they replace ``WIKIDATA_PROPERTIES``
    and nothing is fetched.  Otherwise the unit symbols of ``WIKIDATA_UNITS``
    are added, the property names are queried from Wikidata and the result is
    stored in the cache.
    """
    global WIKIDATA_PROPERTIES  # pylint: disable=global-statement

    cached: WDPType = CACHE.get(key="WIKIDATA_PROPERTIES")
    if cached:
        WIKIDATA_PROPERTIES = cached
        return

    # WIKIDATA_PROPERTIES : add unit symbols
    WIKIDATA_PROPERTIES.update((unit_id, unit["symbol"]) for unit_id, unit in WIKIDATA_UNITS.items())

    # WIKIDATA_PROPERTIES : add property labels
    # The exact type is compared (no isinstance): subclasses that are not
    # listed here do not take part.
    labelled_types = (WDAttribute, WDAmountAttribute, WDURLAttribute, WDDateAttribute, WDLabelAttribute)
    missing_names: list[str] = [
        "wd:" + attribute.name
        for attribute in get_attributes("en")
        if type(attribute) in labelled_types and attribute.name not in WIKIDATA_PROPERTIES
    ]
    sparql = QUERY_PROPERTY_NAMES.replace("%ATTRIBUTES%", " ".join(missing_names))
    jsonresponse = send_wikidata_query(sparql, timeout=20)

    for binding in jsonresponse.get("results", {}).get("bindings", {}):
        label_field = binding.get("name")
        if label_field:
            label = label_field["value"]
            label_lang = label_field["xml:lang"]
            entity_id = binding["item"]["value"].replace("http://www.wikidata.org/entity/", "")
            WIKIDATA_PROPERTIES[(entity_id, label_lang)] = label.capitalize()

    CACHE.set(key="WIKIDATA_PROPERTIES", value=WIKIDATA_PROPERTIES)
|
|
|
|
|
|
def fetch_traits(engine_traits: EngineTraits):
    """Uses languages evaluated from :py:obj:`wikipedia.fetch_wikimedia_traits
    <searx.engines.wikipedia.fetch_wikimedia_traits>` and afterwards empties
    two of the custom entries:

    - ``traits.custom['wiki_netloc']``: reset to ``{}``, wikidata does not have
      net-locations for the languages.

    - ``traits.custom['WIKIPEDIA_LANGUAGES']``: reset to ``[]``, the list is not
      referenced anywhere in this module.

    """

    # let the wikipedia engine fill the traits first ...
    fetch_wikimedia_traits(engine_traits)
    # ... then drop the entries that are of no use here
    engine_traits.custom["wiki_netloc"] = {}
    engine_traits.custom["WIKIPEDIA_LANGUAGES"] = []
|