Skip to content

AI Integration Guide

Point your AI here - This page provides the essential models and function signatures needed for AI agents and automated systems to integrate with BookWyrm.

Quick Reference for AI Systems

Task Method Input Output Use Case
Text → Chunks stream_process_text() Raw text/URL TextSpanResult[] Document preprocessing
Question → Citations stream_citations() Chunks + question Citation[] RAG, Q&A systems
PDF → Structure stream_extract_pdf() PDF bytes/URL PDFPage[] Document parsing
File → Type classify() File bytes FileClassification Content routing
Content → Summary stream_summarize() Text/phrases Summary text Document analysis
Structured Summary stream_summarize() Text + Pydantic model Structured JSON Data extraction

Complete Type Definitions

# Essential imports for AI code generation
from bookwyrm import AsyncBookWyrmClient, BookWyrmClient, BookWyrmAPIError
from bookwyrm.models import (
    TextSpan, Citation, CitationResponse,
    ClassifyResponse, SummaryResponse, TextResult, TextSpanResult,
    ResponseFormat, PhraseProgressUpdate, PDFPage
)
from typing import List, Optional, AsyncIterator, Union
import asyncio

Common AI Integration Patterns

RAG (Retrieval-Augmented Generation) Pipeline

async def rag_pipeline(document_text: str, user_question: str) -> List[Citation]:
    """Complete RAG pipeline for AI agents."""
    # Step 1: Process document into chunks
    chunks = []
    async with AsyncBookWyrmClient(api_key="your-key") as client:
        async for response in client.stream_process_text(
            text=document_text,
            chunk_size=1000,
            offsets=True
        ):
            if isinstance(response, TextSpanResult):
                chunks.append(TextSpan(
                    text=response.text,
                    start_char=response.start_char,
                    end_char=response.end_char
                ))

    # Step 2: Find relevant citations
    citations = []
    async with AsyncBookWyrmClient(api_key="your-key") as client:
        async for stream_response in client.stream_citations(
            chunks=chunks,
            question=user_question
        ):
            if hasattr(stream_response, 'citation'):
                citations.append(stream_response.citation)
        return citations

Function Calling for AI Agents

async def find_citations_tool(question: str, document_chunks: List[dict]) -> List[dict]:
    """Tool function for AI agents to find citations in documents."""
    chunks = [TextSpan(**chunk) for chunk in document_chunks]
    citations = []
    async with AsyncBookWyrmClient() as client:
        async for stream_response in client.stream_citations(chunks=chunks, question=question):
            if hasattr(stream_response, 'citation'):
                citations.append(stream_response.citation)
        return [citation.model_dump() for citation in citations]

async def process_document_tool(text: str, chunk_size: int = 1000) -> List[dict]:
    """Tool function to process documents into chunks."""
    chunks = []
    async with AsyncBookWyrmClient() as client:
        async for response in client.stream_process_text(
            text=text, chunk_size=chunk_size, offsets=True
        ):
            if isinstance(response, TextSpanResult):
                chunks.append(response.model_dump())
    return chunks

async def structured_summary_tool(text: str, model_schema: dict, model_name: str) -> dict:
    """Tool function for structured data extraction using Pydantic models."""
    import json
    async with AsyncBookWyrmClient() as client:
        async for response in client.stream_summarize(
            content=text,
            model_name=model_name,
            model_schema_json=json.dumps(model_schema),
            model_strength="smart"
        ):
            if hasattr(response, 'summary'):
                return json.loads(response.summary)
    return {}

Concurrent Processing Pattern

async def process_multiple_documents(documents: List[str]) -> List[List[TextSpan]]:
    """Process multiple documents concurrently."""
    async def process_single(text: str) -> List[TextSpan]:
        chunks = []
        async with AsyncBookWyrmClient() as client:
            async for response in client.stream_process_text(text=text, offsets=True):
                if isinstance(response, TextSpanResult):
                    chunks.append(TextSpan(
                        text=response.text,
                        start_char=response.start_char,
                        end_char=response.end_char
                    ))
        return chunks

    return await asyncio.gather(*[process_single(doc) for doc in documents])

Error Handling for AI Systems

from bookwyrm import BookWyrmAPIError
import asyncio
import logging

async def robust_citation_search(chunks: List[TextSpan], question: str, max_retries: int = 3):
    """Citation search with proper error handling and retries."""
    for attempt in range(max_retries):
        try:
            citations = []
            async with AsyncBookWyrmClient() as client:
                async for stream_response in client.stream_citations(chunks=chunks, question=question):
                    if hasattr(stream_response, 'citation'):
                        citations.append(stream_response.citation)
                return citations

        except BookWyrmAPIError as e:
            if e.status_code == 429:  # Rate limit
                wait_time = 2 ** attempt
                logging.warning(f"Rate limited, waiting {wait_time}s")
                await asyncio.sleep(wait_time)
            elif e.status_code == 413:  # Payload too large
                # Split chunks in half and retry
                mid = len(chunks) // 2
                chunks = chunks[:mid]
                logging.warning(f"Payload too large, reducing to {len(chunks)} chunks")
            else:
                logging.error(f"API error: {e}")
                if attempt == max_retries - 1:
                    raise
        except Exception as e:
            logging.error(f"Unexpected error: {e}")
            if attempt == max_retries - 1:
                raise

    return []  # Return empty list if all retries failed

Minimal Working Examples

Citation Finding (4 lines)

citations = []
async with AsyncBookWyrmClient(api_key="key") as client:
    async for r in client.stream_citations(chunks=text_chunks, question="What is X?"):
        if hasattr(r, 'citation'): citations.append(r.citation)
best_citation = max(citations, key=lambda c: c.quality) if citations else None

Document Processing (4 lines)

chunks = []
async with AsyncBookWyrmClient() as client:
    async for response in client.stream_process_text(text=document, offsets=True):
        if isinstance(response, TextSpanResult): chunks.append(response)

PDF Analysis (4 lines)

pages = []
async with AsyncBookWyrmClient() as client:
    async for response in client.stream_extract_pdf(pdf_bytes=pdf_data):
        if hasattr(response, 'page_data'): pages.append(response.page_data)
text_elements = [elem for page in pages for elem in page.text_blocks]

Structured Data Extraction (4 lines)

import json
async with AsyncBookWyrmClient() as client:
    async for r in client.stream_summarize(content=text, model_name="MyModel", model_schema_json=schema):
        if hasattr(r, 'summary'): return json.loads(r.summary)

Performance Optimization for AI Systems

Batch Processing

async def batch_process_citations(chunk_groups: List[List[TextSpan]], questions: List[str]):
    """Process multiple citation requests concurrently."""
    async def single_request(chunks, question):
        citations = []
        async with AsyncBookWyrmClient() as client:
            async for stream_response in client.stream_citations(chunks=chunks, question=question):
                if hasattr(stream_response, 'citation'):
                    citations.append(stream_response.citation)
            return citations

    tasks = [single_request(chunks, q) for chunks, q in zip(chunk_groups, questions)]
    return await asyncio.gather(*tasks, return_exceptions=True)

Memory-Efficient Streaming

async def stream_large_document(text: str, chunk_size: int = 2000):
    """Process large documents without loading everything into memory."""
    async with AsyncBookWyrmClient() as client:
        async for response in client.stream_process_text(
            text=text, 
            chunk_size=chunk_size,
            offsets=True
        ):
            if isinstance(response, TextSpanResult):
                # Process chunk immediately, don't store all chunks
                yield response
            elif isinstance(response, PhraseProgressUpdate):
                print(f"Progress: {response.message}")

AI Implementation Notes

  • Always use context managers: async with AsyncBookWyrmClient() as client:
  • Handle streaming responses: Check isinstance(response, TextSpanResult) before processing
  • Implement pagination: Use start and limit parameters for large datasets
  • Rate limiting: Implement exponential backoff for production systems
  • Memory management: Process large documents in streaming chunks
  • Error recovery: Handle network issues and API errors gracefully
  • Concurrent limits: Don't exceed reasonable concurrent request limits
  • Chunk size optimization: 500-2000 characters for balanced performance
  • Structured output: Use detailed field descriptions in Pydantic models for better extraction
  • Model strength selection: Use smart/clever/wise for structured output, swift for testing
  • JSON validation: Always validate structured output with json.loads() and Pydantic models

Structured Data Extraction Patterns

Define Extraction Models

from pydantic import BaseModel, Field
from typing import Optional, List
import json

class PersonInfo(BaseModel):
    """Extract person information from text."""
    name: Optional[str] = Field(None, description="Full name of the person")
    age: Optional[int] = Field(None, description="Age in years")
    occupation: Optional[str] = Field(None, description="Job title or profession")
    location: Optional[str] = Field(None, description="City, state, or country of residence")
    skills: Optional[List[str]] = Field(None, description="List of skills or expertise areas")

class CompanyInfo(BaseModel):
    """Extract company information from text."""
    name: Optional[str] = Field(None, description="Official company name")
    industry: Optional[str] = Field(None, description="Primary industry or sector")
    founded: Optional[int] = Field(None, description="Year the company was founded")
    employees: Optional[int] = Field(None, description="Number of employees")
    revenue: Optional[str] = Field(None, description="Annual revenue or financial information")
    products: Optional[List[str]] = Field(None, description="Main products or services offered")

class EventInfo(BaseModel):
    """Extract event information from text."""
    name: Optional[str] = Field(None, description="Name or title of the event")
    date: Optional[str] = Field(None, description="Date of the event in YYYY-MM-DD format")
    location: Optional[str] = Field(None, description="Venue or location where event takes place")
    attendees: Optional[int] = Field(None, description="Number of people attending")
    organizer: Optional[str] = Field(None, description="Person or organization organizing the event")
    topics: Optional[List[str]] = Field(None, description="Main topics or themes covered")

AI Agent Integration

async def extract_structured_data(text: str, extraction_type: str) -> dict:
    """AI agent function for structured data extraction."""

    models = {
        "person": PersonInfo,
        "company": CompanyInfo, 
        "event": EventInfo
    }

    if extraction_type not in models:
        raise ValueError(f"Unknown extraction type: {extraction_type}")

    model_class = models[extraction_type]
    schema = json.dumps(model_class.model_json_schema())

    async with AsyncBookWyrmClient() as client:
        async for response in client.stream_summarize(
            content=text,
            model_name=model_class.__name__,
            model_schema_json=schema,
            model_strength="smart"
        ):
            if hasattr(response, 'summary'):
                try:
                    structured_data = json.loads(response.summary)
                    validated_data = model_class.model_validate(structured_data)
                    return validated_data.model_dump()
                except (json.JSONDecodeError, ValueError) as e:
                    return {"error": f"Failed to parse structured output: {e}"}

    return {"error": "No response received"}

