- Added support for multiple AI providers (Ollama, Gemini, OpenRouter, Copilot) with provider abstraction layer - Created settings view with provider configuration and API key management - Updated UI to show current provider status and handle provider-specific availability - Modified reasoning mode to work exclusively with Ollama provider - Added provider switching functionality with persistent settings - Updated error messages and placeholders to be provider-aware
1248 lines
46 KiB
Python
1248 lines
46 KiB
Python
import threading
|
|
import os
|
|
from ignis import widgets, app
|
|
from gi.repository import GLib, Gtk, Gdk
|
|
from .ollama_client import OllamaClient
|
|
from .conversation_manager import ConversationManager
|
|
from .conversation_archive import ConversationArchive
|
|
from .ollama_monitor import OllamaAvailabilityMonitor
|
|
from .streaming_handler import StreamingHandler
|
|
from .command_processor import CommandProcessor, CommandResult
|
|
from .reasoning_controller import ReasoningController
|
|
from .provider_client import (
|
|
AIProvider,
|
|
OllamaProvider,
|
|
GeminiProvider,
|
|
OpenRouterProvider,
|
|
CopilotProvider,
|
|
)
|
|
from .settings_widget import SettingsWidget
|
|
|
|
|
|
class ChatWidget(widgets.Box):
|
|
"""Chat interface widget with Ollama integration"""
|
|
|
|
def __init__(self):
    """Build the chat UI, initialize provider/monitor state, and show the chat view.

    Bug fix: the original called ``super().__init__`` twice — first with
    ``[header_box, self._scroller, input_box]`` as direct children, then a
    second time with the view stack. The first call parented the header,
    scroller, and input box to ``self``; re-adding those same widgets to
    ``chat_container`` afterwards re-parents them (GTK errors/warnings), and
    the margins applied to ``self`` configured a layout that was immediately
    discarded. Widgets are now parented exactly once and a single
    ``super().__init__`` installs the view stack.
    """
    # Load CSS for TextView styling
    self._load_css()
    self._conversation_manager = ConversationManager()
    self._conversation_archive = ConversationArchive()

    # Reasoning controller persists reasoning mode + provider preferences.
    self._reasoning_controller = ReasoningController()

    # Provider abstraction state.
    self._current_provider: AIProvider | None = None
    self._current_model: str | None = None
    self._provider_instances: dict[str, AIProvider] = {}
    self._initialize_provider()

    # Availability monitor (only meaningful for the Ollama provider).
    ollama_provider = self._get_provider("ollama")
    if isinstance(ollama_provider, OllamaProvider):
        self._ollama_client = ollama_provider._client
        self._ollama_monitor = OllamaAvailabilityMonitor(self._ollama_client)
        self._ollama_monitor.add_callback(self._on_ollama_availability_changed)
        self._ollama_monitor.start()
    else:
        self._ollama_client = None
        self._ollama_monitor = None

    # Slash-command processor.
    self._command_processor = CommandProcessor()
    self._register_commands()

    # Header with title and provider/model status.
    header_title = widgets.Label(
        label="AI Sidebar",
        halign="start",
        css_classes=["title-2"],
    )

    self._model_label = widgets.Label(
        label="",
        halign="start",
        css_classes=["dim-label"],
    )
    self._update_model_label()

    # Reasoning mode toggle button (regular button with state tracking).
    self._reasoning_enabled = self._reasoning_controller.is_enabled()
    toggle_label = "🧠 Reasoning: ON" if self._reasoning_enabled else "🧠 Reasoning: OFF"
    self._reasoning_toggle = widgets.Button(
        label=toggle_label,
        on_click=lambda x: self._on_reasoning_toggled(),
        halign="end",
        hexpand=False,
    )

    # Settings button (gear icon).
    settings_button = widgets.Button(
        label="⚙️",
        on_click=lambda x: self._show_settings(),
        halign="end",
        hexpand=False,
    )

    # Header top row with title, settings, and toggle.
    header_top = widgets.Box(
        spacing=8,
        hexpand=True,
        child=[header_title, settings_button, self._reasoning_toggle],
    )

    header_box = widgets.Box(
        vertical=True,
        spacing=4,
        child=[header_top, self._model_label],
    )

    # Message list inside a scrollable area.
    self._message_list = widgets.Box(
        vertical=True,
        spacing=8,
        hexpand=True,
        vexpand=True,
        valign="start",
    )
    self._scroller = widgets.Scroll(
        hexpand=True,
        vexpand=True,
        min_content_height=300,
        child=self._message_list,
    )

    # Multi-line input text view.
    self._text_view = Gtk.TextView()
    self._text_buffer = self._text_view.get_buffer()
    self._text_view.set_wrap_mode(Gtk.WrapMode.WORD_CHAR)
    self._text_view.set_hexpand(True)
    self._text_view.set_vexpand(False)
    self._text_view.set_left_margin(8)
    self._text_view.set_right_margin(8)
    self._text_view.set_top_margin(8)
    self._text_view.set_bottom_margin(8)
    self._text_view.set_size_request(300, 60)  # explicit width and height

    # Placeholder state.
    self._update_placeholder_text()
    self._is_placeholder_shown = False
    self._updating_placeholder = False

    # Enter submits, Shift+Enter inserts a newline.
    key_controller = Gtk.EventControllerKey()
    key_controller.connect("key-pressed", self._on_key_pressed)
    self._text_view.add_controller(key_controller)

    # Focus controllers drive placeholder show/hide.
    focus_controller = Gtk.EventControllerFocus()
    focus_controller.connect("enter", self._on_text_view_focus_in)
    focus_controller.connect("leave", self._on_text_view_focus_out)
    self._text_view.add_controller(focus_controller)

    # Buffer changes clear the placeholder once the user types.
    self._text_buffer.connect("changed", self._on_buffer_changed)

    # Scrolled container for the text view.
    self._text_scroller = Gtk.ScrolledWindow()
    self._text_scroller.set_child(self._text_view)
    self._text_scroller.set_min_content_height(60)
    self._text_scroller.set_max_content_height(200)
    self._text_scroller.set_hexpand(True)
    self._text_scroller.set_vexpand(False)
    self._text_scroller.set_policy(Gtk.PolicyType.NEVER, Gtk.PolicyType.AUTOMATIC)
    self._text_scroller.set_has_frame(True)
    self._text_scroller.set_size_request(300, 60)

    # Store reference for use in input_box.
    self._text_view_container = self._text_scroller

    # Send button.
    self._send_button = widgets.Button(
        label="Send",
        on_click=lambda x: self._on_submit(),
    )

    # Input box.
    input_box = widgets.Box(
        spacing=8,
        hexpand=True,
        child=[self._text_view_container, self._send_button],
    )

    # Auto-archive old messages on startup, then load what remains.
    self._auto_archive_old_messages()
    self._populate_initial_messages()

    # Initialize placeholder display.
    self._update_placeholder()

    # Disable input if the provider is unavailable at startup.
    if not self._current_provider or not self._current_provider.is_available:
        self._set_input_enabled(False)

    # Settings view.
    self._settings_widget = SettingsWidget(
        self._reasoning_controller,
        on_provider_changed=self._on_provider_changed_from_settings,
        on_back=self._show_chat,
    )

    # View stack for switching between chat and settings.
    self._view_stack = Gtk.Stack()
    self._view_stack.set_transition_type(Gtk.StackTransitionType.SLIDE_LEFT_RIGHT)
    self._view_stack.set_transition_duration(200)

    # Chat page: header, message scroller, input row (parented only here).
    chat_container = widgets.Box(
        vertical=True,
        spacing=12,
        hexpand=True,
        vexpand=True,
        child=[header_box, self._scroller, input_box],
        css_classes=["ai-sidebar-content"],
    )
    chat_container.set_margin_top(16)
    chat_container.set_margin_bottom(16)
    chat_container.set_margin_start(16)
    chat_container.set_margin_end(16)

    self._view_stack.add_named(chat_container, "chat")
    self._view_stack.add_named(self._settings_widget, "settings")
    self._view_stack.set_visible_child_name("chat")

    # Single base-class initialization with the stack as the only child.
    super().__init__(
        hexpand=True,
        vexpand=True,
        child=[self._view_stack],
    )
