# SPDX-License-Identifier: AGPL-3.0-or-later
"""This module implements the Wikidata engine.  Some implementations are shared
from :ref:`wikipedia engine`.

"""
# pylint: disable=missing-class-docstring

import typing as t

from hashlib import md5
from urllib.parse import urlencode, unquote
from json import loads

from dateutil.parser import isoparse
from babel.dates import format_datetime, format_date, format_time, get_datetime_format

from searx.data import WIKIDATA_UNITS
from searx.network import post, get
from searx.utils import searxng_useragent, get_string_replaces_function
from searx.external_urls import get_external_url, get_earth_coordinates_url, area_to_osm_zoom
from searx.engines.wikipedia import (
    fetch_wikimedia_traits,
    get_wiki_params,
)
from searx.enginelib.traits import EngineTraits

if t.TYPE_CHECKING:
    from searx.extended_types import SXNG_Response
    from searx.search.processors import OnlineParams

# ``logger`` and ``traits`` (used below) are not imported here: the engine
# loader injects them into every engine module at runtime.

# about
about = {
    "website": 'https://wikidata.org/',
    "wikidata_id": 'Q2013',
    "official_api_documentation": 'https://query.wikidata.org/',
    "use_official_api": True,
    "require_api_key": False,
    "results": 'JSON',
}

display_type = ["infobox"]
"""A list of display types composed from ``infobox`` and ``list``.  ``list``
adds a hit to the result list, ``infobox`` shows a hit in the info box.  Either
one or both values can be set."""

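# A hypothetical settings.yml entry enabling both display types might look
# like the sketch below (illustrative only, not taken from the shipped
# configuration):
#
#   - name: wikidata
#     engine: wikidata
#     display_type: ["infobox", "list"]
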
# SPARQL
SPARQL_ENDPOINT_URL = "https://query.wikidata.org/sparql"
SPARQL_EXPLAIN_URL = "https://query.wikidata.org/bigdata/namespace/wdq/sparql?explain"
WIKIDATA_PROPERTIES: dict[str | tuple[str, str], str] = {
    "P434": "MusicBrainz",
    "P435": "MusicBrainz",
    "P436": "MusicBrainz",
    "P966": "MusicBrainz",
    "P345": "IMDb",
    "P2397": "YouTube",
    "P1651": "YouTube",
    "P2002": "Twitter",
    "P2013": "Facebook",
    "P2003": "Instagram",
    "P4033": "Mastodon",
    "P11947": "Lemmy",
    "P12622": "PeerTube",
}

# SERVICE wikibase:mwapi : https://www.mediawiki.org/wiki/Wikidata_Query_Service/User_Manual/MWAPI
# SERVICE wikibase:label: https://en.wikibooks.org/wiki/SPARQL/SERVICE_-_Label#Manual_Label_SERVICE
# https://en.wikibooks.org/wiki/SPARQL/WIKIDATA_Precision,_Units_and_Coordinates
# https://www.mediawiki.org/wiki/Wikibase/Indexing/RDF_Dump_Format#Data_model
# optimization:
# * https://www.wikidata.org/wiki/Wikidata:SPARQL_query_service/query_optimization
# * https://github.com/blazegraph/database/wiki/QueryHints
QUERY_TEMPLATE = """
SELECT ?item ?itemLabel ?itemDescription ?lat ?long %SELECT%
WHERE
{
  SERVICE wikibase:mwapi {
    bd:serviceParam wikibase:endpoint "www.wikidata.org";
      wikibase:api "EntitySearch";
      wikibase:limit 1;
      mwapi:search "%QUERY%";
      mwapi:language "%LANGUAGE%".
    ?item wikibase:apiOutputItem mwapi:item.
  }
  hint:Prior hint:runFirst "true".

  %WHERE%

  SERVICE wikibase:label {
    bd:serviceParam wikibase:language "%LANGUAGE%,en".
    ?item rdfs:label ?itemLabel .
    ?item schema:description ?itemDescription .
    %WIKIBASE_LABELS%
  }

}
GROUP BY ?item ?itemLabel ?itemDescription ?lat ?long %GROUP_BY%
"""

# Get the calendar names and the property names
QUERY_PROPERTY_NAMES = """
SELECT ?item ?name
WHERE {
    {
      SELECT ?item
      WHERE { ?item wdt:P279* wd:Q12132 }
    } UNION {
      VALUES ?item { %ATTRIBUTES% }
    }
    OPTIONAL { ?item rdfs:label ?name. }
}
"""

# see the property "dummy value" of https://www.wikidata.org/wiki/Q2013 (Wikidata)
# hard coded here to avoid an additional SPARQL request when the server starts
DUMMY_ENTITY_URLS = set(
    "http://www.wikidata.org/entity/" + wid for wid in ("Q4115189", "Q13406268", "Q15397819", "Q17339402")
)

# https://www.w3.org/TR/sparql11-query/#rSTRING_LITERAL1
# https://lists.w3.org/Archives/Public/public-rdf-dawg/2011OctDec/0175.html
sparql_string_escape = get_string_replaces_function(
    # fmt: off
    {
        "\t": "\\\t",
        "\n": "\\\n",
        "\r": "\\\r",
        "\b": "\\\b",
        "\f": "\\\f",
        "\"": "\\\"",
        "\'": "\\\'",
        "\\": "\\\\"
    }
    # fmt: on
)
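# Illustrative example: sparql_string_escape('say "hi"') returns 'say \\"hi\\"',
# i.e. quotes, backslashes and control characters are backslash-escaped so the
# user query can be embedded safely into the SPARQL string literal of
# QUERY_TEMPLATE above.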

replace_http_by_https = get_string_replaces_function({"http:": "https:"})


class WDAttribute:
    __slots__ = ("name",)

    def __init__(self, name: str):
        self.name: str = name

    def get_select(self):
        return "(group_concat(distinct ?{name};separator=', ') as ?{name}s)".replace("{name}", self.name)

    def get_label(self, language: str):
        return get_label_for_entity(self.name, language)

    def get_where(self):
        return "OPTIONAL { ?item wdt:{name} ?{name} . }".replace("{name}", self.name)

    def get_wikibase_label(self):
        return ""

    def get_group_by(self):
        return ""

    def get_str(self, result: dict[str, t.Any], language: str):  # pylint: disable=unused-argument
        return result.get(self.name + "s")

    def __repr__(self):
        return "<" + str(type(self).__name__) + ":" + self.name + ">"

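# Illustrative example of the SPARQL fragments a plain attribute generates:
#
#   WDAttribute("P571").get_select()
#   -> "(group_concat(distinct ?P571;separator=', ') as ?P571s)"
#   WDAttribute("P571").get_where()
#   -> "OPTIONAL { ?item wdt:P571 ?P571 . }"
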

class WDAmountAttribute(WDAttribute):
    def get_select(self):
        return "?{name} ?{name}Unit".replace("{name}", self.name)

    def get_where(self):
        return """ OPTIONAL { ?item p:{name} ?{name}Node .
        ?{name}Node rdf:type wikibase:BestRank ; ps:{name} ?{name} .
        OPTIONAL { ?{name}Node psv:{name}/wikibase:quantityUnit ?{name}Unit. } }""".replace(
            '{name}', self.name
        )

    def get_group_by(self):
        return self.get_select()

    def get_str(self, result: dict[str, t.Any], language: str):
        value = result.get(self.name)
        unit = result.get(self.name + "Unit")
        if unit is not None:
            unit = unit.replace("http://www.wikidata.org/entity/", "")
            return value + " " + get_label_for_entity(unit, language)
        return value


class WDArticle(WDAttribute):

    def __init__(self, language: str, kwargs=None):
        super().__init__("wikipedia")
        self.language: str = language
        self.kwargs: dict[str, t.Any] = kwargs or {}

    def get_label(self, language: str):
        # language parameter is ignored
        return "Wikipedia ({language})".replace("{language}", self.language)

    def get_select(self):
        return "?article{language} ?articleName{language}".replace("{language}", self.language)

    def get_where(self):
        return """OPTIONAL { ?article{language} schema:about ?item ;
                 schema:inLanguage "{language}" ;
                 schema:isPartOf <https://{language}.wikipedia.org/> ;
                 schema:name ?articleName{language} . }""".replace(
            '{language}', self.language
        )

    def get_group_by(self):
        return self.get_select()

    def get_str(self, result, language: str):
        key = "article{language}".replace("{language}", self.language)
        return result.get(key)


class WDLabelAttribute(WDAttribute):
    def get_select(self):
        return "(group_concat(distinct ?{name}Label;separator=', ') as ?{name}Labels)".replace("{name}", self.name)

    def get_where(self):
        return "OPTIONAL { ?item wdt:{name} ?{name} . }".replace("{name}", self.name)

    def get_wikibase_label(self):
        return "?{name} rdfs:label ?{name}Label .".replace("{name}", self.name)

    def get_str(self, result, language):
        return result.get(self.name + "Labels")


class WDURLAttribute(WDAttribute):

    HTTP_WIKIMEDIA_IMAGE = "http://commons.wikimedia.org/wiki/Special:FilePath/"

    def __init__(
        self,
        name: str,
        url_id: str | None = None,
        url_path_prefix: str | None = None,
        kwargs: dict[str, t.Any] | None = None,
    ):
        """
        :param url_id: ID matching one key in ``external_urls.json`` for
            converting IDs to full URLs.

        :param url_path_prefix: Path prefix if the values are of format
            ``account@domain``.  If provided, values are rewritten to
            ``https://<domain><url_path_prefix><account>``.  For example::

                WDURLAttribute('P4033', url_path_prefix='/@')

            adds Property `P4033 <https://www.wikidata.org/wiki/Property:P4033>`_
            to the wikidata query.  This field might return for example
            ``libreoffice@fosstodon.org`` and the URL built from this is then:

            - account: ``libreoffice``
            - domain: ``fosstodon.org``
            - result url: https://fosstodon.org/@libreoffice
        """

        super().__init__(name)
        self.url_id = url_id
        self.url_path_prefix = url_path_prefix
        self.kwargs = kwargs or {}

    def get_str(self, result, language: str):
        value = result.get(self.name + "s")
        if not value:
            return None

        value = value.split(",")[0]
        if self.url_id:
            url_id = self.url_id
            if value.startswith(WDURLAttribute.HTTP_WIKIMEDIA_IMAGE):
                value = value[len(WDURLAttribute.HTTP_WIKIMEDIA_IMAGE) :]
                url_id = "wikimedia_image"
            return get_external_url(url_id, value)

        if self.url_path_prefix:
            [account, domain] = [x.strip("@ ") for x in value.rsplit("@", 1)]
            return f"https://{domain}{self.url_path_prefix}{account}"

        return value


class WDGeoAttribute(WDAttribute):
    def get_label(self, language: str):
        return "OpenStreetMap"

    def get_select(self):
        return "?{name}Lat ?{name}Long".replace("{name}", self.name)

    def get_where(self):
        return """OPTIONAL { ?item p:{name}/psv:{name} [
    wikibase:geoLatitude ?{name}Lat ;
    wikibase:geoLongitude ?{name}Long ] }""".replace(
            '{name}', self.name
        )

    def get_group_by(self):
        return self.get_select()

    def get_str(self, result, language: str):
        latitude = result.get(self.name + "Lat")
        longitude = result.get(self.name + "Long")
        if latitude and longitude:
            return latitude + " " + longitude
        return None

    def get_geo_url(self, result, osm_zoom=19):
        latitude = result.get(self.name + "Lat")
        longitude = result.get(self.name + "Long")
        if latitude and longitude:
            return get_earth_coordinates_url(latitude, longitude, osm_zoom)
        return None


class WDImageAttribute(WDURLAttribute):

    def __init__(self, name, url_id=None, priority=100):
        super().__init__(name, url_id)
        self.priority = priority


class WDDateAttribute(WDAttribute):
    def get_select(self):
        return "?{name} ?{name}timePrecision ?{name}timeZone ?{name}timeCalendar".replace("{name}", self.name)

    def get_where(self):
        # To remove duplicates, add
        # FILTER NOT EXISTS { ?item p:{name}/psv:{name}/wikibase:timeValue ?{name}bis FILTER (?{name}bis < ?{name}) }
        # this filter is too slow, so the response function ignores duplicate results
        # (see the seen_entities variable)
        return """OPTIONAL { ?item p:{name}/psv:{name} [
    wikibase:timeValue ?{name} ;
    wikibase:timePrecision ?{name}timePrecision ;
    wikibase:timeTimezone ?{name}timeZone ;
    wikibase:timeCalendarModel ?{name}timeCalendar ] . }
    hint:Prior hint:rangeSafe true;""".replace(
            '{name}', self.name
        )

    def get_group_by(self):
        return self.get_select()

    def format_8(self, value, locale: str):  # pylint: disable=unused-argument
        # precision: less than a year
        return value

    def format_9(self, value, locale: str):
        year = int(value)
        # precision: year
        if year < 1584:
            if year < 0:
                return str(year - 1)
            return str(year)
        timestamp = isoparse(value)
        return format_date(timestamp, format="yyyy", locale=locale)

    def format_10(self, value, locale: str):
        # precision: month
        timestamp = isoparse(value)
        return format_date(timestamp, format="MMMM y", locale=locale)

    def format_11(self, value, locale: str):
        # precision: day
        timestamp = isoparse(value)
        return format_date(timestamp, format="full", locale=locale)

    def format_13(self, value, locale: str):
        timestamp = isoparse(value)
        # precision: minute
        # NOTE: the first argument here is the ``format`` builtin, which is not
        # a valid babel format key; get_datetime_format() then falls back to
        # the locale's default datetime pattern.
        return (
            get_datetime_format(format, locale=locale)
            .replace("'", "")
            .replace("{0}", format_time(timestamp, "full", tzinfo=None, locale=locale))
            .replace("{1}", format_date(timestamp, "short", locale=locale))
        )

    def format_14(self, value, locale):
        # precision: second.
        return format_datetime(isoparse(value), format="full", locale=locale)

    DATE_FORMAT = {
        "0": ("format_8", 1000000000),
        "1": ("format_8", 100000000),
        "2": ("format_8", 10000000),
        "3": ("format_8", 1000000),
        "4": ("format_8", 100000),
        "5": ("format_8", 10000),
        "6": ("format_8", 1000),
        "7": ("format_8", 100),
        "8": ("format_8", 10),
        "9": ("format_9", 1),  # year
        "10": ("format_10", 1),  # month
        "11": ("format_11", 0),  # day
        "12": ("format_13", 0),  # hour (not supported by babel, display minute)
        "13": ("format_13", 0),  # minute
        "14": ("format_14", 0),  # second
    }
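    # Illustrative example: a value "1969-07-20T00:00:00Z" with timePrecision
    # "11" (day) maps to format_11() above, which renders the full localized
    # date; with precision "9" (year), get_str() first truncates the value to
    # "1969" and format_9() returns it as a plain year.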

    def get_str(self, result, language):
        value = result.get(self.name)
        if value == "" or value is None:
            return None
        precision = result.get(self.name + "timePrecision")
        date_format = WDDateAttribute.DATE_FORMAT.get(precision)
        if date_format is not None:
            format_method = getattr(self, date_format[0])
            precision = date_format[1]
            try:
                if precision >= 1:
                    _t = value.split("-")
                    if value.startswith("-"):
                        value = "-" + _t[1]
                    else:
                        value = _t[0]
                return format_method(value, language)
            except Exception:  # pylint: disable=broad-except
                return value
        return value


def get_headers() -> dict[str, str]:
    # user agent: https://www.mediawiki.org/wiki/Wikidata_Query_Service/User_Manual#Query_limits
    return {"Accept": "application/sparql-results+json", "User-Agent": searxng_useragent()}


def get_label_for_entity(entity_id: str, language: str) -> str:
    """Resolve a human readable label for ``entity_id``: first try the plain ID
    (unit symbols), then ``(ID, language)``, ``(ID, language prefix)`` and
    ``(ID, "en")``; fall back to the raw ID."""
    name = WIKIDATA_PROPERTIES.get(entity_id)
    if name is None:
        name = WIKIDATA_PROPERTIES.get((entity_id, language))
    if name is None:
        name = WIKIDATA_PROPERTIES.get((entity_id, language.split("-")[0]))
    if name is None:
        name = WIKIDATA_PROPERTIES.get((entity_id, "en"))
    if name is None:
        name = entity_id
    return name


def send_wikidata_query(query: str, method="GET", **kwargs) -> dict[str, t.Any]:
    if method == "GET":
        # query will be cached by wikidata
        http_response = get(SPARQL_ENDPOINT_URL + "?" + urlencode({"query": query}), headers=get_headers(), **kwargs)
    else:
        # query won't be cached by wikidata
        http_response = post(SPARQL_ENDPOINT_URL, data={"query": query}, headers=get_headers(), **kwargs)
    if http_response.status_code != 200:
        logger.debug("SPARQL endpoint error %s", http_response.content.decode())
    logger.debug("request time %s", str(http_response.elapsed))
    http_response.raise_for_status()
    return loads(http_response.content.decode())

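# Minimal usage sketch (assumes network access to the public endpoint):
#
#   data = send_wikidata_query("SELECT ?l WHERE { wd:Q2013 rdfs:label ?l } LIMIT 1")
#   for binding in data["results"]["bindings"]:
#       print(binding["l"]["value"])
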

def request(query: str, params: "OnlineParams") -> None:

    attributes: list[WDAttribute | WDAmountAttribute | WDLabelAttribute | WDImageAttribute]
    eng_tag, _wiki_netloc = get_wiki_params(params["searxng_locale"], traits)
    query, attributes = get_query(query, eng_tag)
    logger.debug("request --> language %s // len(attributes): %s", eng_tag, len(attributes))

    params["method"] = "POST"
    params["url"] = SPARQL_ENDPOINT_URL
    params["data"] = {"query": query}
    params["headers"] = get_headers()
    params["language"] = eng_tag  # type: ignore
    params["attributes"] = attributes  # type: ignore


def response(resp: "SXNG_Response") -> list[dict[str, t.Any]]:

    results: list[dict[str, t.Any]] = []
    jsonresponse = loads(resp.content.decode())

    language: str = resp.search_params["language"]  # type: ignore
    attributes = resp.search_params["attributes"]  # type: ignore
    logger.debug("response --> language %s // len(attributes): %s", language, len(attributes))

    seen_entities: set[str] = set()
    for result in jsonresponse.get("results", {}).get("bindings", []):
        attribute_result = {key: value["value"] for key, value in result.items()}
        entity_url = attribute_result["item"]
        if entity_url not in seen_entities and entity_url not in DUMMY_ENTITY_URLS:
            seen_entities.add(entity_url)
            results += get_results(attribute_result, attributes, language)
        else:
            logger.debug("The SPARQL request returns duplicate entities: %s", str(attribute_result))

    return results


_IMG_SRC_DEFAULT_URL_PREFIX = "https://commons.wikimedia.org/wiki/Special:FilePath/"
_IMG_SRC_NEW_URL_PREFIX = "https://upload.wikimedia.org/wikipedia/commons/thumb/"


def get_thumbnail(img_src: str) -> str:
    """Get thumbnail image from Wikimedia Commons.

    Images from commons.wikimedia.org are (HTTP) redirected to
    upload.wikimedia.org.  The redirected URL can be calculated by this
    function.

    - https://stackoverflow.com/a/33691240

    """
    logger.debug("get_thumbnail(): %s", img_src)
    if img_src is not None and _IMG_SRC_DEFAULT_URL_PREFIX in img_src.split()[0]:
        img_src_name = unquote(img_src.replace(_IMG_SRC_DEFAULT_URL_PREFIX, "").split("?", 1)[0].replace("%20", "_"))
        img_src_name_first = img_src_name
        img_src_name_second = img_src_name

        if ".svg" in img_src_name.split()[0]:
            img_src_name_second = img_src_name + ".png"

        img_src_size = img_src.replace(_IMG_SRC_DEFAULT_URL_PREFIX, "").split("?", 1)[1]
        img_src_size = img_src_size[img_src_size.index("=") + 1 : img_src_size.index("&")]
        img_src_name_md5 = md5(img_src_name.encode("utf-8")).hexdigest()
        img_src = (
            _IMG_SRC_NEW_URL_PREFIX
            + img_src_name_md5[0]
            + "/"
            + img_src_name_md5[0:2]
            + "/"
            + img_src_name_first
            + "/"
            + img_src_size
            + "px-"
            + img_src_name_second
        )
        logger.debug("get_thumbnail() redirected: %s", img_src)

    return img_src

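# Illustrative transformation (md5 digits shown as <m>/<md>, not computed here):
#
#   https://commons.wikimedia.org/wiki/Special:FilePath/Example.jpg?width=500&...
#   -> https://upload.wikimedia.org/wikipedia/commons/thumb/<m>/<md>/Example.jpg/500px-Example.jpg
#
# where <m> and <md> are the first one and two hex digits of md5("Example.jpg").
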

def get_results(attribute_result: dict[str, t.Any], attributes, language):
    # pylint: disable=too-many-branches
    results = []
    infobox_title = attribute_result.get("itemLabel")
    infobox_id = attribute_result["item"]
    infobox_id_lang = None
    infobox_urls = []
    infobox_attributes = []
    infobox_content = attribute_result.get("itemDescription", [])
    img_src = None
    img_src_priority = 0

    for attribute in attributes:
        value = attribute.get_str(attribute_result, language)
        if value is not None and value != "":
            attribute_type = type(attribute)

            if attribute_type in (WDURLAttribute, WDArticle):
                # get_select() uses group_concat(distinct ...;separator=", "),
                # so split the value here
                for url in value.split(", "):
                    infobox_urls.append({"title": attribute.get_label(language), "url": url, **attribute.kwargs})
                    # "normal" results (not infobox) include official website and Wikipedia links.
                    if "list" in display_type and (attribute.kwargs.get("official") or attribute_type == WDArticle):
                        results.append({"title": infobox_title, "url": url, "content": infobox_content})

                    # update the infobox_id with the wikipedia URL
                    # first the local wikipedia URL, and as fallback the english wikipedia URL
                    if attribute_type == WDArticle and (
                        (attribute.language == "en" and infobox_id_lang is None) or attribute.language != "en"
                    ):
                        infobox_id_lang = attribute.language
                        infobox_id = url
            elif attribute_type == WDImageAttribute:
                # this attribute is an image.
                # replace the current image only if the priority is higher
                # (the infobox contains only one image).
                if attribute.priority > img_src_priority:
                    img_src = get_thumbnail(value)
                    img_src_priority = attribute.priority
            elif attribute_type == WDGeoAttribute:
                # geocoordinate link
                # use the area to get the OSM zoom
                # Note: ignore the unit (must be km² otherwise the calculation is wrong)
                # Should use normalized value p:P2046/psn:P2046/wikibase:quantityAmount
                area = attribute_result.get("P2046")
                osm_zoom = area_to_osm_zoom(area) if area else 19
                url = attribute.get_geo_url(attribute_result, osm_zoom=osm_zoom)
                if url:
                    infobox_urls.append({"title": attribute.get_label(language), "url": url, "entity": attribute.name})
            else:
                infobox_attributes.append(
                    {"label": attribute.get_label(language), "value": value, "entity": attribute.name}
                )

    if infobox_id:
        infobox_id = replace_http_by_https(infobox_id)

    # add the wikidata URL at the end
    infobox_urls.append({"title": "Wikidata", "url": attribute_result["item"]})

    if (
        "list" in display_type
        and img_src is None
        and len(infobox_attributes) == 0
        and len(infobox_urls) == 1
        and len(infobox_content) == 0
    ):
        results.append({"url": infobox_urls[0]["url"], "title": infobox_title, "content": infobox_content})
    elif "infobox" in display_type:
        results.append(
            {
                "infobox": infobox_title,
                "id": infobox_id,
                "content": infobox_content,
                "img_src": img_src,
                "urls": infobox_urls,
                "attributes": infobox_attributes,
            }
        )
    return results


def get_query(
    query: str, language: str
) -> tuple[str, list[WDAttribute | WDAmountAttribute | WDLabelAttribute | WDImageAttribute]]:
    attributes = get_attributes(language)
    select = [a.get_select() for a in attributes]
    where = list(filter(lambda s: len(s) > 0, [a.get_where() for a in attributes]))
    wikibase_label = list(filter(lambda s: len(s) > 0, [a.get_wikibase_label() for a in attributes]))
    group_by = list(filter(lambda s: len(s) > 0, [a.get_group_by() for a in attributes]))
    query = (
        QUERY_TEMPLATE.replace("%QUERY%", sparql_string_escape(query))
        .replace("%SELECT%", " ".join(select))
        .replace("%WHERE%", "\n  ".join(where))
        .replace("%WIKIBASE_LABELS%", "\n      ".join(wikibase_label))
        .replace("%GROUP_BY%", " ".join(group_by))
        .replace("%LANGUAGE%", language)
    )
    return query, attributes

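# Illustrative example: get_query("Paris", "fr") returns the SPARQL string from
# QUERY_TEMPLATE with %QUERY%/%LANGUAGE% substituted, together with the
# attribute list that response() later uses to read the bindings.
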

def get_attributes(language: str):
    # pylint: disable=too-many-statements
    attributes: list[WDAttribute | WDAmountAttribute | WDLabelAttribute | WDImageAttribute] = []

    def add_value(name: str):
        attributes.append(WDAttribute(name))

    def add_amount(name: str):
        attributes.append(WDAmountAttribute(name))

    def add_label(name: str):
        attributes.append(WDLabelAttribute(name))

    def add_url(name: str, url_id: str | None = None, url_path_prefix: str | None = None, **kwargs):
        attributes.append(WDURLAttribute(name, url_id, url_path_prefix, kwargs))

    def add_image(name: str, url_id: str | None = None, priority: int = 1):
        attributes.append(WDImageAttribute(name, url_id, priority))

    def add_date(name: str):
        attributes.append(WDDateAttribute(name))

    # Dates
    for p in [
        "P571",  # inception date
        "P576",  # dissolution date
        "P580",  # start date
        "P582",  # end date
        "P569",  # date of birth
        "P570",  # date of death
        "P619",  # date of spacecraft launch
        "P620",  # date of spacecraft landing
    ]:
        add_date(p)

    for p in [
        "P27",  # country of citizenship
        "P495",  # country of origin
        "P17",  # country
        "P159",  # headquarters location
    ]:
        add_label(p)

    # Places
    for p in [
        "P36",  # capital
        "P35",  # head of state
        "P6",  # head of government
        "P122",  # basic form of government
        "P37",  # official language
    ]:
        add_label(p)

    add_value("P1082")  # population
    add_amount("P2046")  # area
    add_amount("P281")  # postal code
    add_label("P38")  # currency
    add_amount("P2048")  # height (building)

    # Media
    for p in [
        "P400",  # platform (videogames, computing)
        "P50",  # author
        "P170",  # creator
        "P57",  # director
        "P175",  # performer
        "P178",  # developer
        "P162",  # producer
        "P176",  # manufacturer
        "P58",  # screenwriter
        "P272",  # production company
        "P264",  # record label
        "P123",  # publisher
        "P449",  # original network
        "P750",  # distributed by
        "P86",  # composer
    ]:
        add_label(p)

    add_date("P577")  # publication date
    add_label("P136")  # genre (music, film, artistic...)
    add_label("P364")  # original language
    add_value("P212")  # ISBN-13
    add_value("P957")  # ISBN-10
    add_label("P275")  # copyright license
    add_label("P277")  # programming language
    add_value("P348")  # version
    add_label("P840")  # narrative location

    # Languages
    add_value("P1098")  # number of speakers
    add_label("P282")  # writing system
    add_label("P1018")  # language regulatory body
    add_value("P218")  # language code (ISO 639-1)

    # Other
    add_label("P169")  # ceo
    add_label("P112")  # founded by
    add_label("P1454")  # legal form (company, organization)
    add_label("P137")  # operator (service, facility, ...)
    add_label("P1029")  # crew members
    add_label("P225")  # taxon name
    add_value("P274")  # chemical formula
    add_label("P1346")  # winner (sports, contests, ...)
    add_value("P1120")  # number of deaths
    add_value("P498")  # currency code (ISO 4217)

    # URL
    add_url("P856", official=True)  # official website
    attributes.append(WDArticle(language))  # wikipedia (user language)
    if not language.startswith("en"):
        attributes.append(WDArticle("en"))  # wikipedia (english)

    add_url("P1324")  # source code repository
    add_url("P1581")  # blog
    add_url("P434", url_id="musicbrainz_artist")
    add_url("P435", url_id="musicbrainz_work")
    add_url("P436", url_id="musicbrainz_release_group")
    add_url("P966", url_id="musicbrainz_label")
    add_url("P345", url_id="imdb_id")
    add_url("P2397", url_id="youtube_channel")
    add_url("P1651", url_id="youtube_video")
    add_url("P2002", url_id="twitter_profile")
    add_url("P2013", url_id="facebook_profile")
    add_url("P2003", url_id="instagram_profile")

    # Fediverse
    add_url("P4033", url_path_prefix="/@")  # Mastodon user
    add_url("P11947", url_path_prefix="/c/")  # Lemmy community
    add_url("P12622", url_path_prefix="/c/")  # PeerTube channel

    # Map
    attributes.append(WDGeoAttribute("P625"))

    # Image
    add_image("P15", priority=1, url_id="wikimedia_image")  # route map
    add_image("P242", priority=2, url_id="wikimedia_image")  # locator map
    add_image("P154", priority=3, url_id="wikimedia_image")  # logo
    add_image("P18", priority=4, url_id="wikimedia_image")  # image
    add_image("P41", priority=5, url_id="wikimedia_image")  # flag
    add_image("P2716", priority=6, url_id="wikimedia_image")  # collage
    add_image("P2910", priority=7, url_id="wikimedia_image")  # icon

    return attributes

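# Illustrative extension point (hypothetical, not part of the shipped list): an
# additional property can be surfaced in the infobox by appending another call
# inside get_attributes(), e.g. ``add_label("P106")  # occupation``.
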

def debug_explain_wikidata_query(query: str, method: str = "GET"):
    if method == "GET":
        http_response = get(SPARQL_EXPLAIN_URL + "&" + urlencode({"query": query}), headers=get_headers())
    else:
        http_response = post(SPARQL_EXPLAIN_URL, data={"query": query}, headers=get_headers())
    http_response.raise_for_status()
    return http_response.content


def init(engine_settings=None):  # pylint: disable=unused-argument
    # WIKIDATA_PROPERTIES : add unit symbols
    for k, v in WIKIDATA_UNITS.items():
        WIKIDATA_PROPERTIES[k] = v["symbol"]

    # WIKIDATA_PROPERTIES : add property labels
    wikidata_property_names: list[str] = []
    for attribute in get_attributes("en"):
        if type(attribute) in (WDAttribute, WDAmountAttribute, WDURLAttribute, WDDateAttribute, WDLabelAttribute):
            if attribute.name not in WIKIDATA_PROPERTIES:
                wikidata_property_names.append("wd:" + attribute.name)
    query = QUERY_PROPERTY_NAMES.replace("%ATTRIBUTES%", " ".join(wikidata_property_names))
    jsonresponse = send_wikidata_query(query, timeout=20)
    for result in jsonresponse.get("results", {}).get("bindings", []):
        name_field = result.get("name")
        if not name_field:
            continue
        name = name_field["value"]
        lang = name_field["xml:lang"]
        entity_id = result["item"]["value"].replace("http://www.wikidata.org/entity/", "")
        WIKIDATA_PROPERTIES[(entity_id, lang)] = name.capitalize()


def fetch_traits(engine_traits: EngineTraits):
    """Uses languages evaluated from :py:obj:`wikipedia.fetch_wikimedia_traits
    <searx.engines.wikipedia.fetch_wikimedia_traits>` and removes

    - ``traits.custom['wiki_netloc']``: Wikidata does not have per-language
      net-locations,

    - ``traits.custom['WIKIPEDIA_LANGUAGES']``: not used in the wikidata engine.

    """

    fetch_wikimedia_traits(engine_traits)
    engine_traits.custom["wiki_netloc"] = {}
    engine_traits.custom["WIKIPEDIA_LANGUAGES"] = []