# Usage in AI agents
person_data = await extract_structured_data(resume_text, "person")
company_data = await extract_structured_data(company_description, "company")
event_data = await extract_structured_data(event_announcement, "event")

Batch Structured Processing

async def batch_extract_structured_data(texts: List[str], model_class: BaseModel) -> List[dict]:
    """Process multiple texts with the same structured model."""

    schema = json.dumps(model_class.model_json_schema())
    results = []

    async def process_single(text: str) -> dict:
        async with AsyncBookWyrmClient() as client:
            async for response in client.stream_summarize(
                content=text,
                model_name=model_class.__name__,
                model_schema_json=schema,
                model_strength="smart"
            ):
                if hasattr(response, 'summary'):
                    try:
                        return json.loads(response.summary)
                    except json.JSONDecodeError:
                        return {"error": "Invalid JSON response"}
        return {"error": "No response"}

    # Process all texts concurrently
    import asyncio
    results = await asyncio.gather(*[process_single(text) for text in texts])
    return results

# Usage
resume_texts = ["Resume 1 content...", "Resume 2 content...", "Resume 3 content..."]
person_data_list = await batch_extract_structured_data(resume_texts, PersonInfo)

Error Handling for Structured Output

async def robust_structured_extraction(text: str, model_class: BaseModel, max_retries: int = 3) -> dict:
    """Structured extraction with error handling and retries."""

    schema = json.dumps(model_class.model_json_schema())

    for attempt in range(max_retries):
        try:
            async with AsyncBookWyrmClient() as client:
                async for response in client.stream_summarize(
                    content=text,
                    model_name=model_class.__name__,
                    model_schema_json=schema,
                    model_strength="smart" if attempt == 0 else "clever"  # Upgrade model on retry
                ):
                    if hasattr(response, 'summary'):
                        try:
                            structured_data = json.loads(response.summary)
                            validated_data = model_class.model_validate(structured_data)
                            return {"success": True, "data": validated_data.model_dump()}
                        except json.JSONDecodeError as e:
                            if attempt == max_retries - 1:
                                return {"success": False, "error": f"JSON parsing failed: {e}", "raw": response.summary}
                            continue  # Retry with better model
                        except ValueError as e:
                            if attempt == max_retries - 1:
                                return {"success": False, "error": f"Validation failed: {e}", "raw": response.summary}
                            continue  # Retry with better model
        except Exception as e:
            if attempt == max_retries - 1:
                return {"success": False, "error": f"API error: {e}"}
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

    return {"success": False, "error": "Max retries exceeded"}

Core Models

bookwyrm.models.TextSpan

Bases: Text, Span

Text content with character position information.

options: show_root_heading: true members_order: source show_bases: true inherited_members: true

bookwyrm.models.Citation

Bases: BaseModel

A citation found in response to a question.

Citations include the relevant text, reasoning for why it's relevant, and a quality score indicating how well it answers the question.

start_chunk class-attribute instance-attribute

start_chunk: int = Field(
    ..., description="Starting chunk index (inclusive)"
)

end_chunk class-attribute instance-attribute

end_chunk: int = Field(
    ..., description="Ending chunk index (inclusive)"
)

text class-attribute instance-attribute

text: str = Field(
    ..., description="The citation text content"
)

reasoning class-attribute instance-attribute

reasoning: str = Field(
    ...,
    description="Explanation of why this citation is relevant",
)

quality class-attribute instance-attribute

quality: int = Field(
    ...,
    description="Quality score (0-4): 0=unrelated, 4=perfectly answers",
)

question_index class-attribute instance-attribute

question_index: Optional[int] = Field(
    None,
    description="1-based index of the question this citation answers (only present for multi-question requests)",
)

options: show_root_heading: true members_order: source show_bases: true inherited_members: true

bookwyrm.models.CitationResponse

Bases: BaseModel

Response containing citation results and usage information.

This is the response from non-streaming citation requests.

citations class-attribute instance-attribute

citations: List[Citation] = Field(
    ..., description="List of found citations"
)

total_citations class-attribute instance-attribute

total_citations: int = Field(
    ..., description="Total number of citations found"
)

usage class-attribute instance-attribute

usage: Optional[UsageInfo] = Field(
    None, description="Usage and billing information"
)

options: show_root_heading: true members_order: source show_bases: true inherited_members: true

bookwyrm.models.PDFPage

Bases: BaseModel

Data for a single PDF page with unified layout regions.

page_number class-attribute instance-attribute

page_number: int = Field(
    ..., description="The page number (1-based)"
)

layout_regions class-attribute instance-attribute

layout_regions: List[UnifiedLayoutRegion] = Field(
    default_factory=list,
    description="Unified list of all detected layout regions with typed content",
)

reading_order class-attribute instance-attribute

reading_order: Optional[List[int]] = Field(
    default=None,
    description="Global reading order indices for all content elements",
)

from_runpod_page_data classmethod

from_runpod_page_data(page_data: dict) -> PDFPage

Create PDFPage from runpod-pdf PageData format.

Source code in bookwyrm/models.py
@classmethod
def from_runpod_page_data(cls, page_data: dict) -> "PDFPage":
    """Create PDFPage from runpod-pdf PageData format."""
    return cls(**page_data)

get_text_content

get_text_content() -> List[TextContent]

Extract all text content from layout regions.

Source code in bookwyrm/models.py
def get_text_content(self) -> List[TextContent]:
    """Extract all text content from layout regions."""
    return [
        region.content
        for region in self.layout_regions
        if region.content.content_type == ContentType.TEXT
    ]

get_table_content

get_table_content() -> List[TableContent]

Extract all table content from layout regions.

Source code in bookwyrm/models.py
def get_table_content(self) -> List[TableContent]:
    """Extract all table content from layout regions."""
    return [
        region.content
        for region in self.layout_regions
        if region.content.content_type == ContentType.TABLE
    ]

get_image_content

get_image_content() -> List[ImageContent]

Extract all image content from layout regions.

Source code in bookwyrm/models.py
def get_image_content(self) -> List[ImageContent]:
    """Extract all image content from layout regions."""
    return [
        region.content
        for region in self.layout_regions
        if region.content.content_type == ContentType.IMAGE
    ]

get_formula_content

get_formula_content() -> List[FormulaContent]

Extract all formula content from layout regions.

Source code in bookwyrm/models.py
def get_formula_content(self) -> List[FormulaContent]:
    """Extract all formula content from layout regions."""
    return [
        region.content
        for region in self.layout_regions
        if region.content.content_type == ContentType.FORMULA
    ]

get_seal_content

get_seal_content() -> List[SealContent]

Extract all seal content from layout regions.

Source code in bookwyrm/models.py
def get_seal_content(self) -> List[SealContent]:
    """Extract all seal content from layout regions."""
    return [
        region.content
        for region in self.layout_regions
        if region.content.content_type == ContentType.SEAL
    ]

to_legacy_text_blocks

to_legacy_text_blocks() -> List[PDFTextElement]

Convert layout regions to legacy text blocks format for backward compatibility.

Source code in bookwyrm/models.py
def to_legacy_text_blocks(self) -> List[PDFTextElement]:
    """Convert layout regions to legacy text blocks format for backward compatibility."""
    legacy_blocks = []
    for region in self.layout_regions:
        if region.content.content_type == ContentType.TEXT:
            text_content = region.content
            legacy_block = PDFTextElement(
                text=text_content.text or "",
                confidence=text_content.confidence or 1.0,
                bbox=region.bbox,
                coordinates=region.coordinates,
            )
            legacy_blocks.append(legacy_block)
    return legacy_blocks

options: show_root_heading: true members_order: source show_bases: true inherited_members: true

bookwyrm.models.ClassifyResponse

Bases: BaseModel

Response model for classification results.

Contains the classification results along with file metadata.

classification class-attribute instance-attribute

classification: FileClassification = Field(
    ..., description="The file classification results"
)

file_size class-attribute instance-attribute

file_size: int = Field(
    ..., description="Size of the file in bytes"
)

sample_preview class-attribute instance-attribute

sample_preview: Optional[str] = Field(
    None,
    description="First few characters if text-based file",
)

options: show_root_heading: true members_order: source show_bases: true inherited_members: true

bookwyrm.models.SummaryResponse

Bases: BaseModel

Response model for summarization results.

Contains the final summary and metadata about the summarization process.

type class-attribute instance-attribute

type: Literal["summary"] = Field(
    "summary", description="Message type identifier"
)

summary class-attribute instance-attribute

summary: str = Field(
    ...,
    description="The final summary text or structured JSON",
)

subsummary_count class-attribute instance-attribute

subsummary_count: int = Field(
    ...,
    description="Number of intermediate summaries created",
)

levels_used class-attribute instance-attribute

levels_used: int = Field(
    ..., description="Number of hierarchical levels used"
)

total_tokens class-attribute instance-attribute

total_tokens: int = Field(
    ..., description="Total tokens processed"
)

intermediate_summaries class-attribute instance-attribute

intermediate_summaries: Optional[List[List[str]]] = Field(
    None,
    description="Debug information with summaries by level",
)

options: show_root_heading: true members_order: source show_bases: true inherited_members: true

bookwyrm.models.TextResult

Bases: Text

A simple text result without position information.

Used when ResponseFormat.TEXT_ONLY is specified in phrasal processing.

type class-attribute instance-attribute

type: Literal["text"] = Field(
    "text", description="Message type identifier"
)

options: show_root_heading: true members_order: source show_bases: true inherited_members: true

bookwyrm.models.TextSpanResult

Bases: TextSpan

A text span result with position information.

Used when ResponseFormat.WITH_OFFSETS is specified in phrasal processing. Inherits from TextSpan to include position data.

type class-attribute instance-attribute

type: Literal["text_span"] = Field(
    "text_span", description="Message type identifier"
)

options: show_root_heading: true members_order: source show_bases: true inherited_members: true

Synchronous Client Methods

bookwyrm.BookWyrmClient.classify

classify(
    *,
    content: Optional[str] = None,
    content_bytes: Optional[bytes] = None,
    filename: Optional[str] = None,
    content_encoding: ContentEncoding = ContentEncoding.RAW
) -> ClassifyResponse

Classify file content to determine file type and format.

This method analyzes file content to determine format type, content type, MIME type, and other classification details. It supports both binary and text files, providing confidence scores and additional metadata about the detected format.

Parameters:

Name Type Description Default
content Optional[str]

Text or encoded file content

None
content_bytes Optional[bytes]

Raw file bytes

None
filename Optional[str]