|
|
|
|
def _auto_archive_old_messages(self, keep_recent: int = 20):
    """Archive everything but the newest messages when the session starts.

    Args:
        keep_recent: How many recent messages stay in the active conversation.
    """
    # Nothing to do while the active conversation is still small.
    if len(self._conversation_manager.messages) <= keep_recent:
        return

    # Snapshot the full state before trimming so the archive keeps the
    # original conversation id and creation time.
    full_state = self._conversation_manager._state

    trimmed = self._conversation_manager.trim_to_recent(keep_recent)
    if not trimmed:
        return

    from .conversation_manager import ConversationState

    # Build a state containing only the removed (older) messages; its
    # updated_at is the timestamp of the newest removed message.
    archived_state = ConversationState(
        conversation_id=full_state.conversation_id,
        created_at=full_state.created_at,
        updated_at=trimmed[-1].get("timestamp", full_state.updated_at),
        messages=trimmed,
    )

    # Archive without generating a title (avoids blocking at startup).
    self._conversation_archive.archive_conversation(archived_state)
|
|
|
|
def _populate_initial_messages(self):
    """Render persisted history, or greet the user when there is none."""
    history = self._conversation_manager.messages
    for entry in history:
        self._append_message(entry["role"], entry["content"], persist=False)

    if not history:
        self._append_message(
            "assistant",
            "Welcome! Ask a question to start a conversation.",
            persist=True,
        )
|
|
|
|
def _append_message(self, role: str, content: str, *, persist: bool = True, reasoning: str | None = None):
    """Render a chat bubble and optionally persist it.

    Args:
        role: Message role (user/assistant/system).
        content: Message body.
        persist: When True, store the message in conversation history.
        reasoning: Optional reasoning text shown above the content when
            reasoning mode is enabled.
    """
    speaker = "You" if role == "user" else "Assistant"

    # The main content label is identical in both layouts.
    body_label = widgets.Label(
        label=f"{speaker}: {content}",
        halign="start",
        xalign=0.0,
        wrap=True,
        wrap_mode="word_char",
        justify="left",
    )

    if reasoning and self._reasoning_controller.is_enabled():
        # Reasoning layout: reasoning text, a 1px divider, then the answer.
        bubble = widgets.Box(
            vertical=True,
            spacing=4,
            halign="start",
        )
        bubble.append(
            widgets.Label(
                label=f"💭 Reasoning: {reasoning}",
                halign="start",
                xalign=0.0,
                wrap=True,
                wrap_mode="word_char",
                justify="left",
                css_classes=["reasoning-content"],
            )
        )
        divider = widgets.Box(
            css_classes=["reasoning-divider"],
            hexpand=True,
        )
        divider.set_size_request(-1, 1)
        bubble.append(divider)
        bubble.append(body_label)
        self._message_list.append(bubble)
    else:
        # Plain layout: just the labeled content.
        self._message_list.append(body_label)

    self._scroll_to_bottom()

    if persist and self._conversation_manager:
        # Only role/content are persisted; reasoning is display-only.
        self._conversation_manager.append_message(role, content)
