From dd3eea5182d19ac6a352e96d33563847acb2c21c Mon Sep 17 00:00:00 2001 From: cra88y Date: Sat, 10 Jan 2026 19:42:37 -0600 Subject: [PATCH] feat: robust stateless hybrid architecture using base64 context handover --- gemini_flash.py | 204 +++++++++++++++--------------------------------- 1 file changed, 65 insertions(+), 139 deletions(-) diff --git a/gemini_flash.py b/gemini_flash.py index b1c0ef3..e05a7df 100644 --- a/gemini_flash.py +++ b/gemini_flash.py @@ -1,5 +1,5 @@ -import json, secrets, time, http.client, ssl, os, logging, html, urllib.parse -from flask import Response, request, abort +import json, http.client, ssl, os, logging, base64 +from flask import Response, request from searx.plugins import Plugin, PluginInfo from searx.result_types import EngineResults from flask_babel import gettext @@ -19,181 +19,107 @@ class SXNGPlugin(Plugin): preference_section="general", ) self.api_key = os.getenv('GEMINI_API_KEY') - self.model = os.getenv('GEMINI_MODEL', 'gemini-3-flash-preview') - self.tokens = {} - - if not self.api_key: - logger.error(f"[{self.id}] API Key missing! Set GEMINI_API_KEY env var.") - else: - logger.info(f"[{self.id}] Initialized with model: {self.model}") + self.model = os.getenv('GEMINI_MODEL', 'gemini-1.5-flash') def init(self, app): - @app.route('/gemini-stream') + @app.route('/gemini-stream', methods=['POST']) def g_stream(): - t = request.args.get('token') - q = request.args.get('q', '') - - # Maintenance: handle dict structure - current_time = time.time() - self.tokens = {k: v for k, v in self.tokens.items() if v['expires'] > current_time} - - if t not in self.tokens or not self.api_key: - abort(403) - - token_data = self.tokens[t] - context_text = token_data.get('context', '') - del self.tokens[t] + data = request.json or {} + context_text = data.get('context', '') + q = data.get('q', '') + + if not self.api_key or not q: + return Response("Error: Missing Key or Query", status=400) def generate(): host = "generativelanguage.googleapis.com" path = f"/v1beta/models/{self.model}:streamGenerateContent?key={self.api_key}" try: - context = ssl.create_default_context() - conn = http.client.HTTPSConnection(host, context=context) - - # RAG PROMPT - prompt = f""" -You are a concise search assistant. Use the provided SEARCH RESULTS to answer the USER QUERY. -If the results don't contain the answer, use your knowledge but prioritize the results. -Keep the answer under 4 sentences. - -SEARCH RESULTS: -{context_text} - -USER QUERY: {q} -""" - payload = { - "contents": [{"parts": [{"text": prompt}]}], - "generationConfig": { - "maxOutputTokens": 500, - "temperature": 0.2 # Lower temperature for better factual accuracy - } - } - - conn.request("POST", path, body=json.dumps(payload), - headers={"Content-Type": "application/json"}) + conn = http.client.HTTPSConnection(host, context=ssl.create_default_context()) + prompt = f"Using these SEARCH RESULTS, answer the USER QUERY concisely (<4 sentences). If results are irrelevant, say so.\n\nRESULTS:\n{context_text}\n\nUSER QUERY: {q}" + payload = {"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"maxOutputTokens": 400, "temperature": 0.3}} + conn.request("POST", path, body=json.dumps(payload), headers={"Content-Type": "application/json"}) res = conn.getresponse() buffer = "" for chunk in res: if not chunk: continue buffer += chunk.decode('utf-8') - while True: start = buffer.find('{') - if start == -1: - buffer = "" # Clear garbage - break - - brace_count = 0 - end = -1 + if start == -1: break + brace_count, end = 0, -1 for i in range(start, len(buffer)): if buffer[i] == '{': brace_count += 1 elif buffer[i] == '}': brace_count -= 1 if brace_count == 0: end = i + 1 break - - if end == -1: break # Wait for more data - + if end == -1: break try: - raw_json = buffer[start:end] - data = json.loads(raw_json) - parts = data.get('candidates', [{}])[0].get('content', {}).get('parts', []) - for part in parts: - text = part.get('text', '') - if text: - yield text - except Exception: - pass - + data = json.loads(buffer[start:end]) + candidates = data.get('candidates', []) + if candidates: + text = candidates[0]['content']['parts'][0]['text'] + if text: yield text + except: pass buffer = buffer[end:] conn.close() except Exception as e: - logger.error(f"[{self.id}] Stream error: {e}") yield f" [Error: {str(e)}]" - return Response(generate(), mimetype='text/plain', headers={ - 'Cache-Control': 'no-cache, no-store, must-revalidate', - 'Pragma': 'no-cache', - 'Expires': '0', - 'X-Accel-Buffering': 'no' - }) - - @app.route('/gemini.js') - def g_script(): - token = request.args.get('token', '') - query = request.args.get('q', '') - - js_query = json.dumps(query) - js_token = json.dumps(token) - - js_code = f""" - (async () => {{ - const shell = document.getElementById('ai-shell'); - const out = document.getElementById('ai-out'); - if (!shell || !out) return; - - const token = {js_token}; - const query = {js_query}; - - try {{ - const res = await fetch(`/gemini-stream?token=${{token}}&q=` + encodeURIComponent(query)); - if (!res.ok) throw new Error(res.statusText); - - const reader = res.body.getReader(); - const decoder = new TextDecoder(); - while (true) {{ - const {{done, value}} = await reader.read(); - if (done) break; - const chunk = decoder.decode(value); - if (chunk.trim()) {{ - shell.style.display = 'block'; - out.innerText += chunk; - }} - }} - }} catch (e) {{ console.error("Gemini Stream Failed", e); }} - }})(); - """ - return Response(js_code, mimetype='application/javascript') - + return Response(generate(), mimetype='text/plain', headers={'X-Accel-Buffering': 'no'}) return True def post_search(self, request, search) -> EngineResults: results = EngineResults() - if search.search_query.pageno > 1 or not self.active or not self.api_key: + if not self.active or not self.api_key or search.search_query.pageno > 1: return results - # Extract context from top 5 search results for RAG - context_parts = [] raw_results = search.result_container.get_ordered_results() - for i, res in enumerate(raw_results[:5]): - title = res.get('title', 'No Title') - content = res.get('content', 'No Content') - context_parts.append(f"Source [{i+1}]: {title}\nSnippet: {content}") - - context_str = "\n\n".join(context_parts) - - tk = secrets.token_urlsafe(16) - self.tokens[tk] = { - "expires": time.time() + 90, - "context": context_str - } - - logger.debug(f"[{self.id}] Prepared RAG context for query: {search.search_query.query[:20]}...") + context_list = [f"[{i+1}] {r.get('title')}: {r.get('content')}" for i, r in enumerate(raw_results[:6])] + context_str = "\n".join(context_list) + + # Base64 Encode to ensure HTML safety + b64_context = base64.b64encode(context_str.encode('utf-8')).decode('utf-8') + js_q = json.dumps(search.search_query.query) - # Encode query for the URL parameter in the script tag - safe_query_param = urllib.parse.quote(search.search_query.query) - - # HTML Payload: - # 1. The Container (Hidden by default) - # 2. The Script Tag (Pointing to our dynamic route with params) html_payload = f''' - + ''' - results.add(results.types.Answer(answer=Markup(html_payload))) - return results \ No newline at end of file + return results