import json, os, logging, base64, time, hashlib, re, http.client, ssl, concurrent.futures
from urllib.parse import urlparse
from searx import network
try:
from searx.network import get_network
except ImportError:
get_network = None
from flask import Response, request, abort, jsonify
from searx.plugins import Plugin, PluginInfo
from searx.result_types import Answer
from searx import settings
from flask_babel import gettext
from markupsafe import Markup
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Maximum age, in seconds, a signed request token may reach before the routes reject it.
TOKEN_EXPIRY_SEC = 3600
# NOTE(review): not referenced anywhere in the visible code -- presumably a
# leftover from a chunked-streaming implementation; confirm before removing.
STREAM_CHUNK_SIZE = 512
# Socket timeout, in seconds, for connections created by _get_streaming_connection.
STREAM_TIMEOUT_SEC = 60
def _get_streaming_connection(url: str, verify_ssl: bool = True):
    """Create an ``http.client`` connection object for *url*.

    The connection is only constructed here; the socket is opened lazily by
    the caller's first ``request()``. The caller owns the connection and must
    close it.

    Args:
        url: Absolute URL. ``https`` gets a TLS connection; every other scheme
            is treated as plain HTTP (unchanged from the previous behaviour).
        verify_ssl: When true, certificates are verified unless the SearXNG
            network object (if ``get_network`` is importable) reports a falsy
            ``verify`` attribute.

    Returns:
        ``(connection, request_target)`` where *request_target* is the path
        plus query string, never empty.

    Raises:
        ValueError: If *url* has no host component or an invalid port.
    """
    parsed = urlparse(url)
    host = parsed.hostname
    if not host:
        # Fail here with a clear message instead of deep inside the socket layer.
        raise ValueError(f"URL has no host: {url!r}")

    is_https = parsed.scheme == 'https'
    port = parsed.port or (443 if is_https else 80)

    # An empty request target is not valid HTTP; "http://host" must request "/".
    path = parsed.path or '/'
    if parsed.query:
        path = f"{path}?{parsed.query}"

    if verify_ssl and get_network is not None:
        try:
            verify_ssl = getattr(get_network(), 'verify', True)
        except Exception as exc:
            # Best effort: keep verification enabled when the lookup fails.
            logger.debug("Could not read SearXNG network verify setting: %s", exc)

    if not is_https:
        return http.client.HTTPConnection(host, port, timeout=STREAM_TIMEOUT_SEC), path

    if verify_ssl:
        try:
            import certifi
            ctx = ssl.create_default_context(cafile=certifi.where())
        except ImportError:
            ctx = ssl.create_default_context()
    else:
        # Public-API equivalent of the private ssl._create_unverified_context().
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    conn = http.client.HTTPSConnection(host, port, timeout=STREAM_TIMEOUT_SEC, context=ctx)
    return conn, path
PLUGIN_NAME = "AI Answers"
DEFAULT_TABS = "general,science,it,news"

# UI assets
_ASSETS = os.path.join(os.path.dirname(__file__), 'assets')


def _read_asset(filename: str) -> str:
    """Return the text of ``assets/<filename>``, closing the file promptly.

    The encoding is pinned to UTF-8 so the result does not depend on the
    process locale (assumes the bundled assets are UTF-8 -- TODO confirm).
    """
    with open(os.path.join(_ASSETS, filename), encoding='utf-8') as handle:
        return handle.read()


INTERACTIVE_CSS = _read_asset('ui.css')
INTERACTIVE_HTML = _read_asset('ui.html')

