Concurrent function calls

We’ll use the function `ClassifyMessage(input: string) -> Category` for our example:

// The set of categories a customer message can be classified into.
enum Category {
  Refund
  CancelOrder
  TechnicalSupport
  AccountIssue
  Question
}

// Classify a free-form customer message into exactly one Category.
function ClassifyMessage(input: string) -> Category {
  client GPT4o
  prompt #"
    Classify the following INPUT into ONE
    of the following categories:

    INPUT: {{ input }}

    {{ ctx.output_format }}

    Response:
  "#
}

You can make concurrent `b.ClassifyMessage()` calls like so:

main.py
import asyncio

from baml_client.async_client import b
from baml_client.types import Category


async def main() -> None:
    # Fire both classifications concurrently; gather awaits them all.
    await asyncio.gather(
        b.ClassifyMessage("I want to cancel my order"),
        b.ClassifyMessage("I want a refund"),
    )


if __name__ == "__main__":
    asyncio.run(main())

Cancelling Parallel Operations

When running multiple operations in parallel, you can use abort controllers to cancel them all at once or individually.

Cancel All Operations

Use a single abort controller to cancel all parallel operations:

1import { b } from './baml_client'
2
3const controller = new AbortController()
4
5// Start multiple operations with the same controller
6const promises = [
7 b.ClassifyMessage('I want to cancel my order', { abortController: controller }),
8 b.ClassifyMessage('I want a refund', { abortController: controller }),
9 b.ClassifyMessage('Is my package shipped?', { abortController: controller })
10]
11
12// Cancel all operations after 2 seconds
13setTimeout(() => {
14 controller.abort()
15 console.log('All operations cancelled')
16}, 2000)
17
18try {
19 const results = await Promise.all(promises)
20 console.log('All completed:', results)
21} catch (error) {
22 if (error.name === 'BamlAbortError') {
23 console.log('Operations were cancelled')
24 }
25}

Cancel Individual Operations

Use separate controllers to cancel operations independently:

1const controllers = [
2 new AbortController(),
3 new AbortController(),
4 new AbortController()
5]
6
7const promises = [
8 b.ClassifyMessage('I want to cancel my order', { abortController: controllers[0] }),
9 b.ClassifyMessage('I want a refund', { abortController: controllers[1] }),
10 b.ClassifyMessage('Is my package shipped?', { abortController: controllers[2] })
11]
12
13// Cancel only the second operation
14controllers[1].abort()
15
16const results = await Promise.allSettled(promises)
17results.forEach((result, index) => {
18 if (result.status === 'fulfilled') {
19 console.log(`Operation ${index} completed:`, result.value)
20 } else {
21 console.log(`Operation ${index} failed:`, result.reason.message)
22 }
23})

Fastest Request Wins

Race multiple LLM providers and cancel slower ones when the fastest completes. This pattern is useful for optimizing latency by using whichever provider responds first.

1import { ClientRegistry } from '@boundaryml/baml'
2
3async function fastestProviderWins(message: string) {
4 const controllers = [
5 new AbortController(),
6 new AbortController(),
7 new AbortController()
8 ]
9
10 // Create separate client registries for each provider
11 const openaiRegistry = new ClientRegistry()
12 openaiRegistry.addLlmClient('OpenAI', 'openai', {
13 model: 'gpt-4o-mini',
14 api_key: process.env.OPENAI_API_KEY
15 })
16 openaiRegistry.setPrimary('OpenAI')
17
18 const anthropicRegistry = new ClientRegistry()
19 anthropicRegistry.addLlmClient('Anthropic', 'anthropic', {
20 model: 'claude-3-haiku-20240307',
21 api_key: process.env.ANTHROPIC_API_KEY
22 })
23 anthropicRegistry.setPrimary('Anthropic')
24
25 const geminiRegistry = new ClientRegistry()
26 geminiRegistry.addLlmClient('Gemini', 'vertex-ai', {
27 model: 'gemini-1.5-flash',
28 location: 'us-central1',
29 credentials: process.env.GOOGLE_APPLICATION_CREDENTIALS
30 })
31 geminiRegistry.setPrimary('Gemini')
32
33 const promises = [
34 b.ClassifyMessage(message, {
35 clientRegistry: openaiRegistry,
36 abortController: controllers[0]
37 }),
38 b.ClassifyMessage(message, {
39 clientRegistry: anthropicRegistry,
40 abortController: controllers[1]
41 }),
42 b.ClassifyMessage(message, {
43 clientRegistry: geminiRegistry,
44 abortController: controllers[2]
45 })
46 ]
47
48 try {
49 // Wait for the first to complete
50 const result = await Promise.race(promises)
51
52 // Cancel the others
53 controllers.forEach(c => c.abort())
54
55 return result
56 } catch (error) {
57 // All failed - cancel any still running
58 controllers.forEach(c => c.abort())
59 throw error
60 }
61}

Implementing Timeouts for Parallel Operations

Set automatic timeouts to prevent operations from running indefinitely:

1async function classifyWithTimeout(messages: string[], timeoutMs: number = 5000) {
2 const controller = new AbortController()
3
4 // Set timeout for all operations
5 const timeoutId = setTimeout(() => {
6 controller.abort()
7 }, timeoutMs)
8
9 try {
10 const promises = messages.map(msg =>
11 b.ClassifyMessage(msg, { abortController: controller })
12 )
13
14 const results = await Promise.all(promises)
15 clearTimeout(timeoutId)
16 return results
17 } catch (error) {
18 clearTimeout(timeoutId)
19 if (error.name === 'BamlAbortError') {
20 throw new Error(`Operations timed out after ${timeoutMs}ms`)
21 }
22 throw error
23 }
24}

Batching with Cancellation Support

Process items in batches with the ability to cancel remaining batches:

1async function processBatches<T, R>(
2 items: T[],
3 batchSize: number,
4 processor: (item: T, controller: AbortController) => Promise<R>
5): Promise<R[]> {
6 const results: R[] = []
7 const masterController = new AbortController()
8
9 try {
10 for (let i = 0; i < items.length; i += batchSize) {
11 const batch = items.slice(i, i + batchSize)
12
13 // Check if we should stop
14 if (masterController.signal.aborted) {
15 throw new Error('Batch processing cancelled')
16 }
17
18 // Process batch in parallel
19 const batchPromises = batch.map(item =>
20 processor(item, masterController)
21 )
22
23 const batchResults = await Promise.all(batchPromises)
24 results.push(...batchResults)
25
26 console.log(`Completed batch ${Math.floor(i / batchSize) + 1}`)
27 }
28
29 return results
30 } catch (error) {
31 masterController.abort()
32 throw error
33 }
34}
35
36// Usage
37const messages = ['message1', 'message2', 'message3', /*...*/]
38const results = await processBatches(
39 messages,
40 5, // batch size
41 (msg, controller) => b.ClassifyMessage(msg, { abortController: controller })
42)

For basic abort controller usage and error handling, see the Abort Controllers guide.