Enhancing AI workflow

This commit is contained in:
TySP-Dev
2025-11-13 22:00:32 -10:00
parent 0f579c973d
commit d3fed1cea0
8 changed files with 1118 additions and 74 deletions
+3
View File
@@ -30,6 +30,7 @@ from .settings_dialog import SettingsDialog
from .main_gui import MainGUI
from .utils import Logger, PRNumberManager, ContentBuilders
from .workflow import WorkflowManager, WorkflowItem, GitHubRepoFetcher
from .ai_action_planner import AIActionPlanner, ActionPlan
__all__ = [
'ConfigManager',
@@ -43,6 +44,8 @@ __all__ = [
'WorkflowManager',
'WorkflowItem',
'GitHubRepoFetcher',
'AIActionPlanner',
'ActionPlan',
'__version__',
'__author__',
'__app_name__',
+617
View File
@@ -0,0 +1,617 @@
"""
AI Action Planner
Generates and executes action plans for GitHub issues and PRs using AI
"""
import json
import re
import requests
from typing import List, Dict, Any, Optional, Callable
from pathlib import Path
class ActionPlan:
"""Represents an AI-generated action plan"""
def __init__(self, title: str, steps: List[Dict[str, Any]], context: Dict[str, Any]):
self.title = title
self.steps = steps # List of {description, file_path, changes, completed}
self.context = context # PR/Issue context
self.completed_steps = []
self.failed_steps = []
def to_dict(self) -> Dict[str, Any]:
"""Convert plan to dictionary"""
return {
'title': self.title,
'steps': self.steps,
'context': self.context,
'completed_steps': self.completed_steps,
'failed_steps': self.failed_steps
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ActionPlan':
"""Create plan from dictionary"""
plan = cls(data['title'], data['steps'], data['context'])
plan.completed_steps = data.get('completed_steps', [])
plan.failed_steps = data.get('failed_steps', [])
return plan
class OllamaProvider:
"""Simple Ollama API provider for AI action planning"""
def __init__(self, base_url: str, model: str, logger):
self.base_url = base_url.rstrip('/')
self.model = model
self.logger = logger
def generate(self, prompt: str) -> Optional[str]:
"""Generate a response from Ollama"""
try:
response = requests.post(
f"{self.base_url}/api/generate",
json={
"model": self.model,
"prompt": prompt,
"stream": False
},
timeout=120
)
response.raise_for_status()
result = response.json()
return result.get('response', '')
except Exception as e:
self.logger.log(f"❌ Ollama API error: {str(e)}")
return None
def make_change(self, file_content: str, old_text: str, new_text: str, file_path: str, custom_instructions: str = None) -> Optional[str]:
"""Make changes to file content using Ollama"""
# Try direct replacement first
if old_text and old_text.strip() in file_content:
return file_content.replace(old_text.strip(), new_text.strip())
# Use Ollama to make intelligent changes
prompt = f"""You are a code modification assistant. Modify the following file according to the instructions.
File: {file_path}
Current Content:
```
{file_content}
```
Instructions: {new_text}
{f'Additional context: {custom_instructions}' if custom_instructions else ''}
Return ONLY the complete modified file content. Do not include explanations or markdown code blocks."""
return self.generate(prompt)
class AIActionPlanner:
"""Generates and executes action plans using AI"""
def __init__(self, ai_manager, logger, config_manager):
self.ai_manager = ai_manager
self.logger = logger
self.config_manager = config_manager
def generate_plan(self, item, custom_instructions: str = "") -> Optional[ActionPlan]:
"""
Generate an action plan for a PR or Issue
Args:
item: The PR or Issue (WorkflowItem object or dict)
custom_instructions: Optional user-provided instructions
Returns:
ActionPlan object or None if generation failed
"""
# Handle both WorkflowItem objects and dictionaries
if hasattr(item, 'item_type'):
# It's a WorkflowItem object
item_type = item.item_type
item_number = item.number
title = item.title
body = item.body or ''
repo = getattr(item, 'repo', None)
else:
# It's a dictionary
item_type = item.get('type', 'unknown')
item_number = item.get('number')
title = item.get('title', 'Untitled')
body = item.get('body', '')
repo = item.get('repo')
self.logger.log(f"🤖 Generating action plan for {item_type} #{item_number}...")
# Get AI provider
config = self.config_manager.get_config()
ai_provider_name = config.get('AI_PROVIDER', 'none').lower()
if ai_provider_name == 'none' or not ai_provider_name:
self.logger.log("❌ No AI provider configured. Please configure in Settings.")
return None
# Get provider instance
provider = self._get_ai_provider(ai_provider_name, config)
if not provider:
return None
# Generate the plan using AI
try:
self.logger.log(f"📤 Calling AI provider: {type(provider).__name__}")
plan_text = self._call_ai_for_plan(provider, item_type, title, body, custom_instructions)
if not plan_text:
self.logger.log("❌ AI did not generate a plan (empty response)")
return None
self.logger.log(f"📥 Received response from AI ({len(plan_text)} characters)")
self.logger.log(f"📄 Response preview: {plan_text[:200]}...")
# Parse the plan
self.logger.log("🔍 Parsing AI response into steps...")
steps = self._parse_plan(plan_text)
if not steps:
self.logger.log("❌ Could not parse action steps from AI response")
return None
# Get repo from item or config
if repo is None:
repo = config.get('GITHUB_REPO', '')
plan = ActionPlan(
title=f"Action Plan for {item_type.upper()} #{item_number}: {title}",
steps=steps,
context={
'item_type': item_type,
'item_number': item_number,
'item_title': title,
'item_body': body,
'repo': repo
}
)
self.logger.log(f"✅ Generated plan with {len(steps)} steps")
return plan
except Exception as e:
self.logger.log(f"❌ Error generating plan: {str(e)}")
return None
def _get_ai_provider(self, provider_name: str, config: Dict[str, Any]):
"""Get the AI provider instance"""
try:
if provider_name in ['claude', 'anthropic']:
# Try both CLAUDE_API_KEY and ANTHROPIC_API_KEY for compatibility
api_key = config.get('CLAUDE_API_KEY')
if not api_key:
api_key = config.get('ANTHROPIC_API_KEY')
if not api_key:
self.logger.log("❌ Claude API key not found in secure storage (tried both CLAUDE_API_KEY and ANTHROPIC_API_KEY)")
return None
self.logger.log("️ Initializing Claude provider...")
from . import ai_manager
provider = ai_manager.ClaudeProvider(api_key, self.logger)
self.logger.log("✅ Claude provider initialized successfully")
return provider
elif provider_name in ['chatgpt', 'openai']:
api_key = config.get('OPENAI_API_KEY')
if not api_key:
self.logger.log("❌ OpenAI API key not found in secure storage")
return None
self.logger.log("️ Initializing ChatGPT provider...")
from . import ai_manager
provider = ai_manager.ChatGPTProvider(api_key, self.logger)
self.logger.log("✅ ChatGPT provider initialized successfully")
return provider
elif provider_name == 'ollama':
# Ollama doesn't need an API key, uses URL from config
ollama_url = config.get('OLLAMA_URL', 'http://localhost:11434')
ollama_model = config.get('OLLAMA_MODEL', 'llama2')
self.logger.log(f"️ Using Ollama at {ollama_url} with model {ollama_model}")
# Create a simple Ollama provider wrapper
return OllamaProvider(ollama_url, ollama_model, self.logger)
else:
self.logger.log(f"❌ Unsupported AI provider: {provider_name}")
return None
except Exception as e:
self.logger.log(f"❌ Error creating AI provider: {str(e)}")
return None
def _call_ai_for_plan(self, provider, item_type: str, title: str, body: str, custom_instructions: str) -> Optional[str]:
"""Call AI to generate an action plan"""
prompt = f"""You are an expert software engineer tasked with creating an actionable plan to address a GitHub {item_type}.
{item_type.upper()} Title: {title}
{item_type.upper()} Description:
{body}
{"Additional Instructions: " + custom_instructions if custom_instructions else ""}
Please create a detailed action plan with specific, executable steps. For each step, specify:
1. What needs to be done (clear description)
2. Which file(s) need to be modified (if applicable)
3. What changes should be made (if applicable)
Format your response as a JSON array of steps, where each step has:
- "description": A clear description of what to do
- "file_path": Path to the file to modify (or null if not file-specific)
- "changes": Description of changes to make (or null if not applicable)
- "action_type": One of ["modify_file", "create_file", "delete_file", "investigate", "test", "document"]
Example format:
```json
[
{{
"description": "Fix the authentication bug in login handler",
"file_path": "src/auth/login.py",
"changes": "Update the password validation logic to handle special characters correctly",
"action_type": "modify_file"
}},
{{
"description": "Add unit tests for authentication",
"file_path": "tests/test_auth.py",
"changes": "Add test cases for special characters in passwords",
"action_type": "create_file"
}}
]
```
IMPORTANT: Return ONLY the JSON array, no other text before or after."""
try:
if isinstance(provider, OllamaProvider):
# Use Ollama
self.logger.log(f"🤖 Calling Ollama AI to generate plan...")
return provider.generate(prompt)
elif hasattr(provider, '_generate_updated_document'):
# Use Claude's document generation
self.logger.log(f"🤖 Calling Claude AI to generate plan...")
import anthropic
client = anthropic.Anthropic(api_key=provider.api_key)
message = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=4096,
messages=[{"role": "user", "content": prompt}]
)
return message.content[0].text
elif hasattr(provider, 'client'):
# Use OpenAI/ChatGPT
self.logger.log(f"🤖 Calling ChatGPT AI to generate plan...")
response = provider.client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
max_tokens=4096
)
self.logger.log(f"✅ ChatGPT response received")
return response.choices[0].message.content
else:
self.logger.log(f"❌ Unknown provider type: {type(provider).__name__}")
return None
except Exception as e:
self.logger.log(f"❌ AI API call failed: {str(e)}")
import traceback
self.logger.log(f"❌ Traceback: {traceback.format_exc()}")
return None
def _parse_plan(self, plan_text: str) -> List[Dict[str, Any]]:
"""Parse the AI-generated plan text into structured steps"""
try:
# Extract JSON from response (might be wrapped in markdown)
json_match = re.search(r'```json\s*(\[.*?\])\s*```', plan_text, re.DOTALL)
if json_match:
json_text = json_match.group(1)
else:
# Try to find JSON array directly
json_match = re.search(r'\[.*\]', plan_text, re.DOTALL)
if json_match:
json_text = json_match.group(0)
else:
self.logger.log("⚠️ Could not find JSON in AI response")
return []
# Parse JSON
steps = json.loads(json_text)
# Validate and clean up steps
validated_steps = []
for i, step in enumerate(steps):
if isinstance(step, dict):
validated_step = {
'step_number': i + 1,
'description': step.get('description', f'Step {i+1}'),
'file_path': step.get('file_path'),
'changes': step.get('changes'),
'action_type': step.get('action_type', 'investigate'),
'completed': False,
'status': 'pending'
}
validated_steps.append(validated_step)
self.logger.log(f"✅ Successfully parsed {len(validated_steps)} steps from AI response")
return validated_steps
except json.JSONDecodeError as e:
self.logger.log(f"❌ Failed to parse JSON: {str(e)}")
self.logger.log(f"Response was: {plan_text[:500]}...")
return []
except Exception as e:
self.logger.log(f"❌ Error parsing plan: {str(e)}")
return []
def execute_plan(
self,
plan: ActionPlan,
local_repo_path: str,
progress_callback: Optional[Callable[[int, int, str], None]] = None,
log_callback: Optional[Callable[[str], None]] = None
) -> Dict[str, Any]:
"""
Execute an action plan
Args:
plan: The ActionPlan to execute
local_repo_path: Path to local git repository
progress_callback: Callback function(current_step, total_steps, message)
log_callback: Callback function for logging thought process
Returns:
Dictionary with execution results
"""
def log(message):
"""Helper to log to both logger and callback"""
self.logger.log(message)
if log_callback:
log_callback(message)
log(f"▶️ Starting execution of plan: {plan.title}")
if not local_repo_path or not Path(local_repo_path).exists():
log(f"❌ Local repository path not found: {local_repo_path}")
return {'success': False, 'error': 'Invalid local repository path'}
total_steps = len(plan.steps)
completed = 0
failed = 0
for i, step in enumerate(plan.steps):
step_num = step['step_number']
# Mark step as in-progress
step['status'] = 'in_progress'
if progress_callback:
progress_callback(i + 1, total_steps, f"Executing step {step_num}...")
log(f"\n📍 Step {step_num}/{total_steps}: {step['description']}")
try:
result = self._execute_step(step, local_repo_path, plan.context, log)
if result['success']:
step['completed'] = True
step['status'] = 'completed'
plan.completed_steps.append(step_num)
completed += 1
log(f"✅ Step {step_num} completed")
else:
step['status'] = 'failed'
step['error'] = result.get('error', 'Unknown error')
plan.failed_steps.append(step_num)
failed += 1
log(f"❌ Step {step_num} failed: {result.get('error')}")
except Exception as e:
step['status'] = 'failed'
step['error'] = str(e)
plan.failed_steps.append(step_num)
failed += 1
log(f"❌ Step {step_num} failed with exception: {str(e)}")
log(f"\n📊 Execution complete: {completed}/{total_steps} steps successful, {failed} failed")
# If we made changes successfully, commit and push them
if completed > 0:
try:
log("\n🔧 Committing and pushing changes...")
# Get PR/Issue info from context
item_type = plan.context.get('item_type', 'item')
item_number = plan.context.get('item_number', 'unknown')
item_title = plan.context.get('item_title', 'changes')
# Commit message
commit_msg = f"AI: Execute action plan for {item_type} #{item_number}\n\n{item_title}\n\nAutomated changes by GitHub Pulse AI"
# Get current branch (should be the PR branch)
import subprocess
result = subprocess.run(
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
cwd=local_repo_path,
capture_output=True,
text=True,
timeout=10
)
current_branch = result.stdout.strip() if result.returncode == 0 else 'main'
log(f"📍 Current branch: {current_branch}")
# Stage all changes
log("📝 Staging changes...")
subprocess.run(['git', 'add', '-A'], cwd=local_repo_path, check=True, timeout=10)
# Check if there are changes to commit
result = subprocess.run(
['git', 'diff', '--cached', '--quiet'],
cwd=local_repo_path,
timeout=10
)
if result.returncode != 0: # There are changes
# Commit
log("💾 Committing changes...")
subprocess.run(
['git', 'commit', '-m', commit_msg],
cwd=local_repo_path,
check=True,
timeout=10
)
# Push
log(f"🚀 Pushing to {current_branch}...")
subprocess.run(
['git', 'push', 'origin', current_branch],
cwd=local_repo_path,
check=True,
timeout=30
)
log(f"✅ Changes pushed to {current_branch}")
else:
log("️ No changes to commit")
except subprocess.TimeoutExpired:
log("⚠️ Git operation timed out")
except subprocess.CalledProcessError as e:
log(f"⚠️ Git operation failed: {e}")
except Exception as e:
log(f"⚠️ Error during git commit/push: {str(e)}")
return {
'success': failed == 0,
'completed': completed,
'failed': failed,
'total': total_steps,
'plan': plan
}
def _execute_step(self, step: Dict[str, Any], local_repo_path: str, context: Dict[str, Any], log=None) -> Dict[str, Any]:
"""Execute a single step of the plan"""
action_type = step.get('action_type', 'investigate')
file_path = step.get('file_path')
changes = step.get('changes')
# Use log function if provided, otherwise fall back to logger
log_func = log if log else self.logger.log
if action_type == 'modify_file' and file_path:
return self._modify_file(file_path, changes, local_repo_path, log_func)
elif action_type == 'create_file' and file_path:
return self._create_file(file_path, changes, local_repo_path, log_func)
elif action_type == 'delete_file' and file_path:
return self._delete_file(file_path, local_repo_path, log_func)
else:
# For investigate, test, document actions, just mark as completed
# (requires manual intervention)
log_func(f"️ Manual action required: {step['description']}")
return {'success': True, 'message': 'Manual action logged'}
def _modify_file(self, file_path: str, changes: str, local_repo_path: str, log=None) -> Dict[str, Any]:
"""Modify a file using AI"""
log_func = log if log else self.logger.log
full_path = Path(local_repo_path) / file_path
if not full_path.exists():
return {'success': False, 'error': f'File not found: {file_path}'}
try:
log_func(f"📝 Reading file: {file_path}")
# Read current content
with open(full_path, 'r', encoding='utf-8') as f:
current_content = f.read()
# Get AI provider to make changes
config = self.config_manager.get_config()
provider_name = config.get('AI_PROVIDER', 'none').lower()
provider = self._get_ai_provider(provider_name, config)
if not provider:
return {'success': False, 'error': 'AI provider not available'}
# Use AI to make the changes
log_func(f"🤖 Using AI to modify {file_path}...")
log_func(f"🔍 Analyzing changes needed...")
updated_content = provider.make_change(
file_content=current_content,
old_text=current_content[:200] + "...", # Context
new_text=changes, # What to change
file_path=str(full_path),
custom_instructions=changes
)
if updated_content and updated_content != current_content:
log_func(f"💾 Writing changes to {file_path}...")
# Write updated content
with open(full_path, 'w', encoding='utf-8') as f:
f.write(updated_content)
log_func(f"✅ Successfully modified {file_path}")
return {'success': True, 'file': file_path}
else:
return {'success': False, 'error': 'AI could not generate changes'}
except Exception as e:
return {'success': False, 'error': f'Error modifying file: {str(e)}'}
def _create_file(self, file_path: str, content: str, local_repo_path: str, log=None) -> Dict[str, Any]:
"""Create a new file"""
log_func = log if log else self.logger.log
full_path = Path(local_repo_path) / file_path
if full_path.exists():
return {'success': False, 'error': f'File already exists: {file_path}'}
try:
log_func(f"📄 Creating new file: {file_path}")
# Create parent directories if needed
full_path.parent.mkdir(parents=True, exist_ok=True)
# Create file with content
with open(full_path, 'w', encoding='utf-8') as f:
f.write(content or f"# TODO: Implement {file_path}\n")
log_func(f"✅ Created {file_path}")
return {'success': True, 'file': file_path}
except Exception as e:
return {'success': False, 'error': f'Error creating file: {str(e)}'}
def _delete_file(self, file_path: str, local_repo_path: str, log=None) -> Dict[str, Any]:
"""Delete a file"""
log_func = log if log else self.logger.log
full_path = Path(local_repo_path) / file_path
if not full_path.exists():
return {'success': False, 'error': f'File not found: {file_path}'}
try:
log_func(f"🗑️ Deleting file: {file_path}")
full_path.unlink()
log_func(f"✅ Deleted {file_path}")
return {'success': True, 'file': file_path}
except Exception as e:
return {'success': False, 'error': f'Error deleting file: {str(e)}'}
+15 -6
View File
@@ -742,6 +742,12 @@ Current file content:
class ChatGPTProvider(AIProvider):
"""ChatGPT/GPT-4 provider using OpenAI API"""
def __init__(self, api_key: str, logger: Logger):
"""Initialize ChatGPT provider with OpenAI client"""
super().__init__(api_key, logger)
import openai
self.client = openai.OpenAI(api_key=api_key)
def make_change(self, file_content: str, old_text: str, new_text: str, file_path: str, custom_instructions: str = None) -> Optional[str]:
"""Make smart, targeted changes based on reference text and suggestions
@@ -772,10 +778,10 @@ class ChatGPTProvider(AIProvider):
def _generate_updated_document_chatgpt(self, file_content: str, old_text: str, new_text: str, file_path: str, custom_instructions: str = None) -> Optional[str]:
"""Generate updated document content using ChatGPT"""
try:
import openai
client = openai.OpenAI(api_key=self.api_key)
# Use the client initialized in __init__
client = self.client
# Build custom instructions text
if custom_instructions and custom_instructions.strip():
@@ -3358,16 +3364,19 @@ class AIManager:
# Anthropic/Claude
elif provider_name in ['claude', 'anthropic']:
api_key = config.get('ANTHROPIC_API_KEY', '')
# Try both CLAUDE_API_KEY and ANTHROPIC_API_KEY for compatibility
api_key = config.get('CLAUDE_API_KEY', '')
if not api_key:
return "Error: Anthropic API key not configured"
api_key = config.get('ANTHROPIC_API_KEY', '')
if not api_key:
return "Error: Claude API key not configured (tried both CLAUDE_API_KEY and ANTHROPIC_API_KEY)"
try:
import anthropic
client = anthropic.Anthropic(api_key=api_key)
response = client.messages.create(
model=config.get('ANTHROPIC_MODEL', 'claude-3-5-sonnet-20241022'),
model=config.get('ANTHROPIC_MODEL', 'claude-sonnet-4-5'),
max_tokens=2000,
messages=[
{"role": "user", "content": prompt}
+421
View File
@@ -104,6 +104,15 @@ class MainGUI:
# Initialize logger
self.logger = None # Will be set after UI is created
# AI Action Plan state
self.current_action_plan = None
self.plan_display_ref = ft.Ref[ft.Column]()
self.plan_progress_ref = ft.Ref[ft.ProgressBar]()
self.plan_status_ref = ft.Ref[ft.Text]()
self.execute_plan_button_ref = ft.Ref[ft.ElevatedButton]()
self.generate_plan_button_ref = ft.Ref[ft.ElevatedButton]()
self.ai_instructions_ref = ft.Ref[ft.TextField]()
# Register settings change listener for live updates
self.config_manager.register_listener(self._on_settings_changed)
@@ -457,6 +466,11 @@ class MainGUI:
icon=ft.icons.DIFFERENCE,
content=self._create_diff_tab()
),
ft.Tab(
text="AI Action Plan",
icon=ft.icons.AUTO_AWESOME,
content=self._create_ai_plan_tab()
),
],
expand=True,
)
@@ -1538,6 +1552,10 @@ Description:
# Store the active item
self.active_workflow_item = item
# Enable the Generate Plan button when an item is selected
if self.generate_plan_button_ref.current:
self.generate_plan_button_ref.current.disabled = False
# Determine display labels
repo_label = "Target" if item.repo_source == "target" else "Fork"
repo_color = ft.colors.BLUE if item.repo_source == "target" else ft.colors.PURPLE
@@ -2683,6 +2701,409 @@ Description:
self.diff_text_ref.current.value = diff_content
self.page.update()
def _create_ai_plan_tab(self) -> ft.Container:
"""Create the AI Action Plan tab"""
# Plan display area (initially empty)
plan_display = ft.Column(
ref=self.plan_display_ref,
controls=[
ft.Container(
content=ft.Column([
ft.Icon(ft.icons.AUTO_AWESOME, size=64, color=ft.colors.BLUE_400),
ft.Text(
"AI Action Plan",
size=24,
weight=ft.FontWeight.BOLD,
color=ft.colors.BLUE_400,
),
ft.Text(
"Select a PR or Issue and generate an AI action plan",
size=14,
color=ft.colors.GREY_600,
text_align=ft.TextAlign.CENTER,
),
], horizontal_alignment=ft.CrossAxisAlignment.CENTER, spacing=10),
alignment=ft.alignment.center,
expand=True,
)
],
spacing=10,
scroll=ft.ScrollMode.AUTO,
expand=True,
)
# AI Instructions input
ai_instructions = ft.TextField(
ref=self.ai_instructions_ref,
label="Additional Instructions (Optional)",
hint_text="Add any specific requirements or context for the AI...",
multiline=True,
min_lines=2,
max_lines=4,
)
# Buttons row
buttons_row = ft.Row(
[
ft.ElevatedButton(
ref=self.generate_plan_button_ref,
text="Generate Plan",
icon=ft.icons.PSYCHOLOGY,
on_click=self._on_generate_plan_click,
disabled=True, # Enable when item is selected
),
ft.ElevatedButton(
ref=self.execute_plan_button_ref,
text="Execute Plan",
icon=ft.icons.PLAY_ARROW,
on_click=self._on_execute_plan_click,
disabled=True, # Enable when plan is generated
style=ft.ButtonStyle(
color=ft.colors.WHITE,
bgcolor=ft.colors.GREEN_700,
),
),
],
spacing=10,
)
# Progress bar (indeterminate/animated when visible)
progress_bar = ft.ProgressBar(
ref=self.plan_progress_ref,
visible=False,
color=ft.colors.BLUE_400,
bgcolor=ft.colors.BLUE_GREY_800,
)
# Status text
status_text = ft.Text(
ref=self.plan_status_ref,
value="",
color=ft.colors.BLUE_400,
size=12,
)
# Main layout
return ft.Container(
content=ft.Column(
[
ai_instructions,
buttons_row,
progress_bar,
status_text,
ft.Divider(),
plan_display,
],
spacing=10,
expand=True,
),
padding=20,
expand=True,
)
def _on_generate_plan_click(self, e):
"""Generate AI action plan for current item"""
if not self.active_workflow_item:
self._show_snackbar("Please select a PR or Issue first", error=True)
return
# Run in thread to avoid blocking UI
import threading
thread = threading.Thread(target=self._generate_plan_async)
thread.start()
def _generate_plan_async(self):
"""Generate plan asynchronously"""
try:
# Update UI - disable both buttons during generation
if self.generate_plan_button_ref.current:
self.generate_plan_button_ref.current.disabled = True
if self.execute_plan_button_ref.current:
self.execute_plan_button_ref.current.disabled = True
if self.plan_status_ref.current:
self.plan_status_ref.current.value = "🤖 Generating action plan..."
if self.plan_progress_ref.current:
self.plan_progress_ref.current.visible = True
self.page.update()
# Create action planner
from .ai_action_planner import AIActionPlanner
planner = AIActionPlanner(self.ai_manager, self.logger, self.config_manager)
# Get custom instructions
custom_instructions = ""
if self.ai_instructions_ref.current:
custom_instructions = self.ai_instructions_ref.current.value or ""
# Generate plan
plan = planner.generate_plan(self.active_workflow_item, custom_instructions)
if plan:
self.current_action_plan = plan
self._display_action_plan(plan)
if self.plan_status_ref.current:
self.plan_status_ref.current.value = f"✅ Plan generated with {len(plan.steps)} steps"
if self.execute_plan_button_ref.current:
self.execute_plan_button_ref.current.disabled = False
else:
if self.plan_status_ref.current:
self.plan_status_ref.current.value = "❌ Failed to generate plan"
except Exception as ex:
import traceback
self.logger.log(f"❌ Error generating plan: {str(ex)}")
self.logger.log(f"❌ Traceback: {traceback.format_exc()}")
if self.plan_status_ref.current:
self.plan_status_ref.current.value = f"❌ Error: {str(ex)}"
finally:
# Re-enable Generate Plan button
if self.generate_plan_button_ref.current:
self.generate_plan_button_ref.current.disabled = False
# Only enable Execute Plan if we have a valid plan
if self.execute_plan_button_ref.current and self.current_action_plan:
self.execute_plan_button_ref.current.disabled = False
if self.plan_progress_ref.current:
self.plan_progress_ref.current.visible = False
self.page.update()
def _display_action_plan(self, plan):
"""Display the generated action plan"""
if not self.plan_display_ref.current:
return
steps_widgets = []
# Plan header
steps_widgets.append(
ft.Container(
content=ft.Column([
ft.Text(plan.title, size=18, weight=ft.FontWeight.BOLD),
ft.Text(
f"Context: {plan.context.get('item_type', 'unknown').upper()} #{plan.context.get('item_number', '?')}",
size=12,
color=ft.colors.GREY_600
),
]),
padding=10,
bgcolor=ft.colors.BLUE_GREY_900,
border_radius=5,
)
)
# Individual steps
for step in plan.steps:
step_num = step['step_number']
description = step['description']
file_path = step.get('file_path', 'N/A')
action_type = step.get('action_type', 'unknown')
status = step.get('status', 'pending')
# Status icon and color based on step status
if status == 'completed':
icon = ft.icons.CHECK_CIRCLE
icon_color = ft.colors.GREEN
bg_color = None # No special background
elif status == 'failed':
icon = ft.icons.ERROR
icon_color = ft.colors.RED
bg_color = None
elif status == 'in_progress':
icon = ft.icons.TIMELAPSE # Animated-looking icon
icon_color = ft.colors.BLUE_400
bg_color = ft.colors.BLUE_GREY_800 # Highlight in-progress steps
else: # pending
icon = ft.icons.RADIO_BUTTON_UNCHECKED
icon_color = ft.colors.GREY
bg_color = None
step_widget = ft.Container(
content=ft.Row([
ft.Icon(icon, color=icon_color, size=20),
ft.Column([
ft.Text(f"Step {step_num}: {description}", weight=ft.FontWeight.BOLD, size=14),
ft.Text(f"Action: {action_type}", size=11, color=ft.colors.BLUE_400),
ft.Text(f"File: {file_path}", size=11, color=ft.colors.GREY_600) if file_path != 'N/A' else ft.Container(),
], spacing=2, expand=True),
], spacing=10),
padding=10,
border=ft.border.all(1, ft.colors.GREY_700),
border_radius=5,
bgcolor=bg_color, # Highlight in-progress steps
)
steps_widgets.append(step_widget)
# Update display
self.plan_display_ref.current.controls = steps_widgets
self.page.update()
def _on_execute_plan_click(self, e):
"""Execute the current action plan"""
if not self.current_action_plan:
self._show_snackbar("No plan to execute", error=True)
return
# Run in thread
import threading
thread = threading.Thread(target=self._execute_plan_async)
thread.start()
def _execute_plan_async(self):
"""Execute plan asynchronously"""
try:
self.logger.log("🔧 Starting _execute_plan_async...")
# Update UI - disable both buttons during execution
if self.execute_plan_button_ref.current:
self.execute_plan_button_ref.current.disabled = True
if self.generate_plan_button_ref.current:
self.generate_plan_button_ref.current.disabled = True
if self.plan_status_ref.current:
self.plan_status_ref.current.value = "▶️ Executing plan..."
if self.plan_progress_ref.current:
self.plan_progress_ref.current.visible = True
self.page.update()
# Get local repo path
config = self.config_manager.get_config()
local_repo_path = config.get('LOCAL_REPO_PATH', '')
self.logger.log(f"🔧 Local repo path: {local_repo_path}")
if not local_repo_path:
self.logger.log("❌ LOCAL_REPO_PATH is not set")
self._show_snackbar("Please set LOCAL_REPO_PATH in settings", error=True)
if self.plan_status_ref.current:
self.plan_status_ref.current.value = "❌ LOCAL_REPO_PATH not set in settings"
self.page.update()
return
# Create action planner
from .ai_action_planner import AIActionPlanner
planner = AIActionPlanner(self.ai_manager, self.logger, self.config_manager)
self.logger.log(f"🔧 Created AIActionPlanner, about to execute plan with {len(self.current_action_plan.steps)} steps")
# Execute with progress callback
def progress_callback(current, total, message):
if self.plan_status_ref.current:
self.plan_status_ref.current.value = f"▶️ {message} ({current}/{total})"
# Update the plan display in real-time to show progress
if self.current_action_plan:
self._display_action_plan(self.current_action_plan)
self.page.update()
# Execute with logging callback for thought process
def log_callback(message):
# Log to the processing log
self.logger.log(message)
# Also show in status if it's important
if any(keyword in message for keyword in ["🤖", "", "", "📝", "🔍"]):
if self.plan_status_ref.current:
self.plan_status_ref.current.value = message
self.page.update()
result = planner.execute_plan(
self.current_action_plan,
local_repo_path,
progress_callback,
log_callback
)
self.logger.log(f"🔧 execute_plan returned: {result}")
# Update display
self._display_action_plan(self.current_action_plan)
# Show completion dialog
if result['success']:
self._show_completion_dialog(result)
else:
if self.plan_status_ref.current:
self.plan_status_ref.current.value = f"⚠️ Plan completed with errors: {result['failed']}/{result['total']} steps failed"
except Exception as ex:
import traceback
self.logger.log(f"❌ Error executing plan: {str(ex)}")
self.logger.log(f"❌ Traceback: {traceback.format_exc()}")
if self.plan_status_ref.current:
self.plan_status_ref.current.value = f"❌ Error: {str(ex)}"
finally:
# Re-enable both buttons after execution
if self.execute_plan_button_ref.current:
self.execute_plan_button_ref.current.disabled = False
if self.generate_plan_button_ref.current:
self.generate_plan_button_ref.current.disabled = False
if self.plan_progress_ref.current:
self.plan_progress_ref.current.visible = False
self.page.update()
def _show_completion_dialog(self, result):
"""Show dialog after plan execution with options to push/create PR"""
context = self.current_action_plan.context
item_type = context.get('item_type', 'unknown')
item_number = context.get('item_number', '?')
def close_dialog(e):
dialog.open = False
self.page.update()
def push_changes(e):
dialog.open = False
self.page.update()
self._push_changes_async()
def create_pr(e):
dialog.open = False
self.page.update()
self._create_pr_from_plan_async()
dialog = ft.AlertDialog(
title=ft.Text("✅ Plan Execution Complete!"),
content=ft.Column([
ft.Text(f"Successfully completed {result['completed']}/{result['total']} steps"),
ft.Divider(),
ft.Text("What would you like to do next?", weight=ft.FontWeight.BOLD),
ft.Text(f"• Push changes to the {item_type} branch", size=12),
ft.Text(f"• Create/Update PR for these changes", size=12),
ft.Text(f"• Review changes manually", size=12),
], tight=True, spacing=10),
actions=[
ft.TextButton("Review Later", on_click=close_dialog),
ft.ElevatedButton(
"Push Changes",
icon=ft.icons.CLOUD_UPLOAD,
on_click=push_changes,
),
ft.ElevatedButton(
"Create/Update PR",
icon=ft.icons.MERGE_TYPE,
on_click=create_pr,
style=ft.ButtonStyle(bgcolor=ft.colors.GREEN_700),
),
],
actions_alignment=ft.MainAxisAlignment.END,
)
self.page.dialog = dialog
dialog.open = True
self.page.update()
def _push_changes_async(self):
"""Push changes to git repository"""
# This will be implemented to push changes
self.logger.log("🚀 Pushing changes to repository...")
# TODO: Implement git push logic
self._show_snackbar("Push functionality coming soon!")
def _create_pr_from_plan_async(self):
"""Create or update PR from executed plan"""
# This will be implemented to create/update PR
self.logger.log("📝 Creating/updating PR...")
# TODO: Implement PR creation logic
self._show_snackbar("PR creation functionality coming soon!")
class Logger:
"""Logger class for Flet"""
+55 -9
View File
@@ -771,6 +771,7 @@ class SettingsDialog:
if not path_str:
path_str = str(Path.home() / "Downloads" / "github_repos")
print(f"🔍 Scanning for repos in: {path_str}")
base_path = Path(path_str)
if not base_path.exists():
@@ -783,26 +784,70 @@ class SettingsDialog:
# Scan for git repositories
repos = []
try:
for owner_dir in base_path.iterdir():
if not owner_dir.is_dir():
# First, check if repos are directly in the base path (flat structure)
for item in base_path.iterdir():
if not item.is_dir():
continue
for repo_dir in owner_dir.iterdir():
if not repo_dir.is_dir():
continue
git_dir = item / ".git"
if git_dir.exists():
# This is a git repo directly in the base path
# Try to get the remote origin to get owner/repo format
try:
import subprocess
result = subprocess.run(
['git', 'config', '--get', 'remote.origin.url'],
cwd=str(item),
capture_output=True,
text=True,
timeout=5
)
git_dir = repo_dir / ".git"
if git_dir.exists():
repo_name = f"{owner_dir.name}/{repo_dir.name}"
repos.append(repo_name)
if result.returncode == 0:
url = result.stdout.strip()
# Parse GitHub URL to get owner/repo
if 'github.com' in url:
# Handle both HTTPS and SSH URLs
if url.startswith('https://'):
# https://github.com/owner/repo.git
parts = url.replace('https://github.com/', '').replace('.git', '').split('/')
if len(parts) >= 2:
repo_name = f"{parts[0]}/{parts[1]}"
repos.append(repo_name)
continue
elif url.startswith('git@'):
# git@github.com:owner/repo.git
parts = url.replace('git@github.com:', '').replace('.git', '').split('/')
if len(parts) >= 2:
repo_name = f"{parts[0]}/{parts[1]}"
repos.append(repo_name)
continue
except:
pass
# Fallback: use directory name
repos.append(f"local/{item.name}")
else:
# Check if this is an owner directory with repos inside (nested structure)
for repo_dir in item.iterdir():
if not repo_dir.is_dir():
continue
git_dir = repo_dir / ".git"
if git_dir.exists():
repo_name = f"{item.name}/{repo_dir.name}"
repos.append(repo_name)
except Exception as e:
print(f"Error scanning repos: {e}")
import traceback
traceback.print_exc()
# Update dropdown
if self.detected_repos_dropdown_ref.current:
if repos:
repos.sort()
print(f"✅ Found {len(repos)} repo(s): {', '.join(repos)}")
self.detected_repos_dropdown_ref.current.options = [
ft.dropdown.Option(repo) for repo in repos
]
@@ -811,6 +856,7 @@ class SettingsDialog:
else:
self.detected_repos_dropdown_ref.current.value = f'{len(repos)} repo(s) found - select one'
else:
print(f"❌ No git repositories found in {path_str}")
self.detected_repos_dropdown_ref.current.value = 'No git repositories found'
self.detected_repos_dropdown_ref.current.options = []