polardbxengine/storage/ndb/plugin/ndb_thd_ndb.cc

241 lines
7.5 KiB
C++

/*
Copyright (c) 2011, 2019, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "storage/ndb/plugin/ndb_thd_ndb.h"
#include "my_dbug.h"
#include "mysql/plugin.h" // thd_get_thread_id
#include "mysqld_error.h"
#include "sql/derror.h"
#include "sql/sql_error.h"
#include "storage/ndb/plugin/ndb_ddl_transaction_ctx.h"
#include "storage/ndb/plugin/ndb_log.h"
#include "storage/ndb/plugin/ndb_thd.h"
/*
Default value for max number of transactions createable against NDB from
the handler. Should really be 2 but there is a transaction to much allocated
when lock table is used, and one extra to used for global schema lock.
*/
static const int MAX_TRANSACTIONS = 4;
Thd_ndb *Thd_ndb::seize(THD *thd) {
DBUG_TRACE;
Thd_ndb *thd_ndb = new Thd_ndb(thd);
if (thd_ndb == NULL) return NULL;
if (thd_ndb->ndb->init(MAX_TRANSACTIONS) != 0) {
DBUG_PRINT("error", ("Ndb::init failed, error: %d message: %s",
thd_ndb->ndb->getNdbError().code,
thd_ndb->ndb->getNdbError().message));
delete thd_ndb;
thd_ndb = NULL;
} else {
thd_ndb->ndb->setCustomData64(thd_get_thread_id(thd));
}
return thd_ndb;
}
void Thd_ndb::release(Thd_ndb *thd_ndb) {
DBUG_TRACE;
delete thd_ndb;
}
bool Thd_ndb::recycle_ndb(void) {
DBUG_TRACE;
DBUG_PRINT("enter", ("ndb: 0x%lx", (long)ndb));
DBUG_ASSERT(global_schema_lock_trans == NULL);
DBUG_ASSERT(trans == NULL);
delete ndb;
if ((ndb = new Ndb(connection, "")) == NULL) {
DBUG_PRINT("error", ("failed to allocate Ndb object"));
return false;
}
if (ndb->init(MAX_TRANSACTIONS) != 0) {
delete ndb;
ndb = NULL;
DBUG_PRINT("error", ("Ndb::init failed, %d message: %s",
ndb->getNdbError().code, ndb->getNdbError().message));
return false;
} else {
ndb->setCustomData64(thd_get_thread_id(m_thd));
}
/* Reset last commit epoch for this 'session'. */
m_last_commit_epoch_session = 0;
/* Update m_connect_count to avoid false failures of ::valid_ndb() */
m_connect_count = connection->get_connect_count();
return true;
}
bool Thd_ndb::valid_ndb(void) const {
// The ndb object should be valid as long as a
// global schema lock transaction is ongoing
if (global_schema_lock_trans) return true;
// The ndb object should be valid as long as a
// transaction is ongoing
if (trans) return true;
if (unlikely(m_connect_count != connection->get_connect_count()))
return false;
return true;
}
void Thd_ndb::init_open_tables() {
count = 0;
m_error = false;
open_tables.clear();
}
bool Thd_ndb::check_option(Options option) const { return (options & option); }
void Thd_ndb::set_option(Options option) { options |= option; }
/*
Used for every additional row operation, to update the guesstimate
of pending bytes to send, and to check if it is now time to flush a batch.
*/
bool Thd_ndb::add_row_check_if_batch_full(uint size) {
if (m_unsent_bytes == 0) free_root(&m_batch_mem_root, MY_MARK_BLOCKS_FREE);
uint unsent = m_unsent_bytes;
unsent += size;
m_unsent_bytes = unsent;
return unsent >= m_batch_size;
}
bool Thd_ndb::check_trans_option(Trans_options option) const {
return (trans_options & option);
}
void Thd_ndb::set_trans_option(Trans_options option) {
#ifndef DBUG_OFF
if (check_trans_option(TRANS_TRANSACTIONS_OFF))
DBUG_PRINT("info", ("Disabling transactions"));
if (check_trans_option(TRANS_INJECTED_APPLY_STATUS))
DBUG_PRINT("info", ("Statement has written to ndb_apply_status"));
if (check_trans_option(TRANS_NO_LOGGING))
DBUG_PRINT("info", ("Statement is not using logging"));
#endif
trans_options |= option;
}
void Thd_ndb::reset_trans_options(void) {
DBUG_PRINT("info", ("Resetting trans_options"));
trans_options = 0;
}
/*
Push to THD's condition stack
@param severity Severity of the pushed condition
@param code Error code to use for the pushed condition
@param[in] fmt printf-like format string
@param[in] args Arguments
*/
static void push_condition(THD *thd,
Sql_condition::enum_severity_level severity,
uint code, const char *fmt, va_list args)
MY_ATTRIBUTE((format(printf, 4, 0)));
static void push_condition(THD *thd,
Sql_condition::enum_severity_level severity,
uint code, const char *fmt, va_list args) {
DBUG_ASSERT(fmt);
// Assemble the message
char msg_buf[512];
vsnprintf(msg_buf, sizeof(msg_buf), fmt, args);
push_warning(thd, severity, code, msg_buf);
// Workaround problem where Ndb_local_connection can't access the
// warnings produced when running a SQL query, instead detect
// if this a binlog thread and print the warning also to log.
// NOTE! This can be removed when BUG#27507543 has been implemented
// and instead log these warnings in a more controlled/selective manner
// in Ndb_local_connection.
if (ndb_thd_is_binlog_thread(thd)) {
ndb_log_warning("%s", msg_buf);
}
}
void Thd_ndb::push_warning(const char *fmt, ...) const {
const uint code = ER_GET_ERRMSG;
va_list args;
va_start(args, fmt);
push_condition(m_thd, Sql_condition::SL_WARNING, code, fmt, args);
va_end(args);
}
void Thd_ndb::push_warning(uint code, const char *fmt, ...) const {
va_list args;
va_start(args, fmt);
push_condition(m_thd, Sql_condition::SL_WARNING, code, fmt, args);
va_end(args);
}
void Thd_ndb::push_ndb_error_warning(const NdbError &ndberr) const {
if (ndberr.status == NdbError::TemporaryError) {
push_warning_printf(m_thd, Sql_condition::SL_WARNING,
ER_GET_TEMPORARY_ERRMSG,
ER_THD(m_thd, ER_GET_TEMPORARY_ERRMSG), ndberr.code,
ndberr.message, "NDB");
} else {
push_warning_printf(m_thd, Sql_condition::SL_WARNING, ER_GET_ERRMSG,
ER_THD(m_thd, ER_GET_ERRMSG), ndberr.code,
ndberr.message, "NDB");
}
}
void Thd_ndb::set_ndb_error(const NdbError &ndberr, const char *message) const {
push_ndb_error_warning(ndberr);
my_printf_error(ER_GET_ERRMSG, "%s", MYF(0), message);
}
Ndb_DDL_transaction_ctx *Thd_ndb::get_ddl_transaction_ctx(
bool create_if_not_exist) {
if (!m_ddl_ctx && create_if_not_exist) {
/* There is no DDL context yet. Instantiate it. */
m_ddl_ctx = new Ndb_DDL_transaction_ctx(m_thd);
}
return m_ddl_ctx;
}
void Thd_ndb::clear_ddl_transaction_ctx() {
DBUG_ASSERT(m_ddl_ctx != nullptr);
delete m_ddl_ctx;
m_ddl_ctx = nullptr;
}