|
|
|
|
def _append_system_message(self, content: str):
    """Show a dimmed "System:" line (used for slash-command feedback)."""
    system_label = widgets.Label(
        label=f"System: {content}",
        halign="start",
        xalign=0.0,
        wrap=True,
        wrap_mode="word_char",
        justify="left",
        css_classes=["dim-label"],  # dimmed styling marks system output
    )
    self._message_list.append(system_label)
    self._scroll_to_bottom()
|
|
|
|
def _clear_message_list(self):
    """Remove every message widget from the list."""
    # Repeatedly detach the head of the child list; this sidesteps the
    # "mutating while iterating" hazard without building a snapshot first.
    while (head := self._message_list.get_first_child()) is not None:
        self._message_list.remove(head)
|
|
|
|
def _scroll_to_bottom(self):
    """Scroll the message area to the newest message on the next idle tick."""
    def _jump_to_end():
        vadj = self._scroller.get_vadjustment()
        if vadj:
            vadj.set_value(vadj.get_upper() - vadj.get_page_size())
        return False  # one-shot idle callback

    # Defer until GTK has recomputed the adjustment for the new child.
    GLib.idle_add(_jump_to_end)
|
|
|
|
def _set_input_enabled(self, enabled: bool):
    """Toggle sensitivity of the text view and send button together."""
    for control in (self._text_view, self._send_button):
        control.set_sensitive(enabled)
|
|
|
|
def _update_placeholder(self):
    """Update placeholder text visibility based on buffer content.

    Shows gray placeholder text when the buffer is empty and removes it
    once real content exists.  Safe to schedule via ``GLib.idle_add``
    (always returns ``False`` so the idle source fires only once).
    """
    # Prevent recursion: set_text()/apply_tag() below re-trigger the buffer
    # "changed" signal, whose handler may call back into this method.
    if self._updating_placeholder:
        return False

    self._updating_placeholder = True

    try:
        start_iter = self._text_buffer.get_start_iter()
        end_iter = self._text_buffer.get_end_iter()
        text = self._text_buffer.get_text(start_iter, end_iter, False)

        if not text:
            # Buffer is empty, show placeholder
            if not self._is_placeholder_shown:
                # Create the gray placeholder tag lazily on first use; the
                # tag stays registered with the buffer for later calls.
                if not hasattr(self, '_placeholder_tag'):
                    self._placeholder_tag = self._text_buffer.create_tag(
                        "placeholder",
                        foreground="gray"
                    )

                self._text_buffer.set_text(self._placeholder_text)
                self._text_buffer.apply_tag(
                    self._placeholder_tag,
                    self._text_buffer.get_start_iter(),
                    self._text_buffer.get_end_iter()
                )
                self._is_placeholder_shown = True
        else:
            # Buffer has content
            if self._is_placeholder_shown and text == self._placeholder_text:
                # Content is exactly the (stale) placeholder: clear it so the
                # user starts from an empty buffer.
                self._text_buffer.set_text("")
                self._is_placeholder_shown = False
    finally:
        self._updating_placeholder = False

    return False  # For GLib.idle_add
|
|
|
|
def _on_buffer_changed(self, buffer):
    """Handle text buffer changes to manage placeholder state.

    Bug fix: the original cleared ``_is_placeholder_shown`` only when the
    new text *started with* the placeholder (a pure append at the end).
    Typing at the start of the buffer, or deleting part of the placeholder,
    left the flag set, so later logic (``_on_submit`` / ``_update_placeholder``)
    mis-treated real input as placeholder text.  The flag is now cleared
    whenever the placeholder is no longer intact, and the typed text is
    recovered for both append and prepend edits.

    Args:
        buffer: The Gtk.TextBuffer that emitted the "changed" signal.
    """
    # Prevent recursion during placeholder updates.
    if self._updating_placeholder:
        return

    start_iter = buffer.get_start_iter()
    end_iter = buffer.get_end_iter()
    text = buffer.get_text(start_iter, end_iter, False)

    # If the placeholder is shown and the content changed, the user edited
    # the buffer: strip the placeholder and keep only what they typed.
    if self._is_placeholder_shown and text != self._placeholder_text:
        self._updating_placeholder = True
        try:
            if text.startswith(self._placeholder_text):
                # Typed after the placeholder: keep the suffix.
                buffer.set_text(text[len(self._placeholder_text):])
            elif text.endswith(self._placeholder_text):
                # Typed before the placeholder: keep the prefix.
                buffer.set_text(text[: len(text) - len(self._placeholder_text)])
            # The placeholder is no longer intact in any of these cases.
            self._is_placeholder_shown = False
        finally:
            # Always release the guard, even if a buffer call raises.
            self._updating_placeholder = False