Optional filename hint for classification

None
content_encoding ContentEncoding

Content encoding format (ContentEncoding enum)

RAW

Returns:

Type Description
ClassifyResponse

Classification response with detected file type, confidence score, and additional details

Raises:

Type Description
BookWyrmAPIError

If the API request fails (network, authentication, server errors)

Examples:

Classify using raw bytes directly (recommended):

# Read file as binary
with open("document.pdf", "rb") as f:
    file_bytes = f.read()

response = client.classify(
    content_bytes=file_bytes,
    filename="document.pdf"
)
print(f"Format: {response.classification.format_type}")
print(f"Content Type: {response.classification.content_type}")
print(f"MIME Type: {response.classification.mime_type}")
print(f"Confidence: {response.classification.confidence:.2%}")

Classify text content with UTF-8 encoding:

with open("script.py", "r") as f:
    text_content = f.read()

response = client.classify(
    content=text_content,
    filename="script.py",
    content_encoding=ContentEncoding.UTF8
)
print(f"Detected as: {response.classification.content_type}")

Classify base64-encoded content:

import base64

with open("image.png", "rb") as f:
    raw_bytes = f.read()

base64_content = base64.b64encode(raw_bytes).decode('ascii')

response = client.classify(
    content=base64_content,
    filename="image.png",
    content_encoding=ContentEncoding.BASE64
)
print(f"Detected as: {response.classification.content_type}")
Source code in bookwyrm/client.py
def classify(
    self,
    *,
    content: Optional[str] = None,
    content_bytes: Optional[bytes] = None,
    filename: Optional[str] = None,
    content_encoding: ContentEncoding = ContentEncoding.RAW,
) -> ClassifyResponse:
    """Classify file content to determine file type and format.

    This method analyzes file content to determine format type, content type, MIME type,
    and other classification details. It supports both binary and text files, providing
    confidence scores and additional metadata about the detected format.

    Args:
        content: Text or encoded file content
        content_bytes: Raw file bytes
        filename: Optional filename hint for classification
        content_encoding: Content encoding format (ContentEncoding enum)

    Returns:
        Classification response with detected file type, confidence score, and additional details

    Raises:
        BookWyrmAPIError: If the API request fails (network, authentication, server errors)

    Examples:
        Classify using raw bytes directly (recommended):

        ```python
        # Read file as binary
        with open("document.pdf", "rb") as f:
            file_bytes = f.read()

        response = client.classify(
            content_bytes=file_bytes,
            filename="document.pdf"
        )
        print(f"Format: {response.classification.format_type}")
        print(f"Content Type: {response.classification.content_type}")
        print(f"MIME Type: {response.classification.mime_type}")
        print(f"Confidence: {response.classification.confidence:.2%}")
        ```

        Classify text content with UTF-8 encoding:

        ```python
        with open("script.py", "r") as f:
            text_content = f.read()

        response = client.classify(
            content=text_content,
            filename="script.py",
            content_encoding=ContentEncoding.UTF8
        )
        print(f"Detected as: {response.classification.content_type}")
        ```

        Classify base64-encoded content:

        ```python
        import base64

        with open("image.png", "rb") as f:
            raw_bytes = f.read()

        base64_content = base64.b64encode(raw_bytes).decode('ascii')

        response = client.classify(
            content=base64_content,
            filename="image.png",
            content_encoding=ContentEncoding.BASE64
        )
        print(f"Detected as: {response.classification.content_type}")
        ```
    """
    if content is None and content_bytes is None:
        raise ValueError("Either content or content_bytes is required")

    request = ClassifyRequest(
        content=content,
        content_bytes=content_bytes,
        filename=filename,
        content_encoding=content_encoding,
    )
    headers: Dict[str, str] = {**DEFAULT_HEADERS}
    if self.api_key:
        headers["Authorization"] = f"Bearer {self.api_key}"

    try:
        # Handle marshalling at API level
        if request.content_bytes is not None:
            file_bytes: bytes = request.content_bytes
        elif request.content is not None:
            # Handle content based on encoding
            if request.content_encoding == ContentEncoding.BASE64:
                # Decode base64 content
                import base64

                file_bytes = base64.b64decode(request.content)
            elif request.content_encoding == ContentEncoding.UTF8:
                # Encode UTF-8 text to bytes
                file_bytes = request.content.encode("utf-8")
            elif request.content_encoding == ContentEncoding.RAW:
                # Treat as raw bytes (assume content is already bytes-like)
                # This case should typically use content_bytes instead
                file_bytes = request.content.encode("latin-1")  # Preserve raw bytes
            else:
                raise BookWyrmAPIError(
                    f"Unsupported content encoding: {request.content_encoding}"
                )
        else:
            raise BookWyrmAPIError(
                "Either content or content_bytes must be provided"
            )

        files: Dict[str, tuple] = {
            "file": (request.filename or "document", file_bytes)
        }
        response: requests.Response = self.session.post(
            f"{self.base_url}/classify",
            files=files,
            headers=headers,
            timeout=self.timeout,
        )

        response.raise_for_status()
        _check_deprecation_headers(response)
        response_data: Dict[str, Any] = response.json()
        return ClassifyResponse.model_validate(response_data)
    except requests.HTTPError as e:
        raise _marshal_http_error(e)
    except requests.RequestException as e:
        raise BookWyrmAPIError(f"Request failed: {e}")

options: show_root_heading: true

bookwyrm.BookWyrmClient.stream_process_text

stream_process_text(
    *,
    text: Optional[str] = None,
    text_url: Optional[str] = None,
    chunk_size: Optional[int] = None,
    response_format: Union[
        ResponseFormat, Literal["offsets", "text_only"]
    ] = ResponseFormat.WITH_OFFSETS,
    offsets: Optional[bool] = None,
    text_only: Optional[bool] = None
) -> Iterator[StreamingPhrasalResponse]

Stream text processing using phrasal analysis with real-time results.

This method breaks down text into meaningful phrases or chunks using NLP, supporting both direct text input and URLs. It can create fixed-size chunks or extract individual phrases with optional position information.

Parameters:

Name Type Description Default
text Optional[str]

Text content to process

None
text_url Optional[str]

URL to fetch text from

None
chunk_size Optional[int]

Optional chunk size for fixed-size chunking

None
response_format Union[ResponseFormat, Literal['offsets', 'text_only']]

Response format - use ResponseFormat enum, "offsets", or "text_only"

WITH_OFFSETS
offsets Optional[bool]

Set to True for WITH_OFFSETS format (boolean flag)

None
text_only Optional[bool]

Set to True for TEXT_ONLY format (boolean flag)

None

Yields:

Name Type Description
StreamingPhrasalResponse StreamingPhrasalResponse

Union of progress updates and phrase/chunk results

Raises:

Type Description
BookWyrmAPIError

If the API request fails (network, authentication, server errors)

Examples:

Extract phrases from text with position offsets:

from bookwyrm import BookWyrmClient
from bookwyrm.models import ResponseFormat, TextResult, TextSpanResult

text = "Natural language processing (NLP) is a subfield of linguistics, computer science, and artificial intelligence concerned with the interactions between computers and human language."

client = BookWyrmClient(api_key="your-api-key")
phrases = []
for response in client.stream_process_text(
    text=text,
    offsets=True,  # or response_format="with_offsets" or ResponseFormat.WITH_OFFSETS
):
    if isinstance(response, (TextResult, TextSpanResult)):  # Phrase result
        phrases.append(response)
        print(f"Phrase: {response.text}")
        if isinstance(response, TextSpanResult):
            print(f"Position: {response.start_char}-{response.end_char}")

Create bounded phrasal chunks:

from bookwyrm import BookWyrmClient
from bookwyrm.models import TextResult, TextSpanResult

client = BookWyrmClient(api_key="your-api-key")
chunks = []
for response in client.stream_process_text(
    text=long_text,
    chunk_size=1000,  # chunks composed of phrases, not exceeding ~1000 characters
    offsets=True  # boolean flag for WITH_OFFSETS
):
    if isinstance(response, (TextResult, TextSpanResult)):
        chunks.append(response)

print(f"Created {len(chunks)} chunks")

Process text from URL:

from bookwyrm import BookWyrmClient
from bookwyrm.models import TextResult, TextSpanResult

client = BookWyrmClient(api_key="your-api-key")
phrases = []
for response in client.stream_process_text(
    text_url="https://www.gutenberg.org/files/11/11-0.txt",
    chunk_size=2000,
    text_only=True
):
    if isinstance(response, (TextResult, TextSpanResult)):
        phrases.append(response)

