1. 背景

数据库从存储引擎中读取数据,通常称之为 Scan 操作。

通常 Scan 需要考虑到以下因素:

  • 是否使用索引 ?使用哪些索引 ?
  • 是否并发读取 ?
  • 列式存储如何实现 Predicate Pushdown ?

索引

对于 OLTP 数据库,普遍使用的索引是 B-Tree,而 Postgres 面向特殊场景还支持一些额外的索引类型,如 GiST, GIN。

而 OLAP 数据库通常会使用粗粒度的二级索引,因为 OLAP 数据库都假定需要读取非常多的数据做计算,所以索引没有必要非常精细,通常由下面几种索引类型:

  • 最常用的 min-max 统计
  • 部分引擎(如 ORC)支持 bloom-filter
  • ClickHouse 支持的索引类型相对来说比较丰富,例如额外支持: setngrambf_v1

Parallel Scan

OLAP 查询因为涉及大量的数据 Scan,I/O 势必会成为瓶颈,所以需要充分利用 I/O 带宽,而 Parallel Scan 是一种计算机多核化趋势下的合理解决方案。

ClickHouse 使用多线程读取模型并行读取数据,后文会有更详细的源码解析。

业界也有其他数据库支持 Parallel Scan,如 Postgres 在 9.6 版本新增 Parallel Sequential Scan 特性,Kudu 同样支持 Parallel Scan,而对于 Spark/Flink 等大数据计算引擎,Parallel Scan 是默认的模式(不同的数据源实现可能有差异)。

Predicate Pushdown

将 SQL 查询中的 Where 部分,下推到存储引擎执行,这个过程称之为 Predicate Pushdown (谓词下推)。

但在列式存储引擎面前有一个额外问题需要考虑,如何高效执行 Predicate Pushdown ?

  • 方案 1: 读取所有 Column,再执行 Filter
  • 方案 2: 先读取 Filter 相关的 Column,Filter 后识别哪些 Row Range 不用再读取,然后读取剩余 Column

方案 2 相比方案 1 潜在上可以提升性能,因为可以减少冗余 IO. 但是 ClickHouse 使用的方案 1, 但是有简单的判空 Skip.

2. MergeTree Read-Path 概述

简单来讲,MergeTree Read Path 包含以下几个阶段:

此为笔者自己总结,非官方资料

  • 阶段 1: Analysis, 确定读哪些 data part,哪些 range
  • 阶段 2: Parallel, 并行化,拆分成 N 个线程执行
  • 阶段 3: Read, 反序列化读取数据

如下图所示:

3. MergeTree Read-Path 源码解析

MergeTree Read-Path 整体调用链路要比 Write-Path 长的多。 额外新增 Call Tree 一节辅助想要看更多源码细节的读者快速找到对应的函数。

Call Tree

- InterpreterSelectQuery::InterpreterSelectQuery
  - MergeTreeWhereOptimizer::optimize
- InterpreterSelectQuery::executeImpl
  - InterpreterSelectQuery::executeFetchColumns
    - MergeTreeDataSelectExecutor::read
      - MergeTreeDataSelectExecutor::readFromParts
        - ReadFromMergeTree::initializePipeline
          - ReadFromMergeTree::getAnalysisResult
            - ReadFromMergeTree::selectRangesToRead
              - MergeTreeDataSelectExecutor::filterPartsByPartition
              - MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes
                - MergeTreeDataSelectExecutor::markRangesFromPKRange
                - MergeTreeDataSelectExecutor::filterMarksUsingIndex
            - ReadFromMergeTree::spreadMarkRangesAmongStreams
              - ReadFromMergeTree::read
                - ReadFromMergeTree::readFromPool
                  - MergeTreeReadPool
                    - MergeTreeReadPool::fillPerThreadInfo
                  - Parallel Thread: MergeTreeThreadSelectProcessor
                    - MergeTreeBaseSelectProcessor::generate
                      - MergeTreeReadPool::getTask
                      - MergeTreeBaseSelectProcessor::readFromPartImpl
                        - MergeTreeRangeReader::read
                          - MergeTreeRangeReader::startReadingChain
                          - MergeTreeRangeReader::executePrewhereActionsAndFilterColumns
                          - MergeTreeRangeReader::continueReadingChain
                            - MergeTreeReaderWide::readRows
                              - MergeTreeReaderWide::readData
                                - SerializationString DB::deserializeBinarySSE2

P1: Analysis

Analysis 阶段: 分析需要读取哪些 Part,以及 Part 中哪些部分需要读取。

P1.1 自动添加 PreWhere

关于 PreWhere 的更多介绍见 PREWHERE Clause.

