feats: new interactive mode, language preference and time injection, more provider flexibility and clarity, more configuration options

This commit is contained in:
cra88y/pc
2026-01-19 22:40:46 -06:00
parent 1ea09a38eb
commit 140a5f7235
5 changed files with 1051 additions and 310 deletions
+62 -20
View File
@@ -2,9 +2,9 @@
**Does not block result loading time.**
A SearXNG plugin that generates an AI answer using search results as RAG grounding context. Supports Google Gemini and OpenAI-compatible providers (OpenRouter, Ollama, OpenAI API etc.).
A SearXNG plugin that generates AI answers using search results as RAG context. Supports 8 LLM providers.
Features token by token UI updates as response is recieved.
Features token-by-token streaming and clickable inline citations.
## Installation
@@ -20,32 +20,74 @@ plugins:
Set the following environment variables:
### General
### Required
- `LLM_PROVIDER`: `openrouter` (default) or `gemini`. (openrouter for all OpenAI APIs)
- `RESPONSE_MAX_TOKENS`: Defaults to `500`.
- `RESPONSE_TEMPERATURE`: Defaults to `0.2`.
- `LLM_PROVIDER`: openrouter, openai, ollama, localai, lmstudio, gemini, azure, or huggingface
- `LLM_KEY`: Your API key
### OpenRouter / OpenAI / Ollama
(for any OpenAI compatible API, will revise naming clarity in update soon)
- `OPENROUTER_API_KEY`: Your API key.
- `OPENROUTER_MODEL`: Defaults to `google/gemma-3-27b-it:free`.
- `OPENROUTER_BASE_URL`: Defaults to `openrouter.ai`. (Change to `localhost:11434` for Ollama, or base url of target OpenAI-compatible API).
### Optional
### Google Gemini
- `GEMINI_API_KEY`: Your Google AI API key.
- `GEMINI_MODEL`: Defaults to `gemma-3-27b-it`.
- `LLM_MODEL`: Model identifier. Defaults vary by provider.
- `LLM_URL`: Custom endpoint URL. Overrides provider preset.
- `LLM_MAX_TOKENS`: Defaults to `500`.
- `LLM_TEMPERATURE`: Defaults to `0.2`.
- `LLM_CONTEXT_COUNT`: Search results to include. Defaults to `5`.
- `LLM_TABS`: Comma-separated tab whitelist. Defaults to general,science,it,news.
- `LLM_STYLE`: UI mode. Set to "simple" for no interactive controls (copy, regenerate, follow up, continue). Defaults to simple.
## How It Works
After search completes, the plugin extracts the top 6 results as context. A client-side script calls the stream endpoint with a signed token. The LLM response streams back. Token by token rendering is soon.
After search completes, the plugin extracts top search results as context. A client-side script calls the stream endpoint with a signed token. The LLM response streams back token by token.
## Ollama (Local)
## Examples
### OpenRouter
```
LLM_PROVIDER=openrouter
OPENROUTER_API_KEY=ollama
OPENROUTER_MODEL=gemma3:27b
OPENROUTER_BASE_URL=localhost:11434
LLM_KEY=sk-or-xxx
LLM_MODEL=google/gemma-3-27b-it:free
```
### Ollama (Local)
```
LLM_PROVIDER=ollama
LLM_KEY=ollama
LLM_MODEL=llama3.2
```
### LocalAI
```
LLM_PROVIDER=localai
LLM_KEY=your-key
LLM_MODEL=gpt-4
LLM_URL=http://localai.lan:8080/v1/chat/completions
```
### Gemini
```
LLM_PROVIDER=gemini
LLM_KEY=AIzaSy...
LLM_MODEL=gemma-3-27b-it
```
### Azure
```
LLM_PROVIDER=azure
LLM_KEY=your-api-key
LLM_URL=https://your-resource.openai.azure.com/openai/deployments/your-deployment/chat/completions?api-version=2024-02-01
```
### Hugging Face
```
LLM_PROVIDER=huggingface
LLM_KEY=hf_xxx
LLM_MODEL=meta-llama/Meta-Llama-3-8B-Instruct
```
## Development
```bash
pip install flask flask-babel python-dotenv
python demo.py # Interactive test server on localhost:5000
python test.py # One-shot test suite
```
+604 -96
View File
@@ -1,4 +1,5 @@
import json, http.client, ssl, os, logging, base64, time, hashlib
from urllib.parse import urlparse
from flask import Response, request, abort
from searx.plugins import Plugin, PluginInfo
from searx.result_types import EngineResults
@@ -7,81 +8,234 @@ from markupsafe import Markup
logger = logging.getLogger(__name__)
# Constants
TOKEN_EXPIRY_SEC = 60
TOKEN_EXPIRY_SEC = 86400
CONNECTION_TIMEOUT_SEC = 30
PROVIDER_PRESETS = {
'openai': {'url': 'https://api.openai.com/v1/chat/completions', 'model': 'gpt-4o-mini'},
'openrouter': {'url': 'https://openrouter.ai/api/v1/chat/completions', 'model': 'google/gemma-3-27b-it:free'},
'ollama': {'url': 'http://localhost:11434/v1/chat/completions', 'model': 'llama3.2'},
'localai': {'url': 'http://localhost:8080/v1/chat/completions', 'model': 'gpt-4'},
'lmstudio': {'url': 'http://localhost:1234/v1/chat/completions', 'model': 'local-model'},
'gemini': {'url': 'https://generativelanguage.googleapis.com', 'model': 'gemma-3-27b-it'},
'azure': {'url': None, 'model': 'azure-deployment'},
'huggingface': {'url': 'https://api-inference.huggingface.co/models/{model}/v1/chat/completions', 'model': 'meta-llama/Meta-Llama-3-8B-Instruct'}
}
import typing
if typing.TYPE_CHECKING:
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from . import PluginCfg
class SXNGPlugin(Plugin):
"""
AI Answers Plugin for SearXNG.
Injects a real-time streaming answer box synthesized from search results using LLM providers.
Supports OpenAI, OpenRouter, Gemini, Ollama, LocalAI, Azure, and Hugging Face.
"""
id = "ai_answers"
def __init__(self, plg_cfg):
def __init__(self, plg_cfg: "PluginCfg"):
super().__init__(plg_cfg)
self.info = PluginInfo(
id=self.id,
name=gettext("AI Answers Plugin"),
description=gettext("Live AI search answers using AI providers."),
description=gettext("Live AI search answers using LLM providers."),
preference_section="general",
)
self.provider = os.getenv('LLM_PROVIDER', 'openrouter').lower()
self.api_key = os.getenv('OPENROUTER_API_KEY') if self.provider == 'openrouter' else os.getenv('GEMINI_API_KEY')
self.model = os.getenv('GEMINI_MODEL', 'gemma-3-27b-it') if self.provider == 'gemini' else os.getenv('OPENROUTER_MODEL', 'google/gemma-3-27b-it:free')
try:
self.max_tokens = int(os.getenv('RESPONSE_MAX_TOKENS', 500))
except ValueError:
self.max_tokens = 500
try:
self.temperature = float(os.getenv('RESPONSE_TEMPERATURE', 0.2))
except ValueError:
self.temperature = 0.2
self.base_url = os.getenv('OPENROUTER_BASE_URL', 'openrouter.ai')
# Stable secret for multi-worker environments
self._load_config()
if self.api_key:
self.secret = os.getenv('SXNG_LLM_SECRET') or hashlib.sha256(self.api_key.encode()).hexdigest()
else:
self.secret = os.getenv('SXNG_LLM_SECRET', '')
logger.warning("AI Answers plugin: No API key configured, plugin will be inactive")
logger.warning("AI Answers: No API key configured, plugin inactive")
def _load_config(self):
self.style = os.getenv('LLM_STYLE', 'interactive')
raw_provider = os.getenv('LLM_PROVIDER', '').lower().strip()
raw_url = os.getenv('LLM_URL', '').strip()
if not raw_provider and raw_url:
url_lower = raw_url.lower()
if 'openai.com' in url_lower:
raw_provider = 'openai'
elif 'openrouter.ai' in url_lower:
raw_provider = 'openrouter'
elif ':11434' in url_lower:
raw_provider = 'ollama'
elif 'generativelanguage.googleapis.com' in url_lower:
raw_provider = 'gemini'
if not raw_provider:
logger.debug("AI Answers: No provider configured, plugin inactive")
self.provider = ''
self.model = ''
self.is_gemini = False
self.api_key = ''
return
self.provider = raw_provider if raw_provider in PROVIDER_PRESETS else 'openai'
self.is_gemini = (self.provider == 'gemini')
preset = PROVIDER_PRESETS[self.provider]
self.api_key = os.getenv('LLM_KEY', '')
if not self.api_key and self.provider in ('ollama', 'localai', 'lmstudio'):
self.api_key = 'none'
self.api_key = self.api_key.strip()
self.model = os.getenv('LLM_MODEL', preset['model']).strip()
try:
self.max_tokens = int(os.getenv('LLM_MAX_TOKENS', 500))
except ValueError:
self.max_tokens = 500
try:
self.temperature = float(os.getenv('LLM_TEMPERATURE', 0.2))
except ValueError:
self.temperature = 0.2
try:
self.context_count = max(0, int(os.getenv('LLM_CONTEXT_COUNT', 5)))
except ValueError:
self.context_count = 5
self.allowed_tabs = set(t.strip() for t in os.getenv('LLM_TABS', 'general,science,it,news').split(','))
preset_url = preset['url']
if preset_url and '{model}' in preset_url:
preset_url = preset_url.format(model=self.model)
self._parse_url(preset_url)
logger.info(f"AI Answers: {self.provider} @ {self.endpoint_host}")
def _parse_url(self, default_url):
raw_url = os.getenv('LLM_URL', '').strip() or default_url
if not raw_url.startswith(('http://', 'https://')):
raw_url = f"https://{raw_url}"
parsed = urlparse(raw_url)
self.endpoint_url = raw_url
self.endpoint_host = parsed.hostname or 'localhost'
self.endpoint_port = parsed.port
self.endpoint_path = parsed.path or '/v1/chat/completions'
if parsed.query:
self.endpoint_path += f"?{parsed.query}"
self.endpoint_ssl = (parsed.scheme == 'https')
if self.is_gemini:
return
is_local = self.endpoint_host in ('localhost', '127.0.0.1') or self.endpoint_host.startswith('127.')
if not self.endpoint_ssl and not is_local:
logger.warning(f"AI Answers: HTTP on non-localhost ({self.endpoint_host}). Credentials may be exposed.")
def _get_connection(self):
proxy_url = os.getenv('HTTPS_PROXY' if self.endpoint_ssl else 'HTTP_PROXY') or os.getenv('https_proxy' if self.endpoint_ssl else 'http_proxy')
target_host = self.endpoint_host
target_port = self.endpoint_port
target_str = f"{target_host}:{target_port}" if target_port else target_host
if proxy_url:
p = urlparse(proxy_url)
p_host = p.hostname
p_port = p.port or 8080
if p.scheme == 'https':
conn = http.client.HTTPSConnection(p_host, p_port, timeout=CONNECTION_TIMEOUT_SEC, context=ssl.create_default_context())
else:
conn = http.client.HTTPConnection(p_host, p_port, timeout=CONNECTION_TIMEOUT_SEC)
conn.set_tunnel(target_host, target_port)
return conn
# Direct Connection
if self.endpoint_ssl:
return http.client.HTTPSConnection(target_str, timeout=CONNECTION_TIMEOUT_SEC, context=ssl.create_default_context())
return http.client.HTTPConnection(target_str, timeout=CONNECTION_TIMEOUT_SEC)
def init(self, app):
@app.route('/ai-stream', methods=['POST'])
def g_stream():
def handle_ai_stream():
data = request.json or {}
token = data.get('tk', '')
q = data.get('q', '')
lang = data.get('lang', 'all')
try:
ts, sig = token.split('.', 1)
query_clean = q.strip()
expected = hashlib.sha256(f"{ts}{query_clean}{self.secret}".encode()).hexdigest()
expected = hashlib.sha256(f"{ts}{self.secret}".encode()).hexdigest()
if sig != expected or (time.time() - float(ts)) > TOKEN_EXPIRY_SEC:
abort(403)
except (ValueError, KeyError, AttributeError):
abort(403)
context_text = data.get('context', '')
if not self.api_key or not q:
return Response("Error: Missing Key", status=400)
prev_answer = (data.get('prev_answer') or '')[-4000:]
prompt = (
f"SYSTEM: Answer USER QUERY by integrating SEARCH RESULTS with expert knowledge.\n"
f"HIERARCHY: Use RESULTS for facts/data. Use KNOWLEDGE for context/synthesis.\n"
f"CONSTRAINTS: <4 sentences | Dense information | Complete thoughts.\n"
f"FALLBACK: If results are empty, answer from knowledge but note the lack of sources.\n\n"
f"SEARCH RESULTS:\n{context_text}\n\n"
f"USER QUERY: {q}\n\n"
f"ANSWER:"
)
if not self.api_key:
logger.warning(f"AI Answers: request rejected. Key loaded: {bool(self.api_key)}, Query: {bool(q)}")
return Response("Missing API key or query", status=400)
def generate_gemini():
host = "generativelanguage.googleapis.com"
path = f"/v1/models/{self.model}:streamGenerateContent?key={self.api_key}"
today = time.strftime("%Y-%m-%d")
target_words = int(self.max_tokens * 0.2)
lang_instruction = f" Respond in {lang}." if lang not in ('all', 'auto') else ""
SYSTEM = f"You are a search synthesis engine. Direct, grounded, citation-accurate. Today is {today}.{lang_instruction}"
CORE_RULES = [
"DENSITY 4/5: Expert-briefing level. No filler, no transitions. Every sentence = new information.",
f"BREVITY: {target_words} words max. Complete, not verbose.",
"CITATIONS: Cite [n] only for specific facts from sources. Max 3 total. Sentence-end only. Never cite common knowledge.",
"NO HEDGE: State answers confidently. Note uncertainty only if critical.",
]
if q == "Continue":
task = "CONTINUE: Pick up exactly where previous answer stopped. No repetition. Seamless flow."
elif prev_answer:
task = "FOLLOW-UP: Address the new question using prior context. Prioritize the new query."
else:
task = "ANSWER FIRST: Lead with the direct answer. No preamble, no context-setting."
grounding = "GROUNDING: Trust sources for current events. Use knowledge for fundamentals." if context_text else "GROUNDING: No sources available. Use knowledge and note 'based on general knowledge'."
history_rule = "HISTORY: Refer to prior exchange for context. Do not repeat." if prev_answer else None
instructions = [task] + CORE_RULES + [grounding]
if history_rule:
instructions.append(history_rule)
numbered_instructions = "\n".join(f"{i+1}. {r}" for i, r in enumerate(instructions))
prompt = f"""<system>{SYSTEM}</system>
<sources>
{context_text or 'None.'}
</sources>
<history>
{prev_answer or 'None.'}
</history>
<query>{q}</query>
<instructions>
{numbered_instructions}
</instructions>
<answer>"""
def stream_gemini():
path = f"/v1/models/{self.model}:streamGenerateContent"
conn = None
try:
conn = http.client.HTTPSConnection(host, timeout=CONNECTION_TIMEOUT_SEC, context=ssl.create_default_context())
payload = {"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"maxOutputTokens": self.max_tokens, "temperature": self.temperature}}
conn.request("POST", path, body=json.dumps(payload), headers={"Content-Type": "application/json"})
conn = self._get_connection()
payload = {"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"maxOutputTokens": self.max_tokens, "temperature": self.temperature, "stopSequences": ["</answer>"]}}
headers = {"Content-Type": "application/json", "x-goog-api-key": self.api_key}
conn.request("POST", path, body=json.dumps(payload), headers=headers)
res = conn.getresponse()
if res.status != 200:
logger.error(f"Gemini API Error {res.status}: {res.read().decode('utf-8')}")
logger.error(f"Gemini API {res.status}: {res.read().decode('utf-8')}")
return
decoder = json.JSONDecoder()
@@ -105,48 +259,48 @@ class SXNGPlugin(Plugin):
buffer = buffer[idx:]
except json.JSONDecodeError: break
except Exception as e:
logger.error(f"Gemini Stream Exception: {e}")
logger.error(f"Gemini stream error: {e}")
finally:
if conn: conn.close()
def generate_openrouter():
def stream_openai_compatible():
conn = None
try:
# Support HTTP for localhost/Ollama
is_local = self.base_url.startswith('localhost') or self.base_url.startswith('127.')
if is_local:
conn = http.client.HTTPConnection(self.base_url, timeout=CONNECTION_TIMEOUT_SEC)
else:
conn = http.client.HTTPSConnection(self.base_url, timeout=CONNECTION_TIMEOUT_SEC, context=ssl.create_default_context())
conn = self._get_connection()
payload = {
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"stream": True,
"max_tokens": self.max_tokens,
"temperature": self.temperature
"temperature": self.temperature,
"stop": ["</answer>"]
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"HTTP-Referer": "https://github.com/searxng/searxng",
"X-Title": "SearXNG LLM Plugin"
"X-Title": "SearXNG"
}
# Ollama uses /v1/... while OpenRouter uses /api/v1/...
api_path = "/v1/chat/completions" if is_local else "/api/v1/chat/completions"
conn.request("POST", api_path, body=json.dumps(payload), headers=headers)
if self.provider == 'azure':
headers['api-key'] = self.api_key
else:
headers['Authorization'] = f"Bearer {self.api_key}"
conn.request("POST", self.endpoint_path, body=json.dumps(payload), headers=headers)
res = conn.getresponse()
if res.status != 200:
logger.error(f"OpenRouter API Error {res.status}: {res.read().decode('utf-8')}")
logger.error(f"{self.provider} API {res.status}: {res.read().decode('utf-8')}")
return
decoder = json.JSONDecoder()
buffer = ""
buffer = b""
while True:
chunk = res.read(128)
if not chunk: break
buffer += chunk.decode('utf-8')
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
buffer += chunk
while b"\n" in buffer:
line_bytes, buffer = buffer.split(b"\n", 1)
line = line_bytes.decode('utf-8', errors='replace')
if line.startswith("data: "):
data_str = line[6:].strip()
if data_str == "[DONE]": return
@@ -157,11 +311,11 @@ class SXNGPlugin(Plugin):
except json.JSONDecodeError:
pass
except Exception as e:
logger.error(f"OpenRouter Stream Exception: {e}")
logger.error(f"{self.provider} stream error: {e}")
finally:
if conn: conn.close()
generator = generate_openrouter if self.provider == 'openrouter' else generate_gemini
generator = stream_gemini if self.is_gemini else stream_openai_compatible
return Response(generator(), mimetype='text/event-stream', headers={
'X-Accel-Buffering': 'no',
'Cache-Control': 'no-cache, no-store',
@@ -170,106 +324,460 @@ class SXNGPlugin(Plugin):
})
return True
def post_search(self, request, search) -> EngineResults:
def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> EngineResults:
results = EngineResults()
try:
if not self.active or not self.api_key or search.search_query.pageno > 1:
current_tabs = set(search.search_query.categories)
if not current_tabs: current_tabs = {'general'}
if not self.active or not self.api_key or search.search_query.pageno > 1 or not self.allowed_tabs.intersection(current_tabs):
return results
raw_results = search.result_container.get_ordered_results()
context_list = [f"[{i+1}] {r.get('title')}: {r.get('content')}" for i, r in enumerate(raw_results[:6])]
context_list = []
for i, r in enumerate(raw_results[:self.context_count]):
domain = urlparse(r.get('url', '')).netloc
date = r.get('publishedDate')
date_str = f" ({date})" if date else ""
title = r.get('title') or ""
context_list.append(f"[{i+1}] {domain}{date_str}: {title}: {str(r.get('content', ''))[:500]}")
context_str = "\n".join(context_list)
# Stateless Handshake
ts = str(int(time.time()))
q_clean = search.search_query.query.strip()
sig = hashlib.sha256(f"{ts}{q_clean}{self.secret}".encode()).hexdigest()
lang = search.search_query.lang
sig = hashlib.sha256(f"{ts}{self.secret}".encode()).hexdigest()
tk = f"{ts}.{sig}"
b64_context = base64.b64encode(context_str.encode('utf-8')).decode('utf-8')
js_q = json.dumps(q_clean)
js_lang = json.dumps(lang)
js_urls = json.dumps([r.get('url') for r in raw_results[:self.context_count]])
is_interactive = (self.style == 'interactive')
# Conditional CSS for interactive mode
interactive_css = '''
@keyframes sxng-fade-in-up {
0% { opacity: 0; transform: translateY(10px); }
100% { opacity: 1; transform: translateY(0); }
}
.sxng-footer {
display: flex;
align-items: center;
gap: 0.5rem;
margin-top: 0.5rem;
opacity: 0;
animation: sxng-fade-in-up 0.5s ease-out forwards;
}
.sxng-btn {
display: inline-flex;
align-items: center;
justify-content: center;
width: 32px;
height: 32px;
padding: 0;
border: 1px solid transparent;
border-radius: 6px;
background: transparent;
color: var(--color-base-font, #333);
cursor: pointer;
transition: all 0.2s ease;
opacity: 0.6;
}
.sxng-btn:hover {
background: var(--color-base-background-hover, rgba(0,0,0,0.05));
color: var(--color-result-link, #5e81ac);
opacity: 1;
transform: translateY(-1px);
}
.sxng-btn svg { width: 18px; height: 18px; fill: currentColor; }
.sxng-input-wrapper {
flex-grow: 1;
display: flex;
align-items: center;
margin: 0 0.5rem;
position: relative;
}
.sxng-input {
width: 100%;
background: transparent;
border: none;
color: var(--color-base-font, #333);
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, sans-serif;
font-size: 16px;
padding: 0.5rem 2.5rem 0.5rem 0;
opacity: 0.8;
transition: opacity 0.2s;
}
.sxng-input:focus { outline: none; opacity: 1; }
.sxng-input::placeholder { color: var(--color-base-font, #333); opacity: 0.35; }
.sxng-input-line {
position: absolute;
bottom: 0;
left: 0;
width: 0;
height: 1px;
background: var(--color-result-link, #5e81ac);
transition: width 0.3s ease;
}
.sxng-input:focus + .sxng-input-line { width: 100%; }
.sxng-user-msg {
display: block;
width: fit-content;
max-width: 80%;
margin: 1rem 0 1rem auto;
padding: 0.5rem 0.8rem;
background: var(--color-base-background-hover, rgba(0,0,0,0.05));
border-radius: 12px 12px 0 12px;
color: var(--color-base-font, #333);
font-size: 0.9rem;
line-height: 1.5;
animation: sxng-fade-in-up 0.3s ease-out forwards;
border: 1px solid var(--color-base-border, rgba(0,0,0,0.1));
}
.sxng-input-submit {
position: absolute;
right: 0;
top: 50%;
transform: translateY(-50%);
background: none;
border: none;
padding: 8px;
color: var(--color-base-font, #333);
cursor: pointer;
opacity: 0.3;
transition: all 0.2s ease;
}
.sxng-input-wrapper:focus-within .sxng-input-submit,
.sxng-input-submit:hover { opacity: 1; color: var(--color-result-link, #5e81ac); }
.sxng-input-submit svg { width: 18px; height: 18px; fill: currentColor; }
''' if is_interactive else ''
# Conditional HTML for interactive footer
interactive_html = '''
<div id="sxng-footer" class="sxng-footer" style="display:none;">
<button class="sxng-btn" id="btn-copy" title="Copy to clipboard">
<svg viewBox="0 0 24 24"><path d="M16 1H4C2.9 1 2 1.9 2 3V17H4V3H16V1M19 5H8C6.9 5 6 5.9 6 7V21C6 22.1 6.9 23 8 23H19C20.1 23 21 22.1 21 21V7C21 5.9 20.1 5 19 5M19 21H8V7H19V21Z"/></svg>
</button>
<button class="sxng-btn" id="btn-regen" title="Regenerate answer">
<svg viewBox="0 0 24 24"><path d="M17.65 6.35C16.2 4.9 14.21 4 12 4C7.58 4 4.01 7.58 4.01 12C4.01 16.42 7.58 20 12 20C15.73 20 18.84 17.45 19.73 14H17.65C16.83 16.33 14.61 18 12 18C8.69 18 6 15.31 6 12C6 8.69 8.69 6 12 6C13.66 6 15.14 6.69 16.22 7.78L13 11H20V4L17.65 6.35Z"/></svg>
</button>
<form id="sxng-action-form" class="sxng-input-wrapper" onsubmit="event.preventDefault();">
<input type="text" id="sxng-action-input" class="sxng-input" placeholder="Ask..." aria-label="Ask follow-up" autocomplete="off">
<div class="sxng-input-line"></div>
<button type="submit" id="btn-action" class="sxng-input-submit" title="Send / Continue">
<svg viewBox="0 0 24 24"><path d="M19,7V11H5.83L9.41,7.41L8,6L2,12L8,18L9.41,16.59L5.83,13H21V7H19Z"/></svg>
</button>
</form>
</div>
''' if is_interactive else ''
# Conditional JS for interactive handlers
interactive_js_init = '''
const footer = document.getElementById('sxng-footer');
const input = document.getElementById('sxng-action-input');
document.getElementById('btn-copy').onclick = async (e) => {
const btn = e.currentTarget;
const originalContent = btn.innerHTML;
const text = Array.from(data.childNodes)
.filter(n => n.nodeType === 3 || n.tagName === 'SPAN')
.map(n => n.textContent)
.join('');
await navigator.clipboard.writeText(text);
btn.innerHTML = '<svg viewBox="0 0 24 24" style="color:#a3be8c;"><path d="M9 16.17L4.83 12L3.41 13.41L9 19L21 7L19.59 5.59L9 16.17Z"/></svg>';
setTimeout(() => btn.innerHTML = originalContent, 2000);
};
document.getElementById('btn-regen').onclick = () => {
data.innerHTML = '<span class="sxng-cursor"></span>';
footer.style.display = 'none';
startStream();
};
const handleAction = (e) => {
if (e) e.preventDefault();
const val = input.value.trim();
const currentText = Array.from(data.childNodes)
.filter(n => n.nodeType === 3 || n.tagName === 'SPAN')
.map(n => {
if (n.classList && n.classList.contains('sxng-user-msg')) {
return '\\n\\nQ: ' + n.textContent + '\\nA: ';
}
return n.textContent;
})
.join('');
input.value = '';
input.blur();
footer.style.display = 'none';
if (val) {
const cursor = data.querySelector('.sxng-cursor');
if (cursor) cursor.remove();
const userMsg = document.createElement('span');
userMsg.className = 'sxng-user-msg';
userMsg.textContent = val;
data.appendChild(userMsg);
const newCursor = document.createElement('span');
newCursor.className = 'sxng-cursor';
data.appendChild(newCursor);
startStream(val, currentText);
} else {
const cursor = data.querySelector('.sxng-cursor');
if (cursor) cursor.remove();
data.appendChild(document.createElement('br'));
data.appendChild(document.createElement('br'));
const newCursor = document.createElement('span');
newCursor.className = 'sxng-cursor';
data.appendChild(newCursor);
startStream("Continue", currentText);
}
};
document.getElementById('sxng-action-form').onsubmit = handleAction;
input.onfocus = () => {
setTimeout(() => {
input.scrollIntoView({behavior: 'smooth', block: 'center'});
}, 300);
};
''' if is_interactive else ''
interactive_js_complete = "footer.style.display = 'flex';" if is_interactive else ''
# Streaming function signature differs between modes
stream_fn_sig = 'async function startStream(overrideQ = null, prevAnswer = null)' if is_interactive else 'async function startStream()'
stream_q = 'overrideQ || q_init' if is_interactive else 'q_init'
stream_body = f'''prev_answer: prevAnswer''' if is_interactive else ''
html_payload = f'''
<article id="sxng-stream-box" class="answer" style="display:none; margin-bottom: 1rem;">
<article id="sxng-stream-box" class="answer" style="display:none; margin: 1rem 0;">
<style>
@keyframes sxng-blink {{ 0%, 100% {{ opacity: 1; }} 50% {{ opacity: 0; }} }}
@keyframes sxng-pulse {{ 0%, 100% {{ opacity: 0.4; }} 50% {{ opacity: 0.9; }} }}
@keyframes sxng-fade-pulse {{
0%, 100% {{ opacity: 0.3; }}
50% {{ opacity: 1; }}
}}
@keyframes sxng-fade-in {{
0% {{ opacity: 0; filter: blur(3px); transform: translateY(2px); }}
100% {{ opacity: 1; filter: blur(0); transform: translateY(0); }}
}}
#sxng-stream-data {{
position: relative;
margin: 0;
min-height: 1.5em;
}}
.sxng-cursor {{
display: inline-block; width: 0.5rem; height: 1rem;
background: var(--color-result-description);
margin-left: 2px; vertical-align: middle;
animation: sxng-blink 1s step-end infinite;
display: inline-block;
width: 0.6em;
height: 1.2em;
background: var(--color-result-link, #5e81ac);
vertical-align: text-bottom;
animation: sxng-fade-pulse 1s ease-in-out infinite;
margin-right: 0.2rem;
border-radius: 2px;
}}
.sxng-thinking {{
color: var(--color-result-description);
font-style: italic;
animation: sxng-pulse 1.5s ease-in-out infinite;
.sxng-chunk {{
opacity: 0;
animation: sxng-fade-in 0.4s cubic-bezier(0.2, 0.9, 0.1, 1.0) forwards;
will-change: opacity, filter, transform;
}}
{interactive_css}
</style>
<p id="sxng-stream-data" style="white-space: pre-wrap; color: var(--color-result-description); font-size: 0.95rem;"></p>
<p id="sxng-stream-data" style="white-space: pre-wrap; color: var(--color-result-description); font-size: 0.95rem; margin:0;"><span class="sxng-cursor"></span></p>
{interactive_html}
<script>
(async () => {{
const q = {js_q};
const b64 = "{b64_context}";
const tk = "{tk}";
const q_init = {js_q};
const lang_init = {js_lang};
const urls = {js_urls};
const b64_init = "{b64_context}";
const tk_init = "{tk}";
const box = document.getElementById('sxng-stream-box');
const data = document.getElementById('sxng-stream-data');
const wrapper = box.closest('.answer');
if (wrapper) wrapper.style.display = 'none';
try {{
const ctx = new TextDecoder().decode(Uint8Array.from(atob(b64), c => c.charCodeAt(0)));
{interactive_js_init}
// Show "Thinking..." placeholder while waiting for LLM
data.innerHTML = '<span class="sxng-thinking">Thinking...</span>';
{stream_fn_sig} {{
try {{
const ctx = new TextDecoder().decode(Uint8Array.from(atob(b64_init), c => c.charCodeAt(0)));
if (wrapper) wrapper.style.display = '';
box.style.display = 'block';
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), 60000);
const finalQ = {stream_q};
const bodyObj = {{ q: finalQ, lang: lang_init, context: ctx, tk: tk_init{', ' + stream_body if stream_body else ''} }};
const res = await fetch('/ai-stream', {{
method: 'POST',
headers: {{ 'Content-Type': 'application/json' }},
body: JSON.stringify({{ q: q, context: ctx, tk: tk }}),
body: JSON.stringify(bodyObj),
signal: controller.signal
}});
clearTimeout(timeoutId);
if (!res.ok) {{ if (wrapper) wrapper.remove(); else box.remove(); return; }}
if (!res.ok) {{
const errSpan = document.createElement('span');
errSpan.style.color = '#bf616a';
errSpan.textContent = "Error: " + res.statusText;
data.appendChild(errSpan);
return;
}}
const reader = res.body.getReader();
const decoder = new TextDecoder();
const cursor = document.createElement('span');
let cursor = data.querySelector('.sxng-cursor');
if (!cursor) {{
cursor = document.createElement('span');
cursor.className = 'sxng-cursor';
data.appendChild(cursor);
}}
let started = false;
let pendingSpace = '';
while (true) {{
const {{done, value}} = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
const chunk = decoder.decode(value, {{stream: true}});
if (chunk) {{
let text = chunk;
if (!started) {{
text = text.replace(/^[\\s.,;:!?]+/, '');
if (!text) continue;
data.textContent = ''; // Clear "Thinking..."
data.appendChild(cursor);
if (cursor && !cursor.isConnected) data.appendChild(cursor);
started = true;
}}
cursor.before(text);
if (text.trim().length === 0) {{
pendingSpace += text;
continue;
}}
if (pendingSpace) {{
const s = document.createElement('span');
s.className = 'sxng-chunk';
s.textContent = pendingSpace;
cursor.before(s);
pendingSpace = '';
}}
const span = document.createElement('span');
span.className = 'sxng-chunk';
span.textContent = text;
cursor.before(span);
if (text.includes(']')) {{
processLastCitation();
}}
}}
cursor.remove();
data.textContent = data.textContent.trimEnd();
if (!started) {{ if (wrapper) wrapper.remove(); else box.remove(); }}
}} catch (e) {{ console.error(e); if (wrapper) wrapper.remove(); else box.remove(); }}
}}
if (cursor) cursor.remove();
let last = data.lastChild;
while (last) {{
if (last.textContent && last.textContent.trim().length === 0) {{
const prev = last.previousSibling;
last.remove();
last = prev;
}} else {{
if (last.textContent) last.textContent = last.textContent.trimEnd();
break;
}}
}}
if (!started) {{
if (box.parentElement) box.parentElement.remove();
else box.remove();
return;
}}
{interactive_js_complete}
function processLastCitation() {{
let node = cursor ? cursor.previousSibling : data.lastChild;
let nodesRaw = [];
let buffer = '';
while (node && nodesRaw.length < 20) {{
if (node.tagName === 'SPAN' && node.className === 'sxng-chunk') {{
const content = node.textContent;
buffer = content + buffer;
nodesRaw.unshift(node);
if (content.includes('[')) break;
}} else {{
break;
}}
node = node.previousSibling;
}}
const re = /(?:\\\\)?\\[\\s*(\\d{{1,2}}(?:\\s*,\\s*\\d{{1,2}})*)\\s*(?:\\\\)?\\]/g;
let match, lastMatch;
while ((match = re.exec(buffer)) !== null) {{
lastMatch = match;
}}
if (lastMatch) {{
const before = buffer.substring(0, lastMatch.index);
const citationBody = lastMatch[1];
const after = buffer.substring(lastMatch.index + lastMatch[0].length);
nodesRaw.forEach(n => n.remove());
const fragment = document.createDocumentFragment();
if (before) {{
const s = document.createElement('span');
s.className = 'sxng-chunk';
s.textContent = before;
fragment.appendChild(s);
}}
citationBody.split(/\\s*,\\s*/).forEach(n => {{
const url = urls[parseInt(n)-1];
if (url) {{
const a = document.createElement('a');
a.href = url;
a.target = '_blank';
a.style.cssText = 'text-decoration:none;color:var(--color-result-link);font-weight:bold;';
a.textContent = `[${{n}}]`;
a.className = 'sxng-chunk';
fragment.appendChild(a);
}} else {{
const s = document.createElement('span');
s.className = 'sxng-chunk';
s.textContent = `[${{n}}]`;
fragment.appendChild(s);
}}
}});
if (after) {{
const s = document.createElement('span');
s.className = 'sxng-chunk';
s.textContent = after;
fragment.appendChild(s);
}}
if (cursor) cursor.before(fragment);
else data.appendChild(fragment);
}}
}}
}} catch (e) {{
console.error(e);
if (box.parentElement) box.parentElement.remove();
else box.remove();
}}
}}
startStream();
}})();
</script>
</article>
'''
search.result_container.answers.add(results.types.Answer(answer=Markup(html_payload)))
except Exception as e:
logger.error(f"AI Answers plugin error: {e}")
logger.error(f"AI Answers: {e}")
return results
+153
View File
@@ -0,0 +1,153 @@
"""
AI Answers Plugin - Interactive Demo Server
Simulates SearXNG environment for local development and testing.
Usage: python demo.py
Then visit: http://localhost:5000/?q=your+query+here
Requires: pip install flask flask-babel python-dotenv
"""
import sys
import os
import logging
from types import ModuleType
from flask import Flask, request
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
load_dotenv()
os.environ.setdefault('LLM_STYLE', 'interactive')
# Mock SearXNG modules
searx = ModuleType("searx")
searx_plugins = ModuleType("searx.plugins")
searx_results = ModuleType("searx.result_types")
class MockPlugin:
def __init__(self, cfg):
self.active = getattr(cfg, 'active', True)
class MockPluginInfo:
def __init__(self, **kwargs):
self.meta = kwargs
class MockEngineResults:
def __init__(self):
self.types = ModuleType("types")
self.types.Answer = lambda *args, **kwargs: kwargs.get('answer', args[0] if args else "")
self._results = []
def add(self, res):
self._results.append(res)
searx_plugins.Plugin = MockPlugin
searx_plugins.PluginInfo = MockPluginInfo
searx_results.EngineResults = MockEngineResults
sys.modules["searx"] = searx
sys.modules["searx.plugins"] = searx_plugins
sys.modules["searx.result_types"] = searx_results
from ai_answers import SXNGPlugin
from flask_babel import Babel
app = Flask(__name__)
babel = Babel(app)
class MockConfig:
active = True
plugin = SXNGPlugin(MockConfig())
plugin.init(app)
@app.route("/")
def index():
query = request.args.get("q", "why is the sky blue")
class MockSearchQuery:
pageno = 1
lang = 'en'
categories = ['general']
MockSearchQuery.query = query
class MockSearch:
search_query = MockSearchQuery()
class MockResultContainer:
def __init__(self):
self.answers = set()
def get_ordered_results(self):
if 'quantum' in query.lower():
return [
{"title": "IBM Quantum", "content": "Quantum computers rely on qubits, which can represent 0, 1, or both via superposition. They solve complex problems faster.", "url": "https://www.ibm.com/quantum", "publishedDate": "2026-01-15"},
{"title": "Nature Physics", "content": "Entanglement allows qubits to be correlated instantly across distances. This is key for quantum cryptography and teleportation.", "url": "https://nature.com/articles/quantum", "publishedDate": "2026-01-10"},
{"title": "Wikipedia", "content": "Quantum computing uses quantum mechanics. Major applications include drug discovery and materials science.", "url": "https://en.wikipedia.org/wiki/Quantum_computing", "publishedDate": "2025-12-01"}
]
return [
{"title": "Wikipedia", "content": "The sky appears blue due to Rayleigh scattering of sunlight.", "url": "https://en.wikipedia.org/wiki/Rayleigh_scattering", "publishedDate": "2026-01-15"},
{"title": "NASA Science", "content": "Shorter blue wavelengths scatter more than longer red wavelengths.", "url": "https://science.nasa.gov/blue-sky", "publishedDate": "2026-01-10"},
{"title": "Physics Today", "content": "The atmosphere acts as a filter, scattering blue light in all directions.", "url": "https://physicstoday.org/atmosphere", "publishedDate": "2026-01-01"}
]
result_container = MockResultContainer()
search = MockSearch()
plugin.post_search(None, search)
injection_html = ""
if search.result_container.answers:
injection_html = list(search.result_container.answers)[0]
return f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>AI Answers Demo</title>
<style>
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
padding: 2rem;
max-width: 800px;
margin: 0 auto;
background: #2e3440;
color: #eceff4;
}}
:root {{
--color-result-border: #3b4252;
--color-result-description: #d8dee9;
--color-base-font: #88c0d0;
--color-result-link: #81a1c1;
}}
h1 {{ color: #88c0d0; }}
.meta {{ color: #81a1c1; font-size: 0.9rem; }}
hr {{ border-color: #4c566a; }}
a {{ color: #88c0d0; }}
</style>
</head>
<body>
<div style="margin-top: 2rem;"></div>
<p class="meta">Provider: <strong>{plugin.provider or 'Not configured'}</strong> | Model: <strong>{plugin.model or 'N/A'}</strong></p>
<p>Query: <strong>{query}</strong></p>
<hr>
{injection_html if injection_html else '<p style="color:#f66;">Plugin inactive. Set LLM_PROVIDER and LLM_KEY in .env</p>'}
<hr>
<p class="meta">Try: <a href="/?q=what+is+quantum+computing">/?q=what+is+quantum+computing</a></p>
</body>
</html>
"""
if __name__ == "__main__":
print()
print("=" * 50)
print(" AI Answers Plugin - Demo Server")
print("=" * 50)
print(f" Provider: {plugin.provider or 'NOT SET'}")
print(f" Model: {plugin.model or 'N/A'}")
print(f" Style: {plugin.style}")
print(f" Status: {'Active' if plugin.api_key else 'Inactive (no LLM_KEY)'}")
print("=" * 50)
print(" http://localhost:5000/?q=your+query+here")
print("=" * 50)
print()
app.run(debug=True, port=5000)
+186
View File
@@ -0,0 +1,186 @@
"""
AI Answers Plugin - One-Shot Test
Comprehensive test that outputs everything: config, injection, LLM response.
Usage: python test.py
Requires: pip install flask flask-babel python-dotenv
"""
import sys
import os
import re
import time
import logging
from types import ModuleType
from dotenv import load_dotenv
load_dotenv()
# Suppress Flask noise during test
logging.getLogger('werkzeug').setLevel(logging.ERROR)
logging.basicConfig(level=logging.INFO, format='%(message)s')
# Mock SearXNG modules
searx = ModuleType("searx")
searx_plugins = ModuleType("searx.plugins")
searx_results = ModuleType("searx.result_types")
class MockPlugin:
def __init__(self, cfg):
self.active = getattr(cfg, 'active', True)
class MockPluginInfo:
def __init__(self, **kwargs):
self.meta = kwargs
class MockEngineResults:
def __init__(self):
self.types = ModuleType("types")
self.types.Answer = lambda *args, **kwargs: kwargs.get('answer', args[0] if args else "")
self._results = []
def add(self, res):
self._results.append(res)
searx_plugins.Plugin = MockPlugin
searx_plugins.PluginInfo = MockPluginInfo
searx_results.EngineResults = MockEngineResults
sys.modules["searx"] = searx
sys.modules["searx.plugins"] = searx_plugins
sys.modules["searx.result_types"] = searx_results
from flask import Flask
from flask_babel import Babel
from ai_answers import SXNGPlugin
def run_tests():
print()
print("=" * 60)
print(" AI Answers Plugin - Comprehensive Test")
print("=" * 60)
# === CONFIG TEST ===
print("\n[1/4] Configuration")
print("-" * 40)
app = Flask(__name__)
Babel(app)
class MockConfig:
active = True
plugin = SXNGPlugin(MockConfig())
plugin.init(app)
print(f" Provider: {plugin.provider or 'NOT SET'}")
print(f" Model: {plugin.model or 'N/A'}")
print(f" API Key: {'[OK]' if plugin.api_key else '[MISSING]'}")
print(f" Max Tokens: {getattr(plugin, 'max_tokens', 'N/A')}")
print(f" Temperature: {getattr(plugin, 'temperature', 'N/A')}")
print(f" Context Count: {getattr(plugin, 'context_count', 'N/A')}")
print(f" Allowed Tabs: {getattr(plugin, 'allowed_tabs', 'N/A')}")
if not plugin.api_key:
print("\n" + "=" * 60)
print(" SKIPPED: No LLM_KEY configured")
print(" Set LLM_PROVIDER and LLM_KEY in .env to run full test")
print("=" * 60)
return False
# === INJECTION TEST ===
print("\n[2/4] HTML Injection")
print("-" * 40)
class MockSearchQuery:
pageno = 1
query = "why is the sky blue"
lang = 'en'
categories = ['general']
class MockSearch:
search_query = MockSearchQuery()
class MockResultContainer:
def __init__(self):
self.answers = set()
def get_ordered_results(self):
return [
{"title": "Wikipedia", "content": "The sky appears blue due to Rayleigh scattering.", "url": "https://example.com/1", "publishedDate": "2026-01-15"},
{"title": "NASA", "content": "Blue wavelengths scatter more than red.", "url": "https://example.com/2", "publishedDate": "2026-01-10"},
]
result_container = MockResultContainer()
search = MockSearch()
plugin.post_search(None, search)
if not search.result_container.answers:
print(" FAIL: No HTML injected")
return False
html = str(list(search.result_container.answers)[0])
has_box = 'id="sxng-stream-box"' in html
has_endpoint = '/ai-stream' in html
token_match = re.search(r'const tk = "(.*?)";', html)
has_token = bool(token_match)
print(f" Stream box: {'[OK]' if has_box else '[FAIL]'}")
print(f" Endpoint ref: {'[OK]' if has_endpoint else '[FAIL]'}")
print(f" Auth token: {'[OK]' if has_token else '[FAIL]'}")
print(f" HTML size: {len(html):,} bytes")
if not (has_box and has_endpoint and has_token):
print(" FAIL: Missing required elements")
return False
# === STREAM ENDPOINT TEST ===
print("\n[3/4] Stream Endpoint")
print("-" * 40)
with app.test_client() as client:
payload = {
"q": "why is the sky blue",
"context": "[1] Wikipedia: The sky appears blue due to Rayleigh scattering.",
"lang": "en",
"tk": token_match.group(1)
}
start = time.time()
response = client.post('/ai-stream', json=payload)
elapsed = time.time() - start
print(f" Status: {response.status_code}")
print(f" Time: {elapsed:.2f}s")
if response.status_code != 200:
print(f" FAIL: Expected 200, got {response.status_code}")
return False
# === LLM RESPONSE TEST ===
print("\n[4/4] LLM Response")
print("-" * 40)
data = response.data.decode('utf-8')
print(f" Bytes: {len(data):,}")
print(f" Words: ~{len(data.split())}")
if len(data) < 10:
print(" FAIL: Response too short (API error?)")
return False
print("\n --- Response Preview ---")
preview = data[:500] + ("..." if len(data) > 500 else "")
for line in preview.split('\n'):
print(f" {line}")
print(" --- End Preview ---")
# === SUMMARY ===
print("\n" + "=" * 60)
print(" ALL TESTS PASSED")
print("=" * 60)
return True
if __name__ == "__main__":
success = run_tests()
sys.exit(0 if success else 1)
-148
View File
@@ -1,148 +0,0 @@
import sys
import os
import logging
from types import ModuleType
from flask import Flask, request
from dotenv import load_dotenv
logging.basicConfig(level=logging.INFO)
load_dotenv()
searx = ModuleType("searx")
searx_plugins = ModuleType("searx.plugins")
searx_results = ModuleType("searx.result_types")
class MockPlugin:
def __init__(self, cfg):
self.active = getattr(cfg, 'active', True)
class MockPluginInfo:
def __init__(self, **kwargs):
self.meta = kwargs
class MockEngineResults:
def __init__(self):
self.types = ModuleType("types")
self.types.Answer = lambda *args, **kwargs: kwargs.get('answer', args[0] if args else "")
self._results = []
def add(self, res):
self._results.append(res)
searx_plugins.Plugin = MockPlugin
searx_plugins.PluginInfo = MockPluginInfo
searx_results.EngineResults = MockEngineResults
sys.modules["searx"] = searx
sys.modules["searx.plugins"] = searx_plugins
sys.modules["searx.result_types"] = searx_results
from ai_answers import SXNGPlugin
from flask_babel import Babel
app = Flask(__name__)
babel = Babel(app)
class MockConfig:
active = True
plugin = SXNGPlugin(MockConfig())
plugin.init(app)
@app.route("/")
def index():
class MockSearchQuery:
pageno = 1
query = request.args.get("q", "why is the sky blue")
class MockSearch:
search_query = MockSearchQuery()
class MockResultContainer:
def __init__(self):
self.answers = set()
def get_ordered_results(self):
return [
{"title": "Fact About Sky", "content": "The sky is blue because of Rayleigh scattering."},
{"title": "Atmosphere Info", "content": "The atmosphere scatters shorter blue wavelengths more than red ones."},
{"title": "NASA Science", "content": "Sunlight reaches Earth's atmosphere and is scattered in all directions by gases."}
]
result_container = MockResultContainer()
search = MockSearch()
plugin.post_search(None, search)
injection_html = ""
if search.result_container.answers:
injection_html = list(search.result_container.answers)[0]
return f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Plugin Test</title>
<style>
body {{ font-family: sans-serif; padding: 2rem; max-width: 800px; margin: 0 auto; }}
:root {{
--color-result-border: #ccc;
--color-result-description: #333;
}}
</style>
</head>
<body>
<h1>LLM Plugin Test</h1>
<p>Provider: <strong>{plugin.provider}</strong> | Model: <strong>{plugin.model}</strong></p>
<p>Testing query: <strong>{MockSearch.search_query.query}</strong></p>
<hr>
{injection_html}
</body>
</html>
"""
import unittest
class PluginTestCase(unittest.TestCase):
def setUp(self):
self.app = app.test_client()
self.app.testing = True
def test_html_injection(self):
response = self.app.get('/')
content = response.data.decode('utf-8')
self.assertIn('<article id="sxng-stream-box"', content)
self.assertIn('/ai-stream', content)
def test_stream_endpoint(self):
# Trigger index to generate a response containing the token
response = self.app.get('/')
content = response.data.decode('utf-8')
# Extract the token from the injected script (tk = "...")
import re
match = re.search(r'const tk = "(.*?)";', content)
if not match:
self.fail("Handshake token not found in injection")
token = match.group(1)
# Check for the appropriate key based on provider
key = os.getenv("OPENROUTER_API_KEY") if plugin.provider == 'openrouter' else os.getenv("GEMINI_API_KEY")
if not key:
self.skipTest(f"API Key for {plugin.provider} not set")
payload = {
"q": "why is the sky blue",
"context": "The sky is blue because of Rayleigh scattering.",
"tk": token
}
response = self.app.post('/ai-stream', json=payload)
self.assertEqual(response.status_code, 200)
# If the API returns a 404/429, data will be empty due to silent error handling.
# This test ensures the endpoint exists and responds with 200.
data = response.data.decode('utf-8')
print(f"\n[Test] Received {len(data)} bytes from {plugin.provider}")
if __name__ == "__main__":
unittest.main()