refactor(aisidebar): restructure project and implement reasoning mode toggle
- Reorganize project structure and file locations - Add ReasoningController to manage model selection and reasoning mode - Update design and requirements for reasoning mode toggle - Implement model switching between Qwen3-4B-Instruct and Qwen3-4B-Thinking models - Remove deprecated files and consolidate project layout - Add new steering and specification documentation - Clean up and remove unnecessary files and directories - Prepare for enhanced AI sidebar functionality with more flexible model handling
This commit is contained in:
222
streaming_handler.py
Normal file
222
streaming_handler.py
Normal file
@@ -0,0 +1,222 @@
|
||||
"""Streaming response handler for progressive token display."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from gi.repository import GLib
|
||||
from ignis import widgets
|
||||
|
||||
|
||||
class StreamingHandler:
    """Manages streaming response display with token-by-token updates.

    Tokens arriving from a background worker are accumulated in small
    batches and flushed to the widgets on the GTK main loop via
    ``GLib.idle_add``, keeping the UI responsive during generation.
    Main-response tokens and "thinking" (reasoning) tokens are buffered
    and rendered independently.
    """

    def __init__(self, message_widget: widgets.Label, scroller: widgets.Scroll,
                 thinking_widget: widgets.Label | None = None,
                 thinking_box: widgets.Box | None = None):
        """Initialize streaming handler.

        Args:
            message_widget: The label widget to update with streaming content
            scroller: The scroll widget to manage auto-scrolling
            thinking_widget: Optional label widget for thinking content
            thinking_box: Optional box container for thinking section
        """
        self._widget = message_widget
        self._thinking_widget = thinking_widget
        self._thinking_box = thinking_box
        self._scroller = scroller
        self._buffer = ""            # full main-response text flushed so far
        self._thinking_buffer = ""   # full thinking text flushed so far
        self._token_buffer: list[str] = []           # main tokens awaiting a flush
        self._thinking_token_buffer: list[str] = []  # thinking tokens awaiting a flush
        self._is_streaming = False
        # Accumulate a few tokens per UI update instead of scheduling one
        # idle callback per token, which would flood the main loop.
        self._buffer_size = 3

    def start_stream(self) -> None:
        """Reset all buffers and mark the handler as actively streaming."""
        self._buffer = ""
        self._thinking_buffer = ""
        self._token_buffer = []
        self._thinking_token_buffer = []
        self._is_streaming = True
        # Show a placeholder with the streaming indicator immediately.
        self._widget.label = "Assistant: ..."

    def append_token(self, token: str) -> None:
        """Add a main-response token; flush to the UI once the batch fills.

        Args:
            token: The token string to append
        """
        if not self._is_streaming:
            return
        self._token_buffer.append(token)
        if len(self._token_buffer) >= self._buffer_size:
            self._flush_buffer()

    def append_thinking_token(self, token: str) -> None:
        """Add a thinking token; flush to the UI once the batch fills.

        Args:
            token: The thinking token string to append
        """
        if not self._is_streaming:
            return
        self._thinking_token_buffer.append(token)
        if len(self._thinking_token_buffer) >= self._buffer_size:
            self._flush_thinking_buffer()

    def _flush_buffer(self) -> None:
        """Move batched main tokens into the text buffer and schedule a redraw."""
        if not self._token_buffer:
            return
        self._buffer += "".join(self._token_buffer)
        self._token_buffer = []
        # Widgets must only be touched from the main thread.
        GLib.idle_add(self._update_ui, priority=GLib.PRIORITY_DEFAULT)

    def _flush_thinking_buffer(self) -> None:
        """Move batched thinking tokens into their buffer and schedule a redraw."""
        if not self._thinking_token_buffer:
            return
        self._thinking_buffer += "".join(self._thinking_token_buffer)
        self._thinking_token_buffer = []
        GLib.idle_add(self._update_ui, priority=GLib.PRIORITY_DEFAULT)

    def _update_ui(self) -> bool:
        """Render both buffers with a trailing streaming indicator.

        Returns:
            False to indicate this is a one-time callback
        """
        # Reveal the thinking section only once it has content.
        if self._thinking_widget and self._thinking_buffer:
            self._thinking_widget.label = self._thinking_buffer + "..."
            if self._thinking_box:
                self._thinking_box.set_visible(True)

        self._widget.label = f"Assistant: {self._buffer}..."
        self._scroll_to_bottom()
        return False  # one-shot idle callback; do not reschedule

    def _scroll_to_bottom(self) -> None:
        """Scroll the container so the newest content is visible."""
        adjustment = self._scroller.get_vadjustment()
        if adjustment:
            adjustment.set_value(adjustment.get_upper() - adjustment.get_page_size())

    def finish_stream(self) -> tuple[str, str]:
        """Finalize streaming and return complete content.

        Any tokens still batched are folded into the buffers before the
        final (indicator-free) UI update is scheduled.

        Returns:
            Tuple of (thinking_content, main_content)
        """
        self._is_streaming = False

        # Flush any remaining tokens that never reached the batch threshold.
        if self._token_buffer:
            self._buffer += "".join(self._token_buffer)
            self._token_buffer = []
        if self._thinking_token_buffer:
            self._thinking_buffer += "".join(self._thinking_token_buffer)
            self._thinking_token_buffer = []

        final_content = self._buffer
        final_thinking = self._thinking_buffer
        GLib.idle_add(self._finalize_ui, final_thinking, final_content,
                      priority=GLib.PRIORITY_DEFAULT)
        return (final_thinking, final_content)

    def _finalize_ui(self, thinking: str, content: str) -> bool:
        """Render the final content without the streaming indicator.

        Args:
            thinking: The final thinking content
            content: The final complete content

        Returns:
            False to indicate this is a one-time callback
        """
        if self._thinking_widget and thinking:
            self._thinking_widget.label = thinking
            if self._thinking_box:
                self._thinking_box.set_visible(True)

        self._widget.label = f"Assistant: {content}"
        self._scroll_to_bottom()
        return False

    def parse_reasoning_content(self, content: str) -> tuple[str | None, str]:
        """Parse reasoning content from response if present.

        Recognized formats, tried in order:
        - <think>...</think> or <thinking>...</thinking> tags
        - [Reasoning: ...] followed by [Answer: ...] markers
        - "Reasoning:" followed by "Answer:" or "Conclusion:"

        Args:
            content: The complete response content

        Returns:
            Tuple of (reasoning_content, main_content)
            If no reasoning found, returns (None, original_content)
        """
        import re  # local import: keeps module-level dependencies unchanged

        flags = re.DOTALL | re.IGNORECASE

        # Pattern 1: <think>/<thinking> tags. The backreference requires the
        # closing tag to match the opening one, replacing the two duplicated
        # patterns previously used for the short and long forms.
        match = re.search(r'<(think|thinking)>(.*?)</\1>\s*(.*)', content, flags)
        if match:
            return (match.group(2).strip(), match.group(3).strip())

        # Pattern 2: [Reasoning: ...] followed by [Answer: ...]
        match = re.search(r'\[Reasoning:?\s*(.*?)\]\s*\[Answer:?\s*(.*?)\]',
                          content, flags)
        if match:
            return (match.group(1).strip(), match.group(2).strip())

        # Pattern 3: "Reasoning:" followed by "Answer:" or "Conclusion:"
        match = re.search(r'Reasoning:\s*(.*?)\s*(?:Answer|Conclusion):\s*(.*)',
                          content, flags)
        if match:
            return (match.group(1).strip(), match.group(2).strip())

        # No reasoning pattern found
        return (None, content)
|
||||
Reference in New Issue
Block a user