polardbxengine/storage/xengine/handler/handler_alter.cc

/*
* Copyright (c) 2020, Alibaba Group Holding Limited
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifdef USE_PRAGMA_IMPLEMENTATION
#pragma implementation // gcc: Class implementation
#endif
#define MYSQL_SERVER 1
/* The C++ file's header */
#include "sql_class.h"
#include "handler_alter.h"
/* C++ standard header files */
#include <algorithm>
#include <queue>
#include <set>
#include <string>
#include <vector>
/* MySQL includes */
#include "debug_sync.h"
#include "my_bit.h"
#include "my_sys.h"
#include "my_stacktrace.h"
#include "sql_table.h"
#include "m_string.h"
#include "mysql/psi/mysql_stage.h"
#include "mysql/psi/mysql_table.h"
#include "mysql/thread_pool_priv.h"
#include "mysys_err.h"
#include "mysqld.h"
#include "sql_thd_internal_api.h"
#include "table.h"
#include "my_loglevel.h"
#include "sql/create_field.h"
/* XENGINE includes */
#include "xengine/compaction_filter.h"
#include "xengine/slice_transform.h"
#include "xengine/utilities/checkpoint.h"
#include "xengine/utilities/convenience.h"
#include "xengine/utilities/memory_util.h"
#include "xengine/perf_level.h"
#include "core/util/memory_stat.h"
#include "core/util/sync_point.h"
#include "core/memory/mod_info.h"
#include "core/port/likely.h"
#include "core/monitoring/query_perf_context.h"
#include "core/util/string_util.h"
#include "xengine/utilities/write_batch_with_index.h"
#include "core/options/options_helper.h"
#include "core/logger/logger.h"
#include "utilities/transactions/transaction_db_impl.h"
#include "db/db_impl.h"
#include "db/column_family.h"
#include "db/version_edit.h"
#include "table/internal_iterator.h"
#include "storage/multi_version_extent_meta_layer.h"
/* MyX includes */
#include "./util/xdb_utils.h"
#include "./event_listener.h"
#include "./ha_xengine_proto.h"
#include "./logger.h"
#include "./xdb_cf_manager.h"
#include "./xdb_cf_options.h"
#include "./xdb_datadic.h"
#include "./xdb_mutex_wrapper.h"
#include "./xdb_threads.h"
#include "./log_ddl.h"
using namespace xengine;
using namespace util;
using namespace common;
namespace myx
{
extern Xdb_dict_manager dict_manager;
extern Xdb_cf_manager cf_manager;
extern Xdb_ddl_manager ddl_manager;
extern Xdb_binlog_manager binlog_manager;
extern Xdb_ddl_log_manager ddl_log_manager;
extern Xdb_drop_index_thread xdb_drop_idx_thread;
extern xengine::util::TransactionDB *xdb;
extern int xengine_inplace_populate_indexes;
extern int32_t xengine_shrink_table_space;
extern bool xengine_enable_bulk_load_api;
extern uint32_t xengine_disable_online_ddl;
extern bool xengine_disable_instant_ddl;
extern ulong xengine_sort_buffer_size;
extern xengine::common::DBOptions xengine_db_options;
extern bool xengine_disable_parallel_ddl;
/** Function to convert the Instant_Type to a comparable int */
uint16_t instant_type_to_int(Instant_Type type)
{
return (static_cast<typename std::underlying_type<Instant_Type>::type>(type));
}
/** Determine if this is an instant ALTER TABLE.
This can be checked in *inplace_alter_table() functions, which are called
after check_if_supported_inplace_alter()*/
bool is_instant(const Alter_inplace_info *ha_alter_info)
{
return (ha_alter_info->handler_trivial_ctx !=
instant_type_to_int(Instant_Type::INSTANT_IMPOSSIBLE));
}
/** Determine if one ALTER TABLE can be done instantly on the table
@param[in] ha_alter_info The DDL operation
@return Instant_Type accordingly */
Instant_Type ha_xengine::check_if_support_instant_ddl(
const Alter_inplace_info *ha_alter_info)
{
if (!(ha_alter_info->handler_flags & ~XENGINE_INPLACE_IGNORE)) {
return (Instant_Type::INSTANT_NO_CHANGE);
}
Alter_inplace_info::HA_ALTER_FLAGS alter_inplace_flags =
ha_alter_info->handler_flags & ~XENGINE_INPLACE_IGNORE;
/* If it's an ADD COLUMN without changing existing column orders */
if (alter_inplace_flags == Alter_inplace_info::ADD_STORED_BASE_COLUMN ||
alter_inplace_flags ==
(Alter_inplace_info::ADD_STORED_BASE_COLUMN |
my_core::Alter_inplace_info::CHANGE_CREATE_OPTION)) {
// with CHANGE_CREATE_OPTION, only HA_CREATE_USED_COMMENT is supported
if (alter_inplace_flags &
my_core::Alter_inplace_info::CHANGE_CREATE_OPTION) {
// Only support used_fields with HA_CREATE_USED_COMMENT flag
if (ha_alter_info->create_info->used_fields & ~(HA_CREATE_USED_COMMENT)) {
return (Instant_Type::INSTANT_IMPOSSIBLE);
}
}
return (Instant_Type::INSTANT_ADD_COLUMN);
} else {
return (Instant_Type::INSTANT_IMPOSSIBLE);
}
}
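/* Illustrative examples of the classification above (a sketch, not
* exhaustive):
*   ALTER TABLE t ADD COLUMN c INT;                 -- INSTANT_ADD_COLUMN
*   ALTER TABLE t ADD COLUMN c INT, COMMENT = 'x';  -- INSTANT_ADD_COLUMN,
*     CHANGE_CREATE_OPTION is tolerated only for HA_CREATE_USED_COMMENT
*   ALTER TABLE t ADD COLUMN c INT AFTER a;         -- INSTANT_IMPOSSIBLE,
*     existing column order changes
*   ALTER TABLE t MODIFY COLUMN a BIGINT;           -- INSTANT_IMPOSSIBLE
*/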
static int set_duplicate_key_for_print(
TABLE *new_table, TABLE *old_table, const Xdb_key_def *kd,
const std::shared_ptr<Xdb_inplace_ddl_dict_info>& dict_info,
bool is_rebuild);
static MY_ATTRIBUTE((warn_unused_result)) bool xengine_need_rebuild(
const Alter_inplace_info *ha_alter_info);
/** Get instant ddl information from Data Dictionary
@param[in] table_def dd table */
void ha_xengine::get_instant_ddl_info_if_needed(const dd::Table *table_def)
{
m_instant_ddl_info.clearup();
for (const auto col : table_def->columns()) {
if (col->is_virtual() || col->is_se_hidden()) {
continue;
}
const dd::Properties &se_private_data = col->se_private_data();
if (!se_private_data.exists(
dd_column_key_strings[DD_INSTANT_COLUMN_DEFAULT_NULL]) &&
!se_private_data.exists(
dd_column_key_strings[DD_INSTANT_COLUMN_DEFAULT])) {
continue;
}
if (se_private_data.exists(
dd_column_key_strings[DD_INSTANT_COLUMN_DEFAULT_NULL])) {
m_instant_ddl_info.instantly_added_default_value_null.push_back(true);
m_instant_ddl_info.instantly_added_default_values.push_back("");
} else if (se_private_data.exists(
dd_column_key_strings[DD_INSTANT_COLUMN_DEFAULT])) {
dd::String_type value;
se_private_data.get(dd_column_key_strings[DD_INSTANT_COLUMN_DEFAULT],
&value);
size_t len;
const byte *decoded_default_value;
DD_instant_col_val_coder coder;
decoded_default_value = coder.decode(value.c_str(), value.length(), &len);
std::string default_value(
reinterpret_cast<const char *>(decoded_default_value), len);
m_instant_ddl_info.instantly_added_default_value_null.push_back(false);
m_instant_ddl_info.instantly_added_default_values.push_back(
default_value);
}
}
if (table_def->se_private_data().exists(
dd_table_key_strings[DD_TABLE_INSTANT_COLS])) {
table_def->se_private_data().get(
dd_table_key_strings[DD_TABLE_INSTANT_COLS],
&m_instant_ddl_info.m_instant_cols);
assert(m_instant_ddl_info.instantly_added_default_values.size() ==
(table->s->fields - m_instant_ddl_info.m_instant_cols));
} else {
assert(m_instant_ddl_info.instantly_added_default_values.empty());
}
if (table_def->se_private_data().exists(
dd_table_key_strings[DD_TABLE_NULL_BYTES])) {
table_def->se_private_data().get(dd_table_key_strings[DD_TABLE_NULL_BYTES],
&m_instant_ddl_info.m_null_bytes);
}
}
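/* Illustrative sketch of the DD layout consumed above (the exact key
* strings live in dd_column_key_strings/dd_table_key_strings): after an
* instant ALTER TABLE t ADD COLUMN c INT DEFAULT 5, the new column's
* se_private_data carries either DD_INSTANT_COLUMN_DEFAULT_NULL (default
* is NULL) or DD_INSTANT_COLUMN_DEFAULT (an encoded default value, decoded
* here via DD_instant_col_val_coder); the table's se_private_data carries
* DD_TABLE_INSTANT_COLS, the column count before the first instant ADD
* COLUMN, and optionally DD_TABLE_NULL_BYTES.
*/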
/** Get the error message format string.
@return the format string or 0 if not found. */
const char *myx_get_err_msg(int error_code) /*!< in: MySQL error code */
{
return (my_get_err_msg(error_code));
}
/** Checks if inplace alter is supported for a given operation.
@param[in] altered_table new table
@param[in] ha_alter_info The DDL operation */
my_core::enum_alter_inplace_result ha_xengine::check_if_supported_inplace_alter(
TABLE *altered_table, my_core::Alter_inplace_info *const ha_alter_info)
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(ha_alter_info != nullptr);
if (ha_alter_info->handler_flags &
~(XENGINE_INPLACE_IGNORE | XENGINE_ALTER_NOREBUILD |
XENGINE_ALTER_REBUILD)) {
if (ha_alter_info->handler_flags &
Alter_inplace_info::ALTER_STORED_COLUMN_TYPE) {
ha_alter_info->unsupported_reason = myx_get_err_msg(
ER_ALTER_OPERATION_NOT_SUPPORTED_REASON_COLUMN_TYPE);
}
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
Instant_Type instant_type = check_if_support_instant_ddl(ha_alter_info);
ha_alter_info->handler_trivial_ctx =
instant_type_to_int(Instant_Type::INSTANT_IMPOSSIBLE);
switch (instant_type) {
case Instant_Type::INSTANT_IMPOSSIBLE:
break;
case Instant_Type::INSTANT_ADD_COLUMN:
if ((ha_alter_info->alter_info->requested_algorithm ==
Alter_info::ALTER_TABLE_ALGORITHM_INPLACE) ||
ha_alter_info->error_if_not_empty) {
/* When the user explicitly requests ALGORITHM=INPLACE, or the ALTER
* would raise an error on a non-empty table, fall back to INPLACE.
*/
break;
}
/* Fall through */
case Instant_Type::INSTANT_NO_CHANGE:
ha_alter_info->handler_trivial_ctx = instant_type_to_int(instant_type);
if (!xengine_disable_instant_ddl) {
DBUG_RETURN(HA_ALTER_INPLACE_INSTANT);
} else {
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
}
if (ha_alter_info->handler_flags & XENGINE_ALTER_NOT_SUPPORTED) {
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
if (ha_alter_info->handler_flags &
my_core::Alter_inplace_info::CHANGE_CREATE_OPTION) {
// Only support used_fields with HA_CREATE_USED_COMMENT flag
if (ha_alter_info->create_info->used_fields & ~(HA_CREATE_USED_COMMENT)) {
ha_alter_info->unsupported_reason = "XEngineDDL: only changing the "
"table comment is supported";
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
}
#if 0
/* [BUG] Due to a known issue, combining a table rename with other
* alterations is not supported, so it is disabled temporarily.
* Remove this branch after the bug is fixed.
*/
if ((ha_alter_info->handler_flags & Alter_inplace_info::ALTER_RENAME) &&
(ha_alter_info->handler_flags & ~Alter_inplace_info::ALTER_RENAME)) {
__XHANDLER_LOG(WARN, "XEngineDDL: Combination of renaming table with other "
"alteration is disabled due to known issue. table:%s",
table->s->table_name.str);
ha_alter_info->unsupported_reason = "Combination of renaming table with "
"other alteration is disabled due to "
"known issue";
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
#endif
/* Only support NULL -> NOT NULL change if strict table sql_mode
is set. Fall back to COPY for conversion if not strict tables.
In-Place will fail with an error when trying to convert
NULL to a NOT NULL value. */
if ((ha_alter_info->handler_flags &
Alter_inplace_info::ALTER_COLUMN_NOT_NULLABLE) &&
!(ha_thd()->is_strict_mode())) {
ha_alter_info->unsupported_reason =
myx_get_err_msg(ER_ALTER_OPERATION_NOT_SUPPORTED_REASON_NOT_NULL);
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
/* DROP PRIMARY KEY is only allowed in combination with ADD
PRIMARY KEY. */
if ((ha_alter_info->handler_flags & (Alter_inplace_info::ADD_PK_INDEX |
Alter_inplace_info::DROP_PK_INDEX)) ==
Alter_inplace_info::DROP_PK_INDEX) {
__XHANDLER_LOG(WARN, "XEngineDDL: drop primary key is only allowed in "
"combination with ADD PRIMARY KEY. table_name: %s",
table->s->table_name.str);
ha_alter_info->unsupported_reason = "With INPLACE DDL, XEngine only allows "
"that DROP PRIMARY KEY is combined with "
"ADD PRIMARY KEY";
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
/* For a special case, disable inplace DDL.
* When a table defines all columns as non-nullable and has a unique key,
* that unique key is treated as the PRIMARY KEY. If one of those columns
* is changed to nullable, the unique key can no longer serve as the
* PRIMARY KEY in the new table, so altered_table gets a hidden pk.
* We can't support inplace DROP of a PRIMARY KEY.
*/
if (!has_hidden_pk(table) && has_hidden_pk(altered_table)) {
__XHANDLER_LOG(WARN, "XEngineDDL: drop primary key is only allowed in "
"combination with ADD PRIMARY KEY. table_name: %s",
table->s->table_name.str);
ha_alter_info->unsupported_reason = "With INPLACE DDL, XEngine only allows "
"that DROP PRIMARY KEY is combined with "
"ADD PRIMARY KEY";
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
/* If a column changes from NOT NULL to NULL and there is an implicit pk
* on that column, the table must be rebuilt; such an alteration can only
* go through the "Copy" method.
*/
/* For xengine, if a mysql table has no pk, xengine builds a hidden pk for
* the table, so this case is fine for xengine tables.
*/
List_iterator_fast<Create_field> cf_it(
ha_alter_info->alter_info->create_list);
/* Fix the key parts. */
for (KEY *new_key = ha_alter_info->key_info_buffer;
new_key < ha_alter_info->key_info_buffer + ha_alter_info->key_count;
new_key++) {
for (KEY_PART_INFO *key_part = new_key->key_part;
key_part < new_key->key_part + new_key->user_defined_key_parts;
key_part++) {
const Create_field *new_field;
DBUG_ASSERT(key_part->fieldnr < altered_table->s->fields);
cf_it.rewind();
for (uint fieldnr = 0; (new_field = cf_it++); fieldnr++) {
if (fieldnr == key_part->fieldnr) {
break;
}
}
DBUG_ASSERT(new_field);
key_part->field = altered_table->field[key_part->fieldnr];
/* In some special cases InnoDB emits "false"
duplicate key errors with NULL key values. Let
us play safe and ensure that we can correctly
print key values even in such cases. */
key_part->null_offset = key_part->field->null_offset();
key_part->null_bit = key_part->field->null_bit;
if (new_field->field) {
/* This is an existing column. */
continue;
}
/* This is an added column. */
DBUG_ASSERT(ha_alter_info->handler_flags &
Alter_inplace_info::ADD_COLUMN);
DBUG_ASSERT((key_part->field->auto_flags & Field::NEXT_NUMBER) ==
!!(key_part->field->flags & AUTO_INCREMENT_FLAG));
if (key_part->field->flags & AUTO_INCREMENT_FLAG) {
/* We cannot assign AUTO_INCREMENT column values during online ALTER. */
DBUG_ASSERT(key_part->field == altered_table->found_next_number_field);
__XHANDLER_LOG(WARN, "XEngineDDL: assigning a value to an auto_increment "
"column is not supported. table_name: %s",
table->s->table_name.str);
/* Adding an auto_increment column cannot downgrade the lock after the
* prepare phase, so the cost of inplace DDL equals that of copy DDL.
*/
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
}
}
/** If xengine_disable_online_ddl is 1, online rebuild DDL is disabled;
* if it is 2, both online rebuild and no-rebuild DDL are disabled;
* the default 0 enables both online rebuild and no-rebuild DDL.
*/
if (xengine_disable_online_ddl == 1 && xengine_need_rebuild(ha_alter_info)) {
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
} else if (xengine_disable_online_ddl == 2) {
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
DBUG_RETURN(HA_ALTER_INPLACE_NO_LOCK_AFTER_PREPARE);
}
/** Determine if ALTER TABLE needs to rebuild the table.
@param[in] ha_alter_info The DDL operation
@return whether it is necessary to rebuild the table */
static MY_ATTRIBUTE((warn_unused_result)) bool xengine_need_rebuild(
const Alter_inplace_info *ha_alter_info)
{
if (is_instant(ha_alter_info)) {
return (false);
}
Alter_inplace_info::HA_ALTER_FLAGS alter_inplace_flags =
ha_alter_info->handler_flags & ~(XENGINE_INPLACE_IGNORE);
if (alter_inplace_flags == Alter_inplace_info::CHANGE_CREATE_OPTION &&
!(ha_alter_info->create_info->used_fields &
(HA_CREATE_USED_ROW_FORMAT | HA_CREATE_USED_KEY_BLOCK_SIZE |
HA_CREATE_USED_TABLESPACE))) {
/* Any other CHANGE_CREATE_OPTION than changing
ROW_FORMAT, KEY_BLOCK_SIZE or TABLESPACE can be done
without rebuilding the table. */
return (false);
}
return (0 != (ha_alter_info->handler_flags & XENGINE_ALTER_REBUILD));
}
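/* Example (a sketch): ALTER TABLE t COMMENT = 'x' sets only
* CHANGE_CREATE_OPTION and touches none of ROW_FORMAT / KEY_BLOCK_SIZE /
* TABLESPACE, so no rebuild is needed; operations carrying a flag from
* XENGINE_ALTER_REBUILD (defined elsewhere) force a rebuild.
*/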
#ifndef NDEBUG
void dbug_create_err_inplace_alter()
{
// This my_printf_error is needed for DEBUG SYNC
my_printf_error(ER_UNKNOWN_ERROR,
"Intentional failure in inplace alter occurred.", MYF(0));
}
#endif
/** check if there is a duplicate key while building the base data */
bool ha_xengine::check_duplicate_in_base(
const TABLE *table_arg, const Xdb_key_def &index,
const xengine::common::Slice& key, unique_key_buf_info *key_buf_info)
{
bool res = false;
// used for secondary key
uint n_null_fields = 0;
const xengine::util::Comparator *index_comp = index.get_cf()->GetComparator();
/* Get proper key buffer. */
uchar *key_buf = key_buf_info->swap_and_get_key_buf();
uint key_size = 0;
if (index.is_primary_key()) {
DBUG_ASSERT(key.size() <= m_max_packed_sk_len);
memcpy(key_buf, key.data(), key.size());
key_size = key.size();
} else {
/* Get memcmp form of sk without extended pk tail */
key_size = index.get_memcmp_sk_parts(table_arg, key, key_buf, &n_null_fields);
}
key_buf_info->memcmp_key =
xengine::common::Slice(reinterpret_cast<char *>(key_buf), key_size);
if (key_buf_info->memcmp_key_old.size() > 0 && n_null_fields == 0 &&
!index_comp->Compare(key_buf_info->memcmp_key, key_buf_info->memcmp_key_old)) {
res = true;
}
key_buf_info->memcmp_key_old = key_buf_info->memcmp_key;
return res;
}
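/* Note (an assumption about the caller): duplicates are detected by
* comparing each key only against the previous one (memcmp_key_old),
* which works only if keys arrive in sorted (memcmp) order so that
* duplicates are adjacent, as in the merge-sorted build of the base data.
* Secondary keys containing NULL fields (n_null_fields > 0) never count
* as duplicates.
*/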
/*
Create key definitions needed for storing data in xengine during ADD INDEX
inplace operations.
@param in
table_arg Table with definition
tbl_def_arg New table def structure being populated
old_table_arg Old(current) table
old_tbl_def_arg Old(current) table def structure
cfs Struct array which contains column family information
index_ids Index ids assigned to the new key definitions
@return
0 - Ok
other - error, either given table ddl is not supported by xengine or OOM.
*/
int ha_xengine::create_inplace_key_defs(
const TABLE *const table_arg, Xdb_tbl_def *const tbl_def_arg,
const TABLE *const old_table_arg, const Xdb_tbl_def *const old_tbl_def_arg,
const std::array<key_def_cf_info, MAX_INDEXES + 1> &cfs,
const std::array<uint32_t, MAX_INDEXES + 1> &index_ids) const
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(table_arg != nullptr);
DBUG_ASSERT(tbl_def_arg != nullptr);
DBUG_ASSERT(old_tbl_def_arg != nullptr);
std::shared_ptr<Xdb_key_def> *const old_key_descr =
old_tbl_def_arg->m_key_descr_arr;
std::shared_ptr<Xdb_key_def> *const new_key_descr =
tbl_def_arg->m_key_descr_arr;
for (uint i = 0; i < tbl_def_arg->m_key_count; i++) {
const char* new_key_name = get_key_name(i, table_arg, tbl_def_arg);
auto stat_it = new_key_stats.find(new_key_name);
DBUG_ASSERT(stat_it != new_key_stats.end());
const prepare_inplace_key_stat& new_key_stat = stat_it->second;
DBUG_ASSERT(new_key_stat.key_name == new_key_name);
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "key stat: %s", new_key_stat.to_string().c_str());
#endif
if (new_key_stat.key_stat == prepare_inplace_key_stat::COPIED ||
new_key_stat.key_stat == prepare_inplace_key_stat::RENAMED) {
DBUG_ASSERT(new_key_stat.mapped_key_index < m_tbl_def->m_key_count);
/*
Found matching index in old table definition, so copy it over to the
new one created.
*/
const Xdb_key_def &okd = *old_key_descr[new_key_stat.mapped_key_index];
DBUG_ASSERT(okd.m_name == new_key_stat.mapped_key_name);
#if 0
uint16 index_dict_version = 0;
uchar index_type = 0;
uint16 kv_version = 0;
const GL_INDEX_ID gl_index_id = okd.get_gl_index_id();
if (!dict_manager.get_index_info(gl_index_id, &index_dict_version,
&index_type, &kv_version)) {
// NO_LINT_DEBUG
__XHANDLER_LOG(ERROR,
"XEngineDDL: Could not get index information "
"for Index Number (%u,%u), table %s",
gl_index_id.cf_id, gl_index_id.index_id,
old_tbl_def_arg->full_tablename().c_str());
DBUG_RETURN(HA_EXIT_FAILURE);
}
#endif
/*
We can't use the copy constructor because we need to update the keynr
within the pack_info for each field and the keyno of the keydef itself.
The Xdb_key_def was already constructed from the dictionary, so its meta
info is up-to-date and safe to reuse.
*/
new_key_descr[i] = std::make_shared<Xdb_key_def>(
okd.get_index_number(), i, okd.get_cf(), okd.m_index_dict_version,
okd.m_index_type, okd.m_kv_format_version, okd.m_is_reverse_cf,
okd.m_is_auto_cf, new_key_name,
dict_manager.get_stats(okd.get_gl_index_id()));
} else if (create_key_def(table_arg, i, tbl_def_arg, &new_key_descr[i],
cfs[i], index_ids[i])) {
DBUG_RETURN(HA_EXIT_FAILURE);
}
DBUG_ASSERT(new_key_descr[i] != nullptr);
new_key_descr[i]->setup(table_arg, tbl_def_arg);
}
DBUG_RETURN(HA_EXIT_SUCCESS);
}
/** Find matching old index definitions in the old table.
@param[in] table_arg new_table
@param[in] tbl_def_arg new xengine table definition
@param[in] old_table_arg old_table
@param[in] old_tbl_def_arg old xengine table definition
@return map<key_name, key_no>
*/
std::unordered_map<std::string, uint> ha_xengine::get_old_key_positions(
const TABLE *const table_arg, const Xdb_tbl_def *const tbl_def_arg,
const TABLE *const old_table_arg,
const Xdb_tbl_def *const old_tbl_def_arg) const
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(table_arg != nullptr);
DBUG_ASSERT(old_table_arg != nullptr);
DBUG_ASSERT(tbl_def_arg != nullptr);
DBUG_ASSERT(old_tbl_def_arg != nullptr);
std::shared_ptr<Xdb_key_def> *const old_key_descr =
old_tbl_def_arg->m_key_descr_arr;
std::unordered_map<std::string, uint> old_key_pos;
std::unordered_map<std::string, uint> new_key_pos;
uint i;
for (i = 0; i < tbl_def_arg->m_key_count; i++) {
new_key_pos[get_key_name(i, table_arg, tbl_def_arg)] = i;
}
for (i = 0; i < old_tbl_def_arg->m_key_count; i++) {
if (is_hidden_pk(i, old_table_arg, old_tbl_def_arg)) {
old_key_pos[old_key_descr[i]->m_name] = i;
continue;
}
/*
In case of matching key name, need to check key parts of keys as well,
in case a simultaneous drop + add is performed, where the key name is the
same but the key parts are different.
Example:
CREATE TABLE t1 (a INT, b INT, KEY ka(a)) ENGINE=XENGINE;
ALTER TABLE t1 DROP INDEX ka, ADD INDEX ka(b), ALGORITHM=INPLACE;
*/
const KEY *const old_key = &old_table_arg->key_info[i];
const auto &it = new_key_pos.find(old_key->name);
if (it == new_key_pos.end()) {
continue;
}
KEY *const new_key = &table_arg->key_info[it->second];
/* If index algorithms are different we need to rebuild. */
if (old_key->algorithm != new_key->algorithm) {
continue;
}
/*
Check that the key is identical between old and new tables.
If not, we still need to create a new index.
The exception is an index changed from unique to non-unique; in that case
we don't need to rebuild, as both are stored the same way in XENGINE.
*/
bool unique_to_non_unique =
(old_key->flags ^ new_key->flags) == HA_NOSAME &&
(old_key->flags & HA_NOSAME);
if (!unique_to_non_unique && ((old_key->flags & HA_KEYFLAG_MASK) !=
(new_key->flags & HA_KEYFLAG_MASK))) {
continue;
}
if (compare_key_parts(old_key, new_key)) {
continue;
}
old_key_pos[old_key->name] = i;
}
DBUG_RETURN(old_key_pos);
}
/* Check two keys to ensure that key parts within keys match */
int ha_xengine::compare_key_parts(const KEY *const old_key,
const KEY *const new_key) const
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(old_key != nullptr);
DBUG_ASSERT(new_key != nullptr);
/* Skip if key parts do not match, as it is a different key */
if (new_key->user_defined_key_parts != old_key->user_defined_key_parts) {
DBUG_RETURN(HA_EXIT_FAILURE);
}
/* Check to see that key parts themselves match */
for (uint i = 0; i < old_key->user_defined_key_parts; i++) {
if (strcmp(old_key->key_part[i].field->field_name,
new_key->key_part[i].field->field_name) != 0) {
DBUG_RETURN(HA_EXIT_FAILURE);
}
/* Check if prefix index key part length has changed */
if (old_key->key_part[i].length != new_key->key_part[i].length) {
DBUG_RETURN(HA_EXIT_FAILURE);
}
}
DBUG_RETURN(HA_EXIT_SUCCESS);
}
static void inplace_alter_table_release(Xdb_tbl_def* &tbl_def)
{
if (nullptr != tbl_def) {
if (nullptr != tbl_def->m_key_descr_arr) {
/* Delete the new key descriptors */
delete[] tbl_def->m_key_descr_arr;
/*
* Explicitly mark as nullptr so we don't accidentally remove entries
* from data dictionary on cleanup (or cause double delete[]).
*/
tbl_def->m_key_descr_arr = nullptr;
}
delete tbl_def;
tbl_def = nullptr;
}
}
/**
* Online unique index creation process (non-unique is simpler)
*
* t0 t1 t2 t3
* |-----------|------|----|-----
*
* t0: TABLE_CREATE, create table, empty table
* t1: BUILDING_BASE_INDEX, start to create index
* t2: CHECK_UNIQUE_CONSTRAINT, finish building index for (t0, t1), start to check unique
* t3: FINISHED, finish checking unique between (t1, t2) and (t0, t1)
*
* t1-t2: newly arriving writes do unique checks against data in (t1, now),
* and the uncommitted secondary index is updated accordingly from now on.
* cf-index is the destination cfh.
* At this time the base index is not finished yet, so data in (t1, t2)
* only does unique checks against its own data (mem, L0, L1).
*
* t2-t3: we do unique checks between (t0, t1) and (t1, t2); newly arriving
* writes (t2, now) do unique checks against data in (t0, t1) and (t1, t2).
* (t0, t1) and (t1, now) live in separate parts: (t0, t1) is at the L2
* layer of the subtable, and (t1, now) is at (mem, L0, L1) of the subtable.
*
* t3: last phase of inplace DDL; if any error happened during the DDL we
* roll back the new index operations, otherwise we make the new indexes
* available to newly arriving requests.
*/
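/* Roughly, in terms of the handler calls below (a sketch, not a contract):
* t1 corresponds to prepare_inplace_alter_table() finishing, after which
* concurrent writes also maintain the uncommitted indexes; t1-t3 is
* inplace_alter_table() -> inplace_populate_indexes() building the base
* data and checking uniqueness; commit_inplace_alter_table() then makes
* the new indexes visible or rolls them back.
*/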
/**
Allows the storage engine to update internal structures with concurrent
writes blocked. If check_if_supported_inplace_alter() returns
HA_ALTER_INPLACE_NO_LOCK_AFTER_PREPARE or
HA_ALTER_INPLACE_SHARED_AFTER_PREPARE, this function is called with
exclusive lock otherwise the same level of locking as for
inplace_alter_table() will be used.
@note Storage engines are responsible for reporting any errors by
calling my_error()/print_error()
@note If this function reports error, commit_inplace_alter_table()
will be called with commit= false.
@note For partitioning, failing to prepare one partition, means that
commit_inplace_alter_table() will be called to roll back changes for
all partitions. This means that commit_inplace_alter_table() might be
called without prepare_inplace_alter_table() having been called first
for a given partition.
@param altered_table TABLE object for new version of table.
@param ha_alter_info Structure describing changes to be done
by ALTER TABLE and holding data used
during in-place alter.
@param old_table_def dd::Table object describing old version of
the table.
@param new_table_def dd::Table object for the new version of the
table. Can be adjusted by this call if SE
supports atomic DDL. These changes to the
table definition will be persisted in the
data-dictionary at statement commit time.
@retval true Error
@retval false Success
*/
bool ha_xengine::prepare_inplace_alter_table(
TABLE *const altered_table,
my_core::Alter_inplace_info *const ha_alter_info,
const dd::Table *old_table_def, dd::Table *new_table_def)
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(altered_table != nullptr);
DBUG_ASSERT(ha_alter_info != nullptr);
DBUG_ASSERT(old_table_def != nullptr);
DBUG_ASSERT(new_table_def != nullptr);
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: prepare alter sql is %s", ha_thd()->query().str);
#endif
if (is_instant(ha_alter_info)) {
// Nothing to do if it's instant ddl here
DBUG_RETURN(HA_EXIT_SUCCESS);
}
int ret = HA_EXIT_SUCCESS;
const uint old_n_keys = m_tbl_def->m_key_count;
uint new_n_keys = altered_table->s->keys;
if (has_hidden_pk(altered_table)) {
new_n_keys += 1;
}
std::shared_ptr<Xdb_key_def> *const old_key_descr =
m_tbl_def->m_key_descr_arr;
auto new_key_descr = new std::shared_ptr<Xdb_key_def>[ new_n_keys ];
auto new_tdef = new Xdb_tbl_def(m_tbl_def->full_tablename());
new_tdef->m_key_descr_arr = new_key_descr;
new_tdef->m_key_count = new_n_keys;
new_tdef->m_auto_incr_val =
m_tbl_def->m_auto_incr_val.load(std::memory_order_relaxed);
new_tdef->m_hidden_pk_val =
m_tbl_def->m_hidden_pk_val.load(std::memory_order_relaxed);
bool need_rebuild = xengine_need_rebuild(ha_alter_info);
if ((ret = prepare_inplace_alter_table_collect_key_stats(
altered_table, ha_alter_info))) {
XHANDLER_LOG(ERROR, "XEngineDDL: failed to collect status of keys for altered table",
"table_name", table->s->table_name.str);
} else if (new_tdef->init_table_id(ddl_manager)) {
XHANDLER_LOG(ERROR, "XEngineDDL: failed to init table_id for creating table",
"table_name", table->s->table_name.str);
ret = HA_EXIT_FAILURE;
} else if ((ret = create_key_defs(altered_table, new_tdef, nullptr, table,
m_tbl_def.get(), need_rebuild))) {
XHANDLER_LOG(ERROR, "XEngineDDL: failed creating new key definitions for altered table",
"table_name", table->s->table_name.str);
} else if (new_tdef->write_dd_table(new_table_def)) {
XHANDLER_LOG(ERROR, "XEngineDDL: failed to write dd::Table",
"table_name", table->s->table_name.str);
ret = HA_EXIT_FAILURE;
} else if (need_rebuild) {
ret = prepare_inplace_alter_table_rebuild(
altered_table, ha_alter_info, new_tdef, old_key_descr, old_n_keys,
new_key_descr, new_n_keys);
if (ret) {
XHANDLER_LOG(ERROR, "XEngineDDL: prepare for online inplace ddl failed",
"table_name", table->s->table_name.str);
}
} else {
ret = prepare_inplace_alter_table_norebuild(
altered_table, ha_alter_info, new_tdef, old_key_descr, old_n_keys,
new_key_descr, new_n_keys);
if (ret) {
XHANDLER_LOG(ERROR, "XEngineDDL: prepare for online inplace ddl failed",
"table_name", table->s->table_name.str);
}
}
if (ret) {
XHANDLER_LOG(ERROR, "XEngineDDL: prepare for online inplace ddl failed",
"table_name", table->s->table_name.str);
inplace_alter_table_release(new_tdef);
} else {
#ifndef NDEBUG
for (auto &it : new_key_stats) {
DBUG_ASSERT(it.first == it.second.key_name);
XHANDLER_LOG(INFO, "XEngineDDL: ", "key stat", it.second.to_string().c_str());
}
#endif
XHANDLER_LOG(INFO, "XEngineDDL: prepare for online inplace ddl successfully",
"table_name", table->s->table_name.str);
}
DBUG_RETURN(ret);
}
int ha_xengine::prepare_inplace_alter_table_collect_key_stats(
TABLE *const altered_table, my_core::Alter_inplace_info *const ha_alter_info)
{
DBUG_ENTER_FUNC();
new_key_stats.clear();
std::map<std::string, uint> all_old_key_names;
// collect key names from old table
for (uint i = 0; i < m_tbl_def->m_key_count; ++i) {
all_old_key_names.emplace(m_tbl_def->m_key_descr_arr[i]->m_name, i);
}
// collect (dropped or renamed) key names from Alter_inplace_info
std::unordered_set<std::string> dropped_key_names;
for (uint i=0; i < ha_alter_info->index_drop_count; ++i) {
dropped_key_names.emplace(ha_alter_info->index_drop_buffer[i]->name);
}
std::map<std::string, std::string> renamed_key_names;
if (ha_alter_info->handler_flags & Alter_inplace_info::RENAME_INDEX) {
for (uint i = 0; i < ha_alter_info->index_rename_count; ++i) {
renamed_key_names.emplace(
ha_alter_info->index_rename_buffer[i].new_key->name,
ha_alter_info->index_rename_buffer[i].old_key->name);
}
}
// iterate over all key info of new table
for (uint i = 0; i < ha_alter_info->key_count; ++i) {
prepare_inplace_key_stat new_key_stat;
new_key_stat.key_name = std::string(ha_alter_info->key_info_buffer[i].name);
auto old_it1 = all_old_key_names.find(new_key_stat.key_name);
if (old_it1 != all_old_key_names.end()) {
if (dropped_key_names.count(new_key_stat.key_name)) {
// key is dropped from old table and re-created in new table
// index of key in old table is ignored
new_key_stat.key_stat = prepare_inplace_key_stat::REDEFINED;
} else {
// key is copied from the old table
new_key_stat.key_stat = prepare_inplace_key_stat::COPIED;
new_key_stat.mapped_key_name = new_key_stat.key_name;
new_key_stat.mapped_key_index = old_it1->second;
}
} else {
auto it_re = renamed_key_names.find(new_key_stat.key_name);
if (it_re != renamed_key_names.end()) {
auto old_it2 = all_old_key_names.find(it_re->second);
DBUG_ASSERT(old_it2 != all_old_key_names.end());
// key is renamed from key in old table
new_key_stat.key_stat = prepare_inplace_key_stat::RENAMED;
new_key_stat.mapped_key_name = old_it2->first;
new_key_stat.mapped_key_index = old_it2->second;
} else {
// key is newly added into new table
new_key_stat.key_stat = prepare_inplace_key_stat::ADDED;
}
}
new_key_stats.emplace(new_key_stat.key_name, new_key_stat);
}
// hidden pk
if (has_hidden_pk(altered_table)) {
prepare_inplace_key_stat new_key_stat;
new_key_stat.key_name = HIDDEN_PK_NAME;
if (has_hidden_pk(table)) {
// hidden pk is copied from old table
new_key_stat.key_stat = prepare_inplace_key_stat::COPIED;
new_key_stat.mapped_key_name = HIDDEN_PK_NAME;
new_key_stat.mapped_key_index = m_tbl_def->m_key_count - 1;
} else {
// primary key is dropped from old table, XEngine will add hidden pk
new_key_stat.key_stat = prepare_inplace_key_stat::ADDED;
}
new_key_stats.emplace(new_key_stat.key_name, new_key_stat);
}
DBUG_RETURN(HA_EXIT_SUCCESS);
}
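/* Examples (a sketch) of the classification above:
*   ALTER TABLE t1 DROP INDEX ka, ADD INDEX ka(b);  -- ka is REDEFINED
*   ALTER TABLE t1 RENAME INDEX ka TO kb;           -- kb is RENAMED,
*     mapped to ka's old position
*   ALTER TABLE t1 ADD INDEX kc(c);                 -- kc is ADDED
*   an index left untouched by the ALTER            -- COPIED, with the
*     same name and its old position
*/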
/** Build col_map where col_map[new_column_index] = old_column_index;
col_map[new_column_index] == -1 means the column was newly added to new_table.
@param[in] ha_alter_info
@param[in] altered_table
@param[in] old_table
@param[in/out] dict_info dictionary used to pack/unpack new records from old records
@return HA_EXIT_SUCCESS/HA_EXIT_FAILURE */
static int setup_col_maps(Alter_inplace_info *ha_alter_info,
const TABLE *altered_table, const TABLE *old_table,
std::shared_ptr<Xdb_inplace_ddl_dict_info>& dict_info)
{
int ret = HA_EXIT_SUCCESS;
uint n_cols = altered_table->s->fields;
uint old_n_cols = old_table->s->fields;
dict_info->m_col_map = static_cast<uint *>(
my_malloc(PSI_NOT_INSTRUMENTED, (n_cols) * sizeof(uint), MYF(0)));
if (dict_info->m_col_map == nullptr) {
__XHANDLER_LOG(ERROR, "XEngineDDL: alloc memory failed, table_name: %s", old_table->s->table_name.str);
return HA_EXIT_FAILURE;
}
dict_info->m_col_map_rev = static_cast<uint *>(
my_malloc(PSI_NOT_INSTRUMENTED, (old_n_cols) * sizeof(uint), MYF(0)));
if (dict_info->m_col_map_rev == nullptr) {
__XHANDLER_LOG(ERROR, "XEngineDDL: alloc memory failed, table_name: %s", old_table->s->table_name.str);
return HA_EXIT_FAILURE;
}
for (uint i = 0; i < old_n_cols; i++) {
dict_info->m_col_map_rev[i] = (uint)(-1);
}
uint *col_map = dict_info->m_col_map;
uint *col_map_rev = dict_info->m_col_map_rev;
List_iterator_fast<Create_field> cf_it(
ha_alter_info->alter_info->create_list);
uint new_i = 0;
while (const Create_field *new_field = cf_it++) {
uint old_i = 0;
for (; old_i < old_n_cols; old_i++) {
const Field *field = old_table->field[old_i];
if (new_field->field == field) {
break;
}
}
// found
if (old_i < old_n_cols) {
col_map[new_i] = old_i;
col_map_rev[old_i] = new_i;
new_i++;
} else {
col_map[new_i++] = (uint)(-1);
}
}
DBUG_ASSERT(new_i == n_cols);
return ret;
}
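/* Illustrative example (hypothetical table): old table (a, b), then
* ALTER TABLE t ADD COLUMN c INT AFTER a, giving new table (a, c, b):
*   m_col_map     = {0, (uint)-1, 1}  // new index -> old index; c is new
*   m_col_map_rev = {0, 2}            // old index -> new index
*/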
/* prepare dictionary info for online-ddl/dml transactions
@param[in] ha_alter_info Data used during in-place alter
@param[in] altered_table MySQL table that is being altered
@param[in] old_table MySQL table as it is before the ALTER operation
@param[in] old_tbl_def xengine table definition of the old table
@param[in] table_name Table name in MySQL
@retval HA_EXIT_SUCCESS/HA_EXIT_FAILURE success/fail */
int ha_xengine::prepare_inplace_alter_table_dict(
Alter_inplace_info *ha_alter_info, const TABLE *altered_table,
const TABLE *old_table, Xdb_tbl_def *old_tbl_def, const char *table_name)
{
int ret = HA_EXIT_SUCCESS;
old_tbl_def->m_dict_info = std::make_shared<Xdb_inplace_ddl_dict_info>();
if (old_tbl_def->m_dict_info == nullptr) {
__XHANDLER_LOG(ERROR, "XEngineDDL: allocate memory failed, table_name: %s", table->s->table_name.str);
return HA_EXIT_FAILURE;
} else if ((ret = setup_field_converters(
altered_table, m_new_pk_descr, old_tbl_def->m_dict_info->m_encoder_arr,
old_tbl_def->m_dict_info->m_fields_no_needed_to_decode,
old_tbl_def->m_dict_info->m_null_bytes_in_rec,
old_tbl_def->m_dict_info->m_maybe_unpack_info))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: setup field converters failed, table_name: %s", table->s->table_name.str);
} else if ((ret = setup_read_decoders(
altered_table, old_tbl_def->m_dict_info->m_encoder_arr,
old_tbl_def->m_dict_info->m_decoders_vect, true))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: setup read decoders failed, table_name: %s", table->s->table_name.str);
} else if ((ret = setup_col_maps(ha_alter_info, altered_table, old_table,
old_tbl_def->m_dict_info))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: setup column maps failed, table_name: %s", table->s->table_name.str);
} else {
__XHANDLER_LOG(INFO, "XEngineDDL: prepare alter rebuild table dictionary successfully, table_name: %s", table->s->table_name.str);
}
return ret;
}
/** Initialize new xdb_tbl_def and dict info for inplace_rebuild table
@param[in] altered_table new TABLE
@param[in] ha_alter_info The DDL operation
@param[in] new_tdef new xengine tbl def
@param[in] old_key_descr old table key descr
@param[in] old_n_keys number of old table keys
@param[in] new_key_descr new table key descr
@param[in] new_n_keys number of new table keys
@return Success or Failure */
int ha_xengine::prepare_inplace_alter_table_rebuild(
TABLE *const altered_table,
my_core::Alter_inplace_info *const ha_alter_info,
Xdb_tbl_def *const new_tdef,
std::shared_ptr<Xdb_key_def> *const old_key_descr, uint old_n_keys,
std::shared_ptr<Xdb_key_def> *const new_key_descr, uint new_n_keys)
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(altered_table);
DBUG_ASSERT(new_tdef);
// disable subtable major compaction
std::vector<ColumnFamilyHandle *> column_family_handles;
for (uint i = 0; i < new_tdef->m_key_count; i++) {
std::shared_ptr<Xdb_key_def> kd = new_tdef->m_key_descr_arr[i];
xengine::db::ColumnFamilyHandle *cf = kd->get_cf();
column_family_handles.push_back(cf);
}
// disable the data compaction from L1 to L2
xengine::util::TransactionDBImpl *txn_db_impl;
txn_db_impl = dynamic_cast<xengine::util::TransactionDBImpl *>(xdb);
xengine::common::Status s;
s = txn_db_impl->GetDBImpl()->switch_major_compaction(column_family_handles,
false);
if (!s.ok()) {
__XHANDLER_LOG(ERROR, "XEngineDDL: switch_major_compaction fail: %s, table_name: %s",
s.ToString().c_str(), table->s->table_name.str);
DBUG_RETURN(HA_EXIT_FAILURE);
}
// disable purging of garbage old MVCC versions, which are needed for the
// unique constraint check.
s = txn_db_impl->GetDBImpl()->disable_backgroud_merge(column_family_handles);
if (!s.ok()) {
__XHANDLER_LOG(ERROR, "XEngineDDL: disable_background_merge fail:%s, table_name: %s",
s.ToString().c_str(), table->s->table_name.str);
DBUG_RETURN(HA_EXIT_FAILURE);
}
// attach new_tdef to m_tbl_def, so concurrent DML updates both the old and new tables
m_tbl_def->m_inplace_new_tdef = new_tdef;
for (uint i = 0; i < new_tdef->m_key_count; i++) {
std::shared_ptr<Xdb_key_def> kd = new_tdef->m_key_descr_arr[i];
m_tbl_def->create_new_keys_info(kd, altered_table);
}
// reallocate memory for the pack/unpack buffers,
// making sure new_tdef has enough space to build new table records
free_key_buffers();
int err;
if ((err = alloc_key_buffers(table, m_tbl_def.get(), true))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: failed allocating key buffers during alter, table_name: %s", table->s->table_name.str);
DBUG_RETURN(err);
}
if ((err = prepare_inplace_alter_table_dict(ha_alter_info, altered_table,
table, m_tbl_def.get(),
table->s->table_name.str))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: failed prepare dictionary for online copy ddl, table_name: %s", table->s->table_name.str);
DBUG_RETURN(err);
}
// check whether keys in altered table can skip unique check
prepare_inplace_alter_table_skip_unique_check(altered_table, new_tdef);
ha_alter_info->handler_ctx = new Xdb_inplace_alter_ctx(new_tdef);
#if 0
/*
Add uncommitted key definitions to ddl_manager. We need to do this
so that the property collector can find this keydef when it needs to
update stats. The property collector looks for the keydef in the
data dictionary, but it won't be there yet since this key definition
is still in the creation process.
*/
ddl_manager.add_uncommitted_keydefs(added_indexes);
#endif
DBUG_RETURN(HA_EXIT_SUCCESS);
}
int ha_xengine::prepare_inplace_alter_table_skip_unique_check(
TABLE *const altered_table, Xdb_tbl_def *const new_tbl_def) const
{
DBUG_ENTER_FUNC();
// for rebuilt table, check whether index is copied from old table
for (uint i = 0; i < new_tbl_def->m_key_count; i++) {
auto &kd = new_tbl_def->m_key_descr_arr[i];
auto it = new_key_stats.find(kd->m_name);
DBUG_ASSERT(it != new_key_stats.end());
kd->m_can_skip_unique_check = false;
/* A key is copied from the old table only when its definition is
* unchanged. One possible optimization is to skip the unique check when
* the key's columns are merely re-ordered.
* If any key is affected by a modified or dropped column, the key is
* dropped and re-created; if the old key is also renamed, the new key is
* created with the new name, with no extra renaming operation.
*/
if (kd->is_hidden_primary_key() ||
!(altered_table->key_info[i].flags & HA_NOSAME) ||
(it->second.key_stat == prepare_inplace_key_stat::COPIED) ||
(it->second.key_stat == prepare_inplace_key_stat::RENAMED)) {
kd->m_can_skip_unique_check = true;
}
}
DBUG_RETURN(HA_EXIT_SUCCESS);
}
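/* Example (a sketch): with ALTER TABLE t ADD UNIQUE KEY uk(b), uk is ADDED
* and unique, so it must go through the unique-constraint check; a unique
* key that is COPIED unchanged or merely RENAMED was already checked in
* the old table and can skip it, as can the hidden pk and all non-unique
* keys.
*/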
/** Convert to a new xengine record from the old table record and newly
added columns.
@param[in] table MySQL table as it is before the ALTER operation
@param[in] altered_table MySQL table that is being altered
@param[in] dict_info dictionary for the altered table, read-only
@param[in] pk_packed_slice Packed PK tuple. We need it in order to compute and
store its CRC.
@param[in] pk_unpack_info unpack_info used to decode the PK tuple.
@param[out] packed_rec Data slice with record data.
@param[out] new_storage_record buffer backing the packed record.
@retval HA_EXIT_SUCCESS/HA_EXIT_FAILURE */
int ha_xengine::convert_new_record_from_old_record(
const TABLE *table, const TABLE *altered_table,
const std::shared_ptr<Xdb_inplace_ddl_dict_info>& dict_info,
const xengine::common::Slice &pk_packed_slice,
Xdb_string_writer *const pk_unpack_info,
xengine::common::Slice *const packed_rec,
String& new_storage_record)
{
int ret = HA_EXIT_SUCCESS;
new_storage_record.length(0);
new_storage_record.fill(XENGINE_RECORD_HEADER_LENGTH, 0);
String null_bytes_str;
null_bytes_str.length(0);
null_bytes_str.fill(dict_info->m_null_bytes_in_rec, 0);
new_storage_record.append(null_bytes_str);
// If a primary key may have non-empty unpack_info for certain values,
// (m_maybe_unpack_info=TRUE), we write the unpack_info block. The block
// itself was prepared in Xdb_key_def::pack_new_record.
if (dict_info->m_maybe_unpack_info) {
new_storage_record.append(reinterpret_cast<char *>(pk_unpack_info->ptr()),
pk_unpack_info->get_current_pos());
}
Xdb_field_encoder *encoder_arr = dict_info->m_encoder_arr;
uint *col_map = dict_info->m_col_map;
uint old_col_index = 0;
for (uint i = 0; i < altered_table->s->fields; i++) {
/* The field may come from the old table or the new table:
a newly added field comes from altered_table,
otherwise the field comes from old_table.
*/
Field *field = nullptr;
old_col_index = col_map[i];
if (old_col_index == (uint)(-1)) {
field = altered_table->field[i];
} else {
field = table->field[old_col_index];
}
// the alter operation changed the column from NULL to NOT NULL,
// but this row still holds a NULL value
if (field->is_null() && !encoder_arr[i].maybe_null()) {
ret = HA_ERR_INVALID_NULL_ERROR;
}
/* Don't pack decodable PK key parts */
if (encoder_arr[i].m_storage_type != Xdb_field_encoder::STORE_ALL) {
continue;
}
if (encoder_arr[i].maybe_null()) {
char *const data =
(char *)new_storage_record.ptr() + XENGINE_RECORD_HEADER_LENGTH;
if (field->is_null()) {
data[encoder_arr[i].m_null_offset] |= encoder_arr[i].m_null_mask;
continue;
}
}
if (encoder_arr[i].m_field_type == MYSQL_TYPE_BLOB ||
encoder_arr[i].m_field_type == MYSQL_TYPE_JSON) {
auto *field_blob = (my_core::Field_blob *)field;
append_blob_to_storage_format(new_storage_record, field_blob);
} else if (encoder_arr[i].m_field_type == MYSQL_TYPE_VARCHAR) {
auto *field_var = (Field_varstring *)field;
append_varchar_to_storage_format(new_storage_record, field_var);
} else {
/* Copy the field data */
const uint len = field->pack_length_in_rec();
new_storage_record.append(reinterpret_cast<char *>(field->ptr), len);
}
}
if (should_store_row_debug_checksums()) {
const uint32_t key_crc32 = my_core::crc32(
0, xdb_slice_to_uchar_ptr(&pk_packed_slice), pk_packed_slice.size());
const uint32_t val_crc32 =
my_core::crc32(0, xdb_mysql_str_to_uchar_str(&new_storage_record),
new_storage_record.length());
uchar key_crc_buf[XDB_CHECKSUM_SIZE];
uchar val_crc_buf[XDB_CHECKSUM_SIZE];
xdb_netbuf_store_uint32(key_crc_buf, key_crc32);
xdb_netbuf_store_uint32(val_crc_buf, val_crc32);
new_storage_record.append((const char *)&XDB_CHECKSUM_DATA_TAG, 1);
new_storage_record.append((const char *)key_crc_buf, XDB_CHECKSUM_SIZE);
new_storage_record.append((const char *)val_crc_buf, XDB_CHECKSUM_SIZE);
}
*packed_rec = xengine::common::Slice(new_storage_record.ptr(),
new_storage_record.length());
return ret;
}
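/* Resulting storage record layout, as assembled above:
*   [record header (XENGINE_RECORD_HEADER_LENGTH)][null-byte bitmap]
*   [optional PK unpack_info][field data: fixed-length / varchar / blob]
*   [optional XDB_CHECKSUM_DATA_TAG + key crc32 + value crc32]
*/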
/** Prepare new added indexes for online inplace ddl.
@param altered_table, new table
@param ha_alter_info, DDL operation
@param new_tdef, new xengine table definition
@param[in] old_key_descr old table key descr
@param[in] old_n_keys number of old table keys
@param[in] new_key_descr new table key descr
@param[in] new_n_keys number of new table keys
@return Success or Failure */
int ha_xengine::prepare_inplace_alter_table_norebuild(
TABLE *const altered_table,
my_core::Alter_inplace_info *const ha_alter_info,
Xdb_tbl_def *const new_tdef,
std::shared_ptr<Xdb_key_def> *const old_key_descr, uint old_n_keys,
std::shared_ptr<Xdb_key_def> *const new_key_descr, uint new_n_keys)
{
DBUG_ENTER_FUNC();
uint i;
uint j;
const KEY *key;
std::unordered_set<std::shared_ptr<Xdb_key_def>> added_indexes;
std::unordered_set<GL_INDEX_ID> dropped_index_ids;
/* Determine which(if any) key definition(s) need to be dropped */
for (i = 0; i < ha_alter_info->index_drop_count; i++) {
key = ha_alter_info->index_drop_buffer[i];
for (j = 0; j < old_n_keys; j++) {
if (!old_key_descr[j]->m_name.compare(key->name)) {
dropped_index_ids.insert(old_key_descr[j]->get_gl_index_id());
break;
}
}
}
/* Determine which(if any) key definitions(s) need to be added */
int drop_indexes_found = 0;
for (i = 0; i < ha_alter_info->index_add_count; i++) {
key = &ha_alter_info->key_info_buffer[ha_alter_info->index_add_buffer[i]];
for (j = 0; j < new_n_keys; j++) {
if (!new_key_descr[j]->m_name.compare(key->name)) {
/*
Check for cases where an 'identical' index is being dropped and
re-added in a single ALTER statement. Turn this into a no-op as the
index has not changed.
E.G. Unique index -> non-unique index requires no change
Note that cases where the index name remains the same but the
key-parts are changed is already handled in create_inplace_key_defs.
In these cases the index needs to be rebuilt.
*/
if (dropped_index_ids.count(new_key_descr[j]->get_gl_index_id())) {
dropped_index_ids.erase(new_key_descr[j]->get_gl_index_id());
drop_indexes_found++;
} else {
added_indexes.insert(new_key_descr[j]);
}
break;
}
}
}
//const std::unique_ptr<xengine::db::WriteBatch> wb = dict_manager.begin();
//xengine::db::WriteBatch *const batch = wb.get();
// disable add-index major compaction
std::vector<ColumnFamilyHandle *> column_family_handles;
for (const auto &k : added_indexes) {
xengine::db::ColumnFamilyHandle *cf = k->get_cf();
column_family_handles.push_back(cf);
}
xengine::util::TransactionDBImpl *txn_db_impl;
txn_db_impl = dynamic_cast<xengine::util::TransactionDBImpl *>(xdb);
xengine::common::Status s;
// disable the data compaction from L1 to L2
s = txn_db_impl->GetDBImpl()->switch_major_compaction(column_family_handles,
false);
if (!s.ok()) {
__XHANDLER_LOG(ERROR, "XEngineDDL: switch_major_compaction fail: %s, table_name: %s",
s.ToString().c_str(), table->s->table_name.str);
DBUG_RETURN(HA_EXIT_FAILURE);
}
// this option only affects flushing data from memtable to L0; it disables
// purging of garbage old MVCC versions, which are needed for the unique
// constraint check.
s = txn_db_impl->GetDBImpl()->disable_backgroud_merge(column_family_handles);
if (!s.ok()) {
__XHANDLER_LOG(ERROR, "XEngineDDL: disable_background_merge fail:%s, table_name: %s",
s.ToString().c_str(), table->s->table_name.str);
DBUG_RETURN(HA_EXIT_FAILURE);
}
// add new indexes as part of m_tbl_def, so that we can update the new
// indexes during the online ddl phase
for (const auto &k : added_indexes) {
m_tbl_def->create_added_key(k, altered_table);
}
/* Update the data dictionary */
std::unordered_set<GL_INDEX_ID> create_index_ids;
for (const auto &index : added_indexes) {
create_index_ids.insert(index->get_gl_index_id());
}
//dict_manager.add_create_index(create_index_ids, batch);
//dict_manager.commit(batch);
const uint n_dropped_keys =
ha_alter_info->index_drop_count - drop_indexes_found;
const uint n_added_keys = ha_alter_info->index_add_count - drop_indexes_found;
DBUG_ASSERT(dropped_index_ids.size() == n_dropped_keys);
DBUG_ASSERT(added_indexes.size() == n_added_keys);
// as new writes are allowed to come in immediately after this function
// returns, we must prepare more here than the offline
// (no-concurrent-write) method does
if (ha_alter_info->handler_flags &
(my_core::Alter_inplace_info::ADD_INDEX |
my_core::Alter_inplace_info::ADD_UNIQUE_INDEX)) {
/*
Buffers need to be set up again to account for new, possibly longer
secondary keys.
*/
free_key_buffers();
/*
If adding unique index, allocate special buffers for duplicate checking.
*/
int err;
if ((err = alloc_key_buffers(
table, m_tbl_def.get(),
ha_alter_info->handler_flags &
my_core::Alter_inplace_info::ADD_UNIQUE_INDEX))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: failed allocating key buffers during alter, table_name: %s", table->s->table_name.str);
DBUG_RETURN(err);
}
}
ha_alter_info->handler_ctx = new Xdb_inplace_alter_ctx(
new_tdef, old_key_descr, new_key_descr, old_n_keys, new_n_keys,
added_indexes, dropped_index_ids, n_added_keys, n_dropped_keys);
/*
Add uncommitted key definitions to ddl_manager. We need to do this
so that the property collector can find this keydef when it needs to
update stats. The property collector looks for the keydef in the
data dictionary, but it won't be there yet since this key definition
is still in the creation process.
*/
ddl_manager.add_uncommitted_keydefs(added_indexes);
DBUG_RETURN(HA_EXIT_SUCCESS);
}
/**
Alter the table structure in-place with operations specified using
HA_ALTER_FLAGS and Alter_inplace_info. The level of concurrency allowed
during this operation depends on the return value from
check_if_supported_inplace_alter().
@note Storage engines are responsible for reporting any errors by
calling my_error()/print_error()
@note If this function reports error, commit_inplace_alter_table()
will be called with commit= false.
@param altered_table TABLE object for new version of table.
@param ha_alter_info Structure describing changes to be done
by ALTER TABLE and holding data used
during in-place alter.
@param old_table_def dd::Table object describing old version of
the table.
@param new_table_def dd::Table object for the new version of the
table. Can be adjusted by this call if SE
supports atomic DDL. These changes to the
table definition will be persisted in the
data-dictionary at statement commit time.
@retval true Error
@retval false Success
*/
bool ha_xengine::inplace_alter_table(
TABLE *const altered_table,
my_core::Alter_inplace_info *const ha_alter_info,
const dd::Table *old_table_def, dd::Table *new_table_def)
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(altered_table != nullptr);
DBUG_ASSERT(ha_alter_info != nullptr);
if (is_instant(ha_alter_info)) {
DBUG_RETURN(HA_EXIT_SUCCESS);
}
int res = HA_EXIT_SUCCESS;
DBUG_ASSERT(ha_alter_info->handler_ctx != nullptr);
auto ctx = dynamic_cast<Xdb_inplace_alter_ctx *>(ha_alter_info->handler_ctx);
if (!ctx->m_rebuild && !ctx->m_added_indexes.empty()) {
/* Populate all new secondary keys by scanning the primary key. */
// no rebuild, primary key is not changed
std::shared_ptr<Xdb_key_def> primary_key;
res = inplace_populate_indexes(altered_table, ctx->m_added_indexes,
primary_key, false);
} else if (ctx->m_rebuild) {
/* Populate new primary key-value and new secondary keys for new table. */
res = inplace_populate_new_table(altered_table, ctx->m_new_tdef.get());
}
if (res == HA_ERR_FOUND_DUPP_KEY) {
__XHANDLER_LOG(WARN,
"XEngineDDL: duplicate error happened during ddl "
"ddl_rebuild_type: %d, table_name: %s",
(int)ctx->m_rebuild, table->s->table_name.str);
} else if (res == HA_ERR_INVALID_NULL_ERROR) {
__XHANDLER_LOG(WARN,
"XEngineDDL: convert record error happened during ddl "
"ddl_rebuild_type: %d, table_name: %s",
(int)ctx->m_rebuild, table->s->table_name.str);
} else if (res) {
__XHANDLER_LOG(ERROR,
"XEngineDDL: failed populating secondary key during "
"alter, errcode=%d, ddl_rebuild_type: %d, table_name: %s",
res, (int)ctx->m_rebuild, table->s->table_name.str);
} else {
__XHANDLER_LOG(INFO,
"XEngineDDL: online inplace ddl success, rebuild_type: %d, "
"table_name: %s",
(int)ctx->m_rebuild, table->s->table_name.str);
}
DBUG_EXECUTE_IF("myx_simulate_index_create_rollback", {
dbug_create_err_inplace_alter();
DBUG_RETURN(true);
};);
DBUG_RETURN(res != HA_EXIT_SUCCESS);
}
/** Build indexes for new table
@param[in] new_table_arg, new altered table
@param[in] new_tbl_def, new xengine table definition
@return SUCCESS/FAILURE */
int ha_xengine::inplace_populate_new_table(TABLE *const new_table_arg,
Xdb_tbl_def *const new_tbl_def)
{
DBUG_ENTER_FUNC();
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: populate new primary key and secondary keys"
" for new table: %s", new_table_arg->s->table_name.str);
#endif
std::unordered_set<std::shared_ptr<Xdb_key_def>> new_indexes;
std::shared_ptr<Xdb_key_def> primary_key;
for (uint i = 0; i < new_tbl_def->m_key_count; i++) {
if (new_tbl_def->m_key_descr_arr[i]->is_primary_key()) {
DBUG_ASSERT(nullptr == primary_key);
primary_key = new_tbl_def->m_key_descr_arr[i];
} else {
#ifndef NDEBUG
auto ret = new_indexes.insert(new_tbl_def->m_key_descr_arr[i]);
DBUG_ASSERT(ret.second);
#else
(void)new_indexes.insert(new_tbl_def->m_key_descr_arr[i]);
#endif
}
}
int ret = HA_EXIT_SUCCESS;
ret = inplace_populate_indexes(new_table_arg, new_indexes, primary_key, true);
// error is handled in inplace_populate_indexes
DBUG_RETURN(ret);
}
/** Avoid concurrency between creating indexes and shrinking extent space;
xengine_inplace_populate_indexes indicates how many creation tasks
are running.
@param[in] new_table_arg, new mysql table object
@param[in] indexes, new added index from inplace ddl
@param[in] is_rebuild, indicate rebuild table or not
@return SUCCESS/FAILURE
*/
int ha_xengine::inplace_populate_indexes(
TABLE *const new_table_arg,
const std::unordered_set<std::shared_ptr<Xdb_key_def>> &indexes,
const std::shared_ptr<Xdb_key_def>& primary_key,
bool is_rebuild)
{
DBUG_ENTER_FUNC();
int ret = HA_EXIT_SUCCESS;
DBUG_EXECUTE_IF("sleep_before_create_second_index", sleep(2););
xdb_drop_idx_thread.enter_race_condition();
if (xengine_shrink_table_space >= 0) {
__XHANDLER_LOG(ERROR, "XEngineDDL: Shrink extent space is running. table_name: %s", table->s->table_name.str);
ret = HA_ERR_INTERNAL_ERROR;
xdb_drop_idx_thread.exit_race_condition();
DBUG_RETURN(ret);
}
xengine_inplace_populate_indexes++;
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: Begin inplace populate indexes "
"xengine_inplace_populate_indexes = %d table_name:%s",
xengine_inplace_populate_indexes, table->s->table_name.str);
#endif
xdb_drop_idx_thread.exit_race_condition();
DBUG_EXECUTE_IF("sleep_in_create_second_index", sleep(5););
Xdb_transaction *tx = get_or_create_tx(table->in_use);
/*
There is one specific scenario where m_sst_info may not be nullptr. This
happens if the handler we're using happens to be the handler where the PK
bulk load was done on. The sequence of events that lead to this is as
follows (T1 is PK bulk load, T2 is SK alter table):
T1: Execute last INSERT statement
T1: Return TABLE and handler object back to Table_cache_manager
T1: Close connection
T2: Execute ALTER statement
T2: Take same TABLE/handler from Table_cache_manager
T2: Call closefrm which will call finalize_bulk_load on every other open
table/handler *except* the one it's on.
T2: Acquire stale snapshot of PK
T1: Call finalize_bulk_load
This is rare because usually, closefrm will call the destructor (and thus
finalize_bulk_load) on the handler where PK bulk load is done. However, if
the thread ids of the bulk load thread and the alter thread differ by a
multiple of table_cache_instances (8 by default), then they hash to the
same bucket in Table_cache_manager and the alter thread will not call
the destructor on the handler it is holding. Thus, its m_sst_info will not
be nullptr.
At this point, it is safe to refresh the snapshot because we know all other
open handlers have been closed at this point, and the one we're on is the
only one left.
*/
if (m_sst_info != nullptr) {
if ((ret = finalize_bulk_load())) {
__XHANDLER_LOG(ERROR, "XEngineDDL: finalize_bulk_load failed, errcode=%d, table_name: %s", ret, table->s->table_name.str);
DBUG_RETURN(ret);
}
tx->commit();
}
if (nullptr != primary_key) {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: populate primary key: %u for new table: %s, firstly",
primary_key->get_index_number(), new_table_arg->s->table_name.str);
#endif
DBUG_ASSERT(primary_key->is_primary_key());
// build the primary key first, to avoid xdb_merge.add() failures on the sk
ret = inplace_populate_index(new_table_arg, primary_key, is_rebuild);
}
if (HA_EXIT_SUCCESS == ret) {
DEBUG_SYNC(ha_thd(), "xengine.inplace_populate_primary_key_done");
// build all secondary keys
for (const auto &index : indexes) {
DBUG_ASSERT(!index->is_primary_key());
if ((ret = inplace_populate_index(new_table_arg, index, is_rebuild))) {
break;
}
}
}
DEBUG_SYNC(ha_thd(), "xengine.inplace_populate_indexes_done");
DBUG_EXECUTE_IF("sleep_after_index_base_creation", sleep(5););
if (ret && ha_thd()->mdl_context.upgrade_shared_lock(table->mdl_ticket,
MDL_EXCLUSIVE, ha_thd()->variables.lock_wait_timeout)) {
XHANDLER_LOG(ERROR, "upgrade to mdl exclusive lock failed");
ret = HA_ERR_INTERNAL_ERROR;
}
/*
* Explicitly tell jemalloc to clean up any unused dirty pages at this point.
* See https://reviews.facebook.net/D63723 for more details.
*/
purge_all_jemalloc_arenas();
DBUG_EXECUTE_IF("crash_during_online_index_creation", DBUG_SUICIDE(););
DBUG_EXECUTE_IF("after_inplace_populate_indexes finish", sleep(5););
// clear the bulk load context whether the index build succeeded or failed.
// since the manifest txn doesn't support spanning threads, we need to
// ensure bulk load runs in a single thread, so clear the bulk load context
// as early as we can.
int ret_bulk_load = HA_EXIT_SUCCESS;
if (m_sst_info != nullptr) {
if ((ret_bulk_load = finalize_bulk_load())) {
__XHANDLER_LOG(ERROR, "XEngineDDL: finalize_bulk_load failed, errcode: %d, table_name: %s",
ret_bulk_load, table->s->table_name.str);
}
tx->commit();
}
ret = (ret != HA_EXIT_SUCCESS) ? ret : ret_bulk_load;
xdb_drop_idx_thread.enter_race_condition();
xengine_inplace_populate_indexes--;
if (ret == HA_ERR_FOUND_DUPP_KEY) {
__XHANDLER_LOG(WARN, "XEngineDDL: duplicate key happened during ddl, table_name: %s", table->s->table_name.str);
} else if (ret == HA_ERR_INVALID_NULL_ERROR) {
__XHANDLER_LOG(WARN, "XEngineDDL: convert new record error during ddl, table_name: %s", table->s->table_name.str);
} else if (ret) {
__XHANDLER_LOG(ERROR, "XEngineDDL: failed to finish inplace populate indexes"
" xengine_inplace_populate_indexes = %d, errcode: %d, table_name: %s",
xengine_inplace_populate_indexes, ret, table->s->table_name.str);
} else {
__XHANDLER_LOG(INFO, "XEngineDDL: inplace populate indexes successfully "
" xengine_inplace_populate_indexes = %d, table_name: %s",
xengine_inplace_populate_indexes, table->s->table_name.str);
}
xdb_drop_idx_thread.exit_race_condition();
DBUG_RETURN(ret);
}
// update Added_key_info.status when a duplicate entry is found during inplace DDL
int ha_xengine::inplace_update_added_key_info_status_dup_key(
TABLE *const new_table_arg, const std::shared_ptr<Xdb_key_def>& index,
Added_key_info& added_key_info, const common::Slice& dup_key,
const common::Slice& dup_val, bool is_rebuild, bool use_key/* = true*/)
{
int res = HA_EXIT_SUCCESS;
KEY* key_info = &new_table_arg->key_info[index->get_keyno()];
uint old_status = HA_EXIT_SUCCESS;
bool rc = added_key_info.status.compare_exchange_strong(
old_status, HA_ERR_FOUND_DUPP_KEY);
if (rc) {
if (use_key) {
// using dup_key and dup_val
if ((res = fill_new_duplicate_record(index.get(), new_table_arg,
dup_key, dup_val, is_rebuild))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: Fill new duplicate record error for index %d, error %d, table_name: %s",
index->get_index_number(), res, table->s->table_name.str);
} else {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: Fill new duplicate record for index %d successfully, table_name: %s",
index->get_index_number(), table->s->table_name.str);
#endif
}
} else {
// using table->record[0]
if ((res = set_duplicate_key_for_print(new_table_arg, table, index.get(),
m_tbl_def->m_dict_info, is_rebuild))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: Generate duplicate key for index %d, error %d, table_name:%s",
index->get_index_number(), res, table->s->table_name.str);
} else {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: Generate duplicate key for index %d successfully, table_name:%s",
index->get_index_number(), table->s->table_name.str);
#endif
}
}
added_key_info.dup_key_saved.store(true);
print_keydup_error(new_table_arg, key_info, MYF(0));
res = HA_ERR_FOUND_DUPP_KEY;
} else if (HA_ERR_FOUND_DUPP_KEY == added_key_info.status.load()) {
__XHANDLER_LOG(INFO, "XEngineDDL: other one set duplicate entry for index %d, table_name: %s",
index->get_index_number(), table->s->table_name.str);
// the duplicate key is set by other, we wait for the duplicate key to be saved
while (!added_key_info.dup_key_saved.load()) {}
print_keydup_error(new_table_arg, key_info, MYF(0));
res = HA_ERR_FOUND_DUPP_KEY;
} else {
res = (int)added_key_info.status.load();
__XHANDLER_LOG(INFO, "XEngineDDL: error %u for index %d is set by other, table_name: %s",
res, index->get_index_number(), table->s->table_name.str);
}
return res;
}
void ha_xengine::print_dup_err(int res, std::atomic<int>& dup_ctx_id,
const std::shared_ptr<Xdb_key_def>& index,
bool is_rebuild, TABLE *const new_table_arg,
std::vector<xengine::common::Slice>& dup_key,
std::vector<xengine::common::Slice>& dup_val,
Added_key_info& added_key_info, KEY* key_info) {
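// A duplicate may come from the DDL scan/merge itself (dup_ctx_id >= 0,
// with the offending key/value captured per thread) or from concurrent
// DML (res == HA_ERR_FOUND_DUPP_KEY with no captured entry); report each
// source accordingly.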
if (dup_ctx_id >= 0) {
// ddl dup
res = inplace_update_added_key_info_status_dup_key(new_table_arg, index,
added_key_info, dup_key[dup_ctx_id], dup_val[dup_ctx_id], is_rebuild);
} else if (res == HA_ERR_FOUND_DUPP_KEY) {
// dml dup
print_keydup_error(new_table_arg, key_info, MYF(0));
}
}
void ha_xengine::print_common_err(int res) {
if (res == HA_ERR_INVALID_NULL_ERROR) {
my_error(ER_INVALID_USE_OF_NULL, MYF(0));
} else if (res == HA_ERR_INTERNAL_ERROR) {
my_error(ER_UNKNOWN_ERROR, MYF(0));
} else if (res == HA_ERR_XENGINE_OUT_OF_SORTMEMORY) {
my_printf_error(ER_OUT_OF_SORTMEMORY,
"xengine_sort_buffer_size is too small to process merge. "
"Please set xengine_sort_buffer_size to a higher value.",MYF(0));
}
}
int ha_xengine::build_index(xengine::common::Slice& iter_key,
xengine::common::Slice& iter_val,
ParallelDDLScanCtx& ddl_ctx,
xengine::common::Slice& key,
xengine::common::Slice& val, TABLE* tbl,
bool hidden_pk_exists,
const std::shared_ptr<Xdb_key_def>& index,
bool is_rebuild, TABLE *const new_table_arg) {
int res = HA_EXIT_SUCCESS;
// pk
ddl_ctx.primary_key.copy(iter_key.data(), iter_key.size(), &my_charset_bin);
// record
if ((res = convert_record_from_storage_format(
&iter_key, &iter_val, tbl->record[0], tbl))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: Error getting record, table_name: %s", tbl->s->table_name.str);
}
longlong hidden_pk_id = 0;
if (hidden_pk_exists && read_hidden_pk_id_from_rowkey(
&hidden_pk_id, &ddl_ctx.primary_key)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: Error retrieving hidden pk id, table_name: %s", tbl->s->table_name.str);
return HA_ERR_INTERNAL_ERROR;
}
if (!is_rebuild) {
/* Create new secondary index entry */
const uint new_packed_size = index->pack_record(
tbl, ddl_ctx.pack_buffer, tbl->record[0], ddl_ctx.sk_packed_tuple,
&ddl_ctx.sk_tails, should_store_row_debug_checksums(), hidden_pk_id,
0, nullptr, new_table_arg);
DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len);
key = xengine::common::Slice(
reinterpret_cast<const char *>(ddl_ctx.sk_packed_tuple),
new_packed_size);
val = xengine::common::Slice(
reinterpret_cast<const char *>(ddl_ctx.sk_tails.ptr()),
ddl_ctx.sk_tails.get_current_pos());
} else if (index->is_primary_key()) {
/*** rebuild primary key **/
if (index->is_hidden_primary_key()) {
DBUG_ASSERT(hidden_pk_exists);
}
uint new_packed_size = 0;
if ((res = index->pack_new_record(
tbl, ddl_ctx.pack_buffer, tbl->record[0], ddl_ctx.pk_packed_tuple,
&ddl_ctx.pk_unpack_info, false, hidden_pk_id,
0, nullptr, new_table_arg,
m_tbl_def->m_dict_info, new_packed_size))) {
if (res != HA_ERR_INVALID_NULL_ERROR) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", tbl->s->table_name.str);
}
return res;
}
DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len);
key = xengine::common::Slice(
reinterpret_cast<const char *>(ddl_ctx.pk_packed_tuple),
new_packed_size);
if ((res = convert_new_record_from_old_record(
tbl, new_table_arg, m_tbl_def->m_dict_info, key,
&ddl_ctx.pk_unpack_info, &val, ddl_ctx.new_storage_record))) {
__XHANDLER_LOG(WARN, "XEngineDDL: convert new record error, table_name: %s, code is %d", tbl->s->table_name.str, res);
return res;
}
} else {
/*** rebuild secondary key **/
DBUG_ASSERT(index->is_secondary_key());
uint new_packed_size = 0;
if (index->pack_new_record(
tbl, ddl_ctx.pack_buffer, tbl->record[0], ddl_ctx.sk_packed_tuple,
&ddl_ctx.sk_tails, should_store_row_debug_checksums(),
hidden_pk_id, 0, nullptr, new_table_arg,
m_tbl_def->m_dict_info, new_packed_size)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", tbl->s->table_name.str);
return HA_EXIT_FAILURE;
}
DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len);
key = xengine::common::Slice(
reinterpret_cast<const char *>(ddl_ctx.sk_packed_tuple),
new_packed_size);
val = xengine::common::Slice(
reinterpret_cast<const char *>(ddl_ctx.sk_tails.ptr()),
ddl_ctx.sk_tails.get_current_pos());
}
return res;
}
int ha_xengine::build_base_global_merge(
std::vector<std::shared_ptr<ParallelDDLScanCtx>>& ddl_ctx_set,
const std::shared_ptr<Xdb_key_def>& index, bool is_rebuild,
TABLE *const new_table_arg, size_t max_threads, bool need_unique_check,
Added_key_info& added_key_info, size_t part_id,
xengine::common::Slice& dup_key, xengine::common::Slice& dup_val,
std::atomic<int>& dup_ctx_id) {
int res = HA_EXIT_SUCCESS;
xengine::db::MiniTables tables;
xengine::storage::ChangeInfo change_info;
xengine::db::ColumnFamilyHandle *const cf = index->get_cf();
DBUG_ASSERT(cf != nullptr);
tables.change_info_ = &change_info;
tables.level = 2;
Xdb_sst_info sst_info(
xdb, m_table_handler->m_table_name, index->get_name(), cf,
xengine_db_options, false/*THDVAR(ha_thd(), trace_sst_api)*/, &tables);
const xengine::util::Comparator *index_comp =
index->get_cf()->GetComparator();
KEY* key_info = &new_table_arg->key_info[index->get_keyno()];
std::vector<xengine::common::Slice> merge_keys(max_threads);
std::vector<xengine::common::Slice> merge_vals(max_threads);
auto cmp = [&merge_keys, &index_comp](const int& lhs, const int& rhs) {
return index_comp->Compare(merge_keys[rhs], merge_keys[lhs]) < 0;};
std::priority_queue<int, std::vector<int>, decltype(cmp)> pq(cmp);
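// Seed the min-heap with the first key of each thread's sorted run; the
// comparator orders thread ids by their current merge key, so the
// globally smallest key is always on top during the k-way merge.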
for (int i = 0; i < static_cast<int>(max_threads); ++i) {
if ((res = ddl_ctx_set[i]->bg_merge.next(&merge_keys[i], &merge_vals[i],
part_id)) == 0)
pq.push(i);
else if (res != HA_ERR_END_OF_FILE)
return res;
}
/* must call my_free to release memory before exiting */
uchar* dup_sk_packed_tuple = nullptr;
uchar* dup_sk_packed_tuple_old = nullptr;
if (need_unique_check &&
(!(dup_sk_packed_tuple = reinterpret_cast<uchar *>(my_malloc(
PSI_NOT_INSTRUMENTED, m_max_packed_sk_len, MYF(0)))) ||
!(dup_sk_packed_tuple_old = reinterpret_cast<uchar *>(my_malloc(
PSI_NOT_INSTRUMENTED, m_max_packed_sk_len, MYF(0)))))) {
my_free(dup_sk_packed_tuple);
my_free(dup_sk_packed_tuple_old);
__XHANDLER_LOG(ERROR, "XEngineDDL: allocate memory for sk duplication check fail");
return HA_ERR_INTERNAL_ERROR;
}
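// key_buf double-buffers the current and previous packed keys so that
// check_duplicate_in_base can compare adjacent keys coming out of the
// sorted merge stream.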
unique_key_buf_info key_buf(dup_sk_packed_tuple, dup_sk_packed_tuple_old);
uint64_t merged_count = 0;
while (pq.size() > 0) {
int idx = pq.top();
xengine::common::Slice& merge_key = merge_keys[idx];
xengine::common::Slice& merge_val = merge_vals[idx];
/* Perform uniqueness check if needed */
// xdb_merge.add can only detect duplication in a single (merge) segment
if (need_unique_check) {
if (check_duplicate_in_base(new_table_arg, *index, merge_key, &key_buf)) {
__XHANDLER_LOG(WARN, "XEngineDDL: duplicate entry found during xdb_merge, table_name:%s",
table->s->table_name.str);
dup_key = merge_key;
dup_val = merge_val;
dup_ctx_id = part_id;
res = HA_ERR_FOUND_DUPP_KEY;
break;
}
}
// Check whether a duplicate entry was found during DML
if (merged_count &&
(0 == merged_count % BUILD_BASE_CHECK_ERROR_FREQUENCY) &&
(res = inplace_check_dml_error(new_table_arg, key_info,
added_key_info, false))) {
__XHANDLER_LOG(WARN, "XEngineDDL: error status found during DML, code is %d, table_name:%s",
res, table->s->table_name.str);
break;
}
/*
Insert key and slice to SST via SSTFileWriter API.
Use mirror index (cfh) to save the index of (t0, t1).
*/
if ((res = sst_info.put(merge_key, merge_val))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: Error while bulk loading keys in "
"external merge sort, table_name:%s",
table->s->table_name.str);
break;
}
++merged_count;
pq.pop();
if ((res = ddl_ctx_set[idx]->bg_merge.next(
&merge_keys[idx], &merge_vals[idx], part_id)) == 0)
pq.push(idx);
else if (res != HA_ERR_END_OF_FILE) // error occurred in next
break;
if (table->in_use->killed) {
res = HA_ERR_QUERY_INTERRUPTED;
break;
}
}
__XHANDLER_LOG(INFO, "PART: %d, CNT: %d", part_id, merged_count);
int bulk_load_ret = sst_info.commit();
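// res == HA_ERR_END_OF_FILE means the merge consumed all of its input; in
// that case surface the SST commit status instead (the assignment below
// leaves res at 0 when the commit succeeded).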
if (res == HA_ERR_END_OF_FILE && (res = bulk_load_ret)) {
// NO_LINT_DEBUG
__XHANDLER_LOG(ERROR, "XEngineDDL: Error finishing bulk load, table_name:%s",
table->s->table_name.str);
}
my_free(dup_sk_packed_tuple);
my_free(dup_sk_packed_tuple_old);
return res;
}
int ha_xengine::inplace_build_base_phase_parallel(
TABLE *const new_table_arg, const std::shared_ptr<Xdb_key_def>& index,
bool need_unique_check, Added_key_info& added_key_info, bool is_rebuild) {
__XHANDLER_LOG(INFO, "XEngineDDL: inplace build base parallel start");
DBUG_ENTER_FUNC();
const bool hidden_pk_exists = has_hidden_pk(table);
int res = HA_EXIT_SUCCESS;
Xdb_transaction *tx = get_or_create_tx(table->in_use);
if (ha_thd()->mdl_context.upgrade_shared_lock(
table->mdl_ticket, MDL_EXCLUSIVE,
ha_thd()->variables.lock_wait_timeout)) {
XHANDLER_LOG(ERROR, "XEngineDDL: update mdl_exclusive lock fail");
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
// ensure we get a newest snapshot for the table.
if (tx->has_snapshot()) {
tx->release_snapshot();
}
/*
* For rebuilding of a secondary key, population of the rebuilt primary
* key has already finished at this stage, but duplication may happen
* on the primary key during DML before we get the snapshot.
*/
if (is_rebuild && index->is_secondary_key()) {
auto iter = std::find_if(
m_tbl_def->m_inplace_new_keys.cbegin(),
m_tbl_def->m_inplace_new_keys.cend(),
[&](const std::pair<const Xdb_key_def*, Added_key_info>& key) {
return key.first->is_primary_key();
});
DBUG_ASSERT(iter != m_tbl_def->m_inplace_new_keys.cend());
KEY *primary_key = &new_table_arg->key_info[iter->first->get_keyno()];
if ((res = inplace_check_dml_error(new_table_arg, primary_key,
iter->second))) {
XHANDLER_LOG(WARN, "XEngineDDL: error happened on primary key "
"while building base for subtable",
"subtable_id", index->get_index_number(), "code", res);
DBUG_RETURN(res);
}
}
const uint pk = pk_index(table, m_tbl_def.get());
ha_index_init(pk, true);
KEY* key_info = &new_table_arg->key_info[index->get_keyno()];
inplace_update_added_key_info_step(added_key_info,
Added_key_info::BUILDING_BASE_INDEX);
table->mdl_ticket->downgrade_lock(MDL_SHARED_UPGRADABLE);
if (index->is_secondary_key()) {
DEBUG_SYNC(ha_thd(), "xengine.inplace_create_sk_scan_base_begin");
} else {
DEBUG_SYNC(ha_thd(), "xengine.inplace_copy_ddl_scan_base_begin");
}
/* Initialize variables for parallel DDL */
size_t max_threads = get_parallel_read_threads();
std::vector<std::shared_ptr<ParallelDDLScanCtx>> ddl_ctx_set(max_threads);
const xengine::util::Comparator *index_comp =
index->get_cf()->GetComparator();
ulonglong xengine_merge_combine_read_size =
(xengine_sort_buffer_size * XENGINE_MERGE_COMBINE_READ_SIZE_RATIO);
const size_t xengine_merge_sample_mem_limit = xengine_sort_buffer_size;
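// Each scan thread gets its own merge context; the combine-read and
// sample memory budgets are divided by max_threads so the total stays
// within the limits derived from xengine_sort_buffer_size.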
for (int i = 0; i < static_cast<int>(ddl_ctx_set.size()); i++) {
ddl_ctx_set[i].reset(new ParallelDDLScanCtx(
this, std::make_shared<Xdb_index_merge>(
thd_xengine_tmpdir(),
xengine_sort_buffer_size,
xengine_merge_combine_read_size / max_threads, index_comp,
max_threads, xengine_merge_sample_mem_limit / max_threads),
max_threads));
if (ddl_ctx_set[i]->init())
DBUG_RETURN(HA_ADMIN_CORRUPT);
}
std::atomic<int> dup_ctx_id{-1};
std::vector<xengine::common::Slice> dup_key(max_threads);
std::vector<xengine::common::Slice> dup_val(max_threads);
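// dup_ctx_id records which thread (or, later, which merge partition)
// first hit a duplicate; dup_key/dup_val keep the offending entry so
// print_dup_err can report the duplicate record.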
/* Step1: Scan all records and build new index */
auto f_scan = [this, &ddl_ctx_set, is_rebuild, new_table_arg, &dup_ctx_id,
hidden_pk_exists, &index, &dup_key, &dup_val](
const xengine::common::ParallelReader::ExecuteCtx *ctx,
db::Iterator *db_iter) {
int res = HA_EXIT_SUCCESS;
ParallelDDLScanCtx* ddl_ctx = ddl_ctx_set[ctx->thread_id_].get();
TABLE& tbl = ddl_ctx->thd_table;
xengine::common::Slice iter_key = db_iter->key();
xengine::common::Slice iter_val = db_iter->value();
xengine::common::Slice key;
xengine::common::Slice val;
if ((res = this->build_index(iter_key, iter_val, *ddl_ctx, key, val,
&tbl, hidden_pk_exists, index, is_rebuild,
new_table_arg))) {
// __XHANDLER_LOG(ERROR, "XEngineDDL: build index error, table_name: %s", tbl.s->table_name.str);
return res;
}
/*
Add record to offset tree in preparation for writing out to
disk in sorted chunks.
*/
bool inserted = false;
res = ddl_ctx->xdb_merge->add(key, val, inserted);
if (res) {
XHANDLER_LOG(ERROR, "Failed add record to sort merge buffer",
"value_size", val.size());
return res;
} else if (!inserted) {
dup_ctx_id = ctx->thread_id_;
dup_key[ctx->thread_id_] = ddl_ctx->xdb_merge->get_dup_key();
dup_val[ctx->thread_id_] = ddl_ctx->xdb_merge->get_dup_val();
return HA_ERR_FOUND_DUPP_KEY;
}
return 0;
};
if ((res = scan_parallel(m_key_descr_arr[pk].get(), tx, std::move(f_scan)))) {
print_dup_err(res, dup_ctx_id, index, is_rebuild, new_table_arg, dup_key,
dup_val, added_key_info, key_info);
ha_index_end();
DBUG_RETURN(res);
}
ha_index_end();
__XHANDLER_LOG(INFO, "Scan Finished");
/* Step2: Sort and choose (partition_num-1) sample sort */
std::vector<xengine::common::Slice> sample_tmp, sample;
for (auto& ctx : ddl_ctx_set) {
ctx->xdb_merge->get_sample(sample_tmp);
}
std::sort(sample_tmp.begin(), sample_tmp.end(),
[index_comp] (const xengine::common::Slice& lhs,
const xengine::common::Slice& rhs) {
return index_comp->Compare(rhs, lhs) > 0;});
size_t split_interval = std::ceil(static_cast<double>(sample_tmp.size()) /
max_threads);
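// Every split_interval-th sample becomes a partition boundary, yielding
// at most (max_threads - 1) boundaries that split the key space into
// max_threads roughly equal ranges for the global merge.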
for (size_t i = split_interval; i < sample_tmp.size(); i += split_interval) {
sample.push_back(sample_tmp[i]);
}
__XHANDLER_LOG(INFO, "Deal Sample Finished");
/* Step3: When one thread's scan finished, start background local merge */
auto f_local_merge = [this, &ddl_ctx_set, &sample](size_t thread_id) {
if (thread_id >= ddl_ctx_set.size()) {
DBUG_ASSERT(0);
}
return ddl_ctx_set[thread_id]->bg_merge.merge(sample, table->in_use);
};
ParallelDDLMergeCtx local_merge_ctx(ddl_ctx_set, f_local_merge);
bool inject_err = false;
DBUG_EXECUTE_IF("crash_during_local_merge", inject_err = true;);
DBUG_EXECUTE_IF("parallel_ddl_local_merge_kill",
table->in_use->killed = THD::KILL_QUERY;);
local_merge_ctx.start(max_threads, inject_err);
if ((res = local_merge_ctx.finish())) {
DBUG_EXECUTE_IF("crash_during_local_merge", {
if (ddl_ctx_set[0]->bg_merge.get_interrupt()) {
DEBUG_SYNC(ha_thd(), "xengine.local_merge_interrupted");
};
});
DBUG_RETURN(res);
}
__XHANDLER_LOG(INFO, "Local Merge Finished");
/* Step4: Start global merge, each partition perform an n-way merge of
* n sorted buffers on disk, then writes all results to XENGINE via
* SSTFileWriter API.
*/
DEBUG_SYNC(ha_thd(), "xengine.build_base_global_merge_start");
bool test_stack_err = false;
DBUG_EXECUTE_IF("xengine.test_pddl_stack_err", test_stack_err = true;);
dup_ctx_id = -1;
auto f_global_merge = [this, &ddl_ctx_set, &index, is_rebuild, new_table_arg,
max_threads, need_unique_check, &added_key_info,
&dup_key, &dup_val, &dup_ctx_id, test_stack_err] (size_t part_id) {
if (test_stack_err) {
// wait for function return
sleep(10);
}
return this->build_base_global_merge(ddl_ctx_set, index, is_rebuild,
new_table_arg, max_threads,
need_unique_check, added_key_info,
part_id, dup_key[part_id],
dup_val[part_id], dup_ctx_id);
};
ParallelDDLMergeCtx global_merge_ctx(ddl_ctx_set, f_global_merge);
inject_err = false;
DBUG_EXECUTE_IF("crash_during_global_merge", inject_err = true;);
DBUG_EXECUTE_IF("parallel_ddl_global_merge_kill", table->in_use->killed = THD::KILL_QUERY;);
global_merge_ctx.start(max_threads, inject_err);
DBUG_EXECUTE_IF("xengine.test_pddl_stack_err", DBUG_RETURN(HA_ERR_INTERNAL_ERROR););
if ((res = global_merge_ctx.finish())) {
DBUG_EXECUTE_IF("crash_during_global_merge", {
if (ddl_ctx_set[0]->bg_merge.get_interrupt()) {
DEBUG_SYNC(ha_thd(), "xengine.global_merge_interrupted");
};
});
print_dup_err(res, dup_ctx_id, index, is_rebuild, new_table_arg, dup_key,
dup_val, added_key_info, key_info);
DBUG_RETURN(res);
}
__XHANDLER_LOG(INFO, "Global Merge Finished");
__XHANDLER_LOG(INFO, "XEngineDDL: inplace build base parallel end");
DBUG_RETURN(res);
}
/* Build base data for the new indexes
@param[in] new_table_arg
@param[in] index, xengine index definition
@param[in] need_unique_check, whether unique check is needed
@param[in] added_key_info: used to call inplace_check_dml_error
@param[in] is_rebuild
@return SUCCESS/FAILURE */
int ha_xengine::inplace_build_base_phase(TABLE *const new_table_arg,
const std::shared_ptr<Xdb_key_def>& index, bool need_unique_check,
Added_key_info& added_key_info, bool is_rebuild)
{
if (!xengine_disable_parallel_ddl && get_parallel_read_threads() > 1)
return inplace_build_base_phase_parallel(new_table_arg, index,
need_unique_check, added_key_info,
is_rebuild);
DBUG_ENTER_FUNC();
const bool hidden_pk_exists = has_hidden_pk(table);
int res = HA_EXIT_SUCCESS;
Xdb_transaction *tx = get_or_create_tx(table->in_use);
const xengine::util::Comparator *index_comp =
index->get_cf()->GetComparator();
ulonglong xengine_merge_combine_read_size =
(xengine_sort_buffer_size * XENGINE_MERGE_COMBINE_READ_SIZE_RATIO);
Xdb_index_merge xdb_merge(thd_xengine_tmpdir(), xengine_sort_buffer_size,
xengine_merge_combine_read_size, index_comp);
if ((res = xdb_merge.init())) {
__XHANDLER_LOG(ERROR, "XEngineDDL: xdb_merge init failed, errcode=%d, table_name: %s", res, table->s->table_name.str);
DBUG_RETURN(res);
}
if (ha_thd()->mdl_context.upgrade_shared_lock(
table->mdl_ticket, MDL_EXCLUSIVE,
ha_thd()->variables.lock_wait_timeout)) {
XHANDLER_LOG(ERROR, "XEngineDDL: update mdl_exclusive lock fail");
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
// ensure we get a newest snapshot for the table.
if (tx->has_snapshot()) {
tx->release_snapshot();
}
/* For rebuilding of a secondary key, population of the rebuilt primary
* key has already finished at this stage, but duplication may happen
* on the primary key during DML before we get the snapshot.
*/
if (is_rebuild && index->is_secondary_key()) {
auto iter = std::find_if(
m_tbl_def->m_inplace_new_keys.cbegin(),
m_tbl_def->m_inplace_new_keys.cend(),
[&](const std::pair<const Xdb_key_def*, Added_key_info>& key) {
return key.first->is_primary_key();
});
DBUG_ASSERT(iter != m_tbl_def->m_inplace_new_keys.cend());
KEY *primary_key = &new_table_arg->key_info[iter->first->get_keyno()];
if ((res = inplace_check_dml_error(new_table_arg, primary_key,
iter->second))) {
XHANDLER_LOG(WARN, "XEngineDDL: error happened on primary key "
"while building base for subtable",
"subtable_id", index->get_index_number(), "code", res);
DBUG_RETURN(res);
}
}
const uint pk = pk_index(table, m_tbl_def.get());
ha_index_init(pk, true);
KEY* key_info = &new_table_arg->key_info[index->get_keyno()];
// get snapshot
res = index_first(table->record[0]);
if (res && res != HA_ERR_END_OF_FILE) {
__XHANDLER_LOG(ERROR, "XEngineDDL: failed to get first record, errcode=%d, table_name: %s", res, table->s->table_name.str);
ha_index_end();
table->mdl_ticket->downgrade_lock(MDL_SHARED_UPGRADABLE);
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
inplace_update_added_key_info_step(added_key_info, Added_key_info::BUILDING_BASE_INDEX);
table->mdl_ticket->downgrade_lock(MDL_SHARED_UPGRADABLE);
if (index->is_secondary_key()) {
DEBUG_SYNC(ha_thd(), "xengine.inplace_create_sk_scan_base_begin");
} else {
DEBUG_SYNC(ha_thd(), "xengine.inplace_copy_ddl_scan_base_begin");
}
bool processed_error = false;
/* Scan each record in the primary key in order */
for (; res == 0; res = index_next(table->record[0])) {
longlong hidden_pk_id = 0;
if (hidden_pk_exists && read_hidden_pk_id_from_rowkey(&hidden_pk_id, &m_last_rowkey)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: Error retrieving hidden pk id, table_name: %s", table->s->table_name.str);
ha_index_end();
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
xengine::common::Slice key;
xengine::common::Slice val;
if (!is_rebuild) {
/* Create new secondary index entry */
const uint new_packed_size = index->pack_record(
table, m_pack_buffer, table->record[0], m_sk_packed_tuple,
&m_sk_tails, should_store_row_debug_checksums(), hidden_pk_id, 0, nullptr, new_table_arg);
DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len);
key = xengine::common::Slice(
reinterpret_cast<const char *>(m_sk_packed_tuple), new_packed_size);
val = xengine::common::Slice(
reinterpret_cast<const char *>(m_sk_tails.ptr()),
m_sk_tails.get_current_pos());
} else if (index->is_primary_key()) {
/*** rebuild primary key **/
if (index->is_hidden_primary_key()) {
DBUG_ASSERT(hidden_pk_exists);
}
uint new_packed_size = 0;
if ((res = index->pack_new_record(
table, m_pack_buffer, table->record[0], m_pk_packed_tuple,
&m_pk_unpack_info, false, hidden_pk_id,
0, nullptr, new_table_arg,
m_tbl_def->m_dict_info, new_packed_size))) {
if (res != HA_ERR_INVALID_NULL_ERROR) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", table->s->table_name.str);
}
ha_index_end();
DBUG_RETURN(res);
}
DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len);
key = xengine::common::Slice(
reinterpret_cast<const char *>(m_pk_packed_tuple), new_packed_size);
if ((res = convert_new_record_from_old_record(table, new_table_arg,
m_tbl_def->m_dict_info, key,
&m_pk_unpack_info, &val,
m_new_storage_record))) {
__XHANDLER_LOG(WARN, "XEngineDDL: convert new record error, table_name: %s, code is %d", table->s->table_name.str, res);
my_error(ER_INVALID_USE_OF_NULL, MYF(0));
processed_error = true;
break;
}
} else {
/*** rebuild secondary key **/
DBUG_ASSERT(index->is_secondary_key());
uint new_packed_size = 0;
if (index->pack_new_record(
table, m_pack_buffer, table->record[0], m_sk_packed_tuple,
&m_sk_tails, should_store_row_debug_checksums(), hidden_pk_id, 0,
nullptr, new_table_arg, m_tbl_def->m_dict_info,
new_packed_size)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", table->s->table_name.str);
ha_index_end();
DBUG_RETURN(HA_EXIT_FAILURE);
}
DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len);
key = xengine::common::Slice(
reinterpret_cast<const char *>(m_sk_packed_tuple), new_packed_size);
val = xengine::common::Slice(
reinterpret_cast<const char *>(m_sk_tails.ptr()),
m_sk_tails.get_current_pos());
}
/*
Add record to offset tree in preparation for writing out to
disk in sorted chunks.
*/
bool inserted = false;
res = xdb_merge.add(key, val, inserted);
if (res) {
XHANDLER_LOG(ERROR, "Failed add record to sort merge buffer",
"value_size", val.size());
processed_error = true;
break;
} else if (!inserted) {
// a failed insert indicates there is a duplicate key in the current merge buffer.
// For the primary key, if xdb_merge.add fails (m_offset_tree.emplace
// returns false), the key here is a genuine duplicate.
// For a secondary key, the key is composed of the user key columns/parts
// plus the primary key, so even if there is duplication among the user
// key columns/parts, the composed keys will never be identical.
if (need_unique_check) {
// for utf8mb4_0900_ai_ci, we can't unpack the record directly, so we get
// the values from old_record and the default values.
// use record[0] to call set_duplicate_key_for_print
res = inplace_update_added_key_info_status_dup_key(new_table_arg,
index, added_key_info, Slice(), Slice(), is_rebuild, false);
} else {
XHANDLER_LOG(ERROR, "XEngineDDL: Unexpected error: failed to do "
"xdb_merge.add while building base don't check unique",
"table_name", table->s->table_name.str);
DBUG_ASSERT(false);
}
ha_index_end();
DBUG_RETURN(res);
}
DBUG_EXECUTE_IF("serial_ddl_scan_phase", table->in_use->killed = THD::KILL_QUERY;);
if (my_core::thd_killed(current_thd)) {
ha_index_end();
DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED);
}
}
if (res != HA_ERR_END_OF_FILE) {
// NO_LINT_DEBUG
if (!processed_error) {
XHANDLER_LOG(ERROR, "XEngineDDL: Error retrieving index entry from primary key",
"code", res, "table_name", table->s->table_name.str);
}
ha_index_end();
DBUG_RETURN(res);
}
ha_index_end();
/*
Perform an n-way merge of n sorted buffers on disk, then writes all
results to XENGINE via SSTFileWriter API.
*/
xengine::common::Slice merge_key;
xengine::common::Slice merge_val;
unique_key_buf_info key_buf(m_dup_sk_packed_tuple, m_dup_sk_packed_tuple_old);
uint64_t merged_count = 0;
while ((res = xdb_merge.next(&merge_key, &merge_val)) == 0) {
/* Perform uniqueness check if needed */
// xdb_merge.add can only detect duplication in a single (merge) segment
if (need_unique_check) {
if (check_duplicate_in_base(new_table_arg, *index, merge_key, &key_buf)) {
/*
Duplicate entry found when trying to create unique secondary key.
We need to unpack the record into new_table_arg->record[0] as it
is used inside print_keydup_error so that the error message shows
the duplicate record.
For utf8mb4_0900_ai_ci, we can't unpack the record directly, so we get
the values from old_record and the default values.
For a unique PK, we need to use the pk to search the old table, fill in
its data, and then fill the new record.
Use merge_key/merge_val to call fill_new_duplicate_record
*/
__XHANDLER_LOG(WARN, "XEngineDDL: duplicate entry found during xdb_merge, table_name:%s",
table->s->table_name.str);
res = inplace_update_added_key_info_status_dup_key(new_table_arg, index,
added_key_info, merge_key, merge_val, is_rebuild);
break;
}
}
// Check whether a duplicate entry was found during DML
if (merged_count &&
(0 == merged_count % BUILD_BASE_CHECK_ERROR_FREQUENCY) &&
(res = inplace_check_dml_error(new_table_arg, key_info, added_key_info))) {
__XHANDLER_LOG(WARN, "XEngineDDL: error status found during DML, code is %d, table_name:%s",
res, table->s->table_name.str);
break;
}
/*
Insert key and slice to SST via SSTFileWriter API.
Use mirror index (cfh) to save the index of (t0, t1).
*/
if ((res = bulk_load_key(tx, *index, merge_key, merge_val, 2))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: Error while bulk loading keys in "
"external merge sort, table_name:%s",
table->s->table_name.str);
break;
}
++merged_count;
DBUG_EXECUTE_IF("serial_ddl_merge_phase", table->in_use->killed = THD::KILL_QUERY;);
if (my_core::thd_killed(current_thd)) {
// can't return directly, need to finish_bulk_load()
break;
}
}
// we need to finish bulk load regardless of whether an error happened
int bulk_load_ret = tx->finish_bulk_load();
if (my_core::thd_killed(current_thd)) {
DBUG_RETURN(HA_ERR_QUERY_INTERRUPTED);
}
// Here, res == HA_ERR_END_OF_FILE means that we are finished
if (res == HA_ERR_END_OF_FILE && (res = bulk_load_ret)) {
// NO_LINT_DEBUG
__XHANDLER_LOG(ERROR, "XEngineDDL: Error finishing bulk load, table_name:%s",
table->s->table_name.str);
}
DBUG_RETURN(res);
}
/* Check whether the given user key sequences have a unique conflict.
* Returns true, meaning there is a unique conflict, when two adjacent
* PUTs (after sorting by sequence number) exist in the given user key
* sequences.
*/
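/* Example: after the newest-first sort, the sequence list
* (100, PUT), (90, DEL), (80, PUT) has no conflict because the DEL
* separates the two PUTs, while (100, PUT), (90, PUT) is a conflict:
* two live versions of the same user key. */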
bool ha_xengine::check_user_key_sequence(std::vector<UserKeySequence> &user_key_sequences)
{
if (user_key_sequences.size() > 1) {
// sort by sequence in descending order
std::sort(user_key_sequences.begin(), user_key_sequences.end(),
[](const UserKeySequence &key1, const UserKeySequence &key2) {
return key1.first > key2.first;
});
// check all delta sequences and the latest sequence in base for the
// current user key to make sure there are no two adjacent PUT ops
xengine::db::Iterator::RecordStatus prev_status = xengine::db::Iterator::kError;
for (auto& key_seq : user_key_sequences) {
if (xengine::db::Iterator::kExist == prev_status &&
xengine::db::Iterator::kExist == key_seq.second) {
return true;
}
prev_status = key_seq.second;
}
}
return false;
}
/** check data written in (BUILDING_BASE_INDEX, CHECK_UNIQUE_CONSTRAINT)
against data written in (TABLE_CREATE, BUILDING_BASE_INDEX)
@param new_table_arg
@param index, xengine index definition
@param added_key_info, used to record and check the index build status
@param is_rebuild, rebuild table or not */
/* Uniqueness conflict check with DML for online DDL.
* Iterate over all user keys from DML operations:
* 1) Use key_seq() and key_status() to get the sequence number and op
* 2) Use Next to compare with the user key at the next position
* 3) For the sequences of the same user key, use check_user_key_sequence
* to do the conflict check
*/
int ha_xengine::inplace_check_unique_phase(TABLE *new_table_arg,
const std::shared_ptr<Xdb_key_def>& index, Added_key_info& added_key_info,
bool is_rebuild)
{
DBUG_ENTER_FUNC();
if (index->is_secondary_key()) {
DEBUG_SYNC(ha_thd(), "xengine.inplace_create_sk_check_constraint_begin");
} else {
DEBUG_SYNC(ha_thd(), "xengine.inplace_unique_check_constraint_begin");
}
int res = HA_EXIT_SUCCESS;
KEY* key_info = &new_table_arg->key_info[index->get_keyno()];
if ((res = inplace_check_dml_error(new_table_arg, key_info, added_key_info))) {
__XHANDLER_LOG(WARN, "error status found during DML, code is %d, table_name: %s", res, table->s->table_name.str);
DBUG_RETURN(res);
}
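// Unique check compares two views of the subtable: base_opts reads only
// L2, where the bulk-loaded base data was placed (level 2), while
// delta_opts reads everything except L2, i.e. rows written by concurrent
// DML that still live in the memtables/L0/L1.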
xengine::common::ReadOptions base_opts;
base_opts.read_level_ = xengine::common::kOnlyL2;
base_opts.total_order_seek = true;
xengine::common::ReadOptions delta_opts;
delta_opts.read_level_ = xengine::common::kExcludeL2;
delta_opts.total_order_seek = true;
// also check DEL/SDEL records from delta
delta_opts.skip_del_ = false;
delta_opts.unique_check_ = true;
// Get the snapshot and set CHECK_UNIQUE_CONSTRAINT under the protection of the MDL lock
if (ha_thd()->mdl_context.upgrade_shared_lock(
table->mdl_ticket, MDL_EXCLUSIVE,
ha_thd()->variables.lock_wait_timeout)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: update mdl_exclusive lock fail, table_name: %s", table->s->table_name.str);
DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
}
inplace_update_added_key_info_step(added_key_info, Added_key_info::CHECK_UNIQUE_CONSTRAINT);
// (t0, t1) iter
IteratorUptr base_iter(xdb->NewIterator(base_opts, index->get_cf()));
// (t1, t2) iter, snapshot
IteratorUptr delta_iter(xdb->NewIterator(delta_opts, index->get_cf()));
table->mdl_ticket->downgrade_lock(MDL_SHARED_UPGRADABLE);
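// A unique SK is stored with the PK appended as a suffix; uniqueness only
// covers the user-defined key parts, so when not all stored parts are
// user-defined, matching below is done on the user-key prefix only.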
uint user_defined_key_parts = key_info->user_defined_key_parts;
const bool all_parts_used = (user_defined_key_parts == index->get_key_parts());
Arena arena; // used to do Slice::deep_copy
std::vector<UserKeySequence> user_key_sequences;
bool memcmp_key_inited = false, has_dup = false;
uint memcmp_key_size = 0;
xengine::common::Slice memcmp_key, saved_iter_key, saved_iter_val;
for (delta_iter->SeekToFirst(); delta_iter->Valid(); delta_iter->Next()) {
uint iter_key_memcmp_size = delta_iter->key().size();
if (index->is_secondary_key()) {
uint n_null_fields = 0;
iter_key_memcmp_size = index->get_memcmp_sk_size(
new_table_arg, delta_iter->key(), &n_null_fields);
if (n_null_fields > 0) {
continue;
}
}
if (memcmp_key_inited) {
// check whether this iterator key has same user key
if (memcmp_key_size != iter_key_memcmp_size || (0 != memcmp(
memcmp_key.data(), delta_iter->key().data(), memcmp_key_size))) {
// user key is changed, check whether the old one exists in base or not
base_iter->Seek(memcmp_key);
if (HA_EXIT_SUCCESS ==
read_key_exact(*index, base_iter.get(), all_parts_used, memcmp_key)) {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: find match record %s in L2 on index:%d during DDL",
base_iter->key().ToString(true).c_str(),
index->get_index_number());
#endif
// Use 0 as the sequence number of the record in base tier
user_key_sequences.emplace_back(0, xengine::db::Iterator::kExist);
}
// check all sequences with status for current user key
has_dup = check_user_key_sequence(user_key_sequences);
user_key_sequences.clear();
if (has_dup) {
break;
}
memcmp_key_inited = false;
}
}
if (!memcmp_key_inited) {
if (index->is_secondary_key()) {
uint n_null_fields = 0;
uint sk_memcmp_size = index->get_memcmp_sk_parts(
new_table_arg, delta_iter->key(), m_dup_sk_packed_tuple,
&n_null_fields);
DBUG_ASSERT(n_null_fields == 0);
DBUG_ASSERT(sk_memcmp_size == iter_key_memcmp_size);
memcmp_key = xengine::common::Slice(
reinterpret_cast<char *>(m_dup_sk_packed_tuple), sk_memcmp_size);
} else {
memcmp_key = delta_iter->key().deep_copy(arena);
}
memcmp_key_size = iter_key_memcmp_size;
memcmp_key_inited = true;
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: unique check for %s(size=%u) on index:%d during DDL",
memcmp_key.ToString(true).c_str(), memcmp_key_size,
index->get_index_number());
#endif
}
// collect the status and sequence numbers for the same user key
auto delta_sequence = delta_iter->key_seq();
auto status = delta_iter->key_status();
DBUG_ASSERT(xengine::db::Iterator::kNonExist != status);
user_key_sequences.emplace_back(delta_sequence, status);
// To print a duplicate entry we may need value information to unpack the
// record; save the first PUT record for later use in case a duplicate entry is found
if (xengine::db::Iterator::kExist == status && saved_iter_val.empty()) {
saved_iter_key = delta_iter->key().deep_copy(arena);
saved_iter_val = delta_iter->value().deep_copy(arena);
}
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: find match record %s(%u, %s) in L0&L1 on index:%d during DDL",
delta_iter->key().ToString(true).c_str(), delta_sequence,
(xengine::db::Iterator::kExist == status) ? "PUT":"DEL",
index->get_index_number());
#endif
}
if (!has_dup && memcmp_key_inited && !user_key_sequences.empty()) {
/* check whether the user key exists in base or not for last user key when
* delta_iter is not valid
*/
base_iter->Seek(memcmp_key);
if (HA_EXIT_SUCCESS ==
read_key_exact(*index, base_iter.get(), all_parts_used, memcmp_key)) {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: find match record %s in L2 on index:%d during DDL",
base_iter->key().ToString(true).c_str(),
index->get_index_number());
#endif
// Use 0 as the sequence number of the record in base tier
user_key_sequences.emplace_back(0, xengine::db::Iterator::kExist);
}
// check all sequences with status for current user key
has_dup = check_user_key_sequence(user_key_sequences);
user_key_sequences.clear();
}
if (has_dup) {
DBUG_ASSERT(!saved_iter_key.empty());
__XHANDLER_LOG(WARN, "XEngineDDL: duplicate entry is found %s during DDL on index:%d",
saved_iter_key.ToString(true).c_str(),
index->get_index_number());
// The current user key is a duplicate entry;
// use saved_iter_key/saved_iter_val to call fill_new_duplicate_record
res = inplace_update_added_key_info_status_dup_key(new_table_arg, index,
added_key_info, saved_iter_key, saved_iter_val, is_rebuild);
}
if (index->is_secondary_key()) {
DEBUG_SYNC(ha_thd(), "xengine.inplace_create_sk_check_constraint_done");
} else {
DEBUG_SYNC(ha_thd(), "xengine.inplace_unique_check_constraint_done");
}
/**
* at this point, cf-index has all the data (t0, now)
*
* new data is still allowed to come in before commit; it goes to cf-index
* as usual since the lock is released, and is still checked for uniqueness
* against cf-index. cf-mirror has already been removed and is not checked
* anymore.
*
* the upper layer will call commit_inplace_alter_table(..., commit=true)
* even if duplicates happen here, but that function notices and returns an
* error, and then another commit_inplace_alter_table(..., commit=false)
* will be called.
*/
DBUG_RETURN(res);
}
/**
* Update status of index during inplace_populate_index
*/
int ha_xengine::inplace_update_added_key_info_step(
Added_key_info& added_key_info, Added_key_info::ADD_KEY_STEP step) const
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(step >= Added_key_info::TABLE_CREATE &&
step <= Added_key_info::FINISHED);
added_key_info.step = step;
DBUG_RETURN(HA_EXIT_SUCCESS);
}
/**
* Check whether duplicate status was set by DML during inplace populate index
*
* @param table_arg: used to call print_keydup_error
* @param key: used to call print_keydup_error
* @param added_key_info: used to check status
* @param print_err: used to identify whether print err
*/
int ha_xengine::inplace_check_dml_error(TABLE *const table_arg, KEY *key,
const Added_key_info& added_key_info, bool print_err)
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(nullptr != table_arg);
if (HA_ERR_FOUND_DUPP_KEY == added_key_info.status.load()) {
// wait dup_key dup_val saved.
while (!added_key_info.dup_key_saved.load()) {
}
// new table duplicate record is set by dml session
if (print_err) {
// can't print err in child thread
print_keydup_error(table_arg, key, MYF(0));
}
DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
} else if (HA_ERR_INVALID_NULL_ERROR == added_key_info.status.load()) {
if (print_err) {
my_error(ER_INVALID_USE_OF_NULL, MYF(0));
}
DBUG_RETURN(HA_ERR_INVALID_NULL_ERROR);
}
DBUG_RETURN(HA_EXIT_SUCCESS);
}
/** Scan the Primary Key index entries and populate the newly added index keys.
For a new table, we build the new storage record from old_table_record and default values.
@param new_table_arg
@param index, new added index
@param is_rebuild, rebuild table or not
@return SUCCESS/FAILURE */
int ha_xengine::inplace_populate_index(TABLE *const new_table_arg,
const std::shared_ptr<Xdb_key_def>& index,
bool is_rebuild)
{
DBUG_ENTER_FUNC();
int res = HA_EXIT_SUCCESS;
bool need_unique_check = false;
KEY* key_info = &new_table_arg->key_info[index->get_keyno()];
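// A hidden PK never needs a unique check: its values are engine-generated
// and unique. For rebuild, the key definition may allow skipping the
// check; otherwise the HA_NOSAME flag on the new key decides.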
if (!index->is_hidden_primary_key()) {
need_unique_check = is_rebuild ? !index->m_can_skip_unique_check :
key_info->flags & HA_NOSAME;
}
std::unordered_map<const Xdb_key_def *, Added_key_info>::iterator iter;
if (!is_rebuild) {
iter = m_tbl_def->m_added_key.find(index.get());
DBUG_ASSERT(iter != m_tbl_def->m_added_key.end());
} else {
iter = m_tbl_def->m_inplace_new_keys.find(index.get());
DBUG_ASSERT(iter != m_tbl_def->m_inplace_new_keys.end());
}
if ((res = inplace_build_base_phase(
new_table_arg, index, need_unique_check, iter->second, is_rebuild))) {
print_common_err(res);
if (res != HA_ERR_FOUND_DUPP_KEY && res != HA_ERR_INVALID_NULL_ERROR) {
__XHANDLER_LOG(ERROR,
"XEngineDDL: build base error, code: %d, table_name: %s",
res, table->s->table_name.str);
}
} else if (need_unique_check &&
(res = inplace_check_unique_phase(new_table_arg, index, iter->second, is_rebuild))) {
if (res != HA_ERR_FOUND_DUPP_KEY) {
__XHANDLER_LOG(ERROR, "XEngineDDL: check unique phase error, code: %d, table_name: %s",
res, table->s->table_name.str);
}
} else if ((res = inplace_update_added_key_info_step(
iter->second, Added_key_info::FINISHED))) {
__XHANDLER_LOG(ERROR, "Failed to update state of Added_key_info::step after unique check");
} else {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: build index %d for table %s successfully",
index->get_index_number(), table->s->table_name.str);
#endif
}
if (res == HA_ERR_FOUND_DUPP_KEY) {
__XHANDLER_LOG(WARN, "XEngineDDL: duplicated error happened during build index %d for table %s",
index->get_index_number(), table->s->table_name.str);
}
DBUG_RETURN(res);
}
/** Update metadata in the commit phase for instant ADD COLUMN.
Basically, it should remember the number of instant columns
and the default values of the newly added columns.
Note this function should only update metadata
that cannot result in failure.
@param[in] old_table MySQL table as it is before the ALTER operation
@param[in] altered_table MySQL table that is being altered
@param[in] old_dd_tab Old dd::Table
@param[in,out] new_dd_tab New dd::Table */
void ha_xengine::dd_commit_instant_table(const TABLE *old_table,
const TABLE *altered_table,
const dd::Table *old_dd_tab,
dd::Table *new_dd_tab)
{
assert(old_dd_tab->columns().size() <= new_dd_tab->columns()->size());
// pass DD_TABLE_INSTANT_COLS from old to new DD
if (!new_dd_tab->se_private_data().exists(
dd_table_key_strings[DD_TABLE_INSTANT_COLS])) {
uint32_t instant_cols = old_table->s->fields;
if (dd_table_has_instant_cols(*old_dd_tab)) {
old_dd_tab->se_private_data().get(
dd_table_key_strings[DD_TABLE_INSTANT_COLS], &instant_cols);
}
new_dd_tab->se_private_data().set(
dd_table_key_strings[DD_TABLE_INSTANT_COLS], instant_cols);
}
// pass DD_TABLE_NULL_BYTES from old to new DD
if (!new_dd_tab->se_private_data().exists(
dd_table_key_strings[DD_TABLE_NULL_BYTES])) {
uint32_t null_bytes = m_null_bytes_in_rec;
if (dd_table_has_instant_cols(*old_dd_tab)) {
old_dd_tab->se_private_data().get(
dd_table_key_strings[DD_TABLE_NULL_BYTES], &null_bytes);
}
new_dd_tab->se_private_data().set(dd_table_key_strings[DD_TABLE_NULL_BYTES],
null_bytes);
}
/* To remember old default values if exist */
dd_copy_table_columns(*new_dd_tab, *old_dd_tab);
/* Then add all new default values */
dd_add_instant_columns(old_table, altered_table, new_dd_tab);
// assert(dd_table_has_instant_cols(*new_dd_tab));
}
void ha_xengine::dd_commit_inplace_no_change(const dd::Table *old_dd_tab,
dd::Table *new_dd_tab)
{
dd_copy_private(*new_dd_tab, *old_dd_tab);
/* To remember old default values if exist */
dd_copy_table_columns(*new_dd_tab, *old_dd_tab);
/*if (!dd_table_is_partitioned(new_dd_tab->table()) ||
dd_part_is_first(reinterpret_cast<dd::Partition *>(new_dd_tab))) {
dd_copy_table(new_dd_tab->table(), old_dd_tab->table());
}*/
}
/** Update table-level instant metadata in the commit phase;
an instant column name or a default value may have been modified
by the ddl operation.
1. if add columns is mixed with other ddl operations, it will be the inplace type
2. if a data type changed or a not-null/null attribute changed, it will be copy/rebuild,
so the instant column count and null bytes should be the same as the old table's
@param[in] old_dd_tab old dd::Table
@param[in] new_dd_tab new dd::Table */
void dd_commit_inplace_update_instant_meta(
my_core::Alter_inplace_info *const ha_alter_info,
const dd::Table *old_dd_tab, dd::Table *new_dd_tab, TABLE *table,
TABLE *altered_table)
{
if (!dd_table_has_instant_cols(*old_dd_tab)) {
return;
}
assert(old_dd_tab->se_private_data().exists(
dd_table_key_strings[DD_TABLE_INSTANT_COLS]));
assert(old_dd_tab->se_private_data().exists(
dd_table_key_strings[DD_TABLE_NULL_BYTES]));
uint32_t instant_cols = 0;
old_dd_tab->se_private_data().get(dd_table_key_strings[DD_TABLE_INSTANT_COLS],
&instant_cols);
new_dd_tab->se_private_data().set(dd_table_key_strings[DD_TABLE_INSTANT_COLS],
instant_cols);
uint32_t null_bytes = 0;
old_dd_tab->se_private_data().get(dd_table_key_strings[DD_TABLE_NULL_BYTES],
&null_bytes);
new_dd_tab->se_private_data().set(dd_table_key_strings[DD_TABLE_NULL_BYTES],
null_bytes);
//update dd-column meta if necessary
assert(table->s->fields == altered_table->s->fields);
if (!(ha_alter_info->handler_flags & Alter_inplace_info::ALTER_COLUMN_NAME)) {
dd_copy_table_columns(*new_dd_tab, *old_dd_tab);
return;
}
// some field names changed during inplace ddl
Field *old_field, *new_field;
for (uint i=0; i < table->s->fields; i++) {
old_field = table->field[i];
new_field = altered_table->field[i];
dd::Column *old_col = const_cast<dd::Column *>(dd_find_column(old_dd_tab, old_field->field_name));
dd::Column *new_col = const_cast<dd::Column *>(
dd_find_column(new_dd_tab, new_field->field_name));
#ifndef NDEBUG
if (new_field->flags & FIELD_IS_RENAMED) {
assert(my_strcasecmp(system_charset_info, old_field->field_name,
new_field->field_name));
}
#endif
if (!old_col->se_private_data().empty()) {
if (!new_col->se_private_data().empty())
new_col->se_private_data().clear();
new_col->set_se_private_data(old_col->se_private_data());
}
}
return;
}
void ha_xengine::dd_commit_inplace_instant(Alter_inplace_info *ha_alter_info,
const TABLE *old_table,
const TABLE *altered_table,
const dd::Table *old_dd_tab,
dd::Table *new_dd_tab)
{
assert(is_instant(ha_alter_info));
auto type = static_cast<Instant_Type>(ha_alter_info->handler_trivial_ctx);
switch (type) {
case Instant_Type::INSTANT_NO_CHANGE:
dd_commit_inplace_no_change(old_dd_tab, new_dd_tab);
break;
case Instant_Type::INSTANT_ADD_COLUMN:
dd_copy_private(*new_dd_tab, *old_dd_tab);
dd_commit_instant_table(old_table, altered_table, &old_dd_tab->table(),
&new_dd_tab->table());
break;
case Instant_Type::INSTANT_IMPOSSIBLE:
default:
assert(0);
}
}
/** If the ddl failed, we need to roll back the newly added indexes.
@param added_indexes, new indexes added in this ddl
@return SUCCESS/FAILURE. */
int ha_xengine::rollback_added_index(
const std::unordered_set<std::shared_ptr<Xdb_key_def>>& added_indexes)
{
int ret = HA_EXIT_SUCCESS;
/* Remove uncommitted key definitions from ddl_manager */
ddl_manager.remove_uncommitted_keydefs(added_indexes);
/* Rollback created indexes */
m_tbl_def->clear_keys_for_ddl();
/* fix #22611997, in the rollback case, we need to renew the handler's members.
Buffers need to be set up again to account for new, possibly longer
secondary keys. */
free_key_buffers();
if ((ret = alloc_key_buffers(table, m_tbl_def.get()))) {
__XHANDLER_LOG(ERROR, "XEngine failed allocating key buffers during alter. table_name: %s", table->s->table_name.str);
return ret;
}
return ret;
}
/**
Commit or rollback the changes made during prepare_inplace_alter_table()
and inplace_alter_table() inside the storage engine.
Note that in case of rollback the allowed level of concurrency during
this operation will be the same as for inplace_alter_table() and thus
might be higher than during prepare_inplace_alter_table(). (For example,
concurrent writes were blocked during prepare, but might not be during
rollback).
@note Storage engines are responsible for reporting any errors by
calling my_error()/print_error()
@note If this function with commit= true reports error, it will be called
again with commit= false.
@note In case of partitioning, this function might be called for rollback
without prepare_inplace_alter_table() having been called first.
Also, partitioned tables set ha_alter_info->group_commit_ctx to a NULL
terminated array of the partitions' handlers and if all of them are
committed as one, then group_commit_ctx should be set to NULL to indicate
to the partitioning handler that all partitions' handlers are committed.
@see prepare_inplace_alter_table().
@param altered_table TABLE object for new version of table.
@param ha_alter_info Structure describing changes to be done
by ALTER TABLE and holding data used
during in-place alter.
@param commit True => Commit, False => Rollback.
@param old_table_def dd::Table object describing old version of
the table.
@param new_table_def dd::Table object for the new version of the
table. Can be adjusted by this call if SE
supports atomic DDL. These changes to the
table definition will be persisted in the
data-dictionary at statement commit time.
@retval true Error
@retval false Success
*/
bool ha_xengine::commit_inplace_alter_table(
my_core::TABLE *const altered_table,
my_core::Alter_inplace_info *const ha_alter_info, bool commit,
const dd::Table *old_dd_tab, dd::Table *new_dd_tab)
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(nullptr != ha_alter_info);
auto ctx = dynamic_cast<Xdb_inplace_alter_ctx *>(ha_alter_info->handler_ctx);
if (is_instant(ha_alter_info)) {
dd_commit_inplace_instant(ha_alter_info, table, altered_table, old_dd_tab,
new_dd_tab);
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: commit instant alter sql is %s, commit is %d", ha_thd()->query().str, (int)commit);
#endif
DBUG_RETURN(HA_EXIT_SUCCESS);
} else if (nullptr == ctx) {
/* If ctx has not been created yet, nothing to do here */
__XHANDLER_LOG(INFO, "XEngineDDL: ctx is released or not initialized");
DBUG_RETURN(HA_EXIT_SUCCESS);
} else {
int ret = HA_EXIT_SUCCESS;
bool is_rebuild = ctx->m_rebuild;
if (ctx->m_rebuild) {
ret = commit_inplace_alter_table_rebuild(altered_table, ha_alter_info,
commit, old_dd_tab, new_dd_tab);
} else {
ret = commit_inplace_alter_table_norebuild(
altered_table, ha_alter_info, commit, old_dd_tab, new_dd_tab);
}
if (!ret) {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: inplace ddl %s successfully, table: %s",
commit ? "commit" : "rollback", table->s->table_name.str);
#endif
} else {
__XHANDLER_LOG(WARN, "XEngineDDL: failed to commit inplace ddl, table: %s, errcode is %d",
table->s->table_name.str, ret);
}
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: commit alter sql is %s, inplace_rebuild: %d, commit is %d, code is %d", ha_thd()->query().str, (int)is_rebuild, (int)commit, ret);
#endif
DBUG_RETURN(ret);
}
}
/** The auto-increment value may have been changed by the ALTER statement;
get the auto-increment value of the table on commit.
@param[in] ha_alter_info Data used during in-place alter
@param[in] altered_table MySQL table that is being altered
@param[in] old_table MySQL table as it is before the ALTER operation
@retval HA_EXIT_SUCCESS Success, others FAILED */
int ha_xengine::commit_inplace_alter_get_autoinc(Alter_inplace_info *ha_alter_info,
const TABLE *altered_table,
const TABLE *old_table)
{
DBUG_ENTER("alter_commit_get_autoinc");
DBUG_ASSERT(nullptr != ha_alter_info);
auto ctx = dynamic_cast<Xdb_inplace_alter_ctx *>(ha_alter_info->handler_ctx);
DBUG_ASSERT(ctx);
if (altered_table->found_next_number_field &&
((ha_alter_info->handler_flags &
Alter_inplace_info::CHANGE_CREATE_OPTION) &&
(ha_alter_info->create_info->used_fields & HA_CREATE_USED_AUTO))) {
/* An AUTO_INCREMENT value was supplied, but the table was not
rebuilt. Get the user-supplied value or the last value from the
sequence. */
ulonglong set_autoinc_value =
ha_alter_info->create_info->auto_increment_value;
if (old_table->found_next_number_field) {
load_auto_incr_value();
}
ulonglong max_value_table = m_tbl_def->m_auto_incr_val;
if (set_autoinc_value > max_value_table) {
ctx->m_new_tdef->m_auto_incr_val = set_autoinc_value;
}
}
DBUG_RETURN(HA_EXIT_SUCCESS);
}
/** Both inplace alter ddl paths (rebuild/norebuild) should do these things in the commit phase:
1. update autoinc if necessary
2. update the xengine dictionary
3. calculate statistics
4. enable major compaction and the inner merge operation
@param[in] altered_table, new table
@param[in] ha_alter_info, DDL operation
@param[in] commit, true indicate commit, and false indicate rollback
@param[in] new_added_indexes
@return SUCCESS/FAILURE */
int ha_xengine::commit_inplace_alter_table_common(
my_core::TABLE *const altered_table,
my_core::Alter_inplace_info *const ha_alter_info, bool commit,
const std::unordered_set<std::shared_ptr<Xdb_key_def>> &new_added_indexes)
{
DBUG_ENTER_FUNC();
//update autoinc if necessary
if (commit_inplace_alter_get_autoinc(ha_alter_info, altered_table, table)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: get autoinc value failed. table_name: %s", table->s->table_name.str);
return HA_EXIT_FAILURE;
}
DBUG_ASSERT(nullptr != ha_alter_info);
auto ctx0 = dynamic_cast<Xdb_inplace_alter_ctx *>(ha_alter_info->handler_ctx);
DBUG_ASSERT(ctx0);
m_tbl_def->clear_keys_for_ddl();
// current Xdb_tbl_def represented by m_tbl_def will be unreferenced
// new Xdb_tbl_def represented by ctx0->m_new_tdef will be referenced
m_tbl_def = ctx0->m_new_tdef;
m_key_descr_arr = m_tbl_def->m_key_descr_arr;
m_pk_descr = m_key_descr_arr[pk_index(altered_table, m_tbl_def.get())];
/*
For partitioned tables, we need to commit all changes to all tables at
once, unlike in the other inplace alter API methods.
*/
inplace_alter_handler_ctx **ctx_array;
inplace_alter_handler_ctx *ctx_single[2];
if (ha_alter_info->group_commit_ctx) {
DBUG_EXECUTE_IF("crash_during_index_creation_partition", DBUG_SUICIDE(););
ctx_array = ha_alter_info->group_commit_ctx;
} else {
ctx_single[0] = ctx0;
ctx_single[1] = nullptr;
ctx_array = ctx_single;
}
DBUG_ASSERT(ctx0 == ctx_array[0]);
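// Clearing group_commit_ctx tells the partitioning handler that all
// partition handlers are committed as one.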
ha_alter_info->group_commit_ctx = nullptr;
THD *const thd = table ? table->in_use : my_core::thd_get_current_thd();
Xdb_transaction *const tx = get_or_create_tx(thd);
xengine_register_tx(ht, thd, tx);
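// Dictionary updates go through the transaction's blind write batch so
// they commit atomically with the transaction registered above.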
auto batch = dynamic_cast<xengine::db::WriteBatch *>(tx->get_blind_write_batch());
std::unordered_set<GL_INDEX_ID> create_index_ids;
dict_manager.lock();
for (inplace_alter_handler_ctx **pctx = ctx_array; *pctx; pctx++) {
auto ctx = dynamic_cast<Xdb_inplace_alter_ctx *>(*pctx);
for (uint i = 0; i < ctx->m_new_tdef->m_key_count; i++) {
create_index_ids.insert(ctx->m_new_tdef->m_key_descr_arr[i]->get_gl_index_id());
}
if (ddl_manager.put_and_write(ctx->m_new_tdef, batch, &ddl_log_manager,
thd_thread_id(thd), false)) {
DBUG_ASSERT(0);
}
/*
Remove uncommitted key definitions from ddl_manager, as they are now
committed into the data dictionary.
*/
ddl_manager.remove_uncommitted_keydefs(new_added_indexes);
}
dict_manager.unlock();
/** if we crash here, during crash-recovery the drop_thread will collect
the create-ongoing indexes and then drop their subtables;
during the post_recover phase, we collect ddl_logs and look for the
subtables: if they exist, drop them; if not, do nothing. */
DBUG_EXECUTE_IF("ddl_log_crash_before_remove_index_ongoing", DBUG_SUICIDE(););
/* Mark ongoing create indexes as finished/remove from data dictionary */
//dict_manager.finish_indexes_operation(create_index_ids,
// Xdb_key_def::DDL_CREATE_INDEX_ONGOING);
/** if crash here, during post_recover phase, we drop it. */
DBUG_EXECUTE_IF("ddl_log_crash_after_remove_index_ongoing", DBUG_SUICIDE(););
/*
We need to recalculate the index stats here manually. The reason is that
the secondary index does not exist inside
m_index_num_to_keydef until it is committed to the data dictionary, which
prevents us from updating the stats normally as the ddl_manager cannot
find the proper gl_index_ids yet during adjust_stats calls.
*/
if (calculate_stats(altered_table, nullptr, nullptr)) {
/* Failed to update index statistics, should never happen */
DBUG_ASSERT(0);
}
// enable add-index major compaction
std::vector<ColumnFamilyHandle *> column_family_handles;
for (auto& index : new_added_indexes) {
xengine::db::ColumnFamilyHandle *cf = index->get_cf();
column_family_handles.push_back(cf);
}
auto txn_db_impl = dynamic_cast<xengine::util::TransactionDBImpl *>(xdb);
// capture the returned status so the check below is meaningful
xengine::common::Status s = txn_db_impl->GetDBImpl()->switch_major_compaction(
column_family_handles, true);
if (!s.ok()) {
__XHANDLER_LOG(ERROR, "XEngineDDL: switch_major_compaction fail: %s. table_name: %s",
s.ToString().c_str(), table->s->table_name.str);
DBUG_RETURN(HA_EXIT_FAILURE);
}
s = txn_db_impl->GetDBImpl()->enable_backgroud_merge(column_family_handles);
if (!s.ok()) {
__XHANDLER_LOG(ERROR, "XEngineDDL: enable_background_merge fail: %s. table_name: %s",
s.ToString().c_str(), table->s->table_name.str);
DBUG_RETURN(HA_EXIT_FAILURE);
}
DBUG_RETURN(HA_EXIT_SUCCESS);
}
/** Commit phase for inplace rebuild DDL.
@param[in] altered_table new table
@param[in] ha_alter_info DDL operation
@param[in] commit true for commit, false for rollback
@param[in] old_dd_tab
@param[in] new_dd_tab
@return true on error, false on success */
bool ha_xengine::commit_inplace_alter_table_rebuild(
my_core::TABLE *const altered_table,
my_core::Alter_inplace_info *const ha_alter_info, bool commit,
const dd::Table *old_dd_tab, dd::Table *new_dd_tab)
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(altered_table != nullptr);
DBUG_ASSERT(ha_alter_info != nullptr);
auto ctx0 = dynamic_cast<Xdb_inplace_alter_ctx *>(ha_alter_info->handler_ctx);
Xdb_tbl_def *new_tdef = m_tbl_def->m_inplace_new_tdef;
DBUG_ASSERT(new_tdef != nullptr);
int ret = HA_EXIT_SUCCESS;
std::unordered_set<std::shared_ptr<Xdb_key_def>> new_added_indexes;
for (uint i=0; i < new_tdef->m_key_count; i++) {
new_added_indexes.insert(new_tdef->m_key_descr_arr[i]);
}
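  /* Collect every key definition of the rebuilt table so that any failure
     path below can roll all of them back together. */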
if (commit) {
    // if there were dups between inplace_alter_table's return and now,
    // capture them here
for (auto &k : m_tbl_def->m_inplace_new_keys) {
if ((ret = inplace_check_dml_error(altered_table,
&altered_table->key_info[k.first->get_keyno()],
k.second))) {
__XHANDLER_LOG(WARN, "XEngineDDL: found error when rebuild table, code: %d", ret);
rollback_added_index(new_added_indexes);
ret = HA_EXIT_FAILURE;
goto rebuild_fun_end;
}
}
} else {
    /* rollback created indexes */
ret = rollback_added_index(new_added_indexes);
goto rebuild_fun_end;
}
/*
IMPORTANT: When rollback is requested, mysql will abort with
an assertion failure. That means every failed commit during inplace alter
table will result in a fatal error on the server. Indexes ongoing creation
will be detected when the server restarts, and dropped.
*/
DBUG_EXECUTE_IF("ddl_log_crash_before_inplace_ddl_commit", DBUG_SUICIDE(););
ret = commit_inplace_alter_table_common(altered_table, ha_alter_info, commit,
new_added_indexes);
rebuild_fun_end:
  // either way, whether we succeed or fail, we need to reload new_tdef for atomic DDL
delete ctx0;
ha_alter_info->handler_ctx = nullptr;
DBUG_RETURN(ret);
}
/** Commit phase for inplace norebuild DDL.
@param[in] altered_table new table
@param[in] ha_alter_info DDL operation
@param[in] commit true for commit, false for rollback
@param[in] old_dd_tab
@param[in] new_dd_tab
@return true on error, false on success */
bool ha_xengine::commit_inplace_alter_table_norebuild(
my_core::TABLE *const altered_table,
my_core::Alter_inplace_info *const ha_alter_info, bool commit,
const dd::Table *old_dd_tab, dd::Table *new_dd_tab)
{
DBUG_ENTER_FUNC();
DBUG_ASSERT(altered_table != nullptr);
DBUG_ASSERT(ha_alter_info != nullptr);
auto ctx0 = dynamic_cast<Xdb_inplace_alter_ctx *>(ha_alter_info->handler_ctx);
int ret = HA_EXIT_SUCCESS;
if (commit) {
    // if there were dups between inplace_alter_table's return and now,
    // capture them here
for (auto &k : m_tbl_def->m_added_key) {
if ((ret = inplace_check_dml_error(altered_table,
&altered_table->key_info[k.first->get_keyno()],
k.second))) {
__XHANDLER_LOG(WARN, "XEngineDDL: found error when build unique index, code is %d", ret);
DBUG_ASSERT(ctx0 != nullptr);
rollback_added_index(ctx0->m_added_indexes);
ret = HA_EXIT_FAILURE;
goto fun_end;
}
}
}
/*
* IMPORTANT: When rollback is requested, mysql will abort with
* an assertion failure. That means every failed commit during inplace alter
* table will result in a fatal error on the server. Indexes ongoing creation
* will be detected when the server restarts, and dropped.
*
* For partitioned tables, a rollback call to this function (commit == false)
* is done for each partition. A successful commit call only executes once
* for all partitions.
*/
if (!commit) {
    // Consistent with ha_xengine::open: only in the add-index case did we
    // create new indexes (and enlarge buffers), so only then is rollback needed.
if (ha_alter_info->handler_flags &
(my_core::Alter_inplace_info::ADD_INDEX |
my_core::Alter_inplace_info::ADD_UNIQUE_INDEX)) {
ret = rollback_added_index(ctx0->m_added_indexes);
goto fun_end;
} else {
ret = HA_EXIT_SUCCESS;
goto fun_end;
}
}
DBUG_EXECUTE_IF("ddl_log_crash_before_inplace_ddl_commit", DBUG_SUICIDE(););
dd_commit_inplace_update_instant_meta(ha_alter_info, old_dd_tab, new_dd_tab, table, altered_table);
/*
For partitioned tables, we need to commit all changes to all tables at
once, unlike in the other inplace alter API methods.
*/
  DBUG_ASSERT(ctx0);
  if (ha_alter_info->group_commit_ctx) {
    DBUG_EXECUTE_IF("crash_during_index_creation_partition", DBUG_SUICIDE(););
  }
#ifndef NDEBUG
  /* ctx_array is only needed for the assertion below, so keep it debug-only. */
  inplace_alter_handler_ctx **ctx_array;
  inplace_alter_handler_ctx *ctx_single[2];
  if (ha_alter_info->group_commit_ctx) {
    ctx_array = ha_alter_info->group_commit_ctx;
  } else {
    ctx_single[0] = ctx0;
    ctx_single[1] = nullptr;
    ctx_array = ctx_single;
  }
  DBUG_ASSERT(ctx0 == ctx_array[0]);
#endif
ha_alter_info->group_commit_ctx = nullptr;
ret = commit_inplace_alter_table_common(altered_table, ha_alter_info, commit,
ctx0->m_added_indexes);
fun_end:
  // either way, whether we succeed or fail, we need to reload new_tdef for atomic DDL
delete ctx0;
ha_alter_info->handler_ctx = nullptr;
DBUG_RETURN(ret);
}
/** Unique check for ongoing DML while a new index is built concurrently.
    Used by both lock_and_check_new_index_pk and
    lock_and_check_new_index_sk.
@param[in] kd xengine key definition
@param[in,out] ki altered_table and duplicate key information
@param[in] row_info carries the Xdb_transaction and new_pk_slice
@param[in] key key slice to check
@param[in] is_rebuild whether this is a rebuild DDL
@param[out] found duplicate key found or not */
void ha_xengine::check_new_index_unique_inner(
const Xdb_key_def &kd, Added_key_info &ki,
const struct update_row_info &row_info, const xengine::common::Slice& key,
bool is_rebuild, bool *const found) const
{
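  /* Overview: stage 1 scans the transaction's own WriteBatch for an exact
     match of `key`; stage 2 (only once the DDL has reached
     CHECK_UNIQUE_CONSTRAINT) scans L0&L1 through the base iterator and L2
     through a separate kOnlyL2 iterator, then decides from the collected
     (sequence, status) pairs whether a live duplicate exists. */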
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: unique check for %s(size=%u) on index:%d during DML",
key.ToString(true).c_str(), key.size(), kd.get_index_number());
#endif
DBUG_ASSERT(ki.altered_table);
KEY *key_info = &ki.altered_table->key_info[kd.get_keyno()];
DBUG_ASSERT(key_info);
uint user_defined_key_parts = key_info->user_defined_key_parts;
const bool all_parts_used = (user_defined_key_parts == kd.get_key_parts());
const bool total_order_seek =
!can_use_bloom_filter(ha_thd(), kd, key, all_parts_used,
is_ascending(kd, HA_READ_KEY_EXACT));
const bool fill_cache = true;
// check against (t1, -)
IteratorUptr iter(row_info.tx->get_iterator(kd.get_cf(), total_order_seek,
fill_cache, true /* read current data */, false /* acquire snapshot */,
true /*exclude_l2*/, true/* for unique check */));
/*
* Need to scan the transaction to see if there is a duplicate key.
* Also need to scan XENGINE and verify the key has not been deleted
* in the transaction.
*/
// Stage 1 check write batch only
auto wb_status = xengine::db::Iterator::kNonExist;
auto uk_iter = dynamic_cast<UniqueCheckBaseDeltaIterator*>(iter.get());
DBUG_ASSERT(nullptr != uk_iter);
for (uk_iter->Seek(key); !uk_iter->is_at_base() && uk_iter->Valid(); uk_iter->Next()) {
uint cmp_size = uk_iter->key().size();
if (kd.is_secondary_key()) {
uint n_null_fields = 0;
cmp_size = kd.get_memcmp_sk_size(ki.altered_table, uk_iter->key(),
&n_null_fields);
if (n_null_fields > 0) {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: null field exists: %u", n_null_fields);
#endif
continue;
}
}
if (cmp_size != key.size() ||
memcmp(key.data(), uk_iter->key().data(), cmp_size)) {
// current key doesn't match exactly, no further check in WriteBatch
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: record in WriteBatch doesn't match:(key:%s, size:%u) for indxe:%d",
uk_iter->key().ToString(true).c_str(), cmp_size,
kd.get_index_number());
#endif
break;
}
wb_status = uk_iter->key_status();
if (xengine::db::Iterator::kExist == wb_status) {
// find conflict user key, no further check in WriteBatch and base
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: find duplicate record %s in WriteBatch for indxe:%d",
key.ToString(true).c_str(), kd.get_index_number());
#endif
break;
}
DBUG_ASSERT(xengine::db::Iterator::kDeleted == wb_status);
// found deleted record, check more in WriteBatch
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: find deleted record %s in WriteBatch for indxe:%d",
uk_iter->key().ToString(true).c_str(), kd.get_index_number());
#endif
}
if (xengine::db::Iterator::kExist == wb_status) {
// duplicate record is found in WriteBatch
*found = true;
} else if (xengine::db::Iterator::kDeleted == wb_status) {
// duplicate record is deleted in WriteBatch
*found = false;
} else if (ki.step >= Added_key_info::CHECK_UNIQUE_CONSTRAINT) {
    // Stage 2: check L0, L1 and L2 when nothing matched in the WriteBatch
    /* No matching record was found in the WriteBatch, so further check all
     * user keys from the base iterator of the delta (L0, L1) and from L2.
     * We only run this check after the DDL has finished building the
     * baseline, because only then is the L2 data static. Running it on
     * L0&L1 data alone would be wasted work on an incomplete data set.
     */
std::vector<UserKeySequence> user_key_sequences;
for (uk_iter->change_to_base(); uk_iter->Valid(); uk_iter->Next()) {
uint cmp_size = uk_iter->key().size();
if (kd.is_secondary_key()) {
uint n_null_fields = 0;
cmp_size = kd.get_memcmp_sk_size(ki.altered_table, uk_iter->key(),
&n_null_fields);
if (n_null_fields > 0) {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: null field exist: %u", n_null_fields);
#endif
continue;
}
}
if (cmp_size != key.size() ||
memcmp(key.data(), uk_iter->key().data(), cmp_size)) {
// no more matching record in L0 and L1
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: record in L0&L1 doesn't match: (key:%s, size:%u) for index:%d",
uk_iter->key().ToString().c_str(), cmp_size,
kd.get_index_number());
#endif
break;
}
auto sequence = uk_iter->key_seq();
auto status = uk_iter->key_status();
DBUG_ASSERT(xengine::db::Iterator::kExist == status ||
xengine::db::Iterator::kDeleted == status);
// match in (L0, L1), collect status for later usage
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: match record %s(%u, %s) in L0&L1 for index:%d",
uk_iter->key().ToString(true).c_str(), sequence,
(xengine::db::Iterator::kExist == status) ? "PUT" : "DEL",
kd.get_index_number());
#endif
user_key_sequences.emplace_back(sequence, status);
}
xengine::common::ReadOptions opts;
opts.total_order_seek = true;
opts.read_level_ = xengine::common::kOnlyL2;
iter.reset(xdb->NewIterator(opts, kd.get_cf()));
iter->Seek(key);
if (HA_EXIT_SUCCESS ==
read_key_exact(kd, iter.get(), all_parts_used, key)) {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: find match record %s in L2 for index:%d",
iter->key().ToString(true).c_str(), kd.get_index_number());
#endif
user_key_sequences.emplace_back(0, xengine::db::Iterator::kExist);
}
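    /* Combine the statuses collected from L0&L1 with the L2 record
       (sequence 0) to decide whether a live duplicate of this user key
       still exists. */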
*found = (!user_key_sequences.empty() &&
(check_user_key_sequence(user_key_sequences) ||
xengine::db::Iterator::kExist == user_key_sequences.begin()->second));
}
if (*found) {
__XHANDLER_LOG(WARN, "XEngineDDL: duplicate entry is found during DML for %s on index:%d",
key.ToString(true).c_str(), kd.get_index_number());
// save the duplicated key
uint err = HA_EXIT_SUCCESS;
bool rc = ki.status.compare_exchange_strong(err, HA_ERR_FOUND_DUPP_KEY);
    // status is only ever set from 0 -> DUP_KEY, never the reverse. A false
    // alarm is possible, e.g. the duplicate record may be deleted later, but
    // we do not detect that here.
if (rc) {
if (set_duplicate_key_for_print(ki.altered_table, table, &kd,
m_tbl_def->m_dict_info, is_rebuild)) {
__XHANDLER_LOG(WARN, "XEngineDDL: Generate duplicate key error, table_name:%s",
table->s->table_name.str);
} else {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: Generate duplicate key successfully, table_name:%s",
table->s->table_name.str);
#endif
}
ki.dup_key_saved.store(true);
}
}
}
/** Only used for table rebuild: if the primary key is modified, all
    secondary indexes will also be modified.
@param[in] kd xengine key definition
@param[in,out] ki altered_table and duplicate key information
@param[in] row_info old_data and new_data
@param[out] pk_changed whether the DML changes the primary key
@param[out] found duplicate key found or not
@return SUCCESS/FAILURE */
int ha_xengine::lock_and_check_new_index_pk(
const Xdb_key_def &kd, Added_key_info &ki,
const struct update_row_info &row_info, bool *const pk_changed,
bool *const found) const
{
DEBUG_SYNC(ha_thd(), "xengine.check_and_lock_pk_inplace");
DBUG_ASSERT(found != nullptr);
DBUG_ASSERT(!has_hidden_pk(ki.altered_table));
DBUG_ASSERT(row_info.new_data != nullptr);
*pk_changed = false;
// pack user data with new table
uint new_pk_size = 0;
if (m_new_pk_descr->pack_new_record(table, m_pack_buffer, row_info.new_data,
m_pk_packed_tuple, nullptr,
false, 0, 0, nullptr, ki.altered_table,
m_tbl_def->m_dict_info, new_pk_size)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new data for new table error. table_name: %s", table->s->table_name.str);
return HA_EXIT_FAILURE;
}
if (row_info.old_data != nullptr) {
// for update
uint old_pk_size = 0;
if (m_new_pk_descr->pack_new_record(table, m_pack_buffer, row_info.old_data,
m_pk_packed_tuple_old, nullptr,
false, 0, 0, nullptr, ki.altered_table,
m_tbl_def->m_dict_info, old_pk_size)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack old data for new table error. table_name: %s", table->s->table_name.str);
return HA_EXIT_FAILURE;
}
DBUG_ASSERT(old_pk_size > 0);
// If the keys are the same, then no lock is needed
if ((new_pk_size == old_pk_size) &&
!memcmp(m_pk_packed_tuple, m_pk_packed_tuple_old, new_pk_size)) {
*found = false;
return HA_EXIT_SUCCESS;
}
*pk_changed = true;
}
/* For UPDATEs, if the key has changed, we need to obtain a lock. INSERTs
* always require locking.
*/
xengine::common::Slice new_pk_slice((const char *)m_pk_packed_tuple, new_pk_size);
std::string tmp_value;
const xengine::common::Status s =
get_for_update(row_info.tx, kd.get_cf(), new_pk_slice, &tmp_value);
if (!s.ok() && !s.IsNotFound()) {
__XHANDLER_LOG(WARN, "XEngineDDL: get_for_update for key(%s) on pk(%u) failed with error:%s, table_name: %s",
new_pk_slice.ToString(true).c_str(),
kd.get_index_number(), s.ToString().c_str(),
table->s->table_name.str);
return row_info.tx->set_status_error(ki.altered_table->in_use, s,
*m_new_pk_descr, m_tbl_def->m_inplace_new_tdef);
}
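  /* Only run the inner unique check for a newly added/re-defined primary
     key; m_can_skip_unique_check means the key definition is unchanged, so
     uniqueness is already guaranteed by the old table. */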
if (!kd.m_can_skip_unique_check) {
check_new_index_unique_inner(kd, ki, row_info, new_pk_slice, true, found);
}
return HA_EXIT_SUCCESS;
}
/** While an online DDL is running, newly arriving inserts/updates must do a
    unique check against the new index built by the DDL session as well as
    against their own increments.
@param[in] kd xengine key definition
@param[in,out] ki altered_table and duplicate key information
@param[in] row_info old_data and new_data
@param[out] found duplicate key found or not
@param[in] is_rebuild true for a rebuild DDL, in which case the new record
           format must be packed
@return SUCCESS/FAILURE */
int ha_xengine::lock_and_check_new_index_sk(
const Xdb_key_def &kd, Added_key_info &ki,
const struct update_row_info &row_info, bool *const found,
bool is_rebuild/* = false */) const
{
DBUG_ASSERT(found != nullptr);
*found = false;
/* Can skip checking this key if none of the key fields have changed. */
if (row_info.old_data != nullptr && !m_update_scope.is_set(kd.get_keyno())) {
return HA_EXIT_SUCCESS;
}
TABLE *altered_table = ki.altered_table;
KEY *key_info = nullptr;
uint n_null_fields = 0;
uint user_defined_key_parts = 1;
key_info = &altered_table->key_info[kd.get_keyno()];
user_defined_key_parts = key_info->user_defined_key_parts;
/** If there are no uniqueness requirements, there's no need to obtain a
lock for this key. */
if (!(key_info->flags & HA_NOSAME)) {
return HA_EXIT_SUCCESS;
}
  /* Calculate the new key for obtaining the lock.
     For unique secondary indexes, the key used for locking does not
     include the extended fields. */
uint size = 0;
if (!is_rebuild) {
size = kd.pack_record(table, m_pack_buffer, row_info.new_data,
m_sk_packed_tuple, nullptr, false, 0,
user_defined_key_parts, &n_null_fields, altered_table);
} else {
if (kd.pack_new_record(table, m_pack_buffer, row_info.new_data,
m_sk_packed_tuple, nullptr, false, 0,
user_defined_key_parts, &n_null_fields,
altered_table, m_tbl_def->m_dict_info, size)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new record errror, table_name: %s", table->s->table_name.str);
return HA_EXIT_FAILURE;
}
}
if (n_null_fields > 0) {
#ifndef NDEBUG
__XHANDLER_LOG(INFO, "XEngineDDL: null field exist: %u", n_null_fields);
#endif
    /** If any fields are marked as NULL this will never match another row,
        as NULL never matches anything else, including another NULL. */
}
const xengine::common::Slice new_slice =
xengine::common::Slice((const char *)m_sk_packed_tuple, size);
/** For UPDATEs, if the key has changed, we need to obtain a lock. INSERTs
always require locking. */
if (row_info.old_data != nullptr) {
    // don't include the hidden_pk_id for sk comparison
if (!is_rebuild) {
size = kd.pack_record(table, m_pack_buffer, row_info.old_data,
m_sk_packed_tuple_old, nullptr, false,
0, user_defined_key_parts, nullptr, altered_table);
} else {
if (kd.pack_new_record(table, m_pack_buffer, row_info.old_data,
m_sk_packed_tuple_old, nullptr, false,
0, user_defined_key_parts, nullptr, altered_table,
m_tbl_def->m_dict_info, size)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new record errror, table_name: %s", table->s->table_name.str);
return HA_EXIT_FAILURE;
}
}
assert(size <= m_max_packed_sk_len);
const xengine::common::Slice old_slice =
xengine::common::Slice((const char *)m_sk_packed_tuple_old, size);
    /** For updates, if the keys are the same, then no lock is needed.
        Also check whether the key has any fields set to NULL. If it does,
        this key is unique, since NULLs are never equal to each other, so no
        lock is needed. */
if (!Xdb_pk_comparator::bytewise_compare(new_slice, old_slice)) {
return HA_EXIT_SUCCESS;
}
}
  /** Perform a read to determine if a duplicate entry exists - since this is
      a secondary index, a range scan is needed.
      note: we intentionally don't set options.snapshot here. We want to read
      the latest committed data. */
const bool all_parts_used = (user_defined_key_parts == kd.get_key_parts());
/**This iterator seems expensive since we need to allocate and free
memory for each unique index.
If this needs to be optimized, for keys without NULL fields, the
extended primary key fields can be migrated to the value portion of the
key. This enables using Get() instead of Seek() as in the primary key
case.
The bloom filter may need to be disabled for this lookup. */
const bool total_order_seek =
!can_use_bloom_filter(ha_thd(), kd, new_slice, all_parts_used,
is_ascending(kd, HA_READ_KEY_EXACT));
const bool fill_cache = true; // !THDVAR(ha_thd(), skip_fill_cache);
// psergey-todo: we just need to take lock, lookups not needed:
DBUG_ASSERT(!kd.m_is_reverse_cf);
const xengine::common::Status s = lock_unique_key(
row_info.tx, kd.get_cf(), new_slice, total_order_seek, fill_cache);
if (!s.ok() && !s.IsNotFound()) {
__XHANDLER_LOG(WARN, "XEngineDDL: lock_unique_key for key(%s) on index:%d failed with error:%s, table_name: %s",
new_slice.ToString(true).c_str(),
kd.get_index_number(), s.ToString().c_str(),
table->s->table_name.str);
auto tbl = is_rebuild ? m_tbl_def->m_inplace_new_tdef : m_tbl_def.get();
return row_info.tx->set_status_error(ki.altered_table->in_use, s, kd, tbl);
}
if (!kd.m_can_skip_unique_check) {
check_new_index_unique_inner(kd, ki, row_info, new_slice, is_rebuild, found);
}
return HA_EXIT_SUCCESS;
}
/* Unique check and key locking for the DDL-altered table during DML.
 *
 * @param[in] row_info row record to insert or update
 * @param[out] pk_changed true if the primary key of the new table changes
 *             (either the DML updates the pk value, or the pk definition is
 *             changed by the DDL)
 */
int ha_xengine::check_uniqueness_and_lock_rebuild(
const struct update_row_info &row_info, bool *const pk_changed)
{
int rc = HA_EXIT_SUCCESS;
  /*
    Check the new table's indexes if necessary.
    We must do a unique check for the new pk or new uks; on conflict, let
    the DML transaction go ahead but record the duplicate error, so that
    the DDL transaction fails.
  */
for (auto &k : m_tbl_def->m_inplace_new_keys) {
const Xdb_key_def &kd = *k.first;
TABLE *atab = k.second.altered_table;
bool found = false;
if (kd.is_hidden_primary_key()) {
*pk_changed = false;
// No need to check with copied hidden pk
continue;
} else if (kd.is_primary_key()) {
*pk_changed = false;
// check whether the DML will update primary key
rc = lock_and_check_new_index_pk(kd, k.second, row_info,
pk_changed, &found);
// this primary key is newly added/re-defined
// we need convert UPDATE to DEL+PUT
*pk_changed = *pk_changed || !kd.m_can_skip_unique_check;
} else {
rc = lock_and_check_new_index_sk(kd, k.second, row_info, &found, true);
}
if (rc != HA_EXIT_SUCCESS){
__XHANDLER_LOG(WARN, "check duplicated key failed for new key of inplace DDL, errcode is %d", rc);
break;
}
}
  /* For INSERT there is no old row, so the primary key cannot have changed. */
  if (row_info.old_data == nullptr) {
    *pk_changed = false;
  }
return rc;
}
/** Update the rebuilt new table for inplace rebuild DDL.
@param[in] row_info row record to insert or update
@param[in] pk_changed flag indicating whether the update must be converted to (del, put)
@return Success or Failure */
int ha_xengine::update_new_table(struct update_row_info &row_info, bool pk_changed) {
int ret = HA_EXIT_SUCCESS;
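  /* Mirror the DML into every new index that has at least started building
     its base; indexes that have not reached BUILDING_BASE_INDEX are skipped
     here, as the row is presumably covered by the baseline build itself. */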
for (auto &k : m_tbl_def->m_inplace_new_keys) {
const Xdb_key_def &kd = *k.first;
const TABLE *atab = k.second.altered_table;
if (k.second.step < Added_key_info::BUILDING_BASE_INDEX) {
continue;
}
DBUG_ASSERT(!row_info.new_pk_slice.empty());
if (kd.is_primary_key()) {
ret = update_pk_for_new_table(kd, k.second, row_info, pk_changed, atab);
} else {
DBUG_ASSERT(kd.is_secondary_key());
ret = update_sk_for_new_table(kd, k.second, row_info, atab);
}
if (ret > 0) {
__XHANDLER_LOG(WARN, "XEngineDDL: update indexes error, code is %d, table_name: %s", ret, table->s->table_name.str);
return ret;
}
}
return ret;
}
xengine::common::Status ha_xengine::delete_or_singledelete_new_table(
const TABLE *altered_table, Xdb_tbl_def *new_tbl_def,
uint index, Xdb_transaction *const tx,
xengine::db::ColumnFamilyHandle *const column_family,
const xengine::common::Slice &key) {
bool can_use_single_del_opt = false;
/*
- Secondary Indexes can always use SingleDelete.
- If the index is PRIMARY KEY, and if all of the columns of the table
are covered by the PRIMARY KEY, SingleDelete can be used.
*/
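  /* SingleDelete is only safe when a key is written at most once without
     being overwritten in between, which holds in the two cases above. */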
if (index != pk_index(altered_table, new_tbl_def)) {
can_use_single_del_opt = true;
} else if (!has_hidden_pk(altered_table) &&
altered_table->key_info[index].actual_key_parts == altered_table->s->fields) {
can_use_single_del_opt = true;
}
if (can_use_single_del_opt)
return tx->single_delete(column_family, key);
return tx->delete_key(column_family, key);
}
/** Update the primary key for the new table in inplace rebuild DDL.
@param[in] kd pk definition
@param[in,out] ki altered_table and duplicate key information
@param[in] row_info struct storing old and new data
@param[in] pk_changed flag indicating whether the update must be converted to (del, put)
@param[in] altered_table
@return Success or Failure */
int ha_xengine::update_pk_for_new_table(const Xdb_key_def &kd,
Added_key_info &ki,
const struct update_row_info &row_info,
const bool pk_changed,
const TABLE *altered_table) {
int rc = HA_EXIT_SUCCESS;
DBUG_ASSERT(altered_table != nullptr);
DBUG_ASSERT(m_tbl_def->m_inplace_new_tdef);
const uint key_id = kd.get_keyno();
const bool hidden_pk = is_hidden_pk(key_id, altered_table, m_tbl_def->m_inplace_new_tdef);
if (!hidden_pk && pk_changed) {
/*
* generate old_pk for new-table and delete it
*/
DBUG_ASSERT(row_info.old_data != nullptr);
DBUG_ASSERT(m_pk_packed_tuple_old);
uint old_packed_size = 0;
if (kd.pack_new_record(
table, m_pack_buffer, row_info.old_data, m_pk_packed_tuple_old,
nullptr, false, 0, 0, nullptr,
altered_table, m_tbl_def->m_dict_info, old_packed_size)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack old pk for new table error, table_name: %s", table->s->table_name.str);
return HA_EXIT_FAILURE;
}
assert(old_packed_size <= m_max_packed_sk_len);
xengine::common::Slice old_pk_slice =
xengine::common::Slice(reinterpret_cast<char*>(m_pk_packed_tuple_old), old_packed_size);
const xengine::common::Status s =
delete_or_singledelete_new_table(altered_table, m_tbl_def->m_inplace_new_tdef, key_id, row_info.tx, kd.get_cf(), old_pk_slice);
if (!s.ok()) {
__XHANDLER_LOG(WARN, "XEngineDDL: failed to delete old record(%s) for updating pk(%u), code is %d, table_name: %s",
old_pk_slice.ToString(true).c_str(),
kd.get_index_number(), s.code(), table->s->table_name.str);
return row_info.tx->set_status_error(table->in_use, s, kd, m_tbl_def.get());
}
}
  // auto_increment was already advanced in the old table; just use the newest value for the new table
/*
if (table->next_number_field) {
update_auto_incr_val();
}
*/
xengine::common::Slice value_slice;
if ((rc = convert_new_record_from_old_record(
table, altered_table, m_tbl_def->m_dict_info, row_info.new_pk_slice,
row_info.new_pk_unpack_info, &value_slice, m_new_storage_record))) {
if (rc == HA_ERR_INVALID_NULL_ERROR) {
__XHANDLER_LOG(WARN, "XEngineDDL: convert null to not null failed, code is %d, table_name: %s", rc, table->s->table_name.str);
      // if another errcode was already set, setting HA_ERR_INVALID_NULL_ERROR fails here; that's OK
uint err = HA_EXIT_SUCCESS;
ki.status.compare_exchange_strong(err, HA_ERR_INVALID_NULL_ERROR);
return HA_EXIT_SUCCESS;
} else {
__XHANDLER_LOG(ERROR, "XEngineDDL: convert new record from old record error, code is %d, table_name: %s", rc, table->s->table_name.str);
return rc;
}
}
const auto cf = kd.get_cf();
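  /* Choose the write path: bulk load writes SST files directly; blind
     writes skip uniqueness checks entirely; DDL transactions use the
     indexed batch so their own writes stay visible to conflict checks;
     ordinary DML uses a locking put that reports a duplicate via
     Status::Busy. */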
if (xengine_enable_bulk_load_api /*&& THDVAR(table->in_use, bulk_load)*/ &&
!hidden_pk) {
/*
Write the primary key directly to an SST file using an SstFileWriter
*/
rc = bulk_load_key(row_info.tx, kd, row_info.new_pk_slice, value_slice, 0);
} else if (row_info.skip_unique_check) {
/*
It is responsibility of the user to make sure that the data being
inserted doesn't violate any unique keys.
*/
row_info.tx->get_blind_write_batch()->Put(cf, row_info.new_pk_slice,
value_slice);
} else if (row_info.tx->m_ddl_transaction) {
/*
DDL statement must check for unique key conflicts. For example:
ALTER TABLE tbl DROP PRIMARY KEY, ADD PRIMARY KEY(non_unique_column)
*/
row_info.tx->get_indexed_write_batch()->Put(cf, row_info.new_pk_slice,
value_slice);
} else {
const auto s = row_info.tx->put(cf, row_info.new_pk_slice, value_slice);
if (!s.ok()) {
if (s.IsBusy()) {
__XHANDLER_LOG(WARN, "XEngineDDL: duplicate entry is found during DML for key(%s) on pk:%d, table_name:%s",
row_info.new_pk_slice.ToString(true).c_str(),
kd.get_index_number(), table->s->table_name.str);
errkey = table->s->primary_key;
m_dupp_errkey = errkey;
rc = HA_ERR_FOUND_DUPP_KEY;
        // TODO: confirm this is due to a duplicate entry, which should be handled by the DDL
} else {
__XHANDLER_LOG(WARN, "XEngineDDL: failed to put record(%s:%s) for updating pk(%u), code is %d, table_name: %s",
row_info.new_pk_slice.ToString(true).c_str(),
value_slice.ToString(true).c_str(),
                       kd.get_index_number(), s.code(), table->s->table_name.str);
rc = row_info.tx->set_status_error(table->in_use, s, *m_pk_descr,
m_tbl_def.get());
}
}
}
return rc;
}
/** Update secondary keys for the new table in inplace rebuild DDL.
@param[in] kd xengine key definition
@param[in,out] ki altered_table and duplicate key information
@param[in] row_info struct storing old and new data
@param[in] altered_table MySQL new table
@return Success or Failure */
int ha_xengine::update_sk_for_new_table(const Xdb_key_def &kd,
Added_key_info &ki,
const struct update_row_info &row_info,
const TABLE *const altered_table)
{
uint new_packed_size;
uint old_packed_size;
xengine::common::Slice new_key_slice;
xengine::common::Slice new_value_slice;
xengine::common::Slice old_key_slice;
const uint key_id = kd.get_keyno();
/*
Can skip updating this key if none of the key fields have changed.
*/
if (row_info.old_data != nullptr && !m_update_scope.is_set(key_id)) {
return HA_EXIT_SUCCESS;
}
const bool store_row_debug_checksums = should_store_row_debug_checksums();
if (kd.pack_new_record(table, m_pack_buffer, row_info.new_data,
m_sk_packed_tuple, &m_sk_tails,
store_row_debug_checksums,
row_info.hidden_pk_id, 0, nullptr, altered_table,
m_tbl_def->m_dict_info, new_packed_size)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", table->s->table_name.str);
return HA_EXIT_FAILURE;
}
DBUG_ASSERT(new_packed_size <= m_max_packed_sk_len);
if (row_info.old_data != nullptr) {
// The old value
if (kd.pack_new_record(table, m_pack_buffer, row_info.old_data,
m_sk_packed_tuple_old, &m_sk_tails_old,
store_row_debug_checksums, row_info.hidden_pk_id, 0,
nullptr, altered_table, m_tbl_def->m_dict_info,
old_packed_size)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", table->s->table_name.str);
return HA_EXIT_FAILURE;
}
assert(old_packed_size <= m_max_packed_sk_len);
/*
Check if we are going to write the same value. This can happen when
one does
UPDATE tbl SET col='foo'
and we are looking at the row that already has col='foo'.
We also need to compare the unpack info. Suppose, the collation is
case-insensitive, and unpack info contains information about whether
the letters were uppercase and lowercase. Then, both 'foo' and 'FOO'
will have the same key value, but different data in unpack_info.
(note: anyone changing bytewise_compare should take this code into
account)
*/
if (old_packed_size == new_packed_size &&
m_sk_tails_old.get_current_pos() == m_sk_tails.get_current_pos() &&
memcmp(m_sk_packed_tuple_old, m_sk_packed_tuple, old_packed_size) ==
0 &&
memcmp(m_sk_tails_old.ptr(), m_sk_tails.ptr(),
m_sk_tails.get_current_pos()) == 0) {
return HA_EXIT_SUCCESS;
}
/*
Deleting entries from secondary index should skip locking, but
be visible to the transaction.
(also note that DDL statements do not delete rows, so this is not a DDL
statement)
*/
old_key_slice = xengine::common::Slice(
reinterpret_cast<const char *>(m_sk_packed_tuple_old), old_packed_size);
row_info.tx->get_indexed_write_batch()->SingleDelete(kd.get_cf(),
old_key_slice);
}
  xengine::db::WriteBatchBase *write_batch =
      row_info.tx->get_indexed_write_batch();
new_key_slice = xengine::common::Slice(
reinterpret_cast<const char *>(m_sk_packed_tuple), new_packed_size);
new_value_slice =
xengine::common::Slice(reinterpret_cast<const char *>(m_sk_tails.ptr()),
m_sk_tails.get_current_pos());
write_batch->Put(kd.get_cf(), new_key_slice, new_value_slice);
return HA_EXIT_SUCCESS;
}
/** Delete from the new table in inplace rebuild DDL.
@param[in] row_info struct storing old and new data
@return Success or Failure */
int ha_xengine::delete_row_new_table(struct update_row_info &row_info) {
DBUG_ENTER_FUNC();
int ret = HA_EXIT_SUCCESS;
bool get_new_pk_flag = false;
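  /* Generate the new-format primary key (and hidden_pk_id) only once, and
     reuse it for all the new indexes handled below. */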
for (auto &k : m_tbl_def->m_inplace_new_keys) {
const Xdb_key_def &kd = *k.first;
const TABLE *atab = k.second.altered_table;
if (k.second.step < Added_key_info::BUILDING_BASE_INDEX) {
continue;
}
    /* generate new primary key */
if (!get_new_pk_flag) {
if ((ret = get_new_pk_for_delete(&row_info, atab))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: get new primary pk for rebuild ddl error, table_name: %s", table->s->table_name.str);
DBUG_RETURN(ret);
}
get_new_pk_flag = true;
}
Xdb_transaction *const tx = row_info.tx;
DBUG_ASSERT(tx != nullptr);
if (kd.is_primary_key()) {
const uint index = pk_index(atab, m_tbl_def->m_inplace_new_tdef);
xengine::common::Status s = delete_or_singledelete_new_table(
atab, m_tbl_def->m_inplace_new_tdef, index, row_info.tx, kd.get_cf(),
row_info.new_pk_slice);
if (!s.ok()) {
__XHANDLER_LOG(WARN, "XEngineDDL: failed to delete record(%s) on pk (%u) "
"of new table, code is %d, table_name: %s",
row_info.new_pk_slice.ToString(true).c_str(),
kd.get_index_number(), s.code(), table->s->table_name.str);
DBUG_RETURN(
tx->set_status_error(table->in_use, s, *m_pk_descr, m_tbl_def.get()));
}
} else {
uint packed_size;
if ((ret = kd.pack_new_record(table, m_pack_buffer, row_info.old_data,
m_sk_packed_tuple, &m_sk_tails,
false, row_info.hidden_pk_id, 0, nullptr,
atab, m_tbl_def->m_dict_info, packed_size))) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new reocrd error, table_name: %s", table->s->table_name.str);
DBUG_RETURN(HA_EXIT_FAILURE);
}
DBUG_ASSERT(packed_size <= m_max_packed_sk_len);
xengine::common::Slice secondary_key_slice(
reinterpret_cast<const char *>(m_sk_packed_tuple), packed_size);
/* Deleting on secondary key doesn't need any locks: */
tx->get_indexed_write_batch()->SingleDelete(kd.get_cf(),
secondary_key_slice);
}
if (ret != HA_EXIT_SUCCESS) {
__XHANDLER_LOG(ERROR, "XEngineDDL: delete indexes error, code is %d, table_name: %s", ret, table->s->table_name.str);
DBUG_RETURN(ret);
}
}
DBUG_RETURN(ret);
}
/** Build the new primary key for a delete statement, in storage format.
@param[in] row_info MySQL new/old record
@param[in] altered_table MySQL new table
@return SUCCESS/FAILURE */
int ha_xengine::get_new_pk_for_delete(struct update_row_info *const row_info,
const TABLE *altered_table)
{
DBUG_ENTER_FUNC();
uint size = 0;
DBUG_ASSERT(m_new_pk_descr != nullptr);
if (!has_hidden_pk(altered_table)) {
row_info->new_pk_unpack_info = &m_pk_unpack_info;
if (m_new_pk_descr->pack_new_record(
table, m_pack_buffer, row_info->old_data, m_pk_packed_tuple,
row_info->new_pk_unpack_info, false, 0, 0, nullptr, altered_table,
m_tbl_def->m_dict_info, size)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", table->s->table_name.str);
DBUG_RETURN(HA_EXIT_FAILURE);
}
DBUG_ASSERT(size <= m_max_packed_sk_len);
} else {
    // we don't support dropping the primary key via online DDL
DBUG_ASSERT(has_hidden_pk(altered_table) && has_hidden_pk(table));
if (read_hidden_pk_id_from_rowkey(&row_info->hidden_pk_id, &m_last_rowkey)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: read hidden_pk error, table_name: %s", table->s->table_name.str);
DBUG_RETURN(HA_EXIT_FAILURE);
}
    // since the new table's subtable_id differs, we can't use m_last_rowkey directly
size =
m_new_pk_descr->pack_hidden_pk(row_info->hidden_pk_id, m_pk_packed_tuple);
}
row_info->new_pk_slice =
xengine::common::Slice((const char *)m_pk_packed_tuple, size);
DBUG_RETURN(HA_EXIT_SUCCESS);
}
/** Build the new primary key for an update statement, in storage format.
@param[in] row_info MySQL new/old record
@param[in] altered_table MySQL new table
@return SUCCESS/FAILURE */
int ha_xengine::get_new_pk_for_update(struct update_row_info *const row_info, const TABLE *altered_table) {
uint size = 0;
/*
Get new row key for any insert, and any update where the pk is not hidden.
Row key for updates with hidden pk is handled below.
*/
if (!has_hidden_pk(altered_table)) {
row_info->hidden_pk_id = 0;
row_info->new_pk_unpack_info = &m_pk_unpack_info;
if (m_new_pk_descr->pack_new_record(
table, m_pack_buffer, row_info->new_data, m_pk_packed_tuple,
row_info->new_pk_unpack_info, false, 0, 0, nullptr, altered_table,
m_tbl_def->m_dict_info, size)) {
__XHANDLER_LOG(ERROR, "XEngineDDL: pack new record error, table_name: %s", table->s->table_name.str);
return HA_EXIT_FAILURE;
}
DBUG_ASSERT(size <= m_max_packed_sk_len);
} else {
    // if the new table has a hidden pk, it must be the same as the old
    // table's; hidden_pk_id was already set in get_pk_for_update
DBUG_ASSERT(row_info->hidden_pk_id > 0);
size =
m_new_pk_descr->pack_hidden_pk(row_info->hidden_pk_id, m_pk_packed_tuple);
}
row_info->new_pk_slice =
xengine::common::Slice((const char *)m_pk_packed_tuple, size);
return HA_EXIT_SUCCESS;
}
/** Fill the duplicate value into the new table record.
    TODO if the pk changed between old_table and new_table, it is hard to get
    the old table record from new_pk_slice.
    1. if pk_changed, use unpack_record to get the value from the memcmp key
    2. otherwise, try to fetch the old record and fill the new duplicate record
    3. if step 2 fails (the duplicate key was deleted), retry with unpack_record
@param[in] index duplicate xengine key definition
@param[in] new_table_arg MySQL Table Object
@param[in] dup_key duplicate key
@param[in] dup_val duplicate value
@param[in] is_rebuild rebuild DDL or not */
int ha_xengine::fill_new_duplicate_record(const Xdb_key_def *index,
TABLE *new_table_arg,
const xengine::common::Slice &dup_key,
const xengine::common::Slice &dup_val,
bool is_rebuild)
{
DBUG_ENTER_FUNC();
bool pk_def_changed = false;
for (auto &k : m_tbl_def->m_inplace_new_keys) {
if (k.first->is_primary_key()) {
pk_def_changed = !k.first->m_can_skip_unique_check;
break;
}
}
int res;
if (pk_def_changed && is_rebuild) {
if (index->is_primary_key()) {
DBUG_ASSERT(!index->is_hidden_primary_key());
if (index->get_support_icp_flag()) {
res = index->unpack_record_pk(new_table_arg, new_table_arg->record[0],
&dup_key, &dup_val, m_tbl_def->m_dict_info);
} else {
res = convert_record_from_storage_format(
&dup_key, &dup_val, new_table_arg->record[0], new_table_arg,
m_tbl_def->m_dict_info->m_null_bytes_in_rec,
m_tbl_def->m_dict_info->m_maybe_unpack_info, index,
m_tbl_def->m_dict_info->m_decoders_vect,
m_tbl_def->m_dict_info->m_instant_ddl_info,
m_tbl_def->m_dict_info->m_verify_row_debug_checksums,
m_tbl_def->m_dict_info->m_row_checksums_checked);
}
} else {
res = index->unpack_record(new_table_arg, new_table_arg->record[0],
&dup_key, &dup_val, false);
}
if (res) {
__XHANDLER_LOG(ERROR, "XEngineDDL: unable to unpack record from value");
}
} else {
//for utf8mb4_0900, try to use old-table record
if ((res = fill_old_table_record(new_table_arg, table, dup_key, index))) {
if (res != HA_ERR_KEY_NOT_FOUND) {
__XHANDLER_LOG(ERROR, "XEngineDDL: Fill old table record failed, table_name:%s",
table->s->table_name.str);
DBUG_RETURN(res);
}
}
if (res == HA_ERR_KEY_NOT_FOUND) {
DBUG_ASSERT(!index->is_primary_key());
if (index->unpack_record(new_table_arg, new_table_arg->record[0],
&dup_key, &dup_val,
false)) {
/* Should never reach here */
__XHANDLER_LOG(ERROR, "XEngineDDL: Error retrieving index entry, table_name:%s",
table->s->table_name.str);
DBUG_ASSERT(0);
}
__XHANDLER_LOG(INFO, "XEngineDDL: duplicated entry for index:%d is "
"deledted in old table:%s during online DML",
index->get_index_number(), table->s->table_name.str);
} else if ((res = set_duplicate_key_for_print(new_table_arg, table, index,
m_tbl_def->m_dict_info, is_rebuild))) {
__XHANDLER_LOG(ERROR,
"XEngineDDL: Set duplicate record for new table failed, "
"table_name:%s",
table->s->table_name.str);
DBUG_RETURN(res);
} else {
__XHANDLER_LOG(INFO,
"XEngineDDL: Set duplicate record for new table successsfully, "
"table_name:%s",
table->s->table_name.str);
}
}
DBUG_RETURN(HA_EXIT_SUCCESS);
}
/** Get the latest row by rowid; used to find the duplicate-key record.
    The duplicate-key record may have been deleted, in which case we must use
    unpack_record to recover field values from the memcmp key. But for
    utf8mb4_0900_ai_ci we cannot recover varchar/char values that way.
    Only used by the inplace DDL session.
*/
int ha_xengine::get_latest_row_by_rowid(uchar *const buf, const char *const rowid,
const uint rowid_size) {
DBUG_ENTER_FUNC();
DBUG_ASSERT(buf != nullptr);
DBUG_ASSERT(rowid != nullptr);
DBUG_ASSERT(table != nullptr);
int ret = HA_EXIT_SUCCESS;
xengine::common::Slice key_slice(rowid, rowid_size);
Xdb_transaction *const tx = get_or_create_tx(table->in_use);
DBUG_ASSERT(tx != nullptr);
bool found;
xengine::common::Status s;
s = tx->get_latest(m_pk_descr->get_cf(), key_slice, &m_retrieved_record);
if (!s.IsNotFound() && !s.ok()) {
__XHANDLER_LOG(WARN, "XEngienDDL: lock failed for key(%s) on index %u, code is %d, table_name is: %s",
key_slice.ToString(true).c_str(), m_pk_descr->get_index_number(),
s.code(), table->s->table_name.str);
DBUG_RETURN(tx->set_status_error(table->in_use, s, *m_pk_descr, m_tbl_def.get()));
}
found = !s.IsNotFound();
if (found) {
m_last_rowkey.copy((const char *)rowid, rowid_size, &my_charset_bin);
ret = convert_record_from_storage_format(&key_slice, m_retrieved_record,
buf, table);
} else {
/*
Note: we don't need to unlock the row. It is intentional that we keep
locks on rows that don't exist.
*/
ret = HA_ERR_KEY_NOT_FOUND;
}
DBUG_RETURN(ret);
}
/** We need to fill the old record before printing duplicate key information.
@param[in] new_table
@param[in] old_table
@param[in] dup_key duplicate key found during index build
@param[in] dup_kd duplicate xengine key definition
@return SUCCESS/FAILURE */
int ha_xengine::fill_old_table_record(TABLE *new_table,
TABLE *old_table,
const xengine::common::Slice &dup_key,
const Xdb_key_def *dup_kd)
{
int ret = HA_EXIT_SUCCESS;
uint pk_size = 0;
if (dup_kd->is_primary_key()) {
//use old_pk index_number
memcpy(m_pk_packed_tuple, dup_key.data(), dup_key.size());
xdb_netbuf_store_index(m_pk_packed_tuple, m_pk_descr->get_index_number());
pk_size = dup_key.size();
} else {
pk_size =
dup_kd->get_primary_key_tuple(new_table, *m_pk_descr, &dup_key, m_pk_packed_tuple);
if (pk_size == XDB_INVALID_KEY_LEN) {
__XHANDLER_LOG(ERROR, "XEngineDDL: Get primary key failed, table_name: %s", table->s->table_name.str);
ret = HA_ERR_INTERNAL_ERROR;
}
}
if (ret != HA_EXIT_SUCCESS) {
__XHANDLER_LOG(ERROR, "XEngineDDL: Fill table record failed, table_name: %s", table->s->table_name.str);
return ret;
} else if ((ret = get_latest_row_by_rowid(old_table->record[0], reinterpret_cast<const char *>(m_pk_packed_tuple), pk_size))) {
if (ret == HA_ERR_KEY_NOT_FOUND) {
__XHANDLER_LOG(WARN, "XEngineDDL: duplicate record maybe deleted, table_name: %s", table->s->table_name.str);
} else {
__XHANDLER_LOG(ERROR, "XEngineDDL: Find duplicate record error, table_name: %s", table->s->table_name.str);
}
return ret;
}
return HA_EXIT_SUCCESS;
}
static int fill_duplicate_blob_val(Field_blob *const old_lob,
Field_blob *const new_lob)
{
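  /* A Field_blob stores length_bytes of length followed by a pointer to the
     blob data; extract the data pointer and store the value into the new
     field with the old field's charset. */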
const uint length_bytes = old_lob->pack_length() - portable_sizeof_char_ptr;
char *data_ptr;
memcpy(&data_ptr, old_lob->ptr + length_bytes, sizeof(uchar **));
if (new_lob->store(data_ptr, old_lob->get_length(), old_lob->charset())) {
return HA_EXIT_FAILURE;
}
return HA_EXIT_SUCCESS;
}
/** Set the duplicate field values for the new table; the values come from
    the old record. If a field was added by the DDL, fill it with its
    default value.
@param[in] new_table altered table
@param[in] old_table table before the DDL operation
@param[in] kd duplicate key found during DDL
@param[in] dict_info dictionary used for DDL, such as col_map
@param[in] is_rebuild whether the DDL is a rebuild DDL
@return SUCCESS/FAILURE */
static int set_duplicate_key_for_print(
TABLE *new_table, TABLE *old_table, const Xdb_key_def *kd,
const std::shared_ptr<Xdb_inplace_ddl_dict_info>& dict_info, bool is_rebuild)
{
const bool is_secondary_key = kd->is_secondary_key();
const bool hidden_pk_exists = (new_table->s->primary_key == MAX_INDEXES);
const bool is_hidden_pk = kd->is_hidden_primary_key();
DBUG_ASSERT(!is_hidden_pk);
uint *col_map = nullptr;
if (is_rebuild) {
DBUG_ASSERT(dict_info != nullptr);
col_map = dict_info->m_col_map;
} else {
DBUG_ASSERT(dict_info == nullptr);
}
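  /* col_map maps a field index in the new table to its index in the old
     table; (uint)-1 marks a column added by the DDL, which has no old value
     and is filled from the table defaults below. */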
uint old_col_index = 0;
for (uint i = 0; i < kd->get_key_parts(); i++) {
/*
Hidden pk field is packed at the end of the secondary keys, but the SQL
layer does not know about it. Skip retrieving field if hidden pk.
*/
if ((is_secondary_key && hidden_pk_exists && i + 1 == kd->get_key_parts())) {
continue;
}
Field *field = kd->get_table_field_for_part_no(new_table, i);
Field *old_field = nullptr;
bool use_old_field = false;
if (is_rebuild) {
old_col_index = col_map[field->field_index];
if (old_col_index != (uint)(-1)) {
old_field = old_table->field[old_col_index];
use_old_field = true;
}
} else {
old_field = old_table->field[field->field_index];
use_old_field = true;
}
if (use_old_field) {
      // If a lob type is part of the key, we need to store the blob value
      // into the new field. Bug #24173631
if (old_field->real_type() == MYSQL_TYPE_BLOB ||
old_field->real_type() == MYSQL_TYPE_JSON) {
DBUG_ASSERT(field->real_type() == old_field->real_type());
if (fill_duplicate_blob_val((Field_blob *)old_field,
(Field_blob *)field)) {
__XHANDLER_LOG(ERROR,
"store duplicated blob field failed, index_id: %d",
kd->get_index_number());
return HA_EXIT_FAILURE;
}
} else {
        // a non-duplicate sk may become duplicated because of the new primary key
        // DBUG_ASSERT(!old_field->is_real_null());
memcpy(field->ptr, old_field->ptr, old_field->pack_length());
}
} else {
DBUG_ASSERT(is_rebuild);
      // blob cannot have a default value.
DBUG_ASSERT(field->real_type() != MYSQL_TYPE_BLOB &&
field->real_type() != MYSQL_TYPE_JSON);
uint field_offset = field->ptr - new_table->record[0];
memcpy(field->ptr, new_table->s->default_values + field_offset,
field->pack_length());
}
field->set_notnull();
}
return HA_EXIT_SUCCESS;
}
} // namespace myx