diff --git a/.gitignore b/.gitignore index 67ac389..f4c2ae9 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ venv/ .env .idea/ .vscode/ +.agent/ \ No newline at end of file diff --git a/README.md b/README.md index b7b2ac4..ca29f78 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,10 @@ Features: - clickable inline citations - interactive mode to continue summary, ask follow ups, copy, or regenerate - simple response mode with no extras +- internally called low-latency RAG for follow ups (bypasses http loopback) +- native network integration via `searx.network` (respects proxy/SSL settings) +- stateless conversation persistence/sharability via URL +- provider detection based on URL ## Installation @@ -23,26 +27,32 @@ plugins: ## Configuration -Set the following environment variables: +Configure via the environment variables: ### Required - `LLM_PROVIDER`: openrouter, openai, ollama, localai, lmstudio, gemini, azure, or huggingface -- `LLM_KEY`: Your API key +- `LLM_KEY`: Provider API key (optional for local providers: ollama, localai, lmstudio) ### Optional -- `LLM_MODEL`: Model identifier. Defaults vary by provider. -- `LLM_URL`: Custom endpoint URL. Overrides provider preset. -- `LLM_MAX_TOKENS`: Defaults to `500`. -- `LLM_TEMPERATURE`: Defaults to `0.2`. -- `LLM_CONTEXT_COUNT`: Search results to include. Defaults to `5`. -- `LLM_TABS`: Comma-separated tab whitelist. Defaults to general,science,it,news. -- `LLM_STYLE`: UI mode. Set to "interactive" for interactive controls (copy, regenerate, follow up, continue). Defaults to "simple". +- `LLM_MODEL`: Model identifier. Defaults vary. Recommended: 10-30B dense or 5-15B MoE activated. +- `LLM_URL`: Overrides endpoint URL for any provider preset. +- `LLM_MAX_TOKENS`: Default `500`. +- `LLM_TEMPERATURE`: Default `0.2`. +- `LLM_CONTEXT_DEEP_COUNT`: results as context with full snippets. Default `5`. +- `LLM_CONTEXT_SHALLOW_COUNT`: Results with headlines only (additional breadth). Default `15`. +- `LLM_TABS`: Tab whitelist, comma delimiter. Default `general,science,it,news`. +- `LLM_INTERACTIVE`: UI mode. Default is `true` (interactive: copy, regenerate, follow up). Set to `false` for simple response only mode. ## How It Works - -After search completes, the plugin extracts top search results as context. A client-side script calls the stream endpoint with a signed token. The LLM response streams back token by token. +1 user initial search +2 results return server side +3 `post_search` plugin hook entry +4 token optimized context extracted +5 inject the ui/logic "shell" into standard results answer object +6 client side script calls custom endpoint with signed token +7 LLM response streams back token by token ## Examples @@ -92,7 +102,7 @@ LLM_MODEL=meta-llama/Meta-Llama-3-8B-Instruct ## Development ```bash -pip install flask flask-babel python-dotenv -python demo.py # Interactive test server on localhost:5000 -python test.py # One-shot test suite +pip install flask flask-babel +python tests/demo.py # Interactive demo at localhost:5000 +python tests/test.py # One-shot test suite ``` diff --git a/ai_answers.py b/ai_answers.py index 2200da6..2badbe7 100644 --- a/ai_answers.py +++ b/ai_answers.py @@ -1,6 +1,7 @@ -import json, http.client, ssl, os, logging, base64, time, hashlib +import json, os, logging, base64, time, hashlib, codecs, re from urllib.parse import urlparse -from flask import Response, request, abort +from searx import network +from flask import Response, request, abort, jsonify from searx.plugins import Plugin, PluginInfo from searx.result_types import EngineResults from flask_babel import gettext @@ -8,8 +9,12 @@ from markupsafe import Markup logger = logging.getLogger(__name__) -TOKEN_EXPIRY_SEC = 86400 -CONNECTION_TIMEOUT_SEC = 30 +TOKEN_EXPIRY_SEC = 3600 + + + +PLUGIN_NAME = "AI Answers" +DEFAULT_TABS = "general,science,it,news" PROVIDER_PRESETS = { 'openai': {'url': 'https://api.openai.com/v1/chat/completions', 'model': 'gpt-4o-mini'}, @@ -17,349 +22,14 @@ PROVIDER_PRESETS = { 'ollama': {'url': 'http://localhost:11434/v1/chat/completions', 'model': 'llama3.2'}, 'localai': {'url': 'http://localhost:8080/v1/chat/completions', 'model': 'gpt-4'}, 'lmstudio': {'url': 'http://localhost:1234/v1/chat/completions', 'model': 'local-model'}, - 'gemini': {'url': 'https://generativelanguage.googleapis.com', 'model': 'gemma-3-27b-it'}, + 'gemini': {'url': 'https://generativelanguage.googleapis.com/v1beta/models/{model}:streamGenerateContent', 'model': 'gemma-3-27b-it'}, 'azure': {'url': None, 'model': 'azure-deployment'}, 'huggingface': {'url': 'https://api-inference.huggingface.co/models/{model}/v1/chat/completions', 'model': 'meta-llama/Meta-Llama-3-8B-Instruct'} } -import typing -if typing.TYPE_CHECKING: - from searx.search import SearchWithPlugins - from searx.extended_types import SXNG_Request - from . import PluginCfg +# UI assets (inlined for single-file install) -class SXNGPlugin(Plugin): - """ - AI Answers Plugin for SearXNG. - Injects a real-time streaming answer box synthesized from search results using LLM providers. - Supports OpenAI, OpenRouter, Gemini, Ollama, LocalAI, Azure, and Hugging Face. - """ - id = "ai_answers" - - def __init__(self, plg_cfg: "PluginCfg"): - super().__init__(plg_cfg) - self.info = PluginInfo( - id=self.id, - name=gettext("AI Answers Plugin"), - description=gettext("Live AI search answers using LLM providers."), - preference_section="general", - ) - self._load_config() - - if self.api_key: - self.secret = os.getenv('SXNG_LLM_SECRET') or hashlib.sha256(self.api_key.encode()).hexdigest() - else: - self.secret = os.getenv('SXNG_LLM_SECRET', '') - logger.warning("AI Answers: No API key configured, plugin inactive") - - def _load_config(self): - self.style = os.getenv('LLM_STYLE', 'interactive') - raw_provider = os.getenv('LLM_PROVIDER', '').lower().strip() - - raw_url = os.getenv('LLM_URL', '').strip() - if not raw_provider and raw_url: - url_lower = raw_url.lower() - if 'openai.com' in url_lower: - raw_provider = 'openai' - elif 'openrouter.ai' in url_lower: - raw_provider = 'openrouter' - elif ':11434' in url_lower: - raw_provider = 'ollama' - elif 'generativelanguage.googleapis.com' in url_lower: - raw_provider = 'gemini' - - if not raw_provider: - logger.debug("AI Answers: No provider configured, plugin inactive") - self.provider = '' - self.model = '' - self.is_gemini = False - self.api_key = '' - return - - self.provider = raw_provider if raw_provider in PROVIDER_PRESETS else 'openai' - self.is_gemini = (self.provider == 'gemini') - preset = PROVIDER_PRESETS[self.provider] - - self.api_key = os.getenv('LLM_KEY', '') - if not self.api_key and self.provider in ('ollama', 'localai', 'lmstudio'): - self.api_key = 'none' - self.api_key = self.api_key.strip() - - self.model = os.getenv('LLM_MODEL', preset['model']).strip() - - try: - self.max_tokens = int(os.getenv('LLM_MAX_TOKENS', 500)) - except ValueError: - self.max_tokens = 500 - try: - self.temperature = float(os.getenv('LLM_TEMPERATURE', 0.2)) - except ValueError: - self.temperature = 0.2 - try: - self.context_count = max(0, int(os.getenv('LLM_CONTEXT_COUNT', 5))) - except ValueError: - self.context_count = 5 - - self.allowed_tabs = set(t.strip() for t in os.getenv('LLM_TABS', 'general,science,it,news').split(',')) - - preset_url = preset['url'] - if preset_url and '{model}' in preset_url: - preset_url = preset_url.format(model=self.model) - self._parse_url(preset_url) - - logger.info(f"AI Answers: {self.provider} @ {self.endpoint_host}") - - def _parse_url(self, default_url): - raw_url = os.getenv('LLM_URL', '').strip() or default_url - if not raw_url.startswith(('http://', 'https://')): - raw_url = f"https://{raw_url}" - - parsed = urlparse(raw_url) - self.endpoint_url = raw_url - self.endpoint_host = parsed.hostname or 'localhost' - self.endpoint_port = parsed.port - self.endpoint_path = parsed.path or '/v1/chat/completions' - if parsed.query: - self.endpoint_path += f"?{parsed.query}" - self.endpoint_ssl = (parsed.scheme == 'https') - - if self.is_gemini: - return - - is_local = self.endpoint_host in ('localhost', '127.0.0.1') or self.endpoint_host.startswith('127.') - if not self.endpoint_ssl and not is_local: - logger.warning(f"AI Answers: HTTP on non-localhost ({self.endpoint_host}). Credentials may be exposed.") - - def _get_connection(self): - proxy_url = os.getenv('HTTPS_PROXY' if self.endpoint_ssl else 'HTTP_PROXY') or os.getenv('https_proxy' if self.endpoint_ssl else 'http_proxy') - - target_host = self.endpoint_host - target_port = self.endpoint_port - target_str = f"{target_host}:{target_port}" if target_port else target_host - - if proxy_url: - p = urlparse(proxy_url) - p_host = p.hostname - p_port = p.port or 8080 - - if p.scheme == 'https': - conn = http.client.HTTPSConnection(p_host, p_port, timeout=CONNECTION_TIMEOUT_SEC, context=ssl.create_default_context()) - else: - conn = http.client.HTTPConnection(p_host, p_port, timeout=CONNECTION_TIMEOUT_SEC) - - conn.set_tunnel(target_host, target_port) - return conn - - # Direct Connection - if self.endpoint_ssl: - return http.client.HTTPSConnection(target_str, timeout=CONNECTION_TIMEOUT_SEC, context=ssl.create_default_context()) - return http.client.HTTPConnection(target_str, timeout=CONNECTION_TIMEOUT_SEC) - - def init(self, app): - @app.route('/ai-stream', methods=['POST']) - def handle_ai_stream(): - data = request.json or {} - token = data.get('tk', '') - q = data.get('q', '') - lang = data.get('lang', 'all') - - try: - ts, sig = token.split('.', 1) - expected = hashlib.sha256(f"{ts}{self.secret}".encode()).hexdigest() - if sig != expected or (time.time() - float(ts)) > TOKEN_EXPIRY_SEC: - abort(403) - except (ValueError, KeyError, AttributeError): - abort(403) - - context_text = data.get('context', '') - prev_answer = (data.get('prev_answer') or '')[-4000:] - - if not self.api_key: - logger.warning(f"AI Answers: request rejected. Key loaded: {bool(self.api_key)}, Query: {bool(q)}") - return Response("Missing API key or query", status=400) - - today = time.strftime("%Y-%m-%d") - target_words = int(self.max_tokens * 0.2) - lang_instruction = f" Respond in {lang}." if lang not in ('all', 'auto') else "" - - SYSTEM = f"You are a search synthesis engine. Direct, grounded, citation-accurate. Today is {today}.{lang_instruction}" - - CORE_RULES = [ - "DENSITY 4/5: Expert-briefing level. No filler, no transitions. Every sentence = new information.", - f"BREVITY: {target_words} words max. Complete, not verbose.", - "CITATIONS: Cite [n] only for specific facts from sources. Max 3 total. Sentence-end only. Never cite common knowledge.", - "NO HEDGE: State answers confidently. Note uncertainty only if critical.", - ] - - if q == "Continue": - task = "CONTINUE: Pick up exactly where previous answer stopped. No repetition. Seamless flow." - elif prev_answer: - task = "FOLLOW-UP: Address the new question using prior context. Prioritize the new query." - else: - task = "ANSWER FIRST: Lead with the direct answer. No preamble, no context-setting." - - grounding = "GROUNDING: Trust sources for current events. Use knowledge for fundamentals." if context_text else "GROUNDING: No sources available. Use knowledge and note 'based on general knowledge'." - history_rule = "HISTORY: Refer to prior exchange for context. Do not repeat." if prev_answer else None - - instructions = [task] + CORE_RULES + [grounding] - if history_rule: - instructions.append(history_rule) - - numbered_instructions = "\n".join(f"{i+1}. {r}" for i, r in enumerate(instructions)) - prompt = f"""{SYSTEM} - - -{context_text or 'None.'} - - - -{prev_answer or 'None.'} - - -{q} - - -{numbered_instructions} - - -""" - - def stream_gemini(): - path = f"/v1/models/{self.model}:streamGenerateContent" - conn = None - try: - conn = self._get_connection() - - payload = {"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"maxOutputTokens": self.max_tokens, "temperature": self.temperature, "stopSequences": [""]}} - headers = {"Content-Type": "application/json", "x-goog-api-key": self.api_key} - conn.request("POST", path, body=json.dumps(payload), headers=headers) - res = conn.getresponse() - if res.status != 200: - logger.error(f"Gemini API {res.status}: {res.read().decode('utf-8')}") - return - - decoder = json.JSONDecoder() - buffer = "" - while True: - chunk = res.read(128) - if not chunk: break - buffer += chunk.decode('utf-8') - while buffer: - buffer = buffer.lstrip() - if not buffer: break - try: - obj, idx = decoder.raw_decode(buffer) - candidates = obj.get('candidates', []) - if candidates: - content = candidates[0].get('content', {}) - parts = content.get('parts', []) - if parts: - text = parts[0].get('text', '') - if text: yield text - buffer = buffer[idx:] - except json.JSONDecodeError: break - except Exception as e: - logger.error(f"Gemini stream error: {e}") - finally: - if conn: conn.close() - - def stream_openai_compatible(): - conn = None - try: - conn = self._get_connection() - - payload = { - "model": self.model, - "messages": [{"role": "user", "content": prompt}], - "stream": True, - "max_tokens": self.max_tokens, - "temperature": self.temperature, - "stop": [""] - } - headers = { - "Content-Type": "application/json", - "HTTP-Referer": "https://github.com/searxng/searxng", - "X-Title": "SearXNG" - } - if self.provider == 'azure': - headers['api-key'] = self.api_key - else: - headers['Authorization'] = f"Bearer {self.api_key}" - - conn.request("POST", self.endpoint_path, body=json.dumps(payload), headers=headers) - res = conn.getresponse() - if res.status != 200: - logger.error(f"{self.provider} API {res.status}: {res.read().decode('utf-8')}") - return - - decoder = json.JSONDecoder() - buffer = b"" - while True: - chunk = res.read(128) - if not chunk: break - buffer += chunk - while b"\n" in buffer: - line_bytes, buffer = buffer.split(b"\n", 1) - line = line_bytes.decode('utf-8', errors='replace') - if line.startswith("data: "): - data_str = line[6:].strip() - if data_str == "[DONE]": return - try: - obj, _ = decoder.raw_decode(data_str) - content = obj.get("choices", [{}])[0].get("delta", {}).get("content", "") - if content: yield content - except json.JSONDecodeError: - pass - except Exception as e: - logger.error(f"{self.provider} stream error: {e}") - finally: - if conn: conn.close() - - generator = stream_gemini if self.is_gemini else stream_openai_compatible - return Response(generator(), mimetype='text/event-stream', headers={ - 'X-Accel-Buffering': 'no', - 'Cache-Control': 'no-cache, no-store', - 'Connection': 'keep-alive', - 'Content-Encoding': 'identity' - }) - return True - - def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults: - results = EngineResults() - try: - current_tabs = set(search.search_query.categories) - if not current_tabs: current_tabs = {'general'} - - if not self.active or not self.api_key or search.search_query.pageno > 1 or not self.allowed_tabs.intersection(current_tabs): - return results - - raw_results = search.result_container.get_ordered_results() - context_list = [] - for i, r in enumerate(raw_results[:self.context_count]): - domain = urlparse(r.get('url', '')).netloc - date = r.get('publishedDate') - date_str = f" ({date})" if date else "" - title = r.get('title') or "" - context_list.append(f"[{i+1}] {domain}{date_str}: {title}: {str(r.get('content', ''))[:500]}") - - context_str = "\n".join(context_list) - - - ts = str(int(time.time())) - q_clean = search.search_query.query.strip() - lang = search.search_query.lang - sig = hashlib.sha256(f"{ts}{self.secret}".encode()).hexdigest() - tk = f"{ts}.{sig}" - - b64_context = base64.b64encode(context_str.encode('utf-8')).decode('utf-8') - js_q = json.dumps(q_clean) - js_lang = json.dumps(lang) - js_urls = json.dumps([r.get('url') for r in raw_results[:self.context_count]]) - - is_interactive = (self.style == 'interactive') - - # Conditional CSS for interactive mode - interactive_css = ''' +INTERACTIVE_CSS = ''' @keyframes sxng-fade-in-up { 0% { opacity: 0; transform: translateY(10px); } 100% { opacity: 1; transform: translateY(0); } @@ -428,36 +98,46 @@ class SXNGPlugin(Plugin): display: block; width: fit-content; max-width: 80%; - margin: 1rem 0 1rem auto; - padding: 0.5rem 0.8rem; - background: var(--color-base-background-hover, rgba(0,0,0,0.05)); - border-radius: 12px 12px 0 12px; - color: var(--color-base-font, #333); - font-size: 0.9rem; - line-height: 1.5; + margin: 0.75rem 0 0.75rem auto; + padding: 0.25rem 0.6rem 0.25rem 0; + border-right: 2px solid var(--color-result-link, #5e81ac); + text-align: right; + font-size: 0.85rem; + line-height: 1.4; + opacity: 0.55; animation: sxng-fade-in-up 0.3s ease-out forwards; - border: 1px solid var(--color-base-border, rgba(0,0,0,0.1)); } .sxng-input-submit { + all: unset; position: absolute; right: 0; top: 50%; transform: translateY(-50%); - background: none; - border: none; - padding: 8px; + display: inline-flex; + align-items: center; + justify-content: center; + width: 32px; + height: 32px; + padding: 0; + background: transparent !important; + border: none !important; + border-radius: 6px; color: var(--color-base-font, #333); cursor: pointer; opacity: 0.3; transition: all 0.2s ease; } .sxng-input-wrapper:focus-within .sxng-input-submit, - .sxng-input-submit:hover { opacity: 1; color: var(--color-result-link, #5e81ac); } + .sxng-input-submit:hover { + opacity: 1; + color: var(--color-result-link, #5e81ac); + background: var(--color-base-background-hover, rgba(0,0,0,0.05)) !important; + } .sxng-input-submit svg { width: 18px; height: 18px; fill: currentColor; } -''' if is_interactive else '' + .sxng-input-submit svg { width: 18px; height: 18px; fill: currentColor; } +''' - # Conditional HTML for interactive footer - interactive_html = ''' +INTERACTIVE_HTML = ''' -''' if is_interactive else '' +''' - # Conditional JS for interactive handlers - interactive_js_init = ''' +CITATION_HELPER_JS = ''' + function renderCitations(text, urls) { + const fragment = document.createDocumentFragment(); + const re = /\[(\d{1,2}(?:\s*,\s*\d{1,2})*)\]/g; + let lastIdx = 0; + const matches = [...text.matchAll(re)]; + + matches.forEach(match => { + if (match.index > lastIdx) { + const s = document.createElement('span'); + s.className = 'sxng-chunk'; + // Preserve whitespace by not trimming + s.textContent = text.substring(lastIdx, match.index); + fragment.appendChild(s); + } + match[1].split(/\s*,\s*/).forEach(n => { + const idx = parseInt(n.trim()); + if (idx >= 1 && idx <= urls.length) { + const url = urls[idx-1]; + if (url) { + const a = document.createElement('a'); + a.href = url; + a.target = '_blank'; + a.style.cssText = 'text-decoration:none;color:var(--color-result-link);font-weight:bold;'; + a.textContent = `[${n.trim()}]`; + a.className = 'sxng-chunk'; + fragment.appendChild(a); + } else { + const s = document.createElement('span'); + s.className = 'sxng-chunk'; + s.textContent = `[${n.trim()}]`; + fragment.appendChild(s); + } + } else { + const s = document.createElement('span'); + s.className = 'sxng-chunk'; + s.textContent = `[${n.trim()}]`; + fragment.appendChild(s); + } + }); + lastIdx = match.index + match[0].length; + }); + + if (lastIdx < text.length) { + const s = document.createElement('span'); + s.className = 'sxng-chunk'; + // Preserve whitespace by not trimming + s.textContent = text.substring(lastIdx); + fragment.appendChild(s); + } + return fragment; + } +''' + +INTERACTIVE_JS = ''' const footer = document.getElementById('sxng-footer'); const input = document.getElementById('sxng-action-input'); + // Inherited from outer scope: box, data, conversation + // Theme detection: inherit host accent color + if (window.getComputedStyle && box) { + try { + const docStyles = getComputedStyle(document.documentElement); + let accent = docStyles.getPropertyValue('--color-result-link').trim(); + if (!accent) { + const a = document.createElement('a'); + document.body.appendChild(a); + accent = getComputedStyle(a).color; + document.body.removeChild(a); + } + if (accent) { + box.style.setProperty('--color-result-link', accent); + box.style.setProperty('--sxng-ai-accent', accent); + } + } catch(e) {} + } + + // Persist conversation state to URL hash (stateless) + const updateState = () => { + try { + const state = { + t: conversation.turns.map(t => ({ + r: t.role === 'user' ? 'u' : 'a', + c: t.content.replace(/\s+/g, ' ').trim() + })), + u: urls + }; + const b64 = btoa(encodeURIComponent(JSON.stringify(state)).replace(/%([0-9A-F]{2})/g, (m,p)=>String.fromCharCode('0x'+p))); + history.replaceState(null, null, '#ai=' + b64); + } catch(e) {} + }; + + if (location.hash.includes('ai=')) { + try { + const b64 = location.hash.split('ai=')[1]; + const json = decodeURIComponent(atob(b64).split('').map(c => '%' + ('00' + c.charCodeAt(0).toString(16)).slice(-2)).join('')); + const state = JSON.parse(json); + if (state.t && state.t.length > 0) { + // Restore URLs for citation indexing + if (state.u && Array.isArray(state.u)) { + urls = state.u; + } + + conversation.turns = state.t.map(t => ({ + role: t.r === 'u' ? 'user' : 'assistant', + content: t.c.trim(), + ts: 0 + })); + + // Helper function to inject citations into text + const injectCitations = (text) => { + return renderCitations(text, urls); + }; + + data.innerHTML = ''; + conversation.turns.forEach((turn, i) => { + if (turn.role === 'user') { + if (turn.content !== conversation.originalQuery) { + const u = document.createElement('span'); + u.className = 'sxng-user-msg'; + u.textContent = turn.content; + data.appendChild(u); + const clr = document.createElement('div'); + clr.style.clear = 'both'; + data.appendChild(clr); + } + } else { + // Inject citations for assistant responses + data.appendChild(injectCitations(turn.content)); + } + }); + box.style.display = 'block'; + if(wrapper) wrapper.style.display = ''; + if(footer && is_interactive) footer.style.display = 'flex'; + restored = true; + } + } catch(e) { console.warn('Restore failed', e); } + } document.getElementById('btn-copy').onclick = async (e) => { const btn = e.currentTarget; const originalContent = btn.innerHTML; @@ -498,18 +311,17 @@ class SXNGPlugin(Plugin): startStream(); }; - const handleAction = (e) => { + const handleAction = async (e) => { if (e) e.preventDefault(); const val = input.value.trim(); - const currentText = Array.from(data.childNodes) - .filter(n => n.nodeType === 3 || n.tagName === 'SPAN') - .map(n => { - if (n.classList && n.classList.contains('sxng-user-msg')) { - return '\\n\\nQ: ' + n.textContent + '\\nA: '; - } - return n.textContent; - }) - .join(''); + + conversation.turns.push({role: 'user', content: val, ts: Date.now()}); + updateState(); + + const currentText = conversation.turns.slice(0, -1).slice(-6) + .map(t => (t.role === 'user' ? 'Q' : 'A') + ': ' + t.content) + .join('\\n\\n'); + input.value = ''; input.blur(); footer.style.display = 'none'; @@ -521,10 +333,33 @@ class SXNGPlugin(Plugin): userMsg.className = 'sxng-user-msg'; userMsg.textContent = val; data.appendChild(userMsg); + const clr = document.createElement('div'); + clr.style.clear = 'both'; + data.appendChild(clr); + const newCursor = document.createElement('span'); newCursor.className = 'sxng-cursor'; data.appendChild(newCursor); - startStream(val, currentText); + + const synthesized = synthesizeQuery(q_init, val); + let auxContext = null; + try { + const auxData = await fetch('/ai-auxiliary-search', { + method: 'POST', + headers: {'Content-Type': 'application/json'}, + body: JSON.stringify({query: synthesized, lang: lang_init, offset: urls.length}) + }).then(r => r.json()); + if (auxData.context) { + const originalBackground = conversation.originalContext.substring(0, 1500); + auxContext = `FRESH SOURCES (most relevant):\\n${auxData.context}\\n\\nBACKGROUND (for reference):\\n${originalBackground}`; + if (auxData.new_urls && Array.isArray(auxData.new_urls)) { + urls = urls.concat(auxData.new_urls); + } + } + } catch (err) {} + + await startStream(val, currentText, auxContext); + updateState(); } else { const cursor = data.querySelector('.sxng-cursor'); if (cursor) cursor.remove(); @@ -533,7 +368,8 @@ class SXNGPlugin(Plugin): const newCursor = document.createElement('span'); newCursor.className = 'sxng-cursor'; data.appendChild(newCursor); - startStream("Continue", currentText); + await startStream("Continue", currentText); + updateState(); } }; @@ -543,12 +379,540 @@ class SXNGPlugin(Plugin): input.scrollIntoView({behavior: 'smooth', block: 'center'}); }, 300); }; -''' if is_interactive else '' + + const _origStream = startStream; + startStream = async function(...args) { + if (args.length === 0 && restored) return; + await _origStream.apply(this, args); + if (args.length === 0) updateState(); + }; +''' + + +import typing +if typing.TYPE_CHECKING: + from searx.search import SearchWithPlugins + from searx.extended_types import SXNG_Request + from . import PluginCfg + +class SXNGPlugin(Plugin): + id = "ai_answers" + + def __init__(self, plg_cfg: "PluginCfg"): + super().__init__(plg_cfg) + self.info = PluginInfo( + id=self.id, + name=gettext(f"{PLUGIN_NAME} Plugin"), + description=gettext("Live AI search answers using LLM providers."), + preference_section="general", + ) + self._load_config() + + + + def _load_config(self): + self.interactive = os.getenv('LLM_INTERACTIVE', 'true').lower().strip() in ('true', '1', 'yes', 'on') + raw_provider = os.getenv('LLM_PROVIDER', '').lower().strip() + + raw_url = os.getenv('LLM_URL', '').strip() + if not raw_provider and raw_url: + url_lower = raw_url.lower() + if 'openai.com' in url_lower: + raw_provider = 'openai' + elif 'openrouter.ai' in url_lower: + raw_provider = 'openrouter' + elif ':11434' in url_lower: + raw_provider = 'ollama' + elif 'generativelanguage.googleapis.com' in url_lower: + raw_provider = 'gemini' + elif 'openai.azure.com' in url_lower or '.azure.com' in url_lower: + raw_provider = 'azure' + elif 'huggingface.co' in url_lower: + raw_provider = 'huggingface' + else: + # Unknown URL fallback to OpenAI-compatible + raw_provider = 'openai' + logger.info(f"{PLUGIN_NAME}: Using OpenAI-compatible mode for custom URL") + + if not raw_provider: + self.provider = '' + self.model = '' + self.is_gemini = False + self.api_key = '' + return + + if raw_provider not in PROVIDER_PRESETS: + logger.warning(f"{PLUGIN_NAME}: Unknown provider '{raw_provider}', falling back to 'openai'") + self.provider = raw_provider if raw_provider in PROVIDER_PRESETS else 'openai' + self.is_gemini = (self.provider == 'gemini') + preset = PROVIDER_PRESETS[self.provider] + + self.api_key = os.getenv('LLM_KEY', '') + if not self.api_key and self.provider in ('ollama', 'localai', 'lmstudio'): + self.api_key = 'none' + self.api_key = self.api_key.strip() + + self.model = os.getenv('LLM_MODEL', preset['model']).strip() + + try: + self.max_tokens = int(os.getenv('LLM_MAX_TOKENS', 500)) + except ValueError: + self.max_tokens = 500 + try: + self.temperature = float(os.getenv('LLM_TEMPERATURE', 0.2)) + except ValueError: + self.temperature = 0.2 + try: + self.context_deep_count = max(0, int(os.getenv('LLM_CONTEXT_DEEP_COUNT', 5))) + except ValueError: + self.context_deep_count = 5 + try: + self.context_shallow_count = max(0, int(os.getenv('LLM_CONTEXT_SHALLOW_COUNT', 15))) + except ValueError: + self.context_shallow_count = 15 + + self.allowed_tabs = set(t.strip() for t in os.getenv('LLM_TABS', DEFAULT_TABS).split(',')) + + preset_url = preset['url'] + if preset_url and '{model}' in preset_url: + preset_url = preset_url.format(model=self.model) + + raw_url = os.getenv('LLM_URL', '').strip() or preset_url + if not raw_url.startswith(('http://', 'https://')): + raw_url = f"https://{raw_url}" + self.endpoint_url = raw_url + + if self.api_key: + self.secret = os.getenv('SXNG_LLM_SECRET') or hashlib.sha256(self.api_key.encode()).hexdigest() + else: + self.secret = os.getenv('SXNG_LLM_SECRET', '') + + def _parse_aux_results(self, raw_results, raw_infoboxes, raw_answers): + results = [] + limit = self.context_deep_count + self.context_shallow_count + for r in raw_results[:limit]: + results.append({ + 'title': r.get('title', ''), + 'content': r.get('content', ''), + 'url': r.get('url', ''), + 'publishedDate': r.get('publishedDate', '') + }) + + infoboxes = [] + for ib in raw_infoboxes[:2]: + infoboxes.append({ + 'name': ib.get('infobox', '') or ib.get('title', ''), + 'content': ib.get('content', '')[:400], + 'attributes': ib.get('attributes', [])[:3] + }) + + answers = [a.get('answer', '') for a in raw_answers[:2] + if a.get('answer') and not str(a.get('answer')).startswith('<')] + + return results, infoboxes, answers + + def _format_aux_context_string(self, results, infoboxes, answers, offset): + sections = [] + aux_urls = [] + + kg_lines = [] + for ib in infoboxes: + if ib.get('name'): + content = ib.get('content', '') + kg_lines.append(f"INFOBOX [{ib['name']}]: {content}") + + for a in answers: + if a: + kg_lines.append(f"ANSWER: {a}") + + if kg_lines: + sections.append('KNOWLEDGE GRAPH:\n' + '\n'.join(kg_lines)) + + source_lines = [] + for i, r in enumerate(results): + url = r.get('url', '') + aux_urls.append(url) + # Match JS logic: domain extraction + domain = urlparse(url).netloc.replace('www.', '') + date = f" ({r['publishedDate']})" if r.get('publishedDate') else '' + title = r.get('title', '') + content = r.get('content', '')[:600] + # [index] domain(date): title: content + idx = i + 1 + offset + source_lines.append(f"[{idx}] {domain}{date}: {title}: {content}") + + if source_lines: + sections.append('SOURCES:\n' + '\n'.join(source_lines)) + + return "\n\n".join(sections), aux_urls + + + + def init(self, app): + if not self.provider: + return + + @app.route('/ai-auxiliary-search', methods=['POST']) + def ai_auxiliary_search(): + if not self.api_key: + abort(403) + + data = request.json or {} + query = data.get('query', '').strip() + lang = data.get('lang', 'all') + categories = data.get('categories', 'general') + offset = data.get('offset', 0) + if not query: + return jsonify({'results': []}) + + # Direct kernel access (bypasses HTTP loopback) + try: + from searx.search import SearchWithPlugins + from searx.search.models import SearchQuery + from searx.query import RawTextQuery + from searx.webadapter import get_engineref_from_category_list + + preferences = getattr(request, 'preferences', None) + disabled_engines = preferences.engines.get_disabled() if preferences else [] + rtq = RawTextQuery(query, disabled_engines) + if isinstance(categories, str): + category_list = [c.strip() for c in categories.split(',') if c.strip()] + else: + category_list = categories or ['general'] + + enginerefs = get_engineref_from_category_list(category_list, disabled_engines) + sq = SearchQuery( + query=rtq.getQuery(), + engineref_list=enginerefs, + lang=lang, + pageno=1, + ) + # Empty plugins list prevents recursion + search_obj = SearchWithPlugins(sq, request, user_plugins=[]) + result_container = search_obj.search() + + raw_results = result_container.get_ordered_results() + raw_infoboxes = getattr(result_container, 'infoboxes', []) + raw_answers = getattr(result_container, 'answers', []) + + results, infoboxes, answers = self._parse_aux_results(raw_results, raw_infoboxes, raw_answers) + + context_str, new_urls = self._format_aux_context_string(results, infoboxes, answers, offset) + + return jsonify({ + 'context': context_str, + 'new_urls': new_urls, + 'results': results, + 'infoboxes': infoboxes, + 'answers': answers, + 'query': query + }) + + except ImportError: + try: + search_url = f'{request.url_root}search' + params = { + 'q': query, + 'format': 'json', + 'categories': categories, + 'language': lang + } + + headers = { + 'X-AI-Auxiliary': '1', + 'Accept-Language': request.headers.get('Accept-Language', '') + } + + + res = network.get(search_url, params=params, headers=headers, timeout=2) + search_data = res.json() + + + + results, infoboxes, answers = self._parse_aux_results( + search_data.get('results', []), + search_data.get('infoboxes', []), + search_data.get('answers', []) + ) + + context_str, new_urls = self._format_aux_context_string(results, infoboxes, answers, offset) + + return jsonify({ + 'context': context_str, + 'new_urls': new_urls, + 'results': results, + 'infoboxes': infoboxes, + 'answers': answers, + 'query': query + }) + except Exception as e: + return jsonify({'results': [], 'error': str(e)}) + except Exception as e: + return jsonify({'results': [], 'error': str(e)}) + + @app.route('/ai-stream', methods=['POST']) + def handle_ai_stream(): + data = request.json or {} + if data.get('warmup'): + return Response('', status=204) + + token = data.get('tk', '') + q = data.get('q', '') + lang = data.get('lang', 'all') + + try: + ts, sig = token.rsplit('.', 1) + expected = hashlib.sha256(f"{ts}{self.secret}".encode()).hexdigest() + if sig != expected or (time.time() - float(ts)) > TOKEN_EXPIRY_SEC: + abort(403) + except (ValueError, KeyError, AttributeError): + abort(403) + + context_text = data.get('context', '') + prev_answer = (data.get('prev_answer') or '')[-4000:] + + if not self.api_key: + return Response("Missing API key or query", status=400) + + today = time.strftime("%Y-%m-%d") + target_words = int(self.max_tokens * 0.2) + lang_instruction = f" Respond in {lang}." if lang not in ('all', 'auto') else "" + + SYSTEM = f"You are a search synthesis engine. Direct, grounded, citation-accurate. Today is {today}.{lang_instruction}" + max_source_idx = 0 + if context_text: + indices = re.findall(r'\[(\d+)\]', context_text) + if indices: + max_source_idx = max(map(int, indices)) + + CORE_RULES = [ + "DENSITY 4/5: Expert-briefing level. No filler, no transitions. Every sentence = new information.", + f"BREVITY: {target_words} words max. Complete, not verbose.", + f"CITATIONS: Cite format is [n] for facts from grounding sources", + "NO HEDGE: State answers confidently. Note uncertainty only if critical.", + ] + + if q == "Continue": + task = "CONTINUE: Pick up exactly where previous answer stopped. No repetition. Seamless flow." + elif prev_answer: + task = "FOLLOW-UP: Address the new question using prior context. Prioritize the new query." + else: + task = "ANSWER FIRST: Lead with the direct answer. No preamble, no context-setting." + + grounding = "GROUNDING: KNOWLEDGE GRAPH > DEEP > SHALLOW." if context_text else "GROUNDING: No sources available. Use knowledge and note 'based on general knowledge'." + history_rule = "HISTORY: Refer to prior exchange for context. Ideally, do not repeat any claims." if prev_answer else None + + instructions = [task] + CORE_RULES + [grounding] + if history_rule: + instructions.append(history_rule) + + numbered_instructions = "\n".join(f"{i+1}. {r}" for i, r in enumerate(instructions)) + prompt = f"""{SYSTEM} + + +{context_text or 'None.'} + + + +{prev_answer or 'None.'} + + +{q} + + +{numbered_instructions} + + +""" + + def stream_gemini(): + if '?' in self.endpoint_url: + url = f"{self.endpoint_url}&key={self.api_key}" + else: + url = f"{self.endpoint_url}?key={self.api_key}" + + try: + payload = {"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"maxOutputTokens": self.max_tokens, "temperature": self.temperature, "stopSequences": [""]}} + headers = {"Content-Type": "application/json"} + res, chunk_gen = network.stream('POST', url, json=payload, headers=headers, timeout=60) + + if res.status_code != 200: + for _ in chunk_gen: pass # Drain to prevent resource leak + logger.error(f"{PLUGIN_NAME}: Gemini API {res.status_code}") + return + + decoder = json.JSONDecoder() + buffer = "" + utf8_decoder = codecs.getincrementaldecoder("utf-8")(errors='replace') + for chunk in chunk_gen: + if not chunk: continue + buffer += utf8_decoder.decode(chunk, final=False) + while buffer: + buffer = buffer.lstrip() + if not buffer: break + try: + obj, idx = decoder.raw_decode(buffer) + items = obj if isinstance(obj, list) else [obj] + for item in items: + candidates = item.get('candidates', []) + if candidates: + content = candidates[0].get('content', {}) + parts = content.get('parts', []) + if parts: + text = parts[0].get('text', '') + if text: yield text + buffer = buffer[idx:] + except json.JSONDecodeError: break + except Exception as e: + logger.error(f"{PLUGIN_NAME}: Gemini stream error: {e}") + + def stream_openai_compatible(): + try: + payload = { + "model": self.model, + "messages": [{"role": "user", "content": prompt}], + "stream": True, + "max_tokens": self.max_tokens, + "temperature": self.temperature, + "stop": [""] + } + headers = { + "Content-Type": "application/json", + "HTTP-Referer": "https://github.com/searxng/searxng", + "X-Title": "SearXNG" + } + if self.provider == 'azure': + headers['api-key'] = self.api_key + else: + headers['Authorization'] = f"Bearer {self.api_key}" + res, chunk_gen = network.stream('POST', self.endpoint_url, json=payload, headers=headers, timeout=60) + + if res.status_code != 200: + for _ in chunk_gen: pass + logger.error(f"{PLUGIN_NAME}: {self.provider} API {res.status_code}") + return + + decoder = json.JSONDecoder() + buffer = b"" + for chunk in chunk_gen: + if not chunk: continue + buffer += chunk + while b"\n" in buffer: + line_bytes, buffer = buffer.split(b"\n", 1) + line = line_bytes.decode('utf-8', errors='replace') + if line.startswith("data: "): + data_str = line[6:].strip() + if data_str == "[DONE]": return + try: + obj, _ = decoder.raw_decode(data_str) + content = obj.get("choices", [{}])[0].get("delta", {}).get("content", "") + if content: yield content + except json.JSONDecodeError: + pass + except Exception as e: + logger.error(f"{PLUGIN_NAME}: {self.provider} stream error: {e}") + + generator = stream_gemini if self.is_gemini else stream_openai_compatible + return Response(generator(), mimetype='text/event-stream', headers={ + 'X-Accel-Buffering': 'no', + 'Cache-Control': 'no-cache, no-store', + 'Connection': 'keep-alive', + 'Content-Encoding': 'identity' + }) + return True + + def _assemble_context(self, search, raw_results) -> str: + """Builds three-tier context string from search results.""" + context_parts = [] + knowledge_graph_lines = [] + for infobox in getattr(search.result_container, 'infoboxes', [])[:3]: + ib_name = infobox.get('infobox', '') or infobox.get('title', '') + ib_content = str(infobox.get('content', '')).replace('\n', ' ').strip() + + if ib_name: + parts = [f"INFOBOX [{ib_name}]:"] + if ib_content: + parts.append(ib_content[:600]) + for attr in infobox.get('attributes', [])[:5]: + attr_label = attr.get('label', '') + attr_value = attr.get('value', '') + if attr_label and attr_value: + parts.append(f" {attr_label}: {attr_value}") + + knowledge_graph_lines.append(" ".join(parts) if len(parts) == 2 else "\n".join(parts)) + + for answer in getattr(search.result_container, 'answers', []): + if hasattr(answer, 'answer'): + ans_text = str(answer.answer).replace('\n', ' ').strip()[:300] + ans_url = getattr(answer, 'url', '') + if ans_text and not ans_text.startswith('<'): + knowledge_graph_lines.append(f"ANSWER: {ans_text}" + (f" [via {ans_url}]" if ans_url else "")) + + if knowledge_graph_lines: + context_parts.append("KNOWLEDGE GRAPH:\n" + "\n".join(knowledge_graph_lines)) + + deep_lines = [] + for i, r in enumerate(raw_results[:self.context_deep_count]): + domain = urlparse(r.get('url', '')).netloc + date = r.get('publishedDate') + date_str = f" ({date})" if date else "" + title = (r.get('title') or "").replace('\n', ' ').strip() + content = str(r.get('content', '')).replace('\n', ' ').strip()[:800] + deep_lines.append(f"[{i+1}] {domain}{date_str}: {title}: {content}") + + if deep_lines: + context_parts.append("DEEP SOURCES:\n" + "\n".join(deep_lines)) + + if self.context_shallow_count > 0: + shallow_lines = [] + start_idx = self.context_deep_count + end_idx = self.context_deep_count + self.context_shallow_count + for i, r in enumerate(raw_results[start_idx:end_idx]): + domain = urlparse(r.get('url', '')).netloc.replace('www.', '') + title = (r.get('title') or '').replace('\n', ' ').strip()[:60] + shallow_lines.append(f"[{i+1+start_idx}] {domain}: {title}") + + if shallow_lines: + context_parts.append("SHALLOW SOURCES (headlines):\n" + "\n".join(shallow_lines)) + + return "\n\n".join(context_parts) + + def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults: + results = EngineResults() + try: + if request and hasattr(request, 'headers') and request.headers.get('X-AI-Auxiliary'): + return results + + current_tabs = set(search.search_query.categories) + if not current_tabs: current_tabs = {'general'} + + if not self.active or not self.api_key or search.search_query.pageno > 1 or not self.allowed_tabs.intersection(current_tabs): + return results + + raw_results = search.result_container.get_ordered_results() + context_str = self._assemble_context(search, raw_results) + + + ts = str(int(time.time())) + q_clean = search.search_query.query.strip() + lang = search.search_query.lang + sig = hashlib.sha256(f"{ts}{self.secret}".encode()).hexdigest() + tk = f"{ts}.{sig}" + + b64_context = base64.b64encode(context_str.encode('utf-8')).decode('utf-8') + js_q = json.dumps(q_clean) + js_lang = json.dumps(lang) + total_context_count = self.context_deep_count + self.context_shallow_count + js_urls = json.dumps([r.get('url') for r in raw_results[:total_context_count]]) + + is_interactive = self.interactive + + interactive_css = INTERACTIVE_CSS if is_interactive else '' + interactive_html = INTERACTIVE_HTML if is_interactive else '' + interactive_js_init = INTERACTIVE_JS if is_interactive else '' interactive_js_complete = "footer.style.display = 'flex';" if is_interactive else '' - - # Streaming function signature differs between modes - stream_fn_sig = 'async function startStream(overrideQ = null, prevAnswer = null)' if is_interactive else 'async function startStream()' + stream_fn_sig = 'async function startStream(overrideQ = null, prevAnswer = null, auxContext = null)' stream_q = 'overrideQ || q_init' if is_interactive else 'q_init' stream_body = f'''prev_answer: prevAnswer''' if is_interactive else '' @@ -556,12 +920,12 @@ class SXNGPlugin(Plugin): ''' search.result_container.answers.add(results.types.Answer(answer=Markup(html_payload))) except Exception as e: - logger.error(f"AI Answers: {e}") + logger.error(f"{PLUGIN_NAME}: {e}") return results diff --git a/demo.py b/demo.py deleted file mode 100644 index 79f0319..0000000 --- a/demo.py +++ /dev/null @@ -1,153 +0,0 @@ -""" -AI Answers Plugin - Interactive Demo Server -Simulates SearXNG environment for local development and testing. - -Usage: python demo.py -Then visit: http://localhost:5000/?q=your+query+here - -Requires: pip install flask flask-babel python-dotenv -""" - -import sys -import os -import logging -from types import ModuleType -from flask import Flask, request -from dotenv import load_dotenv - -logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') -load_dotenv() -os.environ.setdefault('LLM_STYLE', 'interactive') - -# Mock SearXNG modules -searx = ModuleType("searx") -searx_plugins = ModuleType("searx.plugins") -searx_results = ModuleType("searx.result_types") - -class MockPlugin: - def __init__(self, cfg): - self.active = getattr(cfg, 'active', True) - -class MockPluginInfo: - def __init__(self, **kwargs): - self.meta = kwargs - -class MockEngineResults: - def __init__(self): - self.types = ModuleType("types") - self.types.Answer = lambda *args, **kwargs: kwargs.get('answer', args[0] if args else "") - self._results = [] - - def add(self, res): - self._results.append(res) - -searx_plugins.Plugin = MockPlugin -searx_plugins.PluginInfo = MockPluginInfo -searx_results.EngineResults = MockEngineResults - -sys.modules["searx"] = searx -sys.modules["searx.plugins"] = searx_plugins -sys.modules["searx.result_types"] = searx_results - -from ai_answers import SXNGPlugin -from flask_babel import Babel - -app = Flask(__name__) -babel = Babel(app) - -class MockConfig: - active = True - -plugin = SXNGPlugin(MockConfig()) -plugin.init(app) - -@app.route("/") -def index(): - query = request.args.get("q", "why is the sky blue") - - class MockSearchQuery: - pageno = 1 - lang = 'en' - categories = ['general'] - MockSearchQuery.query = query - - class MockSearch: - search_query = MockSearchQuery() - class MockResultContainer: - def __init__(self): - self.answers = set() - - def get_ordered_results(self): - if 'quantum' in query.lower(): - return [ - {"title": "IBM Quantum", "content": "Quantum computers rely on qubits, which can represent 0, 1, or both via superposition. They solve complex problems faster.", "url": "https://www.ibm.com/quantum", "publishedDate": "2026-01-15"}, - {"title": "Nature Physics", "content": "Entanglement allows qubits to be correlated instantly across distances. This is key for quantum cryptography and teleportation.", "url": "https://nature.com/articles/quantum", "publishedDate": "2026-01-10"}, - {"title": "Wikipedia", "content": "Quantum computing uses quantum mechanics. Major applications include drug discovery and materials science.", "url": "https://en.wikipedia.org/wiki/Quantum_computing", "publishedDate": "2025-12-01"} - ] - return [ - {"title": "Wikipedia", "content": "The sky appears blue due to Rayleigh scattering of sunlight.", "url": "https://en.wikipedia.org/wiki/Rayleigh_scattering", "publishedDate": "2026-01-15"}, - {"title": "NASA Science", "content": "Shorter blue wavelengths scatter more than longer red wavelengths.", "url": "https://science.nasa.gov/blue-sky", "publishedDate": "2026-01-10"}, - {"title": "Physics Today", "content": "The atmosphere acts as a filter, scattering blue light in all directions.", "url": "https://physicstoday.org/atmosphere", "publishedDate": "2026-01-01"} - ] - result_container = MockResultContainer() - - search = MockSearch() - plugin.post_search(None, search) - - injection_html = "" - if search.result_container.answers: - injection_html = list(search.result_container.answers)[0] - - return f""" - - - - - AI Answers Demo - - - -
-

