/*
 * Copyright (c) 2020, Alibaba Group Holding Limited
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *   http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "compaction.h"
#include "compact/new_compaction_iterator.h"
#include "db/builder.h"
#include "db/merge_helper.h"
#include "db/table_cache.h"
#include "db/table_properties_collector.h"
#include "db/version_edit.h"
#include "logger/logger.h"
#include "memory/mod_info.h"
#include "options/db_options.h"
#include "storage/extent_space_manager.h"
#include "storage/io_extent.h"
#include "table/extent_table_factory.h"
#include "table/extent_table_reader.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "util/arena.h"
#include "util/file_reader_writer.h"
//#include "utilities/field_extractor/field_extractor.h"
#include "util/string_util.h"
#include "util/to_string.h"
#include "util/stop_watch.h"
#include "xengine/env.h"
#include "xengine/options.h"
#include "xengine/xengine_constants.h"

using namespace xengine;
using namespace table;
using namespace util;
using namespace common;
using namespace db;
using namespace monitor;
using namespace memory;
using namespace logger;

#define START_PERF_STATS(item) \
  util::StopWatchNano item(context_.cf_options_->env, true);
#define RESTART_PERF_STATS(item) item.Start()
#define RECORD_PERF_STATS(item) \
  stats_.perf_stats_.item += item.ElapsedNanos(false);

namespace xengine {
namespace storage {
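// Picks the compression codec for an output extent. If the user configured
// per-level codecs, the level index is clamped into the configured range;
// otherwise the single mutable option is used. A worked example, assuming a
// configuration of compression_per_level = {kNoCompression, kLZ4Compression}:
//   level 0 -> kNoCompression, level 1 -> kLZ4Compression,
//   level 2 -> kLZ4Compression (clamped to the last configured entry).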
common::CompressionType GetCompressionType(
    const common::ImmutableCFOptions &ioptions,
    const common::MutableCFOptions &mutable_cf_options, int level,
    const bool enable_compression) {
  if (!enable_compression) {
    // disable compression
    return kNoCompression;
  }
  // TODO bottommost_compression deprecated
  // If the user has specified a different compression level for each level,
  // then pick the compression for that level.
  if (!ioptions.compression_per_level.empty()) {
    assert(level >= 0 && level <= 2);
    const int n = static_cast<int>(ioptions.compression_per_level.size()) - 1;
    return ioptions.compression_per_level[std::max(0, std::min(level, n))];
  } else {
    return mutable_cf_options.compression;
  }
}

GeneralCompaction::GeneralCompaction(const CompactionContext &context,
                                     const ColumnFamilyDesc &cf,
                                     ArenaAllocator &arena)
    : context_(context),
      cf_desc_(cf),
      write_extent_opened_(false),
      // row_arena_(DEFAULT_ROW_LENGTH, ModId::kCompaction),
      input_extents_{0, 0, 0},
      l2_largest_key_(nullptr),
      delete_percent_(0),
      // arena_(arena),
      arena_(CharArena::DEFAULT_PAGE_SIZE, ModId::kCompaction),
      stl_alloc_(WrapAllocator(arena_)),
      reader_reps_(stl_alloc_) {
  // change_info_.task_type_ = context_.task_type_;
  se_iterators_ = static_cast<ExtSEIterator *>(
      arena_.alloc(sizeof(ExtSEIterator) * RESERVE_MERGE_WAY_SIZE));
  for (int i = 0; i < RESERVE_MERGE_WAY_SIZE; ++i) {
    new (se_iterators_ + i)
        ExtSEIterator(context.data_comparator_, context.internal_comparator_);
  }
}

GeneralCompaction::~GeneralCompaction() {
  close_extent();
  cleanup();
}
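// Registers extents [start, end) from the task's descriptor list as one merge
// batch. Each batch is remembered as a (begin, end) index pair into
// merge_extents_ (a BlockPosition reused as an index pair), so run() can later
// replay every batch independently.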
int GeneralCompaction::add_merge_batch(const MetaDescriptorList &extents,
                                       const size_t start, const size_t end) {
  int ret = Status::kOk;
  ExtentSpaceManager *space_manager = context_.space_manager_;
  if (end > extents.size() || start > extents.size() || start >= end) {
    ret = Status::kInvalidArgument;
    COMPACTION_LOG(WARN, "invalid end or start.", K(start), K(end), K(ret));
  } else if (nullptr == space_manager) {
    ret = Status::kNotInit;
    COMPACTION_LOG(WARN, "space manager is null", K(ret));
  } else {
    BlockPosition pos;
    pos.first = merge_extents_.size();
    ExtentMeta *extent_meta = nullptr;
    for (size_t i = start; i < end && SUCC(ret); i++) {
      // find the newest schema in the current compaction task's extents
      MetaDescriptor meta_dt = extents.at(i);
      if (FAILED(space_manager->get_meta(meta_dt.get_extent_id(), extent_meta))) {
        COMPACTION_LOG(WARN, "failed to get meta", K(ret), K(meta_dt), K(i));
      } else if (nullptr == extent_meta /*|| nullptr == extent_meta->get_schema()*/) {
        ret = Status::kCorruption;
        COMPACTION_LOG(WARN, "extent_meta is null", K(ret), K(meta_dt),
                       KP(extent_meta));
      } else {
        // todo support schema
        // meta_dt.set_schema(extent_meta->get_schema());
        // if (nullptr == mini_tables_.schema ||
        //     mini_tables_.schema->get_schema_version() <
        //         extent_meta->get_schema()->get_schema_version()) {
        //   mini_tables_.schema = extent_meta->get_schema();
        // }
        // suppose the extent holds the keys' memory, no need to deep copy again
        merge_extents_.emplace_back(meta_dt);
        assert(meta_dt.layer_position_.level_ >= 0);
        assert(meta_dt.layer_position_.level_ < 3 /*StorageManager::MAX_LEVEL*/);
        input_extents_[meta_dt.layer_position_.level_] += 1;
      }
    }
    if (SUCC(ret)) {
      pos.second = merge_extents_.size();
      merge_batch_indexes_.emplace_back(pos);
      MeasureTime(context_.cf_options_->statistics,
                  NUM_FILES_IN_SINGLE_COMPACTION, merge_extents_.size());
    }
  }
  return ret;
}

int GeneralCompaction::open_extent() {
  if (write_extent_opened_) {
    // already open
    return 0;
  }
  START_PERF_STATS(create_extent);
  mini_tables_.space_manager = context_.space_manager_;
  mini_tables_.change_info_ = &change_info_;
  mini_tables_.table_space_id_ = context_.table_space_id_;
  bool is_flush = TaskType::FLUSH_LEVEL1_TASK == context_.task_type_;
  storage::LayerPosition output_layer_position(context_.output_level_);
  if (0 == context_.output_level_) {
    output_layer_position.layer_index_ =
        storage::LayerPosition::NEW_GENERATE_LAYER_INDEX;
  } else {
    output_layer_position.layer_index_ = 0;
  }
  extent_builder_.reset(NewTableBuilder(
      *context_.cf_options_, *context_.internal_comparator_, &props_,
      cf_desc_.column_family_id_, cf_desc_.column_family_name_, &mini_tables_,
      GetCompressionType(*context_.cf_options_, *context_.mutable_cf_options_,
                         context_.output_level_) /* compression type */,
      context_.cf_options_->compression_opts, output_layer_position,
      &compression_dict_, true, is_flush));
  if (nullptr == extent_builder_.get()) {
    COMPACTION_LOG(WARN, "create new table builder error",
                   K(cf_desc_.column_family_id_),
                   K(cf_desc_.column_family_name_.c_str()));
    return Status::kMemoryLimit;
  }
  write_extent_opened_ = true;
  RECORD_PERF_STATS(create_extent);
  return 0;
}

int GeneralCompaction::close_extent(MiniTables *flush_tables) {
  if (!write_extent_opened_) {
    return 0;
  }
  START_PERF_STATS(finish_extent);
  int ret = extent_builder_->Finish();
  RECORD_PERF_STATS(finish_extent);
  if (Status::kOk != ret) {
    COMPACTION_LOG(ERROR, "write extent failed", K(ret));
  } else {
    int64_t compaction_delete_percent =
        context_.mutable_cf_options_->compaction_delete_percent;
    stats_.record_stats_.merge_output_extents += mini_tables_.metas.size();
    COMPACTION_LOG(INFO, "compaction generate new extent stats",
                   "column_family_id", cf_desc_.column_family_id_,
                   "extent_count", mini_tables_.metas.size());
    for (FileMetaData &meta : mini_tables_.metas) {
      stats_.record_stats_.total_output_bytes += meta.fd.file_size;
      if (nullptr != flush_tables) {
        // for the mt_ext task, need to preheat the table_cache
        flush_tables->metas.push_back(meta);
      }
    }
    for (table::TableProperties &prop : mini_tables_.props) {
      stats_.record_stats_.merge_datablocks += prop.num_data_blocks;
    }
  }
  write_extent_opened_ = false;
  clear_current_writers();
  return ret;
}
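// start/stop_record_compaction_stats bracket one compaction run. The write
// amplification computed below is, roughly:
//   write_amp = (input extents not reused) / (input-level extents not reused)
// where the input level is L0 when the task consumed any L0 extents,
// otherwise L1.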
void GeneralCompaction::start_record_compaction_stats() {
  assert(nullptr != context_.cf_options_ &&
         nullptr != context_.cf_options_->env);
  if (2 == context_.output_level_) {
    COMPACTION_LOG(INFO, "begin to run major compaction.",
                   K(cf_desc_.column_family_id_),
                   K(cf_desc_.column_family_name_.c_str()));
  } else if (1 == context_.output_level_) {
    COMPACTION_LOG(INFO, "begin to run minor compaction.",
                   K(cf_desc_.column_family_id_),
                   K(cf_desc_.column_family_name_.c_str()));
  } else {
    COMPACTION_LOG(INFO, "begin to run intra_l0 compaction.",
                   K(cf_desc_.column_family_id_),
                   K(cf_desc_.column_family_name_.c_str()));
  }
  stats_.record_stats_.start_micros = context_.cf_options_->env->NowMicros();
}

void GeneralCompaction::stop_record_compaction_stats() {
  assert(nullptr != context_.cf_options_ &&
         nullptr != context_.cf_options_->env);
  stats_.record_stats_.end_micros = context_.cf_options_->env->NowMicros();
  stats_.record_stats_.micros =
      stats_.record_stats_.end_micros - stats_.record_stats_.start_micros;
  if (stats_.record_stats_.micros == 0) {
    stats_.record_stats_.micros = 1;
  }
  MeasureTime(context_.cf_options_->statistics, COMPACTION_TIME,
              stats_.record_stats_.micros);
  // Write amplification =
  //     (not_reused_merge_input_extents) / (not_reused_input_level_extents)
  int64_t input_level_not_reused_extents = 0;
  if (0 != stats_.record_stats_.total_input_extents_at_l0) {
    input_level_not_reused_extents =
        stats_.record_stats_.total_input_extents_at_l0 -
        stats_.record_stats_.reuse_extents_at_l0;
  } else {
    input_level_not_reused_extents =
        stats_.record_stats_.total_input_extents_at_l1 -
        stats_.record_stats_.reuse_extents_at_l1;
  }
  stats_.record_stats_.write_amp =
      input_level_not_reused_extents == 0
          ? 0
          : ((stats_.record_stats_.total_input_extents -
              stats_.record_stats_.reuse_extents) /
             input_level_not_reused_extents);
  // Calculate the rate if a rate_limiter is set.
  int64_t old_total_bytes_through = 0;
  if (context_.cf_options_->rate_limiter != nullptr) {
    old_total_bytes_through =
        context_.cf_options_->rate_limiter->GetTotalBytesThrough();
  }
  char buf[64] = "NONE";
  if (context_.cf_options_->rate_limiter != nullptr) {
    snprintf(buf, 64, "%.2fMB/s",
             (context_.cf_options_->rate_limiter->GetTotalBytesThrough() -
              old_total_bytes_through) *
                 1000000.0 / stats_.record_stats_.micros / 1024 / 1024);
  }
  double merge_rate = (1000000.0 *
                       (stats_.record_stats_.total_input_extents -
                        stats_.record_stats_.reuse_extents) *
                       MAX_EXTENT_SIZE / 1024 / 1024 /
                       stats_.record_stats_.micros);
  Slice merge_ratio(buf, 64);
  COMPACTION_LOG(INFO, "compact ok.", K(context_.output_level_),
                 K(cf_desc_.column_family_id_), "total time",
                 stats_.record_stats_.micros / 1000000, "merge rate",
                 merge_rate, "merge ratio",
                 merge_ratio.ToString(false).c_str());
}
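// Opens one input extent for merging: makes sure a prefetch is in flight,
// builds a table reader plus an index-block iterator over it, and wraps the
// pair in a DataBlockIterator that the merge ways consume.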
int GeneralCompaction::create_extent_index_iterator(
    const MetaDescriptor &extent, size_t &iterator_index,
    DataBlockIterator *&iterator, ExtSEIterator::ReaderRep &rep) {
  int ret = 0;
  if (IS_NULL(get_async_extent_reader(extent.block_position_.first))) {
    // if the prefetch failed, just ignore it
    prefetch_extent(extent.block_position_.first);
  }
  iterator_index = reader_reps_.size();
  rep.extent_id_ = extent.block_position_.first;
  if (FAILED(get_extent_index_iterator(extent, rep.table_reader_,
                                       rep.index_iterator_))) {
    COMPACTION_LOG(WARN, "get extent index iterator failed.", K(ret));
  } else {
    MetaType type(MetaType::SSTable, MetaType::DataBlock,
                  MetaType::InternalKey, extent.type_.level_,
                  extent.type_.way_, iterator_index);
    rep.block_iter_ =
        ALLOC_OBJECT(DataBlockIterator, arena_, type, rep.index_iterator_);
    if (nullptr == rep.block_iter_) {
      ret = Status::kMemoryLimit;
      COMPACTION_LOG(WARN, "create new block iterator error", K(ret));
    } else {
      stats_.record_stats_.total_input_bytes +=
          rep.table_reader_->GetTableProperties()->data_size;
      iterator = rep.block_iter_;
      reader_reps_.emplace_back(rep);
    }
  }
  return ret;
}

int GeneralCompaction::down_level_extent(const MetaDescriptor &extent) {
  // move level 1's extent to level 2
  int ret = 0;
  COMPACTION_LOG(INFO, "reuse extent", K(extent), K(context_.output_level_),
                 K(cf_desc_.column_family_id_));
  ExtentMeta *extent_meta = nullptr;
  LayerPosition layer_position =
      (0 == context_.output_level_)
          ? LayerPosition(0, storage::LayerPosition::NEW_GENERATE_LAYER_INDEX)
          : LayerPosition(context_.output_level_, 0);
  if (0 != extent.layer_position_.level_ &&
      (extent.layer_position_.level_ == context_.output_level_)) {
    // no need to down the extent
  } else if (FAILED(delete_extent_meta(extent))) {
    COMPACTION_LOG(WARN, "delete extent meta failed.", K(ret), K(extent));
  } else if (FAILED(context_.space_manager_->get_meta(extent.extent_id_,
                                                      extent_meta))) {
    COMPACTION_LOG(WARN, "failed to get meta", K(ret), K(extent.extent_id_));
  } else if (FAILED(change_info_.add_extent(layer_position,
                                            extent.extent_id_))) {
    COMPACTION_LOG(WARN, "fail to reuse extent.", K(ret), K(extent));
  } else {
    destroy_async_extent_reader(extent.block_position_.first, true);
  }
  // stats
  stats_.record_stats_.reuse_extents += 1;
  if (1 == extent.type_.level_) {
    stats_.record_stats_.reuse_extents_at_l1 += 1;
  } else if (0 == extent.type_.level_) {
    stats_.record_stats_.reuse_extents_at_l0 += 1;
  }
  return ret;
}

bool GeneralCompaction::check_do_reuse(const MetaDescriptor &meta) const {
  bool bret = true;
  if (2 == context_.output_level_ && meta.delete_percent_ >= delete_percent_) {
    // an extent carrying too many delete records is rewritten instead of reused
    bret = false;
    COMPACTION_LOG(DEBUG, "REUSE CHECK: not do reuse", K(meta),
                   K(delete_percent_));
  } else {
    COMPACTION_LOG(DEBUG, "REUSE CHECK: do reuse", K(meta),
                   K(delete_percent_));
  }
  return bret;
}
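// Copies one whole data block from a source extent into the output extent
// without decoding its rows (block-level reuse). The raw block, including its
// trailer, is read into block_buffer_ and appended via AddBlock().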
int GeneralCompaction::copy_data_block(const MetaDescriptor &data_block
                                       /*, const XengineSchema *schema*/) {
  int ret = 0;
  if (!write_extent_opened_) {
    FAIL_RETURN(open_extent());
  }
  TableReader *table_reader =
      reader_reps_[data_block.type_.sequence_].table_reader_;
  // convert each row according to target_schema, then add it to
  // extent_builder one by one
  // if (IS_NULL(schema) || IS_NULL(mini_tables_.schema)) {
  //   ret = Status::kNotInit;
  //   COMPACTION_LOG(WARN, "block's or mini_tables' schema is null", K(ret), KP(schema));
  // } else if (mini_tables_.schema->get_schema_version() > schema->get_schema_version()) {
  //   if (FAILED(switch_schema_for_block(data_block, schema, table_reader))) {
  //     COMPACTION_LOG(WARN, "switch schema for block failed", K(ret), K(data_block));
  //   }
  // } else {
  //   // no need to convert, row's schema is equal to the newest schema in
  //   // the task
  COMPACTION_LOG(DEBUG, "NOT changed: copy block to dest.", K(data_block));
  ExtentBasedTable *reader = dynamic_cast<ExtentBasedTable *>(table_reader);
  BlockHandle handle(data_block.block_position_.first,
                     data_block.block_position_.second);
  int64_t block_size = handle.size() + kBlockTrailerSize;
  if (FAILED(block_buffer_.reserve(block_size))) {
    COMPACTION_LOG(WARN, "reserve buffer for read block failed", K(ret),
                   K(handle.offset()), K(handle.size()), K(data_block));
  } else {
    common::Slice block_data(block_buffer_.all_data(), block_size);
    if (FAILED(reader->get_data_block(handle, block_data))) {
      COMPACTION_LOG(WARN, "read data block from extent failed", K(ret),
                     K(handle.offset()), K(handle.size()));
    } else if (FAILED(extent_builder_->AddBlock(block_data, data_block.value_,
                                                data_block.range_.end_key_))) {
      COMPACTION_LOG(ERROR, "add block to extent failed",
                     K(data_block.range_.end_key_),
                     K(data_block.range_.start_key_));
    }
  }
  if (SUCC(ret)) {
    stats_.record_stats_.total_input_bytes += block_size;
    // stats_.record_stats_.merge_input_records +=
    stats_.record_stats_.reuse_datablocks += 1;
    if (1 == data_block.type_.level_) {
      stats_.record_stats_.reuse_datablocks_at_l1 += 1;
    } else if (0 == data_block.type_.level_) {
      stats_.record_stats_.reuse_datablocks_at_l0 += 1;
    }
  }
  return ret;
}

int GeneralCompaction::destroy_extent_index_iterator(
    const int64_t iterator_index) {
  int ret = 0;
  if (iterator_index >= (int64_t)reader_reps_.size()) {
    // out of range, nothing to destroy
  } else {
    ExtSEIterator::ReaderRep &rep = reader_reps_[iterator_index];
    if (0 == rep.extent_id_) return ret;
    destroy_async_extent_reader(rep.extent_id_);
    if (nullptr != rep.table_reader_) {
      // delete rep.table_reader_;
      FREE_OBJECT(TableReader, arena_, rep.table_reader_);
      // rep.table_reader_ = nullptr;
    }
    if (nullptr != rep.block_iter_) {
      rep.block_iter_->~DataBlockIterator();
    }
    // reset the reader_rep to avoid re-destroying it
    rep.extent_id_ = 0;
  }
  stats_.record_stats_.merge_extents += 1;
  return ret;
}

int GeneralCompaction::delete_extent_meta(const MetaDescriptor &extent) {
  int ret = Status::kOk;
  COMPACTION_LOG(INFO, "delete extent", K(cf_desc_.column_family_id_),
                 K(extent), K(context_.output_level_));
  ret = change_info_.delete_extent(extent.layer_position_, extent.extent_id_);
  return ret;
}
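// Builds a TableReader over one extent. A prefetched AsyncRandomAccessExtent
// is reused when available; otherwise a synchronous RandomAccessExtent is
// allocated from the compaction arena and opened through the space manager.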
int GeneralCompaction::get_table_reader(const MetaDescriptor &extent,
                                        table::TableReader *&reader) {
  Status s;
  int ret = Status::kOk;
  // std::unique_ptr<RandomAccessFile> file;
  RandomAccessFile *file = nullptr;
  AsyncRandomAccessExtent *async_file =
      get_async_extent_reader(extent.block_position_.first);
  if (nullptr != async_file) {
    // file.reset(async_file);
    file = async_file;
  } else {
    // RandomAccessExtent *async_file = ALLOC_OBJECT(RandomAccessExtent, arena_);
    // file.reset(new RandomAccessExtent());
    file = ALLOC_OBJECT(RandomAccessExtent, arena_);
    RandomAccessExtent *extent_file = dynamic_cast<RandomAccessExtent *>(file);
    // used for cache key construction if the cache_key changed
    // extent_file->set_subtable_id(cf_desc_.column_family_id_);
    if (IS_NULL(file)) {
      ret = Status::kMemoryLimit;
      COMPACTION_LOG(WARN, "alloc memory for file failed", K(ret), K(extent));
    } else {
      s = context_.space_manager_->get_random_access_extent(
          extent.block_position_.first, *extent_file);
      if (FAILED(s.code())) {
        COMPACTION_LOG(ERROR, "open extent for read failed.", K(ret),
                       K(extent));
      }
    }
    if (FAILED(ret)) {
      FREE_OBJECT(RandomAccessExtent, arena_, extent_file);
    }
  }
  if (SUCC(ret)) {
    // std::unique_ptr<TableReader> table_reader;
    // std::unique_ptr<RandomAccessFileReader> file_reader(
    //     new RandomAccessFileReader(file, context_.cf_options_->env,
    //                                context_.cf_options_->statistics, 0, nullptr,
    //                                context_.cf_options_, *context_.env_options_));
    table::TableReader *table_reader = nullptr;
    RandomAccessFileReader *file_reader = ALLOC_OBJECT(
        RandomAccessFileReader, arena_, file, context_.cf_options_->env,
        context_.cf_options_->statistics, 0, nullptr, context_.cf_options_,
        *context_.env_options_, true);
    s = context_.cf_options_->table_factory->NewTableReader(
        TableReaderOptions(*context_.cf_options_, *context_.env_options_,
                           *context_.internal_comparator_, nullptr, nullptr,
                           false, extent.type_.level_),
        file_reader, 2 * 1024 * 1024, table_reader, false, &arena_);
    if (!s.ok()) {
      COMPACTION_LOG(WARN, "create new table reader error", K(s.getState()),
                     K(cf_desc_.column_family_id_),
                     K(cf_desc_.column_family_name_.c_str()));
    } else {
      // reader = table_reader.release();
      reader = table_reader;
    }
  }
  return s.code();
}

int GeneralCompaction::get_extent_index_iterator(
    const MetaDescriptor &extent, table::TableReader *&table_reader,
    table::BlockIter *&index_iterator) {
  int ret = 0;
  START_PERF_STATS(read_extent);
  FAIL_RETURN(get_table_reader(extent, table_reader));
  RECORD_PERF_STATS(read_extent);
  FAIL_RETURN(get_extent_index_iterator(table_reader, index_iterator));
  return ret;
}

int GeneralCompaction::get_extent_index_iterator(
    table::TableReader *table_reader, table::BlockIter *&index_iterator) {
  int ret = 0;
  table::ExtentBasedTable *reader =
      dynamic_cast<table::ExtentBasedTable *>(table_reader);
  table::BlockIter *iterator = ALLOC_OBJECT(BlockIter, arena_);
  if (nullptr == iterator) {
    ret = Status::kMemoryLimit;
    COMPACTION_LOG(WARN, "create new iterator object error", K(ret));
  } else {
    START_PERF_STATS(read_index);
    ExtentBasedTable::IndexReader *index_reader = nullptr;
    reader->create_index_iterator(ReadOptions(), iterator, index_reader,
                                  arena_);
    index_iterator = iterator;
    RECORD_PERF_STATS(read_index);
    if (!iterator->status().ok()) {
      COMPACTION_LOG(WARN, "create new index iterator error(%s)",
                     K(iterator->status().ToString().c_str()));
      ret = iterator->status().code();
    }
  }
  return ret;
}

int GeneralCompaction::create_data_block_iterator(
    const storage::BlockPosition &data_block, table::TableReader *table_reader,
    table::BlockIter *&iterator) {
  int ret = 0;
  assert(nullptr != table_reader);
  table::BlockHandle handle(data_block.first, data_block.second);
  START_PERF_STATS(read_block);
  ExtentBasedTable *reader = dynamic_cast<ExtentBasedTable *>(table_reader);
  table::BlockIter *input_iterator =
      MOD_NEW_OBJECT(ModId::kCompaction, BlockIter);
  // BlockIter *input_iterator = ALLOC_OBJECT(BlockIter, arena_);
  reader->create_data_block_iterator(ReadOptions(), handle, input_iterator);
  iterator = input_iterator;
  RECORD_PERF_STATS(read_block);
  bool in_cache = false;
  if (nullptr == iterator) {
    ret = Status::kMemoryLimit;
    COMPACTION_LOG(WARN, "create data block iterator error", K(ret),
                   K(handle.offset()), K(handle.size()));
  } else if (iterator->status().code()) {
    COMPACTION_LOG(WARN, "create data block iterator error(%s)",
                   K(iterator->status().ToString().c_str()));
    ret = iterator->status().code();
  } else if (1 == context_.output_level_ &&
             FAILED(reader->check_block_if_in_cache(handle, in_cache))) {
    COMPACTION_LOG(WARN, "check block if is in cache failed",
                   K(handle.offset()), K(handle.size()));
  } else if (in_cache) {
    // if (FAILED(reader->erase_block_from_cache(handle))) {
    //   COMPACTION_LOG(WARN, "erase block from cache failed", K(handle.offset()), K(handle.size()));
    // }
    if (nullptr != extent_builder_) {
      extent_builder_->set_in_cache_flag();
    }
  } else {
  }
  return ret;
}

int GeneralCompaction::destroy_data_block_iterator(
    table::BlockIter *block_iterator) {
  int ret = 0;
  if (nullptr != block_iterator) {
    // FREE_OBJECT(BlockIter, arena_, block_iterator);
    MOD_DELETE_OBJECT(BlockIter, block_iterator);
  }
  return ret;
}
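// Assembles the merge input for one batch: every extent is attached to the
// ExtSEIterator of its merge way, and all non-empty ways are combined into a
// MultipleSEIterator that yields entries in merge order. build_compactor then
// wraps that iterator in a NewCompactionIterator for the actual merge.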
int GeneralCompaction::build_multiple_seiterators(
    const int64_t batch_index, const storage::BlockPosition &batch,
    MultipleSEIterator *&merge_iterator) {
  int ret = 0;
  MultipleSEIterator *multiple_se_iterators =
      ALLOC_OBJECT(MultipleSEIterator, arena_, context_.data_comparator_,
                   context_.internal_comparator_);
  if (nullptr == multiple_se_iterators) {
    ret = Status::kMemoryLimit;
    COMPACTION_LOG(WARN, "failed to alloc memory for multiple_se_iterators",
                   K(ret));
  } else {
    for (int64_t index = batch.first; index < batch.second; ++index) {
      const storage::MetaDescriptor &extent = merge_extents_.at(index);
      se_iterators_[extent.type_.way_].add_extent(extent);
      if (1 == extent.layer_position_.level_) {
        stats_.record_stats_.total_input_extents_at_l1 += 1;
      } else if (0 == extent.layer_position_.level_) {
        stats_.record_stats_.total_input_extents_at_l0 += 1;
      }
    }
    for (int i = 0; i < RESERVE_MERGE_WAY_SIZE; ++i) {
      if (se_iterators_[i].has_data()) {
        se_iterators_[i].set_compaction(this);
        multiple_se_iterators->add_se_iterator(&se_iterators_[i]);
      }
    }
    merge_iterator = multiple_se_iterators;
  }
  return ret;
}

int GeneralCompaction::build_compactor(NewCompactionIterator *&compactor,
                                       MultipleSEIterator *merge_iterator) {
  int ret = 0;
  compactor = ALLOC_OBJECT(
      NewCompactionIterator, arena_, context_.data_comparator_,
      context_.internal_comparator_, kMaxSequenceNumber,
      &context_.existing_snapshots_,
      context_.earliest_write_conflict_snapshot_,
      true /*do not allow corrupt key*/, merge_iterator,
      /* mini_tables_.schema,*/ arena_, change_info_,
      context_.output_level_ /*output level*/, context_.shutting_down_,
      context_.bg_stopped_, context_.cancel_type_, true,
      context_.mutable_cf_options_->background_disable_merge);
  COMPACTION_LOG(INFO, "background_merge is disabled, subtable_id: ",
                 K(cf_desc_.column_family_id_));
  return ret;
}
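// Drives the actual merge for one batch. Each step of the compaction iterator
// reports at which granularity its current item can be emitted:
//   kKVLevel     -> write the single key/value into the open output extent;
//   kExtentLevel -> close the current output and reuse the whole input extent;
//   kBlockLevel  -> copy the whole data block without decoding its rows.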
int GeneralCompaction::merge_extents(MultipleSEIterator *&merge_iterator,
                                     MiniTables *flush_tables) {
  assert(reader_reps_.size() == 0);
  int ret = 0;
  // 1. build the compaction iterator
  NewCompactionIterator *compactor = nullptr;
  if (FAILED(build_compactor(compactor, merge_iterator))) {
    COMPACTION_LOG(WARN, "failed to build compactor", K(ret));
  } else if (nullptr == compactor) {
    ret = Status::kMemoryLimit;
    COMPACTION_LOG(WARN, "create compact iterator error.", K(ret));
  } else if (FAILED(compactor->seek_to_first())) {
    COMPACTION_LOG(WARN, "compactor seek to first failed.", K(ret));
  } else {
  }
  // 2. output the row or reuse the extent/block
  while (SUCC(ret) && compactor->valid() && !context_.bg_stopped_->load()) {
    SEIterator::IterLevel output_level = compactor->get_output_level();
    if (SEIterator::kKVLevel == output_level) {
      if (!write_extent_opened_) {
        if (FAILED(open_extent())) {
          COMPACTION_LOG(WARN, "open extent failed", K(ret));
          break;
        }
      }
      if (FAILED(extent_builder_->Add(compactor->key(),
                                      compactor->value()))) {
        COMPACTION_LOG(WARN, "add kv failed", K(ret), K(compactor->key()));
      } else {
        stats_.record_stats_.merge_output_records += 1;
      }
      // reuse memory for rows that need a schema switch
      // row_arena_.get_arena().fast_reuse();
    } else if (SEIterator::kExtentLevel == output_level) {
      if (FAILED(close_extent(flush_tables))) {
        COMPACTION_LOG(WARN, "close extent failed", K(ret));
      } else {
        // down l1 to l2 / l0 to l1 / l0 to l0
        // todo schema
        // const XengineSchema *schema = nullptr;
        ret = down_level_extent(compactor->get_reuse_meta());
      }
    } else if (SEIterator::kBlockLevel == output_level) {
      // const XengineSchema *schema = nullptr;
      MetaDescriptor meta = compactor->get_reuse_meta(/*schema*/);
      ret = copy_data_block(meta /*, schema*/);
    } else if (SEIterator::kDataEnd == output_level) {
      ret = Status::kCorruption;
      COMPACTION_LOG(WARN, "output level is invalid.", K(ret));
    } else {
      // the extent/block has already been reused
    }
    // get the next item
    if (SUCC(ret) && FAILED(compactor->next())) {
      COMPACTION_LOG(WARN, "compaction iterator get next failed.", K(ret));
    }
  }
  // deal with shutting down
  if (context_.shutting_down_->load(std::memory_order_acquire) ||
      context_.bg_stopped_->load(std::memory_order_acquire)) {
    // Since the next() method checks shutting down, invalidates the iterator
    // and sets status_, we distinguish shutting down from a normal ending here
    ret = Status::kShutdownInProgress;  // cover ret, by design
    COMPACTION_LOG(WARN,
                   "process shutting down or bg_stop, break compaction.",
                   K(ret));
  }
  if (SUCC(ret)) {
    record_compaction_iterator_stats(*compactor, stats_.record_stats_);
  }
  FREE_OBJECT(NewCompactionIterator, arena_, compactor);
  FREE_OBJECT(MultipleSEIterator, arena_, merge_iterator);
  clear_current_readers();
  return ret;
}

int GeneralCompaction::run() {
  int ret = Status::kOk;
  start_record_compaction_stats();
  int64_t batch_size = (int64_t)merge_batch_indexes_.size();
  // merge extents start
  for (int64_t index = 0; SUCC(ret) && index < batch_size; ++index) {
    if (context_.shutting_down_->load(std::memory_order_acquire)) {
      ret = Status::kShutdownInProgress;
      COMPACTION_LOG(WARN, "process shutting down, break compaction.",
                     K(ret));
      break;
    }
    const BlockPosition &batch = merge_batch_indexes_.at(index);
    int64_t extents_size = batch.second - batch.first;
    bool no_reuse = true;
    if (extents_size <= 0) {
      ret = Status::kCorruption;
      COMPACTION_LOG(ERROR, "batch is empty", K(batch.second), K(ret));
    } else if (1 == extents_size) {
      if (batch.first < 0 || batch.first >= (int64_t)merge_extents_.size()) {
        ret = Status::kCorruption;
        COMPACTION_LOG(WARN, "batch index is not valid > extents, BUG here!",
                       K(batch.first), K(batch.second),
                       K(merge_extents_.size()));
      } else if (!check_do_reuse(merge_extents_.at(batch.first))) {
        // (1->2) has many delete records or needs to arrange extents' space,
        // so do not reuse
      } else if (FAILED(close_extent())) {
        COMPACTION_LOG(WARN,
                       "failed to close extent, write_opened, mini tables",
                       K(ret), K(write_extent_opened_),
                       K(mini_tables_.metas.size()),
                       K(mini_tables_.props.size()));
      } else {
        no_reuse = false;
        const MetaDescriptor &extent = merge_extents_.at(batch.first);
        if (FAILED(down_level_extent(extent))) {
          COMPACTION_LOG(WARN, "down level extent failed", K(ret), K(extent));
        }
      }
    }
    if (SUCC(ret) && no_reuse) {
      START_PERF_STATS(create_extent);
      RECORD_PERF_STATS(create_extent);
      MultipleSEIterator *merge_iterator = nullptr;
      if (FAILED(build_multiple_seiterators(index, batch, merge_iterator))) {
        COMPACTION_LOG(WARN, "failed to build multiple seiterators", K(ret),
                       K(index), K(batch.first), K(batch.second));
      } else if (FAILED(merge_extents(merge_iterator))) {
        COMPACTION_LOG(WARN, "merge extents failed", K(index), K(ret));
      }
    }
    stats_.record_stats_.total_input_extents += extents_size;
  }
  // merge extents end
  if (SUCC(ret) && FAILED(close_extent())) {
    COMPACTION_LOG(WARN, "close extent failed.", K(ret));
  }
  stop_record_compaction_stats();
  return ret;
}
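// Kicks off an asynchronous read of a whole extent; the intent is that the
// data is already in memory by the time the merge reaches it. Successfully
// prefetched readers are tracked in prefetch_extents_, keyed by extent id.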
int GeneralCompaction::prefetch_extent(int64_t extent_id) {
  int ret = 0;
  AsyncRandomAccessExtent *reader =
      ALLOC_OBJECT(AsyncRandomAccessExtent, arena_);
  // AsyncRandomAccessExtent *reader = new AsyncRandomAccessExtent();
  Status s;
  if (IS_NULL(reader)) {
    ret = Status::kMemoryLimit;
    COMPACTION_LOG(WARN, "failed to alloc memory for reader", K(ret));
  } else if (IS_NULL(context_.space_manager_)) {
    ret = Status::kAborted;
    COMPACTION_LOG(WARN, "space_manager_ is nullptr", K(ret));
  } else if (FAILED(context_.space_manager_
                        ->get_random_access_extent(extent_id, *reader)
                        .code())) {
    COMPACTION_LOG(ERROR, "open extent for read failed.", K(extent_id),
                   K(ret));
  } else {
    // todo if cache key changed
    // reader->set_subtable_id(cf_desc_.column_family_id_);
    if (nullptr != context_.cf_options_->rate_limiter) {
      reader->set_rate_limiter(context_.cf_options_->rate_limiter);
    }
    if (FAILED(reader->prefetch())) {
      COMPACTION_LOG(WARN, "failed to prefetch", K(ret));
    } else {
      prefetch_extents_.insert(std::make_pair(extent_id, reader));
    }
  }
  return ret;
}

AsyncRandomAccessExtent *GeneralCompaction::get_async_extent_reader(
    int64_t extent_id) const {
  AsyncRandomAccessExtent *reader = nullptr;
  std::unordered_map<int64_t, AsyncRandomAccessExtent *>::const_iterator
      iter = prefetch_extents_.find(extent_id);
  if (iter != prefetch_extents_.end()) {
    reader = iter->second;
  }
  return reader;
}

void GeneralCompaction::destroy_async_extent_reader(int64_t extent_id,
                                                    bool is_reuse) {
  std::unordered_map<int64_t, AsyncRandomAccessExtent *>::iterator iter =
      prefetch_extents_.find(extent_id);
  if (iter != prefetch_extents_.end() && nullptr != iter->second) {
    iter->second->reset();
    if (is_reuse) {
      // delete iter->second;
      FREE_OBJECT(AsyncRandomAccessExtent, arena_, iter->second);
    } else {
      // no need to delete iter->second (AsyncRandomAccessExtent);
      // it is deleted together with the table_reader
    }
    iter->second = nullptr;
  }
  if (iter != prefetch_extents_.end()) {
    prefetch_extents_.erase(iter);
  }
}

int GeneralCompaction::cleanup() {
  l2_largest_key_ = nullptr;
  if (nullptr != se_iterators_) {
    for (int64_t i = 0; i < RESERVE_MERGE_WAY_SIZE; ++i) {
      se_iterators_[i].~ExtSEIterator();
    }
  }
  se_iterators_ = nullptr;
  clear_current_readers();
  clear_current_writers();
  if (write_extent_opened_) {
    extent_builder_->Abandon();
    write_extent_opened_ = false;
  }
  // cleanup change_info_
  change_info_.clear();
  return 0;
}
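// The schema-switch path below stays disabled until per-extent schemas are
// supported; as written, it would re-encode each kTypeValue row of a block
// with the newest schema before adding it to the output extent.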
/*int GeneralCompaction::switch_schema_for_block(
    const MetaDescriptor &data_block, const XengineSchema *src_schema,
    TableReader *table_reader) {
  int ret = 0;
  COMPACTION_LOG(DEBUG,
                 "NOT changed: copy block to dest. need switch schema for each row.",
                 K(data_block));
  BlockIter *block_iter = nullptr;
  if (FAILED(create_data_block_iterator(data_block.block_position_,
                                        table_reader, block_iter))) {
    COMPACTION_LOG(WARN, "create data block iterator failed", K(ret),
                   K(data_block));
  } else if (IS_NULL(block_iter)) {
    ret = Status::kCorruption;
    COMPACTION_LOG(WARN, "block iter is null", K(ret), K(data_block));
  } else {
    block_iter->SeekToFirst();
    Slice tmp_value;
    while (block_iter->Valid() && SUCC(ret)) {
      Slice key = block_iter->key();
      Slice tmp_value = block_iter->value();
      uint64_t num = util::DecodeFixed64(key.data() + key.size() - 8);
      unsigned char c = num & 0xff;
      ValueType type = static_cast<ValueType>(c);
      if (kTypeValue == type && block_iter->value().size() > 0 &&
          FAILED(FieldExtractor::get_instance()->convert_schema(
              src_schema, mini_tables_.schema, block_iter->value(), tmp_value,
              row_arena_))) {
        COMPACTION_LOG(WARN, "switch value failed.", K(ret),
                       K(block_iter->value()), K(tmp_value));
      } else if (FAILED(extent_builder_->Add(key, tmp_value))) {
        COMPACTION_LOG(WARN, "add kv failed", K(ret), K(key));
      } else {
        row_arena_.get_arena().fast_reuse();
        stats_.record_stats_.switch_output_records += 1;
        block_iter->Next();
      }
    }
  }
  return ret;
}
*/

void GeneralCompaction::clear_current_readers() {
  // clear se_iterators
  if (nullptr != se_iterators_) {
    for (int64_t i = 0; i < RESERVE_MERGE_WAY_SIZE; ++i) {
      se_iterators_[i].reset();
    }
  }
  // in case some readers were not used when an error broke the compaction
  for (int64_t i = 0; i < (int64_t)reader_reps_.size(); ++i) {
    destroy_extent_index_iterator(i);
  }
  reader_reps_.clear();
  for (auto &item : prefetch_extents_) {
    if (nullptr != item.second) {
      item.second->reset();
      // an AsyncRandomAccessExtent needs to be deleted if it was never used
      // delete item.second;
      FREE_OBJECT(AsyncRandomAccessExtent, arena_, item.second);
    }
  }
  prefetch_extents_.clear();
}

void GeneralCompaction::clear_current_writers() {
  mini_tables_.metas.clear();
  mini_tables_.props.clear();
}

void GeneralCompaction::record_compaction_iterator_stats(
    const NewCompactionIterator &iter, CompactRecordStats &stats) {
  const auto &c_iter_stats = iter.iter_stats();
  stats.merge_input_records += c_iter_stats.num_input_records;
  stats.merge_delete_records = c_iter_stats.num_input_deletion_records;
  stats.merge_corrupt_keys = c_iter_stats.num_input_corrupt_records;
  stats.single_del_fallthru = c_iter_stats.num_single_del_fallthru;
  stats.single_del_mismatch = c_iter_stats.num_single_del_mismatch;
  stats.merge_input_raw_key_bytes += c_iter_stats.total_input_raw_key_bytes;
  stats.merge_input_raw_value_bytes +=
      c_iter_stats.total_input_raw_value_bytes;
  stats.merge_replace_records += c_iter_stats.num_record_drop_hidden;
  stats.merge_expired_records += c_iter_stats.num_record_drop_obsolete;
}

}  // namespace storage
}  // namespace xengine