import threading import os from ignis import widgets, app from gi.repository import GLib, Gtk, Gdk from .ollama_client import OllamaClient from .conversation_manager import ConversationManager from .conversation_archive import ConversationArchive from .ollama_monitor import OllamaAvailabilityMonitor from .streaming_handler import StreamingHandler from .command_processor import CommandProcessor, CommandResult from .reasoning_controller import ReasoningController from .provider_client import ( AIProvider, OllamaProvider, GeminiProvider, OpenRouterProvider, CopilotProvider, ) from .settings_widget import SettingsWidget class ChatWidget(widgets.Box): """Chat interface widget with Ollama integration""" def __init__(self): # Load CSS for TextView styling self._load_css() self._conversation_manager = ConversationManager() self._conversation_archive = ConversationArchive() # Initialize reasoning controller self._reasoning_controller = ReasoningController() # Initialize provider abstraction self._current_provider: AIProvider | None = None self._current_model: str | None = None self._provider_instances: dict[str, AIProvider] = {} self._initialize_provider() # Initialize availability monitor (only for Ollama) ollama_provider = self._get_provider("ollama") if isinstance(ollama_provider, OllamaProvider): self._ollama_client = ollama_provider._client self._ollama_monitor = OllamaAvailabilityMonitor(self._ollama_client) self._ollama_monitor.add_callback(self._on_ollama_availability_changed) self._ollama_monitor.start() else: self._ollama_client = None self._ollama_monitor = None # Initialize command processor self._command_processor = CommandProcessor() self._register_commands() # Header with title and model header_title = widgets.Label( label="AI Sidebar", halign="start", css_classes=["title-2"], ) # Display provider and model status self._model_label = widgets.Label( label="", halign="start", css_classes=["dim-label"], ) self._update_model_label() # Reasoning mode toggle button (using regular button with state tracking) self._reasoning_enabled = self._reasoning_controller.is_enabled() toggle_label = "🧠 Reasoning: ON" if self._reasoning_enabled else "🧠 Reasoning: OFF" self._reasoning_toggle = widgets.Button( label=toggle_label, on_click=lambda x: self._on_reasoning_toggled(), halign="end", hexpand=False, ) # Settings button (gear icon) settings_button = widgets.Button( label="⚙️", on_click=lambda x: self._show_settings(), halign="end", hexpand=False, ) # Header top row with title, settings, and toggle header_top = widgets.Box( spacing=8, hexpand=True, child=[header_title, settings_button, self._reasoning_toggle], ) header_box = widgets.Box( vertical=True, spacing=4, child=[header_top, self._model_label], ) # Message list self._message_list = widgets.Box( vertical=True, spacing=8, hexpand=True, vexpand=True, valign="start", ) # Scrolled window for messages self._scroller = widgets.Scroll( hexpand=True, vexpand=True, min_content_height=300, child=self._message_list, ) # Input text view (multi-line) self._text_view = Gtk.TextView() self._text_buffer = self._text_view.get_buffer() self._text_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR) self._text_view.set_hexpand(True) self._text_view.set_vexpand(False) self._text_view.set_left_margin(8) self._text_view.set_right_margin(8) self._text_view.set_top_margin(8) self._text_view.set_bottom_margin(8) self._text_view.set_size_request(300, 60) # Set explicit width and height # Set placeholder text self._update_placeholder_text() self._is_placeholder_shown = False self._updating_placeholder = False # Add key event controller for Enter/Shift+Enter handling key_controller = Gtk.EventControllerKey() key_controller.connect("key-pressed", self._on_key_pressed) self._text_view.add_controller(key_controller) # Add focus event controllers for placeholder handling focus_controller = Gtk.EventControllerFocus() focus_controller.connect("enter", self._on_text_view_focus_in) focus_controller.connect("leave", self._on_text_view_focus_out) self._text_view.add_controller(focus_controller) # Connect to buffer changed signal to handle typing self._text_buffer.connect("changed", self._on_buffer_changed) # Scrolled window for text view self._text_scroller = Gtk.ScrolledWindow() self._text_scroller.set_child(self._text_view) self._text_scroller.set_min_content_height(60) self._text_scroller.set_max_content_height(200) self._text_scroller.set_hexpand(True) self._text_scroller.set_vexpand(False) self._text_scroller.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC) self._text_scroller.set_has_frame(True) self._text_scroller.set_size_request(300, 60) # Store reference for use in input_box self._text_view_container = self._text_scroller # Send button self._send_button = widgets.Button( label="Send", on_click=lambda x: self._on_submit(), ) # Input box input_box = widgets.Box( spacing=8, hexpand=True, child=[self._text_view_container, self._send_button], ) # Main container super().__init__( vertical=True, spacing=12, hexpand=True, vexpand=True, child=[header_box, self._scroller, input_box], css_classes=["ai-sidebar-content"], ) # Set margins self.set_margin_top(16) self.set_margin_bottom(16) self.set_margin_start(16) self.set_margin_end(16) # Auto-archive old messages on startup (Option 2) self._auto_archive_old_messages() # Load initial messages self._populate_initial_messages() # Initialize placeholder display self._update_placeholder() # Disable input if provider unavailable at startup if not self._current_provider or not self._current_provider.is_available: self._set_input_enabled(False) # Create settings widget self._settings_widget = SettingsWidget( self._reasoning_controller, on_provider_changed=self._on_provider_changed_from_settings, on_back=self._show_chat ) # Create view stack for switching between chat and settings self._view_stack = Gtk.Stack() self._view_stack.set_transition_type(Gtk.StackTransitionType.SLIDE_LEFT_RIGHT) self._view_stack.set_transition_duration(200) # Add chat view chat_container = widgets.Box( vertical=True, spacing=12, hexpand=True, vexpand=True, child=[header_box, self._scroller, input_box], css_classes=["ai-sidebar-content"], ) chat_container.set_margin_top(16) chat_container.set_margin_bottom(16) chat_container.set_margin_start(16) chat_container.set_margin_end(16) self._view_stack.add_named(chat_container, "chat") self._view_stack.add_named(self._settings_widget, "settings") self._view_stack.set_visible_child_name("chat") # Replace main container with stack super().__init__( hexpand=True, vexpand=True, child=[self._view_stack], ) def _auto_archive_old_messages(self, keep_recent: int = 20): """Auto-archive old messages on startup, keeping only recent ones. Args: keep_recent: Number of recent messages to keep in active conversation """ # Only auto-archive if we have more than keep_recent messages if len(self._conversation_manager.messages) <= keep_recent: return # Get the full conversation state before trimming full_state = self._conversation_manager._state # Trim the conversation and get removed messages removed_messages = self._conversation_manager.trim_to_recent(keep_recent) if removed_messages: # Create a state with only the old messages for archiving from .conversation_manager import ConversationState old_state = ConversationState( conversation_id=full_state.conversation_id, created_at=full_state.created_at, updated_at=removed_messages[-1].get("timestamp", full_state.updated_at), messages=removed_messages, ) # Archive old messages without title (avoid blocking) self._conversation_archive.archive_conversation(old_state) def _populate_initial_messages(self): """Load conversation history""" for message in self._conversation_manager.messages: self._append_message(message["role"], message["content"], persist=False) if not self._conversation_manager.messages: self._append_message( "assistant", "Welcome! Ask a question to start a conversation.", persist=True, ) def _append_message(self, role: str, content: str, *, persist: bool = True, reasoning: str | None = None): """Add a message bubble to the chat Args: role: Message role (user/assistant/system) content: Message content persist: Whether to save to conversation history reasoning: Optional reasoning content to display separately """ label_prefix = "You" if role == "user" else "Assistant" # If reasoning content is provided and reasoning mode is enabled, format it specially if reasoning and self._reasoning_controller.is_enabled(): # Create a box to hold reasoning and content message_box = widgets.Box( vertical=True, spacing=4, halign="start", ) # Add reasoning label with special styling reasoning_label = widgets.Label( label=f"💭 Reasoning: {reasoning}", halign="start", xalign=0.0, wrap=True, wrap_mode="word_char", justify="left", css_classes=["reasoning-content"], ) message_box.append(reasoning_label) # Add divider divider = widgets.Box( css_classes=["reasoning-divider"], hexpand=True, ) divider.set_size_request(-1, 1) message_box.append(divider) # Add main content content_label = widgets.Label( label=f"{label_prefix}: {content}", halign="start", xalign=0.0, wrap=True, wrap_mode="word_char", justify="left", ) message_box.append(content_label) self._message_list.append(message_box) else: # Standard message without reasoning message_label = widgets.Label( label=f"{label_prefix}: {content}", halign="start", xalign=0.0, wrap=True, wrap_mode="word_char", justify="left", ) self._message_list.append(message_label) self._scroll_to_bottom() if persist and self._conversation_manager: self._conversation_manager.append_message(role, content) def _append_system_message(self, content: str): """Add a system message (for command results) with distinct styling""" message_label = widgets.Label( label=f"System: {content}", halign="start", xalign=0.0, wrap=True, wrap_mode="word_char", justify="left", css_classes=["dim-label"], # Use dim-label for system messages ) self._message_list.append(message_label) self._scroll_to_bottom() def _clear_message_list(self): """Safely clear all messages from the UI""" # Collect all children first to avoid iterator issues children = [] child = self._message_list.get_first_child() while child is not None: children.append(child) child = child.get_next_sibling() # Now remove them for child in children: self._message_list.remove(child) def _scroll_to_bottom(self): """Scroll to the latest message""" def _scroll(): adjustment = self._scroller.get_vadjustment() if adjustment: adjustment.set_value(adjustment.get_upper() - adjustment.get_page_size()) return False GLib.idle_add(_scroll) def _set_input_enabled(self, enabled: bool): """Enable/disable input controls""" self._text_view.set_sensitive(enabled) self._send_button.set_sensitive(enabled) def _update_placeholder(self): """Update placeholder text visibility based on buffer content""" # Prevent recursion during placeholder updates if self._updating_placeholder: return False self._updating_placeholder = True try: start_iter = self._text_buffer.get_start_iter() end_iter = self._text_buffer.get_end_iter() text = self._text_buffer.get_text(start_iter, end_iter, False) if not text: # Buffer is empty, show placeholder if not self._is_placeholder_shown: # Create placeholder tag if needed if not hasattr(self, '_placeholder_tag'): self._placeholder_tag = self._text_buffer.create_tag( "placeholder", foreground="gray" ) self._text_buffer.set_text(self._placeholder_text) self._text_buffer.apply_tag( self._placeholder_tag, self._text_buffer.get_start_iter(), self._text_buffer.get_end_iter() ) self._is_placeholder_shown = True else: # Buffer has content if self._is_placeholder_shown and text == self._placeholder_text: # User started typing, remove placeholder self._text_buffer.set_text("") self._is_placeholder_shown = False finally: self._updating_placeholder = False return False # For GLib.idle_add def _on_buffer_changed(self, buffer): """Handle text buffer changes to manage placeholder""" # Prevent recursion during placeholder updates if self._updating_placeholder: return start_iter = buffer.get_start_iter() end_iter = buffer.get_end_iter() text = buffer.get_text(start_iter, end_iter, False) # If placeholder is shown and user typed something, clear it if self._is_placeholder_shown and text != self._placeholder_text: self._updating_placeholder = True # User started typing, clear everything and keep only what they typed # Extract what they typed (everything after placeholder) if text.startswith(self._placeholder_text): new_text = text[len(self._placeholder_text):] buffer.set_text(new_text) self._is_placeholder_shown = False self._updating_placeholder = False def _on_text_view_focus_in(self, controller): """Handle focus entering the text view""" # Clear placeholder when user focuses if self._is_placeholder_shown: self._updating_placeholder = True self._text_buffer.set_text("") self._is_placeholder_shown = False self._updating_placeholder = False def _on_text_view_focus_out(self, controller): """Handle focus leaving the text view""" # Show placeholder if buffer is empty start_iter = self._text_buffer.get_start_iter() end_iter = self._text_buffer.get_end_iter() text = self._text_buffer.get_text(start_iter, end_iter, False) if not text: self._update_placeholder() def _on_key_pressed(self, controller, keyval, keycode, state): """Handle key press events for Enter/Shift+Enter""" # Check if Enter key was pressed if keyval == Gdk.KEY_Return or keyval == Gdk.KEY_KP_Enter: # Check if Shift is held if state & Gdk.ModifierType.SHIFT_MASK: # Shift+Enter: allow newline (default behavior) return False else: # Enter alone: submit message self._on_submit() return True # Prevent default newline insertion return False def _on_submit(self): """Handle message submission""" # Get text from TextView buffer start_iter = self._text_buffer.get_start_iter() end_iter = self._text_buffer.get_end_iter() text = self._text_buffer.get_text(start_iter, end_iter, False).strip() # Ignore if empty or just placeholder if not text or text == self._placeholder_text: return # Clear the text buffer self._text_buffer.set_text("") # Defer placeholder update to avoid GTK warnings during user action GLib.idle_add(self._update_placeholder) # Check if this is a command if self._command_processor.is_command(text): # Execute command result = self._command_processor.execute(text) # Display command result as system message self._append_system_message(result.message) return # Check provider availability before processing regular messages if not self._current_provider or not self._current_provider.is_available: provider_name = self._current_provider.name if self._current_provider else "Provider" if provider_name == "ollama": error_msg = "Ollama is not running. Please start Ollama with: ollama serve" else: error_msg = f"{provider_name.capitalize()} is not configured. Please check settings." self._append_message( "assistant", error_msg, persist=False, ) return self._append_message("user", text, persist=True) self._request_response() def _request_response(self): """Request AI response in background thread with streaming""" # Double-check availability before making request if not self._current_provider or not self._current_provider.is_available: provider_name = self._current_provider.name if self._current_provider else "Provider" if provider_name == "ollama": error_msg = "Ollama is not running. Please start Ollama with: ollama serve" else: error_msg = f"{provider_name.capitalize()} is not configured. Please check settings." self._append_message( "assistant", error_msg, persist=False, ) return model = self._current_model or self._current_provider.default_model if not model: provider_name = self._current_provider.name if provider_name == "ollama": error_msg = "No Ollama models are available. Install a model with: ollama pull llama2" else: error_msg = f"No {provider_name.capitalize()} models are available. Check settings." self._append_message( "assistant", error_msg, persist=True, ) return history = self._conversation_manager.chat_messages self._set_input_enabled(False) # Create message container for streaming message_container = widgets.Box( vertical=True, spacing=8, halign="start", ) # Create thinking box (initially hidden) thinking_box = widgets.Box( vertical=True, spacing=0, css_classes=["thinking-box"], visible=False, ) # Create collapsible header button thinking_header_box = widgets.Box( spacing=8, halign="start", ) thinking_icon = widgets.Label( label="▼", css_classes=["thinking-icon"], ) thinking_header_label = widgets.Label( label="💭 Thinking...", halign="start", css_classes=["thinking-header"], ) thinking_header_box.append(thinking_icon) thinking_header_box.append(thinking_header_label) # Make header clickable thinking_header_button = widgets.Button( child=thinking_header_box, css_classes=["thinking-header-button"], ) # Create revealer for collapsible content thinking_content = widgets.Label( label="", halign="start", xalign=0.0, wrap=True, wrap_mode="word_char", justify="left", css_classes=["thinking-content"], ) thinking_revealer = widgets.Revealer( child=thinking_content, transition_type="slide_down", transition_duration=200, reveal_child=True, ) # Connect toggle functionality def toggle_thinking(): is_revealed = thinking_revealer.reveal_child thinking_revealer.reveal_child = not is_revealed thinking_icon.label = "▼" if not is_revealed else "▲" thinking_header_button.on_click = lambda x: toggle_thinking() thinking_box.append(thinking_header_button) thinking_box.append(thinking_revealer) # Create main response label response_label = widgets.Label( label="Assistant: ...", halign="start", xalign=0.0, wrap=True, wrap_mode="word_char", justify="left", ) message_container.append(thinking_box) message_container.append(response_label) self._message_list.append(message_container) self._scroll_to_bottom() # Create streaming handler with both widgets streaming_handler = StreamingHandler( response_label, self._scroller, thinking_widget=thinking_content, thinking_box=thinking_box ) def _worker(messages, handler): try: handler.start_stream() # Get model-specific options (only for Ollama) options = self._reasoning_controller.get_model_options() if self._current_provider.name == "ollama" else None # Stream response tokens for chunk in self._current_provider.stream_chat( model=model, messages=list(messages), options=options ): # Check for errors if chunk.get("error"): content = chunk.get("content", "An error occurred during streaming") GLib.idle_add(self._handle_stream_error, content, priority=GLib.PRIORITY_DEFAULT) return # Extract tokens from chunk message = chunk.get("message", {}) content_token = message.get("content", "") thinking_token = message.get("thinking", "") if content_token: handler.append_token(content_token) if thinking_token: handler.append_thinking_token(thinking_token) # Check if streaming is complete if chunk.get("done", False): break # Finalize stream and get complete content thinking_content, main_content = handler.finish_stream() # Persist the complete message GLib.idle_add( self._handle_stream_complete, thinking_content, main_content, priority=GLib.PRIORITY_DEFAULT ) except Exception as exc: error_msg = f"Streaming error: {exc}" GLib.idle_add(self._handle_stream_error, error_msg, priority=GLib.PRIORITY_DEFAULT) thread = threading.Thread(target=_worker, args=(history, streaming_handler), daemon=True) thread.start() def _handle_response(self, response): """Handle AI response (legacy non-streaming method)""" self._set_input_enabled(True) if not response: self._append_message( "assistant", "The model returned an empty response.", persist=True, ) return False role = response.get("role", "assistant") content = response.get("content") or "" if not content: content = "[No content received from Ollama]" self._append_message(role, content, persist=True) return False def _handle_stream_token(self, token: str) -> bool: """Handle individual stream token with thread-safe UI update. This method is called via GLib.idle_add for thread-safe UI updates. Args: token: The token string to process Returns: False to indicate this is a one-time callback """ # Token handling is now managed by StreamingHandler # This method exists for potential future direct token handling return False def _handle_stream_complete(self, thinking_content: str, main_content: str) -> bool: """Handle stream completion and persist message. Args: thinking_content: The thinking/reasoning content main_content: The main response content Returns: False to indicate this is a one-time callback """ # The message is already displayed with thinking by the StreamingHandler # We just need to persist it to conversation history # Combine thinking and content for persistence full_content = main_content if thinking_content: full_content = f"{thinking_content}\n{main_content}" if full_content and self._conversation_manager: self._conversation_manager.append_message("assistant", full_content) # Re-enable input self._set_input_enabled(True) return False def _handle_stream_error(self, error_msg: str) -> bool: """Handle streaming errors. Args: error_msg: The error message to display Returns: False to indicate this is a one-time callback """ # Display error message (don't persist errors) self._append_message("assistant", error_msg, persist=False) # Re-enable input self._set_input_enabled(True) return False def focus_input(self): """Focus the input text view""" self._text_view.grab_focus() def _initialize_provider(self): """Initialize the current provider based on preferences.""" provider_id = self._reasoning_controller.get_provider() self._current_provider = self._get_provider(provider_id) if self._current_provider: if provider_id == "ollama": # Use reasoning controller model for Ollama self._current_model = self._reasoning_controller.get_model_name() else: self._current_model = self._current_provider.default_model else: self._current_model = None def _get_provider(self, provider_id: str) -> AIProvider | None: """Get or create provider instance.""" if provider_id in self._provider_instances: return self._provider_instances[provider_id] if provider_id == "ollama": provider = OllamaProvider() elif provider_id == "gemini": api_key = self._reasoning_controller.get_api_key("gemini") provider = GeminiProvider(api_key=api_key) if api_key else None elif provider_id == "openrouter": api_key = self._reasoning_controller.get_api_key("openrouter") provider = OpenRouterProvider(api_key=api_key) if api_key else None elif provider_id == "copilot": token = self._reasoning_controller.get_copilot_token() provider = CopilotProvider(oauth_token=token) if token else None else: return None if provider: self._provider_instances[provider_id] = provider return provider def _update_model_label(self): """Update the model label with current provider and model.""" if not hasattr(self, '_model_label'): return if not self._current_provider: self._model_label.label = "Provider: Not configured" return provider_name = self._current_provider.name.capitalize() if self._current_provider.is_available: model_name = self._current_model or "No model selected" self._model_label.label = f"{provider_name}: {model_name}" else: if provider_name == "Ollama": self._model_label.label = f"{provider_name}: Not running" else: self._model_label.label = f"{provider_name}: Not configured" def _update_placeholder_text(self): """Update placeholder text based on provider availability.""" if not hasattr(self, '_text_view'): return if not self._current_provider or not self._current_provider.is_available: provider_name = self._current_provider.name if self._current_provider else "Provider" if provider_name == "ollama": self._placeholder_text = "Ollama not running - start with: ollama serve" else: self._placeholder_text = f"{provider_name.capitalize()} not configured. Check settings." else: self._placeholder_text = "Ask a question…" def _show_settings(self): """Show settings view.""" if hasattr(self, '_view_stack'): self._view_stack.set_visible_child_name("settings") def _show_chat(self): """Show chat view.""" if hasattr(self, '_view_stack'): self._view_stack.set_visible_child_name("chat") def _on_provider_changed_from_settings(self, provider_id: str): """Handle provider change from settings widget.""" self._current_provider = self._get_provider(provider_id) if self._current_provider: if provider_id == "ollama": self._current_model = self._reasoning_controller.get_model_name() else: self._current_model = self._current_provider.default_model else: self._current_model = None self._update_model_label() self._update_placeholder_text() self._update_placeholder() if self._current_provider and self._current_provider.is_available: self._set_input_enabled(True) else: self._set_input_enabled(False) def _on_ollama_availability_changed(self, is_available: bool): """Handle Ollama availability state changes""" # Only handle if Ollama is the current provider if self._reasoning_controller.get_provider() != "ollama": return if is_available: # Ollama became available - use model from reasoning controller self._current_model = self._reasoning_controller.get_model_name() self._update_model_label() self._update_placeholder_text() self._update_placeholder() self._set_input_enabled(True) # Show notification self._append_message( "assistant", "Ollama connection restored! You can now chat.", persist=False, ) else: # Ollama became unavailable self._update_model_label() self._update_placeholder_text() self._update_placeholder() self._set_input_enabled(False) def _on_reasoning_toggled(self): """Handle reasoning mode toggle button state changes""" # Only work for Ollama provider if self._reasoning_controller.get_provider() != "ollama": self._append_system_message("Reasoning mode is only available for Ollama provider.") return # Toggle the reasoning mode new_state = self._reasoning_controller.toggle() self._reasoning_enabled = new_state # Switch to the appropriate model new_model = self._reasoning_controller.get_model_name() self._current_model = new_model # Update button label toggle_label = "🧠 Reasoning: ON" if new_state else "🧠 Reasoning: OFF" self._reasoning_toggle.label = toggle_label # Update model label in header self._update_model_label() # Show feedback message status = "enabled" if new_state else "disabled" model_name = "Qwen3-4B-Thinking" if new_state else "Qwen3-4B-Instruct" self._append_system_message(f"Reasoning mode {status} - switched to {model_name}") def _load_css(self): """Load CSS styling for TextView""" css_provider = Gtk.CssProvider() css_path = os.path.join(os.path.dirname(__file__), "style.css") if os.path.exists(css_path): css_provider.load_from_path(css_path) Gtk.StyleContext.add_provider_for_display( Gdk.Display.get_default(), css_provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION ) def _register_commands(self): """Register all slash commands with the command processor""" self._command_processor.register_command("/new", self._cmd_new_conversation) self._command_processor.register_command("/clear", self._cmd_new_conversation) self._command_processor.register_command("/models", self._cmd_list_models) self._command_processor.register_command("/model", self._cmd_switch_model) self._command_processor.register_command("/list", self._cmd_list_conversations) self._command_processor.register_command("/resume", self._cmd_resume_conversation) def _generate_conversation_title(self, messages: list) -> str: """Generate a concise title for a conversation using AI. Args: messages: List of conversation messages Returns: Generated title or empty string if generation fails """ if not messages or not self._current_provider or not self._current_provider.is_available: return "" # Extract first few user messages for context user_messages = [msg for msg in messages if msg.get("role") == "user"] if not user_messages: return "" # Use first 2-3 user messages for context context_messages = user_messages[:3] context_text = " ".join([msg.get("content", "")[:100] for msg in context_messages]) # Create a prompt for title generation title_prompt = [ { "role": "system", "content": "You are a helpful assistant that generates concise, descriptive titles for conversations. Generate a title that is 3-6 words maximum. Respond with ONLY the title, no quotes or extra text." }, { "role": "user", "content": f"Generate a short title (3-6 words) for a conversation that starts with: {context_text}" } ] try: model = self._current_model or self._current_provider.default_model if not model: return "" # Use non-streaming chat for title generation response = self._current_provider.chat(model=model, messages=title_prompt) if response and response.get("content"): title = response["content"].strip() # Clean up the title (remove quotes, limit length) title = title.strip('"\'').strip() # Limit to 50 characters if len(title) > 50: title = title[:47] + "..." return title except Exception: # If title generation fails, return empty string pass return "" def _cmd_new_conversation(self, args: str) -> CommandResult: """ Start a new conversation, saving the current one. Args: args: Command arguments (unused for /new and /clear) Returns: CommandResult with success status and confirmation message """ # Archive current conversation if it has messages archive_id = None if self._conversation_manager.messages: current_state = self._conversation_manager._state # Archive without title for now (avoid blocking main thread) # Title generation will happen in background via auto-archive archive_id = self._conversation_archive.archive_conversation(current_state) # Clear the conversation manager (Option 1: clear default.json) self._conversation_manager.clear_messages() # Clear the message list UI safely self._clear_message_list() # Add welcome message to new conversation self._append_message( "assistant", "New conversation started. Previous conversation archived.", persist=True, ) if archive_id: return CommandResult( success=True, message=f"Started new conversation. Previous conversation archived as '{archive_id}'" ) else: return CommandResult( success=True, message="Started new conversation (no previous messages to archive)" ) def _cmd_list_models(self, args: str) -> CommandResult: """ List available Ollama models. Args: args: Command arguments (unused) Returns: CommandResult with model list """ if not self._current_provider or not self._current_provider.is_available: provider_name = self._current_provider.name if self._current_provider else "Provider" return CommandResult( success=False, message=f"{provider_name.capitalize()} is not available. Check settings." ) models = self._current_provider.list_models(force_refresh=True) if not models: return CommandResult( success=True, message="No models available. Install a model with: ollama pull llama2" ) # Format model list with current model highlighted model_lines = [] for model in models: if model == self._current_model: model_lines.append(f"• {model} (current)") else: model_lines.append(f"• {model}") message = "Available models:\n" + "\n".join(model_lines) return CommandResult( success=True, message=message, data={"models": models} ) def _cmd_switch_model(self, args: str) -> CommandResult: """ Switch to a different Ollama model. Args: args: Model name to switch to Returns: CommandResult with success status """ if not self._current_provider or not self._current_provider.is_available: provider_name = self._current_provider.name if self._current_provider else "Provider" return CommandResult( success=False, message=f"{provider_name.capitalize()} is not available. Check settings." ) model_name = args.strip() if not model_name: return CommandResult( success=False, message="Usage: /model \nExample: /model llama2" ) # Validate model exists available_models = self._current_provider.list_models(force_refresh=True) if model_name not in available_models: return CommandResult( success=False, message=f"Model '{model_name}' not found. Use /models to see available models." ) # Switch model old_model = self._current_model self._current_model = model_name # Update model label in header self._model_label.label = f"Model: {model_name}" return CommandResult( success=True, message=f"Switched from '{old_model}' to '{model_name}'" ) def _cmd_list_conversations(self, args: str) -> CommandResult: """ List all archived conversations with metadata. Args: args: Command arguments (unused) Returns: CommandResult with formatted conversation list """ conversations = self._conversation_archive.list_conversations() if not conversations: return CommandResult( success=True, message="No archived conversations found. Use /new to archive the current conversation." ) # Format conversation list lines = ["Archived conversations:"] for conv in conversations: # Format timestamp for display try: from datetime import datetime dt = datetime.fromisoformat(conv.updated_at.replace('Z', '+00:00')) time_str = dt.strftime("%Y-%m-%d %H:%M") except: time_str = conv.updated_at[:16] # Fallback to raw string # Display title prominently, with archive_id as reference lines.append(f"• {conv.title}") lines.append(f" ID: {conv.archive_id}") lines.append(f" Updated: {time_str} | Messages: {conv.message_count}") lines.append("\nUse /resume to load a conversation") return CommandResult( success=True, message="\n".join(lines), data={"conversations": [c.archive_id for c in conversations]} ) def _cmd_resume_conversation(self, args: str) -> CommandResult: """ Resume an archived conversation by loading it. Args: args: Archive ID to resume Returns: CommandResult with success status """ archive_id = args.strip() if not archive_id: return CommandResult( success=False, message="Usage: /resume \nUse /list to see available conversations" ) # Load the conversation conversation_state = self._conversation_archive.load_conversation(archive_id) if conversation_state is None: return CommandResult( success=False, message=f"Conversation '{archive_id}' not found or could not be loaded.\nUse /list to see available conversations." ) # Archive current conversation before switching if self._conversation_manager.messages: current_state = self._conversation_manager._state # Archive without title (avoid blocking) archive_id_saved = self._conversation_archive.archive_conversation(current_state) # Clear the message list UI safely self._clear_message_list() # Create new conversation manager with loaded state self._conversation_manager = ConversationManager() self._conversation_manager.replace_messages(conversation_state.messages) # Repopulate message list from loaded conversation for message in conversation_state.messages: self._append_message(message["role"], message["content"], persist=False) return CommandResult( success=True, message=f"Resumed conversation '{archive_id}' with {len(conversation_state.messages)} messages" )