> For clean Markdown of any page, append .md to the page URL.
> For a complete documentation index, see https://docs.boundaryml.com/llms.txt.
> For AI client integration (Claude Code, Cursor, etc.), connect to the MCP server at https://docs.boundaryml.com/_mcp/server.

# Concurrent function calls

We’ll use `function ClassifyMessage(input: string) -> Category` for our example:

```baml
// The set of labels ClassifyMessage can return for a message.
enum Category {
    Refund
    CancelOrder
    TechnicalSupport
    AccountIssue
    Question
}

// Classifies `input` into one Category using the GPT4o client.
// The prompt interpolates `input` and `ctx.output_format`.
function ClassifyMessage(input: string) -> Category {
  client GPT4o
  prompt #"
    Classify the following INPUT into ONE
    of the following categories:

    INPUT: {{ input }}

    {{ ctx.output_format }}

    Response:
  "#
}
```

In Python, you can make concurrent `b.ClassifyMessage()` calls with `asyncio.gather`:

```python main.py
import asyncio

from baml_client.async_client import b
from baml_client.types import Category

async def main():
    """Classify two messages concurrently and wait for both calls to finish."""
    pending_calls = [
        b.ClassifyMessage(text)
        for text in ("I want to cancel my order", "I want a refund")
    ]
    await asyncio.gather(*pending_calls)

if __name__ == '__main__':
    # Only start the event loop when this file is executed as a script.
    asyncio.run(main())
```

In TypeScript, you can make concurrent `b.ClassifyMessage()` calls with `Promise.all`:

```ts main.ts
import { b } from './baml_client'
import { Category } from './baml_client/types'
import assert from 'assert'

/**
 * Classifies two messages concurrently and waits for both results.
 */
const main = async () => {
  // Promise.all takes ONE iterable of promises, so the calls are wrapped in an array.
  const categories = await Promise.all([
    b.ClassifyMessage('I want to cancel my order'),
    b.ClassifyMessage('I want a refund'),
  ])
}

if (require.main === module) {
  main()
}

```

In Go, you can make concurrent `b.ClassifyMessage()` calls using goroutines:

```go main.go
package main

import (
    "context"
    "sync"

    b "example.com/myproject/baml_client"
    "example.com/myproject/baml_client/types"
)

// main classifies two messages concurrently, one goroutine per message, and
// drains the successful results once both calls have returned.
func main() {
    ctx := context.Background()

    inputs := []string{
        "I want to cancel my order",
        "I want a refund",
    }

    // Buffered to len(inputs) so no goroutine ever blocks on send.
    categories := make(chan types.Category, len(inputs))

    var pending sync.WaitGroup
    pending.Add(len(inputs))

    // Launch one goroutine per message
    for _, input := range inputs {
        go func(text string) {
            defer pending.Done()
            // Failed calls are dropped; only successful classifications are sent.
            if category, err := b.ClassifyMessage(ctx, text); err == nil {
                categories <- category
            }
        }(input)
    }

    pending.Wait()
    close(categories)

    // Collect results
    for category := range categories {
        // Handle each result
        _ = category
    }
}
```

BAML Ruby (beta) does not currently support async/concurrent calls.

Please [contact us](/contact) if this is something you need.

In Rust, you can make concurrent `B.ClassifyMessage.call()` calls using threads:

```rust main.rs
use myproject::baml_client::sync_client::B;
use myproject::baml_client::types::*;
use std::thread;

/// Classifies two messages on separate threads and gathers the categories.
fn main() {
    let inputs = ["I want to cancel my order", "I want a refund"];

    // Spawn one worker per message; each thread owns its own copy of the text.
    let mut workers = Vec::with_capacity(inputs.len());
    for text in inputs {
        let owned = text.to_string();
        workers.push(thread::spawn(move || B.ClassifyMessage.call(&owned)));
    }

    // Join in spawn order; panics if a thread panicked or a call returned an error.
    let mut results: Vec<Category> = Vec::with_capacity(workers.len());
    for worker in workers {
        results.push(worker.join().unwrap().unwrap());
    }
}
```

## Cancelling Parallel Operations

When running multiple operations in parallel, you can use abort controllers to cancel them all at once or individually.

### Cancel All Operations

Use a single abort controller to cancel all parallel operations:

