""" Utility functions and helpers """ import json import os import re import subprocess import threading import datetime from pathlib import Path from typing import Dict, Any, Optional, Tuple, List from urllib.parse import urlparse class Logger: """Simple logger for GUI applications""" def __init__(self, text_widget=None): self.text_widget = text_widget self._lock = threading.Lock() def log(self, message: str) -> None: """Log a message to the text widget and console""" timestamp = __import__('datetime').datetime.now().strftime("%H:%M:%S") formatted_message = f"[{timestamp}] {message}" try: print(formatted_message) except UnicodeEncodeError: # Fallback: replace Unicode emojis with ASCII equivalents safe_message = formatted_message.replace('✅', '[SUCCESS]').replace('❌', '[ERROR]').replace('⚠️', '[WARNING]').replace('📋', '[INFO]').replace('📄', '[FILE]').replace('📍', '[LOCATION]').replace('📝', '[EDIT]') print(safe_message) if self.text_widget: def update_widget(): try: with self._lock: self.text_widget.config(state='normal') self.text_widget.insert('end', formatted_message + '\n') self.text_widget.see('end') self.text_widget.config(state='disabled') self.text_widget.update_idletasks() except: pass # Widget might be destroyed # Schedule update on main thread if hasattr(self.text_widget, 'after'): self.text_widget.after(0, update_widget) else: update_widget() class PRNumberManager: """Manages PR numbers for branch naming""" PR_COUNTER_FILE = '.pr_counter.json' @classmethod def get_pr_counter_file(cls) -> str: """Get the path to the PR counter file""" script_dir = os.path.dirname(os.path.abspath(__file__)) return os.path.join(script_dir, cls.PR_COUNTER_FILE) @classmethod def load_pr_counter(cls) -> Dict[str, int]: """Load the PR counter from file""" counter_file = cls.get_pr_counter_file() if os.path.exists(counter_file): try: with open(counter_file, 'r', encoding='utf-8') as f: return json.load(f) except (json.JSONDecodeError, FileNotFoundError): pass return {} @classmethod def save_pr_counter(cls, counter: Dict[str, int]) -> None: """Save the PR counter to file""" counter_file = cls.get_pr_counter_file() try: with open(counter_file, 'w', encoding='utf-8') as f: json.dump(counter, f, indent=2) except Exception as e: print(f"Warning: Could not save PR counter: {e}") @classmethod def get_next_pr_number(cls, provider_key: str) -> int: """ Get the next PR number for a given provider Args: provider_key: Either the AI provider name ('chatgpt', 'claude') or 'gh_copilot' Returns: Next available PR number for this provider """ try: counter = cls.load_pr_counter() current_number = counter.get(provider_key, 0) next_number = current_number + 1 counter[provider_key] = next_number cls.save_pr_counter(counter) return next_number except Exception as e: print(f"Error managing PR counter: {e}") # Fallback to a timestamp-based number import time return int(time.time()) % 10000 class GitHubInfoExtractor: """Extracts GitHub repository information from URLs""" @staticmethod def extract_github_info(doc_url: str) -> Dict[str, Any]: """Extract GitHub repository information from a document URL""" try: if not doc_url or 'github.com' not in doc_url: return {'error': 'Not a GitHub URL'} parsed = urlparse(doc_url) path_parts = parsed.path.strip('/').split('/') if len(path_parts) < 2: return {'error': 'Invalid GitHub URL format'} owner = path_parts[0] repo = path_parts[1] # Try to extract file path if it's a blob URL file_path = None if len(path_parts) > 3 and path_parts[2] == 'blob': # Skip branch name and get file path if len(path_parts) > 4: file_path = '/'.join(path_parts[4:]) result = { 'owner': owner, 'repo': repo, 'original_content_git_url': doc_url } if file_path: result['file_path'] = file_path # Try to find ms.author from the URL or repo name ms_author = GitHubInfoExtractor._extract_ms_author(owner, repo, doc_url) if ms_author: result['ms_author'] = ms_author return result except Exception as e: return {'error': f'Error parsing GitHub URL: {str(e)}'} @staticmethod def _extract_ms_author(owner: str, repo: str, url: str) -> Optional[str]: """Try to extract ms.author from various sources""" try: # Method 1: Check if owner looks like a Microsoft username if owner.startswith('Microsoft') or 'microsoft' in owner.lower(): # Try to extract from repo name or URL patterns if '-' in repo: parts = repo.split('-') for part in parts: if len(part) > 2 and part.islower(): return part # Method 2: Look for patterns in the URL url_lower = url.lower() # Common patterns for ms.author patterns = [ r'/([a-z][a-z0-9-]+[a-z0-9])/', # username-like patterns r'author[=:]([a-z][a-z0-9-]+)', # author= or author: patterns ] for pattern in patterns: match = re.search(pattern, url_lower) if match: candidate = match.group(1) # Validate it looks like a reasonable username if 3 <= len(candidate) <= 20 and candidate.replace('-', '').isalnum(): return candidate return None except Exception: return None class WorkItemFieldExtractor: """Extracts and processes item fields (placeholder for future implementation)""" @staticmethod def extract_work_item_fields(work_item: Dict[str, Any]) -> Dict[str, Any]: """Extract and process fields from work item (placeholder)""" fields = work_item.get('fields', {}) # Extract basic fields item_id = work_item.get('id', 'Unknown') title = fields.get('System.Title', 'No Title') # Extract custom fields with fallbacks nature_of_request = ( fields.get('Custom.Natureofrequest') or fields.get('Custom.NatureOfRequest') or fields.get('Microsoft.VSTS.Common.DescriptionHtml', '') ) # Clean HTML if present if nature_of_request and '<' in nature_of_request: nature_of_request = WorkItemFieldExtractor._clean_html(nature_of_request) mydoc_url = ( fields.get('Custom.MyDocURL') or fields.get('Custom.DocumentURL') or fields.get('Custom.URL', '') ) text_to_change = ( fields.get('Custom.TextToChange') or fields.get('Custom.CurrentText', '') ) new_text = ( fields.get('Custom.NewText') or fields.get('Custom.ProposedText') or fields.get('Custom.ReplacementText', '') ) # Extract GitHub info from the document URL github_info = GitHubInfoExtractor.extract_github_info(mydoc_url) return { 'id': item_id, 'title': title, 'nature_of_request': nature_of_request, 'mydoc_url': mydoc_url, 'text_to_change': text_to_change, 'new_text': new_text, 'github_info': github_info, 'status': 'Ready', 'source': 'Generic' } @staticmethod def extract_uuf_item_fields(uuf_item: Dict[str, Any]) -> Dict[str, Any]: """Extract and process fields from custom item (placeholder)""" # UUF items have different field structure item_id = uuf_item.get('cr_uufitemid', 'Unknown') title = uuf_item.get('cr_title', 'No Title') nature_of_request = uuf_item.get('cr_description', '') mydoc_url = uuf_item.get('cr_documenturl', '') text_to_change = uuf_item.get('cr_currenttext', '') new_text = uuf_item.get('cr_newtext', '') # Extract GitHub info github_info = GitHubInfoExtractor.extract_github_info(mydoc_url) return { 'id': item_id, 'title': title, 'nature_of_request': nature_of_request, 'mydoc_url': mydoc_url, 'text_to_change': text_to_change, 'new_text': new_text, 'github_info': github_info, 'status': 'Ready', 'source': 'UUF' } @staticmethod def _clean_html(html_text: str) -> str: """Remove HTML tags and decode entities""" import html # Remove HTML tags clean_text = re.sub(r'<[^>]+>', '', html_text) # Decode HTML entities clean_text = html.unescape(clean_text) # Clean up whitespace clean_text = re.sub(r'\s+', ' ', clean_text).strip() return clean_text class ContentBuilders: """Builds content for GitHub issues and PRs""" @staticmethod def build_issue_title(item: Dict[str, Any]) -> str: """Build GitHub issue title""" item_id = item.get('id', '') if item_id: return f"[#{item_id}] {item['title']}" return f"{item['title']}" @staticmethod def build_issue_body(item: Dict[str, Any], github_info: Dict[str, Any]) -> str: """Build GitHub issue body""" body_parts = [] # Header body_parts.append("## Item Details") body_parts.append("") # Make ID a hyperlink if source URL is available if item.get('source_url'): body_parts.append(f"**ID:** [{item['id']}]({item['source_url']})") else: body_parts.append(f"**ID:** {item['id']}") body_parts.append(f"**Title:** {item['title']}") body_parts.append("") # Nature of request if item['nature_of_request']: body_parts.append("**Nature of Request:**") body_parts.append(item['nature_of_request']) body_parts.append("") # Document information if item['mydoc_url']: body_parts.append("**Document URL:**") body_parts.append(item['mydoc_url']) body_parts.append("") # Change details body_parts.append("## Change Details") body_parts.append("") if item['text_to_change']: body_parts.append("**Text to Change:**") body_parts.append("```") body_parts.append(item['text_to_change']) body_parts.append("```") body_parts.append("") if item['new_text']: body_parts.append("**Proposed New Text:**") body_parts.append("```") body_parts.append(item['new_text']) body_parts.append("```") body_parts.append("") # Repository info if github_info.get('owner') and github_info.get('repo'): body_parts.append("## Repository Information") body_parts.append("") body_parts.append(f"**Repository:** {github_info['owner']}/{github_info['repo']}") if github_info.get('ms_author'): body_parts.append(f"**Author:** @{github_info['ms_author']}") body_parts.append("") # Instructions for manual review body_parts.append("## Instructions") body_parts.append("") body_parts.append("This issue requires manual review of the proposed documentation change.") body_parts.append("") body_parts.append("**Next Steps:**") body_parts.append("1. Review the proposed change above") body_parts.append("2. Navigate to the document URL") body_parts.append("3. Locate the text that needs to be changed") body_parts.append("4. Make the appropriate updates") body_parts.append("5. Close this issue when complete") body_parts.append("") body_parts.append("---") body_parts.append("*Created automatically by GitHub Pulse*") return "\n".join(body_parts) @staticmethod def build_pr_title(item: Dict[str, Any]) -> str: """Build GitHub PR title""" item_id = item.get('id', '') if item_id: return f"[#{item_id}] {item['title']}" return f"{item['title']}" @staticmethod def build_pr_body(item: Dict[str, Any], github_info: Dict[str, Any]) -> str: """Build GitHub PR body""" body_parts = [] # Header body_parts.append("## Documentation Update") body_parts.append("") # Make ID a hyperlink if source URL is available if item.get('source_url'): body_parts.append(f"**ID:** [{item['id']}]({item['source_url']})") else: body_parts.append(f"**ID:** {item['id']}") body_parts.append(f"**Title:** {item['title']}") body_parts.append("") # Nature of request if item['nature_of_request']: body_parts.append("**Description:**") body_parts.append(item['nature_of_request']) body_parts.append("") # Change summary body_parts.append("## Changes Made") body_parts.append("") body_parts.append("This PR updates documentation as requested.") body_parts.append("") if item['text_to_change'] and item['new_text']: body_parts.append("**Change Summary:**") body_parts.append("- Updated specific text content as requested") body_parts.append("") body_parts.append("
") body_parts.append("View Change Details") body_parts.append("") body_parts.append("**Original Text:**") body_parts.append("```") body_parts.append(item['text_to_change']) body_parts.append("```") body_parts.append("") body_parts.append("**New Text:**") body_parts.append("```") body_parts.append(item['new_text']) body_parts.append("```") body_parts.append("
") body_parts.append("") # Repository info if github_info.get('ms_author'): body_parts.append(f"**Author:** @{github_info['ms_author']}") body_parts.append("") # Review instructions body_parts.append("## Review Checklist") body_parts.append("") body_parts.append("- [ ] Changes match the requested update") body_parts.append("- [ ] No unintended changes were made") body_parts.append("- [ ] Grammar and formatting are correct") body_parts.append("- [ ] Links and references are working") body_parts.append("") body_parts.append("---") body_parts.append("*Created automatically by GitHub Pulse*") return "\n".join(body_parts) class LocalRepositoryScanner: """Scans local repository path for Git repositories""" @staticmethod def scan_local_repos(local_repo_path: str) -> List[str]: """Scan local path for Git repositories""" if not local_repo_path or not os.path.exists(local_repo_path): return [] repos = [] try: for item in os.listdir(local_repo_path): item_path = os.path.join(local_repo_path, item) if os.path.isdir(item_path): git_path = os.path.join(item_path, '.git') if os.path.exists(git_path): # Get remote origin URL to determine repo name repo_info = LocalRepositoryScanner.get_repo_info(item_path) if repo_info: repos.append(repo_info) else: # Fallback to folder name repos.append(f"local/{item}") except PermissionError: pass # Skip directories we can't access except Exception as e: print(f"Error scanning local repos: {e}") return sorted(repos) @staticmethod def get_repo_info(repo_path: str) -> Optional[str]: """Get repository information from local Git repo""" try: # Get remote origin URL result = subprocess.run( ['git', 'config', '--get', 'remote.origin.url'], cwd=repo_path, capture_output=True, text=True, timeout=10 ) if result.returncode == 0: url = result.stdout.strip() return LocalRepositoryScanner.parse_git_url(url) except Exception: pass return None @staticmethod def parse_git_url(url: str) -> Optional[str]: """Parse Git URL to extract owner/repo format""" try: # Handle GitHub URLs if 'github.com' in url: # Handle both HTTPS and SSH URLs if url.startswith('git@'): # SSH: git@github.com:owner/repo.git parts = url.split(':')[-1].replace('.git', '') return parts else: # HTTPS: https://github.com/owner/repo.git parsed = urlparse(url) path = parsed.path.strip('/').replace('.git', '') return path except: pass return None @staticmethod def clone_repository(repo_url: str, local_path: str, repo_name: str) -> bool: """Clone a repository to local path""" try: target_path = os.path.join(local_path, repo_name.split('/')[-1]) if os.path.exists(target_path): print(f"Repository already exists at {target_path}") return True os.makedirs(local_path, exist_ok=True) result = subprocess.run( ['git', 'clone', repo_url, target_path], capture_output=True, text=True, timeout=300 # 5 minutes timeout ) if result.returncode == 0: print(f"Successfully cloned {repo_url} to {target_path}") return True else: print(f"Failed to clone repository: {result.stderr}") return False except Exception as e: print(f"Error cloning repository: {e}") return False class ConfigurationHelpers: """Configuration and validation utilities""" @staticmethod def validate_ai_provider_setup(config: Dict[str, Any], parent_window=None) -> bool: """Validate AI provider setup and offer to install missing modules Args: config: Configuration dictionary parent_window: Parent tkinter window for dialogs Returns: bool: True if setup is valid or user handled the issue """ ai_provider = config.get('AI_PROVIDER', '').lower() if not ai_provider or ai_provider == 'none': return True # No AI provider selected, nothing to validate try: # Try to import AI manager for validation from .ai_manager import AIManager ai_manager = AIManager() # Check if modules are available available, missing = ai_manager.check_ai_module_availability(ai_provider) if available: return True # All modules available print(f"⚠️ AI Provider '{ai_provider}' selected but missing required packages: {', '.join(missing)}") # Offer to install missing packages success = ai_manager.install_ai_packages(missing, parent_window) if success: # Re-check availability after installation available, still_missing = ai_manager.check_ai_module_availability(ai_provider) if available: print(f"✅ AI Provider '{ai_provider}' is now ready to use") return True else: print(f"⚠️ Some packages may still be missing: {', '.join(still_missing)}") print("Please restart the application after installation completes") return False return False except ImportError: # AI manager not available, skip validation return True @staticmethod def create_default_env_file() -> bool: """Create a default .env file with all settings blank""" try: default_config = """# GitHub Pulse Configuration # Generated automatically - fill in your values # IMPORTANT: Do NOT commit this file to source control. Add it to .gitignore. # GitHub Configuration GITHUB_PAT= GITHUB_REPO= FORKED_REPO= # Application Settings DRY_RUN=false # AI Provider Configuration (for local PR creation with AI assistance) AI_PROVIDER= CLAUDE_API_KEY= OPENAI_API_KEY= GITHUB_TOKEN= LOCAL_REPO_PATH= # Custom AI Instructions (optional) CUSTOM_INSTRUCTIONS= """ with open('.env', 'w', encoding='utf-8') as f: f.write(default_config) print("Created default .env file with blank values") return True except Exception as e: print(f"Error creating default .env file: {e}") return False # Removed EnhancedContentBuilders class - was specific to Azure DevOps # Use ContentBuilders class for generic GitHub automation instead # Compatibility functions for direct function access def get_next_pr_number(provider_key: str) -> int: """Compatibility function for direct access to PR number generation""" return PRNumberManager.get_next_pr_number(provider_key) def validate_ai_provider_setup(config: Dict[str, Any], parent_window=None) -> bool: """Compatibility function for direct access to AI provider validation""" return ConfigurationHelpers.validate_ai_provider_setup(config, parent_window) def create_default_env_file() -> bool: """Compatibility function for direct access to .env file creation""" return ConfigurationHelpers.create_default_env_file()