AsyncDataPipeline is a high-performance asynchronous data processing pipeline for Go, designed for workloads that collect and process data concurrently. It offers a small interface and accepts custom data collection and processing functions.
- Generic support for processing any type of data
- Configurable number of concurrent worker goroutines
- Automatic goroutine lifecycle management
- Elegant error handling mechanism
- Support for timeout and cancellation operations
- Automatic idle state detection and shutdown
```bash
go get github.com/rushairer/asyncdatapipeline
```
```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/rushairer/asyncdatapipeline"
)

type Data struct {
	ID   int
	Name string
}

func main() {
	// Create configuration
	config := &asyncdatapipeline.AsyncDataPipelineConfig{
		MaxWorkers:     4,
		IdleTime:       time.Second * 5,
		CollectTimeout: time.Second * 5,
	}

	// Define collection function
	collectFunc := func(ctx context.Context) ([]Data, error) {
		// Simulate data collection
		return []Data{{ID: 1, Name: "test"}}, nil
	}

	// Define processing function
	processFunc := func(ctx context.Context, data []Data) error {
		// Process data
		for _, d := range data {
			fmt.Printf("Processing data: %+v\n", d)
		}
		return nil
	}

	// Create pipeline
	pipeline, err := asyncdatapipeline.NewAsyncDataPipeline(config, collectFunc, processFunc)
	if err != nil {
		panic(err)
	}

	// Execute pipeline
	ctx := context.Background()
	reason, errors := pipeline.Perform(ctx)
	if len(errors) > 0 {
		fmt.Printf("Error: %v\n", errors[0])
	}
	fmt.Printf("Pipeline closed: %v\n", reason)
}
```
```mermaid
graph TD
    A[Start] --> B[Initialize Pipeline]
    B --> C[Start Collection Goroutine]
    B --> D[Start Processing Goroutine]
    C --> E{Collect Data}
    E -->|Has Data| F[Send to Channel]
    E -->|Error| G[Record Error]
    F --> H{Process Data}
    H -->|Success| I[Continue Next Batch]
    H -->|Error| J[Record Error]
    I --> E
    G --> K[End Pipeline]
    J --> K
    K --> L[Return Results]
```
| Parameter | Type | Description | Default |
| --- | --- | --- | --- |
| MaxWorkers | int | Number of concurrent worker goroutines | Required |
| IdleTime | time.Duration | Idle timeout duration | Required |
| CollectTimeout | time.Duration | Timeout duration for data collection | Required |
Performance under different concurrency levels with standard configuration (4 CPU cores):
| Concurrency | Processing Speed (ops/sec) | Memory Usage (MB) |
| --- | --- | --- |
| 1 | 1000 | 10 |
| 2 | 1800 | 15 |
| 4 | 3000 | 25 |
| 8 | 4500 | 40 |
| 16 | 5500 | 70 |
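
To read the table in terms of scaling, divide each row's speedup over the single-worker baseline by its worker count. With the figures above, per-worker efficiency is 90% at 2 workers, 75% at 4, about 56% at 8, and about 34% at 16, which is consistent with a 4-core machine being saturated beyond 4 workers. The sketch below reproduces that arithmetic; `benchRow` and `efficiency` are illustrative names.

```go
package main

import "fmt"

// benchRow holds one row of the benchmark table.
type benchRow struct {
	Workers   int
	OpsPerSec float64
	MemoryMB  float64
}

// efficiency returns the speedup of row over base divided by the
// increase in worker count; 1.0 means perfectly linear scaling.
func efficiency(base, row benchRow) float64 {
	speedup := row.OpsPerSec / base.OpsPerSec
	workerRatio := float64(row.Workers) / float64(base.Workers)
	return speedup / workerRatio
}

func main() {
	// Values copied from the benchmark table.
	rows := []benchRow{
		{1, 1000, 10},
		{2, 1800, 15},
		{4, 3000, 25},
		{8, 4500, 40},
		{16, 5500, 70},
	}

	base := rows[0]
	for _, r := range rows {
		fmt.Printf("workers=%2d speedup=%.2fx efficiency=%.0f%%\n",
			r.Workers, r.OpsPerSec/base.OpsPerSec, efficiency(base, r)*100)
	}
}
```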
AsyncDataPipeline exposes runtime performance metrics. You can subscribe to periodic metrics updates to monitor the pipeline while it runs.
| Metric | Type | Description |
| --- | --- | --- |
| TotalDuration | time.Duration | Total running time of the pipeline |
| ProcessingDuration | time.Duration | Time spent on data processing |
| IdleDuration | time.Duration | Time spent in idle state |
| BatchCount | int64 | Number of data batches processed |
| ItemCount | int64 | Total number of data items processed |
| IdleRatio | float64 | Ratio of idle time to total time |
```go
func main() {
	// ... pipeline initialization code ...

	// Subscribe to metrics updates
	pipeline.SubscribeMetrics(func(metrics asyncdatapipeline.PipelineMetrics) {
		fmt.Printf("Total Duration: %v\n", metrics.TotalDuration)
		fmt.Printf("Processing Duration: %v\n", metrics.ProcessingDuration)
		fmt.Printf("Idle Duration: %v\n", metrics.IdleDuration)
		fmt.Printf("Batch Count: %d\n", metrics.BatchCount)
		fmt.Printf("Item Count: %d\n", metrics.ItemCount)
		fmt.Printf("Idle Ratio: %.2f%%\n", metrics.GetIdleRatio()*100)
	}, time.Second*5) // Update every 5 seconds

	// ... pipeline execution code ...
}
```
You can use these metrics to:
- Monitor pipeline performance in real-time
- Optimize worker count based on idle ratio
- Track processing throughput
- Identify performance bottlenecks
Error types:

- `CollectError`: Errors occurring during data collection
- `ProcessError`: Errors occurring during data processing
- `ErrInvalidMaxWorkers`: Invalid `MaxWorkers` configuration
- `ErrNeedCancel`: Signal indicating the need to cancel the operation


Close reasons:

- `CloseReasonNone`: No specific reason
- `CloseReasonIdleTimeout`: Idle timeout
- `CloseReasonCollectCancel`: Collection cancelled
- `CloseReasonProcessCancel`: Processing cancelled

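
A caller can branch on the close reason to decide what to log or whether to restart. The sketch below uses a local `closeReason` type as a stand-in: the underlying type and values of the library's close reasons are assumptions here, and only the four names come from the list above.

```go
package main

import "fmt"

// closeReason is a local stand-in for the documented close reasons.
// The library's actual type and values are not confirmed here.
type closeReason int

const (
	closeReasonNone closeReason = iota
	closeReasonIdleTimeout
	closeReasonCollectCancel
	closeReasonProcessCancel
)

// describe maps a close reason to a short human-readable message.
func describe(r closeReason) string {
	switch r {
	case closeReasonNone:
		return "no specific reason"
	case closeReasonIdleTimeout:
		return "idle timeout"
	case closeReasonCollectCancel:
		return "collection cancelled"
	case closeReasonProcessCancel:
		return "processing cancelled"
	default:
		return "unrecognized close reason"
	}
}

func main() {
	for r := closeReasonNone; r <= closeReasonProcessCancel; r++ {
		fmt.Printf("%d: %s\n", r, describe(r))
	}
}
```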
Issues and Pull Requests are welcome!
MIT License - see LICENSE file for details