# ui.js holds three sections separated by marker comments:
#   // === FRONTEND_JS_TEMPLATE ===   (marker text is simply removed)
#   // === CITATION_HELPER_JS ===
#   // === INTERACTIVE_JS ===
# A missing marker raises IndexError at import time, as before.
_js_raw = _read_asset('ui.js')
_CITATION_MARKER = '// === CITATION_HELPER_JS ==='
_INTERACTIVE_MARKER = '// === INTERACTIVE_JS ==='
FRONTEND_JS_TEMPLATE = _js_raw.split(_CITATION_MARKER)[0].replace('// === FRONTEND_JS_TEMPLATE ===', '').strip()
CITATION_HELPER_JS = _js_raw.split(_CITATION_MARKER)[1].split(_INTERACTIVE_MARKER)[0].strip()
INTERACTIVE_JS = _js_raw.split(_INTERACTIVE_MARKER)[1].strip()
import typing
if typing.TYPE_CHECKING:
from searx.search import SearchWithPlugins
from searx.extended_types import SXNG_Request
from . import PluginCfg
class SXNGPlugin(Plugin):
    """SearXNG plugin that adds an LLM-generated answer to the result page.

    ``post_search`` builds a context string from the finished search and
    returns an ``Answer`` carrying the front-end payload together with a
    signed, time-limited token. ``init`` registers three routes that require
    that token: ``/ai-auxiliary-search``, ``/ai-models`` and ``/ai-stream``.
    """

    id = "ai_answers"

    # HTTP statuses treated as redirects when talking to the LLM endpoint.
    _REDIRECT_STATUSES = (301, 302, 307, 308)
    # Inline reasoning block emitted by "thinking" models.
    # NOTE(review): the pattern in the reviewed source was r'.*?', which only
    # matches empty strings (a no-op); the tag text looks like it was lost.
    _THINK_RE = re.compile(r'<think>.*?</think>', re.DOTALL)
    # A line such as "Final Answer:" or "**Answer:**" inside a reasoning dump.
    _REASONING_HEADER_RE = re.compile(r'^\s*\*?\*?[A-Z][^:]{0,40}:\*?\*?\s*$')
    # Hosts whose pages are never fetched for enrichment.
    _SKIP_DOMAINS = ('youtube.com', 'twitter.com', 'x.com', 'instagram.com', 'facebook.com', 'reddit.com')
    # Non-content elements removed (with their bodies) before text extraction.
    # NOTE(review): the five original patterns were empty strings; this tag
    # set is a reconstruction -- confirm against the upstream plugin.
    _BOILERPLATE_RE = re.compile(
        r'<(script|style|noscript|nav|header|footer)\b[^>]*>.*?</\1\s*>',
        re.DOTALL | re.IGNORECASE,
    )
    _COMMENT_RE = re.compile(r'<!--.*?-->', re.DOTALL)
    _TAG_RE = re.compile(r'<[^>]+>')

    def __init__(self, plg_cfg: "PluginCfg"):
        """Register plugin metadata and load settings from the environment."""
        super().__init__(plg_cfg)
        self.info = PluginInfo(
            id=self.id,
            name=gettext(f"{PLUGIN_NAME} Plugin"),
            description=gettext("Live AI search answers using LLM providers."),
            preference_section="general",
        )
        self._load_config()

    # ------------------------------------------------------------------
    # Configuration and token handling
    # ------------------------------------------------------------------

    def _load_config(self):
        """Populate instance settings from ``LLM_*`` environment variables.

        Invalid numeric values are logged and replaced by their defaults.
        Integer settings are clamped to their documented minimum.
        """

        def env_flag(name, default):
            return os.getenv(name, default).lower().strip() in ('true', '1', 'yes', 'on')

        def env_number(name, default, cast, minimum=None):
            try:
                value = cast(os.getenv(name, default))
            except ValueError:
                logger.warning(f"{PLUGIN_NAME}: Invalid {name} value. Enforcing default ({default}).")
                return default
            return value if minimum is None else max(minimum, value)

        self.interactive = env_flag('LLM_INTERACTIVE', 'true')
        self.question_mark_required = env_flag('LLM_QUESTION_MARK_REQUIRED', 'false')

        raw_url = os.getenv('LLM_URL', 'http://ollama:11434/v1/chat/completions').strip()
        if not raw_url.startswith(('http://', 'https://')):
            raw_url = f"http://{raw_url}"
        self.endpoint_url = raw_url
        # NOTE(review): hard-coded, so every ``if not self.api_key`` guard in
        # this class is currently always false.
        self.api_key = 'ollama'
        self.model = os.getenv('LLM_MODEL', 'qwen3.5:9b').strip()

        self.max_tokens = env_number('LLM_MAX_TOKENS', 200, int, minimum=1)
        self.temperature = env_number('LLM_TEMPERATURE', 0.2, float)
        self.context_deep_count = env_number('LLM_CONTEXT_DEEP_COUNT', 5, int, minimum=0)
        self.context_shallow_count = env_number('LLM_CONTEXT_SHALLOW_COUNT', 15, int, minimum=0)

        # Blank entries (e.g. from a trailing comma) can never match a category.
        tabs = (t.strip() for t in os.getenv('LLM_TABS', DEFAULT_TABS).split(','))
        self.allowed_tabs = {t for t in tabs if t}

        server_secret = settings.get('server', {}).get('secret_key', '')
        if not server_secret:
            logger.warning(f"{PLUGIN_NAME}: server.secret_key is empty; request tokens are predictable.")
        self.secret = hashlib.sha256(f"ai_answers_{server_secret}".encode()).hexdigest()
        self.system_prompt = os.getenv('LLM_SYSTEM_PROMPT', '').strip()

    def _sign(self, ts: str) -> str:
        """Return the hex signature for timestamp string *ts*."""
        return hashlib.sha256(f"{ts}{self.secret}".encode()).hexdigest()

    def _token_is_valid(self, token) -> bool:
        """Check a ``"<unix-ts>.<signature>"`` token for authenticity and age.

        Returns False for malformed, non-string, expired or wrongly signed
        tokens. The signature comparison is constant-time.
        """
        import hmac
        try:
            ts, sig = token.rsplit('.', 1)
            age = time.time() - float(ts)
        except (ValueError, AttributeError):
            return False
        if age > TOKEN_EXPIRY_SEC:
            return False
        # Compare as bytes: compare_digest() rejects non-ASCII str input.
        return hmac.compare_digest(sig.encode('utf-8'), self._sign(ts).encode('utf-8'))

    # ------------------------------------------------------------------
    # Result normalisation
    # ------------------------------------------------------------------

    def _parse_aux_results(self, raw_results, raw_infoboxes, raw_answers):
        """Normalise a result container's data into plain dicts and strings.

        Returns:
            ``(results, infoboxes, answers)``: at most
            ``context_deep_count + context_shallow_count`` result dicts
            (``title``/``content``/``url``/``publishedDate``), at most one
            infobox dict, and at most two plain-text answers. Answers that
            look like markup, including this plugin's own injected box, are
            dropped.
        """

        def field(item, key):
            # Objects exposing ``title`` are read by attribute; everything
            # else is treated as a legacy dict-style result.
            if hasattr(item, 'title'):
                return getattr(item, key, '')
            return item.get(key, '')

        limit = self.context_deep_count + self.context_shallow_count
        results = [
            {key: field(item, key) for key in ('title', 'content', 'url', 'publishedDate')}
            for item in raw_results[:limit]
        ]

        # SearXNG already merges infoboxes by ID, use first
        infoboxes = [
            {
                'name': ib.get('infobox', '') or ib.get('title', ''),
                'content': str(ib.get('content') or '')[:2000],
                'attributes': ib.get('attributes', []),
            }
            for ib in list(raw_infoboxes)[:1]
        ]

        answers = []
        for candidate in list(raw_answers)[:2]:
            text = ""
            if isinstance(getattr(candidate, 'answer', None), str):
                text = candidate.answer
            elif isinstance(candidate, dict) and candidate.get('answer'):
                text = str(candidate['answer'])
            if text and 'id="sxng-stream-box"' not in text and not text.strip().startswith('<'):
                answers.append(text)
        return results, infoboxes, answers

    # ------------------------------------------------------------------
    # HTTP helpers for the LLM endpoint
    # ------------------------------------------------------------------

    @classmethod
    def _http_get(cls, start_url, headers, max_hops=5):
        """GET *start_url*, following up to *max_hops* redirects.

        Returns ``(status, body_bytes)`` of the first non-redirect response,
        or ``(None, b'')`` when a redirect has no ``Location`` or the hop
        limit is exhausted. Every connection is closed before returning.
        """
        from urllib.parse import urljoin
        url = start_url
        for _ in range(max_hops):
            conn, path = _get_streaming_connection(url)
            try:
                conn.request("GET", path, headers=headers)
                res = conn.getresponse()
                body = res.read()
                if res.status not in cls._REDIRECT_STATUSES:
                    return res.status, body
                location = res.getheader('Location', '')
            finally:
                conn.close()
            if not location:
                return None, b''
            # urljoin handles absolute, root-relative and relative targets.
            url = urljoin(url, location)
        return None, b''

    def _build_prompt(self, q, lang, context_text, prev_answer):
        """Return ``(system_message, user_prompt)`` for one completion call."""
        today = time.strftime("%Y-%m-%d")
        lang_instruction = f" Respond in {lang}." if lang not in ('all', 'auto') else ""
        base_sys = self.system_prompt if self.system_prompt else "You are a direct, citation-accurate search synthesis engine."
        system = (f"{base_sys} Today is {today}.{lang_instruction} "
                  "Output only your final answer. Do not output your thinking process, "
                  "reasoning steps, or internal monologue. Begin your response with the "
                  "direct answer immediately. "
                  "Be concise. Give a 2-4 sentence overview that directly answers the query. "
                  "The user can ask follow-up questions for more detail. "
                  "Do not enumerate or list everything from the sources.")
        core_rules = [
            "Answer the question directly using the provided context.",
            "MUST CITE SOURCES by tailing a sentence with [n] or [n,n] etc. If citing general knowledge, use [*].",
            "Never explain your process. The user expects a direct response.",
            "Response format must be plain text with no markdown. "
            "Be brief: 2-4 sentences maximum. Lead with the direct answer. "
            "Cite the most relevant source(s) only. Stop after the overview.",
            "If sources and general knowledge are insufficient, respond with 'Insufficient information to answer.'"
        ]
        if q == "Continue":
            task = "CONTINUE: Pick up exactly where previous answer stopped. No repetition. Seamless flow."
        elif prev_answer:
            task = "FOLLOW-UP: Address the new question using prior context. Prioritize the new query."
        else:
            task = "ANSWER FIRST: Lead with the direct answer. No preamble, no context-setting."
        if context_text:
            grounding = "GROUNDING: KNOWLEDGE GRAPH > DEEP > SHALLOW."
        else:
            grounding = "GROUNDING: No sources available. Use general knowledge and cite as [*] which means based on general knowledge."
        instructions = [task] + core_rules + [grounding]
        if prev_answer:
            instructions.append("HISTORY: Refer to prior exchange for context. Ideally, do not repeat any claims.")
        numbered = "\n".join(f"{i + 1}. {rule}" for i, rule in enumerate(instructions))
        # NOTE(review): the sections are separated by newlines only; section
        # delimiters may have been lost together with other tag-like text in
        # this file. The string content is kept exactly as found.
        prompt = "\n".join([system, context_text or 'None.', prev_answer or 'None.', q, numbered]) + "\n"
        return system, prompt

    @classmethod
    def _answer_from_reasoning(cls, reasoning: str) -> str:
        """Salvage an answer from a reasoning dump when ``content`` is empty.

        Prefers the text after the last header-like line (e.g. "Answer:");
        otherwise falls back to the last two paragraphs.
        """
        lines = reasoning.splitlines()
        last_header_idx = -1
        for i, line in enumerate(lines):
            if cls._REASONING_HEADER_RE.match(line):
                last_header_idx = i
        content = ''
        if 0 <= last_header_idx < len(lines) - 1:
            content = '\n'.join(lines[last_header_idx + 1:]).strip()
        if not content:
            paragraphs = [p.strip() for p in reasoning.split('\n\n') if p.strip()]
            if len(paragraphs) >= 2:
                content = '\n\n'.join(paragraphs[-2:])
            elif paragraphs:
                content = paragraphs[-1]
        return content

    def _call_llm(self, model, system, prompt):
        """POST one non-streaming chat completion to the configured endpoint.

        Returns:
            ``(text, None)`` on success or ``('', error_message)`` on failure.
            Never raises: every exception is logged and converted to an
            error message.
        """
        from urllib.parse import urljoin
        payload = json.dumps({
            "model": model,
            "messages": [
                {"role": "system", "content": system},
                {"role": "user", "content": prompt},
                {"role": "assistant", "content": ""},
            ],
            "stream": False,
            "max_tokens": self.max_tokens,
            "temperature": self.temperature,
        }).encode('utf-8')
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }
        conn = None
        try:
            url = self.endpoint_url
            res = None
            for _ in range(3):
                conn, path = _get_streaming_connection(url)
                conn.request("POST", path, body=payload, headers=headers)
                res = conn.getresponse()
                if res.status not in self._REDIRECT_STATUSES:
                    break
                location = res.getheader('Location', '')
                res.read()
                conn.close()
                conn = None
                if not location:
                    return '', f"Redirect {res.status} with no Location header"
                url = urljoin(url, location)
                logger.info(f"{PLUGIN_NAME}: Following redirect to {url}")
            if res.status != 200:
                # Also reached when the redirect limit is exhausted; the body
                # is then already consumed and reads as empty.
                body = res.read(1024).decode('utf-8', errors='replace')
                logger.error(f"{PLUGIN_NAME}: Ollama {res.status}: {body}")
                return '', f"Ollama error {res.status}"
            obj = json.loads(res.read().decode('utf-8', errors='replace'))
            if 'error' in obj:
                err = obj['error']
                msg = err.get('message', str(err)) if isinstance(err, dict) else str(err)
                return '', msg
            choices = obj.get('choices', [])
            if not choices:
                return '', "No choices in Ollama response."
            message = choices[0].get('message') or {}
            content = self._THINK_RE.sub('', message.get('content') or '').strip()
            reasoning = message.get('reasoning') or message.get('reasoning_content') or ''
            if not content and reasoning:
                logger.warning(f"{PLUGIN_NAME}: content empty, extracting from reasoning field")
                content = self._answer_from_reasoning(reasoning)
            # NOTE(review): the reviewed source prepended the reasoning and
            # then ran a (no-op) strip over the result, so reasoning leaked to
            # the client. With the strip pattern restored the net effect is
            # the answer alone, which is what is returned here.
            return self._THINK_RE.sub('', content).strip(), None
        except Exception as e:
            logger.error(f"{PLUGIN_NAME}: Ollama call error: {e}", exc_info=True)
            return '', f"Connection Error: {e}"
        finally:
            if conn:
                conn.close()

    # ------------------------------------------------------------------
    # Flask routes
    # ------------------------------------------------------------------

    def init(self, app):
        """Register the plugin's routes on the Flask *app* and return True.

        Every route rejects requests without a valid token with HTTP 403.
        """

        def json_body() -> dict:
            # Tolerate missing, invalid or non-object JSON; the token check
            # that follows then rejects the request.
            body = request.get_json(silent=True)
            return body if isinstance(body, dict) else {}

        def require_token(token):
            if not self._token_is_valid(token):
                abort(403)

        def log_proxied_ip(route):
            ip = request.headers.get('X-Real-IP') or request.headers.get('X-Forwarded-For')
            if ip:
                logger.debug(f"{PLUGIN_NAME}: {route} from proxied IP {ip}")

        @app.route('/ai-auxiliary-search', methods=['POST'])
        def ai_auxiliary_search():
            """Run an extra search and return it as LLM context (JSON)."""
            if not self.api_key:
                abort(403)
            data = json_body()
            # Token access control
            require_token(data.get('tk', ''))
            log_proxied_ip('/ai-auxiliary-search')

            query = str(data.get('query') or '').strip()
            lang = data.get('lang', 'all')
            categories = data.get('categories', 'general')
            try:
                # offset shifts citation indices; it must be a non-negative int.
                offset = max(0, int(data.get('offset', 0)))
            except (TypeError, ValueError):
                offset = 0
            if not query:
                return jsonify({'results': []})
            try:
                from searx.search import SearchWithPlugins
                from searx.search.models import SearchQuery
                from searx.query import RawTextQuery
                from searx.webadapter import get_engineref_from_category_list
                preferences = getattr(request, 'preferences', None)
                disabled_engines = preferences.engines.get_disabled() if preferences else []
                rtq = RawTextQuery(query, disabled_engines)
                if isinstance(categories, str):
                    category_list = [c.strip() for c in categories.split(',') if c.strip()]
                else:
                    category_list = categories or ['general']
                enginerefs = get_engineref_from_category_list(category_list, disabled_engines)
                sq = SearchQuery(
                    query=rtq.getQuery(),
                    engineref_list=enginerefs,
                    lang=lang,
                    pageno=1,
                )
                # user_plugins=[] keeps this plugin from running on its own search.
                result_container = SearchWithPlugins(sq, request, user_plugins=[]).search()
                results, infoboxes, answers = self._parse_aux_results(
                    result_container.get_ordered_results(),
                    getattr(result_container, 'infoboxes', []),
                    getattr(result_container, 'answers', []),
                )
                context_str, new_urls = self._assemble_context(results, infoboxes, answers, offset)
                return jsonify({
                    'context': context_str,
                    'new_urls': new_urls,
                    'results': results,
                    'infoboxes': infoboxes,
                    'answers': answers,
                    'query': query
                })
            except Exception as e:
                logger.error(f"{PLUGIN_NAME}: Aux search failed: {e}", exc_info=True)
                return jsonify({'results': [], 'error': 'Search failed'}), 500

        @app.route('/ai-models', methods=['GET'])
        def ai_models():
            """List model names offered by the endpoint, else the configured one."""
            log_proxied_ip('/ai-models')
            require_token(request.args.get('tk', ''))
            auth_headers = {"Authorization": f"Bearer {self.api_key}"}
            endpoint = urlparse(self.endpoint_url)
            base = f"{endpoint.scheme}://{endpoint.netloc}"
            # Try the OpenAI-style listing first, then Ollama's native one.
            candidates = [
                (f"{base}/v1/models", lambda d: [m['id'] for m in d.get('data', [])]),
                (f"{base}/api/tags", lambda d: [m['name'] for m in d.get('models', [])]),
            ]
            for models_url, parse_fn in candidates:
                try:
                    status, body = self._http_get(models_url, auth_headers)
                    if status == 200:
                        models = parse_fn(json.loads(body.decode('utf-8', errors='replace')))
                        if models:
                            return jsonify({'models': models})
                except Exception as e:
                    logger.debug(f"{PLUGIN_NAME}: /ai-models attempt {models_url} failed: {e}")
            return jsonify({'models': [self.model] if self.model else []})

        @app.route('/ai-stream', methods=['POST'])
        def handle_ai_stream():
            """Answer one query; responds with JSON ``{"text", "error"}``."""
            data = json_body()
            require_token(data.get('tk', ''))
            q = str(data.get('q') or '')
            lang = data.get('lang', 'all')
            context_text = str(data.get('context') or '')
            # Only the tail of the previous answer is kept to bound prompt size.
            prev_answer = str(data.get('prev_answer') or '')[-4000:]
            effective_model = str(data.get('model') or '').strip() or self.model
            log_proxied_ip('/ai-stream')
            if not self.api_key or not q.strip():
                return Response("Missing API key or query", status=400)
            system, prompt = self._build_prompt(q, lang, context_text, prev_answer)
            text, error = self._call_llm(effective_model, system, prompt)
            return jsonify({"text": text, "error": error})

        return True

    # ------------------------------------------------------------------
    # Page fetching and enrichment
    # ------------------------------------------------------------------

    @classmethod
    def _html_to_text(cls, markup: str) -> str:
        """Reduce an HTML document to whitespace-normalised plain text."""
        from html import unescape
        markup = cls._BOILERPLATE_RE.sub(' ', markup)
        markup = cls._COMMENT_RE.sub(' ', markup)
        # Tags become spaces so adjacent blocks do not fuse into one word.
        text = unescape(cls._TAG_RE.sub(' ', markup))
        return re.sub(r'\s+', ' ', text).strip()

    def _fetch_page_text(self, url: str, timeout: int = 5) -> str:
        """Fetch *url* and return up to 2000 characters of its visible text.

        Follows at most two redirects. Returns ``''`` for PDFs, skip-listed
        hosts, non-HTTP(S) URLs, non-200 responses and any error (best
        effort, never raises).

        NOTE(review): URLs come from search results and redirects are
        followed without restricting the target network; consider blocking
        private address ranges if this instance can reach internal hosts.
        """
        from urllib.parse import urljoin
        try:
            current_url = url
            for _ in range(3):  # initial request + up to 2 redirects
                parsed = urlparse(current_url)
                host = (parsed.hostname or '').lower()
                if not host or parsed.scheme not in ('http', 'https'):
                    return ''
                if parsed.path.lower().endswith('.pdf'):
                    return ''
                # Match on the host name: a substring test on the whole URL
                # made 'x.com' also match e.g. 'linux.com'.
                if any(host == d or host.endswith('.' + d) for d in self._SKIP_DOMAINS):
                    return ''
                port = parsed.port or (443 if parsed.scheme == 'https' else 80)
                path = (parsed.path or '/') + ('?' + parsed.query if parsed.query else '')
                if parsed.scheme == 'https':
                    try:
                        import certifi
                        ctx = ssl.create_default_context(cafile=certifi.where())
                    except ImportError:
                        ctx = ssl.create_default_context()
                    conn = http.client.HTTPSConnection(host, port, timeout=timeout, context=ctx)
                else:
                    conn = http.client.HTTPConnection(host, port, timeout=timeout)
                try:
                    conn.request('GET', path, headers={'User-Agent': 'Mozilla/5.0 (compatible; SearXNG-AI/1.0)'})
                    res = conn.getresponse()
                    if res.status in (301, 302, 303, 307, 308):
                        location = res.getheader('Location', '')
                        res.read()
                        if not location:
                            return ''
                        current_url = urljoin(current_url, location)
                        continue
                    if res.status != 200:
                        return ''
                    # Cap the download at 512 KiB.
                    markup = res.read(512 * 1024).decode('utf-8', errors='replace')
                finally:
                    conn.close()
                text = self._html_to_text(markup)
                logger.debug(f"{PLUGIN_NAME}: fetched {len(text)} chars from {url}")
                return text[:2000]
            return ''
        except Exception as exc:
            logger.debug(f"{PLUGIN_NAME}: fetch failed for {url}: {exc}")
            return ''

    @staticmethod
    def _best_snippet(text: str, words: list, radius: int = 400) -> str:
        """Return the ``2 * radius`` window of *text* covering most query words.

        Candidate centres are all occurrences of any word; with no occurrence
        the middle of the text is used. Ties keep the first candidate.
        """
        text_lower = text.lower()
        positions = []
        for word in words:
            start = 0
            while True:
                idx = text_lower.find(word, start)
                if idx == -1:
                    break
                positions.append(idx)
                start = idx + 1
        best_pos = len(text) // 2
        best_count = -1
        for pos in (positions or [best_pos]):
            window = text_lower[max(0, pos - radius):min(len(text), pos + radius)]
            count = sum(w in window for w in words)
            if count > best_count:
                best_count = count
                best_pos = pos
        start = max(0, best_pos - radius)
        return text[start:start + 2 * radius]

    def _enrich_results(self, clean_results: list, query: str) -> list:
        """Add a ``fetched_content`` key to every result dict, in place.

        The first ``min(3, context_deep_count)`` results are fetched
        concurrently and get the page excerpt most relevant to *query*;
        all other results (and failed fetches) get ``''``.
        Returns the same list object.
        """
        for result in clean_results:
            result['fetched_content'] = ''
        targets = clean_results[:min(3, self.context_deep_count)]
        if not targets:
            return clean_results
        words = query.lower().split()
        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            pending = [
                (executor.submit(self._fetch_page_text, result.get('url') or ''), result)
                for result in targets
            ]
            for future, result in pending:
                try:
                    text = future.result(timeout=6)
                except Exception as exc:
                    # Best effort: the snippet from the engine is used instead.
                    logger.debug(f"{PLUGIN_NAME}: enrichment failed: {exc}")
                    continue
                if text and len(text) > 100:
                    result['fetched_content'] = self._best_snippet(text, words)
        return clean_results

    # ------------------------------------------------------------------
    # Context assembly
    # ------------------------------------------------------------------

    def _assemble_context(self, clean_results, infoboxes, answers, offset=0) -> tuple[str, list]:
        """Builds context string from normalized search data. Returns (context_str, urls).

        The context has up to three sections separated by blank lines:
        ``KNOWLEDGE GRAPH`` (infoboxes and instant answers), ``DEEP SOURCES``
        (first ``context_deep_count`` results with content) and
        ``SHALLOW SOURCES`` (the next ``context_shallow_count`` titles).
        Sources are numbered from ``1 + offset``; *urls* lists their URLs in
        the same order.
        """

        def one_line(value) -> str:
            return str(value or '').replace('\n', ' ').strip()

        def domain_of(url) -> str:
            # Strip only a leading "www."; replace() also rewrote hosts that
            # merely contained that substring.
            return urlparse(url or '').netloc.removeprefix('www.')

        context_parts = []
        result_urls = []

        knowledge_graph_lines = []
        for ib in infoboxes:
            ib_name = ib.get('name', '') or ib.get('infobox', '') or ib.get('title', '')
            if not ib_name:
                continue
            parts = [f"INFOBOX [{ib_name}]:"]
            ib_content = one_line(ib.get('content', ''))
            if ib_content:
                parts.append(ib_content)
            for attr in ib.get('attributes') or []:
                if not isinstance(attr, dict):
                    continue
                label, value = attr.get('label', ''), attr.get('value', '')
                if label and value:
                    parts.append(f" {label}: {value}")
            # Header plus exactly one extra part stays on one line.
            knowledge_graph_lines.append(" ".join(parts) if len(parts) == 2 else "\n".join(parts))
        for ans_text in answers:
            if ans_text and not str(ans_text).startswith('<'):
                knowledge_graph_lines.append(f"ANSWER: {str(ans_text)[:300]}")
        if knowledge_graph_lines:
            context_parts.append("KNOWLEDGE GRAPH:\n" + "\n".join(knowledge_graph_lines))

        deep_lines = []
        for i, r in enumerate(clean_results[:self.context_deep_count]):
            url = r.get('url', '')
            result_urls.append(url)
            domain = domain_of(url)
            date_str = f" ({r.get('publishedDate')})" if r.get('publishedDate') else ""
            title = one_line(r.get('title', ''))
            idx = i + 1 + offset
            body = r.get('fetched_content', '')
            if not body:
                logger.debug(f"{PLUGIN_NAME}: falling back to snippet for [{idx}] {domain}")
                body = one_line(r.get('content', ''))[:800]
            deep_lines.append(f"[{idx}] {domain}{date_str}: {title}: {body}")
        if deep_lines:
            context_parts.append("DEEP SOURCES:\n" + "\n".join(deep_lines))

        if self.context_shallow_count > 0:
            start_idx = self.context_deep_count
            end_idx = start_idx + self.context_shallow_count
            shallow_lines = []
            for i, r in enumerate(clean_results[start_idx:end_idx]):
                url = r.get('url', '')
                result_urls.append(url)
                title = one_line(r.get('title', ''))[:60]
                shallow_lines.append(f"[{i + 1 + start_idx + offset}] {domain_of(url)}: {title}")
            if shallow_lines:
                context_parts.append("SHALLOW SOURCES (headlines):\n" + "\n".join(shallow_lines))

        return "\n\n".join(context_parts), result_urls

    # ------------------------------------------------------------------
    # SearXNG hook
    # ------------------------------------------------------------------

    def post_search(self, request: "SXNG_Request", search: "SearchWithPlugins") -> list:
        """Return the answer-box payload for an eligible first-page HTML search.

        Returns ``[]`` for auxiliary requests, non-HTML formats, disallowed
        tabs, pages beyond the first, an inactive plugin, or any error.
        """
        try:
            if request and hasattr(request, 'headers') and request.headers.get('X-AI-Auxiliary'):
                return []
            if request and request.form.get('format', 'html') != 'html':
                return []
            search_query = search.search_query
            if self.question_mark_required and '?' not in search_query.query:
                return []
            current_tabs = set(search_query.categories) or {'general'}
            if (not self.active or not self.api_key or search_query.pageno > 1
                    or not self.allowed_tabs.intersection(current_tabs)):
                return []

            container = search.result_container
            q_clean = search_query.query.strip()
            clean_results, infoboxes, answers = self._parse_aux_results(
                container.get_ordered_results(),
                getattr(container, 'infoboxes', []),
                getattr(container, 'answers', []),
            )
            clean_results = self._enrich_results(clean_results, q_clean)
            context_str, _ = self._assemble_context(clean_results, infoboxes, answers)

            ts = str(int(time.time()))
            tk = f"{ts}.{self._sign(ts)}"

            def safe_json(value):
                # XSS blocking: keep '<', '>' and '&' out of the inline script.
                return json.dumps(value).replace('<', '\\u003c').replace('>', '\\u003e').replace('&', '\\u0026')

            b64_context = base64.b64encode(context_str.encode('utf-8')).decode('utf-8')
            total_context_count = self.context_deep_count + self.context_shallow_count
            raw_urls = [r.get('url', '') for r in clean_results[:total_context_count]]
            script_root = (request.script_root if request else '').rstrip('/')
            is_interactive = self.interactive

            # Applied strictly in this order: a later placeholder may occur
            # inside text inserted by an earlier substitution.
            substitutions = [
                ("__IS_INTERACTIVE__", 'true' if is_interactive else 'false'),
                ("__TK__", safe_json(tk)),
                ("__SCRIPT_ROOT__", safe_json(script_root)),
                ("__MODEL_INIT__", safe_json(self.model)),
                ("__CITATION_HELPER_JS__", CITATION_HELPER_JS),
                ("__INTERACTIVE_JS_INIT__", INTERACTIVE_JS if is_interactive else ''),
                ("__STREAM_FN_SIG__", 'async function startStream(overrideQ = null, prevAnswer = null, auxContext = null)'),
                ("__STREAM_Q__", 'overrideQ || q_init' if is_interactive else 'q_init'),
                ("__STREAM_BODY__", ', prev_answer: prevAnswer' if is_interactive else ''),
                ("__INTERACTIVE_JS_COMPLETE__", "footer.style.display = 'flex';" if is_interactive else ''),
                ("__JS_LANG__", safe_json(search_query.lang)),
                ("__JS_URLS__", safe_json(raw_urls)),
                ("__B64_CONTEXT__", safe_json(b64_context)),
                ("__JS_Q__", safe_json(q_clean)),
            ]
            js_code = FRONTEND_JS_TEMPLATE
            for placeholder, replacement in substitutions:
                js_code = js_code.replace(placeholder, replacement)

            interactive_css = INTERACTIVE_CSS if is_interactive else ''
            interactive_html = INTERACTIVE_HTML if is_interactive else ''
            # NOTE(review): the reviewed source emitted only {interactive_html}
            # and never used js_code or interactive_css. The wrapper below is
            # a minimal reconstruction (the box id is the one filtered in
            # _parse_aux_results); confirm which inner elements ui.js expects.
            html_payload = (
                "\n"
                f"<style>{interactive_css}</style>\n"
                '<div id="sxng-stream-box">\n'
                f"{interactive_html}\n"
                "</div>\n"
                f"<script>{js_code}</script>\n"
            )
            return [Answer(answer=Markup(html_payload))]
        except Exception as e:
            logger.error(f"{PLUGIN_NAME}: {e}", exc_info=True)
            return []