PreWhere 是 ClickHouse 扩展的 SQL 子句,其在 Where 子句之前执行。 通常情况下,你并不需要显示写明 PreWhere,ClickHouse 会自动将合适的 Where 子句条件转移到 PreWhere 子句下。

void MergeTreeWhereOptimizer::optimize(ASTSelectQuery & select) const
{
    // MK: 如果已经手动定义过 PreWhere,将不会进行自动的 PreWhere 分析
    if (!select.where() || select.prewhere())
        return;

    UInt64 total_size_of_moved_conditions = 0;
    /// Move condition and all other conditions depend on the same set of columns.
    auto move_condition = [&](Conditions::iterator cond_it)
    {
        prewhere_conditions.splice(prewhere_conditions.end(), where_conditions, cond_it);
        total_size_of_moved_conditions += cond_it->columns_size;
        // MK: ...
    };

    /// Move conditions unless the ratio of total_size_of_moved_conditions to the total_size_of_queried_columns is less than some threshold.
    while (!where_conditions.empty())
    {
        /// Move the best condition to PREWHERE if it is viable.
        // MK: 选择数据量最小的 column
        auto it = std::min_element(where_conditions.begin(), where_conditions.end());

        bool moved_enough = false;
        if (total_size_of_queried_columns > 0)
        {
            /// If we know size of queried columns use it as threshold. 10% ratio is just a guess.
            moved_enough = total_size_of_moved_conditions > 0
                && (total_size_of_moved_conditions + it->columns_size) * 10 > total_size_of_queried_columns;
        }
        // MK: ...
        if (moved_enough)
            break;
        move_condition(it);
    }

    // MK: ...
    // MK: 可以打开 Debug 日志,观察哪些 column 被自动移入 PREWHERE
    LOG_DEBUG(log, "MergeTreeWhereOptimizer: condition \"{}\" moved to PREWHERE", select.prewhere());
}
  • 当 SQL 语句已经提前声明了 PreWhere 时,ClickHouse 将不再进行 PreWhere 优化分析,因此有足够把握比 ClickHouse 做的好的时候再手动添加 PreWhere
  • 尽量将 size 比较小的 Where 谓词相关的 Columns 移动到 PreWhere
  • 移入 PreWhere 的 Columns 总 size 不大于整体的 10%, 此值为经验值并且不可以通过 settings 配置

P1.2 通过 Partition 筛选 Parts

每次插入都会生成新的 Part,Part 会在后台定期 Merge,仅在同一个 Partition 分区的 Part 才可以 Merge,Part 在达到一个预置的大小(默认 150GB)会停止 Merge。 因此 MergeTree 表是由多个 Partition 构成的,filterPartsByPartition 函数用来筛选查询需要的 Part。

void MergeTreeDataSelectExecutor::filterPartsByPartition( 
    MergeTreeData::DataPartsVector & parts,
    ... )
{
    // MK: ...
    minmax_idx_condition.emplace(
        query_info, context, minmax_columns_names, 
        data.getMinMaxExpr(partition_key, ExpressionActionsSettings::fromContext(context)));
    partition_pruner.emplace(metadata_snapshot, query_info, context, false /* strict */);

    selectPartsToRead( ... );
    // MK: ...
}

void MergeTreeDataSelectExecutor::selectPartsToRead(
    MergeTreeData::DataPartsVector & parts,
    ... )
{
    MergeTreeData::DataPartsVector prev_parts;
    std::swap(prev_parts, parts);
    for (const auto & part_or_projection : prev_parts)
    {
        // MK: ...

        if (minmax_idx_condition && !minmax_idx_condition->checkInHyperrectangle(
                part->minmax_idx->hyperrectangle, minmax_columns_types).can_be_true)
            continue;

        if (partition_pruner)
        {
            if (partition_pruner->canBePruned(*part))
                continue;
        }

        parts.push_back(part_or_projection);
    }
}
  • 对于所有的 Part,遍历并检查是否可以 Skip
  • 检查依据是 Part 在写入时,记录的 Partition 相关 Column 的 min-max 索引
  • 更多细节可查看源码

P1.3 根据 PK/Index 筛选 Range

Primary key 是有序的,可以根据 Primary key 来筛选相关的 Range。 此外,ClickHouse 支持 Bloom-Filter、Min-Max、Set 等粗粒度二级索引,用来过滤 Granule。

struct MarkRange
{
    size_t begin;
    size_t end;

    // MK: ...
};

using MarkRanges = std::deque<MarkRange>;

struct RangesInDataPart
{
    MergeTreeData::DataPartPtr data_part;
    size_t part_index_in_query;
    MarkRanges ranges;

