diff --git a/searx/enginelib/traits.py b/searx/enginelib/traits.py index 8d700406d..257734dd9 100644 --- a/searx/enginelib/traits.py +++ b/searx/enginelib/traits.py @@ -9,15 +9,14 @@ To load traits from the persistence :py:obj:`EngineTraitsMap.from_data` can be used. """ - -import os -import json import dataclasses +import json +import pathlib import types import typing as t -import pathlib + from searx import locales -from searx.data import data_dir, ENGINE_TRAITS +from searx.data import ENGINE_TRAITS, data_dir if t.TYPE_CHECKING: from . import Engine @@ -77,7 +76,7 @@ class EngineTraits: language"). """ - data_type: t.Literal['traits_v1'] = 'traits_v1' + data_type: t.Literal["traits_v1"] = "traits_v1" """Data type, default is 'traits_v1'. """ @@ -97,7 +96,7 @@ class EngineTraits: :py:obj:`searx.locales.get_engine_locale`. Except for the special value ``all`` which is determined from :py:obj:`EngineTraits.all_locale`. """ - if searxng_locale == 'all' and self.all_locale is not None: + if searxng_locale == "all" and self.all_locale is not None: return self.all_locale return locales.get_engine_locale(searxng_locale, self.languages, default=default) @@ -113,7 +112,7 @@ class EngineTraits: :py:obj:`searx.locales.get_engine_locale`. Except for the special value ``all`` which is determined from :py:obj:`EngineTraits.all_locale`. """ - if searxng_locale == 'all' and self.all_locale is not None: + if searxng_locale == "all" and self.all_locale is not None: return self.all_locale return locales.get_engine_locale(searxng_locale, self.regions, default=default) @@ -125,10 +124,10 @@ class EngineTraits: For verification the functions :py:func:`EngineTraits.get_region` and :py:func:`EngineTraits.get_language` are used. """ - if self.data_type == 'traits_v1': + if self.data_type == "traits_v1": return bool(self.get_region(searxng_locale) or self.get_language(searxng_locale)) - raise TypeError('engine traits of type %s is unknown' % self.data_type) + raise TypeError("engine traits of type %s is unknown" % self.data_type) def copy(self): """Create a copy of the dataclass object.""" @@ -141,7 +140,7 @@ class EngineTraits: function does not exists, ``None`` is returned. """ - fetch_traits = getattr(engine, 'fetch_traits', None) + fetch_traits = getattr(engine, "fetch_traits", None) engine_traits = None if fetch_traits: @@ -155,10 +154,10 @@ class EngineTraits: :param engine: engine instance build by :py:func:`searx.engines.load_engine` """ - if self.data_type == 'traits_v1': + if self.data_type == "traits_v1": self._set_traits_v1(engine) else: - raise TypeError('engine traits of type %s is unknown' % self.data_type) + raise TypeError("engine traits of type %s is unknown" % self.data_type) def _set_traits_v1(self, engine: "Engine | types.ModuleType") -> None: # For an engine, when there is `language: ...` in the YAML settings the engine @@ -174,15 +173,15 @@ class EngineTraits: _msg = "settings.yml - engine: '%s' / %s: '%s' not supported" languages = traits.languages - if hasattr(engine, 'language'): + if hasattr(engine, "language"): if engine.language not in languages: - raise ValueError(_msg % (engine.name, 'language', engine.language)) + raise ValueError(_msg % (engine.name, "language", engine.language)) traits.languages = {engine.language: languages[engine.language]} regions = traits.regions - if hasattr(engine, 'region'): + if hasattr(engine, "region"): if engine.region not in regions: - raise ValueError(_msg % (engine.name, 'region', engine.region)) + raise ValueError(_msg % (engine.name, "region", engine.region)) traits.regions = {engine.region: regions[engine.region]} engine.language_support = bool(traits.languages or traits.regions) @@ -194,16 +193,16 @@ class EngineTraits: class EngineTraitsMap(dict[str, EngineTraits]): """A python dictionary to map :class:`EngineTraits` by engine name.""" - ENGINE_TRAITS_FILE: pathlib.Path = (data_dir / 'engine_traits.json').resolve() + ENGINE_TRAITS_FILE: pathlib.Path = (data_dir / "engine_traits.json").resolve() """File with persistence of the :py:obj:`EngineTraitsMap`.""" def save_data(self): """Store EngineTraitsMap in in file :py:obj:`self.ENGINE_TRAITS_FILE`""" - with open(self.ENGINE_TRAITS_FILE, 'w', encoding='utf-8') as f: + with open(self.ENGINE_TRAITS_FILE, "w", encoding="utf-8") as f: json.dump(self, f, indent=2, sort_keys=True, cls=EngineTraitsEncoder) @classmethod - def from_data(cls) -> 'EngineTraitsMap': + def from_data(cls) -> "EngineTraitsMap": """Instantiate :class:`EngineTraitsMap` object from :py:obj:`ENGINE_TRAITS`""" obj = cls() for k, v in ENGINE_TRAITS.items(): @@ -211,8 +210,10 @@ class EngineTraitsMap(dict[str, EngineTraits]): return obj @classmethod - def fetch_traits(cls, log: t.Callable[[str], None]) -> 'EngineTraitsMap': - from searx import engines # pylint: disable=cyclic-import, import-outside-toplevel + def fetch_traits(cls, log: t.Callable[[str], None]) -> "EngineTraitsMap": + from searx import ( # pylint: disable=cyclic-import, import-outside-toplevel + engines, + ) names = list(engines.engines) names.sort() @@ -226,13 +227,13 @@ class EngineTraitsMap(dict[str, EngineTraits]): try: traits = EngineTraits.fetch_traits(engine) except Exception as exc: - log("FATAL: while fetch_traits %s: %s" % (engine_name, exc)) - if os.environ.get('FORCE', '').lower() not in ['on', 'true', '1']: - raise + log("ERROR: while fetch_traits %s: %s" % (engine_name, exc)) v = ENGINE_TRAITS.get(engine_name) if v: - log("FORCE: re-use old values from fetch_traits - ENGINE_TRAITS[%s]" % engine_name) + log("WARNING: re-use old values from fetch_traits - ENGINE_TRAITS[%s]" % engine_name) traits = EngineTraits(**v) + else: + log("WARNING: no old values available for ENGINE_TRAITS[%s], skipping" % engine_name) if traits is not None: log("%-20s: SearXNG languages --> %s " % (engine_name, len(traits.languages))) @@ -247,7 +248,7 @@ class EngineTraitsMap(dict[str, EngineTraits]): :param engine: engine instance build by :py:func:`searx.engines.load_engine` """ - engine_traits = EngineTraits(data_type='traits_v1') + engine_traits = EngineTraits(data_type="traits_v1") if engine.name in self.keys(): engine_traits = self[engine.name] diff --git a/searx/engines/annas_archive.py b/searx/engines/annas_archive.py index a49b97f3e..d2df1040c 100644 --- a/searx/engines/annas_archive.py +++ b/searx/engines/annas_archive.py @@ -250,9 +250,10 @@ def fetch_traits(engine_traits: EngineTraits) -> None: engine_traits.custom["ext"] = [] engine_traits.custom["sort"] = [] - resp = get(_get_base_url_choice() + "/search") + resp = get(_get_base_url_choice() + "/search", timeout=5) if not resp.ok: - raise RuntimeError("Response from Anna's search page is not OK.") + raise RuntimeError("Response from Anna's Archive is not OK.") + dom = html.fromstring(resp.text) # supported language codes diff --git a/searx/engines/archlinux.py b/searx/engines/archlinux.py index 9e3adf154..4ccc6ab48 100644 --- a/searx/engines/archlinux.py +++ b/searx/engines/archlinux.py @@ -9,55 +9,60 @@ Arch Wiki blocks access to it. """ from urllib.parse import urlencode, urljoin, urlparse -import lxml -import babel -from searx.utils import extract_text, eval_xpath_list, eval_xpath_getindex, searxng_useragent +import babel +import lxml + from searx.enginelib.traits import EngineTraits from searx.locales import language_tag - +from searx.utils import ( + eval_xpath_getindex, + eval_xpath_list, + extract_text, + searxng_useragent, +) about = { - "website": 'https://wiki.archlinux.org/', - "wikidata_id": 'Q101445877', + "website": "https://wiki.archlinux.org/", + "wikidata_id": "Q101445877", "official_api_documentation": None, "use_official_api": False, "require_api_key": False, - "results": 'HTML', + "results": "HTML", } # engine dependent config -categories = ['it', 'software wikis'] +categories = ["it", "software wikis"] paging = True -main_wiki = 'wiki.archlinux.org' +main_wiki = "wiki.archlinux.org" def request(query, params): - sxng_lang = params['searxng_locale'].split('-')[0] - netloc: str = traits.custom['wiki_netloc'].get(sxng_lang, main_wiki) # type: ignore - title: str = traits.custom['title'].get(sxng_lang, 'Special:Search') # type: ignore - base_url = 'https://' + netloc + '/index.php?' - offset = (params['pageno'] - 1) * 20 + sxng_lang = params["searxng_locale"].split("-")[0] + netloc: str = traits.custom["wiki_netloc"].get(sxng_lang, main_wiki) # type: ignore + title: str = traits.custom["title"].get(sxng_lang, "Special:Search") # type: ignore + base_url = "https://" + netloc + "/index.php?" + offset = (params["pageno"] - 1) * 20 if netloc == main_wiki: - eng_lang: str = traits.get_language(sxng_lang, 'English') # type: ignore - query += ' (' + eng_lang + ')' + eng_lang: str = traits.get_language(sxng_lang, "English") # type: ignore + query += " (" + eng_lang + ")" # wiki.archlinux.org is protected by anubis # - https://github.com/searxng/searxng/issues/4646#issuecomment-2817848019 - params['headers']['User-Agent'] = searxng_useragent() - elif netloc == 'wiki.archlinuxcn.org': - base_url = 'https://' + netloc + '/wzh/index.php?' + params["headers"]["User-Agent"] = searxng_useragent() + elif netloc == "wiki.archlinuxcn.org": + base_url = "https://" + netloc + "/wzh/index.php?" args = { - 'search': query, - 'title': title, - 'limit': 20, - 'offset': offset, - 'profile': 'default', + "search": query, + "title": title, + "limit": 20, + "offset": offset, + "profile": "default", } - params['url'] = base_url + urlencode(args) + params["url"] = base_url + urlencode(args) return params @@ -67,18 +72,18 @@ def response(resp): dom = lxml.html.fromstring(resp.text) # type: ignore # get the base URL for the language in which request was made - sxng_lang = resp.search_params['searxng_locale'].split('-')[0] - netloc: str = traits.custom['wiki_netloc'].get(sxng_lang, main_wiki) # type: ignore - base_url = 'https://' + netloc + '/index.php?' + sxng_lang = resp.search_params["searxng_locale"].split("-")[0] + netloc: str = traits.custom["wiki_netloc"].get(sxng_lang, main_wiki) # type: ignore + base_url = "https://" + netloc + "/index.php?" for result in eval_xpath_list(dom, '//ul[@class="mw-search-results"]/li'): link = eval_xpath_getindex(result, './/div[@class="mw-search-result-heading"]/a', 0) content = extract_text(result.xpath('.//div[@class="searchresult"]')) results.append( { - 'url': urljoin(base_url, link.get('href')), # type: ignore - 'title': extract_text(link), - 'content': content, + "url": urljoin(base_url, link.get("href")), # type: ignore + "title": extract_text(link), + "content": content, } ) @@ -108,39 +113,39 @@ def fetch_traits(engine_traits: EngineTraits): """ # pylint: disable=import-outside-toplevel + from searx.network import get # see https://github.com/searxng/searxng/issues/762 - engine_traits.custom['wiki_netloc'] = {} - engine_traits.custom['title'] = {} + engine_traits.custom["wiki_netloc"] = {} + engine_traits.custom["title"] = {} title_map = { - 'de': 'Spezial:Suche', - 'fa': 'ویژه:جستجو', - 'ja': '特別:検索', - 'zh': 'Special:搜索', + "de": "Spezial:Suche", + "fa": "ویژه:جستجو", + "ja": "特別:検索", + "zh": "Special:搜索", } - resp = get('https://wiki.archlinux.org/', timeout=3) - if not resp.ok: # type: ignore - print("ERROR: response from wiki.archlinux.org is not OK.") + resp = get("https://wiki.archlinux.org/", timeout=5) + if not resp.ok: + raise RuntimeError("Response from Arch Linux Wiki is not OK.") dom = lxml.html.fromstring(resp.text) # type: ignore for a in eval_xpath_list(dom, "//a[@class='interlanguage-link-target']"): - - sxng_tag = language_tag(babel.Locale.parse(a.get('lang'), sep='-')) + sxng_tag = language_tag(babel.Locale.parse(a.get("lang"), sep="-")) # zh_Hans --> zh - sxng_tag = sxng_tag.split('_')[0] + sxng_tag = sxng_tag.split("_")[0] - netloc = urlparse(a.get('href')).netloc - if netloc != 'wiki.archlinux.org': + netloc = urlparse(a.get("href")).netloc + if netloc != "wiki.archlinux.org": title = title_map.get(sxng_tag) if not title: print("ERROR: title tag from %s (%s) is unknown" % (netloc, sxng_tag)) continue - engine_traits.custom['wiki_netloc'][sxng_tag] = netloc - engine_traits.custom['title'][sxng_tag] = title # type: ignore + engine_traits.custom["wiki_netloc"][sxng_tag] = netloc + engine_traits.custom["title"][sxng_tag] = title # type: ignore eng_tag = extract_text(eval_xpath_list(a, ".//span")) engine_traits.languages[sxng_tag] = eng_tag # type: ignore - engine_traits.languages['en'] = 'English' + engine_traits.languages["en"] = "English" diff --git a/searx/engines/bing.py b/searx/engines/bing.py index 4a1934a2d..dd5ce4beb 100644 --- a/searx/engines/bing.py +++ b/searx/engines/bing.py @@ -30,26 +30,27 @@ import base64 import re import time from urllib.parse import parse_qs, urlencode, urlparse -from lxml import html + import babel import babel.languages +from lxml import html -from searx.utils import eval_xpath, extract_text, eval_xpath_list, eval_xpath_getindex -from searx.locales import language_tag, region_tag from searx.enginelib.traits import EngineTraits from searx.exceptions import SearxEngineAPIException +from searx.locales import language_tag, region_tag +from searx.utils import eval_xpath, eval_xpath_getindex, eval_xpath_list, extract_text about = { - "website": 'https://www.bing.com', - "wikidata_id": 'Q182496', - "official_api_documentation": 'https://www.microsoft.com/en-us/bing/apis/bing-web-search-api', + "website": "https://www.bing.com", + "wikidata_id": "Q182496", + "official_api_documentation": "https://www.microsoft.com/en-us/bing/apis/bing-web-search-api", "use_official_api": False, "require_api_key": False, - "results": 'HTML', + "results": "HTML", } # engine dependent config -categories = ['general', 'web'] +categories = ["general", "web"] paging = True max_page = 200 """200 pages maximum (``&first=1991``)""" @@ -60,7 +61,7 @@ safesearch = True verification by a cookie is needed / thats not possible in SearXNG. """ -base_url = 'https://www.bing.com/search' +base_url = "https://www.bing.com/search" """Bing (Web) search URL""" @@ -69,25 +70,25 @@ def _page_offset(pageno): def set_bing_cookies(params, engine_language, engine_region): - params['cookies']['_EDGE_CD'] = f'm={engine_region}&u={engine_language}' - params['cookies']['_EDGE_S'] = f'mkt={engine_region}&ui={engine_language}' - logger.debug("bing cookies: %s", params['cookies']) + params["cookies"]["_EDGE_CD"] = f"m={engine_region}&u={engine_language}" + params["cookies"]["_EDGE_S"] = f"mkt={engine_region}&ui={engine_language}" + logger.debug("bing cookies: %s", params["cookies"]) def request(query, params): """Assemble a Bing-Web request.""" - engine_region = traits.get_region(params['searxng_locale'], traits.all_locale) # type: ignore - engine_language = traits.get_language(params['searxng_locale'], 'en') # type: ignore + engine_region = traits.get_region(params["searxng_locale"], traits.all_locale) # type: ignore + engine_language = traits.get_language(params["searxng_locale"], "en") # type: ignore set_bing_cookies(params, engine_language, engine_region) - page = params.get('pageno', 1) + page = params.get("pageno", 1) query_params = { - 'q': query, + "q": query, # if arg 'pq' is missed, sometimes on page 4 we get results from page 1, # don't ask why it is only sometimes / its M$ and they have never been # deterministic ;) - 'pq': query, + "pq": query, } # To get correct page, arg first and this arg FORM is needed, the value PERE @@ -95,22 +96,27 @@ def request(query, params): # The 'first' arg should never send on page 1. if page > 1: - query_params['first'] = _page_offset(page) # see also arg FORM + query_params["first"] = _page_offset(page) # see also arg FORM if page == 2: - query_params['FORM'] = 'PERE' + query_params["FORM"] = "PERE" elif page > 2: - query_params['FORM'] = 'PERE%s' % (page - 2) + query_params["FORM"] = "PERE%s" % (page - 2) - params['url'] = f'{base_url}?{urlencode(query_params)}' + params["url"] = f"{base_url}?{urlencode(query_params)}" - if params.get('time_range'): + if params.get("time_range"): unix_day = int(time.time() / 86400) - time_ranges = {'day': '1', 'week': '2', 'month': '3', 'year': f'5_{unix_day-365}_{unix_day}'} - params['url'] += f'&filters=ex1:"ez{time_ranges[params["time_range"]]}"' + time_ranges = { + "day": "1", + "week": "2", + "month": "3", + "year": f"5_{unix_day - 365}_{unix_day}", + } + params["url"] += f'&filters=ex1:"ez{time_ranges[params["time_range"]]}"' # in some regions where geoblocking is employed (e.g. China), # www.bing.com redirects to the regional version of Bing - params['allow_redirects'] = True + params["allow_redirects"] = True return params @@ -126,14 +132,13 @@ def response(resp): # parse results again if nothing is found yet for result in eval_xpath_list(dom, '//ol[@id="b_results"]/li[contains(@class, "b_algo")]'): - - link = eval_xpath_getindex(result, './/h2/a', 0, None) + link = eval_xpath_getindex(result, ".//h2/a", 0, None) if link is None: continue - url = link.attrib.get('href') + url = link.attrib.get("href") title = extract_text(link) - content = eval_xpath(result, './/p') + content = eval_xpath(result, ".//p") for p in content: # Make sure that the element is free of: # Web @@ -142,7 +147,7 @@ def response(resp): content = extract_text(content) # get the real URL - if url.startswith('https://www.bing.com/ck/a?'): + if url.startswith("https://www.bing.com/ck/a?"): # get the first value of u parameter url_query = urlparse(url).query parsed_url_query = parse_qs(url_query) @@ -150,23 +155,23 @@ def response(resp): # remove "a1" in front encoded_url = param_u[2:] # add padding - encoded_url = encoded_url + '=' * (-len(encoded_url) % 4) + encoded_url = encoded_url + "=" * (-len(encoded_url) % 4) # decode base64 encoded URL url = base64.urlsafe_b64decode(encoded_url).decode() # append result - results.append({'url': url, 'title': title, 'content': content}) + results.append({"url": url, "title": title, "content": content}) # get number_of_results if results: result_len_container = "".join(eval_xpath(dom, '//span[@class="sb_count"]//text()')) if "-" in result_len_container: - start_str, result_len_container = re.split(r'-\d+', result_len_container) + start_str, result_len_container = re.split(r"-\d+", result_len_container) start = int(start_str) else: start = 1 - result_len_container = re.sub('[^0-9]', '', result_len_container) + result_len_container = re.sub("[^0-9]", "", result_len_container) if len(result_len_container) > 0: result_len = int(result_len_container) @@ -186,7 +191,7 @@ def response(resp): msg = f"Expected results to start at {expected_start}, but got results starting at {start}" raise SearxEngineAPIException(msg) - results.append({'number_of_results': result_len}) + results.append({"number_of_results": result_len}) return results @@ -208,28 +213,28 @@ def fetch_traits(engine_traits: EngineTraits): "Cache-Control": "max-age=0", } - resp = get("https://www.bing.com/account/general", headers=headers) - if not resp.ok: # type: ignore - print("ERROR: response from bing is not OK.") + resp = get("https://www.bing.com/account/general", headers=headers, timeout=5) + if not resp.ok: + raise RuntimeError("Response from Bing is not OK.") - dom = html.fromstring(resp.text) # type: ignore + dom = html.fromstring(resp.text) # languages - engine_traits.languages['zh'] = 'zh-hans' + engine_traits.languages["zh"] = "zh-hans" - map_lang = {'prs': 'fa-AF', 'en': 'en-us'} + map_lang = {"prs": "fa-AF", "en": "en-us"} bing_ui_lang_map = { # HINT: this list probably needs to be supplemented - 'en': 'us', # en --> en-us - 'da': 'dk', # da --> da-dk + "en": "us", # en --> en-us + "da": "dk", # da --> da-dk } for href in eval_xpath(dom, '//div[@id="language-section-content"]//div[@class="languageItem"]/a/@href'): - eng_lang = parse_qs(urlparse(href).query)['setlang'][0] + eng_lang = parse_qs(urlparse(href).query)["setlang"][0] babel_lang = map_lang.get(eng_lang, eng_lang) try: - sxng_tag = language_tag(babel.Locale.parse(babel_lang.replace('-', '_'))) + sxng_tag = language_tag(babel.Locale.parse(babel_lang.replace("-", "_"))) except babel.UnknownLocaleError: print("ERROR: language (%s) is unknown by babel" % (babel_lang)) continue @@ -238,8 +243,8 @@ def fetch_traits(engine_traits: EngineTraits): # already a '-' delemitter in the language. For instance 'pt-PT' --> # 'pt-pt' and 'pt-br' --> 'pt-br' bing_ui_lang = eng_lang.lower() - if '-' not in bing_ui_lang: - bing_ui_lang = bing_ui_lang + '-' + bing_ui_lang_map.get(bing_ui_lang, bing_ui_lang) + if "-" not in bing_ui_lang: + bing_ui_lang = bing_ui_lang + "-" + bing_ui_lang_map.get(bing_ui_lang, bing_ui_lang) conflict = engine_traits.languages.get(sxng_tag) if conflict: @@ -250,14 +255,14 @@ def fetch_traits(engine_traits: EngineTraits): # regions (aka "market codes") - engine_traits.regions['zh-CN'] = 'zh-cn' + engine_traits.regions["zh-CN"] = "zh-cn" map_market_codes = { - 'zh-hk': 'en-hk', # not sure why, but at M$ this is the market code for Hongkong + "zh-hk": "en-hk", # not sure why, but at M$ this is the market code for Hongkong } for href in eval_xpath(dom, '//div[@id="region-section-content"]//div[@class="regionItem"]/a/@href'): - cc_tag = parse_qs(urlparse(href).query)['cc'][0] - if cc_tag == 'clear': + cc_tag = parse_qs(urlparse(href).query)["cc"][0] + if cc_tag == "clear": engine_traits.all_locale = cc_tag continue @@ -266,11 +271,11 @@ def fetch_traits(engine_traits: EngineTraits): if lang_tag not in engine_traits.languages.keys(): # print("ignore lang: %s <-- %s" % (cc_tag, lang_tag)) continue - lang_tag = lang_tag.split('_')[0] # zh_Hant --> zh + lang_tag = lang_tag.split("_")[0] # zh_Hant --> zh market_code = f"{lang_tag}-{cc_tag}" # zh-tw market_code = map_market_codes.get(market_code, market_code) - sxng_tag = region_tag(babel.Locale.parse('%s_%s' % (lang_tag, cc_tag.upper()))) + sxng_tag = region_tag(babel.Locale.parse("%s_%s" % (lang_tag, cc_tag.upper()))) conflict = engine_traits.regions.get(sxng_tag) if conflict: if conflict != market_code: diff --git a/searx/engines/brave.py b/searx/engines/brave.py index 9b716c843..dcc96609f 100644 --- a/searx/engines/brave.py +++ b/searx/engines/brave.py @@ -117,29 +117,28 @@ Implementations """ +import json import typing as t - from urllib.parse import ( urlencode, urlparse, ) -import json from dateutil import parser from lxml import html from searx import locales -from searx.utils import ( - extract_text, - eval_xpath_list, - eval_xpath_getindex, - js_obj_str_to_python, - js_obj_str_to_json_str, - get_embeded_stream_url, -) from searx.enginelib.traits import EngineTraits -from searx.result_types import EngineResults from searx.extended_types import SXNG_Response +from searx.result_types import EngineResults +from searx.utils import ( + eval_xpath_getindex, + eval_xpath_list, + extract_text, + get_embeded_stream_url, + js_obj_str_to_json_str, + js_obj_str_to_python, +) about = { "website": "https://search.brave.com/", @@ -264,10 +263,10 @@ def extract_json_data(text: str) -> dict[str, t.Any]: def response(resp: SXNG_Response) -> EngineResults: - if brave_category in ('search', 'goggles'): + if brave_category in ("search", "goggles"): return _parse_search(resp) - if brave_category in ('news'): + if brave_category in ("news"): return _parse_news(resp) # Example script source containing the data: @@ -277,11 +276,11 @@ def response(resp: SXNG_Response) -> EngineResults: # data: [{type:"data",data: .... ["q","goggles_id"],route:1,url:1}}] # ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ json_data: dict[str, t.Any] = extract_json_data(resp.text) - json_resp: dict[str, t.Any] = json_data['data'][1]["data"]['body']['response'] + json_resp: dict[str, t.Any] = json_data["data"][1]["data"]["body"]["response"] - if brave_category == 'images': + if brave_category == "images": return _parse_images(json_resp) - if brave_category == 'videos': + if brave_category == "videos": return _parse_videos(json_resp) raise ValueError(f"Unsupported brave category: {brave_category}") @@ -292,7 +291,6 @@ def _parse_search(resp: SXNG_Response) -> EngineResults: dom = html.fromstring(resp.text) for result in eval_xpath_list(dom, "//div[contains(@class, 'snippet ')]"): - url: str | None = eval_xpath_getindex(result, ".//a/@href", 0, default=None) title_tag = eval_xpath_getindex(result, ".//div[contains(@class, 'title')]", 0, default=None) if url is None or title_tag is None or not urlparse(url).netloc: # partial url likely means it's an ad @@ -304,7 +302,12 @@ def _parse_search(resp: SXNG_Response) -> EngineResults: # there are other classes like 'site-name-content' we don't want to match, # however only using contains(@class, 'content') would e.g. also match `site-name-content` # thus, we explicitly also require the spaces as class separator - _content = eval_xpath_getindex(result, ".//div[contains(concat(' ', @class, ' '), ' content ')]", 0, default="") + _content = eval_xpath_getindex( + result, + ".//div[contains(concat(' ', @class, ' '), ' content ')]", + 0, + default="", + ) if len(_content): content = extract_text(_content) # type: ignore _pub_date = extract_text( @@ -327,7 +330,10 @@ def _parse_search(resp: SXNG_Response) -> EngineResults: res.add(item) video_tag = eval_xpath_getindex( - result, ".//div[contains(@class, 'video-snippet') and @data-macro='video']", 0, default=[] + result, + ".//div[contains(@class, 'video-snippet') and @data-macro='video']", + 0, + default=[], ) if len(video_tag): # In my tests a video tag in the WEB search was most often not a @@ -338,7 +344,7 @@ def _parse_search(resp: SXNG_Response) -> EngineResults: item["template"] = "videos.html" for suggestion in eval_xpath_list(dom, "//a[contains(@class, 'related-query')]"): - res.append(res.types.LegacyResult({'suggestion': extract_text(suggestion)})) + res.append(res.types.LegacyResult({"suggestion": extract_text(suggestion)})) return res @@ -348,7 +354,6 @@ def _parse_news(resp: SXNG_Response) -> EngineResults: dom = html.fromstring(resp.text) for result in eval_xpath_list(dom, "//div[contains(@class, 'results')]//div[@data-type='news']"): - url = eval_xpath_getindex(result, ".//a[contains(@class, 'result-header')]/@href", 0, default=None) if url is None: continue @@ -417,23 +422,23 @@ def fetch_traits(engine_traits: EngineTraits): # pylint: disable=import-outside-toplevel, too-many-branches import babel.languages - from searx.locales import region_tag, language_tag + + from searx.locales import language_tag, region_tag from searx.network import get # see https://github.com/searxng/searxng/issues/762 engine_traits.custom["ui_lang"] = {} - lang_map = {'no': 'nb'} # norway + lang_map = {"no": "nb"} # norway # languages (UI) - resp = get('https://search.brave.com/settings') - + resp = get("https://search.brave.com/settings", timeout=5) if not resp.ok: - print("ERROR: response from Brave is not OK.") + raise RuntimeError("Response from Brave languages is not OK.") + dom = html.fromstring(resp.text) for option in dom.xpath("//section//option[@value='en-us']/../option"): - ui_lang = option.get("value") try: l = babel.Locale.parse(ui_lang, sep="-") @@ -441,9 +446,8 @@ def fetch_traits(engine_traits: EngineTraits): sxng_tag = region_tag(babel.Locale.parse(ui_lang, sep="-")) else: sxng_tag = language_tag(babel.Locale.parse(ui_lang, sep="-")) - except babel.UnknownLocaleError: - print("ERROR: can't determine babel locale of Brave's (UI) language %s" % ui_lang) + # silently ignore unknown languages continue conflict = engine_traits.custom["ui_lang"].get(sxng_tag) # type: ignore @@ -455,10 +459,12 @@ def fetch_traits(engine_traits: EngineTraits): # search regions of brave - resp = get("https://cdn.search.brave.com/serp/v2/_app/immutable/chunks/parameters.734c106a.js") - + resp = get( + "https://cdn.search.brave.com/serp/v2/_app/immutable/chunks/parameters.734c106a.js", + timeout=5, + ) if not resp.ok: - print("ERROR: response from Brave is not OK.") + raise RuntimeError("Response from Brave regions is not OK.") country_js = resp.text[resp.text.index("options:{all") + len("options:") :] country_js = country_js[: country_js.index("},k={default")] @@ -473,7 +479,11 @@ def fetch_traits(engine_traits: EngineTraits): # add official languages of the country .. for lang_tag in babel.languages.get_official_languages(country_tag, de_facto=True): lang_tag = lang_map.get(lang_tag, lang_tag) - sxng_tag = region_tag(babel.Locale.parse("%s_%s" % (lang_tag, country_tag.upper()))) + try: + sxng_tag = region_tag(babel.Locale.parse("%s_%s" % (lang_tag, country_tag.upper()))) + except babel.UnknownLocaleError: + # silently ignore unknown languages + continue # print("%-20s: %s <-- %s" % (v["label"], country_tag, sxng_tag)) conflict = engine_traits.regions.get(sxng_tag) diff --git a/searx/engines/dailymotion.py b/searx/engines/dailymotion.py index b625c082a..759aed99c 100644 --- a/searx/engines/dailymotion.py +++ b/searx/engines/dailymotion.py @@ -10,29 +10,33 @@ Dailymotion (Videos) """ +import time from datetime import datetime, timedelta from urllib.parse import urlencode -import time + import babel -from searx.network import get, raise_for_httperror # see https://github.com/searxng/searxng/issues/762 -from searx.utils import html_to_text -from searx.exceptions import SearxEngineAPIException -from searx.locales import region_tag, language_tag from searx.enginelib.traits import EngineTraits +from searx.exceptions import SearxEngineAPIException +from searx.locales import language_tag, region_tag +from searx.network import ( # see https://github.com/searxng/searxng/issues/762 + get, + raise_for_httperror, +) +from searx.utils import html_to_text # about about = { - "website": 'https://www.dailymotion.com', - "wikidata_id": 'Q769222', - "official_api_documentation": 'https://www.dailymotion.com/developer', + "website": "https://www.dailymotion.com", + "wikidata_id": "Q769222", + "official_api_documentation": "https://www.dailymotion.com/developer", "use_official_api": True, "require_api_key": False, - "results": 'JSON', + "results": "JSON", } # engine dependent config -categories = ['videos'] +categories = ["videos"] paging = True number_of_results = 10 @@ -46,8 +50,8 @@ time_delta_dict = { safesearch = True safesearch_params = { - 2: {'is_created_for_kids': 'true'}, - 1: {'is_created_for_kids': 'true'}, + 2: {"is_created_for_kids": "true"}, + 1: {"is_created_for_kids": "true"}, 0: {}, } """True if this video is "Created for Kids" / intends to target an audience @@ -55,9 +59,9 @@ under the age of 16 (``is_created_for_kids`` in `Video filters API`_ ) """ family_filter_map = { - 2: 'true', - 1: 'true', - 0: 'false', + 2: "true", + 1: "true", + 0: "false", } """By default, the family filter is turned on. Setting this parameter to ``false`` will stop filtering-out explicit content from searches and global @@ -65,21 +69,21 @@ contexts (``family_filter`` in `Global API Parameters`_ ). """ result_fields = [ - 'allow_embed', - 'description', - 'title', - 'created_time', - 'duration', - 'url', - 'thumbnail_360_url', - 'id', + "allow_embed", + "description", + "title", + "created_time", + "duration", + "url", + "thumbnail_360_url", + "id", ] """`Fields selection`_, by default, a few fields are returned. To request more specific fields, the ``fields`` parameter is used with the list of fields SearXNG needs in the response to build a video result list. """ -search_url = 'https://api.dailymotion.com/videos?' +search_url = "https://api.dailymotion.com/videos?" """URL to retrieve a list of videos. - `REST GET`_ @@ -96,42 +100,42 @@ def request(query, params): if not query: return False - eng_region: str = traits.get_region(params['searxng_locale'], 'en_US') # type: ignore - eng_lang = traits.get_language(params['searxng_locale'], 'en') + eng_region: str = traits.get_region(params["searxng_locale"], "en_US") # type: ignore + eng_lang = traits.get_language(params["searxng_locale"], "en") args = { - 'search': query, - 'family_filter': family_filter_map.get(params['safesearch'], 'false'), - 'thumbnail_ratio': 'original', # original|widescreen|square + "search": query, + "family_filter": family_filter_map.get(params["safesearch"], "false"), + "thumbnail_ratio": "original", # original|widescreen|square # https://developers.dailymotion.com/api/#video-filters - 'languages': eng_lang, - 'page': params['pageno'], - 'password_protected': 'false', - 'private': 'false', - 'sort': 'relevance', - 'limit': number_of_results, - 'fields': ','.join(result_fields), + "languages": eng_lang, + "page": params["pageno"], + "password_protected": "false", + "private": "false", + "sort": "relevance", + "limit": number_of_results, + "fields": ",".join(result_fields), } - args.update(safesearch_params.get(params['safesearch'], {})) + args.update(safesearch_params.get(params["safesearch"], {})) # Don't add localization and country arguments if the user does select a # language (:de, :en, ..) - if len(params['searxng_locale'].split('-')) > 1: + if len(params["searxng_locale"].split("-")) > 1: # https://developers.dailymotion.com/api/#global-parameters - args['localization'] = eng_region - args['country'] = eng_region.split('_')[1] + args["localization"] = eng_region + args["country"] = eng_region.split("_")[1] # Insufficient rights for the `ams_country' parameter of route `GET /videos' # 'ams_country': eng_region.split('_')[1], time_delta = time_delta_dict.get(params["time_range"]) if time_delta: created_after = datetime.now() - time_delta - args['created_after'] = datetime.timestamp(created_after) + args["created_after"] = datetime.timestamp(created_after) query_str = urlencode(args) - params['url'] = search_url + query_str + params["url"] = search_url + query_str return params @@ -143,46 +147,45 @@ def response(resp): search_res = resp.json() # check for an API error - if 'error' in search_res: - raise SearxEngineAPIException(search_res['error'].get('message')) + if "error" in search_res: + raise SearxEngineAPIException(search_res["error"].get("message")) raise_for_httperror(resp) # parse results - for res in search_res.get('list', []): + for res in search_res.get("list", []): + title = res["title"] + url = res["url"] - title = res['title'] - url = res['url'] - - content = html_to_text(res['description']) + content = html_to_text(res["description"]) if len(content) > 300: - content = content[:300] + '...' + content = content[:300] + "..." - publishedDate = datetime.fromtimestamp(res['created_time'], None) + publishedDate = datetime.fromtimestamp(res["created_time"], None) - length = time.gmtime(res.get('duration')) + length = time.gmtime(res.get("duration")) if length.tm_hour: length = time.strftime("%H:%M:%S", length) else: length = time.strftime("%M:%S", length) - thumbnail = res['thumbnail_360_url'] + thumbnail = res["thumbnail_360_url"] thumbnail = thumbnail.replace("http://", "https://") item = { - 'template': 'videos.html', - 'url': url, - 'title': title, - 'content': content, - 'publishedDate': publishedDate, - 'length': length, - 'thumbnail': thumbnail, + "template": "videos.html", + "url": url, + "title": title, + "content": content, + "publishedDate": publishedDate, + "length": length, + "thumbnail": thumbnail, } # HINT: no mater what the value is, without API token videos can't shown # embedded - if res['allow_embed']: - item['iframe_src'] = iframe_src.format(video_id=res['id']) + if res["allow_embed"]: + item["iframe_src"] = iframe_src.format(video_id=res["id"]) results.append(item) @@ -208,13 +211,13 @@ def fetch_traits(engine_traits: EngineTraits): """ - resp = get('https://api.dailymotion.com/locales') - if not resp.ok: # type: ignore - print("ERROR: response from dailymotion/locales is not OK.") + resp = get("https://api.dailymotion.com/locales", timeout=5) + if not resp.ok: + raise RuntimeError("Response from Dailymotion locales is not OK.") - for item in resp.json()['list']: # type: ignore - eng_tag = item['locale'] - if eng_tag in ('en_EN', 'ar_AA'): + for item in resp.json()["list"]: # type: ignore + eng_tag = item["locale"] + if eng_tag in ("en_EN", "ar_AA"): continue try: sxng_tag = region_tag(babel.Locale.parse(eng_tag)) @@ -229,14 +232,14 @@ def fetch_traits(engine_traits: EngineTraits): continue engine_traits.regions[sxng_tag] = eng_tag - locale_lang_list = [x.split('_')[0] for x in engine_traits.regions.values()] + locale_lang_list = [x.split("_")[0] for x in engine_traits.regions.values()] - resp = get('https://api.dailymotion.com/languages') - if not resp.ok: # type: ignore - print("ERROR: response from dailymotion/languages is not OK.") + resp = get("https://api.dailymotion.com/languages", timeout=5) + if not resp.ok: + raise RuntimeError("Response from Dailymotion languages is not OK.") - for item in resp.json()['list']: # type: ignore - eng_tag = item['code'] + for item in resp.json()["list"]: # type: ignore + eng_tag = item["code"] if eng_tag in locale_lang_list: sxng_tag = language_tag(babel.Locale.parse(eng_tag)) engine_traits.languages[sxng_tag] = eng_tag diff --git a/searx/engines/duckduckgo.py b/searx/engines/duckduckgo.py index 8b675751f..1a424fac7 100644 --- a/searx/engines/duckduckgo.py +++ b/searx/engines/duckduckgo.py @@ -166,30 +166,27 @@ Terms / phrases that you keep coming across: """ # pylint: disable=global-statement -import typing as t import json import re +import typing as t import babel import lxml.html from searx import locales - +from searx.enginelib import EngineCache +from searx.enginelib.traits import EngineTraits +from searx.exceptions import SearxEngineCaptchaException from searx.external_bang import EXTERNAL_BANGS, get_node # type: ignore - +from searx.result_types import EngineResults from searx.utils import ( + ElementType, eval_xpath, eval_xpath_getindex, extr, extract_text, - ElementType, gen_useragent, ) -from searx.network import get # see https://github.com/searxng/searxng/issues/762 -from searx.enginelib.traits import EngineTraits -from searx.enginelib import EngineCache -from searx.exceptions import SearxEngineCaptchaException -from searx.result_types import EngineResults if t.TYPE_CHECKING: from searx.extended_types import SXNG_Response @@ -355,7 +352,7 @@ def quote_ddg_bangs(query: str) -> str: if not val.strip(): continue - if val.startswith('!') and get_node(EXTERNAL_BANGS, val[1:]): + if val.startswith("!") and get_node(EXTERNAL_BANGS, val[1:]): val = f"'{val}'" _q.append(val) return " ".join(_q) @@ -412,7 +409,8 @@ def request(query: str, params: "OnlineParams") -> None: # reputation of the SearXNG IP and DDG starts to activate CAPTCHAs. # set suspend time to zero is OK --> ddg does not block the IP raise SearxEngineCaptchaException( - suspended_time=0, message=f"VQD missed (page: {params['pageno']}, locale: {params['searxng_locale']})" + suspended_time=0, + message=f"VQD missed (page: {params['pageno']}, locale: {params['searxng_locale']})", ) if params["searxng_locale"].startswith("zh"): @@ -536,34 +534,34 @@ def fetch_traits(engine_traits: EngineTraits): """ # pylint: disable=too-many-branches, too-many-statements, disable=import-outside-toplevel + + from searx.network import get # see https://github.com/searxng/searxng/issues/762 from searx.utils import js_obj_str_to_python # fetch regions - engine_traits.all_locale = 'wt-wt' + engine_traits.all_locale = "wt-wt" # updated from u661.js to u.7669f071a13a7daa57cb / should be updated automatically? - resp = get('https://duckduckgo.com/dist/util/u.7669f071a13a7daa57cb.js') - + resp = get("https://duckduckgo.com/dist/util/u.7669f071a13a7daa57cb.js", timeout=5) if not resp.ok: - print("ERROR: response from DuckDuckGo is not OK.") + raise RuntimeError("Response from DuckDuckGo regions is not OK.") - js_code = extr(resp.text, 'regions:', ',snippetLengths') + js_code = extr(resp.text, "regions:", ",snippetLengths") regions = json.loads(js_code) for eng_tag, name in regions.items(): - - if eng_tag == 'wt-wt': - engine_traits.all_locale = 'wt-wt' + if eng_tag == "wt-wt": + engine_traits.all_locale = "wt-wt" continue region = ddg_reg_map.get(eng_tag) - if region == 'skip': + if region == "skip": continue if not region: - eng_territory, eng_lang = eng_tag.split('-') - region = eng_lang + '_' + eng_territory.upper() + eng_territory, eng_lang = eng_tag.split("-") + region = eng_lang + "_" + eng_territory.upper() try: sxng_tag = locales.region_tag(babel.Locale.parse(region)) @@ -580,25 +578,23 @@ def fetch_traits(engine_traits: EngineTraits): # fetch languages - engine_traits.custom['lang_region'] = {} + engine_traits.custom["lang_region"] = {} - js_code = extr(resp.text, 'languages:', ',regions') + js_code = extr(resp.text, "languages:", ",regions") languages: dict[str, str] = js_obj_str_to_python(js_code) for eng_lang, name in languages.items(): - - if eng_lang == 'wt_WT': + if eng_lang == "wt_WT": continue babel_tag = ddg_lang_map.get(eng_lang, eng_lang) - if babel_tag == 'skip': + if babel_tag == "skip": continue try: - - if babel_tag == 'lang_region': + if babel_tag == "lang_region": sxng_tag = locales.region_tag(babel.Locale.parse(eng_lang)) - engine_traits.custom['lang_region'][sxng_tag] = eng_lang + engine_traits.custom["lang_region"][sxng_tag] = eng_lang continue sxng_tag = locales.language_tag(babel.Locale.parse(babel_tag)) diff --git a/searx/engines/google.py b/searx/engines/google.py index 5148fcc41..601716821 100644 --- a/searx/engines/google.py +++ b/searx/engines/google.py @@ -11,40 +11,45 @@ engines: """ -import typing as t - -import re import random +import re import string import time -from urllib.parse import urlencode, unquote -from lxml import html +import typing as t +from urllib.parse import unquote, urlencode + import babel import babel.core import babel.languages +from lxml import html -from searx.utils import extract_text, eval_xpath, eval_xpath_list, eval_xpath_getindex, gen_gsa_useragent -from searx.locales import language_tag, region_tag, get_official_locales -from searx.network import get # see https://github.com/searxng/searxng/issues/762 -from searx.exceptions import SearxEngineCaptchaException from searx.enginelib.traits import EngineTraits +from searx.exceptions import SearxEngineCaptchaException +from searx.locales import get_official_locales, language_tag, region_tag from searx.result_types import EngineResults +from searx.utils import ( + eval_xpath, + eval_xpath_getindex, + eval_xpath_list, + extract_text, + gen_gsa_useragent, +) if t.TYPE_CHECKING: from searx.extended_types import SXNG_Response from searx.search.processors import OnlineParams about = { - "website": 'https://www.google.com', - "wikidata_id": 'Q9366', - "official_api_documentation": 'https://developers.google.com/custom-search/', + "website": "https://www.google.com", + "wikidata_id": "Q9366", + "official_api_documentation": "https://developers.google.com/custom-search/", "use_official_api": False, "require_api_key": False, - "results": 'HTML', + "results": "HTML", } # engine dependent config -categories = ['general', 'web'] +categories = ["general", "web"] paging = True max_page = 50 """`Google max 50 pages`_ @@ -54,10 +59,10 @@ max_page = 50 time_range_support = True safesearch = True -time_range_dict = {'day': 'd', 'week': 'w', 'month': 'm', 'year': 'y'} +time_range_dict = {"day": "d", "week": "w", "month": "m", "year": "y"} # Filter results. 0: None, 1: Moderate, 2: Strict -filter_mapping = {0: 'off', 1: 'medium', 2: 'high'} +filter_mapping = {0: "off", 1: "medium", 2: "high"} # specific xpath variables # ------------------------ @@ -87,7 +92,7 @@ def ui_async(start: int) -> str: # create a new random arc_id every hour if not _arcid_random or (int(time.time()) - _arcid_random[1]) > 3600: - _arcid_random = (''.join(random.choices(_arcid_range, k=23)), int(time.time())) + _arcid_random = ("".join(random.choices(_arcid_range, k=23)), int(time.time())) arc_id = f"arc_id:srp_{_arcid_random[0]}_1{start:02}" return ",".join([arc_id, use_ac, _fmt]) @@ -149,23 +154,23 @@ def get_google_info(params: "OnlineParams", eng_traits: EngineTraits) -> dict[st """ ret_val: dict[str, t.Any] = { - 'language': None, - 'country': None, - 'subdomain': None, - 'params': {}, - 'headers': {}, - 'cookies': {}, - 'locale': None, + "language": None, + "country": None, + "subdomain": None, + "params": {}, + "headers": {}, + "cookies": {}, + "locale": None, } - sxng_locale = params.get('searxng_locale', 'all') + sxng_locale = params.get("searxng_locale", "all") try: - locale = babel.Locale.parse(sxng_locale, sep='-') + locale = babel.Locale.parse(sxng_locale, sep="-") except babel.core.UnknownLocaleError: locale = None - eng_lang = eng_traits.get_language(sxng_locale, 'lang_en') - lang_code = eng_lang.split('_')[-1] # lang_zh-TW --> zh-TW / lang_en --> en + eng_lang = eng_traits.get_language(sxng_locale, "lang_en") + lang_code = eng_lang.split("_")[-1] # lang_zh-TW --> zh-TW / lang_en --> en country = eng_traits.get_region(sxng_locale, eng_traits.all_locale) # Test zh_hans & zh_hant --> in the topmost links in the result list of list @@ -176,10 +181,10 @@ def get_google_info(params: "OnlineParams", eng_traits: EngineTraits) -> dict[st # '!go 日 :zh-TW' --> https://zh.m.wiktionary.org/zh-hant/%E6%97%A5 # '!go 日 :zh-CN' --> https://zh.m.wikipedia.org/zh/%E6%97%A5 - ret_val['language'] = eng_lang - ret_val['country'] = country - ret_val['locale'] = locale - ret_val['subdomain'] = eng_traits.custom['supported_domains'].get(country.upper(), 'www.google.com') + ret_val["language"] = eng_lang + ret_val["country"] = country + ret_val["locale"] = locale + ret_val["subdomain"] = eng_traits.custom["supported_domains"].get(country.upper(), "www.google.com") # hl parameter: # The hl parameter specifies the interface language (host language) of @@ -191,7 +196,7 @@ def get_google_info(params: "OnlineParams", eng_traits: EngineTraits) -> dict[st # https://developers.google.com/custom-search/docs/xml_results_appendices#interfaceLanguages # https://github.com/searxng/searxng/issues/2515#issuecomment-1607150817 - ret_val['params']['hl'] = f'{lang_code}-{country}' + ret_val["params"]["hl"] = f"{lang_code}-{country}" # lr parameter: # The lr (language restrict) parameter restricts search results to @@ -207,9 +212,9 @@ def get_google_info(params: "OnlineParams", eng_traits: EngineTraits) -> dict[st # By example: &lr=lang_zh-TW%7Clang_de selects articles written in # traditional chinese OR german language. - ret_val['params']['lr'] = eng_lang - if sxng_locale == 'all': - ret_val['params']['lr'] = '' + ret_val["params"]["lr"] = eng_lang + if sxng_locale == "all": + ret_val["params"]["lr"] = "" # cr parameter: # The cr parameter restricts search results to documents originating in a @@ -218,9 +223,9 @@ def get_google_info(params: "OnlineParams", eng_traits: EngineTraits) -> dict[st # specify a region (country) only if a region is given in the selected # locale --> https://github.com/searxng/searxng/issues/2672 - ret_val['params']['cr'] = '' - if len(sxng_locale.split('-')) > 1: - ret_val['params']['cr'] = 'country' + country + ret_val["params"]["cr"] = "" + if len(sxng_locale.split("-")) > 1: + ret_val["params"]["cr"] = "country" + country # gl parameter: (mandatory by Google News) # The gl parameter value is a two-letter country code. For WebSearch @@ -241,14 +246,14 @@ def get_google_info(params: "OnlineParams", eng_traits: EngineTraits) -> dict[st # to interpret the query string. The default ie value is latin1. # https://developers.google.com/custom-search/docs/xml_results#iesp - ret_val['params']['ie'] = 'utf8' + ret_val["params"]["ie"] = "utf8" # oe parameter: # The oe parameter sets the character encoding scheme that should be used # to decode the XML result. The default oe value is latin1. # https://developers.google.com/custom-search/docs/xml_results#oesp - ret_val['params']['oe'] = 'utf8' + ret_val["params"]["oe"] = "utf8" # num parameter: # The num parameter identifies the number of search results to return. @@ -261,43 +266,43 @@ def get_google_info(params: "OnlineParams", eng_traits: EngineTraits) -> dict[st # HTTP headers - ret_val['headers']['Accept'] = '*/*' - ret_val['headers']['User-Agent'] = gen_gsa_useragent() + ret_val["headers"]["Accept"] = "*/*" + ret_val["headers"]["User-Agent"] = gen_gsa_useragent() # Cookies # - https://github.com/searxng/searxng/pull/1679#issuecomment-1235432746 # - https://github.com/searxng/searxng/issues/1555 - ret_val['cookies']['CONSENT'] = "YES+" + ret_val["cookies"]["CONSENT"] = "YES+" return ret_val def detect_google_sorry(resp): - if resp.url.host == 'sorry.google.com' or resp.url.path.startswith('/sorry'): + if resp.url.host == "sorry.google.com" or resp.url.path.startswith("/sorry"): raise SearxEngineCaptchaException() def request(query: str, params: "OnlineParams") -> None: """Google search request""" # pylint: disable=line-too-long - start = (params['pageno'] - 1) * 10 + start = (params["pageno"] - 1) * 10 str_async = ui_async(start) google_info = get_google_info(params, traits) logger.debug("ARC_ID: %s", str_async) # https://www.google.de/search?q=corona&hl=de&lr=lang_de&start=0&tbs=qdr%3Ad&safe=medium query_url = ( - 'https://' - + google_info['subdomain'] - + '/search' + "https://" + + google_info["subdomain"] + + "/search" + "?" + urlencode( { - 'q': query, - **google_info['params'], - 'filter': '0', - 'start': start, + "q": query, + **google_info["params"], + "filter": "0", + "start": start, # 'vet': '12ahUKEwik3ZbIzfn7AhXMX_EDHbUDBh0QxK8CegQIARAC..i', # 'ved': '2ahUKEwik3ZbIzfn7AhXMX_EDHbUDBh0Q_skCegQIARAG', # 'cs' : 1, @@ -308,20 +313,20 @@ def request(query: str, params: "OnlineParams") -> None: # 'sa': 'N', # 'sstk': 'AcOHfVkD7sWCSAheZi-0tx_09XDO55gTWY0JNq3_V26cNN-c8lfD45aZYPI8s_Bqp8s57AHz5pxchDtAGCA_cikAWSjy9kw3kgg' # formally known as use_mobile_ui - 'asearch': 'arc', - 'async': str_async, + "asearch": "arc", + "async": str_async, } ) ) - if params['time_range'] in time_range_dict: - query_url += '&' + urlencode({'tbs': 'qdr:' + time_range_dict[params['time_range']]}) - if params['safesearch']: - query_url += '&' + urlencode({'safe': filter_mapping[params['safesearch']]}) - params['url'] = query_url + if params["time_range"] in time_range_dict: + query_url += "&" + urlencode({"tbs": "qdr:" + time_range_dict[params["time_range"]]}) + if params["safesearch"]: + query_url += "&" + urlencode({"safe": filter_mapping[params["safesearch"]]}) + params["url"] = query_url - params['cookies'] = google_info['cookies'] - params['headers'].update(google_info['headers']) + params["cookies"] = google_info["cookies"] + params["headers"].update(google_info["headers"]) # =26;[3,"dimg_ZNMiZPCqE4apxc8P3a2tuAQ_137"]a87;data:image/jpeg;base64,/9j/4AAQSkZJRgABA @@ -334,14 +339,14 @@ def parse_data_images(text: str): data_image_map = {} for img_id, data_image in RE_DATA_IMAGE.findall(text): - end_pos = data_image.rfind('=') + end_pos = data_image.rfind("=") if end_pos > 0: data_image = data_image[: end_pos + 1] data_image_map[img_id] = data_image last = RE_DATA_IMAGE_end.search(text) if last: data_image_map[last.group(1)] = last.group(2) - logger.debug('data:image objects --> %s', list(data_image_map.keys())) + logger.debug("data:image objects --> %s", list(data_image_map.keys())) return data_image_map @@ -365,15 +370,18 @@ def response(resp: "SXNG_Response"): title_tag = eval_xpath_getindex(result, './/div[contains(@role, "link")]', 0, default=None) if title_tag is None: # this not one of the common google results *section* - logger.debug('ignoring item from the result_xpath list: missing title') + logger.debug("ignoring item from the result_xpath list: missing title") continue title = extract_text(title_tag) - raw_url = eval_xpath_getindex(result, './/a/@href', 0, None) + raw_url = eval_xpath_getindex(result, ".//a/@href", 0, None) if raw_url is None: - logger.debug('ignoring item from the result_xpath list: missing url of title "%s"', title) + logger.debug( + 'ignoring item from the result_xpath list: missing url of title "%s"', + title, + ) continue - url = unquote(raw_url[7:].split('&sa=U')[0]) # remove the google redirector + url = unquote(raw_url[7:].split("&sa=U")[0]) # remove the google redirector content_nodes = eval_xpath(result, './/div[contains(@data-sncf, "1")]') for item in content_nodes: @@ -383,20 +391,23 @@ def response(resp: "SXNG_Response"): content = extract_text(content_nodes) if not content: - logger.debug('ignoring item from the result_xpath list: missing content of title "%s"', title) + logger.debug( + 'ignoring item from the result_xpath list: missing content of title "%s"', + title, + ) continue - thumbnail = content_nodes[0].xpath('.//img/@src') + thumbnail = content_nodes[0].xpath(".//img/@src") if thumbnail: thumbnail = thumbnail[0] - if thumbnail.startswith('data:image'): - img_id = content_nodes[0].xpath('.//img/@id') + if thumbnail.startswith("data:image"): + img_id = content_nodes[0].xpath(".//img/@id") if img_id: thumbnail = data_image_map.get(img_id[0]) else: thumbnail = None - results.append({'url': url, 'title': title, 'content': content, 'thumbnail': thumbnail}) + results.append({"url": url, "title": title, "content": content, "thumbnail": thumbnail}) except Exception as e: # pylint: disable=broad-except logger.error(e, exc_info=True) @@ -405,7 +416,7 @@ def response(resp: "SXNG_Response"): # parse suggestion for suggestion in eval_xpath_list(dom, suggestion_xpath): # append suggestion - results.append({'suggestion': extract_text(suggestion)}) + results.append({"suggestion": extract_text(suggestion)}) # return results return results @@ -416,27 +427,27 @@ def response(resp: "SXNG_Response"): skip_countries = [ # official language of google-country not in google-languages - 'AL', # Albanien (sq) - 'AZ', # Aserbaidschan (az) - 'BD', # Bangladesch (bn) - 'BN', # Brunei Darussalam (ms) - 'BT', # Bhutan (dz) - 'ET', # Äthiopien (am) - 'GE', # Georgien (ka, os) - 'GL', # Grönland (kl) - 'KH', # Kambodscha (km) - 'LA', # Laos (lo) - 'LK', # Sri Lanka (si, ta) - 'ME', # Montenegro (sr) - 'MK', # Nordmazedonien (mk, sq) - 'MM', # Myanmar (my) - 'MN', # Mongolei (mn) - 'MV', # Malediven (dv) // dv_MV is unknown by babel - 'MY', # Malaysia (ms) - 'NP', # Nepal (ne) - 'TJ', # Tadschikistan (tg) - 'TM', # Turkmenistan (tk) - 'UZ', # Usbekistan (uz) + "AL", # Albanien (sq) + "AZ", # Aserbaidschan (az) + "BD", # Bangladesch (bn) + "BN", # Brunei Darussalam (ms) + "BT", # Bhutan (dz) + "ET", # Äthiopien (am) + "GE", # Georgien (ka, os) + "GL", # Grönland (kl) + "KH", # Kambodscha (km) + "LA", # Laos (lo) + "LK", # Sri Lanka (si, ta) + "ME", # Montenegro (sr) + "MK", # Nordmazedonien (mk, sq) + "MM", # Myanmar (my) + "MN", # Mongolei (mn) + "MV", # Malediven (dv) // dv_MV is unknown by babel + "MY", # Malaysia (ms) + "NP", # Nepal (ne) + "TJ", # Tadschikistan (tg) + "TM", # Turkmenistan (tk) + "UZ", # Usbekistan (uz) ] @@ -444,21 +455,23 @@ def fetch_traits(engine_traits: EngineTraits, add_domains: bool = True): """Fetch languages from Google.""" # pylint: disable=import-outside-toplevel, too-many-branches - engine_traits.custom['supported_domains'] = {} + from searx.network import get # see https://github.com/searxng/searxng/issues/762 - resp = get('https://www.google.com/preferences') - if not resp.ok: # type: ignore - raise RuntimeError("Response from Google's preferences is not OK.") + engine_traits.custom["supported_domains"] = {} - dom = html.fromstring(resp.text.replace('', '')) + resp = get("https://www.google.com/preferences", timeout=5) + if not resp.ok: + raise RuntimeError("Response from Google preferences is not OK.") + + dom = html.fromstring(resp.text.replace('', "")) # supported language codes - lang_map = {'no': 'nb'} + lang_map = {"no": "nb"} for x in eval_xpath_list(dom, "//select[@name='hl']/option"): eng_lang = x.get("value") try: - locale = babel.Locale.parse(lang_map.get(eng_lang, eng_lang), sep='-') + locale = babel.Locale.parse(lang_map.get(eng_lang, eng_lang), sep="-") except babel.UnknownLocaleError: print("INFO: google UI language %s (%s) is unknown by babel" % (eng_lang, x.text.split("(")[0].strip())) continue @@ -469,10 +482,10 @@ def fetch_traits(engine_traits: EngineTraits, add_domains: bool = True): if conflict != eng_lang: print("CONFLICT: babel %s --> %s, %s" % (sxng_lang, conflict, eng_lang)) continue - engine_traits.languages[sxng_lang] = 'lang_' + eng_lang + engine_traits.languages[sxng_lang] = "lang_" + eng_lang # alias languages - engine_traits.languages['zh'] = 'lang_zh-CN' + engine_traits.languages["zh"] = "lang_zh-CN" # supported region codes @@ -481,37 +494,37 @@ def fetch_traits(engine_traits: EngineTraits, add_domains: bool = True): if eng_country in skip_countries: continue - if eng_country == 'ZZ': - engine_traits.all_locale = 'ZZ' + if eng_country == "ZZ": + engine_traits.all_locale = "ZZ" continue sxng_locales = get_official_locales(eng_country, engine_traits.languages.keys(), regional=True) if not sxng_locales: - print("ERROR: can't map from google country %s (%s) to a babel region." % (x.get('data-name'), eng_country)) + print("ERROR: can't map from google country %s (%s) to a babel region." % (x.get("data-name"), eng_country)) continue for sxng_locale in sxng_locales: engine_traits.regions[region_tag(sxng_locale)] = eng_country # alias regions - engine_traits.regions['zh-CN'] = 'HK' + engine_traits.regions["zh-CN"] = "HK" # supported domains if add_domains: - resp = get('https://www.google.com/supported_domains') - if not resp.ok: # type: ignore - raise RuntimeError("Response from https://www.google.com/supported_domains is not OK.") + resp = get("https://www.google.com/supported_domains", timeout=5) + if not resp.ok: + raise RuntimeError("Response from Google supported domains is not OK.") - for domain in resp.text.split(): # type: ignore + for domain in resp.text.split(): domain = domain.strip() if not domain or domain in [ - '.google.com', + ".google.com", ]: continue - region = domain.split('.')[-1].upper() - engine_traits.custom['supported_domains'][region] = 'www' + domain # type: ignore - if region == 'HK': + region = domain.split(".")[-1].upper() + engine_traits.custom["supported_domains"][region] = "www" + domain + if region == "HK": # There is no google.cn, we use .com.hk for zh-CN - engine_traits.custom['supported_domains']['CN'] = 'www' + domain # type: ignore + engine_traits.custom["supported_domains"]["CN"] = "www" + domain diff --git a/searx/engines/mojeek.py b/searx/engines/mojeek.py index 596f90bfd..b04c9a9c5 100644 --- a/searx/engines/mojeek.py +++ b/searx/engines/mojeek.py @@ -3,19 +3,20 @@ from datetime import datetime from urllib.parse import urlencode -from lxml import html from dateutil.relativedelta import relativedelta -from searx.utils import eval_xpath, eval_xpath_list, extract_text +from lxml import html + from searx.enginelib.traits import EngineTraits +from searx.utils import eval_xpath, eval_xpath_list, extract_text about = { - 'website': 'https://mojeek.com', - 'wikidata_id': 'Q60747299', - 'official_api_documentation': 'https://www.mojeek.com/support/api/search/request_parameters.html', - 'use_official_api': False, - 'require_api_key': False, - 'results': 'HTML', + "website": "https://mojeek.com", + "wikidata_id": "Q60747299", + "official_api_documentation": "https://www.mojeek.com/support/api/search/request_parameters.html", + "use_official_api": False, + "require_api_key": False, + "results": "HTML", } paging = True # paging is only supported for general search safesearch = True @@ -28,53 +29,53 @@ categories = ["general", "web"] search_type = "" # leave blank for general, other possible values: images, news results_xpath = '//ul[@class="results-standard"]/li/a[@class="ob"]' -url_xpath = './@href' -title_xpath = '../h2/a' +url_xpath = "./@href" +title_xpath = "../h2/a" content_xpath = '..//p[@class="s"]' suggestion_xpath = '//div[@class="top-info"]/p[@class="top-info spell"]/em/a' image_results_xpath = '//div[@id="results"]/div[contains(@class, "image")]' -image_url_xpath = './a/@href' -image_title_xpath = './a/@data-title' -image_img_src_xpath = './a/img/@src' +image_url_xpath = "./a/@href" +image_title_xpath = "./a/@data-title" +image_img_src_xpath = "./a/img/@src" news_results_xpath = '//section[contains(@class, "news-search-result")]//article' -news_url_xpath = './/h2/a/@href' -news_title_xpath = './/h2/a' +news_url_xpath = ".//h2/a/@href" +news_title_xpath = ".//h2/a" news_content_xpath = './/p[@class="s"]' -language_param = 'lb' -region_param = 'arc' +language_param = "lb" +region_param = "arc" -_delta_kwargs = {'day': 'days', 'week': 'weeks', 'month': 'months', 'year': 'years'} +_delta_kwargs = {"day": "days", "week": "weeks", "month": "months", "year": "years"} def init(_): - if search_type not in ('', 'images', 'news'): + if search_type not in ("", "images", "news"): raise ValueError(f"Invalid search type {search_type}") def request(query, params): args = { - 'q': query, - 'safe': min(params['safesearch'], 1), - language_param: traits.get_language(params['searxng_locale'], traits.custom['language_all']), - region_param: traits.get_region(params['searxng_locale'], traits.custom['region_all']), + "q": query, + "safe": min(params["safesearch"], 1), + language_param: traits.get_language(params["searxng_locale"], traits.custom["language_all"]), + region_param: traits.get_region(params["searxng_locale"], traits.custom["region_all"]), } if search_type: - args['fmt'] = search_type + args["fmt"] = search_type # setting the page number on the first page (i.e. s=0) triggers a rate-limit - if search_type == '' and params['pageno'] > 1: - args['s'] = 10 * (params['pageno'] - 1) + if search_type == "" and params["pageno"] > 1: + args["s"] = 10 * (params["pageno"] - 1) - if params['time_range'] and search_type != 'images': - kwargs = {_delta_kwargs[params['time_range']]: 1} + if params["time_range"] and search_type != "images": + kwargs = {_delta_kwargs[params["time_range"]]: 1} args["since"] = (datetime.now() - relativedelta(**kwargs)).strftime("%Y%m%d") # type: ignore logger.debug(args["since"]) - params['url'] = f"{base_url}/search?{urlencode(args)}" + params["url"] = f"{base_url}/search?{urlencode(args)}" return params @@ -85,14 +86,14 @@ def _general_results(dom): for result in eval_xpath_list(dom, results_xpath): results.append( { - 'url': extract_text(eval_xpath(result, url_xpath)), - 'title': extract_text(eval_xpath(result, title_xpath)), - 'content': extract_text(eval_xpath(result, content_xpath)), + "url": extract_text(eval_xpath(result, url_xpath)), + "title": extract_text(eval_xpath(result, title_xpath)), + "content": extract_text(eval_xpath(result, content_xpath)), } ) for suggestion in eval_xpath(dom, suggestion_xpath): - results.append({'suggestion': extract_text(suggestion)}) + results.append({"suggestion": extract_text(suggestion)}) return results @@ -103,11 +104,11 @@ def _image_results(dom): for result in eval_xpath_list(dom, image_results_xpath): results.append( { - 'template': 'images.html', - 'url': extract_text(eval_xpath(result, image_url_xpath)), - 'title': extract_text(eval_xpath(result, image_title_xpath)), - 'img_src': base_url + extract_text(eval_xpath(result, image_img_src_xpath)), # type: ignore - 'content': '', + "template": "images.html", + "url": extract_text(eval_xpath(result, image_url_xpath)), + "title": extract_text(eval_xpath(result, image_title_xpath)), + "img_src": base_url + extract_text(eval_xpath(result, image_img_src_xpath)), # type: ignore + "content": "", } ) @@ -120,9 +121,9 @@ def _news_results(dom): for result in eval_xpath_list(dom, news_results_xpath): results.append( { - 'url': extract_text(eval_xpath(result, news_url_xpath)), - 'title': extract_text(eval_xpath(result, news_title_xpath)), - 'content': extract_text(eval_xpath(result, news_content_xpath)), + "url": extract_text(eval_xpath(result, news_url_xpath)), + "title": extract_text(eval_xpath(result, news_title_xpath)), + "content": extract_text(eval_xpath(result, news_content_xpath)), } ) @@ -132,13 +133,13 @@ def _news_results(dom): def response(resp): dom = html.fromstring(resp.text) - if search_type == '': + if search_type == "": return _general_results(dom) - if search_type == 'images': + if search_type == "images": return _image_results(dom) - if search_type == 'news': + if search_type == "news": return _news_results(dom) raise ValueError(f"Invalid search type {search_type}") @@ -146,17 +147,26 @@ def response(resp): def fetch_traits(engine_traits: EngineTraits): # pylint: disable=import-outside-toplevel - from searx import network - from searx.locales import get_official_locales, region_tag - from babel import Locale, UnknownLocaleError import contextlib - resp = network.get(base_url + "/preferences", headers={'Accept-Language': 'en-US,en;q=0.5'}) - dom = html.fromstring(resp.text) # type: ignore + from babel import Locale, UnknownLocaleError + + from searx.locales import get_official_locales, region_tag + from searx.network import get # see https://github.com/searxng/searxng/issues/762 + + resp = get( + base_url + "/preferences", + headers={"Accept-Language": "en-US,en;q=0.5"}, + timeout=5, + ) + if not resp.ok: + raise RuntimeError("Response from Mojeek is not OK.") + + dom = html.fromstring(resp.text) languages = eval_xpath_list(dom, f'//select[@name="{language_param}"]/option/@value') - engine_traits.custom['language_all'] = languages[0] + engine_traits.custom["language_all"] = languages[0] for code in languages[1:]: with contextlib.suppress(UnknownLocaleError): @@ -165,7 +175,7 @@ def fetch_traits(engine_traits: EngineTraits): regions = eval_xpath_list(dom, f'//select[@name="{region_param}"]/option/@value') - engine_traits.custom['region_all'] = regions[1] + engine_traits.custom["region_all"] = regions[1] for code in regions[2:]: for locale in get_official_locales(code, engine_traits.languages): diff --git a/searx/engines/odysee.py b/searx/engines/odysee.py index 64bde3b0e..73e779ea1 100644 --- a/searx/engines/odysee.py +++ b/searx/engines/odysee.py @@ -5,14 +5,13 @@ """ import time -from urllib.parse import urlencode from datetime import datetime +from urllib.parse import urlencode import babel -from searx.network import get -from searx.locales import language_tag from searx.enginelib.traits import EngineTraits +from searx.locales import language_tag # Engine metadata about = { @@ -28,7 +27,7 @@ about = { paging = True time_range_support = True results_per_page = 20 -categories = ['videos'] +categories = ["videos"] # Search URL (Note: lighthouse.lbry.com/search works too, and may be faster at times) base_url = "https://lighthouse.odysee.tv/search" @@ -51,12 +50,12 @@ def request(query, params): "mediaType": "video", } - lang = traits.get_language(params['searxng_locale'], None) + lang = traits.get_language(params["searxng_locale"], None) if lang is not None: - query_params['language'] = lang + query_params["language"] = lang - if params['time_range'] in time_range_dict: - query_params['time_filter'] = time_range_dict[params['time_range']] + if params["time_range"] in time_range_dict: + query_params["time_filter"] = time_range_dict[params["time_range"]] params["url"] = f"{base_url}?{urlencode(query_params)}" return params @@ -114,15 +113,16 @@ def fetch_traits(engine_traits: EngineTraits): """ Fetch languages from Odysee's source code. """ + # pylint: disable=import-outside-toplevel + + from searx.network import get # see https://github.com/searxng/searxng/issues/762 resp = get( - 'https://raw.githubusercontent.com/OdyseeTeam/odysee-frontend/master/ui/constants/supported_browser_languages.js', # pylint: disable=line-too-long - timeout=60, + "https://raw.githubusercontent.com/OdyseeTeam/odysee-frontend/master/ui/constants/supported_browser_languages.js", # pylint: disable=line-too-long + timeout=5, ) - if not resp.ok: - print("ERROR: can't determine languages from Odysee") - return + raise RuntimeError("Response from Odysee is not OK.") for line in resp.text.split("\n")[1:-4]: lang_tag = line.strip().split(": ")[0].replace("'", "") diff --git a/searx/engines/peertube.py b/searx/engines/peertube.py index b781c6205..ee2b1ce28 100644 --- a/searx/engines/peertube.py +++ b/searx/engines/peertube.py @@ -5,26 +5,25 @@ """ import re -from urllib.parse import urlencode from datetime import datetime, timedelta +from urllib.parse import urlencode + +import babel from dateutil.parser import parse from dateutil.relativedelta import relativedelta -import babel - -from searx.network import get # see https://github.com/searxng/searxng/issues/762 +from searx.enginelib.traits import EngineTraits from searx.locales import language_tag from searx.utils import html_to_text, humanize_number -from searx.enginelib.traits import EngineTraits about = { # pylint: disable=line-too-long - "website": 'https://joinpeertube.org', - "wikidata_id": 'Q50938515', - "official_api_documentation": 'https://docs.joinpeertube.org/api-rest-reference.html#tag/Search/operation/searchVideos', + "website": "https://joinpeertube.org", + "wikidata_id": "Q50938515", + "official_api_documentation": "https://docs.joinpeertube.org/api-rest-reference.html#tag/Search/operation/searchVideos", "use_official_api": True, "require_api_key": False, - "results": 'JSON', + "results": "JSON", } # engine dependent config @@ -38,14 +37,14 @@ base_url = "https://peer.tube" time_range_support = True time_range_table = { - 'day': relativedelta(), - 'week': relativedelta(weeks=-1), - 'month': relativedelta(months=-1), - 'year': relativedelta(years=-1), + "day": relativedelta(), + "week": relativedelta(weeks=-1), + "month": relativedelta(months=-1), + "year": relativedelta(years=-1), } safesearch = True -safesearch_table = {0: 'both', 1: 'false', 2: 'false'} +safesearch_table = {0: "both", 1: "false", 2: "false"} def request(query, params): @@ -55,32 +54,32 @@ def request(query, params): return False # eng_region = traits.get_region(params['searxng_locale'], 'en_US') - eng_lang = traits.get_language(params['searxng_locale'], None) + eng_lang = traits.get_language(params["searxng_locale"], None) - params['url'] = ( + params["url"] = ( base_url.rstrip("/") + "/api/v1/search/videos?" + urlencode( { - 'search': query, - 'searchTarget': 'search-index', # Vidiversum - 'resultType': 'videos', - 'start': (params['pageno'] - 1) * 10, - 'count': 10, + "search": query, + "searchTarget": "search-index", # Vidiversum + "resultType": "videos", + "start": (params["pageno"] - 1) * 10, + "count": 10, # -createdAt: sort by date ascending / createdAt: date descending - 'sort': '-match', # sort by *match descending* - 'nsfw': safesearch_table[params['safesearch']], + "sort": "-match", # sort by *match descending* + "nsfw": safesearch_table[params["safesearch"]], } ) ) if eng_lang is not None: - params['url'] += '&languageOneOf[]=' + eng_lang - params['url'] += '&boostLanguages[]=' + eng_lang + params["url"] += "&languageOneOf[]=" + eng_lang + params["url"] += "&boostLanguages[]=" + eng_lang - if params['time_range'] in time_range_table: - time = datetime.now().date() + time_range_table[params['time_range']] - params['url'] += '&startDate=' + time.isoformat() + if params["time_range"] in time_range_table: + time = datetime.now().date() + time_range_table[params["time_range"]] + params["url"] += "&startDate=" + time.isoformat() return params @@ -95,37 +94,37 @@ def video_response(resp): json_data = resp.json() - if 'data' not in json_data: + if "data" not in json_data: return [] - for result in json_data['data']: + for result in json_data["data"]: metadata = [ x for x in [ - result.get('channel', {}).get('displayName'), - result.get('channel', {}).get('name') + '@' + result.get('channel', {}).get('host'), - ', '.join(result.get('tags', [])), + result.get("channel", {}).get("displayName"), + result.get("channel", {}).get("name") + "@" + result.get("channel", {}).get("host"), + ", ".join(result.get("tags", [])), ] if x ] - duration = result.get('duration') + duration = result.get("duration") if duration: duration = timedelta(seconds=duration) results.append( { - 'url': result['url'], - 'title': result['name'], - 'content': html_to_text(result.get('description') or ''), - 'author': result.get('account', {}).get('displayName'), - 'length': duration, - 'views': humanize_number(result['views']), - 'template': 'videos.html', - 'publishedDate': parse(result['publishedAt']), - 'iframe_src': result.get('embedUrl'), - 'thumbnail': result.get('thumbnailUrl') or result.get('previewUrl'), - 'metadata': ' | '.join(metadata), + "url": result["url"], + "title": result["name"], + "content": html_to_text(result.get("description") or ""), + "author": result.get("account", {}).get("displayName"), + "length": duration, + "views": humanize_number(result["views"]), + "template": "videos.html", + "publishedDate": parse(result["publishedAt"]), + "iframe_src": result.get("embedUrl"), + "thumbnail": result.get("thumbnailUrl") or result.get("previewUrl"), + "metadata": " | ".join(metadata), } ) @@ -142,16 +141,16 @@ def fetch_traits(engine_traits: EngineTraits): .. _videoLanguages: https://framagit.org/framasoft/peertube/search-index/-/commit/8ed5c729#3d8747f9a60695c367c70bb64efba8f403721fad_0_291 """ + # pylint: disable=import-outside-toplevel + + from searx.network import get # see https://github.com/searxng/searxng/issues/762 resp = get( - 'https://framagit.org/framasoft/peertube/search-index/-/raw/master/client/src/components/Filters.vue', - # the response from search-index repository is very slow - timeout=60, + "https://framagit.org/framasoft/peertube/search-index/-/raw/master/client/src/components/Filters.vue", + timeout=5, ) - - if not resp.ok: # type: ignore - print("ERROR: response from peertube is not OK.") - return + if not resp.ok: + raise RuntimeError("Response from Peertube is not OK.") js_lang = re.search(r"videoLanguages \(\)[^\n]+(.*?)\]", resp.text, re.DOTALL) # type: ignore if not js_lang: @@ -160,7 +159,7 @@ def fetch_traits(engine_traits: EngineTraits): for lang in re.finditer(r"\{ id: '([a-z]+)', label:", js_lang.group(1)): eng_tag = lang.group(1) - if eng_tag == 'oc': + if eng_tag == "oc": # Occitanis not known by babel, its closest relative is Catalan # but 'ca' is already in the list of engine_traits.languages --> # 'oc' will be ignored. @@ -178,5 +177,5 @@ def fetch_traits(engine_traits: EngineTraits): continue engine_traits.languages[sxng_tag] = eng_tag - engine_traits.languages['zh_Hans'] = 'zh' - engine_traits.languages['zh_Hant'] = 'zh' + engine_traits.languages["zh_Hans"] = "zh" + engine_traits.languages["zh_Hant"] = "zh" diff --git a/searx/engines/qwant.py b/searx/engines/qwant.py index b2940832d..829ee4298 100644 --- a/searx/engines/qwant.py +++ b/searx/engines/qwant.py @@ -45,19 +45,19 @@ from datetime import ( ) from json import loads from urllib.parse import urlencode -from flask_babel import gettext + import babel import lxml +from flask_babel import gettext +from searx.enginelib.traits import EngineTraits from searx.exceptions import ( - SearxEngineAPIException, - SearxEngineTooManyRequestsException, - SearxEngineCaptchaException, SearxEngineAccessDeniedException, + SearxEngineAPIException, + SearxEngineCaptchaException, + SearxEngineTooManyRequestsException, ) from searx.network import raise_for_httperror -from searx.enginelib.traits import EngineTraits - from searx.utils import ( eval_xpath, eval_xpath_list, @@ -67,12 +67,12 @@ from searx.utils import ( # about about = { - "website": 'https://www.qwant.com/', - "wikidata_id": 'Q14657870', + "website": "https://www.qwant.com/", + "wikidata_id": "Q14657870", "official_api_documentation": None, "use_official_api": True, "require_api_key": False, - "results": 'JSON', + "results": "JSON", } # engine dependent config @@ -100,10 +100,10 @@ qwant_news_locales = [ # search-url -api_url = 'https://api.qwant.com/v3/search/' +api_url = "https://api.qwant.com/v3/search/" """URL of Qwant's API (JSON)""" -web_lite_url = 'https://lite.qwant.com/' +web_lite_url = "https://lite.qwant.com/" """URL of Qwant-Lite (HTML)""" @@ -113,47 +113,44 @@ def request(query, params): if not query: return None - q_locale = traits.get_region(params["searxng_locale"], default='en_US') + q_locale = traits.get_region(params["searxng_locale"], default="en_US") - url = api_url + f'{qwant_categ}?' - args = {'q': query} - params['raise_for_httperror'] = False + url = api_url + f"{qwant_categ}?" + args = {"q": query} + params["raise_for_httperror"] = False - if qwant_categ == 'web-lite': + if qwant_categ == "web-lite": + url = web_lite_url + "?" + args["locale"] = q_locale.lower() + args["l"] = q_locale.split("_")[0] + args["s"] = params["safesearch"] + args["p"] = params["pageno"] - url = web_lite_url + '?' - args['locale'] = q_locale.lower() - args['l'] = q_locale.split('_')[0] - args['s'] = params['safesearch'] - args['p'] = params['pageno'] + params["raise_for_httperror"] = True - params['raise_for_httperror'] = True - - elif qwant_categ == 'images': - - args['count'] = 50 - args['locale'] = q_locale - args['safesearch'] = params['safesearch'] - args['tgp'] = 3 - args['offset'] = (params['pageno'] - 1) * args['count'] + elif qwant_categ == "images": + args["count"] = 50 + args["locale"] = q_locale + args["safesearch"] = params["safesearch"] + args["tgp"] = 3 + args["offset"] = (params["pageno"] - 1) * args["count"] else: # web, news, videos + args["count"] = 10 + args["locale"] = q_locale + args["safesearch"] = params["safesearch"] + args["llm"] = "false" + args["tgp"] = 3 + args["offset"] = (params["pageno"] - 1) * args["count"] - args['count'] = 10 - args['locale'] = q_locale - args['safesearch'] = params['safesearch'] - args['llm'] = 'false' - args['tgp'] = 3 - args['offset'] = (params['pageno'] - 1) * args['count'] - - params['url'] = url + urlencode(args) + params["url"] = url + urlencode(args) return params def response(resp): - if qwant_categ == 'web-lite': + if qwant_categ == "web-lite": return parse_web_lite(resp) return parse_web_api(resp) @@ -164,15 +161,15 @@ def parse_web_lite(resp): results = [] dom = lxml.html.fromstring(resp.text) - for item in eval_xpath_list(dom, '//section/article'): + for item in eval_xpath_list(dom, "//section/article"): if eval_xpath(item, "./span[contains(@class, 'tooltip')]"): # ignore randomly interspersed advertising adds continue results.append( { - 'url': extract_text(eval_xpath(item, "./span[contains(@class, 'url partner')]")), - 'title': extract_text(eval_xpath(item, './h2/a')), - 'content': extract_text(eval_xpath(item, './p')), + "url": extract_text(eval_xpath(item, "./span[contains(@class, 'url partner')]")), + "title": extract_text(eval_xpath(item, "./h2/a")), + "content": extract_text(eval_xpath(item, "./p")), } ) @@ -191,35 +188,35 @@ def parse_web_api(resp): except ValueError: search_results = {} - data = search_results.get('data', {}) + data = search_results.get("data", {}) # check for an API error - if search_results.get('status') != 'success': - error_code = data.get('error_code') + if search_results.get("status") != "success": + error_code = data.get("error_code") if error_code == 24: raise SearxEngineTooManyRequestsException() if search_results.get("data", {}).get("error_data", {}).get("captchaUrl") is not None: raise SearxEngineCaptchaException() if resp.status_code == 403: raise SearxEngineAccessDeniedException() - msg = ",".join(data.get('message', ['unknown'])) + msg = ",".join(data.get("message", ["unknown"])) raise SearxEngineAPIException(f"{msg} ({error_code})") # raise for other errors raise_for_httperror(resp) - if qwant_categ == 'web': + if qwant_categ == "web": # The WEB query contains a list named 'mainline'. This list can contain # different result types (e.g. mainline[0]['type'] returns type of the # result items in mainline[0]['items'] - mainline = data.get('result', {}).get('items', {}).get('mainline', {}) + mainline = data.get("result", {}).get("items", {}).get("mainline", {}) else: # Queries on News, Images and Videos do not have a list named 'mainline' # in the response. The result items are directly in the list # result['items']. - mainline = data.get('result', {}).get('items', []) + mainline = data.get("result", {}).get("items", []) mainline = [ - {'type': qwant_categ, 'items': mainline}, + {"type": qwant_categ, "items": mainline}, ] # return empty array if there are no results @@ -227,68 +224,66 @@ def parse_web_api(resp): return [] for row in mainline: - mainline_type = row.get('type', 'web') + mainline_type = row.get("type", "web") if mainline_type != qwant_categ: continue - if mainline_type == 'ads': + if mainline_type == "ads": # ignore adds continue - mainline_items = row.get('items', []) + mainline_items = row.get("items", []) for item in mainline_items: + title = item.get("title", None) + res_url = item.get("url", None) - title = item.get('title', None) - res_url = item.get('url', None) - - if mainline_type == 'web': - content = item['desc'] + if mainline_type == "web": + content = item["desc"] results.append( { - 'title': title, - 'url': res_url, - 'content': content, + "title": title, + "url": res_url, + "content": content, } ) - elif mainline_type == 'news': - - pub_date = item['date'] + elif mainline_type == "news": + pub_date = item["date"] if pub_date is not None: pub_date = datetime.fromtimestamp(pub_date) - news_media = item.get('media', []) + news_media = item.get("media", []) thumbnail = None if news_media: - thumbnail = news_media[0].get('pict', {}).get('url', None) + thumbnail = news_media[0].get("pict", {}).get("url", None) results.append( { - 'title': title, - 'url': res_url, - 'publishedDate': pub_date, - 'thumbnail': thumbnail, + "title": title, + "url": res_url, + "publishedDate": pub_date, + "thumbnail": thumbnail, } ) - elif mainline_type == 'images': - thumbnail = item['thumbnail'] - img_src = item['media'] + elif mainline_type == "images": + thumbnail = item["thumbnail"] + img_src = item["media"] results.append( { - 'title': title, - 'url': res_url, - 'template': 'images.html', - 'thumbnail_src': thumbnail, - 'img_src': img_src, - 'resolution': f"{item['width']} x {item['height']}", - 'img_format': item.get('thumb_type'), + "title": title, + "url": res_url, + "template": "images.html", + "thumbnail_src": thumbnail, + "img_src": img_src, + "resolution": f"{item['width']} x {item['height']}", + "img_format": item.get("thumb_type"), } ) - elif mainline_type == 'videos': + elif mainline_type == "videos": # some videos do not have a description: while qwant-video # returns an empty string, such video from a qwant-web query # miss the 'desc' key. - d, s, c = item.get('desc'), item.get('source'), item.get('channel') + d, s, c = item.get("desc"), item.get("source"), item.get("channel") content_parts = [] if d: content_parts.append(d) @@ -296,27 +291,27 @@ def parse_web_api(resp): content_parts.append("%s: %s " % (gettext("Source"), s)) if c: content_parts.append("%s: %s " % (gettext("Channel"), c)) - content = ' // '.join(content_parts) - length = item['duration'] + content = " // ".join(content_parts) + length = item["duration"] if length is not None: length = timedelta(milliseconds=length) - pub_date = item['date'] + pub_date = item["date"] if pub_date is not None: pub_date = datetime.fromtimestamp(pub_date) - thumbnail = item['thumbnail'] + thumbnail = item["thumbnail"] # from some locations (DE and others?) the s2 link do # response a 'Please wait ..' but does not deliver the thumbnail - thumbnail = thumbnail.replace('https://s2.qwant.com', 'https://s1.qwant.com', 1) + thumbnail = thumbnail.replace("https://s2.qwant.com", "https://s1.qwant.com", 1) results.append( { - 'title': title, - 'url': res_url, - 'content': content, - 'iframe_src': get_embeded_stream_url(res_url), - 'publishedDate': pub_date, - 'thumbnail': thumbnail, - 'template': 'videos.html', - 'length': length, + "title": title, + "url": res_url, + "content": content, + "iframe_src": get_embeded_stream_url(res_url), + "publishedDate": pub_date, + "thumbnail": thumbnail, + "template": "videos.html", + "length": length, } ) @@ -326,22 +321,28 @@ def parse_web_api(resp): def fetch_traits(engine_traits: EngineTraits): # pylint: disable=import-outside-toplevel - from searx import network from searx.locales import region_tag + from searx.network import get # see https://github.com/searxng/searxng/issues/762 from searx.utils import extr - resp = network.get(about['website']) - json_string = extr(resp.text, 'INITIAL_PROPS = ', '') + resp = get( + about["website"], + timeout=5, + ) + if not resp.ok: + raise RuntimeError("Response from Qwant is not OK.") + + json_string = extr(resp.text, "INITIAL_PROPS = ", "") q_initial_props = loads(json_string) - q_locales = q_initial_props.get('locales') + q_locales = q_initial_props.get("locales") eng_tag_list = set() for country, v in q_locales.items(): - for lang in v['langs']: + for lang in v["langs"]: _locale = "{lang}_{country}".format(lang=lang, country=country) - if qwant_categ == 'news' and _locale.lower() not in qwant_news_locales: + if qwant_categ == "news" and _locale.lower() not in qwant_news_locales: # qwant-news does not support all locales from qwant-web: continue @@ -349,7 +350,7 @@ def fetch_traits(engine_traits: EngineTraits): for eng_tag in eng_tag_list: try: - sxng_tag = region_tag(babel.Locale.parse(eng_tag, sep='_')) + sxng_tag = region_tag(babel.Locale.parse(eng_tag, sep="_")) except babel.UnknownLocaleError: print("ERROR: can't determine babel locale of quant's locale %s" % eng_tag) continue diff --git a/searx/engines/radio_browser.py b/searx/engines/radio_browser.py index 7aa6229c6..0133c4912 100644 --- a/searx/engines/radio_browser.py +++ b/searx/engines/radio_browser.py @@ -5,28 +5,28 @@ https://de1.api.radio-browser.info/#Advanced_station_search """ + import random import socket from urllib.parse import urlencode + import babel from flask_babel import gettext -from searx.network import get from searx.enginelib import EngineCache from searx.enginelib.traits import EngineTraits from searx.locales import language_tag - about = { - "website": 'https://www.radio-browser.info/', - "wikidata_id": 'Q111664849', - "official_api_documentation": 'https://de1.api.radio-browser.info/', + "website": "https://www.radio-browser.info/", + "wikidata_id": "Q111664849", + "official_api_documentation": "https://de1.api.radio-browser.info/", "use_official_api": True, "require_api_key": False, - "results": 'JSON', + "results": "JSON", } paging = True -categories = ['music', 'radio'] +categories = ["music", "radio"] number_of_results = 10 @@ -98,26 +98,26 @@ def request(query, params): server = random.choice(servers) args = { - 'name': query, - 'order': 'votes', - 'offset': (params['pageno'] - 1) * number_of_results, - 'limit': number_of_results, - 'hidebroken': 'true', - 'reverse': 'true', + "name": query, + "order": "votes", + "offset": (params["pageno"] - 1) * number_of_results, + "limit": number_of_results, + "hidebroken": "true", + "reverse": "true", } - if 'language' in station_filters: - lang = traits.get_language(params['searxng_locale']) # type: ignore + if "language" in station_filters: + lang = traits.get_language(params["searxng_locale"]) # type: ignore if lang: - args['language'] = lang + args["language"] = lang - if 'countrycode' in station_filters: - if len(params['searxng_locale'].split('-')) > 1: - countrycode = params['searxng_locale'].split('-')[-1].upper() - if countrycode in traits.custom['countrycodes']: # type: ignore - args['countrycode'] = countrycode + if "countrycode" in station_filters: + if len(params["searxng_locale"].split("-")) > 1: + countrycode = params["searxng_locale"].split("-")[-1].upper() + if countrycode in traits.custom["countrycodes"]: # type: ignore + args["countrycode"] = countrycode - params['url'] = f"{server}/json/stations/search?{urlencode(args)}" + params["url"] = f"{server}/json/stations/search?{urlencode(args)}" def response(resp): @@ -126,28 +126,28 @@ def response(resp): json_resp = resp.json() for result in json_resp: - url = result['homepage'] + url = result["homepage"] if not url: - url = result['url_resolved'] + url = result["url_resolved"] content = [] - tags = ', '.join(result.get('tags', '').split(',')) + tags = ", ".join(result.get("tags", "").split(",")) if tags: content.append(tags) - for x in ['state', 'country']: + for x in ["state", "country"]: v = result.get(x) if v: v = str(v).strip() content.append(v) metadata = [] - codec = result.get('codec') - if codec and codec.lower() != 'unknown': - metadata.append(f'{codec} ' + gettext('radio')) + codec = result.get("codec") + if codec and codec.lower() != "unknown": + metadata.append(f"{codec} " + gettext("radio")) for x, y in [ - (gettext('bitrate'), 'bitrate'), - (gettext('votes'), 'votes'), - (gettext('clicks'), 'clickcount'), + (gettext("bitrate"), "bitrate"), + (gettext("votes"), "votes"), + (gettext("clicks"), "clickcount"), ]: v = result.get(y) if v: @@ -155,12 +155,12 @@ def response(resp): metadata.append(f"{x} {v}") results.append( { - 'url': url, - 'title': result['name'], - 'thumbnail': result.get('favicon', '').replace("http://", "https://"), - 'content': ' | '.join(content), - 'metadata': ' | '.join(metadata), - 'iframe_src': result['url_resolved'].replace("http://", "https://"), + "url": url, + "title": result["name"], + "thumbnail": result.get("favicon", "").replace("http://", "https://"), + "content": " | ".join(content), + "metadata": " | ".join(metadata), + "iframe_src": result["url_resolved"].replace("http://", "https://"), } ) @@ -181,15 +181,32 @@ def fetch_traits(engine_traits: EngineTraits): init(None) from babel.core import get_global + from searx.network import get # see https://github.com/searxng/searxng/issues/762 + babel_reg_list = get_global("territory_languages").keys() server = server_list()[0] - language_list = get(f'{server}/json/languages').json() # type: ignore - country_list = get(f'{server}/json/countries').json() # type: ignore + + resp = get( + f"{server}/json/languages", + timeout=5, + ) + if not resp.ok: + raise RuntimeError("Response from radio-browser languages is not OK.") + + language_list = resp.json() + + resp = get( + f"{server}/json/countries", + timeout=5, + ) + if not resp.ok: + raise RuntimeError("Response from radio-browser countries is not OK.") + + country_list = resp.json() for lang in language_list: - - babel_lang = lang.get('iso_639') + babel_lang = lang.get("iso_639") if not babel_lang: # the language doesn't have any iso code, and hence can't be parsed # print(f"ERROR: lang - no iso code in {lang}") @@ -200,7 +217,7 @@ def fetch_traits(engine_traits: EngineTraits): # print(f"ERROR: language tag {babel_lang} is unknown by babel") continue - eng_tag = lang['name'] + eng_tag = lang["name"] conflict = engine_traits.languages.get(sxng_tag) if conflict: if conflict != eng_tag: @@ -211,7 +228,7 @@ def fetch_traits(engine_traits: EngineTraits): countrycodes = set() for region in country_list: # country_list contains duplicates that differ only in upper/lower case - _reg = region['iso_3166_1'].upper() + _reg = region["iso_3166_1"].upper() if _reg not in babel_reg_list: print(f"ERROR: region tag {region['iso_3166_1']} is unknown by babel") continue @@ -219,4 +236,4 @@ def fetch_traits(engine_traits: EngineTraits): countrycodes = list(countrycodes) countrycodes.sort() - engine_traits.custom['countrycodes'] = countrycodes + engine_traits.custom["countrycodes"] = countrycodes diff --git a/searx/engines/startpage.py b/searx/engines/startpage.py index 3267daee5..c688b7cb4 100644 --- a/searx/engines/startpage.py +++ b/searx/engines/startpage.py @@ -84,41 +84,48 @@ Startpage's category (for Web-search, News, Videos, ..) is set by """ # pylint: disable=too-many-statements -import typing as t - -from collections import OrderedDict import re -from unicodedata import normalize, combining +import typing as t +from collections import OrderedDict from datetime import datetime, timedelta from json import loads +from unicodedata import combining, normalize +import babel.localedata import dateutil.parser import lxml.html -import babel.localedata -from searx.utils import extr, extract_text, eval_xpath, gen_useragent, html_to_text, humanize_bytes, remove_pua_from_str -from searx.network import get # see https://github.com/searxng/searxng/issues/762 +from searx.enginelib import EngineCache +from searx.enginelib.traits import EngineTraits from searx.exceptions import SearxEngineCaptchaException from searx.locales import region_tag -from searx.enginelib.traits import EngineTraits -from searx.enginelib import EngineCache +from searx.network import get # see https://github.com/searxng/searxng/issues/762 +from searx.utils import ( + eval_xpath, + extr, + extract_text, + gen_useragent, + html_to_text, + humanize_bytes, + remove_pua_from_str, +) # about about = { - "website": 'https://startpage.com', - "wikidata_id": 'Q2333295', + "website": "https://startpage.com", + "wikidata_id": "Q2333295", "official_api_documentation": None, "use_official_api": False, "require_api_key": False, - "results": 'HTML', + "results": "HTML", } -startpage_categ = 'web' +startpage_categ = "web" """Startpage's category, visit :ref:`startpage categories`. """ # engine dependent config -categories = ['general', 'web'] +categories = ["general", "web"] paging = True max_page = 18 """Tested 18 pages maximum (argument ``page``), to be save max is set to 20.""" @@ -126,12 +133,12 @@ max_page = 18 time_range_support = True safesearch = True -time_range_dict = {'day': 'd', 'week': 'w', 'month': 'm', 'year': 'y'} -safesearch_dict = {0: '1', 1: '0', 2: '0'} +time_range_dict = {"day": "d", "week": "w", "month": "m", "year": "y"} +safesearch_dict = {0: "1", 1: "0", 2: "0"} # search-url -base_url = 'https://www.startpage.com' -search_url = base_url + '/sp/search' +base_url = "https://www.startpage.com" +search_url = base_url + "/sp/search" # specific xpath variables # ads xpath //div[@id="results"]/div[@id="sponsored"]//div[@class="result"] @@ -189,7 +196,7 @@ def get_sc_code(params): get_sc_url = base_url + "/" logger.debug("get_sc_code: querying new sc timestamp @ %s", get_sc_url) - headers = {**params['headers']} + headers = {**params["headers"]} logger.debug("get_sc_code: request headers: %s", headers) resp = get(get_sc_url, headers=headers) @@ -197,7 +204,7 @@ def get_sc_code(params): # ?? https://www.startpage.com/sp/cdn/images/filter-chevron.svg # ?? ping-back URL: https://www.startpage.com/sp/pb?sc=TLsB0oITjZ8F21 - if str(resp.url).startswith('https://www.startpage.com/sp/captcha'): + if str(resp.url).startswith("https://www.startpage.com/sp/captcha"): raise SearxEngineCaptchaException( message="get_sc_code: got redirected to https://www.startpage.com/sp/captcha", ) @@ -231,61 +238,61 @@ def request(query, params): Additionally the arguments form Startpage's search form needs to be set in HTML POST data / compare ```` elements: :py:obj:`search_form_xpath`. """ - engine_region = traits.get_region(params['searxng_locale'], 'en-US') - engine_language = traits.get_language(params['searxng_locale'], 'en') + engine_region = traits.get_region(params["searxng_locale"], "en-US") + engine_language = traits.get_language(params["searxng_locale"], "en") - params['headers']['Origin'] = base_url - params['headers']['Referer'] = base_url + '/' + params["headers"]["Origin"] = base_url + params["headers"]["Referer"] = base_url + "/" # Build form data args = { - 'query': query, - 'cat': startpage_categ, - 't': 'device', - 'sc': get_sc_code(params), - 'with_date': time_range_dict.get(params['time_range'], ''), - 'abp': '1', - 'abd': '1', - 'abe': '1', + "query": query, + "cat": startpage_categ, + "t": "device", + "sc": get_sc_code(params), + "with_date": time_range_dict.get(params["time_range"], ""), + "abp": "1", + "abd": "1", + "abe": "1", } if engine_language: - args['language'] = engine_language - args['lui'] = engine_language + args["language"] = engine_language + args["lui"] = engine_language - if params['pageno'] > 1: - args['page'] = params['pageno'] - args['segment'] = 'startpage.udog' + if params["pageno"] > 1: + args["page"] = params["pageno"] + args["segment"] = "startpage.udog" # Build cookie - lang_homepage = 'en' + lang_homepage = "en" cookie = OrderedDict() - cookie['date_time'] = 'world' - cookie['disable_family_filter'] = safesearch_dict[params['safesearch']] - cookie['disable_open_in_new_window'] = '0' - cookie['enable_post_method'] = '1' # hint: POST - cookie['enable_proxy_safety_suggest'] = '1' - cookie['enable_stay_control'] = '1' - cookie['instant_answers'] = '1' - cookie['lang_homepage'] = 's/device/%s/' % lang_homepage - cookie['num_of_results'] = '10' - cookie['suggestions'] = '1' - cookie['wt_unit'] = 'celsius' + cookie["date_time"] = "world" + cookie["disable_family_filter"] = safesearch_dict[params["safesearch"]] + cookie["disable_open_in_new_window"] = "0" + cookie["enable_post_method"] = "1" # hint: POST + cookie["enable_proxy_safety_suggest"] = "1" + cookie["enable_stay_control"] = "1" + cookie["instant_answers"] = "1" + cookie["lang_homepage"] = "s/device/%s/" % lang_homepage + cookie["num_of_results"] = "10" + cookie["suggestions"] = "1" + cookie["wt_unit"] = "celsius" if engine_language: - cookie['language'] = engine_language - cookie['language_ui'] = engine_language + cookie["language"] = engine_language + cookie["language_ui"] = engine_language if engine_region: - cookie['search_results_region'] = engine_region + cookie["search_results_region"] = engine_region - params['cookies']['preferences'] = 'N1N'.join(["%sEEE%s" % x for x in cookie.items()]) - logger.debug('cookie preferences: %s', params['cookies']['preferences']) + params["cookies"]["preferences"] = "N1N".join(["%sEEE%s" % x for x in cookie.items()]) + logger.debug("cookie preferences: %s", params["cookies"]["preferences"]) logger.debug("data: %s", args) - params['data'] = args - params['method'] = 'POST' - params['url'] = search_url + params["data"] = args + params["method"] = "POST" + params["url"] = search_url return params @@ -295,7 +302,7 @@ def _parse_published_date(content: str) -> tuple[str, datetime | None]: # check if search result starts with something like: "2 Sep 2014 ... " if re.match(r"^([1-9]|[1-2][0-9]|3[0-1]) [A-Z][a-z]{2} [0-9]{4} \.\.\. ", content): - date_pos = content.find('...') + 4 + date_pos = content.find("...") + 4 date_string = content[0 : date_pos - 5] # fix content string content = content[date_pos:] @@ -307,11 +314,11 @@ def _parse_published_date(content: str) -> tuple[str, datetime | None]: # check if search result starts with something like: "5 days ago ... " elif re.match(r"^[0-9]+ days? ago \.\.\. ", content): - date_pos = content.find('...') + 4 + date_pos = content.find("...") + 4 date_string = content[0 : date_pos - 5] # calculate datetime - published_date = datetime.now() - timedelta(days=int(re.match(r'\d+', date_string).group())) # type: ignore + published_date = datetime.now() - timedelta(days=int(re.match(r"\d+", date_string).group())) # type: ignore # fix content string content = content[date_pos:] @@ -320,88 +327,88 @@ def _parse_published_date(content: str) -> tuple[str, datetime | None]: def _get_web_result(result): - content = html_to_text(result.get('description')) + content = html_to_text(result.get("description")) content, publishedDate = _parse_published_date(content) return { - 'url': result['clickUrl'], - 'title': html_to_text(result['title']), - 'content': content, - 'publishedDate': publishedDate, + "url": result["clickUrl"], + "title": html_to_text(result["title"]), + "content": content, + "publishedDate": publishedDate, } def _get_news_result(result): - title = remove_pua_from_str(html_to_text(result['title'])) - content = remove_pua_from_str(html_to_text(result.get('description'))) + title = remove_pua_from_str(html_to_text(result["title"])) + content = remove_pua_from_str(html_to_text(result.get("description"))) publishedDate = None - if result.get('date'): - publishedDate = datetime.fromtimestamp(result['date'] / 1000) + if result.get("date"): + publishedDate = datetime.fromtimestamp(result["date"] / 1000) thumbnailUrl = None - if result.get('thumbnailUrl'): - thumbnailUrl = base_url + result['thumbnailUrl'] + if result.get("thumbnailUrl"): + thumbnailUrl = base_url + result["thumbnailUrl"] return { - 'url': result['clickUrl'], - 'title': title, - 'content': content, - 'publishedDate': publishedDate, - 'thumbnail': thumbnailUrl, + "url": result["clickUrl"], + "title": title, + "content": content, + "publishedDate": publishedDate, + "thumbnail": thumbnailUrl, } def _get_image_result(result) -> dict[str, t.Any] | None: - url = result.get('altClickUrl') + url = result.get("altClickUrl") if not url: return None thumbnailUrl = None - if result.get('thumbnailUrl'): - thumbnailUrl = base_url + result['thumbnailUrl'] + if result.get("thumbnailUrl"): + thumbnailUrl = base_url + result["thumbnailUrl"] resolution = None - if result.get('width') and result.get('height'): + if result.get("width") and result.get("height"): resolution = f"{result['width']}x{result['height']}" filesize = None - if result.get('filesize'): - size_str = ''.join(filter(str.isdigit, result['filesize'])) + if result.get("filesize"): + size_str = "".join(filter(str.isdigit, result["filesize"])) filesize = humanize_bytes(int(size_str)) return { - 'template': 'images.html', - 'url': url, - 'title': html_to_text(result['title']), - 'content': '', - 'img_src': result.get('rawImageUrl'), - 'thumbnail_src': thumbnailUrl, - 'resolution': resolution, - 'img_format': result.get('format'), - 'filesize': filesize, + "template": "images.html", + "url": url, + "title": html_to_text(result["title"]), + "content": "", + "img_src": result.get("rawImageUrl"), + "thumbnail_src": thumbnailUrl, + "resolution": resolution, + "img_format": result.get("format"), + "filesize": filesize, } def response(resp): categ = startpage_categ.capitalize() - results_raw = '{' + extr(resp.text, f"React.createElement(UIStartpage.AppSerp{categ}, {{", '}})') + '}}' + results_raw = "{" + extr(resp.text, f"React.createElement(UIStartpage.AppSerp{categ}, {{", "}})") + "}}" - if resp.headers.get('Location', '').startswith("https://www.startpage.com/sp/captcha"): + if resp.headers.get("Location", "").startswith("https://www.startpage.com/sp/captcha"): raise SearxEngineCaptchaException() results_json = loads(results_raw) - results_obj = results_json.get('render', {}).get('presenter', {}).get('regions', {}) + results_obj = results_json.get("render", {}).get("presenter", {}).get("regions", {}) results = [] - for results_categ in results_obj.get('mainline', []): - for item in results_categ.get('results', []): - if results_categ['display_type'] == 'web-google': + for results_categ in results_obj.get("mainline", []): + for item in results_categ.get("results", []): + if results_categ["display_type"] == "web-google": results.append(_get_web_result(item)) - elif results_categ['display_type'] == 'news-bing': + elif results_categ["display_type"] == "news-bing": results.append(_get_news_result(item)) - elif 'images' in results_categ['display_type']: + elif "images" in results_categ["display_type"]: item = _get_image_result(item) if item: results.append(item) @@ -415,13 +422,17 @@ def fetch_traits(engine_traits: EngineTraits): # pylint: disable=too-many-branches headers = { - 'User-Agent': gen_useragent(), - 'Accept-Language': "en-US,en;q=0.5", # bing needs to set the English language + "User-Agent": gen_useragent(), + "Accept-Language": "en-US,en;q=0.5", # bing needs to set the English language } - resp = get('https://www.startpage.com/do/settings', headers=headers) + resp = get( + "https://www.startpage.com/do/settings", + headers=headers, + timeout=5, + ) if not resp.ok: - print("ERROR: response from Startpage is not OK.") + raise RuntimeError("Response from Startpage is not OK.") dom = lxml.html.fromstring(resp.text) @@ -429,24 +440,24 @@ def fetch_traits(engine_traits: EngineTraits): sp_region_names = [] for option in dom.xpath('//form[@name="settings"]//select[@name="search_results_region"]/option'): - sp_region_names.append(option.get('value')) + sp_region_names.append(option.get("value")) for eng_tag in sp_region_names: - if eng_tag == 'all': + if eng_tag == "all": continue - babel_region_tag = {'no_NO': 'nb_NO'}.get(eng_tag, eng_tag) # norway + babel_region_tag = {"no_NO": "nb_NO"}.get(eng_tag, eng_tag) # norway - if '-' in babel_region_tag: # pyright: ignore[reportOperatorIssue] - l, r = babel_region_tag.split('-') - r = r.split('_')[-1] - sxng_tag = region_tag(babel.Locale.parse(l + '_' + r, sep='_')) + if "-" in babel_region_tag: # pyright: ignore[reportOperatorIssue] + l, r = babel_region_tag.split("-") + r = r.split("_")[-1] + sxng_tag = region_tag(babel.Locale.parse(l + "_" + r, sep="_")) else: try: - sxng_tag = region_tag(babel.Locale.parse(babel_region_tag, sep='_')) + sxng_tag = region_tag(babel.Locale.parse(babel_region_tag, sep="_")) except babel.UnknownLocaleError: - print("ERROR: can't determine babel locale of startpage's locale %s" % eng_tag) + print("IGNORE: can't determine babel locale of startpage's locale %s" % eng_tag) continue conflict = engine_traits.regions.get(sxng_tag) @@ -458,21 +469,24 @@ def fetch_traits(engine_traits: EngineTraits): # languages - catalog_engine2code = {name.lower(): lang_code for lang_code, name in babel.Locale('en').languages.items()} + catalog_engine2code = {name.lower(): lang_code for lang_code, name in babel.Locale("en").languages.items()} # get the native name of every language known by babel - for lang_code in filter(lambda lang_code: lang_code.find('_') == -1, babel.localedata.locale_identifiers()): + for lang_code in filter( + lambda lang_code: lang_code.find("_") == -1, + babel.localedata.locale_identifiers(), + ): native_name = babel.Locale(lang_code).get_language_name() if not native_name: - print(f"ERROR: language name of startpage's language {lang_code} is unknown by babel") + print(f"IGNORE: language name of startpage's language {lang_code} is unknown by babel") continue native_name = native_name.lower() # add native name exactly as it is catalog_engine2code[native_name] = lang_code # add "normalized" language name (i.e. français becomes francais and español becomes espanol) - unaccented_name = ''.join(filter(lambda c: not combining(c), normalize('NFKD', native_name))) + unaccented_name = "".join(filter(lambda c: not combining(c), normalize("NFKD", native_name))) if len(unaccented_name) == len(unaccented_name.encode()): # add only if result is ascii (otherwise "normalization" didn't work) catalog_engine2code[unaccented_name] = lang_code @@ -481,31 +495,35 @@ def fetch_traits(engine_traits: EngineTraits): catalog_engine2code.update( { + # Brazilian Portuguese + "brazilian": "pt_BR", # traditional chinese used in .. - 'fantizhengwen': 'zh_Hant', + "fantizhengwen": "zh_Hant", # Korean alphabet - 'hangul': 'ko', + "hangul": "ko", # Malayalam is one of 22 scheduled languages of India. - 'malayam': 'ml', - 'norsk': 'nb', - 'sinhalese': 'si', + "malayam": "ml", + "norsk": "nb", + "sinhalese": "si", } ) skip_eng_tags = { - 'english_uk', # SearXNG lang 'en' already maps to 'english' + "english_uk", # SearXNG lang 'en' already maps to 'english' } for option in dom.xpath('//form[@name="settings"]//select[@name="language"]/option'): - - eng_tag = option.get('value') + eng_tag = option.get("value") if eng_tag in skip_eng_tags: continue name = extract_text(option).lower() # type: ignore sxng_tag = catalog_engine2code.get(eng_tag) if sxng_tag is None: - sxng_tag = catalog_engine2code[name] + sxng_tag = catalog_engine2code.get(name) + if sxng_tag is None: + # silently ignore unknown languages + continue conflict = engine_traits.languages.get(sxng_tag) if conflict: diff --git a/searx/engines/wikipedia.py b/searx/engines/wikipedia.py index e5403d194..b35cde866 100644 --- a/searx/engines/wikipedia.py +++ b/searx/engines/wikipedia.py @@ -55,23 +55,22 @@ options: """ import urllib.parse -import babel +import babel from lxml import html -from searx import utils +from searx import locales, utils from searx import network as _network -from searx import locales from searx.enginelib.traits import EngineTraits # about about = { - "website": 'https://www.wikipedia.org/', - "wikidata_id": 'Q52', - "official_api_documentation": 'https://en.wikipedia.org/api/', + "website": "https://www.wikipedia.org/", + "wikidata_id": "Q52", + "official_api_documentation": "https://en.wikipedia.org/api/", "use_official_api": True, "require_api_key": False, - "results": 'JSON', + "results": "JSON", } display_type = ["infobox"] @@ -79,18 +78,18 @@ display_type = ["infobox"] one will add a hit to the result list. The first one will show a hit in the info box. Both values can be set, or one of the two can be set.""" -list_of_wikipedias = 'https://meta.wikimedia.org/wiki/List_of_Wikipedias' +list_of_wikipedias = "https://meta.wikimedia.org/wiki/List_of_Wikipedias" """`List of all wikipedias `_ """ -wikipedia_article_depth = 'https://meta.wikimedia.org/wiki/Wikipedia_article_depth' +wikipedia_article_depth = "https://meta.wikimedia.org/wiki/Wikipedia_article_depth" """The *editing depth* of Wikipedia is one of several possible rough indicators of the encyclopedia's collaborative quality, showing how frequently its articles are updated. The measurement of depth was introduced after some limitations of the classic measurement of article count were realized. """ -rest_v1_summary_url = 'https://{wiki_netloc}/api/rest_v1/page/summary/{title}' +rest_v1_summary_url = "https://{wiki_netloc}/api/rest_v1/page/summary/{title}" """ `wikipedia rest_v1 summary API`_: The summary response includes an extract of the first paragraph of the page in @@ -140,8 +139,8 @@ def get_wiki_params(sxng_locale, eng_traits): (region) higher than a language (compare :py:obj:`wiki_lc_locale_variants`). """ - eng_tag = eng_traits.get_region(sxng_locale, eng_traits.get_language(sxng_locale, 'en')) - wiki_netloc = eng_traits.custom['wiki_netloc'].get(eng_tag, 'en.wikipedia.org') + eng_tag = eng_traits.get_region(sxng_locale, eng_traits.get_language(sxng_locale, "en")) + wiki_netloc = eng_traits.custom["wiki_netloc"].get(eng_tag, "en.wikipedia.org") return eng_tag, wiki_netloc @@ -150,12 +149,12 @@ def request(query, params): if query.islower(): query = query.title() - _eng_tag, wiki_netloc = get_wiki_params(params['searxng_locale'], traits) + _eng_tag, wiki_netloc = get_wiki_params(params["searxng_locale"], traits) title = urllib.parse.quote(query) - params['url'] = rest_v1_summary_url.format(wiki_netloc=wiki_netloc, title=title) + params["url"] = rest_v1_summary_url.format(wiki_netloc=wiki_netloc, title=title) - params['raise_for_httperror'] = False - params['soft_max_redirects'] = 2 + params["raise_for_httperror"] = False + params["soft_max_redirects"] = 2 return params @@ -173,31 +172,37 @@ def response(resp): pass else: if ( - api_result['type'] == 'https://mediawiki.org/wiki/HyperSwitch/errors/bad_request' - and api_result['detail'] == 'title-invalid-characters' + api_result["type"] == "https://mediawiki.org/wiki/HyperSwitch/errors/bad_request" + and api_result["detail"] == "title-invalid-characters" ): return [] _network.raise_for_httperror(resp) api_result = resp.json() - title = utils.html_to_text(api_result.get('titles', {}).get('display') or api_result.get('title')) - wikipedia_link = api_result['content_urls']['desktop']['page'] + title = utils.html_to_text(api_result.get("titles", {}).get("display") or api_result.get("title")) + wikipedia_link = api_result["content_urls"]["desktop"]["page"] - if "list" in display_type or api_result.get('type') != 'standard': + if "list" in display_type or api_result.get("type") != "standard": # show item in the result list if 'list' is in the display options or it # is a item that can't be displayed in a infobox. - results.append({'url': wikipedia_link, 'title': title, 'content': api_result.get('description', '')}) + results.append( + { + "url": wikipedia_link, + "title": title, + "content": api_result.get("description", ""), + } + ) if "infobox" in display_type: - if api_result.get('type') == 'standard': + if api_result.get("type") == "standard": results.append( { - 'infobox': title, - 'id': wikipedia_link, - 'content': api_result.get('extract', ''), - 'img_src': api_result.get('thumbnail', {}).get('source'), - 'urls': [{'title': 'Wikipedia', 'url': wikipedia_link}], + "infobox": title, + "id": wikipedia_link, + "content": api_result.get("extract", ""), + "img_src": api_result.get("thumbnail", {}).get("source"), + "urls": [{"title": "Wikipedia", "url": wikipedia_link}], } ) @@ -212,28 +217,28 @@ def response(resp): lang_map = locales.LOCALE_BEST_MATCH.copy() lang_map.update( { - 'be-tarask': 'bel', - 'ak': 'aka', - 'als': 'gsw', - 'bat-smg': 'sgs', - 'cbk-zam': 'cbk', - 'fiu-vro': 'vro', - 'map-bms': 'map', - 'no': 'nb-NO', - 'nrm': 'nrf', - 'roa-rup': 'rup', - 'nds-nl': 'nds', + "be-tarask": "bel", + "ak": "aka", + "als": "gsw", + "bat-smg": "sgs", + "cbk-zam": "cbk", + "fiu-vro": "vro", + "map-bms": "map", + "no": "nb-NO", + "nrm": "nrf", + "roa-rup": "rup", + "nds-nl": "nds", #'simple: – invented code used for the Simple English Wikipedia (not the official IETF code en-simple) - 'zh-min-nan': 'nan', - 'zh-yue': 'yue', - 'an': 'arg', + "zh-min-nan": "nan", + "zh-yue": "yue", + "an": "arg", } ) def fetch_traits(engine_traits: EngineTraits): fetch_wikimedia_traits(engine_traits) - print("WIKIPEDIA_LANGUAGES: %s" % len(engine_traits.custom['WIKIPEDIA_LANGUAGES'])) + print("WIKIPEDIA_LANGUAGES: %s" % len(engine_traits.custom["WIKIPEDIA_LANGUAGES"])) def fetch_wikimedia_traits(engine_traits: EngineTraits): @@ -257,9 +262,13 @@ def fetch_wikimedia_traits(engine_traits: EngineTraits): "zh-classical": "zh-classical.wikipedia.org" } """ - # pylint: disable=too-many-branches - engine_traits.custom['wiki_netloc'] = {} - engine_traits.custom['WIKIPEDIA_LANGUAGES'] = [] + # pylint: disable=import-outside-toplevel, too-many-branches + + from searx.network import get # see https://github.com/searxng/searxng/issues/762 + from searx.utils import searxng_useragent + + engine_traits.custom["wiki_netloc"] = {} + engine_traits.custom["WIKIPEDIA_LANGUAGES"] = [] # insert alias to map from a script or region to a wikipedia variant @@ -270,35 +279,34 @@ def fetch_wikimedia_traits(engine_traits: EngineTraits): for sxng_tag in sxng_tag_list: engine_traits.regions[sxng_tag] = eng_tag - resp = _network.get(list_of_wikipedias) + headers = {"Accept": "*/*", "User-Agent": searxng_useragent()} + resp = get(list_of_wikipedias, timeout=5, headers=headers) if not resp.ok: - print("ERROR: response from Wikipedia is not OK.") + raise RuntimeError("Response from Wikipedia is not OK.") dom = html.fromstring(resp.text) for row in dom.xpath('//table[contains(@class,"sortable")]//tbody/tr'): - - cols = row.xpath('./td') + cols = row.xpath("./td") if not cols: continue cols = [c.text_content().strip() for c in cols] - depth = float(cols[11].replace('-', '0').replace(',', '')) - articles = int(cols[4].replace(',', '').replace(',', '')) + depth = float(cols[11].replace("-", "0").replace(",", "")) + articles = int(cols[4].replace(",", "").replace(",", "")) eng_tag = cols[3] - wiki_url = row.xpath('./td[4]/a/@href')[0] + wiki_url = row.xpath("./td[4]/a/@href")[0] wiki_url = urllib.parse.urlparse(wiki_url) try: - sxng_tag = locales.language_tag(babel.Locale.parse(lang_map.get(eng_tag, eng_tag), sep='-')) + sxng_tag = locales.language_tag(babel.Locale.parse(lang_map.get(eng_tag, eng_tag), sep="-")) except babel.UnknownLocaleError: # print("ERROR: %s [%s] is unknown by babel" % (cols[0], eng_tag)) continue finally: - engine_traits.custom['WIKIPEDIA_LANGUAGES'].append(eng_tag) + engine_traits.custom["WIKIPEDIA_LANGUAGES"].append(eng_tag) if sxng_tag not in locales.LOCALE_NAMES: - if articles < 10000: # exclude languages with too few articles continue @@ -315,6 +323,6 @@ def fetch_wikimedia_traits(engine_traits: EngineTraits): continue engine_traits.languages[sxng_tag] = eng_tag - engine_traits.custom['wiki_netloc'][eng_tag] = wiki_url.netloc + engine_traits.custom["wiki_netloc"][eng_tag] = wiki_url.netloc - engine_traits.custom['WIKIPEDIA_LANGUAGES'].sort() + engine_traits.custom["WIKIPEDIA_LANGUAGES"].sort() diff --git a/searx/engines/zlibrary.py b/searx/engines/zlibrary.py index 233f776f3..9f0f7d88e 100644 --- a/searx/engines/zlibrary.py +++ b/searx/engines/zlibrary.py @@ -36,14 +36,15 @@ Implementations import typing as t from datetime import datetime from urllib.parse import quote -from lxml import html -from flask_babel import gettext # pyright: ignore[reportUnknownVariableType] -from searx.utils import extract_text, eval_xpath, eval_xpath_list, ElementType -from searx.enginelib.traits import EngineTraits +from flask_babel import gettext # pyright: ignore[reportUnknownVariableType] +from lxml import html + from searx.data import ENGINE_TRAITS +from searx.enginelib.traits import EngineTraits from searx.exceptions import SearxException from searx.result_types import EngineResults +from searx.utils import ElementType, eval_xpath, eval_xpath_list, extract_text if t.TYPE_CHECKING: from searx.extended_types import SXNG_Response @@ -129,7 +130,7 @@ def response(resp: "SXNG_Response") -> EngineResults: def domain_is_seized(dom: ElementType): - return bool(dom.xpath('//title') and "seized" in dom.xpath('//title')[0].text.lower()) + return bool(dom.xpath("//title") and "seized" in dom.xpath("//title")[0].text.lower()) def _text(item: ElementType, selector: str) -> str | None: @@ -145,19 +146,28 @@ def _parse_result(item: ElementType) -> dict[str, t.Any]: "title": _text(item, './/*[@itemprop="name"]'), "authors": [extract_text(author) for author in author_elements], "publisher": _text(item, './/a[@title="Publisher"]'), - "type": _text(item, './/div[contains(@class, "property__file")]//div[contains(@class, "property_value")]'), + "type": _text( + item, + './/div[contains(@class, "property__file")]//div[contains(@class, "property_value")]', + ), } thumbnail = _text(item, './/img[contains(@class, "cover")]/@data-src') - if thumbnail and not thumbnail.startswith('/'): + if thumbnail and not thumbnail.startswith("/"): result["thumbnail"] = thumbnail - year = _text(item, './/div[contains(@class, "property_year")]//div[contains(@class, "property_value")]') + year = _text( + item, + './/div[contains(@class, "property_year")]//div[contains(@class, "property_value")]', + ) if year: - result["publishedDate"] = datetime.strptime(year, '%Y') + result["publishedDate"] = datetime.strptime(year, "%Y") content: list[str] = [] - language = _text(item, './/div[contains(@class, "property_language")]//div[contains(@class, "property_value")]') + language = _text( + item, + './/div[contains(@class, "property_language")]//div[contains(@class, "property_value")]', + ) if language: content.append(f"{i18n_language}: {language.capitalize()}") book_rating = _text(item, './/span[contains(@class, "book-rating-interest-score")]') @@ -177,33 +187,18 @@ def fetch_traits(engine_traits: EngineTraits) -> None: import babel import babel.core - import httpx - from searx.network import get # see https://github.com/searxng/searxng/issues/762 from searx.locales import language_tag + from searx.network import get # see https://github.com/searxng/searxng/issues/762 - def _use_old_values(): - # don't change anything, re-use the existing values - engine_traits.all_locale = ENGINE_TRAITS["z-library"]["all_locale"] - engine_traits.custom = ENGINE_TRAITS["z-library"]["custom"] - engine_traits.languages = ENGINE_TRAITS["z-library"]["languages"] - - try: - resp = get(base_url, verify=False) - except (SearxException, httpx.HTTPError) as exc: - print(f"ERROR: zlibrary domain '{base_url}' is seized?") - print(f" --> {exc}") - _use_old_values() - return - + resp = get(base_url, timeout=5, verify=False) if not resp.ok: - raise RuntimeError("Response from zlibrary's search page is not OK.") + raise RuntimeError("Response from zlibrary is not OK.") + dom = html.fromstring(resp.text) if domain_is_seized(dom): - print(f"ERROR: zlibrary domain is seized: {base_url}") - _use_old_values() - return + raise RuntimeError(f"Response from zlibrary is not OK. ({base_url} seized)") engine_traits.all_locale = "" engine_traits.custom["ext"] = []