|
|
|
|
def _on_text_view_focus_in(self, controller):
    """Clear the placeholder as soon as the text view gains focus."""
    if not self._is_placeholder_shown:
        return
    # Guard against re-entrancy from the "changed" signal set_text() fires.
    self._updating_placeholder = True
    self._text_buffer.set_text("")
    self._is_placeholder_shown = False
    self._updating_placeholder = False
|
|
|
|
def _on_text_view_focus_out(self, controller):
    """Restore the placeholder when focus leaves an empty text view."""
    buf = self._text_buffer
    contents = buf.get_text(buf.get_start_iter(), buf.get_end_iter(), False)
    if not contents:
        self._update_placeholder()
|
|
|
|
def _on_key_pressed(self, controller, keyval, keycode, state):
    """Submit on Enter; let Shift+Enter fall through as a newline."""
    if keyval not in (Gdk.KEY_Return, Gdk.KEY_KP_Enter):
        return False  # propagate all other keys unchanged

    if state & Gdk.ModifierType.SHIFT_MASK:
        # Shift+Enter keeps the default behavior (insert newline).
        return False

    # Plain Enter submits the message and swallows the key event so no
    # newline is inserted.
    self._on_submit()
    return True
|
|
|
|
def _on_submit(self):
    """Read the input buffer and dispatch it as a command or chat message."""
    buf = self._text_buffer
    text = buf.get_text(buf.get_start_iter(), buf.get_end_iter(), False).strip()

    # Nothing to do for empty input or an untouched placeholder.
    if not text or text == self._placeholder_text:
        return

    buf.set_text("")
    # Restore the placeholder on idle to avoid GTK warnings mid-action.
    GLib.idle_add(self._update_placeholder)

    # Slash commands execute locally and are echoed as system output.
    if self._command_processor.is_command(text):
        outcome = self._command_processor.execute(text)
        self._append_system_message(outcome.message)
        return

    # Regular messages require a configured, reachable provider.
    if not self._current_provider or not self._current_provider.is_available:
        provider_name = self._current_provider.name if self._current_provider else "Provider"
        if provider_name == "ollama":
            error_msg = "Ollama is not running. Please start Ollama with: ollama serve"
        else:
            error_msg = f"{provider_name.capitalize()} is not configured. Please check settings."
        self._append_message(
            "assistant",
            error_msg,
            persist=False,
        )
        return

    self._append_message("user", text, persist=True)
    self._request_response()
|
|
|
|
def _request_response(self):
    """Request AI response in background thread with streaming.

    Builds a message container (a collapsible "thinking" section plus a
    response label), hands both to a StreamingHandler, and spawns a daemon
    thread that consumes the provider's stream.  All UI mutations initiated
    from the worker are marshalled back to the GTK main loop via
    ``GLib.idle_add``.
    """
    # Double-check availability before making request (it may have changed
    # since the check in _on_submit).
    if not self._current_provider or not self._current_provider.is_available:
        provider_name = self._current_provider.name if self._current_provider else "Provider"
        if provider_name == "ollama":
            error_msg = "Ollama is not running. Please start Ollama with: ollama serve"
        else:
            error_msg = f"{provider_name.capitalize()} is not configured. Please check settings."
        self._append_message(
            "assistant",
            error_msg,
            persist=False,
        )
        return

    # Resolve the model: explicit selection first, provider default second.
    model = self._current_model or self._current_provider.default_model
    if not model:
        provider_name = self._current_provider.name
        if provider_name == "ollama":
            error_msg = "No Ollama models are available. Install a model with: ollama pull llama2"
        else:
            error_msg = f"No {provider_name.capitalize()} models are available. Check settings."
        self._append_message(
            "assistant",
            error_msg,
            persist=True,
        )
        return

    # Snapshot history for the worker thread; lock input while streaming.
    history = self._conversation_manager.chat_messages
    self._set_input_enabled(False)

    # Create message container for streaming
    message_container = widgets.Box(
        vertical=True,
        spacing=8,
        halign="start",
    )

    # Create thinking box (initially hidden; the StreamingHandler makes it
    # visible when "thinking" tokens arrive — presumably, verify handler)
    thinking_box = widgets.Box(
        vertical=True,
        spacing=0,
        css_classes=["thinking-box"],
        visible=False,
    )

    # Create collapsible header button
    thinking_header_box = widgets.Box(
        spacing=8,
        halign="start",
    )

    thinking_icon = widgets.Label(
        label="▼",
        css_classes=["thinking-icon"],
    )

    thinking_header_label = widgets.Label(
        label="💭 Thinking...",
        halign="start",
        css_classes=["thinking-header"],
    )

    thinking_header_box.append(thinking_icon)
    thinking_header_box.append(thinking_header_label)

    # Make header clickable
    thinking_header_button = widgets.Button(
        child=thinking_header_box,
        css_classes=["thinking-header-button"],
    )

    # Create revealer for collapsible content
    thinking_content = widgets.Label(
        label="",
        halign="start",
        xalign=0.0,
        wrap=True,
        wrap_mode="word_char",
        justify="left",
        css_classes=["thinking-content"],
    )

    thinking_revealer = widgets.Revealer(
        child=thinking_content,
        transition_type="slide_down",
        transition_duration=200,
        reveal_child=True,
    )

    # Connect toggle functionality: clicking the header expands/collapses
    # the thinking section and flips the arrow icon.
    def toggle_thinking():
        is_revealed = thinking_revealer.reveal_child
        thinking_revealer.reveal_child = not is_revealed
        thinking_icon.label = "▼" if not is_revealed else "▲"

    thinking_header_button.on_click = lambda x: toggle_thinking()

    thinking_box.append(thinking_header_button)
    thinking_box.append(thinking_revealer)

    # Create main response label
    response_label = widgets.Label(
        label="Assistant: ...",
        halign="start",
        xalign=0.0,
        wrap=True,
        wrap_mode="word_char",
        justify="left",
    )

    message_container.append(thinking_box)
    message_container.append(response_label)

    self._message_list.append(message_container)
    self._scroll_to_bottom()

    # Create streaming handler with both widgets
    streaming_handler = StreamingHandler(
        response_label,
        self._scroller,
        thinking_widget=thinking_content,
        thinking_box=thinking_box
    )

    def _worker(messages, handler):
        # Runs on a background thread: no direct GTK calls here; UI updates
        # go through GLib.idle_add or the StreamingHandler.
        try:
            handler.start_stream()

            # Get model-specific options (only for Ollama)
            options = self._reasoning_controller.get_model_options() if self._current_provider.name == "ollama" else None

            # Stream response tokens
            for chunk in self._current_provider.stream_chat(
                model=model,
                messages=list(messages),
                options=options
            ):
                # Check for errors; an error chunk aborts the stream.
                if chunk.get("error"):
                    content = chunk.get("content", "An error occurred during streaming")
                    GLib.idle_add(self._handle_stream_error, content, priority=GLib.PRIORITY_DEFAULT)
                    return

                # Extract tokens from chunk
                message = chunk.get("message", {})
                content_token = message.get("content", "")
                thinking_token = message.get("thinking", "")

                if content_token:
                    handler.append_token(content_token)

                if thinking_token:
                    handler.append_thinking_token(thinking_token)

                # Check if streaming is complete
                if chunk.get("done", False):
                    break

            # Finalize stream and get complete content
            thinking_content, main_content = handler.finish_stream()

            # Persist the complete message (on the main loop)
            GLib.idle_add(
                self._handle_stream_complete,
                thinking_content,
                main_content,
                priority=GLib.PRIORITY_DEFAULT
            )

        except Exception as exc:
            error_msg = f"Streaming error: {exc}"
            GLib.idle_add(self._handle_stream_error, error_msg, priority=GLib.PRIORITY_DEFAULT)

    thread = threading.Thread(target=_worker, args=(history, streaming_handler), daemon=True)
    thread.start()