Provider: {plugin.provider or 'Not configured'} | Model: {plugin.model or 'N/A'}

-

Query: {query}

-
- {injection_html if injection_html else '

Plugin inactive. Set LLM_PROVIDER and LLM_KEY in .env

'} -
-

Try: /?q=what+is+quantum+computing

- - - """ - -if __name__ == "__main__": - print() - print("=" * 50) - print(" AI Answers Plugin - Demo Server") - print("=" * 50) - print(f" Provider: {plugin.provider or 'NOT SET'}") - print(f" Model: {plugin.model or 'N/A'}") - print(f" Style: {plugin.style}") - print(f" Status: {'Active' if plugin.api_key else 'Inactive (no LLM_KEY)'}") - print("=" * 50) - print(" http://localhost:5000/?q=your+query+here") - print("=" * 50) - print() - app.run(debug=True, port=5000) diff --git a/test.py b/test.py deleted file mode 100644 index a84d529..0000000 --- a/test.py +++ /dev/null @@ -1,186 +0,0 @@ -""" -AI Answers Plugin - One-Shot Test -Comprehensive test that outputs everything: config, injection, LLM response. - -Usage: python test.py -Requires: pip install flask flask-babel python-dotenv -""" - -import sys -import os -import re -import time -import logging -from types import ModuleType -from dotenv import load_dotenv - -load_dotenv() - -# Suppress Flask noise during test -logging.getLogger('werkzeug').setLevel(logging.ERROR) -logging.basicConfig(level=logging.INFO, format='%(message)s') - -# Mock SearXNG modules -searx = ModuleType("searx") -searx_plugins = ModuleType("searx.plugins") -searx_results = ModuleType("searx.result_types") - -class MockPlugin: - def __init__(self, cfg): - self.active = getattr(cfg, 'active', True) - -class MockPluginInfo: - def __init__(self, **kwargs): - self.meta = kwargs - -class MockEngineResults: - def __init__(self): - self.types = ModuleType("types") - self.types.Answer = lambda *args, **kwargs: kwargs.get('answer', args[0] if args else "") - self._results = [] - - def add(self, res): - self._results.append(res) - -searx_plugins.Plugin = MockPlugin -searx_plugins.PluginInfo = MockPluginInfo -searx_results.EngineResults = MockEngineResults - -sys.modules["searx"] = searx -sys.modules["searx.plugins"] = searx_plugins -sys.modules["searx.result_types"] = searx_results - -from flask import Flask -from flask_babel import Babel -from ai_answers import SXNGPlugin - -def run_tests(): - print() - print("=" * 60) - print(" AI Answers Plugin - Comprehensive Test") - print("=" * 60) - - # === CONFIG TEST === - print("\n[1/4] Configuration") - print("-" * 40) - - app = Flask(__name__) - Babel(app) - - class MockConfig: - active = True - - plugin = SXNGPlugin(MockConfig()) - plugin.init(app) - - print(f" Provider: {plugin.provider or 'NOT SET'}") - print(f" Model: {plugin.model or 'N/A'}") - print(f" API Key: {'[OK]' if plugin.api_key else '[MISSING]'}") - print(f" Max Tokens: {getattr(plugin, 'max_tokens', 'N/A')}") - print(f" Temperature: {getattr(plugin, 'temperature', 'N/A')}") - print(f" Context Count: {getattr(plugin, 'context_count', 'N/A')}") - print(f" Allowed Tabs: {getattr(plugin, 'allowed_tabs', 'N/A')}") - - if not plugin.api_key: - print("\n" + "=" * 60) - print(" SKIPPED: No LLM_KEY configured") - print(" Set LLM_PROVIDER and LLM_KEY in .env to run full test") - print("=" * 60) - return False - - # === INJECTION TEST === - print("\n[2/4] HTML Injection") - print("-" * 40) - - class MockSearchQuery: - pageno = 1 - query = "why is the sky blue" - lang = 'en' - categories = ['general'] - - class MockSearch: - search_query = MockSearchQuery() - class MockResultContainer: - def __init__(self): - self.answers = set() - def get_ordered_results(self): - return [ - {"title": "Wikipedia", "content": "The sky appears blue due to Rayleigh scattering.", "url": "https://example.com/1", "publishedDate": "2026-01-15"}, - {"title": "NASA", "content": "Blue wavelengths scatter more than red.", "url": "https://example.com/2", "publishedDate": "2026-01-10"}, - ] - result_container = MockResultContainer() - - search = MockSearch() - plugin.post_search(None, search) - - if not search.result_container.answers: - print(" FAIL: No HTML injected") - return False - - html = str(list(search.result_container.answers)[0]) - - has_box = 'id="sxng-stream-box"' in html - has_endpoint = '/ai-stream' in html - - token_match = re.search(r'const tk = "(.*?)";', html) - has_token = bool(token_match) - - print(f" Stream box: {'[OK]' if has_box else '[FAIL]'}") - print(f" Endpoint ref: {'[OK]' if has_endpoint else '[FAIL]'}") - print(f" Auth token: {'[OK]' if has_token else '[FAIL]'}") - print(f" HTML size: {len(html):,} bytes") - - if not (has_box and has_endpoint and has_token): - print(" FAIL: Missing required elements") - return False - - # === STREAM ENDPOINT TEST === - print("\n[3/4] Stream Endpoint") - print("-" * 40) - - with app.test_client() as client: - payload = { - "q": "why is the sky blue", - "context": "[1] Wikipedia: The sky appears blue due to Rayleigh scattering.", - "lang": "en", - "tk": token_match.group(1) - } - - start = time.time() - response = client.post('/ai-stream', json=payload) - elapsed = time.time() - start - - print(f" Status: {response.status_code}") - print(f" Time: {elapsed:.2f}s") - - if response.status_code != 200: - print(f" FAIL: Expected 200, got {response.status_code}") - return False - - # === LLM RESPONSE TEST === - print("\n[4/4] LLM Response") - print("-" * 40) - - data = response.data.decode('utf-8') - print(f" Bytes: {len(data):,}") - print(f" Words: ~{len(data.split())}") - - if len(data) < 10: - print(" FAIL: Response too short (API error?)") - return False - - print("\n --- Response Preview ---") - preview = data[:500] + ("..." if len(data) > 500 else "") - for line in preview.split('\n'): - print(f" {line}") - print(" --- End Preview ---") - - # === SUMMARY === - print("\n" + "=" * 60) - print(" ALL TESTS PASSED") - print("=" * 60) - return True - -if __name__ == "__main__": - success = run_tests() - sys.exit(0 if success else 1) diff --git a/tests/demo.py b/tests/demo.py new file mode 100644 index 0000000..b6d3c44 --- /dev/null +++ b/tests/demo.py @@ -0,0 +1,241 @@ +import sys +import os +import logging +from types import ModuleType +from flask import Flask, request + +logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') +# os.environ.setdefault('LLM_STYLE', 'interactive') # Removed to let plugin config decide defaults + +# SearXNG module mocks +searx = ModuleType("searx") +searx_plugins = ModuleType("searx.plugins") +searx_results = ModuleType("searx.result_types") + +class MockPlugin: + def __init__(self, cfg): + self.active = getattr(cfg, 'active', True) + +class MockPluginInfo: + def __init__(self, **kwargs): + self.meta = kwargs + +class MockEngineResults: + def __init__(self): + self.types = ModuleType("types") + self.types.Answer = lambda *args, **kwargs: kwargs.get('answer', args[0] if args else "") + self._results = [] + + def add(self, res): + self._results.append(res) + +searx_plugins.Plugin = MockPlugin +searx_plugins.PluginInfo = MockPluginInfo +searx_results.EngineResults = MockEngineResults + +sys.modules["searx"] = searx +sys.modules["searx.plugins"] = searx_plugins +sys.modules["searx.result_types"] = searx_results + +# Network module mock +searx_network = ModuleType("searx.network") +def mock_network_call(method, url, **kwargs): + import http.client, ssl, json + from urllib.parse import urlparse + + parsed = urlparse(url) + port = parsed.port or (443 if parsed.scheme=='https' else 80) + target = f"{parsed.hostname}:{port}" + + if parsed.scheme == 'https': + conn = http.client.HTTPSConnection(target, timeout=30, context=ssl.create_default_context()) + else: + conn = http.client.HTTPConnection(target, timeout=30) + + headers = kwargs.get('headers', {}) + body = None + if kwargs.get('json'): + body = json.dumps(kwargs['json']) + elif kwargs.get('data'): + body = kwargs['data'] + + path = parsed.path + if parsed.query: + path += f"?{parsed.query}" + + if kwargs.get('params'): + from urllib.parse import urlencode + query_str = urlencode(kwargs['params']) + if '?' in path: + path += f"&{query_str}" + else: + path += f"?{query_str}" + + conn.request(method, path, body=body, headers=headers) + return conn.getresponse() + +def mock_stream(method, url, **kwargs): + res = mock_network_call(method, url, **kwargs) + + class MockResponse: + def __init__(self, r): + self.status_code = r.status + self.text = "Mock Response" # Stub + self._r = r + + def generator(): + while True: + chunk = res.read(128) + if not chunk: break + yield chunk + + return MockResponse(res), generator() + +def mock_get(url, **kwargs): + import json + res = mock_network_call('GET', url, **kwargs) + + class MockResponse: + def __init__(self, r): + self.status_code = r.status + self._content = r.read() + self.text = self._content.decode('utf-8') + + def json(self): + return json.loads(self.text) + + return MockResponse(res) + +searx_network.stream = mock_stream +searx_network.get = mock_get +sys.modules["searx.network"] = searx_network + +from ai_answers import SXNGPlugin +from flask_babel import Babel + +app = Flask(__name__) +babel = Babel(app) + +class MockConfig: + active = True + +plugin = SXNGPlugin(MockConfig()) +plugin.init(app) + +@app.route("/search") +def mock_search(): + query = request.args.get("q", "") + format_type = request.args.get("format", "html") + + if format_type != "json": + return "Demo only supports JSON format", 400 + + results = [ + {"title": f"Result 1 for: {query}", "content": f"This is simulated content about {query}. It contains relevant information.", "url": f"https://example.com/1/{query.replace(' ', '-')}", "publishedDate": "2026-01-18"}, + {"title": f"Result 2 for: {query}", "content": f"Additional information regarding {query}. More context and details.", "url": f"https://example.com/2/{query.replace(' ', '-')}", "publishedDate": "2026-01-17"}, + {"title": f"Result 3 for: {query}", "content": f"Further reading on {query}. Expert analysis.", "url": f"https://example.com/3/{query.replace(' ', '-')}", "publishedDate": "2026-01-16"}, + ] + + return { + "results": results, + "infoboxes": [], + "answers": [], + "suggestions": [f"{query} explained", f"{query} tutorial"] + } + +@app.route("/") +def index(): + query = request.args.get("q", "why is the sky blue") + + class MockSearchQuery: + pageno = 1 + lang = 'en' + categories = ['general'] + MockSearchQuery.query = query + + class MockSearch: + search_query = MockSearchQuery() + class MockResultContainer: + def __init__(self): + self.answers = set() + + def get_ordered_results(self): + base_results = [ + {"title": "Wikipedia", "content": "The sky appears blue due to Rayleigh scattering of sunlight. When sunlight enters the atmosphere, it collides with gas molecules and scatters in all directions. Blue light scatters more than other colors because it travels in shorter waves.", "url": "https://en.wikipedia.org/wiki/Rayleigh_scattering", "publishedDate": "2026-01-15"}, + {"title": "NASA Science", "content": "Shorter blue wavelengths scatter more than longer red wavelengths. This phenomenon, discovered by Lord Rayleigh in the 1870s, explains why we see a blue sky during the day.", "url": "https://science.nasa.gov/blue-sky", "publishedDate": "2026-01-10"}, + {"title": "Physics Today", "content": "The atmosphere acts as a filter, scattering blue light in all directions while letting other colors pass through more directly.", "url": "https://physicstoday.org/atmosphere", "publishedDate": "2026-01-01"}, + {"title": "Scientific American", "content": "At sunset, light travels through more atmosphere, scattering away the blue and leaving reds and oranges.", "url": "https://scientificamerican.com/sunset", "publishedDate": "2025-12-20"}, + {"title": "National Geographic", "content": "Ocean color also results from light scattering and absorption by water molecules.", "url": "https://nationalgeographic.com/ocean-blue", "publishedDate": "2025-12-15"}, + ] + broad_results = [ + {"title": "MIT OpenCourseWare: Atmospheric Physics", "content": "Course materials.", "url": "https://ocw.mit.edu/physics"}, + {"title": "NOAA: Understanding the Atmosphere", "content": "Educational resource.", "url": "https://noaa.gov/atmosphere"}, + {"title": "BBC Science: Why is the sky blue?", "content": "Explainer article.", "url": "https://bbc.com/science/sky"}, + {"title": "Khan Academy: Light and Color", "content": "Video lesson.", "url": "https://khanacademy.org/light"}, + {"title": "HowStuffWorks: Rayleigh Scattering", "content": "Detailed explanation.", "url": "https://howstuffworks.com/rayleigh"}, + {"title": "Physics Stack Exchange: Sky color discussion", "content": "Q&A thread.", "url": "https://physics.stackexchange.com/sky"}, + {"title": "Quora: Atmospheric optics explained", "content": "Community answers.", "url": "https://quora.com/atmosphere"}, + ] + if 'quantum' in query.lower(): + return [ + {"title": "IBM Quantum", "content": "Quantum computers rely on qubits, which can represent 0, 1, or both via superposition. They solve complex problems faster.", "url": "https://www.ibm.com/quantum", "publishedDate": "2026-01-15"}, + {"title": "Nature Physics", "content": "Entanglement allows qubits to be correlated instantly across distances. This is key for quantum cryptography and teleportation.", "url": "https://nature.com/articles/quantum", "publishedDate": "2026-01-10"}, + {"title": "Wikipedia", "content": "Quantum computing uses quantum mechanics. Major applications include drug discovery and materials science.", "url": "https://en.wikipedia.org/wiki/Quantum_computing", "publishedDate": "2025-12-01"} + ] + broad_results + return base_results + broad_results + result_container = MockResultContainer() + + search = MockSearch() + plugin.post_search(None, search) + + injection_html = "" + if search.result_container.answers: + injection_html = list(search.result_container.answers)[0] + + return f""" + + + + + AI Answers Demo + + + +
+

Provider: {plugin.provider or 'Not configured'} | Model: {plugin.model or 'N/A'}

+

Query: {query}

+
+ {injection_html if injection_html else '

Plugin inactive. Set LLM_PROVIDER and LLM_KEY in .env

'} +
+

Try: /?q=what+is+quantum+computing

+ + + """ + +if __name__ == "__main__": + print("AI Answers - Demo\n") + print(f" Provider: {plugin.provider or 'NOT SET'}") + print(f" Model: {plugin.model or 'N/A'}") + print(f" Mode: {'interactive' if plugin.interactive else 'simple'}") + print(f" Status: {'active' if plugin.api_key else 'inactive (no LLM_KEY)'}") + print(f"\n http://localhost:5000/?q=why+is+the+sky+blue\n") + app.run(debug=False, port=5000) diff --git a/tests/test.py b/tests/test.py new file mode 100644 index 0000000..e6e2269 --- /dev/null +++ b/tests/test.py @@ -0,0 +1,382 @@ +""" +AI Answers Plugin - Comprehensive Test +Test suite that verifies both 'interactive' and 'simple' modes, +checks configuration, and validates LLM integration. + +Usage: python test.py +Requires: pip install flask flask-babel python-dotenv +""" + +import os +import sys + +# Add parent directory to path to find ai_answers.py +sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import re +import time +import logging +import subprocess +import tempfile +from types import ModuleType + +import warnings +warnings.filterwarnings("ignore", category=SyntaxWarning) + +# Suppress Flask noise during test +logging.getLogger('werkzeug').setLevel(logging.ERROR) +logging.basicConfig(level=logging.INFO, format='%(message)s') + +# --- MOCKS START --- + +# SearXNG module mocks +searx = ModuleType("searx") +searx_plugins = ModuleType("searx.plugins") +searx_results = ModuleType("searx.result_types") + +class MockPlugin: + def __init__(self, cfg): + self.active = getattr(cfg, 'active', True) + +class MockPluginInfo: + def __init__(self, **kwargs): + self.meta = kwargs + +class MockEngineResults: + def __init__(self): + self.types = ModuleType("types") + self.types.Answer = lambda *args, **kwargs: kwargs.get('answer', args[0] if args else "") + self._results = [] + + def add(self, res): + self._results.append(res) + + def get_ordered_results(self): + return self._results + +searx_plugins.Plugin = MockPlugin +searx_plugins.PluginInfo = MockPluginInfo +searx_results.EngineResults = MockEngineResults + +# Internal search API mocks +searx_search = ModuleType("searx.search") +searx_search_models = ModuleType("searx.search.models") +searx_query = ModuleType("searx.query") +searx_webadapter = ModuleType("searx.webadapter") + +class MockSearchWithPlugins: + def __init__(self, search_query, request, user_plugins): + self.search_query = search_query + self.result_container = MockEngineResults() + # Add some mock results + self.result_container.add({"title": "Mock Aux Result", "url": "https://test.com", "content": "Test content", "publishedDate": "2026"}) + + # Add mock infoboxes/answers + self.result_container.infoboxes = [{"infobox": "Test Box", "content": "Box Content", "attributes": []}] + self.result_container.answers = set() + self.result_container.answers_list = ["Test Answer"] # Simulating raw answers list if needed + + def search(self): + return self.result_container + +class MockSearchQuery: + def __init__(self, query, engineref_list, **kwargs): + self.query = query + +class MockRawTextQuery: + def __init__(self, query, disabled_engines): + self.query = query + def getQuery(self): + return self.query + +searx_search.SearchWithPlugins = MockSearchWithPlugins +searx_search.models = searx_search_models +searx_search_models.SearchQuery = MockSearchQuery +searx_query.RawTextQuery = MockRawTextQuery +searx_webadapter.get_engineref_from_category_list = lambda cats, disabled: [] + +sys.modules["searx.search"] = searx_search +sys.modules["searx.search.models"] = searx_search_models +sys.modules["searx.query"] = searx_query +sys.modules["searx.webadapter"] = searx_webadapter + +# Network module mock +searx_network = ModuleType("searx.network") +def mock_network_call(method, url, **kwargs): + import http.client, ssl, json + from urllib.parse import urlparse + + parsed = urlparse(url) + port = parsed.port or (443 if parsed.scheme=='https' else 80) + target = f"{parsed.hostname}:{port}" + + if parsed.scheme == 'https': + conn = http.client.HTTPSConnection(target, timeout=30, context=ssl.create_default_context()) + else: + conn = http.client.HTTPConnection(target, timeout=30) + + headers = kwargs.get('headers', {}) + body = None + if kwargs.get('json'): + body = json.dumps(kwargs['json']) + elif kwargs.get('data'): + body = kwargs['data'] + + path = parsed.path + if parsed.query: + path += f"?{parsed.query}" + + if kwargs.get('params'): + from urllib.parse import urlencode + query_str = urlencode(kwargs['params']) + if '?' in path: + path += f"&{query_str}" + else: + path += f"?{query_str}" + + conn.request(method, path, body=body, headers=headers) + print(f" [DEBUG] Network Call: {method} {target}{path}") + print(f" [DEBUG] Headers: {headers}") + # print(f" [DEBUG] Body: {body}") + return conn.getresponse() + +def mock_stream(method, url, **kwargs): + res = mock_network_call(method, url, **kwargs) + + class MockResponse: + def __init__(self, r): + self.status_code = r.status + self.text = "Mock Response" # Stub + self._r = r + + def generator(): + while True: + chunk = res.read(128) + if not chunk: break + yield chunk + + return MockResponse(res), generator() + +def mock_get(url, **kwargs): + import json + res = mock_network_call('GET', url, **kwargs) + + class MockResponse: + def __init__(self, r): + self.status_code = r.status + self._content = r.read() + self.text = self._content.decode('utf-8') + + def json(self): + return json.loads(self.text) + + return MockResponse(res) + +searx_network.stream = mock_stream +searx_network.get = mock_get +sys.modules["searx.network"] = searx_network + +sys.modules["searx"] = searx +sys.modules["searx.plugins"] = searx_plugins +sys.modules["searx.result_types"] = searx_results + +# --- MOCKS END --- + +from flask import Flask +from flask_babel import Babel +from ai_answers import SXNGPlugin + +def check_js_syntax(js_code): + """Returns (valid, error_msg)""" + try: + with tempfile.NamedTemporaryFile(mode='w', suffix='.js', delete=False, encoding='utf-8') as f: + f.write(js_code) + temp_path = f.name + + result = subprocess.run( + ['node', '--check', temp_path], + capture_output=True, + text=True, + timeout=5 + ) + os.unlink(temp_path) + + if result.returncode == 0: + return True, None + else: + return False, result.stderr.strip() + except (FileNotFoundError, subprocess.TimeoutExpired, Exception) as e: + return True, f"[SKIP] {e}" # Skip if node not found + +def run_tests(): + print("AI Answers - Test Suite\n") + + print("[Syntax]") + + import py_compile + try: + target_file = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'ai_answers.py') + py_compile.compile(target_file, doraise=True) + print(" Python: OK") + except py_compile.PyCompileError as e: + print(f" Syntax: [FAIL] {e}") + return False + + modes = ['interactive', 'simple'] + + for mode in modes: + app = Flask(__name__) + Babel(app) + + # Set LLM_INTERACTIVE based on mode + os.environ['LLM_INTERACTIVE'] = 'true' if mode == 'interactive' else 'false' + + # Override env var for this iteration + # os.environ['LLM_STYLE'] = mode # Legacy + os.environ['LLM_INTERACTIVE'] = 'true' if mode == 'interactive' else 'false' + + class MockConfig: + active = True + + # Re-init plugin with new env var in effect + plugin = SXNGPlugin(MockConfig()) + plugin.init(app) + + if mode == 'interactive': + print(f"\n[Config]") + print(f" Provider: {plugin.provider or 'NOT SET'}") + print(f" API Key: {'OK' if plugin.api_key else 'MISSING'}") + + # Construct Search + class MockSearchQuery: + pageno = 1 + query = "test query" + lang = 'en' + categories = ['general'] + + class MockSearch: + search_query = MockSearchQuery() + class MockResultContainer: + def __init__(self): + self.answers = set() + def get_ordered_results(self): + return [ + {"title": "T1", "content": "C1", "url": "https://a.com/1", "publishedDate": "2026-01-15"}, + {"title": "T2", "content": "C2", "url": "https://a.com/2", "publishedDate": "2026-01-10"}, + ] + result_container = MockResultContainer() + + search = MockSearch() + plugin.post_search(None, search) + + if not search.result_container.answers: + print(" FAIL: No HTML injected") + return False + + html = str(list(search.result_container.answers)[0]) + + # Mode-specific basic validations + has_box = 'id="sxng-stream-box"' in html + has_footer = 'id="sxng-footer"' in html + + if mode == 'interactive': + if has_box and has_footer: + print("\n[Render: interactive]") + print(" UI: OK") + else: + print(f" FAIL: Box={has_box}, Footer={has_footer}") + return False + else: + if has_box and not has_footer: + print("\n[Render: simple]") + print(" UI: OK") + else: + print(f" FAIL: Box={has_box}, Footer={has_footer}") + return False + + # JS Verification + js_match = re.search(r'', html, re.DOTALL) + if not js_match: + print(" FAIL: No script tag found") + return False + + js_code = js_match.group(1).strip() + valid, err = check_js_syntax(js_code) + + if valid: + print(" JS: OK") + else: + print(" JS Syntax: [FAIL]") + print(f" Error: {err.splitlines()[0][:80]}...") + return False + + print(f" Size: {len(html):,} bytes") + + # Verify Critical Fix: Function Signature + # simple mode caused reference error if signature wasn't unified + if 'async function startStream(overrideQ = null, prevAnswer = null, auxContext = null)' in js_code: + print(" Signature: OK") + else: + print(" Signature Fix: [FAIL] Unified startStream signature MISSING") + # Not fatal for interactive per se, but fatal if consistent code is desired + # For simple mode it IS fatal in runtime. + if mode == 'simple': return False + + + # --------------------------------------------------------- + # GLOBAL ENDPOINT / INTEGRATION TESTS (Using last plugin init) + # --------------------------------------------------------- + + if not plugin.api_key: + print("\n[Skip integration: no LLM_KEY]") + return True + + print(f"\n[Stream]") + print(f" Provider: {plugin.provider}") + print(f" Model: {plugin.model}") + + # Needs a token from the last run to pass auth + token_match = re.search(r'tk_init = "(.*?)";', html) + if not token_match: + print(" FAIL: Could not extract token for stream test") + return False + + with app.test_client() as client: + payload = { + "q": "why is the sky blue", + "context": "[1] Wikipedia: The sky appears blue.", + "lang": "en", + "tk": token_match.group(1) + } + + start = time.time() + response = client.post('/ai-stream', json=payload) + elapsed = time.time() - start + + print(f" Status: {response.status_code}") + print(f" Time: {elapsed:.2f}s") + + if response.status_code != 200: + print(f" FAIL: Expected 200, got {response.status_code}") + return False + + data = response.data.decode('utf-8') + if len(data) < 5: + print(" FAIL: Empty or too short response") + return False + print(" Result: OK") + + print("\n[Aux Search]") + with app.test_client() as client: + aux_response = client.post('/ai-auxiliary-search', json={'query': 'test'}) + if aux_response.status_code == 200 and 'results' in aux_response.get_json(): + print(" Result: OK") + else: + print(" Aux Endpoint: [FAIL]") + + print("\nPASS") + return True + +if __name__ == "__main__": + success = run_tests() + sys.exit(0 if success else 1)