AsyncBatcher is a generic, asynchronous batch processor for Python that groups incoming items into batches and processes them without blocking the caller. It is designed for scenarios where handling requests or tasks in batches improves efficiency and throughput.
- Asynchronous processing: Uses asyncio for non-blocking execution.
- Batching mechanism: Groups items into batches based on size or time constraints.
- Concurrency control: Limits the number of concurrent batch executions.
- Custom processing logic: Users must implement the process_batch method to define batch behavior.
- Queue management: Uses an asyncio.Queue to manage incoming items.
- Error handling: Ensures robust error reporting and handling.
- Users call process(item), which adds the item to an internal queue.
- A Future object is returned immediately, and the result is awaited asynchronously (a rough sketch of this mechanism follows below).
- A background task (run()) continuously monitors the queue.
- Items are collected into batches based on:
- max_batch_size: Maximum items per batch.
- max_queue_time: Maximum time an item can wait before being processed.
 
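A rough sketch of this enqueue-and-await mechanism (the QueueItem fields and the helper function below are simplified illustrations, not the library's actual internals):

```python
import asyncio
from dataclasses import dataclass
from typing import Any

@dataclass
class QueueItem:
    # Illustrative pairing of a user's item with the future that process() resolves later.
    item: Any
    future: asyncio.Future

async def process(queue: asyncio.Queue, item: Any) -> Any:
    entry = QueueItem(item, asyncio.get_running_loop().create_future())
    await queue.put(entry)     # hand the item to the background run loop
    return await entry.future  # suspends until the batch containing this item is processed
```
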
- Once a batch is ready, it is passed to the processing function.
- If process_batch is asynchronous, it is awaited directly.
- If process_batch is synchronous, it runs inside an Executor (see the sketch after this list).
- Each item’s future is resolved with the corresponding processed result.
- If concurrency > 0, a semaphore ensures that only a limited number of batches are processed simultaneously.
- Otherwise, all batches run concurrently.
- Calling stop(force=True) cancels all ongoing tasks.
- Calling stop(force=False) waits for pending items to be processed before shutting down.
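A minimal sketch of the synchronous variant and the two shutdown modes (the class name and doubling logic are illustrative; the stop() call follows the description above and the quick-start example below):

```python
from async_batcher.batcher import AsyncBatcher

class SyncDoubler(AsyncBatcher[int, int]):
    def process_batch(self, batch: list[int]) -> list[int]:
        # Plain (non-async) method: the batcher offloads it to an executor.
        return [x * 2 for x in batch]

async def shutdown(batcher: SyncDoubler, urgent: bool = False) -> None:
    if urgent:
        await batcher.stop(force=True)   # cancel all ongoing batch tasks
    else:
        await batcher.stop(force=False)  # wait for pending items, then shut down
```
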
The sequence diagram below summarizes the full flow:

```mermaid
sequenceDiagram
    participant User
    participant AsyncBatcher
    participant Queue as asyncio.Queue
    participant RunLoop
    participant Semaphore
    participant BatchProcessor
    User->>AsyncBatcher: process(item)
    activate AsyncBatcher
    AsyncBatcher->>Queue: put(QueueItem(item, future))
    AsyncBatcher-->>User: returns future
    deactivate AsyncBatcher
    Note over AsyncBatcher: Starts RunLoop on first process()
    loop Run Loop (run() method)
        RunLoop->>Queue: Collect items (max_batch_size/max_queue_time)
        activate Queue
        Queue-->>RunLoop: Batch [QueueItem1, QueueItem2...]
        deactivate Queue
        alt Concurrency Limited (concurrency > 0)
            RunLoop->>Semaphore: acquire()
            activate Semaphore
            Semaphore-->>RunLoop: acquired
            deactivate Semaphore
        end
        RunLoop->>BatchProcessor: create_task(_batch_run(batch))
        activate BatchProcessor
        alt Async process_batch
            BatchProcessor->>AsyncBatcher: await process_batch(batch)
        else Sync process_batch
            BatchProcessor->>Executor: run_in_executor(process_batch)
        end
        AsyncBatcher-->>BatchProcessor: results [S1, S2...]
        BatchProcessor->>QueueItem1.future: set_result(S1)
        BatchProcessor->>QueueItem2.future: set_result(S2)
        deactivate BatchProcessor
        alt Concurrency Limited
            RunLoop->>Semaphore: release()
        end
    end
    Note over User, BatchProcessor: User's await future gets resolved
```

To use the library, install the package in your environment using pip:

```bash
pip install async-batcher
```

Then, create your own batcher by subclassing AsyncBatcher and implementing the process_batch method:

```python
import asyncio
import logging
from async_batcher.batcher import AsyncBatcher
class MyBatchProcessor(AsyncBatcher[int, int]):
    async def process_batch(self, batch: list[int]) -> list[int]:
        await asyncio.sleep(1)  # Simulate processing delay
        return [x * 2 for x in batch]  # Example: Doubling each item
async def main():
    batcher = MyBatchProcessor(max_batch_size=5, max_queue_time=2.0, concurrency=2)
    results = await asyncio.gather(*[batcher.process(i) for i in range(10)])
    print(results)  # Output: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    await batcher.stop()
# Set logging level to DEBUG if you want to see more details and understand the flow
logging.basicConfig(level=logging.DEBUG)
asyncio.run(main())
```

The benchmark is available in the BENCHMARK.md file.
The AsyncBatcher library is ideal for applications that need to efficiently handle asynchronous requests in batches, such as:
- Batch-processing requests to optimize inference performance (e.g., TensorFlow, PyTorch, Scikit-learn).
- Inserting multiple records in a single query to improve I/O efficiency and reduce costs (e.g., PostgreSQL, MySQL, AWS DynamoDB); a sketch of this pattern follows the list.
- Sending multiple messages in a single API call to reduce latency and costs (e.g., Kafka, RabbitMQ, AWS SQS, AWS SNS).
- Aggregating requests to comply with API rate limits (e.g., GitHub API, Twitter API, OpenAI API).
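To illustrate the database use case, here is a hypothetical sketch that groups single-row inserts into one executemany call, assuming an asyncpg connection pool (the table, columns, and constructor pass-through are invented for the example):

```python
from async_batcher.batcher import AsyncBatcher

class InsertBatcher(AsyncBatcher[tuple, None]):
    def __init__(self, pool, **kwargs):
        super().__init__(**kwargs)  # assumes batcher options are forwarded as keyword arguments
        self.pool = pool

    async def process_batch(self, batch: list[tuple]) -> list[None]:
        async with self.pool.acquire() as conn:
            # One round trip for the whole batch instead of one query per item.
            await conn.executemany(
                "INSERT INTO events (name, payload) VALUES ($1, $2)", batch
            )
        return [None] * len(batch)  # one (empty) result per queued item
```
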
- Implement process_batch according to your needs.
- Tune max_batch_size and max_queue_time to match your performance requirements.
- Handle exceptions inside process_batch so that one failure does not affect the other queued tasks (a sketch follows below).
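For the last point, one possible pattern (a sketch, not the library's prescribed approach) is to catch exceptions inside process_batch and return a per-item fallback so every queued future still resolves:

```python
import logging

from async_batcher.batcher import AsyncBatcher

class SafeBatchProcessor(AsyncBatcher[int, int | None]):
    async def process_batch(self, batch: list[int]) -> list[int | None]:
        try:
            # Any per-batch work that might raise goes inside the try block.
            return [x * 2 for x in batch]
        except Exception:
            logging.exception("Batch failed; returning fallback results")
            # A fallback per item keeps one bad batch from breaking every waiting caller.
            return [None] * len(batch)
```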