/* Copyright (c) 2018, 2021, Alibaba and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. This program is also distributed with certain software (including but not limited to OpenSSL) that is licensed under separate terms, as designated in a particular file or component or in included license documentation. The authors of MySQL/PolarDB-X Engine hereby grant you an additional permission to link the program and your derivative works with the separately licensed software that they have included with MySQL/PolarDB-X Engine. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include "sql/binlog.h" #include "sql/consensus_log_manager.h" #include "sql/bl_consensus_log.h" #include "consensus_recovery_manager.h" #include "my_config.h" #include #include #include #include #include #include "my_dbug.h" #include "mysql/components/services/log_builtins.h" #include "sql/sql_lex.h" #include "sql/debug_sync.h" // DEBUG_SYNC #include "sql/log.h" #include #include "sql/log_event_ext.h" #include "replica_read_manager.h" static void correct_binlog_event_log_pos(char *buf, size_t buf_len, ulonglong offset) { if (!opt_consensuslog_revise) return; // calc correct end_log_pos ulonglong end_log_pos= my_b_safe_tell(mysql_bin_log.get_binlog_file()->get_io_cache()); end_log_pos+= offset + buf_len; // inc the length of consensus_log_event end_log_pos+= Consensus_log_event::MAX_EVENT_LENGTH; if (binlog_checksum_options != binary_log::BINLOG_CHECKSUM_ALG_OFF) end_log_pos+= 4; // checksum length // correct end_log_pos in ev->temp_buf int4store(buf + LOG_POS_OFFSET, end_log_pos); // update checksum field because we modify the event's content if (binlog_checksum_options != binary_log::BINLOG_CHECKSUM_ALG_OFF) { uint32_t crc= checksum_crc32(0L, NULL, 0); crc= checksum_crc32(crc, (const unsigned char *)buf, buf_len - BINLOG_CHECKSUM_LEN); int4store(buf + buf_len - BINLOG_CHECKSUM_LEN, crc); } } int large_event_flush(THD *thd, uchar *buffer, ulonglong total_size, Log_event *ev, ulonglong &total_batch_size) { int error = 0; uint flag = 0; uint64 start_pos = 0 , end_pos = opt_consensus_large_event_split_size > ev->buf_len? ev->buf_len: opt_consensus_large_event_split_size; std::string log_content = consensus_log_manager.get_empty_log(); while(start_pos < ev->buf_len) { uint64 blen = end_pos - start_pos; memcpy(buffer, ev->temp_buf + start_pos, blen); total_batch_size += blen; if (total_batch_size == total_size) flag = Consensus_log_event_flag::FLAG_LARGE_TRX_END; else flag = Consensus_log_event_flag::FLAG_LARGE_TRX; if (end_pos == ev->buf_len) flag |= Consensus_log_event_flag::FLAG_BLOB_END; else if (start_pos == 0) { consensus_log_manager.get_fifo_cache_manager()->set_lock_blob_index(consensus_log_manager.get_current_index()); flag |= (Consensus_log_event_flag::FLAG_BLOB | Consensus_log_event_flag::FLAG_BLOB_START); } else flag |= Consensus_log_event_flag::FLAG_BLOB; thd->consensus_index = consensus_log_manager.get_current_index(); sql_print_information("Large event batch_size %llu, total_batch_size %llu, original buf_size %llu, consensus_index %llu.", blen, total_batch_size, total_size, thd->consensus_index); uint64 crc32 = opt_consensus_checksum? checksum_crc32(0, buffer, blen): 0; if (end_pos == ev->buf_len) { correct_binlog_event_log_pos(ev->temp_buf, ev->buf_len, 0); error = mysql_bin_log.write_consensus_log(flag, thd->consensus_term, ev->buf_len, crc32); } else { correct_binlog_event_log_pos((char *)log_content.data(), log_content.length(), 0); error = mysql_bin_log.write_consensus_log(flag, thd->consensus_term, log_content.length(), crc32); } if (!error) { consensus_log_manager.get_fifo_cache_manager()->add_log_to_cache(thd->consensus_term, thd->consensus_index, blen, buffer, FALSE, flag, crc32); if (end_pos == ev->buf_len) { error = mysql_bin_log.write_buf_to_log_file((uchar *)ev->temp_buf, ev->buf_len); consensus_log_manager.get_fifo_cache_manager()->set_lock_blob_index(0); } else error = mysql_bin_log.write_buf_to_log_file((uchar *)log_content.data(), log_content.length()); if (!error) { error = mysql_bin_log.flush_and_sync(false); consensus_log_manager.set_sync_index_if_greater(thd->consensus_index); alisql_server->writeLogDoneInternal(thd->consensus_index, true); } } DBUG_EXECUTE_IF("crash_during_large_event_binlog_flush", { static int ccnt = 0; ccnt++; if (ccnt > 1) { DBUG_SUICIDE(); } }); DBUG_EXECUTE_IF("crash_during_large_event_binlog_flush_slow", { /* let follower get the log */ static int ccnt = 0; ccnt++; if (ccnt > 1) { sleep(2); DBUG_SUICIDE(); } }); start_pos = end_pos; end_pos = end_pos + opt_consensus_large_event_split_size > ev->buf_len? ev->buf_len: end_pos + opt_consensus_large_event_split_size; } return error; } int large_trx_flush(THD *thd, uchar *buffer, ulonglong total_size) { int error = 0; Log_event *ev = NULL; uint flag = 0; bool force_batch = false; uint64 crc32 = 0; Format_description_log_event fd_ev; IO_CACHE *consensus_log = consensus_log_manager.get_cache(); ulonglong batch_size = 0 , total_batch_size = 0; // init consensus_log for read reinit_io_cache(consensus_log, READ_CACHE, 0, 0, 0); Binlog_read_error m_error; Write_cache_istream consensus_log_istream(consensus_log); Binlog_event_data_istream consensus_log_data_istream(&m_error, &consensus_log_istream, UINT_MAX); Binlog_event_object_istream consensus_log_event_istream(&m_error, &consensus_log_data_istream); Default_binlog_event_allocator default_alloc; /* * the max size of buffer is opt_consensus_max_log_size * do not use more than this restriction */ while(!error && (ev = consensus_log_event_istream.read_event_object(fd_ev, opt_master_verify_checksum, &default_alloc)) != NULL) { if (ev->buf_len > opt_consensus_max_log_size || DBUG_EVALUATE_IF("force_large_trx_single_ev", 1, 0)) force_batch = true; /* current ev is large event */ else force_batch = false; /* * When should we batch a consensus log? * 1. current buffer must not empty * 2. current buffer size is larger than opt_consensus_large_trx_split_size * 3. current buffer size is ok, but if we add next event, the buffer will overflow * 4. force do a batch: large event OR for debug */ if (batch_size > 0 && (batch_size > opt_consensus_large_trx_split_size || batch_size + ev-> buf_len > opt_consensus_max_log_size || force_batch)) { total_batch_size += batch_size; flag = Consensus_log_event_flag::FLAG_LARGE_TRX; DBUG_ASSERT(total_batch_size < total_size); thd->consensus_index = consensus_log_manager.get_current_index(); sql_print_information("Large trx batch_size %llu, total_batch_size %llu, original buf_size %llu, consensus_index %llu.",batch_size, total_batch_size, total_size, thd->consensus_index); crc32 = opt_consensus_checksum? checksum_crc32(0, buffer, batch_size ): 0; error = mysql_bin_log.write_consensus_log(flag, thd->consensus_term, batch_size, crc32); /* inc current index inside */ if (!error) { consensus_log_manager.get_fifo_cache_manager()->add_log_to_cache(thd->consensus_term, thd->consensus_index, batch_size, buffer, FALSE, flag, crc32); DBUG_EXECUTE_IF("crash_during_large_trx_binlog_flush2", { static int ccnt = 0; ccnt++; if (ccnt > 1) { // force miss 1 byte and then crash mysql_bin_log.write_buf_to_log_file(buffer, batch_size - 1); mysql_bin_log.flush_and_sync(true); DBUG_SUICIDE(); } }); error = mysql_bin_log.write_buf_to_log_file(buffer, batch_size); /* for large trx, sync directly after flush for performance */ if (!error) { error = mysql_bin_log.flush_and_sync(false); consensus_log_manager.set_sync_index_if_greater(thd->consensus_index); /* use lockless writeLogDone */ alisql_server->writeLogDoneInternal(thd->consensus_index, true); } } batch_size = 0; DEBUG_SYNC(thd, "large_trx_sync_part"); DBUG_EXECUTE_IF("crash_during_large_trx_binlog_flush", { static int ccnt = 0; ccnt++; if (ccnt > 1) { DBUG_SUICIDE(); } }); DBUG_EXECUTE_IF("crash_during_large_trx_binlog_flush_slow", { /* let follower get the log */ static int ccnt = 0; ccnt++; if (ccnt > 1) { sleep(2); DBUG_SUICIDE(); } }); } if (ev->buf_len > opt_consensus_max_log_size) { /* current ev is large event */ DBUG_ASSERT(batch_size == 0); error = large_event_flush(thd, buffer, total_size, ev, total_batch_size); delete ev; } else { correct_binlog_event_log_pos(ev->temp_buf, ev->buf_len, batch_size); memcpy(buffer + batch_size, ev->temp_buf, ev->buf_len); batch_size += ev->buf_len; delete ev; } } /* deal with remained buffer */ if (batch_size > 0) { total_batch_size += batch_size; flag = Consensus_log_event_flag::FLAG_LARGE_TRX_END; thd->consensus_index = consensus_log_manager.get_current_index(); sql_print_information("Large trx batch_size %llu, total_batch_size %llu, original buf_size %llu, consensus_index %llu.",batch_size, total_batch_size, total_size, thd->consensus_index); DBUG_ASSERT(total_batch_size == total_size && batch_size > 0); crc32 = opt_consensus_checksum? checksum_crc32(0, buffer, batch_size ): 0; error = mysql_bin_log.write_consensus_log(flag, thd->consensus_term, batch_size, crc32); /* inc current index inside */ if (!error) { consensus_log_manager.get_fifo_cache_manager()->add_log_to_cache(thd->consensus_term, thd->consensus_index, batch_size, buffer, FALSE, flag, crc32); error = mysql_bin_log.write_buf_to_log_file(buffer, batch_size); } } /* set consensus cache_log back to write cache */ reinit_io_cache(consensus_log, WRITE_CACHE, 0, 0, 1); return error; } bool MYSQL_BIN_LOG::open_for_normandy( #ifdef HAVE_PSI_INTERFACE PSI_file_key log_file_key, #endif const char *log_name, const char *new_name __attribute__((unused))) { DBUG_ENTER("MYSQL_BIN_LOG::open"); bool ret = false; my_off_t file_off = 0; write_error = 0; myf flags = MY_WME | MY_NABP | MY_WAIT_IF_FULL; // xpaxos threads have no THD and can't report the WAITING_FULL state // TODO: add THD to xpaxos threads if (is_relay_log && !is_xpaxos_log) flags = flags | MY_REPORT_WAITING_IF_FULL; if (!(name = my_strdup(key_memory_MYSQL_LOG_name, log_name, MYF(MY_WME)))) { name = (char *)log_name; // for the error message goto err; } // if (init_and_set_log_file_name(name, new_name, new_index_number) || // DBUG_EVALUATE_IF("fault_injection_init_name", 1, 0)) // goto err; db[0] = 0; /* Keep the key for reopen */ m_log_file_key = log_file_key; /* LOCK_sync guarantees that no thread is calling m_binlog_file to sync data to disk when another thread is opening the new file (FLUSH LOG or RESET MASTER). */ if (!is_relay_log) mysql_mutex_lock(&LOCK_sync); // avoid open for new binlog assert fail m_binlog_file->close(); ret = m_binlog_file->open(log_file_key, log_file_name, flags); if (ret) { LogErr(ERROR_LEVEL, ER_BINLOG_CANT_OPEN_FOR_LOGGING, name, errno); goto err; } // write from the end file_off = my_seek(m_binlog_file->get_io_cache()->file, 0L, MY_SEEK_END, MYF(MY_WME + MY_FAE)); m_binlog_file->truncate(file_off); if (!is_relay_log) mysql_mutex_unlock(&LOCK_sync); atomic_log_state = LOG_OPENED; DBUG_RETURN(0); err: if (binlog_error_action == ABORT_SERVER) { exec_binlog_error_action_abort( "Either disk is full, file system is read only or " "there was an encryption error while opening the binlog. " "Aborting the server."); } else LogErr(ERROR_LEVEL, ER_BINLOG_CANT_OPEN_FOR_LOGGING, name, errno); my_free(name); name = NULL; atomic_log_state = LOG_CLOSED; DBUG_RETURN(1); } int MYSQL_BIN_LOG::build_consensus_log_index() { LOG_INFO log_info; int error = 1; std::vector consensuslog_file_name_vector; mysql_mutex_lock(&LOCK_index); // find last log name according to the binlog index if (!my_b_inited(&index_file)) { mysql_mutex_unlock(&LOCK_index); sql_print_error("build consenus log index failed, can't init index file"); return 1; } if ((error = find_log_pos(&log_info, NullS, false))) { if (error != LOG_INFO_EOF ) { sql_print_error("find_log_pos() failed (error: %d)", error); mysql_mutex_unlock(&LOCK_index); return error; } } if (error == 0) { do { consensuslog_file_name_vector.push_back(log_info.log_file_name); } while (!(error = find_next_log(&log_info, false/*need_lock_index=true*/))); } if (error != LOG_INFO_EOF) { sql_print_error("find_log_pos() failed (error: %d)", error); mysql_mutex_unlock(&LOCK_index); return error; } else { error = 0; } mysql_mutex_unlock(&LOCK_index); if (error) return error; for (auto iter = consensuslog_file_name_vector.begin(); iter != consensuslog_file_name_vector.end(); ++iter) { // const char *errmsg = NULL; //IO_CACHE log; //File file = open_binlog_file(&log, iter->c_str(), &errmsg); //if (file < 0) //return 1; Binlog_file_reader binlog_file_reader(opt_master_verify_checksum); if (binlog_file_reader.open(iter->c_str())) return 1; Format_description_log_event fd_ev; Format_description_log_event *fd_ev_p = &fd_ev; // my_b_seek(&log, BIN_LOG_HEADER_SIZE); binlog_file_reader.seek(BIN_LOG_HEADER_SIZE); binlog_file_reader.set_format_description_event(*fd_ev_p); Log_event *ev = NULL; Previous_consensus_index_log_event *prev_consensus_index_ev = NULL; bool find_prev_consensus_log = FALSE; // while (!find_prev_consensus_log && ((ev = Log_event::read_log_event(&log, 0, fd_ev_p, 1)) != NULL)) while (!find_prev_consensus_log && (ev = binlog_file_reader.read_event_object()) != NULL) { switch (ev->get_type_code()) { case binary_log::FORMAT_DESCRIPTION_EVENT: if (fd_ev_p != &fd_ev) delete fd_ev_p; fd_ev_p = (Format_description_log_event*)ev; binlog_file_reader.set_format_description_event(*fd_ev_p); break; case binary_log::PREVIOUS_CONSENSUS_INDEX_LOG_EVENT: prev_consensus_index_ev = (Previous_consensus_index_log_event*)ev; consensus_log_manager.get_log_file_index()->add_to_index_list(prev_consensus_index_ev->get_index(), prev_consensus_index_ev->common_header->when.tv_sec, *iter); find_prev_consensus_log = TRUE; break; default: break; } if (ev != NULL && ev != fd_ev_p) delete ev, ev = NULL; } if (fd_ev_p != &fd_ev) { delete fd_ev_p; fd_ev_p = &fd_ev; } // mysql_file_close(file, MYF(MY_WME)); // end_io_cache(&log); if (!find_prev_consensus_log) { sql_print_error("log file %s do not contain prev_consensus_log_ev", iter->c_str()); return 1; } } return 0; } int MYSQL_BIN_LOG::init_last_index_of_term(uint64 term) { LOG_INFO log_info; bool found = FALSE; std::vector consensuslog_file_name_vector; get_consensus_log_file_list(consensuslog_file_name_vector); for (auto iter = consensuslog_file_name_vector.rbegin(); !found && iter != consensuslog_file_name_vector.rend(); ++iter) { uint64 current_term = 0; uint64 current_index = 0; uint64 current_flag = 0; // const char *errmsg = NULL; // IO_CACHE log; // File file = open_binlog_file(&log, iter->c_str(), &errmsg); // if (file < 0) //return 1; Binlog_file_reader binlog_file_reader(opt_master_verify_checksum); if (binlog_file_reader.open(iter->c_str())) return 1; Format_description_log_event fd_ev; Format_description_log_event *fd_ev_p = &fd_ev; // my_b_seek(&log, BIN_LOG_HEADER_SIZE); binlog_file_reader.seek(BIN_LOG_HEADER_SIZE); binlog_file_reader.set_format_description_event(*fd_ev_p); Log_event *ev = NULL; Consensus_log_event *consensus_log_ev = NULL; bool skip = FALSE; // while (!skip && ((ev = Log_event::read_log_event(&log, 0, fd_ev_p, 1)) != NULL)) while (!skip && (ev = binlog_file_reader.read_event_object()) != NULL) { switch (ev->get_type_code()) { case binary_log::FORMAT_DESCRIPTION_EVENT: if (fd_ev_p != &fd_ev) delete fd_ev_p; fd_ev_p = (Format_description_log_event*)ev; binlog_file_reader.set_format_description_event(*fd_ev_p); break; case binary_log::CONSENSUS_LOG_EVENT: consensus_log_ev = (Consensus_log_event*)ev; current_term = consensus_log_ev->get_term(); current_index = consensus_log_ev->get_index(); current_flag = consensus_log_ev->get_flag(); // if find larger term, skip current file to previous one if (current_term > term) { skip = TRUE; } if (current_term <= term && !(current_flag & Consensus_log_event_flag::FLAG_LARGE_TRX)) { consensus_log_manager.get_recovery_manager()->set_last_leader_term_index(current_index); found = TRUE; } break; default: break; } if (ev != NULL && ev != fd_ev_p) delete ev, ev = NULL; } if (fd_ev_p != &fd_ev) { delete fd_ev_p; fd_ev_p = &fd_ev; } // mysql_file_close(file, MYF(MY_WME)); // end_io_cache(&log); if (!found) { sql_print_warning("log file %s cannot found last log term index, term is %llu.", iter->c_str(), term); } } sql_print_warning("last log term is %llu, last log term index is %llu.", term, consensus_log_manager.get_recovery_manager()->get_last_leader_term_index()); return !found; } int MYSQL_BIN_LOG::get_consensus_log_file_list(std::vector & consensuslog_file_name_vector) { consensus_log_manager.get_log_file_index()->get_log_file_list(consensuslog_file_name_vector); return 0; } int MYSQL_BIN_LOG::find_log_by_consensus_index(uint64 consensus_index, std::string & file_name) { return consensus_log_manager.get_log_file_index()->get_log_file_from_index(consensus_index, file_name); } uint64 MYSQL_BIN_LOG::get_trx_end_index(uint64 firstIndex) { std::string file_name; // use another io_cache , so do not need lock LOCK_log if (find_log_by_consensus_index(firstIndex, file_name)) { sql_print_error("get_trx_end_index cannot find consensus index log %llu.", firstIndex); return 0; } // const char *errmsg = NULL; // IO_CACHE log; // File file = open_binlog_file(&log, file_name.c_str(), &errmsg); // if (file < 0) { //return 0; //} Binlog_file_reader binlog_file_reader(opt_master_verify_checksum); if (binlog_file_reader.open(file_name.c_str())) return 0; // ?????? Format_description_log_event fd_ev; Format_description_log_event *fd_ev_p = &fd_ev; // my_b_seek(&log, BIN_LOG_HEADER_SIZE); binlog_file_reader.seek(BIN_LOG_HEADER_SIZE); binlog_file_reader.set_format_description_event(*fd_ev_p); Log_event *ev = NULL; Consensus_log_event *consensus_log_ev = NULL; bool stop_scan = FALSE; uint64 currentIndex = 0; uint64 currentFlag = 0; // while (!stop_scan && (ev = Log_event::read_log_event(&log, 0, fd_ev_p, 1)) != NULL) while (!stop_scan && (ev = binlog_file_reader.read_event_object()) != NULL) { switch (ev->get_type_code()) { case binary_log::CONSENSUS_LOG_EVENT: consensus_log_ev = (Consensus_log_event*)ev; currentIndex = consensus_log_ev->get_index(); currentFlag = consensus_log_ev->get_flag(); if (firstIndex <= currentIndex && !(currentFlag & Consensus_log_event_flag::FLAG_LARGE_TRX)) stop_scan = true; break; case binary_log::FORMAT_DESCRIPTION_EVENT: if (fd_ev_p != &fd_ev) delete fd_ev_p; fd_ev_p = (Format_description_log_event*)ev; binlog_file_reader.set_format_description_event(*fd_ev_p); break; default: break; } if (ev != NULL && ev != fd_ev_p) delete ev; } if (fd_ev_p != &fd_ev) { delete fd_ev_p; fd_ev_p = &fd_ev; } // mysql_file_close(file, MYF(MY_WME)); // end_io_cache(&log); return stop_scan? currentIndex: 0; } // int MYSQL_BIN_LOG::fetch_binlog_by_offset(IO_CACHE *log, uint64 start_pos, uint64 end_pos, Consensus_cluster_info_log_event *rci_ev, std::string& log_content) int MYSQL_BIN_LOG::fetch_binlog_by_offset(Binlog_file_reader &binlog_file_reader, uint64 start_pos, uint64 end_pos, Consensus_cluster_info_log_event *rci_ev, std::string& log_content) { if (start_pos == end_pos) { log_content.assign(""); return 0; } if (rci_ev == NULL) { unsigned int buf_size = end_pos - start_pos; uchar* buffer = (uchar*)my_malloc(key_memory_thd_main_mem_root, buf_size, MYF(MY_WME)); // my_b_seek(log, start_pos); binlog_file_reader.seek(start_pos); // my_b_read(log, buffer, buf_size); // binlog_file_reader.read_event_data(&buffer, &(buf_size)); my_b_read(binlog_file_reader.get_io_cache(), buffer, buf_size); log_content.assign((char *)buffer, buf_size); my_free(buffer); } else { log_content.assign(rci_ev->get_info(), (size_t)rci_ev->get_info_length()); } return 0; } int MYSQL_BIN_LOG::read_log_by_consensus_index(const char* file_name, uint64 consensus_index, uint64 *consensus_term, std::string& log_content, bool *outer, uint *flag, uint64 *checksum, bool need_content) { // const char *errmsg = NULL; // IO_CACHE log; // File file = open_binlog_file(&log, file_name, &errmsg); // if (file < 0) { //return 1; //} Binlog_file_reader binlog_file_reader(opt_master_verify_checksum); if (binlog_file_reader.open(file_name)) return 1; Format_description_log_event fd_ev; Format_description_log_event *fd_ev_p = &fd_ev; // my_b_seek(&log, BIN_LOG_HEADER_SIZE); binlog_file_reader.seek(BIN_LOG_HEADER_SIZE); binlog_file_reader.set_format_description_event(*fd_ev_p); Log_event *ev = NULL; Consensus_cluster_info_log_event *rci_ev = NULL; Consensus_log_event *consensus_log_ev = NULL; bool found = FALSE; bool stop_scan = FALSE; bool in_transaction = FALSE; uint64 start_pos = my_b_tell(binlog_file_reader.get_io_cache()); uint64 end_pos = start_pos; uint64 consensus_log_length = 0; uint64 cindex, cterm, cflag, ccrc32; std::vector blob_index_list; std::vector blob_term_list; std::vector blob_flag_list; std::vector blob_crc32_list; // while (!stop_scan && (ev = Log_event::read_log_event(&log, 0, fd_ev_p, 1)) != NULL) while (!stop_scan && (ev = binlog_file_reader.read_event_object()) != NULL) { switch (ev->get_type_code()) { case binary_log::CONSENSUS_LOG_EVENT: consensus_log_ev = (Consensus_log_event*)ev; cindex = consensus_log_ev->get_index(); cterm = consensus_log_ev->get_term(); cflag = consensus_log_ev->get_flag(); ccrc32 = consensus_log_ev->get_reserve(); consensus_log_length = consensus_log_ev->get_length(); end_pos = start_pos = my_b_tell(binlog_file_reader.get_io_cache()); if (consensus_index == cindex) { found = TRUE; *consensus_term = cterm; *flag = cflag; *checksum = ccrc32; } else if (!found && consensus_log_ev->get_index() > consensus_index) { sql_print_information("directly read log error, log size is error"); abort(); } break; case binary_log::FORMAT_DESCRIPTION_EVENT: if (fd_ev_p != &fd_ev) delete fd_ev_p; fd_ev_p = (Format_description_log_event*)ev; binlog_file_reader.set_format_description_event(*fd_ev_p); break; default: if (!ev->is_control_event()) { end_pos = my_b_tell(binlog_file_reader.get_io_cache()); if (ev->get_type_code() == binary_log::CONSENSUS_CLUSTER_INFO_EVENT && found) { rci_ev = (Consensus_cluster_info_log_event*)ev; } if (end_pos > start_pos && end_pos - start_pos == consensus_log_length) { if (need_content && (cflag & Consensus_log_event_flag::FLAG_BLOB)) { blob_index_list.push_back(cindex); blob_term_list.push_back(cterm); blob_flag_list.push_back(cflag); blob_crc32_list.push_back(ccrc32); } else if (need_content && (cflag & Consensus_log_event_flag::FLAG_BLOB_END)) { blob_index_list.push_back(cindex); blob_term_list.push_back(cterm); blob_flag_list.push_back(cflag); blob_crc32_list.push_back(ccrc32); if (found) { DBUG_ASSERT(consensus_index >= blob_index_list[0] && consensus_index <= cindex); /* It means the required index is between a blob event */ uint64 split_len = opt_consensus_large_event_split_size; uint64 blob_start_pos = start_pos, blob_end_pos = start_pos + split_len; for (size_t i=0; i end_pos? end_pos: blob_end_pos + split_len; } } blob_index_list.clear(); blob_term_list.clear(); blob_flag_list.clear(); blob_crc32_list.clear(); } else { if (found) { if (need_content || rci_ev != NULL) // fetch_binlog_by_offset(&log, start_pos, end_pos, rci_ev, log_content); fetch_binlog_by_offset(binlog_file_reader, start_pos, end_pos, rci_ev, log_content); *outer = (rci_ev != NULL); end_pos = start_pos = my_b_tell(binlog_file_reader.get_io_cache()); stop_scan = TRUE; rci_ev = NULL; } } } } break; } if (ev != NULL && ev != fd_ev_p) delete ev; } // if scan to end of file if (end_pos > start_pos && !in_transaction) { if (need_content || rci_ev != NULL) // fetch_binlog_by_offset(&log, start_pos, end_pos, rci_ev, log_content); fetch_binlog_by_offset(binlog_file_reader, start_pos, end_pos, rci_ev, log_content); sql_print_information("directly read last log size %llu", end_pos - start_pos); end_pos = start_pos = my_b_tell(binlog_file_reader.get_io_cache()); } if (fd_ev_p != &fd_ev) { delete fd_ev_p; fd_ev_p = &fd_ev; } sql_print_information("directly read log reached consensus index %llu", consensus_index); if (!found) sql_print_error("read log by consensus index failed"); // mysql_file_close(file, MYF(MY_WME)); // end_io_cache(&log); return (int)!found; } int MYSQL_BIN_LOG::prefetch_logs_of_file(THD *thd, uint64 channel_id, const char* file_name, uint64 start_index) { // const char *errmsg = NULL; // IO_CACHE log; LOG_INFO linfo; strncpy(linfo.log_file_name, file_name, FN_REFLEN - 1); mysql_mutex_lock(&thd->LOCK_thd_data); thd->current_linfo = &linfo; mysql_mutex_unlock(&thd->LOCK_thd_data); // File file = open_binlog_file(&log, file_name, &errmsg); // if (file < 0) { // thd->current_linfo = 0; // return 1; // } Binlog_file_reader binlog_file_reader(opt_master_verify_checksum); if (binlog_file_reader.open(file_name)) { mysql_mutex_lock(&thd->LOCK_thd_data); thd->current_linfo = 0; mysql_mutex_unlock(&thd->LOCK_thd_data); return 1; } Format_description_log_event fd_ev; Format_description_log_event *fd_ev_p = &fd_ev; // my_b_seek(&log, BIN_LOG_HEADER_SIZE); binlog_file_reader.seek(BIN_LOG_HEADER_SIZE); binlog_file_reader.set_format_description_event(*fd_ev_p); Log_event *ev = NULL; Consensus_cluster_info_log_event *rci_ev = NULL; Consensus_log_event *consensus_log_ev = NULL; uint64 start_pos = my_b_tell(binlog_file_reader.get_io_cache()); uint64 end_pos = start_pos; uint64 current_index = 0; uint64 current_term = 0; uint32 consensus_log_length = 0; uint current_flag = 0; uint64 current_crc32 = 0; bool stop_prefetch = FALSE; std::string log_content; std::vector blob_index_list; std::vector blob_term_list; std::vector blob_flag_list; std::vector blob_crc32_list; ConsensusPreFetchManager *prefetch_mgr = consensus_log_manager.get_prefetch_manager(); ConsensusPreFetchChannel *prefetch_channel = prefetch_mgr->get_prefetch_channel(channel_id); prefetch_channel->set_prefetching(TRUE); if (prefetch_channel->get_channel_id() == 0) prefetch_channel->clear_large_trx_table(); // while (!stop_prefetch && (ev = Log_event::read_log_event(&log, 0, fd_ev_p, 1)) != NULL) while (!stop_prefetch && (ev = binlog_file_reader.read_event_object()) != NULL) { switch (ev->get_type_code()) { case binary_log::CONSENSUS_LOG_EVENT: consensus_log_ev = (Consensus_log_event*)ev; current_index = consensus_log_ev->get_index(); current_term = consensus_log_ev->get_term(); consensus_log_length = consensus_log_ev->get_length(); current_flag = consensus_log_ev->get_flag(); current_crc32 = consensus_log_ev->get_reserve(); end_pos = start_pos = my_b_tell(binlog_file_reader.get_io_cache()); if (opt_consensus_prefetch_fast_fetch) { /* * jump to next consensus_log_event: * 1. not large trx, not blob * 2. current_index + window_size < start_index */ if ((!(current_flag & (Consensus_log_event_flag::FLAG_LARGE_TRX | Consensus_log_event_flag::FLAG_LARGE_TRX_END | Consensus_log_event_flag::FLAG_BLOB | Consensus_log_event_flag::FLAG_BLOB_START | Consensus_log_event_flag::FLAG_BLOB_END))) && (current_index + prefetch_channel->get_window_size() < start_index)) { // my_b_seek(&log, start_pos + consensus_log_length); binlog_file_reader.seek(start_pos + consensus_log_length); } /* * fetch data directly: * 1. not large trx, not blob * 2. not configure change * 3. current_index + window_size >= start_index */ if ((!(current_flag & (Consensus_log_event_flag::FLAG_LARGE_TRX | Consensus_log_event_flag::FLAG_LARGE_TRX_END | Consensus_log_event_flag::FLAG_BLOB | Consensus_log_event_flag::FLAG_BLOB_START | Consensus_log_event_flag::FLAG_BLOB_END | Consensus_log_event_flag::FLAG_CONFIG_CHANGE))) && (current_index + prefetch_channel->get_window_size() >= start_index)) { uchar* buffer = (uchar*)my_malloc(key_memory_thd_main_mem_root, consensus_log_length, MYF(MY_WME)); // my_b_read(&log, buffer, consensus_log_length); // binlog_file_reader.read_event_data(&buffer, &consensus_log_length); my_b_read(binlog_file_reader.get_io_cache(), buffer, consensus_log_length); int result = 0; while ((result = prefetch_channel->add_log_to_prefetch_cache(current_term, current_index, consensus_log_length, buffer, false, current_flag, current_crc32)) == FULL) { // wait condition already executed in add log to prefetch cache } if (result == INTERRUPT || current_index == consensus_log_manager.get_sync_index()) stop_prefetch = TRUE; my_free(buffer); end_pos = my_b_tell(binlog_file_reader.get_io_cache()); assert(end_pos - start_pos == consensus_log_length); } } break; case binary_log::FORMAT_DESCRIPTION_EVENT: if (fd_ev_p != &fd_ev) delete fd_ev_p; fd_ev_p = (Format_description_log_event*)ev; binlog_file_reader.set_format_description_event(*fd_ev_p); break; default: if (!ev->is_control_event()) { end_pos = my_b_tell(binlog_file_reader.get_io_cache()); if (ev->get_type_code() == binary_log::CONSENSUS_CLUSTER_INFO_EVENT) { rci_ev = (Consensus_cluster_info_log_event*)ev; } if (end_pos > start_pos && end_pos - start_pos == consensus_log_length) { if (prefetch_channel->get_channel_id() == 0 && (current_flag & (Consensus_log_event_flag::FLAG_LARGE_TRX | Consensus_log_event_flag::FLAG_LARGE_TRX_END))) { prefetch_channel->add_log_to_large_trx_table(current_term, current_index, (rci_ev != NULL), current_flag); } if (current_flag & Consensus_log_event_flag::FLAG_BLOB) { blob_index_list.push_back(current_index); blob_term_list.push_back(current_term); blob_flag_list.push_back(current_flag); blob_crc32_list.push_back(current_crc32); } else if (current_flag & Consensus_log_event_flag::FLAG_BLOB_END) { blob_index_list.push_back(current_index); blob_term_list.push_back(current_term); blob_flag_list.push_back(current_flag); blob_crc32_list.push_back(current_crc32); uint64 split_len = opt_consensus_large_event_split_size; uint64 blob_start_pos = start_pos, blob_end_pos = start_pos + split_len; for (size_t i=0; iget_window_size() >= start_index) { // fetch_binlog_by_offset(&log, blob_start_pos, blob_end_pos, NULL, log_content); fetch_binlog_by_offset(binlog_file_reader, blob_start_pos, blob_end_pos, NULL, log_content); int result = 0; while ((result = prefetch_channel->add_log_to_prefetch_cache(blob_term_list[i], blob_index_list[i], log_content.size(), (uchar*)log_content.c_str(), false, blob_flag_list[i], blob_crc32_list[i])) == FULL) { // wait condition already executed in add log to prefetch cache } if (result == INTERRUPT) { stop_prefetch = TRUE; break; // break iterate blob_index_list } } blob_start_pos = blob_end_pos; blob_end_pos = blob_end_pos + split_len > end_pos? end_pos: blob_end_pos + split_len; } blob_index_list.clear(); blob_term_list.clear(); blob_flag_list.clear(); blob_crc32_list.clear(); } else { if (current_index + prefetch_channel->get_window_size() >= start_index) { // fetch_binlog_by_offset(&log, start_pos, end_pos, rci_ev, log_content); fetch_binlog_by_offset(binlog_file_reader, start_pos, end_pos, rci_ev, log_content); int result = 0; while ((result = prefetch_channel->add_log_to_prefetch_cache(current_term, current_index, log_content.size(), (uchar*)log_content.c_str(), (rci_ev != NULL), current_flag, current_crc32)) == FULL) { // wait condition already executed in add log to prefetch cache } if (result == INTERRUPT || current_index == consensus_log_manager.get_sync_index()) { stop_prefetch = TRUE; // because truncate log happened, stop prefetch and retry } } } rci_ev = NULL; } } break; } if (ev != NULL && ev != fd_ev_p) delete ev, ev = NULL; } if (fd_ev_p != &fd_ev) { delete fd_ev_p; fd_ev_p = &fd_ev; } prefetch_channel->set_prefetching(FALSE); prefetch_channel->dec_ref_count(); prefetch_channel->clear_prefetch_request(); // mysql_file_close(file, MYF(MY_WME)); // end_io_cache(&log); mysql_mutex_lock(&thd->LOCK_thd_data); thd->current_linfo = 0; mysql_mutex_unlock(&thd->LOCK_thd_data); sql_print_information("channel_id %llu prefetch log reached consensus index %llu", channel_id, (uint64)current_index); return 0; } /* There are 3 condition to determine the right position 1. beginning of the index 2. ending of the previous index 3. beginning of the binlog file */ int MYSQL_BIN_LOG::find_pos_by_consensus_index(const char* file_name, uint64 consensus_index, uint64 *pos) { // const char *errmsg = NULL; // IO_CACHE log; // File file = open_binlog_file(&log, file_name, &errmsg); // if (file < 0) //{ //return 1; //} Binlog_file_reader binlog_file_reader(opt_master_verify_checksum); if (binlog_file_reader.open(file_name)) return 1; Format_description_log_event fd_ev; Format_description_log_event *fd_ev_p = &fd_ev; // my_b_seek(&log, BIN_LOG_HEADER_SIZE); binlog_file_reader.seek(BIN_LOG_HEADER_SIZE); binlog_file_reader.set_format_description_event(*fd_ev_p); Log_event *ev = NULL; Consensus_log_event *consensus_log_ev = NULL; Previous_consensus_index_log_event *consensus_prev_ev = NULL; bool found = FALSE; bool first_log_in_file = FALSE; // while (!found && (ev = Log_event::read_log_event(&log, 0, fd_ev_p, 1)) != NULL) while (!found && (ev = binlog_file_reader.read_event_object()) != NULL) { switch (ev->get_type_code()) { case binary_log::CONSENSUS_LOG_EVENT: consensus_log_ev = (Consensus_log_event*)ev; if (consensus_index == consensus_log_ev->get_index()) found = TRUE; if (consensus_index == consensus_log_ev->get_index() + 1) { found = TRUE; *pos = my_b_tell(binlog_file_reader.get_io_cache()) + consensus_log_ev->get_length(); } break; case binary_log::PREVIOUS_CONSENSUS_INDEX_LOG_EVENT: consensus_prev_ev = (Previous_consensus_index_log_event*)ev; if (consensus_index == consensus_prev_ev->get_index()) first_log_in_file = TRUE; break; case binary_log::PREVIOUS_GTIDS_LOG_EVENT: if (first_log_in_file) { *pos = my_b_tell(binlog_file_reader.get_io_cache()); found = TRUE; } break; case binary_log::FORMAT_DESCRIPTION_EVENT: if (fd_ev_p != &fd_ev) delete fd_ev_p; fd_ev_p = (Format_description_log_event*)ev; binlog_file_reader.set_format_description_event(*fd_ev_p); break; default: break; } if (ev != NULL && ev != fd_ev_p) delete ev, ev = NULL; } if (fd_ev_p != &fd_ev) { delete fd_ev_p; fd_ev_p = &fd_ev; } // mysql_file_close(file, MYF(MY_WME)); // end_io_cache(&log); return !found; } int MYSQL_BIN_LOG::truncate_logs_from_index(std::vector & files_list, std::string last_file) { LOG_INFO log_info; mysql_mutex_lock(&LOCK_index); if (find_log_pos(&log_info, last_file.c_str(), false/*need_lock_index=false*/)) { sql_print_error("MYSQL_BIN_LOG::truncate_logs was called with file %s not " "listed in the index.", last_file.c_str()); goto err; } if (open_crash_safe_index_file()) { sql_print_error("MYSQL_BIN_LOG::remove_logs_from_index failed to " "open the crash safe index file."); goto err; } for (std::vector::iterator it = files_list.begin(); it != files_list.end(); it++) { std::string record = (*it) + "\n"; if (mysql_file_write(crash_safe_index_file.file, (uchar*)record.c_str(), record.length(), MYF(MY_WME | MY_NABP))) goto err; } if (close_crash_safe_index_file()) { sql_print_error("MYSQL_BIN_LOG::remove_logs_from_index failed to " "close the crash safe index file."); goto err; } DBUG_EXECUTE_IF("fault_injection_copy_part_file", DBUG_SUICIDE();); if (move_crash_safe_index_file_to_index_file(false/*need_lock_index=false*/)) { sql_print_error("MYSQL_BIN_LOG::remove_logs_from_index failed to " "move crash safe index file to index file."); goto err; } // #ifdef HAVE_REPLICATION // now update offsets in index file for running threads adjust_linfo_offsets(log_info.index_file_start_offset); // #endif mysql_mutex_unlock(&LOCK_index); return 0; err: sql_print_error("truncate log from index failed"); mysql_mutex_unlock(&LOCK_index); return 1; } int MYSQL_BIN_LOG::truncate_files_after(std::string & file_name) { int error = 0; bool found = false; std::vector consensus_log_file_name_vector; std::vector delete_vector; std::vector exist_vector; mysql_mutex_assert_owner(&LOCK_log); get_consensus_log_file_list(consensus_log_file_name_vector); if (file_name == *(consensus_log_file_name_vector.rbegin())) return 0; for (std::vector::iterator iter = consensus_log_file_name_vector.begin(); iter != consensus_log_file_name_vector.end(); iter++) { if (!found) exist_vector.push_back(*iter); else delete_vector.push_back(*iter); if (*iter == file_name) found = TRUE; } // truncate consensus log file index consensus_log_manager.get_log_file_index()->truncate_after(file_name); // modify index file truncate_logs_from_index(exist_vector, file_name); // delete file for (std::vector::iterator iter = delete_vector.begin(); iter != delete_vector.end(); iter++) { if (mysql_file_delete(key_file_binlog, (*iter).c_str(), MYF(0))) { DBUG_ASSERT(0); error = 1; break; } } if (error) sql_print_error("truncate_files_after failed"); return error; } int MYSQL_BIN_LOG::truncate_single_file_by_consensus_index(const char* file_name, uint64 consensus_index) { DBUG_ASSERT(consensus_index != 0); uint64 offset = 0; File file; mysql_mutex_assert_owner(&LOCK_log); if (find_pos_by_consensus_index(file_name, consensus_index, &offset)) { sql_print_error("Failed to find pos by consensus index %llu when truncate ", consensus_index); return -1; } if ((file = mysql_file_open(key_file_binlog, file_name, O_RDWR | O_BINARY, MYF(MY_WME))) < 0) { sql_print_error("Failed to open the binlog file when truncate."); return -1; } if (my_chsize(file, offset, 0, MYF(MY_WME))) { sql_print_error("Failed to resize binlog file when truncate."); mysql_file_close(file, MYF(MY_WME)); return -1; } else { sql_print_information("Truncate binlog file %s, Binlog trimmed to %llu bytes.", file_name, offset); mysql_file_close(file, MYF(MY_WME)); } return 0; } int MYSQL_BIN_LOG::consensus_truncate_log(uint64 consensus_index, Relay_log_info *rli) { std::string file_name; mysql_mutex_lock(&LOCK_sync); // truncate must not cross binlog file. if (find_log_by_consensus_index(consensus_index, file_name) || truncate_files_after(file_name) || truncate_single_file_by_consensus_index(file_name.c_str(), consensus_index)) { sql_print_error("Truncate cannot find consensus index log %llu.", consensus_index); abort(); } else { consensus_log_manager.set_sync_index(consensus_index - 1); consensus_log_manager.set_current_index(consensus_index); // move this to truncate_single_file_by_consensus_index uint64 offset = 0; if (find_pos_by_consensus_index(file_name.c_str(), consensus_index, &offset)) { sql_print_error("Failed to find pos by consensus index %llu when truncate ", consensus_index); goto err; } if (m_binlog_file->truncate(offset)) { sql_print_error("Failed to truncate the binlog file.", offset); goto err; } atomic_binlog_end_pos = offset; if (rli) { rli->notify_relay_log_truncated(); } } mysql_mutex_unlock(&LOCK_sync); return 0; err: mysql_mutex_unlock(&LOCK_sync); return 1; } int MYSQL_BIN_LOG::consensus_get_log_position(uint64 consensus_index, char* log_name, uint64 *pos) { std::string file_name; int ret = 0; // use another io_cache , so do not need lock LOCK_log if (find_log_by_consensus_index(consensus_index, file_name) || find_pos_by_consensus_index(file_name.c_str(), consensus_index, pos)) { sql_print_error("Get log position cannot find consensus index log %llu.", consensus_index); ret = 1; } strncpy(log_name, file_name.c_str(), FN_REFLEN); return ret; } int MYSQL_BIN_LOG::consensus_get_log_entry(uint64 consensus_index, uint64 *consensus_term, std::string& log_content, bool *outer, uint *flag, uint64 *checksum, bool need_content) { std::string file_name; int ret = 0; // use another io_cache , so do not need lock LOCK_log if (find_log_by_consensus_index(consensus_index, file_name) || read_log_by_consensus_index(file_name.c_str(), consensus_index, consensus_term, log_content, outer, flag, checksum, need_content)) { sql_print_error("Get log entry cannot find consensus index log %llu.", consensus_index); ret = 1; } return ret; } int MYSQL_BIN_LOG::consensus_prefetch_log_entries(THD *thd, uint64 channel_id, uint64 consensus_index) { std::string file_name; int ret = 0; // use another io_cache , so do not need lock LOCK_log if (find_log_by_consensus_index(consensus_index, file_name) || prefetch_logs_of_file(thd, channel_id, file_name.c_str(), consensus_index)) { sql_print_error("Prefetch cannot find consensus index log %llu.", consensus_index); ret = 1; } return ret; } static void store_gtid_for_xpaxos(const char *buf, Relay_log_info *rli) { Log_event_type event_type = (Log_event_type)buf[EVENT_TYPE_OFFSET]; Format_description_log_event fd_ev; fd_ev.footer()->checksum_alg = static_cast(binlog_checksum_options); if (event_type == binary_log::GCN_LOG_EVENT) { buf = buf + Gcn_log_event::get_event_length(fd_ev.footer()->checksum_alg); event_type = (Log_event_type)buf[EVENT_TYPE_OFFSET]; } if (event_type == binary_log::GTID_LOG_EVENT) { Gtid_log_event gtid_ev(buf, &fd_ev); rli->get_sid_lock()->wrlock(); rli->add_logged_gtid(rli->get_sid_map()->add_sid(*gtid_ev.get_sid()), gtid_ev.get_gno()); rli->get_sid_lock()->unlock(); } } static void revise_one_event(uchar *event_ptr, size_t event_len, size_t log_pos) { /* PolarDB-X Engine: fix timestamp for non-leader local event */ if (consensus_log_manager.get_status() != Consensus_Log_System_Status::BINLOG_WORKING) { uint32 tt = uint4korr(event_ptr); if (!Log_event::is_local_event_type(static_cast(event_ptr[EVENT_TYPE_OFFSET]))) { // cache the last timestamp consensus_log_manager.set_event_timestamp(tt); } else { // set control event timestamp to the lastest non-control one uint32 last_tt = consensus_log_manager.get_event_timestamp(); if (last_tt) int4store(event_ptr, last_tt); } } /* PolarDB-X Engine: reset each binlog event's log_pos (end_log_pos) to the correct value */ int4store(event_ptr + LOG_POS_OFFSET, log_pos); /* PolarDB-X Engine: recalculate the checksum if necessary */ if (binlog_checksum_options != binary_log::BINLOG_CHECKSUM_ALG_OFF) { ha_checksum crc= checksum_crc32(0L, NULL, 0); crc= checksum_crc32(crc, event_ptr, event_len - BINLOG_CHECKSUM_LEN); int4store(event_ptr + event_len - BINLOG_CHECKSUM_LEN, crc); } } static int init_consensus_event_timestamp(uchar *buf, size_t len) { if (opt_consensuslog_revise && consensus_log_manager.get_status() != Consensus_Log_System_Status::BINLOG_WORKING && consensus_log_manager.get_event_timestamp() == 0) { /* get event timestamp from current buffer */ size_t event_len = 0; uchar *header = buf; while ((size_t)(header - buf) < len) { event_len = uint4korr(header + EVENT_LEN_OFFSET); if (!Log_event::is_local_event_type(static_cast(header[EVENT_TYPE_OFFSET]))) { consensus_log_manager.set_event_timestamp(uint4korr(header)); break; } header += event_len; } if ((size_t)(header - buf) > len) { sql_print_error("Found invalid event during init_consensus_event_timestamp."); return 1; // report error to abort } } return 0; } static int revise_entry_and_write(MYSQL_BIN_LOG::Binlog_ofile *binlog_file, uchar *buf, size_t len) { /* revise end_pos & timestamp in consensus log */ if (!opt_consensuslog_revise) return binlog_file->write(buf, len); size_t event_len = 0; uchar *header = buf; size_t offset = binlog_file->position(); while ((size_t)(header - buf) < len) { event_len = uint4korr(header + EVENT_LEN_OFFSET); revise_one_event(header, event_len, offset + header - buf + event_len); header += event_len; } if ((size_t)(header - buf) != len) { sql_print_error("Found invalid event during revise_entry_and_write."); return 1; // report error to abort } return binlog_file->write(buf, len); } int MYSQL_BIN_LOG::append_consensus_log(ConsensusLogEntry &log, uint64* index, bool* rotate_var, Relay_log_info *rli, bool with_check) { int error = 0; uint64 bytes = 0; uchar *real_buffer = NULL; size_t real_buf_size = 0; my_off_t end_pos = 0; mysql_mutex_lock(&LOCK_log); if (with_check) { mysql_mutex_lock(consensus_log_manager.get_term_lock()); if (consensus_log->getCurrentTerm() != log.term) { mysql_mutex_unlock(consensus_log_manager.get_term_lock()); mysql_mutex_unlock(&LOCK_log); /* set index to 0 to mark it fail */ *index = 0; /* return 0 do not let it abort */ return 0; } mysql_mutex_unlock(consensus_log_manager.get_term_lock()); } // cluster info should consider real binlog format size if (log.outer) { Consensus_cluster_info_log_event ev(log.buf_size, (char*)(log.buffer)); ev.common_footer->checksum_alg = static_cast (binlog_checksum_options); error = ev.write(consensus_log_manager.get_log_file()); real_buf_size = consensus_log_manager.serialize_cache(&real_buffer); } else if (log.flag & Consensus_log_event_flag::FLAG_BLOB) { if (log.flag & Consensus_log_event_flag::FLAG_BLOB_START) { /* some log may truncate after leader crash recovery, so if FLAG_BLOB_START, clear the cache */ reinit_io_cache(consensus_log_manager.get_cache(), WRITE_CACHE, 0, 0, 1); } /* save empty log as a replace to binlog */ std::string empty_log = consensus_log_manager.get_empty_log(); real_buffer = (uchar*)my_malloc(key_memory_thd_main_mem_root, empty_log.length(), MYF(MY_WME)); memcpy(real_buffer, empty_log.data(), empty_log.length()); real_buf_size = empty_log.length(); /* save real data to cache */ sql_print_information("Large event: cache the current log, size(%lu).", log.buf_size); my_b_write(consensus_log_manager.get_cache(), log.buffer, log.buf_size); } else if (log.flag & Consensus_log_event_flag::FLAG_BLOB_END) { DBUG_EXECUTE_IF("crash_during_large_event_receive", {DBUG_SUICIDE();}); DBUG_EXECUTE_IF("crash_during_large_event_receive_slow", {sleep(2); DBUG_SUICIDE();}); /* save real data to cache, now cache has the integrated blob event */ sql_print_information("Large event: cache the current log, size(%lu).", log.buf_size); my_b_write(consensus_log_manager.get_cache(), log.buffer, log.buf_size); real_buf_size = consensus_log_manager.serialize_cache(&real_buffer); sql_print_information("Large event: load the whole log, size(%lu).", real_buf_size); } else { real_buffer = log.buffer; real_buf_size = log.buf_size; } *index = consensus_log_manager.get_current_index(); if (*index != log.index && log.index != 0) // leader write empty log entry with index 0 { sql_print_error("Consensus Index Mismatch, system current index is %llu, " "but the log index is %llu.", *index, log.index); abort(); } consensus_log_manager.get_fifo_cache_manager()->add_log_to_cache(log.term, *index, log.buf_size, log.buffer, log.outer, log.flag, log.checksum); if (!error) error = init_consensus_event_timestamp(real_buffer, real_buf_size); if (!error) error = write_consensus_log(log.flag, log.term, real_buf_size, log.checksum); if (!error) error = revise_entry_and_write(m_binlog_file, real_buffer, real_buf_size); if (!error) { store_gtid_for_xpaxos((const char*)real_buffer, rli); } bytes += real_buf_size; if (log.outer || (log.flag & (Consensus_log_event_flag::FLAG_BLOB | Consensus_log_event_flag::FLAG_BLOB_END))) { my_free(real_buffer); } bytes_written += bytes; if (!error) error = flush_and_sync(FALSE); if (error) goto err; consensus_log_manager.set_sync_index_if_greater(*index); // signal_update(); end_pos = my_b_safe_tell(m_binlog_file->get_io_cache()); update_binlog_end_pos(m_binlog_file->get_binlog_name(), end_pos); if (end_pos >= (my_off_t)max_size) *rotate_var = true; #ifdef NORMANDY_CLUSTER if (opt_cluster_log_type_instance) { consensus_ptr->updateAppliedIndex(*index); replica_read_manager.update_lsn(*index); } #endif err: if (error) { char err_buff[MYSQL_ERRMSG_SIZE] = "Append log error Hence aborting the server."; exec_binlog_error_action_abort(err_buff); } mysql_mutex_unlock(&LOCK_log); return error; } int MYSQL_BIN_LOG::append_multi_consensus_logs(std::vector &logs, uint64* max_index, bool* rotate_var, Relay_log_info *rli) { int error = 0; my_off_t end_pos = 0; mysql_mutex_lock(&LOCK_log); for (auto iter = logs.begin(); iter != logs.end(); iter++) { uint64 bytes = 0; size_t real_buf_size = 0; uchar *real_buffer = NULL; *max_index = consensus_log_manager.get_current_index(); if (*max_index != iter->index) { sql_print_error("Consensus Index Mismatch, system current index is %llu, " "but the log index is %llu.", *max_index, iter->index); abort(); } consensus_log_manager.get_fifo_cache_manager()->add_log_to_cache(iter->term, *max_index, iter->buf_size, iter->buffer, iter->outer, iter->flag, iter->checksum); // cluster info should consider real binlog format size if (iter->outer) { Consensus_cluster_info_log_event ev(iter->buf_size, (char*)(iter->buffer)); ev.common_footer->checksum_alg = static_cast (binlog_checksum_options); error = ev.write(consensus_log_manager.get_log_file()); real_buf_size = consensus_log_manager.serialize_cache(&real_buffer); } else if (iter->flag & Consensus_log_event_flag::FLAG_BLOB) { if (iter->flag & Consensus_log_event_flag::FLAG_BLOB_START) { /* some log may truncate after leader crash recovery, so if FLAG_BLOB_START, clear the cache */ reinit_io_cache(consensus_log_manager.get_cache(), WRITE_CACHE, 0, 0, 1); } /* save empty log as a replace to binlog */ std::string empty_log = consensus_log_manager.get_empty_log(); real_buffer = (uchar*)my_malloc(key_memory_thd_main_mem_root, empty_log.length(), MYF(MY_WME)); memcpy(real_buffer, empty_log.data(), empty_log.length()); real_buf_size = empty_log.length(); /* save real data to cache */ sql_print_information("Large event: cache the current log, size(%lu).", iter->buf_size); my_b_write(consensus_log_manager.get_cache(), iter->buffer, iter->buf_size); } else if (iter->flag & Consensus_log_event_flag::FLAG_BLOB_END) { DBUG_EXECUTE_IF("crash_during_large_event_receive", {DBUG_SUICIDE();}); DBUG_EXECUTE_IF("crash_during_large_event_receive_slow", {sleep(2); DBUG_SUICIDE();}); /* save real data to cache, now cache has the integrated blob event */ sql_print_information("Large event: cache the current log, size(%lu).", iter->buf_size); my_b_write(consensus_log_manager.get_cache(), iter->buffer, iter->buf_size); /* read the total cache */ real_buf_size = consensus_log_manager.serialize_cache(&real_buffer); sql_print_information("Large event: load the whole log, size(%lu).", real_buf_size); } else { real_buffer = iter->buffer; real_buf_size = iter->buf_size; } if (!error) error = init_consensus_event_timestamp(real_buffer, real_buf_size); if (!error) error = write_consensus_log(iter->flag, iter->term, real_buf_size, iter->checksum); if (!error) error = revise_entry_and_write(m_binlog_file, real_buffer, real_buf_size); if (!error && rli != NULL) { store_gtid_for_xpaxos((const char*)real_buffer, rli); } bytes = real_buf_size; if (iter->outer || (iter->flag & (Consensus_log_event_flag::FLAG_BLOB | Consensus_log_event_flag::FLAG_BLOB_END))) { my_free(real_buffer); } if (error) break; bytes_written += bytes; } if (!error) error = flush_and_sync(FALSE); if (error) goto err; consensus_log_manager.set_sync_index_if_greater(*max_index); // signal_update(); end_pos = my_b_safe_tell(m_binlog_file->get_io_cache()); update_binlog_end_pos(m_binlog_file->get_binlog_name(), end_pos); if (end_pos >= (my_off_t)max_size) *rotate_var = true; #ifdef NORMANDY_CLUSTER if (opt_cluster_log_type_instance) { consensus_ptr->updateAppliedIndex(*max_index); replica_read_manager.update_lsn(*max_index); } #endif err: if (error) { char err_buff[MYSQL_ERRMSG_SIZE] = "Append multi logs error Hence aborting the server."; exec_binlog_error_action_abort(err_buff); } mysql_mutex_unlock(&LOCK_log); return error; } int MYSQL_BIN_LOG::rotate_consensus_log() { int error = 0; bool need_real_rotate = false; mysql_mutex_lock(&LOCK_rotate); need_real_rotate = !rotating; if (!rotating) rotating = TRUE; mysql_mutex_unlock(&LOCK_rotate); if (need_real_rotate) { mysql_mutex_lock(&LOCK_log); bool check_purge = false; error = rotate(false, &check_purge); mysql_mutex_unlock(&LOCK_log); mysql_mutex_lock(&LOCK_rotate); rotating = FALSE; mysql_mutex_unlock(&LOCK_rotate); } return error; } void MYSQL_BIN_LOG::consensus_before_commit(THD *thd) { #ifdef NORMANDY_CLUSTER if (opt_initialize) return; if (thd->commit_error != THD::CE_NONE || ((consensus_ptr->waitCommitIndexUpdate(thd->consensus_index - 1, thd->consensus_term) < thd->consensus_index) && (thd->consensus_index > consensus_log_manager.get_consensus_info()->get_start_apply_index())) ) { // TODO: need write apply index to consensus info table??? sql_print_warning("Failed to commit ,because previous error or shutdown or leadership changed, system apply index:%llu, thd consensus term:%llu, consensus index:%llu", consensus_log_manager.get_consensus_info()->get_start_apply_index(), thd->consensus_term, thd->consensus_index); if (thd->commit_error == THD::CE_NONE) { sql_print_warning( "'There are some dirty binlogs, restert to deal with them"); flush_error_log_messages(); abort(); } thd->mark_transaction_to_rollback(true); thd->commit_error = THD::CE_COMMIT_ERROR; thd->get_transaction()->m_flags.commit_low = false; // define error code // if code is not shutdown or log too large, it must be leadership change if (consensus_ptr->isShutdown()) thd->consensus_error = THD::CSS_SHUTDOWN; if (thd->consensus_error == THD::CSS_NONE) thd->consensus_error = THD::CSS_LEADERSHIP_CHANGE; if (thd->consensus_error == THD::CSS_LEADERSHIP_CHANGE) my_error(ER_CONSENSUS_LEADERSHIP_CHANGE, MYF(0)); else if (thd->consensus_error == THD::CSS_LOG_TOO_LARGE) my_error(ER_CONSENSUS_LOG_TOO_LARGE, MYF(0)); else if (thd->consensus_error == THD::CSS_SHUTDOWN) my_error(ER_SERVER_SHUTDOWN, MYF(0)); else my_error(ER_CONSENSUS_OTHER_ERROR, MYF(0)); } #endif } /** MYSQLD server recovers from last crashed binlog. @param log IO_CACHE of the crashed binlog. @param fdle Format_description_log_event of the crashed binlog. @param valid_pos The position of the last valid transaction or event(non-transaction) of the crashed binlog. @retval 0 ok @retval 1 error */ int MYSQL_BIN_LOG::recover_intergrity_for_normandy(Binlog_file_reader *binlog_file_reader, Format_description_log_event *fdle, my_off_t *valid_pos) { Log_event *ev; uint64 current_index = 0, valid_index = 0; uint64 current_term = 0; uint64 current_length = 0; uint current_flag = 0; uint64 current_crc32 = 0; // HASH xids; // MEM_ROOT mem_root; MEM_ROOT mem_root(key_memory_binlog_recover_exec, MEM_ROOT_BLOCK_SIZE); memroot_unordered_map xids(&mem_root); Consensus_cluster_info_log_event *rci_ev = NULL; uint64 start_pos = BIN_LOG_HEADER_SIZE; uint64 end_pos = start_pos; uint64 recover_term = 0; // mark the last term that this node as leader write log std::string log_content; ulonglong current_commit_gcn = MYSQL_GCN_NULL; /* The flag is used for handling the case that a transaction is partially written to the binlog. */ bool __attribute__((unused)) in_transaction= FALSE; bool begin_consensus = FALSE; std::vector blob_index_list; std::vector blob_term_list; std::vector blob_flag_list; std::vector blob_crc32_list; /*collect gtid and start pos for recovery apply binlog*/ Gtid gtid; my_off_t ev_start_pos = 0; gtid.clear(); if (!fdle->is_valid() /*|| my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE / 3, 0, sizeof(my_xid), 0, 0, MYF(0), key_memory_binlog_recover_exec)*/) goto err1; // init_alloc_root(key_memory_binlog_recover_exec, // &mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE); /* Gtid->contain need global_sid_lock->wrlock*/ global_sid_lock->wrlock(); while ((ev = binlog_file_reader->read_event_object()) != NULL && ev->is_valid()) { if (ev->get_type_code() == binary_log::CONSENSUS_LOG_EVENT) { Consensus_log_event * rev = (Consensus_log_event*)ev; if (current_index > rev->get_index()) { sql_print_error("ConsensusLogManager::recover_intergrity_for_normandy error, Log index out of order"); abort(); } if (end_pos < start_pos) { sql_print_error("ConsensusLogManager::recover_intergrity_for_normandy error, Log struct broken"); abort(); } current_index = rev->get_index(); current_term = rev->get_term(); current_length = rev->get_length(); current_flag = rev->get_flag(); current_crc32 = rev->get_reserve(); end_pos = start_pos = my_b_tell(binlog_file_reader->get_io_cache()); begin_consensus = TRUE; ev_start_pos = binlog_file_reader->event_start_pos(); } else if (ev->get_type_code() == binary_log::PREVIOUS_CONSENSUS_INDEX_LOG_EVENT) { Previous_consensus_index_log_event * prev_consensus_event = (Previous_consensus_index_log_event *)ev; current_index = prev_consensus_event->get_index() - 1; valid_index = current_index; } else if (!ev->is_control_event()) { end_pos = my_b_tell(binlog_file_reader->get_io_cache()); if (ev->get_type_code() == binary_log::CONSENSUS_CLUSTER_INFO_EVENT) { rci_ev = (Consensus_cluster_info_log_event*)ev; } else if (ev->get_type_code() == binary_log::QUERY_EVENT) { if (!strcmp(((Query_log_event *)ev)->query, "BEGIN") || !strncmp(((Query_log_event *)ev)->query, "XA START", strlen("XA START"))) { in_transaction = TRUE; } if (!strcmp(((Query_log_event *)ev)->query, "COMMIT") || !strncmp(((Query_log_event *)ev)->query, "XA COMMIT", strlen("XA COMMIT"))) { #ifndef DEBUG_OFF if(!strcmp(((Query_log_event *)ev)->query, "COMMIT")) DBUG_ASSERT(in_transaction == TRUE); #endif in_transaction = FALSE; current_commit_gcn = MYSQL_GCN_NULL; } } if (ev->get_type_code() == binary_log::GCN_LOG_EVENT) { Gcn_log_event *gcn_ev = (Gcn_log_event *)ev; if (gcn_ev->have_commit_gcn()) current_commit_gcn = gcn_ev->get_commit_gcn(); } if (ev->get_type_code() == binary_log::XA_PREPARE_LOG_EVENT) { in_transaction = FALSE; current_commit_gcn = MYSQL_GCN_NULL; } /* * is_atomic_ddl_event(ev) should been collect and * commit or rollback. * * DDL log operation will fail due to * PolarX move commit or rollback after open_binlog * * We rollback is_atomic_ddl_event(ev) and executing ddl * again in recovery apply binlog. */ if (ev->get_type_code() == binary_log::XID_EVENT) { DBUG_ASSERT(in_transaction == TRUE); in_transaction= FALSE; if (ev->common_header->unmasked_server_id == server_id) { Xid_log_event *xev = (Xid_log_event *)ev; if (recover_term == 0 || current_term > recover_term) { consensus_log_manager.get_recovery_manager()->clear_xid_gcn_and_gtid_xid_map(); recover_term = current_term; } consensus_log_manager.get_recovery_manager() ->add_trx_to_total_commit_map(current_index, xev->xid, current_commit_gcn, gtid); current_commit_gcn = MYSQL_GCN_NULL; } } if (ev->get_type_code() == binary_log::GTID_LOG_EVENT) { Gtid_log_event *gtid_ev = dynamic_cast(ev); gtid.set(gtid_ev->get_sidno(false), gtid_ev->get_gno()); } if (!in_transaction && !is_gtid_event(ev) && !is_gcn_event(ev)) { Binlog_recovery::instance()->add_gtid(gtid, ev_start_pos, current_index); gtid.clear(); } /* find a integrated consensus log */ if (begin_consensus && end_pos > start_pos && end_pos - start_pos == current_length) { if (current_flag & Consensus_log_event_flag::FLAG_BLOB) { blob_index_list.push_back(current_index); blob_term_list.push_back(current_term); blob_flag_list.push_back(current_flag); blob_crc32_list.push_back(current_crc32); } else if (current_flag & Consensus_log_event_flag::FLAG_BLOB_END) { blob_index_list.push_back(current_index); blob_term_list.push_back(current_term); blob_flag_list.push_back(current_flag); blob_crc32_list.push_back(current_crc32); uint64 split_len = opt_consensus_large_event_split_size; uint64 blob_start_pos = start_pos, blob_end_pos = start_pos + split_len; uint64 save_position = binlog_file_reader->position(); for (size_t i=0; iadd_log_to_cache(blob_term_list[i], blob_index_list[i], log_content.size(), (uchar*)log_content.c_str(), (rci_ev != NULL), blob_flag_list[i], blob_crc32_list[i]); blob_start_pos = blob_end_pos; blob_end_pos = blob_end_pos + split_len > end_pos? end_pos: blob_end_pos + split_len; } blob_index_list.clear(); blob_term_list.clear(); blob_flag_list.clear(); blob_crc32_list.clear(); begin_consensus = FALSE; valid_index = current_index; /* fetch_binlog_by_offset will modify the position of binlog_file_reader. */ binlog_file_reader->seek(save_position); } else { uint64 save_position = binlog_file_reader->position(); // copy log to buffer // fetch_binlog_by_offset(log, start_pos, end_pos, rci_ev, log_content); fetch_binlog_by_offset(*binlog_file_reader, start_pos, end_pos, rci_ev, log_content); consensus_log_manager.get_fifo_cache_manager()->add_log_to_cache(current_term, current_index, log_content.size(), (uchar*)log_content.c_str(), (rci_ev != NULL), current_flag, current_crc32); begin_consensus = FALSE; valid_index = current_index; /* fetch_binlog_by_offset will modify the position of binlog_file_reader. */ binlog_file_reader->seek(save_position); } rci_ev = NULL; } } /* do not rely on in_transaction because of large trx */ // if (!log->error && !begin_consensus && !is_gtid_event(ev)) if (!binlog_file_reader->has_fatal_error() && !begin_consensus && !is_gtid_event(ev) && !is_gcn_event(ev)) *valid_pos = my_b_tell(binlog_file_reader->get_io_cache()); delete ev; } if (opt_print_gtid_info_during_recovery) { log_gtid_set("[GTID INFO] Reading from last binlog : ", Binlog_recovery::instance()->get_gtids_in_last_file()); } global_sid_lock->unlock(); if (start_pos < *valid_pos && end_pos > *valid_pos) end_pos = *valid_pos; // recover current/sync index // // if the last log is not integrated if (begin_consensus) { sql_print_warning("last consensus log is not intergrated, sync index should set to %llu instead of %llu", valid_index, current_index); } consensus_log_manager.set_cache_index(valid_index); consensus_log_manager.set_sync_index(valid_index); consensus_log_manager.set_current_index(consensus_log_manager.get_sync_index() + 1); consensus_log_manager.set_enable_rotate(!(current_flag & Consensus_log_event_flag::FLAG_LARGE_TRX)); /* Call ha_recover if and only if there is a registered engine that does 2PC, otherwise in DBUG builds calling ha_recover directly will result in an assert. (Production builds would be safe since ha_recover returns right away if total_ha_2pc <= opt_log_bin.) */ if (consensus_log_manager.get_recovery_manager()->collect_commit_trx_to_hash(xids, &mem_root)) goto err2; if (total_ha_2pc > 1 && ha_recover(&xids)) goto err2; // free_root(&mem_root, MYF(0)); // my_hash_free(&xids); sql_print_warning("Recover consensus index is %llu", current_index); return 0; err2: // free_root(&mem_root, MYF(0)); // my_hash_free(&xids); err1: sql_print_error("Crash recovery failed. Either correct the problem " "(if it's, for example, out of memory error) and restart, " "or delete (or rename) binary log and start mysqld with " "--tc-heuristic-recover={commit|rollback}"); return 1; } void binlog_commit_pos_watcher(bool *is_running) { std::string log_name; uint64_t commitIndex = 0, pos = 0; uint retry = 0; Format_description_log_event fd_ev, *fd_ev_p = &fd_ev; Log_event *ev = NULL; Consensus_log_event *consensus_log_ev = NULL; // const char *errmsg = NULL; bool skip = false; // skip flag if flush log while (*is_running) { /* Note that you cannot flush log if it still has pending xid, which means commit position locates in last binlog file. */ log_name = consensus_log_manager.get_log_file_index()->get_last_log_file_name(); Binlog_file_reader binlog_file_reader(opt_master_verify_checksum); if (binlog_file_reader.open(log_name.c_str())) { sql_print_error("Thread binlog_commit_pos_watcher fails to open the binlog file %s", log_name.c_str()); goto err; } skip = false; while (*is_running && !skip && (ev = binlog_file_reader.read_event_object()) != NULL) { switch (ev->get_type_code()) { case binary_log::PREVIOUS_CONSENSUS_INDEX_LOG_EVENT: { Previous_consensus_index_log_event *consensus_prev_ev = (Previous_consensus_index_log_event*)ev; uint64_t prev_index = consensus_prev_ev->get_index() - 1; /* 1. open a new binlog file 2. reopen the same binlog file because truncateLog happens */ if (prev_index >= consensus_log_manager.get_commit_pos_index()) { pos = binlog_file_reader.position(); consensus_log_manager.update_commit_pos(log_name, pos, prev_index); retry = 0; // reset retry times after a success update_commit_pos } // push to a commitIndex larger than the reported one while(*is_running && ((commitIndex = consensus_ptr->getCommitIndex()) <= consensus_log_manager.get_commit_pos_index())) { my_sleep(opt_commit_pos_watcher_interval); // check whether 'flush log' happens if (log_name != consensus_log_manager.get_log_file_index()->get_last_log_file_name()) { skip = true; break; } } break; } case binary_log::CONSENSUS_LOG_EVENT: // PolarDB-X Engine makes sure the corresponding logEntry exists if index is commitIndex consensus_log_ev = (Consensus_log_event*)ev; if (commitIndex <= consensus_log_ev->get_index()) { if (commitIndex < consensus_log_ev->get_index()) { sql_print_error("Thread binlog_commit_pos_watcher reports a unsafe commit position."); // for defence DBUG_ASSERT(0); // abort on debug mode } pos = binlog_file_reader.position() + consensus_log_ev->get_length(); consensus_log_manager.update_commit_pos(log_name, pos, consensus_log_ev->get_index()); retry = 0; // reset retry times after a success update_commit_pos } // push to a commitIndex larger than the reported one while(*is_running && ((commitIndex = consensus_ptr->getCommitIndex()) <= consensus_log_manager.get_commit_pos_index())) { my_sleep(opt_commit_pos_watcher_interval); // check whether 'flush log' happens if (log_name != consensus_log_manager.get_log_file_index()->get_last_log_file_name()) { skip = true; break; } } break; case binary_log::FORMAT_DESCRIPTION_EVENT: if (fd_ev_p != &fd_ev) delete fd_ev_p; fd_ev_p = (Format_description_log_event*)ev; break; default: break; } if (ev != NULL && ev != fd_ev_p) delete ev, ev = NULL; } // shutdown or EOF err: if (fd_ev_p != &fd_ev) { delete fd_ev_p; fd_ev_p = &fd_ev; } /* It is safe in truncate_log case and the error is not READ_EOF */ if (binlog_file_reader.has_fatal_error()) { // avoid sleep too long (maximum 60 * intervals) and reduce log output if (retry < 60) { retry++; sql_print_warning("Fail to find commit position. " "It could be caused by a binlog truncation or a failed read_log_event. " "Just wait and reopen the file."); } for (uint c = 0; c < retry && *is_running; ++c) my_sleep(opt_commit_pos_watcher_interval); } } } bool MYSQL_BIN_LOG::write_consensus_log(uint flag, uint64 term, uint64 length, uint64 checksum) { DBUG_ENTER("MYSQL_BIN_LOG::write_consensus_log"); Consensus_log_event rev(flag, term, consensus_log_manager.get_current_index(), length); rev.set_reserve(checksum); if (!(rev.get_flag() & Consensus_log_event_flag::FLAG_LARGE_TRX)) alisql_server->setLastNonCommitDepIndex(rev.get_index()); if (opt_consensuslog_revise && is_xpaxos_log && is_relay_log) rev.consensus_extra_time = consensus_log_manager.get_event_timestamp(); if (write_event_to_binlog(&rev)) return true; if (!opt_initialize) consensus_log_manager.incr_current_index(); DBUG_RETURN(false); } /* reset the previous gtids logged of the given binlog file of the commit index */ bool MYSQL_BIN_LOG::reset_previous_gtids_logged(uint64 commit_index) { std::string filename; Gtid_set *previous_gtids_logged= const_cast(gtid_state->get_previous_gtids_logged()); Gtid_set *executed_gtids= const_cast(gtid_state->get_executed_gtids()); Gtid_set previous_gtid_set(executed_gtids->get_sid_map()); DBUG_ASSERT(is_xpaxos_log && !is_relay_log); /* read the previous logged gtid from the relay log which contain the commit_index */ bool ret = false; if (find_log_by_consensus_index(commit_index, filename)) { sql_print_error("Can't find the log has the commit index %llu", commit_index); return true; } global_sid_lock->wrlock(); if (read_gtids_from_binlog(filename.c_str(), NULL, &previous_gtid_set, NULL, executed_gtids->get_sid_map(), opt_master_verify_checksum, false, 0) == GOT_PREVIOUS_GTIDS) { previous_gtids_logged->add_gtid_set(&previous_gtid_set); } else { ret = true; } global_sid_lock->unlock(); return ret; } /** Open a already existed binlog file for normandy - Open the log file and the index file. - When calling this when the file is in use, you must have a locks on LOCK_log and LOCK_index. @retval 0 ok @retval 1 error */ bool MYSQL_BIN_LOG::open_exist_binlog(const char *log_name, const char *new_name, ulong max_size_arg, bool null_created_arg __attribute__((unused)), bool need_lock_index, bool need_sid_lock, Format_description_log_event *extra_description_event __attribute__((unused))) { LOG_INFO log_info; int error = 1; // lock_index must be acquired *before* sid_lock. DBUG_ASSERT(need_sid_lock || !need_lock_index); DBUG_ENTER("MYSQL_BIN_LOG::open_binlog_for_normandy(const char *, ...)"); DBUG_PRINT("enter", ("base filename: %s", log_name)); mysql_mutex_assert_owner(get_log_lock()); // find last log name according to the binlog index if (!my_b_inited(&index_file)) { cleanup(); return 1; } if ((error = find_log_pos(&log_info, NullS, true))) { if (error != LOG_INFO_EOF) { sql_print_error("find_log_pos() failed (error: %d)", error); return error; } } if (error == 0) { do { strmake(log_file_name, log_info.log_file_name, sizeof(log_file_name) - 1); } while (!(error = find_next_log(&log_info, true/*need_lock_index=true*/))); } if (error != LOG_INFO_EOF) { sql_print_error("find_log_pos() failed (error: %d)", error); return error; } // #ifdef HAVE_REPLICATION DBUG_EXECUTE_IF("crash_create_non_critical_before_update_index", DBUG_SUICIDE();); // #endif write_error = 0; /* open the main log file */ if (open_for_normandy( #ifdef HAVE_PSI_INTERFACE m_key_file_log, #endif log_name, new_name)) { DBUG_RETURN(1); /* all warnings issued */ } max_size = max_size_arg; /* This must be before goto err. */ #ifndef DBUG_OFF binary_log_debug::debug_pretend_version_50034_in_binlog = DBUG_EVALUATE_IF("pretend_version_50034_in_binlog", true, false); #endif if (is_relay_log) { /* relay-log */ if (relay_log_checksum_alg == binary_log::BINLOG_CHECKSUM_ALG_UNDEF) { #ifdef NORMANDY_CLUSTER /* PolarDB-X Engine do not send fd event to Follower, so just use binlog_checksum_options. The binlog_checksum_options of Leader and Follower must be set to a same value. */ relay_log_checksum_alg= static_cast (binlog_checksum_options); #else /* inherit master's A descriptor if one has been received */ if (opt_slave_sql_verify_checksum == 0) /* otherwise use slave's local preference of RL events verification */ relay_log_checksum_alg = binary_log::BINLOG_CHECKSUM_ALG_OFF; else relay_log_checksum_alg = static_cast (binlog_checksum_options); #endif } } atomic_log_state = LOG_OPENED; /* At every rotate memorize the last transaction counter state to use it as offset at logging the transaction logical timestamps. */ m_dependency_tracker.rotate(); update_binlog_end_pos(); DBUG_RETURN(0); } uint64 MYSQL_BIN_LOG::wait_xid_disappear() { DBUG_EXECUTE_IF("semi_sync_3-way_deadlock", DEBUG_SYNC(current_thd, "before_rotate_binlog");); mysql_mutex_lock(&LOCK_xids); while (get_prep_xids() > 0) { DEBUG_SYNC(current_thd, "before_rotate_binlog_file"); mysql_cond_wait(&m_prep_xids_cond, &LOCK_xids); } uint64 sync_index = consensus_log_manager.get_sync_index(); mysql_mutex_unlock(&LOCK_xids); return sync_index; } /** Write the consensus log cache to the binary log. The cache will be reset as a READ_CACHE to be able to read the contents from it. The data will be post-processed: see class Binlog_event_writer for details. @param cache Events will be read from this IO_CACHE. @param writer Events will be written to this Binlog_event_writer. @retval true IO error. @retval false Success. @see MYSQL_BIN_LOG::write_cache */ bool MYSQL_BIN_LOG::write_buf_to_log_file(uchar *buffer, size_t buf_size) { // bool ret = my_b_safe_write(&log_file, buffer, buf_size); bool ret = m_binlog_file->write(buffer, buf_size); return ret; } int flush_consensus_log(THD *thd, binlog_cache_data *binlog_cache, Binlog_event_writer *writer, bool &mark_as_rollback, my_off_t &bytes_in_cache) { int error = 0; ulonglong buf_size; uchar *buffer = NULL; uint flag = 0; bool is_large_trx = false; // alloc the buffer error = mysql_bin_log.write_cache(thd, binlog_cache, writer); buf_size = my_b_tell(consensus_log_manager.get_cache()); // determine whether log is too large if (buf_size > opt_consensus_max_log_size) is_large_trx = TRUE; // group update do not support large trx DBUG_EXECUTE_IF("simulate_trx_cache_error", { if (thd->consensus_index != 0) mark_as_rollback = true; }); DBUG_EXECUTE_IF("simulate_trx_cache_error_slow", { if (thd->consensus_index != 0) { consensus_ptr->leaderTransfer(2); sleep(5); mark_as_rollback = true; } }); if (mark_as_rollback || (!opt_consensus_large_trx && is_large_trx)) { sql_print_warning( "Failed to flush log ,because consensus log is too large."); thd->mark_transaction_to_rollback(true); thd->commit_error = THD::CE_COMMIT_ERROR; thd->get_transaction()->m_flags.commit_low = false; thd->consensus_error = THD::CSS_LOG_TOO_LARGE; bytes_in_cache = 0; // clear the cache reinit_io_cache(consensus_log_manager.get_cache(), WRITE_CACHE, 0, 0, 1); // rollback logical clock mysql_bin_log.m_dependency_tracker.step_down(); goto end; } DBUG_EXECUTE_IF("force_large_trx", { is_large_trx = true; }); if (!is_large_trx) { uint64 crc32 = 0; // firstly write consensus log event if (!error) { buf_size = consensus_log_manager.serialize_cache(&buffer); thd->consensus_index = consensus_log_manager.get_current_index(); crc32 = opt_consensus_checksum ? checksum_crc32(0, buffer, buf_size) : 0; error = mysql_bin_log.write_consensus_log(flag, thd->consensus_term, buf_size, crc32); } // secondly write consensus_log_body if (!error) { int ret = consensus_log_manager.get_fifo_cache_manager()->add_log_to_cache( thd->consensus_term, thd->consensus_index, buf_size, buffer, FALSE, flag, crc32, true); error = mysql_bin_log.write_buf_to_log_file(buffer, buf_size); // ret == 1 means fifo do not use the buffer if (ret == 1 && buffer) my_free(buffer); buffer = NULL; /* fifo cache reuse the buffer */ } } else { if (!error) { size_t buffer_maxsize = opt_consensus_max_log_size > opt_consensus_large_event_split_size ? opt_consensus_max_log_size : opt_consensus_large_event_split_size; buffer = (uchar *)my_malloc(key_memory_thd_main_mem_root, buffer_maxsize, MYF(MY_WME)); error = large_trx_flush(thd, buffer, buf_size); } } end: if (buffer) { my_free(buffer); } return error; }