/*
 * Copyright (c) 2020, Alibaba Group Holding Limited
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation // gcc: Class implementation
#endif

#define MYSQL_SERVER 1

/* The C++ file's header */
#include "sql_class.h"
#include "handler_alter.h"

/* C++ standard header files */
#include <algorithm>
#include <limits>
#include <map>
#include <string>
#include <vector>

/* MySQL includes */
#include "debug_sync.h"
#include "my_bit.h"
#include "my_sys.h"
#include "my_stacktrace.h"
#include "sql_table.h"
#include "m_string.h"
#include "mysql/psi/mysql_stage.h"
#include "mysql/psi/mysql_table.h"
#include "mysql/thread_pool_priv.h"
#include "mysys_err.h"
#include "mysqld.h"
#include "sql_thd_internal_api.h"
#include "table.h"
#include "my_loglevel.h"
#include "sql/create_field.h"

/* XENGINE includes */
#include "xengine/compaction_filter.h"
#include "xengine/slice_transform.h"
#include "xengine/utilities/checkpoint.h"
#include "xengine/utilities/convenience.h"
#include "xengine/utilities/memory_util.h"
#include "xengine/perf_level.h"
#include "core/util/memory_stat.h"
#include "core/util/sync_point.h"
#include "core/memory/mod_info.h"
#include "core/port/likely.h"
#include "core/monitoring/query_perf_context.h"
#include "core/util/string_util.h"
#include "xengine/utilities/write_batch_with_index.h"
#include "core/options/options_helper.h"
#include "core/logger/logger.h"
#include "utilities/transactions/transaction_db_impl.h"
#include "db/db_impl.h"
#include "db/column_family.h"
#include "db/version_edit.h"
#include "table/internal_iterator.h"
#include "storage/multi_version_extent_meta_layer.h"

/* MyX includes */
#include "./util/xdb_utils.h"
#include "./event_listener.h"
#include "./ha_xengine_proto.h"
#include "./logger.h"
#include "./xdb_cf_manager.h"
#include "./xdb_cf_options.h"
#include "./xdb_datadic.h"
#include "./xdb_mutex_wrapper.h"
#include "./xdb_threads.h"
#include "./log_ddl.h"

using namespace xengine;
using namespace util;
using namespace common;

namespace myx {

extern Xdb_dict_manager dict_manager;
extern Xdb_cf_manager cf_manager;
extern Xdb_ddl_manager ddl_manager;
extern Xdb_binlog_manager binlog_manager;
extern Xdb_ddl_log_manager ddl_log_manager;
extern Xdb_drop_index_thread xdb_drop_idx_thread;
extern xengine::util::TransactionDB *xdb;
extern int xengine_inplace_populate_indexes;
extern int32_t xengine_shrink_table_space;
extern bool xengine_enable_bulk_load_api;
extern uint32_t xengine_disable_online_ddl;
extern bool xengine_disable_instant_ddl;
extern ulong xengine_sort_buffer_size;
extern xengine::common::DBOptions xengine_db_options;
extern bool xengine_disable_parallel_ddl;

/** Function to convert the Instant_Type to a comparable int */
uint16_t instant_type_to_int(Instant_Type type) {
  return (static_cast<typename std::underlying_type<Instant_Type>::type>(type));
}
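
/* Illustrative sketch (not part of the build): how instant_type_to_int maps a
   scoped enum onto its integral underlying type, so the value can be stored in
   and compared against handler_trivial_ctx. Standalone example; the demo enum
   and function names are hypothetical stand-ins, nothing beyond the C++
   standard library is assumed. */
#if 0
#include <cassert>
#include <cstdint>
#include <type_traits>

enum class Demo_Instant_Type : uint16_t {  // hypothetical stand-in for Instant_Type
  INSTANT_IMPOSSIBLE = 0,
  INSTANT_NO_CHANGE = 1,
  INSTANT_ADD_COLUMN = 2,
};

static uint16_t demo_instant_type_to_int(Demo_Instant_Type t) {
  // std::underlying_type unwraps the enum's storage type (uint16_t here),
  // which is what makes the result numerically comparable.
  return static_cast<std::underlying_type<Demo_Instant_Type>::type>(t);
}

int main() {
  assert(demo_instant_type_to_int(Demo_Instant_Type::INSTANT_ADD_COLUMN) == 2);
  assert(demo_instant_type_to_int(Demo_Instant_Type::INSTANT_IMPOSSIBLE) == 0);
  return 0;
}
#endif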

/** Determine if this is an instant ALTER TABLE.
This can be checked in *inplace_alter_table() functions, which are called
after check_if_supported_inplace_alter() */
bool is_instant(const Alter_inplace_info *ha_alter_info) {
  return (ha_alter_info->handler_trivial_ctx !=
          instant_type_to_int(Instant_Type::INSTANT_IMPOSSIBLE));
}

/** Determine if one ALTER TABLE can be done instantly on the table
@param[in] ha_alter_info The DDL operation
@return Instant_Type accordingly */
Instant_Type ha_xengine::check_if_support_instant_ddl(
    const Alter_inplace_info *ha_alter_info) {
  if (!(ha_alter_info->handler_flags & ~XENGINE_INPLACE_IGNORE)) {
    return (Instant_Type::INSTANT_NO_CHANGE);
  }

  Alter_inplace_info::HA_ALTER_FLAGS alter_inplace_flags =
      ha_alter_info->handler_flags & ~XENGINE_INPLACE_IGNORE;

  /* If it's an ADD COLUMN without changing existing column orders */
  if (alter_inplace_flags == Alter_inplace_info::ADD_STORED_BASE_COLUMN ||
      alter_inplace_flags ==
          (Alter_inplace_info::ADD_STORED_BASE_COLUMN |
           my_core::Alter_inplace_info::CHANGE_CREATE_OPTION)) {
    // in CHANGE_CREATE_OPTION, only HA_CREATE_USED_COMMENT is supported
    if (alter_inplace_flags &
        my_core::Alter_inplace_info::CHANGE_CREATE_OPTION) {
      // Only support used_fields with HA_CREATE_USED_COMMENT flag
      if (ha_alter_info->create_info->used_fields & ~(HA_CREATE_USED_COMMENT)) {
        return (Instant_Type::INSTANT_IMPOSSIBLE);
      }
    }
    return (Instant_Type::INSTANT_ADD_COLUMN);
  } else {
    return (Instant_Type::INSTANT_IMPOSSIBLE);
  }
}

static int set_duplicate_key_for_print(
    TABLE *new_table, TABLE *old_table, const Xdb_key_def *kd,
    const std::shared_ptr<Xdb_inplace_ddl_dict_info> &dict_info,
    bool is_rebuild);

static MY_ATTRIBUTE((warn_unused_result)) bool xengine_need_rebuild(
    const Alter_inplace_info *ha_alter_info);

/** Get instant ddl information from Data Dictionary
@param[in] table_def dd table */
void ha_xengine::get_instant_ddl_info_if_needed(const dd::Table *table_def) {
  m_instant_ddl_info.clearup();

  for (const auto col : table_def->columns()) {
    if (col->is_virtual() || col->is_se_hidden()) {
      continue;
    }
    const dd::Properties &se_private_data = col->se_private_data();
    if (!se_private_data.exists(
            dd_column_key_strings[DD_INSTANT_COLUMN_DEFAULT_NULL]) &&
        !se_private_data.exists(
            dd_column_key_strings[DD_INSTANT_COLUMN_DEFAULT])) {
      continue;
    }
    if (se_private_data.exists(
            dd_column_key_strings[DD_INSTANT_COLUMN_DEFAULT_NULL])) {
      m_instant_ddl_info.instantly_added_default_value_null.push_back(true);
      m_instant_ddl_info.instantly_added_default_values.push_back("");
    } else if (se_private_data.exists(
                   dd_column_key_strings[DD_INSTANT_COLUMN_DEFAULT])) {
      dd::String_type value;
      se_private_data.get(dd_column_key_strings[DD_INSTANT_COLUMN_DEFAULT],
                          &value);
      size_t len;
      const byte *decoded_default_value;
      DD_instant_col_val_coder coder;
      decoded_default_value = coder.decode(value.c_str(), value.length(), &len);
      std::string default_value(
          reinterpret_cast<const char *>(decoded_default_value), len);
      m_instant_ddl_info.instantly_added_default_value_null.push_back(false);
      m_instant_ddl_info.instantly_added_default_values.push_back(
          default_value);
    }
  }

  if (table_def->se_private_data().exists(
          dd_table_key_strings[DD_TABLE_INSTANT_COLS])) {
    table_def->se_private_data().get(
        dd_table_key_strings[DD_TABLE_INSTANT_COLS],
        &m_instant_ddl_info.m_instant_cols);
    assert(m_instant_ddl_info.instantly_added_default_values.size() ==
           (table->s->fields - m_instant_ddl_info.m_instant_cols));
  } else {
    assert(m_instant_ddl_info.instantly_added_default_values.empty());
  }

  if (table_def->se_private_data().exists(
          dd_table_key_strings[DD_TABLE_NULL_BYTES])) {
    table_def->se_private_data().get(dd_table_key_strings[DD_TABLE_NULL_BYTES],
                                     &m_instant_ddl_info.m_null_bytes);
  }
}

/** Get the error message format string.
@return the format string or 0 if not found. */
const char *myx_get_err_msg(int error_code) /*!< in: MySQL error code */
{
  return (my_get_err_msg(error_code));
}

/** Checks if inplace alter is supported for a given operation.
@param[in] altered_table new table
@param[in] ha_alter_info The DDL operation */
my_core::enum_alter_inplace_result ha_xengine::check_if_supported_inplace_alter(
    TABLE *altered_table, my_core::Alter_inplace_info *const ha_alter_info) {
  DBUG_ENTER_FUNC();

  DBUG_ASSERT(ha_alter_info != nullptr);

  if (ha_alter_info->handler_flags &
      ~(XENGINE_INPLACE_IGNORE | XENGINE_ALTER_NOREBUILD |
        XENGINE_ALTER_REBUILD)) {
    if (ha_alter_info->handler_flags &
        Alter_inplace_info::ALTER_STORED_COLUMN_TYPE) {
      ha_alter_info->unsupported_reason =
          myx_get_err_msg(ER_ALTER_OPERATION_NOT_SUPPORTED_REASON_COLUMN_TYPE);
    }
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
  }

  Instant_Type instant_type = check_if_support_instant_ddl(ha_alter_info);
  ha_alter_info->handler_trivial_ctx =
      instant_type_to_int(Instant_Type::INSTANT_IMPOSSIBLE);

  switch (instant_type) {
    case Instant_Type::INSTANT_IMPOSSIBLE:
      break;
    case Instant_Type::INSTANT_ADD_COLUMN:
      if ((ha_alter_info->alter_info->requested_algorithm ==
           Alter_info::ALTER_TABLE_ALGORITHM_INPLACE) ||
          ha_alter_info->error_if_not_empty) {
        /* When the user explicitly requests INPLACE, or the table may not be
         * empty, we have to fall back to INPLACE */
        break;
      }
      /* Fall through */
    case Instant_Type::INSTANT_NO_CHANGE:
      ha_alter_info->handler_trivial_ctx = instant_type_to_int(instant_type);
      if (!xengine_disable_instant_ddl) {
        DBUG_RETURN(HA_ALTER_INPLACE_INSTANT);
      } else {
        DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
      }
  }

  if (ha_alter_info->handler_flags & XENGINE_ALTER_NOT_SUPPORTED) {
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
  }

  if (ha_alter_info->handler_flags &
      my_core::Alter_inplace_info::CHANGE_CREATE_OPTION) {
    // Only support used_fields with HA_CREATE_USED_COMMENT flag
    if (ha_alter_info->create_info->used_fields & ~(HA_CREATE_USED_COMMENT)) {
      ha_alter_info->unsupported_reason =
          "XEngineDDL: only supports to change comment of table";
      DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
    }
  }

#if 0
  /* [BUG] Due to a known issue, combining other alterations with renaming a
   * table is disabled temporarily; remove this branch after the bug is
   * fixed. */
  if ((ha_alter_info->handler_flags & Alter_inplace_info::ALTER_RENAME) &&
      (ha_alter_info->handler_flags & ~Alter_inplace_info::ALTER_RENAME)) {
    __XHANDLER_LOG(WARN,
                   "XEngineDDL: Combination of renaming table with other "
                   "alteration is disabled due to known issue. table:%s",
                   table->s->table_name.str);
    ha_alter_info->unsupported_reason =
        "Combination of renaming table with other alteration is disabled "
        "due to known issue";
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
  }
#endif

  /* Only support NULL -> NOT NULL change if strict table sql_mode is set.
     Fall back to COPY for conversion if not strict tables. In-Place will fail
     with an error when trying to convert NULL to a NOT NULL value. */
  if ((ha_alter_info->handler_flags &
       Alter_inplace_info::ALTER_COLUMN_NOT_NULLABLE) &&
      !(ha_thd()->is_strict_mode())) {
    ha_alter_info->unsupported_reason =
        myx_get_err_msg(ER_ALTER_OPERATION_NOT_SUPPORTED_REASON_NOT_NULL);
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
  }

  /* DROP PRIMARY KEY is only allowed in combination with ADD PRIMARY KEY. */
  if ((ha_alter_info->handler_flags &
       (Alter_inplace_info::ADD_PK_INDEX |
        Alter_inplace_info::DROP_PK_INDEX)) ==
      Alter_inplace_info::DROP_PK_INDEX) {
    __XHANDLER_LOG(WARN,
                   "XEngineDDL: drop primary key is only allowed in "
                   "combination with ADD PRIMARY KEY. table_name: %s",
                   table->s->table_name.str);
    ha_alter_info->unsupported_reason =
        "With INPLACE DDL, XEngine only allows that DROP PRIMARY KEY is "
        "combined with ADD PRIMARY KEY";
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
  }

  /* For a special case, disable inplace DDL.
   * When a table has all of its columns defined NOT NULL and has a unique
   * key, that unique key is treated as the PRIMARY KEY. When such a column is
   * changed to nullable, the unique key can no longer be treated as the
   * PRIMARY KEY of the new table, so we will have a hidden pk in
   * altered_table. Inplace DROP of a PRIMARY KEY is not supported.
   * Example: CREATE TABLE t (a INT NOT NULL, UNIQUE KEY uk(a)); then
   * ALTER TABLE t MODIFY a INT NULL; the old table uses uk as its implicit
   * PRIMARY KEY, while the altered table needs a hidden pk. */
  if (!has_hidden_pk(table) && has_hidden_pk(altered_table)) {
    __XHANDLER_LOG(WARN,
                   "XEngineDDL: drop primary key is only allowed in "
                   "combination with ADD PRIMARY KEY. table_name: %s",
                   table->s->table_name.str);
    ha_alter_info->unsupported_reason =
        "With INPLACE DDL, XEngine only allows that DROP PRIMARY KEY is "
        "combined with ADD PRIMARY KEY";
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
  }

  /* If a column changes from NOT NULL to NULL and there is an implicit pk on
   * this column, the table should be rebuilt. Such an alteration should only
   * go through the "Copy" method. */
  /* For xengine, if there is no pk in the mysql table, xengine builds a
   * hidden pk for the table, so it's ok for xengine tables. */
  List_iterator_fast<Create_field> cf_it(
      ha_alter_info->alter_info->create_list);

  /* Fix the key parts. */
  for (KEY *new_key = ha_alter_info->key_info_buffer;
       new_key < ha_alter_info->key_info_buffer + ha_alter_info->key_count;
       new_key++) {
    for (KEY_PART_INFO *key_part = new_key->key_part;
         key_part < new_key->key_part + new_key->user_defined_key_parts;
         key_part++) {
      const Create_field *new_field;
      DBUG_ASSERT(key_part->fieldnr < altered_table->s->fields);
      cf_it.rewind();
      for (uint fieldnr = 0; (new_field = cf_it++); fieldnr++) {
        if (fieldnr == key_part->fieldnr) {
          break;
        }
      }
      DBUG_ASSERT(new_field);
      key_part->field = altered_table->field[key_part->fieldnr];
      /* In some special cases InnoDB emits "false" duplicate key errors with
         NULL key values. Let us play safe and ensure that we can correctly
         print key values even in such cases. */
      key_part->null_offset = key_part->field->null_offset();
      key_part->null_bit = key_part->field->null_bit;

      if (new_field->field) {
        /* This is an existing column. */
        continue;
      }

      /* This is an added column. */
      DBUG_ASSERT(ha_alter_info->handler_flags &
                  Alter_inplace_info::ADD_COLUMN);
      DBUG_ASSERT((key_part->field->auto_flags & Field::NEXT_NUMBER) ==
                  !!(key_part->field->flags & AUTO_INCREMENT_FLAG));

      if (key_part->field->flags & AUTO_INCREMENT_FLAG) {
        /* We cannot assign AUTO_INCREMENT column values during online
           ALTER. */
        DBUG_ASSERT(key_part->field == altered_table->found_next_number_field);
        __XHANDLER_LOG(WARN,
                       "XEngineDDL: not support assigning an auto_increment "
                       "column value. table_name: %s",
                       table->s->table_name.str);
        /* Adding an auto-increment column cannot downgrade the lock after the
         * prepare phase, so the inplace DDL cost would be the same as copy
         * DDL. */
        DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
      }
    }
  }

  /** If xengine_disable_online_ddl is 1, online-rebuild ddl is disabled;
   * if xengine_disable_online_ddl is 2, both online-rebuild and
   * online-norebuild ddl are disabled;
   * by default xengine_disable_online_ddl is 0, and both online-rebuild and
   * online-norebuild ddl are enabled. */
  if (xengine_disable_online_ddl == 1 && xengine_need_rebuild(ha_alter_info)) {
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
  } else if (xengine_disable_online_ddl == 2) {
    DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
  }

  DBUG_RETURN(HA_ALTER_INPLACE_NO_LOCK_AFTER_PREPARE);
}

/** Determine if ALTER TABLE needs to rebuild the table.
@param[in] ha_alter_info The DDL operation
@return whether it is necessary to rebuild the table */
static MY_ATTRIBUTE((warn_unused_result)) bool xengine_need_rebuild(
    const Alter_inplace_info *ha_alter_info) {
  if (is_instant(ha_alter_info)) {
    return (false);
  }

  Alter_inplace_info::HA_ALTER_FLAGS alter_inplace_flags =
      ha_alter_info->handler_flags & ~(XENGINE_INPLACE_IGNORE);

  if (alter_inplace_flags == Alter_inplace_info::CHANGE_CREATE_OPTION &&
      !(ha_alter_info->create_info->used_fields &
        (HA_CREATE_USED_ROW_FORMAT | HA_CREATE_USED_KEY_BLOCK_SIZE |
         HA_CREATE_USED_TABLESPACE))) {
    /* Any other CHANGE_CREATE_OPTION than changing ROW_FORMAT, KEY_BLOCK_SIZE
       or TABLESPACE can be done without rebuilding the table. */
    return (false);
  }

  return (0 != (ha_alter_info->handler_flags & XENGINE_ALTER_REBUILD));
}

#ifndef NDEBUG
void dbug_create_err_inplace_alter() {
  // This my_printf_error is needed for DEBUG SYNC
  my_printf_error(ER_UNKNOWN_ERROR,
                  "Intentional failure in inplace alter occurred.", MYF(0));
}
#endif

/** Check if there is a duplicate key while building the base data */
bool ha_xengine::check_duplicate_in_base(
    const TABLE *table_arg, const Xdb_key_def &index,
    const xengine::common::Slice &key, unique_key_buf_info *key_buf_info) {
  bool res = false;
  // used for secondary key
  uint n_null_fields = 0;
  const xengine::util::Comparator *index_comp =
      index.get_cf()->GetComparator();

  /* Get proper key buffer. */
  uchar *key_buf = key_buf_info->swap_and_get_key_buf();

  uint key_size = 0;
  if (index.is_primary_key()) {
    DBUG_ASSERT(key.size() <= m_max_packed_sk_len);
    memcpy(key_buf, key.data(), key.size());
    key_size = key.size();
  } else {
    /* Get memcmp form of sk without extended pk tail */
    key_size =
        index.get_memcmp_sk_parts(table_arg, key, key_buf, &n_null_fields);
  }
  key_buf_info->memcmp_key = xengine::common::Slice(
      reinterpret_cast<const char *>(key_buf), key_size);

  if (key_buf_info->memcmp_key_old.size() > 0 && n_null_fields == 0 &&
      !index_comp->Compare(key_buf_info->memcmp_key,
                           key_buf_info->memcmp_key_old)) {
    res = true;
  }

  key_buf_info->memcmp_key_old = key_buf_info->memcmp_key;
  return res;
}
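
/* Illustrative sketch (not part of the build): the swap-buffer duplicate
   check above, reduced to plain STL types. Because the key stream arrives in
   comparator (memcmp) order, a duplicate can only be the immediately
   preceding key, so keeping just the previous key suffices; the real code
   additionally skips the check when the secondary key contains NULL fields
   (n_null_fields > 0). All names here are hypothetical. */
#if 0
#include <cassert>
#include <string>
#include <vector>

// Returns true if a duplicate exists in a memcmp-ordered key stream.
static bool demo_has_adjacent_duplicate(
    const std::vector<std::string> &sorted_keys) {
  std::string prev;   // plays the role of memcmp_key_old
  bool have_prev = false;
  for (const std::string &key : sorted_keys) {  // key already in memcmp form
    if (have_prev && key == prev) {
      return true;    // same outcome as index_comp->Compare(..) returning 0
    }
    prev = key;       // remember current key, like swap_and_get_key_buf()
    have_prev = true;
  }
  return false;
}

int main() {
  assert(!demo_has_adjacent_duplicate({"a", "b", "c"}));
  assert(demo_has_adjacent_duplicate({"a", "b", "b"}));
  return 0;
}
#endif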

/* Create the key definitions needed for storing data in xengine during ADD
   INDEX inplace operations.

   @param in
     table_arg       Table with definition
     tbl_def_arg     New table def structure being populated
     old_tbl_def_arg Old (current) table def structure
     cfs             Struct array which contains column family information
   @return
     0     - Ok
     other - error; either the given table ddl is not supported by xengine or
             OOM. */
int ha_xengine::create_inplace_key_defs(
    const TABLE *const table_arg, Xdb_tbl_def *const tbl_def_arg,
    const TABLE *const old_table_arg, const Xdb_tbl_def *const old_tbl_def_arg,
    const std::array<xengine::db::ColumnFamilyHandle *, MAX_INDEXES + 1> &cfs,
    const std::array<uint32_t, MAX_INDEXES + 1> &index_ids) const {
  DBUG_ENTER_FUNC();

  DBUG_ASSERT(table_arg != nullptr);
  DBUG_ASSERT(tbl_def_arg != nullptr);
  DBUG_ASSERT(old_tbl_def_arg != nullptr);

  std::shared_ptr<Xdb_key_def> *const old_key_descr =
      old_tbl_def_arg->m_key_descr_arr;
  std::shared_ptr<Xdb_key_def> *const new_key_descr =
      tbl_def_arg->m_key_descr_arr;

  for (uint i = 0; i < tbl_def_arg->m_key_count; i++) {
    const char *new_key_name = get_key_name(i, table_arg, tbl_def_arg);
    auto stat_it = new_key_stats.find(new_key_name);
    DBUG_ASSERT(stat_it != new_key_stats.end());
    const prepare_inplace_key_stat &new_key_stat = stat_it->second;
    DBUG_ASSERT(new_key_stat.key_name == new_key_name);
#ifndef NDEBUG
    __XHANDLER_LOG(INFO, "key stat: %s", new_key_stat.to_string().c_str());
#endif
    if (new_key_stat.key_stat == prepare_inplace_key_stat::COPIED ||
        new_key_stat.key_stat == prepare_inplace_key_stat::RENAMED) {
      DBUG_ASSERT(new_key_stat.mapped_key_index < m_tbl_def->m_key_count);
      /* Found matching index in old table definition, so copy it over to the
         new one created. */
      const Xdb_key_def &okd = *old_key_descr[new_key_stat.mapped_key_index];
      DBUG_ASSERT(okd.m_name == new_key_stat.mapped_key_name);
#if 0
      uint16 index_dict_version = 0;
      uchar index_type = 0;
      uint16 kv_version = 0;
      const GL_INDEX_ID gl_index_id = okd.get_gl_index_id();
      if (!dict_manager.get_index_info(gl_index_id, &index_dict_version,
                                       &index_type, &kv_version)) {
        // NO_LINT_DEBUG
        __XHANDLER_LOG(ERROR,
                       "XEngineDDL: Could not get index information "
                       "for Index Number (%u,%u), table %s",
                       gl_index_id.cf_id, gl_index_id.index_id,
                       old_tbl_def_arg->full_tablename().c_str());
        DBUG_RETURN(HA_EXIT_FAILURE);
      }
#endif

      /* We can't use the copy constructor because we need to update the keynr
         within the pack_info for each field and the keyno of the keydef
         itself. Xdb_key_def is already constructed from the dictionary, so
         this meta info is up-to-date to use. */
      new_key_descr[i] = std::make_shared<Xdb_key_def>(
          okd.get_index_number(), i, okd.get_cf(), okd.m_index_dict_version,
          okd.m_index_type, okd.m_kv_format_version, okd.m_is_reverse_cf,
          okd.m_is_auto_cf, new_key_name,
          dict_manager.get_stats(okd.get_gl_index_id()));
    } else if (create_key_def(table_arg, i, tbl_def_arg, &new_key_descr[i],
                              cfs[i], index_ids[i])) {
      DBUG_RETURN(HA_EXIT_FAILURE);
    }

    DBUG_ASSERT(new_key_descr[i] != nullptr);
    new_key_descr[i]->setup(table_arg, tbl_def_arg);
  }

  DBUG_RETURN(HA_EXIT_SUCCESS);
}

/** Find the same old index definition in the old table.
@param[in] table_arg new_table
@param[in] tbl_def_arg new xengine table definition
@param[in] old_table_arg old_table
@param[in] old_tbl_def_arg old xengine table definition
@return map from key name to its position in the old table */
std::unordered_map<std::string, uint> ha_xengine::get_old_key_positions(
    const TABLE *const table_arg, const Xdb_tbl_def *const tbl_def_arg,
    const TABLE *const old_table_arg,
    const Xdb_tbl_def *const old_tbl_def_arg) const {
  DBUG_ENTER_FUNC();

  DBUG_ASSERT(table_arg != nullptr);
  DBUG_ASSERT(old_table_arg != nullptr);
  DBUG_ASSERT(tbl_def_arg != nullptr);
  DBUG_ASSERT(old_tbl_def_arg != nullptr);

  std::shared_ptr<Xdb_key_def> *const old_key_descr =
      old_tbl_def_arg->m_key_descr_arr;
  std::unordered_map<std::string, uint> old_key_pos;
  std::unordered_map<std::string, uint> new_key_pos;
  uint i;

  for (i = 0; i < tbl_def_arg->m_key_count; i++) {
    new_key_pos[get_key_name(i, table_arg, tbl_def_arg)] = i;
  }

  for (i = 0; i < old_tbl_def_arg->m_key_count; i++) {
    if (is_hidden_pk(i, old_table_arg, old_tbl_def_arg)) {
      old_key_pos[old_key_descr[i]->m_name] = i;
      continue;
    }

    /* In case of matching key name, need to check key parts of keys as well,
       in case a simultaneous drop + add is performed, where the key name is
       the same but the key parts are different.

       Example:
       CREATE TABLE t1 (a INT, b INT, KEY ka(a)) ENGINE=XENGINE;
       ALTER TABLE t1 DROP INDEX ka, ADD INDEX ka(b), ALGORITHM=INPLACE; */
    const KEY *const old_key = &old_table_arg->key_info[i];
    const auto &it = new_key_pos.find(old_key->name);
    if (it == new_key_pos.end()) {
      continue;
    }

    KEY *const new_key = &table_arg->key_info[it->second];

    /* If index algorithms are different we need to rebuild. */
    if (old_key->algorithm != new_key->algorithm) {
      continue;
    }

    /* Check that the key is identical between old and new tables. If not, we
       still need to create a new index.

       The exception is an index changed from unique to non-unique; in these
       cases we don't need to rebuild as they are stored the same way in
       XENGINE. */
    bool unique_to_non_unique =
        (old_key->flags ^ new_key->flags) == HA_NOSAME &&
        (old_key->flags & HA_NOSAME);

    if (!unique_to_non_unique && ((old_key->flags & HA_KEYFLAG_MASK) !=
                                  (new_key->flags & HA_KEYFLAG_MASK))) {
      continue;
    }

    if (compare_key_parts(old_key, new_key)) {
      continue;
    }

    old_key_pos[old_key->name] = i;
  }

  DBUG_RETURN(old_key_pos);
}

/* Check two keys to ensure that key parts within keys match */
int ha_xengine::compare_key_parts(const KEY *const old_key,
                                  const KEY *const new_key) const {
  DBUG_ENTER_FUNC();

  DBUG_ASSERT(old_key != nullptr);
  DBUG_ASSERT(new_key != nullptr);

  /* Skip if key parts do not match, as it is a different key */
  if (new_key->user_defined_key_parts != old_key->user_defined_key_parts) {
    DBUG_RETURN(HA_EXIT_FAILURE);
  }

  /* Check to see that key parts themselves match */
  for (uint i = 0; i < old_key->user_defined_key_parts; i++) {
    if (strcmp(old_key->key_part[i].field->field_name,
               new_key->key_part[i].field->field_name) != 0) {
      DBUG_RETURN(HA_EXIT_FAILURE);
    }

    /* Check if prefix index key part length has changed */
    if (old_key->key_part[i].length != new_key->key_part[i].length) {
      DBUG_RETURN(HA_EXIT_FAILURE);
    }
  }

  DBUG_RETURN(HA_EXIT_SUCCESS);
}
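
/* Illustrative sketch (not part of the build): the name-based matching done
   by get_old_key_positions, with plain STL containers. A real match also
   requires identical index algorithm and identical key parts (see
   compare_key_parts above); this sketch only shows the name step. All names
   are hypothetical. */
#if 0
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
  std::vector<std::string> old_keys = {"PRIMARY", "ka", "kb"};
  std::vector<std::string> new_keys = {"PRIMARY", "kb", "kc"};  // ka dropped, kc added

  // name -> position in the new table, mirroring new_key_pos
  std::unordered_map<std::string, unsigned> new_key_pos;
  for (unsigned i = 0; i < new_keys.size(); i++) new_key_pos[new_keys[i]] = i;

  // old keys that survive by name can be copied instead of rebuilt
  std::unordered_map<std::string, unsigned> old_key_pos;
  for (unsigned i = 0; i < old_keys.size(); i++) {
    if (new_key_pos.count(old_keys[i])) old_key_pos[old_keys[i]] = i;
  }

  assert(old_key_pos.count("PRIMARY") && old_key_pos.count("kb"));
  assert(!old_key_pos.count("ka"));  // dropped key must not be carried over
  return 0;
}
#endif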

static void inplace_alter_table_release(Xdb_tbl_def *&tbl_def) {
  if (nullptr != tbl_def) {
    if (nullptr != tbl_def->m_key_descr_arr) {
      /* Delete the new key descriptors */
      delete[] tbl_def->m_key_descr_arr;

      /* Explicitly mark as nullptr so we don't accidentally remove entries
         from the data dictionary on cleanup (or cause a double delete[]). */
      tbl_def->m_key_descr_arr = nullptr;
    }
    delete tbl_def;
    tbl_def = nullptr;
  }
}

/**
 * Online create unique index process (non-unique is simpler):
 *
 *   t0          t1     t2   t3
 *   |-----------|------|----|-----
 *
 * t0: TABLE_CREATE, create table, empty table
 * t1: BUILDING_BASE_INDEX, start to create index
 * t2: CHECK_UNIQUE_CONSTRAINT, finished building the index for (t0, t1),
 *     start to check uniqueness
 * t3: FINISHED, finished checking uniqueness between (t1, t2) and (t0, t1)
 *
 * t1-t2: new arrivals run the unique check against data in (t1, now); the
 * uncommitted 2nd index is updated accordingly from now on. cf-index is the
 * destination cfh. At this time, the base index is not done yet, so data in
 * (t1, t2) only does the unique check against its own data (mem, L0, L1).
 *
 * t2-t3: we do the unique check between (t0, t1) and (t1, t2); new arrivals
 * (t2, now) run the unique check against data in (t0, t1) and (t1, t2).
 * (t0, t1) and (t1, now) live in separate parts: (t0, t1) is located at the
 * L2 layer of the subtable, and (t1, now) at (mem, L0, L1) of the subtable.
 *
 * t3: last phase of inplace_ddl. If any error happened during the DDL, we
 * roll back the new index operations; otherwise we make the new indexes
 * available for newly arriving requests.
 */
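
/* The four phases above, restated as a compact sketch. The enum and its name
   are hypothetical — the real code tracks progress through Added_key_info and
   subtable state rather than a single enum — but the windows match the t0..t3
   description. */
#if 0
enum class DemoOnlineIndexPhase {
  TABLE_CREATE,             // t0: subtable created, still empty
  BUILDING_BASE_INDEX,      // t1..t2: base data (t0,t1) is sorted and loaded to
                            //   L2; concurrent DML only checks uniqueness
                            //   against its own data in (mem, L0, L1)
  CHECK_UNIQUE_CONSTRAINT,  // t2..t3: cross-check (t0,t1) in L2 against
                            //   (t1,t2) in (mem, L0, L1)
  FINISHED,                 // t3: publish the new index, or roll it back on
                            //   error
};
#endif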

/** Allows the storage engine to update internal structures with concurrent
    writes blocked. If check_if_supported_inplace_alter() returns
    HA_ALTER_INPLACE_NO_LOCK_AFTER_PREPARE or
    HA_ALTER_INPLACE_SHARED_AFTER_PREPARE, this function is called with an
    exclusive lock; otherwise the same level of locking as for
    inplace_alter_table() will be used.

    @note Storage engines are responsible for reporting any errors by calling
    my_error()/print_error()

    @note If this function reports error, commit_inplace_alter_table() will be
    called with commit= false.

    @note For partitioning, failing to prepare one partition means that
    commit_inplace_alter_table() will be called to roll back changes for all
    partitions. This means that commit_inplace_alter_table() might be called
    without prepare_inplace_alter_table() having been called first for a given
    partition.

    @param altered_table TABLE object for new version of table.
    @param ha_alter_info Structure describing changes to be done by ALTER
    TABLE and holding data used during in-place alter.
    @param old_table_def dd::Table object describing old version of the table.
    @param new_table_def dd::Table object for the new version of the table.
    Can be adjusted by this call if SE supports atomic DDL. These changes to
    the table definition will be persisted in the data-dictionary at statement
    commit time.

    @retval true Error
    @retval false Success */
bool ha_xengine::prepare_inplace_alter_table(
    TABLE *const altered_table,
    my_core::Alter_inplace_info *const ha_alter_info,
    const dd::Table *old_table_def, dd::Table *new_table_def) {
  DBUG_ENTER_FUNC();

  DBUG_ASSERT(altered_table != nullptr);
  DBUG_ASSERT(ha_alter_info != nullptr);
  DBUG_ASSERT(old_table_def != nullptr);
  DBUG_ASSERT(new_table_def != nullptr);

#ifndef NDEBUG
  __XHANDLER_LOG(INFO, "XEngineDDL: prepare alter sql is %s",
                 ha_thd()->query().str);
#endif

  if (is_instant(ha_alter_info)) {
    // Nothing to do here if it's an instant ddl
    DBUG_RETURN(HA_EXIT_SUCCESS);
  }

  int ret = HA_EXIT_SUCCESS;
  const uint old_n_keys = m_tbl_def->m_key_count;
  uint new_n_keys = altered_table->s->keys;
  if (has_hidden_pk(altered_table)) {
    new_n_keys += 1;
  }

  std::shared_ptr<Xdb_key_def> *const old_key_descr =
      m_tbl_def->m_key_descr_arr;
  auto new_key_descr = new std::shared_ptr<Xdb_key_def>[new_n_keys];

  auto new_tdef = new Xdb_tbl_def(m_tbl_def->full_tablename());
  new_tdef->m_key_descr_arr = new_key_descr;
  new_tdef->m_key_count = new_n_keys;
  new_tdef->m_auto_incr_val =
      m_tbl_def->m_auto_incr_val.load(std::memory_order_relaxed);
  new_tdef->m_hidden_pk_val =
      m_tbl_def->m_hidden_pk_val.load(std::memory_order_relaxed);

  bool need_rebuild = xengine_need_rebuild(ha_alter_info);

  if ((ret = prepare_inplace_alter_table_collect_key_stats(altered_table,
                                                           ha_alter_info))) {
    XHANDLER_LOG(ERROR,
                 "XEngineDDL: failed to collect status of keys for altered "
                 "table",
                 "table_name", table->s->table_name.str);
  } else if (new_tdef->init_table_id(ddl_manager)) {
    XHANDLER_LOG(ERROR,
                 "XEngineDDL: failed to init table_id for creating table",
                 "table_name", table->s->table_name.str);
    ret = HA_EXIT_FAILURE;
  } else if ((ret = create_key_defs(altered_table, new_tdef, nullptr, table,
                                    m_tbl_def.get(), need_rebuild))) {
    XHANDLER_LOG(ERROR,
                 "XEngineDDL: failed creating new key definitions for altered "
                 "table",
                 "table_name", table->s->table_name.str);
  } else if (new_tdef->write_dd_table(new_table_def)) {
    XHANDLER_LOG(ERROR, "XEngineDDL: failed to write dd::Table", "table_name",
                 table->s->table_name.str);
    ret = HA_EXIT_FAILURE;
  } else if (need_rebuild) {
    ret = prepare_inplace_alter_table_rebuild(altered_table, ha_alter_info,
                                              new_tdef, old_key_descr,
                                              old_n_keys, new_key_descr,
                                              new_n_keys);
    if (ret) {
      XHANDLER_LOG(ERROR, "XEngineDDL: prepare for online inplace ddl failed",
                   "table_name", table->s->table_name.str);
    }
  } else {
    ret = prepare_inplace_alter_table_norebuild(altered_table, ha_alter_info,
                                                new_tdef, old_key_descr,
                                                old_n_keys, new_key_descr,
                                                new_n_keys);
    if (ret) {
      XHANDLER_LOG(ERROR, "XEngineDDL: prepare for online inplace ddl failed",
                   "table_name", table->s->table_name.str);
    }
  }

  if (ret) {
    XHANDLER_LOG(ERROR, "XEngineDDL: prepare for online inplace ddl failed",
                 "table_name", table->s->table_name.str);
    inplace_alter_table_release(new_tdef);
  } else {
#ifndef NDEBUG
    for (auto &it : new_key_stats) {
      DBUG_ASSERT(it.first == it.second.key_name);
      XHANDLER_LOG(INFO, "XEngineDDL: ", "key stat",
                   it.second.to_string().c_str());
    }
#endif
    XHANDLER_LOG(INFO,
                 "XEngineDDL: prepare for online inplace ddl successfully",
                 "table_name", table->s->table_name.str);
  }

  DBUG_RETURN(ret);
}

int ha_xengine::prepare_inplace_alter_table_collect_key_stats(
    TABLE *const altered_table,
    my_core::Alter_inplace_info *const ha_alter_info) {
  DBUG_ENTER_FUNC();
  new_key_stats.clear();

  std::map<std::string, uint> all_old_key_names;
  // collect key names from old table
  for (uint i = 0; i < m_tbl_def->m_key_count; ++i) {
    all_old_key_names.emplace(m_tbl_def->m_key_descr_arr[i]->m_name, i);
  }
  // collect (dropped or renamed) key names from Alter_inplace_info
  std::unordered_set<std::string> dropped_key_names;
  for (uint i = 0; i < ha_alter_info->index_drop_count; ++i) {
    dropped_key_names.emplace(ha_alter_info->index_drop_buffer[i]->name);
  }

  std::map<std::string, std::string> renamed_key_names;
  if (ha_alter_info->handler_flags & Alter_inplace_info::RENAME_INDEX) {
    for (uint i = 0; i < ha_alter_info->index_rename_count; ++i) {
      renamed_key_names.emplace(
          ha_alter_info->index_rename_buffer[i].new_key->name,
          ha_alter_info->index_rename_buffer[i].old_key->name);
    }
  }

  // iterate over all key info of new table
  for (uint i = 0; i < ha_alter_info->key_count; ++i) {
    prepare_inplace_key_stat new_key_stat;
    new_key_stat.key_name = std::string(ha_alter_info->key_info_buffer[i].name);
    auto old_it1 = all_old_key_names.find(new_key_stat.key_name);
    if (old_it1 != all_old_key_names.end()) {
      if (dropped_key_names.count(new_key_stat.key_name)) {
        // key is dropped from old table and re-created in new table;
        // the index of the key in the old table is ignored
        new_key_stat.key_stat = prepare_inplace_key_stat::REDEFINED;
      } else {
        // key is copied from the old table
        new_key_stat.key_stat = prepare_inplace_key_stat::COPIED;
        new_key_stat.mapped_key_name = new_key_stat.key_name;
        new_key_stat.mapped_key_index = old_it1->second;
      }
    } else {
      auto it_re = renamed_key_names.find(new_key_stat.key_name);
      if (it_re != renamed_key_names.end()) {
        auto old_it2 = all_old_key_names.find(it_re->second);
        DBUG_ASSERT(old_it2 != all_old_key_names.end());
        // key is renamed from a key in the old table
        new_key_stat.key_stat = prepare_inplace_key_stat::RENAMED;
        new_key_stat.mapped_key_name = old_it2->first;
        new_key_stat.mapped_key_index = old_it2->second;
      } else {
        // key is newly added to the new table
        new_key_stat.key_stat = prepare_inplace_key_stat::ADDED;
      }
    }
    new_key_stats.emplace(new_key_stat.key_name, new_key_stat);
  }

  // hidden pk
  if (has_hidden_pk(altered_table)) {
    prepare_inplace_key_stat new_key_stat;
    new_key_stat.key_name = HIDDEN_PK_NAME;
    if (has_hidden_pk(table)) {
      // hidden pk is copied from old table
      new_key_stat.key_stat = prepare_inplace_key_stat::COPIED;
      new_key_stat.mapped_key_name = HIDDEN_PK_NAME;
      new_key_stat.mapped_key_index = m_tbl_def->m_key_count - 1;
    } else {
      // primary key is dropped from old table, XEngine will add a hidden pk
      new_key_stat.key_stat = prepare_inplace_key_stat::ADDED;
    }
    new_key_stats.emplace(new_key_stat.key_name, new_key_stat);
  }

  DBUG_RETURN(HA_EXIT_SUCCESS);
}

/** Build col_map[new_column_index] = old_column_index; if
col_map[new_column_index] == (uint)-1, it means the column was newly added to
new_table.
@param[in] ha_alter_info
@param[in] altered_table
@param[in] old_table
@param[in/out] dict_info dictionary used to pack/unpack new records from old
records
@return HA_EXIT_SUCCESS/HA_EXIT_FAILURE */
static int setup_col_maps(
    Alter_inplace_info *ha_alter_info, const TABLE *altered_table,
    const TABLE *old_table,
    std::shared_ptr<Xdb_inplace_ddl_dict_info> &dict_info) {
  int ret = HA_EXIT_SUCCESS;
  uint n_cols = altered_table->s->fields;
  uint old_n_cols = old_table->s->fields;

  dict_info->m_col_map = static_cast<uint *>(
      my_malloc(PSI_NOT_INSTRUMENTED, (n_cols) * sizeof(uint), MYF(0)));
  if (dict_info->m_col_map == nullptr) {
    __XHANDLER_LOG(ERROR, "XEngineDDL: alloc memory failed, table_name: %s",
                   old_table->s->table_name.str);
    return HA_EXIT_FAILURE;
  }

  dict_info->m_col_map_rev = static_cast<uint *>(
      my_malloc(PSI_NOT_INSTRUMENTED, (old_n_cols) * sizeof(uint), MYF(0)));
  if (dict_info->m_col_map_rev == nullptr) {
    __XHANDLER_LOG(ERROR, "XEngineDDL: alloc memory failed, table_name: %s",
                   old_table->s->table_name.str);
    return HA_EXIT_FAILURE;
  }

  for (uint i = 0; i < old_n_cols; i++) {
    dict_info->m_col_map_rev[i] = (uint)(-1);
  }

  uint *col_map = dict_info->m_col_map;
  uint *col_map_rev = dict_info->m_col_map_rev;
  List_iterator_fast<Create_field> cf_it(
      ha_alter_info->alter_info->create_list);
  uint new_i = 0;
  while (const Create_field *new_field = cf_it++) {
    uint old_i = 0;
    for (; old_i < old_n_cols; old_i++) {
      const Field *field = old_table->field[old_i];
      if (new_field->field == field) {
        break;
      }
    }
    // found
    if (old_i < old_n_cols) {
      col_map[new_i] = old_i;
      col_map_rev[old_i] = new_i;
      new_i++;
    } else {
      col_map[new_i++] = (uint)(-1);
    }
  }
  DBUG_ASSERT(new_i == n_cols);
  return ret;
}
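
/* Illustrative sketch (not part of the build): the forward/reverse column
   mapping that setup_col_maps produces, with plain vectors. Carried-over
   columns get their old position; newly added columns get the (uint)-1
   sentinel, which later tells record conversion to take the field from
   altered_table. The field lists here are hypothetical. */
#if 0
#include <cassert>
#include <string>
#include <vector>

int main() {
  const unsigned NOT_FOUND = static_cast<unsigned>(-1);
  std::vector<std::string> old_cols = {"a", "b"};
  std::vector<std::string> new_cols = {"a", "b", "c"};  // ADD COLUMN c

  std::vector<unsigned> col_map(new_cols.size(), NOT_FOUND);      // new -> old
  std::vector<unsigned> col_map_rev(old_cols.size(), NOT_FOUND);  // old -> new

  for (unsigned new_i = 0; new_i < new_cols.size(); new_i++) {
    for (unsigned old_i = 0; old_i < old_cols.size(); old_i++) {
      if (new_cols[new_i] == old_cols[old_i]) {  // stands in for
                                                 // new_field->field == field
        col_map[new_i] = old_i;
        col_map_rev[old_i] = new_i;
        break;
      }
    }
  }

  assert(col_map[0] == 0 && col_map[1] == 1);
  assert(col_map[2] == NOT_FOUND);  // "c" is new: conversion uses its default
  return 0;
}
#endif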

/* Prepare dictionary info for online-ddl/dml transactions.
@param[in] ha_alter_info Data used during in-place alter
@param[in] altered_table MySQL table that is being altered
@param[in] old_table MySQL table as it is before the ALTER operation
@param[in] table_name Table name in MySQL
@retval HA_EXIT_SUCCESS/HA_EXIT_FAILURE success/fail */
int ha_xengine::prepare_inplace_alter_table_dict(
    Alter_inplace_info *ha_alter_info, const TABLE *altered_table,
    const TABLE *old_table, Xdb_tbl_def *old_tbl_def, const char *table_name) {
  int ret = HA_EXIT_SUCCESS;

  old_tbl_def->m_dict_info = std::make_shared<Xdb_inplace_ddl_dict_info>();
  if (old_tbl_def->m_dict_info == nullptr) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: allocate memory failed, table_name: %s",
                   table->s->table_name.str);
    return HA_EXIT_FAILURE;
  } else if ((ret = setup_field_converters(
                  altered_table, m_new_pk_descr,
                  old_tbl_def->m_dict_info->m_encoder_arr,
                  old_tbl_def->m_dict_info->m_fields_no_needed_to_decode,
                  old_tbl_def->m_dict_info->m_null_bytes_in_rec,
                  old_tbl_def->m_dict_info->m_maybe_unpack_info))) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: setup field converters failed, table_name: %s",
                   table->s->table_name.str);
  } else if ((ret = setup_read_decoders(
                  altered_table, old_tbl_def->m_dict_info->m_encoder_arr,
                  old_tbl_def->m_dict_info->m_decoders_vect, true))) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: setup read decoders failed, table_name: %s",
                   table->s->table_name.str);
  } else if ((ret = setup_col_maps(ha_alter_info, altered_table, old_table,
                                   old_tbl_def->m_dict_info))) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: setup column maps failed, table_name: %s",
                   table->s->table_name.str);
  } else {
    __XHANDLER_LOG(INFO,
                   "XEngineDDL: prepare alter rebuild table dictionary "
                   "successfully, table_name: %s",
                   table->s->table_name.str);
  }

  return ret;
}

/** Initialize the new xdb_tbl_def and dict info for an inplace rebuild of the
table.
@param[in] altered_table new TABLE
@param[in] ha_alter_info The DDL operation
@param[in] new_tdef new xengine tbl def
@param[in] old_key_descr old table key descr
@param[in] old_n_keys number of old table keys
@param[in] new_key_descr new table key descr
@param[in] new_n_keys number of new table keys
@return Success or Failure */
int ha_xengine::prepare_inplace_alter_table_rebuild(
    TABLE *const altered_table,
    my_core::Alter_inplace_info *const ha_alter_info,
    Xdb_tbl_def *const new_tdef,
    std::shared_ptr<Xdb_key_def> *const old_key_descr, uint old_n_keys,
    std::shared_ptr<Xdb_key_def> *const new_key_descr, uint new_n_keys) {
  DBUG_ENTER_FUNC();

  DBUG_ASSERT(altered_table);
  DBUG_ASSERT(new_tdef);

  // disable subtable major compaction
  std::vector<xengine::db::ColumnFamilyHandle *> column_family_handles;
  for (uint i = 0; i < new_tdef->m_key_count; i++) {
    std::shared_ptr<Xdb_key_def> kd = new_tdef->m_key_descr_arr[i];
    xengine::db::ColumnFamilyHandle *cf = kd->get_cf();
    column_family_handles.push_back(cf);
  }

  // disable the data compaction from L1 to L2
  xengine::util::TransactionDBImpl *txn_db_impl;
  txn_db_impl = dynamic_cast<xengine::util::TransactionDBImpl *>(xdb);
  xengine::common::Status s;
  s = txn_db_impl->GetDBImpl()->switch_major_compaction(column_family_handles,
                                                        false);
  if (!s.ok()) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: switch_major_compaction fail: %s, "
                   "table_name: %s",
                   s.ToString().c_str(), table->s->table_name.str);
    DBUG_RETURN(HA_EXIT_FAILURE);
  }

  // disable garbage-collecting old MVCC versions, which are needed for the
  // unique constraint check
  s = txn_db_impl->GetDBImpl()->disable_backgroud_merge(column_family_handles);
  if (!s.ok()) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: disable_background_merge fail:%s, "
                   "table_name: %s",
                   s.ToString().c_str(), table->s->table_name.str);
    DBUG_RETURN(HA_EXIT_FAILURE);
  }

  // update both old_tbl and new_tbl
  m_tbl_def->m_inplace_new_tdef = new_tdef;
  for (uint i = 0; i < new_tdef->m_key_count; i++) {
    std::shared_ptr<Xdb_key_def> kd = new_tdef->m_key_descr_arr[i];
    m_tbl_def->create_new_keys_info(kd, altered_table);
  }

  // reallocate memory for the pack/unpack buffers, making sure new_tdef has
  // enough space to build new table records
  free_key_buffers();
  int err;
  if ((err = alloc_key_buffers(table, m_tbl_def.get(), true))) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: failed allocating key buffers during alter, "
                   "table_name: %s",
                   table->s->table_name.str);
    DBUG_RETURN(err);
  }

  if ((err = prepare_inplace_alter_table_dict(ha_alter_info, altered_table,
                                              table, m_tbl_def.get(),
                                              table->s->table_name.str))) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: failed to prepare dictionary for online copy "
                   "ddl, table_name: %s",
                   table->s->table_name.str);
    DBUG_RETURN(err);
  }

  // check whether keys in the altered table can skip the unique check
  prepare_inplace_alter_table_skip_unique_check(altered_table, new_tdef);

  ha_alter_info->handler_ctx = new Xdb_inplace_alter_ctx(new_tdef);

#if 0
  /* Add uncommitted key definitions to ddl_manager. We need to do this so
     that the property collector can find this keydef when it needs to update
     stats. The property collector looks for the keydef in the data
     dictionary, but it won't be there yet since this key definition is still
     in the creation process. */
  ddl_manager.add_uncommitted_keydefs(added_indexes);
#endif

  DBUG_RETURN(HA_EXIT_SUCCESS);
}

int ha_xengine::prepare_inplace_alter_table_skip_unique_check(
    TABLE *const altered_table, Xdb_tbl_def *const new_tbl_def) const {
  DBUG_ENTER_FUNC();

  // for a rebuilt table, check whether the index is copied from the old table
  for (uint i = 0; i < new_tbl_def->m_key_count; i++) {
    auto &kd = new_tbl_def->m_key_descr_arr[i];
    auto it = new_key_stats.find(kd->m_name);
    DBUG_ASSERT(it != new_key_stats.end());
    kd->m_can_skip_unique_check = false;
    /* A key is copied from the old table only when its definition is
     * unchanged. One possible optimization is to also skip the unique check
     * when the columns of the key are re-ordered. If any key is affected by a
     * modified or dropped column, the key will be dropped and re-created; if
     * the old key is also renamed, the new key will be created with the new
     * name, with no extra renaming operation. */
    if (kd->is_hidden_primary_key() ||
        !(altered_table->key_info[i].flags & HA_NOSAME) ||
        (it->second.key_stat == prepare_inplace_key_stat::COPIED) ||
        (it->second.key_stat == prepare_inplace_key_stat::RENAMED)) {
      kd->m_can_skip_unique_check = true;
    }
  }

  DBUG_RETURN(HA_EXIT_SUCCESS);
}
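
/* A sketch of one possible way to pair the disable/enable calls around
   switch_major_compaction with RAII, so early-error returns in
   prepare_inplace_alter_table_rebuild() could not leak the disabled state.
   This wrapper is hypothetical: it assumes the GetDBImpl() and
   switch_major_compaction(handles, bool) calls used above, and assumes
   passing true re-enables compaction; the real code re-enables compaction
   later in the DDL lifecycle (not shown in this section). */
#if 0
#include <vector>

class DemoCompactionGuard {
 public:
  DemoCompactionGuard(xengine::util::TransactionDBImpl *db,
                      std::vector<xengine::db::ColumnFamilyHandle *> handles)
      : db_(db), handles_(std::move(handles)) {
    // disable major compaction for all subtables touched by the DDL
    db_->GetDBImpl()->switch_major_compaction(handles_, false);
  }
  ~DemoCompactionGuard() {
    // re-enable on every exit path, including error returns
    db_->GetDBImpl()->switch_major_compaction(handles_, true);
  }
 private:
  xengine::util::TransactionDBImpl *db_;
  std::vector<xengine::db::ColumnFamilyHandle *> handles_;
};
#endif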

/** Convert to a new xengine_record from an old table_record plus the newly
added columns.
@param[in] table MySQL table as it is before the ALTER operation
@param[in] altered_table MySQL table that is being altered
@param[in] dict_info dictionary for altered table, read-only
@param[in] pk_packed_slice Packed PK tuple. We need it in order to compute and
store its CRC.
@param[in] pk_unpack_info unpack_info used to decode the PK tuple.
@param[out] packed_rec Data slice with record data.
@retval HA_EXIT_SUCCESS/HA_EXIT_FAILURE */
int ha_xengine::convert_new_record_from_old_record(
    const TABLE *table, const TABLE *altered_table,
    const std::shared_ptr<Xdb_inplace_ddl_dict_info> &dict_info,
    const xengine::common::Slice &pk_packed_slice,
    Xdb_string_writer *const pk_unpack_info,
    xengine::common::Slice *const packed_rec, String &new_storage_record) {
  int ret = HA_EXIT_SUCCESS;
  new_storage_record.length(0);
  new_storage_record.fill(XENGINE_RECORD_HEADER_LENGTH, 0);

  String null_bytes_str;
  null_bytes_str.length(0);
  null_bytes_str.fill(dict_info->m_null_bytes_in_rec, 0);
  new_storage_record.append(null_bytes_str);

  // If a primary key may have non-empty unpack_info for certain values
  // (m_maybe_unpack_info=TRUE), we write the unpack_info block. The block
  // itself was prepared in Xdb_key_def::pack_new_record.
  if (dict_info->m_maybe_unpack_info) {
    new_storage_record.append(
        reinterpret_cast<const char *>(pk_unpack_info->ptr()),
        pk_unpack_info->get_current_pos());
  }

  Xdb_field_encoder *encoder_arr = dict_info->m_encoder_arr;
  uint *col_map = dict_info->m_col_map;
  uint old_col_index = 0;
  for (uint i = 0; i < altered_table->s->fields; i++) {
    /* The field may come from the old table or the new table: a newly added
       field comes from altered_table, otherwise the field comes from the old
       table. */
    Field *field = nullptr;
    old_col_index = col_map[i];
    if (old_col_index == (uint)(-1)) {
      field = altered_table->field[i];
    } else {
      field = table->field[old_col_index];
    }

    // the alter operation makes the column null -> not-null
    if (field->is_null() && !encoder_arr[i].maybe_null()) {
      ret = HA_ERR_INVALID_NULL_ERROR;
    }

    /* Don't pack decodable PK key parts */
    if (encoder_arr[i].m_storage_type != Xdb_field_encoder::STORE_ALL) {
      continue;
    }

    if (encoder_arr[i].maybe_null()) {
      char *const data =
          (char *)new_storage_record.ptr() + XENGINE_RECORD_HEADER_LENGTH;
      if (field->is_null()) {
        data[encoder_arr[i].m_null_offset] |= encoder_arr[i].m_null_mask;
        continue;
      }
    }

    if (encoder_arr[i].m_field_type == MYSQL_TYPE_BLOB ||
        encoder_arr[i].m_field_type == MYSQL_TYPE_JSON) {
      auto *field_blob = (my_core::Field_blob *)field;
      append_blob_to_storage_format(new_storage_record, field_blob);
    } else if (encoder_arr[i].m_field_type == MYSQL_TYPE_VARCHAR) {
      auto *field_var = (Field_varstring *)field;
      append_varchar_to_storage_format(new_storage_record, field_var);
    } else {
      /* Copy the field data */
      const uint len = field->pack_length_in_rec();
      new_storage_record.append(reinterpret_cast<const char *>(field->ptr),
                                len);
    }
  }

  if (should_store_row_debug_checksums()) {
    const uint32_t key_crc32 = my_core::crc32(
        0, xdb_slice_to_uchar_ptr(&pk_packed_slice), pk_packed_slice.size());
    const uint32_t val_crc32 =
        my_core::crc32(0, xdb_mysql_str_to_uchar_str(&new_storage_record),
                       new_storage_record.length());
    uchar key_crc_buf[XDB_CHECKSUM_SIZE];
    uchar val_crc_buf[XDB_CHECKSUM_SIZE];
    xdb_netbuf_store_uint32(key_crc_buf, key_crc32);
    xdb_netbuf_store_uint32(val_crc_buf, val_crc32);
    new_storage_record.append((const char *)&XDB_CHECKSUM_DATA_TAG, 1);
    new_storage_record.append((const char *)key_crc_buf, XDB_CHECKSUM_SIZE);
    new_storage_record.append((const char *)val_crc_buf, XDB_CHECKSUM_SIZE);
  }

  *packed_rec = xengine::common::Slice(new_storage_record.ptr(),
                                       new_storage_record.length());

  return ret;
}
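
/* Illustrative sketch (not part of the build): the value layout produced by
   convert_new_record_from_old_record — header, null bitmap, optional
   unpack-info block, then packed fields — and how a NULL is recorded with a
   single OR into the bitmap, like
   data[encoder.m_null_offset] |= encoder.m_null_mask above. Sizes, masks and
   names here are hypothetical. */
#if 0
#include <cassert>
#include <string>

int main() {
  const size_t DEMO_HEADER_LEN = 1;  // stands in for XENGINE_RECORD_HEADER_LENGTH
  const size_t DEMO_NULL_BYTES = 1;  // stands in for m_null_bytes_in_rec

  std::string record;
  record.append(DEMO_HEADER_LEN, '\0');  // record header
  record.append(DEMO_NULL_BYTES, '\0');  // null bitmap, initially all NOT NULL

  // Mark the field with null_offset 0 / null_mask 0x02 as NULL.
  record[DEMO_HEADER_LEN + 0] |= 0x02;

  // A non-NULL fixed-length field is then appended verbatim.
  const char field_data[4] = {1, 2, 3, 4};
  record.append(field_data, sizeof(field_data));

  assert((record[DEMO_HEADER_LEN] & 0x02) != 0);
  assert(record.size() == DEMO_HEADER_LEN + DEMO_NULL_BYTES + 4);
  return 0;
}
#endif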

/** Prepare newly added indexes for online inplace ddl.
@param altered_table new table
@param ha_alter_info DDL operation
@param new_tdef new xengine table definition
@param[in] old_key_descr old table key descr
@param[in] old_n_keys number of old table keys
@param[in] new_key_descr new table key descr
@param[in] new_n_keys number of new table keys
@return Success or Failure */
int ha_xengine::prepare_inplace_alter_table_norebuild(
    TABLE *const altered_table,
    my_core::Alter_inplace_info *const ha_alter_info,
    Xdb_tbl_def *const new_tdef,
    std::shared_ptr<Xdb_key_def> *const old_key_descr, uint old_n_keys,
    std::shared_ptr<Xdb_key_def> *const new_key_descr, uint new_n_keys) {
  DBUG_ENTER_FUNC();

  uint i;
  uint j;
  const KEY *key;
  std::unordered_set<std::shared_ptr<Xdb_key_def>> added_indexes;
  std::unordered_set<GL_INDEX_ID> dropped_index_ids;

  /* Determine which (if any) key definition(s) need to be dropped */
  for (i = 0; i < ha_alter_info->index_drop_count; i++) {
    key = ha_alter_info->index_drop_buffer[i];
    for (j = 0; j < old_n_keys; j++) {
      if (!old_key_descr[j]->m_name.compare(key->name)) {
        dropped_index_ids.insert(old_key_descr[j]->get_gl_index_id());
        break;
      }
    }
  }

  /* Determine which (if any) key definition(s) need to be added */
  int drop_indexes_found = 0;
  for (i = 0; i < ha_alter_info->index_add_count; i++) {
    key = &ha_alter_info->key_info_buffer[ha_alter_info->index_add_buffer[i]];
    for (j = 0; j < new_n_keys; j++) {
      if (!new_key_descr[j]->m_name.compare(key->name)) {
        /* Check for cases where an 'identical' index is being dropped and
           re-added in a single ALTER statement. Turn this into a no-op as the
           index has not changed.

           E.g. unique index -> non-unique index requires no change.

           Note that cases where the index name remains the same but the
           key-parts are changed are already handled in
           create_inplace_key_defs. In these cases the index needs to be
           rebuilt. */
        if (dropped_index_ids.count(new_key_descr[j]->get_gl_index_id())) {
          dropped_index_ids.erase(new_key_descr[j]->get_gl_index_id());
          drop_indexes_found++;
        } else {
          added_indexes.insert(new_key_descr[j]);
        }
        break;
      }
    }
  }

  // const std::unique_ptr<xengine::db::WriteBatch> wb = dict_manager.begin();
  // xengine::db::WriteBatch *const batch = wb.get();

  // disable add-index major compaction
  std::vector<xengine::db::ColumnFamilyHandle *> column_family_handles;
  for (const auto &k : added_indexes) {
    xengine::db::ColumnFamilyHandle *cf = k->get_cf();
    column_family_handles.push_back(cf);
  }
  xengine::util::TransactionDBImpl *txn_db_impl;
  txn_db_impl = dynamic_cast<xengine::util::TransactionDBImpl *>(xdb);
  xengine::common::Status s;

  // disable the data compaction from L1 to L2
  s = txn_db_impl->GetDBImpl()->switch_major_compaction(column_family_handles,
                                                        false);
  if (!s.ok()) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: switch_major_compaction fail: %s, "
                   "table_name: %s",
                   s.ToString().c_str(), table->s->table_name.str);
    DBUG_RETURN(HA_EXIT_FAILURE);
  }

  // this option only affects flushing data from memtable to L0; it disables
  // garbage-collecting old MVCC versions, which are needed for the unique
  // constraint check
  s = txn_db_impl->GetDBImpl()->disable_backgroud_merge(column_family_handles);
  if (!s.ok()) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: disable_background_merge fail:%s, "
                   "table_name: %s",
                   s.ToString().c_str(), table->s->table_name.str);
    DBUG_RETURN(HA_EXIT_FAILURE);
  }

  // add the new indexes as part of m_tbl_def, so we can update the new
  // indexes during the online ddl phase
  for (const auto &k : added_indexes) {
    m_tbl_def->create_added_key(k, altered_table);
  }

  /* Update the data dictionary */
  std::unordered_set<GL_INDEX_ID> create_index_ids;
  for (const auto &index : added_indexes) {
    create_index_ids.insert(index->get_gl_index_id());
  }
  // dict_manager.add_create_index(create_index_ids, batch);
  // dict_manager.commit(batch);

  const uint n_dropped_keys =
      ha_alter_info->index_drop_count - drop_indexes_found;
  const uint n_added_keys =
      ha_alter_info->index_add_count - drop_indexes_found;
  DBUG_ASSERT(dropped_index_ids.size() == n_dropped_keys);
  DBUG_ASSERT(added_indexes.size() == n_added_keys);

  // as new writes are allowed to come in immediately after this function
  // returns, we have to prepare more here than the offline
  // (no-concurrent-write) method does
  if (ha_alter_info->handler_flags &
      (my_core::Alter_inplace_info::ADD_INDEX |
       my_core::Alter_inplace_info::ADD_UNIQUE_INDEX)) {
    /* Buffers need to be set up again to account for new, possibly longer
       secondary keys. */
    free_key_buffers();

    /* If adding a unique index, allocate special buffers for duplicate
       checking. */
    int err;
    if ((err = alloc_key_buffers(
             table, m_tbl_def.get(),
             ha_alter_info->handler_flags &
                 my_core::Alter_inplace_info::ADD_UNIQUE_INDEX))) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: failed allocating key buffers during alter, "
                     "table_name: %s",
                     table->s->table_name.str);
      DBUG_RETURN(err);
    }
  }

  ha_alter_info->handler_ctx = new Xdb_inplace_alter_ctx(
      new_tdef, old_key_descr, new_key_descr, old_n_keys, new_n_keys,
      added_indexes, dropped_index_ids, n_added_keys, n_dropped_keys);

  /* Add uncommitted key definitions to ddl_manager. We need to do this so
     that the property collector can find this keydef when it needs to update
     stats. The property collector looks for the keydef in the data
     dictionary, but it won't be there yet since this key definition is still
     in the creation process. */
  ddl_manager.add_uncommitted_keydefs(added_indexes);

  DBUG_RETURN(HA_EXIT_SUCCESS);
}
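
/* Illustrative sketch (not part of the build): the drop/add reconciliation in
   prepare_inplace_alter_table_norebuild. An index that appears in both the
   drop list and the add list with an unchanged definition cancels out and
   becomes a no-op. Plain ints stand in for GL_INDEX_ID; the values are
   hypothetical. */
#if 0
#include <cassert>
#include <unordered_set>

int main() {
  std::unordered_set<int> dropped_index_ids = {11, 12};  // from index_drop_buffer
  std::unordered_set<int> added_indexes;                 // to be populated

  int drop_indexes_found = 0;
  for (int candidate : {12, 13}) {  // ids of keys in the add list
    if (dropped_index_ids.count(candidate)) {
      dropped_index_ids.erase(candidate);  // identical drop+add: a no-op
      drop_indexes_found++;
    } else {
      added_indexes.insert(candidate);     // genuinely new index
    }
  }

  assert(drop_indexes_found == 1);
  assert(dropped_index_ids == std::unordered_set<int>{11});  // really dropped
  assert(added_indexes == std::unordered_set<int>{13});      // really added
  return 0;
}
#endif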

/** Alter the table structure in-place with operations specified using
    HA_ALTER_FLAGS and Alter_inplace_info. The level of concurrency allowed
    during this operation depends on the return value from
    check_if_supported_inplace_alter().

    @note Storage engines are responsible for reporting any errors by calling
    my_error()/print_error()

    @note If this function reports error, commit_inplace_alter_table() will be
    called with commit= false.

    @param altered_table TABLE object for new version of table.
    @param ha_alter_info Structure describing changes to be done by ALTER
    TABLE and holding data used during in-place alter.
    @param old_table_def dd::Table object describing old version of the table.
    @param new_table_def dd::Table object for the new version of the table.
    Can be adjusted by this call if SE supports atomic DDL. These changes to
    the table definition will be persisted in the data-dictionary at statement
    commit time.

    @retval true Error
    @retval false Success */
bool ha_xengine::inplace_alter_table(
    TABLE *const altered_table,
    my_core::Alter_inplace_info *const ha_alter_info,
    const dd::Table *old_table_def, dd::Table *new_table_def) {
  DBUG_ENTER_FUNC();

  DBUG_ASSERT(altered_table != nullptr);
  DBUG_ASSERT(ha_alter_info != nullptr);

  if (is_instant(ha_alter_info)) {
    DBUG_RETURN(HA_EXIT_SUCCESS);
  }

  int res = HA_EXIT_SUCCESS;
  DBUG_ASSERT(ha_alter_info->handler_ctx != nullptr);
  auto ctx = dynamic_cast<Xdb_inplace_alter_ctx *>(ha_alter_info->handler_ctx);

  if (!ctx->m_rebuild && !ctx->m_added_indexes.empty()) {
    /* Populate all new secondary keys by scanning the primary key. */
    // no rebuild, the primary key is not changed
    std::shared_ptr<Xdb_key_def> primary_key;
    res = inplace_populate_indexes(altered_table, ctx->m_added_indexes,
                                   primary_key, false);
  } else if (ctx->m_rebuild) {
    /* Populate new primary key-values and new secondary keys for the new
       table. */
    res = inplace_populate_new_table(altered_table, ctx->m_new_tdef.get());
  }

  if (res == HA_ERR_FOUND_DUPP_KEY) {
    __XHANDLER_LOG(WARN,
                   "XEngineDDL: duplicate error happened during ddl "
                   "ddl_rebuild_type: %d, table_name: %s",
                   (int)ctx->m_rebuild, table->s->table_name.str);
  } else if (res == HA_ERR_INVALID_NULL_ERROR) {
    __XHANDLER_LOG(WARN,
                   "XEngineDDL: convert record error happened during ddl "
                   "ddl_rebuild_type: %d, table_name: %s",
                   (int)ctx->m_rebuild, table->s->table_name.str);
  } else if (res) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: failed populating secondary key during "
                   "alter, errcode=%d, ddl_rebuild_type: %d, table_name: %s",
                   res, (int)ctx->m_rebuild, table->s->table_name.str);
  } else {
    __XHANDLER_LOG(INFO,
                   "XEngineDDL: online inplace ddl success, rebuild_type: %d, "
                   "table_name: %s",
                   (int)ctx->m_rebuild, table->s->table_name.str);
  }

  DBUG_EXECUTE_IF("myx_simulate_index_create_rollback", {
    dbug_create_err_inplace_alter();
    DBUG_RETURN(true);
  };);

  DBUG_RETURN(res != HA_EXIT_SUCCESS);
}

/** Build indexes for the new table.
@param[in] new_table_arg new altered table
@param[in] new_tbl_def new xengine table definition
@return SUCCESS/FAILURE */
int ha_xengine::inplace_populate_new_table(TABLE *const new_table_arg,
                                           Xdb_tbl_def *const new_tbl_def) {
  DBUG_ENTER_FUNC();

#ifndef NDEBUG
  __XHANDLER_LOG(INFO,
                 "XEngineDDL: populate new primary key and secondary keys"
                 " for new table: %s",
                 new_table_arg->s->table_name.str);
#endif

  std::unordered_set<std::shared_ptr<Xdb_key_def>> new_indexes;
  std::shared_ptr<Xdb_key_def> primary_key;
  for (uint i = 0; i < new_tbl_def->m_key_count; i++) {
    if (new_tbl_def->m_key_descr_arr[i]->is_primary_key()) {
      DBUG_ASSERT(nullptr == primary_key);
      primary_key = new_tbl_def->m_key_descr_arr[i];
    } else {
#ifndef NDEBUG
      auto ret = new_indexes.insert(new_tbl_def->m_key_descr_arr[i]);
      DBUG_ASSERT(ret.second);
#else
      (void)new_indexes.insert(new_tbl_def->m_key_descr_arr[i]);
#endif
    }
  }

  int ret = HA_EXIT_SUCCESS;
  ret = inplace_populate_indexes(new_table_arg, new_indexes, primary_key,
                                 true);
  // error is handled in inplace_populate_indexes
  DBUG_RETURN(ret);
}

/** Avoid concurrency between creating indexes and shrinking extent space;
xengine_inplace_populate_indexes indicates how many index-creation tasks are
running.
@param[in] new_table_arg new mysql table object
@param[in] indexes new indexes added by inplace ddl
@param[in] is_rebuild indicates whether the table is rebuilt or not
@return SUCCESS/FAILURE */
int ha_xengine::inplace_populate_indexes(
    TABLE *const new_table_arg,
    const std::unordered_set<std::shared_ptr<Xdb_key_def>> &indexes,
    const std::shared_ptr<Xdb_key_def> &primary_key, bool is_rebuild) {
  DBUG_ENTER_FUNC();
  int ret = HA_EXIT_SUCCESS;

  DBUG_EXECUTE_IF("sleep_before_create_second_index", sleep(2););

  xdb_drop_idx_thread.enter_race_condition();
  if (xengine_shrink_table_space >= 0) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: Shrink extent space is running. "
                   "table_name: %s",
                   table->s->table_name.str);
    ret = HA_ERR_INTERNAL_ERROR;
    xdb_drop_idx_thread.exit_race_condition();
    DBUG_RETURN(ret);
  }
  xengine_inplace_populate_indexes++;
#ifndef NDEBUG
  __XHANDLER_LOG(INFO,
                 "XEngineDDL: Begin inplace populate indexes "
                 "xengine_inplace_populate_indexes = %d table_name:%s",
                 xengine_inplace_populate_indexes, table->s->table_name.str);
#endif
  xdb_drop_idx_thread.exit_race_condition();

  DBUG_EXECUTE_IF("sleep_in_create_second_index", sleep(5););

  Xdb_transaction *tx = get_or_create_tx(table->in_use);

  /* There is one specific scenario where m_sst_info may not be nullptr. This
     happens if the handler we're using happens to be the handler where the PK
     bulk load was done on. The sequence of events that lead to this is as
     follows (T1 is PK bulk load, T2 is SK alter table):

     T1: Execute last INSERT statement
     T1: Return TABLE and handler object back to Table_cache_manager
     T1: Close connection
     T2: Execute ALTER statement
     T2: Take same TABLE/handler from Table_cache_manager
     T2: Call closefrm which will call finalize_bulk_load on every other open
         table/handler *except* the one it's on.
     T2: Acquire stale snapshot of PK
     T1: Call finalize_bulk_load

     This is rare because usually, closefrm will call the destructor (and thus
     finalize_bulk_load) on the handler where the PK bulk load is done.
     However, if the thread ids of the bulk load thread and the alter thread
     differ by a multiple of table_cache_instances (8 by default), then they
     hash to the same bucket in Table_cache_manager and the alter thread will
     not call the destructor on the handler it is holding. Thus, its
     m_sst_info will not be nullptr.

     At this point, it is safe to refresh the snapshot because we know all
     other open handlers have been closed at this point, and the one we're on
     is the only one left. */
  if (m_sst_info != nullptr) {
    if ((ret = finalize_bulk_load())) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: finalize_bulk_load failed, errcode=%d, "
                     "table_name: %s",
                     ret, table->s->table_name.str);
      DBUG_RETURN(ret);
    }
    tx->commit();
  }

  if (nullptr != primary_key) {
#ifndef NDEBUG
    __XHANDLER_LOG(INFO,
                   "XEngineDDL: populate primary key: %u for new table: %s, "
                   "firstly",
                   primary_key->get_index_number(),
                   new_table_arg->s->table_name.str);
#endif
    DBUG_ASSERT(primary_key->is_primary_key());
    // build the primary key first, to avoid xdb_merge.add() failures on sk
    ret = inplace_populate_index(new_table_arg, primary_key, is_rebuild);
  }

  if (HA_EXIT_SUCCESS == ret) {
    DEBUG_SYNC(ha_thd(), "xengine.inplace_populate_primary_key_done");

    // build all secondary keys
    for (const auto &index : indexes) {
      DBUG_ASSERT(!index->is_primary_key());
      if ((ret = inplace_populate_index(new_table_arg, index, is_rebuild))) {
        break;
      }
    }
  }

  DEBUG_SYNC(ha_thd(), "xengine.inplace_populate_indexes_done");
  DBUG_EXECUTE_IF("sleep_after_index_base_creation", sleep(5););

  if (ret && ha_thd()->mdl_context.upgrade_shared_lock(
                 table->mdl_ticket, MDL_EXCLUSIVE,
                 ha_thd()->variables.lock_wait_timeout)) {
    XHANDLER_LOG(ERROR, "update mdl_exclusive lock fail");
    ret = HA_ERR_INTERNAL_ERROR;
  }

  /* Explicitly tell jemalloc to clean up any unused dirty pages at this
     point. See https://reviews.facebook.net/D63723 for more details. */
  purge_all_jemalloc_arenas();

  DBUG_EXECUTE_IF("crash_during_online_index_creation", DBUG_SUICIDE(););
  DBUG_EXECUTE_IF("after_inplace_populate_indexes finish", sleep(5););

  // clear the bulk load context whether building the index succeeded or
  // failed. Since the manifest txn doesn't support spanning threads, we need
  // to ensure bulk load is used in a single thread, so clear the bulk_load
  // context as early as we can.
  int ret_bulk_load = HA_EXIT_SUCCESS;
  if (m_sst_info != nullptr) {
    if ((ret_bulk_load = finalize_bulk_load())) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: finalize_bulk_load failed, errcode: %d, "
                     "table_name: %s",
                     ret_bulk_load, table->s->table_name.str);
    }
    tx->commit();
  }
  ret = (ret != HA_EXIT_SUCCESS) ? ret : ret_bulk_load;
  xdb_drop_idx_thread.enter_race_condition();
  xengine_inplace_populate_indexes--;
  if (ret == HA_ERR_FOUND_DUPP_KEY) {
    __XHANDLER_LOG(WARN,
                   "XEngineDDL: duplicate key happened during ddl, "
                   "table_name: %s",
                   table->s->table_name.str);
  } else if (ret == HA_ERR_INVALID_NULL_ERROR) {
    __XHANDLER_LOG(WARN,
                   "XEngineDDL: convert new record error during ddl, "
                   "table_name: %s",
                   table->s->table_name.str);
  } else if (ret) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: failed to finish inplace populate indexes"
                   " xengine_inplace_populate_indexes = %d, errcode: %d, "
                   "table_name: %s",
                   xengine_inplace_populate_indexes, ret,
                   table->s->table_name.str);
  } else {
    __XHANDLER_LOG(INFO,
                   "XEngineDDL: inplace populate indexes successfully "
                   " xengine_inplace_populate_indexes = %d, table_name: %s",
                   xengine_inplace_populate_indexes,
                   table->s->table_name.str);
  }
  xdb_drop_idx_thread.exit_race_condition();

  DBUG_RETURN(ret);
}

// update Added_key_info.status when a duplicate entry is found during inplace
// DDL
int ha_xengine::inplace_update_added_key_info_status_dup_key(
    TABLE *const new_table_arg, const std::shared_ptr<Xdb_key_def> &index,
    Added_key_info &added_key_info, const common::Slice &dup_key,
    const common::Slice &dup_val, bool is_rebuild, bool use_key /* = true */) {
  int res = HA_EXIT_SUCCESS;
  KEY *key_info = &new_table_arg->key_info[index->get_keyno()];
  uint old_status = HA_EXIT_SUCCESS;
  bool rc = added_key_info.status.compare_exchange_strong(
      old_status, HA_ERR_FOUND_DUPP_KEY);
  if (rc) {
    if (use_key) {
      // using dup_key and dup_val
      if ((res = fill_new_duplicate_record(index.get(), new_table_arg, dup_key,
                                           dup_val, is_rebuild))) {
        __XHANDLER_LOG(ERROR,
                       "XEngineDDL: Fill new duplicate record error for index "
                       "%d, error %d, table_name: %s",
                       index->get_index_number(), res,
                       table->s->table_name.str);
      } else {
#ifndef NDEBUG
        __XHANDLER_LOG(INFO,
                       "XEngineDDL: Fill new duplicate record for index %d "
                       "successfully, table_name: %s",
                       index->get_index_number(), table->s->table_name.str);
#endif
      }
    } else {
      // using table->record[0]
      if ((res = set_duplicate_key_for_print(new_table_arg, table, index.get(),
                                             m_tbl_def->m_dict_info,
                                             is_rebuild))) {
        __XHANDLER_LOG(ERROR,
                       "XEngineDDL: Generate duplicate key for index %d, "
                       "error %d, table_name:%s",
                       index->get_index_number(), res,
                       table->s->table_name.str);
      } else {
#ifndef NDEBUG
        __XHANDLER_LOG(INFO,
                       "XEngineDDL: Generate duplicate key for index %d "
                       "successfully, table_name:%s",
                       index->get_index_number(), table->s->table_name.str);
#endif
      }
    }
    added_key_info.dup_key_saved.store(true);
    print_keydup_error(new_table_arg, key_info, MYF(0));
    res = HA_ERR_FOUND_DUPP_KEY;
  } else if (HA_ERR_FOUND_DUPP_KEY == added_key_info.status.load()) {
    __XHANDLER_LOG(INFO,
                   "XEngineDDL: another thread set a duplicate entry for "
                   "index %d, table_name: %s",
                   index->get_index_number(), table->s->table_name.str);
    // the duplicate key was set by another thread; wait for it to be saved
    while (!added_key_info.dup_key_saved.load()) {
    }
    print_keydup_error(new_table_arg, key_info, MYF(0));
    res = HA_ERR_FOUND_DUPP_KEY;
  } else {
    res = (int)added_key_info.status.load();
    __XHANDLER_LOG(INFO,
                   "XEngineDDL: error %u for index %d was set by another "
                   "thread, table_name: %s",
                   res, index->get_index_number(), table->s->table_name.str);
  }

  return res;
}
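
/* Illustrative sketch (not part of the build): the first-error-wins protocol
   used on Added_key_info.status above. Only the thread whose
   compare_exchange_strong succeeds owns recording the duplicate key; every
   other thread observes the error that was already stored. Standalone,
   hypothetical constants. */
#if 0
#include <atomic>
#include <cassert>

int main() {
  const unsigned DEMO_SUCCESS = 0;
  const unsigned DEMO_DUP_KEY = 121;  // stands in for HA_ERR_FOUND_DUPP_KEY

  std::atomic<unsigned> status{DEMO_SUCCESS};

  // First reporter: the CAS from SUCCESS succeeds, so it owns saving the
  // duplicate row for error reporting.
  unsigned expected = DEMO_SUCCESS;
  bool won = status.compare_exchange_strong(expected, DEMO_DUP_KEY);
  assert(won);

  // Second reporter: the CAS fails and `expected` now holds the winner's
  // error code, mirroring the "set by another thread" branch above.
  expected = DEMO_SUCCESS;
  bool won2 = status.compare_exchange_strong(expected, DEMO_DUP_KEY);
  assert(!won2 && expected == DEMO_DUP_KEY);
  return 0;
}
#endif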
inplace_update_added_key_info_status_dup_key(new_table_arg, index, added_key_info, dup_key[dup_ctx_id], dup_val[dup_ctx_id], is_rebuild); } else if (res == HA_ERR_FOUND_DUPP_KEY) { // dml dup print_keydup_error(new_table_arg, key_info, MYF(0)); } } void ha_xengine::print_common_err(int res) { if (res == HA_ERR_INVALID_NULL_ERROR) { my_error(ER_INVALID_USE_OF_NULL, MYF(0)); } else if (res == HA_ERR_INTERNAL_ERROR) { my_error(ER_UNKNOWN_ERROR, MYF(0)); } else if (res == HA_ERR_XENGINE_OUT_OF_SORTMEMORY) { my_printf_error(ER_OUT_OF_SORTMEMORY, "xengine_sort_buffer_size is too small to process merge. " "Please set xengine_sort_buffer_size to a higher value.",MYF(0)); } } int ha_xengine::build_index(xengine::common::Slice& iter_key, xengine::common::Slice& iter_val, ParallelDDLScanCtx& ddl_ctx, xengine::common::Slice& key, xengine::common::Slice& val, TABLE* tbl, bool hidden_pk_exists, const std::shared_ptr& index, bool is_rebuild, TABLE *const new_table_arg) { int res = HA_EXIT_SUCCESS; // pk ddl_ctx.primary_key.copy(iter_key.data(), iter_key.size(), &my_charset_bin); // record if ((res = convert_record_from_storage_format( &iter_key, &iter_val, tbl->record[0], tbl))) { __XHANDLER_LOG(ERROR, "XEngineDDL: Error getting record, table_name: %s", tbl->s->table_name.str); } longlong hidden_pk_id = 0; if (hidden_pk_exists && read_hidden_pk_id_from_rowkey( &hidden_pk_id, &ddl_ctx.primary_key)) { __XHANDLER_LOG(ERROR, "XEngineDDL: Error retrieving hidden pk id, table_name: %s", tbl->s->table_name.str); return HA_ERR_INTERNAL_ERROR; } if (!is_rebuild) { /* Create new secondary index entry */ const uint new_packed_size = index->pack_record( tbl, ddl_ctx.pack_buffer, tbl->record[0], ddl_ctx.sk_packed_tuple, &ddl_ctx.sk_tails, should_store_row_debug_checksums(), hidden_pk_id, 0, nullptr, new_table_arg); DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len); key = xengine::common::Slice( reinterpret_cast(ddl_ctx.sk_packed_tuple), new_packed_size); val = xengine::common::Slice( reinterpret_cast(ddl_ctx.sk_tails.ptr()), ddl_ctx.sk_tails.get_current_pos()); } else if (index->is_primary_key()) { /*** rebuild primary key **/ if (index->is_hidden_primary_key()) { DBUG_ASSERT(hidden_pk_exists); } uint new_packed_size = 0; if ((res = index->pack_new_record( tbl, ddl_ctx.pack_buffer, tbl->record[0], ddl_ctx.pk_packed_tuple, &ddl_ctx.pk_unpack_info, false, hidden_pk_id, 0, nullptr, new_table_arg, m_tbl_def->m_dict_info, new_packed_size))) { if (res != HA_ERR_INVALID_NULL_ERROR) { __XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", tbl->s->table_name.str); } return res; } DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len); key = xengine::common::Slice( reinterpret_cast(ddl_ctx.pk_packed_tuple), new_packed_size); if ((res = convert_new_record_from_old_record( tbl, new_table_arg, m_tbl_def->m_dict_info, key, &ddl_ctx.pk_unpack_info, &val, ddl_ctx.new_storage_record))) { __XHANDLER_LOG(WARN, "XEngineDDL: convert new record error, table_name: %s, code is %d", tbl->s->table_name.str, res); return res; } } else { /*** rebuild secondary key **/ DBUG_ASSERT(index->is_secondary_key()); uint new_packed_size = 0; if (index->pack_new_record( tbl, ddl_ctx.pack_buffer, tbl->record[0], ddl_ctx.sk_packed_tuple, &ddl_ctx.sk_tails, should_store_row_debug_checksums(), hidden_pk_id, 0, nullptr, new_table_arg, m_tbl_def->m_dict_info, new_packed_size)) { __XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", tbl->s->table_name.str); return HA_EXIT_FAILURE; } DBUG_ASSERT(new_packed_size <= 
m_max_packed_sk_len); key = xengine::common::Slice( reinterpret_cast<const char*>(ddl_ctx.sk_packed_tuple), new_packed_size); val = xengine::common::Slice( reinterpret_cast<const char*>(ddl_ctx.sk_tails.ptr()), ddl_ctx.sk_tails.get_current_pos()); } return res; } int ha_xengine::build_base_global_merge( std::vector<std::shared_ptr<ParallelDDLScanCtx>>& ddl_ctx_set, const std::shared_ptr<const Xdb_key_def>& index, bool is_rebuild, TABLE *const new_table_arg, size_t max_threads, bool need_unique_check, Added_key_info& added_key_info, size_t part_id, xengine::common::Slice& dup_key, xengine::common::Slice& dup_val, std::atomic<int>& dup_ctx_id) { int res = HA_EXIT_SUCCESS; xengine::db::MiniTables tables; xengine::storage::ChangeInfo change_info; xengine::db::ColumnFamilyHandle *const cf = index->get_cf(); DBUG_ASSERT(cf != nullptr); tables.change_info_ = &change_info; tables.level = 2; Xdb_sst_info sst_info( xdb, m_table_handler->m_table_name, index->get_name(), cf, xengine_db_options, false/*THDVAR(ha_thd(), trace_sst_api)*/, &tables); const xengine::util::Comparator *index_comp = index->get_cf()->GetComparator(); KEY* key_info = &new_table_arg->key_info[index->get_keyno()]; std::vector<xengine::common::Slice> merge_keys(max_threads); std::vector<xengine::common::Slice> merge_vals(max_threads); auto cmp = [&merge_keys, &index_comp](const int& lhs, const int& rhs) { return index_comp->Compare(merge_keys[rhs], merge_keys[lhs]) < 0;}; std::priority_queue<int, std::vector<int>, decltype(cmp)> pq(cmp); for (int i = 0; i < static_cast<int>(max_threads); ++i) { if ((res = ddl_ctx_set[i]->bg_merge.next(&merge_keys[i], &merge_vals[i], part_id)) == 0) pq.push(i); else if (res != HA_ERR_END_OF_FILE) return res; } /* must call my_free to release memory before exiting */ uchar* dup_sk_packed_tuple = nullptr; uchar* dup_sk_packed_tuple_old = nullptr; if (need_unique_check && (!(dup_sk_packed_tuple = reinterpret_cast<uchar*>(my_malloc( PSI_NOT_INSTRUMENTED, m_max_packed_sk_len, MYF(0)))) || !(dup_sk_packed_tuple_old = reinterpret_cast<uchar*>(my_malloc( PSI_NOT_INSTRUMENTED, m_max_packed_sk_len, MYF(0)))))) { my_free(dup_sk_packed_tuple); my_free(dup_sk_packed_tuple_old); __XHANDLER_LOG(ERROR, "XEngineDDL: allocate memory for sk duplication check fail"); return HA_ERR_INTERNAL_ERROR; } unique_key_buf_info key_buf(dup_sk_packed_tuple, dup_sk_packed_tuple_old); uint64_t merged_count = 0; while (pq.size() > 0) { int idx = pq.top(); xengine::common::Slice& merge_key = merge_keys[idx]; xengine::common::Slice& merge_val = merge_vals[idx]; /* Perform uniqueness check if needed */ // xdb_merge.add can only detect duplication within a single (merge) segment if (need_unique_check) { if (check_duplicate_in_base(new_table_arg, *index, merge_key, &key_buf)) { __XHANDLER_LOG(WARN, "XEngineDDL: duplicate entry found during xdb_merge, table_name:%s", table->s->table_name.str); dup_key = merge_key; dup_val = merge_val; dup_ctx_id = part_id; res = HA_ERR_FOUND_DUPP_KEY; break; } } // Check whether a duplicate entry was found during DML if (merged_count && (0 == merged_count % BUILD_BASE_CHECK_ERROR_FREQUENCY) && (res = inplace_check_dml_error(new_table_arg, key_info, added_key_info, false))) { __XHANDLER_LOG(WARN, "XEngineDDL: error status found during DML, code is %d, table_name:%s", res, table->s->table_name.str); break; } /* Insert key and slice to SST via SSTFileWriter API. Use mirror index (cfh) to save the index of (t0, t1).
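   A sketch of the k-way merge that drives this loop (illustrative only, not
   part of the build; names are those of the surrounding code): each of the
   max_threads sorted runs feeds one slot of the priority_queue, and we
   repeatedly pop the run holding the smallest key, emit that key, and refill
   the slot from the same run:

     while (!pq.empty()) {
       idx = pq.top();            // run with the smallest current key
       emit(merge_keys[idx]);     // here: sst_info.put(merge_key, merge_val)
       pq.pop();
       if (run[idx].next() == 0)  // advance run idx via bg_merge.next(...)
         pq.push(idx);            // re-arm the run if not exhausted
     }

   Keys therefore leave the loop in globally sorted order, which is what the
   SSTFileWriter path below requires.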
*/ if ((res = sst_info.put(merge_key, merge_val))) { __XHANDLER_LOG(ERROR, "XEngineDDL: Error while bulk loading keys in " "external merge sort, table_name:%s", table->s->table_name.str); break; } ++merged_count; pq.pop(); if ((res = ddl_ctx_set[idx]->bg_merge.next( &merge_keys[idx], &merge_vals[idx], part_id)) == 0) pq.push(idx); else if (res != HA_ERR_END_OF_FILE) // error occurred in bg_merge.next() break; if (table->in_use->killed) { res = HA_ERR_QUERY_INTERRUPTED; break; } } __XHANDLER_LOG(INFO, "PART: %d, CNT: %d", part_id, merged_count); int bulk_load_ret = sst_info.commit(); if (res == HA_ERR_END_OF_FILE && (res = bulk_load_ret)) { // NO_LINT_DEBUG __XHANDLER_LOG(ERROR, "XEngineDDL: Error finishing bulk load, table_name:%s", table->s->table_name.str); } my_free(dup_sk_packed_tuple); my_free(dup_sk_packed_tuple_old); return res; } int ha_xengine::inplace_build_base_phase_parallel( TABLE *const new_table_arg, const std::shared_ptr<const Xdb_key_def>& index, bool need_unique_check, Added_key_info& added_key_info, bool is_rebuild) { __XHANDLER_LOG(INFO, "XEngineDDL: inplace build base parallel start"); DBUG_ENTER_FUNC(); const bool hidden_pk_exists = has_hidden_pk(table); int res = HA_EXIT_SUCCESS; Xdb_transaction *tx = get_or_create_tx(table->in_use); if (ha_thd()->mdl_context.upgrade_shared_lock( table->mdl_ticket, MDL_EXCLUSIVE, ha_thd()->variables.lock_wait_timeout)) { XHANDLER_LOG(ERROR, "XEngineDDL: update mdl_exclusive lock fail"); DBUG_RETURN(HA_ERR_INTERNAL_ERROR); } // ensure we get the newest snapshot for the table. if (tx->has_snapshot()) { tx->release_snapshot(); } /* * For rebuilding of a secondary key, population of the rebuilt primary * key has been finished at this stage, but duplication may happen * on the primary key during DML before we get the snapshot. */ if (is_rebuild && index->is_secondary_key()) { auto iter = std::find_if( m_tbl_def->m_inplace_new_keys.cbegin(), m_tbl_def->m_inplace_new_keys.cend(), [&](const std::pair& key) { return key.first->is_primary_key(); }); DBUG_ASSERT(iter != m_tbl_def->m_inplace_new_keys.cend()); KEY *primary_key = &new_table_arg->key_info[iter->first->get_keyno()]; if ((res = inplace_check_dml_error(new_table_arg, primary_key, iter->second))) { XHANDLER_LOG(WARN, "XEngineDDL: error happened on primary key " "while building base for subtable", "subtable_id", index->get_index_number(), "code", res); DBUG_RETURN(res); } } const uint pk = pk_index(table, m_tbl_def.get()); ha_index_init(pk, true); KEY* key_info = &new_table_arg->key_info[index->get_keyno()]; inplace_update_added_key_info_step(added_key_info, Added_key_info::BUILDING_BASE_INDEX); table->mdl_ticket->downgrade_lock(MDL_SHARED_UPGRADABLE); if (index->is_secondary_key()) { DEBUG_SYNC(ha_thd(), "xengine.inplace_create_sk_scan_base_begin"); } else { DEBUG_SYNC(ha_thd(), "xengine.inplace_copy_ddl_scan_base_begin"); } /* Initialize variables for parallel ddl */ size_t max_threads = get_parallel_read_threads(); std::vector<std::shared_ptr<ParallelDDLScanCtx>> ddl_ctx_set(max_threads); const xengine::util::Comparator *index_comp = index->get_cf()->GetComparator(); ulonglong xengine_merge_combine_read_size = (xengine_sort_buffer_size * XENGINE_MERGE_COMBINE_READ_SIZE_RATIO); const size_t xengine_merge_sample_mem_limit = xengine_sort_buffer_size; for (int i = 0; i < static_cast<int>(ddl_ctx_set.size()); i++) { ddl_ctx_set[i].reset(new ParallelDDLScanCtx( this, std::make_shared<Xdb_index_merge>( thd_xengine_tmpdir(), xengine_sort_buffer_size, xengine_merge_combine_read_size / max_threads, index_comp, max_threads, xengine_merge_sample_mem_limit / max_threads), max_threads)); if
(ddl_ctx_set[i]->init()) DBUG_RETURN(HA_ADMIN_CORRUPT); } std::atomic dup_ctx_id{-1}; std::vector dup_key(max_threads); std::vector dup_val(max_threads); /* Step1: Scan all records and build new index */ auto f_scan = [this, &ddl_ctx_set, is_rebuild, new_table_arg, &dup_ctx_id, hidden_pk_exists, &index, &dup_key, &dup_val]( const xengine::common::ParallelReader::ExecuteCtx *ctx, db::Iterator *db_iter) { int res = HA_EXIT_SUCCESS; ParallelDDLScanCtx* ddl_ctx = ddl_ctx_set[ctx->thread_id_].get(); TABLE& tbl = ddl_ctx->thd_table; xengine::common::Slice iter_key = db_iter->key(); xengine::common::Slice iter_val = db_iter->value(); xengine::common::Slice key; xengine::common::Slice val; if ((res = this->build_index(iter_key, iter_val, *ddl_ctx, key, val, &tbl, hidden_pk_exists, index, is_rebuild, new_table_arg))) { // __XHANDLER_LOG(ERROR, "XEngineDDL: build index error, table_name: %s", tbl.s->table_name.str); return res; } /* Add record to offset tree in preparation for writing out to disk in sorted chunks. */ bool inserted = false; res = ddl_ctx->xdb_merge->add(key, val, inserted); if (res) { XHANDLER_LOG(ERROR, "Failed add record to sort merge buffer", "value_size", val.size()); return res; } else if (!inserted) { dup_ctx_id = ctx->thread_id_; dup_key[ctx->thread_id_] = ddl_ctx->xdb_merge->get_dup_key(); dup_val[ctx->thread_id_] = ddl_ctx->xdb_merge->get_dup_val(); return HA_ERR_FOUND_DUPP_KEY; } return 0; }; if ((res = scan_parallel(m_key_descr_arr[pk].get(), tx, std::move(f_scan)))) { print_dup_err(res, dup_ctx_id, index, is_rebuild, new_table_arg, dup_key, dup_val, added_key_info, key_info); ha_index_end(); DBUG_RETURN(res); } ha_index_end(); __XHANDLER_LOG(INFO, "Scan Finished"); /* Step2: Sort and choose (partition_num-1) sample sort */ std::vector sample_tmp, sample; for (auto& ctx : ddl_ctx_set) { ctx->xdb_merge->get_sample(sample_tmp); } std::sort(sample_tmp.begin(), sample_tmp.end(), [index_comp] (const xengine::common::Slice& lhs, const xengine::common::Slice& rhs) { return index_comp->Compare(rhs, lhs) > 0;}); size_t split_interval = std::ceil(static_cast(sample_tmp.size()) / max_threads); for (size_t i = split_interval; i < sample_tmp.size(); i += split_interval) { sample.push_back(sample_tmp[i]); } __XHANDLER_LOG(INFO, "Deal Sample Finished"); /* Step3: When one thread's scan finished, start background local merge */ auto f_local_merge = [this, &ddl_ctx_set, &sample](size_t thread_id) { if (thread_id >= ddl_ctx_set.size()) { DBUG_ASSERT(0); } return ddl_ctx_set[thread_id]->bg_merge.merge(sample, table->in_use); }; ParallelDDLMergeCtx local_merge_ctx(ddl_ctx_set, f_local_merge); bool inject_err = false; DBUG_EXECUTE_IF("crash_during_local_merge", inject_err = true;); DBUG_EXECUTE_IF("parallel_ddl_local_merge_kill", table->in_use->killed = THD::KILL_QUERY;); local_merge_ctx.start(max_threads, inject_err); if ((res = local_merge_ctx.finish())) { DBUG_EXECUTE_IF("crash_during_local_merge", { if (ddl_ctx_set[0]->bg_merge.get_interrupt()) { DEBUG_SYNC(ha_thd(), "xengine.local_merge_interrupted"); }; }); DBUG_RETURN(res); } __XHANDLER_LOG(INFO, "Local Merge Finished"); /* Step4: Start global merge, each partition perform an n-way merge of * n sorted buffers on disk, then writes all results to XENGINE via * SSTFileWriter API. 
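 * A minimal sketch of the partitioning, assuming the Step2 samples split the
 * key space into max_threads disjoint ranges (illustrative only; names are
 * those of the surrounding code):
 *
 *   partition 0:               keys < sample[0]
 *   partition 1:               sample[0] <= keys < sample[1]
 *   ...
 *   partition max_threads-1:   keys >= sample[max_threads-2]
 *
 * Each partition is merged by one thread via bg_merge.next(..., part_id), so
 * the per-partition outputs are disjoint and ordered, and can be written to
 * SST files in parallel without overlapping keys.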
*/ DEBUG_SYNC(ha_thd(), "xengine.build_base_global_merge_start"); bool test_stack_err = false; DBUG_EXECUTE_IF("xengine.test_pddl_stack_err", test_stack_err = true;); dup_ctx_id = -1; auto f_global_merge = [this, &ddl_ctx_set, &index, is_rebuild, new_table_arg, max_threads, need_unique_check, &added_key_info, &dup_key, &dup_val, &dup_ctx_id, test_stack_err] (size_t part_id) { if (test_stack_err) { // wait for function return sleep(10); } return this->build_base_global_merge(ddl_ctx_set, index, is_rebuild, new_table_arg, max_threads, need_unique_check, added_key_info, part_id, dup_key[part_id], dup_val[part_id], dup_ctx_id); }; ParallelDDLMergeCtx global_merge_ctx(ddl_ctx_set, f_global_merge); inject_err = false; DBUG_EXECUTE_IF("crash_during_global_merge", inject_err = true;); DBUG_EXECUTE_IF("parallel_ddl_global_merge_kill", table->in_use->killed = THD::KILL_QUERY;); global_merge_ctx.start(max_threads, inject_err); DBUG_EXECUTE_IF("xengine.test_pddl_stack_err", DBUG_RETURN(HA_ERR_INTERNAL_ERROR);); if ((res = global_merge_ctx.finish())) { DBUG_EXECUTE_IF("crash_during_global_merge", { if (ddl_ctx_set[0]->bg_merge.get_interrupt()) { DEBUG_SYNC(ha_thd(), "xengine.global_merge_interrupted"); }; }); print_dup_err(res, dup_ctx_id, index, is_rebuild, new_table_arg, dup_key, dup_val, added_key_info, key_info); DBUG_RETURN(res); } __XHANDLER_LOG(INFO, "Global Merge Finished"); __XHANDLER_LOG(INFO, "XEngineDDL: inplace build base parallel end"); DBUG_RETURN(res); } /* Build Base data for new indexes @param[in] new_table_arg @param[in] index, xengine index definition @param[in] need_unique_check, need unique check or not @param[in] added_key_info: used to call inplace_check_dml_error @param[in] is_rebuild @return */ int ha_xengine::inplace_build_base_phase(TABLE *const new_table_arg, const std::shared_ptr& index, bool need_unique_check, Added_key_info& added_key_info, bool is_rebuild) { if (!xengine_disable_parallel_ddl && get_parallel_read_threads() > 1) return inplace_build_base_phase_parallel(new_table_arg, index, need_unique_check, added_key_info, is_rebuild); DBUG_ENTER_FUNC(); const bool hidden_pk_exists = has_hidden_pk(table); int res = HA_EXIT_SUCCESS; Xdb_transaction *tx = get_or_create_tx(table->in_use); const xengine::util::Comparator *index_comp = index->get_cf()->GetComparator(); ulonglong xengine_merge_combine_read_size = (xengine_sort_buffer_size * XENGINE_MERGE_COMBINE_READ_SIZE_RATIO); Xdb_index_merge xdb_merge(thd_xengine_tmpdir(), xengine_sort_buffer_size, xengine_merge_combine_read_size, index_comp); if ((res = xdb_merge.init())) { __XHANDLER_LOG(ERROR, "XEngineDDL: xdb_merge init failed, errcode=%d, table_name: %s", res, table->s->table_name.str); DBUG_RETURN(res); } if (ha_thd()->mdl_context.upgrade_shared_lock( table->mdl_ticket, MDL_EXCLUSIVE, ha_thd()->variables.lock_wait_timeout)) { XHANDLER_LOG(ERROR, "XEngineDDL: update mdl_exclusive lock fail"); DBUG_RETURN(HA_ERR_INTERNAL_ERROR); } // ensure we get a newest snapshot for the table. if (tx->has_snapshot()) { tx->release_snapshot(); } /* For rebuilding of secondary key, rebuild population of primary * key has been finished at this stage, but duplication may happen * on primary key during DML before we get snapshot. 
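 * A worked example of the window this guards (illustrative):
 *   t0: the rebuilt primary key finishes its base population
 *   t1: a concurrent DML inserts a key that duplicates the rebuilt PK and
 *       records the error in the PK's Added_key_info
 *   t2: we take the snapshot for this secondary key build
 * The t1 error must be surfaced here, before any secondary-key work is done
 * against the t2 snapshot.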
*/ if (is_rebuild && index->is_secondary_key()) { auto iter = std::find_if( m_tbl_def->m_inplace_new_keys.cbegin(), m_tbl_def->m_inplace_new_keys.cend(), [&](const std::pair& key) { return key.first->is_primary_key(); }); DBUG_ASSERT(iter != m_tbl_def->m_inplace_new_keys.cend()); KEY *primary_key = &new_table_arg->key_info[iter->first->get_keyno()]; if ((res = inplace_check_dml_error(new_table_arg, primary_key, iter->second))) { XHANDLER_LOG(WARN, "XEngineDDL: error happened on primary key " "while building base for subtable", "subtable_id", index->get_index_number(), "code", res); DBUG_RETURN(res); } } const uint pk = pk_index(table, m_tbl_def.get()); ha_index_init(pk, true); KEY* key_info = &new_table_arg->key_info[index->get_keyno()]; // get snapshot res = index_first(table->record[0]); if (res && res != HA_ERR_END_OF_FILE) { __XHANDLER_LOG(ERROR, "XEngineDDL: failed to get first record, errcode=%d, table_name: %s", res, table->s->table_name.str); ha_index_end(); table->mdl_ticket->downgrade_lock(MDL_SHARED_UPGRADABLE); DBUG_RETURN(HA_ERR_INTERNAL_ERROR); } inplace_update_added_key_info_step(added_key_info, Added_key_info::BUILDING_BASE_INDEX); table->mdl_ticket->downgrade_lock(MDL_SHARED_UPGRADABLE); if (index->is_secondary_key()) { DEBUG_SYNC(ha_thd(), "xengine.inplace_create_sk_scan_base_begin"); } else { DEBUG_SYNC(ha_thd(), "xengine.inplace_copy_ddl_scan_base_begin"); } bool processed_error = false; /* Scan each record in the primary key in order */ for (; res == 0; res = index_next(table->record[0])) { longlong hidden_pk_id = 0; if (hidden_pk_exists && read_hidden_pk_id_from_rowkey(&hidden_pk_id, &m_last_rowkey)) { __XHANDLER_LOG(ERROR, "XEngineDDL: Error retrieving hidden pk id, table_name: %s", table->s->table_name.str); ha_index_end(); DBUG_RETURN(HA_ERR_INTERNAL_ERROR); } xengine::common::Slice key; xengine::common::Slice val; if (!is_rebuild) { /* Create new secondary index entry */ const uint new_packed_size = index->pack_record( table, m_pack_buffer, table->record[0], m_sk_packed_tuple, &m_sk_tails, should_store_row_debug_checksums(), hidden_pk_id, 0, nullptr, new_table_arg); DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len); key = xengine::common::Slice( reinterpret_cast(m_sk_packed_tuple), new_packed_size); val = xengine::common::Slice( reinterpret_cast(m_sk_tails.ptr()), m_sk_tails.get_current_pos()); } else if (index->is_primary_key()) { /*** rebuild primary key **/ if (index->is_hidden_primary_key()) { DBUG_ASSERT(hidden_pk_exists); } uint new_packed_size = 0; if ((res = index->pack_new_record( table, m_pack_buffer, table->record[0], m_pk_packed_tuple, &m_pk_unpack_info, false, hidden_pk_id, 0, nullptr, new_table_arg, m_tbl_def->m_dict_info, new_packed_size))) { if (res != HA_ERR_INVALID_NULL_ERROR) { __XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", table->s->table_name.str); } ha_index_end(); DBUG_RETURN(res); } DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len); key = xengine::common::Slice( reinterpret_cast(m_pk_packed_tuple), new_packed_size); if ((res = convert_new_record_from_old_record(table, new_table_arg, m_tbl_def->m_dict_info, key, &m_pk_unpack_info, &val, m_new_storage_record))) { __XHANDLER_LOG(WARN, "XEngineDDL: convert new record error, table_name: %s, code is %d", table->s->table_name.str, res); my_error(ER_INVALID_USE_OF_NULL, MYF(0)); processed_error = true; break; } } else { /*** rebuild secondary key **/ DBUG_ASSERT(index->is_secondary_key()); uint new_packed_size = 0; if (index->pack_new_record( table, m_pack_buffer, 
table->record[0], m_sk_packed_tuple, &m_sk_tails, should_store_row_debug_checksums(), hidden_pk_id, 0, nullptr, new_table_arg, m_tbl_def->m_dict_info, new_packed_size)) { __XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", table->s->table_name.str); ha_index_end(); DBUG_RETURN(HA_EXIT_FAILURE); } DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len); key = xengine::common::Slice( reinterpret_cast<const char*>(m_sk_packed_tuple), new_packed_size); val = xengine::common::Slice( reinterpret_cast<const char*>(m_sk_tails.ptr()), m_sk_tails.get_current_pos()); } /* Add record to offset tree in preparation for writing out to disk in sorted chunks. */ bool inserted = false; res = xdb_merge.add(key, val, inserted); if (res) { XHANDLER_LOG(ERROR, "Failed to add record to sort merge buffer", "value_size", val.size()); processed_error = true; break; } else if (!inserted) { // a failed insert indicates there is a duplicate key in the current merge buffer // For the primary key, if xdb_merge.add fails (m_offset_tree.emplace // returns false), it means the key here is a duplicate key. // For a secondary key, the key here currently combines the user key // columns/parts with the primary key; even if there is duplication among user // key columns/parts, the key here will not be the same. if (need_unique_check) { // for utf8mb4_0900_ai_ci, we can't unpack the record directly, so we get // the value from old_record and default values. // using record[0] to call set_duplicate_key_for_print res = inplace_update_added_key_info_status_dup_key(new_table_arg, index, added_key_info, Slice(), Slice(), is_rebuild, false); } else { XHANDLER_LOG(ERROR, "XEngineDDL: Unexpected error: failed to do " "xdb_merge.add while building base without unique check", "table_name", table->s->table_name.str); DBUG_ASSERT(false); } ha_index_end(); DBUG_RETURN(res); } DBUG_EXECUTE_IF("serial_ddl_scan_phase", table->in_use->killed = THD::KILL_QUERY;); if (my_core::thd_killed(current_thd)) { ha_index_end(); DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED); } } if (res != HA_ERR_END_OF_FILE) { // NO_LINT_DEBUG if (!processed_error) { XHANDLER_LOG(ERROR, "XEngineDDL: Error retrieving index entry from primary key", "code", res, "table_name", table->s->table_name.str); } ha_index_end(); DBUG_RETURN(res); } ha_index_end(); /* Perform an n-way merge of n sorted buffers on disk, then write all results to XENGINE via the SSTFileWriter API. */ xengine::common::Slice merge_key; xengine::common::Slice merge_val; unique_key_buf_info key_buf(m_dup_sk_packed_tuple, m_dup_sk_packed_tuple_old); uint64_t merged_count = 0; while ((res = xdb_merge.next(&merge_key, &merge_val)) == 0) { /* Perform uniqueness check if needed */ // xdb_merge.add can only detect duplication within a single (merge) segment if (need_unique_check) { if (check_duplicate_in_base(new_table_arg, *index, merge_key, &key_buf)) { /* Duplicate entry found when trying to create a unique secondary key. We need to unpack the record into new_table_arg->record[0] as it is used inside print_keydup_error so that the error message shows the duplicate record. for utf8mb4_0900_ai_ci, we can't unpack the record directly, so we get the value from old_record and default values. For Unique-PK, we need to use the pk to search old_table and fill data, and then fill the new_data record.
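     For example (illustrative): under utf8mb4_0900_ai_ci the user keys 'abc'
     and 'ABC' produce the same memcmp-able sort key, so the duplicate is
     detected even though the stored bytes of the two records differ; that is
     why the record cannot simply be unpacked from the sort key and must be
     rebuilt from the old record plus default values.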
using merge_key/merge_val to call fill_new_duplicate_record */ __XHANDLER_LOG(WARN, "XEngineDDL: duplicate entry found during xdb_merge, table_name:%s", table->s->table_name.str); res = inplace_update_added_key_info_status_dup_key(new_table_arg, index, added_key_info, merge_key, merge_val, is_rebuild); break; } } // Check whether a duplicate entry was found during DML if (merged_count && (0 == merged_count % BUILD_BASE_CHECK_ERROR_FREQUENCY) && (res = inplace_check_dml_error(new_table_arg, key_info, added_key_info))) { __XHANDLER_LOG(WARN, "XEngineDDL: error status found during DML, code is %d, table_name:%s", res, table->s->table_name.str); break; } /* Insert key and slice to SST via SSTFileWriter API. Use mirror index (cfh) to save the index of (t0, t1). */ if ((res = bulk_load_key(tx, *index, merge_key, merge_val, 2))) { __XHANDLER_LOG(ERROR, "XEngineDDL: Error while bulk loading keys in " "external merge sort, table_name:%s", table->s->table_name.str); break; } ++merged_count; DBUG_EXECUTE_IF("serial_ddl_merge_phase", table->in_use->killed = THD::KILL_QUERY;); if (my_core::thd_killed(current_thd)) { // can't return directly, need to finish_bulk_load() break; } } // we need to finish the bulk load regardless of whether an error happened int bulk_load_ret = tx->finish_bulk_load(); if (my_core::thd_killed(current_thd)) { DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED); } // Here, res == HA_ERR_END_OF_FILE means that we are finished if (res == HA_ERR_END_OF_FILE && (res = bulk_load_ret)) { // NO_LINT_DEBUG __XHANDLER_LOG(ERROR, "XEngineDDL: Error finishing bulk load, table_name:%s", table->s->table_name.str); } DBUG_RETURN(res); } /* Check whether the given user key sequences have a unique-constraint conflict * returns true, meaning there is a conflict, when there are two adjacent * PUTs (after sorting by sequence number) in the given user key sequences, * e.g. sequences (5, PUT), (3, PUT) conflict, while (5, PUT), (4, DEL), (3, PUT) do not */ bool ha_xengine::check_user_key_sequence(std::vector<UserKeySequence> &user_key_sequences) { if (user_key_sequences.size() > 1) { // sort by sequence in descending order std::sort(user_key_sequences.begin(), user_key_sequences.end(), [](const UserKeySequence &key1, const UserKeySequence &key2) { return key1.first > key2.first; }); // check all delta sequences and the latest sequence in base for the // current user key to make sure there are no two adjacent PUT ops xengine::db::Iterator::RecordStatus prev_status = xengine::db::Iterator::kError; for (auto& key_seq : user_key_sequences) { if (xengine::db::Iterator::kExist == prev_status && xengine::db::Iterator::kExist == key_seq.second) { return true; } prev_status = key_seq.second; } } return false; } /** check data in (BUILDING_BASE_INDEX, CHECK_UNIQUE_CONSTRAINT) against (TABLE_CREATE, BUILDING_BASE_INDEX) @param new_table_arg @param index, xengine index definition @param is_rebuild, rebuild table or not */ /* Uniqueness conflict check with DML for online DDL * Iterate over all user keys from DML operations: * 1) Using key_seq() and key_status() to get sequence and op * 2) Using Next to compare with the user key at the next position * 3) For sequences of the same user key, using check_user_key_sequence to do * conflict check */ int ha_xengine::inplace_check_unique_phase(TABLE *new_table_arg, const std::shared_ptr<const Xdb_key_def>& index, Added_key_info& added_key_info, bool is_rebuild) { DBUG_ENTER_FUNC(); if (index->is_secondary_key()) { DEBUG_SYNC(ha_thd(), "xengine.inplace_create_sk_check_constraint_begin"); } else { DEBUG_SYNC(ha_thd(), "xengine.inplace_unique_check_constraint_begin"); } int res = HA_EXIT_SUCCESS; KEY* key_info =
&new_table_arg->key_info[index->get_keyno()]; if ((res = inplace_check_dml_error(new_table_arg, key_info, added_key_info))) { __XHANDLER_LOG(WARN, "error status found during DML, code is %d, table_name: %s", res, table->s->table_name.str); DBUG_RETURN(res); } xengine::common::ReadOptions base_opts; base_opts.read_level_ = xengine::common::kOnlyL2; base_opts.total_order_seek = true; xengine::common::ReadOptions delta_opts; delta_opts.read_level_ = xengine::common::kExcludeL2; delta_opts.total_order_seek = true; // also check DEL/SDEL records from delta delta_opts.skip_del_ = false; delta_opts.unique_check_ = true; // Get Snapshot and Set CheckUniquePhase under protect for MDL-Lock if (ha_thd()->mdl_context.upgrade_shared_lock( table->mdl_ticket, MDL_EXCLUSIVE, ha_thd()->variables.lock_wait_timeout)) { __XHANDLER_LOG(ERROR, "XEngineDDL: update mdl_exclusive lock fail, table_name: %s", table->s->table_name.str); DBUG_RETURN(HA_ERR_INTERNAL_ERROR); } inplace_update_added_key_info_step(added_key_info, Added_key_info::CHECK_UNIQUE_CONSTRAINT); // (t0, t1) iter IteratorUptr base_iter(xdb->NewIterator(base_opts, index->get_cf())); // (t1, t2) iter, snapshot IteratorUptr delta_iter(xdb->NewIterator(delta_opts, index->get_cf())); table->mdl_ticket->downgrade_lock(MDL_SHARED_UPGRADABLE); uint user_defined_key_parts = key_info->user_defined_key_parts; const bool all_parts_used = (user_defined_key_parts == index->get_key_parts()); Arena arena; // used to do Slice::deep_copy std::vector user_key_sequences; bool memcmp_key_inited = false, has_dup = false; uint memcmp_key_size = 0; xengine::common::Slice memcmp_key, saved_iter_key, saved_iter_val; for (delta_iter->SeekToFirst(); delta_iter->Valid(); delta_iter->Next()) { uint iter_key_memcmp_size = delta_iter->key().size(); if (index->is_secondary_key()) { uint n_null_fields = 0; iter_key_memcmp_size = index->get_memcmp_sk_size( new_table_arg, delta_iter->key(), &n_null_fields); if (n_null_fields > 0) { continue; } } if (memcmp_key_inited) { // check whether this iterator key has same user key if (memcmp_key_size != iter_key_memcmp_size || (0 != memcmp( memcmp_key.data(), delta_iter->key().data(), memcmp_key_size))) { // user key is changed, check whether the old one exists in base or not base_iter->Seek(memcmp_key); if (HA_EXIT_SUCCESS == read_key_exact(*index, base_iter.get(), all_parts_used, memcmp_key)) { #ifndef NDEBUG __XHANDLER_LOG(INFO, "XEngineDDL: find match record %s in L2 on index:%d during DDL", base_iter->key().ToString(true).c_str(), index->get_index_number()); #endif // Use 0 as the sequence number of the record in base tier user_key_sequences.emplace_back(0, xengine::db::Iterator::kExist); } // check all sequences with status for current user key has_dup = check_user_key_sequence(user_key_sequences); user_key_sequences.clear(); if (has_dup) { break; } memcmp_key_inited = false; } } if (!memcmp_key_inited) { if (index->is_secondary_key()) { uint n_null_fields = 0; uint sk_memcmp_size = index->get_memcmp_sk_parts( new_table_arg, delta_iter->key(), m_dup_sk_packed_tuple, &n_null_fields); DBUG_ASSERT(n_null_fields == 0); DBUG_ASSERT(sk_memcmp_size == iter_key_memcmp_size); memcmp_key = xengine::common::Slice( reinterpret_cast(m_dup_sk_packed_tuple), sk_memcmp_size); } else { memcmp_key = delta_iter->key().deep_copy(arena); } memcmp_key_size = iter_key_memcmp_size; memcmp_key_inited = true; #ifndef NDEBUG __XHANDLER_LOG(INFO, "XEngineDDL: unique check for %s(size=%u) on index:%d during DDL", memcmp_key.ToString(true).c_str(), memcmp_key_size, 
index->get_index_number()); #endif } // collect status and sequences number for the same user key auto delta_sequence = delta_iter->key_seq(); auto status = delta_iter->key_status(); DBUG_ASSERT(xengine::db::Iterator::kNonExist != status); user_key_sequences.emplace_back(delta_sequence, status); // To print duplicate entry we may need value information to unpack the record // save the first PUT record for later usage if duplicate entry is found if (xengine::db::Iterator::kExist == status && saved_iter_val.empty()) { saved_iter_key = delta_iter->key().deep_copy(arena); saved_iter_val = delta_iter->value().deep_copy(arena); } #ifndef NDEBUG __XHANDLER_LOG(INFO, "XEngineDDL: find match record %s(%u, %s) in L0&L1 on index:%d during DDL", delta_iter->key().ToString(true).c_str(), delta_sequence, (xengine::db::Iterator::kExist == status) ? "PUT":"DEL", index->get_index_number()); #endif } if (!has_dup && memcmp_key_inited && !user_key_sequences.empty()) { /* check whether the user key exists in base or not for last user key when * delta_iter is not valid */ base_iter->Seek(memcmp_key); if (HA_EXIT_SUCCESS == read_key_exact(*index, base_iter.get(), all_parts_used, memcmp_key)) { #ifndef NDEBUG __XHANDLER_LOG(INFO, "XEngineDDL: find match record %s in L2 on index:%d during DDL", base_iter->key().ToString(true).c_str(), index->get_index_number()); #endif // Use 0 as the sequence number of the record in base tier user_key_sequences.emplace_back(0, xengine::db::Iterator::kExist); } // check all sequences with status for current user key has_dup = check_user_key_sequence(user_key_sequences); user_key_sequences.clear(); } if (has_dup) { DBUG_ASSERT(!saved_iter_key.empty()); __XHANDLER_LOG(WARN, "XEngineDDL: duplicate entry is found %s during DDL on index:%d", saved_iter_key.ToString(true).c_str(), index->get_index_number()); // Current user key is duplicate entry // using saved_iter_key/saved_iter_val to call fill_new_duplicate_record res = inplace_update_added_key_info_status_dup_key(new_table_arg, index, added_key_info, saved_iter_key, saved_iter_val, is_rebuild); } if (index->is_secondary_key()) { DEBUG_SYNC(ha_thd(), "xengine.inplace_create_sk_check_constraint_done"); } else { DEBUG_SYNC(ha_thd(), "xengine.inplace_unique_check_constraint_done"); } /** * at this point, cf-index has all the data (t0, now) * * new data is still allowed to come before commit, it goes to cf-index * as usual since the lock is released, and still check unique against * cf-index, cf-mirror has already removed and not checked anymore. * * upper layer will call commit_inplace_alter_table(..., commit=true) * even duplicates happen here, but that function notices and returns * error, then another commit_inplace_alter_table(..., commit=false) will * be called. 
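 *
 * A sketch of that call sequence (illustrative):
 *   inplace_alter_table()                    -> base built, returns success
 *   commit_inplace_alter_table(commit=true)  -> sees the recorded duplicate,
 *                                               returns an error
 *   commit_inplace_alter_table(commit=false) -> rolls the new indexes back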
*/ DBUG_RETURN(res); } /** * Update the status of an index during inplace_populate_index */ int ha_xengine::inplace_update_added_key_info_step( Added_key_info& added_key_info, Added_key_info::ADD_KEY_STEP step) const { DBUG_ENTER_FUNC(); DBUG_ASSERT(step >= Added_key_info::TABLE_CREATE && step <= Added_key_info::FINISHED); added_key_info.step = step; DBUG_RETURN(HA_EXIT_SUCCESS); } /** * Check whether a duplicate status was set by DML during inplace populate index * * @param table_arg: used to call print_keydup_error * @param key: used to call print_keydup_error * @param added_key_info: used to check status * @param print_err: whether to print the error */ int ha_xengine::inplace_check_dml_error(TABLE *const table_arg, KEY *key, const Added_key_info& added_key_info, bool print_err) { DBUG_ENTER_FUNC(); DBUG_ASSERT(nullptr != table_arg); if (HA_ERR_FOUND_DUPP_KEY == added_key_info.status.load()) { // wait until dup_key/dup_val are saved. while (!added_key_info.dup_key_saved.load()) { } // the new table's duplicate record was set by a dml session if (print_err) { // can't print err in a child thread print_keydup_error(table_arg, key, MYF(0)); } DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY); } else if (HA_ERR_INVALID_NULL_ERROR == added_key_info.status.load()) { if (print_err) { my_error(ER_INVALID_USE_OF_NULL, MYF(0)); } DBUG_RETURN(HA_ERR_INVALID_NULL_ERROR); } DBUG_RETURN(HA_EXIT_SUCCESS); } /** Scan the primary key index entries and populate the newly added index keys. For the new table, we build the new storage record from old_table_record and default values. @param new_table_arg @param index, newly added index @param is_rebuild, rebuild table or not @return SUCCESS/FAILURE */ int ha_xengine::inplace_populate_index(TABLE *const new_table_arg, const std::shared_ptr<const Xdb_key_def>& index, bool is_rebuild) { DBUG_ENTER_FUNC(); int res = HA_EXIT_SUCCESS; bool need_unique_check = false; KEY* key_info = &new_table_arg->key_info[index->get_keyno()]; if (!index->is_hidden_primary_key()) { need_unique_check = is_rebuild ? !index->m_can_skip_unique_check : key_info->flags & HA_NOSAME; } std::unordered_map<const Xdb_key_def*, Added_key_info>::iterator iter; if (!is_rebuild) { iter = m_tbl_def->m_added_key.find(index.get()); DBUG_ASSERT(iter != m_tbl_def->m_added_key.end()); } else { iter = m_tbl_def->m_inplace_new_keys.find(index.get()); DBUG_ASSERT(iter != m_tbl_def->m_inplace_new_keys.end()); } if ((res = inplace_build_base_phase( new_table_arg, index, need_unique_check, iter->second, is_rebuild))) { print_common_err(res); if (res != HA_ERR_FOUND_DUPP_KEY && res != HA_ERR_INVALID_NULL_ERROR) { __XHANDLER_LOG(ERROR, "XEngineDDL: build base error, code: %d, table_name: %s", res, table->s->table_name.str); } } else if (need_unique_check && (res = inplace_check_unique_phase(new_table_arg, index, iter->second, is_rebuild))) { if (res != HA_ERR_FOUND_DUPP_KEY) { __XHANDLER_LOG(ERROR, "XEngineDDL: check unique phase error, code: %d, table_name: %s", res, table->s->table_name.str); } } else if ((res = inplace_update_added_key_info_step( iter->second, Added_key_info::FINISHED))) { __XHANDLER_LOG(ERROR, "Failed to update state of Added_key_info::step after unique check"); } else { #ifndef NDEBUG __XHANDLER_LOG(INFO, "XEngineDDL: build index %d for table %s successfully", index->get_index_number(), table->s->table_name.str); #endif } if (res == HA_ERR_FOUND_DUPP_KEY) { __XHANDLER_LOG(WARN, "XEngineDDL: duplicate key error happened while building index %d for table %s", index->get_index_number(), table->s->table_name.str); } DBUG_RETURN(res); } /** Update metadata in commit phase for instant ADD COLUMN.
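An illustrative example of the statement shape this path serves (table and
column names are hypothetical):

  CREATE TABLE t1 (a INT PRIMARY KEY, b INT);
  ALTER TABLE t1 ADD COLUMN c INT DEFAULT 7;  -- instant: metadata change only

No base data is rewritten; existing rows get c's default from the metadata
recorded here.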
Basically, it should remember the number of instant columns and the default values of newly added columns. Note this function should only update metadata that would not result in failure. @param[in] old_table MySQL table as it is before the ALTER operation @param[in] altered_table MySQL table that is being altered @param[in] old_dd_tab Old dd::Table @param[in,out] new_dd_tab New dd::Table */ void ha_xengine::dd_commit_instant_table(const TABLE *old_table, const TABLE *altered_table, const dd::Table *old_dd_tab, dd::Table *new_dd_tab) { assert(old_dd_tab->columns().size() <= new_dd_tab->columns().size()); // pass DD_TABLE_INSTANT_COLS from old to new DD if (!new_dd_tab->se_private_data().exists( dd_table_key_strings[DD_TABLE_INSTANT_COLS])) { uint32_t instant_cols = old_table->s->fields; if (dd_table_has_instant_cols(*old_dd_tab)) { old_dd_tab->se_private_data().get( dd_table_key_strings[DD_TABLE_INSTANT_COLS], &instant_cols); } new_dd_tab->se_private_data().set( dd_table_key_strings[DD_TABLE_INSTANT_COLS], instant_cols); } // pass DD_TABLE_NULL_BYTES from old to new DD if (!new_dd_tab->se_private_data().exists( dd_table_key_strings[DD_TABLE_NULL_BYTES])) { uint32_t null_bytes = m_null_bytes_in_rec; if (dd_table_has_instant_cols(*old_dd_tab)) { old_dd_tab->se_private_data().get( dd_table_key_strings[DD_TABLE_NULL_BYTES], &null_bytes); } new_dd_tab->se_private_data().set(dd_table_key_strings[DD_TABLE_NULL_BYTES], null_bytes); } /* To remember old default values if they exist */ dd_copy_table_columns(*new_dd_tab, *old_dd_tab); /* Then add all new default values */ dd_add_instant_columns(old_table, altered_table, new_dd_tab); // assert(dd_table_has_instant_cols(*new_dd_tab)); } void ha_xengine::dd_commit_inplace_no_change(const dd::Table *old_dd_tab, dd::Table *new_dd_tab) { dd_copy_private(*new_dd_tab, *old_dd_tab); /* To remember old default values if they exist */ dd_copy_table_columns(*new_dd_tab, *old_dd_tab); /*if (!dd_table_is_partitioned(new_dd_tab->table()) || dd_part_is_first(reinterpret_cast(new_dd_tab))) { dd_copy_table(new_dd_tab->table(), old_dd_tab->table()); }*/ } /** Update table-level instant metadata in the commit phase; maybe an instant column name was modified, or a default value was modified by the DDL operation. 1. if ADD COLUMN is mixed with other DDL operations, it will be the inplace type 2.
if the data type changed or the NOT NULL/NULL attribute changed, it will be copy/rebuild, so instant columns and instant null bytes should be the same as in the old_table @param[in] old_dd_tab old dd::Table @param[in] new_dd_tab new dd::Table */ void dd_commit_inplace_update_instant_meta( my_core::Alter_inplace_info *const ha_alter_info, const dd::Table *old_dd_tab, dd::Table *new_dd_tab, TABLE *table, TABLE *altered_table) { if (!dd_table_has_instant_cols(*old_dd_tab)) { return; } assert(old_dd_tab->se_private_data().exists( dd_table_key_strings[DD_TABLE_INSTANT_COLS])); assert(old_dd_tab->se_private_data().exists( dd_table_key_strings[DD_TABLE_NULL_BYTES])); uint32_t instant_cols = 0; old_dd_tab->se_private_data().get(dd_table_key_strings[DD_TABLE_INSTANT_COLS], &instant_cols); new_dd_tab->se_private_data().set(dd_table_key_strings[DD_TABLE_INSTANT_COLS], instant_cols); uint32_t null_bytes = 0; old_dd_tab->se_private_data().get(dd_table_key_strings[DD_TABLE_NULL_BYTES], &null_bytes); new_dd_tab->se_private_data().set(dd_table_key_strings[DD_TABLE_NULL_BYTES], null_bytes); // update dd-column meta if necessary assert(table->s->fields == altered_table->s->fields); if (!(ha_alter_info->handler_flags & Alter_inplace_info::ALTER_COLUMN_NAME)) { dd_copy_table_columns(*new_dd_tab, *old_dd_tab); return; } // some field names changed during the inplace ddl Field *old_field, *new_field; for (uint i=0; i < table->s->fields; i++) { old_field = table->field[i]; new_field = altered_table->field[i]; dd::Column *old_col = const_cast<dd::Column*>(dd_find_column(old_dd_tab, old_field->field_name)); dd::Column *new_col = const_cast<dd::Column*>( dd_find_column(new_dd_tab, new_field->field_name)); #ifndef NDEBUG if (new_field->flags & FIELD_IS_RENAMED) { assert(my_strcasecmp(system_charset_info, old_field->field_name, new_field->field_name)); } #endif if (!old_col->se_private_data().empty()) { if (!new_col->se_private_data().empty()) new_col->se_private_data().clear(); new_col->set_se_private_data(old_col->se_private_data()); } } return; } void ha_xengine::dd_commit_inplace_instant(Alter_inplace_info *ha_alter_info, const TABLE *old_table, const TABLE *altered_table, const dd::Table *old_dd_tab, dd::Table *new_dd_tab) { assert(is_instant(ha_alter_info)); auto type = static_cast<Instant_Type>(ha_alter_info->handler_trivial_ctx); switch (type) { case Instant_Type::INSTANT_NO_CHANGE: dd_commit_inplace_no_change(old_dd_tab, new_dd_tab); break; case Instant_Type::INSTANT_ADD_COLUMN: dd_copy_private(*new_dd_tab, *old_dd_tab); dd_commit_instant_table(old_table, altered_table, &old_dd_tab->table(), &new_dd_tab->table()); break; case Instant_Type::INSTANT_IMPOSSIBLE: default: assert(0); } } /** If the ddl failed, we need to roll back the newly added indexes. @param added_indexes, newly added indexes in the ddl @return SUCCESS/FAILURE. */ int ha_xengine::rollback_added_index( const std::unordered_set<std::shared_ptr<const Xdb_key_def>>& added_indexes) { int ret = HA_EXIT_SUCCESS; /* Remove uncommitted key definitions from ddl_manager */ ddl_manager.remove_uncommitted_keydefs(added_indexes); /* Rollback created indexes */ m_tbl_def->clear_keys_for_ddl(); /* fix #22611997, in the rollback case, we need to renew members of the handler. Buffers need to be set up again to account for new, possibly longer secondary keys. */ free_key_buffers(); if ((ret = alloc_key_buffers(table, m_tbl_def.get()))) { __XHANDLER_LOG(ERROR, "XEngine failed allocating key buffers during alter.
table_name: %s", table->s->table_name.str); return ret; } return ret; } /** Commit or rollback the changes made during prepare_inplace_alter_table() and inplace_alter_table() inside the storage engine. Note that in case of rollback the allowed level of concurrency during this operation will be the same as for inplace_alter_table() and thus might be higher than during prepare_inplace_alter_table(). (For example, concurrent writes were blocked during prepare, but might not be during rollback). @note Storage engines are responsible for reporting any errors by calling my_error()/print_error() @note If this function with commit= true reports error, it will be called again with commit= false. @note In case of partitioning, this function might be called for rollback without prepare_inplace_alter_table() having been called first. Also partitioned tables sets ha_alter_info->group_commit_ctx to a NULL terminated array of the partitions handlers and if all of them are committed as one, then group_commit_ctx should be set to NULL to indicate to the partitioning handler that all partitions handlers are committed. @see prepare_inplace_alter_table(). @param altered_table TABLE object for new version of table. @param ha_alter_info Structure describing changes to be done by ALTER TABLE and holding data used during in-place alter. @param commit True => Commit, False => Rollback. @param old_table_def dd::Table object describing old version of the table. @param new_table_def dd::Table object for the new version of the table. Can be adjusted by this call if SE supports atomic DDL. These changes to the table definition will be persisted in the data-dictionary at statement commit time. @retval true Error @retval false Success */ bool ha_xengine::commit_inplace_alter_table( my_core::TABLE *const altered_table, my_core::Alter_inplace_info *const ha_alter_info, bool commit, const dd::Table *old_dd_tab, dd::Table *new_dd_tab) { DBUG_ENTER_FUNC(); DBUG_ASSERT(nullptr != ha_alter_info); auto ctx = dynamic_cast(ha_alter_info->handler_ctx); if (is_instant(ha_alter_info)) { dd_commit_inplace_instant(ha_alter_info, table, altered_table, old_dd_tab, new_dd_tab); #ifndef NDEBUG __XHANDLER_LOG(INFO, "XEngineDDL: commit instant alter sql is %s, commit is %d", ha_thd()->query().str, (int)commit); #endif DBUG_RETURN(HA_EXIT_SUCCESS); } else if (nullptr == ctx) { /* If ctx has not been created yet, nothing to do here */ __XHANDLER_LOG(INFO, "XEngineDDL: ctx is released or not initialized"); DBUG_RETURN(HA_EXIT_SUCCESS); } else { int ret = HA_EXIT_SUCCESS; bool is_rebuild = ctx->m_rebuild; if (ctx->m_rebuild) { ret = commit_inplace_alter_table_rebuild(altered_table, ha_alter_info, commit, old_dd_tab, new_dd_tab); } else { ret = commit_inplace_alter_table_norebuild( altered_table, ha_alter_info, commit, old_dd_tab, new_dd_tab); } if (!ret) { #ifndef NDEBUG __XHANDLER_LOG(INFO, "XEngineDDL: inplace ddl %s successfully, table: %s", commit ? "commit" : "rollback", table->s->table_name.str); #endif } else { __XHANDLER_LOG(WARN, "XEngineDDL: failed to commit inplace ddl, table: %s, errcode is %d", table->s->table_name.str, ret); } #ifndef NDEBUG __XHANDLER_LOG(INFO, "XEngineDDL: commit alter sql is %s, inplace_rebuild: %d, commit is %d, code is %d", ha_thd()->query().str, (int)is_rebuild, (int)commit, ret); #endif DBUG_RETURN(ret); } } /** Maybe autoincrement value changed by Alter Statement, Get the auto-increment value of the table on commit. 
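For example (illustrative):

  ALTER TABLE t1 AUTO_INCREMENT = 100;

If 100 is larger than the table's current auto-increment value, the new table
definition adopts it; otherwise the larger existing value is kept.
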
@param[in] ha_alter_info Data used during in-place alter @param[in] altered_table MySQL table that is being altered @param[in] old_table MySQL table as it is before the ALTER operation @retval HA_EXIT_SUCCESS Success, others FAILED */ int ha_xengine::commit_inplace_alter_get_autoinc(Alter_inplace_info *ha_alter_info, const TABLE *altered_table, const TABLE *old_table) { DBUG_ENTER("alter_commit_get_autoinc"); DBUG_ASSERT(nullptr != ha_alter_info); auto ctx = dynamic_cast(ha_alter_info->handler_ctx); DBUG_ASSERT(ctx); if (altered_table->found_next_number_field && ((ha_alter_info->handler_flags & Alter_inplace_info::CHANGE_CREATE_OPTION) && (ha_alter_info->create_info->used_fields & HA_CREATE_USED_AUTO))) { /* An AUTO_INCREMENT value was supplied, but the table was not rebuilt. Get the user-supplied value or the last value from the sequence. */ ulonglong set_autoinc_value = ha_alter_info->create_info->auto_increment_value; if (old_table->found_next_number_field) { load_auto_incr_value(); } ulonglong max_value_table = m_tbl_def->m_auto_incr_val; if (set_autoinc_value > max_value_table) { ctx->m_new_tdef->m_auto_incr_val = set_autoinc_value; } } DBUG_RETURN(HA_EXIT_SUCCESS); } /** Both inplace alter DDL types (rebuild/no_rebuild) should do these things in the commit phase: 1. update autoinc if necessary 2. update xengine dictionary 3. calculate statistics 4. enable major compaction and inner merge operation @param[in] altered_table, new table @param[in] ha_alter_info, DDL operation @param[in] commit, true indicates commit, false indicates rollback @param[in] new_added_indexes @return SUCCESS/FAILURE */ int ha_xengine::commit_inplace_alter_table_common( my_core::TABLE *const altered_table, my_core::Alter_inplace_info *const ha_alter_info, bool commit, const std::unordered_set<std::shared_ptr<const Xdb_key_def>> &new_added_indexes) { DBUG_ENTER_FUNC(); // update autoinc if necessary if (commit_inplace_alter_get_autoinc(ha_alter_info, altered_table, table)) { __XHANDLER_LOG(ERROR, "XEngineDDL: get autoinc value failed. table_name: %s", table->s->table_name.str); DBUG_RETURN(HA_EXIT_FAILURE); } DBUG_ASSERT(nullptr != ha_alter_info); auto ctx0 = dynamic_cast(ha_alter_info->handler_ctx); DBUG_ASSERT(ctx0); m_tbl_def->clear_keys_for_ddl(); // the current Xdb_tbl_def represented by m_tbl_def will be unreferenced // the new Xdb_tbl_def represented by ctx0->m_new_tdef will be referenced m_tbl_def = ctx0->m_new_tdef; m_key_descr_arr = m_tbl_def->m_key_descr_arr; m_pk_descr = m_key_descr_arr[pk_index(altered_table, m_tbl_def.get())]; /* For partitioned tables, we need to commit all changes to all tables at once, unlike in the other inplace alter API methods. */ inplace_alter_handler_ctx **ctx_array; inplace_alter_handler_ctx *ctx_single[2]; if (ha_alter_info->group_commit_ctx) { DBUG_EXECUTE_IF("crash_during_index_creation_partition", DBUG_SUICIDE();); ctx_array = ha_alter_info->group_commit_ctx; } else { ctx_single[0] = ctx0; ctx_single[1] = nullptr; ctx_array = ctx_single; } DBUG_ASSERT(ctx0 == ctx_array[0]); ha_alter_info->group_commit_ctx = nullptr; THD *const thd = table ?
table->in_use : my_core::thd_get_current_thd(); Xdb_transaction *const tx = get_or_create_tx(thd); xengine_register_tx(ht, thd, tx); auto batch = dynamic_cast(tx->get_blind_write_batch()); std::unordered_set<GL_INDEX_ID> create_index_ids; dict_manager.lock(); for (inplace_alter_handler_ctx **pctx = ctx_array; *pctx; pctx++) { auto ctx = dynamic_cast(*pctx); for (uint i = 0; i < ctx->m_new_tdef->m_key_count; i++) { create_index_ids.insert(ctx->m_new_tdef->m_key_descr_arr[i]->get_gl_index_id()); } if (ddl_manager.put_and_write(ctx->m_new_tdef, batch, &ddl_log_manager, thd_thread_id(thd), false)) { DBUG_ASSERT(0); } /* Remove uncommitted key definitions from ddl_manager, as they are now committed into the data dictionary. */ ddl_manager.remove_uncommitted_keydefs(new_added_indexes); } dict_manager.unlock(); /** if we crash here, during crash-recovery the drop_thread will collect create-ongoing-indexes and then drop their subtables; during the post_recover phase, we collect ddl_logs and look up the subtables: if they exist, drop them; if not, do nothing. */ DBUG_EXECUTE_IF("ddl_log_crash_before_remove_index_ongoing", DBUG_SUICIDE();); /* Mark ongoing create indexes as finished/remove from data dictionary */ //dict_manager.finish_indexes_operation(create_index_ids, // Xdb_key_def::DDL_CREATE_INDEX_ONGOING); /** if we crash here, we drop it during the post_recover phase. */ DBUG_EXECUTE_IF("ddl_log_crash_after_remove_index_ongoing", DBUG_SUICIDE();); /* We need to recalculate the index stats here manually. The reason is that the secondary index does not exist inside m_index_num_to_keydef until it is committed to the data dictionary, which prevents us from updating the stats normally as the ddl_manager cannot find the proper gl_index_ids yet during adjust_stats calls. */ if (calculate_stats(altered_table, nullptr, nullptr)) { /* Failed to update index statistics, should never happen */ DBUG_ASSERT(0); } // enable add-index major compaction std::vector<xengine::db::ColumnFamilyHandle*> column_family_handles; for (auto& index : new_added_indexes) { xengine::db::ColumnFamilyHandle *cf = index->get_cf(); column_family_handles.push_back(cf); } auto txn_db_impl = dynamic_cast(xdb); xengine::common::Status s; s = txn_db_impl->GetDBImpl()->switch_major_compaction(column_family_handles, true); if (!s.ok()) { __XHANDLER_LOG(ERROR, "XEngineDDL: switch_major_compaction fail: %s. table_name: %s", s.ToString().c_str(), table->s->table_name.str); DBUG_RETURN(HA_EXIT_FAILURE); } s = txn_db_impl->GetDBImpl()->enable_backgroud_merge(column_family_handles); if (!s.ok()) { __XHANDLER_LOG(ERROR, "XEngineDDL: enable_background_merge fail: %s.
table_name: %s", s.ToString().c_str(), table->s->table_name.str); DBUG_RETURN(HA_EXIT_FAILURE); } DBUG_RETURN(HA_EXIT_SUCCESS); } /** commit phase for inplace_rebuild ddl @param[in] altered_table new table @param[in] ha_alter_info DDL operation @param[in] commit, true is commit and false is rollback @param[in] old_dd_tab @param[in] new_dd_tab @return true is ERROR and false is Success */ bool ha_xengine::commit_inplace_alter_table_rebuild( my_core::TABLE *const altered_table, my_core::Alter_inplace_info *const ha_alter_info, bool commit, const dd::Table *old_dd_tab, dd::Table *new_dd_tab) { DBUG_ENTER_FUNC(); DBUG_ASSERT(altered_table != nullptr); DBUG_ASSERT(ha_alter_info != nullptr); auto ctx0 = dynamic_cast(ha_alter_info->handler_ctx); Xdb_tbl_def *new_tdef = m_tbl_def->m_inplace_new_tdef; DBUG_ASSERT(new_tdef != nullptr); int ret = HA_EXIT_SUCCESS; std::unordered_set> new_added_indexes; for (uint i=0; i < new_tdef->m_key_count; i++) { new_added_indexes.insert(new_tdef->m_key_descr_arr[i]); } if (commit) { // if there were dups between inplace_alter_table's return and now, // capture it here for (auto &k : m_tbl_def->m_inplace_new_keys) { if ((ret = inplace_check_dml_error(altered_table, &altered_table->key_info[k.first->get_keyno()], k.second))) { __XHANDLER_LOG(WARN, "XEngineDDL: found error when rebuild table, code: %d", ret); rollback_added_index(new_added_indexes); ret = HA_EXIT_FAILURE; goto rebuild_fun_end; } } } else { /* rollback created indexes **/ ret = rollback_added_index(new_added_indexes); goto rebuild_fun_end; } /* IMPORTANT: When rollback is requested, mysql will abort with an assertion failure. That means every failed commit during inplace alter table will result in a fatal error on the server. Indexes ongoing creation will be detected when the server restarts, and dropped. */ DBUG_EXECUTE_IF("ddl_log_crash_before_inplace_ddl_commit", DBUG_SUICIDE();); ret = commit_inplace_alter_table_common(altered_table, ha_alter_info, commit, new_added_indexes); rebuild_fun_end: // any way, whether succeed or fail, we need to reload new_tdef for atomic ddl delete ctx0; ha_alter_info->handler_ctx = nullptr; DBUG_RETURN(ret); } /** commit phase for inplace_norebuild ddl @param[in] altered_table new table @param[in] ha_alter_info DDL operation @param[in] commit, true is commit and false is rollback @param[in] old_dd_tab @param[in] new_dd_tab @return true is ERROR and false is Success */ bool ha_xengine::commit_inplace_alter_table_norebuild( my_core::TABLE *const altered_table, my_core::Alter_inplace_info *const ha_alter_info, bool commit, const dd::Table *old_dd_tab, dd::Table *new_dd_tab) { DBUG_ENTER_FUNC(); DBUG_ASSERT(altered_table != nullptr); DBUG_ASSERT(ha_alter_info != nullptr); auto ctx0 = dynamic_cast(ha_alter_info->handler_ctx); int ret = HA_EXIT_SUCCESS; if (commit) { // if there were dups between inplace_alter_table's return and now, // capture it here for (auto &k : m_tbl_def->m_added_key) { if ((ret = inplace_check_dml_error(altered_table, &altered_table->key_info[k.first->get_keyno()], k.second))) { __XHANDLER_LOG(WARN, "XEngineDDL: found error when build unique index, code is %d", ret); DBUG_ASSERT(ctx0 != nullptr); rollback_added_index(ctx0->m_added_indexes); ret = HA_EXIT_FAILURE; goto fun_end; } } } /* * IMPORTANT: When rollback is requested, mysql will abort with * an assertion failure. That means every failed commit during inplace alter * table will result in a fatal error on the server. 
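 * A sketch of the failure timeline covered here (illustrative):
 *   t0: new index subtables are created and marked as create-ongoing
 *   t1: the commit fails; the requested rollback aborts the server
 *   t2: the server restarts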
Indexes ongoing creation * will be detected when the server restarts, and dropped. * * For partitioned tables, a rollback call to this function (commit == false) * is done for each partition. A successful commit call only executes once * for all partitions. */ if (!commit) { // be consistent with ha_xengine::open, // only when add indexes case, we will enlarge our buffer if (ha_alter_info->handler_flags & (my_core::Alter_inplace_info::ADD_INDEX | my_core::Alter_inplace_info::ADD_UNIQUE_INDEX)) { ret = rollback_added_index(ctx0->m_added_indexes); goto fun_end; } else { ret = HA_EXIT_SUCCESS; goto fun_end; } } DBUG_EXECUTE_IF("ddl_log_crash_before_inplace_ddl_commit", DBUG_SUICIDE();); dd_commit_inplace_update_instant_meta(ha_alter_info, old_dd_tab, new_dd_tab, table, altered_table); /* For partitioned tables, we need to commit all changes to all tables at once, unlike in the other inplace alter API methods. */ #ifndef NDEBUG inplace_alter_handler_ctx **ctx_array; inplace_alter_handler_ctx *ctx_single[2]; #endif DBUG_ASSERT(ctx0); if (ha_alter_info->group_commit_ctx) { DBUG_EXECUTE_IF("crash_during_index_creation_partition", DBUG_SUICIDE();); #ifndef NDEBUG ctx_array = ha_alter_info->group_commit_ctx; } else { ctx_single[0] = ctx0; ctx_single[1] = nullptr; ctx_array = ctx_single; #endif } DBUG_ASSERT(ctx0 == ctx_array[0]); ha_alter_info->group_commit_ctx = nullptr; ret = commit_inplace_alter_table_common(altered_table, ha_alter_info, commit, ctx0->m_added_indexes); fun_end: // any way, whether succeed or fail, we need to reload new_tdef for atomic ddl delete ctx0; ha_alter_info->handler_ctx = nullptr; DBUG_RETURN(ret); } /** Unique check for ongoing DML with new index building concurrently. used both for lock_and_check_new_index_pk and lock_and_check_new_index_sk. @param[in] kd, xengine key definition @param[in/out] ki, altered_table and dup key information @param[in] row_info, Xdb_transaction, new_pk_slice @param[in] key, key slice to check @param[in] is_rebuild @param[out] duplicate key found or not @return SUCCESS/FAILURE */ void ha_xengine::check_new_index_unique_inner( const Xdb_key_def &kd, Added_key_info &ki, const struct update_row_info &row_info, const xengine::common::Slice& key, bool is_rebuild, bool *const found) const { #ifndef NDEBUG __XHANDLER_LOG(INFO, "XEngineDDL: unique check for %s(size=%u) on index:%d during DML", key.ToString(true).c_str(), key.size(), kd.get_index_number()); #endif DBUG_ASSERT(ki.altered_table); KEY *key_info = &ki.altered_table->key_info[kd.get_keyno()]; DBUG_ASSERT(key_info); uint user_defined_key_parts = key_info->user_defined_key_parts; const bool all_parts_used = (user_defined_key_parts == kd.get_key_parts()); const bool total_order_seek = !can_use_bloom_filter(ha_thd(), kd, key, all_parts_used, is_ascending(kd, HA_READ_KEY_EXACT)); const bool fill_cache = true; // check against (t1, -) IteratorUptr iter(row_info.tx->get_iterator(kd.get_cf(), total_order_seek, fill_cache, true /* read current data */, false /* acquire snapshot */, true /*exclude_l2*/, true/* for unique check */)); /* * Need to scan the transaction to see if there is a duplicate key. * Also need to scan XENGINE and verify the key has not been deleted * in the transaction. 
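 *
 * A sketch of the two-stage check below (illustrative):
 *   Stage 1: scan this transaction's WriteBatch for the key;
 *     kExist   -> duplicate found, stop
 *     kDeleted -> the key was deleted here, not a duplicate, stop
 *     no match -> fall through to Stage 2
 *   Stage 2 (only once the DDL has reached CHECK_UNIQUE_CONSTRAINT): collect
 *     (sequence, status) pairs for the key from L0 and L1, append sequence 0
 *     for a match in L2, and report a duplicate if two adjacent PUTs remain
 *     after sorting by sequence.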
/** Unique check for ongoing DML while a new index is built concurrently.
Used both by lock_and_check_new_index_pk and lock_and_check_new_index_sk.
@param[in] kd, xengine key definition
@param[in/out] ki, altered_table and dup key information
@param[in] row_info, Xdb_transaction, new_pk_slice
@param[in] key, key slice to check
@param[in] is_rebuild
@param[out] found, duplicate key found or not */
void ha_xengine::check_new_index_unique_inner(
    const Xdb_key_def &kd, Added_key_info &ki,
    const struct update_row_info &row_info, const xengine::common::Slice &key,
    bool is_rebuild, bool *const found) const {
#ifndef NDEBUG
  __XHANDLER_LOG(INFO,
                 "XEngineDDL: unique check for %s(size=%u) on index:%d during DML",
                 key.ToString(true).c_str(), key.size(), kd.get_index_number());
#endif
  DBUG_ASSERT(ki.altered_table);
  KEY *key_info = &ki.altered_table->key_info[kd.get_keyno()];
  DBUG_ASSERT(key_info);
  uint user_defined_key_parts = key_info->user_defined_key_parts;
  const bool all_parts_used = (user_defined_key_parts == kd.get_key_parts());
  const bool total_order_seek = !can_use_bloom_filter(
      ha_thd(), kd, key, all_parts_used, is_ascending(kd, HA_READ_KEY_EXACT));
  const bool fill_cache = true;

  // check against (t1, -)
  IteratorUptr iter(row_info.tx->get_iterator(
      kd.get_cf(), total_order_seek, fill_cache, true /* read current data */,
      false /* acquire snapshot */, true /* exclude_l2 */,
      true /* for unique check */));

  /*
   * Need to scan the transaction to see if there is a duplicate key.
   * Also need to scan XENGINE and verify the key has not been deleted
   * in the transaction.
   */
  // Stage 1: check the write batch only
  auto wb_status = xengine::db::Iterator::kNonExist;
  auto uk_iter = dynamic_cast<xengine::util::BaseDeltaIterator *>(iter.get());
  DBUG_ASSERT(nullptr != uk_iter);
  for (uk_iter->Seek(key); !uk_iter->is_at_base() && uk_iter->Valid();
       uk_iter->Next()) {
    uint cmp_size = uk_iter->key().size();
    if (kd.is_secondary_key()) {
      uint n_null_fields = 0;
      cmp_size = kd.get_memcmp_sk_size(ki.altered_table, uk_iter->key(),
                                       &n_null_fields);
      if (n_null_fields > 0) {
#ifndef NDEBUG
        __XHANDLER_LOG(INFO, "XEngineDDL: null field exists: %u",
                       n_null_fields);
#endif
        continue;
      }
    }

    if (cmp_size != key.size() ||
        memcmp(key.data(), uk_iter->key().data(), cmp_size)) {
      // current key doesn't match exactly, no further check in WriteBatch
#ifndef NDEBUG
      __XHANDLER_LOG(INFO,
                     "XEngineDDL: record in WriteBatch doesn't match:(key:%s, size:%u) for index:%d",
                     uk_iter->key().ToString(true).c_str(), cmp_size,
                     kd.get_index_number());
#endif
      break;
    }

    wb_status = uk_iter->key_status();
    if (xengine::db::Iterator::kExist == wb_status) {
      // found a conflicting user key, no further check in WriteBatch or base
#ifndef NDEBUG
      __XHANDLER_LOG(INFO,
                     "XEngineDDL: find duplicate record %s in WriteBatch for index:%d",
                     key.ToString(true).c_str(), kd.get_index_number());
#endif
      break;
    }

    DBUG_ASSERT(xengine::db::Iterator::kDeleted == wb_status);
    // found a deleted record, check more in WriteBatch
#ifndef NDEBUG
    __XHANDLER_LOG(INFO,
                   "XEngineDDL: find deleted record %s in WriteBatch for index:%d",
                   uk_iter->key().ToString(true).c_str(),
                   kd.get_index_number());
#endif
  }

  if (xengine::db::Iterator::kExist == wb_status) {
    // duplicate record is found in WriteBatch
    *found = true;
  } else if (xengine::db::Iterator::kDeleted == wb_status) {
    // duplicate record is deleted in WriteBatch
    *found = false;
  } else if (ki.step >= Added_key_info::CHECK_UNIQUE_CONSTRAINT) {
    // Stage 2: check L0, L1 and L2 if there was no match in WriteBatch,
    // if needed
    /* No matching record was found in WriteBatch.
     * Further check all user keys from the base iterator of the delta
     * (L0, L1) and L2. We only do this unique check procedure after the DDL
     * has finished building the baseline, because we can only get static L2
     * data at this stage. If we did this unique check procedure with the
     * data in L0&L1 alone, it would just be a waste of time due to the
     * incomplete data set. */
    std::vector<std::pair<decltype(uk_iter->key_seq()),
                          decltype(uk_iter->key_status())>>
        user_key_sequences;
    for (uk_iter->change_to_base(); uk_iter->Valid(); uk_iter->Next()) {
      uint cmp_size = uk_iter->key().size();
      if (kd.is_secondary_key()) {
        uint n_null_fields = 0;
        cmp_size = kd.get_memcmp_sk_size(ki.altered_table, uk_iter->key(),
                                         &n_null_fields);
        if (n_null_fields > 0) {
#ifndef NDEBUG
          __XHANDLER_LOG(INFO, "XEngineDDL: null field exists: %u",
                         n_null_fields);
#endif
          continue;
        }
      }

      if (cmp_size != key.size() ||
          memcmp(key.data(), uk_iter->key().data(), cmp_size)) {
        // no more matching records in L0 and L1
#ifndef NDEBUG
        __XHANDLER_LOG(INFO,
                       "XEngineDDL: record in L0&L1 doesn't match: (key:%s, size:%u) for index:%d",
                       uk_iter->key().ToString().c_str(), cmp_size,
                       kd.get_index_number());
#endif
        break;
      }

      auto sequence = uk_iter->key_seq();
      auto status = uk_iter->key_status();
      DBUG_ASSERT(xengine::db::Iterator::kExist == status ||
                  xengine::db::Iterator::kDeleted == status);
      // match in (L0, L1), collect status for later use
#ifndef NDEBUG
      __XHANDLER_LOG(INFO,
                     "XEngineDDL: match record %s(%u, %s) in L0&L1 for index:%d",
                     uk_iter->key().ToString(true).c_str(), sequence,
                     (xengine::db::Iterator::kExist == status) ? "PUT" : "DEL",
                     kd.get_index_number());
#endif
      user_key_sequences.emplace_back(sequence, status);
    }

    xengine::common::ReadOptions opts;
    opts.total_order_seek = true;
    opts.read_level_ = xengine::common::kOnlyL2;
    iter.reset(xdb->NewIterator(opts, kd.get_cf()));
    iter->Seek(key);
    if (HA_EXIT_SUCCESS ==
        read_key_exact(kd, iter.get(), all_parts_used, key)) {
#ifndef NDEBUG
      __XHANDLER_LOG(INFO,
                     "XEngineDDL: find match record %s in L2 for index:%d",
                     iter->key().ToString(true).c_str(),
                     kd.get_index_number());
#endif
      user_key_sequences.emplace_back(0, xengine::db::Iterator::kExist);
    }

    *found = (!user_key_sequences.empty() &&
              (check_user_key_sequence(user_key_sequences) ||
               xengine::db::Iterator::kExist ==
                   user_key_sequences.begin()->second));
  }

  if (*found) {
    __XHANDLER_LOG(WARN,
                   "XEngineDDL: duplicate entry is found during DML for %s on index:%d",
                   key.ToString(true).c_str(), kd.get_index_number());
    // save the duplicated key
    uint err = HA_EXIT_SUCCESS;
    bool rc = ki.status.compare_exchange_strong(err, HA_ERR_FOUND_DUPP_KEY);
    // err is only set from 0 -> DUP_KEY, never the other way. A false alert
    // is possible, e.g. the dup record is deleted later, but we do not
    // detect that right now.
    if (rc) {
      if (set_duplicate_key_for_print(ki.altered_table, table, &kd,
                                      m_tbl_def->m_dict_info, is_rebuild)) {
        __XHANDLER_LOG(WARN,
                       "XEngineDDL: Generate duplicate key error, table_name:%s",
                       table->s->table_name.str);
      } else {
#ifndef NDEBUG
        __XHANDLER_LOG(INFO,
                       "XEngineDDL: Generate duplicate key successfully, table_name:%s",
                       table->s->table_name.str);
#endif
      }
      ki.dup_key_saved.store(true);
    }
  }
}
/** Only used for table rebuild. If the primary key is modified, all
secondary indexes will also be modified.
@param[in] kd, xengine key definition
@param[in/out] ki, altered_table and dup key information
@param[in] row_info, old_data and new_data
@param[out] pk_changed, whether the primary key value changed
@param[out] found, duplicate key found or not
@return SUCCESS/FAILURE */
int ha_xengine::lock_and_check_new_index_pk(
    const Xdb_key_def &kd, Added_key_info &ki,
    const struct update_row_info &row_info, bool *const pk_changed,
    bool *const found) const {
  DEBUG_SYNC(ha_thd(), "xengine.check_and_lock_pk_inplace");
  DBUG_ASSERT(found != nullptr);
  DBUG_ASSERT(!has_hidden_pk(ki.altered_table));
  DBUG_ASSERT(row_info.new_data != nullptr);

  *pk_changed = false;

  // pack user data with the new table
  uint new_pk_size = 0;
  if (m_new_pk_descr->pack_new_record(
          table, m_pack_buffer, row_info.new_data, m_pk_packed_tuple, nullptr,
          false, 0, 0, nullptr, ki.altered_table, m_tbl_def->m_dict_info,
          new_pk_size)) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: pack new data for new table error. table_name: %s",
                   table->s->table_name.str);
    return HA_EXIT_FAILURE;
  }

  if (row_info.old_data != nullptr) {
    // for update
    uint old_pk_size = 0;
    if (m_new_pk_descr->pack_new_record(
            table, m_pack_buffer, row_info.old_data, m_pk_packed_tuple_old,
            nullptr, false, 0, 0, nullptr, ki.altered_table,
            m_tbl_def->m_dict_info, old_pk_size)) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: pack old data for new table error. table_name: %s",
                     table->s->table_name.str);
      return HA_EXIT_FAILURE;
    }
    DBUG_ASSERT(old_pk_size > 0);

    // If the keys are the same, then no lock is needed
    if ((new_pk_size == old_pk_size) &&
        !memcmp(m_pk_packed_tuple, m_pk_packed_tuple_old, new_pk_size)) {
      *found = false;
      return HA_EXIT_SUCCESS;
    }
    *pk_changed = true;
  }

  /* For UPDATEs, if the key has changed, we need to obtain a lock. INSERTs
   * always require locking. */
  xengine::common::Slice new_pk_slice((const char *)m_pk_packed_tuple,
                                      new_pk_size);
  std::string tmp_value;
  const xengine::common::Status s =
      get_for_update(row_info.tx, kd.get_cf(), new_pk_slice, &tmp_value);
  if (!s.ok() && !s.IsNotFound()) {
    __XHANDLER_LOG(WARN,
                   "XEngineDDL: get_for_update for key(%s) on pk(%u) failed with error:%s, table_name: %s",
                   new_pk_slice.ToString(true).c_str(), kd.get_index_number(),
                   s.ToString().c_str(), table->s->table_name.str);
    return row_info.tx->set_status_error(ki.altered_table->in_use, s,
                                         *m_new_pk_descr,
                                         m_tbl_def->m_inplace_new_tdef);
  }

  if (!kd.m_can_skip_unique_check) {
    check_new_index_unique_inner(kd, ki, row_info, new_pk_slice, true, found);
  }

  return HA_EXIT_SUCCESS;
}
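/*
  Why INSERTs always lock (a sketch; values are hypothetical): if two
  concurrent transactions both insert pk = 10 while the table is being
  rebuilt, get_for_update() on the packed key above makes the second writer
  wait for the first; without it, both could pass the unique check and
  commit conflicting rows into the new primary key.
*/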
/** While an online DDL is running, newly arriving inserts/updates must do a
unique check against the new index that the DDL session is building, as well
as against their own in-flight changes.
@param[in] kd, xengine key definition
@param[in/out] ki, altered_table and dup key information
@param[in] row_info, old_data and new_data
@param[out] found, duplicate key found or not
@param[in] is_rebuild, true for rebuild-ddl, where we must pack the new record
@return SUCCESS/FAILURE */
int ha_xengine::lock_and_check_new_index_sk(
    const Xdb_key_def &kd, Added_key_info &ki,
    const struct update_row_info &row_info, bool *const found,
    bool is_rebuild /* = false */) const {
  DBUG_ASSERT(found != nullptr);
  *found = false;

  /* Can skip checking this key if none of the key fields have changed. */
  if (row_info.old_data != nullptr && !m_update_scope.is_set(kd.get_keyno())) {
    return HA_EXIT_SUCCESS;
  }

  TABLE *altered_table = ki.altered_table;
  KEY *key_info = nullptr;
  uint n_null_fields = 0;
  uint user_defined_key_parts = 1;

  key_info = &altered_table->key_info[kd.get_keyno()];
  user_defined_key_parts = key_info->user_defined_key_parts;

  /** If there are no uniqueness requirements, there's no need to obtain a
     lock for this key. */
  if (!(key_info->flags & HA_NOSAME)) {
    return HA_EXIT_SUCCESS;
  }

  /* Calculate the new key for obtaining the lock.
     For unique secondary indexes, the key used for locking does not include
     the extended fields. */
  uint size = 0;
  if (!is_rebuild) {
    size = kd.pack_record(table, m_pack_buffer, row_info.new_data,
                          m_sk_packed_tuple, nullptr, false, 0,
                          user_defined_key_parts, &n_null_fields,
                          altered_table);
  } else {
    if (kd.pack_new_record(table, m_pack_buffer, row_info.new_data,
                           m_sk_packed_tuple, nullptr, false, 0,
                           user_defined_key_parts, &n_null_fields,
                           altered_table, m_tbl_def->m_dict_info, size)) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: pack new record error, table_name: %s",
                     table->s->table_name.str);
      return HA_EXIT_FAILURE;
    }
  }

  if (n_null_fields > 0) {
#ifndef NDEBUG
    __XHANDLER_LOG(INFO, "XEngineDDL: null field exists: %u", n_null_fields);
#endif
    /** If any fields are marked as NULL, this will never match another row,
       as NULL never matches anything else, including another NULL. */
    return HA_EXIT_SUCCESS;
  }

  const xengine::common::Slice new_slice =
      xengine::common::Slice((const char *)m_sk_packed_tuple, size);

  /** For UPDATEs, if the key has changed, we need to obtain a lock. INSERTs
     always require locking. */
  if (row_info.old_data != nullptr) {
    // don't include the hidden_pk_id for sk comparison
    if (!is_rebuild) {
      size = kd.pack_record(table, m_pack_buffer, row_info.old_data,
                            m_sk_packed_tuple_old, nullptr, false, 0,
                            user_defined_key_parts, nullptr, altered_table);
    } else {
      if (kd.pack_new_record(table, m_pack_buffer, row_info.old_data,
                             m_sk_packed_tuple_old, nullptr, false, 0,
                             user_defined_key_parts, nullptr, altered_table,
                             m_tbl_def->m_dict_info, size)) {
        __XHANDLER_LOG(ERROR,
                       "XEngineDDL: pack new record error, table_name: %s",
                       table->s->table_name.str);
        return HA_EXIT_FAILURE;
      }
    }
    assert(size <= m_max_packed_sk_len);
    const xengine::common::Slice old_slice =
        xengine::common::Slice((const char *)m_sk_packed_tuple_old, size);

    /** For updates, if the keys are the same, then no lock is needed.
       Also check whether the key has any fields set to NULL. If it does,
       then this key is unique since NULL is not equal to anything else,
       so no lock is needed. */
    if (!Xdb_pk_comparator::bytewise_compare(new_slice, old_slice)) {
      return HA_EXIT_SUCCESS;
    }
  }

  /** Perform a read to determine if a duplicate entry exists - since this is
     a secondary index, a range scan is needed.
     note: we intentionally don't set options.snapshot here. We want to read
     the latest committed data. */
  const bool all_parts_used = (user_defined_key_parts == kd.get_key_parts());

  /** This iterator seems expensive since we need to allocate and free memory
     for each unique index. If this needs to be optimized, for keys without
     NULL fields, the extended primary key fields can be migrated to the
     value portion of the key. This enables using Get() instead of Seek() as
     in the primary key case. The bloom filter may need to be disabled for
     this lookup. */
  const bool total_order_seek = !can_use_bloom_filter(
      ha_thd(), kd, new_slice, all_parts_used,
      is_ascending(kd, HA_READ_KEY_EXACT));
  const bool fill_cache = true;  // !THDVAR(ha_thd(), skip_fill_cache);

  // psergey-todo: we just need to take the lock, lookups are not needed:
  DBUG_ASSERT(!kd.m_is_reverse_cf);
  const xengine::common::Status s = lock_unique_key(
      row_info.tx, kd.get_cf(), new_slice, total_order_seek, fill_cache);
  if (!s.ok() && !s.IsNotFound()) {
    __XHANDLER_LOG(WARN,
                   "XEngineDDL: lock_unique_key for key(%s) on index:%d failed with error:%s, table_name: %s",
                   new_slice.ToString(true).c_str(), kd.get_index_number(),
                   s.ToString().c_str(), table->s->table_name.str);
    auto tbl = is_rebuild ? m_tbl_def->m_inplace_new_tdef : m_tbl_def.get();
    return row_info.tx->set_status_error(ki.altered_table->in_use, s, kd, tbl);
  }

  if (!kd.m_can_skip_unique_check) {
    check_new_index_unique_inner(kd, ki, row_info, new_slice, is_rebuild,
                                 found);
  }

  return HA_EXIT_SUCCESS;
}
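/*
  NULL handling example for the n_null_fields short-circuit above (a sketch;
  t1/uk1 are hypothetical): with UNIQUE KEY uk1 (c2),

    INSERT INTO t1 (id, c2) VALUES (1, NULL), (2, NULL);

  both rows are accepted without any lock or duplicate check, because NULL
  compares as distinct from every value, including another NULL.
*/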
/* unique check and key locking for the DDL-altered table during DML
 *
 * @param[in] row_info: row record to insert or update
 * @param[out] pk_changed: if true, the primary key of the new table changed
 *             (either the DML updates the pk value, or the pk definition is
 *             changed by the DDL)
 */
int ha_xengine::check_uniqueness_and_lock_rebuild(
    const struct update_row_info &row_info, bool *const pk_changed) {
  int rc = HA_EXIT_SUCCESS;

  /* Check the new table's indexes if necessary. We need to do a unique check
     for the new pk or new uk; if it fails, let the DML transaction go ahead,
     record the duplicate error, and then fail the DDL transaction. */
  for (auto &k : m_tbl_def->m_inplace_new_keys) {
    const Xdb_key_def &kd = *k.first;
    TABLE *atab = k.second.altered_table;
    bool found = false;
    if (kd.is_hidden_primary_key()) {
      *pk_changed = false;  // No need to check with a copied hidden pk
      continue;
    } else if (kd.is_primary_key()) {
      *pk_changed = false;
      // check whether the DML will update the primary key
      rc = lock_and_check_new_index_pk(kd, k.second, row_info, pk_changed,
                                       &found);
      // this primary key is newly added/re-defined,
      // so we need to convert UPDATE to DEL+PUT
      *pk_changed = *pk_changed || !kd.m_can_skip_unique_check;
    } else {
      rc = lock_and_check_new_index_sk(kd, k.second, row_info, &found, true);
    }

    if (rc != HA_EXIT_SUCCESS) {
      __XHANDLER_LOG(WARN,
                     "duplicate key check failed for new key of inplace DDL, errcode is %d",
                     rc);
      break;
    }
  }

  /* update new table for inplace rebuild online ddl if necessary */
  if (row_info.old_data == nullptr) {
    // this is for INSERT
    *pk_changed = false;
  }

  return rc;
}
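/*
  DEL+PUT conversion example (a sketch; column names are hypothetical): the
  old table has PRIMARY KEY (a) and the rebuild defines PRIMARY KEY (b).
  Because the new pk cannot skip the unique check, pk_changed is forced true
  above even for an UPDATE that leaves b untouched, so update_new_table()
  below deletes the row's old new-table key and puts the new one instead of
  overwriting it in place.
*/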
/** update the rebuilt new table for inplace_rebuild_ddl
@param[in] row_info The DDL operation
@param[in] pk_changed flag indicating whether we should convert the update
           to (del, put)
@return Success or Failure */
int ha_xengine::update_new_table(struct update_row_info &row_info,
                                 bool pk_changed) {
  int ret = HA_EXIT_SUCCESS;
  for (auto &k : m_tbl_def->m_inplace_new_keys) {
    const Xdb_key_def &kd = *k.first;
    const TABLE *atab = k.second.altered_table;
    if (k.second.step < Added_key_info::BUILDING_BASE_INDEX) {
      continue;
    }

    DBUG_ASSERT(!row_info.new_pk_slice.empty());

    if (kd.is_primary_key()) {
      ret = update_pk_for_new_table(kd, k.second, row_info, pk_changed, atab);
    } else {
      DBUG_ASSERT(kd.is_secondary_key());
      ret = update_sk_for_new_table(kd, k.second, row_info, atab);
    }

    if (ret > 0) {
      __XHANDLER_LOG(WARN,
                     "XEngineDDL: update indexes error, code is %d, table_name: %s",
                     ret, table->s->table_name.str);
      return ret;
    }
  }

  return ret;
}

xengine::common::Status ha_xengine::delete_or_singledelete_new_table(
    const TABLE *altered_table, Xdb_tbl_def *new_tbl_def, uint index,
    Xdb_transaction *const tx,
    xengine::db::ColumnFamilyHandle *const column_family,
    const xengine::common::Slice &key) {
  bool can_use_single_del_opt = false;
  /* - Secondary indexes can always use SingleDelete.
     - If the index is the PRIMARY KEY, and all of the columns of the table
       are covered by the PRIMARY KEY, SingleDelete can be used. */
  if (index != pk_index(altered_table, new_tbl_def)) {
    can_use_single_del_opt = true;
  } else if (!has_hidden_pk(altered_table) &&
             altered_table->key_info[index].actual_key_parts ==
                 altered_table->s->fields) {
    can_use_single_del_opt = true;
  }

  if (can_use_single_del_opt)
    return tx->single_delete(column_family, key);

  return tx->delete_key(column_family, key);
}
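/*
  SingleDelete eligibility example (a sketch; t1/k1 are hypothetical): for

    CREATE TABLE t1 (a INT PRIMARY KEY, b INT, KEY k1 (b));

  k1 always qualifies, but the primary key does not: b lives in the pk
  value, so an in-place UPDATE of b Puts the same pk key twice, which
  SingleDelete cannot handle. When every column is part of the PRIMARY KEY,
  any UPDATE changes the key itself, so each key is Put at most once and
  SingleDelete is safe.
*/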
/** update the primary key for the new table in inplace_rebuild_ddl
@param[in] kd, pk_def
@param[in/out] ki, altered_table and dup key information
@param[in] row_info, struct storing old-data and new-data
@param[in] pk_changed, flag indicating whether we should convert the update
           to (del, put)
@param[in] altered_table
@return Success or Failure */
int ha_xengine::update_pk_for_new_table(
    const Xdb_key_def &kd, Added_key_info &ki,
    const struct update_row_info &row_info, const bool pk_changed,
    const TABLE *altered_table) {
  int rc = HA_EXIT_SUCCESS;
  DBUG_ASSERT(altered_table != nullptr);
  DBUG_ASSERT(m_tbl_def->m_inplace_new_tdef);
  const uint key_id = kd.get_keyno();
  const bool hidden_pk =
      is_hidden_pk(key_id, altered_table, m_tbl_def->m_inplace_new_tdef);
  if (!hidden_pk && pk_changed) {
    /*
     * generate the old pk for the new table and delete it
     */
    DBUG_ASSERT(row_info.old_data != nullptr);
    DBUG_ASSERT(m_pk_packed_tuple_old);
    uint old_packed_size = 0;
    if (kd.pack_new_record(table, m_pack_buffer, row_info.old_data,
                           m_pk_packed_tuple_old, nullptr, false, 0, 0,
                           nullptr, altered_table, m_tbl_def->m_dict_info,
                           old_packed_size)) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: pack old pk for new table error, table_name: %s",
                     table->s->table_name.str);
      return HA_EXIT_FAILURE;
    }
    assert(old_packed_size <= m_max_packed_sk_len);

    xengine::common::Slice old_pk_slice = xengine::common::Slice(
        reinterpret_cast<const char *>(m_pk_packed_tuple_old),
        old_packed_size);

    const xengine::common::Status s = delete_or_singledelete_new_table(
        altered_table, m_tbl_def->m_inplace_new_tdef, key_id, row_info.tx,
        kd.get_cf(), old_pk_slice);
    if (!s.ok()) {
      __XHANDLER_LOG(WARN,
                     "XEngineDDL: failed to delete old record(%s) for updating pk(%u), code is %d, table_name: %s",
                     old_pk_slice.ToString(true).c_str(),
                     kd.get_index_number(), s.code(),
                     table->s->table_name.str);
      return row_info.tx->set_status_error(table->in_use, s, kd,
                                           m_tbl_def.get());
    }
  }

  // auto_increment was already increased in the old table; we just use the
  // newest value for the new table
  /*
  if (table->next_number_field) {
    update_auto_incr_val();
  }
  */

  xengine::common::Slice value_slice;
  if ((rc = convert_new_record_from_old_record(
           table, altered_table, m_tbl_def->m_dict_info,
           row_info.new_pk_slice, row_info.new_pk_unpack_info, &value_slice,
           m_new_storage_record))) {
    if (rc == HA_ERR_INVALID_NULL_ERROR) {
      __XHANDLER_LOG(WARN,
                     "XEngineDDL: convert null to not null failed, code is %d, table_name: %s",
                     rc, table->s->table_name.str);
      // if an error code was already set, the exchange to
      // HA_ERR_INVALID_NULL_ERROR fails; that's OK
      uint err = HA_EXIT_SUCCESS;
      ki.status.compare_exchange_strong(err, HA_ERR_INVALID_NULL_ERROR);
      return HA_EXIT_SUCCESS;
    } else {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: convert new record from old record error, code is %d, table_name: %s",
                     rc, table->s->table_name.str);
      return rc;
    }
  }

  const auto cf = kd.get_cf();
  if (xengine_enable_bulk_load_api /*&& THDVAR(table->in_use, bulk_load)*/ &&
      !hidden_pk) {
    /* Write the primary key directly to an SST file using an SstFileWriter */
    rc = bulk_load_key(row_info.tx, kd, row_info.new_pk_slice, value_slice, 0);
  } else if (row_info.skip_unique_check) {
    /* It is the responsibility of the user to make sure that the data being
       inserted doesn't violate any unique keys. */
    row_info.tx->get_blind_write_batch()->Put(cf, row_info.new_pk_slice,
                                              value_slice);
  } else if (row_info.tx->m_ddl_transaction) {
    /* A DDL statement must check for unique key conflicts. For example:
       ALTER TABLE tbl DROP PRIMARY KEY, ADD PRIMARY KEY(non_unique_column) */
    row_info.tx->get_indexed_write_batch()->Put(cf, row_info.new_pk_slice,
                                                value_slice);
  } else {
    const auto s = row_info.tx->put(cf, row_info.new_pk_slice, value_slice);
    if (!s.ok()) {
      if (s.IsBusy()) {
        __XHANDLER_LOG(WARN,
                       "XEngineDDL: duplicate entry is found during DML for key(%s) on pk:%d, table_name:%s",
                       row_info.new_pk_slice.ToString(true).c_str(),
                       kd.get_index_number(), table->s->table_name.str);
        errkey = table->s->primary_key;
        m_dupp_errkey = errkey;
        rc = HA_ERR_FOUND_DUPP_KEY;
        // ToDo: confirm this is due to a duplicate entry which should be
        // handled by the DDL
      } else {
        __XHANDLER_LOG(WARN,
                       "XEngineDDL: failed to put record(%s:%s) for updating pk(%u), code is %d, table_name: %s",
                       row_info.new_pk_slice.ToString(true).c_str(),
                       value_slice.ToString(true).c_str(),
                       kd.get_index_number(), s.code(),
                       table->s->table_name.str);
        rc = row_info.tx->set_status_error(table->in_use, s, *m_pk_descr,
                                           m_tbl_def.get());
      }
    }
  }

  return rc;
}
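/*
  The four write paths above, from most to least specialized:
  - bulk load: the new pk entry goes straight into an SST file via an
    SstFileWriter, bypassing the memtable;
  - skip_unique_check: a blind write batch, the caller guarantees uniqueness;
  - DDL transaction: an indexed write batch, so in-batch reads can detect
    conflicts such as ALTER TABLE tbl DROP PRIMARY KEY,
    ADD PRIMARY KEY(non_unique_column);
  - everything else: tx->put(), where a Busy status maps to
    HA_ERR_FOUND_DUPP_KEY.
*/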
/** update secondary keys for the new table in inplace_rebuild_ddl
@param[in] kd, xengine key definition
@param[in/out] ki, altered_table and dup key information
@param[in] row_info, struct storing old-data and new-data
@param[in] altered_table MySQL new table
@return Success or Failure */
int ha_xengine::update_sk_for_new_table(
    const Xdb_key_def &kd, Added_key_info &ki,
    const struct update_row_info &row_info,
    const TABLE *const altered_table) {
  uint new_packed_size;
  uint old_packed_size;

  xengine::common::Slice new_key_slice;
  xengine::common::Slice new_value_slice;
  xengine::common::Slice old_key_slice;

  const uint key_id = kd.get_keyno();

  /* Can skip updating this key if none of the key fields have changed. */
  if (row_info.old_data != nullptr && !m_update_scope.is_set(key_id)) {
    return HA_EXIT_SUCCESS;
  }

  const bool store_row_debug_checksums = should_store_row_debug_checksums();
  if (kd.pack_new_record(table, m_pack_buffer, row_info.new_data,
                         m_sk_packed_tuple, &m_sk_tails,
                         store_row_debug_checksums, row_info.hidden_pk_id, 0,
                         nullptr, altered_table, m_tbl_def->m_dict_info,
                         new_packed_size)) {
    __XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s",
                   table->s->table_name.str);
    return HA_EXIT_FAILURE;
  }
  DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len);

  if (row_info.old_data != nullptr) {
    // The old value
    if (kd.pack_new_record(table, m_pack_buffer, row_info.old_data,
                           m_sk_packed_tuple_old, &m_sk_tails_old,
                           store_row_debug_checksums, row_info.hidden_pk_id,
                           0, nullptr, altered_table, m_tbl_def->m_dict_info,
                           old_packed_size)) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: pack new record error, table_name: %s",
                     table->s->table_name.str);
      return HA_EXIT_FAILURE;
    }
    assert(old_packed_size <= m_max_packed_sk_len);

    /* Check if we are going to write the same value. This can happen when
       one does UPDATE tbl SET col='foo' and we are looking at the row that
       already has col='foo'.

       We also need to compare the unpack info. Suppose the collation is
       case-insensitive, and unpack info contains information about whether
       the letters were uppercase or lowercase. Then both 'foo' and 'FOO'
       will have the same key value, but different data in unpack_info.

       (note: anyone changing bytewise_compare should take this code into
       account) */
    if (old_packed_size == new_packed_size &&
        m_sk_tails_old.get_current_pos() == m_sk_tails.get_current_pos() &&
        memcmp(m_sk_packed_tuple_old, m_sk_packed_tuple, old_packed_size) ==
            0 &&
        memcmp(m_sk_tails_old.ptr(), m_sk_tails.ptr(),
               m_sk_tails.get_current_pos()) == 0) {
      return HA_EXIT_SUCCESS;
    }

    /* Deleting entries from the secondary index should skip locking, but
       be visible to the transaction. (Also note that DDL statements do not
       delete rows, so this is not a DDL statement.) */
    old_key_slice = xengine::common::Slice(
        reinterpret_cast<const char *>(m_sk_packed_tuple_old),
        old_packed_size);

    row_info.tx->get_indexed_write_batch()->SingleDelete(kd.get_cf(),
                                                         old_key_slice);
  }

  xengine::db::WriteBatchBase *write_batch;
  write_batch = row_info.tx->get_indexed_write_batch();

  new_key_slice = xengine::common::Slice(
      reinterpret_cast<const char *>(m_sk_packed_tuple), new_packed_size);
  new_value_slice =
      xengine::common::Slice(reinterpret_cast<const char *>(m_sk_tails.ptr()),
                             m_sk_tails.get_current_pos());

  write_batch->Put(kd.get_cf(), new_key_slice, new_value_slice);

  return HA_EXIT_SUCCESS;
}
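/*
  Same-key, different-unpack-info example for the comparison above (a
  sketch): under a case-insensitive collation, UPDATE t1 SET c2 = 'FOO' on a
  row currently holding 'foo' produces an identical memcmp key, but the case
  bits in unpack_info differ, so comparing m_sk_tails as well prevents the
  write from being skipped incorrectly.
*/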
/** delete from the new table in inplace_rebuild_ddl
@param[in] row_info, struct storing old-data and new-data
@return Success or Failure */
int ha_xengine::delete_row_new_table(struct update_row_info &row_info) {
  DBUG_ENTER_FUNC();

  int ret = HA_EXIT_SUCCESS;
  bool get_new_pk_flag = false;

  for (auto &k : m_tbl_def->m_inplace_new_keys) {
    const Xdb_key_def &kd = *k.first;
    const TABLE *atab = k.second.altered_table;
    if (k.second.step < Added_key_info::BUILDING_BASE_INDEX) {
      continue;
    }

    /** generate the new primary key **/
    if (!get_new_pk_flag) {
      if ((ret = get_new_pk_for_delete(&row_info, atab))) {
        __XHANDLER_LOG(ERROR,
                       "XEngineDDL: get new primary key for rebuild ddl error, table_name: %s",
                       table->s->table_name.str);
        DBUG_RETURN(ret);
      }
      get_new_pk_flag = true;
    }

    Xdb_transaction *const tx = row_info.tx;
    DBUG_ASSERT(tx != nullptr);

    if (kd.is_primary_key()) {
      const uint index = pk_index(atab, m_tbl_def->m_inplace_new_tdef);
      xengine::common::Status s = delete_or_singledelete_new_table(
          atab, m_tbl_def->m_inplace_new_tdef, index, row_info.tx,
          kd.get_cf(), row_info.new_pk_slice);
      if (!s.ok()) {
        __XHANDLER_LOG(WARN,
                       "XEngineDDL: failed to delete record(%s) on pk (%u) "
                       "of new table, code is %d, table_name: %s",
                       row_info.new_pk_slice.ToString(true).c_str(),
                       kd.get_index_number(), s.code(),
                       table->s->table_name.str);
        DBUG_RETURN(tx->set_status_error(table->in_use, s, *m_pk_descr,
                                         m_tbl_def.get()));
      }
    } else {
      uint packed_size;
      if ((ret = kd.pack_new_record(table, m_pack_buffer, row_info.old_data,
                                    m_sk_packed_tuple, &m_sk_tails, false,
                                    row_info.hidden_pk_id, 0, nullptr, atab,
                                    m_tbl_def->m_dict_info, packed_size))) {
        __XHANDLER_LOG(ERROR,
                       "XEngineDDL: pack new record error, table_name: %s",
                       table->s->table_name.str);
        DBUG_RETURN(HA_EXIT_FAILURE);
      }
      DBUG_ASSERT(packed_size <= m_max_packed_sk_len);

      xengine::common::Slice secondary_key_slice(
          reinterpret_cast<const char *>(m_sk_packed_tuple), packed_size);

      /* Deleting on a secondary key doesn't need any locks: */
      tx->get_indexed_write_batch()->SingleDelete(kd.get_cf(),
                                                  secondary_key_slice);
    }

    if (ret != HA_EXIT_SUCCESS) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: delete indexes error, code is %d, table_name: %s",
                     ret, table->s->table_name.str);
      DBUG_RETURN(ret);
    }
  }

  DBUG_RETURN(ret);
}

/** Build the new primary key for a delete-stmt in storage format
@param[in] row_info, MySQL new/old record
@param[in] altered_table MySQL new table
@return SUCCESS/FAILURE */
int ha_xengine::get_new_pk_for_delete(struct update_row_info *const row_info,
                                      const TABLE *altered_table) {
  DBUG_ENTER_FUNC();

  uint size = 0;
  DBUG_ASSERT(m_new_pk_descr != nullptr);
  if (!has_hidden_pk(altered_table)) {
    row_info->new_pk_unpack_info = &m_pk_unpack_info;
    if (m_new_pk_descr->pack_new_record(
            table, m_pack_buffer, row_info->old_data, m_pk_packed_tuple,
            row_info->new_pk_unpack_info, false, 0, 0, nullptr, altered_table,
            m_tbl_def->m_dict_info, size)) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: pack new record error, table_name: %s",
                     table->s->table_name.str);
      DBUG_RETURN(HA_EXIT_FAILURE);
    }
    DBUG_ASSERT(size <= m_max_packed_sk_len);
  } else {
    // we don't support dropping the primary key via online ddl
    DBUG_ASSERT(has_hidden_pk(altered_table) && has_hidden_pk(table));
    if (read_hidden_pk_id_from_rowkey(&row_info->hidden_pk_id,
                                      &m_last_rowkey)) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: read hidden_pk error, table_name: %s",
                     table->s->table_name.str);
      DBUG_RETURN(HA_EXIT_FAILURE);
    }

    // the new table's subtable_id changed, so we can't use m_last_rowkey
    // directly
    size = m_new_pk_descr->pack_hidden_pk(row_info->hidden_pk_id,
                                          m_pk_packed_tuple);
  }

  row_info->new_pk_slice =
      xengine::common::Slice((const char *)m_pk_packed_tuple, size);

  DBUG_RETURN(HA_EXIT_SUCCESS);
}
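/*
  Hidden-pk path example (a sketch): a table created without an explicit
  PRIMARY KEY keeps its auto-generated hidden pk across the rebuild, but the
  rebuilt subtable carries a different index number, so m_last_rowkey cannot
  be reused as-is; only the hidden_pk_id is read back and re-packed via
  pack_hidden_pk() under the new index prefix.
*/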
/** Build the new primary key for an update-stmt in storage format
@param[in] row_info, MySQL new/old record
@param[in] altered_table MySQL new table
@return SUCCESS/FAILURE */
int ha_xengine::get_new_pk_for_update(struct update_row_info *const row_info,
                                      const TABLE *altered_table) {
  uint size = 0;

  /* Get the new row key for any insert, and any update where the pk is not
     hidden. The row key for updates with a hidden pk is handled below. */
  if (!has_hidden_pk(altered_table)) {
    row_info->hidden_pk_id = 0;
    row_info->new_pk_unpack_info = &m_pk_unpack_info;
    if (m_new_pk_descr->pack_new_record(
            table, m_pack_buffer, row_info->new_data, m_pk_packed_tuple,
            row_info->new_pk_unpack_info, false, 0, 0, nullptr, altered_table,
            m_tbl_def->m_dict_info, size)) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: pack new record error, table_name: %s",
                     table->s->table_name.str);
      return HA_EXIT_FAILURE;
    }
    DBUG_ASSERT(size <= m_max_packed_sk_len);
  } else {
    // if the new table has a hidden pk, it must be the same as in the old
    // table; hidden_pk_id was already set in get_pk_for_update
    DBUG_ASSERT(row_info->hidden_pk_id > 0);
    size = m_new_pk_descr->pack_hidden_pk(row_info->hidden_pk_id,
                                          m_pk_packed_tuple);
  }

  row_info->new_pk_slice =
      xengine::common::Slice((const char *)m_pk_packed_tuple, size);

  return HA_EXIT_SUCCESS;
}

/** fill the duplicate value into the new table record
TODO: if the pk changed between old_table and new_table, it's hard to get
the old table record from new_pk_slice.
1. if pk_changed, we use unpack_record to get the value from the memcmp key
2. otherwise we try to get the old record, and then fill the new duplicate
   record
3. if step 2 failed (duplicate key deleted), we retry using unpack_record
@param[in] index duplicate xengine key definition
@param[in] new_table_arg MySQL Table Object
@param[in] dup_key duplicate key
@param[in] dup_val duplicate value
@param[in] is_rebuild rebuild ddl or not */
int ha_xengine::fill_new_duplicate_record(
    const Xdb_key_def *index, TABLE *new_table_arg,
    const xengine::common::Slice &dup_key,
    const xengine::common::Slice &dup_val, bool is_rebuild) {
  DBUG_ENTER_FUNC();

  bool pk_def_changed = false;
  for (auto &k : m_tbl_def->m_inplace_new_keys) {
    if (k.first->is_primary_key()) {
      pk_def_changed = !k.first->m_can_skip_unique_check;
      break;
    }
  }

  int res;
  if (pk_def_changed && is_rebuild) {
    if (index->is_primary_key()) {
      DBUG_ASSERT(!index->is_hidden_primary_key());
      if (index->get_support_icp_flag()) {
        res = index->unpack_record_pk(new_table_arg, new_table_arg->record[0],
                                      &dup_key, &dup_val,
                                      m_tbl_def->m_dict_info);
      } else {
        res = convert_record_from_storage_format(
            &dup_key, &dup_val, new_table_arg->record[0], new_table_arg,
            m_tbl_def->m_dict_info->m_null_bytes_in_rec,
            m_tbl_def->m_dict_info->m_maybe_unpack_info, index,
            m_tbl_def->m_dict_info->m_decoders_vect,
            m_tbl_def->m_dict_info->m_instant_ddl_info,
            m_tbl_def->m_dict_info->m_verify_row_debug_checksums,
            m_tbl_def->m_dict_info->m_row_checksums_checked);
      }
    } else {
      res = index->unpack_record(new_table_arg, new_table_arg->record[0],
                                 &dup_key, &dup_val, false);
    }
    if (res) {
      __XHANDLER_LOG(ERROR, "XEngineDDL: unable to unpack record from value");
    }
  } else {
    // for utf8mb4_0900, try to use the old-table record
    if ((res = fill_old_table_record(new_table_arg, table, dup_key, index))) {
      if (res != HA_ERR_KEY_NOT_FOUND) {
        __XHANDLER_LOG(ERROR,
                       "XEngineDDL: Fill old table record failed, table_name:%s",
                       table->s->table_name.str);
        DBUG_RETURN(res);
      }
    }

    if (res == HA_ERR_KEY_NOT_FOUND) {
      DBUG_ASSERT(!index->is_primary_key());
      if (index->unpack_record(new_table_arg, new_table_arg->record[0],
                               &dup_key, &dup_val, false)) {
        /* Should never reach here */
        __XHANDLER_LOG(ERROR,
                       "XEngineDDL: Error retrieving index entry, table_name:%s",
                       table->s->table_name.str);
        DBUG_ASSERT(0);
      }
      __XHANDLER_LOG(INFO,
                     "XEngineDDL: duplicated entry for index:%d is "
                     "deleted in old table:%s during online DML",
                     index->get_index_number(), table->s->table_name.str);
    } else if ((res = set_duplicate_key_for_print(new_table_arg, table, index,
                                                  m_tbl_def->m_dict_info,
                                                  is_rebuild))) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: Set duplicate record for new table failed, "
                     "table_name:%s",
                     table->s->table_name.str);
      DBUG_RETURN(res);
    } else {
      __XHANDLER_LOG(INFO,
                     "XEngineDDL: Set duplicate record for new table successfully, "
                     "table_name:%s",
                     table->s->table_name.str);
    }
  }

  DBUG_RETURN(HA_EXIT_SUCCESS);
}
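/*
  Recovery-path note for the fallback chain above (a sketch): the duplicate
  row may already be gone from the old table (inserted, collided with the
  index build, then deleted). For utf8mb4_0900_ai_ci the memcmp key is not
  reversible to the original varchar/char bytes, so re-reading the old row
  is preferred, and unpack_record is only the last resort when the row is no
  longer found.
*/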
/** Get the latest row by rowid, used to find a duplicate-key record.
There is a case where the duplicate-key record has been deleted; then we
need to use unpack_record to get the field values from the memcmp key. But
for utf8mb4_0900_ai_ci, we can't recover varchar/char values that way.
Only used by an inplace ddl session. */
int ha_xengine::get_latest_row_by_rowid(uchar *const buf,
                                        const char *const rowid,
                                        const uint rowid_size) {
  DBUG_ENTER_FUNC();

  DBUG_ASSERT(buf != nullptr);
  DBUG_ASSERT(rowid != nullptr);
  DBUG_ASSERT(table != nullptr);

  int ret = HA_EXIT_SUCCESS;
  xengine::common::Slice key_slice(rowid, rowid_size);

  Xdb_transaction *const tx = get_or_create_tx(table->in_use);
  DBUG_ASSERT(tx != nullptr);

  bool found;
  xengine::common::Status s;
  s = tx->get_latest(m_pk_descr->get_cf(), key_slice, &m_retrieved_record);
  if (!s.IsNotFound() && !s.ok()) {
    __XHANDLER_LOG(WARN,
                   "XEngineDDL: lock failed for key(%s) on index %u, code is %d, table_name is: %s",
                   key_slice.ToString(true).c_str(),
                   m_pk_descr->get_index_number(), s.code(),
                   table->s->table_name.str);
    DBUG_RETURN(
        tx->set_status_error(table->in_use, s, *m_pk_descr, m_tbl_def.get()));
  }
  found = !s.IsNotFound();

  if (found) {
    m_last_rowkey.copy((const char *)rowid, rowid_size, &my_charset_bin);
    ret = convert_record_from_storage_format(&key_slice, m_retrieved_record,
                                             buf, table);
  } else {
    /* Note: we don't need to unlock the row. It is intentional that we keep
       locks on rows that don't exist. */
    ret = HA_ERR_KEY_NOT_FOUND;
  }

  DBUG_RETURN(ret);
}

/** We need to fill the old record before printing duplicate key information.
@param[in] new_table
@param[in] old_table
@param[in] dup_key, duplicate key found during index building
@param[in] dup_kd, duplicate xengine key definition
@return SUCCESS/FAILURE */
int ha_xengine::fill_old_table_record(TABLE *new_table, TABLE *old_table,
                                      const xengine::common::Slice &dup_key,
                                      const Xdb_key_def *dup_kd) {
  int ret = HA_EXIT_SUCCESS;
  uint pk_size = 0;
  if (dup_kd->is_primary_key()) {
    // use the old pk index_number
    memcpy(m_pk_packed_tuple, dup_key.data(), dup_key.size());
    xdb_netbuf_store_index(m_pk_packed_tuple, m_pk_descr->get_index_number());
    pk_size = dup_key.size();
  } else {
    pk_size = dup_kd->get_primary_key_tuple(new_table, *m_pk_descr, &dup_key,
                                            m_pk_packed_tuple);
    if (pk_size == XDB_INVALID_KEY_LEN) {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: Get primary key failed, table_name: %s",
                     table->s->table_name.str);
      ret = HA_ERR_INTERNAL_ERROR;
    }
  }

  if (ret != HA_EXIT_SUCCESS) {
    __XHANDLER_LOG(ERROR,
                   "XEngineDDL: Fill table record failed, table_name: %s",
                   table->s->table_name.str);
    return ret;
  } else if ((ret = get_latest_row_by_rowid(
                  old_table->record[0],
                  reinterpret_cast<const char *>(m_pk_packed_tuple),
                  pk_size))) {
    if (ret == HA_ERR_KEY_NOT_FOUND) {
      __XHANDLER_LOG(WARN,
                     "XEngineDDL: duplicate record may have been deleted, table_name: %s",
                     table->s->table_name.str);
    } else {
      __XHANDLER_LOG(ERROR,
                     "XEngineDDL: Find duplicate record error, table_name: %s",
                     table->s->table_name.str);
    }
    return ret;
  }

  return HA_EXIT_SUCCESS;
}

static int fill_duplicate_blob_val(Field_blob *const old_lob,
                                   Field_blob *const new_lob) {
  const uint length_bytes = old_lob->pack_length() - portable_sizeof_char_ptr;
  char *data_ptr;
  memcpy(&data_ptr, old_lob->ptr + length_bytes, sizeof(uchar **));
  if (new_lob->store(data_ptr, old_lob->get_length(), old_lob->charset())) {
    return HA_EXIT_FAILURE;
  }

  return HA_EXIT_SUCCESS;
}
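/*
  Blob field layout assumed by fill_duplicate_blob_val above (a sketch): a
  Field_blob stores its length bytes followed by a pointer to the data, so
  pack_length() minus portable_sizeof_char_ptr gives the count of length
  bytes, and the data pointer is memcpy'd out from just past them before
  being re-stored into the new table's field.
*/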
/** Set the duplicate field values for the new table; the values come from
the old record. If a field was added by the ddl, we fill it with its default
value.
@param[in] new_table Altered_table
@param[in] old_table table before the DDL operation
@param[in] kd, duplicate key found during ddl
@param[in] dict_info, dictionary used for ddl, such as col_map
@param[in] is_rebuild, whether the ddl is a rebuild ddl
@return SUCCESS/FAILURE */
static int set_duplicate_key_for_print(
    TABLE *new_table, TABLE *old_table, const Xdb_key_def *kd,
    const std::shared_ptr<Xdb_inplace_ddl_dict_info> &dict_info,
    bool is_rebuild) {
  const bool is_secondary_key = kd->is_secondary_key();
  const bool hidden_pk_exists = (new_table->s->primary_key == MAX_INDEXES);
  const bool is_hidden_pk = kd->is_hidden_primary_key();
  DBUG_ASSERT(!is_hidden_pk);

  uint *col_map = nullptr;
  if (is_rebuild) {
    DBUG_ASSERT(dict_info != nullptr);
    col_map = dict_info->m_col_map;
  } else {
    DBUG_ASSERT(dict_info == nullptr);
  }

  uint old_col_index = 0;
  for (uint i = 0; i < kd->get_key_parts(); i++) {
    /* The hidden pk field is packed at the end of the secondary keys, but
       the SQL layer does not know about it. Skip retrieving the field if it
       is the hidden pk. */
    if ((is_secondary_key && hidden_pk_exists &&
         i + 1 == kd->get_key_parts())) {
      continue;
    }

    Field *field = kd->get_table_field_for_part_no(new_table, i);
    Field *old_field = nullptr;
    bool use_old_field = false;
    if (is_rebuild) {
      old_col_index = col_map[field->field_index];
      if (old_col_index != (uint)(-1)) {
        old_field = old_table->field[old_col_index];
        use_old_field = true;
      }
    } else {
      old_field = old_table->field[field->field_index];
      use_old_field = true;
    }

    if (use_old_field) {
      // If a lob type is part of the key, we need to store the blob value
      // for the new field.
      // Bug #24173631
      if (old_field->real_type() == MYSQL_TYPE_BLOB ||
          old_field->real_type() == MYSQL_TYPE_JSON) {
        DBUG_ASSERT(field->real_type() == old_field->real_type());
        if (fill_duplicate_blob_val((Field_blob *)old_field,
                                    (Field_blob *)field)) {
          __XHANDLER_LOG(ERROR,
                         "store duplicated blob field failed, index_id: %d",
                         kd->get_index_number());
          return HA_EXIT_FAILURE;
        }
      } else {
        // a non-duplicate sk may still be reported as duplicated because of
        // the new primary key
        // DBUG_ASSERT(!old_field->is_real_null());
        memcpy(field->ptr, old_field->ptr, old_field->pack_length());
      }
    } else {
      DBUG_ASSERT(is_rebuild);
      // a blob cannot have a default value
      DBUG_ASSERT(field->real_type() != MYSQL_TYPE_BLOB &&
                  field->real_type() != MYSQL_TYPE_JSON);
      uint field_offset = field->ptr - new_table->record[0];
      memcpy(field->ptr, new_table->s->default_values + field_offset,
             field->pack_length());
    }

    field->set_notnull();
  }

  return HA_EXIT_SUCCESS;
}

}  // namespace myx