polardbxengine/storage/ndb/tools/ndb_lib_move_data.cpp

/* Copyright (c) 2016, 2019, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation. The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#include <ndb_global.h>
#include "m_ctype.h"
#include <NdbOut.hpp>
#include <NdbApi.hpp>
#include <NdbTick.h>
#include <NDBT.hpp>
#include <ndb_limits.h>
#include <ndb_lib_move_data.hpp>
#include <ndb_rand.h>
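
/*
 * Error handling convention: function bodies run inside a
 * do { ... } while (0) block.  CHK1 aborts the block when a condition
 * fails (an error has already been recorded), while CHK2 also records
 * the failing source line and an error code/message before aborting.
 */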
#define CHK1(b) \
  if (!(b)) { \
    ret = -1; \
    abort_on_error(); \
    break; \
  }

#define CHK2(b, e) \
  if (!(b)) { \
    set_error_line(__LINE__); \
    set_error_code e; \
    ret = -1; \
    abort_on_error(); \
    break; \
  }
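
/*
 * Usage sketch (illustration only; 'ndb', 'source_table' and
 * 'target_table' are caller-supplied, and retry/backoff per Opts::Tries
 * is up to the caller):
 *
 *   Ndb_move_data md;
 *   if (md.init(source_table, target_table) != 0)
 *     ... report md.get_error() ...
 *   md.set_opts_flags(Ndb_move_data::Opts::MD_ATTRIBUTE_PROMOTION |
 *                     Ndb_move_data::Opts::MD_ATTRIBUTE_DEMOTION);
 *   while (md.move_data(ndb) != 0)
 *   {
 *     if (!md.get_error().is_temporary())
 *       break;               // permanent error, give up
 *     // temporary error: sleep between mindelay and maxdelay, then retry
 *   }
 *   const Ndb_move_data::Stat& stat = md.get_stat();
 *   // stat.rows_moved / rows_total / truncated describe the outcome
 */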
Ndb_move_data::Ndb_move_data()
{
  m_source = 0;
  m_target = 0;
  m_sourceattr = 0;
  m_targetattr = 0;
  m_data = 0;
  m_error_insert = false;
}

Ndb_move_data::~Ndb_move_data()
{
  delete [] m_sourceattr;
  delete [] m_targetattr;
  m_sourceattr = 0;
  m_targetattr = 0;
  release_data();
}

int
Ndb_move_data::init(const NdbDictionary::Table* source,
                    const NdbDictionary::Table* target)
{
  int ret = 0;
  do
  {
    CHK2(source != 0, (Error::InvalidSource, "null source table pointer"));
    CHK2(source->getObjectStatus() == NdbDictionary::Object::Retrieved,
         (Error::InvalidSource, "source table status is not Retrieved"));
    CHK2(target != 0, (Error::InvalidTarget, "null target table pointer"));
    CHK2(target->getObjectStatus() == NdbDictionary::Object::Retrieved,
         (Error::InvalidTarget, "target table status is not Retrieved"));
    m_source = source;
    m_target = target;
  }
  while (0);
  return ret;
}

Ndb_move_data::Attr::Attr()
{
  column = 0;
  name = 0;
  id = -1;
  map_id = -1;
  type = TypeNone;
  size_in_bytes = 0;
  length_bytes = 0;
  data_size = 0;
  tiny_bytes = 0;
  pad_char = -1;
  equal = false;
}
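
/*
 * Classify a column for conversion purposes: fixed and variable length
 * character/binary types become TypeArray (recording the size of the
 * length prefix and the pad character, if any), TEXT/BLOB become
 * TypeBlob (recording the inline size for "tiny" blobs with no part
 * table), and everything else becomes TypeOther.
 */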
void
Ndb_move_data::set_type(Attr& attr, const NdbDictionary::Column* c)
{
  attr.column = c;
  attr.name = c->getName();
  attr.size_in_bytes = c->getSizeInBytes();
  switch (c->getType()) {
  case NdbDictionary::Column::Char:
  case NdbDictionary::Column::Binary:
    attr.type = Attr::TypeArray;
    attr.length_bytes = 0;
    attr.data_size = attr.size_in_bytes;
    if (c->getType() == NdbDictionary::Column::Char)
      attr.pad_char = 0x20;
    else
      attr.pad_char = 0x0;
    break;
  case NdbDictionary::Column::Varchar:
  case NdbDictionary::Column::Varbinary:
    attr.type = Attr::TypeArray;
    attr.length_bytes = 1;
    require(attr.size_in_bytes >= attr.length_bytes);
    attr.data_size = attr.size_in_bytes - attr.length_bytes;
    attr.pad_char = -1;
    break;
  case NdbDictionary::Column::Longvarchar:
  case NdbDictionary::Column::Longvarbinary:
    attr.type = Attr::TypeArray;
    attr.length_bytes = 2;
    require(attr.size_in_bytes >= attr.length_bytes);
    attr.data_size = attr.size_in_bytes - attr.length_bytes;
    attr.pad_char = -1;
    break;
  case NdbDictionary::Column::Text:
  case NdbDictionary::Column::Blob:
    attr.type = Attr::TypeBlob;
    attr.length_bytes = 0;
    if (unlikely(c->getPartSize() == 0))
    {
      /* Tiny BLOB/TEXT, store inline size */
      attr.tiny_bytes = c->getInlineSize();
      require(attr.tiny_bytes);
    }
    break;
  default:
    attr.type = Attr::TypeOther;
    break;
  }
}
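
/*
 * Return the number of bytes of 'data' (at most 'maxlen') that form a
 * well-formed string in charset 'cs', so that demotion never cuts a
 * multi-byte character in half.  With no charset (binary data) the
 * value is cut at exactly 'maxlen' bytes.
 */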
Uint32
Ndb_move_data::calc_str_len_truncated(CHARSET_INFO* cs, const char* data,
                                      Uint32 maxlen)
{
  if (!cs)
    return maxlen; // no charset found, do not truncate
  const char* begin = (const char*) data;
  const char* end = (const char*) (data + maxlen);
  int errors = 0;
  // for multi-byte characters, truncate to last well-formed character before
  // maxlen so that string is not truncated in the middle of a multi-byte char.
  Uint32 numchars = cs->cset->numchars(cs, begin, end);
  Uint32 wf_len = cs->cset->well_formed_len(cs, begin, end, numchars, &errors);
  require(wf_len <= maxlen);
  return wf_len;
}
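
/*
 * Helpers for check_tables(): each verifies one aspect of a proposed
 * column conversion and records an error when the conversion is not
 * allowed (primary key involved, missing promote/demote option, or an
 * unimplemented combination).
 */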
int
Ndb_move_data::check_nopk(const Attr& attr1, const Attr& attr2)
{
  int ret = 0;
  const NdbDictionary::Column* c1 = attr1.column;
  const NdbDictionary::Column* c2 = attr2.column;
  do
  {
    CHK2(!c1->getPrimaryKey() && !c2->getPrimaryKey(),
         (Error::UnsupportedConversion,
          "cannot convert column #%d '%s'"
          ": primary key attributes not allowed here",
          1+attr1.id, attr1.name));
  }
  while (0);
  return ret;
}

int
Ndb_move_data::check_promotion(const Attr& attr1, const Attr& attr2)
{
  (void)attr2;
  int ret = 0;
  const Opts& opts = m_opts;
  do
  {
    CHK2(opts.flags & Opts::MD_ATTRIBUTE_PROMOTION,
         (Error::NoPromotionFlag,
          "cannot convert column #%d '%s'"
          ": promote-attributes has not been specified",
          1+attr1.id, attr1.name));
  }
  while (0);
  return ret;
}

int
Ndb_move_data::check_demotion(const Attr& attr1, const Attr& attr2)
{
  (void)attr2;
  int ret = 0;
  const Opts& opts = m_opts;
  do
  {
    CHK2(opts.flags & Opts::MD_ATTRIBUTE_DEMOTION,
         (Error::NoDemotionFlag,
          "cannot convert column #%d '%s'"
          ": demote-attributes has not been specified",
          1+attr1.id, attr1.name));
  }
  while (0);
  return ret;
}

int
Ndb_move_data::check_sizes(const Attr& attr1, const Attr& attr2)
{
  int ret = 0;
  do
  {
    if (attr1.data_size < attr2.data_size)
    {
      CHK1(check_promotion(attr1, attr2) == 0);
    }
    if (attr1.data_size > attr2.data_size)
    {
      CHK1(check_demotion(attr1, attr2) == 0);
    }
  }
  while (0);
  return ret;
}

int
Ndb_move_data::check_unsupported(const Attr& attr1, const Attr& attr2)
{
  (void)attr2;
  int ret = 0;
  do
  {
    CHK2(false,
         (Error::UnsupportedConversion,
          "cannot convert column #%d '%s'"
          ": unimplemented conversion",
          1+attr1.id, attr1.name));
  }
  while (0);
  return ret;
}
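
/*
 * Build the source/target attribute arrays, map columns by name in both
 * directions, and verify that every non-excluded pair of columns is
 * either identical (ignoring dynamic storage) or a supported
 * promotion/demotion that the caller has enabled.
 */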
int
Ndb_move_data::check_tables()
{
  int ret = 0;
  const Opts& opts = m_opts;
  do
  {
    int attrcount1 = m_source->getNoOfColumns();
    int attrcount2 = m_target->getNoOfColumns();
    m_sourceattr = new Attr [attrcount1];
    m_targetattr = new Attr [attrcount2];

    // set type info, remap columns, check missing
    for (int i1 = 0; i1 < attrcount1; i1++)
    {
      Attr& attr1 = m_sourceattr[i1];
      attr1.id = i1;
      const NdbDictionary::Column* c1 = m_source->getColumn(i1);
      require(c1 != 0);
      set_type(attr1, c1);
      const NdbDictionary::Column* c2 = m_target->getColumn(attr1.name);
      if (c2 == 0)
      {
        CHK2(opts.flags & Opts::MD_EXCLUDE_MISSING_COLUMNS,
             (Error::NoExcludeMissingFlag,
              "cannot convert source to target"
              ": source column #%d '%s' not found in target"
              " and exclude-missing-columns has not been specified",
              1+i1, attr1.name));
      }
      else
      {
        int i2 = c2->getColumnNo();
        require(i2 >= 0 && i2 < attrcount2);
        Attr& attr2 = m_targetattr[i2];
        attr1.map_id = i2;
        require(attr2.map_id == -1);
        attr2.map_id = i1;
      }
    }
    CHK1(ret == 0);

    for (int i2 = 0; i2 < attrcount2; i2++)
    {
      Attr& attr2 = m_targetattr[i2];
      attr2.id = i2;
      const NdbDictionary::Column* c2 = m_target->getColumn(i2);
      require(c2 != 0);
      set_type(attr2, c2);
      const NdbDictionary::Column* c1 = m_source->getColumn(attr2.name);
      if (c1 == 0)
      {
        CHK2(opts.flags & Opts::MD_EXCLUDE_MISSING_COLUMNS,
             (Error::NoExcludeMissingFlag,
              "cannot convert source to target"
              ": target column #%d '%s' not found in source"
              " and exclude-missing-columns has not been specified",
              1+i2, attr2.name));
      }
      else
      {
        int i1 = c1->getColumnNo();
        require(i1 >= 0 && i1 < attrcount1);
        Attr& attr1 = m_sourceattr[i1];
        require(attr1.map_id == i2);
        require(attr2.map_id == i1);
      }
    }
    CHK1(ret == 0);

    // check conversion of non-excluded columns
    for (int i1 = 0; i1 < attrcount1; i1++)
    {
      Attr& attr1 = m_sourceattr[i1];
      int i2 = attr1.map_id;
      if (i2 == -1) // excluded
        continue;
      Attr& attr2 = m_targetattr[i2];
      {
        /* Exclude internal implementation details when comparing */
        NdbDictionary::Column a1ColCopy(*attr1.column);
        NdbDictionary::Column a2ColCopy(*attr2.column);
        /* [Non] Dynamic internal storage is irrelevant */
        a1ColCopy.setDynamic(false);
        a2ColCopy.setDynamic(false);
        if ((attr1.equal = attr2.equal = a1ColCopy.equal(a2ColCopy)))
        {
          continue;
        }
      }
      if (attr1.type == Attr::TypeArray &&
          attr2.type == Attr::TypeBlob)
      {
        CHK1(check_nopk(attr1, attr2) == 0);
        CHK1(check_promotion(attr1, attr2) == 0);
        continue;
      }
      if (attr1.type == Attr::TypeBlob &&
          attr2.type == Attr::TypeArray)
      {
        CHK1(check_nopk(attr1, attr2) == 0);
        CHK1(check_demotion(attr1, attr2) == 0);
        continue;
      }
      if (attr1.type == Attr::TypeArray &&
          attr2.type == Attr::TypeArray)
      {
        CHK1(check_sizes(attr1, attr2) == 0);
        continue;
      }
      if (attr1.type == Attr::TypeBlob &&
          attr2.type == Attr::TypeBlob)
      {
        // TEXT and BLOB conversions
        CHK1(check_sizes(attr1, attr2) == 0);
        continue;
      }
      CHK1(check_unsupported(attr1, attr2) == 0);
    }
    CHK1(ret == 0);
  }
  while (0);
  return ret;
}
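
/*
 * Blob values passed to NdbBlob::setValue() must stay valid until the
 * update transaction executes.  Copies are kept on a singly linked list
 * of Data nodes and released after each batch.
 */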
Ndb_move_data::Data::Data()
{
  data = 0;
  next = 0;
}

Ndb_move_data::Data::~Data()
{
  delete [] data;
  data = 0;
}

char*
Ndb_move_data::alloc_data(Uint32 n)
{
  Data* d = new Data;
  d->data = new char [n];
  d->next = m_data;
  m_data = d;
  return d->data;
}

void
Ndb_move_data::release_data()
{
  while (m_data != 0)
  {
    Data* d = m_data;
    m_data = d->next;
    delete d;
  }
}
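
/*
 * Op holds the per-move state: the scan transaction/operation on the
 * source table, the current update transaction/operation on the target
 * table, the per-column value handles, and two scratch buffers large
 * enough for a maximum-size tuple.
 */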
Ndb_move_data::Op::Op()
{
  ndb = 0;
  scantrans = 0;
  scanop = 0;
  updatetrans = 0;
  updateop = 0;
  values = 0;
  buflen = 32 * 1024;
  static_assert(32 * 1024 >= (4 * MAX_TUPLE_SIZE_IN_WORDS), "");
  require(buflen >= (4 * MAX_TUPLE_SIZE_IN_WORDS));
  buf1 = new char [buflen];
  buf2 = new char [buflen];
  rows_in_batch = 0;
  truncated_in_batch = 0;
  end_of_scan = false;
}

Ndb_move_data::Op::~Op()
{
  delete [] values;
  delete [] buf1;
  delete [] buf2;
  values = 0;
  buf1 = 0;
  buf2 = 0;
}
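
/*
 * Open an exclusive tuple scan on the source table, attach a NdbRecAttr
 * (non-blob) or NdbBlob handle (blob) for every source column, and
 * start executing the scan.
 */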
int
Ndb_move_data::start_scan()
{
  int ret = 0;
  Op& op = m_op;
  const int attrcount1 = m_source->getNoOfColumns();
  do
  {
    require(op.scantrans == 0);
    op.scantrans = op.ndb->startTransaction(m_source);
    CHK2(op.scantrans != 0, (op.ndb->getNdbError()));
    op.scanop = op.scantrans->getNdbScanOperation(m_source);
    CHK2(op.scanop != 0, (op.scantrans->getNdbError()));
    NdbOperation::LockMode lm = NdbOperation::LM_Exclusive;
    Uint32 flags = 0;
    CHK2(op.scanop->readTuples(lm, flags) == 0, (op.scanop->getNdbError()));
    require(op.values == 0);
    op.values = new Op::Value [attrcount1];
    for (int i1 = 0; i1 < attrcount1; i1++)
    {
      const Attr& attr1 = m_sourceattr[i1];
      if (attr1.type != Attr::TypeBlob)
      {
        NdbRecAttr* ra = op.scanop->getValue(i1);
        CHK2(ra != 0, (op.scanop->getNdbError()));
        op.values[i1].ra = ra;
      }
      else
      {
        NdbBlob* bh = op.scanop->getBlobHandle(i1);
        CHK2(bh != 0, (op.scanop->getNdbError()));
        op.values[i1].bh = bh;
      }
    }
    CHK1(ret == 0);
    CHK2(op.scantrans->execute(NdbTransaction::NoCommit) == 0,
         (op.scantrans->getNdbError()));
  }
  while (0);
  return ret;
}

/*
 * Copy one attribute value. nextResult() re-defines ra/bh
 * to point to the new row. Since we are batching, the data
 * must be saved to remain valid until execute.
 */
int
Ndb_move_data::copy_other_to_other(const Attr& attr1, const Attr& attr2)
{
  int ret = 0;
  Op& op = m_op;
  const int i1 = attr1.id;
  const int i2 = attr2.id;
  do
  {
    const NdbRecAttr* ra1 = op.values[i1].ra;
    require(ra1 != 0 && ra1->isNULL() != -1);
    if (ra1->isNULL())
    {
      const char* value = 0;
      CHK2(op.updateop->setValue(i2, value) == 0,
           (op.updateop->getNdbError()));
    }
    else
    {
      const char* value = ra1->aRef();
      CHK2(op.updateop->setValue(i2, value) == 0,
           (op.updateop->getNdbError()));
    }
  }
  while (0);
  return ret;
}
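
/*
 * Write 'length1' bytes of 'data1' into array column attr2, adding the
 * 1- or 2-byte length prefix and pad characters as required.  'length1x'
 * is the full source length (it can exceed 'length1' for a blob source);
 * if it does not fit, the value is truncated at a character boundary and
 * counted in truncated_in_batch.
 */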
int
Ndb_move_data::copy_data_to_array(const char* data1, const Attr& attr2,
                                  Uint32 length1, Uint32 length1x)
{
  int ret = 0;
  const Opts& opts = m_opts;
  Op& op = m_op;
  const int i2 = attr2.id;
  do
  {
    if (data1 == 0)
    {
      CHK2(op.updateop->setValue(i2, data1) == 0,
           (op.updateop->getNdbError()));
    }
    else
    {
      /*
       * length1 is bytes present in data1
       * length1x may be longer for blob source
       * see invocation from array and blob source
       */
      require(length1 <= length1x);
      if (length1x > attr2.data_size)
      {
        require(opts.flags & Opts::MD_ATTRIBUTE_DEMOTION);
        op.truncated_in_batch++;
        length1 = calc_str_len_truncated(attr2.column->getCharset(),
                                         data1, attr2.data_size);
      }
      uchar* uptr = (uchar*)&op.buf2[0];
      switch (attr2.length_bytes) {
      case 0:
        break;
      case 1:
        require(length1 <= 0xFF);
        uptr[0] = (uchar)length1;
        break;
      case 2:
        require(length1 <= 0xFFFF);
        uptr[0] = (uchar)(length1 & 0xFF);
        uptr[1] = (uchar)(length1 >> 8);
        break;
      default:
        require(false);
        break;
      }
      char* data2 = &op.buf2[attr2.length_bytes];
      memcpy(data2, data1, length1);
      if (attr2.pad_char != -1)
      {
        char* pad_data2 = &data2[length1];
        Uint32 pad_length2 = attr2.data_size - length1;
        memset(pad_data2, attr2.pad_char, pad_length2);
      }
      CHK2(op.updateop->setValue(i2, (char*)op.buf2) == 0,
           (op.updateop->getNdbError()));
    }
  }
  while (0);
  return ret;
}
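
/*
 * Array to array: strip the source length prefix (or trailing pad
 * characters for fixed-size columns) and hand the payload to
 * copy_data_to_array().
 */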
int
Ndb_move_data::copy_array_to_array(const Attr& attr1, const Attr& attr2)
{
  int ret = 0;
  Op& op = m_op;
  const int i1 = attr1.id;
  do
  {
    const NdbRecAttr* ra1 = op.values[i1].ra;
    require(ra1 != 0 && ra1->isNULL() != -1);
    if (ra1->isNULL())
    {
      const char* data1 = 0;
      CHK1(copy_data_to_array(data1, attr2, 0, 0) == 0);
    }
    else
    {
      Uint32 size1 = ra1->get_size_in_bytes();
      require(size1 >= attr1.length_bytes);
      Uint32 length1 = size1 - attr1.length_bytes;
      const char* aref1 = ra1->aRef();
      const char* data1 = &aref1[attr1.length_bytes];
      if (attr1.length_bytes == 0)
        while (length1 != 0 && data1[length1-1] == attr1.pad_char)
          length1--;
      CHK1(copy_data_to_array(data1, attr2, length1, length1) == 0);
    }
  }
  while (0);
  return ret;
}
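
/*
 * Array to blob (promotion): the payload is copied via alloc_data() so
 * it survives until the batch executes; a tiny (no part table) target
 * blob may still force truncation at a character boundary.
 */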
int
Ndb_move_data::copy_array_to_blob(const Attr& attr1, const Attr& attr2)
{
  int ret = 0;
  Op& op = m_op;
  const int i1 = attr1.id;
  const int i2 = attr2.id;
  do
  {
    const NdbRecAttr* ra1 = op.values[i1].ra;
    require(ra1 != 0 && ra1->isNULL() != -1);
    NdbBlob* bh2 = op.updateop->getBlobHandle(i2);
    CHK2(bh2 != 0, (op.updateop->getNdbError()));
    if (ra1->isNULL())
    {
      CHK2(bh2->setValue(0, 0) == 0, (bh2->getNdbError()));
    }
    else
    {
      Uint32 size1 = ra1->get_size_in_bytes();
      require(size1 >= attr1.length_bytes);
      Uint32 length1 = size1 - attr1.length_bytes;
      const char* aref1 = ra1->aRef();
      const char* data1 = &aref1[attr1.length_bytes];
      if (attr1.length_bytes == 0)
        while (length1 != 0 && data1[length1-1] == attr1.pad_char)
          length1--;
      if (unlikely(attr2.tiny_bytes && length1 > attr2.tiny_bytes))
      {
        require(m_opts.flags & Opts::MD_ATTRIBUTE_DEMOTION);
        op.truncated_in_batch++;
        length1 = calc_str_len_truncated(attr2.column->getCharset(),
                                         data1, attr2.tiny_bytes);
      }
      char* data1copy = alloc_data(length1);
      memcpy(data1copy, data1, length1);
      CHK2(bh2->setValue(data1copy, length1) == 0, (bh2->getNdbError()));
    }
  }
  while (0);
  return ret;
}
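
/*
 * Blob to array (demotion): read at most attr2.data_size bytes of the
 * blob into a scratch buffer and pass the real blob length alongside so
 * copy_data_to_array() can detect and count truncation.
 */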
int
Ndb_move_data::copy_blob_to_array(const Attr& attr1, const Attr& attr2)
{
  int ret = 0;
  Op& op = m_op;
  const int i1 = attr1.id;
  do
  {
    NdbBlob* bh1 = op.values[i1].bh;
    require(bh1 != 0);
    int isNull = -1;
    CHK2(bh1->getNull(isNull) == 0, (bh1->getNdbError()));
    require(isNull == 0 || isNull == 1);
    Uint64 length64 = ~(Uint64)0;
    CHK2(bh1->getLength(length64) == 0, (bh1->getNdbError()));
    Uint32 data_length = (Uint32)length64;
    require((Uint64)data_length == length64);
    if (isNull)
    {
      const char* data1 = 0;
      CHK1(copy_data_to_array(data1, attr2, 0, 0) == 0);
    }
    else
    {
      char* data1 = &op.buf1[0];
      Uint32 length1 = attr2.data_size;
      require(length1 <= (unsigned)op.buflen); // avoid buffer overflow
      CHK2(bh1->readData(data1, length1) == 0, (bh1->getNdbError()));
      // pass also real length to detect truncation
      CHK1(copy_data_to_array(data1, attr2, length1, data_length) == 0);
    }
  }
  while (0);
  return ret;
}
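
/*
 * Blob to blob (covers TEXT as well): the whole value is copied via
 * alloc_data(); a tiny (no part table) target may force truncation at a
 * character boundary.
 */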
int
Ndb_move_data::copy_blob_to_blob(const Attr& attr1, const Attr& attr2)
{
  int ret = 0;
  Op& op = m_op;
  const int i1 = attr1.id;
  const int i2 = attr2.id;
  do
  {
    NdbBlob* bh1 = op.values[i1].bh;
    require(bh1 != 0);
    int isNull = -1;
    CHK2(bh1->getNull(isNull) == 0, (bh1->getNdbError()));
    require(isNull == 0 || isNull == 1);
    Uint64 length64 = ~(Uint64)0;
    CHK2(bh1->getLength(length64) == 0, (bh1->getNdbError()));
    Uint32 data_length = (Uint32)length64;
    require((Uint64)data_length == length64);
    NdbBlob* bh2 = op.updateop->getBlobHandle(i2);
    CHK2(bh2 != 0, (op.updateop->getNdbError()));
    if (isNull)
    {
      CHK2(bh2->setValue(0, 0) == 0, (bh2->getNdbError()));
    }
    else
    {
      char* data = alloc_data(data_length);
      Uint32 bytes = data_length;
      CHK2(bh1->readData(data, bytes) == 0, (bh1->getNdbError()));
      require(bytes == data_length);
      // prevent TINYTEXT/TINYBLOB overflow by truncating data
      if (attr2.tiny_bytes)
      {
        if (bytes > attr2.tiny_bytes)
        {
          require(m_opts.flags & Opts::MD_ATTRIBUTE_DEMOTION);
          op.truncated_in_batch++;
          data_length = calc_str_len_truncated(attr2.column->getCharset(),
                                               data, attr2.tiny_bytes);
        }
      }
      CHK2(bh2->setValue(data, data_length) == 0,
           (bh2->getNdbError()));
    }
  }
  while (0);
  return ret;
}
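
/*
 * Dispatch one column copy to the matching specialised routine based on
 * the source/target type classification.
 */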
int
Ndb_move_data::copy_attr(const Attr& attr1, const Attr& attr2)
{
  int ret = 0;
  require(attr1.map_id == attr2.id);
  do
  {
    if (attr1.equal && attr1.type != Attr::TypeBlob)
    {
      require(attr2.equal && attr2.type != Attr::TypeBlob);
      CHK1(copy_other_to_other(attr1, attr2) == 0);
      break;
    }
    if (attr1.type == Attr::TypeArray &&
        attr2.type == Attr::TypeArray)
    {
      CHK1(copy_array_to_array(attr1, attr2) == 0);
      break;
    }
    if (attr1.type == Attr::TypeArray &&
        attr2.type == Attr::TypeBlob)
    {
      CHK1(copy_array_to_blob(attr1, attr2) == 0);
      break;
    }
    if (attr1.type == Attr::TypeBlob &&
        attr2.type == Attr::TypeArray)
    {
      CHK1(copy_blob_to_array(attr1, attr2) == 0);
      break;
    }
    if (attr1.type == Attr::TypeBlob &&
        attr2.type == Attr::TypeBlob)
    {
      // handles TEXT and BLOB conversions
      CHK1(copy_blob_to_blob(attr1, attr2) == 0);
      break;
    }
    require(false);
  }
  while (0);
  return ret;
}
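
/*
 * Move the current scanned row: insert it into the target table
 * (primary key columns first, then the rest) and delete it from the
 * source via the scan's takeover delete, all in the update transaction.
 */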
int
Ndb_move_data::move_row()
{
  int ret = 0;
  Op& op = m_op;
  const int attrcount1 = m_source->getNoOfColumns();
  do
  {
    op.updateop = op.updatetrans->getNdbOperation(m_target);
    CHK2(op.updateop != 0, (op.updatetrans->getNdbError()));
    CHK2(op.updateop->insertTuple() == 0, (op.updateop->getNdbError()));
    for (int j = 0; j <= 1; j++)
    {
      for (int i1 = 0; i1 < attrcount1; i1++)
      {
        const Attr& attr1 = m_sourceattr[i1];
        int i2 = attr1.map_id;
        if (unlikely(i2 == -1))
          continue;
        const Attr& attr2 = m_targetattr[i2];
        const NdbDictionary::Column* c = attr2.column;
        if (j == 0 && !c->getPrimaryKey())
          continue;
        if (j == 1 && c->getPrimaryKey())
          continue;
        CHK1(copy_attr(attr1, attr2) == 0);
      }
      CHK1(ret == 0);
    }
    CHK1(ret == 0);
    CHK2(op.scanop->deleteCurrentTuple(op.updatetrans) == 0,
         (op.scanop->getNdbError()));
  }
  while (0);
  return ret;
}
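
/*
 * Move one scan batch in its own target transaction.  Blob copies
 * allocated for the batch are released afterwards; with error_insert()
 * enabled, a fake temporary error is injected on roughly 1 in 5 batches.
 */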
int
Ndb_move_data::move_batch()
{
  int ret = 0;
  Op& op = m_op;
  op.rows_in_batch = 0;
  op.truncated_in_batch = 0;
  do
  {
    int res;
    CHK2((res = op.scanop->nextResult(true)) != -1,
         (op.scanop->getNdbError()));
    require(res == 0 || res == 1);
    if (res == 1)
    {
      op.end_of_scan = true;
      op.ndb->closeTransaction(op.scantrans);
      op.scantrans = 0;
      break;
    }
    require(op.updatetrans == 0);
    op.updatetrans = op.ndb->startTransaction();
    CHK2(op.updatetrans != 0, (op.ndb->getNdbError()));
    do
    {
      CHK1(move_row() == 0);
      op.rows_in_batch++;
      CHK2((res = op.scanop->nextResult(false)) != -1,
           (op.scanop->getNdbError()));
      require(res == 0 || res == 2);
    }
    while (res == 0);
    CHK1(ret == 0);
    if (m_error_insert && ndb_rand() % 5 == 0)
    {
      invoke_error_insert();
      CHK1(false);
    }
    CHK2(op.updatetrans->execute(NdbTransaction::Commit) == 0,
         (op.updatetrans->getNdbError()));
    op.ndb->closeTransaction(op.updatetrans);
    op.updatetrans = 0;
  }
  while (0);
  release_data();
  return ret;
}
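
/*
 * Entry point: verify state and table compatibility, open the scan and
 * move batches until the scan is exhausted, updating Stat counters as it
 * goes.  On error the caller may retry; rows_total accumulates across
 * retries while rows_moved is reset per call.
 */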
int
Ndb_move_data::move_data(Ndb* ndb)
{
  int ret = 0;
  Op& op = m_op;
  Stat& stat = m_stat;
  stat.rows_moved = 0; // keep rows_total
  do
  {
    const NDB_TICKS now = NdbTick_getCurrentTicks();
    ndb_srand((unsigned)now.getUint64());
    reset_error();
    CHK2(m_source != 0 && m_target != 0,
         (Error::InvalidState, "source / target not defined"));
    op.ndb = ndb;
    CHK1(m_error.code == 0);
    CHK1(check_tables() == 0);
    CHK1(start_scan() == 0);
    while (1)
    {
      CHK1(move_batch() == 0);
      stat.rows_moved += op.rows_in_batch;
      stat.rows_total += op.rows_in_batch;
      stat.truncated += op.truncated_in_batch;
      require(op.end_of_scan == (op.rows_in_batch == 0));
      if (op.end_of_scan)
        break;
    }
    CHK1(ret == 0);
  }
  while (0);
  close_op(ndb, ret);
  return ret;
}
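
/*
 * Release per-move resources.  On success both transactions must already
 * be closed; on failure any open scan/update transaction is closed here.
 */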
void
Ndb_move_data::close_op(Ndb* ndb, int ret)
{
  Op& op = m_op;
  if (ret == 0)
  {
    require(op.scantrans == 0);
    require(op.updatetrans == 0);
  }
  else
  {
    if (op.scantrans != 0)
    {
      ndb->closeTransaction(op.scantrans);
      op.scantrans = 0;
    }
    if (op.updatetrans != 0)
    {
      ndb->closeTransaction(op.updatetrans);
      op.updatetrans = 0;
    }
  }
  delete [] op.values;
  op.values = 0;
}

Ndb_move_data::Opts::Opts()
{
  flags = 0;
}

void
Ndb_move_data::set_opts_flags(int flags)
{
  Opts& opts = m_opts;
  opts.flags = flags;
}

void
Ndb_move_data::unparse_opts_tries(char* opt, const Opts::Tries& ot)
{
  sprintf(opt, "%d,%d,%d", ot.maxtries, ot.mindelay, ot.maxdelay);
}
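
/*
 * Parse a tries option of the form "maxtries[,mindelay[,maxdelay]]"
 * (non-negative integers, mindelay <= maxdelay); the helper below
 * consumes one comma-separated field.  Values are copied to 'ot' only
 * when the whole string parses successfully.
 */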
static int
parse_opts_tries_field(const char*& t, int& out)
{
  char* u = 0;
  out = (int)strtol(t, &u, 10);
  if (t == u)
    return -1; // empty
  if (out < 0)
    return -1; // bad value
  if (*u == 0)
    return 0; // last one
  if (*u != ',')
    return -1; // bad char
  t = u + 1;
  return 1; // more after this one
}

int
Ndb_move_data::parse_opts_tries(const char* s, Opts::Tries& ot)
{
  Opts::Tries out; // a copy so nothing is set on error
  int ret;
  if (s == 0)
    return -1;
  const char* t = s;
  if ((ret = parse_opts_tries_field(t, out.maxtries)) > 0)
    if ((ret = parse_opts_tries_field(t, out.mindelay)) > 0)
      if ((ret = parse_opts_tries_field(t, out.maxdelay)) > 0)
        return -1; // too many
  if (ret < 0)
    return -1;
  if (out.mindelay > out.maxdelay)
    return -1;
  ot = out;
  return 0;
}

Ndb_move_data::Stat::Stat()
{
  rows_moved = 0;
  rows_total = 0;
  truncated = 0;
}

const Ndb_move_data::Stat&
Ndb_move_data::get_stat()
{
  return m_stat;
}

Ndb_move_data::Error::Error()
{
  line = 0;
  code = 0;
  memset(message, 0, sizeof(message));
}

bool
Ndb_move_data::Error::is_temporary() const
{
  return code > 0 && ndberror.status == NdbError::TemporaryError;
}

const Ndb_move_data::Error&
Ndb_move_data::get_error()
{
  return m_error;
}

void
Ndb_move_data::set_error_line(int line)
{
  Error& e = m_error;
  e.line = line;
}

void
Ndb_move_data::set_error_code(int code, const char* fmt, ...)
{
  va_list ap;
  va_start(ap, fmt);
  require(code != 0);
  Error& e = m_error;
  e.code = code;
  vsnprintf(e.message, sizeof(e.message), fmt, ap);
  va_end(ap);
}

void
Ndb_move_data::set_error_code(const NdbError& ndberror)
{
  Error& e = m_error;
  set_error_code(ndberror.code, "%s", ndberror.message);
  e.ndberror = ndberror;
}

void
Ndb_move_data::reset_error()
{
  new (&m_error) Error;
}

NdbOut&
operator<<(NdbOut& out, const Ndb_move_data::Error& error)
{
  if (error.code < 0)
    out << "move data error " << error.code
        << ": " << error.message;
  else if (error.code > 0)
    out << "ndb error"
        << ": " << error.ndberror;
  else
    out << "no error";
  out << " (at lib line " << error.line << ")" << endl;
  return out;
}

void
Ndb_move_data::error_insert()
{
  m_error_insert = true;
}

void
Ndb_move_data::invoke_error_insert()
{
  NdbError ndberror;
  ndberror.code = 9999;
  ndberror.status = NdbError::TemporaryError;
  ndberror.message = "Error insert";
  set_error_line(0);
  set_error_code(ndberror);
  m_error_insert = false;
}
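
/*
 * With MD_ABORT_ON_ERROR set, the first recorded error aborts the
 * process via require(); otherwise errors are returned to the caller.
 */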
void
Ndb_move_data::abort_on_error()
{
  const Opts& opts = m_opts;
  require(!(opts.flags & Opts::MD_ABORT_ON_ERROR));
}