/* polardbxengine/storage/innobase/include/dict0dd.ic
   (419 lines, 13 KiB, plaintext) */

/*****************************************************************************
Copyright (c) 2016, 2019, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License, version 2.0, as published by the
Free Software Foundation.
This program is also distributed with certain software (including but not
limited to OpenSSL) that is licensed under separate terms, as designated in a
particular file or component or in included license documentation. The authors
of MySQL hereby grant you an additional permission to link the program and
your derivative works with the separately licensed software that they have
included with MySQL.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
for more details.
You should have received a copy of the GNU General Public License along with
this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*****************************************************************************/
/** @file include/dict0dd.ic
Data dictionary access
Created 10/2016 Jimmy Yang
*******************************************************/
#include "sql_table.h"
#ifndef UNIV_HOTBACKUP
#include "dict0crea.h"
#include "dict0priv.h"
#include "field.h"
#include "sql/table.h"
#include "sql_base.h"
/** Resolve the explicit dd::Tablespace::id that applies to a partition.
The lookup goes from the most specific level to the least specific one:
the (sub)partition itself, then its parent partition (if any), then the
table.
@param[in]	partition	partition or subpartition
@return the explicit dd::Tablespace::id
@retval dd::INVALID_OBJECT_ID if there is no explicit tablespace */
inline dd::Object_id dd_get_space_id(const dd::Partition &partition) {
  ut_ad(dd_part_is_stored(&partition));
  ut_ad(dd_table_is_partitioned(partition.table()));
  /* A partition has a parent exactly when the table is subpartitioned. */
  ut_ad((partition.table().subpartition_type() == dd::Table::ST_NONE) ==
        (partition.parent() == nullptr));

  /* 1. TABLESPACE given on the (sub)partition itself. */
  const dd::Object_id own_id = partition.tablespace_id();
  if (own_id != dd::INVALID_OBJECT_ID) {
    return own_id;
  }

  /* 2. For a subpartition, the TABLESPACE of the enclosing partition. */
  const dd::Partition *const owner = partition.parent();
  if (owner != nullptr) {
    const dd::Object_id owner_id = owner->tablespace_id();
    if (owner_id != dd::INVALID_OBJECT_ID) {
      return owner_id;
    }
  }

  /* 3. Otherwise whatever the table says (possibly INVALID_OBJECT_ID). */
  return partition.table().tablespace_id();
}
/* path_to_tablename(t, n, l, f) forwards to filename_to_tablename().
The fourth argument (f) is passed on only when DBUG_OFF is not defined;
with DBUG_OFF it is dropped and the three-argument form is called. */
#ifndef DBUG_OFF
#define path_to_tablename(t, n, l, f) filename_to_tablename((t), (n), (l), (f))
#else /* DBUG_OFF */
#define path_to_tablename(t, n, l, f) filename_to_tablename((t), (n), (l))
#endif /* !DBUG_OFF */
/** Parse a table file name into table name and database name.
Note the table name may have trailing TMP_POSTFIX for temporary table name.
@param[in] tbl_name table name including database and table name
@param[in,out] dd_db_name database name buffer to be filled
@param[in,out] dd_tbl_name table name buffer to be filled
@param[in,out] dd_part_name partition name to be filled if not nullptr
@param[in,out] dd_sub_name sub-partition name to be filled if not nullptr
@param[in,out] is_temp true if it is a temporary table name which
ends with TMP_POSTFIX.
@return true if table name is parsed properly, false if the table name
is invalid */
inline bool dd_parse_tbl_name(const char *tbl_name, char *dd_db_name,
char *dd_tbl_name, char *dd_part_name,
char *dd_sub_name, bool *is_temp) {
char db_buf[MAX_DATABASE_NAME_LEN + 1];
char tbl_buf[MAX_TABLE_NAME_LEN + 1];
ulint db_len = dict_get_db_name_len(tbl_name);
if (db_len == 0) {
return (false);
}
ut_ad(db_len <= MAX_DATABASE_NAME_LEN);
memcpy(db_buf, tbl_name, db_len);
db_buf[db_len] = 0;
size_t tbl_len = strlen(tbl_name) - db_len - 1;
memcpy(tbl_buf, tbl_name + db_len + 1, tbl_len);
tbl_buf[tbl_len] = 0;
filename_to_tablename(db_buf, dd_db_name, MAX_DATABASE_NAME_LEN + 1, true);
auto is_part = strstr(tbl_buf, PART_SEPARATOR);
if (is_part != nullptr) {
*is_part = '\0';
} else {
if (tbl_len > TMP_POSTFIX_LEN &&
strcmp(tbl_buf + tbl_len - TMP_POSTFIX_LEN, TMP_POSTFIX) == 0) {
tbl_buf[tbl_len - TMP_POSTFIX_LEN] = '\0';
if (is_temp != nullptr) {
*is_temp = true;
}
}
}
path_to_tablename(tbl_buf, dd_tbl_name, MAX_TABLE_NAME_LEN + 1, true);
if (is_part == nullptr || dd_part_name == nullptr) {
return (true);
}
auto is_sub = strstr(is_part + PART_SEPARATOR_LEN, SUB_PART_SEPARATOR);
char *trailing_sharp;
if (is_sub == nullptr) {
trailing_sharp = strchr(is_part + PART_SEPARATOR_LEN, '#');
} else {
*is_sub = '\0';
trailing_sharp = strchr(is_sub + SUB_PART_SEPARATOR_LEN, '#');
}
if (trailing_sharp != nullptr) {
ut_ad(strcmp(trailing_sharp, TMP_POSTFIX) == 0);
*trailing_sharp = '\0';
if (is_temp != nullptr) {
*is_temp = true;
}
}
if (is_part != nullptr && dd_part_name != nullptr) {
filename_to_tablename(is_part + PART_SEPARATOR_LEN, dd_part_name,
MAX_TABLE_NAME_LEN + 1);
if (is_sub != nullptr && dd_sub_name != nullptr) {
filename_to_tablename(is_sub + SUB_PART_SEPARATOR_LEN, dd_sub_name,
MAX_TABLE_NAME_LEN + 1);
}
}
return (true);
}
/** Find a column of a table by name.
Names are compared with my_strcasecmp() under system_charset_info, and the
first matching column in dd_table->columns() order is returned.
@param[in]	dd_table	data dictionary table
@param[in]	name		column name
@return the column
@retval nullptr if not found */
inline const dd::Column *dd_find_column(const dd::Table *dd_table,
                                        const char *name) {
  const dd::Column *match = nullptr;

  for (const dd::Column *candidate : dd_table->columns()) {
    if (my_strcasecmp(system_charset_info, candidate->name().c_str(), name) ==
        0) {
      match = candidate;
      break;
    }
  }

  return match;
}
/** Append a storage-engine hidden column to a table being created.
If a column with the same name (as judged by dd_find_column()) already
exists, ER_WRONG_COLUMN_NAME is reported through my_error() and nothing is
added. The new column is NOT NULL, marked HT_HIDDEN_SE, and uses the
my_charset_bin collation.
@param[in,out]	dd_table	table containing user columns and indexes
@param[in]	name		hidden column name
@param[in]	length		length of the column, in bytes
@param[in]	type		column type
@return the added column, or NULL if there already was a column by that name */
inline dd::Column *dd_add_hidden_column(dd::Table *dd_table, const char *name,
                                        uint length,
                                        dd::enum_column_types type) {
  const dd::Column *const clash = dd_find_column(dd_table, name);

  if (clash != nullptr) {
    /* Report the name as it is stored in the existing column. */
    my_error(ER_WRONG_COLUMN_NAME, MYF(0), clash->name().c_str());
    return nullptr;
  }

  dd::Column *const hidden = dd_table->add_column();

  hidden->set_hidden(dd::Column::enum_hidden_type::HT_HIDDEN_SE);
  hidden->set_name(name);
  hidden->set_type(type);
  hidden->set_nullable(false);
  hidden->set_char_length(length);
  hidden->set_collation_id(my_charset_bin.number);

  return hidden;
}
/** Append a hidden, ascending index element for a column.
@param[in,out]	index	created index metadata
@param[in]	column	column of the index */
inline void dd_add_hidden_element(dd::Index *index, const dd::Column *column) {
  /* add_element() takes a non-const column pointer, hence the cast. */
  dd::Column *const target = const_cast<dd::Column *>(column);
  dd::Index_element *const element = index->add_element(target);

  element->set_hidden(true);
  element->set_order(dd::Index_element::ORDER_ASC);
}
/** Set up an index object as a hidden unique B-tree index on one column.
The index is named, flagged hidden, given the IA_BTREE algorithm and
IT_UNIQUE type, assigned the innobase_hton_name engine, and finally gets
the column appended as a hidden element.
@param[in,out]	index	created index metadata
@param[in]	name	name of the index
@param[in]	column	column of the index
@return the initialized index */
inline dd::Index *dd_set_hidden_unique_index(dd::Index *index, const char *name,
                                             const dd::Column *column) {
  /* Index-level attributes first... */
  index->set_name(name);
  index->set_hidden(true);
  index->set_algorithm(dd::Index::IA_BTREE);
  index->set_type(dd::Index::IT_UNIQUE);
  index->set_engine(innobase_hton_name);

  /* ...then its single key part. */
  dd_add_hidden_element(index, column);

  return index;
}
/** Look up a table by id in the in-memory dictionary cache only.
When a table is found, it is moved to the MRU position if it can be
evicted, and table->acquire() is called on it before returning.
dict_sys->mutex is taken (and released again) here unless the caller
states that it already holds it.
@param[in]	table_id	table id
@param[in]	dict_locked	TRUE=data dictionary locked
@return table, NULL if does not exist */
inline dict_table_t *dd_table_open_on_id_in_mem(table_id_t table_id,
                                                bool dict_locked) {
  const bool take_mutex = !dict_locked;

  if (take_mutex) {
    mutex_enter(&dict_sys->mutex);
  }

  ut_ad(mutex_own(&dict_sys->mutex));

  /* Probe the table-id hash for an entry with a matching id. */
  dict_table_t *found;
  const ulint fold = ut_fold_ull(table_id);

  HASH_SEARCH(id_hash, dict_sys->table_id_hash, fold, dict_table_t *, found,
              ut_ad(found->cached), found->id == table_id);

  ut_ad(found == nullptr || found->cached);

  if (found != nullptr) {
    if (found->can_be_evicted) {
      dict_move_to_mru(found);
    }

    found->acquire();
  }

  if (take_mutex) {
    mutex_exit(&dict_sys->mutex);
  }

  return found;
}
/** Returns a cached table object based on table name.
Only the in-memory dictionary cache is consulted, through
dict_table_check_if_in_cache_low(). When a table is found, it is moved to
the MRU position if it can be evicted, and table->acquire() is called on it
before returning. dict_sys->mutex is taken (and released again) here unless
the caller states that it already holds it.
@param[in]	name		table name
@param[in]	dict_locked	TRUE=data dictionary locked
@return table, NULL if does not exist */
inline dict_table_t *dd_table_open_on_name_in_mem(const char *name,
                                                  ibool dict_locked) {
  dict_table_t *table = nullptr;

  if (!dict_locked) {
    mutex_enter(&dict_sys->mutex);
  }

  /* Same debug check as dd_table_open_on_id_in_mem(): whichever way we
  got here, the dictionary mutex must be held during the lookup. */
  ut_ad(mutex_own(&dict_sys->mutex));

  table = dict_table_check_if_in_cache_low(name);

  ut_ad(table == nullptr || table->cached);

  if (table != nullptr) {
    if (table->can_be_evicted) {
      dict_move_to_mru(table);
    }

    table->acquire();
  }

  if (!dict_locked) {
    mutex_exit(&dict_sys->mutex);
  }

  return (table);
}
/** Check whether a stored column named "FTS_DOC_ID" exists; that name is
reserved for the InnoDB FTS Doc ID.
The name is searched case-insensitively among the stored columns. The
first hit ends the search: if it is a non-nullable DATA_INT column of
sizeof(doc_id_t) bytes whose name matches FTS_DOC_ID_COL_NAME exactly, its
position is reported; otherwise a warning is pushed, ER_WRONG_COLUMN_NAME
is raised through my_error(), and ULINT_UNDEFINED is reported.
@param[in]	thd		MySQL thread handle
@param[in]	form		information on table
                                columns and indexes
@param[out]	doc_id_col	Doc ID column number if
                                there exists a FTS_DOC_ID column,
                                ULINT_UNDEFINED if column is of the
                                wrong type/name/size; not written when
                                no such column exists
@return true if there exists a "FTS_DOC_ID" column */
inline bool create_table_check_doc_id_col(THD *thd, const TABLE *form,
                                          ulint *doc_id_col) {
  for (ulint pos = 0; pos < form->s->fields; pos++) {
    const Field *const field = form->field[pos];

    if (!field->stored_in_db) {
      continue;
    }

    ulint unsigned_type;
    const ulint col_type =
        get_innobase_type_from_mysql_type(&unsigned_type, field);
    const ulint col_len = field->pack_length();

    if (innobase_strcasecmp(field->field_name, FTS_DOC_ID_COL_NAME) != 0) {
      continue;
    }

    /* Note the name is case sensitive due to
    our internal query parser */
    const bool usable = col_type == DATA_INT && !field->real_maybe_null() &&
                        col_len == sizeof(doc_id_t) &&
                        strcmp(field->field_name, FTS_DOC_ID_COL_NAME) == 0;

    if (usable) {
      *doc_id_col = pos;
      return true;
    }

    push_warning_printf(thd, Sql_condition::SL_WARNING,
                        ER_ILLEGAL_HA_CREATE_OPTION,
                        "InnoDB: FTS_DOC_ID column must be"
                        " of BIGINT NOT NULL type, and named"
                        " in all capitalized characters");
    my_error(ER_WRONG_COLUMN_NAME, MYF(0), field->field_name);
    *doc_id_col = ULINT_UNDEFINED;

    return true;
  }

  return false;
}
/** Map a row format to the text used when displaying it.
@param[in]	row_format	Row Format
@return row format name; plain "ROW_FORMAT" for ROW_TYPE_NOT_USED or any
value not listed below */
inline const char *get_row_format_name(enum row_type row_format) {
  /* Fallback text, kept when no case below assigns a specific one. */
  const char *label = "ROW_FORMAT";

  switch (row_format) {
    case ROW_TYPE_COMPACT:
      label = "ROW_FORMAT=COMPACT";
      break;
    case ROW_TYPE_COMPRESSED:
      label = "ROW_FORMAT=COMPRESSED";
      break;
    case ROW_TYPE_DYNAMIC:
      label = "ROW_FORMAT=DYNAMIC";
      break;
    case ROW_TYPE_REDUNDANT:
      label = "ROW_FORMAT=REDUNDANT";
      break;
    case ROW_TYPE_DEFAULT:
      label = "ROW_FORMAT=DEFAULT";
      break;
    case ROW_TYPE_FIXED:
      label = "ROW_FORMAT=FIXED";
      break;
    case ROW_TYPE_PAGED:
      label = "ROW_FORMAT=PAGED";
      break;
    case ROW_TYPE_NOT_USED:
      break;
  }

  return label;
}
/** Acquire a shared metadata lock on a table.
The table name is lower-cased into a local copy before the lock request
when innobase_get_lower_case_table_names() returns 2, and, on non-Windows
builds only, also when it returns 1. Otherwise the name is passed through
unchanged.
@param[in,out]	thd	current thread
@param[out]	mdl	metadata lock
@param[in]	db	schema name
@param[in]	table	table name
@retval false if acquired
@retval true if failed (my_error() will have been called) */
UNIV_INLINE MY_ATTRIBUTE((warn_unused_result)) bool dd_mdl_acquire(
    THD *thd, MDL_ticket **mdl, const char *db, const char *table) {
  bool fold_case = (innobase_get_lower_case_table_names() == 2);

#ifndef _WIN32
  if (!fold_case) {
    fold_case = (innobase_get_lower_case_table_names() == 1);
  }
#endif /* !_WIN32 */

  char lowered[MAX_TABLE_NAME_LEN + 1];
  const char *mdl_name = table;

  if (fold_case) {
    /* NOTE(review): the copy is not bounds-checked; this relies on the
    table name fitting in MAX_TABLE_NAME_LEN bytes. */
    const ulint len = strlen(table);

    ut_memcpy(lowered, table, len);
    lowered[len] = '\0';
    innobase_casedn_str(lowered);

    mdl_name = lowered;
  }

  return dd::acquire_shared_table_mdl(thd, db, mdl_name, false, mdl);
}
/** Read the version attribute from a table's se_private_data.
@param[in]	dd_table	dd::Table
@return table dynamic metadata version if exists, otherwise 0 (also 0 when
the property exists but reading it reports a failure) */
inline uint64_t dd_get_version(const dd::Table *dd_table) {
  const dd::Properties &props = dd_table->se_private_data();

  if (props.exists(dd_table_key_strings[DD_TABLE_VERSION])) {
    uint64_t stored = 0;

    /* get() yields a true value on failure, so a false value means
    'stored' now holds the property. */
    if (!props.get(dd_table_key_strings[DD_TABLE_VERSION],
                   reinterpret_cast<uint64 *>(&stored))) {
      return stored;
    }
  }

  return 0;
}
#endif /* !UNIV_HOTBACKUP */