Fixed the .gitignore

This commit is contained in:
TySP-Dev
2025-11-13 22:04:39 -10:00
parent 895f89acfd
commit c633412903
25 changed files with 0 additions and 11815 deletions
-54
View File
@@ -1,54 +0,0 @@
"""
GitHub Pulse - Application Components
Modular components for the application
"""
import sys
import os
# Version info
__version__ = "0.0.1"
__author__ = "TySP-Dev"
__app_name__ = "GitHub Pulse"
# Determine if running in production build
IS_PRODUCTION = getattr(sys, 'frozen', False)
# Get the application directory
if IS_PRODUCTION:
# In production build, get the executable directory
APP_DIR = os.path.dirname(sys.executable)
else:
# In development, get the source directory
APP_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Export main classes for easier imports
from .config_manager import ConfigManager
from .ai_manager import AIManager
from .github_api import GitHubAPI
from .settings_dialog import SettingsDialog
from .main_gui import MainGUI
from .utils import Logger, PRNumberManager, ContentBuilders
from .workflow import WorkflowManager, WorkflowItem, GitHubRepoFetcher
from .ai_action_planner import AIActionPlanner, ActionPlan
__all__ = [
'ConfigManager',
'AIManager',
'GitHubAPI',
'SettingsDialog',
'MainGUI',
'Logger',
'PRNumberManager',
'ContentBuilders',
'WorkflowManager',
'WorkflowItem',
'GitHubRepoFetcher',
'AIActionPlanner',
'ActionPlan',
'__version__',
'__author__',
'__app_name__',
'IS_PRODUCTION',
'APP_DIR'
]
-617
View File
@@ -1,617 +0,0 @@
"""
AI Action Planner
Generates and executes action plans for GitHub issues and PRs using AI
"""
import json
import re
import requests
from typing import List, Dict, Any, Optional, Callable
from pathlib import Path
class ActionPlan:
"""Represents an AI-generated action plan"""
def __init__(self, title: str, steps: List[Dict[str, Any]], context: Dict[str, Any]):
self.title = title
self.steps = steps # List of {description, file_path, changes, completed}
self.context = context # PR/Issue context
self.completed_steps = []
self.failed_steps = []
def to_dict(self) -> Dict[str, Any]:
"""Convert plan to dictionary"""
return {
'title': self.title,
'steps': self.steps,
'context': self.context,
'completed_steps': self.completed_steps,
'failed_steps': self.failed_steps
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'ActionPlan':
"""Create plan from dictionary"""
plan = cls(data['title'], data['steps'], data['context'])
plan.completed_steps = data.get('completed_steps', [])
plan.failed_steps = data.get('failed_steps', [])
return plan
class OllamaProvider:
"""Simple Ollama API provider for AI action planning"""
def __init__(self, base_url: str, model: str, logger):
self.base_url = base_url.rstrip('/')
self.model = model
self.logger = logger
def generate(self, prompt: str) -> Optional[str]:
"""Generate a response from Ollama"""
try:
response = requests.post(
f"{self.base_url}/api/generate",
json={
"model": self.model,
"prompt": prompt,
"stream": False
},
timeout=120
)
response.raise_for_status()
result = response.json()
return result.get('response', '')
except Exception as e:
self.logger.log(f"❌ Ollama API error: {str(e)}")
return None
def make_change(self, file_content: str, old_text: str, new_text: str, file_path: str, custom_instructions: str = None) -> Optional[str]:
"""Make changes to file content using Ollama"""
# Try direct replacement first
if old_text and old_text.strip() in file_content:
return file_content.replace(old_text.strip(), new_text.strip())
# Use Ollama to make intelligent changes
prompt = f"""You are a code modification assistant. Modify the following file according to the instructions.
File: {file_path}
Current Content:
```
{file_content}
```
Instructions: {new_text}
{f'Additional context: {custom_instructions}' if custom_instructions else ''}
Return ONLY the complete modified file content. Do not include explanations or markdown code blocks."""
return self.generate(prompt)
class AIActionPlanner:
"""Generates and executes action plans using AI"""
def __init__(self, ai_manager, logger, config_manager):
self.ai_manager = ai_manager
self.logger = logger
self.config_manager = config_manager
def generate_plan(self, item, custom_instructions: str = "") -> Optional[ActionPlan]:
"""
Generate an action plan for a PR or Issue
Args:
item: The PR or Issue (WorkflowItem object or dict)
custom_instructions: Optional user-provided instructions
Returns:
ActionPlan object or None if generation failed
"""
# Handle both WorkflowItem objects and dictionaries
if hasattr(item, 'item_type'):
# It's a WorkflowItem object
item_type = item.item_type
item_number = item.number
title = item.title
body = item.body or ''
repo = getattr(item, 'repo', None)
else:
# It's a dictionary
item_type = item.get('type', 'unknown')
item_number = item.get('number')
title = item.get('title', 'Untitled')
body = item.get('body', '')
repo = item.get('repo')
self.logger.log(f"🤖 Generating action plan for {item_type} #{item_number}...")
# Get AI provider
config = self.config_manager.get_config()
ai_provider_name = config.get('AI_PROVIDER', 'none').lower()
if ai_provider_name == 'none' or not ai_provider_name:
self.logger.log("❌ No AI provider configured. Please configure in Settings.")
return None
# Get provider instance
provider = self._get_ai_provider(ai_provider_name, config)
if not provider:
return None
# Generate the plan using AI
try:
self.logger.log(f"📤 Calling AI provider: {type(provider).__name__}")
plan_text = self._call_ai_for_plan(provider, item_type, title, body, custom_instructions)
if not plan_text:
self.logger.log("❌ AI did not generate a plan (empty response)")
return None
self.logger.log(f"📥 Received response from AI ({len(plan_text)} characters)")
self.logger.log(f"📄 Response preview: {plan_text[:200]}...")
# Parse the plan
self.logger.log("🔍 Parsing AI response into steps...")
steps = self._parse_plan(plan_text)
if not steps:
self.logger.log("❌ Could not parse action steps from AI response")
return None
# Get repo from item or config
if repo is None:
repo = config.get('GITHUB_REPO', '')
plan = ActionPlan(
title=f"Action Plan for {item_type.upper()} #{item_number}: {title}",
steps=steps,
context={
'item_type': item_type,
'item_number': item_number,
'item_title': title,
'item_body': body,
'repo': repo
}
)
self.logger.log(f"✅ Generated plan with {len(steps)} steps")
return plan
except Exception as e:
self.logger.log(f"❌ Error generating plan: {str(e)}")
return None
def _get_ai_provider(self, provider_name: str, config: Dict[str, Any]):
"""Get the AI provider instance"""
try:
if provider_name in ['claude', 'anthropic']:
# Try both CLAUDE_API_KEY and ANTHROPIC_API_KEY for compatibility
api_key = config.get('CLAUDE_API_KEY')
if not api_key:
api_key = config.get('ANTHROPIC_API_KEY')
if not api_key:
self.logger.log("❌ Claude API key not found in secure storage (tried both CLAUDE_API_KEY and ANTHROPIC_API_KEY)")
return None
self.logger.log("️ Initializing Claude provider...")
from . import ai_manager
provider = ai_manager.ClaudeProvider(api_key, self.logger)
self.logger.log("✅ Claude provider initialized successfully")
return provider
elif provider_name in ['chatgpt', 'openai']:
api_key = config.get('OPENAI_API_KEY')
if not api_key:
self.logger.log("❌ OpenAI API key not found in secure storage")
return None
self.logger.log("️ Initializing ChatGPT provider...")
from . import ai_manager
provider = ai_manager.ChatGPTProvider(api_key, self.logger)
self.logger.log("✅ ChatGPT provider initialized successfully")
return provider
elif provider_name == 'ollama':
# Ollama doesn't need an API key, uses URL from config
ollama_url = config.get('OLLAMA_URL', 'http://localhost:11434')
ollama_model = config.get('OLLAMA_MODEL', 'llama2')
self.logger.log(f"️ Using Ollama at {ollama_url} with model {ollama_model}")
# Create a simple Ollama provider wrapper
return OllamaProvider(ollama_url, ollama_model, self.logger)
else:
self.logger.log(f"❌ Unsupported AI provider: {provider_name}")
return None
except Exception as e:
self.logger.log(f"❌ Error creating AI provider: {str(e)}")
return None
def _call_ai_for_plan(self, provider, item_type: str, title: str, body: str, custom_instructions: str) -> Optional[str]:
"""Call AI to generate an action plan"""
prompt = f"""You are an expert software engineer tasked with creating an actionable plan to address a GitHub {item_type}.
{item_type.upper()} Title: {title}
{item_type.upper()} Description:
{body}
{"Additional Instructions: " + custom_instructions if custom_instructions else ""}
Please create a detailed action plan with specific, executable steps. For each step, specify:
1. What needs to be done (clear description)
2. Which file(s) need to be modified (if applicable)
3. What changes should be made (if applicable)
Format your response as a JSON array of steps, where each step has:
- "description": A clear description of what to do
- "file_path": Path to the file to modify (or null if not file-specific)
- "changes": Description of changes to make (or null if not applicable)
- "action_type": One of ["modify_file", "create_file", "delete_file", "investigate", "test", "document"]
Example format:
```json
[
{{
"description": "Fix the authentication bug in login handler",
"file_path": "src/auth/login.py",
"changes": "Update the password validation logic to handle special characters correctly",
"action_type": "modify_file"
}},
{{
"description": "Add unit tests for authentication",
"file_path": "tests/test_auth.py",
"changes": "Add test cases for special characters in passwords",
"action_type": "create_file"
}}
]
```
IMPORTANT: Return ONLY the JSON array, no other text before or after."""
try:
if isinstance(provider, OllamaProvider):
# Use Ollama
self.logger.log(f"🤖 Calling Ollama AI to generate plan...")
return provider.generate(prompt)
elif hasattr(provider, '_generate_updated_document'):
# Use Claude's document generation
self.logger.log(f"🤖 Calling Claude AI to generate plan...")
import anthropic
client = anthropic.Anthropic(api_key=provider.api_key)
message = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=4096,
messages=[{"role": "user", "content": prompt}]
)
return message.content[0].text
elif hasattr(provider, 'client'):
# Use OpenAI/ChatGPT
self.logger.log(f"🤖 Calling ChatGPT AI to generate plan...")
response = provider.client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
max_tokens=4096
)
self.logger.log(f"✅ ChatGPT response received")
return response.choices[0].message.content
else:
self.logger.log(f"❌ Unknown provider type: {type(provider).__name__}")
return None
except Exception as e:
self.logger.log(f"❌ AI API call failed: {str(e)}")
import traceback
self.logger.log(f"❌ Traceback: {traceback.format_exc()}")
return None
def _parse_plan(self, plan_text: str) -> List[Dict[str, Any]]:
"""Parse the AI-generated plan text into structured steps"""
try:
# Extract JSON from response (might be wrapped in markdown)
json_match = re.search(r'```json\s*(\[.*?\])\s*```', plan_text, re.DOTALL)
if json_match:
json_text = json_match.group(1)
else:
# Try to find JSON array directly
json_match = re.search(r'\[.*\]', plan_text, re.DOTALL)
if json_match:
json_text = json_match.group(0)
else:
self.logger.log("⚠️ Could not find JSON in AI response")
return []
# Parse JSON
steps = json.loads(json_text)
# Validate and clean up steps
validated_steps = []
for i, step in enumerate(steps):
if isinstance(step, dict):
validated_step = {
'step_number': i + 1,
'description': step.get('description', f'Step {i+1}'),
'file_path': step.get('file_path'),
'changes': step.get('changes'),
'action_type': step.get('action_type', 'investigate'),
'completed': False,
'status': 'pending'
}
validated_steps.append(validated_step)
self.logger.log(f"✅ Successfully parsed {len(validated_steps)} steps from AI response")
return validated_steps
except json.JSONDecodeError as e:
self.logger.log(f"❌ Failed to parse JSON: {str(e)}")
self.logger.log(f"Response was: {plan_text[:500]}...")
return []
except Exception as e:
self.logger.log(f"❌ Error parsing plan: {str(e)}")
return []
def execute_plan(
self,
plan: ActionPlan,
local_repo_path: str,
progress_callback: Optional[Callable[[int, int, str], None]] = None,
log_callback: Optional[Callable[[str], None]] = None
) -> Dict[str, Any]:
"""
Execute an action plan
Args:
plan: The ActionPlan to execute
local_repo_path: Path to local git repository
progress_callback: Callback function(current_step, total_steps, message)
log_callback: Callback function for logging thought process
Returns:
Dictionary with execution results
"""
def log(message):
"""Helper to log to both logger and callback"""
self.logger.log(message)
if log_callback:
log_callback(message)
log(f"▶️ Starting execution of plan: {plan.title}")
if not local_repo_path or not Path(local_repo_path).exists():
log(f"❌ Local repository path not found: {local_repo_path}")
return {'success': False, 'error': 'Invalid local repository path'}
total_steps = len(plan.steps)
completed = 0
failed = 0
for i, step in enumerate(plan.steps):
step_num = step['step_number']
# Mark step as in-progress
step['status'] = 'in_progress'
if progress_callback:
progress_callback(i + 1, total_steps, f"Executing step {step_num}...")
log(f"\n📍 Step {step_num}/{total_steps}: {step['description']}")
try:
result = self._execute_step(step, local_repo_path, plan.context, log)
if result['success']:
step['completed'] = True
step['status'] = 'completed'
plan.completed_steps.append(step_num)
completed += 1
log(f"✅ Step {step_num} completed")
else:
step['status'] = 'failed'
step['error'] = result.get('error', 'Unknown error')
plan.failed_steps.append(step_num)
failed += 1
log(f"❌ Step {step_num} failed: {result.get('error')}")
except Exception as e:
step['status'] = 'failed'
step['error'] = str(e)
plan.failed_steps.append(step_num)
failed += 1
log(f"❌ Step {step_num} failed with exception: {str(e)}")
log(f"\n📊 Execution complete: {completed}/{total_steps} steps successful, {failed} failed")
# If we made changes successfully, commit and push them
if completed > 0:
try:
log("\n🔧 Committing and pushing changes...")
# Get PR/Issue info from context
item_type = plan.context.get('item_type', 'item')
item_number = plan.context.get('item_number', 'unknown')
item_title = plan.context.get('item_title', 'changes')
# Commit message
commit_msg = f"AI: Execute action plan for {item_type} #{item_number}\n\n{item_title}\n\nAutomated changes by GitHub Pulse AI"
# Get current branch (should be the PR branch)
import subprocess
result = subprocess.run(
['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
cwd=local_repo_path,
capture_output=True,
text=True,
timeout=10
)
current_branch = result.stdout.strip() if result.returncode == 0 else 'main'
log(f"📍 Current branch: {current_branch}")
# Stage all changes
log("📝 Staging changes...")
subprocess.run(['git', 'add', '-A'], cwd=local_repo_path, check=True, timeout=10)
# Check if there are changes to commit
result = subprocess.run(
['git', 'diff', '--cached', '--quiet'],
cwd=local_repo_path,
timeout=10
)
if result.returncode != 0: # There are changes
# Commit
log("💾 Committing changes...")
subprocess.run(
['git', 'commit', '-m', commit_msg],
cwd=local_repo_path,
check=True,
timeout=10
)
# Push
log(f"🚀 Pushing to {current_branch}...")
subprocess.run(
['git', 'push', 'origin', current_branch],
cwd=local_repo_path,
check=True,
timeout=30
)
log(f"✅ Changes pushed to {current_branch}")
else:
log("️ No changes to commit")
except subprocess.TimeoutExpired:
log("⚠️ Git operation timed out")
except subprocess.CalledProcessError as e:
log(f"⚠️ Git operation failed: {e}")
except Exception as e:
log(f"⚠️ Error during git commit/push: {str(e)}")
return {
'success': failed == 0,
'completed': completed,
'failed': failed,
'total': total_steps,
'plan': plan
}
def _execute_step(self, step: Dict[str, Any], local_repo_path: str, context: Dict[str, Any], log=None) -> Dict[str, Any]:
"""Execute a single step of the plan"""
action_type = step.get('action_type', 'investigate')
file_path = step.get('file_path')
changes = step.get('changes')
# Use log function if provided, otherwise fall back to logger
log_func = log if log else self.logger.log
if action_type == 'modify_file' and file_path:
return self._modify_file(file_path, changes, local_repo_path, log_func)
elif action_type == 'create_file' and file_path:
return self._create_file(file_path, changes, local_repo_path, log_func)
elif action_type == 'delete_file' and file_path:
return self._delete_file(file_path, local_repo_path, log_func)
else:
# For investigate, test, document actions, just mark as completed
# (requires manual intervention)
log_func(f"️ Manual action required: {step['description']}")
return {'success': True, 'message': 'Manual action logged'}
def _modify_file(self, file_path: str, changes: str, local_repo_path: str, log=None) -> Dict[str, Any]:
"""Modify a file using AI"""
log_func = log if log else self.logger.log
full_path = Path(local_repo_path) / file_path
if not full_path.exists():
return {'success': False, 'error': f'File not found: {file_path}'}
try:
log_func(f"📝 Reading file: {file_path}")
# Read current content
with open(full_path, 'r', encoding='utf-8') as f:
current_content = f.read()
# Get AI provider to make changes
config = self.config_manager.get_config()
provider_name = config.get('AI_PROVIDER', 'none').lower()
provider = self._get_ai_provider(provider_name, config)
if not provider:
return {'success': False, 'error': 'AI provider not available'}
# Use AI to make the changes
log_func(f"🤖 Using AI to modify {file_path}...")
log_func(f"🔍 Analyzing changes needed...")
updated_content = provider.make_change(
file_content=current_content,
old_text=current_content[:200] + "...", # Context
new_text=changes, # What to change
file_path=str(full_path),
custom_instructions=changes
)
if updated_content and updated_content != current_content:
log_func(f"💾 Writing changes to {file_path}...")
# Write updated content
with open(full_path, 'w', encoding='utf-8') as f:
f.write(updated_content)
log_func(f"✅ Successfully modified {file_path}")
return {'success': True, 'file': file_path}
else:
return {'success': False, 'error': 'AI could not generate changes'}
except Exception as e:
return {'success': False, 'error': f'Error modifying file: {str(e)}'}
def _create_file(self, file_path: str, content: str, local_repo_path: str, log=None) -> Dict[str, Any]:
"""Create a new file"""
log_func = log if log else self.logger.log
full_path = Path(local_repo_path) / file_path
if full_path.exists():
return {'success': False, 'error': f'File already exists: {file_path}'}
try:
log_func(f"📄 Creating new file: {file_path}")
# Create parent directories if needed
full_path.parent.mkdir(parents=True, exist_ok=True)
# Create file with content
with open(full_path, 'w', encoding='utf-8') as f:
f.write(content or f"# TODO: Implement {file_path}\n")
log_func(f"✅ Created {file_path}")
return {'success': True, 'file': file_path}
except Exception as e:
return {'success': False, 'error': f'Error creating file: {str(e)}'}
def _delete_file(self, file_path: str, local_repo_path: str, log=None) -> Dict[str, Any]:
"""Delete a file"""
log_func = log if log else self.logger.log
full_path = Path(local_repo_path) / file_path
if not full_path.exists():
return {'success': False, 'error': f'File not found: {file_path}'}
try:
log_func(f"🗑️ Deleting file: {file_path}")
full_path.unlink()
log_func(f"✅ Deleted {file_path}")
return {'success': True, 'file': file_path}
except Exception as e:
return {'success': False, 'error': f'Error deleting file: {str(e)}'}
File diff suppressed because it is too large Load Diff
Binary file not shown.

Before

Width:  |  Height:  |  Size: 128 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 91 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 25 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 72 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 23 KiB

-185
View File
@@ -1,185 +0,0 @@
"""
Cache Manager for GitHub PRs and Issues
Stores fetched items in temporary cache to avoid reloading on every app start
"""
import json
import os
import tempfile
import time
from pathlib import Path
from typing import List, Dict, Any, Optional
from hashlib import md5
class CacheManager:
"""Manages caching of GitHub PRs and Issues"""
def __init__(self, cache_duration_hours: int = 24):
"""
Initialize cache manager
Args:
cache_duration_hours: How long cache is valid (default 24 hours)
"""
self.cache_duration_seconds = cache_duration_hours * 3600
self.cache_dir = Path(tempfile.gettempdir()) / "github_pulse_cache"
self.cache_dir.mkdir(exist_ok=True)
def _get_cache_key(self, source_type: str, identifier: str) -> str:
"""Generate cache key from source type and identifier"""
# Use MD5 hash to create safe filename
key_str = f"{source_type}_{identifier}"
return md5(key_str.encode()).hexdigest()
def _get_cache_path(self, cache_key: str) -> Path:
"""Get full path to cache file"""
return self.cache_dir / f"{cache_key}.json"
def is_cache_valid(self, source_type: str, identifier: str) -> bool:
"""Check if cache exists and is still valid"""
cache_key = self._get_cache_key(source_type, identifier)
cache_path = self._get_cache_path(cache_key)
if not cache_path.exists():
return False
# Check if cache has expired
file_age = time.time() - cache_path.stat().st_mtime
return file_age < self.cache_duration_seconds
def load_from_cache(self, source_type: str, identifier: str) -> Optional[List[Dict[str, Any]]]:
"""
Load GitHub items from cache
Args:
source_type: 'github_prs', 'github_issues', 'target_prs', 'fork_prs', etc.
identifier: repository identifier or config hash
Returns:
List of items if cache is valid, None otherwise
"""
if not self.is_cache_valid(source_type, identifier):
return None
cache_key = self._get_cache_key(source_type, identifier)
cache_path = self._get_cache_path(cache_key)
try:
with open(cache_path, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
# Validate cache structure
if 'timestamp' not in cache_data or 'items' not in cache_data:
return None
return cache_data['items']
except Exception as e:
print(f"Error loading cache: {e}")
return None
def save_to_cache(self, source_type: str, identifier: str, items: List[Dict[str, Any]]) -> bool:
"""
Save GitHub items to cache
Args:
source_type: 'github_prs', 'github_issues', 'target_prs', 'fork_prs', etc.
identifier: repository identifier or config hash
items: List of items to cache (PRs or Issues)
Returns:
True if successful, False otherwise
"""
cache_key = self._get_cache_key(source_type, identifier)
cache_path = self._get_cache_path(cache_key)
try:
cache_data = {
'timestamp': time.time(),
'source_type': source_type,
'identifier': identifier,
'items': items
}
with open(cache_path, 'w', encoding='utf-8') as f:
json.dump(cache_data, f, indent=2, ensure_ascii=False)
return True
except Exception as e:
print(f"Error saving cache: {e}")
return False
def invalidate_cache(self, source_type: str = None, identifier: str = None):
"""
Invalidate (delete) cache
Args:
source_type: If specified, only invalidate this source type
identifier: If specified, only invalidate this specific cache
"""
if source_type and identifier:
# Invalidate specific cache
cache_key = self._get_cache_key(source_type, identifier)
cache_path = self._get_cache_path(cache_key)
if cache_path.exists():
cache_path.unlink()
elif source_type:
# Invalidate all caches for this source type
for cache_file in self.cache_dir.glob("*.json"):
try:
with open(cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
if cache_data.get('source_type') == source_type:
cache_file.unlink()
except:
pass
else:
# Invalidate all caches
for cache_file in self.cache_dir.glob("*.json"):
cache_file.unlink()
def get_cache_info(self) -> Dict[str, Any]:
"""Get information about cached items"""
cache_files = list(self.cache_dir.glob("*.json"))
info = {
'cache_dir': str(self.cache_dir),
'total_files': len(cache_files),
'total_size_bytes': sum(f.stat().st_size for f in cache_files),
'caches': []
}
for cache_file in cache_files:
try:
with open(cache_file, 'r', encoding='utf-8') as f:
cache_data = json.load(f)
file_age = time.time() - cache_file.stat().st_mtime
is_valid = file_age < self.cache_duration_seconds
info['caches'].append({
'source_type': cache_data.get('source_type', 'unknown'),
'item_count': len(cache_data.get('items', [])),
'age_hours': round(file_age / 3600, 1),
'is_valid': is_valid,
'size_kb': round(cache_file.stat().st_size / 1024, 1)
})
except:
pass
return info
def cleanup_expired(self):
"""Remove expired cache files"""
current_time = time.time()
removed_count = 0
for cache_file in self.cache_dir.glob("*.json"):
file_age = current_time - cache_file.stat().st_mtime
if file_age >= self.cache_duration_seconds:
cache_file.unlink()
removed_count += 1
return removed_count
-246
View File
@@ -1,246 +0,0 @@
"""
Configuration Manager
Wrapper around SettingsManager for backward compatibility.
Now uses config.json + keyring instead of .env files.
"""
import os
import json
from typing import Dict, Any, Optional
from pathlib import Path
from .settings_manager import SettingsManager
class ConfigManager:
"""
Manages application configuration using the new SettingsManager.
Provides backward compatibility with old .env-based code while
using the modern config.json + keyring system underneath.
"""
def __init__(self):
"""Initialize with SettingsManager backend"""
# Initialize the modern settings system
self._settings = SettingsManager()
# Check if .env exists and offer migration
env_path = Path('.env')
if env_path.exists() and not Path('application/config.json').exists():
print("\n" + "="*60)
print("NOTICE: Legacy .env file detected!")
print("="*60)
print("Your app now uses a modern settings system with:")
print(" ✓ Secure API key storage (Windows Credential Manager)")
print(" ✓ Live settings updates (no restart needed)")
print(" ✓ Better configuration management")
print()
print("Migrating settings from .env to new system...")
print()
if self._settings.migrate_from_env(env_path):
print("✓ Migration successful!")
print(f" - Secrets → System keyring")
print(f" - Settings → {self._settings.config_file}")
print()
print("Your .env file is kept as backup.")
print("You can delete it once you verify everything works.")
else:
print("✗ Migration failed. Using .env as fallback.")
print("="*60 + "\n")
# Load configuration
self.config = self._settings.get_all()
# Auto-default GITHUB_TOKEN to GITHUB_PAT if needed
self._apply_token_defaults()
# Show configuration status
self._print_config_status()
def _apply_token_defaults(self):
"""Auto-default GITHUB_TOKEN to GITHUB_PAT if GITHUB_TOKEN is empty"""
github_token = self.config.get('GITHUB_TOKEN', '').strip() if self.config.get('GITHUB_TOKEN') else ''
github_pat = self.config.get('GITHUB_PAT', '').strip() if self.config.get('GITHUB_PAT') else ''
if not github_token and github_pat:
self.config['GITHUB_TOKEN'] = github_pat
self._settings.set('GITHUB_TOKEN', github_pat, save=False)
def _print_config_status(self):
"""Print configuration load status"""
loaded_keys = []
for key, value in self.config.items():
if value and str(value).strip():
# Don't show actual secret values
if key in SettingsManager.SECRET_KEYS:
loaded_keys.append(f"{key}: loaded")
else:
loaded_keys.append(f"{key}: loaded")
if loaded_keys:
print(f"Configuration status: {', '.join(loaded_keys)}")
else:
print("No configuration values loaded - using defaults")
def load_configuration(self) -> Dict[str, Any]:
"""
Load configuration from new system (config.json + keyring).
Returns:
Dictionary of all settings
"""
self.config = self._settings.load()
self._apply_token_defaults()
return self.config
def save_configuration(self, config_values: Dict[str, Any]) -> bool:
"""
Save configuration using new system.
No restart required - changes apply immediately!
Args:
config_values: Settings to save
Returns:
True if successful
"""
# Save using new system
success = self._settings.save(config_values)
if success:
# Reload to get updated values
self.config = self._settings.get_all()
self._apply_token_defaults()
print(f"Configuration saved to {self._settings.config_file}")
print("Settings updated (no restart needed!)")
else:
print("Failed to save configuration")
return success
def get_config(self) -> Dict[str, Any]:
"""
Get current configuration with automatic GITHUB_TOKEN defaulting.
Returns:
Dictionary of all settings
"""
config = self.config.copy()
# Auto-default GITHUB_TOKEN to GITHUB_PAT if needed
github_token = config.get('GITHUB_TOKEN', '').strip() if config.get('GITHUB_TOKEN') else ''
github_pat = config.get('GITHUB_PAT', '').strip() if config.get('GITHUB_PAT') else ''
if not github_token and github_pat:
config['GITHUB_TOKEN'] = github_pat
return config
def get_value(self, key: str, default: Any = None) -> Any:
"""
Get a specific configuration value.
Args:
key: Setting key
default: Default value if not found
Returns:
Setting value or default
"""
return self._settings.get(key, default)
def get(self, key: str, default: Any = None) -> Any:
"""
Get a specific configuration value (dictionary-like interface).
Args:
key: Setting key
default: Default value if not found
Returns:
Setting value or default
"""
return self._settings.get(key, default)
def set_value(self, key: str, value: Any) -> None:
"""
Set a specific configuration value.
Args:
key: Setting key
value: New value
"""
self._settings.set(key, value)
self.config[key] = value
def register_listener(self, callback):
"""
Register a callback for settings changes (live updates).
The callback will be called with (key, new_value) when a setting changes.
Args:
callback: Function to call on settings change
Example:
def on_settings_changed(key, value):
if key == 'THEME_MODE':
# Update theme immediately
page.theme_mode = ft.ThemeMode.DARK if value == 'dark' else ft.ThemeMode.LIGHT
page.update()
config_manager.register_listener(on_settings_changed)
"""
self._settings.register_listener(callback)
def unregister_listener(self, callback):
"""
Unregister a settings change callback.
Args:
callback: Function to remove from listeners
"""
self._settings.unregister_listener(callback)
# Legacy methods for PR counter (unchanged)
def get_pr_counter_file(self) -> str:
"""Get the path to the PR counter file"""
script_dir = os.path.dirname(os.path.abspath(__file__))
return os.path.join(script_dir, '..', '.pr_counter.json')
def load_pr_counter(self) -> Dict[str, int]:
"""Load the PR counter from file"""
counter_file = self.get_pr_counter_file()
if os.path.exists(counter_file):
try:
with open(counter_file, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
pass
return {'count': 0}
def save_pr_counter(self, counter: Dict[str, int]) -> bool:
"""Save the PR counter to file"""
counter_file = self.get_pr_counter_file()
try:
with open(counter_file, 'w', encoding='utf-8') as f:
json.dump(counter, f, indent=2)
return True
except Exception as e:
print(f"Error saving PR counter: {e}")
return False
def increment_pr_counter(self) -> int:
"""Increment and return the PR counter"""
counter = self.load_pr_counter()
counter['count'] = counter.get('count', 0) + 1
self.save_pr_counter(counter)
return counter['count']
def get_pr_counter(self) -> int:
"""Get the current PR counter value"""
counter = self.load_pr_counter()
return counter.get('count', 0)
-985
View File
@@ -1,985 +0,0 @@
"""
GitHub API Manager
Handles GitHub GraphQL operations, PR/Issue creation, and Copilot interactions
"""
import base64
import difflib
import json
import requests
from typing import Optional, Tuple, Dict, Any, List
from urllib.parse import urlparse
# Constants
GITHUB_GRAPHQL_ENDPOINT = "https://api.github.com/graphql"
USER_AGENT = "github-automation-tool/1.0"
class GitHubGQL:
"""GitHub GraphQL API client for creating issues, PRs, and managing assignments"""
def __init__(self, token: str, logger=None, dry_run: bool = False):
self.token = token
self.logger = logger
self.dry_run = dry_run
def log(self, message: str) -> None:
"""Log a message"""
if self.logger:
self.logger.log(message)
else:
print(message)
def _headers(self):
"""Get headers for GitHub API requests"""
return {
"Authorization": f"Bearer {self.token}",
"User-Agent": USER_AGENT,
"Content-Type": "application/json"
}
def run(self, query: str, variables: dict | None = None) -> dict:
"""Execute a GraphQL query"""
payload = {"query": query, "variables": variables or {}}
if self.dry_run:
self.log("[DRY-RUN] Would POST GraphQL payload:")
pretty = json.dumps(payload, indent=2)
self.log(pretty)
return {"dryRun": True, "data": None}
try:
resp = requests.post(GITHUB_GRAPHQL_ENDPOINT, headers=self._headers(), json=payload, timeout=60)
if resp.status_code != 200:
raise RuntimeError(f"GraphQL HTTP {resp.status_code}: {resp.text}")
data = resp.json()
if "errors" in data and data["errors"]:
raise RuntimeError(f"GraphQL errors: {json.dumps(data['errors'], indent=2)}")
return data
except requests.RequestException as e:
raise RuntimeError(f"Request failed: {str(e)}")
def _make_rest_request(self, method: str, url: str, data: Dict[str, Any] = None) -> Dict[str, Any]:
"""Make a REST API request to GitHub"""
headers = {
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github+json",
"User-Agent": USER_AGENT
}
if self.dry_run:
self.log(f"[DRY-RUN] Would make {method} request to: {url}")
return {"number": 123, "html_url": "https://github.com/example/repo/pull/123"}
response = requests.request(method, url, headers=headers, json=data, timeout=30)
response.raise_for_status()
return response.json()
def get_repo_id(self, owner: str, name: str) -> str:
"""Get GitHub repository ID"""
self.log(f"Fetching repositoryId for {owner}/{name}...")
query = """
query($owner:String!, $name:String!) {
repository(owner:$owner, name:$name) {
id
url
}
}
"""
data = self.run(query, {"owner": owner, "name": name})
if data.get("dryRun"):
return "DRY_RUN_REPO_ID"
repo = data["data"]["repository"]
if not repo:
raise RuntimeError(f"Repository {owner}/{name} not found or token lacks access.")
self.log(f"Repository ID: {repo['id']} ({repo['url']})")
return repo["id"]
def get_copilot_actor_id(self, owner: str, name: str) -> tuple[str | None, str | None]:
"""Find Copilot actor ID for assignment"""
self.log("Querying suggestedActors for CAN_BE_ASSIGNED...")
query = """
query($owner:String!, $name:String!) {
repository(owner:$owner, name:$name) {
suggestedActors(capabilities:[CAN_BE_ASSIGNED], first:100) {
nodes {
login
__typename
... on Bot { id }
... on User { id }
}
}
}
}
"""
data = self.run(query, {"owner": owner, "name": name})
if data.get("dryRun"):
return ("DRY_RUN_ACTOR_ID", "copilot-swe-agent")
nodes = data["data"]["repository"]["suggestedActors"]["nodes"]
if not nodes:
self.log("No suggestedActors returned.")
return (None, None)
# Log all available actors for debugging
self.log(f"Available assignable actors ({len(nodes)}):")
for node in nodes:
self.log(f" - {node.get('login', 'N/A')} ({node.get('__typename', 'N/A')}) ID: {node.get('id', 'N/A')}")
# Prefer known Copilot logins
preferred = ("copilot-swe-agent", "copilot", "github-copilot", "github-advanced-security")
chosen = None
# First, try exact matches
for candidate in nodes:
login = candidate.get("login", "").lower()
if login in preferred:
chosen = candidate
break
# If no exact match, try partial matches
if not chosen:
for candidate in nodes:
login = candidate.get("login", "").lower()
if "copilot" in login:
chosen = candidate
break
if not chosen:
self.log("Copilot not found in suggestedActors list.")
self.log("Available actors: " + ", ".join([n.get("login", "N/A") for n in nodes]))
return (None, None)
login = chosen["login"]
actor_id = chosen.get("id")
if not actor_id:
self.log(f"Warning: No actor ID found for {login}")
return (None, None)
self.log(f"Found assignable Copilot actor: {login} (id={actor_id})")
return (actor_id, login)
def create_issue(self, repository_id: str, title: str, body: str) -> tuple[str, str, int]:
"""Create a GitHub issue"""
self.log("Creating issue with createIssue mutation...")
mutation = """
mutation($repositoryId:ID!, $title:String!, $body:String!) {
createIssue(input:{repositoryId:$repositoryId, title:$title, body:$body}) {
issue {
id
url
number
title
}
}
}
"""
data = self.run(mutation, {"repositoryId": repository_id, "title": title, "body": body})
if data.get("dryRun"):
return ("DRY_RUN_ISSUE_ID", "https://github.com/owner/repo/issues/123", 123)
issue = data["data"]["createIssue"]["issue"]
self.log(f"Issue created: {issue['url']} (#{issue['number']})")
return (issue["id"], issue["url"], issue["number"])
def create_branch_from_main(self, owner: str, repo: str, branch_name: str) -> bool:
"""Create a new branch from the main branch"""
self.log(f"Creating branch '{branch_name}' in {owner}/{repo}")
try:
# Get the SHA of the main branch
main_ref_url = f"https://api.github.com/repos/{owner}/{repo}/git/ref/heads/main"
main_ref_response = self._make_rest_request("GET", main_ref_url)
main_sha = main_ref_response["object"]["sha"]
self.log(f"Main branch SHA: {main_sha}")
# Create new branch
new_ref_url = f"https://api.github.com/repos/{owner}/{repo}/git/refs"
new_ref_data = {
"ref": f"refs/heads/{branch_name}",
"sha": main_sha
}
if self.dry_run:
self.log(f"🧪 DRY RUN: Would create branch '{branch_name}' from main ({main_sha})")
return True
self._make_rest_request("POST", new_ref_url, new_ref_data)
self.log(f"✅ Branch '{branch_name}' created successfully")
return True
except Exception as e:
self.log(f"❌ Failed to create branch: {str(e)}")
return False
def get_user_forks(self, include_org_repos: bool = True) -> List[str]:
"""Get list of user's forked repositories"""
self.log("Fetching user's forked repositories...")
if self.dry_run:
# Return sample data for dry run
return [
"username/repo_name",
]
try:
forks = []
page = 1
per_page = 100
while page <= 5: # Limit to 5 pages to avoid long waits
url = f"https://api.github.com/user/repos?type=forks&per_page={per_page}&page={page}"
response = self._make_rest_request("GET", url)
repos = response if isinstance(response, list) else response.get('data', [])
if not repos:
break
for repo in repos:
if repo.get('fork', False):
forks.append(f"{repo['owner']['login']}/{repo['name']}")
if len(repos) < per_page:
break
page += 1
self.log(f"Found {len(forks)} forked repositories")
return forks
except Exception as e:
self.log(f"❌ Failed to fetch user forks: {str(e)}")
return []
def get_authenticated_user(self) -> Dict[str, Any]:
"""Get authenticated user information"""
if self.dry_run:
return {"login": "dry-run-user", "name": "Dry Run User"}
try:
return self._make_rest_request("GET", "https://api.github.com/user")
except Exception as e:
self.log(f"❌ Failed to get user info: {str(e)}")
return {}
def fork_repository(self, owner: str, repo: str, target_org: str = None) -> tuple[str, str]:
"""Fork a repository to the authenticated user's account or specified organization"""
self.log(f"Forking repository {owner}/{repo}")
fork_url = f"https://api.github.com/repos/{owner}/{repo}/forks"
fork_data = {}
if target_org:
fork_data["organization"] = target_org
if self.dry_run:
# Get authenticated user for dry run
user_url = "https://api.github.com/user"
try:
user_data = self._make_rest_request("GET", user_url)
fork_owner = target_org if target_org else user_data["login"]
self.log(f"🧪 DRY RUN: Would fork {owner}/{repo} to {fork_owner}/{repo}")
return fork_owner, repo
except:
self.log(f"🧪 DRY RUN: Would fork {owner}/{repo}")
return "dry-run-user", repo
try:
fork_response = self._make_rest_request("POST", fork_url, fork_data)
fork_owner = fork_response["owner"]["login"]
fork_name = fork_response["name"]
self.log(f"✅ Repository forked to {fork_owner}/{fork_name}")
return fork_owner, fork_name
except Exception as e:
self.log(f"❌ Failed to fork repository: {str(e)}")
raise
def check_repository_exists(self, owner: str, repo: str) -> bool:
"""Check if a repository exists and is accessible"""
try:
url = f"https://api.github.com/repos/{owner}/{repo}"
response = self._make_rest_request("GET", url)
return bool(response.get('id'))
except:
return False
def find_matching_repositories(self, target_repo: str, fork_repo: str) -> Dict[str, List[str]]:
"""Find matching repositories to suggest alternatives for mismatched repos"""
self.log(f"Finding matching repositories for target: {target_repo}, fork: {fork_repo}")
if self.dry_run:
return {
"target_alternatives": ["username/target_repo_name"],
"fork_alternatives": ["username/fork_repo_name"]
}
try:
target_owner, target_name = target_repo.split('/', 1) if '/' in target_repo else ("", target_repo)
fork_owner, fork_name = fork_repo.split('/', 1) if '/' in fork_repo else ("", fork_repo)
target_alternatives = []
fork_alternatives = []
# Get authenticated user info
user_info = self.get_authenticated_user()
user_login = user_info.get('login', '')
# Search for repositories with similar names
search_terms = [target_name, fork_name]
for term in search_terms:
if term:
# Clean up the search term (remove common suffixes)
clean_term = term.replace('-docs', '').replace('-pr', '').replace('_', ' ')
# Search for repositories
search_url = f"https://api.github.com/search/repositories?q={clean_term}&per_page=20"
try:
search_response = self._make_rest_request("GET", search_url)
repositories = search_response.get('items', [])
for repo_data in repositories:
repo_full_name = repo_data['full_name']
repo_owner = repo_data['owner']['login']
# Check if this is a potential target alternative
if (repo_owner == target_owner and
repo_data['name'] != target_name and
repo_full_name not in target_alternatives):
target_alternatives.append(repo_full_name)
# Check if this is a potential fork alternative
if (repo_owner == user_login and
repo_data['name'] != fork_name and
repo_data.get('fork', False) and
repo_full_name not in fork_alternatives):
fork_alternatives.append(repo_full_name)
except Exception as e:
self.log(f"❌ Search failed for term '{term}': {str(e)}")
return {
"target_alternatives": target_alternatives[:5], # Limit to 5 suggestions
"fork_alternatives": fork_alternatives[:5]
}
except Exception as e:
self.log(f"❌ Failed to find matching repositories: {str(e)}")
return {"target_alternatives": [], "fork_alternatives": []}
def make_documentation_change(self, owner: str, repo: str, branch_name: str, file_path: str,
old_text: str, new_text: str, commit_message: str) -> bool:
"""Make actual documentation changes to a file in the repository
This fetches the file, makes the text replacement, and commits it to the branch.
Returns True if successful, False otherwise.
"""
if self.dry_run:
self.log(f"[DRY-RUN] Would update {file_path} in branch {branch_name}")
self.log(f"[DRY-RUN] Replace: {old_text[:50]}...")
self.log(f"[DRY-RUN] With: {new_text[:50]}...")
return True
try:
rest_headers = {
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github+json",
"User-Agent": USER_AGENT
}
# 1. Get the current file content from the branch
self.log(f"Fetching file: {file_path}")
file_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{file_path}?ref={branch_name}"
resp = requests.get(file_url, headers=rest_headers, timeout=30)
if resp.status_code == 404:
self.log(f"❌ File not found: {file_path}")
self.log(f" The file path might be incorrect or the file doesn't exist")
return False
resp.raise_for_status()
file_data = resp.json()
# Decode the file content
current_content = base64.b64decode(file_data["content"]).decode('utf-8')
file_sha = file_data["sha"]
self.log(f"✅ File retrieved ({len(current_content)} bytes)")
# Detect line ending style to preserve it
line_ending = '\r\n' if '\r\n' in current_content else '\n'
self.log(f"📝 Detected line endings: {'CRLF' if line_ending == '\\r\\n' else 'LF'}")
# Normalize everything to LF for consistent processing
normalized_content = current_content.replace('\r\n', '\n')
normalized_old = old_text.replace('\r\n', '\n')
normalized_new = new_text.replace('\r\n', '\n')
# 2. Make the text replacement
if normalized_old not in normalized_content:
self.log(f"⚠️ Warning: Could not find exact text to replace in {file_path}")
self.log(f" Searching for similar text...")
# Try to find similar text (case-insensitive, whitespace-flexible)
lines = normalized_content.split('\n')
old_lines = normalized_old.split('\n')
# Find the best matching sequence
matcher = difflib.SequenceMatcher(None, old_lines, lines)
match = matcher.find_longest_match(0, len(old_lines), 0, len(lines))
if match.size > len(old_lines) * 0.7: # If we find 70% match
self.log(f" Found similar text at line {match.b + 1}")
self.log(f" Making best-effort replacement...")
# This is a simplified approach - in production you'd want more sophisticated matching
else:
self.log(f"❌ Could not find text to replace. The document may have changed.")
self.log(f" Creating PR with instructions instead...")
return False
# Replace the text (using normalized versions)
updated_content = normalized_content.replace(normalized_old, normalized_new)
if updated_content == normalized_content:
self.log(f"⚠️ No changes made - text might not exist in file")
return False
self.log(f"✅ Text replacement successful")
# Restore original line endings
if line_ending == '\r\n':
updated_content = updated_content.replace('\n', '\r\n')
self.log(f"✅ Restored CRLF line endings")
# 3. Commit the updated file
self.log(f"Committing changes to {file_path}...")
encoded_content = base64.b64encode(updated_content.encode('utf-8')).decode()
update_payload = {
"message": commit_message,
"content": encoded_content,
"sha": file_sha,
"branch": branch_name
}
update_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{file_path}"
resp = requests.put(update_url, headers=rest_headers, json=update_payload, timeout=30)
resp.raise_for_status()
self.log(f"✅ Changes committed to branch {branch_name}")
return True
except requests.HTTPError as e:
self.log(f"❌ HTTP Error making changes: {e}")
if e.response.status_code == 403:
self.log(" Permission denied - token doesn't have write access")
elif e.response.status_code == 404:
self.log(f" File not found: {file_path}")
return False
except Exception as e:
self.log(f"❌ Error making changes: {str(e)}")
return False
def create_cross_repo_pull_request(self, source_owner: str, source_repo: str, target_owner: str, target_repo: str,
title: str, body: str, head_ref: str, base_ref: str = "main") -> tuple[str, str, int]:
"""Create a pull request from source repo to target repo"""
self.log(f"Creating cross-repository PR from {source_owner}/{source_repo}:{head_ref} to {target_owner}/{target_repo}:{base_ref}")
# Get target repository ID
target_repo_id = self.get_repo_id(target_owner, target_repo)
# Format the head reference for cross-repo PR
head_ref_full = f"{source_owner}:{head_ref}"
mutation = """
mutation($repositoryId:ID!, $title:String!, $body:String!, $headRefName:String!, $baseRefName:String!) {
createPullRequest(input:{
repositoryId:$repositoryId,
title:$title,
body:$body,
headRefName:$headRefName,
baseRefName:$baseRefName
}) {
pullRequest {
id
url
number
}
}
}
"""
variables = {
"repositoryId": target_repo_id,
"title": title,
"body": body,
"headRefName": head_ref_full,
"baseRefName": base_ref
}
if self.dry_run:
self.log(f"🧪 DRY RUN: Would create cross-repo PR '{title}' from {head_ref_full} to {base_ref}")
return "dry-run-pr-id", f"https://github.com/{target_owner}/{target_repo}/pull/0", 0
try:
data = self.run(mutation, variables)
pr_data = data["data"]["createPullRequest"]["pullRequest"]
pr_id = pr_data["id"]
pr_url = pr_data["url"]
pr_number = pr_data["number"]
self.log(f"✅ Cross-repo pull request created: {pr_url}")
return pr_id, pr_url, pr_number
except Exception as e:
self.log(f"❌ Failed to create cross-repo pull request: {str(e)}")
raise
def create_pull_request(self, repository_id: str, title: str, body: str, head_ref: str, base_ref: str = "main") -> tuple[str, str, int]:
"""Create a pull request"""
self.log(f"Creating pull request with createPullRequest mutation from {head_ref} to {base_ref}...")
mutation = """
mutation($repositoryId:ID!, $title:String!, $body:String!, $headRefName:String!, $baseRefName:String!) {
createPullRequest(input:{
repositoryId:$repositoryId,
title:$title,
body:$body,
headRefName:$headRefName,
baseRefName:$baseRefName
}) {
pullRequest {
id
url
number
title
}
}
}
"""
variables = {
"repositoryId": repository_id,
"title": title,
"body": body,
"headRefName": head_ref,
"baseRefName": base_ref
}
data = self.run(mutation, variables)
if data.get("dryRun"):
return ("DRY_RUN_PR_ID", "https://github.com/owner/repo/pull/456", 456)
pr = data["data"]["createPullRequest"]["pullRequest"]
self.log(f"Pull request created: {pr['url']} (#{pr['number']})")
return (pr["id"], pr["url"], pr["number"])
def assign_to_copilot(self, assignable_id: str, actor_ids: list[str]) -> bool:
"""Assign issue to Copilot
Returns True if successful, False otherwise.
"""
self.log("Assigning with replaceActorsForAssignable mutation...")
mutation = """
mutation($assignableId:ID!, $actorIds:[ID!]!) {
replaceActorsForAssignable(input:{assignableId:$assignableId, actorIds:$actorIds}) {
assignable {
... on Issue {
id
title
assignees(first:10) { nodes { login } }
url
}
... on PullRequest {
id
title
assignees(first:10) { nodes { login } }
url
}
}
}
}
"""
try:
data = self.run(mutation, {"assignableId": assignable_id, "actorIds": actor_ids})
if data.get("dryRun"):
self.log("[DRY-RUN] Would have assigned Copilot.")
return True
assigned = data["data"]["replaceActorsForAssignable"]["assignable"]["assignees"]["nodes"]
assignees = ", ".join([n["login"] for n in assigned]) or "(none)"
self.log(f"Current assignees: {assignees}")
return True
except Exception as e:
error_message = str(e)
self.log(f"Error assigning Copilot: {error_message}")
# Provide specific guidance for common permission issues
if "FORBIDDEN" in error_message and "ReplaceActorsForAssignable" in error_message:
self.log("")
self.log("📋 Permission Issue: Cannot assign GitHub Copilot")
self.log(" This is a repository permission limitation, not an application error.")
self.log("")
self.log(" Possible solutions:")
self.log(" 1. Repository admin can assign Copilot manually to the PR")
self.log(" 2. Repository admin can grant assignment permissions")
self.log(" 3. The @copilot comment will still notify Copilot to work on the PR")
self.log("")
self.log(" ✅ The PR was created successfully with @copilot instructions")
self.log(" ✅ Copilot can still see and act on the @copilot comment")
elif "NOT_FOUND" in error_message:
self.log("")
self.log("📋 Copilot Actor Not Found")
self.log(" This repository may not have GitHub Copilot enabled or available.")
self.log(" The @copilot comment was still added to notify available Copilot services.")
return False
def add_copilot_comment(self, owner: str, repo: str, pr_number: int,
file_path: str, old_text: str, new_text: str, branch_name: str,
work_item_id: str = None, item_source: str = None, doc_url: str = None,
custom_instructions: str = None) -> bool:
"""Add a comment mentioning @copilot with explicit instructions to work on THIS PR
This tells Copilot to make changes in the current PR's branch, not create a new PR.
Args:
owner: Repository owner
repo: Repository name
pr_number: Pull request number
file_path: Path to the file to modify
old_text: Text to find and replace
new_text: New text to replace with
branch_name: Branch name for this PR
work_item_id: Reference ID for tracking (optional)
item_source: Source of the item (optional)
Returns True if successful, False otherwise.
"""
if self.dry_run:
self.log(f"[DRY-RUN] Would add @copilot comment to PR #{pr_number}")
return True
try:
rest_headers = {
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github+json",
"User-Agent": USER_AGENT
}
# Build reference ID if provided
if work_item_id:
reference_id = f"**Reference ID:** {work_item_id}\n"
else:
reference_id = ""
# Build document reference
if file_path and not file_path.startswith("File path not specified"):
doc_ref = f"**Document to modify:** `{file_path}`\n"
file_instruction = f"2. Locate the file: `{file_path}`"
elif doc_url:
doc_ref = f"**Document URL:** {doc_url}\n"
file_instruction = f"2. Locate the file from this document URL: {doc_url}"
else:
doc_ref = "**Note:** File path not specified\n"
file_instruction = "2. Review the PR description to identify the file(s) that need to be modified"
# Build custom instructions section
if custom_instructions and custom_instructions.strip():
custom_instructions_section = f"""
**Custom AI Instructions:**
{custom_instructions.strip()}
"""
else:
custom_instructions_section = ""
# Create a comment mentioning @copilot with VERY explicit instructions
comment_body = f"""@copilot
{reference_id}{doc_ref}
**Instructions:**
Task: Update the file with the changes requested above.
Steps to complete:
Locate the file containing the reference shown below.
Find the reference text within the file
Replace it with the 'Proposed New Text' shown above or use the reference as guidance
Maintain the existing formatting, indentation, and structure
Ensure no other content in the file is modified
> [!IMPORTANT]
> Only replace the specified text - do not make additional changes.
> Preserve all formatting, links, and code blocks.
> If the current text cannot be found exactly, search for similar text.
> Do not remove any text unless the reference or suggested guidance indicates to do so, if the text is obsolete or incorrect.
1. Make changes to `{branch_name}` branch for this pull request.
{file_instruction}
3. Find this reference in the content:
```
{old_text}
```
4. Use this text as guidance for the new content:
```
{new_text}
```
5. Ensure the changes align with the context provided.
6. Do a freshness check to ensure the file content is up-to-date before making changes.
7. Commit the changes to the `{branch_name}` branch
> [!NOTE]
> If guidance is empty, follow the reference to make changes.
{custom_instructions_section}
Thank you!
"""
# Post the comment to the PR
comments_url = f"https://api.github.com/repos/{owner}/{repo}/issues/{pr_number}/comments"
comment_data = {"body": comment_body}
resp = requests.post(comments_url, headers=rest_headers, json=comment_data, timeout=30)
if resp.status_code == 403:
self.log("❌ Permission denied when adding comment")
return False
resp.raise_for_status()
self.log(f"✅ Added @copilot comment to PR #{pr_number}")
self.log(" Copilot has been instructed to work on THIS PR's branch")
return True
except requests.HTTPError as e:
self.log(f"❌ HTTP Error adding comment: {e}")
return False
except Exception as e:
self.log(f"❌ Error adding comment: {str(e)}")
return False
def add_pr_suggestion(self, owner: str, repo: str, pr_number: int, file_path: str,
old_text: str, new_text: str) -> bool:
"""Add a suggested change comment to a PR
This creates a review comment with a code suggestion that can be applied
with one click, keeping everything in the same PR.
Returns True if successful, False otherwise.
"""
if self.dry_run:
self.log(f"[DRY-RUN] Would add suggested change to PR #{pr_number}")
return True
try:
# Use REST API to create a review comment with suggestion
rest_headers = {
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github+json",
"User-Agent": USER_AGENT
}
# First, get the latest commit SHA from the PR
pr_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
resp = requests.get(pr_url, headers=rest_headers, timeout=30)
resp.raise_for_status()
pr_data = resp.json()
commit_sha = pr_data["head"]["sha"]
self.log(f"Latest commit SHA: {commit_sha}")
# Get the file content to find line numbers
file_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{file_path}?ref={commit_sha}"
resp = requests.get(file_url, headers=rest_headers, timeout=30)
if resp.status_code == 404:
self.log(f"⚠️ File not found in PR: {file_path}")
return False
resp.raise_for_status()
file_data = resp.json()
content = base64.b64decode(file_data["content"]).decode('utf-8')
lines = content.split('\n')
# Find the line number where the old text appears
old_text_lines = old_text.split('\n')
start_line = None
for i in range(len(lines) - len(old_text_lines) + 1):
if '\n'.join(lines[i:i+len(old_text_lines)]) == old_text:
start_line = i + 1 # Line numbers are 1-based
break
if not start_line:
self.log("⚠️ Could not find text in file to create suggestion")
return False
end_line = start_line + len(old_text_lines) - 1
# Create a review comment with suggested change
suggestion_body = f"""```suggestion
{new_text}
```
**Automated Suggestion:** This change was requested.
Click "Commit suggestion" above to apply this change directly to the PR."""
comment_data = {
"body": suggestion_body,
"commit_id": commit_sha,
"path": file_path,
"line": end_line,
"start_line": start_line if start_line != end_line else None,
"start_side": "RIGHT"
}
# Remove start_line if it's the same as line (single-line comment)
if start_line == end_line:
del comment_data["start_line"]
comments_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}/comments"
resp = requests.post(comments_url, headers=rest_headers, json=comment_data, timeout=30)
if resp.status_code == 403:
self.log("❌ Permission denied when adding suggestion")
return False
resp.raise_for_status()
self.log(f"✅ Added suggested change comment to PR #{pr_number}")
self.log(" User can click 'Commit suggestion' to apply it")
return True
except requests.HTTPError as e:
self.log(f"❌ HTTP Error adding suggestion: {e}")
if hasattr(e, 'response') and e.response is not None:
self.log(f" Response: {e.response.text[:200]}")
return False
except Exception as e:
self.log(f"❌ Error adding suggestion: {str(e)}")
return False
def create_branch_with_placeholder(self, owner: str, repo: str, branch_name: str, instructions: str) -> bool:
"""Create a branch with a placeholder commit using REST API
This creates a branch from main and adds a .copilot-instructions.md file
so that the branch has at least one commit, allowing PR creation.
Returns True if successful, False otherwise.
"""
if self.dry_run:
self.log(f"[DRY-RUN] Would create branch {branch_name} with placeholder commit")
return True
try:
# Use REST API for branch/file creation
rest_headers = {
"Authorization": f"Bearer {self.token}",
"Accept": "application/vnd.github+json",
"User-Agent": USER_AGENT
}
# 1. Get the SHA of the main branch
self.log(f"Getting SHA of main branch...")
ref_url = f"https://api.github.com/repos/{owner}/{repo}/git/refs/heads/main"
resp = requests.get(ref_url, headers=rest_headers, timeout=30)
resp.raise_for_status()
main_sha = resp.json()["object"]["sha"]
self.log(f"Main branch SHA: {main_sha}")
# 2. Create new branch from main
self.log(f"Creating branch {branch_name}...")
create_ref_url = f"https://api.github.com/repos/{owner}/{repo}/git/refs"
create_ref_payload = {
"ref": f"refs/heads/{branch_name}",
"sha": main_sha
}
resp = requests.post(create_ref_url, headers=rest_headers, json=create_ref_payload, timeout=30)
# Check for permission errors
if resp.status_code == 403:
self.log("❌ Permission denied: GitHub token doesn't have write access to this repository")
self.log(f" Repository: {owner}/{repo}")
self.log(" Required permission: 'repo' scope with write access")
self.log("")
self.log(" Please verify:")
self.log(" 1. Your token has the 'repo' scope enabled")
self.log(" 2. You have write/push access to this repository")
self.log(" 3. The repository exists and the name is correct")
self.log("")
self.log(" TIP: You can still create Issues (uncheck the PR checkbox)")
return False
# Branch might already exist, that's okay
if resp.status_code == 422:
error_detail = resp.json()
if "already exists" in str(error_detail).lower():
self.log(f"Branch {branch_name} already exists, using existing branch")
return True
else:
self.log(f"Error creating branch: {error_detail}")
resp.raise_for_status()
self.log(f"✅ Branch {branch_name} created")
# 3. Create a placeholder file with instructions
self.log("Creating placeholder commit with Copilot instructions...")
file_content = f"""# Copilot Instructions
This is a placeholder file created to allow PR creation.
## Task
{instructions}
Please process the instructions above and make the necessary changes to the documentation.
Once you've made the changes, you can delete this file.
"""
encoded_content = base64.b64encode(file_content.encode('utf-8')).decode()
file_payload = {
"message": f"Add Copilot instructions for {branch_name}",
"content": encoded_content,
"branch": branch_name
}
file_url = f"https://api.github.com/repos/{owner}/{repo}/contents/.copilot-instructions.md"
resp = requests.put(file_url, headers=rest_headers, json=file_payload, timeout=30)
resp.raise_for_status()
self.log(f"✅ Placeholder commit created in branch {branch_name}")
return True
except requests.HTTPError as e:
self.log(f"❌ HTTP Error creating branch with placeholder: {e}")
if e.response.status_code == 403:
self.log(" Permission denied - token doesn't have write access")
return False
except Exception as e:
self.log(f"❌ Error creating branch with placeholder: {str(e)}")
return False
# Backward compatibility alias
GitHubAPI = GitHubGQL
File diff suppressed because it is too large Load Diff
-125
View File
@@ -1,125 +0,0 @@
"""
Processing Log Dialog
Displays the processing log in a separate dialog window
"""
import flet as ft
# Compatibility fix for Flet 0.28+ (Icons vs icons, Colors vs colors)
ft.icons = ft.Icons
ft.colors = ft.Colors
class ProcessingLogDialog:
"""Processing log display dialog"""
def __init__(self, page: ft.Page, log_text_ref: ft.Ref):
self.page = page
self.log_text_ref = log_text_ref
self.dialog_ref = ft.Ref[ft.AlertDialog]()
self.log_display_ref = ft.Ref[ft.TextField]()
def show(self):
"""Show the processing log dialog"""
try:
print("ProcessingLogDialog.show() called")
# Create the dialog
dialog = self._create_dialog()
self.dialog_ref.current = dialog
# Sync the log content before showing
self._sync_log_content()
# Open the dialog
self.page.open(dialog)
self.page.update()
except Exception as ex:
print(f"Error in ProcessingLogDialog.show(): {ex}")
import traceback
traceback.print_exc()
def _sync_log_content(self):
"""Sync log content from main log to dialog display"""
if self.log_text_ref.current and self.log_display_ref.current:
self.log_display_ref.current.value = self.log_text_ref.current.value
if self.page:
self.page.update()
def _create_dialog(self) -> ft.AlertDialog:
"""Create the processing log dialog"""
# Create a display field that will show a copy of the log
# This is synced from the main log field
log_display = ft.TextField(
ref=self.log_display_ref,
value=self.log_text_ref.current.value if self.log_text_ref.current else "",
multiline=True,
read_only=True,
expand=True,
text_style=ft.TextStyle(font_family="Courier New"),
min_lines=20,
max_lines=30,
)
# Refresh button
refresh_button = ft.TextButton(
"Refresh",
icon=ft.icons.REFRESH,
on_click=self._refresh_log,
)
# Clear button
clear_button = ft.TextButton(
"Clear Log",
icon=ft.icons.DELETE_OUTLINE,
on_click=self._clear_log,
)
# Close button
close_button = ft.TextButton(
"Close",
on_click=self._close_clicked,
)
dialog = ft.AlertDialog(
ref=self.dialog_ref,
modal=True,
title=ft.Row(
[
ft.Icon(ft.icons.LIST_ALT, color="blue"),
ft.Text("Processing Log", size=20, weight=ft.FontWeight.BOLD),
],
alignment=ft.MainAxisAlignment.START,
),
content=ft.Container(
content=log_display,
width=800,
height=500,
),
actions=[
refresh_button,
clear_button,
close_button,
],
actions_alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
)
return dialog
def _refresh_log(self, e):
"""Refresh the log content from the main log"""
self._sync_log_content()
def _clear_log(self, e):
"""Clear the log"""
# Clear both the main log and the display
if self.log_text_ref.current:
self.log_text_ref.current.value = ""
if self.log_display_ref.current:
self.log_display_ref.current.value = ""
self.page.update()
def _close_clicked(self, e):
"""Handle close button click"""
if self.dialog_ref.current:
self.page.close(self.dialog_ref.current)
File diff suppressed because it is too large Load Diff
-307
View File
@@ -1,307 +0,0 @@
"""
Settings Manager
Handles application settings with live updates and secure storage.
Non-secret settings are stored in config.json.
Secrets (API keys, tokens) are stored in the system keyring.
"""
import json
import os
from pathlib import Path
from typing import Dict, Any, Optional, Callable
import keyring
class SettingsManager:
"""
Manages application settings with live updates.
Features:
- Non-secret settings stored in JSON
- API keys stored securely in system keyring
- Live update notifications to registered listeners
- No app restart required for changes
"""
# Keyring service name for this app
SERVICE_NAME = "GitHubPulse"
# Keys that should be stored in keyring (secrets)
SECRET_KEYS = {
'GITHUB_PAT',
'ANTHROPIC_API_KEY',
'OPENAI_API_KEY',
'GITHUB_COPILOT_TOKEN',
'CLAUDE_API_KEY', # Alternative name for Anthropic
'GITHUB_TOKEN', # For GitHub Copilot
'OLLAMA_API_KEY', # Optional Ollama API key
}
# Default settings (non-secrets)
DEFAULT_SETTINGS = {
# GitHub Configuration
'GITHUB_REPO': '',
'FORKED_REPO': '',
'LOCAL_REPO_PATH': '',
# Application Settings
'AI_PROVIDER': 'none',
'DRY_RUN': 'false',
'DEFAULT_BRANCH': 'main',
'THEME_MODE': 'dark',
'AUTO_REFRESH': 'true',
'REFRESH_INTERVAL': '300',
# Ollama Configuration
'OLLAMA_URL': '',
'OLLAMA_MODEL': '',
# Custom AI Instructions
'CUSTOM_INSTRUCTIONS': '',
}
def __init__(self, config_dir: Optional[Path] = None):
"""
Initialize the settings manager.
Args:
config_dir: Directory to store config.json. Defaults to app directory.
"""
# Determine config directory
if config_dir is None:
# Use app directory
config_dir = Path(__file__).parent.parent
self.config_dir = Path(config_dir)
self.config_file = self.config_dir / "config.json"
# Settings storage
self._settings: Dict[str, Any] = {}
# Registered change listeners
self._listeners: list[Callable[[str, Any], None]] = []
# Load settings
self.load()
def load(self) -> Dict[str, Any]:
"""
Load settings from config.json and keyring.
Returns:
Dictionary of all settings (secrets and non-secrets combined)
"""
# Start with defaults
self._settings = self.DEFAULT_SETTINGS.copy()
# Load from JSON file
if self.config_file.exists():
try:
with open(self.config_file, 'r') as f:
saved_settings = json.load(f)
# Only load non-secret settings from JSON
for key, value in saved_settings.items():
if key not in self.SECRET_KEYS:
self._settings[key] = value
except Exception as e:
print(f"Error loading config.json: {e}")
# Load secrets from keyring
for secret_key in self.SECRET_KEYS:
try:
value = keyring.get_password(self.SERVICE_NAME, secret_key)
if value:
self._settings[secret_key] = value
except Exception as e:
print(f"Error loading {secret_key} from keyring: {e}")
return self._settings.copy()
def save(self, settings: Optional[Dict[str, Any]] = None) -> bool:
"""
Save settings to config.json and keyring.
Args:
settings: Settings to save. If None, saves current settings.
Returns:
True if successful, False otherwise
"""
if settings is not None:
# Update internal settings
for key, value in settings.items():
old_value = self._settings.get(key)
self._settings[key] = value
# Notify listeners of changes
if old_value != value:
self._notify_change(key, value)
try:
# Save non-secrets to JSON
json_settings = {
key: value for key, value in self._settings.items()
if key not in self.SECRET_KEYS
}
with open(self.config_file, 'w') as f:
json.dump(json_settings, f, indent=2)
# Save secrets to keyring
for secret_key in self.SECRET_KEYS:
if secret_key in self._settings:
value = self._settings[secret_key]
if value: # Only save non-empty values
try:
keyring.set_password(self.SERVICE_NAME, secret_key, str(value))
except Exception as e:
print(f"Error saving {secret_key} to keyring: {e}")
return True
except Exception as e:
print(f"Error saving settings: {e}")
return False
def get(self, key: str, default: Any = None) -> Any:
"""
Get a setting value.
Args:
key: Setting key
default: Default value if key doesn't exist
Returns:
Setting value or default
"""
return self._settings.get(key, default)
def set(self, key: str, value: Any, save: bool = True) -> bool:
"""
Set a setting value with live update.
Args:
key: Setting key
value: New value
save: Whether to persist immediately
Returns:
True if successful
"""
old_value = self._settings.get(key)
self._settings[key] = value
# Notify listeners
if old_value != value:
self._notify_change(key, value)
# Save if requested
if save:
return self.save()
return True
def get_all(self) -> Dict[str, Any]:
"""
Get all settings.
Returns:
Dictionary of all settings
"""
return self._settings.copy()
def register_listener(self, callback: Callable[[str, Any], None]):
"""
Register a callback to be notified of setting changes.
The callback will be called with (key, new_value) when a setting changes.
Args:
callback: Function to call on settings change
"""
if callback not in self._listeners:
self._listeners.append(callback)
def unregister_listener(self, callback: Callable[[str, Any], None]):
"""
Unregister a settings change callback.
Args:
callback: Function to remove from listeners
"""
if callback in self._listeners:
self._listeners.remove(callback)
def _notify_change(self, key: str, value: Any):
"""
Notify all registered listeners of a setting change.
Args:
key: Setting key that changed
value: New value
"""
for listener in self._listeners:
try:
listener(key, value)
except Exception as e:
print(f"Error notifying listener of {key} change: {e}")
def delete_secret(self, key: str) -> bool:
"""
Delete a secret from the keyring.
Args:
key: Secret key to delete
Returns:
True if successful
"""
if key not in self.SECRET_KEYS:
return False
try:
keyring.delete_password(self.SERVICE_NAME, key)
if key in self._settings:
del self._settings[key]
self._notify_change(key, None)
return True
except Exception as e:
print(f"Error deleting {key} from keyring: {e}")
return False
def migrate_from_env(self, env_file: Path) -> bool:
"""
Migrate settings from a .env file to the new system.
Args:
env_file: Path to .env file
Returns:
True if migration successful
"""
if not env_file.exists():
print(f"Env file not found: {env_file}")
return False
try:
# Read .env file
env_settings = {}
with open(env_file, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
if '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
env_settings[key] = value
# Save to new system
self.save(env_settings)
print(f"Successfully migrated {len(env_settings)} settings from .env")
return True
except Exception as e:
print(f"Error migrating from .env: {e}")
return False
-662
View File
@@ -1,662 +0,0 @@
"""
Utility functions and helpers
"""
import json
import os
import re
import subprocess
import threading
import datetime
from pathlib import Path
from typing import Dict, Any, Optional, Tuple, List
from urllib.parse import urlparse
class Logger:
"""Simple logger for GUI applications"""
def __init__(self, text_widget=None):
self.text_widget = text_widget
self._lock = threading.Lock()
def log(self, message: str) -> None:
"""Log a message to the text widget and console"""
timestamp = __import__('datetime').datetime.now().strftime("%H:%M:%S")
formatted_message = f"[{timestamp}] {message}"
try:
print(formatted_message)
except UnicodeEncodeError:
# Fallback: replace Unicode emojis with ASCII equivalents
safe_message = formatted_message.replace('', '[SUCCESS]').replace('', '[ERROR]').replace('⚠️', '[WARNING]').replace('📋', '[INFO]').replace('📄', '[FILE]').replace('📍', '[LOCATION]').replace('📝', '[EDIT]')
print(safe_message)
if self.text_widget:
def update_widget():
try:
with self._lock:
self.text_widget.config(state='normal')
self.text_widget.insert('end', formatted_message + '\n')
self.text_widget.see('end')
self.text_widget.config(state='disabled')
self.text_widget.update_idletasks()
except:
pass # Widget might be destroyed
# Schedule update on main thread
if hasattr(self.text_widget, 'after'):
self.text_widget.after(0, update_widget)
else:
update_widget()
class PRNumberManager:
"""Manages PR numbers for branch naming"""
PR_COUNTER_FILE = '.pr_counter.json'
@classmethod
def get_pr_counter_file(cls) -> str:
"""Get the path to the PR counter file"""
script_dir = os.path.dirname(os.path.abspath(__file__))
return os.path.join(script_dir, cls.PR_COUNTER_FILE)
@classmethod
def load_pr_counter(cls) -> Dict[str, int]:
"""Load the PR counter from file"""
counter_file = cls.get_pr_counter_file()
if os.path.exists(counter_file):
try:
with open(counter_file, 'r', encoding='utf-8') as f:
return json.load(f)
except (json.JSONDecodeError, FileNotFoundError):
pass
return {}
@classmethod
def save_pr_counter(cls, counter: Dict[str, int]) -> None:
"""Save the PR counter to file"""
counter_file = cls.get_pr_counter_file()
try:
with open(counter_file, 'w', encoding='utf-8') as f:
json.dump(counter, f, indent=2)
except Exception as e:
print(f"Warning: Could not save PR counter: {e}")
@classmethod
def get_next_pr_number(cls, provider_key: str) -> int:
"""
Get the next PR number for a given provider
Args:
provider_key: Either the AI provider name ('chatgpt', 'claude') or 'gh_copilot'
Returns:
Next available PR number for this provider
"""
try:
counter = cls.load_pr_counter()
current_number = counter.get(provider_key, 0)
next_number = current_number + 1
counter[provider_key] = next_number
cls.save_pr_counter(counter)
return next_number
except Exception as e:
print(f"Error managing PR counter: {e}")
# Fallback to a timestamp-based number
import time
return int(time.time()) % 10000
class GitHubInfoExtractor:
"""Extracts GitHub repository information from URLs"""
@staticmethod
def extract_github_info(doc_url: str) -> Dict[str, Any]:
"""Extract GitHub repository information from a document URL"""
try:
if not doc_url or 'github.com' not in doc_url:
return {'error': 'Not a GitHub URL'}
parsed = urlparse(doc_url)
path_parts = parsed.path.strip('/').split('/')
if len(path_parts) < 2:
return {'error': 'Invalid GitHub URL format'}
owner = path_parts[0]
repo = path_parts[1]
# Try to extract file path if it's a blob URL
file_path = None
if len(path_parts) > 3 and path_parts[2] == 'blob':
# Skip branch name and get file path
if len(path_parts) > 4:
file_path = '/'.join(path_parts[4:])
result = {
'owner': owner,
'repo': repo,
'original_content_git_url': doc_url
}
if file_path:
result['file_path'] = file_path
# Try to find ms.author from the URL or repo name
ms_author = GitHubInfoExtractor._extract_ms_author(owner, repo, doc_url)
if ms_author:
result['ms_author'] = ms_author
return result
except Exception as e:
return {'error': f'Error parsing GitHub URL: {str(e)}'}
@staticmethod
def _extract_ms_author(owner: str, repo: str, url: str) -> Optional[str]:
"""Try to extract ms.author from various sources"""
try:
# Method 1: Check if owner looks like a Microsoft username
if owner.startswith('Microsoft') or 'microsoft' in owner.lower():
# Try to extract from repo name or URL patterns
if '-' in repo:
parts = repo.split('-')
for part in parts:
if len(part) > 2 and part.islower():
return part
# Method 2: Look for patterns in the URL
url_lower = url.lower()
# Common patterns for ms.author
patterns = [
r'/([a-z][a-z0-9-]+[a-z0-9])/', # username-like patterns
r'author[=:]([a-z][a-z0-9-]+)', # author= or author: patterns
]
for pattern in patterns:
match = re.search(pattern, url_lower)
if match:
candidate = match.group(1)
# Validate it looks like a reasonable username
if 3 <= len(candidate) <= 20 and candidate.replace('-', '').isalnum():
return candidate
return None
except Exception:
return None
class WorkItemFieldExtractor:
"""Extracts and processes item fields (placeholder for future implementation)"""
@staticmethod
def extract_work_item_fields(work_item: Dict[str, Any]) -> Dict[str, Any]:
"""Extract and process fields from work item (placeholder)"""
fields = work_item.get('fields', {})
# Extract basic fields
item_id = work_item.get('id', 'Unknown')
title = fields.get('System.Title', 'No Title')
# Extract custom fields with fallbacks
nature_of_request = (
fields.get('Custom.Natureofrequest') or
fields.get('Custom.NatureOfRequest') or
fields.get('Microsoft.VSTS.Common.DescriptionHtml', '')
)
# Clean HTML if present
if nature_of_request and '<' in nature_of_request:
nature_of_request = WorkItemFieldExtractor._clean_html(nature_of_request)
mydoc_url = (
fields.get('Custom.MyDocURL') or
fields.get('Custom.DocumentURL') or
fields.get('Custom.URL', '')
)
text_to_change = (
fields.get('Custom.TextToChange') or
fields.get('Custom.CurrentText', '')
)
new_text = (
fields.get('Custom.NewText') or
fields.get('Custom.ProposedText') or
fields.get('Custom.ReplacementText', '')
)
# Extract GitHub info from the document URL
github_info = GitHubInfoExtractor.extract_github_info(mydoc_url)
return {
'id': item_id,
'title': title,
'nature_of_request': nature_of_request,
'mydoc_url': mydoc_url,
'text_to_change': text_to_change,
'new_text': new_text,
'github_info': github_info,
'status': 'Ready',
'source': 'Generic'
}
@staticmethod
def extract_uuf_item_fields(uuf_item: Dict[str, Any]) -> Dict[str, Any]:
"""Extract and process fields from custom item (placeholder)"""
# UUF items have different field structure
item_id = uuf_item.get('cr_uufitemid', 'Unknown')
title = uuf_item.get('cr_title', 'No Title')
nature_of_request = uuf_item.get('cr_description', '')
mydoc_url = uuf_item.get('cr_documenturl', '')
text_to_change = uuf_item.get('cr_currenttext', '')
new_text = uuf_item.get('cr_newtext', '')
# Extract GitHub info
github_info = GitHubInfoExtractor.extract_github_info(mydoc_url)
return {
'id': item_id,
'title': title,
'nature_of_request': nature_of_request,
'mydoc_url': mydoc_url,
'text_to_change': text_to_change,
'new_text': new_text,
'github_info': github_info,
'status': 'Ready',
'source': 'UUF'
}
@staticmethod
def _clean_html(html_text: str) -> str:
"""Remove HTML tags and decode entities"""
import html
# Remove HTML tags
clean_text = re.sub(r'<[^>]+>', '', html_text)
# Decode HTML entities
clean_text = html.unescape(clean_text)
# Clean up whitespace
clean_text = re.sub(r'\s+', ' ', clean_text).strip()
return clean_text
class ContentBuilders:
"""Builds content for GitHub issues and PRs"""
@staticmethod
def build_issue_title(item: Dict[str, Any]) -> str:
"""Build GitHub issue title"""
item_id = item.get('id', '')
if item_id:
return f"[#{item_id}] {item['title']}"
return f"{item['title']}"
@staticmethod
def build_issue_body(item: Dict[str, Any], github_info: Dict[str, Any]) -> str:
"""Build GitHub issue body"""
body_parts = []
# Header
body_parts.append("## Item Details")
body_parts.append("")
# Make ID a hyperlink if source URL is available
if item.get('source_url'):
body_parts.append(f"**ID:** [{item['id']}]({item['source_url']})")
else:
body_parts.append(f"**ID:** {item['id']}")
body_parts.append(f"**Title:** {item['title']}")
body_parts.append("")
# Nature of request
if item['nature_of_request']:
body_parts.append("**Nature of Request:**")
body_parts.append(item['nature_of_request'])
body_parts.append("")
# Document information
if item['mydoc_url']:
body_parts.append("**Document URL:**")
body_parts.append(item['mydoc_url'])
body_parts.append("")
# Change details
body_parts.append("## Change Details")
body_parts.append("")
if item['text_to_change']:
body_parts.append("**Text to Change:**")
body_parts.append("```")
body_parts.append(item['text_to_change'])
body_parts.append("```")
body_parts.append("")
if item['new_text']:
body_parts.append("**Proposed New Text:**")
body_parts.append("```")
body_parts.append(item['new_text'])
body_parts.append("```")
body_parts.append("")
# Repository info
if github_info.get('owner') and github_info.get('repo'):
body_parts.append("## Repository Information")
body_parts.append("")
body_parts.append(f"**Repository:** {github_info['owner']}/{github_info['repo']}")
if github_info.get('ms_author'):
body_parts.append(f"**Author:** @{github_info['ms_author']}")
body_parts.append("")
# Instructions for manual review
body_parts.append("## Instructions")
body_parts.append("")
body_parts.append("This issue requires manual review of the proposed documentation change.")
body_parts.append("")
body_parts.append("**Next Steps:**")
body_parts.append("1. Review the proposed change above")
body_parts.append("2. Navigate to the document URL")
body_parts.append("3. Locate the text that needs to be changed")
body_parts.append("4. Make the appropriate updates")
body_parts.append("5. Close this issue when complete")
body_parts.append("")
body_parts.append("---")
body_parts.append("*Created automatically by GitHub Pulse*")
return "\n".join(body_parts)
@staticmethod
def build_pr_title(item: Dict[str, Any]) -> str:
"""Build GitHub PR title"""
item_id = item.get('id', '')
if item_id:
return f"[#{item_id}] {item['title']}"
return f"{item['title']}"
@staticmethod
def build_pr_body(item: Dict[str, Any], github_info: Dict[str, Any]) -> str:
"""Build GitHub PR body"""
body_parts = []
# Header
body_parts.append("## Documentation Update")
body_parts.append("")
# Make ID a hyperlink if source URL is available
if item.get('source_url'):
body_parts.append(f"**ID:** [{item['id']}]({item['source_url']})")
else:
body_parts.append(f"**ID:** {item['id']}")
body_parts.append(f"**Title:** {item['title']}")
body_parts.append("")
# Nature of request
if item['nature_of_request']:
body_parts.append("**Description:**")
body_parts.append(item['nature_of_request'])
body_parts.append("")
# Change summary
body_parts.append("## Changes Made")
body_parts.append("")
body_parts.append("This PR updates documentation as requested.")
body_parts.append("")
if item['text_to_change'] and item['new_text']:
body_parts.append("**Change Summary:**")
body_parts.append("- Updated specific text content as requested")
body_parts.append("")
body_parts.append("<details>")
body_parts.append("<summary>View Change Details</summary>")
body_parts.append("")
body_parts.append("**Original Text:**")
body_parts.append("```")
body_parts.append(item['text_to_change'])
body_parts.append("```")
body_parts.append("")
body_parts.append("**New Text:**")
body_parts.append("```")
body_parts.append(item['new_text'])
body_parts.append("```")
body_parts.append("</details>")
body_parts.append("")
# Repository info
if github_info.get('ms_author'):
body_parts.append(f"**Author:** @{github_info['ms_author']}")
body_parts.append("")
# Review instructions
body_parts.append("## Review Checklist")
body_parts.append("")
body_parts.append("- [ ] Changes match the requested update")
body_parts.append("- [ ] No unintended changes were made")
body_parts.append("- [ ] Grammar and formatting are correct")
body_parts.append("- [ ] Links and references are working")
body_parts.append("")
body_parts.append("---")
body_parts.append("*Created automatically by GitHub Pulse*")
return "\n".join(body_parts)
class LocalRepositoryScanner:
"""Scans local repository path for Git repositories"""
@staticmethod
def scan_local_repos(local_repo_path: str) -> List[str]:
"""Scan local path for Git repositories"""
if not local_repo_path or not os.path.exists(local_repo_path):
return []
repos = []
try:
for item in os.listdir(local_repo_path):
item_path = os.path.join(local_repo_path, item)
if os.path.isdir(item_path):
git_path = os.path.join(item_path, '.git')
if os.path.exists(git_path):
# Get remote origin URL to determine repo name
repo_info = LocalRepositoryScanner.get_repo_info(item_path)
if repo_info:
repos.append(repo_info)
else:
# Fallback to folder name
repos.append(f"local/{item}")
except PermissionError:
pass # Skip directories we can't access
except Exception as e:
print(f"Error scanning local repos: {e}")
return sorted(repos)
@staticmethod
def get_repo_info(repo_path: str) -> Optional[str]:
"""Get repository information from local Git repo"""
try:
# Get remote origin URL
result = subprocess.run(
['git', 'config', '--get', 'remote.origin.url'],
cwd=repo_path,
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
url = result.stdout.strip()
return LocalRepositoryScanner.parse_git_url(url)
except Exception:
pass
return None
@staticmethod
def parse_git_url(url: str) -> Optional[str]:
"""Parse Git URL to extract owner/repo format"""
try:
# Handle GitHub URLs
if 'github.com' in url:
# Handle both HTTPS and SSH URLs
if url.startswith('git@'):
# SSH: git@github.com:owner/repo.git
parts = url.split(':')[-1].replace('.git', '')
return parts
else:
# HTTPS: https://github.com/owner/repo.git
parsed = urlparse(url)
path = parsed.path.strip('/').replace('.git', '')
return path
except:
pass
return None
@staticmethod
def clone_repository(repo_url: str, local_path: str, repo_name: str) -> bool:
"""Clone a repository to local path"""
try:
target_path = os.path.join(local_path, repo_name.split('/')[-1])
if os.path.exists(target_path):
print(f"Repository already exists at {target_path}")
return True
os.makedirs(local_path, exist_ok=True)
result = subprocess.run(
['git', 'clone', repo_url, target_path],
capture_output=True,
text=True,
timeout=300 # 5 minutes timeout
)
if result.returncode == 0:
print(f"Successfully cloned {repo_url} to {target_path}")
return True
else:
print(f"Failed to clone repository: {result.stderr}")
return False
except Exception as e:
print(f"Error cloning repository: {e}")
return False
class ConfigurationHelpers:
"""Configuration and validation utilities"""
@staticmethod
def validate_ai_provider_setup(config: Dict[str, Any], parent_window=None) -> bool:
"""Validate AI provider setup and offer to install missing modules
Args:
config: Configuration dictionary
parent_window: Parent tkinter window for dialogs
Returns:
bool: True if setup is valid or user handled the issue
"""
ai_provider = config.get('AI_PROVIDER', '').lower()
if not ai_provider or ai_provider == 'none':
return True # No AI provider selected, nothing to validate
try:
# Try to import AI manager for validation
from .ai_manager import AIManager
ai_manager = AIManager()
# Check if modules are available
available, missing = ai_manager.check_ai_module_availability(ai_provider)
if available:
return True # All modules available
print(f"⚠️ AI Provider '{ai_provider}' selected but missing required packages: {', '.join(missing)}")
# Offer to install missing packages
success = ai_manager.install_ai_packages(missing, parent_window)
if success:
# Re-check availability after installation
available, still_missing = ai_manager.check_ai_module_availability(ai_provider)
if available:
print(f"✅ AI Provider '{ai_provider}' is now ready to use")
return True
else:
print(f"⚠️ Some packages may still be missing: {', '.join(still_missing)}")
print("Please restart the application after installation completes")
return False
return False
except ImportError:
# AI manager not available, skip validation
return True
@staticmethod
def create_default_env_file() -> bool:
"""Create a default .env file with all settings blank"""
try:
default_config = """# GitHub Pulse Configuration
# Generated automatically - fill in your values
# IMPORTANT: Do NOT commit this file to source control. Add it to .gitignore.
# GitHub Configuration
GITHUB_PAT=
GITHUB_REPO=
FORKED_REPO=
# Application Settings
DRY_RUN=false
# AI Provider Configuration (for local PR creation with AI assistance)
AI_PROVIDER=
CLAUDE_API_KEY=
OPENAI_API_KEY=
GITHUB_TOKEN=
LOCAL_REPO_PATH=
# Custom AI Instructions (optional)
CUSTOM_INSTRUCTIONS=
"""
with open('.env', 'w', encoding='utf-8') as f:
f.write(default_config)
print("Created default .env file with blank values")
return True
except Exception as e:
print(f"Error creating default .env file: {e}")
return False
# Compatibility functions for direct function access
def get_next_pr_number(provider_key: str) -> int:
"""Compatibility function for direct access to PR number generation"""
return PRNumberManager.get_next_pr_number(provider_key)
def validate_ai_provider_setup(config: Dict[str, Any], parent_window=None) -> bool:
"""Compatibility function for direct access to AI provider validation"""
return ConfigurationHelpers.validate_ai_provider_setup(config, parent_window)
def create_default_env_file() -> bool:
"""Compatibility function for direct access to .env file creation"""
return ConfigurationHelpers.create_default_env_file()
-653
View File
@@ -1,653 +0,0 @@
"""
Workflow Manager
Manages GitHub workflow items (Issues and Pull Requests) from target and fork repositories
"""
import requests
from typing import List, Dict, Any, Optional, Tuple
class WorkflowItem:
"""Represents a GitHub workflow item (Issue or PR)"""
def __init__(self, item_type: str, data: Dict[str, Any], repo_source: str):
"""
Initialize a workflow item
Args:
item_type: 'issue' or 'pull_request'
data: Raw data from GitHub API
repo_source: 'target' or 'fork'
"""
self.item_type = item_type
self.repo_source = repo_source
self.data = data
# Extract common fields
self.number = data.get('number')
self.title = data.get('title', 'No Title')
self.state = data.get('state', 'unknown')
self.created_at = data.get('created_at', '')
self.updated_at = data.get('updated_at', '')
self.body = data.get('body', '')
self.url = data.get('html_url', '')
self.api_url = data.get('url', '')
# Author information
user = data.get('user', {})
self.author = user.get('login', 'unknown') if user else 'unknown'
self.author_url = user.get('html_url', '') if user else ''
# Labels
self.labels = [label.get('name', '') for label in data.get('labels', [])]
# Assignees
assignees = data.get('assignees', [])
self.assignees = [a.get('login', '') for a in assignees if a]
# PR-specific fields
if item_type == 'pull_request':
self.is_draft = data.get('draft', False)
self.mergeable_state = data.get('mergeable_state', 'unknown')
self.merged = data.get('merged', False)
self.base_ref = data.get('base', {}).get('ref', '')
self.head_ref = data.get('head', {}).get('ref', '')
else:
self.is_draft = False
self.mergeable_state = None
self.merged = False
self.base_ref = None
self.head_ref = None
# Comments count
self.comments_count = data.get('comments', 0)
def __repr__(self):
return f"<WorkflowItem {self.item_type} #{self.number}: {self.title[:50]}>"
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for easy serialization"""
return {
'item_type': self.item_type,
'repo_source': self.repo_source,
'data': self.data, # Include raw data for full reconstruction
'number': self.number,
'title': self.title,
'state': self.state,
'created_at': self.created_at,
'updated_at': self.updated_at,
'body': self.body,
'url': self.url,
'api_url': self.api_url,
'author': self.author,
'author_url': self.author_url,
'labels': self.labels,
'assignees': self.assignees,
'is_draft': self.is_draft,
'mergeable_state': self.mergeable_state,
'merged': self.merged,
'base_ref': self.base_ref,
'head_ref': self.head_ref,
'comments_count': self.comments_count
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'WorkflowItem':
"""Create WorkflowItem from dictionary (for cache deserialization)"""
# Extract the raw GitHub API data if available, otherwise use the dict itself
raw_data = data.get('data', data)
item_type = data.get('item_type', 'issue')
repo_source = data.get('repo_source', 'target')
return cls(item_type, raw_data, repo_source)
class GitHubRepoFetcher:
"""Fetches repository information from GitHub"""
def __init__(self, github_token: str, logger=None):
"""
Initialize the repo fetcher
Args:
github_token: GitHub Personal Access Token
logger: Optional logger instance
"""
self.token = github_token
self.logger = logger
self.headers = {
"Authorization": f"Bearer {github_token}",
"Accept": "application/vnd.github+json",
"User-Agent": "github-automation-tool/1.0"
}
def log(self, message: str):
"""Log a message"""
if self.logger:
self.logger.log(message)
else:
print(message)
def get_authenticated_user(self) -> Optional[Dict[str, Any]]:
"""
Get information about the authenticated user
Returns:
Dictionary with user information or None if error
"""
try:
url = "https://api.github.com/user"
response = requests.get(url, headers=self.headers, timeout=30)
response.raise_for_status()
return response.json()
except Exception as e:
self.log(f"❌ Error fetching authenticated user: {str(e)}")
return None
def fetch_user_repos(self, repo_type: str = 'owner', per_page: int = 100) -> List[Dict[str, Any]]:
"""
Fetch repositories for the authenticated user
Args:
repo_type: 'owner', 'member', or 'all'
per_page: Number of repos per page (max 100)
Returns:
List of repository dictionaries
"""
try:
url = "https://api.github.com/user/repos"
params = {
'type': repo_type,
'per_page': min(per_page, 100),
'sort': 'updated',
'direction': 'desc'
}
response = requests.get(url, headers=self.headers, params=params, timeout=30)
response.raise_for_status()
repos = response.json()
self.log(f"✅ Found {len(repos)} repositories ({repo_type})")
return repos
except Exception as e:
self.log(f"❌ Error fetching user repos: {str(e)}")
return []
def fetch_repos_with_permissions(self, min_permission: str = 'push') -> List[Dict[str, Any]]:
"""
Fetch repositories where user has specific permissions
Args:
min_permission: Minimum permission level ('pull', 'push', 'admin')
Returns:
List of repository dictionaries with sufficient permissions
"""
try:
# Fetch all repos user has access to
all_repos = self.fetch_user_repos(repo_type='all')
# Filter by permission level
filtered_repos = []
permission_levels = {'pull': 0, 'push': 1, 'admin': 2}
min_level = permission_levels.get(min_permission, 1)
for repo in all_repos:
permissions = repo.get('permissions', {})
# Check permission level
if permissions.get('admin'):
level = 2
elif permissions.get('push'):
level = 1
elif permissions.get('pull'):
level = 0
else:
level = -1
if level >= min_level:
filtered_repos.append(repo)
self.log(f"✅ Found {len(filtered_repos)} repos with '{min_permission}' permission or higher")
return filtered_repos
except Exception as e:
self.log(f"❌ Error fetching repos with permissions: {str(e)}")
return []
def search_repositories(self, query: str, per_page: int = 30) -> List[Dict[str, Any]]:
"""
Search for repositories on GitHub
Args:
query: Search query string
per_page: Number of results per page (max 100)
Returns:
List of repository dictionaries
"""
if not query or not query.strip():
return []
try:
url = "https://api.github.com/search/repositories"
params = {
'q': query.strip(),
'per_page': min(per_page, 100),
'sort': 'updated',
'order': 'desc'
}
response = requests.get(url, headers=self.headers, params=params, timeout=30)
response.raise_for_status()
data = response.json()
repos = data.get('items', [])
total_count = data.get('total_count', 0)
self.log(f"✅ Search found {total_count} repositories (showing {len(repos)})")
return repos
except Exception as e:
self.log(f"❌ Error searching repositories: {str(e)}")
return []
def get_repo_names(self, repos: List[Dict[str, Any]]) -> List[str]:
"""
Extract repository names in 'owner/repo' format
Args:
repos: List of repository dictionaries
Returns:
List of repository name strings
"""
return [repo.get('full_name', '') for repo in repos if repo.get('full_name')]
class WorkflowManager:
"""Manages workflow items from GitHub repositories"""
def __init__(self, github_token: str, logger=None):
"""
Initialize the workflow manager
Args:
github_token: GitHub Personal Access Token
logger: Optional logger instance
"""
self.token = github_token
self.logger = logger
self.headers = {
"Authorization": f"Bearer {github_token}",
"Accept": "application/vnd.github+json",
"User-Agent": "github-automation-tool/1.0"
}
# Initialize repo fetcher
self.repo_fetcher = GitHubRepoFetcher(github_token, logger)
def log(self, message: str):
"""Log a message"""
if self.logger:
self.logger.log(message)
else:
print(message)
def _parse_repo(self, repo_str: str) -> Optional[Tuple[str, str]]:
"""
Parse a repository string into owner and name
Args:
repo_str: Repository string in format "owner/repo"
Returns:
Tuple of (owner, repo) or None if invalid
"""
if not repo_str or '/' not in repo_str:
return None
parts = repo_str.strip().split('/')
if len(parts) != 2:
return None
return parts[0], parts[1]
def fetch_issues(self, repo_str: str, repo_source: str = 'target',
state: str = 'all', per_page: int = 100) -> List[WorkflowItem]:
"""
Fetch issues from a repository
Args:
repo_str: Repository string in format "owner/repo"
repo_source: 'target' or 'fork' to identify source
state: 'open', 'closed', or 'all'
per_page: Number of items per page (max 100)
Returns:
List of WorkflowItem objects
"""
parsed = self._parse_repo(repo_str)
if not parsed:
self.log(f"L Invalid repository format: {repo_str}")
return []
owner, repo = parsed
self.log(f"Fetching issues from {owner}/{repo} ({repo_source})...")
try:
url = f"https://api.github.com/repos/{owner}/{repo}/issues"
params = {
'state': state,
'per_page': min(per_page, 100),
'sort': 'updated',
'direction': 'desc'
}
response = requests.get(url, headers=self.headers, params=params, timeout=30)
response.raise_for_status()
items_data = response.json()
# Filter out pull requests (GitHub's issues endpoint includes PRs)
issues_data = [item for item in items_data if 'pull_request' not in item]
issues = [WorkflowItem('issue', data, repo_source) for data in issues_data]
self.log(f" Found {len(issues)} issues in {owner}/{repo}")
return issues
except requests.HTTPError as e:
self.log(f"L HTTP Error fetching issues from {owner}/{repo}: {e}")
if e.response.status_code == 401:
self.log(" Check your GitHub Personal Access Token")
elif e.response.status_code == 404:
self.log(" Repository not found or no access")
return []
except Exception as e:
self.log(f"L Error fetching issues from {owner}/{repo}: {str(e)}")
return []
def fetch_pull_requests(self, repo_str: str, repo_source: str = 'target',
state: str = 'all', per_page: int = 100) -> List[WorkflowItem]:
"""
Fetch pull requests from a repository
Args:
repo_str: Repository string in format "owner/repo"
repo_source: 'target' or 'fork' to identify source
state: 'open', 'closed', or 'all'
per_page: Number of items per page (max 100)
Returns:
List of WorkflowItem objects
"""
parsed = self._parse_repo(repo_str)
if not parsed:
self.log(f"L Invalid repository format: {repo_str}")
return []
owner, repo = parsed
self.log(f"Fetching pull requests from {owner}/{repo} ({repo_source})...")
try:
url = f"https://api.github.com/repos/{owner}/{repo}/pulls"
params = {
'state': state,
'per_page': min(per_page, 100),
'sort': 'updated',
'direction': 'desc'
}
response = requests.get(url, headers=self.headers, params=params, timeout=30)
response.raise_for_status()
prs_data = response.json()
prs = [WorkflowItem('pull_request', data, repo_source) for data in prs_data]
self.log(f" Found {len(prs)} pull requests in {owner}/{repo}")
return prs
except requests.HTTPError as e:
self.log(f"L HTTP Error fetching PRs from {owner}/{repo}: {e}")
if e.response.status_code == 401:
self.log(" Check your GitHub Personal Access Token")
elif e.response.status_code == 404:
self.log(" Repository not found or no access")
return []
except Exception as e:
self.log(f"L Error fetching PRs from {owner}/{repo}: {str(e)}")
return []
def fetch_all_workflow_items(self, target_repo: str, fork_repo: str = None,
include_issues: bool = True,
include_prs: bool = True,
state: str = 'all') -> Dict[str, List[WorkflowItem]]:
"""
Fetch all workflow items from both target and fork repositories
Args:
target_repo: Target repository string "owner/repo"
fork_repo: Fork repository string "owner/repo" (optional)
include_issues: Whether to fetch issues
include_prs: Whether to fetch pull requests
state: 'open', 'closed', or 'all'
Returns:
Dictionary with keys 'target_issues', 'target_prs', 'fork_issues', 'fork_prs'
"""
results = {
'target_issues': [],
'target_prs': [],
'fork_issues': [],
'fork_prs': []
}
# Fetch from target repository
if target_repo:
if include_issues:
results['target_issues'] = self.fetch_issues(target_repo, 'target', state)
if include_prs:
results['target_prs'] = self.fetch_pull_requests(target_repo, 'target', state)
# Fetch from fork repository
if fork_repo:
if include_issues:
results['fork_issues'] = self.fetch_issues(fork_repo, 'fork', state)
if include_prs:
results['fork_prs'] = self.fetch_pull_requests(fork_repo, 'fork', state)
# Log summary
total = sum(len(items) for items in results.values())
self.log(f"\n= Summary: Fetched {total} total items")
self.log(f" Target Issues: {len(results['target_issues'])}")
self.log(f" Target PRs: {len(results['target_prs'])}")
if fork_repo:
self.log(f" Fork Issues: {len(results['fork_issues'])}")
self.log(f" Fork PRs: {len(results['fork_prs'])}")
return results
def get_combined_items(self, workflow_items: Dict[str, List[WorkflowItem]],
sort_by: str = 'updated') -> List[WorkflowItem]:
"""
Combine and sort all workflow items
Args:
workflow_items: Dictionary from fetch_all_workflow_items()
sort_by: 'updated', 'created', or 'number'
Returns:
Sorted list of all workflow items
"""
all_items = []
for items_list in workflow_items.values():
all_items.extend(items_list)
# Sort items
if sort_by == 'updated':
all_items.sort(key=lambda x: x.updated_at, reverse=True)
elif sort_by == 'created':
all_items.sort(key=lambda x: x.created_at, reverse=True)
elif sort_by == 'number':
all_items.sort(key=lambda x: x.number, reverse=True)
return all_items
def filter_items(self, items: List[WorkflowItem], **filters) -> List[WorkflowItem]:
"""
Filter workflow items based on criteria
Args:
items: List of WorkflowItem objects
**filters: Filter criteria (state, item_type, repo_source, author, labels)
Returns:
Filtered list of items
"""
filtered = items
if 'state' in filters and filters['state']:
filtered = [item for item in filtered if item.state == filters['state']]
if 'item_type' in filters and filters['item_type']:
filtered = [item for item in filtered if item.item_type == filters['item_type']]
if 'repo_source' in filters and filters['repo_source']:
filtered = [item for item in filtered if item.repo_source == filters['repo_source']]
if 'author' in filters and filters['author']:
filtered = [item for item in filtered if item.author == filters['author']]
if 'labels' in filters and filters['labels']:
label_filter = filters['labels']
if isinstance(label_filter, str):
label_filter = [label_filter]
filtered = [item for item in filtered
if any(label in item.labels for label in label_filter)]
return filtered
def fetch_comments(self, repo_str: str, issue_number: int, is_pull_request: bool = False) -> List[Dict[str, Any]]:
"""
Fetch comments for an issue or pull request
Args:
repo_str: Repository string in format "owner/repo"
issue_number: Issue or PR number
is_pull_request: Whether this is a pull request (for PR-specific comments)
Returns:
List of comment dictionaries with keys: 'user', 'body', 'created_at', 'updated_at'
"""
try:
# Parse repository string
if '/' not in repo_str:
self.log(f"Invalid repository format: {repo_str}")
return []
owner, repo = repo_str.split('/', 1)
# Fetch issue/PR comments (these are the same endpoint for both issues and PRs)
url = f"https://api.github.com/repos/{owner}/{repo}/issues/{issue_number}/comments"
print(f"DEBUG: Fetching comments from URL: {url}", flush=True)
response = requests.get(url, headers=self.headers)
print(f"DEBUG: Response status code: {response.status_code}", flush=True)
print(f"DEBUG: Response headers: {dict(response.headers)}", flush=True)
print(f"DEBUG: Response text length: {len(response.text)}", flush=True)
print(f"DEBUG: Response content (first 500): {response.text[:500]}", flush=True)
response.raise_for_status()
response_data = response.json()
print(f"DEBUG: Response data type: {type(response_data)}", flush=True)
print(f"DEBUG: Number of items: {len(response_data) if isinstance(response_data, list) else 'Not a list'}", flush=True)
if isinstance(response_data, list) and len(response_data) > 0:
print(f"DEBUG: First item keys: {list(response_data[0].keys())}", flush=True)
comments = []
for comment_data in response_data:
comments.append({
'user': comment_data.get('user', {}).get('login', 'unknown'),
'body': comment_data.get('body', ''),
'created_at': comment_data.get('created_at', ''),
'updated_at': comment_data.get('updated_at', ''),
'url': comment_data.get('html_url', '')
})
self.log(f"Fetched {len(comments)} comments for {repo_str} #{issue_number}")
print(f"DEBUG: Successfully parsed {len(comments)} comments", flush=True)
return comments
except requests.exceptions.RequestException as e:
self.log(f"Error fetching comments for {repo_str} #{issue_number}: {e}")
print(f"DEBUG: RequestException occurred: {e}", flush=True)
import traceback
traceback.print_exc()
return []
except Exception as e:
self.log(f"Unexpected error fetching comments: {e}")
print(f"DEBUG: Exception occurred: {e}", flush=True)
import traceback
traceback.print_exc()
return []
def fetch_pr_files(self, repo_str: str, pr_number: int) -> List[Dict[str, Any]]:
"""
Fetch the list of files changed in a pull request
Args:
repo_str: Repository string in format "owner/repo"
pr_number: Pull request number
Returns:
List of file dictionaries with keys: 'filename', 'status', 'additions', 'deletions', 'changes', 'patch'
"""
try:
# Parse repository string
if '/' not in repo_str:
self.log(f"Invalid repository format: {repo_str}")
return []
owner, repo = repo_str.split('/', 1)
# Fetch PR files
url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}/files"
print(f"DEBUG: Fetching PR files from URL: {url}", flush=True)
response = requests.get(url, headers=self.headers)
response.raise_for_status()
files_data = response.json()
print(f"DEBUG: Found {len(files_data)} files in PR #{pr_number}", flush=True)
files = []
for file_data in files_data:
files.append({
'filename': file_data.get('filename', ''),
'status': file_data.get('status', ''), # added, removed, modified, renamed
'additions': file_data.get('additions', 0),
'deletions': file_data.get('deletions', 0),
'changes': file_data.get('changes', 0),
'patch': file_data.get('patch', ''), # The actual diff patch
'blob_url': file_data.get('blob_url', ''),
})
self.log(f"Fetched {len(files)} files for PR {repo_str} #{pr_number}")
return files
except requests.exceptions.RequestException as e:
self.log(f"Error fetching PR files for {repo_str} #{pr_number}: {e}")
print(f"DEBUG: RequestException occurred: {e}", flush=True)
import traceback
traceback.print_exc()
return []
except Exception as e:
self.log(f"Unexpected error fetching PR files: {e}")
print(f"DEBUG: Exception occurred: {e}", flush=True)
import traceback
traceback.print_exc()
return []