|
|
|
|
def _handle_response(self, response):
    """Handle a non-streaming AI response (legacy path).

    Returns False so it can be scheduled via GLib.idle_add.
    """
    self._set_input_enabled(True)

    if not response:
        self._append_message(
            "assistant",
            "The model returned an empty response.",
            persist=True,
        )
        return False

    role = response.get("role", "assistant")
    # Substitute a visible marker when the payload carries no content.
    content = response.get("content") or "[No content received from Ollama]"
    self._append_message(role, content, persist=True)
    return False
|
|
|
|
def _handle_stream_token(self, token: str) -> bool:
    """Placeholder hook for direct token handling.

    Token rendering is owned by StreamingHandler; this method is kept for
    potential future use.  It is invoked via GLib.idle_add, and returning
    False removes the idle source after one call.

    Args:
        token: The token string to process (currently ignored).

    Returns:
        False, always (one-shot idle callback).
    """
    return False
|
|
|
|
def _handle_stream_complete(self, thinking_content: str, main_content: str) -> bool:
    """Persist the finished streamed reply and unlock the input.

    The StreamingHandler already rendered the message in the UI; this only
    writes it to conversation history, wrapping any reasoning in
    <think>...</think> tags.

    Args:
        thinking_content: The thinking/reasoning content.
        main_content: The main response content.

    Returns:
        False, always (one-shot idle callback).
    """
    if thinking_content:
        full_content = f"<think>{thinking_content}</think>\n{main_content}"
    else:
        full_content = main_content

    if full_content and self._conversation_manager:
        self._conversation_manager.append_message("assistant", full_content)

    self._set_input_enabled(True)
    return False
|
|
|
|
def _handle_stream_error(self, error_msg: str) -> bool:
    """Show a streaming error and re-enable input.

    Errors are displayed but deliberately not written to history.

    Args:
        error_msg: The error message to display.

    Returns:
        False, always (one-shot idle callback).
    """
    self._append_message("assistant", error_msg, persist=False)
    self._set_input_enabled(True)
    return False
|
|
|
|
def focus_input(self):
    """Give keyboard focus to the message input view."""
    self._text_view.grab_focus()
|
|
|
|
def _initialize_provider(self):
    """Select the provider (and its model) saved in preferences."""
    provider_id = self._reasoning_controller.get_provider()
    provider = self._get_provider(provider_id)
    self._current_provider = provider

    if provider is None:
        self._current_model = None
        return

    # Ollama's model follows the reasoning controller; other providers
    # fall back to their own default model.
    if provider_id == "ollama":
        self._current_model = self._reasoning_controller.get_model_name()
    else:
        self._current_model = provider.default_model
