---
slug: /guide/baml-basics/streaming
---
BAML lets you stream in structured JSON output from LLMs as it comes in.
If you tried streaming in a JSON output from an LLM you'd see something like:
```
{"items": [{"name": "Appl
{"items": [{"name": "Apple", "quantity": 2, "price": 1.
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost":
{"items": [{"name": "Apple", "quantity": 2, "price": 1.50}], "total_cost": 3.00} # Completed
```
BAML gives you fine-grained control of how it fixes this partial JSON and transforms
it into a series of semantically valid partial objects.
You can check out more examples (including streaming in FastAPI and NextJS) in the
[BAML Examples]
repo.
[call BAML functions]: /docs/calling-baml/calling-functions
[BAML Examples]: https://github.com/BoundaryML/baml-examples/tree/main
Let's stream the output of this function `function ExtractReceiptInfo(email: string) -> ReceiptInfo` for our example:
```baml
class ReceiptItem {
name string
description string?
quantity int
price float
}
class ReceiptInfo {
items ReceiptItem[]
total_cost float?
}
function ExtractReceiptInfo(email: string) -> ReceiptInfo {
client GPT4o
prompt #"
Given the receipt below:
{{ email }}
{{ ctx.output_format }}
"#
}
```
The BAML code generator creates a set of types in the `baml_client` library
in a module called `partial_types` in `baml_client`. These types are modified
from your original types to support streaming.
By default, BAML will convert all Class fields into nullable fields, and
fill those fields with non-null values as much as possible given the tokens
received so far.
BAML will generate `b.stream.ExtractReceiptInfo()` for you, which you can use like so:
```python main.py
import asyncio
from baml_client import b, partial_types, types
# Using a stream:
def example1(receipt: str):
stream = b.stream.ExtractReceiptInfo(receipt)
# partial is a Partial type with all Optional fields
for partial in stream:
print(f"partial: parsed {len(partial.items)} items (object: {partial})")
# final is the full, original, validated ReceiptInfo type
final = stream.get_final_response()
print(f"final: {len(final.items)} items (object: {final})")
# Using only get_final_response() of a stream
#
# In this case, you should just use b.ExtractReceiptInfo(receipt) instead,
# which is slightly faster and more efficient.
def example2(receipt: str):
final = b.stream.ExtractReceiptInfo(receipt).get_final_response()
print(f"final: {len(final.items)} items (object: {final})")
# Using the async client:
async def example3(receipt: str):
# Note the import of the async client
from baml_client.async_client import b
stream = b.stream.ExtractReceiptInfo(receipt)
async for partial in stream:
print(f"partial: parsed {len(partial.items)} items (object: {partial})")
final = await stream.get_final_response()
print(f"final: {len(final.items)} items (object: {final})")
receipt = """
04/14/2024 1:05 pm
Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item # Price
Guide leash (1 Pair) uni UNI
1 $34.95
The Index Town Walls
1 $35.00
Boot Punch
3 $60.00
Subtotal $129.95
Tax ($129.95 @ 9%) $11.70
Total Tax $11.70
Total $141.65
"""
if __name__ == '__main__':
#uncomment one at a time and run to see the difference
example1(receipt)
#example2(receipt)
#asyncio.run(example3(receipt))
```
BAML will generate `b.stream.ExtractReceiptInfo()` for you, which you can use like so:
```ts main.ts
import { b } from './baml_client'
// Using both async iteration and getFinalResponse() from a stream
const example1 = async (receipt: string) => {
const stream = b.stream.ExtractReceiptInfo(receipt)
// partial is a Partial type with all Optional fields
for await (const partial of stream) {
console.log(`partial: ${partial.items?.length} items (object: ${partial})`)
}
// final is the full, original, validated ReceiptInfo type
const final = await stream.getFinalResponse()
console.log(`final: ${final.items.length} items (object: ${final})`)
}
// Using only async iteration of a stream
const example2 = async (receipt: string) => {
for await (const partial of b.stream.ExtractReceiptInfo(receipt)) {
console.log(`partial: ${partial.items?.length} items (object: ${partial})`)
}
}
// Using only getFinalResponse() of a stream
//
// In this case, you should just use b.ExtractReceiptInfo(receipt) instead,
// which is faster and more efficient.
const example3 = async (receipt: string) => {
const final = await b.stream.ExtractReceiptInfo(receipt).getFinalResponse()
console.log(`final: ${final.items.length} items (object: ${final})`)
}
const receipt = `
04/14/2024 1:05 pm
Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item # Price
Guide leash (1 Pair) uni UNI
1 $34.95
The Index Town Walls
1 $35.00
Boot Punch
3 $60.00
Subtotal $129.95
Tax ($129.95 @ 9%) $11.70
Total Tax $11.70
Total $141.65
`
if (require.main === module) {
example1(receipt)
example2(receipt)
example3(receipt)
}
```
BAML will generate `b.Stream.ExtractReceiptInfo()` for you, which you can use like so:
```go main.go
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
b "example.com/myproject/baml_client"
"example.com/myproject/baml_client/stream_types"
"example.com/myproject/baml_client/types"
)
// Basic streaming with comprehensive error handling and context cancellation
func basicStreamingExample(receipt string) {
// Create context with timeout to prevent hanging
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel() // Always clean up context resources
stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
if err != nil {
log.Printf("Failed to create stream: %v", err)
return
}
// Ensure stream is properly closed on exit
defer func() {
if stream != nil {
// Note: In practice, range automatically handles closing
// but explicit cleanup is shown here for demonstration
log.Println("Stream processing completed")
}
}()
for value := range stream {
// Handle context cancellation
select {
case <-ctx.Done():
log.Printf("Stream cancelled due to context: %v", ctx.Err())
return
default:
}
// Handle streaming errors
if value.IsError {
log.Printf("Stream error: %v", value.Error)
return
}
// Process partial results
if !value.IsFinal && value.Stream() != nil {
partial := *value.Stream()
fmt.Printf("Partial result: parsed %d items so far\n", len(partial.Items))
// You could process partial results here
for i, item := range partial.Items {
if item.Name != "" { // Only show items with names parsed so far
fmt.Printf("  Item %d: %s - %v\n", i+1, item.Name, item.Price)
}
}
}
// Process final result
if value.IsFinal && value.Final() != nil {
final := *value.Final()
fmt.Printf("Final result: %d items total\n", len(final.Items))
fmt.Printf("Total amount: %v\n", final.TotalCost)
return
}
}
}
// Stream with early termination based on conditions
func streamWithEarlyTermination(receipt string) (*types.ReceiptInfo, error) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
if err != nil {
return nil, fmt.Errorf("failed to create stream: %w", err)
}
for value := range stream {
// Check for cancellation
select {
case <-ctx.Done():
return nil, fmt.Errorf("stream cancelled: %w", ctx.Err())
default:
}
if value.IsError {
return nil, fmt.Errorf("stream error: %w", value.Error)
}
// Early termination condition: stop if we have enough items
if !value.IsFinal && value.Stream() != nil {
partial := *value.Stream()
if len(partial.Items) >= 3 { // Stop early if we have 3+ items
fmt.Printf("Early termination: found %d items, stopping stream\n", len(partial.Items))
cancel() // Cancel context to stop stream
return &partial, nil
}
}
if value.IsFinal && value.Final() != nil {
final := *value.Final()
return &final, nil
}
}
return nil, fmt.Errorf("stream ended without final response")
}
// Concurrent streaming - process multiple receipts concurrently
func concurrentStreamingExample(receipts []string) {
var wg sync.WaitGroup
results := make(chan *types.ReceiptInfo, len(receipts))
errors := make(chan error, len(receipts))
// Create context with timeout for all goroutines
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
for i, receipt := range receipts {
wg.Add(1)
go func(index int, receiptData string) {
defer wg.Done()
// Create per-goroutine context
goroutineCtx, goroutineCancel := context.WithTimeout(ctx, 30*time.Second)
defer goroutineCancel()
stream, err := b.Stream.ExtractReceiptInfo(goroutineCtx, receiptData)
if err != nil {
errors <- fmt.Errorf("receipt %d: failed to create stream: %w", index, err)
return
}
for value := range stream {
select {
case <-goroutineCtx.Done():
errors <- fmt.Errorf("receipt %d: stream cancelled: %w", index, goroutineCtx.Err())
return
default:
}
if value.IsError {
errors <- fmt.Errorf("receipt %d: stream error: %w", index, value.Error)
return
}
if value.IsFinal && value.Final() != nil {
final := *value.Final()
fmt.Printf("Receipt %d: processed %d items\n", index, len(final.Items))
results <- &final
return
}
}
errors <- fmt.Errorf("receipt %d: stream ended without final response", index)
}(i, receipt)
}
// Wait for all goroutines and close channels
go func() {
wg.Wait()
close(results)
close(errors)
}()
// Collect results and errors
var successCount int
var errorCount int
for results != nil || errors != nil {
select {
case result, ok := <-results:
if !ok {
results = nil
continue
}
if result != nil {
successCount++
fmt.Printf("Successfully processed receipt with %d items, total: %v\n",
len(result.Items), result.TotalCost)
}
case err, ok := <-errors:
if !ok {
errors = nil
continue
}
if err != nil {
errorCount++
log.Printf("Error processing receipt: %v", err)
}
}
}
fmt.Printf("Concurrent processing completed: %d successes, %d errors\n",
successCount, errorCount)
}
// Robust streaming with retry logic
func streamWithRetry(receipt string, maxRetries int) (*types.ReceiptInfo, error) {
for attempt := 1; attempt <= maxRetries; attempt++ {
// Create fresh context for each attempt
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
if err != nil {
cancel()
if attempt == maxRetries {
return nil, fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
}
log.Printf("Attempt %d failed: %v, retrying...", attempt, err)
time.Sleep(time.Duration(attempt) * time.Second) // Exponential backoff
continue
}
streamLoop:
for value := range stream {
select {
case <-ctx.Done():
cancel()
if attempt == maxRetries {
return nil, fmt.Errorf("stream timeout after %d attempts: %w", maxRetries, ctx.Err())
}
log.Printf("Attempt %d timed out, retrying...", attempt)
// A bare break here would only exit the select, not the range loop,
// so the retry would never actually restart the stream.
break streamLoop
default:
}
if value.IsError {
cancel()
if attempt == maxRetries {
return nil, fmt.Errorf("stream failed after %d attempts: %w", maxRetries, value.Error)
}
log.Printf("Attempt %d failed with stream error: %v, retrying...", attempt, value.Error)
time.Sleep(time.Duration(attempt) * time.Second)
break
}
if value.IsFinal && value.Final() != nil {
final := *value.Final()
cancel()
return &final, nil
}
}
}
return nil, fmt.Errorf("all %d attempts failed", maxRetries)
}
func main() {
receipt := `04/14/2024 1:05 pm
Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item # Price
Guide leash (1 Pair) uni UNI
1 $34.95
The Index Town Walls
1 $35.00
Boot Punch
3 $60.00
Subtotal $129.95
Tax ($129.95 @ 9%) $11.70
Total Tax $11.70
Total $141.65`
fmt.Println("=== Basic Streaming Example ===")
basicStreamingExample(receipt)
fmt.Println("\n=== Stream with Early Termination ===")
result, err := streamWithEarlyTermination(receipt)
if err != nil {
log.Printf("Early termination example failed: %v", err)
} else if result != nil {
fmt.Printf("Early termination result: %d items\n", len(result.Items))
}
fmt.Println("\n=== Concurrent Streaming Example ===")
receipts := []string{receipt, receipt, receipt} // Process same receipt 3 times concurrently
concurrentStreamingExample(receipts)
fmt.Println("\n=== Stream with Retry Example ===")
retryResult, err := streamWithRetry(receipt, 3)
if err != nil {
log.Printf("Retry example failed: %v", err)
} else if retryResult != nil {
fmt.Printf("Retry example succeeded: %d items\n", len(retryResult.Items))
}
}
```
BAML will generate `Baml.Client.stream.ExtractReceiptInfo()` for you,
which you can use like so:
```ruby main.rb
require_relative "baml_client/client"
$b = Baml.Client
# Using both iteration and get_final_response() from a stream
def example1(receipt)
stream = $b.stream.ExtractReceiptInfo(receipt)
stream.each do |partial|
puts "partial: #{partial.items&.length} items"
end
final = stream.get_final_response
puts "final: #{final.items.length} items"
end
# Using only iteration of a stream
def example2(receipt)
$b.stream.ExtractReceiptInfo(receipt).each do |partial|
puts "partial: #{partial.items&.length} items"
end
end
# Using only get_final_response() of a stream
#
# In this case, you should just use $b.ExtractReceiptInfo(receipt) instead,
# which is faster and more efficient.
def example3(receipt)
final = $b.stream.ExtractReceiptInfo(receipt).get_final_response
puts "final: #{final.items.length} items"
end
receipt = <<~RECEIPT
04/14/2024 1:05 pm
Ticket: 220000082489
Register: Shop Counter
Employee: Connor
Customer: Sam
Item # Price
Guide leash (1 Pair) uni UNI
1 $34.95
The Index Town Walls
1 $35.00
Boot Punch
3 $60.00
Subtotal $129.95
Tax ($129.95 @ 9%) $11.70
Total Tax $11.70
Total $141.65
RECEIPT
if __FILE__ == $0
example1(receipt)
example2(receipt)
example3(receipt)
end
```
BAML will generate `B.ExtractReceiptInfo.stream()` for you, which you can use like so:
```rust main.rs
use myproject::baml_client::sync_client::B;
use myproject::baml_client::types::*;
// Using both partials and get_final_response() from a stream
fn example1(receipt: &str) {
let mut stream = B.ExtractReceiptInfo.stream(receipt).unwrap();
// partial is a Partial type with all Optional fields
for partial in stream.partials() {
let partial = partial.unwrap();
println!("partial: {} items", partial.items.len());
}
// final is the full, original, validated ReceiptInfo type
let final_result = stream.get_final_response().unwrap();
println!("final: {} items", final_result.items.len());
}
// Using only get_final_response() of a stream
//
// In this case, you should just use B.ExtractReceiptInfo.call(receipt) instead,
// which is slightly faster and more efficient.
fn example2(receipt: &str) {
let stream = B.ExtractReceiptInfo.stream(receipt).unwrap();
let final_result = stream.get_final_response().unwrap();
println!("final: {} items", final_result.items.len());
}
fn main() {
let receipt = "04/14/2024 1:05 pm\n\nTicket: 220000082489\nRegister: Shop Counter\nEmployee: Connor\nCustomer: Sam\nItem\t#\tPrice\nGuide leash (1 Pair) uni UNI\n1\t$34.95\nThe Index Town Walls\n1\t$35.00\nBoot Punch\n3\t$60.00\nSubtotal\t$129.95\nTax ($129.95 @ 9%)\t$11.70\nTotal Tax\t$11.70\nTotal\t$141.65";
example1(receipt);
example2(receipt);
}
```
When using `baml-cli serve`, streaming is available via `http://localhost:2024/stream/{FunctionName}`.
However streaming routes are not added to the `openapi.yaml` file because there are no
partial type definitions for JSON schema yet.
Number fields are always streamed in only when the LLM completes them. E.g. if
the final number is 129.95, you'll only see null or 129.95 instead of partial
numbers like 1, 12, 129.9, etc.
## Cancelling Streams
You can cancel ongoing streams using abort controllers, which is essential for responsive applications that allow users to stop generation or implement timeouts.
```typescript
import { b } from './baml_client'
const controller = new AbortController()
const stream = b.stream.ExtractReceiptInfo(receipt, {
abortController: controller
})
// Process stream with ability to cancel
let itemCount = 0
for await (const partial of stream) {
itemCount = partial.items?.length || 0
console.log(`Received ${itemCount} items so far`)
// Cancel if we have enough items
if (itemCount >= 5) {
console.log('Stopping stream - got enough items')
controller.abort()
break
}
}
// Or cancel after a timeout
setTimeout(() => {
controller.abort()
console.log('Stream cancelled due to timeout')
}, 5000)
```
```python
from baml_client.async_client import b
from baml_py import AbortController
controller = AbortController()
stream = b.stream.ExtractReceiptInfo(
receipt,
baml_options={"abort_controller": controller}
)
# Process stream with ability to cancel
item_count = 0
async for partial in stream:
item_count = len(partial.items) if partial.items else 0
print(f"Received {item_count} items so far")
# Cancel if we have enough items
if item_count >= 5:
print("Stopping stream - got enough items")
controller.abort()
break
# Or cancel after a timeout
import asyncio
async def cancel_after_timeout():
await asyncio.sleep(5)
controller.abort()
print("Stream cancelled due to timeout")
asyncio.create_task(cancel_after_timeout())
```
```go
// Go already uses context for cancellation in the examples above
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := b.Stream.ExtractReceiptInfo(ctx, receipt)
if err != nil {
log.Printf("Failed to create stream: %v", err)
return
}
for value := range stream {
// Stream will automatically stop when context is cancelled
select {
case <-ctx.Done():
log.Printf("Stream cancelled: %v", ctx.Err())
return
default:
}
// Process partial results
if !value.IsFinal && value.Stream() != nil {
partial := *value.Stream()
if len(partial.Items) >= 5 {
log.Printf("Stopping stream - got %d items", len(partial.Items))
cancel() // Cancel the context to stop the stream
return
}
}
}
```
```ruby
require 'baml_client'
controller = Baml::AbortController.new
stream = $b.stream.ExtractReceiptInfo(
receipt,
baml_options: { abort_controller: controller }
)
# Process stream with ability to cancel
item_count = 0
stream.each do |partial|
item_count = partial.items&.length || 0
puts "Received #{item_count} items so far"
# Cancel if we have enough items
if item_count >= 5
puts "Stopping stream - got enough items"
controller.abort
break
end
end
# Or cancel after a timeout (in a separate thread)
Thread.new do
sleep(5)
controller.abort
puts "Stream cancelled due to timeout"
end
```
```rust
use baml::CancellationToken;
use myproject::baml_client::sync_client::B;
use std::time::Duration;
// Cancel a stream after a timeout
let token = CancellationToken::new_with_timeout(Duration::from_secs(5));
let mut stream = B.ExtractReceiptInfo
.with_cancellation_token(Some(token))
.stream(receipt)
.unwrap();
for partial in stream.partials() {
match partial {
Ok(partial) => {
println!("Received {} items so far", partial.items.len());
// Stop consuming the stream if we have enough items
if partial.items.len() >= 5 {
println!("Stopping stream - got enough items");
break;
}
}
Err(e) => {
eprintln!("Stream error: {}", e);
break;
}
}
}
```
### Common Streaming Cancellation Patterns
#### User-Initiated Cancellation
Allow users to stop streaming generation with a "Stop" button:
```tsx
function StreamingComponent() {
const [controller, setController] = useState(null)
const [isStreaming, setIsStreaming] = useState(false)
const [result, setResult] = useState("")
const startStreaming = async () => {
const newController = new AbortController()
setController(newController)
setIsStreaming(true)
try {
const stream = b.stream.GenerateContent(prompt, {
abortController: newController
})
let accumulated = ""
for await (const partial of stream) {
accumulated = partial.content || ""
setResult(accumulated)
}
} catch (error) {
if (error.name === 'BamlAbortError') {
console.log('Stream cancelled by user')
}
} finally {
setIsStreaming(false)
setController(null)
}
}
const stopStreaming = () => {
controller?.abort()
}
return (
{result}
)
}
```
```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from baml_py import AbortController
import asyncio
app = FastAPI()
active_streams = {}
@app.post("/stream/{stream_id}")
async def start_stream(stream_id: str, prompt: str):
controller = AbortController()
active_streams[stream_id] = controller
async def generate():
try:
stream = b.stream.GenerateContent(
prompt,
baml_options={"abort_controller": controller}
)
async for partial in stream:
if controller.aborted:
break
yield f"data: {partial.content}\n\n"
except BamlAbortError:
yield "data: [CANCELLED]\n\n"
finally:
active_streams.pop(stream_id, None)
return StreamingResponse(generate(), media_type="text/event-stream")
@app.post("/stop/{stream_id}")
async def stop_stream(stream_id: str):
if controller := active_streams.get(stream_id):
controller.abort()
return {"status": "stopped"}
return {"status": "not found"}
```
For more examples and patterns, see the [Abort Controllers guide](/guide/baml-basics/abort-signal).
## Semantic Streaming
BAML provides powerful attributes to control how your data streams, ensuring that partial values always maintain semantic validity. Here are the three key streaming attributes:
### `@stream.done`
This attribute ensures a type or field is only streamed when it's completely finished. It's useful when you need atomic, fully-formed values.
For example:
```baml
class ReceiptItem {
name string
quantity int
price float
// The entire ReceiptItem will only stream when complete
@@stream.done
}
// Receipts is a list of ReceiptItems,
// each internal item will only stream when complete
type Receipts = ReceiptItem[]
class Person {
// Name will only appear when fully complete,
// until then it will be null
name string @stream.done
// Numbers (floats and ints) will only appear
// when fully complete by default
age int
// Bio will stream token by token
bio string
}
```
#### Atomic list items with union types
A common pattern is streaming a list of items where each item can be one of
several types (e.g. tool calls and messages). You can use `@stream.done` on
the list element type to ensure each item only appears once it's fully complete:
```baml
class ToolCall {
name string
parameters string
}
class Message {
role string
content string
}
type OutputItem = ToolCall | Message
// Each list element appears only when fully complete.
// The list grows incrementally as items finish.
function Run(input: string) -> (OutputItem @stream.done)[] {
client MyClient
prompt #"
{{ input }}
{{ ctx.output_format }}
"#
}
```
When `@stream.done` is applied to a union type, it propagates to all variants.
This means you don't need to add `@@stream.done` to each class individually —
annotating the union is sufficient.
You can also achieve the same behavior by adding `@@stream.done` to each
class in the union. The `(T @stream.done)[]` syntax is more concise when
the classes are used in other contexts where you don't want `@@stream.done`.
### `@stream.not_null`
This attribute ensures a containing object is only streamed when this field has a value. It's particularly useful for discriminator fields or required metadata.
For example:
```baml
class Message {
// Message won't stream until type is known
type "error" | "success" | "info" @stream.not_null
// Timestamp will only appear when fully complete
// until then it will be null
timestamp string @stream.done
// Content can stream token by token
content string
}
```
### `@stream.with_state`
This attribute adds metadata to track if a field has finished streaming. It's perfect for showing loading states in UIs.
For example:
```baml
class BlogPost {
// The blog post will only stream when title is known
title string @stream.done @stream.not_null
// The content will stream token by token, and include completion state
content string @stream.with_state
}
```
This will generate the following code in the `partial_types` module:
```python
class StreamState(BaseModel, Generic[T]):
    value: T
    state: Literal["incomplete", "complete"]
class BlogPost(BaseModel):
title: str
content: StreamState[str | None]
```
```typescript
interface StreamState<T> {
  value: T
  state: "incomplete" | "complete"
}
interface BlogPost {
  title: string
  content: StreamState<string | null>
}
```
### Type Transformation Summary
Here's how these attributes affect your types in generated code:
| BAML Type | Generated Type (during streaming) | Description |
| --------------------------------- | --------------------------------- | ----------------------------------------- |
| `T` | `Partial[T]?` | Default: Nullable and partial |
| `T @stream.done` | `T?` | Nullable but always complete when present |
| `T @stream.not_null` | `Partial[T]` | Always present but may be partial |
| `T @stream.done @stream.not_null` | `T` | Always present and always complete |
| `T @stream.with_state` | `StreamState[Partial[T]?]` | Includes streaming state metadata |
The return type of a function is not affected by streaming attributes!
## Putting it all together
Let's put all of these concepts together to design an application that
streams a conversation containing stock recommendations, using semantic
streaming to ensure that the streamed data obeys our domain's invariants.
```baml
enum Stock {
APPL
MSFT
GOOG
BAML
}
// Make recommendations atomic - we do not want a recommendation to be
// modified by streaming additional messages.
class Recommendation {
stock Stock
amount float
action "buy" | "sell"
@@stream.done
}
class AssistantMessage {
message_type "greeting" | "conversation" | "farewell" @stream.not_null
message string @stream.with_state @stream.not_null
}
function Respond(
history: (UserMessage | AssistantMessage | Recommendation)[]
) -> AssistantMessage | Recommendation {
client DeepseekR1
prompt #"
Generate the next message in the conversation, using a conversational
message or a stock recommendation, based on this conversation history:
{{ history }}.
{{ ctx.output_format }}
"#
}
```
The above BAML code will generate the following Python definitions in the
`partial_types` module. The use of streaming attributes has several effects on
the generated code:
* `Recommendation` does not have any partial fields because it was marked
`@stream.done`.
* The `Message.message` `string` is wrapped in `StreamState`, allowing
runtime checking of its completion status. This status could be used
to render a spinner as the message streams in.
* The `Message.message_type` field may not be `null`, because it was marked
as `@stream.not_null`.
```python
class StreamState(BaseModel, Generic[T]):
value: T
state: Literal["Pending", "Incomplete", "Complete"]
class Stock(str, Enum):
APPL = "APPL"
MSFT = "MSFT"
GOOG = "GOOG"
BAML = "BAML"
class Recommendation(BaseClass):
stock: Stock
amount: float
action: Literal["buy", "sell"]
class Message(BaseClass):
    message_type: Literal["greeting", "conversation", "farewell"]
    message: StreamState[str]
```
This BAML code will generate the following Typescript definitions in the
`partial_types` module. The use of streaming attributes has several effects on
the generated code:
* `Recommendation` does not have any partial fields because it was marked
`@stream.done`.
* The `Message.message` `string` is wrapped in `StreamState`, allowing
runtime checking of its completion status. This status could be used
to render a spinner as the message streams in.
* The `Message.message_type` field may not be `null`, because it was marked
as `@stream.not_null`.
```typescript
export interface StreamState<T> {
  value: T,
  state: "Pending" | "Incomplete" | "Complete"
}
export enum Stock {
  APPL = "APPL",
  MSFT = "MSFT",
  GOOG = "GOOG",
  BAML = "BAML",
}
export interface Recommendation {
  stock: Stock,
  amount: number,
  action: "buy" | "sell"
}
export interface Message {
  message_type: "greeting" | "conversation" | "farewell"
  message: StreamState<string>
}
```