print(f"Processed {len(phrases)} phrases from URL")
Source code in bookwyrm/client.py
def stream_process_text(
    self,
    *,
    text: Optional[str] = None,
    text_url: Optional[str] = None,
    chunk_size: Optional[int] = None,
    response_format: Union[
        ResponseFormat, Literal["offsets", "text_only"]
    ] = ResponseFormat.WITH_OFFSETS,
    # Boolean flags for response format
    offsets: Optional[bool] = None,
    text_only: Optional[bool] = None,
) -> Iterator[StreamingPhrasalResponse]:
    """Stream text processing using phrasal analysis with real-time results.

    This method breaks down text into meaningful phrases or chunks using NLP,
    supporting both direct text input and URLs. It can create fixed-size chunks
    or extract individual phrases with optional position information.

    Args:
        text: Text content to process
        text_url: URL to fetch text from
        chunk_size: Optional chunk size for fixed-size chunking
        response_format: Response format - use ResponseFormat enum, "offsets", or "text_only"
        offsets: Set to True for WITH_OFFSETS format (boolean flag)
        text_only: Set to True for TEXT_ONLY format (boolean flag)

    Yields:
        StreamingPhrasalResponse: Union of progress updates and phrase/chunk results

    Raises:
        BookWyrmAPIError: If the API request fails (network, authentication, server errors)

    Examples:
        Extract phrases from text with position offsets:

        ```python
        from bookwyrm import BookWyrmClient
        from bookwyrm.models import ResponseFormat, TextResult, TextSpanResult

        text = "Natural language processing (NLP) is a subfield of linguistics, computer science, and artificial intelligence concerned with the interactions between computers and human language."

        client = BookWyrmClient(api_key="your-api-key")
        phrases = []
        for response in client.stream_process_text(
            text=text,
            offsets=True,  # or response_format="with_offsets" or ResponseFormat.WITH_OFFSETS
        ):
            if isinstance(response, (TextResult, TextSpanResult)):  # Phrase result
                phrases.append(response)
                print(f"Phrase: {response.text}")
                if isinstance(response, TextSpanResult):
                    print(f"Position: {response.start_char}-{response.end_char}")
        ```

        Create bounded phrasal chunks:

        ```python
        from bookwyrm import BookWyrmClient
        from bookwyrm.models import TextResult, TextSpanResult

        client = BookWyrmClient(api_key="your-api-key")
        chunks = []
        for response in client.stream_process_text(
            text=long_text,
            chunk_size=1000,  # chunks composed of phrases, not exceeding ~1000 characters
            offsets=True  # boolean flag for WITH_OFFSETS
        ):
            if isinstance(response, (TextResult, TextSpanResult)):
                chunks.append(response)

        print(f"Created {len(chunks)} chunks")
        ```

        Process text from URL:

        ```python
        from bookwyrm import BookWyrmClient
        from bookwyrm.models import TextResult, TextSpanResult

        client = BookWyrmClient(api_key="your-api-key")
        phrases = []
        for response in client.stream_process_text(
            text_url="https://www.gutenberg.org/files/11/11-0.txt",
            chunk_size=2000,
            text_only=True
        ):
            if isinstance(response, (TextResult, TextSpanResult)):
                phrases.append(response)

        print(f"Processed {len(phrases)} phrases from URL")
        ```
    """
    if text is None and text_url is None:
        raise ValueError("Either text or text_url is required")

    # Handle boolean flags for response format
    boolean_flags = [offsets, text_only]
    true_flags = [flag for flag in boolean_flags if flag is True]

    if len(true_flags) > 1:
        raise ValueError("Only one response format flag can be True")

    if len(true_flags) == 1:
        if offsets:
            response_format = ResponseFormat.WITH_OFFSETS
        elif text_only:
            response_format = ResponseFormat.TEXT_ONLY

    # Convert string to enum if needed
    if isinstance(response_format, str):
        if response_format.lower() in ("with_offsets", "offsets"):
            response_format = ResponseFormat.WITH_OFFSETS
        elif response_format.lower() in ("text_only", "text"):
            response_format = ResponseFormat.TEXT_ONLY
        else:
            raise ValueError(
                f"Invalid response_format: {response_format}. Use 'with_offsets'/'offsets' or 'text_only'/'text'"
            )

    request = ProcessTextRequest(
        text=text,
        text_url=text_url,
        chunk_size=chunk_size,
        response_format=response_format,
    )
    headers = {**DEFAULT_HEADERS, "Content-Type": "application/json"}
    if self.api_key:
        headers["Authorization"] = f"Bearer {self.api_key}"

    try:
        request_data = request.model_dump(exclude_none=True)

        # Debug: Print the HTTP request details if BOOKWYRM_DEBUG is set
        if os.getenv("BOOKWYRM_DEBUG") == "1":
            print(f"DEBUG: Making POST request to: {self.base_url}/phrasal")
            print(f"DEBUG: Request headers: {headers}")
            print(f"DEBUG: Request JSON data: {json.dumps(request_data, indent=2)}")

        response: requests.Response = self.session.post(
            f"{self.base_url}/phrasal/sse",
            json=request_data,
            headers=headers,
            stream=True,
            timeout=self.timeout,
        )

        # Debug: Print response details if BOOKWYRM_DEBUG is set
        if os.getenv("BOOKWYRM_DEBUG") == "1":
            print(f"DEBUG: Response status code: {response.status_code}")
            print(f"DEBUG: Response headers: {dict(response.headers)}")

        response.raise_for_status()
        _check_deprecation_headers(response)

        # Use SSEClient for proper SSE parsing
        event_count = 0
        client = SSEClient(response)
        for event in client.events():
            event_count += 1

            # Debug: Print every event received if BOOKWYRM_DEBUG is set
            if os.getenv("BOOKWYRM_DEBUG") == "1":
                print(
                    f"DEBUG: Event {event_count} - type: {event.event}, data: {repr(event.data)}"
                )

            # Always yield raw event info for debugging if BOOKWYRM_DEBUG is set
            if os.getenv("BOOKWYRM_DEBUG") == "1":
                from types import SimpleNamespace

                raw_event_response = SimpleNamespace()
                raw_event_response.type = "raw_event_debug"
                raw_event_response.event_type = event.event
                raw_event_response.event_data = event.data
                raw_event_response.event_id = event.id
                yield raw_event_response

            if event.data and event.data.strip():
                try:
                    data: Dict[str, Any] = json.loads(event.data)

                    # Use the event type, or fall back to data.type
                    event_type = event.event or data.get("type")

                    match event_type:
                        case "progress":
                            yield PhraseProgressUpdate.model_validate(data)
                        case "text":
                            yield TextResult.model_validate(data)
                        case "text_span":
                            yield TextSpanResult.model_validate(data)
                        case _:
                            # Unknown response type - create a generic response object for debugging
                            from types import SimpleNamespace

                            unknown_response = SimpleNamespace(**data)
                            unknown_response.type = event_type
                            yield unknown_response
                except json.JSONDecodeError as e:
                    # Create a debug object for malformed JSON
                    from types import SimpleNamespace

                    error_response = SimpleNamespace()
                    error_response.type = "json_decode_error"
                    error_response.raw_data = event.data
                    error_response.error = str(e)
                    yield error_response

    except requests.HTTPError as e:
        raise _marshal_http_error(e)
    except requests.RequestException as e:
        raise BookWyrmAPIError(f"Request failed: {e}")

options: show_root_heading: true

bookwyrm.BookWyrmClient.stream_citations

stream_citations(
    *,
    chunks: Optional[List[TextSpan]] = None,
    jsonl_content: Optional[str] = None,
    jsonl_url: Optional[str] = None,
    question: Union[str, List[str]],
    start: Optional[int] = 0,
    limit: Optional[int] = None,
    max_tokens_per_chunk: Optional[int] = 1000,
    model_strength: ModelStrength = ModelStrength.SWIFT
) -> Iterator[StreamingCitationResponse]

Stream citations as they are found with real-time progress updates.

This method provides real-time streaming of citation results, allowing you to process citations as they're found rather than waiting for all results. Useful for large datasets or when you want to show progress to users.

Parameters:

Name Type Description Default
chunks Optional[List[TextSpan]]

List of text chunks to search

None
jsonl_content Optional[str]

Raw JSONL content as string

None
jsonl_url Optional[str]

URL to fetch JSONL content from

None
question Union[str, List[str]]

The question(s) to find citations for - can be a single string or list of strings

required
start Optional[int]

Starting chunk index (0-based)

0
limit Optional[int]

Maximum number of chunks to process

None
max_tokens_per_chunk Optional[int]

Maximum tokens per chunk

1000

Yields:

Name Type Description
StreamingCitationResponse StreamingCitationResponse

Union of progress updates, individual citations,

StreamingCitationResponse

final summary, or error messages

Raises:

Type Description
BookWyrmAPIError

If the API request fails (network, authentication, server errors)

Examples:

Basic streaming with single question:

from bookwyrm import BookWyrmClient
from bookwyrm.models import TextSpan, CitationProgressUpdate, CitationStreamResponse, CitationSummaryResponse

# Create some example chunks
chunks = [
    TextSpan(text="The sky is blue due to Rayleigh scattering.", start_char=0, end_char=42),
    TextSpan(text="Water molecules are polar.", start_char=43, end_char=69),
    TextSpan(text="Plants appear green due to chlorophyll.", start_char=70, end_char=109)
]

client = BookWyrmClient(api_key="your-api-key")
citations = []
for response in client.stream_citations(
    chunks=chunks,
    question="Why is the sky blue?"
):
    if isinstance(response, CitationProgressUpdate):  # Progress update
        print(f"Progress: {response.message}")
    elif isinstance(response, CitationStreamResponse):  # Citation found
        citations.append(response.citation)
        print(f"Found: {response.citation.text[:50]}...")
    elif isinstance(response, CitationSummaryResponse):  # Summary
        print(f"Complete: {response.total_citations} citations found")

Multiple questions:

questions = [
    "Why is the sky blue?",
    "What causes plants to be green?",
    "How do water molecules behave?"
]

for response in client.stream_citations(
    chunks=chunks,
    question=questions
):
    if isinstance(response, CitationStreamResponse):
        citation = response.citation
        if citation.question_index:
            print(f"Question {citation.question_index}: {citation.text[:50]}...")
        else:
            print(f"Citation: {citation.text[:50]}...")