```typescript
import { b } from './baml_client'

const controller = new AbortController()

// Start multiple operations with the same controller
const promises = [
  b.ClassifyMessage('I want to cancel my order', { abortController: controller }),
  b.ClassifyMessage('I want a refund', { abortController: controller }),
  b.ClassifyMessage('Is my package shipped?', { abortController: controller })
]

// Cancel all operations after 2 seconds
const timeoutId = setTimeout(() => {
  controller.abort()
  console.log('All operations cancelled')
}, 2000)

try {
  const results = await Promise.all(promises)
  console.log('All completed:', results)
} catch (error: unknown) {
  // Catch variables are `unknown` under strict mode, so narrow before reading `.name`.
  if (error instanceof Error && error.name === 'BamlAbortError') {
    console.log('Operations were cancelled')
  } else {
    // Not a cancellation: surface the failure instead of silently swallowing it.
    throw error
  }
} finally {
  // Stop the timer so it cannot fire (and log) after the operations have already settled.
  clearTimeout(timeoutId)
}
```

```python
import asyncio
from baml_client.async_client import b
from baml_py import AbortController, BamlAbortError

async def main():
    """Run three classifications that share one abort controller.

    A background timer aborts the controller after 2 seconds, which cancels
    every call that is still in flight.
    """
    controller = AbortController()

    # Start multiple operations with the same controller
    tasks = [
        b.ClassifyMessage(
            'I want to cancel my order',
            baml_options={"abort_controller": controller}
        ),
        b.ClassifyMessage(
            'I want a refund',
            baml_options={"abort_controller": controller}
        ),
        b.ClassifyMessage(
            'Is my package shipped?',
            baml_options={"abort_controller": controller}
        )
    ]

    # Cancel all operations after 2 seconds
    async def cancel_after_timeout():
        await asyncio.sleep(2)
        controller.abort()
        print('All operations cancelled')

    # Keep a reference to the timer task: the event loop only holds a weak
    # reference, and we need the handle to stop the timer once we are done.
    timer = asyncio.create_task(cancel_after_timeout())

    try:
        results = await asyncio.gather(*tasks)
        print('All completed:', results)
    except BamlAbortError:
        print('Operations were cancelled')
    finally:
        # No-op if the timer already fired; otherwise stops it from running
        # after the operations have settled.
        timer.cancel()
```

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"

    b "example.com/myproject/baml_client"
)

// main classifies three messages concurrently under one cancellable context.
// A background goroutine cancels the context after 2 seconds, which stops
// every call that is still in flight.
func main() {
    // Create a cancellable context
    ctx, cancel := context.WithCancel(context.Background())
    // Release the context's resources even when every call finishes before the timer fires.
    defer cancel()

    // Cancel all operations after 2 seconds
    go func() {
        time.Sleep(2 * time.Second)
        cancel()
        fmt.Println("All operations cancelled")
    }()

    var wg sync.WaitGroup
    messages := []string{
        "I want to cancel my order",
        "I want a refund",
        "Is my package shipped?",
    }

    for _, msg := range messages {
        wg.Add(1)
        go func(message string) {
            defer wg.Done()
            result, err := b.ClassifyMessage(ctx, message)
            if err != nil {
                if err == context.Canceled {
                    fmt.Printf("Cancelled: %s\n", message)
                } else {
                    // Report non-cancellation failures instead of dropping them silently.
                    fmt.Printf("Failed: %s -> %v\n", message, err)
                }
                return
            }
            fmt.Printf("Completed: %s -> %v\n", message, result)
        }(msg)
    }

    wg.Wait()
}
```

```rust
use baml::CancellationToken;
use myproject::baml_client::sync_client::B;
use std::thread;
use std::time::Duration;