    // MK: ...
};

using RangesInDataParts = std::vector<RangesInDataPart>;
  • MarkRanges 及 RangesInDataParts 的定义
  • RangesInDataParts 标记出在每个 Part,有哪些 MarkRanges 是需要读取的
RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipIndexes (
    MergeTreeData::DataPartsVector && parts,
    ...)
{
    // MK: ...

    auto process_part = [&](size_t part_index)
    {
        auto & part = parts[part_index];

        if (metadata_snapshot->hasPrimaryKey())
            // MK: 根据 primary key 筛选 part 下的 ranges
            ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
        else if (total_marks_count)
            ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};

        // MK: ...
        for (auto & index_and_condition : useful_indices)
        {
            // MK: 根据 secondary index 筛选 part 下的 ranges
            ranges.ranges = filterMarksUsingIndex( ... );
        }

        // MK: ...
    };

    /// Parallel loading of data parts.
    ThreadPool pool(num_threads);

    for (size_t part_index = 0; part_index < parts.size(); ++part_index)
        pool.scheduleOrThrowOnError([&, part_index, thread_group = CurrentThread::getGroup()]
        {
            // MK: ...
            process_part(part_index);
        });

    return parts_with_ranges;
}
  • 多线程并行处理多个 Part (有最大并发限制)
  • markRangesFromPKRange 根据 Primary Key 筛选 Part Ranges
  • filterMarksUsingIndex 根据 Secondary Indexs 筛选 Part Ranges
  • markRangesFromPKRange 为纯内存操作,Primary Key 已经 Cache 在内存中
  • filterMarksUsingIndex 需要有 Disk IO

P2: Parallel

Parallel 阶段: 将读取任务拆解为 N 个可并行的读取任务,即为 Parallel Scan.

首先,我们考虑一个问题,如何并行扫描效率是最高的 ?和下列因素相关:

  1. 并行度, 也就是同时有多少线程/进程可以用来扫描数据
  2. 磁盘总 IO 带宽
  3. IO 次数

ClickHouse 在读取任务的规划上,做了下述决策来提升性能:

  1. (并行度) 尽可能利用 CPU 资源,查询并发默认为物理核数。
  2. (IO 带宽) 分散线程负载到多个磁盘,尽可能充分利用磁盘带宽
  3. (IO 次数) 读取时不留“小尾巴”,如果一个 Part 内只剩少量数据时,一次读取完
  4. (Steal Tasks) 当一些线程提前完成读取任务时,可以执行预分配给其他线程的任务

下面我们可以看下实现细节。

P2.1: 将读取任务分配到多个线程

void MergeTreeReadPool::fillPerThreadInfo(
    const size_t threads, const size_t sum_marks, std::vector<size_t> per_part_sum_marks,
    const RangesInDataParts & parts, const size_t min_marks_for_concurrent_read)
{
    // MK: 类型定义及初始化 ...

    {
        // MK: 将 part 按 Disk 分组到 parts_per_disk
        // MK: ...

        for (auto & info : parts_per_disk)
            parts_queue.push(std::move(info.second));
    }

    const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;

    // MK: 按 Disk 分配线程负载,最大化磁盘 IO 带宽利用率
    for (size_t i = 0; i < threads && !parts_queue.empty(); ++i)
    {
        auto need_marks = min_marks_per_thread;

        while (need_marks > 0 && !parts_queue.empty())
        {
            // MK: ...

            /// Get whole part to read if it is small enough.
            if (marks_in_part <= need_marks)
            {
                // MK: 整个 Part 作为当前 Task
                // 但不会继续塞入下个 Part
            }
            else
            {
                /// Loop through part ranges.
                while (need_marks > 0)
                {
                    // MK: ...
                    // MK: 在单个 Part 内部,持续向 Task 内添加 Mark 直到到达每个 Task 的 need_marks
                    ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
                    // MK: ...
                    if (range.begin == range.end)
                        part.ranges.pop_front();
                    // MK: ...
                    need_marks -= marks_to_get_from_range;
                }
            }

            threads_tasks[i].parts_and_ranges.push_back({ part_idx, ranges_to_get_from_part });
            threads_tasks[i].sum_marks_in_parts.push_back(marks_in_ranges);
            if (marks_in_ranges != 0)
                remaining_thread_tasks.insert(i);
        }
        // MK: ...
    }
}
  • 将所有 Part 按物理存储磁盘排序
  • 按序将磁盘负载分散到 N 个线程,这样可减少线程间的 IO 竞争

P2.2: 获取当前线程执行的 Task

