# Architecture Overview

Understanding Mini-ImagePipe’s design and components.
## Table of contents
- System Architecture
- Core Components
- Execution Flow
- Memory Strategy
- Operator Interface
- Data Models
- Thread Safety
- Testing Strategy
- Next Steps
## System Architecture

```
┌───────────────────────────────────────────────────────┐
│                     Pipeline API                      │
├───────────────────────────────────────────────────────┤
│   TaskGraph   │   DAGScheduler   │   MemoryManager    │
├───────────────────────────────────────────────────────┤
│  Operators: Gaussian │ Sobel │ Resize │ ColorConvert  │
├───────────────────────────────────────────────────────┤
│   CUDA Streams  │  CUDA Events  │   Shared Memory     │
└───────────────────────────────────────────────────────┘
```
### Component Diagram

```mermaid
graph TB
    subgraph "User API"
        Pipeline[Pipeline Builder]
    end
    subgraph "Core Framework"
        TaskGraph[Task Graph]
        Scheduler[DAG Scheduler]
        StreamMgr[CUDA Stream Manager]
        MemMgr[Memory Manager]
    end
    subgraph "Operators"
        Gaussian[Gaussian Blur]
        Sobel[Sobel Edge]
        Resize[Resize]
        ColorConv[Color Convert]
    end
    subgraph "GPU Resources"
        Streams[CUDA Streams]
        Events[CUDA Events]
        DevMem[Device Memory]
        SharedMem[Shared Memory]
    end

    Pipeline --> TaskGraph
    TaskGraph --> Scheduler
    Scheduler --> StreamMgr
    Scheduler --> MemMgr
    StreamMgr --> Streams
    StreamMgr --> Events
    MemMgr --> DevMem
    Scheduler --> Gaussian
    Scheduler --> Sobel
    Scheduler --> Resize
    Scheduler --> ColorConv
    Gaussian --> SharedMem
    Sobel --> SharedMem
```
## Core Components
### Pipeline

The main entry point for building and executing image processing workflows.

Responsibilities:

- Manages operator registration and connection
- Handles buffer allocation and lifecycle
- Provides batch processing support
- Coordinates execution across components

```cpp
Pipeline pipeline(config);
int node = pipeline.addOperator("name", op);
pipeline.connect(from, to);
pipeline.execute();
```
### TaskGraph

DAG-based task dependency management using Kahn’s algorithm for topological sorting.

Features:

- Topological sorting: Determines execution order respecting dependencies
- Cycle detection: Validates the graph before execution using DFS
- State tracking: Tasks track states (PENDING, RUNNING, COMPLETED, FAILED)

```cpp
TaskGraph& graph = pipeline.getTaskGraph();
auto order = graph.topologicalSort();
bool hasCycle = graph.detectCycle();
```
### DAGScheduler

CUDA multi-stream execution engine that maximizes GPU utilization.

How it works:

- Assigns tasks to available CUDA streams based on dependencies
- Uses CUDA events for synchronization between dependent tasks
- Independent tasks run concurrently on different streams
- Errors propagate downstream automatically

```cpp
DAGScheduler& scheduler = pipeline.getScheduler();
scheduler.setNumStreams(4);
scheduler.setErrorCallback(callback);
```
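The concurrency model can be sketched on the host: group tasks into dependency "waves" so that everything within a wave may run at the same time, each task on its own stream (round-robin when tasks outnumber streams). This is a simplified model of the behavior described above, with illustrative names; the real scheduler synchronizes individual dependencies with CUDA events rather than whole waves:

```cpp
#include <utility>
#include <vector>

// Group tasks into dependency "waves": every task in wave i depends only on
// tasks in earlier waves, so all tasks within one wave can run concurrently.
std::vector<std::vector<int>> buildWaves(
    int numTasks, const std::vector<std::pair<int, int>>& edges) {
    std::vector<int> level(numTasks, 0);
    // Relax levels until a fixed point; numTasks passes suffice for any DAG.
    for (int pass = 0; pass < numTasks; ++pass)
        for (const auto& [from, to] : edges)
            if (level[to] < level[from] + 1) level[to] = level[from] + 1;

    int maxLevel = 0;
    for (int l : level)
        if (l > maxLevel) maxLevel = l;

    std::vector<std::vector<int>> waves(maxLevel + 1);
    for (int t = 0; t < numTasks; ++t) waves[level[t]].push_back(t);
    return waves;
}

// Round-robin stream assignment within a wave.
int streamFor(int indexInWave, int numStreams) {
    return indexInWave % numStreams;
}
```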
### MemoryManager

Pinned host and device memory pool management with best-fit allocation.

Design:

- Best-fit strategy: Minimizes fragmentation by finding the smallest suitable block
- Reuse across runs: Allocated buffers are retained for subsequent pipeline executions
- Thread-safe: Safe for concurrent access from multiple threads

```cpp
MemoryManager& mm = MemoryManager::getInstance();
void* ptr = mm.allocateDevice(size);
mm.freeDevice(ptr);
```
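The best-fit strategy reduces to one scan over a free list: take the smallest block that still satisfies the request, so large blocks stay intact for large requests. A minimal sketch with assumed names (`FreeBlock`, `findBestFit`), not the manager’s actual code:

```cpp
#include <cstddef>
#include <vector>

struct FreeBlock {
    std::size_t offset;  // position within the pool
    std::size_t size;    // block size in bytes
};

// Best-fit: pick the smallest free block that can hold `request` bytes,
// which limits fragmentation over time. Returns an index into `freeList`,
// or -1 if no block is large enough.
int findBestFit(const std::vector<FreeBlock>& freeList, std::size_t request) {
    int best = -1;
    for (int i = 0; i < static_cast<int>(freeList.size()); ++i) {
        if (freeList[i].size < request) continue;  // too small
        if (best == -1 || freeList[i].size < freeList[best].size)
            best = i;  // tighter fit found
    }
    return best;
}
```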
## Execution Flow

1. Build (`addOperator`, `connect`)
   - Register operators and define dependencies
   - Validate connections
2. Validate (`execute` begins)
   - Check for cycles in the graph
   - Verify all operators have required inputs
3. Allocate (internal)
   - Calculate buffer sizes based on topological order
   - Allocate from memory pools
4. Execute (`DAGScheduler::execute`)
   - Launch kernels on assigned streams
   - Synchronize with CUDA events
   - Handle errors and propagate status
5. Sync (`cudaDeviceSynchronize`)
   - Wait for all streams to complete
   - Results are ready in output buffers
## Memory Strategy

```
Host Memory (Pinned)           Device Memory
┌─────────────────┐            ┌─────────────────┐
│ Pinned Pool     │  H2D/D2H   │ Device Pool     │
│ (Best-fit)      │  ───────►  │ (Best-fit)      │
│                 │            │                 │
│ Fast async      │            │ Reuse across    │
│ transfers       │            │ executions      │
└─────────────────┘            └─────────────────┘
```
Key benefits:
- Pinned memory: Enables asynchronous host-to-device transfers
- Pool reuse: Eliminates allocation overhead in processing loops
- Best-fit: Reduces memory fragmentation over time
## Operator Interface

All operators implement the `IOperator` interface:

```cpp
class IOperator {
public:
    virtual ~IOperator() = default;

    // Execute the operator on GPU
    virtual cudaError_t execute(const void* input, void* output,
                                int width, int height, int channels,
                                cudaStream_t stream) = 0;

    // Get output dimensions
    virtual void getOutputSize(int inputWidth, int inputHeight,
                               int& outputWidth, int& outputHeight) = 0;

    // Get output channels
    virtual int getOutputChannels(int inputChannels) = 0;
};
```
### Operator Implementations

#### Gaussian Blur Operator

```cpp
class GaussianBlurOperator : public IOperator {
public:
    enum KernelSize { KERNEL_3x3 = 3, KERNEL_5x5 = 5, KERNEL_7x7 = 7 };

    GaussianBlurOperator(KernelSize size, float sigma = 0.0f);

    cudaError_t execute(
        const void* input, void* output,
        int width, int height, int channels,
        cudaStream_t stream
    ) override;

private:
    KernelSize kernelSize_;
    float sigma_;
    float* d_kernelH_;      // Horizontal 1D kernel on device
    float* d_kernelV_;      // Vertical 1D kernel on device
    void* d_intermediate_;  // Intermediate buffer for separable filter
};
```
Separable Filter Implementation:

- Decompose the 2D Gaussian kernel into two 1D kernels: G(x,y) = G(x) · G(y)
- First pass: horizontal convolution with the 1D kernel
- Second pass: vertical convolution with the 1D kernel
- Complexity reduction: O(k²) → O(2k) multiply-adds per pixel for a k×k kernel
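The 1D kernel construction can be sketched as follows (illustrative host code; `gaussianKernel1D` is an assumed name, and the real kernels live in device memory per the member variables above):

```cpp
#include <cmath>
#include <vector>

// Build a normalized 1D Gaussian kernel of odd width k. Because
// G(x, y) = G(x) * G(y), convolving rows with this kernel and then
// columns is equivalent to a single pass with the full k x k 2D kernel.
std::vector<float> gaussianKernel1D(int k, float sigma) {
    std::vector<float> w(k);
    int r = k / 2;
    float sum = 0.0f;
    for (int i = 0; i < k; ++i) {
        float x = static_cast<float>(i - r);
        w[i] = std::exp(-(x * x) / (2.0f * sigma * sigma));
        sum += w[i];
    }
    for (float& v : w) v /= sum;  // normalize so the weights sum to 1
    return w;
}
```

Per pixel, the two 1D passes cost 2k multiply-adds instead of k² (e.g. 10 vs. 25 for k = 5).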
Shared Memory with Halo Regions:

```
+------------------+
|   Halo (top)     |
+--+------------+--+
|H |            | H|
|a |    Tile    | a|
|l |    Data    | l|
|o |            | o|
+--+------------+--+
|  Halo (bottom)   |
+------------------+
```
#### Sobel Edge Detection Operator

```cpp
class SobelOperator : public IOperator {
public:
    SobelOperator();

    cudaError_t execute(
        const void* input, void* output,
        int width, int height, int channels,
        cudaStream_t stream
    ) override;

private:
    // Sobel kernels are constant and stored in GPU constant memory:
    // Gx = [-1 0 1; -2 0 2; -1 0 1]
    // Gy = [-1 -2 -1; 0 0 0; 1 2 1]
};
```
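Applying the two kernels at one interior pixel and combining them into a gradient magnitude can be sketched in host code (single channel; `sobelMagnitude` is an illustrative name, and the real implementation runs as a CUDA kernel):

```cpp
#include <cmath>

// Apply the 3x3 Sobel kernels at interior pixel (x, y) of a single-channel
// image and return the gradient magnitude |G| = sqrt(Gx^2 + Gy^2).
float sobelMagnitude(const unsigned char* img, int width, int x, int y) {
    auto p = [&](int dx, int dy) {
        return static_cast<float>(img[(y + dy) * width + (x + dx)]);
    };
    // Gx = [-1 0 1; -2 0 2; -1 0 1] (responds to vertical edges)
    float gx = -p(-1, -1) + p(1, -1)
             - 2 * p(-1, 0) + 2 * p(1, 0)
             - p(-1, 1) + p(1, 1);
    // Gy = [-1 -2 -1; 0 0 0; 1 2 1] (responds to horizontal edges)
    float gy = -p(-1, -1) - 2 * p(0, -1) - p(1, -1)
             + p(-1, 1) + 2 * p(0, 1) + p(1, 1);
    return std::sqrt(gx * gx + gy * gy);
}
```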
#### Resize Operator

```cpp
class ResizeOperator : public IOperator {
public:
    enum InterpolationMode { NEAREST, BILINEAR };

    ResizeOperator(int targetWidth, int targetHeight, InterpolationMode mode);

    cudaError_t execute(
        const void* input, void* output,
        int width, int height, int channels,
        cudaStream_t stream
    ) override;

    void getOutputSize(
        int inputWidth, int inputHeight,
        int& outputWidth, int& outputHeight
    ) override;

private:
    int targetWidth_;
    int targetHeight_;
    InterpolationMode mode_;
};
```
Coordinate Mapping (the scale ratios are computed in floating point; integer division would truncate them):

```
src_x = dst_x * (src_width  / dst_width)
src_y = dst_y * (src_height / dst_height)
```
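A host-side sketch of the BILINEAR path for one channel (illustrative names; the real kernel runs on the GPU): map the destination pixel back to fractional source coordinates, then blend the four surrounding source pixels by the fractional parts:

```cpp
#include <algorithm>

// Map a destination pixel to source coordinates using float scale factors,
// then blend the four neighboring source pixels (bilinear interpolation).
float sampleBilinear(const float* src, int srcW, int srcH,
                     int dstX, int dstY, int dstW, int dstH) {
    float srcX = dstX * (static_cast<float>(srcW) / dstW);
    float srcY = dstY * (static_cast<float>(srcH) / dstH);
    int x0 = static_cast<int>(srcX), y0 = static_cast<int>(srcY);
    int x1 = std::min(x0 + 1, srcW - 1), y1 = std::min(y0 + 1, srcH - 1);
    float fx = srcX - x0, fy = srcY - y0;  // fractional parts = blend weights
    float top = src[y0 * srcW + x0] * (1 - fx) + src[y0 * srcW + x1] * fx;
    float bot = src[y1 * srcW + x0] * (1 - fx) + src[y1 * srcW + x1] * fx;
    return top * (1 - fy) + bot * fy;
}
```

NEAREST mode is the same mapping with the fractional parts simply rounded away.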
#### Color Conversion Operator

```cpp
class ColorConvertOperator : public IOperator {
public:
    enum ConversionType { RGB_TO_GRAY, BGR_TO_RGB, RGBA_TO_RGB };

    ColorConvertOperator(ConversionType type);

    cudaError_t execute(
        const void* input, void* output,
        int width, int height, int channels,
        cudaStream_t stream
    ) override;

private:
    ConversionType type_;

    // Luminance weights: Y = 0.299*R + 0.587*G + 0.114*B
    static constexpr float kLumR = 0.299f;
    static constexpr float kLumG = 0.587f;
    static constexpr float kLumB = 0.114f;
};
```
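The RGB_TO_GRAY path reduces to a weighted sum per pixel. A host-side sketch using the same weights (`rgbToGray` is an assumed name; the real conversion is a CUDA kernel):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Convert interleaved 8-bit RGB (3 channels) to single-channel grayscale
// with the luminance weights Y = 0.299*R + 0.587*G + 0.114*B.
std::vector<std::uint8_t> rgbToGray(const std::vector<std::uint8_t>& rgb) {
    std::vector<std::uint8_t> gray(rgb.size() / 3);
    for (std::size_t i = 0; i < gray.size(); ++i) {
        float y = 0.299f * rgb[3 * i]      // R
                + 0.587f * rgb[3 * i + 1]  // G
                + 0.114f * rgb[3 * i + 2]; // B
        gray[i] = static_cast<std::uint8_t>(y + 0.5f);  // round to nearest
    }
    return gray;
}
```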
## Data Models

### Image Buffer

```cpp
struct ImageBuffer {
    void* data;           // Pointer to pixel data
    int width;            // Image width in pixels
    int height;           // Image height in pixels
    int channels;         // Number of channels (1, 3, or 4)
    int stride;           // Row stride in bytes
    bool isDeviceMemory;  // True if data is on GPU
    bool isPinned;        // True if host memory is pinned

    size_t sizeInBytes() const {
        return static_cast<size_t>(stride) * height;
    }
};
```
### Kernel Configuration

```cpp
struct KernelConfig {
    dim3 blockSize;    // Thread block dimensions
    dim3 gridSize;     // Grid dimensions
    size_t sharedMem;  // Shared memory size in bytes

    static KernelConfig forImage(int width, int height, int tileSize = 16) {
        KernelConfig cfg;
        cfg.blockSize = dim3(tileSize, tileSize);
        cfg.gridSize = dim3(
            (width + tileSize - 1) / tileSize,   // ceil(width / tileSize)
            (height + tileSize - 1) / tileSize   // ceil(height / tileSize)
        );
        cfg.sharedMem = 0;  // no dynamic shared memory by default
        return cfg;
    }
};
```
### Pipeline Configuration

```cpp
struct PipelineConfig {
    int numStreams = 4;                        // Number of CUDA streams
    size_t pinnedPoolSize = 64 * 1024 * 1024;  // 64 MB pinned memory pool
    bool enableProfiling = false;              // Enable CUDA profiling
    int maxBatchSize = 8;                      // Maximum frames in batch
};
```
## Thread Safety
| Component | Thread Safety | Notes |
|---|---|---|
| Pipeline | Not thread-safe | One pipeline per thread, or external synchronization |
| MemoryManager | Thread-safe | Uses mutex for pool operations |
| TaskGraph | Not thread-safe | Modify only during build phase |
| Operators | Implementation-dependent | Most are stateless and thread-safe |
## Testing Strategy

### Unit Tests

Unit tests verify specific examples and edge cases:

- Operator Unit Tests
  - Test each operator with known input/output pairs
  - Test boundary conditions (1x1 image, maximum size image)
  - Test error conditions (invalid parameters)
- Scheduler Unit Tests
  - Test simple DAG topologies (linear, diamond, fork-join)
  - Test cycle detection with specific cycle patterns
  - Test error propagation with injected failures
- Memory Manager Unit Tests
  - Test allocation/free sequences
  - Test pool behavior with specific allocation patterns
  - Test fallback behavior when pinned allocation fails
### Property-Based Tests

Property-based tests verify universal properties across randomly generated inputs. Each property test runs a minimum of 100 iterations.

Testing framework: Google Test with custom property-based testing utilities for CUDA.

Property test configuration:

```cpp
// Each property test runs 100+ iterations with random inputs
constexpr int kPropertyTestIterations = 100;

// Random image generator
ImageBuffer generateRandomImage(int minSize, int maxSize, int channels);

// Random DAG generator
TaskGraph generateRandomDAG(int minNodes, int maxNodes, float edgeProbability);
```

Property test annotations: each property test is annotated with the feature, property number, and the requirements it validates:

```cpp
// Feature: mini-image-pipe, Property N: [Property Title]
// Validates: Requirements X.Y
TEST(OperatorPropertyTest, GaussianBlurMultiChannel) {
    // Property 1: Gaussian Blur Multi-Channel Support
    // ...
}
```
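As a concrete illustration of the style (standalone, using a plain boolean check instead of the project’s Google Test harness), one such universal property is that luminance is a convex combination of R, G, and B, so for every pixel min(R,G,B) ≤ Y ≤ max(R,G,B):

```cpp
#include <algorithm>
#include <random>

// Property: the luminance weights sum to 1, so Y is a convex combination
// of the channels and must lie between min(R,G,B) and max(R,G,B).
// Checked over many random pixels, mirroring the 100+ iteration policy.
bool luminanceBoundedProperty(int iterations, unsigned seed) {
    std::mt19937 rng(seed);
    std::uniform_int_distribution<int> channel(0, 255);
    for (int i = 0; i < iterations; ++i) {
        int r = channel(rng), g = channel(rng), b = channel(rng);
        float y = 0.299f * r + 0.587f * g + 0.114f * b;
        float lo = static_cast<float>(std::min({r, g, b}));
        float hi = static_cast<float>(std::max({r, g, b}));
        if (y < lo - 1e-3f || y > hi + 1e-3f) return false;  // violated
    }
    return true;
}
```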
### Integration Tests

- End-to-End Pipeline Tests
  - Test complete pipelines with multiple operators
  - Verify output correctness against a reference implementation
- Concurrency Tests
  - Test multi-stream execution with varying stream counts
  - Verify no race conditions or data corruption
- Performance Benchmarks
  - Measure throughput for standard image sizes
  - Compare separable vs. direct convolution performance
## Next Steps
- API Reference — Explore the complete API
- Usage Examples — See common patterns in action