/// Classifies three messages on separate threads that share one cancellation
/// token; a timer thread cancels the token after 2 seconds.
fn main() {
    // Create a shared cancellation token
    let token = CancellationToken::new();
    let token_for_cancel = token.clone();

    // Cancel all operations after 2 seconds
    thread::spawn(move || {
        thread::sleep(Duration::from_secs(2));
        token_for_cancel.cancel();
        println!("All operations cancelled");
    });

    let messages = vec![
        "I want to cancel my order",
        "I want a refund",
        "Is my package shipped?",
    ];

    let handles: Vec<_> = messages
        .into_iter()
        .map(|msg| {
            let t = token.clone();
            thread::spawn(move || {
                let result = B.ClassifyMessage
                    .with_cancellation_token(Some(t))
                    .call(msg);
                match result {
                    Ok(category) => println!("Completed: {} -> {:?}", msg, category),
                    // Include the error so a real failure is not mistaken for a cancellation.
                    Err(e) => println!("Cancelled or failed: {} ({})", msg, e),
                }
            })
        })
        .collect();

    // Wait for every worker; panics if a worker thread panicked.
    for handle in handles {
        handle.join().unwrap();
    }
}
```

### Cancel Individual Operations

Use separate controllers to cancel operations independently:

```typescript
// One controller per operation, so each call can be cancelled on its own.
const inputs = ['I want to cancel my order', 'I want a refund', 'Is my package shipped?']
const controllers = inputs.map(() => new AbortController())

const promises = inputs.map((text, i) =>
  b.ClassifyMessage(text, { abortController: controllers[i] })
)

// Cancel only the second operation
controllers[1].abort()

// allSettled reports every outcome, so one cancelled call does not hide the others.
const results = await Promise.allSettled(promises)
for (const [index, result] of results.entries()) {
  if (result.status === 'rejected') {
    console.log(`Operation ${index} failed:`, result.reason.message)
    continue
  }
  console.log(`Operation ${index} completed:`, result.value)
}
```

```python
controllers = [
    AbortController(),
    AbortController(),
    AbortController()
]

tasks = [
    b.ClassifyMessage(
        'I want to cancel my order',
        baml_options={"abort_controller": controllers[0]}
    ),
    b.ClassifyMessage(
        'I want a refund',
        baml_options={"abort_controller": controllers[1]}
    ),
    b.ClassifyMessage(
        'Is my package shipped?',
        baml_options={"abort_controller": controllers[2]}
    )
]

# Cancel only the second operation
controllers[1].abort()

# Use gather with return_exceptions to handle partial failures
results = await asyncio.gather(*tasks, return_exceptions=True)
for index, result in enumerate(results):
    if isinstance(result, Exception):
        print(f"Operation {index} failed: {result}")
    else:
        print(f"Operation {index} completed: {result}")
```

### Fastest Request Wins

Race multiple LLM providers and cancel slower ones when the fastest completes. This pattern is useful for optimizing latency by using whichever provider responds first.

```typescript
import { ClientRegistry } from '@boundaryml/baml'

/**
 * Sends `message` to three providers at once and returns the first
 * successful classification, then cancels whichever requests are still
 * in flight.
 *
 * @param message - Text to classify.
 * @returns The result from the first provider that succeeds.
 * @throws The last provider error if every provider fails.
 */
async function fastestProviderWins(message: string) {
  const controllers = [
    new AbortController(),
    new AbortController(),
    new AbortController()
  ]

  // Create separate client registries for each provider
  const openaiRegistry = new ClientRegistry()
  openaiRegistry.addLlmClient('OpenAI', 'openai', {
    model: 'gpt-5-mini',
    api_key: process.env.OPENAI_API_KEY
  })
  openaiRegistry.setPrimary('OpenAI')

  const anthropicRegistry = new ClientRegistry()
  anthropicRegistry.addLlmClient('Anthropic', 'anthropic', {
    model: 'claude-3-5-haiku-20241022',
    api_key: process.env.ANTHROPIC_API_KEY
  })
  anthropicRegistry.setPrimary('Anthropic')

  const geminiRegistry = new ClientRegistry()
  geminiRegistry.addLlmClient('Gemini', 'vertex-ai', {
    model: 'gemini-2.5-flash',
    location: 'us-central1',
    credentials: process.env.GOOGLE_APPLICATION_CREDENTIALS
  })
  geminiRegistry.setPrimary('Gemini')

  const promises = [
    b.ClassifyMessage(message, {
      clientRegistry: openaiRegistry,
      abortController: controllers[0]
    }),
    b.ClassifyMessage(message, {
      clientRegistry: anthropicRegistry,
      abortController: controllers[1]
    }),
    b.ClassifyMessage(message, {
      clientRegistry: geminiRegistry,
      abortController: controllers[2]
    })
  ]

  try {
    // Promise.race would reject as soon as the fastest provider FAILS, even if
    // a slower one would still succeed, so wait for the first fulfilment instead.
    return await firstFulfilled(promises)
  } finally {
    // Winner found or everything failed: cancel anything still running.
    controllers.forEach(c => c.abort())
  }
}

