741 lines
23 KiB
C++
741 lines
23 KiB
C++
/*****************************************************************************
|
|
|
|
Copyright (c) 2018, 2019, Oracle and/or its affiliates. All Rights Reserved.
|
|
|
|
This program is free software; you can redistribute it and/or modify it under
|
|
the terms of the GNU General Public License, version 2.0, as published by the
|
|
Free Software Foundation.
|
|
|
|
This program is also distributed with certain software (including but not
|
|
limited to OpenSSL) that is licensed under separate terms, as designated in a
|
|
particular file or component or in included license documentation. The authors
|
|
of MySQL hereby grant you an additional permission to link the program and
|
|
your derivative works with the separately licensed software that they have
|
|
included with MySQL.
|
|
|
|
This program is distributed in the hope that it will be useful, but WITHOUT
|
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
|
|
for more details.
|
|
|
|
You should have received a copy of the GNU General Public License along with
|
|
this program; if not, write to the Free Software Foundation, Inc.,
|
|
51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
|
|
*****************************************************************************/
|
|
|
|
/** @file clone/clone0repl.cc
|
|
Innodb Clone Replication Coordinates
|
|
|
|
*******************************************************/
|
|
#include "clone0repl.h"
|
|
#include "clone0api.h"
|
|
#include "clone0clone.h"
|
|
#include "lizard0sys.h"
|
|
#include "sql/field.h"
|
|
#include "sql/mysqld.h"
|
|
#include "sql/rpl_gtid_persist.h"
|
|
#include "sql/sql_class.h"
|
|
#include "sql/sql_thd_internal_api.h"
|
|
|
|
bool svr_write_non_innodb_gtids = false;
|
|
|
|
/* To get current session thread default THD */
|
|
THD *thd_get_current_thd();
|
|
|
|
void Clone_persist_gtid::add(const Gtid_desc >id_desc) {
|
|
/* Check if valid descriptor. */
|
|
if (!gtid_desc.m_is_set) {
|
|
return;
|
|
}
|
|
/* Check if GTID persistence is active. */
|
|
if (!is_active() || gtid_table_persistor == nullptr) {
|
|
return;
|
|
}
|
|
ut_ad(trx_sys_mutex_own());
|
|
/* Get active GTID list */
|
|
auto ¤t_gtids = get_active_list();
|
|
|
|
/* Add input GTID to the set */
|
|
current_gtids.push_back(gtid_desc.m_info);
|
|
/* Atomic increment. */
|
|
int current_value = ++m_num_gtid_mem;
|
|
|
|
/* Wake up background if GTIDs crossed threshold. */
|
|
if (current_value == s_gtid_threshold) {
|
|
os_event_set(m_event);
|
|
}
|
|
}
|
|
|
|
trx_undo_t::Gtid_storage Clone_persist_gtid::persists_gtid(const trx_t *trx) {
|
|
auto thd = trx->mysql_thd;
|
|
|
|
if (thd == nullptr) {
|
|
thd = thd_get_current_thd();
|
|
}
|
|
|
|
if (thd == nullptr || !thd->se_persists_gtid()) {
|
|
/* No need to persist GTID. */
|
|
return trx_undo_t::Gtid_storage::NONE;
|
|
}
|
|
|
|
if (thd->is_extrenal_xa()) {
|
|
/* Need to persist both XA prepare and commit GTID. */
|
|
return trx_undo_t::Gtid_storage::PREPARE_AND_COMMIT;
|
|
}
|
|
|
|
/* Need to persist only commit GTID. */
|
|
return trx_undo_t::Gtid_storage::COMMIT;
|
|
}
|
|
|
|
void Clone_persist_gtid::set_persist_gtid(trx_t *trx, bool set) {
|
|
bool thd_check = false;
|
|
auto thd = trx->mysql_thd;
|
|
|
|
/* Check conditions if the session is good for persisting GTID. */
|
|
static_cast<void>(has_gtid(trx, thd, thd_check));
|
|
|
|
/* For attachable transaction, skip both set and reset. */
|
|
if (thd == nullptr || (thd->is_attachable_transaction_active() && !thd->is_autonomous_transaction()) ||
|
|
trx->internal) {
|
|
return;
|
|
}
|
|
|
|
/* First do the reset. */
|
|
if (!set) {
|
|
thd->reset_gtid_persisted_by_se();
|
|
/* Reset transaction flag also. */
|
|
trx->persists_gtid = false;
|
|
return;
|
|
}
|
|
|
|
ut_ad(set);
|
|
/* Don't set if thread checks have failed. */
|
|
if (!thd_check) {
|
|
return;
|
|
}
|
|
|
|
/* If GTID table is updated directly, don't add GTID for the transaction.
|
|
Try flushing in memory ones. */
|
|
if (thd->is_operating_gtid_table_implicitly) {
|
|
/* Trigger flush, don't wait; we could be in the middle of operation. */
|
|
os_event_set(m_event);
|
|
return;
|
|
}
|
|
|
|
/* This is an optimization to skip GTID allocation, if transaction
|
|
is guaranteed to not have GTID. */
|
|
if (!thd->se_persists_gtid()) {
|
|
auto gtid_next = thd->variables.gtid_next.type;
|
|
|
|
if (opt_bin_log) {
|
|
/* This transaction would not have GTID. */
|
|
if (gtid_next == ANONYMOUS_GTID) {
|
|
return;
|
|
}
|
|
} else {
|
|
/* If binary log is disabled, GTID must be directly assigned. */
|
|
if (gtid_next != ASSIGNED_GTID) {
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Test case to validate direct write to gtid_executed table. */
|
|
DBUG_EXECUTE_IF("simulate_err_on_write_gtid_into_table", { return; });
|
|
DBUG_EXECUTE_IF("disable_se_persists_gtid", { return; });
|
|
|
|
/* Set or Reset GTID persist flag in THD session. The transaction flag
|
|
is set later during prepare/commit/rollback. */
|
|
thd->set_gtid_persisted_by_se();
|
|
}
|
|
|
|
bool Clone_persist_gtid::trx_check_set(trx_t *trx, bool prepare,
|
|
bool rollback) {
|
|
auto thd = trx->mysql_thd;
|
|
bool alloc_check = false;
|
|
|
|
bool gtid_exists = has_gtid(trx, thd, alloc_check);
|
|
|
|
if (prepare) {
|
|
/* Check for XA prepare. */
|
|
gtid_exists = check_gtid_prepare(thd, trx, gtid_exists, alloc_check);
|
|
} else if (rollback) {
|
|
/* Check for Rollback. */
|
|
gtid_exists = check_gtid_rollback(thd, trx, gtid_exists);
|
|
alloc_check = gtid_exists;
|
|
} else {
|
|
/* Check for Commit. */
|
|
gtid_exists = check_gtid_commit(thd, gtid_exists);
|
|
alloc_check = gtid_exists;
|
|
}
|
|
/* Set transaction to persist GTID. This is one single point of decision
|
|
in prepare/commit and rollback. Once set, the GTID would be persisted
|
|
in undo and added to in memory GTID list to be written to gtid_executed
|
|
table later. */
|
|
trx->persists_gtid = gtid_exists;
|
|
return (alloc_check);
|
|
}
|
|
|
|
bool Clone_persist_gtid::check_gtid_prepare(THD *thd, trx_t *trx,
|
|
bool found_gtid, bool &alloc) {
|
|
/* Skip GTID if alloc checks have failed. */
|
|
if (!alloc) {
|
|
return (false);
|
|
}
|
|
alloc = false;
|
|
/* Skip GTID if mysql internal XA Prepare created by binlog. */
|
|
if (trx_is_mysql_xa(trx) || thd->is_one_phase_commit()) {
|
|
return (false);
|
|
}
|
|
auto thd_trx = thd->get_transaction();
|
|
auto xid_state = thd_trx->xid_state();
|
|
/* In permissive mode GTID could be assigned during XA commit/rollback. */
|
|
if (xid_state->has_state(XID_STATE::XA_IDLE) &&
|
|
(get_gtid_mode(GTID_MODE_LOCK_NONE) == GTID_MODE_ON_PERMISSIVE ||
|
|
get_gtid_mode(GTID_MODE_LOCK_NONE) == GTID_MODE_OFF_PERMISSIVE)) {
|
|
alloc = true;
|
|
}
|
|
/* Skip GTID if not set */
|
|
if (!found_gtid) {
|
|
return (false);
|
|
}
|
|
/* Skip GTID if External XA transaction is not in IDLE state. */
|
|
if (!xid_state->has_state(XID_STATE::XA_IDLE)) {
|
|
ut_ad(false);
|
|
return (false);
|
|
}
|
|
/* Skip if SE is not set to persist GTID.*/
|
|
if (!thd->se_persists_gtid()) {
|
|
return (false);
|
|
}
|
|
alloc = true;
|
|
return (true);
|
|
}
|
|
|
|
bool Clone_persist_gtid::check_gtid_commit(THD *thd, bool found_gtid) {
|
|
if (!found_gtid) {
|
|
return (false);
|
|
}
|
|
/* Persist if SE is set to persist GTID.*/
|
|
return (thd->se_persists_gtid());
|
|
}
|
|
|
|
bool Clone_persist_gtid::check_gtid_rollback(THD *thd, trx_t *trx,
|
|
bool found_gtid) {
|
|
if (!found_gtid) {
|
|
return (false);
|
|
}
|
|
/* Skip GTID if XA transaction not in prepared state. */
|
|
if (trx->state != TRX_STATE_PREPARED) {
|
|
return (false);
|
|
}
|
|
|
|
/* We don't need to persist GTID for binlog internal XA transaction.
|
|
One issue here is xid could be NULL when
|
|
1. External XA transaction is rolled back by XID
|
|
2. binlog internal XA transaction is rolled back during recovery
|
|
This is a side effect of trx_get_trx_by_xid() resetting the xid. We
|
|
cannot use trx_is_mysql_xa() to differentiate external XA transactions
|
|
in that case. However, it is safe to assume it is case (1) here as GTID
|
|
is never set for case (2) and input found_gtid should be false. */
|
|
if (!trx->xid->is_null() && trx_is_mysql_xa(trx)) {
|
|
return (false);
|
|
}
|
|
|
|
/* Skip GTID if it is rollback in error case. Ideally we should
|
|
not allow prepared transaction to be rolled back on error but
|
|
currently server/replication does rollback and has test for it. */
|
|
auto thd_trx = thd->get_transaction();
|
|
auto xid_state = thd_trx->xid_state();
|
|
if (xid_state->has_state(XID_STATE::XA_ROLLBACK_ONLY)) {
|
|
return (false);
|
|
}
|
|
|
|
/* Persist if SE is set to persist GTID.*/
|
|
return (thd->se_persists_gtid());
|
|
}
|
|
|
|
bool Clone_persist_gtid::has_gtid(trx_t *trx, THD *&thd, bool &passed_check) {
|
|
passed_check = false;
|
|
/* Transaction is not associated with mysql foreground session. */
|
|
if (trx->state == TRX_STATE_PREPARED && thd == nullptr) {
|
|
/* For XA transaction, the current transaction THD could be NULL. Also
|
|
check the default THD of current thread. */
|
|
thd = thd_get_current_thd();
|
|
}
|
|
/* Transaction should be associated with a THD session object. */
|
|
if (thd == nullptr) {
|
|
return (false);
|
|
}
|
|
/* Transaction is internal innodb transaction. */
|
|
if (trx->internal) {
|
|
return (false);
|
|
}
|
|
|
|
/* Transaction is updating GTID table implicitly. */
|
|
if (thd->is_operating_gtid_table_implicitly ||
|
|
thd->is_operating_substatement_implicitly ||
|
|
(thd->is_attachable_transaction_active() && !thd->is_autonomous_transaction())) {
|
|
return (false);
|
|
}
|
|
/* Transaction passed checks other than GTID. */
|
|
passed_check = true;
|
|
|
|
auto &trx_gtid = thd->owned_gtid;
|
|
/* Transaction is not assigned any GTID */
|
|
if (trx_gtid.is_empty() || trx_gtid.sidno == THD::OWNED_SIDNO_ANONYMOUS) {
|
|
return (false);
|
|
}
|
|
return (true);
|
|
}
|
|
|
|
void Clone_persist_gtid::get_gtid_info(trx_t *trx, Gtid_desc >id_desc) {
|
|
gtid_desc.m_is_set = false;
|
|
/* Check if transaction has GTID */
|
|
if (!trx->persists_gtid) {
|
|
return;
|
|
}
|
|
bool thd_check;
|
|
auto thd = trx->mysql_thd;
|
|
|
|
if (!has_gtid(trx, thd, thd_check)) {
|
|
ut_ad(false);
|
|
return;
|
|
}
|
|
|
|
gtid_desc.m_version = GTID_VERSION;
|
|
auto &trx_gtid = thd->owned_gtid;
|
|
auto &trx_sid = thd->owned_sid;
|
|
|
|
ut_ad(trx_gtid.sidno > 0);
|
|
ut_ad(trx_gtid.gno > 0);
|
|
|
|
/* Build GTID string. */
|
|
gtid_desc.m_info.fill(0);
|
|
auto char_buf = reinterpret_cast<char *>(>id_desc.m_info[0]);
|
|
auto len = trx_gtid.to_string(trx_sid, char_buf);
|
|
ut_a((size_t)len <= GTID_INFO_SIZE);
|
|
|
|
gtid_desc.m_is_set = true;
|
|
}
|
|
|
|
int Clone_persist_gtid::write_other_gtids() {
|
|
int err = 0;
|
|
if (opt_bin_log) {
|
|
err = gtid_state->save_gtids_of_last_binlog_into_table();
|
|
}
|
|
return (err);
|
|
}
|
|
|
|
bool Clone_persist_gtid::check_compress() {
|
|
/* Check local threshold on number of flush. */
|
|
if (m_compression_counter >= s_compression_threshold) {
|
|
return (true);
|
|
}
|
|
/* Check replication global threshold on number of GTIDs. */
|
|
if (!opt_bin_log && gtid_executed_compression_period != 0 &&
|
|
m_compression_gtid_counter > gtid_executed_compression_period) {
|
|
return (true);
|
|
}
|
|
/* Check for explicit flush request. */
|
|
if (m_explicit_request.load()) {
|
|
return (true);
|
|
}
|
|
return (false);
|
|
}
|
|
|
|
bool Clone_persist_gtid::debug_skip_write(bool compression) {
|
|
bool skip = false;
|
|
DBUG_EXECUTE_IF("simulate_flush_commit_error", { skip = true; });
|
|
DBUG_EXECUTE_IF("simulate_err_on_write_gtid_into_table", { skip = true; });
|
|
DBUG_EXECUTE_IF("disable_gtid_background_persister", { skip = true; });
|
|
if (compression) {
|
|
DBUG_EXECUTE_IF("dont_compress_gtid_table", { skip = true; });
|
|
}
|
|
return (skip);
|
|
}
|
|
|
|
int Clone_persist_gtid::write_to_table(uint64_t flush_list_number,
|
|
Gtid_set &table_gtid_set,
|
|
Sid_map &sid_map) {
|
|
int err = 0;
|
|
Gtid_set write_gtid_set(&sid_map, nullptr);
|
|
|
|
/* Allocate some intervals from stack */
|
|
static const int PREALLOCATED_INTERVAL_COUNT = 64;
|
|
Gtid_set::Interval iv[PREALLOCATED_INTERVAL_COUNT];
|
|
write_gtid_set.add_interval_memory(PREALLOCATED_INTERVAL_COUNT, iv);
|
|
|
|
auto &flush_list = get_list(flush_list_number);
|
|
/* Extract GTIDs from flush list. */
|
|
for (auto >id_info : flush_list) {
|
|
auto gtid_str = reinterpret_cast<const char *>(>id_info[0]);
|
|
auto status = write_gtid_set.add_gtid_text(gtid_str);
|
|
if (status != RETURN_STATUS_OK) {
|
|
err = ER_INTERNAL_ERROR;
|
|
return (err);
|
|
}
|
|
}
|
|
|
|
/* Skip call if error test. We don't want to catch this error here. */
|
|
if (debug_skip_write(false)) {
|
|
flush_list.clear();
|
|
ut_ad((m_flush_number + 1) == flush_list_number);
|
|
m_flush_number.store(flush_list_number);
|
|
return (0);
|
|
}
|
|
|
|
bool is_recovery = !m_thread_active.load();
|
|
if (is_recovery) {
|
|
/* During recovery, eliminate GTIDs already in gtid_executed table. */
|
|
write_gtid_set.remove_gtid_set(&table_gtid_set);
|
|
table_gtid_set.add_gtid_set(&write_gtid_set);
|
|
} else {
|
|
/* Handle concurrent write by other threads when binlog is enabled. */
|
|
gtid_state->update_prev_gtids(&write_gtid_set);
|
|
}
|
|
|
|
/* Write GTIDs to table. */
|
|
if (!write_gtid_set.is_empty()) {
|
|
++m_compression_counter;
|
|
err = gtid_table_persistor->save(&write_gtid_set, false);
|
|
}
|
|
|
|
/* Clear flush list and return */
|
|
flush_list.clear();
|
|
ut_ad((m_flush_number + 1) == flush_list_number);
|
|
m_flush_number.store(flush_list_number);
|
|
return (err);
|
|
}
|
|
|
|
void Clone_persist_gtid::update_gtid_trx_scn(scn_t new_gtid_trx_scn) {
|
|
auto trx_scn = m_gtid_trx_scn.load();
|
|
/* Noting to do if scn hasn't increased. */
|
|
if (trx_scn != lizard::SCN_NULL && trx_scn >= new_gtid_trx_scn) {
|
|
ut_ad(trx_scn == new_gtid_trx_scn);
|
|
return;
|
|
}
|
|
/* Update in memory variable. */
|
|
m_gtid_trx_scn.store(new_gtid_trx_scn);
|
|
|
|
/* Persist to disk. This would be useful during recovery. */
|
|
trx_sys_persist_gtid_scn(new_gtid_trx_scn);
|
|
|
|
/* Wake up purge thread. */
|
|
srv_purge_wakeup();
|
|
}
|
|
|
|
void Clone_persist_gtid::flush_gtids(THD *thd) {
|
|
int err = 0;
|
|
Sid_map sid_map(nullptr);
|
|
Gtid_set table_gtid_set(&sid_map, nullptr);
|
|
|
|
/* During recovery, fetch existing GTIDs from gtid_executed table. */
|
|
bool is_recovery = !m_thread_active.load();
|
|
if (is_recovery && !opt_initialize) {
|
|
gtid_table_persistor->fetch_gtids(&table_gtid_set);
|
|
}
|
|
|
|
bool explicit_request = m_explicit_request.load();
|
|
|
|
trx_sys_mutex_enter();
|
|
/* Get oldest scn that is yet to be committed. Any transaction
|
|
with lower transaction number is committed and is added to GTID list. */
|
|
|
|
scn_t oldest_trx_scn = lizard::lizard_sys_get_min_safe_scn();
|
|
|
|
bool compress_recovery = false;
|
|
/* Check and write if any GTID is accumulated. */
|
|
if (m_num_gtid_mem.load() != 0) {
|
|
m_flush_in_progress.store(true);
|
|
/* Switch active list and get the previous list to write to disk table. */
|
|
auto flush_list_number = switch_active_list();
|
|
/* Exit trx mutex during write to table. */
|
|
trx_sys_mutex_exit();
|
|
err = write_to_table(flush_list_number, table_gtid_set, sid_map);
|
|
m_flush_in_progress.store(false);
|
|
/* Compress always after recovery, if GTIDs are added. */
|
|
if (!m_thread_active.load()) {
|
|
compress_recovery = true;
|
|
ib::info(ER_IB_CLONE_GTID_PERSIST) << "GTID compression after recovery. ";
|
|
}
|
|
} else {
|
|
trx_sys_mutex_exit();
|
|
}
|
|
|
|
if (is_recovery) {
|
|
/* Allocate buffer and fill GTIDs */
|
|
char *gtid_buffer = nullptr;
|
|
auto gtid_buffer_size = table_gtid_set.to_string(>id_buffer);
|
|
/* Update GTID set to status for clone recovery. */
|
|
std::string all_gtids;
|
|
if (gtid_buffer_size > 0) {
|
|
all_gtids.assign(gtid_buffer);
|
|
}
|
|
/* Must update GITD status even if no GTID. This call completes
|
|
clone operation. */
|
|
clone_update_gtid_status(all_gtids);
|
|
my_free(gtid_buffer);
|
|
}
|
|
|
|
/* Update trx number upto which GTID is written to table. */
|
|
update_gtid_trx_scn(oldest_trx_scn);
|
|
|
|
/* Request Compression once the counter reaches threshold. */
|
|
bool debug_skip = debug_skip_write(true);
|
|
if (err == 0 && !debug_skip && (compress_recovery || check_compress())) {
|
|
m_compression_counter = 0;
|
|
m_compression_gtid_counter = 0;
|
|
/* Write non-innodb GTIDs before compression. */
|
|
if (svr_write_non_innodb_gtids)
|
|
write_other_gtids();
|
|
err = gtid_table_persistor->compress(thd);
|
|
}
|
|
if (err != 0) {
|
|
ib::error(ER_IB_CLONE_GTID_PERSIST) << "Error persisting GTIDs to table";
|
|
ut_ad(debug_skip);
|
|
}
|
|
|
|
/* Reset the explicit compression request, if our previous check
|
|
for explicit returned true. If the request is made after previous
|
|
check then we do the compression next time. */
|
|
if (explicit_request) {
|
|
m_explicit_request.store(false);
|
|
}
|
|
}
|
|
|
|
void Clone_persist_gtid::periodic_write() {
|
|
auto thd = create_thd(false, true, true, PSI_NOT_INSTRUMENTED);
|
|
|
|
m_thread_id = thd_get_thread_id(thd);
|
|
|
|
/* Allow GTID to be persisted on read only server. */
|
|
thd->set_skip_readonly_check();
|
|
|
|
/* Write all accumulated GTIDs while starting server. These GTIDs
|
|
are found in undo log during recovery. We must make sure all these
|
|
GTIDs are flushed and on disk before server is open for new operation
|
|
and new GTIDs are generated.
|
|
|
|
Why is it needed ?
|
|
|
|
1. mysql.gtid_executed table must be up to date at this point as global
|
|
variable gtid_executed is updated from it when binary log is disabled.
|
|
|
|
2. In older versions we used to have only one GTID storage in undo log
|
|
and PREAPARE GTID was stored in same place as COMMIT GTID. We used to
|
|
wait for PREPARE GTID to flush before writing commit GTID. Now this
|
|
limitation is removed and we no longer wait for PREPARE GTID to get
|
|
flushed before COMMIT as we store the PREPARE GTID in separate location.
|
|
However, while upgrading from previous version, there could be XA
|
|
transaction in PREPARED state with GTID stored in place of commit GTID.
|
|
Those GTIDs are also flushed here so that they are not overwritten later
|
|
at COMMIT.
|
|
*/
|
|
flush_gtids(thd);
|
|
|
|
/* Let the caller wait till first set of GTIDs are persisted to table
|
|
after recovery. */
|
|
m_thread_active.store(true);
|
|
|
|
while (!m_close_thread.load()) {
|
|
/* Exit if last phase of shutdown */
|
|
auto is_shutdown = (srv_shutdown_state != SRV_SHUTDOWN_NONE);
|
|
|
|
if (is_shutdown) {
|
|
/* Stop accepting any more GTID */
|
|
m_active.store(false);
|
|
|
|
/* Exit immediately if fast shutdown. */
|
|
if (srv_fast_shutdown == 2) {
|
|
break;
|
|
}
|
|
/* For slow shutdown, consume all GTIDs so that undo can be purged. */
|
|
if (m_num_gtid_mem.load() == 0) {
|
|
break;
|
|
}
|
|
}
|
|
if (!flush_immediate()) {
|
|
os_event_wait_time(m_event, s_time_threshold_ms * 1000);
|
|
}
|
|
os_event_reset(m_event);
|
|
/* Write accumulated GTIDs to disk table */
|
|
flush_gtids(thd);
|
|
}
|
|
m_active.store(false);
|
|
destroy_thd(thd);
|
|
m_thread_active.store(false);
|
|
}
|
|
|
|
bool Clone_persist_gtid::wait_thread(bool start, bool wait_flush,
|
|
uint64_t flush_number, bool compress,
|
|
bool early_timeout, Clone_Alert_Func cbk) {
|
|
size_t count = 0;
|
|
|
|
auto wait_cond = [&](bool alert, bool &result) {
|
|
if (wait_flush) {
|
|
/* If the thread is not active, return. */
|
|
if (!is_thread_active()) {
|
|
result = false;
|
|
return (0);
|
|
}
|
|
/* If it is flushed upto the point requested. */
|
|
if (check_flushed(flush_number)) {
|
|
/* Check if compression is done if requested. */
|
|
if (!compress || !m_explicit_request.load()) {
|
|
result = false;
|
|
return (0);
|
|
}
|
|
}
|
|
} else if (is_thread_active() == start) {
|
|
result = false;
|
|
return (0);
|
|
}
|
|
if (is_thread_active()) {
|
|
os_event_set(m_event);
|
|
}
|
|
result = true;
|
|
if (alert) {
|
|
ib::info(ER_IB_CLONE_TIMEOUT) << "Waiting for Clone GTID thread";
|
|
if (cbk) {
|
|
auto err = cbk();
|
|
if (err != 0) {
|
|
return (err);
|
|
}
|
|
}
|
|
}
|
|
|
|
++count;
|
|
/* Force early exit from wait loop. */
|
|
if (early_timeout && count > 1000) {
|
|
return (ER_QUERY_TIMEOUT);
|
|
}
|
|
return (0);
|
|
};
|
|
|
|
bool is_timeout = false;
|
|
|
|
/* Sleep for 1 millisecond */
|
|
Clone_Msec sleep_time(1);
|
|
/* Generate alert message every 5 seconds. */
|
|
Clone_Sec alert_interval(5);
|
|
/* Wait for 5 minutes. */
|
|
Clone_Sec time_out(Clone_Min(5));
|
|
|
|
static_cast<void>(Clone_Sys::wait(sleep_time, time_out, alert_interval,
|
|
wait_cond, nullptr, is_timeout));
|
|
return (!is_timeout);
|
|
}
|
|
|
|
/** Persist GTID to on disk table from time to time.
|
|
@param[in,out] persist_gtid GTID persister */
|
|
static void clone_gtid_thread(Clone_persist_gtid *persist_gtid) {
|
|
persist_gtid->periodic_write();
|
|
}
|
|
|
|
bool Clone_persist_gtid::start() {
|
|
if (m_thread_active.load()) {
|
|
m_active.store(true);
|
|
return (true);
|
|
}
|
|
|
|
srv_threads.m_gtid_persister =
|
|
os_thread_create(clone_gtid_thread_key, clone_gtid_thread, this);
|
|
srv_threads.m_gtid_persister.start();
|
|
|
|
if (!wait_thread(true, false, 0, false, false, nullptr)) {
|
|
ib::error(ER_IB_CLONE_TIMEOUT) << "Wait for GTID thread to start timed out";
|
|
ut_ad(false);
|
|
return (false);
|
|
}
|
|
m_active.store(true);
|
|
return (true);
|
|
}
|
|
|
|
void Clone_persist_gtid::stop() {
|
|
m_close_thread.store(true);
|
|
if (m_thread_active.load() &&
|
|
!wait_thread(false, false, 0, false, false, nullptr)) {
|
|
ib::error(ER_IB_CLONE_TIMEOUT) << "Wait for GTID thread to stop timed out";
|
|
ut_ad(false);
|
|
}
|
|
}
|
|
|
|
void Clone_persist_gtid::flush_if_implicit_gtid(THD *thd) {
|
|
/* Avoid recursive wait, when we are writing GTID. */
|
|
if (thd == nullptr || m_thread_id == thd_get_thread_id(thd)) {
|
|
return;
|
|
}
|
|
if (thd->is_operating_gtid_table_implicitly) {
|
|
wait_flush(false, false, false, nullptr);
|
|
}
|
|
}
|
|
|
|
void Clone_persist_gtid::wait_flush(bool wait, bool compress_gtid,
|
|
bool early_timeout, Clone_Alert_Func cbk) {
|
|
/* During recovery, avoid wait if called before persister is active. */
|
|
if (!is_thread_active()) {
|
|
return;
|
|
}
|
|
auto request_number = request_immediate_flush(compress_gtid);
|
|
os_event_set(m_event);
|
|
|
|
if (!wait) {
|
|
return;
|
|
}
|
|
|
|
/* For RESET MASTER we must wait for the flush. */
|
|
auto thd = thd_get_current_thd();
|
|
if (thd != nullptr && thd->is_log_reset()) {
|
|
early_timeout = false;
|
|
}
|
|
|
|
/* Wait for flush if test is asking for it. */
|
|
DBUG_EXECUTE_IF("wait_for_flush_gtid_persister", { early_timeout = false; });
|
|
|
|
auto success = wait_thread(false, true, request_number, compress_gtid,
|
|
early_timeout, cbk);
|
|
/* No error for early timeout. */
|
|
if (!success && !early_timeout) {
|
|
ib::error(ER_IB_CLONE_TIMEOUT) << "Wait for GTID thread to flush timed out";
|
|
ut_ad(false);
|
|
}
|
|
}
|
|
|
|
void Clone_persist_gtid::add_undo_gtids(const Gtid_desc >id_desc) {
|
|
/* Check if valid descriptor. */
|
|
if (!gtid_desc.m_is_set) {
|
|
return;
|
|
}
|
|
/* Check if GTID persistence is active. */
|
|
if (gtid_table_persistor == nullptr) {
|
|
return;
|
|
}
|
|
/* Add input GTID to the set */
|
|
m_undo_gtids_list.push_back(gtid_desc.m_info);
|
|
}
|
|
|
|
void Clone_persist_gtid::print_undo_gtids() {
|
|
Sid_map sid_map(nullptr);
|
|
Gtid_set print_gtid_set(&sid_map, nullptr);
|
|
|
|
for (auto >id_info : m_undo_gtids_list) {
|
|
auto gtid_str = reinterpret_cast<const char *>(>id_info[0]);
|
|
auto status = print_gtid_set.add_gtid_text(gtid_str);
|
|
if (status != RETURN_STATUS_OK) {
|
|
m_undo_gtids_list.clear();
|
|
return ;
|
|
}
|
|
}
|
|
|
|
m_undo_gtids_list.clear();
|
|
|
|
char *gtid_buf = nullptr;
|
|
if (print_gtid_set.to_string(>id_buf, false) == -1) {
|
|
return;
|
|
}
|
|
ib::warn() << " [GTID INFO] Reading from undo log :" << gtid_buf;
|
|
my_free(gtid_buf);
|
|
}
|