462 lines
15 KiB
C++
462 lines
15 KiB
C++
/*
|
|
Copyright (c) 2012, 2019, Oracle and/or its affiliates. All rights reserved.
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License, version 2.0,
|
|
as published by the Free Software Foundation.
|
|
|
|
This program is also distributed with certain software (including
|
|
but not limited to OpenSSL) that is licensed under separate terms,
|
|
as designated in a particular file or component or in included license
|
|
documentation. The authors of MySQL hereby grant you an additional
|
|
permission to link the program and your derivative works with the
|
|
separately licensed software that they have included with MySQL.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License, version 2.0, for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
*/
|
|
|
|
#include "storage/ndb/plugin/ndb_repl_tab.h"
|
|
|
|
#include <stdio.h>
|
|
|
|
#include "mf_wcomp.h"
|
|
#include "sql/mysqld.h" // system_charset_info
|
|
#include "storage/ndb/plugin/ndb_share.h"
|
|
#include "storage/ndb/plugin/ndb_sleep.h"
|
|
#include "storage/ndb/plugin/ndb_table_guard.h"
|
|
|
|
Ndb_rep_tab_key::Ndb_rep_tab_key(const char *_db, const char *_table_name,
|
|
uint _server_id) {
|
|
uint db_len = (uint)strlen(_db);
|
|
uint tabname_len = (uint)strlen(_table_name);
|
|
assert(DB_MAXLEN < 256); /* Fits in Varchar */
|
|
assert(db_len <= DB_MAXLEN);
|
|
assert(tabname_len <= TABNAME_MAXLEN);
|
|
|
|
memcpy(&db[1], _db, db_len);
|
|
db[0] = db_len;
|
|
|
|
memcpy(&table_name[1], _table_name, tabname_len);
|
|
table_name[0] = tabname_len;
|
|
|
|
server_id = _server_id;
|
|
|
|
null_terminate_strings();
|
|
}
|
|
|
|
void Ndb_rep_tab_key::null_terminate_strings() {
|
|
assert((uint)db[0] <= DB_MAXLEN);
|
|
assert((uint)table_name[0] <= TABNAME_MAXLEN);
|
|
db[db[0] + 1] = '\0';
|
|
table_name[table_name[0] + 1] = '\0';
|
|
}
|
|
|
|
int Ndb_rep_tab_key::attempt_match(const char *keyptr, const uint keylen,
|
|
const char *candidateptr,
|
|
const uint candidatelen,
|
|
const int exactmatchvalue) {
|
|
if (my_strnncoll(system_charset_info, (const uchar *)keyptr, keylen,
|
|
(const uchar *)candidateptr, candidatelen) == 0) {
|
|
/* Exact match */
|
|
return exactmatchvalue;
|
|
} else if (my_wildcmp(system_charset_info, keyptr, keyptr + keylen,
|
|
candidateptr, candidateptr + candidatelen, '\\',
|
|
wild_one, wild_many) == 0) {
|
|
/* Wild match */
|
|
return 0;
|
|
}
|
|
|
|
/* No match */
|
|
return -1;
|
|
}
|
|
|
|
int Ndb_rep_tab_key::get_match_quality(const Ndb_rep_tab_key *key,
|
|
const Ndb_rep_tab_key *candidate_row) {
|
|
/* 0= No match
|
|
1= Loosest match
|
|
8= Best match
|
|
|
|
Actual mapping is :
|
|
db table serverid Quality
|
|
W W W 1
|
|
W W = 2
|
|
W = W 3
|
|
W = = 4
|
|
= W W 5
|
|
= W = 6
|
|
= = W 7
|
|
= = = 8
|
|
*/
|
|
int quality = MIN_MATCH_VAL;
|
|
|
|
int rc;
|
|
if ((rc = attempt_match(&key->db[1], key->db[0], &candidate_row->db[1],
|
|
candidate_row->db[0], EXACT_MATCH_DB)) == -1) {
|
|
/* No match, drop out now */
|
|
return 0;
|
|
}
|
|
quality += rc;
|
|
|
|
if ((rc = attempt_match(&key->table_name[1], key->table_name[0],
|
|
&candidate_row->table_name[1],
|
|
candidate_row->table_name[0],
|
|
EXACT_MATCH_TABLE_NAME)) == -1) {
|
|
/* No match, drop out now */
|
|
return 0;
|
|
}
|
|
quality += rc;
|
|
|
|
if (candidate_row->server_id == key->server_id) {
|
|
/* Exact match */
|
|
quality += EXACT_MATCH_SERVER_ID;
|
|
} else if (candidate_row->server_id != 0) {
|
|
/* No match */
|
|
return 0;
|
|
}
|
|
|
|
return quality;
|
|
}
|
|
|
|
Ndb_rep_tab_row::Ndb_rep_tab_row() : binlog_type(0), cfs_is_null(true) {
|
|
memset(conflict_fn_spec, 0, sizeof(conflict_fn_spec));
|
|
}
|
|
const char *Ndb_rep_tab_reader::ndb_rep_db = "mysql";
|
|
const char *Ndb_rep_tab_reader::ndb_replication_table = "ndb_replication";
|
|
const char *Ndb_rep_tab_reader::nrt_db = "db";
|
|
const char *Ndb_rep_tab_reader::nrt_table_name = "table_name";
|
|
const char *Ndb_rep_tab_reader::nrt_server_id = "server_id";
|
|
const char *Ndb_rep_tab_reader::nrt_binlog_type = "binlog_type";
|
|
const char *Ndb_rep_tab_reader::nrt_conflict_fn = "conflict_fn";
|
|
|
|
Ndb_rep_tab_reader::Ndb_rep_tab_reader()
|
|
: binlog_flags(NBT_DEFAULT), conflict_fn_spec(NULL), warning_msg(NULL) {}
|
|
|
|
int Ndb_rep_tab_reader::check_schema(const NdbDictionary::Table *reptab,
|
|
const char **error_str) {
|
|
DBUG_TRACE;
|
|
*error_str = NULL;
|
|
|
|
const NdbDictionary::Column *col_db, *col_table_name, *col_server_id,
|
|
*col_binlog_type, *col_conflict_fn;
|
|
if (reptab->getNoOfPrimaryKeys() != 3) {
|
|
*error_str = "Wrong number of primary key parts, expected 3";
|
|
return -2;
|
|
}
|
|
col_db = reptab->getColumn(*error_str = nrt_db);
|
|
if (col_db == NULL || !col_db->getPrimaryKey() ||
|
|
col_db->getType() != NdbDictionary::Column::Varbinary)
|
|
return -1;
|
|
col_table_name = reptab->getColumn(*error_str = nrt_table_name);
|
|
if (col_table_name == NULL || !col_table_name->getPrimaryKey() ||
|
|
col_table_name->getType() != NdbDictionary::Column::Varbinary)
|
|
return -1;
|
|
col_server_id = reptab->getColumn(*error_str = nrt_server_id);
|
|
if (col_server_id == NULL || !col_server_id->getPrimaryKey() ||
|
|
col_server_id->getType() != NdbDictionary::Column::Unsigned)
|
|
return -1;
|
|
col_binlog_type = reptab->getColumn(*error_str = nrt_binlog_type);
|
|
if (col_binlog_type == NULL || col_binlog_type->getPrimaryKey() ||
|
|
col_binlog_type->getType() != NdbDictionary::Column::Unsigned)
|
|
return -1;
|
|
col_conflict_fn = reptab->getColumn(*error_str = nrt_conflict_fn);
|
|
if (col_conflict_fn != NULL) {
|
|
if ((col_conflict_fn->getPrimaryKey()) ||
|
|
(col_conflict_fn->getType() != NdbDictionary::Column::Varbinary))
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
int Ndb_rep_tab_reader::scan_candidates(Ndb *ndb,
|
|
const NdbDictionary::Table *reptab,
|
|
const char *db, const char *table_name,
|
|
uint server_id,
|
|
Ndb_rep_tab_row &best_match) {
|
|
uint retries = 100;
|
|
int best_match_quality = 0;
|
|
NdbError ok;
|
|
NdbError ndberror;
|
|
|
|
/* Loop to enable temporary error retries */
|
|
while (true) {
|
|
ndberror = ok; /* reset */
|
|
NdbTransaction *trans = ndb->startTransaction();
|
|
if (trans == NULL) {
|
|
ndberror = ndb->getNdbError();
|
|
|
|
if (ndberror.status == NdbError::TemporaryError) {
|
|
if (retries--) {
|
|
ndb_trans_retry_sleep();
|
|
continue;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
NdbRecAttr *ra_binlog_type = NULL;
|
|
NdbRecAttr *ra_conflict_fn_spec = NULL;
|
|
Ndb_rep_tab_row row;
|
|
bool have_conflict_fn_col = (reptab->getColumn(nrt_conflict_fn) != NULL);
|
|
|
|
/* Define scan op on ndb_replication */
|
|
NdbScanOperation *scanOp = trans->getNdbScanOperation(reptab);
|
|
if (scanOp == NULL) {
|
|
ndberror = trans->getNdbError();
|
|
break;
|
|
}
|
|
|
|
if ((scanOp->readTuples(NdbScanOperation::LM_CommittedRead) != 0) ||
|
|
(scanOp->getValue(nrt_db, (char *)row.key.db) == NULL) ||
|
|
(scanOp->getValue(nrt_table_name, (char *)row.key.table_name) ==
|
|
NULL) ||
|
|
(scanOp->getValue(nrt_server_id, (char *)&row.key.server_id) == NULL) ||
|
|
((ra_binlog_type = scanOp->getValue(
|
|
nrt_binlog_type, (char *)&row.binlog_type)) == NULL) ||
|
|
(have_conflict_fn_col &&
|
|
((ra_conflict_fn_spec = scanOp->getValue(
|
|
nrt_conflict_fn, (char *)row.conflict_fn_spec)) == NULL))) {
|
|
ndberror = scanOp->getNdbError();
|
|
break;
|
|
}
|
|
|
|
if (trans->execute(NdbTransaction::NoCommit,
|
|
NdbOperation::AO_IgnoreError)) {
|
|
ndberror = trans->getNdbError();
|
|
ndb->closeTransaction(trans);
|
|
|
|
if (ndberror.status == NdbError::TemporaryError) {
|
|
if (retries--) {
|
|
ndb_trans_retry_sleep();
|
|
continue;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
|
|
/* Scroll through results, looking for best match */
|
|
DBUG_PRINT("info", ("Searching ndb_replication for %s.%s %u", db,
|
|
table_name, server_id));
|
|
|
|
bool ambiguous_match = false;
|
|
Ndb_rep_tab_key searchkey(db, table_name, server_id);
|
|
int scan_rc;
|
|
while ((scan_rc = scanOp->nextResult(true)) == 0) {
|
|
if (ra_binlog_type->isNULL() == 1) {
|
|
row.binlog_type = NBT_DEFAULT;
|
|
}
|
|
if (ra_conflict_fn_spec) {
|
|
row.set_conflict_fn_spec_null(ra_conflict_fn_spec->isNULL() == 1);
|
|
}
|
|
|
|
/* Compare row to searchkey to get quality of match */
|
|
int match_quality =
|
|
Ndb_rep_tab_key::get_match_quality(&searchkey, &row.key);
|
|
#ifndef DBUG_OFF
|
|
{
|
|
row.null_terminate_strings();
|
|
|
|
DBUG_PRINT("info", ("Candidate : %s.%s %u : %u %s"
|
|
" Match quality : %u.",
|
|
row.key.get_db(), row.key.get_table_name(),
|
|
row.key.server_id, row.binlog_type,
|
|
row.get_conflict_fn_spec(), match_quality));
|
|
}
|
|
#endif
|
|
|
|
if (match_quality > 0) {
|
|
if (match_quality == best_match_quality) {
|
|
ambiguous_match = true;
|
|
/* Ambiguous matches...*/
|
|
snprintf(warning_msg_buffer, sizeof(warning_msg_buffer),
|
|
"Ambiguous matches in %s.%s for %s.%s (%u)."
|
|
"Candidates : %s.%s (%u), %s.%s (%u).",
|
|
ndb_rep_db, ndb_replication_table, db, table_name, server_id,
|
|
&best_match.key.db[1], &best_match.key.table_name[1],
|
|
best_match.key.server_id, &row.key.db[1],
|
|
&row.key.table_name[1], row.key.server_id);
|
|
DBUG_PRINT("info", ("%s", warning_msg_buffer));
|
|
}
|
|
if (match_quality > best_match_quality) {
|
|
/* New best match */
|
|
best_match = row;
|
|
best_match_quality = match_quality;
|
|
ambiguous_match = false;
|
|
|
|
if (best_match_quality == Ndb_rep_tab_key::EXACT_MATCH_QUALITY) {
|
|
/* We're done */
|
|
break;
|
|
}
|
|
}
|
|
} /* if (match_quality > 0) */
|
|
} /* while ((scan_rc= scanOp->nextResult(true)) */
|
|
|
|
if (scan_rc < 0) {
|
|
ndberror = scanOp->getNdbError();
|
|
if (ndberror.status == NdbError::TemporaryError) {
|
|
if (retries--) {
|
|
ndb->closeTransaction(trans);
|
|
ndb_trans_retry_sleep();
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
ndb->closeTransaction(trans);
|
|
|
|
if (ambiguous_match) {
|
|
warning_msg = warning_msg_buffer;
|
|
best_match_quality = -1;
|
|
}
|
|
|
|
break;
|
|
} /* while(true) */
|
|
|
|
if (ndberror.code != 0) {
|
|
snprintf(warning_msg_buffer, sizeof(warning_msg_buffer),
|
|
"Unable to retrieve %s.%s, logging and "
|
|
"conflict resolution may not function "
|
|
"as intended (ndberror %u)",
|
|
ndb_rep_db, ndb_replication_table, ndberror.code);
|
|
warning_msg = warning_msg_buffer;
|
|
best_match_quality = -1;
|
|
}
|
|
|
|
return best_match_quality;
|
|
}
|
|
|
|
int Ndb_rep_tab_reader::lookup(Ndb *ndb,
|
|
/* Keys */
|
|
const char *db, const char *table_name,
|
|
uint server_id) {
|
|
DBUG_TRACE;
|
|
int error = 0;
|
|
NdbError ndberror;
|
|
const char *error_str = "<none>";
|
|
|
|
/* Set results to defaults */
|
|
binlog_flags = NBT_DEFAULT;
|
|
conflict_fn_spec = NULL;
|
|
warning_msg = NULL;
|
|
|
|
ndb->setDatabaseName(ndb_rep_db);
|
|
NdbDictionary::Dictionary *dict = ndb->getDictionary();
|
|
Ndb_table_guard ndbtab_g(dict, ndb_replication_table);
|
|
const NdbDictionary::Table *reptab = ndbtab_g.get_table();
|
|
|
|
do {
|
|
if (reptab == NULL) {
|
|
if (dict->getNdbError().classification == NdbError::SchemaError ||
|
|
dict->getNdbError().code == 4009) {
|
|
DBUG_PRINT("info",
|
|
("No %s.%s table", ndb_rep_db, ndb_replication_table));
|
|
return 0;
|
|
} else {
|
|
error = 0;
|
|
ndberror = dict->getNdbError();
|
|
break;
|
|
}
|
|
}
|
|
|
|
if ((error = check_schema(reptab, &error_str)) != 0) {
|
|
DBUG_PRINT("info", ("check_schema failed : %u, error_str : %s", error,
|
|
error_str));
|
|
break;
|
|
}
|
|
|
|
Ndb_rep_tab_row best_match_row;
|
|
|
|
int best_match_quality =
|
|
scan_candidates(ndb, reptab, db, table_name, server_id, best_match_row);
|
|
|
|
DBUG_PRINT("info", ("Best match at quality : %u", best_match_quality));
|
|
|
|
if (best_match_quality == -1) {
|
|
/* Problem in matching, message already set */
|
|
assert(warning_msg != NULL);
|
|
error = -3;
|
|
break;
|
|
}
|
|
if (best_match_quality == 0) {
|
|
/* No match : Use defaults */
|
|
} else {
|
|
/* Have a matching row, copy out values */
|
|
/* Ensure VARCHARs are usable as strings */
|
|
best_match_row.null_terminate_strings();
|
|
|
|
binlog_flags = (enum Ndb_binlog_type)best_match_row.binlog_type;
|
|
|
|
if (best_match_row.cfs_is_null) {
|
|
DBUG_PRINT("info", ("Conflict FN SPEC is Null"));
|
|
/* No conflict fn spec */
|
|
conflict_fn_spec = NULL;
|
|
} else {
|
|
const char *conflict_fn = best_match_row.get_conflict_fn_spec();
|
|
uint len = (uint)strlen(conflict_fn);
|
|
if ((len + 1) > sizeof(conflict_fn_buffer)) {
|
|
error = -2;
|
|
error_str = "Conflict function specification too long.";
|
|
break;
|
|
}
|
|
memcpy(conflict_fn_buffer, conflict_fn, len);
|
|
conflict_fn_buffer[len] = '\0';
|
|
conflict_fn_spec = conflict_fn_buffer;
|
|
}
|
|
}
|
|
} while (0);
|
|
|
|
/* Error handling */
|
|
if (error == 0) {
|
|
if (ndberror.code != 0) {
|
|
snprintf(warning_msg_buffer, sizeof(warning_msg_buffer),
|
|
"Unable to retrieve %s.%s, logging and "
|
|
"conflict resolution may not function "
|
|
"as intended (ndberror %u)",
|
|
ndb_rep_db, ndb_replication_table, ndberror.code);
|
|
warning_msg = warning_msg_buffer;
|
|
error = -4;
|
|
}
|
|
} else {
|
|
switch (error) {
|
|
case -1:
|
|
snprintf(warning_msg_buffer, sizeof(warning_msg_buffer),
|
|
"Missing or wrong type for column '%s'", error_str);
|
|
break;
|
|
case -2:
|
|
snprintf(warning_msg_buffer, sizeof(warning_msg_buffer), "%s",
|
|
error_str);
|
|
break;
|
|
case -3:
|
|
/* Message already set */
|
|
break;
|
|
default:
|
|
abort();
|
|
}
|
|
warning_msg = warning_msg_buffer;
|
|
error = 0; /* No real error, just use defaults */
|
|
}
|
|
|
|
DBUG_PRINT(
|
|
"info",
|
|
("Rc : %d Retrieved Binlog flags : %u and function spec : %s", error,
|
|
binlog_flags, (conflict_fn_spec != NULL ? conflict_fn_spec : "NULL")));
|
|
|
|
return error;
|
|
}
|
|
|
|
Uint32 Ndb_rep_tab_reader::get_binlog_flags() const { return binlog_flags; }
|
|
|
|
const char *Ndb_rep_tab_reader::get_conflict_fn_spec() const {
|
|
return conflict_fn_spec;
|
|
}
|
|
|
|
const char *Ndb_rep_tab_reader::get_warning_message() const {
|
|
return warning_msg;
|
|
}
|