1672 lines
57 KiB
C++
1672 lines
57 KiB
C++
/* Copyright (c) 2018, 2021, Alibaba and/or its affiliates. All rights reserved.
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License, version 2.0,
|
|
as published by the Free Software Foundation.
|
|
This program is also distributed with certain software (including
|
|
but not limited to OpenSSL) that is licensed under separate terms,
|
|
as designated in a particular file or component or in included license
|
|
documentation. The authors of MySQL/PolarDB-X Engine hereby grant you an
|
|
additional permission to link the program and your derivative works with the
|
|
separately licensed software that they have included with
|
|
MySQL/PolarDB-X Engine.
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License, version 2.0, for more details.
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
|
|
|
|
|
|
#include "consensus_log_manager.h"
|
|
#include "log.h"
|
|
#include "log_event.h"
|
|
#include "sql_parse.h"
|
|
#include "mysql/thread_pool_priv.h"
|
|
#include "rpl_info_factory.h"
|
|
#include "rpl_slave.h"
|
|
#include "rpl_msr.h"
|
|
#include "appliedindex_checker.h"
|
|
#include "sql/rpl_mi.h"
|
|
#include "sql/consensus_admin.h"
|
|
#include "consensus_recovery_manager.h"
|
|
#include "events.h"
|
|
|
|
using std::max;
|
|
|
|
#ifdef NORMANDY_CLUSTER
|
|
#include "bl_consensus_log.h"
|
|
#endif
|
|
|
|
#define LOG_PREFIX "ML"
|
|
|
|
ConsensusLogManager consensus_log_manager;
|
|
|
|
uint64 show_fifo_cache_size(THD *, SHOW_VAR *var, char *buff) {
|
|
var->type= SHOW_LONGLONG;
|
|
var->value= buff;
|
|
long *value= reinterpret_cast<long*>(buff);
|
|
uint64 size = consensus_log_manager.get_fifo_cache_manager()->get_fifo_cache_size();
|
|
*value= static_cast<long long>(size);
|
|
return 0;
|
|
}
|
|
|
|
uint64 show_first_index_in_fifo_cache(THD *, SHOW_VAR *var, char *buff) {
|
|
var->type= SHOW_LONGLONG;
|
|
var->value= buff;
|
|
long *value= reinterpret_cast<long*>(buff);
|
|
uint64 size = consensus_log_manager.get_fifo_cache_manager()->get_first_index_of_fifo_cache();
|
|
*value= static_cast<long long>(size);
|
|
return 0;
|
|
}
|
|
|
|
uint64 show_log_count_in_fifo_cache(THD *, SHOW_VAR *var, char *buff) {
|
|
var->type= SHOW_LONGLONG;
|
|
var->value= buff;
|
|
long *value= reinterpret_cast<long*>(buff);
|
|
uint64 size = consensus_log_manager.get_fifo_cache_manager()->get_fifo_cache_log_count();
|
|
*value= static_cast<long long>(size);
|
|
return 0;
|
|
}
|
|
|
|
int show_appliedindex_checker_queue(THD *, SHOW_VAR *var, char *) {
|
|
var->type= SHOW_CHAR;
|
|
var->value= const_cast<char *>(appliedindex_checker.get_group_queue_status());
|
|
return 0;
|
|
}
|
|
|
|
int cluster_force_purge_gtid()
|
|
{
|
|
enum_return_status ret= RETURN_STATUS_OK;
|
|
THD *thd= NULL;
|
|
Gtid_set *executed_gtids= NULL;
|
|
global_sid_lock->wrlock();
|
|
Gtid_set purged_set(global_sid_map, opt_cluster_purged_gtid,
|
|
&ret, global_sid_lock);
|
|
if (ret != RETURN_STATUS_OK)
|
|
{
|
|
sql_print_error("Failed to parse the purged gtid.");
|
|
goto err;
|
|
}
|
|
if (!(thd= new THD))
|
|
{
|
|
sql_print_error("Out of memory in storing purged gtid.");
|
|
goto err;
|
|
}
|
|
thd->thread_stack= (char*) &thd;
|
|
thd->store_globals();
|
|
if (gtid_state->clear(thd))
|
|
{
|
|
sql_print_error("Failed to clear the gtid_executed table.");
|
|
goto err;
|
|
}
|
|
if (gtid_state->save(&purged_set)) {
|
|
sql_print_error("Failed to save the purged gtid to table.");
|
|
goto err;
|
|
}
|
|
thd->release_resources();
|
|
delete thd;
|
|
// set the previous_logged_gtid of binlog
|
|
executed_gtids= const_cast<Gtid_set *>(gtid_state->get_executed_gtids());
|
|
executed_gtids->add_gtid_set(&purged_set);
|
|
global_sid_lock->unlock();
|
|
return 0;
|
|
err:
|
|
global_sid_lock->unlock();
|
|
if (thd != NULL) {
|
|
thd->release_resources();
|
|
delete thd;
|
|
}
|
|
|
|
return -1;
|
|
}
|
|
|
|
#ifdef NORMANDY_CLUSTER
|
|
void stateChangeCb(alisql::Paxos::StateType state, uint64_t term, uint64_t commitIndex) {
|
|
ConsensusStateChange state_change = {state, term, commitIndex};
|
|
consensus_log_manager.add_state_change_request(state_change);
|
|
}
|
|
|
|
void wait_commit_index_in_recovery() {
|
|
auto mgr = consensus_log_manager.get_recovery_manager();
|
|
|
|
if (opt_print_gtid_info_during_recovery == DETAIL_INFO) {
|
|
sql_print_error("wait_commit_index_in_recovery(trx): %llu, %llu",
|
|
consensus_ptr->getCommitIndex(),
|
|
mgr->get_max_consensus_index_from_recover_trx_hash());
|
|
}
|
|
|
|
while (consensus_ptr->getCommitIndex() <
|
|
mgr->get_max_consensus_index_from_recover_trx_hash()) {
|
|
my_sleep(500);
|
|
}
|
|
|
|
if (opt_print_gtid_info_during_recovery == DETAIL_INFO) {
|
|
sql_print_error("wait_commit_index_in_recovery(trx): %llu, %llu",
|
|
consensus_ptr->getCommitIndex(),
|
|
mgr->get_max_consensus_index_from_recover_trx_hash());
|
|
}
|
|
|
|
/*
|
|
If the node was leader when shutdown, it should wait for binlog truncation
|
|
completed to get the corrent binlog end position before applying the lost
|
|
binlog events on recovery.
|
|
*/
|
|
auto recover_status =
|
|
consensus_log_manager.get_consensus_info()->get_recover_status();
|
|
auto start_apply_index =
|
|
consensus_log_manager.get_consensus_info()->get_start_apply_index();
|
|
|
|
if (recover_status == BINLOG_WORKING && start_apply_index == 0) {
|
|
if (opt_print_gtid_info_during_recovery == DETAIL_INFO) {
|
|
sql_print_error("wait_commit_index_in_recovery(index): %llu, %llu",
|
|
consensus_ptr->getCommitIndex(),
|
|
mgr->get_last_leader_term_index());
|
|
}
|
|
|
|
while (consensus_ptr->getCommitIndex() <
|
|
mgr->get_last_leader_term_index()) {
|
|
my_sleep(500);
|
|
}
|
|
|
|
if (opt_print_gtid_info_during_recovery == DETAIL_INFO) {
|
|
sql_print_error("wait_commit_index_in_recovery(index): %llu, %llu",
|
|
consensus_ptr->getCommitIndex(),
|
|
mgr->get_last_leader_term_index());
|
|
}
|
|
}
|
|
}
|
|
#endif
|
|
|
|
/* init gtid_state after consenus module setup */
|
|
int gtid_init_after_consensus_setup() {
|
|
/*
|
|
Initialize GLOBAL.GTID_EXECUTED and GLOBAL.GTID_PURGED from
|
|
gtid_executed table and binlog files during server startup.
|
|
*/
|
|
Gtid_set *executed_gtids=
|
|
const_cast<Gtid_set *>(gtid_state->get_executed_gtids());
|
|
Gtid_set *lost_gtids=
|
|
const_cast<Gtid_set *>(gtid_state->get_lost_gtids());
|
|
Gtid_set *gtids_only_in_table=
|
|
const_cast<Gtid_set *>(gtid_state->get_gtids_only_in_table());
|
|
Gtid_set *skip_counter_gtids =
|
|
Binlog_recovery::instance()->get_skip_counter_gtids();
|
|
|
|
Gtid_set purged_gtids_from_binlog(global_sid_map, global_sid_lock);
|
|
Gtid_set gtids_in_binlog(global_sid_map, global_sid_lock);
|
|
Gtid_set gtids_in_binlog_not_in_table(global_sid_map, global_sid_lock);
|
|
|
|
/* consensus recover from relay log */
|
|
MYSQL_BIN_LOG *log = &consensus_log_manager.get_relay_log_info()->relay_log;
|
|
uint64_t start_apply_index = consensus_log_manager.get_consensus_info()->get_start_apply_index();
|
|
uint64_t last_term = consensus_log_manager.get_consensus_info()->get_last_leader_term();
|
|
DBUG_ASSERT(log != NULL);
|
|
if (log->init_gtid_sets(>ids_in_binlog,
|
|
&purged_gtids_from_binlog,
|
|
opt_master_verify_checksum,
|
|
true/*true=need lock*/,
|
|
NULL/*trx_parser*/,
|
|
NULL/*gtid_partial_trx*/,
|
|
true/*is_server_starting*/,
|
|
last_term))
|
|
// unireg_abort(MYSQLD_ABORT_EXIT);
|
|
return -1;
|
|
|
|
global_sid_lock->wrlock();
|
|
uint64 recover_status = consensus_log_manager.get_consensus_info()->get_recover_status();
|
|
|
|
if (opt_print_gtid_info_during_recovery) {
|
|
log_gtid_set("[GTID INFO] After consensus setup. executed_gtids : ",
|
|
executed_gtids);
|
|
log_gtid_set(
|
|
"[GTID INFO] After consensus setup. purged_gtids_from_binlog : ",
|
|
&purged_gtids_from_binlog);
|
|
log_gtid_set("[GTID INFO] After consensus setup. gtids_in_binlog : ",
|
|
>ids_in_binlog);
|
|
}
|
|
if (!gtids_in_binlog.is_empty() &&
|
|
!gtids_in_binlog.is_subset(executed_gtids))
|
|
{
|
|
gtids_in_binlog_not_in_table.add_gtid_set(>ids_in_binlog);
|
|
if (!executed_gtids->is_empty()) {
|
|
gtids_in_binlog_not_in_table.remove_gtid_set(executed_gtids);
|
|
} else if (recover_status == RELAY_LOG_WORKING) {
|
|
sql_print_error("gtids is cleared of relay_log working");
|
|
}
|
|
/*
|
|
Save unsaved GTIDs into gtid_executed table, in the following
|
|
four cases:
|
|
1. the upgrade case.
|
|
2. the case that a slave is provisioned from a backup of
|
|
the master and the slave is cleaned by RESET MASTER
|
|
and RESET SLAVE before this.
|
|
3. the case that no binlog rotation happened from the
|
|
last RESET MASTER on the server before it crashes.
|
|
4. The set of GTIDs of the last binlog is not saved into the
|
|
gtid_executed table if server crashes, so we save it into
|
|
gtid_executed table and executed_gtids during recovery
|
|
from the crash.
|
|
*/
|
|
// only xpaxos leader need add last binlog
|
|
if (recover_status == BINLOG_WORKING && start_apply_index == 0)
|
|
{
|
|
#ifndef DBUG_OFF
|
|
/*
|
|
all the event in binlog should be applyed in
|
|
recovery binlog apply.
|
|
*/
|
|
if (opt_recovery_apply_binlog) {
|
|
DBUG_ASSERT(gtids_in_binlog_not_in_table.is_empty() ||
|
|
gtids_in_binlog_not_in_table.is_subset(skip_counter_gtids));
|
|
}
|
|
#endif
|
|
|
|
if (gtid_state->save(>ids_in_binlog_not_in_table) == -1) {
|
|
global_sid_lock->unlock();
|
|
// unireg_abort(MYSQLD_ABORT_EXIT);
|
|
return -1;
|
|
}
|
|
executed_gtids->add_gtid_set(>ids_in_binlog_not_in_table);
|
|
}
|
|
}
|
|
|
|
if (opt_print_gtid_info_during_recovery) {
|
|
log_gtid_set(
|
|
"[GTID INFO] After consensus setup. gtids_in_binlog_not_in_table : ",
|
|
>ids_in_binlog_not_in_table);
|
|
}
|
|
/* gtids_only_in_table= executed_gtids - gtids_in_binlog */
|
|
if (gtids_only_in_table->add_gtid_set(executed_gtids) !=
|
|
RETURN_STATUS_OK)
|
|
{
|
|
global_sid_lock->unlock();
|
|
// unireg_abort(MYSQLD_ABORT_EXIT);
|
|
return -1;
|
|
}
|
|
gtids_only_in_table->remove_gtid_set(>ids_in_binlog);
|
|
/*
|
|
lost_gtids = executed_gtids -
|
|
(gtids_in_binlog - purged_gtids_from_binlog)
|
|
= gtids_only_in_table + purged_gtids_from_binlog;
|
|
*/
|
|
DBUG_ASSERT(lost_gtids->is_empty());
|
|
if (lost_gtids->add_gtid_set(gtids_only_in_table) != RETURN_STATUS_OK ||
|
|
lost_gtids->add_gtid_set(&purged_gtids_from_binlog) !=
|
|
RETURN_STATUS_OK)
|
|
{
|
|
global_sid_lock->unlock();
|
|
// unireg_abort(MYSQLD_ABORT_EXIT);
|
|
return -1;
|
|
}
|
|
|
|
global_sid_lock->unlock();
|
|
|
|
return 0;
|
|
}
|
|
|
|
ConsensusLogManager::ConsensusLogManager() :
|
|
inited(FALSE), current_index(0),
|
|
cache_log(new MYSQL_BIN_LOG::Binlog_ofile()), binlog(NULL), rli_info(NULL)
|
|
{
|
|
}
|
|
|
|
ConsensusLogManager::~ConsensusLogManager()
|
|
{
|
|
}
|
|
|
|
int ConsensusLogManager::init(uint64 max_fifo_cache_size_arg, uint64 max_prefetch_cache_size_arg, uint64 fake_current_index_arg)
|
|
{
|
|
if (NULL == cache_log)
|
|
return 1;
|
|
cache_log->open();
|
|
if (open_cached_file(cache_log->get_io_cache(), mysql_tmpdir,
|
|
LOG_PREFIX, binlog_stmt_cache_size, MYF(MY_WME)))
|
|
return 1;
|
|
|
|
cache_log->get_io_cache()->end_of_file = CACHE_BUFFER_SIZE;
|
|
|
|
// initialize the empty log content
|
|
Consensus_empty_log_event eev;
|
|
eev.common_footer->checksum_alg = static_cast<enum_binlog_checksum_alg>(binlog_checksum_options);
|
|
eev.write(cache_log.get());
|
|
my_off_t buf_size = my_b_tell(cache_log->get_io_cache());
|
|
uchar *buffer = (uchar*)my_malloc(key_memory_thd_main_mem_root, (size_t)buf_size, MYF(MY_WME));
|
|
reinit_io_cache(cache_log->get_io_cache(), READ_CACHE, 0, 0, 0);
|
|
my_b_read(cache_log->get_io_cache(), buffer, (size_t)buf_size);
|
|
reinit_io_cache(cache_log->get_io_cache(), WRITE_CACHE, 0, 0, 1);
|
|
empty_log_event_content = std::string((char*)buffer, buf_size);
|
|
my_free(buffer);
|
|
|
|
|
|
status = BINLOG_WORKING;
|
|
current_index = fake_current_index_arg;
|
|
cache_index = 0;
|
|
sync_index = 0;
|
|
apply_index = 1;
|
|
real_apply_index = 1;
|
|
apply_index_end_pos = 0;
|
|
apply_term = 1;
|
|
apply_catchup = 0;
|
|
current_term = 1;
|
|
stop_term = UINT64_MAX;
|
|
already_set_start_index = FALSE;
|
|
already_set_start_term = FALSE;
|
|
apply_ev_seq = 1;
|
|
in_large_trx = false;
|
|
enable_rotate = false;
|
|
atomic_logging_flag = FLAG_GU1;
|
|
ev_tt_ = 0;
|
|
|
|
key_LOCK_consensuslog_status = key_rwlock_ConsensusLog_status_lock;
|
|
key_LOCK_consensuslog_sequence_stage1 = key_CONSENSUSLOG_LOCK_ConsensusLog_sequence_stage1_lock;
|
|
key_LOCK_consensuslog_sequence_stage2 = key_CONSENSUSLOG_LOCK_ConsensusLog_sequence_stage2_lock;
|
|
key_LOCK_consensuslog_term = key_CONSENSUSLOG_LOCK_ConsensusLog_term_lock;
|
|
key_LOCK_consensuslog_apply = key_CONSENSUSLOG_LOCK_ConsensusLog_apply_lock;
|
|
key_LOCK_consensuslog_apply_thread = key_CONSENSUSLOG_LOCK_ConsensusLog_apply_thread_lock;
|
|
key_LOCK_consensus_state_change = key_CONSENSUSLOG_LOCK_Consensus_stage_change;
|
|
key_COND_consensuslog_catchup = key_COND_ConsensusLog_catchup;
|
|
key_COND_consensus_state_change = key_COND_Consensus_state_change;
|
|
mysql_rwlock_init(key_LOCK_consensuslog_status, &LOCK_consensuslog_status);
|
|
mysql_mutex_init(key_LOCK_consensuslog_sequence_stage1, &LOCK_consensuslog_sequence_stage1, MY_MUTEX_INIT_FAST);
|
|
mysql_mutex_init(key_LOCK_consensuslog_sequence_stage2, &LOCK_consensuslog_sequence_stage2, MY_MUTEX_INIT_FAST);
|
|
mysql_mutex_init(key_LOCK_consensuslog_term, &LOCK_consensuslog_term, MY_MUTEX_INIT_FAST);
|
|
mysql_mutex_init(key_LOCK_consensuslog_apply, &LOCK_consensuslog_apply, MY_MUTEX_INIT_FAST);
|
|
mysql_mutex_init(key_LOCK_consensuslog_apply_thread, &LOCK_consensuslog_apply_thread, MY_MUTEX_INIT_FAST);
|
|
mysql_mutex_init(key_LOCK_consensus_state_change, &LOCK_consensus_state_change, MY_MUTEX_INIT_FAST);
|
|
mysql_mutex_init(key_CONSENSUSLOG_LOCK_commit_pos, &LOCK_consensus_commit_pos, MY_MUTEX_INIT_FAST);
|
|
mysql_cond_init(key_COND_consensuslog_catchup, &COND_consensuslog_catchup);
|
|
mysql_cond_init(key_COND_consensus_state_change, &COND_consensus_state_change);
|
|
recovery_manager = new ConsensusRecoveryManager();
|
|
recovery_manager->init();
|
|
|
|
fifo_cache_manager = new ConsensusFifoCacheManager();
|
|
fifo_cache_manager->init(max_fifo_cache_size_arg);
|
|
|
|
prefetch_manager = new ConsensusPreFetchManager();
|
|
prefetch_manager->init(max_prefetch_cache_size_arg);
|
|
|
|
log_file_index = new ConsensusLogIndex();
|
|
log_file_index->init();
|
|
|
|
// #ifdef HAVE_REPLICATION
|
|
Rpl_info_factory::init_consensus_repo_metadata();
|
|
consensus_info = Rpl_info_factory::create_consensus_info();
|
|
if (!consensus_info)
|
|
return 1;
|
|
//#endif
|
|
|
|
consensus_state_change_is_running = TRUE;
|
|
// #ifdef HAVE_REPLICATION
|
|
if (mysql_thread_create(key_thread_consensus_stage_change,
|
|
&consensus_state_change_thread_handle, NULL,
|
|
run_consensus_stage_change, (void *) &consensus_state_change_is_running))
|
|
{
|
|
sql_print_error("Fail to create thread run_consensus_stage_change.");
|
|
abort();
|
|
}
|
|
// #endif
|
|
inited = TRUE;
|
|
return 0;
|
|
}
|
|
|
|
int ConsensusLogManager::init_consensus_info() {
|
|
|
|
// init sys info
|
|
Consensus_info *consensus_info = get_consensus_info();
|
|
if (!opt_consensus_force_recovery) {
|
|
if (consensus_info->consensus_init_info()) {
|
|
sql_print_error("Fail to init consensus_info.");
|
|
return -1;
|
|
}
|
|
if (consensus_info->get_cluster_info() == "" && consensus_info->get_cluster_learner_info() == "") {
|
|
consensus_info->set_cluster_id(opt_cluster_id);
|
|
// reuse opt_cluster, if normal stands for cluster info, else stands for learner info
|
|
if (!opt_cluster_info) {
|
|
sql_print_error(
|
|
"PolarDB-X Engine cluster_info must be set when the server is running "
|
|
"with --initialize(-insecure) ");
|
|
return -1;
|
|
}
|
|
if (!opt_cluster_learner_node) {
|
|
consensus_info->set_cluster_info(std::string(opt_cluster_info));
|
|
consensus_info->set_cluster_learner_info("");
|
|
} else {
|
|
consensus_info->set_cluster_learner_info(std::string(opt_cluster_info));
|
|
consensus_info->set_cluster_info("");
|
|
}
|
|
}
|
|
if (!opt_initialize && !opt_cluster_force_change_meta) {
|
|
if (consensus_info->get_last_leader_term() != 0) {
|
|
if (!opt_cluster_log_type_instance &&
|
|
mysql_bin_log.init_last_index_of_term(consensus_info->get_last_leader_term())) {
|
|
sql_print_error("init_last_index_of_term for recovery failed");
|
|
return -1;
|
|
}
|
|
}
|
|
}
|
|
set_consensus_system_status(RELAY_LOG_WORKING);
|
|
|
|
// set right current term for apply thread
|
|
set_current_term(get_consensus_info()->get_current_term());
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int ConsensusLogManager::dump_cluster_info_to_file(std::string meta_file_name, std::string output_str)
|
|
{
|
|
File meta_file = -1;
|
|
if (!my_access(meta_file_name.c_str(), F_OK) &&
|
|
mysql_file_delete(0, meta_file_name.c_str(), MYF(0)))
|
|
{
|
|
sql_print_error("Dump meta file failed, access file or delete file error");
|
|
return -1;
|
|
}
|
|
if ((meta_file= mysql_file_open(0,
|
|
meta_file_name.c_str(),
|
|
O_RDWR | O_CREAT | O_BINARY,
|
|
MYF(MY_WME))) < 0 ||
|
|
mysql_file_sync(meta_file, MYF(MY_WME)) )
|
|
{
|
|
sql_print_error("Dump meta file failed, create file error");
|
|
return -1;
|
|
}
|
|
|
|
if (my_write(meta_file, (const uchar*)output_str.c_str(), output_str.length(), MYF(MY_WME)) != output_str.length() ||
|
|
mysql_file_sync(meta_file, MYF(MY_WME)))
|
|
{
|
|
sql_print_error("Dump meta file failed, write meta error");
|
|
return -1;
|
|
}
|
|
|
|
if (meta_file >= 0)
|
|
mysql_file_close(meta_file, MYF(0));
|
|
|
|
return 0;
|
|
}
|
|
|
|
int ConsensusLogManager::init_service() {
|
|
if (!opt_initialize) {
|
|
Consensus_info *consensus_info = get_consensus_info();
|
|
if (opt_cluster_dump_meta) {
|
|
std::string meta_file_name = "consensus.meta";
|
|
std::ostringstream oss_apply_index;
|
|
uint64 apply_index = get_relay_log_info()->get_consensus_apply_index();
|
|
oss_apply_index << apply_index;
|
|
std::string apply_index_str = oss_apply_index.str();
|
|
std::string cluster_info = get_consensus_info()->get_cluster_info();
|
|
std::string learner_info = get_consensus_info()->get_cluster_learner_info();
|
|
|
|
std::ostringstream oss_cluster_id;
|
|
uint64 cluster_id = get_consensus_info()->get_cluster_id();
|
|
oss_cluster_id << cluster_id;
|
|
std::string cluster_id_str = oss_cluster_id.str();
|
|
|
|
std::string output_str = "Consensus_apply_index: " + apply_index_str + "\n" +
|
|
"Conseneus_cluster_info: " + cluster_info + "\n" +
|
|
"Consensus_learner_info: " + learner_info + "\n" +
|
|
"Cluster_id: " + cluster_id_str;
|
|
if (dump_cluster_info_to_file(meta_file_name, output_str) < 0)
|
|
return -1;
|
|
sql_print_warning("Dump meta file successfully.");
|
|
return 1;
|
|
}
|
|
|
|
if (opt_cluster_force_change_meta)
|
|
{
|
|
consensus_info->set_cluster_id(opt_cluster_id);
|
|
if (opt_cluster_current_term)
|
|
consensus_info->set_current_term(opt_cluster_current_term);
|
|
|
|
if (opt_cluster_force_recover_index) {
|
|
// backup from leader can also recover like a follower
|
|
if (consensus_info->get_recover_status() == BINLOG_WORKING) {
|
|
consensus_info->set_recover_status(RELAY_LOG_WORKING);
|
|
consensus_info->set_last_leader_term(0);
|
|
consensus_info->set_start_apply_index(opt_cluster_force_recover_index);
|
|
}
|
|
}
|
|
// reuse opt_cluster, if normal stands for cluster info, else stands for learner info
|
|
if (!opt_cluster_info) {
|
|
sql_print_error(
|
|
"PolarDB-X Engine cluster_info must be set when the server is running "
|
|
"with --initialize(-insecure) ");
|
|
return -1;
|
|
}
|
|
if (!opt_cluster_learner_node) {
|
|
consensus_info->set_cluster_info(std::string(opt_cluster_info));
|
|
} else {
|
|
consensus_info->set_cluster_learner_info(std::string(opt_cluster_info));
|
|
consensus_info->set_cluster_info("");
|
|
}
|
|
// if change meta, flush sys info, force quit
|
|
consensus_info->flush_info(true, true);
|
|
sql_print_warning("Force change meta to system table successfully.");
|
|
return 1;
|
|
} else {
|
|
opt_cluster_id = get_consensus_info()->get_cluster_id();
|
|
}
|
|
|
|
if (opt_recover_snapshot)
|
|
{
|
|
if (opt_cluster_log_type_instance)
|
|
{
|
|
sql_print_error("can't get consensus index of snapshot with logger mode ");
|
|
return -1;
|
|
}
|
|
if(ha_commit_xids_by_recover_map(&consensus_log_manager))
|
|
return -1;
|
|
consensus_log_manager.get_recovery_manager()->clear_all_map();
|
|
if (start_consensus_apply_threads())
|
|
return -1;
|
|
|
|
std::string snapshot_file_name = mysql_real_data_home;
|
|
snapshot_file_name += "/mysql_binlog_snapshot";
|
|
std::ostringstream oss_apply_index;
|
|
uint64 apply_index = consensus_log_manager.get_relay_log_info()->get_consensus_apply_index();
|
|
oss_apply_index << apply_index;
|
|
std::string apply_index_str = oss_apply_index.str();
|
|
uint64 log_pos = 0;
|
|
char log_name[FN_REFLEN];
|
|
if (consensus_log_manager.get_log_position(apply_index, FALSE, log_name, &log_pos))
|
|
{
|
|
sql_print_error("Cann't find start index %llu in binlog file.", apply_index);
|
|
return -1;
|
|
}
|
|
std::string output_str = log_name;
|
|
const size_t last_slash_idx = output_str.find_last_of("\\/");
|
|
if (std::string::npos != last_slash_idx)
|
|
{
|
|
output_str.erase(0, last_slash_idx + 1);
|
|
}
|
|
output_str += "\t";
|
|
output_str += apply_index_str;
|
|
dump_cluster_info_to_file(snapshot_file_name, output_str);
|
|
sql_print_information("Get the snapshot binlog pos successfully.");
|
|
return 1;
|
|
}
|
|
|
|
// learner's cluster_info is empty or not contain @
|
|
bool is_learner = consensus_info->get_cluster_info() == "" ||
|
|
consensus_info->get_cluster_info().find('@') == std::string::npos;
|
|
// get ip-port vector config
|
|
std::string empty_str = "";
|
|
std::vector<std::string> cluster_str_config;
|
|
std::string cluster_info_str = is_learner ?
|
|
consensus_info->get_cluster_learner_info() :
|
|
consensus_info->get_cluster_info();
|
|
|
|
uint64 mock_start_index = max(get_log_file_index()->get_first_index(), opt_consensus_start_index);
|
|
consensus_log->init(mock_start_index, &consensus_log_manager);
|
|
alisql_server = std::make_shared<alisql::AliSQLServer>(0);
|
|
consensus_ptr = new alisql::Paxos(opt_consensus_election_timeout, consensus_log);
|
|
consensus_ptr->setStateChangeCb(stateChangeCb);
|
|
consensus_ptr->setMaxPacketSize(opt_consensus_max_packet_size);
|
|
consensus_ptr->setPipeliningTimeout(opt_consensus_pipelining_timeout);
|
|
consensus_ptr->setLargeBatchRatio(opt_consensus_large_batch_ratio);
|
|
consensus_ptr->setMaxDelayIndex(opt_consensus_max_delay_index);
|
|
consensus_ptr->setMinDelayIndex(opt_consensus_min_delay_index);
|
|
consensus_ptr->setSyncFollowerMetaInterval(opt_consensus_sync_follower_meta_interval);
|
|
consensus_ptr->setConsensusAsync(opt_weak_consensus_mode);
|
|
consensus_ptr->setReplicateWithCacheLog(opt_consensus_replicate_with_cache_log);
|
|
consensus_ptr->setCompactOldMode(opt_consensus_old_compact_mode);
|
|
consensus_ptr->setAlertLogLevel(alisql::Paxos::AlertLogLevel(opt_consensus_log_level + 3));
|
|
consensus_ptr->setForceSyncEpochDiff(opt_consensus_force_sync_epoch_diff);
|
|
consensus_ptr->setChecksumMode(opt_consensus_checksum);
|
|
consensus_ptr->setChecksumCb(checksum_crc32);
|
|
consensus_ptr->setConfigureChangeTimeout(opt_consensus_configure_change_timeout);
|
|
consensus_ptr->setMaxDelayIndex4NewMember(opt_consensus_new_follower_threshold);
|
|
consensus_ptr->setEnableDynamicEasyIndex(opt_consensus_dynamic_easyindex);
|
|
consensus_ptr->setEnableLearnerPipelining(opt_consensus_learner_pipelining);
|
|
consensus_ptr->setEnableLearnerHeartbeat(opt_consensus_learner_heartbeat);
|
|
consensus_ptr->setEnableAutoResetMatchIndex(opt_consensus_auto_reset_match_index);
|
|
consensus_ptr->setEnableAutoLeaderTransfer(opt_consensus_auto_leader_transfer);
|
|
consensus_ptr->setAutoLeaderTransferCheckSeconds(opt_consensus_auto_leader_transfer_check_seconds);
|
|
if (!opt_consensus_force_recovery) {
|
|
if (!is_learner) {
|
|
// startup as normal node
|
|
consensus_ptr->init(cluster_str_config, 0, NULL, opt_consensus_io_thread_cnt, opt_consensus_worker_thread_cnt, alisql_server, opt_consensus_easy_pool_size, opt_consensus_heartbeat_thread_cnt);
|
|
|
|
if (opt_cluster_log_type_instance)
|
|
consensus_ptr->setAsLogType(true);
|
|
} else {
|
|
// startup as learner node, config string arg pass empty
|
|
consensus_ptr->initAsLearner(empty_str, NULL, opt_consensus_io_thread_cnt, opt_consensus_worker_thread_cnt, alisql_server, opt_consensus_easy_pool_size, opt_consensus_heartbeat_thread_cnt);
|
|
}
|
|
consensus_ptr->initAutoPurgeLog(false, false, NULL); // disable autoPurge
|
|
|
|
if (opt_cluster_force_single_mode) // use nuclear weapon
|
|
consensus_ptr->forceSingleLeader();
|
|
|
|
consensus_ptr->waitCommitIndexUpdate(0);
|
|
|
|
if (opt_commit_pos_watcher) {
|
|
if(start_consensus_commit_pos_watcher())
|
|
return -1;
|
|
}
|
|
|
|
wait_commit_index_in_recovery();
|
|
|
|
if (!opt_cluster_log_type_instance) {
|
|
if (ha_commit_xids_by_recover_map(this)) {
|
|
return -1;
|
|
}
|
|
get_recovery_manager()->clear_all_map();
|
|
|
|
/*
|
|
use the last binlog/relay log to modify the gtids
|
|
read from gtid_executed table after truncate log
|
|
*/
|
|
//gtid_init_after_consensus_setup();
|
|
start_consensus_apply_threads();
|
|
} else {
|
|
DBUG_ASSERT(get_recovery_manager()->get_commit_index_map().empty());
|
|
}
|
|
} // end of opt_consensus_force_recovery
|
|
} else {
|
|
Consensus_info *consensus_info = get_consensus_info();
|
|
consensus_info->set_cluster_id(opt_cluster_id);
|
|
// reuse opt_cluster, if normal stands for cluster info, else stands for learner info
|
|
if (!opt_cluster_info) {
|
|
sql_print_error(
|
|
"PolarDB-X Engine cluster_info must be set when the server is running "
|
|
"with --initialize(-insecure) ");
|
|
return -1;
|
|
}
|
|
if (!opt_cluster_learner_node) {
|
|
consensus_info->set_cluster_info(std::string(opt_cluster_info));
|
|
consensus_info->set_cluster_learner_info("");
|
|
} else {
|
|
consensus_info->set_cluster_learner_info(std::string(opt_cluster_info));
|
|
consensus_info->set_cluster_info("");
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int ConsensusLogManager::cleanup()
|
|
{
|
|
if (inited)
|
|
{
|
|
/* set apply_catchup to TRUE to skip wait_replay_log_finished */
|
|
apply_catchup = TRUE;
|
|
mysql_mutex_lock(&LOCK_consensus_state_change);
|
|
consensus_state_change_is_running = FALSE;
|
|
mysql_mutex_unlock(&LOCK_consensus_state_change);
|
|
mysql_cond_broadcast(&COND_consensus_state_change);
|
|
mysql_cond_broadcast(&COND_server_started);
|
|
// #ifdef HAVE_REPLICATION
|
|
my_thread_join(&consensus_state_change_thread_handle, NULL);
|
|
// #endif
|
|
recovery_manager->cleanup();
|
|
fifo_cache_manager->cleanup();
|
|
prefetch_manager->cleanup();
|
|
log_file_index->cleanup();
|
|
|
|
close_cached_file(cache_log->get_io_cache());
|
|
|
|
consensus_info->end_info();
|
|
delete consensus_info;
|
|
delete fifo_cache_manager;
|
|
delete prefetch_manager;
|
|
delete log_file_index;
|
|
|
|
mysql_rwlock_destroy(&LOCK_consensuslog_status);
|
|
mysql_mutex_destroy(&LOCK_consensuslog_sequence_stage1);
|
|
mysql_mutex_destroy(&LOCK_consensuslog_sequence_stage2);
|
|
mysql_mutex_destroy(&LOCK_consensuslog_term);
|
|
mysql_mutex_destroy(&LOCK_consensuslog_apply);
|
|
mysql_mutex_destroy(&LOCK_consensuslog_apply_thread);
|
|
mysql_mutex_destroy(&LOCK_consensus_state_change);
|
|
mysql_mutex_destroy(&LOCK_consensus_commit_pos);
|
|
mysql_cond_destroy(&COND_consensuslog_catchup);
|
|
mysql_cond_destroy(&COND_consensus_state_change);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
std::string ConsensusLogManager::get_empty_log()
|
|
{
|
|
std::string ret_ev;
|
|
if (opt_consensuslog_revise)
|
|
{
|
|
// generate a new one with current time
|
|
size_t buf_size = empty_log_event_content.length();
|
|
uchar *buffer = (uchar*)my_malloc(key_memory_thd_main_mem_root, buf_size, MYF(MY_WME));
|
|
memcpy(buffer, empty_log_event_content.data(), buf_size);
|
|
int4store(buffer, my_micro_time() / 1000000);
|
|
if (binlog_checksum_options != binary_log::BINLOG_CHECKSUM_ALG_OFF)
|
|
{
|
|
ha_checksum crc= checksum_crc32(0L, NULL, 0);
|
|
crc= checksum_crc32(crc, buffer, buf_size - BINLOG_CHECKSUM_LEN);
|
|
int4store(buffer + buf_size - BINLOG_CHECKSUM_LEN, crc);
|
|
}
|
|
ret_ev = std::string((char*)buffer, buf_size);
|
|
my_free(buffer);
|
|
}
|
|
else
|
|
{
|
|
ret_ev = empty_log_event_content;
|
|
}
|
|
return ret_ev;
|
|
}
|
|
|
|
int ConsensusLogManager::set_start_apply_index_if_need(uint64 consensus_index)
|
|
{
|
|
if (opt_cluster_log_type_instance)
|
|
return 0;
|
|
mysql_rwlock_rdlock(&LOCK_consensuslog_status);
|
|
if (!already_set_start_index && status == BINLOG_WORKING)
|
|
{
|
|
consensus_info->set_start_apply_index(consensus_index);
|
|
// store executed gtid to table before xpaxos receiving relay log
|
|
if (gtid_state->save_gtids_of_last_binlog_into_table()) {
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
sql_print_error("Failed save gtids to table in saving last term index.");
|
|
return 1;
|
|
}
|
|
if (consensus_info->flush_info(true, true))
|
|
{
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
return 1;
|
|
}
|
|
already_set_start_index = TRUE;
|
|
}
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
return 0;
|
|
}
|
|
|
|
int ConsensusLogManager::set_start_apply_term_if_need(uint64 consensus_term)
|
|
{
|
|
if (opt_cluster_log_type_instance)
|
|
return 0;
|
|
mysql_rwlock_rdlock(&LOCK_consensuslog_status);
|
|
if (!already_set_start_term && status == BINLOG_WORKING)
|
|
{
|
|
consensus_info->set_last_leader_term(consensus_term);
|
|
if (consensus_info->flush_info(true, true))
|
|
{
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
return 1;
|
|
}
|
|
already_set_start_term = TRUE;
|
|
}
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
return 0;
|
|
}
|
|
|
|
|
|
int ConsensusLogManager::write_log_entry(ConsensusLogEntry &log, uint64* consensus_index, bool with_check)
|
|
{
|
|
int error = 0;
|
|
mysql_rwlock_rdlock(&LOCK_consensuslog_status);
|
|
|
|
enable_rotate = !(log.flag & Consensus_log_event_flag::FLAG_LARGE_TRX);
|
|
if (status == Consensus_Log_System_Status::BINLOG_WORKING)
|
|
{
|
|
bool do_rotate = FALSE;
|
|
if ((error = binlog->append_consensus_log(log, consensus_index, &do_rotate, rli_info, with_check)))
|
|
{
|
|
goto end;
|
|
}
|
|
if (*consensus_index == 0)
|
|
goto end;
|
|
DBUG_ASSERT(*consensus_index != 0);
|
|
|
|
// do not rotate if binlog working because of 2 stage recovery
|
|
}
|
|
else
|
|
{
|
|
// #ifdef HAVE_REPLICATION
|
|
bool do_rotate = false;
|
|
Master_info *mi = rli_info->mi;
|
|
mysql_mutex_lock(&mi->data_lock);
|
|
if(rli_info->relay_log.append_consensus_log(log, consensus_index, &do_rotate, rli_info, with_check))
|
|
{
|
|
mysql_mutex_unlock(&mi->data_lock);
|
|
goto end;
|
|
}
|
|
if (*consensus_index == 0)
|
|
{
|
|
mysql_mutex_unlock(&mi->data_lock);
|
|
goto end;
|
|
}
|
|
DBUG_ASSERT(*consensus_index != 0);
|
|
if (do_rotate && recovery_manager->is_recovering_trx_empty() && enable_rotate)
|
|
{
|
|
if ((error = rotate_relay_log(rli_info->mi)))
|
|
{
|
|
mysql_mutex_unlock(&mi->data_lock);
|
|
goto end;
|
|
}
|
|
}
|
|
mysql_mutex_unlock(&mi->data_lock);
|
|
// #endif
|
|
}
|
|
end:
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
if (error)
|
|
sql_print_error("ConsensusLogManager::write_log_entry error, consensus index: %llu.", *consensus_index);
|
|
if (*consensus_index == 0)
|
|
sql_print_error("ConsensusLogManager::write_log_entry error, consensus index: %llu, because of a failed term check.", *consensus_index);
|
|
return error;
|
|
}
|
|
|
|
|
|
int ConsensusLogManager::write_log_entries(std::vector<ConsensusLogEntry> &logs, uint64* max_index)
|
|
{
|
|
int error = 0;
|
|
// only follower will call this function
|
|
mysql_rwlock_rdlock(&LOCK_consensuslog_status);
|
|
bool do_rotate = false;
|
|
MYSQL_BIN_LOG *log = status == Consensus_Log_System_Status::BINLOG_WORKING ? binlog : &rli_info->relay_log;
|
|
enable_rotate = !(logs.back().flag & Consensus_log_event_flag::FLAG_LARGE_TRX);
|
|
if ((error = log->append_multi_consensus_logs(logs, max_index, &do_rotate, rli_info)))
|
|
{
|
|
goto end;
|
|
}
|
|
if (do_rotate && recovery_manager->is_recovering_trx_empty() && enable_rotate)
|
|
{
|
|
if (status == BINLOG_WORKING)
|
|
{
|
|
if ((error = binlog->rotate_consensus_log()))
|
|
{
|
|
goto end;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// #ifdef HAVE_REPLICATION
|
|
Master_info *mi = rli_info->mi;
|
|
mysql_mutex_lock(&mi->data_lock);
|
|
if ((error = rotate_relay_log(rli_info->mi)))
|
|
{
|
|
mysql_mutex_unlock(&mi->data_lock);
|
|
goto end;
|
|
}
|
|
mysql_mutex_unlock(&mi->data_lock);
|
|
// #endif
|
|
}
|
|
}
|
|
end:
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
if (error)
|
|
sql_print_error("ConsensusLogManager::write_log_entries error, , batch size: %d , max consensus index: %llu.", logs.size(), *max_index);
|
|
return error;
|
|
|
|
}
|
|
|
|
int ConsensusLogManager::get_log_directly(uint64 consensus_index, uint64* consensus_term, std::string& log_content, bool *outer, uint *flag, uint64 *checksum, bool need_content)
|
|
{
|
|
int error = 0;
|
|
if (consensus_index == 0)
|
|
{
|
|
*consensus_term = 1;
|
|
*outer = FALSE;
|
|
*flag = 0;
|
|
return error;
|
|
}
|
|
mysql_rwlock_rdlock(&LOCK_consensuslog_status);
|
|
MYSQL_BIN_LOG *log = status == Consensus_Log_System_Status::BINLOG_WORKING ?
|
|
binlog : &(rli_info->relay_log);
|
|
|
|
if (log->consensus_get_log_entry(consensus_index, consensus_term, log_content, outer, flag, checksum, need_content))
|
|
error = 1;
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
if (error)
|
|
sql_print_error("ConsensusLogManager::get_log_directly error, consensus index: %llu.", consensus_index);
|
|
return error;
|
|
}
|
|
|
|
|
|
int ConsensusLogManager::prefetch_log_directly(THD* thd, uint64 channel_id, uint64 consensus_index)
|
|
{
|
|
int error = 0;
|
|
mysql_rwlock_rdlock(&LOCK_consensuslog_status);
|
|
MYSQL_BIN_LOG *log = status == Consensus_Log_System_Status::BINLOG_WORKING ?
|
|
binlog : &(rli_info->relay_log);
|
|
|
|
if (log->consensus_prefetch_log_entries(thd, channel_id, consensus_index))
|
|
error = 1;
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
if (error)
|
|
sql_print_error("ConsensusLogManager::prefetch_log_directly error, consensus index: %llu.", consensus_index);
|
|
return error;
|
|
}
|
|
|
|
int ConsensusLogManager::get_log_entry(uint64 channel_id, uint64 consensus_index, uint64 *consensus_term, std::string& log_content, bool *outer, uint *flag, uint64 *checksum, bool fast_fail)
|
|
{
|
|
int error = 0;
|
|
if (!opt_consensus_disable_fifo_cache && consensus_index > cache_index)
|
|
return 1;
|
|
if (consensus_index == 0)
|
|
{
|
|
*consensus_term = 0;
|
|
log_content = "";
|
|
return 0;
|
|
}
|
|
|
|
error = fifo_cache_manager->get_log_from_cache(consensus_index, consensus_term, log_content, outer, flag, checksum);
|
|
if (error == ALREADY_SWAP_OUT)
|
|
{
|
|
uint64_t last_sync_index = sync_index;
|
|
if (consensus_index > last_sync_index) {
|
|
// don't prefetch log if it is not written to disk
|
|
sql_print_information("ConsensusLogManager::get_log_entry %llu fail, log has not been flushed to disk, sync index is %llu", consensus_index, last_sync_index);
|
|
return 1;
|
|
}
|
|
ConsensusPreFetchChannel *channel = prefetch_manager->get_prefetch_channel(channel_id);
|
|
if ((error = channel->get_log_from_prefetch_cache(consensus_index, consensus_term, log_content, outer, flag, checksum)))
|
|
{
|
|
if (!fast_fail)
|
|
{
|
|
error = get_log_directly(consensus_index, consensus_term, log_content, outer, flag, checksum, channel_id == 0? false: true);
|
|
}
|
|
else
|
|
{
|
|
error = 1;
|
|
}
|
|
channel->set_prefetch_request(consensus_index);
|
|
}
|
|
}
|
|
else if (error == OUT_OF_RANGE)
|
|
{
|
|
sql_print_error("ConsensusLogManager::get_log_entry fail, out of fifo range. channel_id %llu consensus index : %llu", channel_id, consensus_index);
|
|
}
|
|
if (error == 1)
|
|
sql_print_information("ConsensusLogManager::get_log_entry fail, channel_id %llu consensus index: %llu ,start prefetch.", channel_id, consensus_index);
|
|
return error;
|
|
}
|
|
|
|
|
|
uint64_t ConsensusLogManager::get_left_log_size(uint64 start_log_index, uint64 max_packet_size)
|
|
{
|
|
uint64 total_size = 0;
|
|
total_size += fifo_cache_manager->get_log_size_from_cache(start_log_index, cache_index, max_packet_size);
|
|
// do not consider prefetch cache here
|
|
return total_size;
|
|
}
|
|
|
|
|
|
int ConsensusLogManager::get_log_position(uint64 consensus_index, bool need_lock, char* log_name, uint64* pos)
|
|
{
|
|
int error = 0;
|
|
if (need_lock)
|
|
mysql_rwlock_rdlock(&LOCK_consensuslog_status);
|
|
MYSQL_BIN_LOG *log = status == Consensus_Log_System_Status::BINLOG_WORKING ?
|
|
binlog : &(rli_info->relay_log);
|
|
|
|
if (log->consensus_get_log_position(consensus_index, log_name, pos))
|
|
{
|
|
error = 1;
|
|
}
|
|
if (need_lock)
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
if (error)
|
|
sql_print_error("ConsensusLogManager::get_log_position error, consensus index: %llu.", consensus_index);
|
|
return error;
|
|
}
|
|
|
|
uint64 ConsensusLogManager::get_next_trx_index(uint64 consensus_index)
|
|
{
|
|
uint64 retIndex = consensus_index;
|
|
mysql_rwlock_rdlock(&LOCK_consensuslog_status);
|
|
MYSQL_BIN_LOG *log = status == Consensus_Log_System_Status::BINLOG_WORKING ?
|
|
binlog : &(rli_info->relay_log);
|
|
|
|
retIndex = log->get_trx_end_index(consensus_index);
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
if (retIndex == 0)
|
|
{
|
|
sql_print_error("ConsensusLogManager: fail to find next trx index.");
|
|
abort();
|
|
}
|
|
sql_print_information("ConsensusLogManager: next transaction index is %llu", retIndex + 1);
|
|
return retIndex + 1;
|
|
}
|
|
|
|
uint32 ConsensusLogManager::serialize_cache(uchar **buffer)
|
|
{
|
|
my_off_t buf_size = my_b_tell(cache_log->get_io_cache());
|
|
*buffer = (uchar*)my_malloc(key_memory_thd_main_mem_root, (size_t)buf_size, MYF(MY_WME));
|
|
if (reinit_io_cache(cache_log->get_io_cache(), READ_CACHE, 0, 0, 0))
|
|
return 0;
|
|
my_b_read(cache_log->get_io_cache(), *buffer, (size_t)buf_size);
|
|
if (reinit_io_cache(cache_log->get_io_cache(), WRITE_CACHE, 0, 0, 1))
|
|
return 0;
|
|
return buf_size;
|
|
}
|
|
|
|
|
|
int ConsensusLogManager:: truncate_log(uint64 consensus_index)
|
|
{
|
|
int error = 0;
|
|
|
|
if (opt_print_gtid_info_during_recovery == DETAIL_INFO)
|
|
sql_print_error("Consensus Truncate log , index is %llu", consensus_index);
|
|
else
|
|
sql_print_information("Consensus Truncate log , index is %llu", consensus_index);
|
|
|
|
prefetch_manager->stop_prefetch_threads();
|
|
mysql_rwlock_rdlock(&LOCK_consensuslog_status);
|
|
MYSQL_BIN_LOG *log = status == BINLOG_WORKING ? binlog : &(rli_info->relay_log);
|
|
mysql_mutex_lock(log->get_log_lock());
|
|
|
|
// truncate log file
|
|
Relay_log_info *rli = status == RELAY_LOG_WORKING ? rli_info : NULL;
|
|
if (log->consensus_truncate_log(consensus_index, rli))
|
|
{
|
|
error = 1;
|
|
}
|
|
if (status == RELAY_LOG_WORKING && recovery_manager->get_last_leader_term_index() >= consensus_index)
|
|
{
|
|
if (opt_print_gtid_info_during_recovery == DETAIL_INFO) {
|
|
sql_print_error("truncate_log set_last_leader_term_index to %llu",
|
|
consensus_index - 1);
|
|
}
|
|
recovery_manager->set_last_leader_term_index(consensus_index - 1);
|
|
}
|
|
// truncate commit map
|
|
recovery_manager->truncate_commit_xid_map(consensus_index);
|
|
|
|
// truncate cache
|
|
fifo_cache_manager->trunc_log_from_cache(consensus_index);
|
|
prefetch_manager->trunc_log_from_prefetch_cache(consensus_index);
|
|
mysql_mutex_unlock(log->get_log_lock());
|
|
|
|
mysql_mutex_lock(&rli_info->data_lock);
|
|
// reset the previous_gtid_set_of_relaylog after truncate log
|
|
if (!error) {
|
|
error = rli_info->reset_previous_gtid_set_of_relaylog();
|
|
}
|
|
mysql_mutex_unlock(&rli_info->data_lock);
|
|
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
if (error)
|
|
sql_print_error("ConsensusLogManager::truncate_log error, consensus index: %llu.", consensus_index);
|
|
|
|
prefetch_manager->start_prefetch_threads();
|
|
return error;
|
|
}
|
|
|
|
|
|
int ConsensusLogManager::purge_log(uint64 consensus_index)
|
|
{
|
|
int error = 0;
|
|
std::string file_name;
|
|
mysql_rwlock_rdlock(&LOCK_consensuslog_status);
|
|
uint64 purge_index = 0;
|
|
|
|
if (status == BINLOG_WORKING)
|
|
{
|
|
// server still work as leader, so we should at least retain 1 binlog file
|
|
// apply pos must at the last binlog file
|
|
purge_index = consensus_index;
|
|
}
|
|
else
|
|
{
|
|
uint64 start_apply_index = consensus_info->get_start_apply_index();
|
|
if (start_apply_index == 0)
|
|
{
|
|
// apply thread already start, use slave applied index as purge index
|
|
purge_index = rli_info->get_consensus_apply_index();
|
|
}
|
|
else
|
|
{
|
|
// apply thread not start
|
|
purge_index = start_apply_index;
|
|
}
|
|
purge_index = opt_cluster_log_type_instance ? consensus_index : std::min(purge_index, consensus_index);
|
|
}
|
|
// #ifdef HAVE_REPLICATION // for call binlog->purge_logs
|
|
MYSQL_BIN_LOG *log = status == BINLOG_WORKING ? binlog : &(rli_info->relay_log);
|
|
if (status == RELAY_LOG_WORKING)
|
|
{
|
|
mysql_mutex_lock(&rli_info->data_lock);
|
|
}
|
|
mysql_mutex_lock(log->get_index_lock());
|
|
if (log->find_log_by_consensus_index(purge_index, file_name) ||
|
|
log->purge_logs(file_name.c_str(), FALSE/**include*/, FALSE /*need index lock*/, TRUE /*update threads*/, NULL, TRUE))
|
|
{
|
|
error = 1;
|
|
}
|
|
|
|
if (status == BINLOG_WORKING)
|
|
{
|
|
mysql_mutex_unlock(log->get_index_lock());
|
|
}
|
|
else
|
|
{
|
|
/*
|
|
* Need to update the log pos because purge logs has been called
|
|
* after fetching initially the log pos at the begining of the method.
|
|
*/
|
|
if (!opt_cluster_log_type_instance)
|
|
{
|
|
LOG_INFO *log_info = rli_info->applier_reader->get_log_info();
|
|
DBUG_ASSERT(log_info != NULL);
|
|
if((error=log->find_log_pos(log_info, rli_info->get_event_relay_log_name(),
|
|
false/*need_lock_index=false*/)))
|
|
{
|
|
char buff[22];
|
|
sql_print_error("next log error: %d offset: %s log: %s",
|
|
error,
|
|
llstr(log_info->index_file_offset,buff),
|
|
rli_info->get_group_relay_log_name());
|
|
error = 1;
|
|
}
|
|
}
|
|
mysql_mutex_unlock(log->get_index_lock());
|
|
mysql_mutex_unlock(&rli_info->data_lock);
|
|
}
|
|
// #endif
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
if (error)
|
|
sql_print_error("ConsensusLogManager::purge_log error, consensus index: %llu.", consensus_index);
|
|
return error;
|
|
}
|
|
|
|
uint64 ConsensusLogManager::get_exist_log_length()
|
|
{
|
|
// first index in log index may not be exact value
|
|
// so we should check whether sync_index is larger
|
|
uint64 start_index = log_file_index->get_first_index();
|
|
uint64 end_index = sync_index;
|
|
if (end_index >= start_index)
|
|
return end_index - start_index;
|
|
else
|
|
return 0;
|
|
}
|
|
|
|
int ConsensusLogManager::start_consensus_commit_pos_watcher()
|
|
{
|
|
assert(inited);
|
|
// start a background thread to report current log position of commitIndex
|
|
consensus_commit_pos_watcher_is_running = true;
|
|
if (mysql_thread_create(key_thread_consensus_commit_pos_watcher,
|
|
&consensus_commit_pos_watcher_thread_handle, NULL,
|
|
run_consensus_commit_pos_watcher, (void *) &consensus_commit_pos_watcher_is_running))
|
|
{
|
|
sql_print_error("Fail to create thread run_consensus_commit_pos_watcher.");
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
void ConsensusLogManager::stop_consensus_commit_pos_watcher()
|
|
{
|
|
if (consensus_commit_pos_watcher_is_running)
|
|
{
|
|
consensus_commit_pos_watcher_is_running = false;
|
|
my_thread_join(&consensus_commit_pos_watcher_thread_handle, NULL);
|
|
}
|
|
}
|
|
|
|
void ConsensusLogManager::update_commit_pos(const std::string &log_name, uint64_t pos, uint64_t index)
|
|
{
|
|
assert(pos != 0);
|
|
assert(index >= commit_pos.index);
|
|
mysql_mutex_lock(&LOCK_consensus_commit_pos);
|
|
commit_pos.fname = log_name;
|
|
commit_pos.pos = pos;
|
|
commit_pos.index = index;
|
|
if (opt_consensus_log_level >= 2)
|
|
sql_print_information("Report binlog commit position: %s %lu %lu", commit_pos.fname.c_str(), commit_pos.pos, commit_pos.index);
|
|
|
|
mysql_mutex_unlock(&LOCK_consensus_commit_pos);
|
|
}
|
|
|
|
void ConsensusLogManager::get_commit_pos(char * const fname_ptr, uint64_t* pos_ptr, uint64_t* index_ptr)
|
|
{
|
|
if (fname_ptr == NULL || pos_ptr == NULL || index_ptr == NULL)
|
|
return;
|
|
mysql_mutex_lock(&LOCK_consensus_commit_pos);
|
|
// parse xxx/binlog.0001 to binlog.0001
|
|
const char *tmp = strrchr(commit_pos.fname.c_str(), '/');
|
|
strncpy(fname_ptr, tmp == NULL? commit_pos.fname.c_str(): tmp + 1, FN_REFLEN);
|
|
*pos_ptr = commit_pos.pos;
|
|
*index_ptr = commit_pos.index;
|
|
mysql_mutex_unlock(&LOCK_consensus_commit_pos);
|
|
}
|
|
|
|
|
|
void ConsensusLogManager::wait_replay_log_finished()
|
|
{
|
|
mysql_mutex_lock(&LOCK_consensuslog_apply_thread);
|
|
if (apply_catchup == TRUE)
|
|
{
|
|
mysql_mutex_unlock(&LOCK_consensuslog_apply_thread);
|
|
}
|
|
else
|
|
{
|
|
struct timespec abstime;
|
|
set_timespec(&abstime, 2);
|
|
while (!apply_catchup)
|
|
{
|
|
mysql_cond_timedwait(&COND_consensuslog_catchup, &LOCK_consensuslog_apply_thread, &abstime);
|
|
}
|
|
mysql_mutex_unlock(&LOCK_consensuslog_apply_thread);
|
|
}
|
|
|
|
consensus_log_manager.set_apply_catchup(FALSE);
|
|
}
|
|
|
|
void ConsensusLogManager::wait_apply_threads_start()
|
|
{
|
|
while (!rli_info->slave_running)
|
|
{
|
|
my_sleep(200);
|
|
}
|
|
}
|
|
|
|
void ConsensusLogManager::wait_apply_threads_stop()
|
|
{
|
|
while (rli_info->slave_running)
|
|
{
|
|
my_sleep(200);
|
|
}
|
|
}
|
|
|
|
// #ifdef HAVE_REPLICATION
|
|
int ConsensusLogManager::wait_leader_degraded(uint64 term, uint64 index)
|
|
{
|
|
sql_print_information("ConsensusLogManager::wait_leader_degraded, consensus term: %llu, consensus index: %llu", term, index);
|
|
int error = 0;
|
|
|
|
DBUG_EXECUTE_IF("simulate_leader_degrade_slow", { my_sleep(10000000);
|
|
sql_print_information("wait_leader_degraded sleep 10 seconds"); });
|
|
|
|
// switch event scheduler off
|
|
if (Events::opt_configured_event_scheduler == Events::EVENTS_ON)
|
|
{
|
|
Events::opt_event_scheduler = Events::EVENTS_OFF;
|
|
Events::stop();
|
|
}
|
|
|
|
// prefetch stop, and release LOCK_consensuslog_status
|
|
prefetch_manager->disable_all_prefetch_channels();
|
|
|
|
DBUG_ASSERT(this->status == Consensus_Log_System_Status::BINLOG_WORKING);
|
|
// set offline mode
|
|
opt_enable_consensus_leader = FALSE;
|
|
// kill all the thd
|
|
killall_threads();
|
|
// wait all commit trx finished
|
|
binlog->wait_xid_disappear();
|
|
|
|
mysql_rwlock_wrlock(&LOCK_consensuslog_status);
|
|
|
|
// close binlog system
|
|
binlog->close(LOG_CLOSE_INDEX | LOG_CLOSE_TO_BE_OPENED, true, true);
|
|
// open relay log system
|
|
mysql_mutex_lock(&rli_info->mi->data_lock);
|
|
mysql_mutex_lock(&rli_info->data_lock);
|
|
rli_info->rli_init_info();
|
|
mysql_mutex_unlock(&rli_info->data_lock);
|
|
mysql_mutex_unlock(&rli_info->mi->data_lock);
|
|
|
|
// record new term
|
|
current_term = term;
|
|
set_event_timestamp(0);
|
|
this-> status = Consensus_Log_System_Status::RELAY_LOG_WORKING;
|
|
|
|
stop_term = UINT64_MAX;
|
|
// log type instance do not to recover start index
|
|
if (!opt_cluster_log_type_instance) {
|
|
consensus_info->set_start_apply_index(index);
|
|
/* XCLUSTER_RESOLVE : GTID related stuff, may need review later */
|
|
// leader to follower need save the gtids of the binlog to table
|
|
if (gtid_state->save_gtids_of_last_binlog_into_table() ||
|
|
gtid_state->set_previous_logged_gtids_relaylog(rli_info->get_gtid_set(), rli_info->get_sid_lock())) {
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
sql_print_error("Failed in save last binlog gtid into table.");
|
|
error = 1;
|
|
goto end;
|
|
}
|
|
}
|
|
if (consensus_info->flush_info(true, true))
|
|
{
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
sql_print_error("Failed in flush_info() called from ConsensusLog::wait_leader_degraded.");
|
|
error = 1;
|
|
goto end;
|
|
}
|
|
if (!opt_cluster_log_type_instance)
|
|
{
|
|
start_consensus_apply_threads();
|
|
}
|
|
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
|
|
if (!opt_cluster_log_type_instance)
|
|
{
|
|
wait_apply_threads_start();
|
|
}
|
|
end:
|
|
// recover prefetch
|
|
prefetch_manager->enable_all_prefetch_channels();
|
|
return error;
|
|
}
|
|
|
|
int ConsensusLogManager::wait_follower_upgraded(uint64 term, uint64 index)
|
|
{
|
|
sql_print_information("ConsensusLogManager::wait_follower_upgraded, consensus term: %llu, consensus index: %llu", term, index);
|
|
int error = 0;
|
|
|
|
DBUG_ASSERT(this->status == Consensus_Log_System_Status::RELAY_LOG_WORKING);
|
|
|
|
// record new term
|
|
// notice, the order of stop term and current term is important for apply thread,
|
|
// because both stop term and current term is atomic variables. so do not care reorder problem
|
|
stop_term = term;
|
|
|
|
#ifdef NORMANDY_CLUSTER
|
|
// wait replay thread to commit index
|
|
if (!opt_cluster_log_type_instance)
|
|
{
|
|
DBUG_EXECUTE_IF("simulate_apply_too_slow", my_sleep(5000000););
|
|
wait_replay_log_finished();
|
|
wait_apply_threads_stop();
|
|
}
|
|
#endif
|
|
|
|
// wait recover trx all finished
|
|
while (!(recovery_manager->is_recovering_trx_empty()))
|
|
{
|
|
my_sleep(1000);
|
|
}
|
|
// clear cache
|
|
reinit_io_cache(consensus_log_manager.get_cache(), WRITE_CACHE, 0, 0, 1);
|
|
// prefetch stop, and release LOCK_consensuslog_status
|
|
prefetch_manager->disable_all_prefetch_channels();
|
|
|
|
// kill all binlog dump thd
|
|
killall_dump_threads();
|
|
|
|
mysql_rwlock_wrlock(&LOCK_consensuslog_status);
|
|
// stop_apply_threads(thd);
|
|
|
|
mysql_mutex_lock(&rli_info->data_lock);
|
|
|
|
rli_info->end_info();
|
|
mysql_mutex_unlock(&rli_info->data_lock);
|
|
|
|
// open binlog index and file
|
|
mysql_mutex_t *log_lock = binlog->get_log_lock();
|
|
if (binlog->open_index_file(opt_binlog_index_name, opt_bin_logname, TRUE))
|
|
{
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
sql_print_error("Failed in open_index_file() called from ConsensusLog::wait_follower_upgraded.");
|
|
error = 1;
|
|
goto end;
|
|
}
|
|
|
|
mysql_mutex_lock(log_lock);
|
|
if (binlog->open_exist_binlog(opt_bin_logname, 0,
|
|
max_binlog_size, true,
|
|
true/*need_lock_index=true*/,
|
|
true/*need_sid_lock=true*/,
|
|
NULL))
|
|
{
|
|
mysql_mutex_unlock(log_lock);
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
sql_print_error("Failed in open_log() called from ConsensusLog::wait_follower_upgraded.");
|
|
error = 2;
|
|
goto end;
|
|
}
|
|
mysql_mutex_unlock(log_lock);
|
|
// set offline mode
|
|
opt_enable_consensus_leader = TRUE;
|
|
appliedindex_checker.reset();
|
|
set_event_timestamp(0);
|
|
this->status = Consensus_Log_System_Status::BINLOG_WORKING;
|
|
|
|
//reset apply start point displayed in information_schema
|
|
apply_index = 1;
|
|
real_apply_index = 1;
|
|
already_set_start_index = FALSE;
|
|
already_set_start_term = FALSE;
|
|
current_term = term;
|
|
|
|
// log type instance do not to recover start index
|
|
if (!opt_cluster_log_type_instance) {
|
|
/*
|
|
TODO: the global set gtid_state->binlog_previous_gtids is not maintenaned
|
|
correctly on followers, and as a result the content of previous gtid event
|
|
in binlog file is incorrect at binlog rotation.
|
|
|
|
So, when follower upgrading to leader, that using the previous gtid event
|
|
in binlog to adjust the value in memory is unreliable.
|
|
|
|
Actually in xdb cluster, binlog previous gtid set are always equivalent to
|
|
executed gtid set, we disable adjusting temporarily.
|
|
*/
|
|
//if (index > 0 && binlog->reset_previous_gtids_logged(index)) {
|
|
// sql_print_error("Failed to reset previous gtids logged.");
|
|
//}
|
|
consensus_info->set_last_leader_term(term);
|
|
}
|
|
consensus_info->set_recover_status(Consensus_Log_System_Status::BINLOG_WORKING);
|
|
|
|
if (consensus_info->flush_info(true, true))
|
|
{
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
sql_print_error("Failed in flush_info() called from ConsensusLog::wait_follower_upgraded.");
|
|
error = 3;
|
|
goto end;
|
|
}
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
|
|
// switch event scheduler on
|
|
if (Events::opt_configured_event_scheduler == Events::EVENTS_ON)
|
|
{
|
|
int err_no= 0;
|
|
Events::opt_event_scheduler = Events::EVENTS_ON;
|
|
if (Events::start(&err_no))
|
|
{
|
|
Events::opt_event_scheduler = Events::EVENTS_OFF;
|
|
sql_print_error("Fail to set event_scheduler=on during "
|
|
"wait_follower_upgraded, error %d.", err_no);
|
|
}
|
|
}
|
|
end:
|
|
// recover prefetch
|
|
prefetch_manager->enable_all_prefetch_channels();
|
|
return error;
|
|
}
|
|
|
|
|
|
int ConsensusLogManager::wait_follower_change_term(uint64 term)
|
|
{
|
|
sql_print_information("ConsensusLogManager::wait_follower_change_term, consensus term: %llu.", term);
|
|
current_term = term;
|
|
return 0;
|
|
}
|
|
|
|
// #endif
|
|
|
|
uint64 ConsensusLogManager::get_cache_index()
|
|
{
|
|
return cache_index;
|
|
}
|
|
|
|
void ConsensusLogManager::set_cache_index(uint64 cache_index_arg)
|
|
{
|
|
cache_index = cache_index_arg;
|
|
}
|
|
|
|
uint64 ConsensusLogManager::get_sync_index(bool serious)
|
|
{
|
|
if (!serious)
|
|
{
|
|
return sync_index;
|
|
}
|
|
else
|
|
{
|
|
/* currently, never cover this path, deadlock risk */
|
|
mysql_rwlock_rdlock(&LOCK_consensuslog_status);
|
|
MYSQL_BIN_LOG *log = status == Consensus_Log_System_Status::BINLOG_WORKING ?
|
|
binlog : &(rli_info->relay_log);
|
|
log->wait_xid_disappear();
|
|
mysql_rwlock_unlock(&LOCK_consensuslog_status);
|
|
return sync_index;
|
|
}
|
|
}
|
|
|
|
uint64 ConsensusLogManager::get_final_sync_index()
|
|
{
|
|
mysql_mutex_lock(get_sequence_stage1_lock());
|
|
uint64_t final_sync_index = current_index ? current_index - 1 : 0;
|
|
mysql_mutex_unlock(get_sequence_stage1_lock());
|
|
return final_sync_index;
|
|
}
|
|
|
|
void ConsensusLogManager::set_sync_index(uint64 sync_index_arg)
|
|
{
|
|
sync_index = sync_index_arg;
|
|
}
|
|
|
|
void ConsensusLogManager::set_sync_index_if_greater(uint64 sync_index_arg)
|
|
{
|
|
for (;;)
|
|
{
|
|
uint64 old= sync_index.load();
|
|
if (old >= sync_index_arg || (old < sync_index_arg && sync_index.compare_exchange_weak(old, sync_index_arg)))
|
|
break;
|
|
}
|
|
}
|
|
|
|
void ConsensusLogManager::add_state_change_request(ConsensusStateChange &state_change)
|
|
{
|
|
mysql_mutex_lock(&LOCK_consensus_state_change);
|
|
consensus_state_change_queue.push_back(state_change);
|
|
mysql_mutex_unlock(&LOCK_consensus_state_change);
|
|
mysql_cond_broadcast(&COND_consensus_state_change);
|
|
|
|
}
|
|
|
|
void ConsensusLogManager::lock_consensus_state_change()
|
|
{
|
|
mysql_mutex_lock(&LOCK_consensus_state_change);
|
|
}
|
|
|
|
void ConsensusLogManager::unlock_consensus_state_change()
|
|
{
|
|
mysql_mutex_unlock(&LOCK_consensus_state_change);
|
|
}
|
|
|
|
void ConsensusLogManager::wait_state_change_cond()
|
|
{
|
|
mysql_cond_wait(&COND_consensus_state_change, &LOCK_consensus_state_change);
|
|
}
|
|
|
|
ConsensusStateChange ConsensusLogManager::get_stage_change_from_queue()
|
|
{
|
|
ConsensusStateChange state_change;
|
|
|
|
if (consensus_state_change_queue.size() > 0)
|
|
{
|
|
state_change = consensus_state_change_queue.front();
|
|
consensus_state_change_queue.pop_front();
|
|
}
|
|
return state_change;
|
|
}
|
|
|
|
// #ifdef HAVE_REPLICATION
|
|
void *run_consensus_stage_change(void *arg)
|
|
{
|
|
if (my_thread_init())
|
|
return NULL;
|
|
|
|
int error = 0;
|
|
bool *is_running = (bool *)arg;
|
|
|
|
/*
|
|
Waiting until mysqld_server_started == true to ensure that all server
|
|
components have been successfully initialized.
|
|
*/
|
|
mysql_mutex_lock(&LOCK_server_started);
|
|
while (!mysqld_server_started && (*is_running))
|
|
mysql_cond_wait(&COND_server_started, &LOCK_server_started);
|
|
mysql_mutex_unlock(&LOCK_server_started);
|
|
while (*is_running)
|
|
{
|
|
consensus_log_manager.lock_consensus_state_change();
|
|
if (consensus_log_manager.is_state_change_queue_empty() && (*is_running))
|
|
consensus_log_manager.wait_state_change_cond();
|
|
if (consensus_log_manager.is_state_change_queue_empty())
|
|
{
|
|
consensus_log_manager.unlock_consensus_state_change();
|
|
continue;
|
|
}
|
|
ConsensusStateChange state_change = consensus_log_manager.get_stage_change_from_queue();
|
|
if (opt_print_gtid_info_during_recovery == DETAIL_INFO) {
|
|
sql_print_error("run_consensus_stage_change start: %d,%d",
|
|
state_change.state, consensus_log_manager.get_status());
|
|
}
|
|
if (state_change.state != alisql::Paxos::LEADER)
|
|
{
|
|
if (consensus_log_manager.get_status() == BINLOG_WORKING)
|
|
{
|
|
error = consensus_log_manager.wait_leader_degraded(state_change.term, state_change.index);
|
|
}
|
|
else if (state_change.state != alisql::Paxos::CANDIDATE)
|
|
{
|
|
error = consensus_log_manager.wait_follower_change_term(state_change.term);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// must be candidate ->leader
|
|
error = consensus_log_manager.wait_follower_upgraded(state_change.term, state_change.index);
|
|
}
|
|
|
|
consensus_log_manager.unlock_consensus_state_change();
|
|
|
|
if (opt_print_gtid_info_during_recovery == DETAIL_INFO) {
|
|
sql_print_error("run_consensus_stage_change end: %d,%d,error=%d",
|
|
state_change.state, consensus_log_manager.get_status(),
|
|
error);
|
|
}
|
|
|
|
if (error)
|
|
{
|
|
sql_print_error("Consensus state change failed");
|
|
abort();
|
|
}
|
|
}
|
|
|
|
|
|
my_thread_end();
|
|
return NULL;
|
|
}
|
|
// #endif
|
|
|
|
void *run_consensus_commit_pos_watcher(void *arg)
|
|
{
|
|
sql_print_information("start consensus_commit_pos_watcher thread");
|
|
binlog_commit_pos_watcher((bool *)arg);
|
|
sql_print_information("stop consensus_commit_pos_watcher thread");
|
|
return NULL;
|
|
}
|
|
|
|
bool ConsensusLogManager::is_state_machine_ready()
|
|
{
|
|
return status == Consensus_Log_System_Status::BINLOG_WORKING &&
|
|
consensus_ptr->getTerm() == get_current_term();
|
|
}
|
|
|
|
bool ConsensusLogManager::option_invalid(bool log_bin) {
|
|
if (!log_bin) {
|
|
sql_print_error("PolarDB-X Engine log_bin must be set to ON");
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
|
|
IO_CACHE *ConsensusLogManager::get_cache() { return cache_log->get_io_cache(); }
|