I've built an open source streaming library for async pipelines
A Python library for efficient asynchronous pipeline processing with real-time streaming capabilities.
The library streams intermediate results between pipeline stages, so downstream tasks can begin processing as soon as an individual item finishes a stage. By default, results are streamed in their original input order, with options to stream them as soon as they complete or to wait and collect everything at the end.
The fundamental challenge:
# This pattern is everywhere, but streaming is hard,
# especially if different chunks take different amounts of time to process:
#
#   IndividualProcessing → BatchProcessing → IndividualProcessing
#       (fast)                (slower)           (fast)
#
# Examples:
# • Fetch RAG results → Re-Rank → Enrich (→ stream to UI)
# • Compress → ML batch inference → Apply
# • Validate → Batch database ops → Send notifications
Early Results Delivery: Get initial results quickly without waiting for all items to finish processing
Order Preservation: Stream results in original order while maintaining maximum parallelism
Optimal Resource Usage: Process items concurrently through all pipeline stages
Flexible Consumption: Stream results as they're ready or collect them all at completion
API Response Streaming: Show initial results to users immediately while processing continues
Batch Processing with Previews: Process large datasets in batches but deliver completed batch results as they finish
Long-Running Pipeline Visibility: Monitor progress of multi-stage transformations in real-time
Efficient Resource Management: Maximize throughput for IO-bound workloads by ensuring all stages remain active
Define atomic tasks that process single items or batches of items
Chain tasks together to create powerful asynchronous pipelines
Flexible output handling: consume results as an async stream or collect them all at once
Robust error handling: Configure retry logic and error recovery strategies at the task level
Designed for extensibility
pip install conveyor-streaming
import asyncio

from conveyor import single_task, batch_task

# Define some tasks
@single_task
async def multiply_by_two(x: int) -> int:
    print(f"Multiplying {x} by 2")
    await asyncio.sleep(0.01)  # simulate io-bound work
    return x * 2

@batch_task(max_size=3)
async def sum_batch(batch: list[int]) -> int:
    print(f"Summing batch: {batch}")
    await asyncio.sleep(0.05)  # simulate io-bound work
    s = sum(batch)
    print(f"Sum of batch {batch} is {s}")
    return s

@single_task
async def add_ten(x: int) -> int:
    print(f"Adding 10 to {x}")
    await asyncio.sleep(0.01)
    return x + 10

async def main():
    # Create a pipeline
    pipeline = multiply_by_two | sum_batch | add_ten
    data_source = [1, 2, 3, 4, 5, 6, 7]

    # Option 1: Process results as they come
    print("Streaming results as they come...")
    stream = pipeline(data_source)
    async for item in stream:
        print(f"Streamed item: {item}")

    # Option 2: Collect all results
    print("\nCollecting all results...")
    # Re-create stream for fresh iteration
    results = await pipeline(data_source).collect()
    print(f"Collected results: {results}")

    # Expected:
    # 1*2=2, 2*2=4, 3*2=6 -> sum_batch([2,4,6]) = 12 -> add_ten(12) = 22
    # 4*2=8, 5*2=10, 6*2=12 -> sum_batch([8,10,12]) = 30 -> add_ten(30) = 40
    # 7*2=14 -> sum_batch([14]) = 14 -> add_ten(14) = 24
    # Results: [22, 40, 24]

if __name__ == "__main__":
    asyncio.run(main())
Stream Processing vs. Collecting Results
Conveyor offers three main approaches to consuming pipeline results, depending on your specific use case:
Option 1A: Ordered Streaming (Default)
Use async iteration when you want to handle results as they become available while preserving the original input order:
# Process results as they're available, in original input order
async for result in pipeline(data_source):
    print(f"Got a result in original order: {result}")
    # Process each result immediately
🚀 Ordered Streaming Performance Benefits:
Conveyor's ordered processing uses an ordered queue approach that enables true streaming while preserving order:
Early yielding: Each result is yielded as soon as it is next in the original order
Maximum parallelism: All items process concurrently, not sequentially
Pipeline efficiency: Subsequent stages can start processing early results immediately
Timeline Example (Optimal Scenario):
Input tasks with processing times: [0.1s, 0.2s, 0.3s, 0.4s, 0.5s, 2.0s, 0.7s, 0.8s]
Traditional approach (waits for all):
Time: 2.0s → ALL results yielded at once
Conveyor's streaming approach:
Time: 0.1s → yield result 1 ⚡ IMMEDIATE
Time: 0.2s → yield result 2 ⚡ IMMEDIATE
Time: 0.3s → yield result 3 ⚡ IMMEDIATE
Time: 0.4s → yield result 4 ⚡ IMMEDIATE
Time: 0.5s → yield result 5 ⚡ IMMEDIATE
Time: 2.0s → yield results 6,7,8 (7,8 were buffered, waiting for 6)
Benefits: 🎯 First 5 results available 75% faster!
Order Preservation with Outliers:
Fast tasks stream immediately when they're next in order
Slow outliers block subsequent results to maintain order
Once outliers complete, buffered results yield immediately
Never performs worse than traditional batch processing
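To make the buffering behaviour concrete, here is a minimal sketch of the general technique in plain asyncio (an illustration of the idea, not Conveyor's actual implementation): all items run concurrently, and results are yielded in input order as soon as each one is ready.

import asyncio
import random

async def ordered_stream(items, fn):
    # Minimal sketch: start everything at once, then yield in input order.
    tasks = [asyncio.create_task(fn(item)) for item in items]  # full parallelism
    for task in tasks:
        # Fast items that already finished are yielded immediately;
        # a slow outlier only delays the results that come after it.
        yield await task

async def work(x: int) -> int:
    await asyncio.sleep(random.uniform(0.1, 2.0))  # uneven processing times
    return x * 2

async def demo():
    async for result in ordered_stream(range(8), work):
        print("got", result)

asyncio.run(demo())

Fast results ahead of a slow outlier stream out immediately; results behind it are effectively buffered until it finishes, exactly as in the timeline above.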
Option 1B: Unordered Streaming (as_completed)
When you care about processing results as soon as possible, regardless of their original order:
# Process results in the order they complete, ignoring original input order
async for result in pipeline.as_completed(data_source):
    print(f"Got a completed result (fastest first): {result}")
    # Process results in completion order
Alternatively, you can create a stream with a specific execution mode:
# Create a stream with as_completed execution mode
stream = pipeline.with_execution_mode("as_completed")(data_source)
async for result in stream:
    print(f"Got result in completion order: {result}")
⚡ Unordered Streaming Benefits:
Maximum responsiveness: Get results immediately as they complete
No blocking: Fast results are never delayed by slow outliers
Optimized for speed: Ideal when result order doesn't matter
Real-time processing: Perfect for displaying immediate progress
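Conceptually, completion-order streaming corresponds to the standard asyncio.as_completed pattern; a minimal sketch of the idea (again illustrating the concept, not Conveyor's internals):

import asyncio
import random

async def work(x: int) -> int:
    await asyncio.sleep(random.uniform(0.1, 2.0))  # uneven processing times
    return x * 2

async def demo_unordered():
    tasks = [asyncio.create_task(work(x)) for x in range(8)]
    # Yield results as soon as any task finishes, regardless of input order
    for finished in asyncio.as_completed(tasks):
        print("got", await finished)

asyncio.run(demo_unordered())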
Option 2: Collecting All Results
Use collect() when you need all results available at once:
# Wait for all results to be processed
results = await pipeline(data_source).collect()
print(f"All results are ready: {results}")

# Or collect results in completion order
results = await pipeline.as_completed(data_source).collect()
print(f"All results collected in completion order: {results}")
Pipeline Flow Diagram:
graph TD
A["Input Data: 1, 2, 3, 4, 5, 6, 7"] --> B(multiply_by_two - single_task);
B --> C["Processed Items: 2, 4, 6, 8, 10, 12, 14"];
C --> D(sum_batch - batch_task, max_size=3);
D -- "batch [2,4,6]" --> E((12));
D -- "batch [8,10,12]" --> F((30));
D -- "batch [14]" --> G((14));
E --> H(add_ten - single_task);
F --> H;
G --> H;
H --> I["Final Results: 22, 40, 24"];
Understanding Task Types:
@single_task: Processes one item at a time.
@batch_task(max_size=N, min_size=M): Collects items into a batch (up to max_size) and processes the batch. If the remaining items are fewer than max_size but at least min_size, they are processed as a final batch.
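As a rough sketch of the grouping behaviour described above (a hypothetical helper, not the library's code), a batch task can be thought of as consuming the upstream stream like this:

import asyncio
from typing import AsyncIterator, TypeVar

T = TypeVar("T")

async def batched(source: AsyncIterator[T], max_size: int, min_size: int = 1) -> AsyncIterator[list[T]]:
    # Group an async stream into lists of up to max_size items; the trailing
    # partial batch is emitted only if it has at least min_size items.
    batch: list[T] = []
    async for item in source:
        batch.append(item)
        if len(batch) == max_size:
            yield batch
            batch = []
    if len(batch) >= min_size:
        yield batch

async def numbers():
    for i in range(7):
        yield i

async def demo_batches():
    async for b in batched(numbers(), max_size=3):
        print(b)  # [0, 1, 2], [3, 4, 5], [6]

asyncio.run(demo_batches())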
Using Side Inputs with with_inputs
Handling Multiple Input Sources
When building pipelines, you might need to incorporate data from multiple sources. Conveyor Streaming offers two approaches:
Side Inputs Diagram:
graph LR
DataSource["data_source <br/>(primary pipeline arg)"] --> Input[Main Input Item]
subgraph Pipeline with Side Input
Input --> multiply_by_two_1
multiply_by_two_1 --> add_value_from_side_input
add_value_from_side_input --> multiply_by_two_2
end
multiply_by_two_2 --> OutputItem[Processed Stream]
SideInputSrc["Side Input Source<br>(value, coroutine, AsyncStream)"] --> add_value_from_side_input
1. Direct Stream Integration in Tasks
You can create tasks that explicitly process data from multiple sources:
import asyncio

from conveyor import single_task, batch_task, Pipeline

@single_task
async def multiply_by_two(x: int) -> int:
    return x * 2

@batch_task(max_size=3)
async def sum_batch(batch: list[int]) -> int:
    return sum(batch)

# A task that waits for another data source
@single_task
async def wait_for_other_source(x: int, other_stream) -> int:
    other_value = await anext(other_stream)
    print(f"Combined main value {x} with other value {other_value}")
    return x + other_value

async def generate_secondary_data():
    for i in range(10, 40, 10):
        yield i
        await asyncio.sleep(0.02)

async def main_multiple_sources():
    # Create the secondary stream
    secondary_stream = generate_secondary_data()

    # Create pipeline with the custom task
    pipeline = multiply_by_two | wait_for_other_source(secondary_stream) | multiply_by_two
    data = [1, 2, 3]
    results = await pipeline(data).collect()
    print(f"Results: {results}")

    # Expected flow:
    # 1. multiply_by_two: [2, 4, 6]
    # 2. wait_for_other_source combines with [10, 20, 30]: [12, 24, 36]
    # 3. multiply_by_two: [24, 48, 72]
This approach captures the secondary stream by passing it to the task directly in the pipeline definition.
2. Declarative Side Inputs with with_inputs
Alternatively, you can use the more elegant with_inputs method to specify additional inputs in a declarative way, as described below.
Tasks can be configured to accept additional "side inputs" that are resolved when the task is executed. This is useful for incorporating data that isn't part of the main pipeline flow or needs to be fetched/calculated at the time of processing.
The with_inputs method on a task allows you to specify these side inputs. They can be regular values, coroutines, or AsyncStream instances (from which the first item will be taken).
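The exact call signature isn't shown here, so the following is only a sketch of how with_inputs usage might look; the keyword-argument form and the offset parameter name are assumptions for illustration.

import asyncio
from conveyor import single_task

@single_task
async def multiply_by_two(x: int) -> int:
    return x * 2

@single_task
async def add_value_from_side_input(x: int, offset: int) -> int:
    return x + offset

async def fetch_offset() -> int:
    return 10  # e.g. a config value or the result of an external call

async def main_side_inputs():
    # Assumed usage: with_inputs binds the extra parameter; per the description
    # above, the side input could be a plain value, a coroutine, or an AsyncStream.
    pipeline = (
        multiply_by_two
        | add_value_from_side_input.with_inputs(offset=fetch_offset())
        | multiply_by_two
    )
    print(await pipeline([1, 2, 3]).collect())

if __name__ == "__main__":
    asyncio.run(main_side_inputs())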
Conveyor provides comprehensive error handling capabilities that can be configured directly in task decorators. You can control retry behavior and specify what should happen when errors occur.
on_error: What to do when an error occurs after all retry attempts
"fail" (default): Raise the error and stop the pipeline
"skip_item": Skip the failing item and continue processing
"skip_batch": For batch tasks, skip the entire batch if any item fails
Retry Configuration: Independent retry logic with exponential backoff (a sketch of the delay calculation follows this list)
retry_attempts: Number of attempts (default: 1, i.e. no retries)
retry_delay: Base delay between retries in seconds
retry_exponential_backoff: Whether to use exponential backoff (default: True)
retry_max_delay: Maximum delay between retries
Custom Error Handlers: For complex error handling scenarios
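The retry settings above combine as you'd expect for exponential backoff: the delay grows per attempt and is capped at the maximum. A rough illustration (not necessarily Conveyor's exact formula):

def backoff_delay(attempt: int, retry_delay: float, retry_max_delay: float, exponential: bool = True) -> float:
    # Illustrative only: delay before retry number `attempt` (1-based),
    # doubling each time and capped at retry_max_delay.
    delay = retry_delay * (2 ** (attempt - 1)) if exponential else retry_delay
    return min(delay, retry_max_delay)

# retry_delay=0.1, retry_max_delay=1.0 -> delays 0.1, 0.2, 0.4, 0.8, 1.0, 1.0, ...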
Basic Error Handling Example
import asyncio
import random
from typing import Any

from conveyor import single_task, batch_task, ErrorHandler

# Task that skips failing items
@single_task(on_error="skip_item")
async def unreliable_task(x: int) -> int:
    await asyncio.sleep(0.01)
    if random.random() < 0.3:  # 30% failure rate
        raise ValueError(f"Random failure processing {x}")
    return x * 2

# Task with retry logic
@single_task(
    retry_attempts=3,
    retry_delay=0.1,
    retry_exponential_backoff=True,
    on_error="skip_item",
)
async def task_with_retry(x: int) -> int:
    if random.random() < 0.5:  # 50% failure rate
        raise ValueError(f"Temporary failure processing {x}")
    return x + 100

# Batch task that skips the entire batch on any failure
@batch_task(max_size=3, on_error="skip_batch")
async def sensitive_batch_task(batch: list[int]) -> int:
    if any(x < 0 for x in batch):
        raise ValueError("Negative numbers not allowed in batch")
    return sum(batch)

# Custom error handler for business logic
class BusinessErrorHandler(ErrorHandler):
    async def handle_error(self, error: Exception, item, task_name: str, attempt: int) -> tuple[bool, Any]:
        if isinstance(error, ValueError) and "business rule" in str(error):
            print(f"Business rule violation in {task_name}: {error}")
            return True, -1  # Continue with sentinel value
        return False, None  # Re-raise other errors

@single_task(error_handler=BusinessErrorHandler())
async def business_task(x: int) -> int:
    if x % 7 == 0:
        raise ValueError("business rule: multiples of 7 not allowed")
    return x * 5

async def main_error_handling():
    data = [1, 2, -1, 3, 4, 5, 6, 7, 8]

    print("1. Skip failing items:")
    pipeline1 = unreliable_task | sensitive_batch_task
    results1 = await pipeline1(data).collect()
    print(f"Results: {results1}")

    print("\n2. Retry with backoff:")
    pipeline2 = task_with_retry
    results2 = await pipeline2([1, 2, 3]).collect()
    print(f"Results: {results2}")

    print("\n3. Custom error handler:")
    pipeline3 = business_task
    results3 = await pipeline3([1, 2, 7, 14, 21]).collect()
    print(f"Results: {results3}")

if __name__ == "__main__":
    asyncio.run(main_error_handling())
While there are several libraries in adjacent spaces, none directly address the specific challenge of streaming results from mixed single/batch task pipelines. Here's how Conveyor compares to existing solutions:
Workflow Orchestration Platforms
Prefect, Airflow, Dagster
What they do: Orchestrate complex DAG-based workflows with scheduling, monitoring, and error recovery
Gap: Designed for batch-oriented ETL workflows, not real-time streaming of intermediate results
Use case: Great for scheduled data pipelines, but won't help with streaming progressive results to users
When to use instead: Large-scale ETL orchestration with complex dependencies and scheduling requirements
Distributed Computing Frameworks
Ray, Dask
What they do: Distributed computing with parallel task execution across clusters
Gap: Much heavier weight, focused on distributed computing rather than pipeline streaming coordination
Use case: Large-scale distributed processing, not single-machine streaming pipelines
When to use instead: When you need to scale across multiple machines or have very large datasets
Distributed Task Queues
Celery, RQ, Dramatiq
What they do: Distributed task queues for background job processing
Gap: Focused on job distribution and execution, not streaming intermediate results between pipeline stages
Use case: Background job processing, not real-time result streaming
When to use instead: When you need distributed task execution across multiple workers
Stream Processing Frameworks
Apache Kafka Streams, Apache Beam
What they do: Event-driven stream processing for real-time data pipelines
Gap: Different paradigm (event streaming), not async Python task pipelines
Use case: Event-driven architectures with message queues
When to use instead: Event-driven systems with external message brokers