/**
 * Resolves with the value of the first promise to fulfil. Rejects only after
 * every promise has rejected, using the last rejection reason.
 * (Behaves like `Promise.any`, without requiring the ES2021 library.)
 */
function firstFulfilled<T>(candidates: Promise<T>[]): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    if (candidates.length === 0) {
      reject(new Error('No providers to race'))
      return
    }
    let remaining = candidates.length
    for (const candidate of candidates) {
      candidate.then(resolve, (error: unknown) => {
        remaining -= 1
        // Only give up once no provider is left to succeed.
        if (remaining === 0) reject(error)
      })
    }
  })
}
```

```python
import os
from baml_py import ClientRegistry

async def fastest_provider_wins(message: str):
    """Race three providers and return the first successful classification.

    Each provider gets its own client registry and abort controller. A
    provider that fails is skipped while the others are still running; once
    one succeeds (or all have failed) the remaining requests are cancelled.

    Raises:
        The last provider error if every provider fails.
        RuntimeError: if every task was cancelled before producing a result.
    """
    controllers = [
        AbortController(),
        AbortController(),
        AbortController()
    ]

    # Create separate client registries for each provider
    openai_registry = ClientRegistry()
    openai_registry.add_llm_client('OpenAI', 'openai', {
        'model': 'gpt-5-mini',
        'api_key': os.environ.get('OPENAI_API_KEY')
    })
    openai_registry.set_primary('OpenAI')

    anthropic_registry = ClientRegistry()
    anthropic_registry.add_llm_client('Anthropic', 'anthropic', {
        'model': 'claude-3-5-haiku-20241022',
        'api_key': os.environ.get('ANTHROPIC_API_KEY')
    })
    anthropic_registry.set_primary('Anthropic')

    gemini_registry = ClientRegistry()
    gemini_registry.add_llm_client('Gemini', 'vertex-ai', {
        'model': 'gemini-2.5-flash',
        'location': 'us-central1',
        'credentials': os.environ.get('GOOGLE_APPLICATION_CREDENTIALS_CONTENT')
    })
    gemini_registry.set_primary('Gemini')

    # Create tasks
    tasks = [
        asyncio.create_task(
            b.ClassifyMessage(message, baml_options={
                'client_registry': openai_registry,
                'abort_controller': controllers[0]
            })
        ),
        asyncio.create_task(
            b.ClassifyMessage(message, baml_options={
                'client_registry': anthropic_registry,
                'abort_controller': controllers[1]
            })
        ),
        asyncio.create_task(
            b.ClassifyMessage(message, baml_options={
                'client_registry': gemini_registry,
                'abort_controller': controllers[2]
            })
        )
    ]

    pending = set(tasks)
    last_error = None
    try:
        # Keep waiting until some provider succeeds; a fast failure must not
        # prevent a slower provider from winning.
        while pending:
            done, pending = await asyncio.wait(
                pending,
                return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                if task.cancelled():
                    continue
                error = task.exception()
                if error is None:
                    return task.result()
                last_error = error

        if last_error is None:
            raise RuntimeError('All providers were cancelled')
        raise last_error
    finally:
        # Runs on success, failure, and outer cancellation alike:
        # stop every request that is still in flight.
        for controller in controllers:
            controller.abort()
        for task in tasks:
            if not task.done():
                task.cancel()
```

```go
import (
    "context"
    "fmt"
    "os"
    "sync"
    
    b "example.com/myproject/baml_client"
)

// fastestProviderWins sends message to three providers concurrently and
// returns the first successful classification. Providers that fail are
// skipped while others are still running. If every provider fails, the last
// error received is returned.
func fastestProviderWins(message string) (interface{}, error) {
    // Create separate contexts for each provider
    ctx1, cancel1 := context.WithCancel(context.Background())
    ctx2, cancel2 := context.WithCancel(context.Background())
    ctx3, cancel3 := context.WithCancel(context.Background())

    // On return (winner found or all failed) cancel whatever is still running.
    defer cancel1()
    defer cancel2()
    defer cancel3()

    // Create client registries for each provider
    openaiRegistry, _ := b.NewClientRegistry()
    openaiRegistry.AddLlmClient("OpenAI", "openai", map[string]interface{}{
        "model": "gpt-5-mini",
        "api_key": os.Getenv("OPENAI_API_KEY"),
    })
    openaiRegistry.SetPrimary("OpenAI")

    anthropicRegistry, _ := b.NewClientRegistry()
    anthropicRegistry.AddLlmClient("Anthropic", "anthropic", map[string]interface{}{
        "model": "claude-3-5-haiku-20241022",
        "api_key": os.Getenv("ANTHROPIC_API_KEY"),
    })
    anthropicRegistry.SetPrimary("Anthropic")

    geminiRegistry, _ := b.NewClientRegistry()
    geminiRegistry.AddLlmClient("Gemini", "vertex-ai", map[string]interface{}{
        "model": "gemini-2.5-flash",
        "location": "us-central1",
        "credentials": os.Getenv("GOOGLE_APPLICATION_CREDENTIALS"),
    })
    geminiRegistry.SetPrimary("Gemini")

    type result struct {
        data     interface{}
        err      error
        provider string
    }

    // Buffered to the number of providers so a goroutine can always deliver
    // its result and exit, even after this function has returned.
    resultChan := make(chan result, 3)
    var wg sync.WaitGroup
    wg.Add(3)

    // Launch goroutines for each provider
    go func() {
        defer wg.Done()
        data, err := b.ClassifyMessage(ctx1, message, b.WithClientRegistry(openaiRegistry))
        resultChan <- result{data: data, err: err, provider: "OpenAI"}
    }()

    go func() {
        defer wg.Done()
        data, err := b.ClassifyMessage(ctx2, message, b.WithClientRegistry(anthropicRegistry))
        resultChan <- result{data: data, err: err, provider: "Anthropic"}
    }()

    go func() {
        defer wg.Done()
        data, err := b.ClassifyMessage(ctx3, message, b.WithClientRegistry(geminiRegistry))
        resultChan <- result{data: data, err: err, provider: "Gemini"}
    }()

    // Close the channel once every provider has reported, so the loop below
    // terminates when all of them have failed.
    go func() {
        wg.Wait()
        close(resultChan)
    }()

    // Take results in completion order. The contexts are NOT cancelled until a
    // success is seen, so one fast failure cannot abort the other providers.
    var lastErr error
    for r := range resultChan {
        if r.err != nil {
            lastErr = r.err
            continue
        }
        fmt.Printf("Provider %s won\n", r.provider)
        return r.data, nil
    }

    return nil, lastErr
}
```

```rust
use baml::{CancellationToken, ClientRegistry};
use myproject::baml_client::sync_client::B;
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Races OpenAI and Anthropic on separate threads and returns the debug
/// rendering of whichever classification arrives first. The shared token is
/// cancelled once a winner is known. Returns `Err` if no provider succeeds.
fn fastest_provider_wins(message: &str) -> Result<String, String> {
    let (sender, receiver) = mpsc::channel();
    let cancel_token = CancellationToken::new();

    // (client name, provider id, model) for each racer.
    let racers = [
        ("OpenAI", "openai", "gpt-5-mini"),
        ("Anthropic", "anthropic", "claude-3-5-haiku-20241022"),
    ];

    for (name, provider, model) in racers {
        let text = message.to_string();
        let results = sender.clone();
        let token = cancel_token.clone();
        thread::spawn(move || {
            let mut registry = ClientRegistry::new();
            let mut options = HashMap::new();
            options.insert("model".to_string(), serde_json::json!(model));
            registry.add_llm_client(name, provider, options);
            registry.set_primary_client(name);

            let outcome = B.ClassifyMessage
                .with_client_registry(&registry)
                .with_cancellation_token(Some(token))
                .call(&text);
            // Only successes are reported; a failed provider simply drops its sender.
            if let Ok(result) = outcome {
                let _ = results.send((name, result));
            }
        });
    }

    // Drop the original sender so `recv` errors out once every worker has
    // finished without sending.
    drop(sender);

    // Wait for the first result and cancel slower operations
    receiver
        .recv()
        .map(|(winner, result)| {
            cancel_token.cancel();
            println!("Provider {} won", winner);
            format!("{:?}", result)
        })
        .map_err(|_| "All providers failed".to_string())
}
```

### Implementing Timeouts for Parallel Operations

Set automatic timeouts to prevent operations from running indefinitely:

```typescript
/**
 * Classifies every message in parallel, aborting all calls if they have not
 * finished within `timeoutMs`.
 *
 * @param messages - Texts to classify.
 * @param timeoutMs - Time budget shared by all calls, in milliseconds.
 * @returns The classifications, in the same order as `messages`.
 * @throws Error with a timeout message when the calls were aborted;
 *         otherwise rethrows the original failure.
 */
async function classifyWithTimeout(messages: string[], timeoutMs: number = 5000) {
  const controller = new AbortController()

  // Set timeout for all operations
  const timeoutId = setTimeout(() => {
    controller.abort()
  }, timeoutMs)

  try {
    const promises = messages.map(msg =>
      b.ClassifyMessage(msg, { abortController: controller })
    )

    return await Promise.all(promises)
  } catch (error: unknown) {
    // Catch variables are `unknown` under strict mode, so narrow before reading `.name`.
    if (error instanceof Error && error.name === 'BamlAbortError') {
      throw new Error(`Operations timed out after ${timeoutMs}ms`)
    }
    // One call failed for another reason: stop the sibling calls still in flight.
    controller.abort()
    throw error
  } finally {
    // Single cleanup point for both the success and failure paths.
    clearTimeout(timeoutId)
  }
}
```

```python
import asyncio

from baml_client.async_client import b
from baml_py import AbortController, BamlAbortError

async def classify_with_timeout(messages: list[str], timeout_seconds: float = 5):
    """Classify every message in parallel under one shared time budget.

    Args:
        messages: Texts to classify.
        timeout_seconds: Seconds to wait before aborting all calls.

    Returns:
        The classifications, in the same order as ``messages``.

    Raises:
        TimeoutError: if the calls were aborted by the timeout.
    """
    controller = AbortController()

    async def timeout_task():
        await asyncio.sleep(timeout_seconds)
        controller.abort()

    # Start timeout
    timeout = asyncio.create_task(timeout_task())

    try:
        tasks = [
            b.ClassifyMessage(msg, baml_options={"abort_controller": controller})
            for msg in messages
        ]

        return await asyncio.gather(*tasks)
    except BamlAbortError:
        raise TimeoutError(f"Operations timed out after {timeout_seconds}s")
    finally:
        # Single cleanup point: stops the timer on success, timeout and failure.
        timeout.cancel()

```go
// classifyWithTimeout classifies every message concurrently under a shared
// deadline. It returns the categories in input order, or the first error
// found when scanning results in input order. A deadline error is reported
// as a timeout.
func classifyWithTimeout(messages []string, timeout time.Duration) ([]types.Category, error) {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    results := make([]types.Category, len(messages))
    // Named errs, not errors, so the errors package stays usable for errors.Is below.
    errs := make([]error, len(messages))
    var wg sync.WaitGroup

    for i, msg := range messages {
        wg.Add(1)
        go func(index int, message string) {
            defer wg.Done()
            // Each goroutine writes only its own slot, so no locking is needed.
            result, err := b.ClassifyMessage(ctx, message)
            results[index] = result
            errs[index] = err
        }(i, msg)
    }

    wg.Wait()

    // Check for errors
    for i, err := range errs {
        if err != nil {
            if errors.Is(err, context.DeadlineExceeded) {
                return nil, fmt.Errorf("operations timed out after %v", timeout)
            }
            return nil, fmt.Errorf("message %d failed: %w", i, err)
        }
    }

    return results, nil
}
```

```rust
use baml::CancellationToken;
use myproject::baml_client::sync_client::B;
use myproject::baml_client::types::*;
use std::thread;
use std::time::Duration;

/// Classifies each message on its own thread, all sharing one cancellation
/// token built from `timeout`. Returns the categories in input order, or an
/// error string for the first failed call encountered in input order.
fn classify_with_timeout(
    messages: &[&str],
    timeout: Duration,
) -> Result<Vec<Category>, String> {
    let token = CancellationToken::new_with_timeout(timeout);

    // Spawn every worker before joining any of them so the calls overlap.
    let mut workers = Vec::with_capacity(messages.len());
    for text in messages {
        let worker_token = token.clone();
        let owned = text.to_string();
        workers.push(thread::spawn(move || {
            B.ClassifyMessage
                .with_cancellation_token(Some(worker_token))
                .call(&owned)
        }));
    }

    // Collecting into `Result` stops at the first failed call.
    // `join().unwrap()` panics if a worker thread panicked.
    workers
        .into_iter()
        .map(|worker| {
            worker
                .join()
                .unwrap()
                .map_err(|e| format!("Classification failed: {}", e))
        })
        .collect()
}
```

### Batching with Cancellation Support

Process items in batches with the ability to cancel remaining batches:

```typescript
/**
 * Processes `items` in sequential batches; items within a batch run in parallel.
 *
 * @param items - Items to process.
 * @param batchSize - Maximum number of items processed at once (positive integer).
 * @param processor - Handles one item; receives the shared controller so it
 *   can forward it to cancellable calls.
 * @param masterController - Controller shared by every call. Pass your own
 *   and call `abort()` on it to cancel the remaining batches; defaults to a
 *   fresh, private controller.
 * @returns Results in the same order as `items`.
 * @throws RangeError if `batchSize` is not a positive integer.
 * @throws Error('Batch processing cancelled') if the controller is aborted
 *   before a batch starts; otherwise rethrows the first processor failure.
 */
async function processBatches<T, R>(
  items: T[],
  batchSize: number,
  processor: (item: T, controller: AbortController) => Promise<R>,
  masterController: AbortController = new AbortController()
): Promise<R[]> {
  // A zero or negative step would never advance the loop below.
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError(`batchSize must be a positive integer, got ${batchSize}`)
  }

  const results: R[] = []

  try {
    for (let i = 0; i < items.length; i += batchSize) {
      const batch = items.slice(i, i + batchSize)

      // Check if we should stop
      if (masterController.signal.aborted) {
        throw new Error('Batch processing cancelled')
      }

      // Process batch in parallel
      const batchPromises = batch.map(item =>
        processor(item, masterController)
      )

      const batchResults = await Promise.all(batchPromises)
      results.push(...batchResults)

      console.log(`Completed batch ${Math.floor(i / batchSize) + 1}`)
    }

    return results
  } catch (error) {
    // Signal any calls still in flight from the failed batch to stop.
    masterController.abort()
    throw error
  }
}

// Usage: classify every message, five at a time
const messages = ['message1', 'message2', 'message3', /*...*/]
const classify = (msg: string, controller: AbortController) =>
  b.ClassifyMessage(msg, { abortController: controller })
const results = await processBatches(messages, 5, classify)
```

```python
async def process_batches(items, batch_size, processor, master_controller=None):
    """Process ``items`` in sequential batches; items in a batch run in parallel.

    Args:
        items: Sequence of items to process.
        batch_size: Maximum number of items processed at once.
        processor: Async callable ``(item, controller)`` handling one item.
        master_controller: Controller shared by every call. Pass your own and
            call ``abort()`` on it to cancel the remaining batches. Defaults
            to a fresh ``AbortController``.

    Returns:
        Results in the same order as ``items``.

    Raises:
        Exception: 'Batch processing cancelled' if the controller is aborted
            before a batch starts; otherwise the first processor failure.
    """
    results = []
    if master_controller is None:
        master_controller = AbortController()

    try:
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]

            # Check if we should stop
            if master_controller.aborted:
                raise Exception('Batch processing cancelled')

            # Process batch in parallel
            batch_tasks = [
                processor(item, master_controller)
                for item in batch
            ]

            batch_results = await asyncio.gather(*batch_tasks)
            results.extend(batch_results)

            print(f"Completed batch {i // batch_size + 1}")

        return results
    except Exception:
        # Signal any calls still in flight from the failed batch to stop.
        master_controller.abort()
        raise

# Usage
messages = ['message1', 'message2', 'message3']
results = await process_batches(
    messages,
    5,  # batch size
    lambda msg, ctrl: b.ClassifyMessage(msg, baml_options={"abort_controller": ctrl})
)
```

For basic abort controller usage and error handling, see the [Abort Controllers guide](/guide/baml-basics/abort-signal).