并不会将线程需要完成的所有任务初始时全部分配,而是每次仅会分配一个小量级的数据读取任务。

通过参数: merge_tree_min_rows_for_concurrent_read / merge_tree_min_bytes_for_concurrent_read 配置读取的数据量级。

MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read, const size_t thread, const Names & ordered_names)
{
    // MK: 无任务时直接返回 nullptr ...

    /// Steal task if nothing to do and it's not prohibited
    auto thread_idx = thread;
    if (!tasks_remaining_for_this_thread)
    {
        auto it = remaining_thread_tasks.lower_bound(backoff_state.current_threads);
        // Grab the entire tasks of a thread which is killed by backoff
        if (it != remaining_thread_tasks.end())
        {
            // MK: 先检查是否还有剩下的可以执行的任务 ...
        }
        else // Try steal tasks from the next thread
        {
            // MK: 当前线程没有任务可以执行时,执行其他线程的 Task (Steal Tasks)
            it = remaining_thread_tasks.upper_bound(thread);
            if (it == remaining_thread_tasks.end())
                it = remaining_thread_tasks.begin();
            thread_idx = *it;
        }
    }
    auto & thread_tasks = threads_tasks[thread_idx];

    // MK: ...

    return std::make_unique<MergeTreeReadTask>(
        part.data_part, ranges_to_get_from_part ...);
}
  • getTask() 函数参数 min_marks_to_read, 通过 setting 配置 merge_tree_min_rows_for_concurrent_read / merge_tree_min_bytes_for_concurrent_read 计算得到
  • merge_tree_min_rows_for_concurrent_read 默认值为 163840,大约是 20 个 Granule。
  • 如果当前线程已经没有剩余 Task,则分配其他线程的待执行任务 (Steal Tasks)

P3: Read

分配好每个线程的任务后,就可以开始真正读取数据了。

从宏观结构上,ClickHouse 将读取分成了两部分

此为笔者自己总结,非官方资料

  • Read Pre-Where Columns, 先读取 Pre-Where 相关 Columns
  • Read Remaining Columns, 再读取剩余 Columns

Read Pre-Where Columns 过程中,还会对数据进行 Filter,如果 Filter 后无数据,便不会进行后续的 Read Remaining Columns 步骤。

P3.1: Read Pre-Where Columns

MergeTreeRangeReader::ReadResult MergeTreeRangeReader::read(size_t max_rows, MarkRanges & ranges)
{
    // MK: ...
    if (prev_reader)
    {
        // MK: 分支 1

        // MK: 如果有 pre where 条件,先把 pre where 涉及的字段读出来
        read_result = prev_reader->read(max_rows, ranges);

        // MK: 再把除了 pre where 剩下的字段读出来
        Columns columns = continueReadingChain(read_result, num_read_rows);
        // MK: ...
        if (has_columns)
        {
            // MK: 如果有 Filter,就地执行 filter ...
            if (read_result.getFilter())
                filterColumns(columns, read_result.getFilter()->getData());
        }

        // MK: 将上面读到的两部分字段合并...
        for (auto & column : columns)
            read_result.columns.emplace_back(std::move(column));
    }
    else
    {
        // MK: 分支 2

        read_result = startReadingChain(max_rows, ranges);
        // MK: ...
    }

    // MK: 分支 3

    // MK: 如果有 pre where 条件,构建 filter column
    // MK: pre where 相关的 columns 就地执行 filter
    executePrewhereActionsAndFilterColumns(read_result);
    return read_result;
}
  • 这里逻辑稍微有一点绕,读取 Pre-Where 相关 Columns 时,仅会走 分支 2分支 3
  • 而读取完 Pre-Where 后,仅会走 分支 1continueReadingChain 函数读取剩下的 Columns
  • executePrewhereActionsAndFilterColumns 逻辑不再详述,感兴趣可以去阅读源码

P3.3: Read Rows 具体实现

下面是读取的具体实现。

readRows() 函数读取范围行数内的所有列的数据。 分别对每一列数据,使用对应的反序列化方法进行读取。

size_t MergeTreeReaderWide::readRows(
    size_t from_mark, size_t current_task_last_mark, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
    try
    {
        // MK: ...
        for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
        {
            auto column_from_part = getColumnFromPart(*name_and_type);
            // MK: ...

                readData(
                    column_from_part, column, from_mark, continue_reading, current_task_last_mark,
                    max_rows_to_read, cache, /* was_prefetched =*/ !prefetched_streams.empty());

            // MK: ...
        }
    }
    // MK: ...

    return read_rows;
}