Source code in bookwyrm/client.py
def stream_citations(
    self,
    *,
    chunks: Optional[List[TextSpan]] = None,
    jsonl_content: Optional[str] = None,
    jsonl_url: Optional[str] = None,
    question: Union[str, List[str]],
    start: Optional[int] = 0,
    limit: Optional[int] = None,
    max_tokens_per_chunk: Optional[int] = 1000,
    model_strength: ModelStrength = ModelStrength.SWIFT,
) -> Iterator[StreamingCitationResponse]:
    """Stream citations as they are found with real-time progress updates.

    This method provides real-time streaming of citation results, allowing you to
    process citations as they're found rather than waiting for all results. Useful
    for large datasets or when you want to show progress to users.

    Args:
        chunks: List of text chunks to search
        jsonl_content: Raw JSONL content as string
        jsonl_url: URL to fetch JSONL content from
        question: The question(s) to find citations for - can be a single string or list of strings
        start: Starting chunk index (0-based)
        limit: Maximum number of chunks to process
        max_tokens_per_chunk: Maximum tokens per chunk

    Yields:
        StreamingCitationResponse: Union of progress updates, individual citations,
        final summary, or error messages

    Raises:
        BookWyrmAPIError: If the API request fails (network, authentication, server errors)

    Examples:
        Basic streaming with single question:

        ```python
        from bookwyrm import BookWyrmClient
        from bookwyrm.models import TextSpan, CitationProgressUpdate, CitationStreamResponse, CitationSummaryResponse

        # Create some example chunks
        chunks = [
            TextSpan(text="The sky is blue due to Rayleigh scattering.", start_char=0, end_char=42),
            TextSpan(text="Water molecules are polar.", start_char=43, end_char=69),
            TextSpan(text="Plants appear green due to chlorophyll.", start_char=70, end_char=109)
        ]

        client = BookWyrmClient(api_key="your-api-key")
        citations = []
        for response in client.stream_citations(
            chunks=chunks,
            question="Why is the sky blue?"
        ):
            if isinstance(response, CitationProgressUpdate):  # Progress update
                print(f"Progress: {response.message}")
            elif isinstance(response, CitationStreamResponse):  # Citation found
                citations.append(response.citation)
                print(f"Found: {response.citation.text[:50]}...")
            elif isinstance(response, CitationSummaryResponse):  # Summary
                print(f"Complete: {response.total_citations} citations found")
        ```

        Multiple questions:

        ```python
        questions = [
            "Why is the sky blue?",
            "What causes plants to be green?",
            "How do water molecules behave?"
        ]

        for response in client.stream_citations(
            chunks=chunks,
            question=questions
        ):
            if isinstance(response, CitationStreamResponse):
                citation = response.citation
                if citation.question_index:
                    print(f"Question {citation.question_index}: {citation.text[:50]}...")
                else:
                    print(f"Citation: {citation.text[:50]}...")
        ```
    """
    # Validate question(s)
    if isinstance(question, str):
        if not question or not question.strip():
            raise ValueError("question cannot be empty")
    elif isinstance(question, list):
        if not question:
            raise ValueError("question list cannot be empty")
        if len(question) > 20:
            raise ValueError("question list cannot contain more than 20 questions")
        for i, q in enumerate(question):
            if not q or not q.strip():
                raise ValueError(f"question at index {i} cannot be empty")
    else:
        raise ValueError("question must be a string or list of strings")

    # Handle empty chunks list - return empty response immediately
    if chunks is not None and len(chunks) == 0:
        yield CitationSummaryResponse(
            total_citations=0,
            chunks_processed=0,
            token_chunks_processed=0,
            start_offset=0,
            usage=UsageInfo(
                tokens_processed=0,
                chunks_processed=0,
                estimated_cost=None,
                remaining_credits=0.0,
            ),
        )
        return

    request = CitationRequest(
        chunks=chunks,
        jsonl_content=jsonl_content,
        jsonl_url=jsonl_url,
        question=question,
        start=start,
        limit=limit,
        max_tokens_per_chunk=max_tokens_per_chunk,
        model_strength=model_strength,
    )
    headers = {**DEFAULT_HEADERS, "Content-Type": "application/json"}
    if self.api_key:
        headers["Authorization"] = f"Bearer {self.api_key}"

    try:
        response: requests.Response = self.session.post(
            f"{self.base_url}/cite/sse",
            json=request.model_dump(exclude_none=True),
            headers=headers,
            stream=True,
            timeout=self.timeout,
        )
        response.raise_for_status()
        _check_deprecation_headers(response)

        # Use SSEClient for proper SSE parsing
        client = SSEClient(response)
        for event in client.events():
            if event.data and event.data.strip():
                try:
                    data: Dict[str, Any] = json.loads(event.data)

                    # Use the event type, or fall back to data.type
                    event_type = event.event or data.get("type")

                    match event_type:
                        case "progress":
                            # SSE endpoint sends ProgressUpdate, convert to CitationProgressUpdate
                            progress_data = {
                                "type": "progress",
                                "chunks_processed": data.get("chunks_processed", 0),
                                "total_chunks": data.get("total_chunks", 0),
                                "citations_found": data.get("citations_found", 0),
                                "current_chunk_range": data.get(
                                    "message", "Processing..."
                                ),  # Use message as range
                                "message": data.get("message", "Processing..."),
                            }
                            yield CitationProgressUpdate.model_validate(
                                progress_data
                            )
                        case "citation":
                            yield CitationStreamResponse.model_validate(data)
                        case "citation_span":
                            # Handle citation_span events as regular citations
                            citation_data = {
                                "type": "citation",
                                "citation": data.get("citation"),
                            }
                            yield CitationStreamResponse.model_validate(
                                citation_data
                            )
                        case "summary":
                            # SSE endpoint sends SummaryResult, convert to CitationSummaryResponse
                            summary_data = {
                                "type": "summary",
                                "total_citations": data.get("total_citations", 0),
                                "chunks_processed": data.get("chunks_processed", 0),
                                "token_chunks_processed": data.get(
                                    "token_chunks_processed", 0
                                ),
                                "start_offset": 0,  # SSE endpoint doesn't provide this, default to 0
                                "usage": data.get(
                                    "usage",
                                    {
                                        "tokens_processed": 0,
                                        "chunks_processed": 0,
                                        "remaining_credits": 0.0,
                                    },
                                ),
                            }
                            yield CitationSummaryResponse.model_validate(
                                summary_data
                            )
                        case "error":
                            yield CitationErrorResponse.model_validate(data)
                        case _:
                            # Unknown response type, skip
                            continue
                except json.JSONDecodeError:
                    # Skip malformed JSON lines
                    continue

    except requests.HTTPError as e:
        raise _marshal_http_error(e)
    except requests.RequestException as e:
        raise BookWyrmAPIError(f"Request failed: {e}")

options: show_root_heading: true

bookwyrm.BookWyrmClient.stream_summarize

stream_summarize(
    *,
    content: Optional[str] = None,
    url: Optional[str] = None,
    phrases: Optional[List[TextSpan]] = None,
    max_tokens: int = 10000,
    model_strength: str = "swift",
    debug: bool = False,
    model_name: Optional[str] = None,
    model_schema_json: Optional[str] = None,
    summary_class: Optional[Type[BaseModel]] = None,
    chunk_prompt: Optional[str] = None,
    summary_of_summaries_prompt: Optional[str] = None
) -> Iterator[StreamingSummarizeResponse]

Stream summarization progress and results with real-time updates.

This method provides real-time streaming of summarization progress, including hierarchical processing updates, retry attempts, and final results. Useful for long-running summarization tasks where you want to show progress to users.

Parameters:

Name Type Description Default
content Optional[str]

Text content to summarize

None
url Optional[str]

URL to fetch content from

None
phrases Optional[List[TextSpan]]

List of text phrases to summarize

None
max_tokens int

Maximum tokens for chunking (default: 10000)

10000
debug bool

Include intermediate summaries in response

False

Yields:

Name Type Description
StreamingSummarizeResponse StreamingSummarizeResponse

Union of progress updates, final summary, rate limit messages,

StreamingSummarizeResponse

structural error messages, or general errors

Raises:

Type Description
BookWyrmAPIError

If the API request fails (network, authentication, server errors)

Examples:

Basic streaming:

final_result = None
for response in client.stream_summarize(
    content=content,
    max_tokens=5000,
    debug=True
):
    if isinstance(response, SummarizeProgressUpdate):  # Progress update
        print(f"Progress: {response.message}")
    elif isinstance(response, SummaryResponse):  # Final summary
        final_result = response
        print(f"Summary complete!")

if final_result:
    print(final_result.summary)
Source code in bookwyrm/client.py
def stream_summarize(
    self,
    *,
    content: Optional[str] = None,
    url: Optional[str] = None,
    phrases: Optional[List[TextSpan]] = None,
    max_tokens: int = 10000,
    model_strength: str = "swift",
    debug: bool = False,
    model_name: Optional[str] = None,
    model_schema_json: Optional[str] = None,
    summary_class: Optional[Type[BaseModel]] = None,
    chunk_prompt: Optional[str] = None,
    summary_of_summaries_prompt: Optional[str] = None,
) -> Iterator[StreamingSummarizeResponse]:
    """Stream summarization progress and results with real-time updates.

    This method provides real-time streaming of summarization progress, including
    hierarchical processing updates, retry attempts, and final results. Useful for
    long-running summarization tasks where you want to show progress to users.

    Args:
        content: Text content to summarize
        url: URL to fetch content from
        phrases: List of text phrases to summarize
        max_tokens: Maximum tokens for chunking (default: 10000)
        debug: Include intermediate summaries in response

    Yields:
        StreamingSummarizeResponse: Union of progress updates, final summary, rate limit messages,
        structural error messages, or general errors

    Raises:
        BookWyrmAPIError: If the API request fails (network, authentication, server errors)

    Examples:
        Basic streaming:

        ```python
        final_result = None
        for response in client.stream_summarize(
            content=content,
            max_tokens=5000,
            debug=True
        ):
            if isinstance(response, SummarizeProgressUpdate):  # Progress update
                print(f"Progress: {response.message}")
            elif isinstance(response, SummaryResponse):  # Final summary
                final_result = response
                print(f"Summary complete!")

        if final_result:
            print(final_result.summary)
        ```
    """
    sources = [content, url, phrases]
    provided_sources = [s for s in sources if s is not None]
    if len(provided_sources) != 1:
        raise ValueError("Exactly one of content, url, or phrases must be provided")

    request = SummarizeRequest(
        content=content,
        url=url,
        phrases=phrases,
        max_tokens=max_tokens,
        model_strength=model_strength,
        debug=debug,
        model_name=model_name,
        model_schema_json=model_schema_json,
        summary_class=summary_class,
        chunk_prompt=chunk_prompt,
        summary_of_summaries_prompt=summary_of_summaries_prompt,
    )
    headers = {**DEFAULT_HEADERS, "Content-Type": "application/json"}
    if self.api_key:
        headers["Authorization"] = f"Bearer {self.api_key}"

    try:
        response: requests.Response = self.session.post(
            f"{self.base_url}/summarize/sse",
            json=request.model_dump(exclude_none=True),
            headers=headers,
            stream=True,
            timeout=self.timeout,
        )
        response.raise_for_status()
        _check_deprecation_headers(response)

        # Use SSEClient for proper SSE parsing
        client = SSEClient(response)
        for event in client.events():
            if event.data and event.data.strip():
                try:
                    data: Dict[str, Any] = json.loads(event.data)

                    # Use the event type, or fall back to data.type
                    event_type = event.event or data.get("type")

                    match event_type:
                        case "progress":
                            yield SummarizeProgressUpdate.model_validate(data)
                        case "data":
                            # SSE endpoint sends DataResult, but we need to convert to SummaryResponse
                            # Create a SummaryResponse from the data fields
                            summary_data = {
                                "type": "summary",  # Set the expected type
                                "summary": data.get("summary", ""),
                                "subsummary_count": data.get("subsummary_count", 0),
                                "levels_used": data.get("levels_used", 0),
                                "total_tokens": data.get("total_tokens", 0),
                                "intermediate_summaries": data.get(
                                    "intermediate_summaries"
                                ),
                            }
                            yield SummaryResponse.model_validate(summary_data)
                        case "error":
                            yield SummarizeErrorResponse.model_validate(data)
                        case "rate_limit":
                            yield RateLimitMessage.model_validate(data)
                        case "structural_error":
                            yield StructuralErrorMessage.model_validate(data)
                        case _:
                            # Unknown response type, skip
                            continue
                except json.JSONDecodeError:
                    # Skip malformed JSON lines
                    continue

    except requests.HTTPError as e:
        raise _marshal_http_error(e)
    except requests.RequestException as e:
        raise BookWyrmAPIError(f"Request failed: {e}")

options: show_root_heading: true

Asynchronous Client Methods

bookwyrm.AsyncBookWyrmClient.classify async

