851 lines
30 KiB
C++
851 lines
30 KiB
C++
/* Copyright (c) 2018, 2021, Alibaba and/or its affiliates. All rights reserved.
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License, version 2.0,
|
|
as published by the Free Software Foundation.
|
|
This program is also distributed with certain software (including
|
|
but not limited to OpenSSL) that is licensed under separate terms,
|
|
as designated in a particular file or component or in included license
|
|
documentation. The authors of MySQL/PolarDB-X Engine hereby grant you an
|
|
additional permission to link the program and your derivative works with the
|
|
separately licensed software that they have included with
|
|
MySQL/PolarDB-X Engine.
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License, version 2.0, for more details.
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
|
|
|
|
#include "consensus_admin.h"
|
|
|
|
#include "sql/binlog.h"
|
|
#include "sql/consensus_log_manager.h"
|
|
#include "sql/bl_consensus_log.h"
|
|
#include "sql/appliedindex_checker.h"
|
|
#include "sql/auth/auth_acls.h"
|
|
#include "sql/mysqld_thd_manager.h"
|
|
#include "sql/rpl_slave.h"
|
|
#include "sql/rpl_msr.h"
|
|
#include "sql/rpl_mi.h"
|
|
#include "sql/rpl_rli.h"
|
|
#include "consensus_recovery_manager.h"
|
|
|
|
#include "my_config.h"
|
|
|
|
#include <errno.h>
|
|
#include <fcntl.h>
|
|
#include <limits.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
|
|
#include "my_dbug.h"
|
|
#include "sql/sql_lex.h"
|
|
#include "sql/debug_sync.h" // DEBUG_SYNC
|
|
#include "sql/log.h"
|
|
//#include "sql/binlog_ext.h"
|
|
#include "replica_read_manager.h"
|
|
|
|
using std::max;
|
|
|
|
/* wait no commit index when exec event in xpaxos_channel */
|
|
bool opt_disable_wait_commitindex = false;
|
|
|
|
/* Helper function for SHOW BINLOG/RELAYLOG EVENTS */
|
|
|
|
bool show_consensuslog_events(THD *thd, MYSQL_BIN_LOG *binary_log)
|
|
{
|
|
Protocol *protocol = thd->get_protocol();
|
|
List<Item> field_list;
|
|
const char *errmsg = 0;
|
|
bool ret = TRUE;
|
|
// IO_CACHE log;
|
|
Binlog_file_reader binlog_file_reader(opt_master_verify_checksum);
|
|
// File file = -1;
|
|
int old_max_allowed_packet = thd->variables.max_allowed_packet;
|
|
LOG_INFO linfo;
|
|
|
|
DBUG_ENTER("show_consensuslog_events");
|
|
|
|
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_CONSENSUSLOG_EVENTS);
|
|
|
|
Format_description_log_event *description_event = new
|
|
Format_description_log_event(); /* MySQL 4.0 by default ********* now is 4 ???? */
|
|
|
|
if (binary_log->is_open())
|
|
{
|
|
LEX_CONSENSUS_INFO *lex_ci = &thd->lex->consensus;
|
|
SELECT_LEX_UNIT *unit = thd->lex->unit;
|
|
ha_rows event_count, limit_start, limit_end;
|
|
uint64 pos = BIN_LOG_HEADER_SIZE; // user-friendly
|
|
char search_file_name[FN_REFLEN], *name;
|
|
uint64 consensus_index = lex_ci->log_index;
|
|
mysql_mutex_t *log_lock = binary_log->get_log_lock();
|
|
Log_event* ev;
|
|
|
|
unit->set_limit(thd, thd->lex->current_select());
|
|
limit_start = unit->offset_limit_cnt;
|
|
limit_end = unit->select_limit_cnt;
|
|
// find right position
|
|
if (binary_log->consensus_get_log_position(consensus_index, search_file_name, &pos))
|
|
{
|
|
errmsg = "Could not find target log";
|
|
goto err;
|
|
}
|
|
|
|
name = search_file_name;
|
|
|
|
linfo.index_file_offset = 0;
|
|
|
|
if (binary_log->find_log_pos(&linfo, name, true/*need_lock_index=true*/))
|
|
{
|
|
errmsg = "Could not find target log";
|
|
goto err;
|
|
}
|
|
|
|
mysql_mutex_lock(&thd->LOCK_thd_data);
|
|
thd->current_linfo = &linfo;
|
|
mysql_mutex_unlock(&thd->LOCK_thd_data);
|
|
|
|
// if ((file = open_binlog_file(&log, linfo.log_file_name, &errmsg)) < 0)
|
|
// goto err;
|
|
if (binlog_file_reader.open(linfo.log_file_name))
|
|
goto err;
|
|
|
|
my_off_t end_pos;
|
|
/*
|
|
Acquire LOCK_log only for the duration to calculate the
|
|
log's end position. LOCK_log should be acquired even while
|
|
we are checking whether the log is active log or not.
|
|
*/
|
|
mysql_mutex_lock(log_lock);
|
|
if (binary_log->is_active(linfo.log_file_name))
|
|
{
|
|
LOG_INFO li;
|
|
binary_log->get_current_log(&li, false /*LOCK_log is already acquired*/);
|
|
end_pos = li.pos;
|
|
}
|
|
else
|
|
{
|
|
end_pos = my_b_filelength(binlog_file_reader.get_io_cache());
|
|
}
|
|
mysql_mutex_unlock(log_lock);
|
|
|
|
/*
|
|
to account binlog event header size
|
|
*/
|
|
thd->variables.max_allowed_packet += MAX_LOG_EVENT_HEADER;
|
|
|
|
DEBUG_SYNC(thd, "after_show_binlog_event_found_file");
|
|
/*
|
|
open_binlog_file() sought to position 4.
|
|
Read the first event in case it's a Format_description_log_event, to
|
|
know the format. If there's no such event, we are 3.23 or 4.x. This
|
|
code, like before, can't read 3.23 binlogs.
|
|
This code will fail on a mixed relay log (one which has Format_desc then
|
|
Rotate then Format_desc).
|
|
*/
|
|
// ev = Log_event::read_log_event(&log, (mysql_mutex_t*)0, description_event,
|
|
// opt_master_verify_checksum);
|
|
ev = binlog_file_reader.read_event_object();
|
|
if (ev)
|
|
{
|
|
if (ev->get_type_code() == binary_log::FORMAT_DESCRIPTION_EVENT)
|
|
{
|
|
delete description_event;
|
|
description_event = (Format_description_log_event*)ev;
|
|
}
|
|
else
|
|
delete ev;
|
|
}
|
|
|
|
// my_b_seek(&log, pos);
|
|
binlog_file_reader.seek(pos);
|
|
|
|
if (!description_event->is_valid())
|
|
{
|
|
errmsg = "Invalid Format_description event; could be out of memory";
|
|
goto err;
|
|
}
|
|
|
|
// for (event_count = 0;
|
|
// (ev = Log_event::read_log_event(&log, (mysql_mutex_t*)0,
|
|
// description_event,
|
|
// opt_master_verify_checksum)); )
|
|
for (event_count = 0; (ev = binlog_file_reader.read_event_object()); )
|
|
{
|
|
DEBUG_SYNC(thd, "wait_in_show_binlog_events_loop");
|
|
if (ev->get_type_code() == binary_log::FORMAT_DESCRIPTION_EVENT)
|
|
description_event->common_footer->checksum_alg =
|
|
ev->common_footer->checksum_alg;
|
|
if (event_count >= limit_start &&
|
|
ev->net_send(protocol, linfo.log_file_name, pos))
|
|
{
|
|
errmsg = "Net error";
|
|
delete ev;
|
|
goto err;
|
|
}
|
|
|
|
pos = my_b_tell(binlog_file_reader.get_io_cache());
|
|
delete ev;
|
|
|
|
if (++event_count >= limit_end || pos >= end_pos)
|
|
break;
|
|
}
|
|
|
|
if (event_count < limit_end && binlog_file_reader.has_fatal_error())
|
|
{
|
|
errmsg = "Wrong offset or I/O error";
|
|
goto err;
|
|
}
|
|
|
|
}
|
|
// Check that linfo is still on the function scope.
|
|
DEBUG_SYNC(thd, "after_show_consensuslog_events");
|
|
|
|
ret = FALSE;
|
|
|
|
err:
|
|
delete description_event;
|
|
#if 0
|
|
if (file >= 0)
|
|
{
|
|
end_io_cache(&log);
|
|
mysql_file_close(file, MYF(MY_WME));
|
|
}
|
|
#endif
|
|
if (errmsg)
|
|
{
|
|
my_error(ER_ERROR_WHEN_EXECUTING_COMMAND, MYF(0),
|
|
"SHOW CONSENSUSLOG EVENTS", errmsg);
|
|
}
|
|
else
|
|
my_eof(thd);
|
|
|
|
mysql_mutex_lock(&thd->LOCK_thd_data);
|
|
thd->current_linfo = 0;
|
|
mysql_mutex_unlock(&thd->LOCK_thd_data);
|
|
thd->variables.max_allowed_packet = old_max_allowed_packet;
|
|
DBUG_RETURN(ret);
|
|
}
|
|
|
|
/**
|
|
Execute a SHOW CONSENSUSLOG EVENTS statement.
|
|
|
|
@param thd Pointer to THD object for the client thread executing the
|
|
statement.
|
|
|
|
@retval FALSE success
|
|
@retval TRUE failure
|
|
*/
|
|
bool mysql_show_consensuslog_events(THD* thd)
|
|
{
|
|
List<Item> field_list;
|
|
DBUG_ENTER("mysql_show_consensuslog_events");
|
|
|
|
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_CONSENSUSLOG_EVENTS);
|
|
|
|
Log_event::init_show_field_list(&field_list);
|
|
if (thd->send_result_metadata(&field_list,
|
|
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
|
|
DBUG_RETURN(TRUE);
|
|
|
|
/*
|
|
Wait for handlers to insert any pending information
|
|
into the binlog. For e.g. ndb which updates the binlog asynchronously
|
|
this is needed so that the uses sees all its own commands in the binlog
|
|
*/
|
|
ha_binlog_wait(thd);
|
|
consensus_log_manager.lock_consensus(TRUE);
|
|
MYSQL_BIN_LOG *log = consensus_log_manager.get_status() == BINLOG_WORKING ? &mysql_bin_log : &consensus_log_manager.get_relay_log_info()->relay_log;
|
|
bool res = show_consensuslog_events(thd, log);
|
|
consensus_log_manager.unlock_consensus();
|
|
DBUG_RETURN(res);
|
|
}
|
|
|
|
/**
|
|
Execute a SHOW CONSENSUS LOGS statement.
|
|
|
|
@param thd Pointer to THD object for the client thread executing the
|
|
statement.
|
|
|
|
@retval FALSE success
|
|
@retval TRUE failure
|
|
*/
|
|
bool show_consensus_logs(THD* thd)
|
|
{
|
|
IO_CACHE *index_file;
|
|
LOG_INFO cur;
|
|
File file;
|
|
char fname[FN_REFLEN];
|
|
List<Item> field_list;
|
|
size_t length;
|
|
size_t cur_dir_len;
|
|
Protocol *protocol= thd->get_protocol();
|
|
DBUG_ENTER("show_consensuslogs");
|
|
|
|
consensus_log_manager.lock_consensus(TRUE);
|
|
MYSQL_BIN_LOG *log = consensus_log_manager.get_status() == BINLOG_WORKING ? &mysql_bin_log : &consensus_log_manager.get_relay_log_info()->relay_log;
|
|
if (!log->is_open())
|
|
{
|
|
consensus_log_manager.unlock_consensus();
|
|
my_error(ER_NO_BINARY_LOGGING, MYF(0));
|
|
DBUG_RETURN(TRUE);
|
|
}
|
|
|
|
field_list.push_back(new Item_empty_string("Log_name", 255));
|
|
field_list.push_back(new Item_return_int("File_size", 20,
|
|
MYSQL_TYPE_LONGLONG));
|
|
field_list.push_back(new Item_return_int("Start_log_index", 20,
|
|
MYSQL_TYPE_LONGLONG));
|
|
|
|
if (thd->send_result_metadata(&field_list, Protocol::SEND_NUM_ROWS |
|
|
Protocol::SEND_EOF))
|
|
DBUG_RETURN(TRUE);
|
|
|
|
mysql_mutex_lock(log->get_log_lock());
|
|
DEBUG_SYNC(thd, "show_binlogs_after_lock_log_before_lock_index");
|
|
log->lock_index();
|
|
index_file= log->get_index_file();
|
|
|
|
log->raw_get_current_log(&cur); // dont take mutex
|
|
mysql_mutex_unlock(log->get_log_lock()); // lockdep, OK
|
|
|
|
cur_dir_len= dirname_length(cur.log_file_name);
|
|
|
|
reinit_io_cache(index_file, READ_CACHE, (my_off_t) 0, 0, 0);
|
|
|
|
/* The file ends with EOF or empty line */
|
|
while ((length=my_b_gets(index_file, fname, sizeof(fname))) > 1)
|
|
{
|
|
size_t dir_len;
|
|
ulonglong file_length= 0; // Length if open fails
|
|
fname[--length] = '\0'; // remove the newline
|
|
|
|
protocol->start_row();
|
|
dir_len= dirname_length(fname);
|
|
length-= dir_len;
|
|
protocol->store_string(fname + dir_len, length, &my_charset_bin);
|
|
|
|
if (!(strncmp(fname+dir_len, cur.log_file_name+cur_dir_len, length)))
|
|
file_length= cur.pos; /* The active log, use the active position */
|
|
else
|
|
{
|
|
/* this is an old log, open it and find the size */
|
|
if ((file= mysql_file_open(key_file_binlog,
|
|
fname, O_RDONLY | O_SHARE | O_BINARY,
|
|
MYF(0))) >= 0)
|
|
{
|
|
file_length= (ulonglong) mysql_file_seek(file, 0L, MY_SEEK_END, MYF(0));
|
|
mysql_file_close(file, MYF(0));
|
|
}
|
|
}
|
|
protocol->store(file_length);
|
|
|
|
ulonglong start_index = consensus_log_manager.get_log_file_index()->get_start_index_of_file(std::string(fname));
|
|
protocol->store(start_index);
|
|
if (protocol->end_row())
|
|
{
|
|
DBUG_PRINT("info", ("stopping dump thread because protocol->write failed at line %d", __LINE__));
|
|
goto err;
|
|
}
|
|
}
|
|
if(index_file->error == -1)
|
|
goto err;
|
|
log->unlock_index();
|
|
consensus_log_manager.unlock_consensus();
|
|
my_eof(thd);
|
|
DBUG_RETURN(FALSE);
|
|
|
|
err:
|
|
log->unlock_index();
|
|
consensus_log_manager.unlock_consensus();
|
|
DBUG_RETURN(TRUE);
|
|
}
|
|
|
|
static bool invalid_on_consensus_limited(enum_sql_command cmd, bool no_write_to_binlog)
|
|
{
|
|
/* cases for setting no_write_to_binlog, according to sql_yacc.yy */
|
|
/* alter/optimize may execute slowly, just check it ahead. */
|
|
if (cmd == SQLCOM_ALTER_TABLE || cmd == SQLCOM_OPTIMIZE)
|
|
return no_write_to_binlog? false: true;
|
|
|
|
/* update(2017.12.12): allow prepare statement */
|
|
/* update(2018.01.22): disallow change master */
|
|
/* update(2018.04.10): allow follower install/uninstall/show plugins */
|
|
/* update(2018.11.21): allow follower check table */
|
|
return (cmd > SQLCOM_SELECT && cmd <= SQLCOM_DROP_INDEX) ||
|
|
(cmd == SQLCOM_LOAD) ||
|
|
(cmd == SQLCOM_GRANT) ||
|
|
(cmd == SQLCOM_CHANGE_MASTER) ||
|
|
(cmd >= SQLCOM_CREATE_DB && cmd < SQLCOM_CHECK) ||
|
|
(cmd >= SQLCOM_ASSIGN_TO_KEYCACHE && cmd < SQLCOM_FLUSH) ||
|
|
(cmd >= SQLCOM_DELETE_MULTI && cmd <= SQLCOM_UPDATE_MULTI) ||
|
|
(cmd >= SQLCOM_CREATE_USER && cmd <= SQLCOM_REVOKE_ALL)||
|
|
(cmd >= SQLCOM_CREATE_PROCEDURE && cmd <= SQLCOM_SHOW_STATUS_FUNC) ||
|
|
(cmd >= SQLCOM_CREATE_VIEW && cmd <= SQLCOM_DROP_TRIGGER) ||
|
|
(cmd == SQLCOM_ALTER_TABLESPACE) ||
|
|
(cmd == SQLCOM_BINLOG_BASE64_EVENT) ||
|
|
(cmd >= SQLCOM_CREATE_SERVER && cmd <= SQLCOM_DROP_EVENT) ||
|
|
(cmd >= SQLCOM_GET_DIAGNOSTICS && cmd <= SQLCOM_SHOW_CREATE_USER) ||
|
|
(cmd == SQLCOM_ALTER_INSTANCE);
|
|
}
|
|
|
|
static bool invalid_on_logger_limited(enum_sql_command cmd) {
|
|
return (cmd == SQLCOM_START_XPAXOS_REPLICATION) ||
|
|
(cmd == SQLCOM_STOP_XPAXOS_REPLICATION) ||
|
|
(cmd == SQLCOM_CHANGE_MASTER);
|
|
}
|
|
|
|
static bool invalid_on_consensus_force_recovery_limited(enum_sql_command cmd) {
|
|
return (cmd == SQLCOM_START_XPAXOS_REPLICATION) ||
|
|
(cmd == SQLCOM_STOP_XPAXOS_REPLICATION);
|
|
}
|
|
|
|
int consensus_command_limit(THD *thd) {
|
|
/* rw check: leader read-write, others read-only */
|
|
bool reject_query = FALSE;
|
|
bool is_leader = FALSE;
|
|
DBUG_ENTER("consensus_command_limit");
|
|
// dd_upgrade execute inner sql before consensus module startup
|
|
if (!opt_initialize && consensus_ptr != NULL)
|
|
{
|
|
if (consensus_ptr->getState() == alisql::Paxos::LEADER)
|
|
{
|
|
is_leader = TRUE;
|
|
if (consensus_ptr->getTerm() != consensus_log_manager.get_current_term())
|
|
reject_query = TRUE;
|
|
}
|
|
else
|
|
{
|
|
is_leader = FALSE;
|
|
if (invalid_on_consensus_limited(thd->lex->sql_command, thd->lex->no_write_to_binlog))
|
|
reject_query = TRUE;
|
|
}
|
|
}
|
|
if (reject_query && !thd->slave_thread)
|
|
{
|
|
if ((thd->security_context()->master_access() & SUPER_ACL) == 0)
|
|
{
|
|
// normal account
|
|
if (is_leader)
|
|
my_error(ER_CONSENSUS_LEADER_NOT_ALLOWED, MYF(0));
|
|
else
|
|
my_error(ER_CONSENSUS_FOLLOWER_NOT_ALLOWED, MYF(0));
|
|
DBUG_RETURN(1);
|
|
}
|
|
else if (thd->variables.opt_force_revise == FALSE &&
|
|
(thd->variables.option_bits & OPTION_BIN_LOG)) // arg is only used for super account
|
|
{
|
|
// super account
|
|
if (!opt_cluster_log_type_instance)
|
|
{
|
|
// log type node allow super user to write data with sql-log-bin off
|
|
if (is_leader && invalid_on_consensus_limited(thd->lex->sql_command, thd->lex->no_write_to_binlog))
|
|
{
|
|
my_error(ER_CONSENSUS_LEADER_NOT_ALLOWED, MYF(0));
|
|
DBUG_RETURN(1);
|
|
}
|
|
if (!is_leader)
|
|
{
|
|
my_error(ER_CONSENSUS_FOLLOWER_NOT_ALLOWED, MYF(0));
|
|
DBUG_RETURN(1);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/* logger node check: logger is not allowed to do configure change unless force_revise is set to TRUE */
|
|
if (opt_cluster_log_type_instance &&
|
|
invalid_on_logger_limited(thd->lex->sql_command) &&
|
|
thd->variables.opt_force_revise == FALSE) {
|
|
my_error(ER_CONSENSUS_LOG_TYPE_NODE, MYF(0));
|
|
DBUG_RETURN(1);
|
|
}
|
|
|
|
/* Consensus related commands are not allowed in consensus_force_recovery mode */
|
|
if (opt_consensus_force_recovery &&
|
|
invalid_on_consensus_force_recovery_limited(thd->lex->sql_command)) {
|
|
my_error(ER_CONSENSUS_SERVER_NOT_READY, MYF(0));
|
|
DBUG_RETURN(1);
|
|
}
|
|
|
|
DBUG_RETURN(0);
|
|
}
|
|
|
|
class Kill_all_conn : public Do_THD_Impl
|
|
{
|
|
public:
|
|
Kill_all_conn() {}
|
|
|
|
virtual void operator()(THD *thd_to_kill)
|
|
{
|
|
mysql_mutex_lock(&thd_to_kill->LOCK_thd_data);
|
|
|
|
/* kill all connections */
|
|
if (thd_to_kill->security_context()->has_account_assigned()
|
|
&& thd_to_kill->killed != THD::KILL_CONNECTION
|
|
&& !thd_to_kill->slave_thread)
|
|
thd_to_kill->awake(THD::KILL_CONNECTION);
|
|
|
|
mysql_mutex_unlock(&thd_to_kill->LOCK_thd_data);
|
|
}
|
|
};
|
|
|
|
void killall_threads()
|
|
{
|
|
Kill_all_conn kill_all_conn;
|
|
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
|
|
thd_manager->do_for_all_thd(&kill_all_conn);
|
|
}
|
|
|
|
class Kill_all_dump_conn : public Do_THD_Impl
|
|
{
|
|
public:
|
|
Kill_all_dump_conn() {}
|
|
|
|
virtual void operator()(THD *thd_to_kill)
|
|
{
|
|
mysql_mutex_lock(&thd_to_kill->LOCK_thd_data);
|
|
|
|
/* Kill all binlog dump connections */
|
|
if (thd_to_kill->security_context()->has_account_assigned()
|
|
&& (thd_to_kill->get_command() == COM_BINLOG_DUMP
|
|
|| thd_to_kill->get_command() == COM_BINLOG_DUMP_GTID)
|
|
&& thd_to_kill->killed != THD::KILL_CONNECTION
|
|
&& !thd_to_kill->slave_thread)
|
|
thd_to_kill->awake(THD::KILL_CONNECTION);
|
|
|
|
mysql_mutex_unlock(&thd_to_kill->LOCK_thd_data);
|
|
}
|
|
};
|
|
|
|
void killall_dump_threads()
|
|
{
|
|
Kill_all_dump_conn Kill_all_dump_conn;
|
|
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
|
|
thd_manager->do_for_all_thd(&Kill_all_dump_conn);
|
|
}
|
|
|
|
int start_consensus_apply_threads()
|
|
{
|
|
DBUG_ENTER("start_consensus_apply_threads");
|
|
Master_info *mi = NULL;
|
|
int thread_mask = SLAVE_SQL;
|
|
int error = 0;
|
|
channel_map.wrlock();
|
|
if (!opt_skip_slave_start || !opt_initialize)
|
|
{
|
|
for (mi_map::iterator it = channel_map.begin(); it != channel_map.end(); it++)
|
|
{
|
|
mi = it->second;
|
|
|
|
// Todo: mi must be itself
|
|
/* If server id is not set, start_slave_thread() will say it */
|
|
if (mi && channel_map.is_xpaxos_replication_channel_name(mi->get_channel()))
|
|
{
|
|
/* same as in start_slave() cache the global var values into rli's members */
|
|
mi->rli->opt_slave_parallel_workers = opt_mts_slave_parallel_workers;
|
|
mi->rli->checkpoint_group = opt_mts_checkpoint_group;
|
|
if (mts_parallel_option == MTS_PARALLEL_TYPE_DB_NAME)
|
|
mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_DB_NAME;
|
|
else
|
|
mi->rli->channel_mts_submode = MTS_PARALLEL_TYPE_LOGICAL_CLOCK;
|
|
/* wait intergrity consensus log */
|
|
while(mi->rli->get_consensus_apply_index() > consensus_log_manager.get_sync_index())
|
|
my_sleep(2000000);
|
|
if (start_slave_threads(true/*need_lock_slave=true*/,
|
|
false/*wait_for_start=false*/,
|
|
mi,
|
|
thread_mask))
|
|
{
|
|
/*
|
|
Creation of slave threads for subsequent channels are stopped
|
|
if a failure occurs in this iteration.
|
|
@todo:have an option if the user wants to continue
|
|
the replication for other channels.
|
|
*/
|
|
sql_print_error("Failed to create slave threads");
|
|
error = 1;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
channel_map.unlock();
|
|
DBUG_RETURN(error);
|
|
}
|
|
|
|
int check_exec_consensus_log_end_condition(Relay_log_info *rli,
|
|
bool is_xpaxos_replication) {
|
|
DBUG_ENTER("check_exec_consensus_log_end_condition");
|
|
if (is_xpaxos_replication && !opt_disable_wait_commitindex) {
|
|
while (consensus_ptr->checkCommitIndex(consensus_log_manager.get_real_apply_index() - 1, consensus_log_manager.get_current_term()) <
|
|
consensus_log_manager.get_real_apply_index())
|
|
{
|
|
if (consensus_ptr->isShutdown())
|
|
{
|
|
sql_print_information("Apply thread is terminated because of shutdown");
|
|
DBUG_RETURN(1);
|
|
}
|
|
|
|
// determine whether exit
|
|
uint64 stop_term = consensus_log_manager.get_stop_term();
|
|
long time_diff = (long)(time(0) - rli->last_master_timestamp);
|
|
if (stop_term == UINT64_MAX)
|
|
{
|
|
my_sleep(opt_consensus_check_commit_index_interval);
|
|
continue;
|
|
}
|
|
else if (consensus_log_manager.get_apply_term() >= stop_term ||
|
|
opt_consensus_leader_stop_apply ||
|
|
(opt_consensus_leader_stop_apply_time && (time_diff < (long)opt_consensus_leader_stop_apply_time)))
|
|
{
|
|
sql_print_information("Apply thread stop, opt_consensus_leader_stop_apply: %s, seconds_behind_master: %ld, opt_consensus_leader_stop_apply_time: %lu.",
|
|
opt_consensus_leader_stop_apply ? "true" : "false", time_diff, opt_consensus_leader_stop_apply_time);
|
|
opt_consensus_leader_stop_apply = FALSE;
|
|
mysql_mutex_lock(consensus_log_manager.get_apply_thread_lock());
|
|
mysql_cond_broadcast(consensus_log_manager.get_catchup_cond());
|
|
consensus_log_manager.set_apply_catchup(TRUE);
|
|
rli->sql_thread_kill_accepted = TRUE;
|
|
rli->force_apply_queue_before_stop = true;
|
|
mysql_mutex_unlock(consensus_log_manager.get_apply_thread_lock());
|
|
sql_print_information("Apply thread catchup commit index, consensus index: %lld, current term: %llu, stop term: %llu.",
|
|
rli->get_consensus_apply_index(), consensus_log_manager.get_current_term(), consensus_log_manager.get_stop_term());
|
|
DBUG_RETURN(1);
|
|
}
|
|
else if (consensus_ptr->getCommitIndex() > consensus_log_manager.get_real_apply_index() ||
|
|
(consensus_ptr->getCommitIndex() == 1 && consensus_log_manager.get_real_apply_index()==1))
|
|
{
|
|
// not reach commit index, continue to read log
|
|
break;
|
|
}
|
|
else
|
|
{
|
|
// reach commit index, continue to wait exit condition
|
|
my_sleep(opt_consensus_check_commit_index_interval);
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
DBUG_RETURN(0);
|
|
}
|
|
|
|
void update_consensus_apply_pos(Relay_log_info *rli,
|
|
Log_event *ev, bool is_xpaxos_replication) {
|
|
if (is_xpaxos_replication) {
|
|
// update apply index
|
|
/* for large trx, use the first one */
|
|
if (ev->get_type_code() == binary_log::CONSENSUS_LOG_EVENT) {
|
|
Consensus_log_event *r_ev = (Consensus_log_event *)ev;
|
|
uint64 consensus_index = r_ev->get_index();
|
|
uint64 consensus_term = r_ev->get_term();
|
|
uint64 consensus_index_end_pos =
|
|
r_ev->future_event_relay_log_pos + r_ev->get_length();
|
|
|
|
if (r_ev->get_flag() & Consensus_log_event_flag::FLAG_LARGE_TRX) {
|
|
if (!consensus_log_manager.get_in_large_trx()) {
|
|
consensus_log_manager.set_apply_index(consensus_index);
|
|
consensus_log_manager.set_apply_term(consensus_term);
|
|
consensus_log_manager.set_apply_ev_sequence(0);
|
|
consensus_log_manager.set_in_large_trx(true);
|
|
}
|
|
} else if (r_ev->get_flag() &
|
|
Consensus_log_event_flag::FLAG_LARGE_TRX_END) {
|
|
consensus_log_manager.set_in_large_trx(false);
|
|
} else {
|
|
/* normal case */
|
|
consensus_log_manager.set_apply_index(consensus_index);
|
|
consensus_log_manager.set_apply_term(consensus_term);
|
|
consensus_log_manager.set_apply_ev_sequence(0);
|
|
}
|
|
consensus_log_manager.set_real_apply_index(consensus_index);
|
|
consensus_log_manager.set_apply_index_end_pos(consensus_index_end_pos);
|
|
}
|
|
|
|
ev->consensus_index = consensus_log_manager.get_apply_index();
|
|
ev->consensus_real_index = consensus_log_manager.get_real_apply_index();
|
|
ev->consensus_index_end_pos = consensus_log_manager.get_apply_index_end_pos();
|
|
ev->consensus_sequence = consensus_log_manager.get_apply_ev_sequence();
|
|
consensus_log_manager.incr_apply_ev_sequence();
|
|
|
|
/**
|
|
* The Group of CONSENSUS_LOG which is applied may be truncated.
|
|
* It will add a CONSENSUS_EMPTY event instead of the truncated binlog.
|
|
*
|
|
* The CONSENSUS_EMPTY event can be considered as end_group().
|
|
*/
|
|
if (ev->get_type_code() == binary_log::CONSENSUS_EMPTY_EVENT) {
|
|
ev->consensus_index_end_pos = ev->future_event_relay_log_pos;
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
int calculate_consensus_apply_start_pos(Relay_log_info *rli, bool is_xpaxos_channel) {
|
|
uint64 recover_status = 0;
|
|
ulonglong start_apply_index = 0;
|
|
uint64 recover_term = 0;
|
|
std::string recover_log_content;
|
|
bool recover_outer = FALSE;
|
|
uint recover_flag = 0;
|
|
uint64 recover_checksum = 0;
|
|
uint64 rli_appliedindex = 0;
|
|
uint64 log_pos = 0;;
|
|
char log_name[FN_REFLEN];
|
|
|
|
if (is_xpaxos_channel) {
|
|
recover_status = consensus_log_manager.get_consensus_info()->get_recover_status();
|
|
DBUG_ASSERT(consensus_log_manager.get_status() == RELAY_LOG_WORKING);
|
|
start_apply_index = consensus_log_manager.get_consensus_info()->get_start_apply_index();
|
|
sql_print_warning("Apply thread start, recover status = %lld, start apply index = %lld, rli consensus index = %lld.", recover_status, start_apply_index, rli->get_consensus_apply_index());
|
|
// follower recover, do nothing, leader recover should set apply start point
|
|
if (recover_status == BINLOG_WORKING)
|
|
{
|
|
// leader degrade
|
|
if (start_apply_index == 0)
|
|
{
|
|
// crash when the role is leader
|
|
// care whether truncate log
|
|
uint64 recover_index = consensus_log_manager.get_recovery_manager()->get_last_leader_term_index();
|
|
uint64 next_index = consensus_log_manager.get_next_trx_index(recover_index);
|
|
if (consensus_log_manager.get_log_position(next_index, FALSE, log_name, &log_pos))
|
|
{
|
|
sql_print_error("Apply thread cannot find start index %llu.", next_index);
|
|
abort();
|
|
}
|
|
sql_print_warning("Apply thread group relay log file name = '%s', pos = %lld, rli apply index = %lld. ", log_name, log_pos, recover_index);
|
|
rli->set_group_relay_log_name(log_name);
|
|
rli->set_group_relay_log_pos(log_pos);
|
|
|
|
if (consensus_log_manager.get_log_directly(recover_index, &recover_term, recover_log_content, &recover_outer, &recover_flag, &recover_checksum, false))
|
|
{
|
|
sql_print_error("Apply thread cannot find term by index %llu.", recover_index);
|
|
abort();
|
|
}
|
|
rli->set_consensus_apply_index(recover_index);
|
|
consensus_log_manager.set_apply_term(recover_term);
|
|
rli->flush_info(TRUE);
|
|
}
|
|
else
|
|
{
|
|
// role already degraded to follower ,but log status is still binlog working
|
|
uint64 start_index = max(start_apply_index, rli->get_consensus_apply_index());
|
|
uint64 next_index = consensus_log_manager.get_next_trx_index(start_index);
|
|
if (consensus_log_manager.get_log_position(next_index, FALSE, log_name, &log_pos))
|
|
{
|
|
sql_print_error("Apply thread cannot find start index %llu.", next_index);
|
|
abort();
|
|
}
|
|
sql_print_warning("Apply thread group relay log file name = '%s', pos = %lld, rli apply index = %lld. ", log_name, log_pos, start_index);
|
|
rli->set_group_relay_log_name(log_name);
|
|
rli->set_group_relay_log_pos(log_pos);
|
|
|
|
if (consensus_log_manager.get_log_directly(start_index, &recover_term, recover_log_content, &recover_outer, &recover_flag, &recover_checksum, false))
|
|
{
|
|
sql_print_error("Apply thread cannot find term by index %llu.", start_index);
|
|
abort();
|
|
}
|
|
rli->set_consensus_apply_index(start_index);
|
|
consensus_log_manager.set_apply_term(recover_term);
|
|
rli->flush_info(TRUE);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if (start_apply_index == 0)
|
|
{
|
|
// node first initial
|
|
if (consensus_log_manager.get_recovery_manager()->get_last_leader_term_index() == 0 && rli->get_consensus_apply_index() == 0)
|
|
{
|
|
// start replay from first position
|
|
uint64 start_index = 0, start_term = 1;
|
|
uint64 next_index = 1;
|
|
if (consensus_log_manager.get_log_position(next_index, FALSE, log_name, &log_pos))
|
|
{
|
|
sql_print_error("Apply thread cannot find start index %llu.", next_index);
|
|
abort();
|
|
}
|
|
sql_print_warning("Apply thread group relay log file name = '%s', pos = %lld, rli apply index = %lld. ", log_name, log_pos, start_index);
|
|
rli->set_group_relay_log_name(log_name);
|
|
rli->set_group_relay_log_pos(log_pos);
|
|
rli->set_consensus_apply_index(start_index);
|
|
consensus_log_manager.set_apply_term(start_term);
|
|
rli->flush_info(TRUE);
|
|
}
|
|
else
|
|
{
|
|
// because backup restore will reorganize the log , so should use index to set the replay pos
|
|
uint64 start_index = rli->get_consensus_apply_index();
|
|
uint64 next_index = consensus_log_manager.get_next_trx_index(start_index);
|
|
if (consensus_log_manager.get_log_position(next_index, FALSE, log_name, &log_pos))
|
|
{
|
|
sql_print_error("Apply thread cannot find start index %llu.", next_index);
|
|
abort();
|
|
}
|
|
sql_print_warning("Apply thread group relay log file name = '%s', pos = %lld, rli apply index = %lld. ", log_name, log_pos, start_index);
|
|
|
|
rli->set_group_relay_log_name(log_name);
|
|
rli->set_group_relay_log_pos(log_pos);
|
|
rli->flush_info(TRUE);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// these code will not reached anymore
|
|
// start_apply_index != 0 && recover_status == RELAYLOG_WORKING is impossible
|
|
uint64 start_index = max(start_apply_index, rli->get_consensus_apply_index());
|
|
uint64 next_index = consensus_log_manager.get_next_trx_index(start_index);
|
|
if (consensus_log_manager.get_log_position(next_index, FALSE, log_name, &log_pos))
|
|
{
|
|
sql_print_error("Apply thread cannot find start index %llu.", next_index);
|
|
abort();
|
|
}
|
|
sql_print_warning("Apply thread group relay log file name = '%s', pos = %lld, rli apply index = %lld. ", log_name, log_pos, start_index);
|
|
rli->set_group_relay_log_name(log_name);
|
|
rli->set_group_relay_log_pos(log_pos);
|
|
|
|
if (consensus_log_manager.get_log_directly(start_index, &recover_term, recover_log_content, &recover_outer, &recover_flag, &recover_checksum, false))
|
|
{
|
|
sql_print_error("Apply thread cannot find start index %llu.", start_index);
|
|
abort();
|
|
}
|
|
rli->set_consensus_apply_index(start_index);
|
|
consensus_log_manager.set_apply_term(recover_term);
|
|
rli->flush_info(TRUE);
|
|
}
|
|
}
|
|
if (opt_recover_snapshot)
|
|
{
|
|
sql_print_information("Get the snapshot consensus index %llu ", rli->get_consensus_apply_index());
|
|
return -1; // let caller return early
|
|
}
|
|
// deal with appliedindex
|
|
rli_appliedindex = rli->get_consensus_apply_index();
|
|
rli_appliedindex = opt_appliedindex_force_delay >= rli_appliedindex? 0: rli_appliedindex - opt_appliedindex_force_delay;
|
|
consensus_ptr->updateAppliedIndex(rli_appliedindex);
|
|
replica_read_manager.update_lsn(rli_appliedindex);
|
|
|
|
/*
|
|
* Wait until new leader empty log is committed,
|
|
* which means truncate log is finished and we have correct hotlog.
|
|
*/
|
|
/*
|
|
while (consensus_ptr->getCommitIndex() <= consensus_log_manager.get_consensus_info()->get_start_apply_index())
|
|
my_sleep(5000);
|
|
*/
|
|
// set consensus info to relay-working
|
|
consensus_log_manager.get_consensus_info()->set_start_apply_index(0);
|
|
consensus_log_manager.get_consensus_info()->set_last_leader_term(0);
|
|
consensus_log_manager.get_consensus_info()->set_recover_status(RELAY_LOG_WORKING);
|
|
if (consensus_log_manager.get_consensus_info()->flush_info(true, false))
|
|
{
|
|
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
|
|
"Error flush consensus info set recover status");
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|