import json, http.client, ssl, os, logging, base64 from flask import Response, request from searx.plugins import Plugin, PluginInfo from searx.result_types import EngineResults from flask_babel import gettext from markupsafe import Markup logger = logging.getLogger(__name__) class SXNGPlugin(Plugin): id = "gemini_flash" def __init__(self, plg_cfg): super().__init__(plg_cfg) self.info = PluginInfo( id=self.id, name=gettext("Gemini Flash Streaming"), description=gettext("Live AI search answers using Google Gemini Flash"), preference_section="general", ) self.api_key = os.getenv('GEMINI_API_KEY') self.model = os.getenv('GEMINI_MODEL', 'gemini-1.5-flash') def init(self, app): @app.route('/gemini-stream', methods=['POST']) def g_stream(): data = request.json or {} context_text = data.get('context', '') q = data.get('q', '') if not self.api_key or not q: return Response("Error: Missing Key or Query", status=400) def generate(): host = "generativelanguage.googleapis.com" path = f"/v1beta/models/{self.model}:streamGenerateContent?key={self.api_key}" try: conn = http.client.HTTPSConnection(host, context=ssl.create_default_context()) prompt = f"Using these SEARCH RESULTS, answer the USER QUERY concisely (<4 sentences). If results are irrelevant, say so.\n\nRESULTS:\n{context_text}\n\nUSER QUERY: {q}" payload = {"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"maxOutputTokens": 400, "temperature": 0.3}} conn.request("POST", path, body=json.dumps(payload), headers={"Content-Type": "application/json"}) res = conn.getresponse() buffer = "" for chunk in res: if not chunk: continue buffer += chunk.decode('utf-8') while True: start = buffer.find('{') if start == -1: break brace_count, end = 0, -1 for i in range(start, len(buffer)): if buffer[i] == '{': brace_count += 1 elif buffer[i] == '}': brace_count -= 1 if brace_count == 0: end = i + 1 break if end == -1: break try: data = json.loads(buffer[start:end]) candidates = data.get('candidates', []) if candidates: text = candidates[0]['content']['parts'][0]['text'] if text: yield text except: pass buffer = buffer[end:] conn.close() except Exception as e: yield f" [Error: {str(e)}]" return Response(generate(), mimetype='text/plain', headers={'X-Accel-Buffering': 'no'}) return True def post_search(self, request, search) -> EngineResults: results = EngineResults() if not self.active or not self.api_key or search.search_query.pageno > 1: return results raw_results = search.result_container.get_ordered_results() context_list = [f"[{i+1}] {r.get('title')}: {r.get('content')}" for i, r in enumerate(raw_results[:6])] context_str = "\n".join(context_list) # Base64 Encode to ensure HTML safety b64_context = base64.b64encode(context_str.encode('utf-8')).decode('utf-8') js_q = json.dumps(search.search_query.query) html_payload = f'''
''' results.add(results.types.Answer(answer=Markup(html_payload))) return results