Implement AI response generation and enhance workflow item display with PR file fetching and comments

This commit is contained in:
b-tsammmons
2025-11-12 16:34:14 -10:00
parent 5ba48e4c58
commit 376bc62349
3 changed files with 831 additions and 119 deletions
+108 -4
View File
@@ -3397,18 +3397,122 @@ class AIManager:
"""Create a LocalGitManager instance"""
if not AI_PROVIDERS_AVAILABLE:
return None
try:
ai_logger = Logger(self.log)
return LocalGitManager(ai_logger, github_token)
except Exception as e:
self.log(f"Error creating LocalGitManager: {e}")
return None
def get_last_diff_content(self) -> str:
"""Get the last generated diff content for display in the UI"""
return self.last_diff_content
def clear_diff_content(self):
"""Clear the stored diff content"""
self.last_diff_content = ""
self.last_diff_content = ""
def generate_response(self, prompt: str, provider_name: str, config: dict) -> str:
"""Generate a text response from an AI provider
Args:
prompt: The prompt/question to send to the AI
provider_name: Name of the AI provider ('chatgpt', 'claude', 'ollama', etc.)
config: Configuration dictionary containing API keys and settings
Returns:
str: The AI-generated response
"""
try:
provider_name = provider_name.lower()
# OpenAI/ChatGPT
if provider_name in ['chatgpt', 'openai', 'gpt']:
api_key = config.get('OPENAI_API_KEY', '')
if not api_key:
return "Error: OpenAI API key not configured"
try:
import openai
client = openai.OpenAI(api_key=api_key)
response = client.chat.completions.create(
model=config.get('OPENAI_MODEL', 'gpt-4'),
messages=[
{"role": "system", "content": "You are a helpful assistant that analyzes GitHub pull requests and issues."},
{"role": "user", "content": prompt}
],
max_tokens=2000,
temperature=0.7
)
return response.choices[0].message.content.strip()
except Exception as e:
self.log(f"Error calling OpenAI API: {e}")
return f"Error calling OpenAI API: {str(e)}"
# Anthropic/Claude
elif provider_name in ['claude', 'anthropic']:
api_key = config.get('ANTHROPIC_API_KEY', '')
if not api_key:
return "Error: Anthropic API key not configured"
try:
import anthropic
client = anthropic.Anthropic(api_key=api_key)
response = client.messages.create(
model=config.get('ANTHROPIC_MODEL', 'claude-3-5-sonnet-20241022'),
max_tokens=2000,
messages=[
{"role": "user", "content": prompt}
]
)
return response.content[0].text.strip()
except Exception as e:
self.log(f"Error calling Anthropic API: {e}")
return f"Error calling Anthropic API: {str(e)}"
# Ollama
elif provider_name == 'ollama':
ollama_url = config.get('OLLAMA_URL', 'http://localhost:11434')
ollama_model = config.get('OLLAMA_MODEL', 'llama2')
try:
import requests
# Normalize URL
if not ollama_url.startswith('http'):
ollama_url = f"http://{ollama_url}"
# Remove trailing slash
ollama_url = ollama_url.rstrip('/')
api_url = f"{ollama_url}/api/generate"
payload = {
"model": ollama_model,
"prompt": prompt,
"stream": False
}
response = requests.post(api_url, json=payload, timeout=120)
response.raise_for_status()
result = response.json()
return result.get('response', '').strip()
except Exception as e:
self.log(f"Error calling Ollama API: {e}")
return f"Error calling Ollama API: {str(e)}"
else:
return f"Error: Unknown AI provider '{provider_name}'"
except Exception as e:
self.log(f"Error in generate_response: {e}")
return f"Error generating response: {str(e)}"
+666 -115
View File
@@ -468,125 +468,41 @@ class MainGUI:
def _create_current_item_tab(self) -> ft.Container:
"""Create the current item tab"""
# Navigation buttons
nav_buttons = ft.Row(
# Create a container to hold the dynamic content
self.current_item_content_ref = ft.Ref[ft.Column]()
# Default empty state
default_content = ft.Column(
[
ft.Container(expand=True),
ft.ElevatedButton(
"Go",
ref=self.go_button_ref,
icon=ft.icons.PLAY_ARROW,
on_click=self._create_github_resource,
disabled=True,
ft.Container(
content=ft.Column([
ft.Icon(ft.icons.INBOX, size=64, color=ft.colors.GREY_500),
ft.Text(
"No item selected",
size=18,
weight=ft.FontWeight.BOLD,
color=ft.colors.GREY_500,
),
ft.Text(
"Select a PR or Issue from the sidebar to view details",
size=14,
color=ft.colors.GREY_600,
text_align=ft.TextAlign.CENTER,
),
], horizontal_alignment=ft.CrossAxisAlignment.CENTER, spacing=10),
alignment=ft.alignment.center,
expand=True,
),
],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
)
# Work Item ID (clickable)
work_item_id = ft.Text(
ref=self.work_item_id_ref,
value="No item selected",
size=16,
weight=ft.FontWeight.BOLD,
color="blue",
)
# Fields
nature_text = ft.TextField(
ref=self.nature_text_ref,
label="Nature of Request",
multiline=True,
min_lines=2,
max_lines=4,
read_only=True,
expand=True,
)
live_doc_url = ft.TextField(
ref=self.live_doc_url_ref,
label="Live Doc URL",
read_only=True,
expand=True,
)
text_to_change = ft.TextField(
ref=self.text_to_change_ref,
label="Text to Change",
multiline=True,
min_lines=5,
max_lines=10,
read_only=True,
expand=True,
)
# Proposed New Text with Edit button
proposed_header = ft.Row(
[
ft.Text("Proposed New Text", weight=ft.FontWeight.BOLD),
ft.Container(expand=True),
ft.IconButton(
ref=self.edit_button_ref,
icon=ft.icons.EDIT,
tooltip="Edit",
on_click=self._toggle_edit_mode,
disabled=True,
),
],
)
proposed_new_text = ft.TextField(
ref=self.proposed_new_text_ref,
multiline=True,
min_lines=5,
max_lines=10,
read_only=True,
expand=True,
)
# Custom Instructions
custom_instructions_header = ft.Row(
[
ft.Text("Custom AI Instructions", weight=ft.FontWeight.BOLD),
ft.Container(expand=True),
ft.IconButton(
icon=ft.icons.SAVE,
tooltip="Save Instructions",
on_click=self.save_custom_instructions,
),
ft.IconButton(
icon=ft.icons.DELETE,
tooltip="Clear Instructions",
on_click=self.clear_custom_instructions,
),
],
)
custom_instructions = ft.TextField(
ref=self.custom_instructions_ref,
hint_text="Enter custom instructions for AI processing...",
multiline=True,
min_lines=3,
max_lines=6,
expand=True,
ref=self.current_item_content_ref,
spacing=15,
scroll=ft.ScrollMode.AUTO,
)
return ft.Container(
content=ft.ListView(
controls=[
nav_buttons,
work_item_id,
ft.Divider(),
nature_text,
live_doc_url,
text_to_change,
proposed_header,
proposed_new_text,
ft.Divider(),
custom_instructions_header,
custom_instructions,
],
spacing=15,
controls=[default_content],
spacing=0,
padding=20,
),
expand=True,
@@ -783,9 +699,644 @@ class MainGUI:
print("DEBUG: page.update() completed")
def _display_workflow_item(self, item):
"""Display a workflow item"""
# Implementation would populate fields with workflow item data
pass
"""Display a workflow item in the Current Item tab"""
if not self.current_item_content_ref.current:
return
# Get repo string based on source
config = self.config_manager.get_config()
if item.repo_source == "target":
repo_str = config.get('GITHUB_REPO', '')
else:
repo_str = config.get('FORKED_REPO', '')
# Fetch comments
comments = []
pr_files = []
try:
workflow_manager = self._get_workflow_manager()
comments = workflow_manager.fetch_comments(repo_str, item.number, item.item_type == "pull_request")
# Fetch PR files if this is a pull request
if item.item_type == "pull_request":
pr_files = workflow_manager.fetch_pr_files(repo_str, item.number)
except Exception as e:
print(f"Error fetching item details: {e}")
if self.logger:
self.logger.log(f"Error fetching item details: {e}")
# Build the display
controls = []
# Header section
header = ft.Container(
content=ft.Column([
ft.Row([
ft.Container(
content=ft.Text(
"PR" if item.item_type == "pull_request" else "Issue",
size=12,
weight=ft.FontWeight.BOLD,
color=ft.colors.WHITE,
),
bgcolor=ft.colors.GREEN if item.item_type == "pull_request" else ft.colors.ORANGE,
padding=ft.padding.symmetric(horizontal=8, vertical=4),
border_radius=4,
),
ft.Text(f"#{item.number}", size=18, weight=ft.FontWeight.BOLD),
ft.Container(expand=True),
ft.IconButton(
icon=ft.icons.OPEN_IN_BROWSER,
tooltip="Open in GitHub",
on_click=lambda e: self.page.launch_url(item.url),
),
], alignment=ft.MainAxisAlignment.START),
ft.Text(item.title, size=20, weight=ft.FontWeight.BOLD),
], spacing=8),
padding=15,
bgcolor=ft.colors.BLUE_GREY_900,
border_radius=8,
)
controls.append(header)
# Basic Info section
info_items = [
ft.Row([
ft.Icon(ft.icons.PERSON, size=16, color=ft.colors.BLUE_400),
ft.Text("Created by:", weight=ft.FontWeight.BOLD, size=14),
ft.Text(f"@{item.author}", size=14, color=ft.colors.BLUE_300),
], spacing=5),
ft.Row([
ft.Icon(ft.icons.CALENDAR_TODAY, size=16, color=ft.colors.BLUE_400),
ft.Text("Created:", weight=ft.FontWeight.BOLD, size=14),
ft.Text(item.created_at[:10] if item.created_at else 'Unknown', size=14),
], spacing=5),
ft.Row([
ft.Icon(ft.icons.UPDATE, size=16, color=ft.colors.BLUE_400),
ft.Text("Last Updated:", weight=ft.FontWeight.BOLD, size=14),
ft.Text(item.updated_at[:10] if item.updated_at else 'Unknown', size=14),
], spacing=5),
ft.Row([
ft.Icon(ft.icons.CIRCLE, size=16, color=ft.colors.GREEN if item.state == "open" else ft.colors.PURPLE),
ft.Text("Status:", weight=ft.FontWeight.BOLD, size=14),
ft.Text(item.state.capitalize(), size=14, color=ft.colors.GREEN if item.state == "open" else ft.colors.PURPLE),
], spacing=5),
]
# Add assignees with assign-to-self button
if item.assignees:
assignees_text = ", ".join([f"@{a}" for a in item.assignees])
info_items.append(
ft.Row([
ft.Icon(ft.icons.ASSIGNMENT_IND, size=16, color=ft.colors.BLUE_400),
ft.Text("Assigned to:", weight=ft.FontWeight.BOLD, size=14),
ft.Text(assignees_text, size=14, color=ft.colors.BLUE_300),
ft.IconButton(
icon=ft.icons.PERSON_ADD,
icon_size=16,
tooltip="Assign to me",
on_click=lambda _: self._assign_to_self(item, repo_str),
),
], spacing=5)
)
else:
info_items.append(
ft.Row([
ft.Icon(ft.icons.ASSIGNMENT_IND, size=16, color=ft.colors.GREY_600),
ft.Text("Assigned to:", weight=ft.FontWeight.BOLD, size=14),
ft.Text("Unassigned", size=14, color=ft.colors.GREY_500, italic=True),
ft.IconButton(
icon=ft.icons.PERSON_ADD,
icon_size=16,
tooltip="Assign to me",
on_click=lambda _: self._assign_to_self(item, repo_str),
),
], spacing=5)
)
# PR-specific info
if item.item_type == "pull_request":
merge_status_color = ft.colors.GREEN if item.merged else (ft.colors.ORANGE if item.state == "open" else ft.colors.GREY_600)
merge_status_text = "Merged" if item.merged else ("Pending Merge" if item.state == "open" else "Closed without merge")
info_items.append(
ft.Row([
ft.Icon(ft.icons.MERGE_TYPE, size=16, color=merge_status_color),
ft.Text("Merge Status:", weight=ft.FontWeight.BOLD, size=14),
ft.Text(merge_status_text, size=14, color=merge_status_color),
], spacing=5)
)
info_section = ft.Container(
content=ft.Column(info_items, spacing=8),
padding=15,
border=ft.border.all(1, ft.colors.OUTLINE),
border_radius=8,
)
controls.append(info_section)
# Description section (collapsible, collapsed by default)
description_section = ft.ExpansionTile(
title=ft.Text("Description", size=16, weight=ft.FontWeight.BOLD),
subtitle=ft.Text("Click to expand", size=12, color=ft.colors.GREY_500),
initially_expanded=False,
controls=[
ft.Container(
content=ft.Container(
content=ft.Row([
ft.Text(
item.body if item.body else "No description provided",
size=14,
selectable=True,
),
], spacing=5),
padding=10,
border=ft.border.all(1, ft.colors.OUTLINE),
border_radius=4,
bgcolor=ft.colors.GREY_900,
),
margin=ft.margin.only(left=10, right=10, bottom=10),
),
],
)
controls.append(
ft.Container(
content=description_section,
border=ft.border.all(1, ft.colors.OUTLINE),
border_radius=8,
)
)
# PR Files section
if item.item_type == "pull_request" and pr_files:
files_widgets = []
for file in pr_files:
status_color = {
'added': ft.colors.GREEN,
'removed': ft.colors.RED,
'modified': ft.colors.ORANGE,
'renamed': ft.colors.BLUE,
}.get(file['status'], ft.colors.GREY_400)
files_widgets.append(
ft.Container(
content=ft.Row([
ft.Icon(ft.icons.INSERT_DRIVE_FILE, size=16, color=status_color),
ft.Text(file['filename'], size=13, expand=True),
ft.Container(
content=ft.Text(file['status'], size=11, color=ft.colors.WHITE),
bgcolor=status_color,
padding=ft.padding.symmetric(horizontal=6, vertical=2),
border_radius=3,
),
ft.Text(f"+{file['additions']} -{file['deletions']}",
size=12,
color=ft.colors.GREY_400),
], spacing=8),
padding=8,
border=ft.border.all(1, ft.colors.OUTLINE),
border_radius=4,
bgcolor=ft.colors.GREY_900,
)
)
files_section = ft.Container(
content=ft.Column([
ft.Text(f"Modified Files ({len(pr_files)})", size=16, weight=ft.FontWeight.BOLD),
ft.Column(
controls=files_widgets,
spacing=5,
scroll=ft.ScrollMode.AUTO,
height=min(200, len(pr_files) * 50),
),
], spacing=8),
padding=15,
border=ft.border.all(1, ft.colors.OUTLINE),
border_radius=8,
)
controls.append(files_section)
# Comments section (collapsible, collapsed by default)
comments_widgets = []
if comments:
for comment in comments:
comments_widgets.append(
ft.Container(
content=ft.Column([
ft.Row([
ft.Icon(ft.icons.PERSON, size=14),
ft.Text(f"@{comment['user']}", weight=ft.FontWeight.BOLD, size=13),
ft.Text(
comment['created_at'][:10] if comment.get('created_at') else '',
size=11,
color=ft.colors.GREY_600
),
], spacing=5),
ft.Text(comment['body'], size=13, selectable=True),
], spacing=5),
padding=10,
border=ft.border.all(1, ft.colors.OUTLINE),
border_radius=4,
bgcolor=ft.colors.GREY_900,
)
)
else:
comments_widgets.append(
ft.Text("No comments yet", italic=True, color=ft.colors.GREY_500, size=13)
)
comments_section = ft.ExpansionTile(
title=ft.Text(f"Comments ({len(comments)})", size=16, weight=ft.FontWeight.BOLD),
subtitle=ft.Text("Click to expand", size=12, color=ft.colors.GREY_500),
initially_expanded=False,
controls=[
ft.Container(
content=ft.Column(
controls=comments_widgets,
spacing=8,
scroll=ft.ScrollMode.AUTO,
height=min(250, max(100, len(comments) * 80)),
),
margin=ft.margin.only(left=10, right=10, bottom=10),
),
],
)
controls.append(
ft.Container(
content=comments_section,
border=ft.border.all(1, ft.colors.OUTLINE),
border_radius=8,
)
)
# AI Analysis section (placeholder for now)
self.ai_analysis_result_ref = ft.Ref[ft.Column]()
ai_section = self._create_ai_analysis_section(item, repo_str, pr_files, comments)
controls.append(ai_section)
# Update the content
self.current_item_content_ref.current.controls = controls
self.page.update()
def _create_ai_analysis_section(self, item, repo_str, pr_files, comments):
"""Create the AI Analysis section"""
# Check if AI provider is configured
config = self.config_manager.get_config()
ai_provider = config.get('AI_PROVIDER', 'none').lower()
ai_configured = ai_provider and ai_provider != 'none'
# Create result container
ai_result_container = ft.Column(
ref=self.ai_analysis_result_ref,
controls=[],
spacing=10,
)
# Create analyze button or disabled message
if ai_configured:
# Create a wrapper function that captures the parameters
async def run_analysis_wrapper():
await self._run_ai_analysis_async(item, repo_str, pr_files, comments)
analyze_button = ft.ElevatedButton(
"Run AI Analysis",
icon=ft.icons.AUTO_AWESOME,
on_click=lambda _: self.page.run_task(run_analysis_wrapper),
)
button_row = ft.Row([
analyze_button,
ft.Text(
f"Using {ai_provider.upper()}",
size=12,
color=ft.colors.BLUE_300,
italic=True,
),
], spacing=10)
else:
button_row = ft.Container(
content=ft.Row([
ft.Icon(ft.icons.INFO_OUTLINE, size=16, color=ft.colors.ORANGE),
ft.Text(
"AI Analysis is not available. Please configure an AI provider in Settings.",
size=13,
color=ft.colors.ORANGE,
),
], spacing=8),
padding=10,
border=ft.border.all(1, ft.colors.ORANGE),
border_radius=4,
bgcolor=ft.colors.GREY_900,
)
ai_section = ft.Container(
content=ft.Column([
ft.Text("AI Analysis", size=16, weight=ft.FontWeight.BOLD),
ft.Text(
"For PRs: Analyze changes and create a summary. For Issues: Find relevant files and suggest fixes.",
size=12,
color=ft.colors.GREY_400,
),
button_row,
ai_result_container,
], spacing=10),
padding=15,
border=ft.border.all(1, ft.colors.OUTLINE),
border_radius=8,
)
return ai_section
async def _run_ai_analysis_async(self, item, repo_str, pr_files, comments):
"""Run AI analysis on the selected item"""
if not self.ai_analysis_result_ref.current:
return
# Show loading state
self.ai_analysis_result_ref.current.controls = [
ft.Container(
content=ft.Row([
ft.ProgressRing(width=16, height=16),
ft.Text("Analyzing...", size=14),
], spacing=10),
padding=10,
)
]
self.page.update()
def run_analysis():
try:
config = self.config_manager.get_config()
ai_provider = config.get('AI_PROVIDER', 'none').lower()
if item.item_type == "pull_request":
# PR Analysis: Summarize changes
result = self._analyze_pr(item, repo_str, pr_files, comments, ai_provider, config)
else:
# Issue Analysis: Find files and suggest fixes
result = self._analyze_issue(item, repo_str, comments, ai_provider, config)
return result
except Exception as e:
error_msg = f"Error during AI analysis: {str(e)}"
if self.logger:
self.logger.log(error_msg)
return {
'success': False,
'error': error_msg
}
# Run in thread
result = await asyncio.to_thread(run_analysis)
# Display results
if result.get('success'):
result_widgets = [
ft.Container(
content=ft.Column([
ft.Row([
ft.Icon(ft.icons.CHECK_CIRCLE, size=16, color=ft.colors.GREEN),
ft.Text("Analysis Complete", weight=ft.FontWeight.BOLD, size=14, color=ft.colors.GREEN),
], spacing=5),
ft.Divider(height=10),
ft.Text(result.get('summary', ''), size=13, selectable=True),
], spacing=10),
padding=15,
border=ft.border.all(1, ft.colors.GREEN),
border_radius=8,
bgcolor=ft.colors.GREY_900,
)
]
# Add suggested files for issues
if item.item_type == "issue" and result.get('suggested_files'):
result_widgets.append(
ft.Container(
content=ft.Column([
ft.Text("Suggested Files to Modify:", weight=ft.FontWeight.BOLD, size=14),
ft.Column([
ft.Text(f"{file}", size=13, color=ft.colors.BLUE_300)
for file in result['suggested_files']
], spacing=5),
], spacing=8),
padding=15,
border=ft.border.all(1, ft.colors.BLUE),
border_radius=8,
bgcolor=ft.colors.GREY_900,
)
)
# Add "Create PR with AI Fix" button
result_widgets.append(
ft.ElevatedButton(
"Create PR with AI-Suggested Fix",
icon=ft.icons.AUTO_FIX_HIGH,
on_click=lambda _: self._create_pr_from_ai_fix(item, result),
)
)
self.ai_analysis_result_ref.current.controls = result_widgets
else:
# Show error
self.ai_analysis_result_ref.current.controls = [
ft.Container(
content=ft.Row([
ft.Icon(ft.icons.ERROR_OUTLINE, size=16, color=ft.colors.RED),
ft.Text(
result.get('error', 'Unknown error occurred'),
size=13,
color=ft.colors.RED,
),
], spacing=8),
padding=10,
border=ft.border.all(1, ft.colors.RED),
border_radius=4,
bgcolor=ft.colors.GREY_900,
)
]
self.page.update()
def _analyze_pr(self, item, repo_str, pr_files, comments, ai_provider, config):
"""Analyze a Pull Request using AI"""
try:
# Build context for AI
context = f"""Pull Request Analysis Request
Repository: {repo_str}
PR Number: #{item.number}
Title: {item.title}
State: {item.state}
Merged: {item.merged}
Description:
{item.body if item.body else 'No description provided'}
Modified Files ({len(pr_files)}):
"""
for file in pr_files:
context += f"\n- {file['filename']} ({file['status']}) [+{file['additions']} -{file['deletions']}]"
if comments:
context += f"\n\nComments ({len(comments)}):\n"
for comment in comments[:5]: # Limit to first 5 comments
context += f"\n@{comment['user']}: {comment['body'][:200]}...\n"
context += "\n\nPlease provide a comprehensive summary of this pull request, including:\n"
context += "1. What changes were made\n"
context += "2. The purpose and impact of these changes\n"
context += "3. Any notable patterns or concerns from the comments\n"
context += "4. Overall assessment of the PR"
# Call AI manager
summary = self.ai_manager.generate_response(context, ai_provider, config)
if self.logger:
self.logger.log(f"AI PR Analysis completed for PR #{item.number}")
return {
'success': True,
'summary': summary
}
except Exception as e:
if self.logger:
self.logger.log(f"Error in PR analysis: {e}")
return {
'success': False,
'error': str(e)
}
def _analyze_issue(self, item, repo_str, comments, ai_provider, config):
"""Analyze an Issue using AI to suggest fixes"""
try:
# Build context for AI
context = f"""GitHub Issue Analysis Request
Repository: {repo_str}
Issue Number: #{item.number}
Title: {item.title}
State: {item.state}
Description:
{item.body if item.body else 'No description provided'}
"""
if comments:
context += f"\n\nComments ({len(comments)}):\n"
for comment in comments[:5]: # Limit to first 5 comments
context += f"\n@{comment['user']}: {comment['body'][:200]}...\n"
context += "\n\nPlease analyze this issue and provide:\n"
context += "1. A summary of the issue\n"
context += "2. Suggested files or components that might be causing this issue\n"
context += "3. Recommended approach to fix the issue\n"
context += "4. Any additional considerations\n"
context += "\nFor the suggested files, please list them in a clear format like:\n"
context += "SUGGESTED_FILES: file1.py, file2.js, file3.tsx"
# Call AI manager
analysis = self.ai_manager.generate_response(context, ai_provider, config)
# Try to extract suggested files from the response
suggested_files = []
if "SUGGESTED_FILES:" in analysis:
files_line = analysis.split("SUGGESTED_FILES:")[1].split("\n")[0]
suggested_files = [f.strip() for f in files_line.split(",") if f.strip()]
if self.logger:
self.logger.log(f"AI Issue Analysis completed for Issue #{item.number}")
return {
'success': True,
'summary': analysis,
'suggested_files': suggested_files
}
except Exception as e:
if self.logger:
self.logger.log(f"Error in Issue analysis: {e}")
return {
'success': False,
'error': str(e)
}
def _create_pr_from_ai_fix(self, item, _analysis_result):
"""Create a PR with AI-suggested fix for an issue"""
# TODO: Implement PR creation with AI fix
# The analysis_result will contain suggested files and fix recommendations
self._show_snackbar("PR creation with AI fix - Coming soon!", error=False)
if self.logger:
self.logger.log(f"PR creation requested for Issue #{item.number}")
def _assign_to_self(self, item, repo_str):
"""Assign the current PR or Issue to the authenticated user"""
try:
# Get GitHub token
config = self.config_manager.get_config()
github_token = config.get('GITHUB_PAT', '')
if not github_token:
self._show_snackbar("GitHub token not configured", error=True)
return
# Parse repository
if '/' not in repo_str:
self._show_snackbar("Invalid repository format", error=True)
return
owner, repo = repo_str.split('/', 1)
# Get authenticated user
import requests
headers = {
"Authorization": f"Bearer {github_token}",
"Accept": "application/vnd.github+json",
"User-Agent": "github-pulse/1.0"
}
# First, get the authenticated user's username
user_response = requests.get("https://api.github.com/user", headers=headers, timeout=10)
user_response.raise_for_status()
username = user_response.json().get('login')
if not username:
self._show_snackbar("Could not get authenticated user", error=True)
return
# Assign to self using the GitHub API
# For both PRs and Issues, we use the issues endpoint
url = f"https://api.github.com/repos/{owner}/{repo}/issues/{item.number}/assignees"
# Add the authenticated user to assignees
payload = {
"assignees": [username]
}
response = requests.post(url, headers=headers, json=payload, timeout=10)
response.raise_for_status()
# Update the item in memory
if username not in item.assignees:
item.assignees.append(username)
# Refresh the display
self._display_workflow_item(item)
self._show_snackbar(f"Successfully assigned to @{username}", error=False)
if self.logger:
self.logger.log(f"Assigned {item.item_type} #{item.number} to @{username}")
except requests.exceptions.RequestException as e:
error_msg = f"Error assigning to self: {str(e)}"
self._show_snackbar(error_msg, error=True)
if self.logger:
self.logger.log(error_msg)
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
self._show_snackbar(error_msg, error=True)
if self.logger:
self.logger.log(error_msg)
def _populate_all_items(self, search_query: str = "", type_filter: str = "both", repo_filter: str = "both"):
"""Populate the all items list with all loaded PRs and Issues
+57
View File
@@ -594,3 +594,60 @@ class WorkflowManager:
import traceback
traceback.print_exc()
return []
def fetch_pr_files(self, repo_str: str, pr_number: int) -> List[Dict[str, Any]]:
"""
Fetch the list of files changed in a pull request
Args:
repo_str: Repository string in format "owner/repo"
pr_number: Pull request number
Returns:
List of file dictionaries with keys: 'filename', 'status', 'additions', 'deletions', 'changes', 'patch'
"""
try:
# Parse repository string
if '/' not in repo_str:
self.log(f"Invalid repository format: {repo_str}")
return []
owner, repo = repo_str.split('/', 1)
# Fetch PR files
url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}/files"
print(f"DEBUG: Fetching PR files from URL: {url}", flush=True)
response = requests.get(url, headers=self.headers)
response.raise_for_status()
files_data = response.json()
print(f"DEBUG: Found {len(files_data)} files in PR #{pr_number}", flush=True)
files = []
for file_data in files_data:
files.append({
'filename': file_data.get('filename', ''),
'status': file_data.get('status', ''), # added, removed, modified, renamed
'additions': file_data.get('additions', 0),
'deletions': file_data.get('deletions', 0),
'changes': file_data.get('changes', 0),
'patch': file_data.get('patch', ''), # The actual diff patch
'blob_url': file_data.get('blob_url', ''),
})
self.log(f"Fetched {len(files)} files for PR {repo_str} #{pr_number}")
return files
except requests.exceptions.RequestException as e:
self.log(f"Error fetching PR files for {repo_str} #{pr_number}: {e}")
print(f"DEBUG: RequestException occurred: {e}", flush=True)
import traceback
traceback.print_exc()
return []
except Exception as e:
self.log(f"Unexpected error fetching PR files: {e}")
print(f"DEBUG: Exception occurred: {e}", flush=True)
import traceback
traceback.print_exc()
return []