
Processing Pipeline API

Namespace: fq::processing


ProcessingPipelineInterface

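Abstract interface for the processing pipeline. The concrete implementation is hidden behind a factory function (factory pattern).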

Creating a pipeline

```cpp
auto pipeline = fq::processing::createProcessingPipeline();
```

Interface methods

```cpp
class ProcessingPipelineInterface {
public:
    // Virtual destructor: pipelines are owned and destroyed through a base-class pointer.
    virtual ~ProcessingPipelineInterface() = default;

    virtual void setInputPath(const std::string& path) = 0;
    virtual void setOutputPath(const std::string& path) = 0;
    virtual void setProcessingConfig(const ProcessingConfig& config) = 0;
    virtual void addReadPredicate(std::unique_ptr<ReadPredicateInterface> predicate) = 0;
    virtual void addReadMutator(std::unique_ptr<ReadMutatorInterface> mutator) = 0;
    virtual auto run() -> ProcessingStatistics = 0;
};
```
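The factory pattern mentioned above can be illustrated with a minimal, self-contained sketch. This is not the library's source: the interface is cut down to two methods, and `PipelineInterface`, `PipelineImpl`, `Stats`, and `createPipeline` are hypothetical names. The point is that callers only ever see the abstract base and a `std::unique_ptr` to it, so the concrete class can live in a `.cpp` file and change without recompiling client code.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>

// Hypothetical, cut-down stand-ins for the real fq::processing types.
struct Stats {
    std::uint64_t totalReads = 0;
};

class PipelineInterface {
public:
    virtual ~PipelineInterface() = default;  // objects are deleted through the base pointer
    virtual void setInputPath(const std::string& path) = 0;
    virtual auto run() -> Stats = 0;
};

namespace {
// Concrete implementation; in a real library this would sit in a .cpp file
// and never appear in a public header.
class PipelineImpl final : public PipelineInterface {
public:
    void setInputPath(const std::string& path) override { inputPath_ = path; }
    auto run() -> Stats override {
        // Placeholder: a real implementation would read and process inputPath_.
        return Stats{};
    }

private:
    std::string inputPath_;
};
}  // namespace

// Factory: the only way for callers to obtain a pipeline object.
auto createPipeline() -> std::unique_ptr<PipelineInterface> {
    return std::make_unique<PipelineImpl>();
}
```

Because clients depend only on `PipelineInterface`, swapping in a different backend is a change confined to the factory.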

ProcessingConfig

Pipeline configuration parameters.

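| Parameter | Type | Description |
|---|---|---|
| batchSize | size_t | Number of reads per batch |
| threadCount | size_t | Number of parallel threads |
| executionBackend | ExecutionBackend | Execution backend; currently only OneTbb is supported |
| memoryResourcePolicy | MemoryResourcePolicy | Memory resource policy; currently only ObjectPool is supported |
| allocationTelemetryEnabled | bool | Whether memory-allocation telemetry is enabled |
| readChunkBytes | size_t | Read chunk size (bytes) |
| zlibBufferBytes | size_t | zlib buffer size (bytes) |
| writerBufferBytes | size_t | Writer buffer size (bytes) |
| batchCapacityBytes | size_t | Per-batch memory limit (bytes) |
| memoryLimitBytes | size_t | Total memory limit (bytes) |
| maxInFlightBatches | size_t | Maximum number of batches in flight concurrently |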
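This section does not say how `maxInFlightBatches`, `batchCapacityBytes`, and `memoryLimitBytes` interact, although the statistics report a `resolvedMaxInFlightBatches` value for each run. One plausible resolution rule is sketched below purely as an assumption: `InFlightLimits`, `resolveMaxInFlight`, the "0 means unset" convention, and the `2 x threadCount` default are all hypothetical, not documented behavior.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical subset of ProcessingConfig; 0 is assumed to mean "not set".
struct InFlightLimits {
    std::size_t maxInFlightBatches = 0;
    std::size_t batchCapacityBytes = 0;
    std::size_t memoryLimitBytes = 0;
    std::size_t threadCount = 1;
};

// Assumed rule: start from the explicit limit (or 2 x threadCount if unset),
// then shrink it so that inFlight * batchCapacityBytes fits in memoryLimitBytes.
// Always allow at least one batch so the pipeline can make progress.
auto resolveMaxInFlight(const InFlightLimits& c) -> std::size_t {
    std::size_t limit = c.maxInFlightBatches != 0
                            ? c.maxInFlightBatches
                            : 2 * std::max<std::size_t>(c.threadCount, 1);
    if (c.memoryLimitBytes != 0 && c.batchCapacityBytes != 0) {
        limit = std::min(limit, c.memoryLimitBytes / c.batchCapacityBytes);
    }
    return std::max<std::size_t>(limit, 1);
}
```

Under this rule, 8 requested batches of 64 MiB each under a 256 MiB budget resolve to 4.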

ProcessingStatistics

Statistics reported for a completed run.

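| Field | Type | Description |
|---|---|---|
| totalReads | uint64_t | Total number of input reads |
| passedReads | uint64_t | Number of reads that passed filtering |
| filteredReads | uint64_t | Number of reads removed by filters |
| modifiedReads | uint64_t | Number of reads trimmed or otherwise modified |
| errorReads | uint64_t | Number of reads with errors |
| inputBytes | uint64_t | Input size (bytes) |
| outputBytes | uint64_t | Output size (bytes) |
| elapsedMs | uint64_t | Total elapsed time (ms) |
| processingTimeMs | double | Processing time (ms, as a floating-point value) |
| throughputMbps | double | Throughput (MB/s) |
| allocationTelemetryEnabled | bool | Whether memory telemetry was enabled |
| memoryResourcePolicy | MemoryResourcePolicy | Memory policy resolved for this run |
| resolvedMaxInFlightBatches | size_t | In-flight batch limit resolved for this run |
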
Member functions:

```cpp
auto getPassRate() const -> double;
auto getFilterRate() const -> double;
auto toString() const -> std::string;
```
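The two rate accessors are not defined in this section. A natural reading is passed and filtered reads as fractions of `totalReads`; the sketch below assumes that, and guards against an empty run. `RateStats` is a hypothetical stand-in, and whether the real methods return a fraction or a percentage is not stated here.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in holding only the counters the rates depend on.
struct RateStats {
    std::uint64_t totalReads = 0;
    std::uint64_t passedReads = 0;
    std::uint64_t filteredReads = 0;

    // Fraction of input reads that passed filtering; 0.0 for an empty run.
    auto getPassRate() const -> double {
        return totalReads == 0
                   ? 0.0
                   : static_cast<double>(passedReads) / static_cast<double>(totalReads);
    }

    // Fraction of input reads removed by filters; 0.0 for an empty run.
    auto getFilterRate() const -> double {
        return totalReads == 0
                   ? 0.0
                   : static_cast<double>(filteredReads) / static_cast<double>(totalReads);
    }
};
```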

Parallel processing

threadCount > 1 时,使用 tbb::parallel_pipeline

```
Source (serial_in_order)  →  Processing (parallel)  →  Sink (serial_in_order)
reads FastqBatch             filters/trims in parallel writes results in order
```

Backpressure is built in: maxInFlightBatches caps the number of batches in flight at once, which bounds memory use.
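The shape described above (serial source, parallel processing, in-order sink, capped in-flight batches) can be imitated with the standard library alone. The sketch below is a `std::async` analogue, not the TBB-based implementation, and it does not call `tbb::parallel_pipeline`. `Batch` is a hypothetical stand-in for `FastqBatch`, and `runOrderedPipeline` is a hypothetical helper. Futures are kept in a FIFO queue; once `maxInFlight` batches are outstanding, the source blocks on the oldest one and hands it to the sink, which both preserves input order and bounds memory to roughly `maxInFlight` batches.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <future>
#include <optional>
#include <utility>
#include <vector>

using Batch = std::vector<int>;  // hypothetical stand-in for FastqBatch

// source -> parallel process -> in-order sink, with at most maxInFlight
// batches outstanding at any time.
void runOrderedPipeline(const std::function<std::optional<Batch>()>& source,
                        const std::function<Batch(Batch)>& process,
                        const std::function<void(const Batch&)>& sink,
                        std::size_t maxInFlight) {
    if (maxInFlight == 0) maxInFlight = 1;
    std::deque<std::future<Batch>> inFlight;  // FIFO order == input order

    // Wait for the oldest batch and pass it to the sink.
    auto drainOldest = [&] {
        sink(inFlight.front().get());
        inFlight.pop_front();
    };

    // Source stage: serial, one batch at a time.
    while (std::optional<Batch> batch = source()) {
        if (inFlight.size() == maxInFlight) drainOldest();  // backpressure
        // Processing stage: each batch runs as its own asynchronous task.
        inFlight.push_back(std::async(std::launch::async, process, std::move(*batch)));
    }
    // Sink stage: flush the remaining batches, still in input order.
    while (!inFlight.empty()) drainOldest();
}
```

Waiting on the oldest future, rather than whichever finishes first, is what keeps output ordered; the trade-off is that one slow batch can briefly stall the source.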


ReadPredicateInterface: filter predicates

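```cpp
class ReadPredicateInterface {
public:
    // Virtual destructor: predicates are owned via std::unique_ptr<ReadPredicateInterface>.
    virtual ~ReadPredicateInterface() = default;

    virtual auto evaluate(const fq::io::FastqRecord& read) const -> bool = 0;
};
```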

Built-in implementations

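| Class | Description |
|---|---|
| MinQualityPredicate | Minimum mean quality filter |
| MinLengthPredicate | Minimum read length filter |
| MaxLengthPredicate | Maximum read length filter |
| MaxNRatioPredicate | Maximum N-base ratio filter |
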
```cpp
pipeline->addReadPredicate(
    std::make_unique<MinQualityPredicate>(20.0, 33));
pipeline->addReadPredicate(
    std::make_unique<MinLengthPredicate>(50));
```
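To illustrate what quality and N-ratio predicates compute, here is a self-contained sketch against stand-in types. `FastqRecord` below is a hypothetical struct with `sequence` and `quality` strings, not the real `fq::io::FastqRecord`, and reading `MinQualityPredicate(20.0, 33)` as (minimum mean Phred score, ASCII quality offset) is an assumption. The mean here is the arithmetic mean of per-base Phred scores, one common definition.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>

// Hypothetical stand-ins for fq::io::FastqRecord and ReadPredicateInterface.
struct FastqRecord {
    std::string sequence;
    std::string quality;  // one ASCII-encoded Phred score per base
};

class ReadPredicate {
public:
    virtual ~ReadPredicate() = default;
    virtual auto evaluate(const FastqRecord& read) const -> bool = 0;  // true = keep
};

// Keeps a read whose mean Phred score (quality char minus offset) is >= minMean.
class MeanQualityPredicate final : public ReadPredicate {
public:
    MeanQualityPredicate(double minMean, int offset) : minMean_(minMean), offset_(offset) {}
    auto evaluate(const FastqRecord& read) const -> bool override {
        if (read.quality.empty()) return false;  // assumption: empty reads are dropped
        double sum = 0.0;
        for (char c : read.quality) sum += static_cast<unsigned char>(c) - offset_;
        return sum / static_cast<double>(read.quality.size()) >= minMean_;
    }

private:
    double minMean_;
    int offset_;
};

// Keeps a read whose fraction of 'N'/'n' bases is at most maxRatio.
class NRatioPredicate final : public ReadPredicate {
public:
    explicit NRatioPredicate(double maxRatio) : maxRatio_(maxRatio) {}
    auto evaluate(const FastqRecord& read) const -> bool override {
        if (read.sequence.empty()) return true;
        std::size_t n = 0;
        for (char c : read.sequence) n += (c == 'N' || c == 'n');
        return static_cast<double>(n) / static_cast<double>(read.sequence.size()) <= maxRatio_;
    }

private:
    double maxRatio_;
};
```

With an offset of 33, the quality character `I` encodes Phred 40 and `+` encodes Phred 10.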

ReadMutatorInterface: read mutators

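```cpp
class ReadMutatorInterface {
public:
    // Virtual destructor: mutators are owned via std::unique_ptr<ReadMutatorInterface>.
    virtual ~ReadMutatorInterface() = default;

    virtual void process(fq::io::FastqRecord& read) = 0;
};
```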
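A mutator edits the record in place instead of accepting or rejecting it. As a hedged illustration of the polyG case, the sketch below strips a trailing run of `G` bases once the run reaches a minimum length. `FastqRecord`, `ReadMutator`, `PolyGTrimmer`, and `minRunLength` are hypothetical; the real `PolyTailTrimmer` matching rules (for example, any mismatch tolerance or the bounded polyX mode) are not described here and are not reproduced.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-ins for fq::io::FastqRecord and ReadMutatorInterface.
struct FastqRecord {
    std::string sequence;
    std::string quality;
};

class ReadMutator {
public:
    virtual ~ReadMutator() = default;
    virtual void process(FastqRecord& read) = 0;
};

// Removes a trailing run of 'G' bases if it is at least minRunLength long,
// cutting sequence and quality to the same length.
class PolyGTrimmer final : public ReadMutator {
public:
    explicit PolyGTrimmer(std::size_t minRunLength) : minRunLength_(minRunLength) {}
    void process(FastqRecord& read) override {
        std::size_t keep = read.sequence.size();
        while (keep > 0 && read.sequence[keep - 1] == 'G') --keep;
        if (read.sequence.size() - keep >= minRunLength_) {
            read.sequence.resize(keep);
            if (read.quality.size() > keep) read.quality.resize(keep);
        }
    }

private:
    std::size_t minRunLength_;
};
```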

Built-in implementations

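| Class | Description |
|---|---|
| QualityTrimmer | Quality trimming (supports Both/FivePrime/ThreePrime modes) |
| LengthTrimmer | Length trimming (FixedLength/MaxLength/FromStart/FromEnd) |
| AdapterTrimmer | Adapter trimming |
| PolyTailTrimmer | polyG / bounded polyX tail trimming |
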
```cpp
pipeline->addReadMutator(
    std::make_unique<QualityTrimmer>(
        20.0, 50,
        QualityTrimmer::TrimMode::Both, 33));
```
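The three trim modes can be illustrated with simple end trimming: drop bases from the chosen end(s) while their Phred score is below a threshold. This is a sketch under assumptions, not the `QualityTrimmer` algorithm. `qualityTrim` and the local `TrimMode` enum are hypothetical, the threshold and offset are read as (minimum Phred score, ASCII offset), and the second constructor argument (50) is not explained in this section, so it is left out. Real trimmers often use sliding windows or cumulative scoring instead.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in; sequence and quality are assumed to be the same length.
struct FastqRecord {
    std::string sequence;
    std::string quality;
};

enum class TrimMode { Both, FivePrime, ThreePrime };

// Drops bases from the selected end(s) while their Phred score
// (quality char minus offset) is below threshold.
void qualityTrim(FastqRecord& read, double threshold, TrimMode mode, int offset) {
    auto low = [&](std::size_t i) {
        return static_cast<unsigned char>(read.quality[i]) - offset < threshold;
    };
    std::size_t begin = 0;
    std::size_t end = read.quality.size();
    if (mode != TrimMode::ThreePrime) {  // trim the 5' end
        while (begin < end && low(begin)) ++begin;
    }
    if (mode != TrimMode::FivePrime) {  // trim the 3' end
        while (end > begin && low(end - 1)) --end;
    }
    read.sequence = read.sequence.substr(begin, end - begin);
    read.quality = read.quality.substr(begin, end - begin);
}
```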

Complete example

```cpp
#include <fqtools/fq.h>

#include <iostream>
#include <memory>
#include <string>
#include <vector>

int main() {
    auto pipeline = fq::processing::createProcessingPipeline();
    pipeline->setInputPath("input.fastq");
    pipeline->setOutputPath("output.fastq");

    fq::processing::ProcessingConfig config;
    config.batchSize = 10000;
    config.threadCount = 4;
    config.executionBackend = fq::processing::ExecutionBackend::OneTbb;
    pipeline->setProcessingConfig(config);

    // Filters (ReadPredicateInterface implementations).
    pipeline->addReadPredicate(
        std::make_unique<fq::processing::MinQualityPredicate>(20.0, 33));
    pipeline->addReadPredicate(
        std::make_unique<fq::processing::MinLengthPredicate>(50));

    // Trimmers (ReadMutatorInterface implementations).
    pipeline->addReadMutator(
        std::make_unique<fq::processing::QualityTrimmer>(
            20.0, 50, fq::processing::QualityTrimmer::TrimMode::Both, 33));
    pipeline->addReadMutator(
        std::make_unique<fq::processing::AdapterTrimmer>(
            std::vector<std::string>{"AGATCGGAAGAGC"}, 6, 1));

    auto stats = pipeline->run();
    std::cout << stats.toString() << "\n";
}
```
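The `AdapterTrimmer` arguments `{"AGATCGGAAGAGC"}, 6, 1` are not explained in this section. Assuming they mean (adapter sequences, minimum overlap, maximum mismatches), a simple 3' adapter search could look like the sketch below. `findAdapterCut` is a hypothetical helper that returns the cut position, and the real trimmer may score matches differently (for example, with an error rate scaled to overlap length).

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Returns the position where the read should be cut (read.size() if no adapter
// is found). An adapter matches at position p if read[p..] overlaps the adapter
// prefix by at least minOverlap bases with at most maxMismatches mismatches.
// The leftmost match over all adapters wins.
auto findAdapterCut(const std::string& read, const std::vector<std::string>& adapters,
                    std::size_t minOverlap, std::size_t maxMismatches) -> std::size_t {
    std::size_t cut = read.size();
    for (const std::string& adapter : adapters) {
        for (std::size_t p = 0; p < cut; ++p) {
            std::size_t overlap = std::min(adapter.size(), read.size() - p);
            if (overlap < minOverlap) break;  // overlap only shrinks as p grows
            std::size_t mismatches = 0;
            for (std::size_t i = 0; i < overlap && mismatches <= maxMismatches; ++i) {
                mismatches += (read[p + i] != adapter[i]);
            }
            if (mismatches <= maxMismatches) {
                cut = p;
                break;
            }
        }
    }
    return cut;
}
```

Allowing a partial overlap at the 3' end catches adapters that run off the end of the read, while the minimum overlap keeps short chance matches from trimming real sequence.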

MIT License © LessUp