More ollama fixes
CI Test Guard / validate-code (push) Has been cancelled

This commit is contained in:
Tyler
2026-05-14 21:55:19 -04:00
parent 323e7593ef
commit dab3f293a1
+102 -285
View File
@@ -1,11 +1,11 @@
import json, os, logging, base64, time, hashlib, codecs, re, http.client, ssl
import json, os, logging, base64, time, hashlib, re, http.client, ssl
from urllib.parse import urlparse
from searx import network
try:
from searx.network import get_network
except ImportError:
get_network = None
from flask import Response, request, abort, jsonify, stream_with_context
from flask import Response, request, abort, jsonify
from searx.plugins import Plugin, PluginInfo
from searx.result_types import EngineResults
from searx import settings
@@ -537,8 +537,46 @@ FRONTEND_JS_TEMPLATE = r"""
return;
}
const reader = res.body.getReader();
const decoder = new TextDecoder();
const respJson = await res.json();
if (respJson.error) {
const cursorErr = data.querySelector('.sxng-cursor');
if (cursorErr) cursorErr.remove();
const errSpan = document.createElement('span');
errSpan.style.color = '#bf616a';
errSpan.textContent = "⚠️ " + respJson.error;
data.appendChild(errSpan);
return;
}
const fullText = (respJson.text || '').trim();
if (!fullText) {
const cursorErr = data.querySelector('.sxng-cursor');
if (cursorErr) cursorErr.remove();
const errSpan = document.createElement('span');
errSpan.style.color = '#bf616a';
errSpan.textContent = 'No response received. Check API configuration and server logs.';
data.appendChild(errSpan);
return;
}
let mainText = fullText;
const thinkMatch = mainText.match(/^<think>([\s\S]*?)<\/think>\s*/);
if (thinkMatch) {
const cursorTh = data.querySelector('.sxng-cursor');
const details = document.createElement('details');
details.className = 'sxng-reasoning';
details.innerHTML = '<summary>Thought Process</summary>';
const thoughtDiv = document.createElement('div');
thoughtDiv.className = 'sxng-thought-content';
thoughtDiv.textContent = thinkMatch[1];
details.appendChild(thoughtDiv);
if (cursorTh) cursorTh.before(details);
else data.appendChild(details);
mainText = mainText.substring(thinkMatch[0].length);
}
let cursor = data.querySelector('.sxng-cursor');
if (!cursor) {
cursor = document.createElement('span');
@@ -546,10 +584,6 @@ FRONTEND_JS_TEMPLATE = r"""
data.appendChild(cursor);
}
let started = false;
let collectedResponse = '';
let isThinking = false, thoughtDiv = null;
let buffer = '';
const flushBuffer = (force = false) => {
if (!buffer) return;
@@ -611,105 +645,23 @@ FRONTEND_JS_TEMPLATE = r"""
}
};
let streamBuffer = '';
while (true) {
const {done, value} = await reader.read();
if (done) break;
clearTimeout(timeoutId);
timeoutId = setTimeout(() => controller.abort(), 60000);
const chunk = decoder.decode(value, {stream: true});
if (!chunk) continue;
streamBuffer += chunk;
if (streamBuffer.match(/<\/?(?:t(?:h(?:i(?:n(?:k)?)?)?)?)?$/)) {
continue;
}
while (true) {
const openIdx = streamBuffer.indexOf('<think>');
const closeIdx = streamBuffer.indexOf('</think>');
if (openIdx === -1 && closeIdx === -1) break;
if (!isThinking) {
if (openIdx !== -1 && (closeIdx === -1 || openIdx < closeIdx)) {
const preTag = streamBuffer.substring(0, openIdx);
if (preTag) {
if (!started) {
const trimmed = preTag.replace(/^[\s.,;:!?]+/, '');
if (trimmed || collectedResponse.trim()) {
if (cursor && !cursor.isConnected) data.appendChild(cursor);
started = true;
}
}
if (started) {
buffer += preTag;
flushBuffer(false);
}
collectedResponse += preTag;
}
isThinking = true;
const details = document.createElement('details');
details.className = 'sxng-reasoning';
details.innerHTML = '<summary>Thought Process</summary>';
thoughtDiv = document.createElement('div');
thoughtDiv.className = 'sxng-thought-content';
details.appendChild(thoughtDiv);
(cursor ? cursor.before(details) : data.appendChild(details));
streamBuffer = streamBuffer.substring(openIdx + 7);
} else {
streamBuffer = streamBuffer.replace('</think>', '');
}
} else {
if (closeIdx !== -1 && (openIdx === -1 || closeIdx < openIdx)) {
const thoughtText = streamBuffer.substring(0, closeIdx);
if (thoughtDiv) thoughtDiv.textContent += thoughtText;
isThinking = false;
streamBuffer = streamBuffer.substring(closeIdx + 8);
} else {
streamBuffer = streamBuffer.replace('<think>', '');
}
let twPos = 0;
const twBatch = 4;
await new Promise(resolve => {
function twTick() {
if (twPos >= mainText.length) {
flushBuffer(true);
resolve();
return;
}
const end = Math.min(twPos + twBatch, mainText.length);
buffer += mainText.substring(twPos, end);
twPos = end;
flushBuffer(false);
setTimeout(twTick, 8);
}
if (streamBuffer.length > 0) {
if (isThinking && thoughtDiv) {
thoughtDiv.textContent += streamBuffer;
} else {
if (!started) {
const trimmed = streamBuffer.replace(/^[\s.,;:!?]+/, '');
if (trimmed || collectedResponse.trim()) {
if (cursor && !cursor.isConnected) data.appendChild(cursor);
started = true;
}
}
if (started) {
buffer += streamBuffer;
flushBuffer(false);
}
collectedResponse += streamBuffer;
}
streamBuffer = '';
}
}
if (streamBuffer.length > 0) {
streamBuffer = streamBuffer.replace(/<\/?(?:t(?:h(?:i(?:n(?:k)?)?)?)?)?$/, '');
if (streamBuffer.length > 0) {
if (isThinking && thoughtDiv) {
thoughtDiv.textContent += streamBuffer;
} else {
buffer += streamBuffer;
collectedResponse += streamBuffer;
}
}
}
flushBuffer(true);
twTick();
});
if (cursor) cursor.remove();
@@ -725,21 +677,7 @@ FRONTEND_JS_TEMPLATE = r"""
}
}
if (!started && !collectedResponse.trim()) {
const cursor = data.querySelector('.sxng-cursor');
if (cursor) cursor.remove();
const errSpan = document.createElement('span');
if (thoughtDiv && thoughtDiv.textContent.trim().length > 0) {
errSpan.style.color = '#ebcb8b';
errSpan.textContent = 'Model provided reasoning but stopped before the final answer. Try adjusting token limits.';
} else {
errSpan.style.color = '#bf616a';
errSpan.textContent = 'No response received. Check API configuration and server logs.';
}
data.appendChild(errSpan);
return;
}
const collectedResponse = mainText;
__INTERACTIVE_JS_COMPLETE__
@@ -1163,103 +1101,48 @@ class SXNGPlugin(Plugin):
{numbered_instructions}
</CORE_DIRECTIVES>"""
def stream_gemini():
yield ""
if '?' in self.endpoint_url:
url = f"{self.endpoint_url}&key={self.api_key}"
else:
url = f"{self.endpoint_url}?key={self.api_key}"
def call_gemini():
base = self.endpoint_url.replace('streamGenerateContent', 'generateContent')
url = f"{base}&key={self.api_key}" if '?' in base else f"{base}?key={self.api_key}"
conn = None
try:
conn, path = _get_streaming_connection(url)
payload = json.dumps({"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"maxOutputTokens": min(self.max_tokens * 4, 8192), "temperature": self.temperature}})
payload = json.dumps({
"contents": [{"parts": [{"text": prompt}]}],
"generationConfig": {"maxOutputTokens": min(self.max_tokens * 4, 8192), "temperature": self.temperature}
})
conn.request("POST", path, body=payload.encode('utf-8'), headers={"Content-Type": "application/json"})
res = conn.getresponse()
if res.status != 200:
body = res.read(2048).decode('utf-8', errors='replace')[:500]
logger.error(f"{PLUGIN_NAME}: Gemini API {res.status}: {body}")
yield f"\n⚠️ API error {res.status}. Check server logs.\n"
return
decoder = json.JSONDecoder()
utf8_decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
buffer = ""
while True:
chunk = res.read(STREAM_CHUNK_SIZE)
if not chunk:
buffer += utf8_decoder.decode(b'', final=True)
break
buffer += utf8_decoder.decode(chunk)
while buffer:
buffer = buffer.lstrip()
if buffer.startswith('['):
buffer = buffer[1:].lstrip()
elif buffer.startswith(','):
buffer = buffer[1:].lstrip()
elif buffer.startswith(']'):
buffer = buffer[1:].lstrip()
if not buffer: break
try:
obj, idx = decoder.raw_decode(buffer)
items = obj if isinstance(obj, list) else [obj]
for item in items:
if not isinstance(item, dict):
continue
if 'promptFeedback' in item and item['promptFeedback'].get('blockReason'):
yield f"\n⚠️ Gemini blocked prompt. Reason: {item['promptFeedback']['blockReason']}\n"
return
candidates = item.get('candidates')
if not isinstance(candidates, list) or len(candidates) == 0:
continue
first_candidate = candidates[0]
if not isinstance(first_candidate, dict):
continue
if first_candidate.get('finishReason') == 'SAFETY':
yield "\n⚠️ Gemini stopped generation due to safety filters.\n"
return
content = first_candidate.get('content')
if not isinstance(content, dict):
continue
parts = content.get('parts')
if not isinstance(parts, list) or len(parts) == 0:
continue
first_part = parts[0]
if isinstance(first_part, dict):
text = first_part.get('text')
if text and isinstance(text, str):
yield text
buffer = buffer[idx:]
except json.JSONDecodeError:
break
except Exception as parse_err:
logger.debug(f"{PLUGIN_NAME}: Ignored malformed Gemini chunk. Error: {parse_err}")
break
return '', f"API error {res.status}. Check server logs."
obj = json.loads(res.read().decode('utf-8', errors='replace'))
if obj.get('promptFeedback', {}).get('blockReason'):
return '', f"Gemini blocked prompt: {obj['promptFeedback']['blockReason']}"
candidates = obj.get('candidates', [])
if not candidates:
return '', "No candidates in Gemini response."
first = candidates[0]
if first.get('finishReason') == 'SAFETY':
return '', "Gemini stopped generation due to safety filters."
parts = first.get('content', {}).get('parts', [])
text = ''.join(p.get('text', '') for p in parts if isinstance(p, dict))
return text, None
except Exception as e:
logger.error(f"{PLUGIN_NAME}: Gemini stream error: {e}", exc_info=True)
yield f"\n⚠️ Connection Error: {e}\n"
logger.error(f"{PLUGIN_NAME}: Gemini call error: {e}", exc_info=True)
return '', f"Connection Error: {e}"
finally:
if conn: conn.close()
def stream_openai_compatible():
yield ""
def call_openai_compatible():
conn = None
try:
conn, path = _get_streaming_connection(self.endpoint_url)
payload_dict = {
"model": effective_model,
"messages": [{"role": "user", "content": prompt}],
"stream": True,
"stream": False,
"max_tokens": self.max_tokens,
"temperature": self.temperature
}
@@ -1268,7 +1151,6 @@ class SXNGPlugin(Plugin):
payload = json.dumps(payload_dict)
headers = {
"Content-Type": "application/json",
"Accept": "text/event-stream",
"HTTP-Referer": "https://github.com/searxng/searxng",
"X-Title": "SearXNG"
}
@@ -1278,101 +1160,36 @@ class SXNGPlugin(Plugin):
headers['Authorization'] = f"Bearer {self.api_key}"
conn.request("POST", path, body=payload.encode('utf-8'), headers=headers)
res = conn.getresponse()
if res.status != 200:
body = res.read(2048).decode('utf-8', errors='replace')[:500]
logger.error(f"{PLUGIN_NAME}: {self.provider} API {res.status}: {body}")
yield f"\n⚠️ API error {res.status}. Check server logs.\n"
return
decoder = json.JSONDecoder()
in_reasoning_block = False
while True:
line_bytes = res.readline()
if not line_bytes: break
line = line_bytes.decode('utf-8', errors='replace').strip()
if not line:
continue
if line.startswith("data: "):
data_str = line[6:].strip()
if data_str == "[DONE]":
if in_reasoning_block:
yield "\n</think>\n\n"
return
try:
obj, _ = decoder.raw_decode(data_str)
if not isinstance(obj, dict):
continue
# Catch upstream errors
if "error" in obj:
err_msg = obj["error"].get("message", str(obj["error"])) if isinstance(obj["error"], dict) else str(obj["error"])
yield f"\n⚠️ API Error: {err_msg}\n"
return
choices = obj.get("choices")
if not isinstance(choices, list) or len(choices) == 0:
continue
choice = choices[0]
if not isinstance(choice, dict):
continue
delta = choice.get("delta")
if not isinstance(delta, dict):
continue
reasoning = delta.get("reasoning_content")
content = delta.get("content")
if reasoning and isinstance(reasoning, str):
if not in_reasoning_block:
yield "<think>\n"
in_reasoning_block = True
yield reasoning
if content and isinstance(content, str):
if in_reasoning_block:
yield "\n</think>\n\n"
in_reasoning_block = False
yield content
except json.JSONDecodeError:
pass
except Exception as parse_err:
logger.debug(f"{PLUGIN_NAME}: Ignored malformed OpenAI chunk. Error: {parse_err}")
pass
if in_reasoning_block:
yield "\n</think>\n\n"
return '', f"API error {res.status}. Check server logs."
obj = json.loads(res.read().decode('utf-8', errors='replace'))
if "error" in obj:
err = obj["error"]
msg = err.get("message", str(err)) if isinstance(err, dict) else str(err)
return '', f"API Error: {msg}"
choices = obj.get("choices", [])
if not choices:
return '', "No choices in API response."
message = choices[0].get("message", {})
content = message.get("content") or ""
reasoning = message.get("reasoning_content") or ""
full = (f"<think>\n{reasoning}\n</think>\n\n" if reasoning else "") + content
return full, None
except Exception as e:
logger.error(f"{PLUGIN_NAME}: {self.provider} stream error: {e}", exc_info=True)
yield f"\n⚠️ Connection Error: {e}\n"
logger.error(f"{PLUGIN_NAME}: {self.provider} call error: {e}", exc_info=True)
return '', f"Connection Error: {e}"
finally:
if conn: conn.close()
generator = stream_gemini if self.is_gemini else stream_openai_compatible
call_fn = call_gemini if self.is_gemini else call_openai_compatible
text, error = call_fn()
if self.provider == 'ollama' and getattr(self, 'ollama_unload_after', False):
self._ollama_unload_model()
gen_fn = generator
def generator():
try:
yield from gen_fn()
finally:
self._ollama_unload_model()
return Response(stream_with_context(generator()), mimetype='text/event-stream', headers={
'X-Accel-Buffering': 'no',
'Cache-Control': 'no-cache, no-store',
'Connection': 'keep-alive'
})
return jsonify({"text": text, "error": error})
return True
def _assemble_context(self, clean_results, infoboxes, answers, offset=0) -> tuple[str, list]: