---
slug: /guide/baml-basics/streaming
---

BAML lets you stream structured JSON output from LLMs as it is generated.

If you streamed raw JSON output from an LLM, you would see something like:

```
{"items": [{"name": "Appl
{"items": [{"name": "Apple", "quantity": 2, "price": 1.
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost":
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost": 3.00} # Completed
```

BAML gives you fine-grained control over how it repairs this partial JSON and transforms it into a series of semantically valid partial objects.

You can check out more examples (including streaming in FastAPI and NextJS) in the [BAML Examples] repo.

[call BAML functions]: /docs/calling-baml/calling-functions
[BAML Examples]: https://github.com/BoundaryML/baml-examples/tree/main

Let's stream the output of this function `function ExtractReceiptInfo(email: string) -> ReceiptInfo` for our example:

```baml
class ReceiptItem {
  name string
  description string?
  quantity int
  price float
}

class ReceiptInfo {
  items ReceiptItem[]
  total_cost float?
}

function ExtractReceiptInfo(email: string) -> ReceiptInfo {
  client GPT4o
  prompt #"
    Given the receipt below:

    {{ email }}

    {{ ctx.output_format }}
  "#
}
```

The BAML code generator creates a set of types in a module called `partial_types` inside `baml_client`. These types are modified from your original types to support streaming.

By default, BAML converts all class fields into nullable fields and fills them with non-null values as far as possible given the tokens received so far.
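To make the default transformation concrete, the sketch below hand-writes what a partial version of `ReceiptInfo` could look like and how it fills in as tokens arrive. It is illustrative only: `PartialReceiptItem`, `PartialReceiptInfo`, and `describe` are hypothetical stand-ins rather than the classes BAML generates (those live in `baml_client.partial_types`), and the snapshots are hard-coded instead of coming from a real stream.

```python
from dataclasses import dataclass, field
from typing import List, Optional


# Hypothetical stand-ins for generated partial types:
# every scalar class field becomes Optional and starts out as None.
@dataclass
class PartialReceiptItem:
    name: Optional[str] = None
    description: Optional[str] = None
    quantity: Optional[int] = None
    price: Optional[float] = None


@dataclass
class PartialReceiptInfo:
    items: List[PartialReceiptItem] = field(default_factory=list)
    total_cost: Optional[float] = None


# Hard-coded snapshots imitating three successive partial objects.
snapshots = [
    PartialReceiptInfo(items=[PartialReceiptItem(name="Appl")]),
    PartialReceiptInfo(items=[PartialReceiptItem(name="Apple", quantity=2)]),
    PartialReceiptInfo(
        items=[PartialReceiptItem(name="Apple", quantity=2, price=1.50)],
        total_cost=3.00,
    ),
]


def describe(partial: PartialReceiptInfo) -> str:
    """Summarize a partial object, treating None as 'not received yet'."""
    total = "pending" if partial.total_cost is None else f"{partial.total_cost:.2f}"
    names = [item.name or "?" for item in partial.items]
    return f"items={names} total={total}"


for snap in snapshots:
    print(describe(snap))
```

Consumer code written against partial types needs this kind of `None` handling on every field unless the streaming attributes described later in this guide tighten the types.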
BAML will generate `b.stream.ExtractReceiptInfo()` for you, which you can use like so: ```python main.py import asyncio from baml_client import b, partial_types, types # Using a stream: def example1(receipt: str): stream = b.stream.ExtractReceiptInfo(receipt) # partial is a Partial type with all Optional fields for partial in stream: print(f"partial: parsed {len(partial.items)} items (object: {partial})") # final is the full, original, validated ReceiptInfo type final = stream.get_final_response() print(f"final: {len(final.items)} items (object: {final})") # Using only get_final_response() of a stream # # In this case, you should just use b.ExtractReceiptInfo(receipt) instead, # which is slightly faster and more efficient. def example2(receipt: str): final = b.stream.ExtractReceiptInfo(receipt).get_final_response() print(f"final: {len(final.items)} items (object: {final})") # Using the async client: async def example3(receipt: str): # Note the import of the async client from baml_client.async_client import b stream = b.stream.ExtractReceiptInfo(receipt) async for partial in stream: print(f"partial: parsed {len(partial.items)} items (object: {partial})") final = await stream.get_final_response() print(f"final: {len(final.items)} items (object: {final})") receipt = """ 04/14/2024 1:05 pm Ticket: 220000082489 Register: Shop Counter Employee: Connor Customer: Sam Item # Price Guide leash (1 Pair) uni UNI 1 $34.95 The Index Town Walls 1 $35.00 Boot Punch 3 $60.00 Subtotal $129.95 Tax ($129.95 @ 9%) $11.70 Total Tax $11.70 Total $141.65 """ if __name__ == '__main__': #uncomment one at a time and run to see the difference example1(receipt) #example2(receipt) #asyncio.run(example3(receipt)) ``` BAML will generate `b.stream.ExtractReceiptInfo()` for you, which you can use like so: ```ts main.ts import { b } from './baml_client' // Using both async iteration and getFinalResponse() from a stream const example1 = async (receipt: string) => { const stream = 
b.stream.ExtractReceiptInfo(receipt) // partial is a Partial type with all Optional fields for await (const partial of stream) { console.log(`partial: ${partial.items?.length} items (object: ${partial})`) } // final is the full, original, validated ReceiptInfo type const final = await stream.getFinalResponse() console.log(`final: ${final.items.length} items (object: ${final})`) } // Using only async iteration of a stream const example2 = async (receipt: string) => { for await (const partial of b.stream.ExtractReceiptInfo(receipt)) { console.log(`partial: ${partial.items?.length} items (object: ${partial})`) } } // Using only getFinalResponse() of a stream // // In this case, you should just use b.ExtractReceiptInfo(receipt) instead, // which is faster and more efficient. const example3 = async (receipt: string) => { const final = await b.stream.ExtractReceiptInfo(receipt).getFinalResponse() console.log(`final: ${final.items.length} items (object: ${final})`) } const receipt = ` 04/14/2024 1:05 pm Ticket: 220000082489 Register: Shop Counter Employee: Connor Customer: Sam Item # Price Guide leash (1 Pair) uni UNI 1 $34.95 The Index Town Walls 1 $35.00 Boot Punch 3 $60.00 Subtotal $129.95 Tax ($129.95 @ 9%) $11.70 Total Tax $11.70 Total $141.65 ` if (require.main === module) { example1(receipt) example2(receipt) example3(receipt) } ``` BAML will generate `b.Stream.ExtractReceiptInfo()` for you, which you can use like so: ```go main.go package main import ( "context" "fmt" "log" "sync" "time" b "example.com/myproject/baml_client" "example.com/myproject/baml_client/stream_types" "example.com/myproject/baml_client/types" ) // Basic streaming with comprehensive error handling and context cancellation func basicStreamingExample(receipt string) { // Create context with timeout to prevent hanging ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // Always clean up context resources stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt) 
if err != nil { log.Printf("Failed to create stream: %v", err) return } // Ensure stream is properly closed on exit defer func() { if stream != nil { // Note: In practice, range automatically handles closing // but explicit cleanup is shown here for demonstration log.Println("Stream processing completed") } }() for value := range stream { // Handle context cancellation select { case <-ctx.Done(): log.Printf("Stream cancelled due to context: %v", ctx.Err()) return default: } // Handle streaming errors if value.IsError { log.Printf("Stream error: %v", value.Error) return } // Process partial results if !value.IsFinal && value.Stream() != nil { partial := *value.Stream() fmt.Printf("Partial result: parsed %d items so far\n", len(partial.Items)) // You could process partial results here for i, item := range partial.Items { if item.Name != "" { // Only show items with names parsed so far fmt.Printf(" Item %d: %s - %s\n", i+1, item.Name, item.Price) } } } // Process final result if value.IsFinal && value.Final() != nil { final := *value.Final() fmt.Printf("Final result: %d items total\n", len(final.Items)) fmt.Printf("Total amount: %s\n", final.Total) return } } } // Stream with early termination based on conditions func streamWithEarlyTermination(receipt string) (*types.ReceiptInfo, error) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt) if err != nil { return nil, fmt.Errorf("failed to create stream: %w", err) } for value := range stream { // Check for cancellation select { case <-ctx.Done(): return nil, fmt.Errorf("stream cancelled: %w", ctx.Err()) default: } if value.IsError { return nil, fmt.Errorf("stream error: %w", value.Error) } // Early termination condition: stop if we have enough items if !value.IsFinal && value.Stream() != nil { partial := *value.Stream() if len(partial.Items) >= 3 { // Stop early if we have 3+ items fmt.Printf("Early termination: found %d 
items, stopping stream\n", len(partial.Items)) cancel() // Cancel context to stop stream return &partial, nil } } if value.IsFinal && value.Final() != nil { final := *value.Final() return &final, nil } } return nil, fmt.Errorf("stream ended without final response") } // Concurrent streaming - process multiple receipts concurrently func concurrentStreamingExample(receipts []string) { var wg sync.WaitGroup results := make(chan *types.ReceiptInfo, len(receipts)) errors := make(chan error, len(receipts)) // Create context with timeout for all goroutines ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() for i, receipt := range receipts { wg.Add(1) go func(index int, receiptData string) { defer wg.Done() // Create per-goroutine context goroutineCtx, goroutineCancel := context.WithTimeout(ctx, 30*time.Second) defer goroutineCancel() stream, err := b.Stream.ExtractReceiptInfo(goroutineCtx, receiptData) if err != nil { errors <- fmt.Errorf("receipt %d: failed to create stream: %w", index, err) return } for value := range stream { select { case <-goroutineCtx.Done(): errors <- fmt.Errorf("receipt %d: stream cancelled: %w", index, goroutineCtx.Err()) return default: } if value.IsError { errors <- fmt.Errorf("receipt %d: stream error: %w", index, value.Error) return } if value.IsFinal && value.Final() != nil { final := *value.Final() fmt.Printf("Receipt %d: processed %d items\n", index, len(final.Items)) results <- &final return } } errors <- fmt.Errorf("receipt %d: stream ended without final response", index) }(i, receipt) } // Wait for all goroutines and close channels go func() { wg.Wait() close(results) close(errors) }() // Collect results and errors var successCount int var errorCount int for results != nil || errors != nil { select { case result, ok := <-results: if !ok { results = nil continue } if result != nil { successCount++ fmt.Printf("Successfully processed receipt with %d items, total: %s\n", len(result.Items), result.Total) 
            }
        case err, ok := <-errors:
            if !ok {
                errors = nil
                continue
            }
            if err != nil {
                errorCount++
                log.Printf("Error processing receipt: %v", err)
            }
        }
    }

    fmt.Printf("Concurrent processing completed: %d successes, %d errors\n", successCount, errorCount)
}

// Robust streaming with retry logic
func streamWithRetry(receipt string, maxRetries int) (*types.ReceiptInfo, error) {
    for attempt := 1; attempt <= maxRetries; attempt++ {
        // Create fresh context for each attempt
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)

        stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
        if err != nil {
            cancel()
            if attempt == maxRetries {
                return nil, fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
            }
            log.Printf("Attempt %d failed: %v, retrying...", attempt, err)
            time.Sleep(time.Duration(attempt) * time.Second) // Linear backoff: 1s, 2s, 3s, ...
            continue
        }

    consume:
        for value := range stream {
            select {
            case <-ctx.Done():
                cancel()
                if attempt == maxRetries {
                    return nil, fmt.Errorf("stream timeout after %d attempts: %w", maxRetries, ctx.Err())
                }
                log.Printf("Attempt %d timed out, retrying...", attempt)
                // A bare break would only leave the select, so exit the labeled loop
                break consume
            default:
            }

            if value.IsError {
                cancel()
                if attempt == maxRetries {
                    return nil, fmt.Errorf("stream failed after %d attempts: %w", maxRetries, value.Error)
                }
                log.Printf("Attempt %d failed with stream error: %v, retrying...", attempt, value.Error)
                time.Sleep(time.Duration(attempt) * time.Second)
                break consume
            }

            if value.IsFinal && value.Final() != nil {
                final := *value.Final()
                cancel()
                return &final, nil
            }
        }

        // Release this attempt's context before the next attempt (calling cancel twice is safe)
        cancel()
    }

    return nil, fmt.Errorf("all %d attempts failed", maxRetries)
}

func main() {
    receipt := `04/14/2024 1:05 pm

Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item # Price
Guide leash (1 Pair) uni UNI
1 $34.95
The Index Town Walls
1 $35.00
Boot Punch
3 $60.00
Subtotal $129.95
Tax ($129.95 @ 9%) $11.70
Total Tax $11.70
Total $141.65`

    fmt.Println("=== Basic Streaming Example ===")
    basicStreamingExample(receipt)

    fmt.Println("\n=== Stream with Early Termination ===")
    result, err := streamWithEarlyTermination(receipt)
    if err != nil {
        log.Printf("Early termination example failed: %v", err)
    } else if result != nil {
        fmt.Printf("Early termination result: %d items\n", len(result.Items))
    }

    fmt.Println("\n=== Concurrent Streaming Example ===")
    receipts := []string{receipt, receipt, receipt} // Process same receipt 3 times concurrently
    concurrentStreamingExample(receipts)

    fmt.Println("\n=== Stream with Retry Example ===")
    retryResult, err := streamWithRetry(receipt, 3)
    if err != nil {
        log.Printf("Retry example failed: %v", err)
    } else if retryResult != nil {
        fmt.Printf("Retry example succeeded: %d items\n", len(retryResult.Items))
    }
}
```

BAML will generate `Baml.Client.stream.ExtractReceiptInfo()` for you, which you can use like so:

```ruby main.rb
require_relative "baml_client/client"

$b = Baml.Client

# Using both iteration and get_final_response() from a stream
def example1(receipt)
  stream = $b.stream.ExtractReceiptInfo(receipt)

  stream.each do |partial|
    puts "partial: #{partial.items&.length} items"
  end

  final = stream.get_final_response
  puts "final: #{final.items.length} items"
end

# Using only iteration of a stream
def example2(receipt)
  $b.stream.ExtractReceiptInfo(receipt).each do |partial|
    puts "partial: #{partial.items&.length} items"
  end
end

# Using only get_final_response() of a stream
#
# In this case, you should just use $b.ExtractReceiptInfo(receipt) instead,
# which is faster and more efficient.
def example3(receipt) final = $b.stream.ExtractReceiptInfo(receipt).get_final_response puts "final: #{final.items.length} items" end receipt = <<~RECEIPT 04/14/2024 1:05 pm Ticket: 220000082489 Register: Shop Counter Employee: Connor Customer: Sam Item # Price Guide leash (1 Pair) uni UNI 1 $34.95 The Index Town Walls 1 $35.00 Boot Punch 3 $60.00 Subtotal $129.95 Tax ($129.95 @ 9%) $11.70 Total Tax $11.70 Total $141.65 RECEIPT if __FILE__ == $0 example1(receipt) example2(receipt) example3(receipt) end ``` BAML will generate `B.ExtractReceiptInfo.stream()` for you, which you can use like so: ```rust main.rs use myproject::baml_client::sync_client::B; use myproject::baml_client::types::*; // Using both partials and get_final_response() from a stream fn example1(receipt: &str) { let mut stream = B.ExtractReceiptInfo.stream(receipt).unwrap(); // partial is a Partial type with all Optional fields for partial in stream.partials() { let partial = partial.unwrap(); println!("partial: {} items", partial.items.len()); } // final is the full, original, validated ReceiptInfo type let final_result = stream.get_final_response().unwrap(); println!("final: {} items", final_result.items.len()); } // Using only get_final_response() of a stream // // In this case, you should just use B.ExtractReceiptInfo.call(receipt) instead, // which is slightly faster and more efficient. 
fn example2(receipt: &str) {
    let stream = B.ExtractReceiptInfo.stream(receipt).unwrap();
    let final_result = stream.get_final_response().unwrap();
    println!("final: {} items", final_result.items.len());
}

fn main() {
    let receipt = "04/14/2024 1:05 pm\n\nTicket: 220000082489\nRegister: Shop Counter\nEmployee: Connor\nCustomer: Sam\nItem\t#\tPrice\nGuide leash (1 Pair) uni UNI\n1\t$34.95\nThe Index Town Walls\n1\t$35.00\nBoot Punch\n3\t$60.00\nSubtotal\t$129.95\nTax ($129.95 @ 9%)\t$11.70\nTotal Tax\t$11.70\nTotal\t$141.65";

    example1(receipt);
    example2(receipt);
}
```

When using `baml-cli serve`, streaming is available via `http://localhost:2024/stream/{FunctionName}`. However, streaming routes are not added to the `openapi.yaml` file because there are no partial type definitions for JSON schema yet.

Number fields are streamed in only once the LLM has completed them. For example, if the final number is 129.95, you will only ever see `null` or 129.95, never partial numbers like 1, 12, or 129.9.

## Cancelling Streams

You can cancel ongoing streams using abort controllers, which is essential for responsive applications that let users stop generation or that enforce timeouts.
```typescript import { b } from './baml_client' const controller = new AbortController() const stream = b.stream.ExtractReceiptInfo(receipt, { abortController: controller }) // Process stream with ability to cancel let itemCount = 0 for await (const partial of stream) { itemCount = partial.items?.length || 0 console.log(`Received ${itemCount} items so far`) // Cancel if we have enough items if (itemCount >= 5) { console.log('Stopping stream - got enough items') controller.abort() break } } // Or cancel after a timeout setTimeout(() => { controller.abort() console.log('Stream cancelled due to timeout') }, 5000) ``` ```python from baml_client.async_client import b from baml_py import AbortController controller = AbortController() stream = b.stream.ExtractReceiptInfo( receipt, baml_options={"abort_controller": controller} ) # Process stream with ability to cancel item_count = 0 async for partial in stream: item_count = len(partial.items) if partial.items else 0 print(f"Received {item_count} items so far") # Cancel if we have enough items if item_count >= 5: print("Stopping stream - got enough items") controller.abort() break # Or cancel after a timeout import asyncio async def cancel_after_timeout(): await asyncio.sleep(5) controller.abort() print("Stream cancelled due to timeout") asyncio.create_task(cancel_after_timeout()) ``` ```go // Go already uses context for cancellation in the examples above ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt) if err != nil { log.Printf("Failed to create stream: %v", err) return } for value := range stream { // Stream will automatically stop when context is cancelled select { case <-ctx.Done(): log.Printf("Stream cancelled: %v", ctx.Err()) return default: } // Process partial results if !value.IsFinal && value.Stream() != nil { partial := *value.Stream() if len(partial.Items) >= 5 { log.Printf("Stopping stream - got %d items", 
                len(partial.Items))
            cancel() // Cancel the context to stop the stream
            return
        }
    }
}
```

```ruby
require 'baml_client'

controller = Baml::AbortController.new
stream = $b.stream.ExtractReceiptInfo(
  receipt,
  baml_options: { abort_controller: controller }
)

# Process stream with ability to cancel
item_count = 0
stream.each do |partial|
  item_count = partial.items&.length || 0
  puts "Received #{item_count} items so far"

  # Cancel if we have enough items
  if item_count >= 5
    puts "Stopping stream - got enough items"
    controller.abort
    break
  end
end

# Or cancel after a timeout (in a separate thread)
Thread.new do
  sleep(5)
  controller.abort
  puts "Stream cancelled due to timeout"
end
```

```rust
use baml::CancellationToken;
use myproject::baml_client::sync_client::B;
use std::time::Duration;

// Cancel a stream after a timeout
let token = CancellationToken::new_with_timeout(Duration::from_secs(5));

let mut stream = B.ExtractReceiptInfo
    .with_cancellation_token(Some(token))
    .stream(receipt)
    .unwrap();

for partial in stream.partials() {
    match partial {
        Ok(partial) => {
            println!("Received {} items so far", partial.items.len());

            // Stop consuming the stream if we have enough items
            if partial.items.len() >= 5 {
                println!("Stopping stream - got enough items");
                break;
            }
        }
        Err(e) => {
            eprintln!("Stream error: {}", e);
            break;
        }
    }
}
```

### Common Streaming Cancellation Patterns

#### User-Initiated Cancellation

Allow users to stop streaming generation with a "Stop" button:

```tsx
import { useState } from 'react'
import { b } from './baml_client'

// `prompt` is passed in as a prop so the component is self-contained
function StreamingComponent({ prompt }: { prompt: string }) {
  const [controller, setController] = useState<AbortController | null>(null)
  const [isStreaming, setIsStreaming] = useState(false)
  const [result, setResult] = useState("")

  const startStreaming = async () => {
    const newController = new AbortController()
    setController(newController)
    setIsStreaming(true)

    try {
      const stream = b.stream.GenerateContent(prompt, {
        abortController: newController
      })

      let accumulated = ""
      for await (const partial of stream) {
        accumulated = partial.content || ""
        setResult(accumulated)
      }
    } catch (error) {
      // Cancellation surfaces as an error named BamlAbortError
      if (error instanceof Error && error.name === 'BamlAbortError') {
        console.log('Stream cancelled by user')
      }
    } finally {
      setIsStreaming(false)
      setController(null)
    }
  }

  const stopStreaming = () => {
    controller?.abort()
  }

  return (
    <div>
      <button onClick={startStreaming} disabled={isStreaming}>Start</button>
      <button onClick={stopStreaming} disabled={!isStreaming}>Stop</button>
      <div>{result}</div>
    </div>
  )
}
```
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from baml_py import AbortController
# Assumed import path for BamlAbortError; adjust it to match your baml_py version
from baml_py.errors import BamlAbortError
from baml_client.async_client import b

app = FastAPI()
# Maps stream_id to the controller that can cancel that stream
active_streams = {}

@app.post("/stream/{stream_id}")
async def start_stream(stream_id: str, prompt: str):
    controller = AbortController()
    active_streams[stream_id] = controller

    async def generate():
        try:
            stream = b.stream.GenerateContent(
                prompt,
                baml_options={"abort_controller": controller}
            )
            async for partial in stream:
                if controller.aborted:
                    break
                yield f"data: {partial.content}\n\n"
        except BamlAbortError:
            yield "data: [CANCELLED]\n\n"
        finally:
            active_streams.pop(stream_id, None)

    return StreamingResponse(generate(), media_type="text/event-stream")

@app.post("/stop/{stream_id}")
async def stop_stream(stream_id: str):
    if controller := active_streams.get(stream_id):
        controller.abort()
        return {"status": "stopped"}
    return {"status": "not found"}
```
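On the consuming side, the endpoint above emits Server-Sent Events frames of the form `data: ...` separated by blank lines. The sketch below shows one way a client might split such a body into payloads and stop at the `[CANCELLED]` marker. Both `parse_sse_data` and `collect_until_cancelled` are hypothetical helpers written for illustration; they only handle single-line `data:` fields, and a production client would more likely use a complete SSE parser.

```python
from typing import Iterator, List


def parse_sse_data(body: str) -> Iterator[str]:
    """Yield the payload of each single-line `data:` frame in an SSE body."""
    for frame in body.split("\n\n"):
        frame = frame.strip()
        if frame.startswith("data:"):
            yield frame[len("data:"):].strip()


def collect_until_cancelled(body: str) -> List[str]:
    """Collect payloads in order, stopping at the [CANCELLED] marker."""
    payloads: List[str] = []
    for payload in parse_sse_data(body):
        if payload == "[CANCELLED]":
            break
        payloads.append(payload)
    return payloads


# Hand-written sample body imitating two partials followed by a cancellation.
sample = "data: Hel\n\ndata: Hello\n\ndata: [CANCELLED]\n\n"
print(collect_until_cancelled(sample))
```

Because each frame carries the full partial content rather than a delta, a client would typically display only the most recent payload.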
For more examples and patterns, see the [Abort Controllers guide](/guide/baml-basics/abort-signal).

## Semantic Streaming

BAML provides attributes that control how your data streams, so that partial values always remain semantically valid. Here are the three key streaming attributes:

### `@stream.done`

This attribute ensures a type or field is only streamed once it is completely finished. It's useful when you need atomic, fully-formed values.

For example:

```baml
class ReceiptItem {
  name string
  quantity int
  price float

  // The entire ReceiptItem will only stream when complete
  @@stream.done
}

// Receipts is a list of ReceiptItems,
// each internal item will only stream when complete
type Receipts = ReceiptItem[]

class Person {
  // Name will only appear when fully complete,
  // until then it will be null
  name string @stream.done

  // Numbers (floats and ints) will only appear
  // when fully complete by default
  age int

  // Bio will stream token by token
  bio string
}
```

#### Atomic list items with union types

A common pattern is streaming a list of items where each item can be one of several types (e.g. tool calls and messages). You can use `@stream.done` on the list element type to ensure each item only appears once it's fully complete:

```baml
class ToolCall {
  name string
  parameters string
}

class Message {
  role string
  content string
}

type OutputItem = ToolCall | Message

// Each list element appears only when fully complete.
// The list grows incrementally as items finish.
function Run(input: string) -> (OutputItem @stream.done)[] {
  client MyClient
  prompt #"
    {{ input }}
    {{ ctx.output_format }}
  "#
}
```

When `@stream.done` is applied to a union type, it propagates to all variants. This means you don't need to add `@@stream.done` to each class individually; annotating the union is sufficient.

You can also achieve the same behavior by adding `@@stream.done` to each class in the union.
The `(T @stream.done)[]` syntax is the better fit when the classes are also used in other contexts where you don't want `@@stream.done`.

### `@stream.not_null`

This attribute ensures a containing object is only streamed once this field has a value. It's particularly useful for discriminator fields or required metadata.

For example:

```baml
class Message {
  // Message won't stream until type is known
  type "error" | "success" | "info" @stream.not_null

  // Timestamp will only appear when fully complete
  // until then it will be null
  timestamp string @stream.done

  // Content can stream token by token
  content string
}
```

### `@stream.with_state`

This attribute adds metadata to track whether a field has finished streaming. It's well suited to showing loading states in UIs.

For example:

```baml
class BlogPost {
  // The blog post will only stream when title is known
  title string @stream.done @stream.not_null

  // The content will stream token by token, and include completion state
  content string @stream.with_state
}
```

This will generate the following code in the `partial_types` module:

```python
class StreamState(BaseModel, Generic[T]):
    value: T
    state: Literal["Pending", "Incomplete", "Complete"]

class BlogPost(BaseModel):
    title: str
    content: StreamState[str | None]
```

```typescript
interface StreamState<T> {
  value: T
  state: "Pending" | "Incomplete" | "Complete"
}

interface BlogPost {
  title: string
  content: StreamState<string | null>
}
```

### Type Transformation Summary

Here's how these attributes affect your types in generated code:

| BAML Type                         | Generated Type (during streaming) | Description                               |
| --------------------------------- | --------------------------------- | ----------------------------------------- |
| `T`                               | `Partial[T]?`                     | Default: Nullable and partial             |
| `T @stream.done`                  | `T?`                              | Nullable but always complete when present |
| `T @stream.not_null`              | `Partial[T]`                      | Always present but may be partial         |
| `T @stream.done @stream.not_null` | `T`                               | Always present and always complete        |
| `T @stream.with_state`            | `StreamState[Partial[T]?]`        | Includes streaming state metadata         |

The return type of a function is not affected by streaming attributes!

## Putting it all together

Let's put all of these concepts together to design an application that streams a conversation containing stock recommendations, using semantic streaming to ensure that the streamed data obeys our domain's invariants.

```baml
enum Stock {
  APPL
  MSFT
  GOOG
  BAML
}

// Make recommendations atomic - we do not want a recommendation to be
// modified by streaming additional messages.
class Recommendation {
  stock Stock
  amount float
  action "buy" | "sell"
  @@stream.done
}

class Message {
  message_type "greeting" | "conversation" | "farewell" @stream.not_null
  message string @stream.with_state @stream.not_null
}

// Assumed minimal shape for user-authored messages in the history
class UserMessage {
  message string
}

function Respond(
  history: (UserMessage | Message | Recommendation)[]
) -> Message | Recommendation {
  client DeepseekR1
  prompt #"
    Make the message in the conversation, using a conversational
    message or a stock recommendation, based on this conversation history:
    {{ history }}.

    {{ ctx.output_format }}
  "#
}
```

The above BAML code will generate the following Python definitions in the `partial_types` module. The use of streaming attributes has several effects on the generated code:

* `Recommendation` does not have any partial fields because it was marked `@@stream.done`.
* The `Message.message` `string` is wrapped in `StreamState`, allowing runtime checking of its completion status. This status could be used to render a spinner as the message streams in.
* The `Message.message_type` field is never `null`, because it was marked as `@stream.not_null`.
```python
class StreamState(BaseModel, Generic[T]):
    value: T
    state: Literal["Pending", "Incomplete", "Complete"]

class Stock(str, Enum):
    APPL = "APPL"
    MSFT = "MSFT"
    GOOG = "GOOG"
    BAML = "BAML"

class Recommendation(BaseModel):
    stock: Stock
    amount: float
    action: Literal["buy", "sell"]

class Message(BaseModel):
    message_type: Literal["greeting", "conversation", "farewell"]
    message: StreamState[str]
```

This BAML code will generate the following TypeScript definitions in the `partial_types` module. The streaming attributes have the same effects as in the Python definitions above.

```typescript
export interface StreamState<T> {
  value: T
  state: "Pending" | "Incomplete" | "Complete"
}

export enum Stock {
  APPL = "APPL",
  MSFT = "MSFT",
  GOOG = "GOOG",
  BAML = "BAML",
}

export interface Recommendation {
  stock: Stock
  amount: number
  action: "buy" | "sell"
}

export interface Message {
  message_type: "greeting" | "conversation" | "farewell"
  message: StreamState<string>
}
```
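As a rough illustration of how a UI might consume these types, the sketch below renders a message with a trailing progress marker while its `StreamState` is not yet complete. The `StreamState` and `Message` dataclasses here are simplified, hypothetical stand-ins for the generated `partial_types` classes, and `render` is an illustrative helper rather than part of BAML.

```python
from dataclasses import dataclass
from typing import Generic, Literal, TypeVar

T = TypeVar("T")


# Simplified stand-ins for the generated classes (assumed shapes).
@dataclass
class StreamState(Generic[T]):
    value: T
    state: Literal["Pending", "Incomplete", "Complete"]


@dataclass
class Message:
    message_type: Literal["greeting", "conversation", "farewell"]
    message: StreamState[str]


def render(msg: Message) -> str:
    """Append a progress marker until the message text has finished streaming."""
    suffix = "" if msg.message.state == "Complete" else " ..."
    return f"[{msg.message_type}] {msg.message.value}{suffix}"


# Hard-coded partials imitating two successive stream events.
partials = [
    Message("greeting", StreamState("Hel", "Incomplete")),
    Message("greeting", StreamState("Hello!", "Complete")),
]

for partial in partials:
    print(render(partial))
```

Because `message_type` is marked `@stream.not_null`, the renderer can use it unconditionally; only the `message` field needs a completion check.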