Skip to main content
The Streaming Retrieval API in R2R enables real-time data flow for Retrieval-Augmented Generation (RAG) and agent-based interactions. It delivers live updates as content is retrieved, processed, and generated, allowing client applications to display results incrementally. This streaming approach significantly improves the user experience for interactive and latency-sensitive applications by providing immediate, progressive feedback instead of waiting for full completion.

Key Capabilities

The Streaming Retrieval API supports:
  • Real-time event streaming for RAG and agent operations.
  • Progressive response generation with token-level updates.
  • Immediate access to intermediate results such as retrieved documents or citations.
  • Fine-grained event structure to handle reasoning steps, tool calls, and responses.
  • Improved responsiveness for conversational and research-oriented applications.

Streaming Events

During streaming, R2R emits structured event types that represent each stage of the retrieval and generation process:
Event TypeDescription
SearchResultsEventContains initial search results from documents.
MessageEventStreams partial response tokens as they are generated.
CitationEventIndicates when a citation has been added to the response.
ThinkingEventProvides insight into the model’s internal reasoning steps (for agents).
ToolCallEventIndicates when the model invokes a tool during processing (for agents).
ToolResultEventReturns the results from previously called tools (for agents).
FinalAnswerEventContains the complete generated answer with citations.

Streaming RAG

Basic Streaming RAG

To use streaming with basic RAG functionality:
from r2r import (
    CitationEvent,
    FinalAnswerEvent,
    MessageEvent,
    SearchResultsEvent,
    R2RClient,
)

client = R2RClient("http://localhost:7272")

result_stream = client.retrieval.rag(
    query="What is DeepSeek R1?",
    search_settings={"limit": 25},
    rag_generation_config={"stream": True},
)

for event in result_stream:
    if isinstance(event, SearchResultsEvent):
        print("Search results:", event.data)
    elif isinstance(event, MessageEvent):
        print("Partial message:", event.data.delta)
    elif isinstance(event, CitationEvent):
        print("New citation detected:", event.data)
    elif isinstance(event, FinalAnswerEvent):
        print("Final answer:", event.data.generated_answer)
        print("Citations:", event.data.citations)
To include web search in your streaming RAG:
result_stream = client.retrieval.rag(
    query="What are the latest developments with DeepSeek R1?", 
    rag_generation_config={"stream": True},
    include_web_search=True
)

for event in result_stream:
    # Process events as shown in previous example
    pass

Streaming Agent

R2R provides powerful streaming agents that supports complex interactions with both document-based knowledge and web resources.

Basic Streaming Agent

from r2r import (
    ThinkingEvent,
    ToolCallEvent,
    ToolResultEvent,
    CitationEvent,
    MessageEvent,
    FinalAnswerEvent,
)

agent_stream = client.retrieval.agent(
    query="What does DeepSeek R1 imply for the future of AI?",
    generation_config={"stream": True},
    mode="research"
)

for event in agent_stream:
    if isinstance(event, ThinkingEvent):
        print(f"🧠 Thinking: {event.data.delta.content[0].payload.value}")
    elif isinstance(event, ToolCallEvent):
        print(f"🔧 Tool call: {event.data.name}({event.data.arguments})")
    elif isinstance(event, ToolResultEvent):
        print(f"📊 Tool result: {event.data.content[:60]}...")
    elif isinstance(event, CitationEvent):
        print(f"📑 Citation: {event.data}")
    elif isinstance(event, MessageEvent):
        print(f"💬 Message: {event.data.delta.content[0].payload.value}")
    elif isinstance(event, FinalAnswerEvent):
        print(f"✅ Final answer: {event.data.generated_answer[:100]}...")
        print(f"   Citations: {len(event.data.citations)} sources referenced")

Advanced Research Agent with Tools

The R2R agent can leverage multiple tools to perform in-depth research:
agent_stream = client.retrieval.agent(
    query="Analyze DeepSeek R1's performance compared to other models",
    generation_config={
        "model": "anthropic/claude-3-7-sonnet-20250219",
        "extended_thinking": True,
        "thinking_budget": 4096,
        "temperature": 1,
        "max_tokens_to_sample": 16000,
        "stream": True
    },
    mode="research",
    rag_tools=["web_search", "web_scrape"]
)

# Process events as shown in previous example

Streaming Citations

R2R streaming citations provide detailed attribution information that links specific parts of the response to source documents:
{
  "event": "citation",
  "data": {
    "id": "abc123",
    "object": "citation",
    "raw_index": 1,
    "index": 1,
    "start_index": 305,
    "end_index": 308,
    "source_type": "chunk",
    "source_id": "e760bb76-1c6e-52eb-910d-0ce5b567011b",
    "document_id": "e43864f5-a36f-548e-aacd-6f8d48b30c7f",
    "source_title": "DeepSeek_R1.pdf"
  }
}
Each citation includes:
  • id: Unique identifier for the citation.
  • index: The display index (e.g., /[1], /[2]).
  • start_index and end_index: Character positions in the response.
  • source_type: The type of source (chunk, graph, web).
  • source_id: ID of the specific chunk/node.
  • document_id: ID of the parent document.
  • source_title: Title of the source document.

Implementing Streaming UI

To create a responsive UI with Streaming RAG, consider the following:

Frontend Implementation

import { useState, useEffect } from 'react';

function RAGComponent() {
  const [messages, setMessages] = useState([]);
  const [currentMessage, setCurrentMessage] = useState('');
  const [citations, setCitations] = useState([]);
  const [isLoading, setIsLoading] = useState(false);

  const handleSubmit = async (query) => {
    setIsLoading(true);
    setCurrentMessage('');
    setCitations([]);
    
    try {
      const response = await fetch('/api/rag', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          query,
          stream: true
        })
      });
      
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        
        const chunk = decoder.decode(value);
        const events = chunk.split('/n/n').filter(Boolean);
        
        for (const eventText of events) {
          if (!eventText.startsWith('data: ')) continue;
          
          const eventData = JSON.parse(eventText.slice(6));
          
          switch (eventData.event) {
            case 'message':
              setCurrentMessage(prev => prev + eventData.data.delta.content[0].payload.value);
              break;
            case 'citation':
              setCitations(prev => [...prev, eventData.data]);
              break;
            case 'final_answer':
              setMessages(prev => [...prev, {
                role: 'assistant',
                content: eventData.data.generated_answer,
                citations: eventData.data.citations
              }]);
              break;
          }
        }
      }
    } catch (error) {
      console.error('Error with streaming RAG:', error);
    } finally {
      setIsLoading(false);
    }
  };

  return (
    <div className="rag-container">
      {/* UI implementation */}
      {isLoading && <div className="typing-indicator">{currentMessage}</div>}
      {/* Display messages and citations */}
    </div>
  );
}