/*****************************************************************************

Copyright (c) 2018, 2019, Oracle and/or its affiliates. All Rights Reserved.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.

This program is also distributed with certain software (including but not
limited to OpenSSL) that is licensed under separate terms, as designated in a
particular file or component or in included license documentation. The
authors of MySQL hereby grant you an additional permission to link the
program and your derivative works with the separately licensed software that
they have included with MySQL.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.

You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA

*****************************************************************************/

/** @file row/row0pread.cc
Parallel read implementation

Created 2018-01-27 by Sunny Bains */

#include <array>

#include "btr0pcur.h"
#include "dict0dict.h"
#include "os0thread-create.h"
#include "row0mysql.h"
#include "row0pread.h"
#include "row0row.h"
#include "row0vers.h"
#include "ut0new.h"

#include "lizard0undo.h"
#include "lizard0row.h"

#ifdef UNIV_PFS_THREAD
mysql_pfs_key_t parallel_read_thread_key;
mysql_pfs_key_t parallel_read_ahead_thread_key;
#endif /* UNIV_PFS_THREAD */

std::atomic_size_t Parallel_reader::s_active_threads{};

/** Tree depth at which we decide to split blocks further. */
static constexpr size_t SPLIT_THRESHOLD{3};

/** Size of the read ahead request queue. */
static constexpr size_t MAX_READ_AHEAD_REQUESTS{128};

/** Maximum number of read ahead threads to spawn. Partitioned tables can
have 1000s of partitions. We don't want to spawn dedicated threads per scan
context. */
constexpr static size_t MAX_READ_AHEAD_THREADS{2};

std::string Parallel_reader::Scan_range::to_string() const {
  std::ostringstream os;

  os << "m_start: ";

  if (m_start != nullptr) {
    m_start->print(os);
  } else {
    os << "null";
  }

  os << ", m_end: ";

  if (m_end != nullptr) {
    m_end->print(os);
  } else {
    os << "null";
  }

  return (os.str());
}

Parallel_reader::Scan_ctx::Iter::~Iter() {
  if (m_heap == nullptr) {
    return;
  }

  if (m_pcur != nullptr) {
    m_pcur->free_rec_buf();
    /* Created with placement new on the heap. */
    call_destructor(m_pcur);
  }

  mem_heap_free(m_heap);
  m_heap = nullptr;
}

Parallel_reader::Ctx::~Ctx() {}

Parallel_reader::Scan_ctx::~Scan_ctx() {}

Parallel_reader::~Parallel_reader() {
  mutex_destroy(&m_mutex);
  os_event_destroy(m_event);
  release_unused_threads(m_max_threads);
}

size_t Parallel_reader::available_threads(size_t n_required) {
  if (n_required == 0) return (0);

  const auto RELAXED = std::memory_order_relaxed;

  auto active = s_active_threads.fetch_add(n_required, RELAXED);

  if (active < MAX_THREADS) {
    const auto available = MAX_THREADS - active;

    if (n_required <= available) {
      return (n_required);
    } else {
      s_active_threads.fetch_sub(n_required - available, RELAXED);
      return (available);
    }
  }

  s_active_threads.fetch_sub(n_required, RELAXED);

  return (0);
}

void Parallel_reader::Scan_ctx::index_s_lock() {
  if (m_s_locks.fetch_add(1, std::memory_order_acquire) == 0) {
    auto index = m_config.m_index;
    /* The latch can be unlocked by a thread that didn't originally lock it. */
    rw_lock_s_lock_gen(dict_index_get_lock(index), true);
  }
}

void Parallel_reader::Scan_ctx::index_s_unlock() {
  if (m_s_locks.fetch_sub(1, std::memory_order_acquire) == 1) {
    auto index = m_config.m_index;
    /* The latch can be unlocked by a thread that didn't originally lock it. */
    rw_lock_s_unlock_gen(dict_index_get_lock(index), true);
  }
}

dberr_t Parallel_reader::Ctx::split() {
  ut_ad(m_range.first->m_tuple == nullptr ||
        dtuple_validate(m_range.first->m_tuple));
  ut_ad(m_range.second->m_tuple == nullptr ||
        dtuple_validate(m_range.second->m_tuple));

  /* Setup the sub-range. */
  Scan_range scan_range(m_range.first->m_tuple, m_range.second->m_tuple);

  /* S lock so that the tree structure doesn't change while we are figuring
  out the sub-trees to scan. */
  m_scan_ctx->index_s_lock();

  auto ranges = m_scan_ctx->partition(scan_range, 1);

  if (!ranges.empty()) {
    ranges.back().second = m_range.second;
  }

  /* Create the partitioned scan execution contexts. */
  for (auto &range : ranges) {
    auto err = m_scan_ctx->create_context(range, false);

    if (err != DB_SUCCESS) {
      m_scan_ctx->index_s_unlock();
      return (err);
    }
  }

  m_scan_ctx->index_s_unlock();

  return (DB_SUCCESS);
}

Parallel_reader::Parallel_reader(size_t max_threads)
    : m_max_threads(max_threads),
      m_ctxs(),
      m_read_aheadq(ut_2_power_up(MAX_READ_AHEAD_REQUESTS)) {
  m_n_completed = 0;

  mutex_create(LATCH_ID_PARALLEL_READ, &m_mutex);

  m_event = os_event_create("Parallel reader");
}

Parallel_reader::Scan_ctx::Scan_ctx(Parallel_reader *reader, size_t id,
                                    trx_t *trx,
                                    const Parallel_reader::Config &config,
                                    F &&f)
    : m_id(id), m_config(config), m_trx(trx), m_f(f), m_reader(reader) {}

/** Persistent cursor wrapper around btr_pcur_t */
class PCursor {
 public:
  /** Constructor.
  @param[in,out]  pcur  Persistent cursor in use.
  @param[in]      mtr   Mini transaction used by the persistent cursor. */
  PCursor(btr_pcur_t *pcur, mtr_t *mtr) : m_mtr(mtr), m_pcur(pcur) {}

  /** Check if there are threads waiting on the index latch. Yield the latch
  so that other threads can progress. */
  void yield();

  /** Move to the next block.
  @param[in]  index  Index being traversed.
  @return DB_SUCCESS or error code. */
  dberr_t move_to_next_block(dict_index_t *index)
      MY_ATTRIBUTE((warn_unused_result));

  /** Restore the cursor position. */
  void restore_position() {
    auto relative = m_pcur->m_rel_pos;

    auto equal =
        m_pcur->restore_position(BTR_SEARCH_LEAF, m_mtr, __FILE__, __LINE__);

    if (relative == BTR_PCUR_ON) {
      if (!equal) {
        page_cur_move_to_next(m_pcur->get_page_cur());
      }
    } else {
      ut_ad(relative == BTR_PCUR_AFTER ||
            relative == BTR_PCUR_AFTER_LAST_IN_TREE);
    }
  }

 private:
  /** Mini transaction. */
  mtr_t *m_mtr{};

  /** Persistent cursor. */
  btr_pcur_t *m_pcur{};
};

buf_block_t *Parallel_reader::Scan_ctx::block_get_s_latched(
    const page_id_t &page_id, mtr_t *mtr, int line) const {
  auto block =
      buf_page_get_gen(page_id, m_config.m_page_size, RW_S_LATCH, nullptr,
                       Page_fetch::SCAN, __FILE__, line, mtr);

  buf_block_dbg_add_level(block, SYNC_TREE_NODE);

  return (block);
}

void PCursor::yield() {
  /* We should always yield on a block boundary. */
  ut_ad(m_pcur->is_after_last_on_page());

  /* Store the cursor position on the last user record on the page. */
  m_pcur->move_to_prev_on_page();

  m_pcur->store_position(m_mtr);

  m_mtr->commit();

  /* Yield so that another thread can proceed. */
  os_thread_yield();

  m_mtr->start();

  m_mtr->set_log_mode(MTR_LOG_NO_REDO);

  /* Restore position on the record, or its predecessor if the record was
  purged meanwhile. */
  restore_position();

  if (!m_pcur->is_after_last_on_page()) {
    /* Move to the successor of the saved record. */
    m_pcur->move_to_next_on_page();
  }
}

dberr_t PCursor::move_to_next_block(dict_index_t *index) {
  ut_ad(m_pcur->is_after_last_on_page());

  if (rw_lock_get_waiters(dict_index_get_lock(index))) {
    /* There are waiters on the index tree lock. Store and restore the
    cursor position, and yield so that scanning a large table will not
    starve other threads. */
    yield();

    /* It's possible that the restore places the cursor in the middle of
    the block. We need to account for that too. */
    if (m_pcur->is_on_user_rec()) {
      return (DB_SUCCESS);
    }
  }

  auto cur = m_pcur->get_page_cur();

  auto next_page_no = btr_page_get_next(page_cur_get_page(cur), m_mtr);

  if (next_page_no == FIL_NULL) {
    m_mtr->commit();

    return (DB_END_OF_INDEX);
  }

  auto block = page_cur_get_block(cur);

  const auto &page_id = block->page.id;

  block = buf_page_get_gen(page_id_t(page_id.space(), next_page_no),
                           block->page.size, RW_S_LATCH, nullptr,
                           Page_fetch::SCAN, __FILE__, __LINE__, m_mtr);

  buf_block_dbg_add_level(block, SYNC_TREE_NODE);

  btr_leaf_page_release(page_cur_get_block(cur), RW_S_LATCH, m_mtr);

  page_cur_set_before_first(block, cur);

  /* Skip the infimum record. */
  page_cur_move_to_next(cur);

  /* Page can't be empty unless it is a root page. */
  ut_ad(!page_cur_is_after_last(cur));

  return (DB_SUCCESS);
}

bool Parallel_reader::Scan_ctx::check_visibility(const rec_t *&rec,
                                                 ulint *&offsets,
                                                 mem_heap_t *&heap,
                                                 mtr_t *mtr) {
  const auto table_name = m_config.m_index->table->name;

  // ut_ad(m_trx->read_view == nullptr ||
  //       MVCC::is_view_active(m_trx->read_view));

  if (m_trx->vision.is_active()) {
    auto vision = &m_trx->vision;

    if (m_config.m_index->is_clustered()) {
      trx_id_t rec_trx_id;
      txn_rec_t txn_rec;

      if (m_config.m_index->trx_id_offset > 0) {
        rec_trx_id = trx_read_trx_id(rec + m_config.m_index->trx_id_offset);

        ulint rec_off = m_config.m_index->trx_id_offset;

        txn_rec.trx_id = rec_trx_id;

        rec_off += DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
        txn_rec.scn = lizard::trx_read_scn(rec + rec_off);

        rec_off += DATA_SCN_ID_LEN;
        txn_rec.undo_ptr = lizard::trx_read_undo_ptr(rec + rec_off);

        rec_off += DATA_UNDO_PTR_LEN;
        txn_rec.gcn = lizard::trx_read_gcn(rec + rec_off);
      } else {
        rec_trx_id = row_get_rec_trx_id(rec, m_config.m_index, offsets);

        txn_rec.trx_id = rec_trx_id;
        txn_rec.scn =
            lizard::row_get_rec_scn_id(rec, m_config.m_index, offsets);
        txn_rec.undo_ptr =
            lizard::row_get_rec_undo_ptr(rec, m_config.m_index, offsets);
        txn_rec.gcn = lizard::row_get_rec_gcn(rec, m_config.m_index, offsets);
      }

      {
        if (m_trx->isolation_level > TRX_ISO_READ_UNCOMMITTED) {
          lizard::txn_rec_real_state_by_misc(&txn_rec);
        }
      }

      if (m_trx->isolation_level > TRX_ISO_READ_UNCOMMITTED &&
          !vision->modifications_visible(&txn_rec, table_name)) {
        rec_t *old_vers;

        row_vers_build_for_consistent_read(rec, mtr, m_config.m_index,
                                           &offsets, vision, &heap, heap,
                                           &old_vers, nullptr, nullptr);

        rec = old_vers;

        if (rec == nullptr) {
          return (false);
        }
      }
    } else {
      /* Secondary index scan not supported yet. */
      ut_error;

      auto max_trx_id = page_get_max_trx_id(page_align(rec));

      ut_ad(max_trx_id > 0);

      if (!vision->sees(max_trx_id)) {
        /* FIXME: This is not sufficient. We may need to read in the
        cluster index record to be 100% sure. */
        return (false);
      }
    }
  }

  if (rec_get_deleted_flag(rec, m_config.m_is_compact)) {
    /* This record was deleted in the latest committed version, or it was
    deleted and then reinserted-by-update before purge kicked in. Skip it. */
    return (false);
  }

  ut_ad(m_trx->isolation_level == TRX_ISO_READ_UNCOMMITTED ||
        !rec_offs_any_null_extern(rec, offsets));

  return (true);
}

void Parallel_reader::Scan_ctx::copy_row(const rec_t *rec, Iter *iter) const {
  iter->m_offsets = rec_get_offsets(rec, m_config.m_index, nullptr,
                                    ULINT_UNDEFINED, &iter->m_heap);

  /* Copy the row from the page to the scan iterator. The copy should use
  memory from the iterator heap because the scan iterator owns the copy. */
  auto rec_len = rec_offs_size(iter->m_offsets);

  auto copy_rec = static_cast<rec_t *>(mem_heap_alloc(iter->m_heap, rec_len));

  memcpy(copy_rec, rec, rec_len);

  iter->m_rec = copy_rec;

  auto tuple =
      row_rec_to_index_entry_low(iter->m_rec, m_config.m_index,
                                 iter->m_offsets, &iter->m_n_ext, iter->m_heap);

  ut_ad(dtuple_validate(tuple));

  /* We have copied the entire record but we only need to compare the key
  columns when we check for boundary conditions. */
  const auto n_compare = dict_index_get_n_unique_in_tree(m_config.m_index);

  dtuple_set_n_fields_cmp(tuple, n_compare);

  iter->m_tuple = tuple;
}

std::shared_ptr<Parallel_reader::Scan_ctx::Iter>
Parallel_reader::Scan_ctx::create_persistent_cursor(
    const page_cur_t &page_cursor, mtr_t *mtr) const {
  ut_ad(index_s_own());

  std::shared_ptr<Iter> iter = std::make_shared<Iter>();

  iter->m_heap = mem_heap_create(sizeof(btr_pcur_t) + (srv_page_size / 16));

  ut_a(page_is_leaf(buf_block_get_frame(page_cursor.block)));

  auto rec = page_cursor.rec;

  const bool is_infimum = page_rec_is_infimum(rec);

  if (is_infimum) {
    rec = page_rec_get_next(rec);
  }

  if (page_rec_is_supremum(rec)) {
    /* Empty page, only root page can be empty. */
    ut_a(!is_infimum ||
         page_cursor.block->page.id.page_no() == m_config.m_index->page);
    return (iter);
  }

  void *ptr = mem_heap_alloc(iter->m_heap, sizeof(btr_pcur_t));

  ::new (ptr) btr_pcur_t();

  iter->m_pcur = reinterpret_cast<btr_pcur_t *>(ptr);

  iter->m_pcur->init();

  /* Make a copy of the rec. */
  copy_row(rec, iter.get());

  iter->m_pcur->open_on_user_rec(page_cursor, PAGE_CUR_GE,
                                 BTR_ALREADY_S_LATCHED | BTR_SEARCH_LEAF);

  iter->m_pcur->store_position(mtr);

  iter->m_pcur->set_fetch_type(Page_fetch::SCAN);

  return (iter);
}

dberr_t Parallel_reader::Ctx::traverse() {
  mtr_t mtr;
  auto &from = m_range.first;
  const auto &end_tuple = m_range.second->m_tuple;

  PCursor pcursor(from->m_pcur, &mtr);

  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  ulint *offsets = offsets_;

  rec_offs_init(offsets_);

  mtr.start();

  mtr.set_log_mode(MTR_LOG_NO_REDO);

  auto heap = mem_heap_create(srv_page_size / 4);

  pcursor.restore_position();

  dberr_t err{DB_SUCCESS};

  m_start = true;

  auto index = m_scan_ctx->m_config.m_index;

  for (;;) {
    auto pcur = from->m_pcur;
    auto cur = pcur->get_page_cur();

    if (page_cur_is_after_last(cur)) {
      mem_heap_empty(heap);

      offsets = offsets_;
      rec_offs_init(offsets_);

      if (m_scan_ctx->m_config.m_read_ahead) {
        auto next_page_no = btr_page_get_next(page_cur_get_page(cur), &mtr);

        if (next_page_no != FIL_NULL && !(next_page_no % FSP_EXTENT_SIZE)) {
          m_scan_ctx->submit_read_ahead(next_page_no);
        }
      }

      err = pcursor.move_to_next_block(index);

      if (err != DB_SUCCESS) {
        ut_ad(err == DB_END_OF_INDEX);
        err = DB_SUCCESS;
        break;
      }

      ut_ad(!page_cur_is_before_first(cur));
    }

    const rec_t *rec = page_cur_get_rec(cur);

    offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);

    auto skip = !m_scan_ctx->check_visibility(rec, offsets, heap, &mtr);

    m_block = page_cur_get_block(cur);

    if (rec != nullptr && end_tuple != nullptr) {
      auto ret = end_tuple->compare(rec, index, offsets);

      /* Note: The range creation doesn't use MVCC. Therefore it's possible
      that the range boundary entry could have been deleted. */
      if (ret <= 0) {
        mtr.commit();
        break;
      }
    }

    /* Increase the counter if rec is delete marked. */
    if (m_scan_ctx->m_config.m_ptr_n_rows_read_del_mark != nullptr &&
        rec != nullptr &&
        rec_get_deleted_flag(rec, m_scan_ctx->m_config.m_is_compact)) {
      DBUG_ASSERT(m_thread_id < Parallel_reader::MAX_THREADS);
      Counter::inc(*(m_scan_ctx->m_config.m_ptr_n_rows_read_del_mark),
                   m_thread_id);
    }

    if (!skip) {
      m_rec = rec;

      err = m_scan_ctx->m_f(this);

      m_start = false;
    }

    page_cur_move_to_next(cur);

    if (err != DB_SUCCESS) {
      mtr.commit();
      break;
    }
  }

  ut_a(!mtr.is_active());

  mem_heap_free(heap);

  return (err);
}

void Parallel_reader::enqueue(std::shared_ptr<Ctx> ctx) {
  mutex_enter(&m_mutex);
  m_ctxs.push_back(ctx);
  mutex_exit(&m_mutex);
}

std::shared_ptr<Parallel_reader::Ctx> Parallel_reader::dequeue() {
  mutex_enter(&m_mutex);

  if (m_ctxs.empty()) {
    mutex_exit(&m_mutex);
    return (nullptr);
  }

  auto ctx = m_ctxs.front();
  m_ctxs.pop_front();

  mutex_exit(&m_mutex);

  return (ctx);
}

bool Parallel_reader::is_queue_empty() const {
  mutex_enter(&m_mutex);
  auto empty = m_ctxs.empty();
  mutex_exit(&m_mutex);
  return (empty);
}

void Parallel_reader::worker(size_t thread_id) {
  dberr_t err{DB_SUCCESS};

  if (m_start_callback) {
    err = m_start_callback(thread_id);
  }

  for (;;) {
    size_t n_completed = 0;
    int64_t sig_count = os_event_reset(m_event);

    while (err == DB_SUCCESS && !is_error_set()) {
      auto ctx = dequeue();

      if (ctx == nullptr) {
        break;
      }

      auto scan_ctx = ctx->m_scan_ctx;

      if (scan_ctx->is_error_set()) {
        break;
      }

      ctx->m_thread_id = thread_id;

      if (ctx->m_split) {
        err = ctx->split();
        /* Tell the other threads that there is work to do. */
        os_event_set(m_event);
      } else {
        err = ctx->traverse();
      }

      ++n_completed;
    }

    if (err != DB_SUCCESS || is_error_set()) {
      break;
    }

    m_n_completed.fetch_add(n_completed, std::memory_order_relaxed);

    if (m_n_completed == m_ctx_id) {
      /* Wakeup other worker threads before exiting */
      os_event_set(m_event);
      break;
    }

    constexpr auto FOREVER = OS_SYNC_INFINITE_TIME;

    os_event_wait_time_low(m_event, FOREVER, sig_count);
  }

  if (m_finish_callback) {
    err = m_finish_callback(thread_id);
  }

  if (err != DB_SUCCESS) {
    /* Set the "global" error state. */
    if (!is_error_set()) {
      set_error_state(err);
    }

    /* Wake up any sleeping threads. */
    os_event_set(m_event);
  }

  ut_a(err != DB_SUCCESS || is_error_set() ||
       (m_n_completed == m_ctx_id && is_queue_empty()));
}

page_no_t Parallel_reader::Scan_ctx::search(const buf_block_t *block,
                                            const dtuple_t *key) const {
  ut_ad(index_s_own());

  page_cur_t page_cursor;
  const auto index = m_config.m_index;

  if (key != nullptr) {
    page_cur_search(block, index, key, PAGE_CUR_LE, &page_cursor);
  } else {
    page_cur_set_before_first(block, &page_cursor);
  }

  if (page_rec_is_infimum(page_cur_get_rec(&page_cursor))) {
    page_cur_move_to_next(&page_cursor);
  }

  const auto rec = page_cur_get_rec(&page_cursor);

  mem_heap_t *heap = nullptr;

  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  auto offsets = offsets_;

  rec_offs_init(offsets_);

  offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);

  auto page_no = btr_node_ptr_get_child_page_no(rec, offsets);

  if (heap != nullptr) {
    mem_heap_free(heap);
  }

  return (page_no);
}

page_cur_t Parallel_reader::Scan_ctx::start_range(
    page_no_t page_no, mtr_t *mtr, const dtuple_t *key,
    Savepoints &savepoints) const {
  ut_ad(index_s_own());

  auto index = m_config.m_index;
  page_id_t page_id(index->space, page_no);

  /* Follow the left most pointer down on each page. */
  for (;;) {
    auto savepoint = mtr->get_savepoint();

    auto block = block_get_s_latched(page_id, mtr, __LINE__);

    savepoints.push_back({savepoint, block});

    if (!page_is_leaf(buf_block_get_frame(block))) {
      page_id.set_page_no(search(block, key));
      continue;
    }

    page_cur_t page_cursor;

    if (key != nullptr) {
      page_cur_search(block, index, key, PAGE_CUR_GE, &page_cursor);
    } else {
      page_cur_set_before_first(block, &page_cursor);
    }

    if (page_rec_is_infimum(page_cur_get_rec(&page_cursor))) {
      page_cur_move_to_next(&page_cursor);
    }

    ut_a(!page_cur_is_after_last(&page_cursor));

    return (page_cursor);
  }

  ut_error;

  return (page_cur_t{});
}

void Parallel_reader::Scan_ctx::create_range(Ranges &ranges,
                                             page_cur_t &leaf_page_cursor,
                                             mtr_t *mtr) const {
  leaf_page_cursor.index = m_config.m_index;

  auto iter = create_persistent_cursor(leaf_page_cursor, mtr);

  /* Setup the previous range (next) to point to the current range. */
  if (!ranges.empty()) {
    ut_a(ranges.back().second->m_heap == nullptr);
    ranges.back().second = iter;
  }

  ranges.push_back(Range(iter, std::make_shared<Iter>()));
}

void Parallel_reader::Scan_ctx::create_ranges(const Scan_range &scan_range,
                                              page_no_t page_no, size_t depth,
                                              const size_t level,
                                              Ranges &ranges, mtr_t *mtr) {
  ut_ad(index_s_own());
  ut_a(max_threads() > 0);
  ut_a(page_no != FIL_NULL);

  /* Do a breadth first traversal of the B+Tree using recursion. We want to
  set up the scan ranges in one pass. This guarantees that the tree structure
  cannot change while we are creating the scan sub-ranges.

  Once we create the persistent cursor (Range) for a sub-tree we can release
  the latches on all blocks traversed for that sub-tree. */

  const auto index = m_config.m_index;

  page_id_t page_id(index->space, page_no);

  Savepoint savepoint({mtr->get_savepoint(), nullptr});

  auto block = block_get_s_latched(page_id, mtr, __LINE__);

  savepoint.second = block;

  ulint offsets_[REC_OFFS_NORMAL_SIZE];
  auto offsets = offsets_;

  rec_offs_init(offsets_);

  page_cur_t page_cursor;

  page_cursor.index = index;

  auto start = scan_range.m_start;

  if (start != nullptr) {
    page_cur_search(block, index, start, PAGE_CUR_LE, &page_cursor);

    if (page_cur_is_after_last(&page_cursor)) {
      return;
    } else if (page_rec_is_infimum(page_cur_get_rec(&page_cursor))) {
      page_cur_move_to_next(&page_cursor);
    }
  } else {
    page_cur_set_before_first(block, &page_cursor);

    /* Skip the infimum record. */
    page_cur_move_to_next(&page_cursor);
  }

  mem_heap_t *heap{};

  const auto at_leaf = page_is_leaf(buf_block_get_frame(block));

  Savepoints savepoints{};

  while (!page_cur_is_after_last(&page_cursor)) {
    const auto rec = page_cur_get_rec(&page_cursor);

    ut_a(at_leaf || rec_get_node_ptr_flag(rec) ||
         !dict_table_is_comp(index->table));

    if (heap == nullptr) {
      heap = mem_heap_create(srv_page_size / 4);
    }

    offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, &heap);

    const auto end = scan_range.m_end;

    if (end != nullptr && end->compare(rec, index, offsets) <= 0) {
      break;
    }

    page_cur_t leaf_page_cursor;

    if (!at_leaf) {
      auto page_no = btr_node_ptr_get_child_page_no(rec, offsets);

      if (depth < level) {
        /* Need to create a range starting at a lower level in the tree. */
        create_ranges(scan_range, page_no, depth + 1, level, ranges, mtr);

        page_cur_move_to_next(&page_cursor);
        continue;
      }

      /* Find the range start in the leaf node. */
      leaf_page_cursor = start_range(page_no, mtr, start, savepoints);
    } else {
      if (start != nullptr) {
        page_cur_search(block, index, start, PAGE_CUR_GE, &page_cursor);
        ut_a(!page_rec_is_infimum(page_cur_get_rec(&page_cursor)));
      } else {
        page_cur_set_before_first(block, &page_cursor);

        /* Skip the infimum record. */
        page_cur_move_to_next(&page_cursor);
        ut_a(!page_cur_is_after_last(&page_cursor));
      }

      /* Since we are already at a leaf node use the current page cursor. */
      memcpy(&leaf_page_cursor, &page_cursor, sizeof(leaf_page_cursor));
    }

    ut_a(page_is_leaf(buf_block_get_frame(leaf_page_cursor.block)));

    if (!page_rec_is_supremum(page_cur_get_rec(&leaf_page_cursor))) {
      create_range(ranges, leaf_page_cursor, mtr);
    }

    /* We've created the persistent cursor, safe to release S latches on
    the blocks that are in this range (sub-tree). */
    for (auto &savepoint : savepoints) {
      mtr->release_block_at_savepoint(savepoint.first, savepoint.second);
    }

    if (m_depth == 0 && depth == 0) {
      m_depth = savepoints.size();
    }

    savepoints.clear();

    if (at_leaf) {
      break;
    }

    start = nullptr;

    page_cur_move_to_next(&page_cursor);
  }

  savepoints.push_back(savepoint);

  for (auto &savepoint : savepoints) {
    mtr->release_block_at_savepoint(savepoint.first, savepoint.second);
  }

  if (heap != nullptr) {
    mem_heap_free(heap);
  }
}

Parallel_reader::Scan_ctx::Ranges Parallel_reader::Scan_ctx::partition(
    const Scan_range &scan_range, size_t level) {
  ut_ad(index_s_own());

  mtr_t mtr;

  mtr.start();

  mtr.set_log_mode(MTR_LOG_NO_REDO);

  Ranges ranges{};

  create_ranges(scan_range, m_config.m_index->page, 0, level, ranges, &mtr);

  if (scan_range.m_end != nullptr && !ranges.empty()) {
    auto &iter = ranges.back().second;

    ut_a(iter->m_heap == nullptr);

    iter->m_heap = mem_heap_create(sizeof(btr_pcur_t) + (srv_page_size / 16));

    iter->m_tuple = dtuple_copy(scan_range.m_end, iter->m_heap);

    /* Do a deep copy. */
    for (size_t i = 0; i < dtuple_get_n_fields(iter->m_tuple); ++i) {
      dfield_dup(&iter->m_tuple->fields[i], iter->m_heap);
    }
  }

  mtr.commit();

  return (ranges);
}

dberr_t Parallel_reader::Scan_ctx::create_context(const Range &range,
                                                  bool split) {
  auto ctx_id = m_reader->m_ctx_id.fetch_add(1, std::memory_order_relaxed);

  // clang-format off
  auto ctx = std::shared_ptr<Ctx>(
      UT_NEW_NOKEY(Ctx(ctx_id, this, range)),
      [](Ctx *ctx) { UT_DELETE(ctx); });
  // clang-format on

  dberr_t err{DB_SUCCESS};

  if (ctx.get() == nullptr) {
    m_reader->m_ctx_id.fetch_sub(1, std::memory_order_relaxed);
    return (DB_OUT_OF_MEMORY);
  } else {
    ctx->m_split = split;
    m_reader->enqueue(ctx);
  }

  return (err);
}

dberr_t Parallel_reader::Scan_ctx::create_contexts(const Ranges &ranges) {
  size_t split_point{};

  ut_a(max_threads() > 0 && max_threads() <= Parallel_reader::MAX_THREADS);

  if (ranges.size() > max_threads()) {
    split_point = (ranges.size() / max_threads()) * max_threads();
  } else if (m_depth < SPLIT_THRESHOLD) {
    /* If the tree is not very deep then don't split.
    For smaller tables it is more expensive to split because we end up
    traversing more blocks. */
    split_point = max_threads();
  }

  ib::info() << "ranges: " << ranges.size()
             << " max_threads: " << max_threads() << " split: " << split_point
             << " depth: " << m_depth;

  size_t i{};

  for (auto range : ranges) {
    auto err = create_context(range, i >= split_point);

    if (err != DB_SUCCESS) {
      return (err);
    }

    ++i;
  }

  return (DB_SUCCESS);
}

void Parallel_reader::read_ahead_worker(page_no_t n_pages) {
  DBUG_EXECUTE_IF("bug28079850", set_error_state(DB_INTERRUPTED););

  while (is_active() && !is_error_set()) {
    uint64_t dequeue_count{};

    Read_ahead_request read_ahead_request;

    while (m_read_aheadq.dequeue(read_ahead_request)) {
      auto scan_ctx = read_ahead_request.m_scan_ctx;

      if (trx_is_interrupted(scan_ctx->m_trx)) {
        set_error_state(DB_INTERRUPTED);
        break;
      }

      ut_a(scan_ctx->m_config.m_read_ahead);
      ut_a(read_ahead_request.m_page_no != FIL_NULL);

      page_id_t page_id(scan_ctx->m_config.m_index->space,
                        read_ahead_request.m_page_no);

      buf_phy_read_ahead(page_id, scan_ctx->m_config.m_page_size, n_pages);

      ++dequeue_count;
    }

    m_consumed.fetch_add(dequeue_count, std::memory_order_relaxed);

    while (read_ahead_queue_empty() && is_active() && !is_error_set()) {
      os_thread_sleep(20);
    }
  }
}

void Parallel_reader::read_ahead() {
  ut_a(!m_scan_ctxs.empty());

  auto n_read_ahead_threads =
      std::min(m_scan_ctxs.size(), MAX_READ_AHEAD_THREADS);

  std::vector<IB_thread> threads;

  for (size_t i = 1; i < n_read_ahead_threads; ++i) {
    threads.emplace_back(os_thread_create(parallel_read_ahead_thread_key,
                                          &Parallel_reader::read_ahead_worker,
                                          this, FSP_EXTENT_SIZE));
    threads.back().start();
  }

  read_ahead_worker(FSP_EXTENT_SIZE);

  if (is_error_set()) {
    os_event_set(m_event);
  }

  for (auto &t : threads) {
    t.wait();
  }
}

void Parallel_reader::parallel_read() {
  ut_a(m_max_threads > 0);

  if (m_ctxs.empty()) {
    return;
  }

  std::vector<IB_thread> threads;

  for (size_t i = 0; i < m_max_threads; ++i) {
    threads.emplace_back(os_thread_create(parallel_read_thread_key,
                                          &Parallel_reader::worker, this, i));
    threads.back().start();
  }

  os_event_set(m_event);

  /* Start the read ahead threads. */
  read_ahead();

  for (auto &t : threads) {
    t.wait();
  }
}

dberr_t Parallel_reader::run() {
  if (!m_scan_ctxs.empty()) {
    parallel_read();
  }

  for (auto &scan_ctx : m_scan_ctxs) {
    if (m_err != DB_SUCCESS) {
      return (m_err);
    }

    if (scan_ctx->m_err != DB_SUCCESS) {
      /* Return the state of the first Scan context that is in state ERROR. */
      return (scan_ctx->m_err);
    }
  }

  return (DB_SUCCESS);
}

bool Parallel_reader::add_scan(trx_t *trx,
                               const Parallel_reader::Config &config,
                               Parallel_reader::F &&f) {
  // clang-format off
  auto scan_ctx = std::shared_ptr<Scan_ctx>(
      UT_NEW_NOKEY(Scan_ctx(this, m_scan_ctx_id, trx, config, std::move(f))),
      [](Scan_ctx *scan_ctx) { UT_DELETE(scan_ctx); });
  // clang-format on

  if (scan_ctx.get() == nullptr) {
    ib::error() << "Out of memory";
    return (false);
  }

  m_scan_ctxs.push_back(scan_ctx);

  scan_ctx->index_s_lock();

  ++m_scan_ctx_id;

  /* Split at the root node (level == 0). */
  auto ranges = scan_ctx->partition(config.m_scan_range, 0);

  if (ranges.empty()) {
    /* Table is empty. */
    scan_ctx->index_s_unlock();
    return (true);
  }

  auto err = scan_ctx->create_contexts(ranges);

  scan_ctx->index_s_unlock();

  return (err == DB_SUCCESS);
}
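
/* Illustrative usage sketch (not part of this translation unit): a minimal
example of how a caller could drive the reader implemented above -- reserve
threads, register a scan with a per-row callback via add_scan(), then run().
The exact Parallel_reader::Config constructor arguments, the callback
parameter type, and the process_row() helper are assumptions made for
illustration only; the authoritative declarations live in row0pread.h.

  const auto n_threads = Parallel_reader::available_threads(4);

  Parallel_reader reader(n_threads);

  // A null start and end key means "scan the whole index".
  Parallel_reader::Scan_range whole_range(nullptr, nullptr);

  Parallel_reader::Config config(whole_range, index);

  auto success = reader.add_scan(
      trx, config, [&](const Parallel_reader::Ctx *ctx) {
        // ctx->m_rec is the current, visibility-checked record.
        return process_row(ctx->m_rec, ctx->m_thread_id) ? DB_SUCCESS
                                                         : DB_INTERRUPTED;
      });

  dberr_t err = success ? reader.run() : DB_ERROR;
*/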