|
|
|
|
def _get_provider(self, provider_id: str) -> AIProvider | None:
    """Return a cached provider instance, creating it on first use.

    Keyed providers (Gemini/OpenRouter/Copilot) yield None until their
    credentials exist; unknown ids also yield None.
    """
    cached = self._provider_instances.get(provider_id)
    if cached is not None:
        return cached

    def _build() -> AIProvider | None:
        if provider_id == "ollama":
            return OllamaProvider()
        if provider_id == "gemini":
            key = self._reasoning_controller.get_api_key("gemini")
            return GeminiProvider(api_key=key) if key else None
        if provider_id == "openrouter":
            key = self._reasoning_controller.get_api_key("openrouter")
            return OpenRouterProvider(api_key=key) if key else None
        if provider_id == "copilot":
            token = self._reasoning_controller.get_copilot_token()
            return CopilotProvider(oauth_token=token) if token else None
        return None

    provider = _build()
    if provider:
        # Only successful constructions are cached; a missing key leaves the
        # slot empty so a later attempt can retry.
        self._provider_instances[provider_id] = provider
    return provider
|
|
|
|
def _update_model_label(self):
    """Refresh the header line showing provider and model status."""
    # Early in __init__ the label may not exist yet.
    if not hasattr(self, '_model_label'):
        return

    provider = self._current_provider
    if provider is None:
        self._model_label.label = "Provider: Not configured"
        return

    display_name = provider.name.capitalize()
    if provider.is_available:
        shown_model = self._current_model or "No model selected"
        self._model_label.label = f"{display_name}: {shown_model}"
    elif display_name == "Ollama":
        # Ollama is installed locally; "not running" is the accurate state.
        self._model_label.label = f"{display_name}: Not running"
    else:
        self._model_label.label = f"{display_name}: Not configured"
|
|
|
|
def _update_placeholder_text(self):
    """Recompute the input placeholder based on provider availability."""
    # May be invoked before the text view exists during construction.
    if not hasattr(self, '_text_view'):
        return

    provider = self._current_provider
    if provider and provider.is_available:
        self._placeholder_text = "Ask a question…"
        return

    # Unavailable: explain how to fix it, per provider.
    provider_name = provider.name if provider else "Provider"
    if provider_name == "ollama":
        self._placeholder_text = "Ollama not running - start with: ollama serve"
    else:
        self._placeholder_text = f"{provider_name.capitalize()} not configured. Check settings."
|
|
|
|
def _show_settings(self):
    """Switch the view stack to the settings page."""
    stack = getattr(self, '_view_stack', None)
    if stack is not None:
        stack.set_visible_child_name("settings")
|
|
|
|
def _show_chat(self):
    """Switch the view stack back to the chat page."""
    stack = getattr(self, '_view_stack', None)
    if stack is not None:
        stack.set_visible_child_name("chat")
|
|
|
|
def _on_provider_changed_from_settings(self, provider_id: str):
    """React to a provider change made in the settings view.

    Bug fix: the provider cache is invalidated first.  Provider instances
    capture their API key/token at construction time (see ``_get_provider``),
    so reusing a cached instance after the user edits credentials in
    settings would keep using the stale key — and a provider created after
    a key was first entered would never be constructed at all.

    Args:
        provider_id: Identifier of the newly selected provider
            ("ollama", "gemini", "openrouter", or "copilot").
    """
    # Drop any cached instance so fresh credentials take effect.
    self._provider_instances.pop(provider_id, None)

    self._current_provider = self._get_provider(provider_id)
    if self._current_provider:
        if provider_id == "ollama":
            # Ollama's model tracks the reasoning controller.
            self._current_model = self._reasoning_controller.get_model_name()
        else:
            self._current_model = self._current_provider.default_model
    else:
        self._current_model = None

    # Refresh all provider-dependent UI.
    self._update_model_label()
    self._update_placeholder_text()
    self._update_placeholder()

    # Input is only usable while the selected provider is reachable.
    if self._current_provider and self._current_provider.is_available:
        self._set_input_enabled(True)
    else:
        self._set_input_enabled(False)
|
|
|
|
def _on_ollama_availability_changed(self, is_available: bool):
    """React to the Ollama monitor reporting a connectivity change."""
    # Ignore monitor events while another provider is selected.
    if self._reasoning_controller.get_provider() != "ollama":
        return

    if is_available:
        # Back online: restore the model chosen by the reasoning controller.
        self._current_model = self._reasoning_controller.get_model_name()

    # Refresh provider-dependent UI for either transition.
    self._update_model_label()
    self._update_placeholder_text()
    self._update_placeholder()
    self._set_input_enabled(is_available)

    if is_available:
        # Let the user know chatting works again (not persisted).
        self._append_message(
            "assistant",
            "Ollama connection restored! You can now chat.",
            persist=False,
        )
|
|
|
|
def _on_reasoning_toggled(self):
    """Toggle reasoning mode (Ollama only) and switch to the matching model.

    Bug fix: the feedback message previously hard-coded the model names
    ("Qwen3-4B-Thinking"/"Qwen3-4B-Instruct"), which went stale whenever
    the ReasoningController was configured with different models.  The
    message now reports the model the controller actually selected.
    """
    # Reasoning mode is an Ollama-specific feature.
    if self._reasoning_controller.get_provider() != "ollama":
        self._append_system_message("Reasoning mode is only available for Ollama provider.")
        return

    # Flip the persisted state and mirror it locally.
    new_state = self._reasoning_controller.toggle()
    self._reasoning_enabled = new_state

    # The controller maps reasoning on/off to the appropriate model.
    new_model = self._reasoning_controller.get_model_name()
    self._current_model = new_model

    # Reflect the new state in the toggle button and the header line.
    toggle_label = "🧠 Reasoning: ON" if new_state else "🧠 Reasoning: OFF"
    self._reasoning_toggle.label = toggle_label
    self._update_model_label()

    # Feedback names the model actually in use, not a hard-coded one.
    status = "enabled" if new_state else "disabled"
    self._append_system_message(f"Reasoning mode {status} - switched to {new_model}")