classify(
    *,
    content: Optional[str] = None,
    content_bytes: Optional[bytes] = None,
    filename: Optional[str] = None,
    content_encoding: str = "raw"
) -> ClassifyResponse

Classify file content to determine file type and format asynchronously.

This async method analyzes file content to determine format type, content type, MIME type, and other classification details. It supports both binary and text files, providing confidence scores and additional metadata about the detected format.

Parameters:

Name Type Description Default
content Optional[str]

File content as string (raw text or base64-encoded)

None
content_bytes Optional[bytes]

Raw file bytes

None
filename Optional[str]

Optional filename hint for classification

None
content_encoding str

Content encoding format ("raw" for plain text, "base64" for encoded)

'raw'

Returns:

Type Description
ClassifyResponse

Classification response with detected file type, confidence score, and additional details

Raises:

Type Description
BookWyrmAPIError

If the API request fails (network, authentication, server errors)

Examples:

Basic async file classification:

async def classify_file():
    # Read file as binary
    with open("document.pdf", "rb") as f:
        file_bytes = f.read()

    async with AsyncBookWyrmClient() as client:
        response = await client.classify(
            content_bytes=file_bytes,
            filename="document.pdf"
        )
        print(f"File type: {response.classification.format_type}")
        print(f"Confidence: {response.classification.confidence:.2%}")

asyncio.run(classify_file())

Classify multiple files concurrently:

async def classify_multiple_files():
    files = ["doc1.pdf", "script.py", "data.json", "image.jpg"]

    async def classify_single(filename):
        file_path = Path(filename)
        return filename, await client.classify(
            content_bytes=file_path.read_bytes(),
            filename=file_path.name
        )

    async with AsyncBookWyrmClient() as client:
        results = await asyncio.gather(*[
            classify_single(f) for f in files
        ])

        for filename, response in results:
            print(f"{filename}: {response.classification.content_type} ({response.classification.confidence:.1%})")
Source code in bookwyrm/async_client.py
async def classify(
    self,
    *,
    content: Optional[str] = None,
    content_bytes: Optional[bytes] = None,
    filename: Optional[str] = None,
    content_encoding: str = "raw",
) -> ClassifyResponse:
    """Classify file content to determine file type and format asynchronously.

    This async method analyzes file content to determine format type, content type, MIME type,
    and other classification details. It supports both binary and text files, providing
    confidence scores and additional metadata about the detected format.

    Args:
        content: File content as string (raw text or base64-encoded)
        content_bytes: Raw file bytes
        filename: Optional filename hint for classification
        content_encoding: Content encoding format ("raw" for plain text, "base64" for encoded)

    Returns:
        Classification response with detected file type, confidence score, and additional details

    Raises:
        BookWyrmAPIError: If the API request fails (network, authentication, server errors)

    Examples:
        Basic async file classification:

        ```python
        async def classify_file():
            # Read file as binary
            with open("document.pdf", "rb") as f:
                file_bytes = f.read()

            async with AsyncBookWyrmClient() as client:
                response = await client.classify(
                    content_bytes=file_bytes,
                    filename="document.pdf"
                )
                print(f"File type: {response.classification.format_type}")
                print(f"Confidence: {response.classification.confidence:.2%}")

        asyncio.run(classify_file())
        ```

        Classify multiple files concurrently:

        ```python
        async def classify_multiple_files():
            files = ["doc1.pdf", "script.py", "data.json", "image.jpg"]

            async def classify_single(filename):
                file_path = Path(filename)
                return filename, await client.classify(
                    content_bytes=file_path.read_bytes(),
                    filename=file_path.name
                )

            async with AsyncBookWyrmClient() as client:
                results = await asyncio.gather(*[
                    classify_single(f) for f in files
                ])

                for filename, response in results:
                    print(f"{filename}: {response.classification.content_type} ({response.classification.confidence:.1%})")
        ```
    """
    if content is None and content_bytes is None:
        raise ValueError("Either content or content_bytes is required")

    request = ClassifyRequest(
        content=content,
        content_bytes=content_bytes,
        filename=filename,
        content_encoding=content_encoding,
    )
    headers: Dict[str, str] = {**DEFAULT_HEADERS}
    if self.api_key:
        headers["Authorization"] = f"Bearer {self.api_key}"

    try:
        # Handle marshalling at API level
        if request.content_bytes is not None:
            file_bytes: bytes = request.content_bytes
        elif request.content is not None:
            if request.content_encoding == "base64":
                # Decode base64 content and send as multipart form data
                import base64

                file_bytes = base64.b64decode(request.content)
            else:
                # Handle raw text content
                file_bytes = request.content.encode("utf-8")
        else:
            raise BookWyrmAPIError(
                "Either content or content_bytes must be provided"
            )

        files: Dict[str, tuple] = {
            "file": (request.filename or "document", file_bytes)
        }
        response: httpx.Response = await self.client.post(
            f"{self.base_url}/classify",
            files=files,
            headers=headers,
            timeout=self.timeout,
        )

        response.raise_for_status()
        _check_deprecation_headers(response)
        response_data: Dict[str, Any] = response.json()
        return ClassifyResponse.model_validate(response_data)
    except httpx.HTTPStatusError as e:
        raise BookWyrmAPIError(f"API request failed: {e}", e.response.status_code)
    except httpx.RequestError as e:
        raise BookWyrmAPIError(f"Request failed: {e}")

options: show_root_heading: true

bookwyrm.AsyncBookWyrmClient.stream_process_text async

stream_process_text(
    *,
    text: Optional[str] = None,
    text_url: Optional[str] = None,
    chunk_size: Optional[int] = None,
    response_format: Union[
        ResponseFormat,
        Literal[
            "with_offsets", "offsets", "text_only", "text"
        ],
    ] = ResponseFormat.WITH_OFFSETS,
    offsets: Optional[bool] = None,
    text_only: Optional[bool] = None
) -> AsyncIterator[StreamingPhrasalResponse]

Stream text processing using phrasal analysis with async real-time results.

This async method breaks down text into meaningful phrases or chunks using NLP, supporting both direct text input and URLs. It can create fixed-size chunks or extract individual phrases with optional position information.

Parameters:

Name Type Description Default
text Optional[str]

Text content to process

None
text_url Optional[str]

URL to fetch text from

None
chunk_size Optional[int]

Optional chunk size for fixed-size chunking

None
response_format Union[ResponseFormat, Literal['with_offsets', 'offsets', 'text_only', 'text']]

Response format - use ResponseFormat enum, "with_offsets"/"offsets", or "text_only"/"text"

WITH_OFFSETS
offsets Optional[bool]

Set to True for WITH_OFFSETS format (boolean flag)

None
text_only Optional[bool]

Set to True for TEXT_ONLY format (boolean flag)

None

Yields:

Name Type Description
StreamingPhrasalResponse AsyncIterator[StreamingPhrasalResponse]

Union of progress updates and phrase/chunk results

Raises:

Type Description
BookWyrmAPIError

If the API request fails (network, authentication, server errors)

Examples:

Basic async phrasal processing:

import asyncio
from bookwyrm import AsyncBookWyrmClient
from bookwyrm.models import ResponseFormat, TextResult, TextSpanResult, PhraseProgressUpdate

async def process_text_example():
    text = "Natural language processing (NLP) is a subfield of linguistics, computer science, and artificial intelligence."
    phrases = []
    async with AsyncBookWyrmClient(api_key="your-api-key") as client:
        async for response in client.stream_process_text(
            text=text,
            offsets=True  # or response_format="with_offsets" or ResponseFormat.WITH_OFFSETS
        ):
            if isinstance(response, (TextResult, TextSpanResult)):  # Phrase result
                phrases.append(response)
            elif isinstance(response, PhraseProgressUpdate):  # Progress
                print(f"Progress: {response.message}")

    print(f"Extracted {len(phrases)} phrases")

asyncio.run(process_text_example())

Process multiple texts concurrently:

import asyncio
from bookwyrm import AsyncBookWyrmClient
from bookwyrm.models import TextResult, TextSpanResult

async def process_multiple_texts():
    async def process_single(text, name):
        phrases = []
        async with AsyncBookWyrmClient(api_key="your-api-key") as client:
            async for response in client.stream_process_text(
                text=text,
                chunk_size=500
            ):
                if isinstance(response, (TextResult, TextSpanResult)):
                    phrases.append(response)
        return name, phrases

    results = await asyncio.gather(
        process_single(text1, "Text1"),
        process_single(text2, "Text2"),
    )

    for name, phrases in results:
        print(f"{name}: {len(phrases)} phrases")

asyncio.run(process_multiple_texts())

Process text from URL:

import asyncio
from bookwyrm import AsyncBookWyrmClient
from bookwyrm.models import TextResult, TextSpanResult

async def process_from_url():
    async with AsyncBookWyrmClient(api_key="your-api-key") as client:
        phrases: List[Union[TextResult, TextSpanResult]] = []
        async for response in client.stream_process_text(
            text_url="https://www.gutenberg.org/files/11/11-0.txt",
            chunk_size=2000,
            text_only=True
        ):
            if isinstance(response, (TextResult, TextSpanResult)):
                phrases.append(response)

        print(f"Processed {len(phrases)} phrases from URL")

