{interactive_html}
import json, os, logging, base64, time, hashlib, codecs, re, http.client, ssl from urllib.parse import urlparse from searx import network try: from searx.network import get_network except ImportError: get_network = None # Graceful fallback for test/demo environments from flask import Response, request, abort, jsonify from searx.plugins import Plugin, PluginInfo from searx.result_types import EngineResults from flask_babel import gettext from markupsafe import Markup logger = logging.getLogger(__name__) TOKEN_EXPIRY_SEC = 3600 STREAM_CHUNK_SIZE = 128 STREAM_TIMEOUT_SEC = 60 def _get_streaming_connection(url: str): parsed = urlparse(url) host = parsed.hostname port = parsed.port or (443 if parsed.scheme == 'https' else 80) path = parsed.path + ('?' + parsed.query if parsed.query else '') verify_ssl = True if get_network is not None: try: net = get_network() verify_ssl = getattr(net, 'verify', True) except Exception: pass if parsed.scheme == 'https': ctx = ssl.create_default_context() if verify_ssl else ssl._create_unverified_context() conn = http.client.HTTPSConnection(host, port, timeout=STREAM_TIMEOUT_SEC, context=ctx) else: conn = http.client.HTTPConnection(host, port, timeout=STREAM_TIMEOUT_SEC) return conn, path PLUGIN_NAME = "AI Answers" DEFAULT_TABS = "general,science,it,news" PROVIDER_PRESETS = { 'openai': {'url': 'https://api.openai.com/v1/chat/completions', 'model': 'gpt-4o-mini'}, 'openrouter': {'url': 'https://openrouter.ai/api/v1/chat/completions', 'model': 'google/gemma-3-27b-it:free'}, 'ollama': {'url': 'http://localhost:11434/v1/chat/completions', 'model': 'llama3.2'}, 'localai': {'url': 'http://localhost:8080/v1/chat/completions', 'model': 'gpt-4'}, 'lmstudio': {'url': 'http://localhost:1234/v1/chat/completions', 'model': 'local-model'}, 'gemini': {'url': 'https://generativelanguage.googleapis.com/v1beta/models/{model}:streamGenerateContent', 'model': 'gemma-3-27b-it'}, 'azure': {'url': None, 'model': 'azure-deployment'}, 'huggingface': {'url': 'https://api-inference.huggingface.co/models/{model}/v1/chat/completions', 'model': 'meta-llama/Meta-Llama-3-8B-Instruct'} } # UI assets (inlined for single-file install) INTERACTIVE_CSS = ''' @keyframes sxng-fade-in-up { 0% { opacity: 0; transform: translateY(10px); } 100% { opacity: 1; transform: translateY(0); } } .sxng-footer { display: flex; align-items: center; gap: 0.5rem; margin-top: 0.5rem; opacity: 0; animation: sxng-fade-in-up 0.5s ease-out forwards; } .sxng-btn { display: inline-flex; align-items: center; justify-content: center; width: 32px; height: 32px; padding: 0; border: 1px solid transparent; border-radius: 6px; background: transparent; color: var(--color-base-font, #333); cursor: pointer; transition: all 0.2s ease; opacity: 0.6; } .sxng-btn:hover { background: var(--color-base-background-hover, rgba(0,0,0,0.05)); color: var(--color-result-link, #5e81ac); opacity: 1; transform: translateY(-1px); } .sxng-btn svg { width: 18px; height: 18px; fill: currentColor; } .sxng-input-wrapper { flex-grow: 1; display: flex; align-items: center; margin: 0 0.5rem; position: relative; } .sxng-input { width: 100%; background: transparent; border: none; color: var(--color-base-font, #333); font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif; font-size: 16px; padding: 0.5rem 2.5rem 0.5rem 0; opacity: 0.8; transition: opacity 0.2s; } .sxng-input:focus { outline: none; opacity: 1; } .sxng-input::placeholder { color: var(--color-base-font, #333); opacity: 0.35; } .sxng-input-line { position: absolute; bottom: 0; left: 0; width: 0; height: 1px; background: var(--color-result-link, #5e81ac); transition: width 0.3s ease; } .sxng-input:focus + .sxng-input-line { width: 100%; } .sxng-user-msg { display: block; width: fit-content; max-width: 80%; margin: 0.75rem 0 0.75rem auto; padding: 0.25rem 0.6rem 0.25rem 0; border-right: 2px solid var(--color-result-link, #5e81ac); text-align: right; font-size: 0.85rem; line-height: 1.4; opacity: 0.55; animation: sxng-fade-in-up 0.3s ease-out forwards; } .sxng-input-submit { all: unset; position: absolute; right: 0; top: 50%; transform: translateY(-50%); display: inline-flex; align-items: center; justify-content: center; width: 32px; height: 32px; padding: 0; background: transparent !important; border: none !important; border-radius: 6px; color: var(--color-base-font, #333); cursor: pointer; opacity: 0.3; transition: all 0.2s ease; } .sxng-input-wrapper:focus-within .sxng-input-submit, .sxng-input-submit:hover { opacity: 1; color: var(--color-result-link, #5e81ac); background: var(--color-base-background-hover, rgba(0,0,0,0.05)) !important; } .sxng-input-submit svg { width: 18px; height: 18px; fill: currentColor; } .sxng-input-submit svg { width: 18px; height: 18px; fill: currentColor; } ''' INTERACTIVE_HTML = '''
''' CITATION_HELPER_JS = r''' function renderCitations(text, urls) { const fragment = document.createDocumentFragment(); const re = /\[(\d{1,2}(?:\s*,\s*\d{1,2})*)\]/g; let lastIdx = 0; const matches = [...text.matchAll(re)]; matches.forEach(match => { if (match.index > lastIdx) { const s = document.createElement('span'); s.className = 'sxng-chunk'; // Preserve whitespace by not trimming s.textContent = text.substring(lastIdx, match.index); fragment.appendChild(s); } match[1].split(/\s*,\s*/).forEach(n => { const idx = parseInt(n.trim()); if (idx >= 1 && idx <= urls.length) { const url = urls[idx-1]; if (url) { const a = document.createElement('a'); a.href = url; a.target = '_blank'; a.style.cssText = 'text-decoration:none;color:var(--color-result-link);font-weight:bold;'; a.textContent = `[${n.trim()}]`; a.className = 'sxng-chunk'; fragment.appendChild(a); } else { const s = document.createElement('span'); s.className = 'sxng-chunk'; s.textContent = `[${n.trim()}]`; fragment.appendChild(s); } } else { const s = document.createElement('span'); s.className = 'sxng-chunk'; s.textContent = `[${n.trim()}]`; fragment.appendChild(s); } }); lastIdx = match.index + match[0].length; }); if (lastIdx < text.length) { const s = document.createElement('span'); s.className = 'sxng-chunk'; // Preserve whitespace by not trimming s.textContent = text.substring(lastIdx); fragment.appendChild(s); } return fragment; } ''' INTERACTIVE_JS = r''' const footer = document.getElementById('sxng-footer'); const input = document.getElementById('sxng-action-input'); // Inherited from outer scope: box, data, conversation // Theme detection: inherit host accent color if (window.getComputedStyle && box) { try { const docStyles = getComputedStyle(document.documentElement); let accent = docStyles.getPropertyValue('--color-result-link').trim(); if (!accent) { const a = document.createElement('a'); document.body.appendChild(a); accent = getComputedStyle(a).color; document.body.removeChild(a); } if (accent) { box.style.setProperty('--color-result-link', accent); box.style.setProperty('--sxng-ai-accent', accent); } } catch(e) {} } // Persist conversation state to URL hash (stateless) const updateState = () => { try { const state = { t: conversation.turns.map(t => ({ r: t.role === 'user' ? 'u' : 'a', c: t.content.replace(/\s+/g, ' ').trim() })), u: urls }; const b64 = btoa(encodeURIComponent(JSON.stringify(state)).replace(/%([0-9A-F]{2})/g, (m,p)=>String.fromCharCode('0x'+p))); history.replaceState(null, null, '#ai=' + b64); } catch(e) {} }; if (location.hash.includes('ai=')) { try { const b64 = location.hash.split('ai=')[1]; const json = decodeURIComponent(atob(b64).split('').map(c => '%' + ('00' + c.charCodeAt(0).toString(16)).slice(-2)).join('')); const state = JSON.parse(json); if (state.t && state.t.length > 0) { // Restore URLs for citation indexing if (state.u && Array.isArray(state.u)) { urls = state.u; } conversation.turns = state.t.map(t => ({ role: t.r === 'u' ? 'user' : 'assistant', content: t.c.trim(), ts: 0 })); // Helper function to inject citations into text const injectCitations = (text) => { return renderCitations(text, urls); }; data.innerHTML = ''; conversation.turns.forEach((turn, i) => { if (turn.role === 'user') { if (turn.content !== conversation.originalQuery) { const u = document.createElement('span'); u.className = 'sxng-user-msg'; u.textContent = turn.content; data.appendChild(u); const clr = document.createElement('div'); clr.style.clear = 'both'; data.appendChild(clr); } } else { // Inject citations for assistant responses data.appendChild(injectCitations(turn.content)); } }); box.style.display = 'block'; if(wrapper) wrapper.style.display = ''; if(footer && is_interactive) footer.style.display = 'flex'; restored = true; } } catch(e) { console.warn('Restore failed', e); } } document.getElementById('btn-copy').onclick = async (e) => { const btn = e.currentTarget; const originalContent = btn.innerHTML; const text = Array.from(data.childNodes) .filter(n => n.nodeType === 3 || n.tagName === 'SPAN') .map(n => n.textContent) .join(''); await navigator.clipboard.writeText(text); btn.innerHTML = ''; setTimeout(() => btn.innerHTML = originalContent, 2000); }; document.getElementById('btn-regen').onclick = () => { data.innerHTML = ''; footer.style.display = 'none'; startStream(); }; const handleAction = async (e) => { if (e) e.preventDefault(); const val = input.value.trim(); conversation.turns.push({role: 'user', content: val, ts: Date.now()}); updateState(); const currentText = conversation.turns.slice(0, -1).slice(-6) .map(t => (t.role === 'user' ? 'Q' : 'A') + ': ' + t.content) .join('\\n\\n'); input.value = ''; input.blur(); footer.style.display = 'none'; if (val) { const cursor = data.querySelector('.sxng-cursor'); if (cursor) cursor.remove(); const userMsg = document.createElement('span'); userMsg.className = 'sxng-user-msg'; userMsg.textContent = val; data.appendChild(userMsg); const clr = document.createElement('div'); clr.style.clear = 'both'; data.appendChild(clr); const newCursor = document.createElement('span'); newCursor.className = 'sxng-cursor'; data.appendChild(newCursor); const synthesized = synthesizeQuery(q_init, val); let auxContext = null; try { const auxData = await fetch('/ai-auxiliary-search', { method: 'POST', headers: {'Content-Type': 'application/json'}, body: JSON.stringify({query: synthesized, lang: lang_init, offset: urls.length}) }).then(r => r.json()); if (auxData.context) { const originalBackground = conversation.originalContext.substring(0, 1500); auxContext = `FRESH SOURCES (most relevant):\\n${auxData.context}\\n\\nBACKGROUND (for reference):\\n${originalBackground}`; if (auxData.new_urls && Array.isArray(auxData.new_urls)) { urls = urls.concat(auxData.new_urls); } } } catch (err) {} await startStream(val, currentText, auxContext); updateState(); } else { const cursor = data.querySelector('.sxng-cursor'); if (cursor) cursor.remove(); data.appendChild(document.createElement('br')); data.appendChild(document.createElement('br')); const newCursor = document.createElement('span'); newCursor.className = 'sxng-cursor'; data.appendChild(newCursor); await startStream("Continue", currentText); updateState(); } }; document.getElementById('sxng-action-form').onsubmit = handleAction; input.onfocus = () => { setTimeout(() => { input.scrollIntoView({behavior: 'smooth', block: 'center'}); }, 300); }; const _origStream = startStream; startStream = async function(...args) { if (args.length === 0 && restored) return; await _origStream.apply(this, args); if (args.length === 0) updateState(); }; ''' import typing if typing.TYPE_CHECKING: from searx.search import SearchWithPlugins from searx.extended_types import SXNG_Request from . import PluginCfg class SXNGPlugin(Plugin): id = "ai_answers" def __init__(self, plg_cfg: "PluginCfg"): super().__init__(plg_cfg) self.info = PluginInfo( id=self.id, name=gettext(f"{PLUGIN_NAME} Plugin"), description=gettext("Live AI search answers using LLM providers."), preference_section="general", ) self._load_config() def _ollama_unload_model(self) -> None: """ Force-unload an Ollama model after a response by calling the native /api/chat endpoint: {"model": "...", "messages": [], "keep_alive": 0} """ try: if self.provider != 'ollama': return if not getattr(self, 'ollama_unload_after', False): return unload_url = (getattr(self, 'ollama_unload_url', '') or '').strip() if not unload_url: return conn = None try: conn, path = _get_streaming_connection(unload_url) payload = json.dumps({ "model": self.model, "messages": [], "keep_alive": 0 }) headers = {"Content-Type": "application/json"} # Optional: if Ollama is behind auth, reuse LLM_KEY if self.api_key and self.api_key not in ('none', 'ollama'): headers["Authorization"] = f"Bearer {self.api_key}" conn.request("POST", path, body=payload, headers=headers) res = conn.getresponse() res.read() # drain if res.status >= 400: logger.warning(f"{PLUGIN_NAME}: Ollama unload failed: {res.status} {res.reason}") finally: if conn: conn.close() except Exception as e: logger.warning(f"{PLUGIN_NAME}: Ollama unload error: {e}") def _load_config(self): self.interactive = os.getenv('LLM_INTERACTIVE', 'true').lower().strip() in ('true', '1', 'yes', 'on') self.question_mark_required = os.getenv('LLM_QUESTION_MARK_REQUIRED', 'false').lower().strip() in ('true', '1', 'yes', 'on') raw_provider = os.getenv('LLM_PROVIDER', '').lower().strip() raw_url = os.getenv('LLM_URL', '').strip() if not raw_provider and raw_url: url_lower = raw_url.lower() if 'openai.com' in url_lower: raw_provider = 'openai' elif 'openrouter.ai' in url_lower: raw_provider = 'openrouter' elif ':11434' in url_lower: raw_provider = 'ollama' elif 'generativelanguage.googleapis.com' in url_lower: raw_provider = 'gemini' elif 'openai.azure.com' in url_lower or '.azure.com' in url_lower: raw_provider = 'azure' elif 'huggingface.co' in url_lower: raw_provider = 'huggingface' else: # Unknown URL fallback to OpenAI-compatible raw_provider = 'openai' logger.info(f"{PLUGIN_NAME}: Using OpenAI-compatible mode for custom URL") if not raw_provider: self.provider = '' self.model = '' self.is_gemini = False self.api_key = '' return if raw_provider not in PROVIDER_PRESETS: logger.warning(f"{PLUGIN_NAME}: Unknown provider '{raw_provider}', falling back to 'openai'") self.provider = raw_provider if raw_provider in PROVIDER_PRESETS else 'openai' self.is_gemini = (self.provider == 'gemini') preset = PROVIDER_PRESETS[self.provider] self.api_key = os.getenv('LLM_KEY', '') if not self.api_key and self.provider in ('ollama', 'localai', 'lmstudio'): self.api_key = 'none' self.api_key = self.api_key.strip() self.model = os.getenv('LLM_MODEL', preset['model']).strip() try: self.max_tokens = int(os.getenv('LLM_MAX_TOKENS', 500)) except ValueError: self.max_tokens = 500 try: self.temperature = float(os.getenv('LLM_TEMPERATURE', 0.2)) except ValueError: self.temperature = 0.2 try: self.context_deep_count = max(0, int(os.getenv('LLM_CONTEXT_DEEP_COUNT', 5))) except ValueError: self.context_deep_count = 5 try: self.context_shallow_count = max(0, int(os.getenv('LLM_CONTEXT_SHALLOW_COUNT', 15))) except ValueError: self.context_shallow_count = 15 self.allowed_tabs = set(t.strip() for t in os.getenv('LLM_TABS', DEFAULT_TABS).split(',')) preset_url = preset['url'] if preset_url and '{model}' in preset_url: preset_url = preset_url.format(model=self.model) raw_url = os.getenv('LLM_URL', '').strip() or preset_url if not raw_url.startswith(('http://', 'https://')): raw_url = f"https://{raw_url}" self.endpoint_url = raw_url # Ollama: optional "unload after response" (frees VRAM between queries). # Enable with: LLM_OLLAMA_UNLOAD_AFTER=true self.ollama_unload_after = os.getenv('LLM_OLLAMA_UNLOAD_AFTER', 'false').lower().strip() in ('true', '1', 'yes', 'on') self.ollama_unload_url = '' if self.provider == 'ollama' and self.ollama_unload_after: try: p = urlparse(self.endpoint_url) scheme = p.scheme or 'http' host = p.hostname or 'localhost' port = p.port netloc = f"{host}:{port}" if port else host self.ollama_unload_url = f"{scheme}://{netloc}/api/chat" except Exception: self.ollama_unload_url = "http://localhost:11434/api/chat" if self.api_key: self.secret = os.getenv('SXNG_LLM_SECRET') or hashlib.sha256(self.api_key.encode()).hexdigest() else: self.secret = os.getenv('SXNG_LLM_SECRET', '') def _parse_aux_results(self, raw_results, raw_infoboxes, raw_answers): results = [] limit = self.context_deep_count + self.context_shallow_count for r in raw_results[:limit]: # Handle both MainResult (attribute access) and LegacyResult (dict access) if hasattr(r, 'title'): results.append({ 'title': getattr(r, 'title', ''), 'content': getattr(r, 'content', ''), 'url': getattr(r, 'url', ''), 'publishedDate': getattr(r, 'publishedDate', '') }) else: # Legacy dictionary-style access results.append({ 'title': r.get('title', ''), 'content': r.get('content', ''), 'url': r.get('url', ''), 'publishedDate': r.get('publishedDate', '') }) # SearXNG already merges infoboxes by ID - take first with full content infoboxes = [] for ib in raw_infoboxes[:1]: infoboxes.append({ 'name': ib.get('infobox', '') or ib.get('title', ''), 'content': ib.get('content', '')[:2000], 'attributes': ib.get('attributes', []) }) # Only extract simple Answer types (skip Translations, WeatherAnswer etc.) answers = [] for a in list(raw_answers)[:2]: if hasattr(a, 'answer') and isinstance(getattr(a, 'answer', None), str): answers.append(a.answer) elif isinstance(a, dict) and a.get('answer'): answers.append(str(a['answer'])) return results, infoboxes, answers def init(self, app): if not self.provider: return @app.route('/ai-auxiliary-search', methods=['POST']) def ai_auxiliary_search(): if not self.api_key: abort(403) data = request.json or {} query = data.get('query', '').strip() lang = data.get('lang', 'all') categories = data.get('categories', 'general') offset = data.get('offset', 0) if not query: return jsonify({'results': []}) # Direct kernel access (bypasses HTTP loopback) try: from searx.search import SearchWithPlugins from searx.search.models import SearchQuery from searx.query import RawTextQuery from searx.webadapter import get_engineref_from_category_list preferences = getattr(request, 'preferences', None) disabled_engines = preferences.engines.get_disabled() if preferences else [] rtq = RawTextQuery(query, disabled_engines) if isinstance(categories, str): category_list = [c.strip() for c in categories.split(',') if c.strip()] else: category_list = categories or ['general'] enginerefs = get_engineref_from_category_list(category_list, disabled_engines) sq = SearchQuery( query=rtq.getQuery(), engineref_list=enginerefs, lang=lang, pageno=1, ) # Empty plugins list prevents recursion search_obj = SearchWithPlugins(sq, request, user_plugins=[]) result_container = search_obj.search() raw_results = result_container.get_ordered_results() raw_infoboxes = getattr(result_container, 'infoboxes', []) raw_answers = getattr(result_container, 'answers', []) results, infoboxes, answers = self._parse_aux_results(raw_results, raw_infoboxes, raw_answers) context_str, new_urls = self._assemble_context(results, infoboxes, answers, offset) return jsonify({ 'context': context_str, 'new_urls': new_urls, 'results': results, 'infoboxes': infoboxes, 'answers': answers, 'query': query }) except ImportError: try: search_url = f'{request.url_root}search' params = { 'q': query, 'format': 'json', 'categories': categories, 'language': lang } headers = { 'X-AI-Auxiliary': '1', 'Accept-Language': request.headers.get('Accept-Language', '') } res = network.get(search_url, params=params, headers=headers, timeout=2) search_data = res.json() results, infoboxes, answers = self._parse_aux_results( search_data.get('results', []), search_data.get('infoboxes', []), search_data.get('answers', []) ) context_str, new_urls = self._assemble_context(results, infoboxes, answers, offset) return jsonify({ 'context': context_str, 'new_urls': new_urls, 'results': results, 'infoboxes': infoboxes, 'answers': answers, 'query': query }) except Exception as e: return jsonify({'results': [], 'error': str(e)}) except Exception as e: return jsonify({'results': [], 'error': str(e)}) @app.route('/ai-stream', methods=['POST']) def handle_ai_stream(): data = request.json or {} if data.get('warmup'): return Response('', status=204) token = data.get('tk', '') q = data.get('q', '') lang = data.get('lang', 'all') try: ts, sig = token.rsplit('.', 1) expected = hashlib.sha256(f"{ts}{self.secret}".encode()).hexdigest() if sig != expected or (time.time() - float(ts)) > TOKEN_EXPIRY_SEC: abort(403) except (ValueError, KeyError, AttributeError): abort(403) context_text = data.get('context', '') prev_answer = (data.get('prev_answer') or '')[-4000:] if not self.api_key: return Response("Missing API key or query", status=400) today = time.strftime("%Y-%m-%d") target_words = int(self.max_tokens * 0.4) lang_instruction = f" Respond in {lang}." if lang not in ('all', 'auto') else "" SYSTEM = f"You are a direct, citation-accurate search synthesis engine. Today is {today}.{lang_instruction}" max_source_idx = 0 if context_text: indices = re.findall(r'\[(\d+)\]', context_text) if indices: max_source_idx = max(map(int, indices)) CORE_RULES = [ "Answer the question directly using the provided context.", f"High density: Expert-briefing level. Aim for ~{target_words} words.", "MUST CITE SOURCES by tailing a sentence with [n] or [n,n] etc. If citing general knowledge, use [*].", "Do not use filler words, transitions, or meta-commentary.", "Never explain your process. The user expects a direct response.", "If sources and general knowledge are insufficient, respond with 'Insufficient information to answer.'" ] if q == "Continue": task = "CONTINUE: Pick up exactly where previous answer stopped. No repetition. Seamless flow." elif prev_answer: task = "FOLLOW-UP: Address the new question using prior context. Prioritize the new query." else: task = "ANSWER FIRST: Lead with the direct answer. No preamble, no context-setting." grounding = "GROUNDING: KNOWLEDGE GRAPH > DEEP > SHALLOW." if context_text else "GROUNDING: No sources available. Use general knowledge and cite as [*] which means based on general knowledge." history_rule = "HISTORY: Refer to prior exchange for context. Ideally, do not repeat any claims." if prev_answer else None instructions = [task] + CORE_RULES + [grounding] if history_rule: instructions.append(history_rule) numbered_instructions = "\n".join(f"{i+1}. {r}" for i, r in enumerate(instructions)) prompt = f"""