
Processing Pipeline API

Namespace: fq::processing


ProcessingPipelineInterface

Abstract interface for the processing pipeline. The concrete implementation is hidden; instances are obtained from a factory function (factory pattern).

Factory Creation

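```cpp
// Returns a pipeline object; the concrete type stays hidden behind
// ProcessingPipelineInterface.
auto pipeline = fq::processing::createProcessingPipeline();
```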

Interface Methods

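```cpp
class ProcessingPipelineInterface {
public:
    // Virtual destructor so the concrete pipeline is destroyed correctly
    // through an interface pointer.
    virtual ~ProcessingPipelineInterface() = default;

    // Input and output FASTQ file paths.
    virtual void setInputPath(const std::string& path) = 0;
    virtual void setOutputPath(const std::string& path) = 0;
    // Batching, threading, and memory settings (see ProcessingConfig).
    virtual void setProcessingConfig(const ProcessingConfig& config) = 0;
    // Predicates filter reads; mutators modify reads in place.
    virtual void addReadPredicate(std::unique_ptr<ReadPredicateInterface> predicate) = 0;
    virtual void addReadMutator(std::unique_ptr<ReadMutatorInterface> mutator) = 0;
    // Runs the pipeline to completion and returns the collected statistics.
    virtual auto run() -> ProcessingStatistics = 0;
};
```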
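
The factory arrangement described above can be illustrated with a small sketch. It is not the fqtools source. The names `PipelineInterface`, `PipelineImpl`, `createPipeline`, and the `inputPath()` accessor are hypothetical, the interface is reduced to two methods, and the sketch assumes the factory returns a `std::unique_ptr` to the interface.

```cpp
#include <memory>
#include <string>

// Hypothetical, reduced interface (two methods only, for illustration).
class PipelineInterface {
public:
    virtual ~PipelineInterface() = default;
    virtual void setInputPath(const std::string& path) = 0;
    virtual auto inputPath() const -> std::string = 0;  // hypothetical accessor
};

namespace {
// Concrete type: in a real library this would live in a .cpp file,
// so callers never see it.
class PipelineImpl final : public PipelineInterface {
public:
    void setInputPath(const std::string& path) override { path_ = path; }
    auto inputPath() const -> std::string override { return path_; }

private:
    std::string path_;
};
}  // namespace

// Factory: the only way to obtain a pipeline; callers see just the interface.
auto createPipeline() -> std::unique_ptr<PipelineInterface> {
    return std::make_unique<PipelineImpl>();
}
```

Because callers only ever hold a `PipelineInterface` pointer, the implementation can change without recompiling client code. The virtual destructor is what makes deleting the concrete object through that pointer well-defined.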

ProcessingConfig

Pipeline configuration parameters.

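| Parameter | Type | Description |
|---|---|---|
| `batchSize` | `size_t` | Number of reads per batch |
| `threadCount` | `size_t` | Number of parallel threads |
| `executionBackend` | `ExecutionBackend` | Execution backend; currently only `OneTbb` is supported |
| `memoryResourcePolicy` | `MemoryResourcePolicy` | Memory resource policy; currently only `ObjectPool` is supported |
| `allocationTelemetryEnabled` | `bool` | Enables memory allocation telemetry |
| `readChunkBytes` | `size_t` | Input read chunk size, in bytes |
| `zlibBufferBytes` | `size_t` | zlib buffer size, in bytes |
| `writerBufferBytes` | `size_t` | Output writer buffer size, in bytes |
| `batchCapacityBytes` | `size_t` | Memory limit per batch, in bytes |
| `memoryLimitBytes` | `size_t` | Total memory limit, in bytes |
| `maxInFlightBatches` | `size_t` | Maximum number of batches in flight concurrently |
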

ProcessingStatistics

Result statistics returned by `run()`.

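| Field | Type | Description |
|---|---|---|
| `totalReads` | `uint64_t` | Total number of input reads |
| `passedReads` | `uint64_t` | Reads that passed filtering |
| `filteredReads` | `uint64_t` | Reads removed by filtering |
| `modifiedReads` | `uint64_t` | Reads trimmed or otherwise modified |
| `errorReads` | `uint64_t` | Reads that encountered errors |
| `inputBytes` | `uint64_t` | Input size, in bytes |
| `outputBytes` | `uint64_t` | Output size, in bytes |
| `elapsedMs` | `uint64_t` | Total elapsed time, in milliseconds |
| `processingTimeMs` | `double` | Processing time, in milliseconds (floating-point) |
| `throughputMbps` | `double` | Throughput, in MB/s |
| `allocationTelemetryEnabled` | `bool` | Whether memory telemetry was enabled |
| `memoryResourcePolicy` | `MemoryResourcePolicy` | Memory resource policy actually used (resolved) |
| `resolvedMaxInFlightBatches` | `size_t` | In-flight batch limit actually used (resolved) |
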
Helper methods:

```cpp
auto getPassRate() const -> double;    // pass rate
auto getFilterRate() const -> double;  // filter rate
auto toString() const -> std::string;  // summary string, suitable for printing
```
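
The source does not spell out how `getPassRate()` and `getFilterRate()` are computed. The sketch below assumes they are fractions of `totalReads`, guarded against empty input. The real methods may instead return percentages or handle zero input differently. `StatsSketch` is a hypothetical stand-in that holds only the three counters involved, not the library type.

```cpp
#include <cstdint>

// Hypothetical stand-in holding only the counters these helpers need.
struct StatsSketch {
    std::uint64_t totalReads = 0;
    std::uint64_t passedReads = 0;
    std::uint64_t filteredReads = 0;

    // Assumed definition: fraction of input reads that passed filtering.
    auto getPassRate() const -> double {
        return totalReads == 0 ? 0.0
                               : static_cast<double>(passedReads) / static_cast<double>(totalReads);
    }

    // Assumed definition: fraction of input reads removed by filtering.
    auto getFilterRate() const -> double {
        return totalReads == 0 ? 0.0
                               : static_cast<double>(filteredReads) / static_cast<double>(totalReads);
    }
};
```

With 1000 total reads, 750 passed and 250 filtered, this sketch yields a pass rate of 0.75 and a filter rate of 0.25.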

Parallel Processing Mechanism

When `threadCount > 1`, the pipeline runs on `tbb::parallel_pipeline` with three stages:

```text
Source (serial_in_order) → Processing (parallel) → Sink (serial_in_order)
reads FastqBatch           filters/trims reads     writes results in order
```

Backpressure is built in: `maxInFlightBatches` limits how many batches can be in flight at once, which bounds memory usage.
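
The three-stage shape and the in-flight bound can be modeled with the standard library alone. This sketch is neither the fqtools implementation nor oneTBB. `Batch`, `processBatch`, and `runPipeline` are hypothetical, and a batch is just a vector of sequences. A FIFO of `std::async` futures stands in for `tbb::parallel_pipeline`'s token limit; in oneTBB that limit is the first argument of `parallel_pipeline`, presumably where `maxInFlightBatches` ends up. A minimum-length filter is a placeholder for the real predicates and mutators.

```cpp
#include <cstddef>
#include <deque>
#include <future>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a FastqBatch: just the read sequences.
using Batch = std::vector<std::string>;

// Processing stage: may run on several batches concurrently.
// Placeholder work: keep reads with at least minLength bases.
Batch processBatch(Batch batch, std::size_t minLength) {
    Batch kept;
    for (auto& seq : batch) {
        if (seq.size() >= minLength) kept.push_back(std::move(seq));
    }
    return kept;
}

// Source (serial) -> Processing (parallel) -> Sink (serial, input order).
// maxInFlight bounds how many batches exist between source and sink.
std::vector<Batch> runPipeline(const std::vector<Batch>& input,
                               std::size_t maxInFlight, std::size_t minLength) {
    if (maxInFlight == 0) maxInFlight = 1;
    std::vector<Batch> output;                // sink: results in input order
    std::deque<std::future<Batch>> inFlight;  // FIFO of batches being processed

    auto drainOldest = [&] {
        output.push_back(inFlight.front().get());  // blocks until that batch is done
        inFlight.pop_front();
    };

    for (const Batch& batch : input) {  // source: one batch at a time
        if (inFlight.size() == maxInFlight) drainOldest();  // backpressure
        inFlight.push_back(std::async(std::launch::async, processBatch, batch, minLength));
    }
    while (!inFlight.empty()) drainOldest();
    return output;
}
```

Waiting on the oldest future is simpler than oneTBB's scheduler: one slow batch stalls the source even when later batches have already finished. Output order and peak memory are bounded in the same way, though.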


ReadPredicateInterface — Filter Predicate

```cpp
class ReadPredicateInterface {
public:
    virtual ~ReadPredicateInterface() = default;
    // Decides whether a read passes this filter.
    virtual auto evaluate(const fq::io::FastqRecord& read) const -> bool = 0;
};
```

Built-in Implementations

| Class | Description |
|---|---|
| `MinQualityPredicate` | Minimum average quality filter |
| `MinLengthPredicate` | Minimum read length filter |
| `MaxLengthPredicate` | Maximum read length filter |
| `MaxNRatioPredicate` | Maximum N-base ratio filter |

```cpp
// Class names below live in namespace fq::processing.
pipeline->addReadPredicate(
    std::make_unique<MinQualityPredicate>(20.0, 33));
pipeline->addReadPredicate(
    std::make_unique<MinLengthPredicate>(50));
```
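
A custom predicate only has to implement `evaluate`. The sketch below is self-contained. It uses a hypothetical `FastqRecord` stand-in (its field names are assumptions) and a copy of the interface in place of the fqtools headers. `MeanQualityAtLeast` is a hypothetical class that computes a mean Phred quality, roughly what `MinQualityPredicate(20.0, 33)` is described as filtering on, under the assumption that `33` is the Phred offset. The built-in may differ in rounding and edge cases. The sketch also assumes that `true` means the read passes.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for fq::io::FastqRecord; the field names are assumptions.
struct FastqRecord {
    std::string id;
    std::string sequence;
    std::string quality;
};

// Same shape as the documented interface, using the stand-in record type.
class ReadPredicateInterface {
public:
    virtual ~ReadPredicateInterface() = default;
    virtual auto evaluate(const FastqRecord& read) const -> bool = 0;
};

// Hypothetical predicate: true when the mean Phred quality is at least minQuality.
class MeanQualityAtLeast final : public ReadPredicateInterface {
public:
    MeanQualityAtLeast(double minQuality, int phredOffset)
        : minQuality_(minQuality), phredOffset_(phredOffset) {}

    auto evaluate(const FastqRecord& read) const -> bool override {
        if (read.quality.empty()) return false;  // no bases: treat as failing
        long long sum = 0;
        for (char c : read.quality) {
            sum += static_cast<unsigned char>(c) - phredOffset_;  // ASCII -> Phred score
        }
        const double mean = static_cast<double>(sum) / static_cast<double>(read.quality.size());
        return mean >= minQuality_;
    }

private:
    double minQuality_;
    int phredOffset_;
};
```

With the real library, the class would derive from `fq::processing::ReadPredicateInterface` and be handed to `addReadPredicate` as a `std::unique_ptr`.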

ReadMutatorInterface — Read Mutator

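```cpp
class ReadMutatorInterface {
public:
    virtual ~ReadMutatorInterface() = default;
    // Modifies the read in place (for example, trimming it).
    virtual void process(fq::io::FastqRecord& read) = 0;
};
```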

Built-in Implementations

| Class | Description |
|---|---|
| `QualityTrimmer` | Quality trimming; modes `Both`, `FivePrime`, `ThreePrime` |
| `LengthTrimmer` | Length trimming; modes `FixedLength`, `MaxLength`, `FromStart`, `FromEnd` |
| `AdapterTrimmer` | Adapter trimming |
| `PolyTailTrimmer` | polyG and bounded polyX tail trimming |

```cpp
// Class names below live in namespace fq::processing.
pipeline->addReadMutator(
    std::make_unique<QualityTrimmer>(
        20.0, 50,
        QualityTrimmer::TrimMode::Both, 33));
```
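
A custom mutator implements `process` and edits the record in place. `PolyTailTrimmer`'s parameters are not documented here, so the sketch below shows only the basic idea of poly-G tail trimming. Poly-G tails are a common artifact of two-color Illumina chemistry, where a missing signal is called as G. `TrailingPolyGTrimmer`, its minimum-run rule, and the `FastqRecord` stand-in are all hypothetical, and this is not the library's algorithm.

```cpp
#include <cstddef>
#include <string>

// Hypothetical stand-in for fq::io::FastqRecord; the field names are assumptions.
struct FastqRecord {
    std::string id;
    std::string sequence;
    std::string quality;
};

// Same shape as the documented interface, using the stand-in record type.
class ReadMutatorInterface {
public:
    virtual ~ReadMutatorInterface() = default;
    virtual void process(FastqRecord& read) = 0;
};

// Hypothetical mutator: cuts a trailing run of 'G' bases when it is at least minRun long.
class TrailingPolyGTrimmer final : public ReadMutatorInterface {
public:
    explicit TrailingPolyGTrimmer(std::size_t minRun) : minRun_(minRun) {}

    void process(FastqRecord& read) override {
        std::size_t end = read.sequence.size();
        while (end > 0 && read.sequence[end - 1] == 'G') --end;  // find start of the G run
        const std::size_t run = read.sequence.size() - end;
        if (run >= minRun_) {
            read.sequence.resize(end);
            // Keep the quality string aligned with the shortened sequence.
            if (read.quality.size() > end) read.quality.resize(end);
        }
    }

private:
    std::size_t minRun_;
};
```

Whatever a mutator does, it should keep `sequence` and `quality` the same length. Every trimmer must maintain that invariant.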

Complete Example

```cpp
#include <fqtools/fq.h>

#include <iostream>
#include <memory>
#include <string>
#include <vector>

int main() {
    auto pipeline = fq::processing::createProcessingPipeline();
    pipeline->setInputPath("input.fastq");
    pipeline->setOutputPath("output.fastq");

    // 10,000 reads per batch, 4 threads, oneTBB backend.
    fq::processing::ProcessingConfig config;
    config.batchSize = 10000;
    config.threadCount = 4;
    config.executionBackend = fq::processing::ExecutionBackend::OneTbb;
    pipeline->setProcessingConfig(config);

    // Filters: minimum average quality and minimum read length.
    pipeline->addReadPredicate(
        std::make_unique<fq::processing::MinQualityPredicate>(20.0, 33));
    pipeline->addReadPredicate(
        std::make_unique<fq::processing::MinLengthPredicate>(50));

    // Mutators: quality trimming at both ends, and adapter trimming
    // (AGATCGGAAGAGC is the common Illumina adapter prefix).
    pipeline->addReadMutator(
        std::make_unique<fq::processing::QualityTrimmer>(
            20.0, 50, fq::processing::QualityTrimmer::TrimMode::Both, 33));
    pipeline->addReadMutator(
        std::make_unique<fq::processing::AdapterTrimmer>(
            std::vector<std::string>{"AGATCGGAAGAGC"}, 6, 1));

    // Run to completion and print the summary statistics.
    const auto stats = pipeline->run();
    std::cout << stats.toString() << "\n";
    return 0;
}
```

MIT License © LessUp