515 lines
16 KiB
C++
515 lines
16 KiB
C++
/* Copyright (c) 2018, 2021, Alibaba and/or its affiliates. All rights reserved.
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License, version 2.0,
|
|
as published by the Free Software Foundation.
|
|
This program is also distributed with certain software (including
|
|
but not limited to OpenSSL) that is licensed under separate terms,
|
|
as designated in a particular file or component or in included license
|
|
documentation. The authors of MySQL/PolarDB-X Engine hereby grant you an
|
|
additional permission to link the program and your derivative works with the
|
|
separately licensed software that they have included with
|
|
MySQL/PolarDB-X Engine.
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License, version 2.0, for more details.
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
|
|
|
|
|
|
#include "sql/recycle_bin/recycle_scheduler.h"
|
|
#include "mysql/components/services/log_builtins.h"
|
|
#include "mysql/plugin.h"
|
|
#include "mysql/psi/mysql_cond.h"
|
|
#include "mysql/psi/mysql_memory.h"
|
|
#include "mysql/psi/mysql_mutex.h"
|
|
#include "mysql/psi/psi_base.h"
|
|
#include "sql/mysqld.h" // my_localhost slave_net_timeout
|
|
#include "sql/mysqld_thd_manager.h" // Global_THD_manager
|
|
#include "sql/protocol_classic.h"
|
|
#include "sql/recycle_bin/recycle_table.h"
|
|
#include "sql/sql_base.h"
|
|
#include "sql/sql_class.h"
|
|
#include "sql/sql_lex.h"
|
|
#include "sql/sql_parse.h"
|
|
#include "sql/sql_table.h"
|
|
#include "sql/sql_time.h"
|
|
#include "sql/thd_raii.h"
|
|
#include "sql/transaction.h"
|
|
#include "sql/tztime.h"
|
|
#include "thr_mutex.h"
|
|
|
|
namespace im {
|
|
namespace recycle_bin {
|
|
|
|
/* Recycle scheduler thread loop interval (seconds). */
|
|
ulonglong recycle_scheduler_interval = 1;
|
|
/* Whether run recycle scheduler thread */
|
|
bool opt_recycle_scheduler = true;
|
|
/* Whether print the detailed information when purge table */
|
|
bool recycle_scheduler_purge_table_print = false;
|
|
/** Record the recycle scheduler version changes if modified
|
|
recycle_scheduler_rentention */
|
|
static ulonglong recycle_scheduler_loop_version = 1;
|
|
/**
|
|
Log error message when purge table
|
|
|
|
@param[in] table_name Dropping table name
|
|
@param[in] reason Why failed
|
|
*/
|
|
static void log_purge_error(const char *table_name, const char *reason) {
|
|
std::stringstream ss;
|
|
ss << "Fail to purge table " << table_name << " since " << reason;
|
|
LogErr(WARNING_LEVEL, ER_RECYCLE_BIN, ss.str().c_str());
|
|
}
|
|
|
|
/* Error handling constructor */
|
|
Recycle_scheduler_error_handler::Recycle_scheduler_error_handler(
|
|
THD *thd, const char *table)
|
|
: m_thd(thd), m_table(table), m_error(false) {
|
|
thd->push_internal_handler(this);
|
|
}
|
|
|
|
/* Ignore those error when recycle scheduler */
|
|
bool Recycle_scheduler_error_handler::handle_condition(
|
|
THD *, uint, const char *, Sql_condition::enum_severity_level *level,
|
|
const char *message) {
|
|
/* Log error */
|
|
log_purge_error(m_table, message);
|
|
if (*level == Sql_condition::SL_ERROR) m_error = true;
|
|
return true;
|
|
}
|
|
/* Pop current error handling */
|
|
Recycle_scheduler_error_handler::~Recycle_scheduler_error_handler() {
|
|
m_thd->pop_internal_handler();
|
|
}
|
|
|
|
Recycle_scheduler::Recycle_scheduler()
|
|
: m_state(State::INITED), m_scheduler_thd(nullptr) {
|
|
mysql_mutex_init(key_LOCK_state_recycle_scheduler, &m_state_mutex,
|
|
MY_MUTEX_INIT_FAST);
|
|
mysql_cond_init(key_COND_state_recycle_scheduler, &m_state_cond);
|
|
mysql_mutex_init(key_LOCK_sleep_recycle_scheduler, &m_sleep_mutex,
|
|
MY_MUTEX_INIT_FAST);
|
|
mysql_cond_init(key_COND_sleep_recycle_scheduler, &m_sleep_cond);
|
|
}
|
|
|
|
Recycle_scheduler::~Recycle_scheduler() {
|
|
DBUG_ENTER("Recycle_scheduler::~Recycle_scheduler");
|
|
DBUG_ASSERT(m_state == State::INITED);
|
|
mysql_mutex_destroy(&m_state_mutex);
|
|
mysql_cond_destroy(&m_state_cond);
|
|
mysql_mutex_destroy(&m_sleep_mutex);
|
|
mysql_cond_destroy(&m_sleep_cond);
|
|
m_scheduler_thd = nullptr;
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
void Recycle_scheduler::lock_state() { mysql_mutex_lock(&m_state_mutex); }
|
|
void Recycle_scheduler::unlock_state() { mysql_mutex_unlock(&m_state_mutex); }
|
|
|
|
void Recycle_scheduler::lock_sleep() { mysql_mutex_lock(&m_sleep_mutex); }
|
|
void Recycle_scheduler::unlock_sleep() { mysql_mutex_unlock(&m_sleep_mutex); }
|
|
/**
|
|
Reset the state and m_scheduler_thd variable.
|
|
*/
|
|
void Recycle_scheduler::reset(bool need_lock) {
|
|
DBUG_ENTER("Recycle_scheduler::reset");
|
|
if (need_lock) lock_state();
|
|
|
|
mysql_mutex_assert_owner(&m_state_mutex);
|
|
m_state = State::INITED;
|
|
m_scheduler_thd = nullptr;
|
|
|
|
if (need_lock) unlock_state();
|
|
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
/**
|
|
Really drop the recycled table.
|
|
|
|
@param[in] recycle_table target table
|
|
|
|
@retval false success
|
|
@retval true failure
|
|
*/
|
|
bool Recycle_scheduler::purge_table(struct Recycle_show_result *recycle_table) {
|
|
const char *table;
|
|
THD *thd;
|
|
DBUG_ENTER("Recycle_scheduler::purge_table");
|
|
table = recycle_table->table.str;
|
|
thd = m_scheduler_thd;
|
|
|
|
/* Overide error handling */
|
|
Recycle_scheduler_error_handler error_handler(thd, table);
|
|
Disable_binlog_guard binlog_guard(thd);
|
|
Disable_autocommit_guard autocommit_guard(thd);
|
|
|
|
thd->set_query_id(next_query_id());
|
|
|
|
mysql_reset_thd_for_next_command(thd);
|
|
lex_start(thd);
|
|
|
|
DBUG_ASSERT(!thd->in_sub_stmt);
|
|
/* Statement transaction still should not be started. */
|
|
DBUG_ASSERT(thd->get_transaction()->is_empty(Transaction_ctx::STMT));
|
|
|
|
DBUG_ASSERT(!trans_check_state(thd));
|
|
|
|
if (trans_commit_implicit(thd)) DBUG_RETURN(true);
|
|
|
|
thd->mdl_context.release_transactional_locks();
|
|
|
|
bool res = drop_base_recycle_table(thd, table);
|
|
bool result;
|
|
if (res || error_handler.is_error() || thd->killed) {
|
|
result = trans_rollback_stmt(thd);
|
|
result |= trans_rollback_implicit(thd);
|
|
} else {
|
|
result = trans_commit_stmt(thd);
|
|
result |= trans_commit_implicit(thd);
|
|
if (result) trans_rollback(thd);
|
|
}
|
|
close_thread_tables(thd);
|
|
thd->mdl_context.release_transactional_locks();
|
|
if (result) {
|
|
log_purge_error(table, "transaction commit or rollback failed");
|
|
}
|
|
thd->end_statement();
|
|
thd->cleanup_after_query();
|
|
free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
|
|
|
|
DBUG_RETURN(false);
|
|
}
|
|
|
|
/**
|
|
Check whether recycle scheduler thread is running.
|
|
|
|
@ret true Running
|
|
@ret false Not running
|
|
*/
|
|
bool Recycle_scheduler::is_running() {
|
|
lock_state();
|
|
bool ret = (m_state == State::RUNNING);
|
|
unlock_state();
|
|
return ret;
|
|
}
|
|
|
|
/* Enter the condition wait stage */
|
|
void Recycle_scheduler::enter_cond(THD *thd, struct timespec *abstime,
|
|
mysql_mutex_t *mutex, mysql_cond_t *cond,
|
|
const PSI_stage_info *stage,
|
|
const char *src_func, const char *src_file,
|
|
uint src_line) {
|
|
DBUG_ENTER("Recycle_scheduler::enter_cond");
|
|
if (thd)
|
|
thd->enter_cond(cond, mutex, stage, NULL, src_func, src_file, src_line);
|
|
if (!abstime)
|
|
mysql_cond_wait(cond, mutex);
|
|
else
|
|
mysql_cond_timedwait(cond, mutex, abstime);
|
|
|
|
if (thd) {
|
|
mysql_mutex_unlock(mutex);
|
|
thd->exit_cond(NULL, src_func, src_file, src_line);
|
|
mysql_mutex_lock(mutex);
|
|
}
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
/**
|
|
Stop the recycle scheduler task
|
|
|
|
@retval true failure
|
|
@retval false success
|
|
*/
|
|
bool Recycle_scheduler::stop() {
|
|
THD *thd = current_thd;
|
|
DBUG_ENTER("Recycle_scheduler::stop");
|
|
struct timespec abs_timeout;
|
|
|
|
lock_state();
|
|
|
|
if (m_state == State::INITED) goto end;
|
|
|
|
if (m_state == State::STOPPING) {
|
|
while (m_state == State::STOPPING) {
|
|
set_timespec(&abs_timeout, 5);
|
|
enter_cond(thd, &abs_timeout, &m_state_mutex, &m_state_cond,
|
|
&waiting_for_recycle_scheduler_thread_stop, __func__, __FILE__,
|
|
__LINE__);
|
|
}
|
|
goto end;
|
|
}
|
|
do {
|
|
m_state = State::STOPPING;
|
|
|
|
/* Wake the recycle scheduler thread */
|
|
mysql_mutex_lock(&m_scheduler_thd->LOCK_thd_data);
|
|
m_scheduler_thd->awake(THD::KILL_CONNECTION);
|
|
mysql_mutex_unlock(&m_scheduler_thd->LOCK_thd_data);
|
|
|
|
enter_cond(thd, NULL, &m_state_mutex, &m_state_cond,
|
|
&waiting_for_recycle_scheduler_thread_stop, __func__, __FILE__,
|
|
__LINE__);
|
|
} while (m_state == State::STOPPING);
|
|
|
|
end:
|
|
unlock_state();
|
|
DBUG_RETURN(false);
|
|
}
|
|
|
|
/**
|
|
Init the context in new thread.
|
|
|
|
@param[in] thd scheduler thread
|
|
*/
|
|
void static post_init_recycle_thread(THD *thd) {
|
|
DBUG_ENTER("post_init_recycle_thread");
|
|
thd->store_globals();
|
|
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
|
|
thd_manager->add_thd(thd);
|
|
thd_manager->inc_thread_running();
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
/**
|
|
Deinit the scheduler thread context when stop
|
|
|
|
@param[in] thd scheduler thread
|
|
*/
|
|
void static deinit_recycle_thread(THD *thd) {
|
|
DBUG_ENTER("deinit_recycle_thread");
|
|
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
|
|
|
|
thd->proc_info = "Clearing";
|
|
thd->get_protocol_classic()->end_net();
|
|
thd->release_resources();
|
|
thd_manager->remove_thd(thd);
|
|
thd_manager->dec_thread_running();
|
|
delete thd;
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
/**
|
|
Convert the YYMMDDHHMMSS number to epoch time.
|
|
*/
|
|
static my_time_t get_epoch_time(ulonglong nr) {
|
|
MYSQL_TIME t_mysql_time;
|
|
bool not_used;
|
|
|
|
/* Convert longlong to MYSQL_TIME (UTC) */
|
|
my_longlong_to_datetime_with_warn(nr, &t_mysql_time, MYF(0));
|
|
|
|
/* Convert MYSQL_TIME to epoch time */
|
|
return my_tz_OFFSET0->TIME_to_gmt_sec(&t_mysql_time, ¬_used);
|
|
}
|
|
|
|
/**
|
|
Wakeup the sleep recycle scheduler.
|
|
*/
|
|
void Recycle_scheduler::wakeup() {
|
|
recycle_scheduler_loop_version++;
|
|
if (is_running()) {
|
|
lock_sleep();
|
|
mysql_cond_broadcast(&m_sleep_cond);
|
|
unlock_sleep();
|
|
}
|
|
}
|
|
/**
|
|
Run the background thread to purge the recycled table.
|
|
|
|
@param[in] thd scheduler thread
|
|
*/
|
|
void Recycle_scheduler::run(THD *thd) {
|
|
my_time_t curr_time_sec;
|
|
struct timespec abs_timeout;
|
|
DBUG_ENTER("Recycle_scheduler::run");
|
|
LogErr(SYSTEM_LEVEL, ER_RECYCLE_SCHEDULER, "background thread start");
|
|
|
|
std::vector<Recycle_show_result *> container;
|
|
MEM_ROOT mem_root;
|
|
init_sql_alloc(key_memory_recycle, &mem_root, MEM_ROOT_BLOCK_SIZE,
|
|
MEM_ROOT_PREALLOC);
|
|
while(is_running()) {
|
|
thd_proc_info(thd, "Recycle scheduler collect tables");
|
|
ulonglong lc_version = recycle_scheduler_loop_version;
|
|
ulonglong lc_retention = recycle_bin_retention;
|
|
curr_time_sec = my_micro_time() / 1000000;
|
|
ulonglong min_future = ULLONG_MAX;
|
|
|
|
/* Retrieve all the recycled tables */
|
|
get_recycle_tables(thd, &mem_root, &container);
|
|
|
|
/**
|
|
Next round time calculation rule:
|
|
1. Wait to minimum recycled time if any recycled table.
|
|
2. Wait recycle_scheduler_interval if not any recycled table.
|
|
3. Wake up if modify the recycle_bin_retention.
|
|
*/
|
|
for (auto it = container.cbegin(); it != container.cend(); it++) {
|
|
if (is_running()) {
|
|
long long delta = curr_time_sec - get_epoch_time((*it)->recycled_time);
|
|
|
|
/* If the keeped time has exceeded retention, then purge it. */
|
|
if (delta >= 0 && static_cast<ulonglong>(delta) >= lc_retention) {
|
|
if (recycle_scheduler_purge_table_print) {
|
|
std::stringstream ss;
|
|
ss << "purge table " << (*it)->table.str;
|
|
LogErr(INFORMATION_LEVEL, ER_RECYCLE_BIN, ss.str().c_str());
|
|
}
|
|
purge_table(*it);
|
|
} else {
|
|
/* Get the next round least wait time */
|
|
ulonglong lc_future = lc_retention - delta;
|
|
min_future = min_future > lc_future ? lc_future : min_future;
|
|
}
|
|
} else {
|
|
break;
|
|
}
|
|
}
|
|
container.clear();
|
|
mem_root.Clear();
|
|
thd_proc_info(thd, "Recycle scheduler waiting for next round");
|
|
|
|
/** Maybe awaked by stop command here, so instead of my_sleep() */
|
|
if (min_future == ULLONG_MAX)
|
|
set_timespec(&abs_timeout, recycle_scheduler_interval);
|
|
else
|
|
set_timespec(&abs_timeout, min_future);
|
|
|
|
lock_sleep();
|
|
if (!thd->is_killed() && (lc_version == recycle_scheduler_loop_version)) {
|
|
enter_cond(thd, &abs_timeout, &m_sleep_mutex, &m_sleep_cond,
|
|
&stage_waiting_for_next_round, __func__, __FILE__, __LINE__);
|
|
}
|
|
unlock_sleep();
|
|
}
|
|
lock_state();
|
|
deinit_recycle_thread(thd);
|
|
reset(false);
|
|
mysql_cond_broadcast(&m_state_cond);
|
|
unlock_state();
|
|
|
|
LogErr(SYSTEM_LEVEL, ER_RECYCLE_SCHEDULER, "background thread end");
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
|
|
/**
|
|
Init the recycle scheduler thd context:
|
|
|
|
1) Allow infinite access privileges
|
|
2) Host is my_localhost
|
|
3) Net is null (unnecessary for background thread)
|
|
*/
|
|
void static pre_init_recycle_thread(THD *thd) {
|
|
DBUG_ENTER("pre_init_recycle_thread");
|
|
thd->security_context()->set_master_access(~0ULL);
|
|
thd->security_context()->set_host_or_ip_ptr(my_localhost,
|
|
strlen(my_localhost));
|
|
thd->get_protocol_classic()->init_net(NULL);
|
|
thd->security_context()->set_user_ptr(C_STRING_WITH_LEN("recycle_scheduler"));
|
|
thd->slave_thread = 0;
|
|
thd->system_thread = SYSTEM_THREAD_RECYCLE_SCHEDULER;
|
|
|
|
/* DAEMON didn't react to kill */
|
|
thd->set_command(COM_DAEMON);
|
|
thd->set_new_thread_id();
|
|
|
|
thd->proc_info = "Initialized";
|
|
thd->set_time();
|
|
|
|
thd->variables.lock_wait_timeout = LONG_TIMEOUT;
|
|
|
|
thd->variables.transaction_read_only = false;
|
|
thd->tx_read_only = false;
|
|
DBUG_VOID_RETURN;
|
|
}
|
|
/**
|
|
Recycle scheduler backgroud thread run logic.
|
|
|
|
@param[in] arg recycle scheduler parameters
|
|
*/
|
|
static void *recycle_scheduler_thread(void *arg) {
|
|
struct Recycle_scheduler_param *rs_param = nullptr;
|
|
THD *thd = nullptr;
|
|
Recycle_scheduler *scheduler = nullptr;
|
|
DBUG_ENTER("recycle_scheduler_thread");
|
|
|
|
rs_param = static_cast<struct Recycle_scheduler_param *>(arg);
|
|
thd = rs_param->thd;
|
|
scheduler = rs_param->scheduler;
|
|
destroy_object<struct Recycle_scheduler_param>(rs_param);
|
|
|
|
if (my_thread_init()) {
|
|
LogErr(ERROR_LEVEL, ER_RECYCLE_SCHEDULER, "init new thread");
|
|
thd->proc_info = "Clearing";
|
|
thd->get_protocol_classic()->end_net();
|
|
scheduler->reset(true);
|
|
delete thd;
|
|
DBUG_RETURN(0);
|
|
}
|
|
|
|
thd->thread_stack = (char *)&thd;
|
|
mysql_thread_set_psi_id(thd->thread_id());
|
|
|
|
post_init_recycle_thread(thd);
|
|
|
|
/* Run the recycle purge task */
|
|
scheduler->run(thd);
|
|
|
|
my_thread_end();
|
|
DBUG_RETURN(0);
|
|
}
|
|
/**
|
|
Start the recycle scheduler thread, Only log error message if failed.
|
|
|
|
@retval true failure
|
|
@retval false success
|
|
*/
|
|
bool Recycle_scheduler::start() {
|
|
bool res = false;
|
|
THD *new_thd = nullptr;
|
|
my_thread_handle thread;
|
|
struct Recycle_scheduler_param *rs_param = nullptr;
|
|
DBUG_ENTER("Recycle_scheduler::start");
|
|
lock_state();
|
|
|
|
/* Maybe running or stopping already */
|
|
if (m_state > State::INITED) goto end;
|
|
|
|
if (!(new_thd = new THD)) {
|
|
LogErr(ERROR_LEVEL, ER_RECYCLE_SCHEDULER, "create new thd");
|
|
res = true;
|
|
goto end;
|
|
}
|
|
|
|
pre_init_recycle_thread(new_thd);
|
|
rs_param = allocate_recycle_object<struct Recycle_scheduler_param>();
|
|
rs_param->thd = new_thd;
|
|
rs_param->scheduler = this;
|
|
m_scheduler_thd = new_thd;
|
|
|
|
m_state = State::RUNNING;
|
|
|
|
if (mysql_thread_create(key_thread_recycle_scheduler, &thread,
|
|
&connection_attrib, recycle_scheduler_thread,
|
|
(void *)rs_param)) {
|
|
LogErr(ERROR_LEVEL, ER_RECYCLE_SCHEDULER, "create new thread");
|
|
new_thd->proc_info = "Clearing";
|
|
new_thd->get_protocol_classic()->end_net();
|
|
reset(false);
|
|
delete new_thd;
|
|
res = true;
|
|
destroy_object<struct Recycle_scheduler_param>(rs_param);
|
|
}
|
|
|
|
end:
|
|
unlock_state();
|
|
DBUG_RETURN(res);
|
|
}
|
|
|
|
} /* namespace recycle_bin */
|
|
} /* namespace im */
|