|
|
|
|
|
|
def _load_css(self):
    """Install the sidebar stylesheet (style.css next to this module), if present."""
    stylesheet = os.path.join(os.path.dirname(__file__), "style.css")
    if not os.path.exists(stylesheet):
        return

    css_provider = Gtk.CssProvider()
    css_provider.load_from_path(stylesheet)
    Gtk.StyleContext.add_provider_for_display(
        Gdk.Display.get_default(),
        css_provider,
        Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION
    )
|
|
|
|
def _register_commands(self):
    """Register all slash commands with the command processor."""
    bindings = (
        ("/new", self._cmd_new_conversation),
        ("/clear", self._cmd_new_conversation),  # alias of /new
        ("/models", self._cmd_list_models),
        ("/model", self._cmd_switch_model),
        ("/list", self._cmd_list_conversations),
        ("/resume", self._cmd_resume_conversation),
    )
    for name, handler in bindings:
        self._command_processor.register_command(name, handler)
|
|
|
|
def _generate_conversation_title(self, messages: list) -> str:
|
|
"""Generate a concise title for a conversation using AI.
|
|
|
|
Args:
|
|
messages: List of conversation messages
|
|
|
|
Returns:
|
|
Generated title or empty string if generation fails
|
|
"""
|
|
if not messages or not self._current_provider or not self._current_provider.is_available:
|
|
return ""
|
|
|
|
# Extract first few user messages for context
|
|
user_messages = [msg for msg in messages if msg.get("role") == "user"]
|
|
if not user_messages:
|
|
return ""
|
|
|
|
# Use first 2-3 user messages for context
|
|
context_messages = user_messages[:3]
|
|
context_text = " ".join([msg.get("content", "")[:100] for msg in context_messages])
|
|
|
|
# Create a prompt for title generation
|
|
title_prompt = [
|
|
{
|
|
"role": "system",
|
|
"content": "You are a helpful assistant that generates concise, descriptive titles for conversations. Generate a title that is 3-6 words maximum. Respond with ONLY the title, no quotes or extra text."
|
|
},
|
|
{
|
|
"role": "user",
|
|
"content": f"Generate a short title (3-6 words) for a conversation that starts with: {context_text}"
|
|
}
|
|
]
|
|
|
|
try:
|
|
model = self._current_model or self._current_provider.default_model
|
|
if not model:
|
|
return ""
|
|
|
|
# Use non-streaming chat for title generation
|
|
response = self._current_provider.chat(model=model, messages=title_prompt)
|
|
if response and response.get("content"):
|
|
title = response["content"].strip()
|
|
# Clean up the title (remove quotes, limit length)
|
|
title = title.strip('"\'').strip()
|
|
# Limit to 50 characters
|
|
if len(title) > 50:
|
|
title = title[:47] + "..."
|
|
return title
|
|
except Exception:
|
|
# If title generation fails, return empty string
|
|
pass
|
|
|
|
return ""
|
|
|
|
def _cmd_new_conversation(self, args: str) -> CommandResult:
|
|
"""
|
|
Start a new conversation, saving the current one.
|
|
|
|
Args:
|
|
args: Command arguments (unused for /new and /clear)
|
|
|
|
Returns:
|
|
CommandResult with success status and confirmation message
|
|
"""
|
|
# Archive current conversation if it has messages
|
|
archive_id = None
|
|
if self._conversation_manager.messages:
|
|
current_state = self._conversation_manager._state
|
|
|
|
# Archive without title for now (avoid blocking main thread)
|
|
# Title generation will happen in background via auto-archive
|
|
archive_id = self._conversation_archive.archive_conversation(current_state)
|
|
|
|
# Clear the conversation manager (Option 1: clear default.json)
|
|
self._conversation_manager.clear_messages()
|
|
|
|
# Clear the message list UI safely
|
|
self._clear_message_list()
|
|
|
|
# Add welcome message to new conversation
|
|
self._append_message(
|
|
"assistant",
|
|
"New conversation started. Previous conversation archived.",
|
|
persist=True,
|
|
)
|
|
|
|
if archive_id:
|
|
return CommandResult(
|
|
success=True,
|
|
message=f"Started new conversation. Previous conversation archived as '{archive_id}'"
|
|
)
|
|
else:
|
|
return CommandResult(
|
|
success=True,
|
|
message="Started new conversation (no previous messages to archive)"
|
|
)
|
|
|
|
def _cmd_list_models(self, args: str) -> CommandResult:
|
|
"""
|
|
List available Ollama models.
|
|
|
|
Args:
|
|
args: Command arguments (unused)
|
|
|
|
Returns:
|
|
CommandResult with model list
|
|
"""
|
|
if not self._current_provider or not self._current_provider.is_available:
|
|
provider_name = self._current_provider.name if self._current_provider else "Provider"
|
|
return CommandResult(
|
|
success=False,
|
|
message=f"{provider_name.capitalize()} is not available. Check settings."
|
|
)
|
|
|
|
models = self._current_provider.list_models(force_refresh=True)
|
|
|
|
if not models:
|
|
return CommandResult(
|
|
success=True,
|
|
message="No models available. Install a model with: ollama pull llama2"
|
|
)
|
|
|
|
# Format model list with current model highlighted
|
|
model_lines = []
|
|
for model in models:
|
|
if model == self._current_model:
|
|
model_lines.append(f"• {model} (current)")
|
|
else:
|
|
model_lines.append(f"• {model}")
|
|
|
|
message = "Available models:\n" + "\n".join(model_lines)
|
|
|
|
return CommandResult(
|
|
success=True,
|
|
message=message,
|
|
data={"models": models}
|
|
)
|
|
|
|
def _cmd_switch_model(self, args: str) -> CommandResult:
|
|
"""
|
|
Switch to a different Ollama model.
|
|
|
|
Args:
|
|
args: Model name to switch to
|
|
|
|
Returns:
|
|
CommandResult with success status
|
|
"""
|
|
if not self._current_provider or not self._current_provider.is_available:
|
|
provider_name = self._current_provider.name if self._current_provider else "Provider"
|
|
return CommandResult(
|
|
success=False,
|
|
message=f"{provider_name.capitalize()} is not available. Check settings."
|
|
)
|
|
|
|
model_name = args.strip()
|
|
|
|
if not model_name:
|
|
return CommandResult(
|
|
success=False,
|
|
message="Usage: /model <model_name>\nExample: /model llama2"
|
|
)
|
|
|
|
# Validate model exists
|
|
available_models = self._current_provider.list_models(force_refresh=True)
|
|
|
|
if model_name not in available_models:
|
|
return CommandResult(
|
|
success=False,
|
|
message=f"Model '{model_name}' not found. Use /models to see available models."
|
|
)
|
|
|
|
# Switch model
|
|
old_model = self._current_model
|
|
self._current_model = model_name
|
|
|
|
# Update model label in header
|
|
self._model_label.label = f"Model: {model_name}"
|
|
|
|
return CommandResult(
|
|
success=True,
|
|
message=f"Switched from '{old_model}' to '{model_name}'"
|
|
)
|
|
|
|
def _cmd_list_conversations(self, args: str) -> CommandResult:
|
|
"""
|
|
List all archived conversations with metadata.
|
|
|
|
Args:
|
|
args: Command arguments (unused)
|
|
|
|
Returns:
|
|
CommandResult with formatted conversation list
|
|
"""
|
|
conversations = self._conversation_archive.list_conversations()
|
|
|
|
if not conversations:
|
|
return CommandResult(
|
|
success=True,
|
|
message="No archived conversations found. Use /new to archive the current conversation."
|
|
)
|
|
|
|
# Format conversation list
|
|
lines = ["Archived conversations:"]
|
|
for conv in conversations:
|
|
# Format timestamp for display
|
|
try:
|
|
from datetime import datetime
|
|
dt = datetime.fromisoformat(conv.updated_at.replace('Z', '+00:00'))
|
|
time_str = dt.strftime("%Y-%m-%d %H:%M")
|
|
except:
|
|
time_str = conv.updated_at[:16] # Fallback to raw string
|
|
|
|
# Display title prominently, with archive_id as reference
|
|
lines.append(f"• {conv.title}")
|
|
lines.append(f" ID: {conv.archive_id}")
|
|
lines.append(f" Updated: {time_str} | Messages: {conv.message_count}")
|
|
|
|
lines.append("\nUse /resume <archive_id> to load a conversation")
|
|
|
|
return CommandResult(
|
|
success=True,
|
|
message="\n".join(lines),
|
|
data={"conversations": [c.archive_id for c in conversations]}
|
|
)
|
|
|
|
def _cmd_resume_conversation(self, args: str) -> CommandResult:
|
|
"""
|
|
Resume an archived conversation by loading it.
|
|
|
|
Args:
|
|
args: Archive ID to resume
|
|
|
|
Returns:
|
|
CommandResult with success status
|
|
"""
|
|
archive_id = args.strip()
|
|
|
|
if not archive_id:
|
|
return CommandResult(
|
|
success=False,
|
|
message="Usage: /resume <archive_id>\nUse /list to see available conversations"
|
|
)
|
|
|
|
# Load the conversation
|
|
conversation_state = self._conversation_archive.load_conversation(archive_id)
|
|
|
|
if conversation_state is None:
|
|
return CommandResult(
|
|
success=False,
|
|
message=f"Conversation '{archive_id}' not found or could not be loaded.\nUse /list to see available conversations."
|
|
)
|
|
|
|
# Archive current conversation before switching
|
|
if self._conversation_manager.messages:
|
|
current_state = self._conversation_manager._state
|
|
# Archive without title (avoid blocking)
|
|
archive_id_saved = self._conversation_archive.archive_conversation(current_state)
|
|
|
|
# Clear the message list UI safely
|
|
self._clear_message_list()
|
|
|
|
# Create new conversation manager with loaded state
|
|
self._conversation_manager = ConversationManager()
|
|
self._conversation_manager.replace_messages(conversation_state.messages)
|
|
|
|
# Repopulate message list from loaded conversation
|
|
for message in conversation_state.messages:
|
|
self._append_message(message["role"], message["content"], persist=False)
|
|
|
|
return CommandResult(
|
|
success=True,
|
|
message=f"Resumed conversation '{archive_id}' with {len(conversation_state.messages)} messages"
|
|
)
|