asyncio.run(process_from_url())
Source code in bookwyrm/async_client.py
async def stream_process_text(
    self,
    *,
    text: Optional[str] = None,
    text_url: Optional[str] = None,
    chunk_size: Optional[int] = None,
    response_format: Union[
        ResponseFormat, Literal["with_offsets", "offsets", "text_only", "text"]
    ] = ResponseFormat.WITH_OFFSETS,
    # Boolean flags for response format
    offsets: Optional[bool] = None,
    text_only: Optional[bool] = None,
) -> AsyncIterator[StreamingPhrasalResponse]:
    """Stream text processing using phrasal analysis with async real-time results.

    This async method breaks down text into meaningful phrases or chunks using NLP,
    supporting both direct text input and URLs. It can create fixed-size chunks
    or extract individual phrases with optional position information.

    Args:
        text: Text content to process
        text_url: URL to fetch text from
        chunk_size: Optional chunk size for fixed-size chunking
        response_format: Response format - use ResponseFormat enum, "with_offsets"/"offsets", or "text_only"/"text"
        offsets: Set to True for WITH_OFFSETS format (boolean flag)
        text_only: Set to True for TEXT_ONLY format (boolean flag)

    Yields:
        StreamingPhrasalResponse: Union of progress updates and phrase/chunk results

    Raises:
        BookWyrmAPIError: If the API request fails (network, authentication, server errors)

    Examples:
        Basic async phrasal processing:

        ```python
        import asyncio
        from bookwyrm import AsyncBookWyrmClient
        from bookwyrm.models import ResponseFormat, TextResult, TextSpanResult, PhraseProgressUpdate

        async def process_text_example():
            text = "Natural language processing (NLP) is a subfield of linguistics, computer science, and artificial intelligence."
            phrases = []
            async with AsyncBookWyrmClient(api_key="your-api-key") as client:
                async for response in client.stream_process_text(
                    text=text,
                    offsets=True  # or response_format="with_offsets" or ResponseFormat.WITH_OFFSETS
                ):
                    if isinstance(response, (TextResult, TextSpanResult)):  # Phrase result
                        phrases.append(response)
                    elif isinstance(response, PhraseProgressUpdate):  # Progress
                        print(f"Progress: {response.message}")

            print(f"Extracted {len(phrases)} phrases")

        asyncio.run(process_text_example())
        ```

        Process multiple texts concurrently:

        ```python
        import asyncio
        from bookwyrm import AsyncBookWyrmClient
        from bookwyrm.models import TextResult, TextSpanResult

        async def process_multiple_texts():
            async def process_single(text, name):
                phrases = []
                async with AsyncBookWyrmClient(api_key="your-api-key") as client:
                    async for response in client.stream_process_text(
                        text=text,
                        chunk_size=500
                    ):
                        if isinstance(response, (TextResult, TextSpanResult)):
                            phrases.append(response)
                return name, phrases

            results = await asyncio.gather(
                process_single(text1, "Text1"),
                process_single(text2, "Text2"),
            )

            for name, phrases in results:
                print(f"{name}: {len(phrases)} phrases")

        asyncio.run(process_multiple_texts())
        ```

        Process text from URL:

        ```python
        import asyncio
        from bookwyrm import AsyncBookWyrmClient
        from bookwyrm.models import TextResult, TextSpanResult

        async def process_from_url():
            async with AsyncBookWyrmClient(api_key="your-api-key") as client:
                phrases: List[Union[TextResult, TextSpanResult]] = []
                async for response in client.stream_process_text(
                    text_url="https://www.gutenberg.org/files/11/11-0.txt",
                    chunk_size=2000,
                    text_only=True
                ):
                    if isinstance(response, (TextResult, TextSpanResult)):
                        phrases.append(response)

                print(f"Processed {len(phrases)} phrases from URL")

        asyncio.run(process_from_url())
        ```
    """
    if text is None and text_url is None:
        raise ValueError("Either text or text_url is required")

    # Handle boolean flags for response format
    boolean_flags = [offsets, text_only]
    true_flags = [flag for flag in boolean_flags if flag is True]

    if len(true_flags) > 1:
        raise ValueError("Only one response format flag can be True")

    if len(true_flags) == 1:
        if offsets:
            response_format = ResponseFormat.WITH_OFFSETS
        elif text_only:
            response_format = ResponseFormat.TEXT_ONLY

    # Convert string to enum if needed
    if isinstance(response_format, str):
        if response_format.lower() in ("with_offsets", "offsets"):
            response_format = ResponseFormat.WITH_OFFSETS
        elif response_format.lower() in ("text_only", "text"):
            response_format = ResponseFormat.TEXT_ONLY
        else:
            raise ValueError(
                f"Invalid response_format: {response_format}. Use 'with_offsets'/'offsets' or 'text_only'/'text'"
            )

    request = ProcessTextRequest(
        text=text,
        text_url=text_url,
        chunk_size=chunk_size,
        response_format=response_format,
    )
    headers = {**DEFAULT_HEADERS, "Content-Type": "application/json"}
    if self.api_key:
        headers["Authorization"] = f"Bearer {self.api_key}"

    try:
        async with aconnect_sse(
            self.client,
            "POST",
            f"{self.base_url}/phrasal/sse",
            json=request.model_dump(exclude_none=True),
            headers=headers,
            timeout=self.timeout,
        ) as event_source:
            # Check response status and headers
            response = event_source.response
            response.raise_for_status()
            _check_deprecation_headers(response)

            async for sse in event_source.aiter_sse():
                if sse.data and sse.data.strip():
                    try:
                        data: Dict[str, Any] = json.loads(sse.data)

                        # Use the event type, or fall back to data.type
                        event_type = sse.event or data.get("type")

                        match event_type:
                            case "progress":
                                yield PhraseProgressUpdate.model_validate(data)
                            case "text":
                                yield TextResult.model_validate(data)
                            case "text_span":
                                yield TextSpanResult.model_validate(data)
                            case _:
                                # Unknown response type, skip
                                continue
                    except json.JSONDecodeError:
                        # Skip malformed JSON lines
                        continue

    except httpx.HTTPStatusError as e:
        raise BookWyrmAPIError(f"API request failed: {e}", e.response.status_code)
    except httpx.RequestError as e:
        raise BookWyrmAPIError(f"Request failed: {e}")

options: show_root_heading: true

bookwyrm.AsyncBookWyrmClient.stream_citations async

stream_citations(
    *,
    chunks: Optional[List[TextSpan]] = None,
    jsonl_content: Optional[str] = None,
    jsonl_url: Optional[str] = None,
    question: str,
    start: Optional[int] = 0,
    limit: Optional[int] = None,
    max_tokens_per_chunk: Optional[int] = 1000,
    model_strength: ModelStrength = ModelStrength.SWIFT
) -> AsyncIterator[StreamingCitationResponse]

Stream citations as they are found with real-time progress updates.

This async method provides real-time streaming of citation results, allowing you to process citations as they're found rather than waiting for all results. Useful for large datasets or when you want to show progress to users.

Parameters:

Name Type Description Default
chunks Optional[List[TextSpan]]

List of text chunks to search

None
jsonl_content Optional[str]

Raw JSONL content as string

None
jsonl_url Optional[str]

URL to fetch JSONL content from

None
question str

The question to find citations for

required
start Optional[int]

Starting chunk index (0-based)

0
limit Optional[int]

Maximum number of chunks to process

None
max_tokens_per_chunk Optional[int]

Maximum tokens per chunk

1000

Yields:

Name Type Description
StreamingCitationResponse AsyncIterator[StreamingCitationResponse]

Union of progress updates, individual citations,

AsyncIterator[StreamingCitationResponse]

final summary, or error messages

Raises:

Type Description
BookWyrmAPIError

If the API request fails (network, authentication, server errors)

Examples:

Basic async streaming with function arguments:

import asyncio
from bookwyrm import AsyncBookWyrmClient
from bookwyrm.models import TextSpan, CitationProgressUpdate, CitationStreamResponse, CitationSummaryResponse

async def stream_citations_example():
    # Create some example chunks
    chunks = [
        TextSpan(text="The sky is blue due to Rayleigh scattering.", start_char=0, end_char=42),
        TextSpan(text="Water molecules are polar.", start_char=43, end_char=69),
        TextSpan(text="Plants appear green due to chlorophyll.", start_char=70, end_char=109)
    ]

    async with AsyncBookWyrmClient(api_key="your-api-key") as client:
        citations = []
        async for response in client.stream_citations(
            chunks=chunks,
            question="Why is the sky blue?"
        ):
            if isinstance(response, CitationProgressUpdate):  # Progress update
                print(f"Progress: {response.message}")
            elif isinstance(response, CitationStreamResponse):  # Citation found
                citations.append(response.citation)
                print(f"Found: {response.citation.text[:50]}...")
            elif isinstance(response, CitationSummaryResponse):  # Summary
                print(f"Complete: {response.total_citations} citations found")

asyncio.run(stream_citations_example())

Legacy request object usage (still supported):

from bookwyrm.models import CitationRequest

request = CitationRequest(
    chunks=chunks,
    question="Why is the sky blue?"
)

async for response in client.stream_citations(request):
    # Process responses...