void MergeTreeReaderWide::readData(...)
{
    // MK: ...
    auto serialization = data_part->getSerialization(name_and_type);
    // MK: ...

    serialization->[[CH.ISerialization::deserializeBinaryBulkWithMultipleStreams]](column, max_rows_to_read, deserialize_settings, deserialize_state, &cache);
}

下面是 String 类型的反序列化代码解析。

void SerializationString::deserializeBinaryBulk(IColumn & column, ReadBuffer & istr ...) const
{
    // MK: ...
    // MK: 读取 String 类型 Column 时,开启了 SSE2 指令集相关优化
    if (avg_chars_size >= 64)
        deserializeBinarySSE2<4>(data, offsets, istr, limit);
    else if (avg_chars_size >= 48)
        deserializeBinarySSE2<3>(data, offsets, istr, limit);
    else if (avg_chars_size >= 32)
        deserializeBinarySSE2<2>(data, offsets, istr, limit);
    else
        deserializeBinarySSE2<1>(data, offsets, istr, limit);
}

template <int UNROLL_TIMES>
static NO_INLINE void deserializeBinarySSE2(ColumnString::Chars & data, ColumnString::Offsets & offsets, ReadBuffer & istr, size_t limit)
{
    size_t offset = data.size();
    for (size_t i = 0; i < limit; ++i)
    {
        if (istr.eof())
            break;

        UInt64 size;
        readVarUInt(size, istr);

        offset += size + 1;
        offsets.push_back(offset);

        data.resize(offset);

        if (size)
        {
#ifdef __SSE2__
            /// An optimistic branch in which more efficient copying is possible.
            if (offset + 16 * UNROLL_TIMES <= data.capacity() && istr.position() + size + 16 * UNROLL_TIMES <= istr.buffer().end())
            {
                const __m128i * sse_src_pos = reinterpret_cast<const __m128i *>(istr.position());
                const __m128i * sse_src_end = sse_src_pos + (size + (16 * UNROLL_TIMES - 1)) / 16 / UNROLL_TIMES * UNROLL_TIMES;
                __m128i * sse_dst_pos = reinterpret_cast<__m128i *>(&data[offset - size - 1]);

                while (sse_src_pos < sse_src_end)
                {
                    // MK: _mm_storeu_si128 / _mm_loadu_si128 为 C++ 编译器提供的 SIMD 内置函数
                    //   _mm_loadu_si128 -> SSE2 指令 MOVDQU
                    //   _mm_storeu_si128 -> SSE2 指令 MOVDQU (底层对应于相同的 CPU 指令)
                    for (size_t j = 0; j < UNROLL_TIMES; ++j)
                        _mm_storeu_si128(sse_dst_pos + j, _mm_loadu_si128(sse_src_pos + j));

                    sse_src_pos += UNROLL_TIMES;
                    sse_dst_pos += UNROLL_TIMES;
                }

                istr.position() += size;
            }
            else
#endif
            {
                // MK: 不支持SSE2/尾部未对齐时的保底复制方案
                istr.readStrict(reinterpret_cast<char*>(&data[offset - size - 1]), size);
            }
        }

        data[offset - 1] = 0;
    }
}
  • String 类型的反序列化做了 SIMD 优化 (SSE2 指令集)
  • ClickHouse 广泛使用 SIMD 指令集来提升热点函数性能
  • 此处为较为简单的 SIMD 应用场景: 快速内存复制 (一次性复制更多字节内存)
  • 对未对齐的尾部元素及不支持 SSE2 指令集的场景,有保底处理

4. 总结

至此我们已经完成了从查询时候的 Part Ranges 分析,到最终反序列化 String 类型的 Column 读取到内存中的过程。 MergeTree Read Path 基础的流程已全面覆盖,以下细节出于篇幅未展开,感兴趣可以继续深入源码:

  • MergeTree projection
  • Partition / Primary Key / Secondary Index 的实现细节

MergeTree Write-path 过程中的耗时主要分布于:

  • 读取二级索引判断需要读取的 Part Ranges
  • 读取数据时的 Disk IO
  • 将数据反序列化

性能建议:

  • 避免 Part 总数过多,尽管 ClickHouse 会启动多个线程分析 Part 索引,但难免有木桶效应,因此不要让 Part 总数过多,把查询时间过多浪费在分析 Part Range 上
  • 使用多块磁盘,ClickHouse 可以充分利用磁盘 IO 带宽 (建议使用 RAID)
  • 注意二级索引的使用,索引不是越多越好,如果加的索引不能有效地减少Scan数据量,会有查询 overhead 浪费性能
  • PreWhere 子句谨慎手动填写,除非确认自己可以做的更好。

5. 参考文献



 

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes:

<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>