[fix] engine wikidata - fails to initialize with HTTP 403 (#6081)

To avoid being blocked any further, the WIKIDATA_PROPERTIES are now cached, which
drastically reduces the number of WD-SQL requests.

BTW: improve type hints

Closes: https://github.com/searxng/searxng/issues/6051

Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
This commit is contained in:
Markus Heiser
2026-05-15 16:21:47 +02:00
committed by GitHub
parent 240f403d93
commit afafca93f3
4 changed files with 133 additions and 83 deletions
+7 -1
View File
@@ -114,7 +114,13 @@ class ExpireCacheStats:
if expire: if expire:
valid_until = datetime.datetime.fromtimestamp(expire).strftime("%Y-%m-%d %H:%M:%S") valid_until = datetime.datetime.fromtimestamp(expire).strftime("%Y-%m-%d %H:%M:%S")
c_kv += 1 c_kv += 1
lines.append(f"[{ctx_name:20s}] {valid_until} {key:12}" f" --> ({type(value).__name__}) {value} ") value_str = str(value)
if len(value_str) > 120:
value_str = f"{value_str[:120]} ..."
lines.append(
f"[{ctx_name:20s}] {valid_until} {key:12}"
f" --> ({type(value).__name__}:{len(value)}) {value_str} "
)
lines.append(f"Number of contexts: {c_ctx}") lines.append(f"Number of contexts: {c_ctx}")
lines.append(f"number of key/value pairs: {c_kv}") lines.append(f"number of key/value pairs: {c_kv}")
+7 -6
View File
@@ -8,8 +8,8 @@
There is a command line for developer purposes and for deeper analysis. Here is There is a command line for developer purposes and for deeper analysis. Here is
an example in which the command line is called in the development environment:: an example in which the command line is called in the development environment::
$ ./manage pyenv.cmd bash --norc --noprofile $ ./manage dev.env
(py3) python -m searx.enginelib --help (dev.env)$ python -m searx.enginelib --help
.. hint:: .. hint::
@@ -46,6 +46,7 @@ ENGINES_CACHE: ExpireCacheSQLite = ExpireCacheSQLite.build_cache(
name="ENGINES_CACHE", name="ENGINES_CACHE",
MAXHOLD_TIME=60 * 60 * 24 * 7, # 7 days MAXHOLD_TIME=60 * 60 * 24 * 7, # 7 days
MAINTENANCE_PERIOD=60 * 60, # 1h MAINTENANCE_PERIOD=60 * 60, # 1h
MAX_VALUE_LEN=1024 * 1024 * 1024, # 1 GiB
) )
) )
"""Global :py:obj:`searx.cache.ExpireCacheSQLite` instance where the cached """Global :py:obj:`searx.cache.ExpireCacheSQLite` instance where the cached
@@ -71,9 +72,9 @@ def state():
@app.command() @app.command()
def maintenance(force: bool = True): def maintenance(force: bool = True, truncate: bool = False):
"""Carry out maintenance on cache of the engines.""" """Carry out maintenance on cache of the engines."""
ENGINES_CACHE.maintenance(force=force) ENGINES_CACHE.maintenance(force=force, truncate=truncate)
class EngineCache: class EngineCache:
@@ -111,8 +112,8 @@ class EngineCache:
For introspection of the DB, jump into developer environment and run command to For introspection of the DB, jump into developer environment and run command to
show cache state:: show cache state::
$ ./manage pyenv.cmd bash --norc --noprofile $ ./manage dev.env
(py3) python -m searx.enginelib cache state (dev.env)$ python -m searx.enginelib cache state
cache tables and key/values cache tables and key/values
=========================== ===========================
+116 -73
View File
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: AGPL-3.0-or-later # SPDX-License-Identifier: AGPL-3.0-or-later
"""This module implements the Wikidata engine. Some implementations are shared """This module implements the Wikidata engine.
from :ref:`wikipedia engine`.
Some implementations are shared from :ref:`wikipedia engine`.
""" """
# pylint: disable=missing-class-docstring # pylint: disable=missing-class-docstring
@@ -14,6 +14,7 @@ from json import loads
from dateutil.parser import isoparse from dateutil.parser import isoparse
from babel.dates import format_datetime, format_date, format_time, get_datetime_format from babel.dates import format_datetime, format_date, format_time, get_datetime_format
from searx.enginelib import EngineCache
from searx.data import WIKIDATA_UNITS from searx.data import WIKIDATA_UNITS
from searx.network import post, get from searx.network import post, get
from searx.utils import searxng_useragent, get_string_replaces_function from searx.utils import searxng_useragent, get_string_replaces_function
@@ -44,11 +45,15 @@ display_type = ["infobox"]
one will add a hit to the result list. The first one will show a hit in the one will add a hit to the result list. The first one will show a hit in the
info box. Both values can be set, or one of the two can be set.""" info box. Both values can be set, or one of the two can be set."""
CACHE: EngineCache
"""Persistent (SQLite) key/value cache that deletes its values after ``expire``
seconds."""
# SPARQL # SPARQL
SPARQL_ENDPOINT_URL = "https://query.wikidata.org/sparql" SPARQL_ENDPOINT_URL = "https://query.wikidata.org/sparql"
SPARQL_EXPLAIN_URL = "https://query.wikidata.org/bigdata/namespace/wdq/sparql?explain" SPARQL_EXPLAIN_URL = "https://query.wikidata.org/bigdata/namespace/wdq/sparql?explain"
WIKIDATA_PROPERTIES: dict[str | tuple[str, str], str] = { WDPType = dict[str | tuple[str, str], str]
WIKIDATA_PROPERTIES: WDPType = {
"P434": "MusicBrainz", "P434": "MusicBrainz",
"P435": "MusicBrainz", "P435": "MusicBrainz",
"P436": "MusicBrainz", "P436": "MusicBrainz",
@@ -140,7 +145,6 @@ replace_http_by_https = get_string_replaces_function({"http:": "https:"})
class WDAttribute: class WDAttribute:
__slots__ = ("name",)
def __init__(self, name: str): def __init__(self, name: str):
self.name: str = name self.name: str = name
@@ -154,13 +158,13 @@ class WDAttribute:
def get_where(self): def get_where(self):
return "OPTIONAL { ?item wdt:{name} ?{name} . }".replace("{name}", self.name) return "OPTIONAL { ?item wdt:{name} ?{name} . }".replace("{name}", self.name)
def get_wikibase_label(self): def get_wikibase_label(self) -> str:
return "" return ""
def get_group_by(self): def get_group_by(self) -> str:
return "" return ""
def get_str(self, result: dict[str, t.Any], language: str): # pylint: disable=unused-argument def get_str(self, result: dict[str, t.Any], language: str) -> str | None: # pylint: disable=unused-argument
return result.get(self.name + "s") return result.get(self.name + "s")
def __repr__(self): def __repr__(self):
@@ -168,7 +172,7 @@ class WDAttribute:
class WDAmountAttribute(WDAttribute): class WDAmountAttribute(WDAttribute):
def get_select(self): def get_select(self) -> str:
return "?{name} ?{name}Unit".replace("{name}", self.name) return "?{name} ?{name}Unit".replace("{name}", self.name)
def get_where(self): def get_where(self):
@@ -178,21 +182,21 @@ class WDAmountAttribute(WDAttribute):
'{name}', self.name '{name}', self.name
) )
def get_group_by(self): def get_group_by(self) -> str:
return self.get_select() return self.get_select()
def get_str(self, result: dict[str, t.Any], language: str): def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
value = result.get(self.name) value: str | None = result.get(self.name)
unit = result.get(self.name + "Unit") unit: str | None = result.get(self.name + "Unit")
if unit is not None: if unit is not None:
unit = unit.replace("http://www.wikidata.org/entity/", "") unit = unit.replace("http://www.wikidata.org/entity/", "")
return value + " " + get_label_for_entity(unit, language) return str(value) + " " + get_label_for_entity(unit, language)
return value return value
class WDArticle(WDAttribute): class WDArticle(WDAttribute):
def __init__(self, language: str, kwargs=None): def __init__(self, language: str, kwargs: dict[str, t.Any] | None = None):
super().__init__("wikipedia") super().__init__("wikipedia")
self.language: str = language self.language: str = language
self.kwargs: dict[str, t.Any] = kwargs or {} self.kwargs: dict[str, t.Any] = kwargs or {}
@@ -215,7 +219,7 @@ class WDArticle(WDAttribute):
def get_group_by(self): def get_group_by(self):
return self.get_select() return self.get_select()
def get_str(self, result, language: str): def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
key = "article{language}".replace("{language}", self.language) key = "article{language}".replace("{language}", self.language)
return result.get(key) return result.get(key)
@@ -227,16 +231,16 @@ class WDLabelAttribute(WDAttribute):
def get_where(self): def get_where(self):
return "OPTIONAL { ?item wdt:{name} ?{name} . }".replace("{name}", self.name) return "OPTIONAL { ?item wdt:{name} ?{name} . }".replace("{name}", self.name)
def get_wikibase_label(self): def get_wikibase_label(self) -> str:
return "?{name} rdfs:label ?{name}Label .".replace("{name}", self.name) return "?{name} rdfs:label ?{name}Label .".replace("{name}", self.name)
def get_str(self, result, language): def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
return result.get(self.name + "Labels") return result.get(self.name + "Labels")
class WDURLAttribute(WDAttribute): class WDURLAttribute(WDAttribute):
HTTP_WIKIMEDIA_IMAGE = "http://commons.wikimedia.org/wiki/Special:FilePath/" HTTP_WIKIMEDIA_IMAGE: str = "http://commons.wikimedia.org/wiki/Special:FilePath/"
def __init__( def __init__(
self, self,
@@ -265,12 +269,12 @@ class WDURLAttribute(WDAttribute):
""" """
super().__init__(name) super().__init__(name)
self.url_id = url_id self.url_id: str | None = url_id
self.url_path_prefix = url_path_prefix self.url_path_prefix: str | None = url_path_prefix
self.kwargs = kwargs self.kwargs: dict[str, t.Any] = kwargs or {}
def get_str(self, result, language: str): def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
value = result.get(self.name + "s") value: str | None = result.get(self.name + "s")
if not value: if not value:
return None return None
@@ -306,16 +310,16 @@ class WDGeoAttribute(WDAttribute):
def get_group_by(self): def get_group_by(self):
return self.get_select() return self.get_select()
def get_str(self, result, language: str): def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
latitude = result.get(self.name + "Lat") latitude: str | None = result.get(self.name + "Lat")
longitude = result.get(self.name + "Long") longitude: str | None = result.get(self.name + "Long")
if latitude and longitude: if latitude and longitude:
return latitude + " " + longitude return latitude + " " + longitude
return None return None
def get_geo_url(self, result, osm_zoom=19): def get_geo_url(self, result: dict[str, t.Any], osm_zoom: int = 19) -> str | None:
latitude = result.get(self.name + "Lat") latitude: str | None = result.get(self.name + "Lat")
longitude = result.get(self.name + "Long") longitude: str | None = result.get(self.name + "Long")
if latitude and longitude: if latitude and longitude:
return get_earth_coordinates_url(latitude, longitude, osm_zoom) return get_earth_coordinates_url(latitude, longitude, osm_zoom)
return None return None
@@ -323,9 +327,9 @@ class WDGeoAttribute(WDAttribute):
class WDImageAttribute(WDURLAttribute): class WDImageAttribute(WDURLAttribute):
def __init__(self, name, url_id=None, priority=100): def __init__(self, name: str, url_id: str | None = None, priority: int = 100):
super().__init__(name, url_id) super().__init__(name, url_id)
self.priority = priority self.priority: int = priority
class WDDateAttribute(WDAttribute): class WDDateAttribute(WDAttribute):
@@ -349,11 +353,11 @@ class WDDateAttribute(WDAttribute):
def get_group_by(self): def get_group_by(self):
return self.get_select() return self.get_select()
def format_8(self, value, locale: str): # pylint: disable=unused-argument def format_8(self, value: str, locale: str) -> str: # pylint: disable=unused-argument
# precision: less than a year # precision: less than a year
return value return value
def format_9(self, value, locale: str): def format_9(self, value: str, locale: str) -> str:
year = int(value) year = int(value)
# precision: year # precision: year
if year < 1584: if year < 1584:
@@ -363,17 +367,17 @@ class WDDateAttribute(WDAttribute):
timestamp = isoparse(value) timestamp = isoparse(value)
return format_date(timestamp, format="yyyy", locale=locale) return format_date(timestamp, format="yyyy", locale=locale)
def format_10(self, value, locale: str): def format_10(self, value: str, locale: str) -> str:
# precision: month # precision: month
timestamp = isoparse(value) timestamp = isoparse(value)
return format_date(timestamp, format="MMMM y", locale=locale) return format_date(timestamp, format="MMMM y", locale=locale)
def format_11(self, value, locale: str): def format_11(self, value: str, locale: str) -> str:
# precision: day # precision: day
timestamp = isoparse(value) timestamp = isoparse(value)
return format_date(timestamp, format="full", locale=locale) return format_date(timestamp, format="full", locale=locale)
def format_13(self, value, locale: str): def format_13(self, value: str, locale: str) -> str:
timestamp = isoparse(value) timestamp = isoparse(value)
# precision: minute # precision: minute
return ( return (
@@ -383,11 +387,11 @@ class WDDateAttribute(WDAttribute):
.replace("{1}", format_date(timestamp, "short", locale=locale)) .replace("{1}", format_date(timestamp, "short", locale=locale))
) )
def format_14(self, value, locale): def format_14(self, value: str, locale: str) -> str:
# precision: second. # precision: second.
return format_datetime(isoparse(value), format="full", locale=locale) return format_datetime(isoparse(value), format="full", locale=locale)
DATE_FORMAT = { DATE_FORMAT: dict[str, tuple[str, int]] = {
"0": ("format_8", 1000000000), "0": ("format_8", 1000000000),
"1": ("format_8", 100000000), "1": ("format_8", 100000000),
"2": ("format_8", 10000000), "2": ("format_8", 10000000),
@@ -405,15 +409,15 @@ class WDDateAttribute(WDAttribute):
"14": ("format_14", 0), # second "14": ("format_14", 0), # second
} }
def get_str(self, result, language): def get_str(self, result: dict[str, t.Any], language: str) -> str | None:
value = result.get(self.name) value: str | None = result.get(self.name)
if value == "" or value is None: if value == "" or value is None:
return None return None
precision = result.get(self.name + "timePrecision") _p: str = result.get(self.name + "timePrecision") or "1"
date_format = WDDateAttribute.DATE_FORMAT.get(precision) date_format = WDDateAttribute.DATE_FORMAT.get(_p)
if date_format is not None: if date_format is not None:
format_method = getattr(self, date_format[0]) format_method = getattr(self, date_format[0])
precision = date_format[1] precision: int = date_format[1]
try: try:
if precision >= 1: if precision >= 1:
_t = value.split("-") _t = value.split("-")
@@ -427,9 +431,25 @@ class WDDateAttribute(WDAttribute):
return value return value
WDAttrType = (
WDAttribute
| WDAmountAttribute
| WDArticle
| WDLabelAttribute
| WDURLAttribute
| WDGeoAttribute
| WDImageAttribute
| WDDateAttribute
)
WDAttrList = list[WDAttrType]
def get_headers() -> dict[str, str]: def get_headers() -> dict[str, str]:
# user agent: https://www.mediawiki.org/wiki/Wikidata_Query_Service/User_Manual#Query_limits # user agent: https://www.mediawiki.org/wiki/Wikidata_Query_Service/User_Manual#Query_limits
return {"Accept": "application/sparql-results+json", "User-Agent": searxng_useragent()} return {
"Accept": "application/sparql-results+json",
"User-Agent": f"wikidata engine - {searxng_useragent()}",
}
def get_label_for_entity(entity_id: str, language: str) -> str: def get_label_for_entity(entity_id: str, language: str) -> str:
@@ -445,7 +465,7 @@ def get_label_for_entity(entity_id: str, language: str) -> str:
return name return name
def send_wikidata_query(query: str, method="GET", **kwargs) -> dict[str, t.Any]: def send_wikidata_query(query: str, method: str = "GET", **kwargs: dict[str, t.Any]) -> dict[str, t.Any]:
if method == "GET": if method == "GET":
# query will be cached by wikidata # query will be cached by wikidata
http_response = get(SPARQL_ENDPOINT_URL + "?" + urlencode({"query": query}), headers=get_headers(), **kwargs) http_response = get(SPARQL_ENDPOINT_URL + "?" + urlencode({"query": query}), headers=get_headers(), **kwargs)
@@ -461,15 +481,17 @@ def send_wikidata_query(query: str, method="GET", **kwargs) -> dict[str, t.Any]:
def request(query: str, params: "OnlineParams") -> None: def request(query: str, params: "OnlineParams") -> None:
attributes: tuple[str, list[WDAttribute | WDAmountAttribute | WDLabelAttribute | WDImageAttribute]] attributes: WDAttrList
eng_tag, _wiki_netloc = get_wiki_params(params["searxng_locale"], traits) eng_tag, _wiki_netloc = get_wiki_params(params["searxng_locale"], traits)
query, attributes = get_query(query, eng_tag) query, attributes = get_query(query, eng_tag or "en")
logger.debug("request --> language %s // len(attributes): %s", eng_tag, len(attributes)) logger.debug("request --> language %s // len(attributes): %s", eng_tag, len(attributes))
params["method"] = "POST" params["method"] = "POST"
params["url"] = SPARQL_ENDPOINT_URL params["url"] = SPARQL_ENDPOINT_URL
params["data"] = {"query": query} params["data"] = {"query": query}
params["headers"] = get_headers() params["headers"] = get_headers()
# additional parameters (not a part of OnlineParams)
params["language"] = eng_tag # type: ignore params["language"] = eng_tag # type: ignore
params["attributes"] = attributes # type: ignore params["attributes"] = attributes # type: ignore
@@ -479,14 +501,16 @@ def response(resp: "SXNG_Response") -> list[dict[str, t.Any]]:
results: list[dict[str, t.Any]] = [] results: list[dict[str, t.Any]] = []
jsonresponse = loads(resp.content.decode()) jsonresponse = loads(resp.content.decode())
# additional parameters ..
language: str = resp.search_params["language"] # type: ignore language: str = resp.search_params["language"] # type: ignore
attributes = resp.search_params["attributes"] # type: ignore attributes: WDAttrList = resp.search_params["attributes"] # type: ignore
logger.debug("request --> language %s // len(attributes): %s", language, len(attributes)) logger.debug("request --> language %s // len(attributes): %s", language, len(attributes))
seen_entities: set[str] = set() seen_entities: set[str] = set()
for result in jsonresponse.get("results", {}).get("bindings", []): for result in jsonresponse.get("results", {}).get("bindings", []):
attribute_result = {key: value["value"] for key, value in result.items()} attribute_result = {key: value["value"] for key, value in result.items()}
entity_url = attribute_result["item"] entity_url: str = attribute_result["item"]
if entity_url not in seen_entities and entity_url not in DUMMY_ENTITY_URLS: if entity_url not in seen_entities and entity_url not in DUMMY_ENTITY_URLS:
seen_entities.add(entity_url) seen_entities.add(entity_url)
results += get_results(attribute_result, attributes, language) results += get_results(attribute_result, attributes, language)
@@ -500,7 +524,7 @@ _IMG_SRC_DEFAULT_URL_PREFIX = "https://commons.wikimedia.org/wiki/Special:FilePa
_IMG_SRC_NEW_URL_PREFIX = "https://upload.wikimedia.org/wikipedia/commons/thumb/" _IMG_SRC_NEW_URL_PREFIX = "https://upload.wikimedia.org/wikipedia/commons/thumb/"
def get_thumbnail(img_src: str) -> str: def get_thumbnail(img_src: str | None) -> str | None:
"""Get Thumbnail image from wikimedia commons """Get Thumbnail image from wikimedia commons
Images from commons.wikimedia.org are (HTTP) redirected to Images from commons.wikimedia.org are (HTTP) redirected to
@@ -539,53 +563,58 @@ def get_thumbnail(img_src: str) -> str:
return img_src return img_src
def get_results(attribute_result: dict[str, t.Any], attributes, language): def get_results(
attribute_result: dict[str, t.Any],
attributes: WDAttrList,
language: str,
):
# pylint: disable=too-many-branches # pylint: disable=too-many-branches
results = [] results: list[dict[str, t.Any]] = []
infobox_title = attribute_result.get("itemLabel") infobox_title: str = attribute_result.get("itemLabel") # pyright: ignore[reportAssignmentType]
infobox_id = attribute_result["item"] infobox_id = attribute_result["item"]
infobox_id_lang = None infobox_id_lang: str | None = None
infobox_urls = [] infobox_urls: list[dict[str, str]] = []
infobox_attributes = [] infobox_attributes: list[dict[str, str]] = []
infobox_content = attribute_result.get("itemDescription", []) infobox_content = attribute_result.get("itemDescription", [])
img_src = None img_src: str | None = None
img_src_priority = 0 img_src_priority = 0
for attribute in attributes: for attribute in attributes:
value = attribute.get_str(attribute_result, language) value: str | None = attribute.get_str(attribute_result, language)
if value is not None and value != "": if value is not None and value != "":
attribute_type = type(attribute)
if attribute_type in (WDURLAttribute, WDArticle): if isinstance(attribute, (WDURLAttribute, WDArticle)):
# get_select() method : there is group_concat(distinct ...;separator=", ") # get_select() method : there is group_concat(distinct ...;separator=", ")
# split the value here # split the value here
for url in value.split(", "): for url in value.split(", "):
infobox_urls.append({"title": attribute.get_label(language), "url": url, **attribute.kwargs}) infobox_urls.append({"title": attribute.get_label(language), "url": url, **attribute.kwargs})
# "normal" results (not infobox) include official website and Wikipedia links. # "normal" results (not infobox) include official website and Wikipedia links.
if "list" in display_type and (attribute.kwargs.get("official") or attribute_type == WDArticle): if "list" in display_type and (
attribute.kwargs.get("official") or isinstance(attribute, WDArticle)
):
results.append({"title": infobox_title, "url": url, "content": infobox_content}) results.append({"title": infobox_title, "url": url, "content": infobox_content})
# update the infobox_id with the wikipedia URL # update the infobox_id with the wikipedia URL
# first the local wikipedia URL, and as fallback the english wikipedia URL # first the local wikipedia URL, and as fallback the english wikipedia URL
if attribute_type == WDArticle and ( if isinstance(attribute, WDArticle) and (
(attribute.language == "en" and infobox_id_lang is None) or attribute.language != "en" (attribute.language == "en" and infobox_id_lang is None) or attribute.language != "en"
): ):
infobox_id_lang = attribute.language infobox_id_lang = attribute.language
infobox_id = url infobox_id = url
elif attribute_type == WDImageAttribute: elif isinstance(attribute, WDImageAttribute):
# this attribute is an image. # this attribute is an image.
# replace the current image only if the priority is higher # replace the current image only if the priority is higher
# (the infobox contains only one image). # (the infobox contains only one image).
if attribute.priority > img_src_priority: if attribute.priority > img_src_priority:
img_src = get_thumbnail(value) img_src = get_thumbnail(value)
img_src_priority = attribute.priority img_src_priority = attribute.priority
elif attribute_type == WDGeoAttribute: elif isinstance(attribute, WDGeoAttribute):
# geocoordinate link # geocoordinate link
# use the area to get the OSM zoom # use the area to get the OSM zoom
# Note: ignore the unit (must be km² otherwise the calculation is wrong) # Note: ignore the unit (must be km² otherwise the calculation is wrong)
# Should use normalized value p:P2046/psn:P2046/wikibase:quantityAmount # Should use normalized value p:P2046/psn:P2046/wikibase:quantityAmount
area = attribute_result.get("P2046") area = attribute_result.get("P2046")
osm_zoom = area_to_osm_zoom(area) if area else 19 osm_zoom: int = area_to_osm_zoom(area) if area else 19
url = attribute.get_geo_url(attribute_result, osm_zoom=osm_zoom) url = attribute.get_geo_url(attribute_result, osm_zoom=osm_zoom)
if url: if url:
infobox_urls.append({"title": attribute.get_label(language), "url": url, "entity": attribute.name}) infobox_urls.append({"title": attribute.get_label(language), "url": url, "entity": attribute.name})
@@ -622,9 +651,7 @@ def get_results(attribute_result: dict[str, t.Any], attributes, language):
return results return results
def get_query( def get_query(query: str, language: str) -> tuple[str, WDAttrList]:
query: str, language: str
) -> tuple[str, list[WDAttribute | WDAmountAttribute | WDLabelAttribute | WDImageAttribute]]:
attributes = get_attributes(language) attributes = get_attributes(language)
select = [a.get_select() for a in attributes] select = [a.get_select() for a in attributes]
where = list(filter(lambda s: len(s) > 0, [a.get_where() for a in attributes])) where = list(filter(lambda s: len(s) > 0, [a.get_where() for a in attributes]))
@@ -643,7 +670,7 @@ def get_query(
def get_attributes(language: str): def get_attributes(language: str):
# pylint: disable=too-many-statements # pylint: disable=too-many-statements
attributes: list[WDAttribute | WDAmountAttribute | WDLabelAttribute | WDImageAttribute] = [] attributes: WDAttrList = []
def add_value(name: str): def add_value(name: str):
attributes.append(WDAttribute(name)) attributes.append(WDAttribute(name))
@@ -654,7 +681,7 @@ def get_attributes(language: str):
def add_label(name: str): def add_label(name: str):
attributes.append(WDLabelAttribute(name)) attributes.append(WDLabelAttribute(name))
def add_url(name: str, url_id: str | None = None, url_path_prefix: str | None = None, **kwargs): def add_url(name: str, url_id: str | None = None, url_path_prefix: str | None = None, **kwargs: dict[str, t.Any]):
attributes.append(WDURLAttribute(name, url_id, url_path_prefix, kwargs)) attributes.append(WDURLAttribute(name, url_id, url_path_prefix, kwargs))
def add_image(name: str, url_id: str | None = None, priority: int = 1): def add_image(name: str, url_id: str | None = None, priority: int = 1):
@@ -749,7 +776,8 @@ def get_attributes(language: str):
add_value("P498") # currency code (ISO 4217) add_value("P498") # currency code (ISO 4217)
# URL # URL
add_url("P856", official=True) # official website kwargs: dict[str, t.Any] = {"official": True}
add_url("P856", **kwargs) # official website
attributes.append(WDArticle(language)) # wikipedia (user language) attributes.append(WDArticle(language)) # wikipedia (user language)
if not language.startswith("en"): if not language.startswith("en"):
attributes.append(WDArticle("en")) # wikipedia (english) attributes.append(WDArticle("en")) # wikipedia (english)
@@ -796,7 +824,19 @@ def debug_explain_wikidata_query(query: str, method: str = "GET"):
return http_response.content return http_response.content
def init(engine_settings=None): # pylint: disable=unused-argument def init(_):
global CACHE # pylint: disable=global-statement
CACHE = EngineCache("wikidata")
init_wikidata_properties()
def init_wikidata_properties():
global WIKIDATA_PROPERTIES # pylint: disable=global-statement
p: WDPType = CACHE.get(key="WIKIDATA_PROPERTIES")
if p:
WIKIDATA_PROPERTIES = p
return
# WIKIDATA_PROPERTIES : add unit symbols # WIKIDATA_PROPERTIES : add unit symbols
for k, v in WIKIDATA_UNITS.items(): for k, v in WIKIDATA_UNITS.items():
WIKIDATA_PROPERTIES[k] = v["symbol"] WIKIDATA_PROPERTIES[k] = v["symbol"]
@@ -808,7 +848,8 @@ def init(engine_settings=None): # pylint: disable=unused-argument
if attribute.name not in WIKIDATA_PROPERTIES: if attribute.name not in WIKIDATA_PROPERTIES:
wikidata_property_names.append("wd:" + attribute.name) wikidata_property_names.append("wd:" + attribute.name)
query = QUERY_PROPERTY_NAMES.replace("%ATTRIBUTES%", " ".join(wikidata_property_names)) query = QUERY_PROPERTY_NAMES.replace("%ATTRIBUTES%", " ".join(wikidata_property_names))
jsonresponse = send_wikidata_query(query, timeout=20) kwargs: dict[str, t.Any] = {"timeout": 20}
jsonresponse = send_wikidata_query(query, **kwargs)
for result in jsonresponse.get("results", {}).get("bindings", {}): for result in jsonresponse.get("results", {}).get("bindings", {}):
name_field = result.get("name") name_field = result.get("name")
if not name_field: if not name_field:
@@ -818,6 +859,8 @@ def init(engine_settings=None): # pylint: disable=unused-argument
entity_id = result["item"]["value"].replace("http://www.wikidata.org/entity/", "") entity_id = result["item"]["value"].replace("http://www.wikidata.org/entity/", "")
WIKIDATA_PROPERTIES[(entity_id, lang)] = name.capitalize() WIKIDATA_PROPERTIES[(entity_id, lang)] = name.capitalize()
CACHE.set(key="WIKIDATA_PROPERTIES", value=WIKIDATA_PROPERTIES)
def fetch_traits(engine_traits: EngineTraits): def fetch_traits(engine_traits: EngineTraits):
"""Uses languages evaluated from :py:obj:`wikipedia.fetch_wikimedia_traits """Uses languages evaluated from :py:obj:`wikipedia.fetch_wikimedia_traits
+3 -3
View File
@@ -16,12 +16,12 @@ IMDB_PREFIX_TO_URL_ID = {
HTTP_WIKIMEDIA_IMAGE = 'http://commons.wikimedia.org/wiki/Special:FilePath/' HTTP_WIKIMEDIA_IMAGE = 'http://commons.wikimedia.org/wiki/Special:FilePath/'
def get_imdb_url_id(imdb_item_id): def get_imdb_url_id(imdb_item_id: str):
id_prefix = imdb_item_id[:2] id_prefix = imdb_item_id[:2]
return IMDB_PREFIX_TO_URL_ID.get(id_prefix) return IMDB_PREFIX_TO_URL_ID.get(id_prefix)
def get_wikimedia_image_id(url): def get_wikimedia_image_id(url: str):
if url.startswith(HTTP_WIKIMEDIA_IMAGE): if url.startswith(HTTP_WIKIMEDIA_IMAGE):
return url[len(HTTP_WIKIMEDIA_IMAGE) :] return url[len(HTTP_WIKIMEDIA_IMAGE) :]
if url.startswith('File:'): if url.startswith('File:'):
@@ -29,7 +29,7 @@ def get_wikimedia_image_id(url):
return url return url
def get_external_url(url_id, item_id, alternative="default"): def get_external_url(url_id: str, item_id: str | None, alternative: str = "default") -> str | None:
"""Return an external URL or None if url_id is not found. """Return an external URL or None if url_id is not found.
url_id can take value from data/external_urls.json url_id can take value from data/external_urls.json