Source code in bookwyrm/async_client.py
async def stream_citations(
    self,
    *,
    chunks: Optional[List[TextSpan]] = None,
    jsonl_content: Optional[str] = None,
    jsonl_url: Optional[str] = None,
    question: str,
    start: Optional[int] = 0,
    limit: Optional[int] = None,
    max_tokens_per_chunk: Optional[int] = 1000,
    model_strength: ModelStrength = ModelStrength.SWIFT,
) -> AsyncIterator[StreamingCitationResponse]:
    """Stream citations as they are found with real-time progress updates.

    This async method provides real-time streaming of citation results, allowing you to
    process citations as they're found rather than waiting for all results. Useful
    for large datasets or when you want to show progress to users.

    Args:
        chunks: List of text chunks to search
        jsonl_content: Raw JSONL content as string
        jsonl_url: URL to fetch JSONL content from
        question: The question to find citations for
        start: Starting chunk index (0-based)
        limit: Maximum number of chunks to process
        max_tokens_per_chunk: Maximum tokens per chunk

    Yields:
        StreamingCitationResponse: Union of progress updates, individual citations,
        final summary, or error messages

    Raises:
        BookWyrmAPIError: If the API request fails (network, authentication, server errors)

    Examples:
        Basic async streaming with function arguments:

        ```python
        import asyncio
        from bookwyrm import AsyncBookWyrmClient
        from bookwyrm.models import TextSpan, CitationProgressUpdate, CitationStreamResponse, CitationSummaryResponse

        async def stream_citations_example():
            # Create some example chunks
            chunks = [
                TextSpan(text="The sky is blue due to Rayleigh scattering.", start_char=0, end_char=42),
                TextSpan(text="Water molecules are polar.", start_char=43, end_char=69),
                TextSpan(text="Plants appear green due to chlorophyll.", start_char=70, end_char=109)
            ]

            async with AsyncBookWyrmClient(api_key="your-api-key") as client:
                citations = []
                async for response in client.stream_citations(
                    chunks=chunks,
                    question="Why is the sky blue?"
                ):
                    if isinstance(response, CitationProgressUpdate):  # Progress update
                        print(f"Progress: {response.message}")
                    elif isinstance(response, CitationStreamResponse):  # Citation found
                        citations.append(response.citation)
                        print(f"Found: {response.citation.text[:50]}...")
                    elif isinstance(response, CitationSummaryResponse):  # Summary
                        print(f"Complete: {response.total_citations} citations found")

        asyncio.run(stream_citations_example())
        ```

        Legacy request object usage (still supported):

        ```python
        from bookwyrm.models import CitationRequest

        request = CitationRequest(
            chunks=chunks,
            question="Why is the sky blue?"
        )

        async for response in client.stream_citations(request):
            # Process responses...
        ```
    """
    # Handle empty chunks list - return empty response immediately
    if chunks is not None and len(chunks) == 0:
        from .models import UsageInfo

        yield CitationSummaryResponse(
            total_citations=0,
            chunks_processed=0,
            token_chunks_processed=0,
            start_offset=0,
            usage=UsageInfo(
                tokens_processed=0,
                chunks_processed=0,
                estimated_cost=None,
                remaining_credits=0.0,
            ),
        )
        return

    request = CitationRequest(
        chunks=chunks,
        jsonl_content=jsonl_content,
        jsonl_url=jsonl_url,
        question=question,
        start=start,
        limit=limit,
        max_tokens_per_chunk=max_tokens_per_chunk,
        model_strength=model_strength,
    )
    headers = {**DEFAULT_HEADERS, "Content-Type": "application/json"}
    if self.api_key:
        headers["Authorization"] = f"Bearer {self.api_key}"

    try:
        async with aconnect_sse(
            self.client,
            "POST",
            f"{self.base_url}/cite/sse",
            json=request.model_dump(exclude_none=True),
            headers=headers,
            timeout=self.timeout,
        ) as event_source:
            # Check response status and headers
            response = event_source.response
            response.raise_for_status()
            _check_deprecation_headers(response)

            async for sse in event_source.aiter_sse():
                if sse.data and sse.data.strip():
                    try:
                        data: Dict[str, Any] = json.loads(sse.data)

                        # Use the event type, or fall back to data.type
                        event_type = sse.event or data.get("type")

                        match event_type:
                            case "progress":
                                # SSE endpoint sends ProgressUpdate, convert to CitationProgressUpdate
                                progress_data = {
                                    "type": "progress",
                                    "chunks_processed": data.get(
                                        "chunks_processed", 0
                                    ),
                                    "total_chunks": data.get("total_chunks", 0),
                                    "citations_found": data.get(
                                        "citations_found", 0
                                    ),
                                    "current_chunk_range": data.get(
                                        "message", "Processing..."
                                    ),  # Use message as range
                                    "message": data.get("message", "Processing..."),
                                }
                                yield CitationProgressUpdate.model_validate(
                                    progress_data
                                )
                            case "citation":
                                yield CitationStreamResponse.model_validate(data)
                            case "citation_span":
                                # Handle citation_span events as regular citations
                                citation_data = {
                                    "type": "citation",
                                    "citation": data.get("citation"),
                                }
                                yield CitationStreamResponse.model_validate(
                                    citation_data
                                )
                            case "summary":
                                # SSE endpoint sends SummaryResult, convert to CitationSummaryResponse
                                summary_data = {
                                    "type": "summary",
                                    "total_citations": data.get(
                                        "total_citations", 0
                                    ),
                                    "chunks_processed": data.get(
                                        "chunks_processed", 0
                                    ),
                                    "token_chunks_processed": data.get(
                                        "token_chunks_processed", 0
                                    ),
                                    "start_offset": 0,  # SSE endpoint doesn't provide this, default to 0
                                    "usage": data.get(
                                        "usage",
                                        {
                                            "tokens_processed": 0,
                                            "chunks_processed": 0,
                                            "remaining_credits": 0.0,
                                        },
                                    ),
                                }
                                yield CitationSummaryResponse.model_validate(
                                    summary_data
                                )
                            case "error":
                                yield CitationErrorResponse.model_validate(data)
                            case _:
                                # Unknown response type, skip
                                continue
                    except json.JSONDecodeError:
                        # Skip malformed JSON lines
                        continue

    except httpx.HTTPStatusError as e:
        raise BookWyrmAPIError(f"API request failed: {e}", e.response.status_code)
    except httpx.RequestError as e:
        raise BookWyrmAPIError(f"Request failed: {e}")

options: show_root_heading: true

bookwyrm.AsyncBookWyrmClient.stream_summarize async

stream_summarize(
    *,
    content: Optional[str] = None,
    url: Optional[str] = None,
    phrases: Optional[List[TextSpan]] = None,
    max_tokens: int = 10000,
    model_strength: str = "swift",
    debug: bool = False,
    model_name: Optional[str] = None,
    model_schema_json: Optional[str] = None,
    summary_class: Optional[Type[BaseModel]] = None,
    chunk_prompt: Optional[str] = None,
    summary_of_summaries_prompt: Optional[str] = None
) -> AsyncIterator[StreamingSummarizeResponse]

Stream summarization progress and results with real-time updates.

This async method provides real-time streaming of summarization progress, including hierarchical processing updates, retry attempts, and final results. Useful for long-running summarization tasks where you want to show progress to users.

Parameters:

Name Type Description Default
content Optional[str]

Text content to summarize

None
url Optional[str]

URL to fetch content from

None
phrases Optional[List[TextSpan]]

List of text phrases to summarize

None
max_tokens int

Maximum tokens for chunking (default: 10000)

10000
debug bool

Include intermediate summaries in response

False

Yields:

Name Type Description
StreamingSummarizeResponse AsyncIterator[StreamingSummarizeResponse]

Union of progress updates, final summary, rate limit messages,

AsyncIterator[StreamingSummarizeResponse]

structural error messages, or general errors

Raises:

Type Description
BookWyrmAPIError

If the API request fails (network, authentication, server errors)

Examples:

Basic async streaming summarization with function arguments:

async def stream_summarize_example():
    async with AsyncBookWyrmClient() as client:
        final_result = None
        async for response in client.stream_summarize(
            content=content,
            max_tokens=5000,
            debug=True
        ):
            if isinstance(response, SummarizeProgressUpdate):  # Progress update
                print(f"Progress: {response.message}")
            elif isinstance(response, SummaryResponse):  # Final summary
                final_result = response
                print("Summary complete!")

        if final_result:
            print(final_result.summary)

asyncio.run(stream_summarize_example())

Legacy request object usage (still supported):

from bookwyrm.models import SummarizeRequest

request = SummarizeRequest(
    content=content,
    max_tokens=5000,
    debug=True
)

async for response in client.stream_summarize(request):
    # Process responses...
Source code in bookwyrm/async_client.py
async def stream_summarize(
    self,
    *,
    content: Optional[str] = None,
    url: Optional[str] = None,
    phrases: Optional[List[TextSpan]] = None,
    max_tokens: int = 10000,
    model_strength: str = "swift",
    debug: bool = False,
    model_name: Optional[str] = None,
    model_schema_json: Optional[str] = None,
    summary_class: Optional[Type[BaseModel]] = None,
    chunk_prompt: Optional[str] = None,
    summary_of_summaries_prompt: Optional[str] = None,
) -> AsyncIterator[StreamingSummarizeResponse]:
    """Stream summarization progress and results with real-time updates.

    This async method provides real-time streaming of summarization progress, including
    hierarchical processing updates, retry attempts, and final results. Useful for
    long-running summarization tasks where you want to show progress to users.

    Args:
        content: Text content to summarize
        url: URL to fetch content from
        phrases: List of text phrases to summarize
        max_tokens: Maximum tokens for chunking (default: 10000)
        debug: Include intermediate summaries in response

    Yields:
        StreamingSummarizeResponse: Union of progress updates, final summary, rate limit messages,
        structural error messages, or general errors

    Raises:
        BookWyrmAPIError: If the API request fails (network, authentication, server errors)

    Examples:
        Basic async streaming summarization with function arguments:

        ```python
        async def stream_summarize_example():
            async with AsyncBookWyrmClient() as client:
                final_result = None
                async for response in client.stream_summarize(
                    content=content,
                    max_tokens=5000,
                    debug=True
                ):
                    if isinstance(response, SummarizeProgressUpdate):  # Progress update
                        print(f"Progress: {response.message}")
                    elif isinstance(response, SummaryResponse):  # Final summary
                        final_result = response
                        print("Summary complete!")

                if final_result:
                    print(final_result.summary)

        asyncio.run(stream_summarize_example())
        ```

        Legacy request object usage (still supported):

        ```python
        from bookwyrm.models import SummarizeRequest

        request = SummarizeRequest(
            content=content,
            max_tokens=5000,
            debug=True
        )

        async for response in client.stream_summarize(request):
            # Process responses...
        ```
    """
    sources = [content, url, phrases]
    provided_sources = [s for s in sources if s is not None]
    if len(provided_sources) != 1:
        raise ValueError("Exactly one of content, url, or phrases must be provided")

    request = SummarizeRequest(
        content=content,
        url=url,
        phrases=phrases,
        max_tokens=max_tokens,
        model_strength=model_strength,
        debug=debug,
        model_name=model_name,
        model_schema_json=model_schema_json,
        summary_class=summary_class,
        chunk_prompt=chunk_prompt,
        summary_of_summaries_prompt=summary_of_summaries_prompt,
    )
    headers = {**DEFAULT_HEADERS, "Content-Type": "application/json"}
    if self.api_key:
        headers["Authorization"] = f"Bearer {self.api_key}"

    try:
        async with aconnect_sse(
            self.client,
            "POST",
            f"{self.base_url}/summarize/sse",
            json=request.model_dump(exclude_none=True),
            headers=headers,
            timeout=self.timeout,
        ) as event_source:
            # Check response status and headers
            response = event_source.response
            response.raise_for_status()
            _check_deprecation_headers(response)

            async for sse in event_source.aiter_sse():
                if sse.data and sse.data.strip():
                    try:
                        data: Dict[str, Any] = json.loads(sse.data)

                        # Use the event type, or fall back to data.type
                        event_type = sse.event or data.get("type")

                        match event_type:
                            case "progress":
                                yield SummarizeProgressUpdate.model_validate(data)
                            case "data":
                                # SSE endpoint sends DataResult, but we need to convert to SummaryResponse
                                # Create a SummaryResponse from the data fields
                                summary_data = {
                                    "type": "summary",  # Set the expected type
                                    "summary": data.get("summary", ""),
                                    "subsummary_count": data.get(
                                        "subsummary_count", 0
                                    ),
                                    "levels_used": data.get("levels_used", 0),
                                    "total_tokens": data.get("total_tokens", 0),
                                    "intermediate_summaries": data.get(
                                        "intermediate_summaries"
                                    ),
                                }
                                yield SummaryResponse.model_validate(summary_data)
                            case "error":
                                yield SummarizeErrorResponse.model_validate(data)
                            case "rate_limit":
                                yield RateLimitMessage.model_validate(data)
                            case "structural_error":
                                yield StructuralErrorMessage.model_validate(data)
                            case _:
                                # Unknown response type, skip
                                continue
                    except json.JSONDecodeError:
                        # Skip malformed JSON lines
                        continue

    except httpx.HTTPStatusError as e:
        raise BookWyrmAPIError(f"API request failed: {e}", e.response.status_code)
    except httpx.RequestError as e:
        raise BookWyrmAPIError(f"Request failed: {e}")

options: show_root_heading: true