3983 lines
88 KiB
C++
3983 lines
88 KiB
C++
/*
   Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/
|
|
|
|
#include "NdbImportUtil.hpp"
|
|
|
|
#include "m_ctype.h"
|
|
#include "my_sys.h"
|
|
#include <NdbDictionaryImpl.hpp>
|
|
|
|
#include <Vector.hpp>
|
|
#include <inttypes.h>
|
|
// STL
|
|
#include <cmath>
|
|
|
|
/*
 * Create the shared free lists for rows and blobs, attach stats to
 * the row free list under the name "rows-free", and register the
 * pseudo tables.
 */
NdbImportUtil::NdbImportUtil()
  : m_util(*this),
    c_stats(*this)
{
  log_debug(1, "ctor");
  // free list of rows, with stats
  c_rows_free = new RowList;
  c_rows_free->set_stats(m_util.c_stats, "rows-free");
  // free list of blobs
  c_blobs_free = new BlobList;
  add_pseudo_tables();
}
|
|
|
|
/*
 * Release the shared free lists.  All tables must have been removed
 * from the table map before the object is destroyed.
 */
NdbImportUtil::~NdbImportUtil()
{
  log_debug(1, "dtor");
  // blobs first, then rows, as in the original
  delete c_blobs_free;
  delete c_rows_free;
  require(c_tables.m_tables.empty());
}
|
|
|
|
/*
 * Print a short tag for the utility object.  The object itself
 * contributes nothing to the output.
 */
NdbOut&
operator<<(NdbOut& out, const NdbImportUtil& util)
{
  (void)util;
  return out << "util ";
}
|
|
|
|
// DebugLogger
|
|
/*
 * Set up debug logging to stderr.  The start marker gets the timer
 * and the mutex, the stop marker gets the same mutex.
 */
NdbImportUtil::DebugLogger::DebugLogger()
{
  // output goes to stderr via an NdbOut wrapper
  logfile = new FileOutputStream(stderr);
  out = new NdbOut(*logfile);

  // mutex shared by the start and stop markers
  mutex = NdbMutex_Create();
  require(mutex != 0);

  // timestamps are taken from this timer
  timer.start();

  start.timer = &timer;
  start.mutex = mutex;
  stop.mutex = mutex;
}
|
|
|
|
/*
 * Release the logging resources in reverse order of construction.
 * "out" was constructed over "*logfile", so it is destroyed first
 * and never outlives the stream it refers to.
 */
NdbImportUtil::DebugLogger::~DebugLogger()
{
  delete out;
  delete logfile;
  NdbMutex_Destroy(mutex);
}
|
|
|
|
/*
 * Start of a log message: lock the logger mutex, obtain a timestamp
 * (seconds with 3 decimals) and print it.
 *
 * The mutex is still locked on return.  It is unlocked when the
 * MessageStop marker is written to the stream.
 */
NdbOut&
operator<<(NdbOut& out, const NdbImportUtil::DebugLogger::MessageStart & start)
{
  char buf[32];
  NdbMutex_Lock(start.mutex);
  start.timer->stop();
  const double t = (double) start.timer->elapsed_msec() / (double)1000.0;
  // bounded formatting: never write past the end of buf
  snprintf(buf, sizeof(buf), "%8.3f: ", t);
  out << buf;
  return out;
}
|
|
|
|
/*
 * End of a log message: print the final newline and then unlock
 * the logger mutex.
 */
NdbOut&
operator<<(NdbOut& out, const NdbImportUtil::DebugLogger::MessageStop & stop)
{
  out << "\n";
  // the message is complete, let other writers in
  NdbMutex_Unlock(stop.mutex);
  return out;
}
|
|
|
|
// name
|
|
|
|
/*
 * Name consisting of the single string s, which must not be null.
 */
NdbImportUtil::Name::Name(const char* s)
{
  require(s != 0);
  m_str = s;
}
|
|
|
|
/*
 * Name of the form "s-t".  Both parts must be non-null.
 */
NdbImportUtil::Name::Name(const char* s, const char* t)
{
  require(s != 0 && t != 0);
  // prefix, separator, suffix
  m_str = s;
  m_str += "-";
  m_str += t;
}
|
|
|
|
/*
 * Name of the form "s-N" where N is the decimal value of t.
 * The string s must not be null.
 */
NdbImportUtil::Name::Name(const char* s, uint t)
{
  require(s != 0);
  // format the number first, then assemble the name
  char numbuf[100];
  sprintf(numbuf, "%u", t);
  m_str = s;
  m_str += "-";
  m_str += numbuf;
}
|
|
|
|
/*
 * Print a name via its conversion to const char*.
 */
NdbOut&
operator<<(NdbOut& out, const NdbImportUtil::Name& name)
{
  const char* str = (const char*)name;
  out << str;
  return out;
}
|
|
|
|
// lockable
|
|
|
|
/*
 * Create the mutex and the condition used by lock(), unlock(),
 * wait() and signal().  Both must be created successfully.
 */
NdbImportUtil::Lockable::Lockable()
{
  m_mutex = NdbMutex_Create();
  m_condition = NdbCondition_Create();
  require(m_mutex != 0 && m_condition != 0);
}
|
|
|
|
/*
 * Destroy the condition and then the mutex.
 */
NdbImportUtil::Lockable::~Lockable()
{
  NdbCondition_Destroy(m_condition);
  NdbMutex_Destroy(m_mutex);
}
|
|
|
|
void
|
|
NdbImportUtil::Lockable::lock()
|
|
{
|
|
NdbMutex_Lock(m_mutex);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::Lockable::unlock()
|
|
{
|
|
NdbMutex_Unlock(m_mutex);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::Lockable::wait(uint timeout)
|
|
{
|
|
NdbCondition_WaitTimeout(m_condition, m_mutex, timeout);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::Lockable::signal()
|
|
{
|
|
NdbCondition_Signal(m_condition);
|
|
}
|
|
|
|
// thread
|
|
|
|
/*
 * A new Thread object has no thread handle yet.
 */
NdbImportUtil::Thread::Thread()
{
  m_thread = 0;
}
|
|
|
|
/*
 * Destroy the thread handle if one is still set.
 */
NdbImportUtil::Thread::~Thread()
{
  if (m_thread == 0)
    return;
  NdbThread_Destroy(&m_thread);
}
|
|
|
|
void
|
|
NdbImportUtil::Thread::join()
|
|
{
|
|
require(m_thread != 0);
|
|
NdbThread_WaitFor(m_thread, (void**)0);
|
|
NdbThread_Destroy(&m_thread);
|
|
}
|
|
|
|
// list
|
|
|
|
/*
 * A new list entry is not linked to any neighbours.
 */
NdbImportUtil::ListEnt::ListEnt()
{
  m_prev = 0;
  m_next = 0;
}
|
|
|
|
/*
 * Nothing to release: the entry does not own its neighbours.
 */
NdbImportUtil::ListEnt::~ListEnt()
{
}
|
|
|
|
/*
 * Create an empty list with zeroed counters and no stats attached.
 */
NdbImportUtil::List::List()
{
  // links
  m_front = 0;
  m_back = 0;
  // current, maximum and total entry counts
  m_cnt = 0;
  m_maxcnt = 0;
  m_totcnt = 0;
  // stats are attached later by set_stats()
  m_stat_occup = 0;
  m_stat_total = 0;
}
|
|
|
|
/*
 * Pop and delete every entry still on the list.
 */
NdbImportUtil::List::~List()
{
  for (ListEnt* ent = pop_front(); ent != 0; ent = pop_front())
    delete ent;
}
|
|
|
|
void
|
|
NdbImportUtil::List::set_stats(Stats& stats, const char* name)
|
|
{
|
|
{
|
|
const Name statname(name, "occup");
|
|
Stat* stat = stats.create(statname, 0, 0);
|
|
m_stat_occup = stat;
|
|
}
|
|
{
|
|
const Name statname(name, "total");
|
|
Stat* stat = stats.create(statname, 0, 0);
|
|
m_stat_total = stat;
|
|
}
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::List::push_back(ListEnt* ent)
|
|
{
|
|
require(ent != 0);
|
|
require(ent->m_next == 0 && ent->m_prev == 0);
|
|
if (m_cnt == 0)
|
|
{
|
|
m_front = m_back = ent;
|
|
ent->m_next = 0;
|
|
ent->m_prev = 0;
|
|
}
|
|
else
|
|
{
|
|
m_back->m_next = ent;
|
|
ent->m_next = 0;
|
|
ent->m_prev = m_back;
|
|
m_back = ent;
|
|
}
|
|
m_cnt++;
|
|
if (m_maxcnt < m_cnt)
|
|
{
|
|
m_maxcnt++;
|
|
require(m_maxcnt == m_cnt);
|
|
}
|
|
m_totcnt++;
|
|
validate();
|
|
if (m_stat_occup != 0)
|
|
m_stat_occup->add(m_cnt);
|
|
if (m_stat_total != 0)
|
|
m_stat_total->add(1);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::List::push_front(ListEnt* ent)
|
|
{
|
|
require(ent != 0);
|
|
require(ent->m_next == 0 && ent->m_prev == 0);
|
|
if (m_cnt == 0)
|
|
{
|
|
m_front = m_back = ent;
|
|
ent->m_next = 0;
|
|
ent->m_prev = 0;
|
|
}
|
|
else
|
|
{
|
|
m_front->m_prev = ent;
|
|
ent->m_prev = 0;
|
|
ent->m_next = m_front;
|
|
m_front = ent;
|
|
}
|
|
m_cnt++;
|
|
if (m_maxcnt < m_cnt)
|
|
{
|
|
m_maxcnt++;
|
|
require(m_maxcnt == m_cnt);
|
|
}
|
|
m_totcnt++;
|
|
validate();
|
|
if (m_stat_occup != 0)
|
|
m_stat_occup->add(m_cnt);
|
|
if (m_stat_total != 0)
|
|
m_stat_total->add(1);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::List::push_after(ListEnt* ent1, ListEnt* ent2)
|
|
{
|
|
require(ent1 != 0 && ent2 != 0);
|
|
require(ent2->m_next == 0 && ent2->m_prev == 0);
|
|
ListEnt* ent3 = ent1->m_next;
|
|
if (ent3 == 0)
|
|
{
|
|
push_back(ent2);
|
|
return;
|
|
}
|
|
ent1->m_next = ent2;
|
|
ent2->m_prev = ent1;
|
|
ent2->m_next = ent3;
|
|
ent3->m_prev = ent2;
|
|
m_cnt++;
|
|
if (m_maxcnt < m_cnt)
|
|
{
|
|
m_maxcnt++;
|
|
require(m_maxcnt == m_cnt);
|
|
}
|
|
m_totcnt++;
|
|
validate();
|
|
if (m_stat_occup != 0)
|
|
m_stat_occup->add(m_cnt);
|
|
if (m_stat_total != 0)
|
|
m_stat_total->add(1);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::List::push_before(ListEnt* ent1, ListEnt* ent2)
|
|
{
|
|
require(ent1 != 0 && ent2 != 0);
|
|
require(ent2->m_next == 0 && ent2->m_prev == 0);
|
|
ListEnt* ent3 = ent1->m_prev;
|
|
if (ent3 == 0)
|
|
{
|
|
push_front(ent2);
|
|
return;
|
|
}
|
|
ent1->m_prev = ent2;
|
|
ent2->m_next = ent1;
|
|
ent2->m_prev = ent3;
|
|
ent3->m_next = ent2;
|
|
m_cnt++;
|
|
if (m_maxcnt < m_cnt)
|
|
{
|
|
m_maxcnt++;
|
|
require(m_maxcnt == m_cnt);
|
|
}
|
|
m_totcnt++;
|
|
validate();
|
|
if (m_stat_occup != 0)
|
|
m_stat_occup->add(m_cnt);
|
|
if (m_stat_total != 0)
|
|
m_stat_total->add(1);
|
|
}
|
|
|
|
/*
 * Remove and return the first entry, or return null if the list is
 * empty.  The returned entry is unlinked.  An empty list is left
 * untouched, including its stats.
 */
NdbImportUtil::ListEnt*
NdbImportUtil::List::pop_front()
{
  if (m_cnt == 0)
    return 0;
  ListEnt* const ent = m_front;
  if (m_cnt == 1)
  {
    // last entry, list becomes empty
    m_front = m_back = 0;
  }
  else
  {
    // second entry becomes the new front
    m_front = ent->m_next;
    m_front->m_prev = 0;
    ent->m_next = 0;
    ent->m_prev = 0;
  }
  m_cnt--;
  validate();
  if (m_stat_occup != 0)
    m_stat_occup->add(m_cnt);
  return ent;
}
|
|
|
|
void
|
|
NdbImportUtil::List::remove(ListEnt* ent)
|
|
{
|
|
ListEnt* prev = ent->m_prev;
|
|
ListEnt* next = ent->m_next;
|
|
ent->m_prev = 0;
|
|
ent->m_next = 0;
|
|
if (prev != 0)
|
|
prev->m_next = next;
|
|
if (next != 0)
|
|
next->m_prev = prev;
|
|
if (m_front == ent)
|
|
m_front = next;
|
|
if (m_back == ent)
|
|
m_back = prev;
|
|
require(m_cnt != 0);
|
|
m_cnt--;
|
|
validate();
|
|
if (m_stat_occup != 0)
|
|
m_stat_occup->add(m_cnt);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::List::push_back_from(List& src)
|
|
{
|
|
if (src.m_cnt != 0)
|
|
{
|
|
if (m_cnt != 0)
|
|
{
|
|
ListEnt* ent1 = m_back;
|
|
ListEnt* ent2 = src.m_front;
|
|
require(ent1 != 0 && ent2 != 0);
|
|
require(ent1->m_next == 0 && ent2->m_prev == 0);
|
|
// push src to the back
|
|
ent1->m_next = ent2;
|
|
ent2->m_prev = ent1;
|
|
m_back = src.m_back;
|
|
m_cnt += src.m_cnt;
|
|
}
|
|
else
|
|
{
|
|
m_front = src.m_front;
|
|
m_back = src.m_back;
|
|
m_cnt = src.m_cnt;
|
|
}
|
|
if (m_maxcnt < m_cnt)
|
|
m_maxcnt = m_cnt;
|
|
m_totcnt += src.m_cnt;
|
|
}
|
|
validate();
|
|
// erase src but leave stats alone
|
|
src.m_front = 0;
|
|
src.m_back = 0;
|
|
src.m_cnt = 0;
|
|
}
|
|
|
|
#if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL)
/*
 * Check list invariants.  The cheap checks look only at the count
 * and the two ends.  The full walk over all entries is compiled in
 * only when both VM_TRACE and TEST_NDBIMPORTUTIL are defined.
 */
void
NdbImportUtil::List::validate() const
{
  if (m_cnt != 0)
  {
    require(m_front != 0);
    require(m_front->m_prev == 0);
    require(m_back != 0);
    require(m_back->m_next == 0);
    // front and back coincide exactly when there is one entry
    if (m_cnt == 1)
      require(m_front == m_back);
    else
      require(m_front != m_back);
  }
  else
  {
    require(m_front == 0);
    require(m_back == 0);
  }
#if defined(VM_TRACE) && defined(TEST_NDBIMPORTUTIL)
  // walk the chain checking links in both directions
  uint cnt = 0;
  ListEnt* prev = 0;
  for (ListEnt* cur = m_front; cur != 0; cur = cur->m_next)
  {
    require(cur->m_prev == prev);
    if (prev != 0)
      require(prev->m_next == cur);
    prev = cur;
    cnt++;
  }
  require(m_cnt == cnt);
#endif
}
#endif
|
|
|
|
// attrs
|
|
|
|
/*
 * Set every member to a default.  Numbers and ids which have no
 * meaningful default are set to Inval_uint.
 */
NdbImportUtil::Attr::Attr()
{
  // identity
  m_attrname = "";
  m_attrno = Inval_uint;
  m_attrid = Inval_uint;
  // type information
  m_type = NdbDictionary::Column::Undefined;
  m_charset = 0;
  memset(m_sqltype, 0, sizeof(m_sqltype));
  m_pk = false;
  m_nullable = false;
  m_precision = 0;
  m_scale = 0;
  m_length = 0;
  m_charlength = 0;
  m_arraytype = NdbDictionary::Column::ArrayTypeFixed;
  // blob properties
  m_inlinesize = 0;
  m_partsize = 0;
  m_blobtable = 0;
  // storage and formatting
  m_size = 0;
  m_pad = false;
  m_padchar = 0;
  m_quotable = false;
  m_isblob = false;
  m_blobno = Inval_uint;
  // position within the row
  m_offset = 0;
  m_null_byte = Inval_uint;
  m_null_bit = Inval_uint;
}
|
|
|
|
void
|
|
NdbImportUtil::Attr::set_value(Row* row, const void* data, uint len) const
|
|
{
|
|
require(!m_isblob);
|
|
require(data != 0);
|
|
uint totlen = m_arraytype + len;
|
|
require(totlen <= m_size);
|
|
require(m_offset + totlen <= row->m_recsize);
|
|
uchar* p = &row->m_data[m_offset];
|
|
switch (m_arraytype) {
|
|
case 0:
|
|
break;
|
|
case 1:
|
|
require(len <= 0xff);
|
|
p[0] = (uchar)len;
|
|
p += 1;
|
|
break;
|
|
case 2:
|
|
require(len <= 0xffff);
|
|
p[0] = (uchar)(len & 0xff);
|
|
p[1] = (uchar)(len >> 8);
|
|
p += 2;
|
|
break;
|
|
default:
|
|
require(false);
|
|
break;
|
|
}
|
|
memcpy(p, data, len);
|
|
if (m_pad)
|
|
memset(&p[totlen], m_padchar, m_size - totlen);
|
|
if (m_nullable)
|
|
set_null(row, false);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::Attr::set_blob(Row* row, const void* data, uint len) const
|
|
{
|
|
require(m_isblob);
|
|
require(data != 0);
|
|
require(m_blobno < row->m_blobs.size());
|
|
Blob* blob = row->m_blobs[m_blobno];
|
|
require(blob != 0);
|
|
blob->resize(len);
|
|
memcpy(blob->m_data, data, len);
|
|
blob->m_blobsize = len;
|
|
if (m_nullable)
|
|
set_null(row, false);
|
|
// add to rowsize which already includes recsize
|
|
require(row->m_rowsize >= row->m_recsize);
|
|
row->m_rowsize += len;
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::Attr::set_null(Row* row, bool null) const
|
|
{
|
|
uchar* data = row->m_data;
|
|
uchar mask = (1 << m_null_bit);
|
|
if (null)
|
|
data[m_null_byte] |= mask;
|
|
else
|
|
data[m_null_byte] &= ~mask;
|
|
}
|
|
|
|
/*
 * Return a pointer to the start of this attribute within the row
 * data.  For types with a length prefix this points at the prefix.
 */
const uchar*
NdbImportUtil::Attr::get_value(const Row* row) const
{
  return &row->m_data[m_offset];
}
|
|
|
|
void
|
|
NdbImportUtil::Attr::get_value(const Row* row, uint32& value) const
|
|
{
|
|
const uchar* p = get_value(row);
|
|
if (m_type == NdbDictionary::Column::Unsigned)
|
|
{
|
|
memcpy(&value, p, sizeof(value));
|
|
return;
|
|
}
|
|
require(false);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::Attr::get_value(const Row* row, uint64& value) const
|
|
{
|
|
const uchar* p = get_value(row);
|
|
if (m_type == NdbDictionary::Column::Bigunsigned)
|
|
{
|
|
memcpy(&value, p, sizeof(value));
|
|
return;
|
|
}
|
|
require(false);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::Attr::get_value(const Row* row, char* buf, uint bufsz) const
|
|
{
|
|
const uchar* p = get_value(row);
|
|
memset(buf, 0, bufsz);
|
|
if (m_type == NdbDictionary::Column::Varchar) {
|
|
uint sz = p[0];
|
|
require(sz < bufsz);
|
|
memcpy(buf, &p[1], sz);
|
|
return;
|
|
}
|
|
if (m_type == NdbDictionary::Column::Longvarchar) {
|
|
uint sz = p[0] | (p[1] << 8);
|
|
require(sz < bufsz);
|
|
memcpy(buf, &p[2], sz);
|
|
return;
|
|
}
|
|
require(false);
|
|
}
|
|
|
|
bool
|
|
NdbImportUtil::Attr::get_null(const Row* row) const
|
|
{
|
|
bool null = false;
|
|
if (m_nullable)
|
|
{
|
|
const uchar* data = row->m_data;
|
|
uchar mask = (1 << m_null_bit);
|
|
null = (bool)(data[m_null_byte] & mask);
|
|
}
|
|
return null;
|
|
}
|
|
|
|
/*
 * Return the number of blob parts needed for a blob value of len
 * bytes.  Bytes up to the inline size need no parts.  The remaining
 * bytes are divided into parts of m_partsize, rounding up.
 */
uint
NdbImportUtil::Attr::get_blob_parts(uint len) const
{
  require(m_isblob);
  if (len <= m_inlinesize)
    return 0;
  require(m_partsize != 0);
  const uint parts = (len - m_inlinesize + m_partsize - 1) / m_partsize;
  return parts;
}
|
|
|
|
// could go in NdbDictionary
|
|
void
|
|
NdbImportUtil::Attr::set_sqltype()
|
|
{
|
|
m_sqltype[0] = 0;
|
|
switch (m_type) {
|
|
case NdbDictionary::Column::Tinyint:
|
|
sprintf(m_sqltype, "tinyint");
|
|
break;
|
|
case NdbDictionary::Column::Smallint:
|
|
sprintf(m_sqltype, "smallint");
|
|
break;
|
|
case NdbDictionary::Column::Mediumint:
|
|
sprintf(m_sqltype, "mediumint");
|
|
break;
|
|
case NdbDictionary::Column::Int:
|
|
sprintf(m_sqltype, "int");
|
|
break;
|
|
case NdbDictionary::Column::Bigint:
|
|
sprintf(m_sqltype, "bigint");
|
|
break;
|
|
case NdbDictionary::Column::Tinyunsigned:
|
|
sprintf(m_sqltype, "tinyint unsigned");
|
|
break;
|
|
case NdbDictionary::Column::Smallunsigned:
|
|
sprintf(m_sqltype, "smallint unsigned");
|
|
break;
|
|
case NdbDictionary::Column::Mediumunsigned:
|
|
sprintf(m_sqltype, "mediumint unsigned");
|
|
break;
|
|
case NdbDictionary::Column::Unsigned:
|
|
sprintf(m_sqltype, "int unsigned");
|
|
break;
|
|
case NdbDictionary::Column::Bigunsigned:
|
|
sprintf(m_sqltype, "bigint unsigned");
|
|
break;
|
|
case NdbDictionary::Column::Decimal:
|
|
sprintf(m_sqltype, "decimal");
|
|
break;
|
|
case NdbDictionary::Column::Decimalunsigned:
|
|
sprintf(m_sqltype, "decimal unsigned");
|
|
break;
|
|
case NdbDictionary::Column::Float:
|
|
sprintf(m_sqltype, "float");
|
|
break;
|
|
case NdbDictionary::Column::Double:
|
|
sprintf(m_sqltype, "double");
|
|
break;
|
|
case NdbDictionary::Column::Char:
|
|
require(m_charset != 0 && m_charset->csname != 0);
|
|
sprintf(m_sqltype, "char(%u) %s", m_charlength, m_charset->csname);
|
|
break;
|
|
case NdbDictionary::Column::Varchar:
|
|
require(m_charset != 0 && m_charset->csname != 0);
|
|
sprintf(m_sqltype, "varchar(%u) %s", m_charlength, m_charset->csname);
|
|
break;
|
|
case NdbDictionary::Column::Longvarchar:
|
|
require(m_charset != 0 && m_charset->csname != 0);
|
|
sprintf(m_sqltype, "varchar(%u) %s", m_charlength, m_charset->csname);
|
|
break;
|
|
case NdbDictionary::Column::Binary:
|
|
sprintf(m_sqltype, "binary(%u)", m_length);
|
|
break;
|
|
case NdbDictionary::Column::Varbinary:
|
|
sprintf(m_sqltype, "varbinary(%u)", m_length);
|
|
break;
|
|
case NdbDictionary::Column::Longvarbinary:
|
|
sprintf(m_sqltype, "varbinary(%u)", m_length);
|
|
break;
|
|
case NdbDictionary::Column::Bit:
|
|
sprintf(m_sqltype, "bit(%u)", m_length);
|
|
break;
|
|
case NdbDictionary::Column::Year:
|
|
sprintf(m_sqltype, "year");
|
|
break;
|
|
case NdbDictionary::Column::Date:
|
|
sprintf(m_sqltype, "date");
|
|
break;
|
|
case NdbDictionary::Column::Time2:
|
|
if (m_precision == 0)
|
|
sprintf(m_sqltype, "time");
|
|
else
|
|
sprintf(m_sqltype, "time(%u)", m_precision);
|
|
break;
|
|
case NdbDictionary::Column::Datetime2:
|
|
if (m_precision == 0)
|
|
sprintf(m_sqltype, "datetime");
|
|
else
|
|
sprintf(m_sqltype, "datetime(%u)", m_precision);
|
|
break;
|
|
case NdbDictionary::Column::Timestamp2:
|
|
if (m_precision == 0)
|
|
sprintf(m_sqltype, "timestamp");
|
|
else
|
|
sprintf(m_sqltype, "timestamp(%u)", m_precision);
|
|
break;
|
|
case NdbDictionary::Column::Blob:
|
|
sprintf(m_sqltype, "blob");
|
|
break;
|
|
case NdbDictionary::Column::Text:
|
|
sprintf(m_sqltype, "text");
|
|
break;
|
|
default:
|
|
sprintf(m_sqltype, "unknown type=%d", (int)m_type);
|
|
break;
|
|
}
|
|
}
|
|
|
|
// tables
|
|
|
|
/*
 * An empty table: invalid table id, no dictionary table or records,
 * zero record size, no hidden pk.
 */
NdbImportUtil::Table::Table()
{
  m_tabid = Inval_uint;
  // dictionary table and its records
  m_tab = 0;
  m_rec = 0;
  m_keyrec = NULL;
  m_recsize = 0;
  m_has_hidden_pk = false;
}
|
|
|
|
void
|
|
NdbImportUtil::Table::add_pseudo_attr(const char* name,
|
|
NdbDictionary::Column::Type type,
|
|
uint length)
|
|
{
|
|
const uint id = m_attrs.size();
|
|
Attr attr; // ctor sets defaults
|
|
attr.m_attrname = name;
|
|
attr.m_attrno = id;
|
|
attr.m_attrid = id;
|
|
attr.m_type = type;
|
|
attr.m_length = length;
|
|
attr.m_charlength = length;
|
|
switch (type) {
|
|
case NdbDictionary::Column::Unsigned:
|
|
require(length == 1);
|
|
attr.m_arraytype = NdbDictionary::Column::ArrayTypeFixed;
|
|
attr.m_size = 4;
|
|
attr.m_quotable = false;
|
|
break;
|
|
case NdbDictionary::Column::Bigunsigned:
|
|
require(length == 1);
|
|
attr.m_arraytype = NdbDictionary::Column::ArrayTypeFixed;
|
|
attr.m_size = 8;
|
|
attr.m_quotable = false;
|
|
break;
|
|
case NdbDictionary::Column::Double:
|
|
require(length == 1);
|
|
attr.m_arraytype = NdbDictionary::Column::ArrayTypeFixed;
|
|
attr.m_size = 8;
|
|
attr.m_quotable = false;
|
|
break;
|
|
case NdbDictionary::Column::Varchar:
|
|
require(length > 1);
|
|
attr.m_charset = &my_charset_bin;
|
|
attr.m_arraytype = NdbDictionary::Column::ArrayTypeShortVar;
|
|
attr.m_size = 1 + length;
|
|
attr.m_quotable = true;
|
|
break;
|
|
case NdbDictionary::Column::Longvarchar:
|
|
require(length > 1);
|
|
attr.m_charset = &my_charset_bin;
|
|
attr.m_arraytype = NdbDictionary::Column::ArrayTypeMediumVar;
|
|
attr.m_size = 2 + length;
|
|
attr.m_quotable = true;
|
|
break;
|
|
case NdbDictionary::Column::Text:
|
|
attr.m_charset = &my_charset_bin;
|
|
attr.m_inlinesize = 256;
|
|
attr.m_partsize = 2000;
|
|
attr.m_isblob = true;
|
|
attr.m_blobno = m_blobids.size();
|
|
m_blobids.push_back(id);
|
|
attr.m_quotable = true;
|
|
break;
|
|
default:
|
|
require(false);
|
|
break;
|
|
};
|
|
attr.set_sqltype();
|
|
if (id == 0)
|
|
attr.m_offset = 0;
|
|
else
|
|
{
|
|
const Attr& prevattr = m_attrs[id - 1];
|
|
attr.m_offset = prevattr.m_offset + prevattr.m_size;
|
|
}
|
|
attr.m_null_byte = Inval_uint;
|
|
attr.m_null_bit = Inval_uint;
|
|
m_recsize += attr.m_size;
|
|
m_attrs.push_back(attr);
|
|
}
|
|
|
|
/*
 * Find an attribute by exact name and return a reference to it.
 * The first match is returned.  It is a fatal error if no attribute
 * has the given name.
 */
const NdbImportUtil::Attr&
NdbImportUtil::Table::get_attr(const char* attrname) const
{
  const uint n = m_attrs.size();
  uint i;
  for (i = 0; i < n; i++)
  {
    if (strcmp(m_attrs[i].m_attrname.c_str(), attrname) == 0)
      break;
  }
  require(i < n);
  return m_attrs[i];
}
|
|
|
|
/*
 * Return the node id stored for the given fragment id, which must
 * be within the fragment list of the table.
 */
uint
NdbImportUtil::Table::get_nodeid(uint fragid) const
{
  require(fragid < m_fragments.size());
  return m_fragments[fragid];
}
|
|
|
|
int
|
|
NdbImportUtil::add_table(NdbDictionary::Dictionary* dic,
|
|
const NdbDictionary::Table* tab,
|
|
uint& tabid,
|
|
Error& error)
|
|
{
|
|
require(tab != 0);
|
|
require(tab->getObjectStatus() == NdbDictionary::Object::Retrieved);
|
|
log_debug(1, "add_table: " << tab->getName());
|
|
tabid = tab->getObjectId();
|
|
// check if mapped already
|
|
{
|
|
std::map<uint, Table>::const_iterator it;
|
|
it = c_tables.m_tables.find(tabid);
|
|
if (it != c_tables.m_tables.end())
|
|
{
|
|
const Table& table = it->second;
|
|
require(table.m_tab == tab);
|
|
return 0;
|
|
}
|
|
}
|
|
Table table;
|
|
do
|
|
{
|
|
const NdbRecord* rec = tab->getDefaultRecord();
|
|
table.m_tabid = tabid;
|
|
table.m_tab = tab;
|
|
table.m_rec = rec;
|
|
table.m_recsize = NdbDictionary::getRecordRowLength(rec);
|
|
Attrs& attrs = table.m_attrs;
|
|
const uint attrcnt = tab->getNoOfColumns();
|
|
attrs.reserve(attrcnt);
|
|
bool ok = true;
|
|
Uint32 recAttrId;
|
|
for (uint i = 0; i < attrcnt && ok; i++)
|
|
{
|
|
if (i == 0)
|
|
require(NdbDictionary::getFirstAttrId(rec, recAttrId));
|
|
else
|
|
require(NdbDictionary::getNextAttrId(rec, recAttrId));
|
|
require(recAttrId == i);
|
|
Attr attr;
|
|
const NdbDictionary::Column* col = tab->getColumn(i);
|
|
require(col !=0);
|
|
attr.m_attrname = col->getName();
|
|
attr.m_attrno = i;
|
|
attr.m_attrid = i;
|
|
attr.m_type = col->getType();
|
|
attr.m_pk = col->getPrimaryKey();
|
|
attr.m_nullable = col->getNullable();
|
|
attr.m_precision = col->getPrecision();
|
|
attr.m_scale = col->getScale();
|
|
attr.m_length = col->getLength();
|
|
attr.m_arraytype = col->getArrayType();
|
|
require(attr.m_arraytype <= 2);
|
|
attr.m_size = col->getSizeInBytes();
|
|
switch (attr.m_type) {
|
|
case NdbDictionary::Column::Char:
|
|
attr.m_pad = true;
|
|
attr.m_padchar = 0x20;
|
|
break;
|
|
case NdbDictionary::Column::Binary:
|
|
attr.m_pad = true;
|
|
attr.m_padchar = 0x0;
|
|
break;
|
|
default:
|
|
attr.m_pad = false;
|
|
break;
|
|
}
|
|
switch (attr.m_type) {
|
|
case NdbDictionary::Column::Char:
|
|
case NdbDictionary::Column::Varchar:
|
|
case NdbDictionary::Column::Longvarchar:
|
|
attr.m_charset = col->getCharset();
|
|
require(attr.m_charset != 0);
|
|
uint mbmaxlen; mbmaxlen = attr.m_charset->mbmaxlen;
|
|
require(mbmaxlen != 0);
|
|
require(attr.m_length % mbmaxlen == 0);
|
|
attr.m_charlength = attr.m_length / mbmaxlen;
|
|
attr.m_quotable = true;
|
|
break;
|
|
case NdbDictionary::Column::Text:
|
|
attr.m_charset = col->getCharset();
|
|
require(attr.m_charset != 0);
|
|
break;
|
|
case NdbDictionary::Column::Binary:
|
|
case NdbDictionary::Column::Varbinary:
|
|
case NdbDictionary::Column::Longvarbinary:
|
|
attr.m_charset = 0;
|
|
attr.m_charlength = attr.m_length;
|
|
attr.m_quotable = true;
|
|
break;
|
|
default:
|
|
attr.m_charset = 0;
|
|
attr.m_charlength = attr.m_length;
|
|
attr.m_quotable = false;
|
|
break;
|
|
}
|
|
switch (attr.m_type) {
|
|
case NdbDictionary::Column::Blob:
|
|
case NdbDictionary::Column::Text:
|
|
attr.m_isblob = true;
|
|
attr.m_inlinesize = col->getInlineSize();
|
|
attr.m_partsize = col->getPartSize();
|
|
attr.m_blobno = table.m_blobids.size();
|
|
attr.m_blobtable = col->getBlobTable();
|
|
if (attr.m_partsize == 0)
|
|
require(attr.m_blobtable == 0);
|
|
else
|
|
require(attr.m_blobtable != 0);
|
|
table.m_blobids.push_back(i);
|
|
break;
|
|
default:
|
|
attr.m_isblob = false;
|
|
break;
|
|
}
|
|
attr.set_sqltype();
|
|
Uint32 offset;
|
|
require(NdbDictionary::getOffset(rec, i, offset));
|
|
attr.m_offset = offset;
|
|
Uint32 null_byte, null_bit;
|
|
require(NdbDictionary::getNullBitOffset(rec, i, null_byte, null_bit));
|
|
attr.m_null_byte = null_byte;
|
|
attr.m_null_bit = null_bit;
|
|
attrs.push_back(attr);
|
|
}
|
|
if (!ok)
|
|
break;
|
|
require(!NdbDictionary::getNextAttrId(rec, recAttrId));
|
|
NdbDictionary::RecordSpecification
|
|
speclist[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
|
|
uint nkey = 0;
|
|
for (uint i = 0; i < attrcnt && ok; i++)
|
|
{
|
|
const Attr& attr = attrs[i];
|
|
if (attr.m_pk)
|
|
{
|
|
const NdbDictionary::Column* col = tab->getColumn(i);
|
|
require(col->getPrimaryKey());
|
|
NdbDictionary::RecordSpecification& spec = speclist[nkey];
|
|
spec.column = col;
|
|
spec.offset = attr.m_offset;
|
|
spec.nullbit_byte_offset = attr.m_null_byte;
|
|
spec.nullbit_bit_in_byte = attr.m_null_bit;
|
|
nkey++;
|
|
// guess hidden pk
|
|
if (strcmp(attr.m_attrname.c_str(), "$PK") == 0)
|
|
{
|
|
if (i + 1 == attrcnt &&
|
|
nkey == 1 &&
|
|
attr.m_type == NdbDictionary::Column::Bigunsigned)
|
|
table.m_has_hidden_pk = true;
|
|
else
|
|
{
|
|
set_error_usage(error, __LINE__,
|
|
"column %u: "
|
|
"invalid use of reserved column name $PK", i);
|
|
ok = false;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if (!ok)
|
|
break;
|
|
require(nkey == (uint)tab->getNoOfPrimaryKeys());
|
|
const NdbRecord* keyrec =
|
|
dic->createRecord(tab,
|
|
speclist,
|
|
nkey,
|
|
sizeof(NdbDictionary::RecordSpecification));
|
|
if (keyrec == 0)
|
|
{
|
|
m_util.set_error_ndb(error, __LINE__, dic->getNdbError());
|
|
break;
|
|
}
|
|
table.m_keyrec = keyrec;
|
|
{
|
|
NdbTableImpl& tabImpl = NdbTableImpl::getImpl(*tab);
|
|
const Vector<Uint16>& fragments = tabImpl.m_fragments;
|
|
for (uint i = 0; i < fragments.size(); i++)
|
|
{
|
|
uint16 nodeid = fragments[i];
|
|
table.m_fragments.push_back(nodeid);
|
|
}
|
|
}
|
|
c_tables.m_tables.insert(std::pair<uint, Table>(tabid, table));
|
|
return 0;
|
|
} while (0);
|
|
return -1;
|
|
}
|
|
|
|
/*
 * Return the table mapped under the given id.  It is a fatal error
 * if there is no such table.
 */
const NdbImportUtil::Table&
NdbImportUtil::get_table(uint tabid)
{
  std::map<uint, Table>::const_iterator it = c_tables.m_tables.find(tabid);
  require(it != c_tables.m_tables.end());
  return it->second;
}
|
|
|
|
void
|
|
NdbImportUtil::remove_table(NdbDictionary::Dictionary* dic, uint tabid)
|
|
{
|
|
std::map<uint, Table>::const_iterator it;
|
|
it = c_tables.m_tables.find(tabid);
|
|
require(it != c_tables.m_tables.end());
|
|
const Table& table = it->second;
|
|
|
|
dic->releaseRecord(const_cast<NdbRecord*>(table.m_keyrec));
|
|
c_tables.m_tables.erase(it);
|
|
}
|
|
|
|
// rows
|
|
|
|
/*
 * A new row belongs to no table and has no data buffer.  Ids and
 * positions are set to invalid values.
 */
NdbImportUtil::Row::Row()
{
  m_tabid = Inval_uint;
  // sizes
  m_recsize = 0;
  m_rowsize = 0;
  m_allocsize = 0;
  // identity and position
  m_rowid = Inval_uint64;
  m_linenr = Inval_uint64;
  m_startpos = Inval_uint64;
  m_endpos = Inval_uint64;
  // buffer is allocated by init()
  m_data = 0;
}
|
|
|
|
/*
 * Free the data buffer and delete any blobs still attached to the
 * row.
 */
NdbImportUtil::Row::~Row()
{
  delete [] m_data;

  // deleting a null pointer is a no-op so no check is needed
  const uint blobcnt = m_blobs.size();
  for (uint i = 0; i < blobcnt; ++i)
    delete m_blobs[i];

  m_blobs.clear();
}
|
|
|
|
void
|
|
NdbImportUtil::Row::init(const Table& table)
|
|
{
|
|
m_tabid = table.m_tabid;
|
|
uint recsize = table.m_recsize;
|
|
require(recsize > 0);
|
|
m_recsize = recsize;
|
|
m_rowsize = recsize; // full main record is always included
|
|
if (m_allocsize < recsize)
|
|
{
|
|
delete [] m_data;
|
|
m_data = new uchar [recsize];
|
|
m_allocsize = recsize;
|
|
}
|
|
}
|
|
|
|
/*
 * An empty row list.  The row count and byte limits are set to
 * UINT_MAX.  No stats are attached.
 */
NdbImportUtil::RowList::RowList()
{
  m_rowsize = 0;
  // limits checked by push_back() and push_front()
  m_rowbatch = UINT_MAX;
  m_rowbytes = UINT_MAX;
  m_eof = false;
  m_foe = false;
  // counters
  m_overflow = 0;
  m_underflow = 0;
  // stats are attached later by set_stats()
  m_stat_overflow = 0;
  m_stat_underflow = 0;
  m_stat_locks = 0;
}
|
|
|
|
/*
 * Pop and delete every row still on the list.
 */
NdbImportUtil::RowList::~RowList ()
{
  for (Row* row = pop_front(); row != NULL; row = pop_front())
    delete row;
}
|
|
|
|
void
|
|
NdbImportUtil::RowList::set_stats(Stats& stats, const char* name)
|
|
{
|
|
List::set_stats(stats, name);
|
|
{
|
|
const Name statname(name, "overflow");
|
|
Stat* stat = stats.create(statname, 0, 0);
|
|
m_stat_overflow = stat;
|
|
}
|
|
{
|
|
const Name statname(name, "underflow");
|
|
Stat* stat = stats.create(statname, 0, 0);
|
|
m_stat_underflow = stat;
|
|
}
|
|
{
|
|
const Name statname(name, "locks");
|
|
Stat* stat = stats.create(statname, 0, 0);
|
|
m_stat_locks = stat;
|
|
}
|
|
}
|
|
|
|
bool
|
|
NdbImportUtil::RowList::push_back(Row* row)
|
|
{
|
|
bool ret = false;
|
|
if (m_cnt < m_rowbatch && m_rowsize < m_rowbytes)
|
|
{
|
|
List::push_back(row);
|
|
m_rowsize += row->m_rowsize;
|
|
ret = true;
|
|
}
|
|
else
|
|
{
|
|
m_overflow++;
|
|
if (m_stat_overflow != 0)
|
|
m_stat_overflow->add(1);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::RowList::push_back_force(Row* row)
|
|
{
|
|
List::push_back(row);
|
|
m_rowsize += row->m_rowsize;
|
|
}
|
|
|
|
bool
|
|
NdbImportUtil::RowList::push_front(Row* row)
|
|
{
|
|
bool ret = false;
|
|
if (m_cnt < m_rowbatch && m_rowsize < m_rowbytes)
|
|
{
|
|
List::push_front(row);
|
|
m_rowsize += row->m_rowsize;
|
|
ret = true;
|
|
}
|
|
else
|
|
{
|
|
m_overflow++;
|
|
if (m_stat_overflow != 0)
|
|
m_stat_overflow->add(1);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::RowList::push_front_force(Row* row)
|
|
{
|
|
List::push_front(row);
|
|
m_rowsize += row->m_rowsize;
|
|
}
|
|
|
|
/*
 * Remove and return the first row.  If the list is empty then the
 * underflow is counted and null is returned.
 */
NdbImportUtil::Row*
NdbImportUtil::RowList::pop_front()
{
  Row* const row = static_cast<Row*>(List::pop_front());
  if (row == 0)
  {
    m_underflow++;
    if (m_stat_underflow != 0)
      m_stat_underflow->add(1);
    return 0;
  }
  // keep the byte total in step
  require(m_rowsize >= row->m_rowsize);
  m_rowsize -= row->m_rowsize;
  return row;
}
|
|
|
|
void
|
|
NdbImportUtil::RowList::remove(Row* row)
|
|
{
|
|
List::remove(row);
|
|
require(m_rowsize >= row->m_rowsize);
|
|
m_rowsize -= row->m_rowsize;
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::RowList::push_back_from(RowList& src)
|
|
{
|
|
List::push_back_from(src);
|
|
m_rowsize += src.m_rowsize;
|
|
src.m_rowsize = 0;
|
|
validate();
|
|
src.validate();
|
|
}
|
|
|
|
/*
|
|
* Transfer rows from a shared list src to our list. If src is
|
|
* empty, try to wait. Terminate if our list is full. If any rows
|
|
* were transferred, do not wait for more, and signal that src now
|
|
* has fewer rows.
|
|
*/
|
|
void
|
|
NdbImportUtil::RowList::push_back_from(RowList& src, RowCtl& ctl)
|
|
{
|
|
uint retries = ctl.m_retries;
|
|
uint cnt_out = 0;
|
|
uint bytes_out = 0;
|
|
if (unlikely(full()))
|
|
return;
|
|
while (src.empty() && retries != 0)
|
|
{
|
|
if (ctl.m_dowait)
|
|
src.wait(ctl.m_timeout);
|
|
retries--;
|
|
}
|
|
while (!src.empty())
|
|
{
|
|
// pop because row cannot be on 2 lists
|
|
Row* row = src.pop_front();
|
|
if (push_back(row))
|
|
{
|
|
cnt_out++;
|
|
bytes_out += row->m_rowsize;
|
|
continue;
|
|
}
|
|
src.push_front_force(row);
|
|
// our list is full
|
|
break;
|
|
}
|
|
if (cnt_out != 0 && ctl.m_dosignal)
|
|
{
|
|
// signal that we removed some rows from src
|
|
src.signal();
|
|
}
|
|
ctl.m_cnt_out += cnt_out;
|
|
ctl.m_bytes_out += bytes_out;
|
|
}
|
|
|
|
/*
|
|
* Transfer rows from our list to a shared list dst. If dst is
|
|
* full, try to wait. Terminate if our list is empty. If any rows
|
|
* were transferred, do not wait for more, and signal that dst now
|
|
* has more rows.
|
|
*/
|
|
void
|
|
NdbImportUtil::RowList::pop_front_to(RowList& dst, RowCtl& ctl)
|
|
{
|
|
uint retries = ctl.m_retries;
|
|
uint cnt_out = 0;
|
|
uint bytes_out = 0;
|
|
if (unlikely(empty()))
|
|
return;
|
|
while (dst.full() && retries != 0)
|
|
{
|
|
if (ctl.m_dowait)
|
|
dst.wait(ctl.m_timeout);
|
|
retries--;
|
|
}
|
|
while (!empty())
|
|
{
|
|
// pop because row cannot be on 2 lists
|
|
Row* row = pop_front();
|
|
if (dst.push_back(row))
|
|
{
|
|
cnt_out++;
|
|
bytes_out += row->m_rowsize;
|
|
continue;
|
|
}
|
|
push_front_force(row);
|
|
// dst is full
|
|
break;
|
|
}
|
|
if (cnt_out != 0 && ctl.m_dosignal)
|
|
{
|
|
// signal that we added some rows to dst
|
|
dst.signal();
|
|
}
|
|
ctl.m_cnt_out += cnt_out;
|
|
ctl.m_bytes_out += bytes_out;
|
|
}
|
|
|
|
#if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL)
/*
 * Consistency check of the row list.  The row count and the byte
 * total must be zero together.  When both VM_TRACE and
 * TEST_NDBIMPORTUTIL are defined the list is also walked and the
 * byte total is recomputed from the individual rows.
 */
void
NdbImportUtil::RowList::validate() const
{
  List::validate();
  if (m_cnt == 0)
  {
    require(m_rowsize == 0);
  }
  if (m_rowsize == 0)
  {
    require(m_cnt == 0);
  }
#if defined(VM_TRACE) && defined(TEST_NDBIMPORTUTIL)
  uint rowsize = 0;
  for (const Row* row = static_cast<const Row*>(m_front);
       row != 0;
       row = static_cast<const Row*>(row->m_next))
  {
    rowsize += row->m_rowsize;
  }
  require(m_rowsize == rowsize);
#endif
}
#endif
|
|
|
|
// alloc and free shared rows
|
|
|
|
/*
 * Return a row initialized for the given table.  The row is taken
 * from the shared free list if one is available, otherwise a new
 * one is created.  The free list is locked around the pop only if
 * dolock is set; alloc_rows() passes false because it already holds
 * the lock.  Blobs are added until the row has as many blobs as the
 * table has blob ids.
 */
NdbImportUtil::Row*
NdbImportUtil::alloc_row(const Table& table, bool dolock)
{
  RowList& freelist = *c_rows_free;
  Row* row = 0;
  if (dolock)
  {
    freelist.lock();
    row = freelist.pop_front();
    freelist.unlock();
  }
  else
  {
    row = freelist.pop_front();
  }
  if (row == 0)
    row = new Row;
  row->init(table);
  while (row->m_blobs.size() < table.m_blobids.size())
  {
    Blob* const newblob = alloc_blob();
    row->m_blobs.push_back(newblob);
  }
  return row;
}
|
|
|
|
void
|
|
NdbImportUtil::alloc_rows(const Table& table, uint cnt, RowList& dst)
|
|
{
|
|
RowList& rows = *c_rows_free;
|
|
rows.lock();
|
|
for (uint i = 0; i < cnt; i++)
|
|
{
|
|
Row* row = alloc_row(table, false);
|
|
dst.push_back_force(row); // ignore limits
|
|
}
|
|
rows.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::free_blobs_from_row(Row *row)
|
|
{
|
|
for (uint i = 0; i < row->m_blobs.size(); ++i)
|
|
{
|
|
Blob* blob = row->m_blobs[i];
|
|
if (blob != NULL)
|
|
{
|
|
free_blob(blob);
|
|
}
|
|
}
|
|
row->m_blobs.clear();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::free_row(Row* row)
|
|
{
|
|
free_blobs_from_row(row);
|
|
|
|
RowList& rows = *c_rows_free;
|
|
rows.lock();
|
|
rows.push_back(row);
|
|
rows.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::free_rows(RowList& src)
|
|
{
|
|
RowList blob_freed_rows;
|
|
while (!src.empty())
|
|
{
|
|
Row *one_row = src.pop_front();
|
|
free_blobs_from_row(one_row);
|
|
blob_freed_rows.push_back(one_row);
|
|
}
|
|
RowList& rows = *c_rows_free;
|
|
rows.lock();
|
|
rows.push_back_from(blob_freed_rows);
|
|
rows.unlock();
|
|
}
|
|
|
|
// blobs
|
|
|
|
// A new blob has no data and a zero-length (but non-null) buffer.
NdbImportUtil::Blob::Blob()
{
  m_data = new uchar [0];
  m_allocsize = 0;
  m_blobsize = 0;
}
|
|
|
|
// Release the data buffer owned by the blob.
NdbImportUtil::Blob::~Blob()
{
  delete [] m_data;
}
|
|
|
|
// Nothing to set up beyond what the base and members do themselves.
NdbImportUtil::BlobList::BlobList()
{
}
|
|
|
|
// The list owns its blobs: pop and delete until it is empty.
NdbImportUtil::BlobList::~BlobList()
{
  for (Blob* blob = pop_front(); blob != NULL; blob = pop_front())
  {
    delete blob;
  }
}
|
|
|
|
void
|
|
NdbImportUtil::Blob::resize(uint size)
|
|
{
|
|
if (m_allocsize < size)
|
|
{
|
|
delete [] m_data;
|
|
m_data = new uchar [size];
|
|
m_allocsize = size;
|
|
}
|
|
}
|
|
|
|
/*
 * Return a blob, taken from the shared blob free list (under its
 * lock) if one is available, otherwise newly created.
 */
NdbImportUtil::Blob*
NdbImportUtil::alloc_blob()
{
  BlobList& freelist = *c_blobs_free;
  freelist.lock();
  Blob* blob = static_cast<Blob*>(freelist.pop_front());
  freelist.unlock();
  return blob != 0 ? blob : new Blob;
}
|
|
|
|
void
|
|
NdbImportUtil::free_blob(Blob* blob)
|
|
{
|
|
BlobList& blobs = *c_blobs_free;
|
|
blobs.lock();
|
|
blobs.push_back(blob);
|
|
blobs.unlock();
|
|
}
|
|
|
|
// rowmap
|
|
|
|
// A rowmap only remembers the util instance it belongs to.
NdbImportUtil::RowMap::RowMap(NdbImportUtil& util) :
  m_util(util)
{
}
|
|
|
|
// A new range is empty: all row ids, file positions and counts are 0.
NdbImportUtil::Range::Range()
{
  m_start = 0;
  m_startpos = 0;
  m_end = 0;
  m_endpos = 0;
  m_reject = 0;
}
|
|
|
|
// A range owns no resources.
NdbImportUtil::Range::~Range()
{
}
|
|
|
|
void
|
|
NdbImportUtil::Range::copy(const Range& range2)
|
|
{
|
|
m_start = range2.m_start;
|
|
m_end = range2.m_end;
|
|
m_startpos = range2.m_startpos;
|
|
m_endpos = range2.m_endpos;
|
|
m_reject = range2.m_reject;
|
|
}
|
|
|
|
/*
|
|
* This is typically used by a worker to add a row to its
|
|
* private rowmap. The row is likely to go near the end
|
|
* so search is done backwards.
|
|
*/
|
|
void
|
|
NdbImportUtil::RowMap::add(Range range2)
|
|
{
|
|
RangeList& ranges = m_ranges;
|
|
Range* r2 = alloc_range();
|
|
r2->copy(range2);
|
|
if (ranges.empty())
|
|
{
|
|
ranges.push_back(r2);
|
|
}
|
|
else
|
|
{
|
|
Range* rback = ranges.back();
|
|
Range* rfront = ranges.front();
|
|
if (rback->m_start < r2->m_start)
|
|
{
|
|
if (merge_up(rback, r2))
|
|
{
|
|
// rback grows up to include r2
|
|
free_range(r2);
|
|
}
|
|
else
|
|
{
|
|
ranges.push_back(r2);
|
|
}
|
|
}
|
|
else if (r2->m_start < rfront->m_start)
|
|
{
|
|
if (merge_down(rfront, r2))
|
|
{
|
|
// rfront grows down to include r2
|
|
free_range(r2);
|
|
}
|
|
else
|
|
{
|
|
ranges.push_front(r2);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// r2 is between 2 entries rprev rnext
|
|
Range* rprev = rback;
|
|
Range* rnext = 0;
|
|
while (1)
|
|
{
|
|
if (r2->m_start > rprev->m_start)
|
|
{
|
|
// found the place
|
|
require(rnext != 0);
|
|
if (merge_up(rprev, r2))
|
|
{
|
|
// rprev grows up to include r2
|
|
free_range(r2);
|
|
if (merge_up(rprev, rnext))
|
|
{
|
|
// rprev and rnext have been joined via r2
|
|
// rnext is now obsolete
|
|
ranges.remove(rnext);
|
|
free_range(rnext);
|
|
}
|
|
}
|
|
else if (merge_down(rnext, r2))
|
|
{
|
|
// rnext grows down to include r2
|
|
free_range(r2);
|
|
}
|
|
else
|
|
{
|
|
// r2 becomes new entry after rprev
|
|
ranges.push_after(rprev, r2);
|
|
}
|
|
break;
|
|
}
|
|
rnext = rprev;
|
|
rprev = rprev->prev();
|
|
require(rprev != 0);
|
|
}
|
|
}
|
|
}
|
|
validate();
|
|
}
|
|
|
|
/*
|
|
* Merge from another rowmap. Walks through both maps
|
|
* in ascending order. The argument map2 is not modified
|
|
* here but is normally cleared afterwards by caller.
|
|
*/
|
|
void
|
|
NdbImportUtil::RowMap::add(const RowMap& map2)
|
|
{
|
|
RangeList& ranges = m_ranges;
|
|
const RangeList& ranges2 = map2.m_ranges;
|
|
Range* r = ranges.front();
|
|
const Range* r2 = ranges2.front();
|
|
while (1)
|
|
{
|
|
if (r == 0)
|
|
{
|
|
// copy rest of map2 using our free ranges
|
|
while (r2 != 0)
|
|
{
|
|
r = alloc_range();
|
|
r->copy(*r2);
|
|
ranges.push_back(r);
|
|
r2 = r2->next();
|
|
}
|
|
break;
|
|
}
|
|
if (r2 == 0)
|
|
{
|
|
// nothing more to do
|
|
break;
|
|
}
|
|
if (r->m_start < r2->m_start)
|
|
{
|
|
{
|
|
Range* rnext = r->next();
|
|
if (rnext != 0 && rnext->m_start < r2->m_start)
|
|
{
|
|
// still below r2
|
|
r = rnext;
|
|
continue;
|
|
}
|
|
}
|
|
if (merge_up(r, r2))
|
|
{
|
|
// r grows up to include r2
|
|
Range* rnext = r->next();
|
|
if (rnext != 0 && merge_up(r, rnext))
|
|
{
|
|
// r and rnext have been joined via r2
|
|
// rnext is now obsolete
|
|
ranges.remove(rnext);
|
|
free_range(rnext);
|
|
}
|
|
// leave r unchanged as next r2 may also apply
|
|
{
|
|
const Range* r2next = r2->next();
|
|
// even in the join case r2next cannot overlap r
|
|
if (r2next != 0)
|
|
require(r->m_end <= r2next->m_start);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// r2 creates new entry
|
|
Range* rnext = alloc_range();
|
|
rnext->copy(*r2);
|
|
ranges.push_after(r, rnext);
|
|
require(rnext == r->next());
|
|
// move to the new entry
|
|
r = r->next();
|
|
{
|
|
Range* rnext = r->next();
|
|
if (rnext != 0 && merge_up(r, rnext))
|
|
{
|
|
// r and rnext have been joined via r2
|
|
// rnext is now obsolete
|
|
ranges.remove(rnext);
|
|
free_range(rnext);
|
|
}
|
|
}
|
|
// leave current r unchanged
|
|
}
|
|
// r2 has been consumed
|
|
r2 = r2->next();
|
|
continue;
|
|
}
|
|
if (r->m_start > r2->m_start)
|
|
{
|
|
if (merge_down(r, r2))
|
|
{
|
|
// r grows down to include r2
|
|
// no more entries below r but there can be one above
|
|
}
|
|
else
|
|
{
|
|
// r2 creates new entry
|
|
Range* rprev = alloc_range();
|
|
rprev->copy(*r2);
|
|
ranges.push_before(r, rprev);
|
|
// can be more entries below r
|
|
}
|
|
// r2 has been consumed
|
|
r2 = r2->next();
|
|
continue;
|
|
}
|
|
require(false);
|
|
}
|
|
validate();
|
|
}
|
|
|
|
/*
 * find() and remove() are used only on --resume, which consumes
 * the old rowmap.  They need not be efficient.
 */
|
|
|
|
/*
 * Linear search from the front for the range containing rowid,
 * i.e. m_start <= rowid < m_end.  Returns 0 if there is none.
 */
NdbImportUtil::Range*
NdbImportUtil::RowMap::find(uint64 rowid)
{
  RangeList& ranges = m_ranges;
  for (Range* r = ranges.front(); r != 0; r = r->next())
  {
    const bool inside = (r->m_start <= rowid && rowid < r->m_end);
    if (inside)
      return r;
  }
  return 0;
}
|
|
|
|
bool
|
|
NdbImportUtil::RowMap::remove(uint64 rowid)
|
|
{
|
|
RangeList& ranges = m_ranges;
|
|
Range* r = find(rowid);
|
|
if (r == 0)
|
|
return false;
|
|
if (rowid == r->m_start)
|
|
{
|
|
r->m_start++;
|
|
if (r->m_start == r->m_end)
|
|
{
|
|
ranges.remove(r);
|
|
free_range(r);
|
|
}
|
|
}
|
|
else if (rowid == r->m_end - 1)
|
|
{
|
|
r->m_end -= 1;
|
|
require(r->m_start < r->m_end);
|
|
}
|
|
else
|
|
{
|
|
Range* r2 = alloc_range();
|
|
r2->m_start = rowid + 1;
|
|
r2->m_end = r->m_end;
|
|
r2->m_reject = 0; // not relevant
|
|
require(r2->m_start < r2->m_end);
|
|
r->m_end = rowid;
|
|
require(r->m_start < r->m_end);
|
|
ranges.push_after(r, r2);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::RowMap::get_total(uint64& rows, uint64& reject) const
|
|
{
|
|
uint64 trows = 0;
|
|
uint64 treject = 0;
|
|
const Range* r = m_ranges.front();
|
|
while (r != 0)
|
|
{
|
|
trows += r->m_end - r->m_start - r->m_reject;
|
|
treject += r->m_reject;
|
|
r = r->next();
|
|
}
|
|
rows = trows;
|
|
reject = treject;
|
|
}
|
|
|
|
/*
 * Return a range, taken from the shared free list if one is
 * available, otherwise newly created.  The free list is locked
 * around the pop only if dolock is set.  A range taken from the
 * free list is returned as is, without being reset.
 */
NdbImportUtil::Range*
NdbImportUtil::alloc_range(bool dolock)
{
  RangeList& freelist = c_ranges_free;
  Range* range = 0;
  if (dolock)
  {
    freelist.lock();
    range = freelist.pop_front();
    freelist.unlock();
  }
  else
  {
    range = freelist.pop_front();
  }
  return range != 0 ? range : new Range;
}
|
|
|
|
void
|
|
NdbImportUtil::alloc_ranges(uint cnt, RangeList& dst)
|
|
{
|
|
RangeList& ranges = c_ranges_free;
|
|
ranges.lock();
|
|
for (uint i = 0; i < cnt; i++)
|
|
{
|
|
Range* range = alloc_range(false);
|
|
dst.push_back(range);
|
|
}
|
|
ranges.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::free_range(Range* range)
|
|
{
|
|
RangeList& ranges = c_ranges_free;
|
|
ranges.lock();
|
|
ranges.push_back(range);
|
|
ranges.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::free_ranges(RangeList& src)
|
|
{
|
|
RangeList& ranges = c_ranges_free;
|
|
ranges.lock();
|
|
ranges.push_back_from(src);
|
|
ranges.unlock();
|
|
}
|
|
|
|
#if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL)
/*
 * Consistency check of the rowmap.  The range list is always
 * validated.  When both VM_TRACE and TEST_NDBIMPORTUTIL are
 * defined, each range must also be non-empty and must start
 * strictly above the end of the range before it.
 */
void
NdbImportUtil::RowMap::validate() const
{
  m_ranges.validate();
#if defined(VM_TRACE) && defined(TEST_NDBIMPORTUTIL)
  const RangeList& ranges = m_ranges;
  const Range* r2 = 0;
  for (const Range* r1 = ranges.front(); r1 != 0; r1 = r1->next())
  {
    require(r1->m_start < r1->m_end);
    // r2 is the entry before r1, if any
    if (r2 != 0)
    {
      require(r2->m_end < r1->m_start);
    }
    r2 = r1;
  }
#endif
}
#endif
|
|
|
|
/*
 * Print a range as space-separated name=value pairs.  The rows
 * and bytes values are derived as end minus start.
 */
NdbOut&
operator<<(NdbOut& out, const NdbImportUtil::Range& range)
{
  out << "start=" << range.m_start;
  out << " end=" << range.m_end;
  out << " rows=" << (range.m_end - range.m_start);
  out << " startpos=" << range.m_startpos;
  out << " endpos=" << range.m_endpos;
  out << " bytes=" << (range.m_endpos - range.m_startpos);
  out << " reject=" << range.m_reject;
  return out;
}
|
|
|
|
/*
 * Print a rowmap, one range per line, each line prefixed with the
 * index of the range counted from 0.
 */
NdbOut&
operator<<(NdbOut& out, const NdbImportUtil::RowMap& rowmap)
{
  const NdbImportUtil::RangeList& ranges = rowmap.m_ranges;
  uint index = 0;
  for (const NdbImportUtil::Range* r = ranges.front();
       r != 0;
       r = r->next())
  {
    out << index << ": " << *r << endl;
    index++;
  }
  return out;
}
|
|
|
|
// pseudo-tables
|
|
|
|
void
|
|
NdbImportUtil::add_pseudo_tables()
|
|
{
|
|
add_result_table();
|
|
add_reject_table();
|
|
add_rowmap_table();
|
|
add_stopt_table();
|
|
add_stats_table();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::add_result_table()
|
|
{
|
|
Table& table = c_result_table;
|
|
table.m_tabid = g_result_tabid;
|
|
require(table.m_recsize == 0);
|
|
table.add_pseudo_attr("runno",
|
|
NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("name",
|
|
NdbDictionary::Column::Varchar,
|
|
10);
|
|
table.add_pseudo_attr("desc",
|
|
NdbDictionary::Column::Varchar,
|
|
100);
|
|
table.add_pseudo_attr("result",
|
|
NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("rows",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("reject",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("temperrors",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("runtime",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("utime",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
add_error_attrs(table);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::add_reject_table()
|
|
{
|
|
Table& table = c_reject_table;
|
|
table.m_tabid = g_reject_tabid;
|
|
require(table.m_recsize == 0);
|
|
table.add_pseudo_attr("runno",
|
|
NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("rowid",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("linenr",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("startpos",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("endpos",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("bytes",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
add_error_attrs(table);
|
|
table.add_pseudo_attr("reject",
|
|
NdbDictionary::Column::Text);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::add_rowmap_table()
|
|
{
|
|
Table& table = c_rowmap_table;
|
|
table.m_tabid = g_rowmap_tabid;
|
|
require(table.m_recsize == 0);
|
|
table.add_pseudo_attr("runno",
|
|
NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("start",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("end",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("rows",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("startpos",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("endpos",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("bytes",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("reject",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::add_stopt_table()
|
|
{
|
|
Table& table = c_stopt_table;
|
|
table.m_tabid = g_stopt_tabid;
|
|
require(table.m_recsize == 0);
|
|
table.add_pseudo_attr("runno",
|
|
NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("option",
|
|
NdbDictionary::Column::Varchar,
|
|
100);
|
|
table.add_pseudo_attr("value",
|
|
NdbDictionary::Column::Unsigned);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::add_stats_table()
|
|
{
|
|
Table& table = c_stats_table;
|
|
table.m_tabid = g_stats_tabid;
|
|
require(table.m_recsize == 0);
|
|
table.add_pseudo_attr("runno",
|
|
NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("id",
|
|
NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("name",
|
|
NdbDictionary::Column::Varchar,
|
|
100);
|
|
table.add_pseudo_attr("parent",
|
|
NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("obs",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("sum",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("mean",
|
|
NdbDictionary::Column::Double);
|
|
table.add_pseudo_attr("min",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("max",
|
|
NdbDictionary::Column::Bigunsigned);
|
|
table.add_pseudo_attr("stddev",
|
|
NdbDictionary::Column::Double);
|
|
}
|
|
|
|
#include <math.h>
|
|
|
|
void
|
|
NdbImportUtil::add_error_attrs(Table& table)
|
|
{
|
|
table.add_pseudo_attr("errortype",
|
|
NdbDictionary::Column::Varchar,
|
|
10);
|
|
table.add_pseudo_attr("errorcode",
|
|
NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("sourceline",
|
|
NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("errortext",
|
|
NdbDictionary::Column::Longvarchar,
|
|
1024);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_result_row(Row* row,
|
|
uint32 runno,
|
|
const char* name,
|
|
const char* desc,
|
|
uint64 rows,
|
|
uint64 reject,
|
|
uint64 temperrors,
|
|
uint64 runtime,
|
|
uint64 utime,
|
|
const Error& error)
|
|
{
|
|
const Table& table = c_result_table;
|
|
const Attrs& attrs = table.m_attrs;
|
|
uint id = 0;
|
|
// runno
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &runno, sizeof(runno));
|
|
id++;
|
|
}
|
|
// name
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, name, strlen(name));
|
|
id++;
|
|
}
|
|
// desc
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, desc, strlen(desc));
|
|
id++;
|
|
}
|
|
// result
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
uint32 value = m_util.has_error(error);
|
|
attr.set_value(row, &value, sizeof(value));
|
|
id++;
|
|
}
|
|
// rows
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &rows, sizeof(rows));
|
|
id++;
|
|
}
|
|
// reject
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &reject, sizeof(reject));
|
|
id++;
|
|
}
|
|
// temperrors
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &temperrors, sizeof(temperrors));
|
|
id++;
|
|
}
|
|
// runtime
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &runtime, sizeof(runtime));
|
|
id++;
|
|
}
|
|
// utime
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &utime, sizeof(utime));
|
|
id++;
|
|
}
|
|
// error
|
|
set_error_attrs(row, table, error, id);
|
|
require(id == attrs.size());
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_reject_row(Row* row,
|
|
uint32 runno,
|
|
const Error& error,
|
|
const char* reject,
|
|
uint rejectlen)
|
|
{
|
|
const Table& table = c_reject_table;
|
|
const Attrs& attrs = table.m_attrs;
|
|
uint id = 0;
|
|
// runno
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &runno, sizeof(runno));
|
|
id++;
|
|
}
|
|
// rowid
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &row->m_rowid, sizeof(row->m_rowid));
|
|
id++;
|
|
}
|
|
// linenr
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &row->m_linenr, sizeof(row->m_linenr));
|
|
id++;
|
|
}
|
|
// startpos
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &row->m_startpos, sizeof(row->m_startpos));
|
|
id++;
|
|
}
|
|
// endpos
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &row->m_endpos, sizeof(row->m_endpos));
|
|
id++;
|
|
}
|
|
// bytes
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
uint64 bytes = row->m_endpos - row->m_startpos;
|
|
attr.set_value(row, &bytes, sizeof(bytes));
|
|
id++;
|
|
}
|
|
// error
|
|
set_error_attrs(row, table, error, id);
|
|
// reject
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_blob(row, reject, rejectlen);
|
|
id++;
|
|
}
|
|
require(id == attrs.size());
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_rowmap_row(Row* row,
|
|
uint32 runno,
|
|
const Range& range)
|
|
{
|
|
const Table& table = c_rowmap_table;
|
|
const Attrs& attrs = table.m_attrs;
|
|
uint id = 0;
|
|
// runno
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &runno, sizeof(runno));
|
|
id++;
|
|
}
|
|
// start
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &range.m_start, sizeof(range.m_start));
|
|
id++;
|
|
}
|
|
// end
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &range.m_end, sizeof(range.m_end));
|
|
id++;
|
|
}
|
|
// rows
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
uint64 rows = range.m_end - range.m_start;
|
|
attr.set_value(row, &rows, sizeof(rows));
|
|
id++;
|
|
}
|
|
// startpos
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &range.m_startpos, sizeof(range.m_startpos));
|
|
id++;
|
|
}
|
|
// end
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &range.m_endpos, sizeof(range.m_endpos));
|
|
id++;
|
|
}
|
|
// bytes
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
uint64 bytes = range.m_endpos - range.m_startpos;
|
|
attr.set_value(row, &bytes, sizeof(bytes));
|
|
id++;
|
|
}
|
|
// reject
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &range.m_reject, sizeof(range.m_reject));
|
|
id++;
|
|
}
|
|
require(id == attrs.size());
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_stopt_row(Row* row,
|
|
uint32 runno,
|
|
const char* option,
|
|
uint32 value)
|
|
{
|
|
const Table& table = c_stopt_table;
|
|
const Attrs& attrs = table.m_attrs;
|
|
uint id = 0;
|
|
// runno
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &runno, sizeof(runno));
|
|
id++;
|
|
}
|
|
// option
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
uint len = strlen(option);
|
|
attr.set_value(row, option, len);
|
|
id++;
|
|
}
|
|
// value
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &value, sizeof(value));
|
|
id++;
|
|
}
|
|
require(id == attrs.size());
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_stats_row(Row* row,
|
|
uint32 runno,
|
|
const Stat& stat,
|
|
bool global)
|
|
{
|
|
const Table& table = c_stats_table;
|
|
const Attrs& attrs = table.m_attrs;
|
|
const uint g_offset = !global ? 0 : 1000;
|
|
const char* g_prefix = !global ? 0 : "g";
|
|
// floats
|
|
double obsf = (double)stat.m_obs;
|
|
double sum1 = stat.m_sum1;
|
|
double sum2 = stat.m_sum2;
|
|
uint id = 0;
|
|
// runno
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &runno, sizeof(runno));
|
|
id++;
|
|
}
|
|
// id
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
uint idval = stat.m_id + g_offset;
|
|
attr.set_value(row, &idval, sizeof(idval));
|
|
id++;
|
|
}
|
|
// name
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
if (g_prefix == 0)
|
|
{
|
|
uint namelen = strlen(stat.m_name);
|
|
attr.set_value(row, stat.m_name, namelen);
|
|
}
|
|
else
|
|
{
|
|
Name name(g_prefix, stat.m_name);
|
|
uint namelen = strlen(name.str());
|
|
attr.set_value(row, name.str(), namelen);
|
|
}
|
|
id++;
|
|
}
|
|
// parent
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
uint parentval = stat.m_id == 0 ? stat.m_parent : stat.m_parent + g_offset;
|
|
attr.set_value(row, &parentval, sizeof(parentval));
|
|
id++;
|
|
}
|
|
// obs
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &stat.m_obs, sizeof(stat.m_obs));
|
|
id++;
|
|
}
|
|
// sum
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &stat.m_sum, sizeof(stat.m_sum));
|
|
id++;
|
|
}
|
|
// mean
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
double mean = 0.0;
|
|
if (stat.m_obs != 0)
|
|
mean = sum1 / obsf;
|
|
if (!std::isfinite(mean))
|
|
mean = 0.0;
|
|
attr.set_value(row, &mean, sizeof(mean));
|
|
id++;
|
|
}
|
|
// min
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &stat.m_min, sizeof(stat.m_min));
|
|
id++;
|
|
}
|
|
// max
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, &stat.m_max, sizeof(stat.m_max));
|
|
id++;
|
|
}
|
|
// stddev
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
double stddev = 0.0;
|
|
if (stat.m_obs != 0)
|
|
stddev = ::sqrt((obsf * sum2 - (sum1 * sum1)) / (obsf * obsf));
|
|
if (!std::isfinite(stddev))
|
|
stddev = 0.0;
|
|
attr.set_value(row, &stddev, sizeof(stddev));
|
|
id++;
|
|
}
|
|
require(id == attrs.size());
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_error_attrs(Row* row,
|
|
const Table& table,
|
|
const Error& error,
|
|
uint& id)
|
|
{
|
|
const Attrs& attrs = table.m_attrs;
|
|
// errortype
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
const char* errortype = error.gettypetext();
|
|
attr.set_value(row, errortype, strlen(errortype));
|
|
id++;
|
|
}
|
|
// errorcode
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
int32 errorcode = error.code;
|
|
attr.set_value(row, &errorcode, sizeof(errorcode));
|
|
id++;
|
|
}
|
|
// sourceline
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
uint32 errorline = error.line;
|
|
attr.set_value(row, &errorline, sizeof(errorline));
|
|
id++;
|
|
}
|
|
// errortext
|
|
{
|
|
const Attr& attr = attrs[id];
|
|
attr.set_value(row, error.text, strlen(error.text));
|
|
id++;
|
|
}
|
|
}
|
|
|
|
// buf
|
|
|
|
/*
 * Create a buffer without storage; alloc() must be called before
 * use.  The split flag is fixed at construction.
 */
NdbImportUtil::Buf::Buf(bool split) :
  m_split(split)
{
  // no allocation yet
  m_allocptr = 0;
  m_data = 0;
  m_allocsize = 0;
  m_size = 0;
  // layout and contents
  m_top = 0;
  m_start = 0;
  m_tail = 0;
  m_len = 0;
  // read state
  m_eof = false;
  m_pos = 0;
  m_lineno = 0;
}
|
|
|
|
// Release the underlying allocation (m_data points into it).
NdbImportUtil::Buf::~Buf()
{
  delete [] m_allocptr;
}
|
|
|
|
void
|
|
NdbImportUtil::Buf::alloc(uint pagesize, uint pagecnt)
|
|
{
|
|
require(m_allocptr == 0);
|
|
require(pagesize != 0 && (pagesize & (pagesize - 1)) == 0);
|
|
require(pagecnt != 0);
|
|
uint size = pagesize * pagecnt;
|
|
uint allocsize = size + (pagesize - 1) + 1;
|
|
uchar* allocptr = new uchar [allocsize];
|
|
uchar* data = allocptr;
|
|
uint misalign = (UintPtr)data & (pagesize - 1);
|
|
if (misalign != 0)
|
|
{
|
|
data += (pagesize - misalign);
|
|
uint misalign2 = (UintPtr)data & (pagesize - 1);
|
|
require(misalign2 == 0);
|
|
}
|
|
require(data + size < allocptr + allocsize);
|
|
m_allocptr = allocptr;
|
|
m_allocsize = allocsize;
|
|
m_data = data;
|
|
m_size = size;
|
|
m_top = 0;
|
|
m_start = 0;
|
|
m_tail = 0;
|
|
m_len = 0;
|
|
if (m_split)
|
|
{
|
|
require(pagecnt % 2 == 0);
|
|
m_top = size / 2;
|
|
m_start = m_top;
|
|
}
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::Buf::copy(const uchar* src, uint len)
|
|
{
|
|
require(m_start + m_len + len < m_allocsize);
|
|
memcpy(&m_data[m_start + m_len], src, len);
|
|
m_len += len;
|
|
m_data[m_start + m_len] = 0;
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::Buf::reset()
|
|
{
|
|
m_start = 0;
|
|
if (m_split)
|
|
{
|
|
require(2 * m_top == m_size);
|
|
m_start = m_top;
|
|
}
|
|
m_tail = 0;
|
|
m_len = 0;
|
|
m_eof = false;
|
|
m_pos = 0;
|
|
m_lineno = 0;
|
|
}
|
|
|
|
int
|
|
NdbImportUtil::Buf::movetail(Buf& dst)
|
|
{
|
|
require(m_tail <= m_len);
|
|
uint bytes = m_len - m_tail;
|
|
if (bytes > dst.m_start)
|
|
{
|
|
return -1;
|
|
}
|
|
const uchar* srcptr = &m_data[m_start + m_tail];
|
|
uchar* dstptr = &dst.m_data[dst.m_start - bytes];
|
|
memcpy(dstptr, srcptr, bytes);
|
|
m_len = m_tail;
|
|
dst.m_start -= bytes;
|
|
dst.m_len += bytes;
|
|
return 0;
|
|
}
|
|
|
|
/*
 * Print the buffer state for debugging: the sizes and offsets,
 * then up to 10 bytes from the start of the data and up to 10
 * bytes from the current position, both pretty-printed, and the
 * line number.
 */
NdbOut&
operator<<(NdbOut& out, const NdbImportUtil::Buf& buf)
{
  const uint maxshow = 10;
  out << "allocsize=" << buf.m_allocsize
      << " size=" << buf.m_size
      << " top=" << buf.m_top
      << " start=" << buf.m_start
      << " tail=" << buf.m_tail
      << " len=" << buf.m_len
      << " eof=" << buf.m_eof;
  const uchar* dataptr = &buf.m_data[buf.m_start];
  {
    // first bytes of the data
    char text[100];
    const uint n = buf.m_len > maxshow ? maxshow : buf.m_len;
    NdbImportUtil::pretty_print(text, &dataptr[0], n);
    out << " buf=" << "0:" << text;
  }
  {
    // first bytes at the current position
    char text[100];
    require(buf.m_len >= buf.m_pos);
    const uint left = buf.m_len - buf.m_pos;
    const uint n = left > maxshow ? maxshow : left;
    NdbImportUtil::pretty_print(text, &dataptr[buf.m_pos], n);
    out << " pos=" << buf.m_pos << ":" << text;
  }
  out << " lineno=" << buf.m_lineno;
  return out;
}
|
|
|
|
void
|
|
NdbImportUtil::pretty_print(char* dst, const void* ptr, uint len)
|
|
{
|
|
const char* p = (const char*)ptr;
|
|
char *s = dst;
|
|
for (uint i = 0; i < len; i++)
|
|
{
|
|
int c = p[i];
|
|
if (isascii(c) && isprint(c))
|
|
sprintf(s, "%c", c);
|
|
else if (c == '\n')
|
|
sprintf(s, "%s", "\\n");
|
|
else
|
|
sprintf(s, "<%02X>", (uchar)c);
|
|
s += strlen(s);
|
|
}
|
|
}
|
|
|
|
// files
|
|
|
|
/*
 * Create a file object which is not open yet.  Errors from later
 * operations are reported into the given error object.
 */
NdbImportUtil::File::File(NdbImportUtil& util, Error& error) :
  m_util(util),
  m_error(error)
{
  // -1 means not open
  m_fd = -1;
  m_flags = 0;
}
|
|
|
|
/*
 * Close the file if it is still open.  Any error is ignored since
 * a destructor cannot report it.  The platform call is selected
 * the same way as in do_close().
 */
NdbImportUtil::File::~File()
{
  if (m_fd != -1)
  {
#ifndef _WIN32
    (void)::close(m_fd);
#else
    (void)::_close(m_fd);
#endif
  }
}
|
|
|
|
int
|
|
NdbImportUtil::File::do_open(int flags)
|
|
{
|
|
const char* path = get_path();
|
|
require(m_fd == -1);
|
|
#ifndef _WIN32
|
|
int fd = ::open(path, flags, Creat_mode);
|
|
#else
|
|
int fd = ::_open(path, flags, Creat_mode);
|
|
#endif
|
|
if (fd == -1)
|
|
{
|
|
const char* type = "unknown";
|
|
if (flags == Read_flags)
|
|
type = "read";
|
|
if (flags == Write_flags)
|
|
type = "write";
|
|
if (flags == Append_flags)
|
|
type = "append";
|
|
m_util.set_error_os(m_error, __LINE__,
|
|
"%s: open for %s failed", path, type);
|
|
return -1;
|
|
}
|
|
m_fd = fd;
|
|
m_flags = flags;
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
NdbImportUtil::File::do_read(uchar* dst, uint size, uint& len)
|
|
{
|
|
const char* path = get_path();
|
|
require(m_fd != -1);
|
|
len = 0;
|
|
while (len < size)
|
|
{
|
|
// short read is possible on pipe
|
|
#ifndef _WIN32
|
|
int ret = ::read(m_fd, dst + len, size - len);
|
|
#else
|
|
int ret = ::_read(m_fd, dst + len, size - len);
|
|
#endif
|
|
if (ret == -1)
|
|
{
|
|
m_util.set_error_os(m_error, __LINE__,
|
|
"%s: read %u bytes failed", path, size - len);
|
|
return -1;
|
|
}
|
|
uint n = (uint)ret;
|
|
if (n == 0)
|
|
break;
|
|
len += n;
|
|
require(len <= size);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
NdbImportUtil::File::do_read(Buf& buf)
|
|
{
|
|
uint dstpos = buf.m_start + buf.m_len;
|
|
require(dstpos == buf.m_top);
|
|
require(dstpos <= buf.m_size);
|
|
uchar* dst = buf.m_data + dstpos;
|
|
uint size = buf.m_size - dstpos;
|
|
uint len = 0;
|
|
if (do_read(dst, size, len) == -1)
|
|
return -1;
|
|
buf.m_eof = (len == 0);
|
|
buf.m_len += len;
|
|
uint endpos = buf.m_start + buf.m_len;
|
|
require(endpos <= buf.m_size);
|
|
require(endpos < buf.m_allocsize);
|
|
buf.m_data[endpos] = 0;
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
NdbImportUtil::File::do_write(const uchar* src, uint size)
|
|
{
|
|
const char* path = get_path();
|
|
require(m_fd != -1);
|
|
#ifndef _WIN32
|
|
int ret = ::write(m_fd, src, size);
|
|
#else
|
|
int ret = ::_write(m_fd, src, size);
|
|
#endif
|
|
if (ret == -1)
|
|
{
|
|
m_util.set_error_os(m_error, __LINE__,
|
|
"%s: write %u bytes failed", path, size);
|
|
return -1;
|
|
}
|
|
// short write is considered error
|
|
if (uint(ret) != size)
|
|
{
|
|
m_util.set_error_os(m_error, __LINE__,
|
|
"%s: short write %u < %u", path, (uint)ret, size);
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
NdbImportUtil::File::do_write(const Buf& buf)
|
|
{
|
|
const uchar* src = &buf.m_data[buf.m_start];
|
|
uint len = buf.m_len;
|
|
if (do_write(src, len) == -1)
|
|
return -1;
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
NdbImportUtil::File::do_close()
|
|
{
|
|
const char* path = get_path();
|
|
if (m_fd == -1)
|
|
return 0;
|
|
#ifndef _WIN32
|
|
if (::close(m_fd) == -1)
|
|
#else
|
|
if (::_close(m_fd) == -1)
|
|
#endif
|
|
{
|
|
m_util.set_error_os(m_error, __LINE__,
|
|
"%s: close failed", path);
|
|
return -1;
|
|
}
|
|
m_fd = -1;
|
|
return 0;
|
|
}
|
|
|
|
int
|
|
NdbImportUtil::File::do_seek(uint64 offset)
|
|
{
|
|
const char* path = get_path();
|
|
require(m_fd != -1);
|
|
#ifndef _WIN32
|
|
off_t off = (off_t)offset;
|
|
if (::lseek(m_fd, off, SEEK_SET) == -1)
|
|
#else
|
|
__int64 off = (__int64)offset;
|
|
if (::_lseeki64(m_fd, off, SEEK_SET) == -1)
|
|
#endif
|
|
{
|
|
m_util.set_error_os(m_error, __LINE__,
|
|
"%s: lseek %" PRIu64 " failed", path, offset);
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
// stats
|
|
|
|
/*
 * Create a stat and, unless parent is StatNULL, link it in as the
 * last child of its parent.  Children of a parent form a chain
 * through m_nextchild, with the parent holding the first and last
 * child ids and the child count.  The name pointer is stored as
 * given, not copied.
 */
NdbImportUtil::Stat::Stat(Stats& stats,
                          uint id,
                          const char* name,
                          uint parent,
                          uint level,
                          uint flags) :
  m_stats(stats),
  m_id(id),
  m_name(name),
  m_parent(parent),
  m_level(level),
  m_flags(flags)
{
  // no children yet and no sibling after us
  m_childcnt = 0;
  m_firstchild = StatNULL;
  m_lastchild = StatNULL;
  m_nextchild = StatNULL;
  if (m_parent != StatNULL)
  {
    Stat* const parentstat = m_stats.get(m_parent);
    if (parentstat->m_childcnt != 0)
    {
      // chain after the current last child
      Stat* const lastchildstat = m_stats.get(parentstat->m_lastchild);
      require(lastchildstat->m_nextchild == StatNULL);
      lastchildstat->m_nextchild = m_id;
    }
    else
    {
      // we are the first child
      parentstat->m_firstchild = m_id;
    }
    parentstat->m_lastchild = m_id;
    parentstat->m_childcnt++;
  }
  reset();
}
|
|
|
|
void
|
|
NdbImportUtil::Stat::add(uint64 val)
|
|
{
|
|
uint id = m_id;
|
|
do
|
|
{
|
|
Stat* stat = m_stats.get(id);
|
|
stat->m_obs++;
|
|
stat->m_sum += val;
|
|
if (stat->m_obs == 1)
|
|
{
|
|
stat->m_min = val;
|
|
stat->m_max = val;
|
|
}
|
|
else
|
|
{
|
|
if (stat->m_min > val)
|
|
stat->m_min = val;
|
|
if (stat->m_max < val)
|
|
stat->m_max = val;
|
|
}
|
|
stat->m_sum1 += double(val);
|
|
stat->m_sum2 += double(val) * double(val);
|
|
id = stat->m_parent;
|
|
// root level stats are not useful
|
|
} while (id != 0);
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::Stat::reset()
|
|
{
|
|
m_obs = 0;
|
|
m_sum = 0;
|
|
m_min = 0;
|
|
m_max = 0;
|
|
m_sum1 = 0.0;
|
|
m_sum2 = 0.0;
|
|
}
|
|
|
|
/*
 * Create the stats collection with a single entry: the root stat,
 * which has id 0, name "root", no parent (StatNULL), level 0 and
 * flags 0.
 */
NdbImportUtil::Stats::Stats(NdbImportUtil& util) :
  m_util(util)
{
  const uint rootid = 0;
  const uint rootlevel = 0;
  const uint rootflags = 0;
  m_stats.push_back(new Stat(*this, rootid, "root", StatNULL,
                             rootlevel, rootflags));
  validate();
}
|
|
|
|
/*
 * Delete every Stat held in m_stats, in index order.
 */
NdbImportUtil::Stats::~Stats()
{
  uint i = 0;
  while (i < m_stats.size())
  {
    delete get(i);
    i++;
  }
}
|
|
|
|
/*
 * Return the stat called "name", creating it if it does not exist.
 *
 * An existing stat is returned as it is; "parent" and "flags" are not
 * compared with it.  A new stat gets the next free id (current size
 * of m_stats) and a level one below that of its parent.  The whole
 * operation runs between lock() and unlock().
 */
NdbImportUtil::Stat*
NdbImportUtil::Stats::create(const char* name, uint parent, uint flags)
{
  lock();
  Stat* stat = find(name);
  if (stat == 0)
  {
    const uint level = get(parent)->m_level + 1;
    const uint id = m_stats.size();
    stat = new Stat(*this, id, name, parent, level, flags);
    m_stats.push_back(stat);
    log_debug(2, "created stat id=" << stat->m_id << " name=" << stat->m_name);
    validate();
  }
  else
  {
    log_debug(2, "use existing " << stat->m_name << " id=" << stat->m_id);
  }
  unlock();
  return stat;
}
|
|
|
|
/*
 * Return the stat with id "i".  Performs the same checks as the const
 * overload, to which it delegates: "i" is in range, the entry is not
 * null and its m_id equals "i".
 */
NdbImportUtil::Stat*
NdbImportUtil::Stats::get(uint i)
{
  const Stats* const self = this;
  // "this" is non-const here, so dropping const from the result is safe
  return const_cast<Stat*>(self->get(i));
}
|
|
|
|
/*
 * Return the stat with id "i" (read-only access).  Requires that "i"
 * is below m_stats.size(), that the entry is not null and that the
 * entry's m_id equals its index.
 */
const NdbImportUtil::Stat*
NdbImportUtil::Stats::get(uint i) const
{
  const uint cnt = m_stats.size();
  require(i < cnt);
  const Stat* const entry = m_stats[i];
  require(entry != 0);
  require(entry->m_id == i);
  return entry;
}
|
|
|
|
/*
 * Linear search for a stat by name.  Returns the first entry whose
 * m_name compares equal to "name" with strcmp(), or 0 if there is
 * none.  Every entry inspected is required to be non-null.
 */
NdbImportUtil::Stat*
NdbImportUtil::Stats::find(const char* name) const
{
  Stat* found = 0;
  for (uint i = 0; found == 0 && i < m_stats.size(); i++)
  {
    Stat* cand = m_stats[i];
    require(cand != 0);
    if (strcmp(cand->m_name, name) == 0)
      found = cand;
  }
  return found;
}
|
|
|
|
void
|
|
NdbImportUtil::Stats::add(uint id, uint64 val)
|
|
{
|
|
Stat* stat = get(id);
|
|
stat->add(val);
|
|
}
|
|
|
|
/*
 * Return the stat that follows "id" when walking the tree, or 0 when
 * there is none.
 *
 * The first child is returned if there is one.  Otherwise the walk
 * climbs from the stat towards the root and returns the next sibling
 * of the first stat on that path (the stat itself included) which has
 * one.  Reaching a stat without parent ends the walk.
 */
const NdbImportUtil::Stat*
NdbImportUtil::Stats::next(uint id) const
{
  require(id < m_stats.size());
  const Stat* cur = m_stats[id];
  require(cur != 0);
  // go down first
  if (cur->m_firstchild != StatNULL)
    return get(cur->m_firstchild);
  // then sideways, climbing as long as there is no sibling
  for (;;)
  {
    if (cur->m_nextchild != StatNULL)
      return get(cur->m_nextchild);
    if (cur->m_parent == StatNULL)
      return 0;
    cur = get(cur->m_parent);
  }
}
|
|
|
|
void
|
|
NdbImportUtil::Stats::reset()
|
|
{
|
|
for (uint i = 0; i < m_stats.size(); i++)
|
|
{
|
|
Stat* stat = get(i);
|
|
stat->reset();
|
|
}
|
|
}
|
|
|
|
#if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL)
|
|
void
|
|
NdbImportUtil::Stats::validate() const
|
|
{
|
|
bool* seen = new bool [m_stats.size()];
|
|
Validate v(StatNULL, 0, 0, seen);
|
|
for (uint i = 0; i < m_stats.size(); i++)
|
|
v.m_seen[i] = false;
|
|
(void)validate(v);
|
|
for (uint i = 0; i < m_stats.size(); i++)
|
|
require(v.m_seen[i] == true);
|
|
delete [] seen;
|
|
}
|
|
|
|
/*
 * Recursive step of the tree check.  "v" gives the id to check and
 * the parent and level that stat is expected to have.
 *
 * Requires that parent, id and level match, that find() by name leads
 * back to the same stat, and that the stat has not been visited
 * before.  The stat is then marked as seen, and the check continues
 * with its next sibling (same parent and level) and with its first
 * child (this stat as parent, level + 1).  Returns the checked stat.
 */
const NdbImportUtil::Stat*
NdbImportUtil::Stats::validate(Validate& v) const
{
  const Stat* const stat = get(v.m_id);
  require(stat->m_parent == v.m_parent);
  require(stat->m_id == v.m_id);
  require(stat->m_level == v.m_level);
  require(find(stat->m_name) == stat);
  // each stat must be reached exactly once
  require(v.m_seen[v.m_id] == false);
  v.m_seen[v.m_id] = true;
  const uint sibling = stat->m_nextchild;
  if (sibling != StatNULL)
  {
    Validate vsib(v.m_parent, sibling, v.m_level, v.m_seen);
    validate(vsib);
  }
  const uint child = stat->m_firstchild;
  if (child != StatNULL)
  {
    Validate vsub(v.m_id, child, v.m_level + 1, v.m_seen);
    validate(vsub);
  }
  return stat;
}
|
|
#endif
|
|
|
|
/*
 * Print a Stats object.  Only the fixed tag "stats " is written; the
 * contents of "stats" are not used.
 */
NdbOut&
operator<<(NdbOut& out, const NdbImportUtil::Stats& stats)
{
  return out << "stats ";
}
|
|
|
|
// timer
|
|
|
|
/*
 * Construct a timer with both tick values set, by running one
 * start() immediately followed by stop().
 */
NdbImportUtil::Timer::Timer()
{
  // make sure initialized to avoid assert
  start();
  stop();
}
|
|
|
|
void
|
|
NdbImportUtil::Timer::start()
|
|
{
|
|
m_start = NdbTick_getCurrentTicks();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::Timer::stop()
|
|
{
|
|
m_stop = NdbTick_getCurrentTicks();
|
|
if (NdbTick_Compare(m_start, m_stop) > 0)
|
|
{
|
|
// not worth crashing
|
|
m_start = m_stop;
|
|
}
|
|
struct ndb_rusage ru;
|
|
if (Ndb_GetRUsage(&ru, false) == 0)
|
|
{
|
|
m_utime_msec = ru.ru_utime / 1000;
|
|
m_stime_msec = ru.ru_stime / 1000;
|
|
}
|
|
}
|
|
|
|
/*
 * Time between the recorded start and stop ticks, in seconds.
 */
uint64
NdbImportUtil::Timer::elapsed_sec() const
{
  const uint64 sec = NdbTick_Elapsed(m_start, m_stop).seconds();
  return sec;
}
|
|
|
|
/*
 * Time between the recorded start and stop ticks, in milliseconds.
 */
uint64
NdbImportUtil::Timer::elapsed_msec() const
{
  const uint64 msec = NdbTick_Elapsed(m_start, m_stop).milliSec();
  return msec;
}
|
|
|
|
/*
 * Time between the recorded start and stop ticks, in microseconds.
 */
uint64
NdbImportUtil::Timer::elapsed_usec() const
{
  const uint64 usec = NdbTick_Elapsed(m_start, m_stop).microSec();
  return usec;
}
|
|
|
|
// error
|
|
|
|
void
|
|
NdbImportUtil::set_error_gen(Error& error, int line,
|
|
const char* fmt, ...)
|
|
{
|
|
c_error_lock.lock();
|
|
new (&error) Error;
|
|
error.line = line;
|
|
error.type = Error::Type_gen;
|
|
if (fmt != 0)
|
|
{
|
|
va_list ap;
|
|
va_start(ap, fmt);
|
|
vsnprintf(error.text, sizeof(error.text), fmt, ap);
|
|
va_end(ap);
|
|
}
|
|
log_debug(1, "E " << error);
|
|
if (c_opt.m_abort_on_error)
|
|
abort();
|
|
c_error_lock.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_error_usage(Error& error, int line,
|
|
const char* fmt, ...)
|
|
{
|
|
c_error_lock.lock();
|
|
new (&error) Error;
|
|
error.line = line;
|
|
error.type = Error::Type_usage;
|
|
if (fmt != 0)
|
|
{
|
|
va_list ap;
|
|
va_start(ap, fmt);
|
|
vsnprintf(error.text, sizeof(error.text), fmt, ap);
|
|
va_end(ap);
|
|
}
|
|
log_debug(1, "E " << error);
|
|
if (c_opt.m_abort_on_error)
|
|
abort();
|
|
c_error_lock.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_error_alloc(Error& error, int line)
|
|
{
|
|
c_error_lock.lock();
|
|
new (&error) Error;
|
|
error.line = line;
|
|
error.type = Error::Type_alloc;
|
|
log_debug(1, "E " << error);
|
|
if (c_opt.m_abort_on_error)
|
|
abort();
|
|
c_error_lock.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_error_mgm(Error& error, int line,
|
|
NdbMgmHandle handle)
|
|
{
|
|
c_error_lock.lock();
|
|
new (&error) Error;
|
|
error.line = line;
|
|
error.type = Error::Type_mgm;
|
|
error.code = ndb_mgm_get_latest_error(handle);
|
|
snprintf(error.text, sizeof(error.text),
|
|
"%s", ndb_mgm_get_latest_error_msg(handle));
|
|
log_debug(1, "E " << error);
|
|
if (c_opt.m_abort_on_error)
|
|
abort();
|
|
c_error_lock.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_error_con(Error& error, int line,
|
|
const Ndb_cluster_connection* con)
|
|
{
|
|
c_error_lock.lock();
|
|
new (&error) Error;
|
|
error.line = line;
|
|
error.type = Error::Type_con;
|
|
error.code = con->get_latest_error();
|
|
snprintf(error.text, sizeof(error.text), "%s", con->get_latest_error_msg());
|
|
log_debug(1, "E " << error);
|
|
if (c_opt.m_abort_on_error)
|
|
abort();
|
|
c_error_lock.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_error_ndb(Error& error, int line,
|
|
const NdbError& ndberror, const char* fmt, ...)
|
|
{
|
|
c_error_lock.lock();
|
|
new (&error) Error;
|
|
error.line = line;
|
|
error.type = Error::Type_ndb;
|
|
error.code = ndberror.code;
|
|
snprintf(error.text, sizeof(error.text), "%s", ndberror.message);
|
|
log_debug(1, "E " << error);
|
|
if (c_opt.m_abort_on_error)
|
|
abort();
|
|
c_error_lock.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_error_os(Error& error, int line,
|
|
const char* fmt, ...)
|
|
{
|
|
c_error_lock.lock();
|
|
new (&error) Error;
|
|
error.line = line;
|
|
error.type = Error::Type_os;
|
|
error.code = errno;
|
|
if (fmt != 0)
|
|
{
|
|
va_list ap;
|
|
va_start(ap, fmt);
|
|
vsnprintf(error.text, sizeof(error.text), fmt, ap);
|
|
va_end(ap);
|
|
}
|
|
{
|
|
uint len = strlen(error.text);
|
|
if (len < sizeof(error.text))
|
|
{
|
|
uint left = sizeof(error.text) - len;
|
|
snprintf(&error.text[len], left, ": errno=%u: %s",
|
|
errno, strerror(errno));
|
|
}
|
|
}
|
|
log_debug(1, "E " << error);
|
|
if (c_opt.m_abort_on_error)
|
|
abort();
|
|
c_error_lock.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::set_error_data(Error& error, int line,
|
|
int code, const char* fmt, ...)
|
|
{
|
|
c_error_lock.lock();
|
|
new (&error) Error;
|
|
error.line = line;
|
|
error.type = Error::Type_data;
|
|
error.code = code;
|
|
if (fmt != 0)
|
|
{
|
|
va_list ap;
|
|
va_start(ap, fmt);
|
|
vsnprintf(error.text, sizeof(error.text), fmt, ap);
|
|
va_end(ap);
|
|
}
|
|
log_debug(1, "E " << error);
|
|
if (c_opt.m_abort_on_error)
|
|
abort();
|
|
c_error_lock.unlock();
|
|
}
|
|
|
|
void
|
|
NdbImportUtil::copy_error(Error& error, const Error& error2)
|
|
{
|
|
error.type = error2.type;
|
|
error.code = error2.code;
|
|
error.line = error2.line;
|
|
memcpy(error.text, error2.text, sizeof(error.text));
|
|
}
|
|
|
|
// Definition of the static member g_stop_all; it starts out false.
// NOTE(review): presumably a process-wide stop request flag - confirm
// against the code that sets and reads it.
bool NdbImportUtil::g_stop_all = false;
|
|
|
|
void
|
|
NdbImportUtil::fmt_msec_to_hhmmss(char* str, uint64 msec)
|
|
{
|
|
uint sec = msec / 1000;
|
|
uint hh = sec / 3600;
|
|
sec -= hh * 3600;
|
|
uint mm = sec / 60;
|
|
sec -= mm * 60;
|
|
uint ss = sec;
|
|
uint ff = msec - sec * 1000;
|
|
(void)ff;
|
|
sprintf(str, "%uh%um%us", hh, mm, ss);
|
|
}
|
|
|
|
// unittest
|
|
|
|
#ifdef TEST_NDBIMPORTUTIL
|
|
|
|
/*
 * Short names for the NdbImportUtil member types used by the tests.
 */

// lists
typedef NdbImportUtil::ListEnt UtilListEnt;
typedef NdbImportUtil::List UtilList;

// tables and rows
typedef NdbImportUtil::Table UtilTable;
typedef NdbImportUtil::RowCtl UtilRowCtl;
typedef NdbImportUtil::Row UtilRow;
typedef NdbImportUtil::RowList UtilRowList;

// row ranges and row maps
typedef NdbImportUtil::Range UtilRange;
typedef NdbImportUtil::RangeList UtilRangeList;
typedef NdbImportUtil::RowMap UtilRowMap;

// buffers, files, names
typedef NdbImportUtil::Buf UtilBuf;
typedef NdbImportUtil::File UtilFile;
typedef NdbImportUtil::Name UtilName;

// stats
typedef NdbImportUtil::Stat UtilStat;
typedef NdbImportUtil::Stats UtilStats;
|
|
|
|
#include <NdbTap.hpp>
|
|
#include <ndb_rand.h>
|
|
#include <NdbEnv.h>
|
|
|
|
// increase size of some tests if release-compiled
|
|
// assigned in testmain(): false if VM_TRACE is defined, else true;
// testrowlist1() and testrowlist2() use it to pick loop and row counts
static bool mybigtest = false;
|
|
|
|
/*
 * Random number for the tests: the product of two ndb_rand() values.
 */
static uint
myrandom()
{
  const uint r1 = (uint)ndb_rand();
  const uint r2 = (uint)ndb_rand();
  return r1 * r2; // < 2**30
}
|
|
|
|
/*
 * Random number in the range 0 .. m-1, taken as myrandom() modulo m.
 * "m" must not be 0.
 */
static uint
myrandom(uint m)
{
  require(m != 0);
  return myrandom() % m;
}
|
|
|
|
struct MyRec : UtilListEnt {
|
|
uint m_index;
|
|
bool m_member;
|
|
MyRec() {
|
|
m_index = Inval_uint;
|
|
m_member = false;
|
|
}
|
|
};
|
|
|
|
/*
 * List type used by testlist(); adds nothing to UtilList.
 */
struct MyRecs : UtilList {};
|
|
|
|
static int
|
|
testlist()
|
|
{
|
|
ndbout << "testlist" << endl;
|
|
NdbImportUtil util;
|
|
util.c_opt.m_log_level = 3;
|
|
MyRecs recs;
|
|
const uint poolsize = 256;
|
|
MyRec* recpool = new MyRec [poolsize];
|
|
for (uint n = 0; n < poolsize; n++)
|
|
{
|
|
MyRec* rec = &recpool[n];
|
|
rec->m_index = n;
|
|
}
|
|
const uint numops = 1024 * poolsize;
|
|
uint ops[5];
|
|
ops[0] = 0; // push back
|
|
ops[1] = 0; // pop front
|
|
ops[2] = 0; // remove
|
|
ops[3] = 0; // push after
|
|
ops[4] = 0; // push before
|
|
uint max_occup = 0;
|
|
for (uint numop = 0; numop < numops; numop++)
|
|
{
|
|
MyRec* rec = 0;
|
|
while (1)
|
|
{
|
|
uint n = myrandom(poolsize);
|
|
rec = &recpool[n];
|
|
if (rec->m_member)
|
|
{
|
|
if (myrandom(100) < 80)
|
|
continue;
|
|
}
|
|
break;
|
|
}
|
|
if (!rec->m_member)
|
|
{
|
|
recs.push_back(rec);
|
|
rec->m_member = true;
|
|
ops[0]++;
|
|
}
|
|
else if (myrandom(100) < 50)
|
|
{
|
|
MyRec* rec = static_cast<MyRec*>(recs.pop_front());
|
|
require(rec != 0);
|
|
rec->m_member = false;
|
|
ops[1]++;
|
|
}
|
|
else if (myrandom(100) < 50)
|
|
{
|
|
recs.remove(rec);
|
|
rec->m_member = false;
|
|
ops[2]++;
|
|
}
|
|
else
|
|
{
|
|
uint n2 = myrandom(poolsize);
|
|
MyRec* rec2 = &recpool[n2];
|
|
if (!rec2->m_member)
|
|
{
|
|
if (myrandom(100) < 50)
|
|
{
|
|
recs.push_after(rec, rec2);
|
|
rec2->m_member = true;
|
|
ops[3]++;
|
|
}
|
|
else
|
|
{
|
|
recs.push_before(rec, rec2);
|
|
rec2->m_member = true;
|
|
ops[4]++;
|
|
}
|
|
}
|
|
}
|
|
if (max_occup < recs.m_cnt)
|
|
max_occup = recs.m_cnt;
|
|
}
|
|
uint last_occup = recs.m_cnt;
|
|
for (MyRec* rec = static_cast<MyRec*>(recs.m_front);
|
|
rec != 0;
|
|
rec = static_cast<MyRec*>(rec->m_next))
|
|
{
|
|
uint n = rec->m_index;
|
|
require(n < poolsize);
|
|
require(rec == &recpool[n]);
|
|
require(rec->m_member);
|
|
}
|
|
for (uint n = 0; n < poolsize; n++)
|
|
{
|
|
MyRec* rec = &recpool[n];
|
|
if (rec->m_member)
|
|
{
|
|
recs.remove(rec);
|
|
rec->m_member = false;
|
|
}
|
|
}
|
|
require(recs.m_cnt == 0);
|
|
uint ins = ops[0] + ops[3] + ops[4];
|
|
uint del = ops[1] + ops[2];
|
|
require(last_occup == ins - del);
|
|
delete [] recpool;
|
|
ndbout << "max_occup=" << max_occup << endl;
|
|
ndbout << "last_occup=" << last_occup << endl;
|
|
ndbout << "push_back: " << ops[0] << endl;
|
|
ndbout << "pop_front: " << ops[1] << endl;
|
|
ndbout << "remove: " << ops[2] << endl;
|
|
ndbout << "push_after: " << ops[3] << endl;
|
|
ndbout << "push_before: " << ops[4] << endl;
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
testrowlist1()
|
|
{
|
|
ndbout << "testrowlist1" << endl;
|
|
NdbImportUtil util;
|
|
UtilTable table;
|
|
table.add_pseudo_attr("a", NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("b", NdbDictionary::Column::Varchar, 10);
|
|
const uint loops = !mybigtest ? 100 : 1000;
|
|
const uint rows = !mybigtest ? 1000 : 10000;
|
|
for (uint loop = 0; loop < loops; loop++)
|
|
{
|
|
UtilRowList list1;
|
|
UtilRowList list2;
|
|
while (list1.cnt() < rows)
|
|
{
|
|
uint cnt = 1 + myrandom(rows - list1.cnt());
|
|
util.alloc_rows(table, cnt, list1);
|
|
}
|
|
require(list1.cnt() == rows);
|
|
list2.push_back_from(list1);
|
|
require(list1.cnt() == 0);
|
|
require(list2.cnt() == rows);
|
|
util.free_rows(list2);
|
|
require(list2.cnt() == 0);
|
|
require(util.c_rows_free->cnt() == rows);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
testrowlist2()
|
|
{
|
|
ndbout << "testrowlist2" << endl;
|
|
NdbImportUtil util;
|
|
UtilTable table;
|
|
table.add_pseudo_attr("a", NdbDictionary::Column::Unsigned);
|
|
table.add_pseudo_attr("b", NdbDictionary::Column::Varchar, 10);
|
|
const uint loops = !mybigtest ? 100 : 1000;
|
|
const uint rows = !mybigtest ? 1000 : 10000;
|
|
{
|
|
UtilRowList list;
|
|
util.alloc_rows(table, rows, list);
|
|
util.free_rows(list);
|
|
}
|
|
for (uint loop = 0; loop < loops; loop++)
|
|
{
|
|
UtilRowList list1;
|
|
UtilRowList list2;
|
|
list1.m_rowbatch = 1 + myrandom(rows);
|
|
list2.m_rowbatch = 1 + myrandom(rows);
|
|
const uint cnt = myrandom(rows + 1);
|
|
util.alloc_rows(table, cnt, list1);
|
|
require(list1.cnt() == cnt);
|
|
do
|
|
{
|
|
uint timeout = myrandom(10) != 0 ? 0 : 10;
|
|
UtilRowCtl ctl(timeout);
|
|
list2.push_back_from(list1, ctl);
|
|
util.free_rows(list2);
|
|
} while (list1.cnt() != 0);
|
|
require(util.c_rows_free->cnt() == rows);
|
|
util.alloc_rows(table, cnt, list1);
|
|
require(list1.cnt() == cnt);
|
|
do
|
|
{
|
|
uint timeout = myrandom(10) != 0 ? 0 : 10;
|
|
UtilRowCtl ctl(timeout);
|
|
list1.pop_front_to(list2, ctl);
|
|
util.free_rows(list2);
|
|
} while (list1.cnt() != 0);
|
|
require(util.c_rows_free->cnt() == rows);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
testrowmap1()
|
|
{
|
|
ndbout << "testrowmap1" << endl;
|
|
NdbImportUtil util;
|
|
/*
|
|
* Create random ascending ranges. This does not represent
|
|
* a valid rowmap because gaps between ranges can be zero.
|
|
*/
|
|
const uint maxranges = 1000;
|
|
const uint numranges = myrandom(maxranges);
|
|
ndbout << "numranges = " << numranges << endl;
|
|
const uint maxgap = 5;
|
|
const uint maxcount = 10;
|
|
const uint maxrowid = maxranges * (maxcount + maxgap);
|
|
UtilRange tstranges[maxranges];
|
|
bool rowexist[maxrowid];
|
|
uint toprowid = 0;
|
|
for (uint k = 0; k < maxrowid; k++)
|
|
rowexist[k] = false;
|
|
{
|
|
uint start = 0;
|
|
for (uint i = 0; i < numranges; i++)
|
|
{
|
|
UtilRange& r = tstranges[i];
|
|
uint gap = myrandom(maxgap + 1);
|
|
uint count = 1 + myrandom(maxcount);
|
|
r.m_start = start + gap;
|
|
r.m_end = r.m_start + count;
|
|
r.m_reject = myrandom(1 + count);
|
|
start = r.m_end;
|
|
for (uint k = r.m_start; k < r.m_end; k++)
|
|
rowexist[k] = true;
|
|
toprowid = r.m_end;
|
|
}
|
|
}
|
|
ndbout << "toprowid=" << toprowid << endl;
|
|
//
|
|
ndbout << "map1: create in ascending order" << endl;
|
|
UtilRowMap map1(util);
|
|
for (uint i = 0; i < numranges; i++)
|
|
{
|
|
UtilRange r = tstranges[i];
|
|
map1.add(r);
|
|
}
|
|
ndbout << "map1: " << map1.size() << endl;
|
|
for (uint k = 0; k < toprowid; k++)
|
|
{
|
|
if (rowexist[k])
|
|
require(map1.find(k) != 0);
|
|
else
|
|
require(map1.find(k) == 0);
|
|
}
|
|
uint reorder[maxranges];
|
|
{
|
|
for (uint i = 0; i < numranges; i++)
|
|
reorder[i] = i;
|
|
for (uint i = 0; i < numranges; i++)
|
|
{
|
|
uint j = myrandom(numranges);
|
|
uint k = reorder[i];
|
|
reorder[i] = reorder[j];
|
|
reorder[j] = k;
|
|
}
|
|
}
|
|
//
|
|
ndbout << "map2: create in random order" << endl;
|
|
UtilRowMap map2(util);
|
|
for (uint i = 0; i < numranges; i++)
|
|
{
|
|
uint j = reorder[i];
|
|
UtilRange r = tstranges[j];
|
|
map2.add(r);
|
|
}
|
|
ndbout << "map2: " << map2.size() << endl;
|
|
require(map1.equal(map2));
|
|
//
|
|
ndbout << "map3: create from 2 random pieces" << endl;
|
|
UtilRowMap map3(util);
|
|
UtilRowMap map3a(util);
|
|
UtilRowMap map3b(util);
|
|
for (uint i = 0; i < numranges; i++)
|
|
{
|
|
UtilRange r = tstranges[i];
|
|
if (myrandom(100) < 50)
|
|
map3a.add(r);
|
|
else
|
|
map3b.add(r);
|
|
}
|
|
ndbout << "map3a: " << map3a.size() << endl;
|
|
//ndbout << map3a;
|
|
ndbout << "map3b: " << map3b.size() << endl;
|
|
//ndbout << map3b;
|
|
ndbout << "add map3a" << endl;
|
|
map3.add(map3a);
|
|
ndbout << "add map3b" << endl;
|
|
map3.add(map3b);
|
|
ndbout << "map3: " << map3.size() << endl;
|
|
//ndbout << map3;
|
|
require(map1.equal(map3));
|
|
//
|
|
ndbout << "map4: delete all in random order" << endl;
|
|
UtilRowMap map4(util);
|
|
map4.add(map1);
|
|
ndbout << "map4: " << map4.size() << endl;
|
|
bool rowexist4[maxrowid];
|
|
for (uint k = 0; k < maxrowid; k++)
|
|
rowexist4[k] = rowexist[k];
|
|
while (map4.size() != 0)
|
|
{
|
|
uint k = myrandom(toprowid);
|
|
if (rowexist4[k])
|
|
{
|
|
require(map4.remove(k) == true);
|
|
rowexist4[k] = false;
|
|
}
|
|
}
|
|
ndbout << "map4: " << map4.size() << endl;
|
|
return 0;
|
|
}
|
|
|
|
// an old "intrusive" test, fix later
|
|
/*
 * Disabled test.  The whole body is excluded with "#if 0", so the
 * function currently does nothing and returns 0.
 *
 * The excluded code builds a row map by hand (map1), merges adjacent
 * ranges by hand (map2), builds the same via RowMap::add in nearly
 * sorted random order (map3), compares map2 with map3 and then tests
 * find and remove against a separate bool array.  It accesses RowMap
 * internals (m_ranges).
 *
 * Fixed in the excluded code: the reject counts of map2 and map3 are
 * now compared with each other; before, r2.m_reject was compared with
 * itself, which is always true.
 */
static int
testrowmap2()
{
#if 0
  ndbout << "testrowmap2" << endl;
  const uint maxranges = 10000;
  //
  ndbout << "map1: create manually" << endl;
  UtilRowMap map1;
  UtilRowMap::Ranges& ranges1 = map1.m_ranges;
  {
    uint64 start = 0;
    for (uint i = 0; i < maxranges; i++)
    {
      UtilRowMap::Range r;
      uint64 gap = myrandom(10);
      uint64 count = 1 + myrandom(10);
      r.m_start = start + gap;
      r.m_end = r.m_start + count;
      r.m_reject = myrandom(1 + count / 10);
      ranges1.push_back(r);
      start = r.m_end;
    }
  }
  ndbout << "map1: size " << ranges1.size() << endl;
  require(ranges1.size() == maxranges);
  //
  ndbout << "map2: create via merge from map1" << endl;
  UtilRowMap map2;
  UtilRowMap::Ranges& ranges2 = map2.m_ranges;
  {
    uint i = 0;
    while (i < maxranges)
    {
      UtilRowMap::Range r = ranges1[i];
      uint j = i + 1;
      // absorb following ranges as long as they touch the merged one
      while (j < maxranges)
      {
        UtilRowMap::Range r2 = ranges1[j];
        if (r.m_end < r2.m_start)
          break;
        r.m_end += r2.m_end - r2.m_start;
        r.m_reject += r2.m_reject;
        j++;
      }
      ranges2.push_back(r);
      i = j;
    }
  }
  ndbout << "map2: size " << ranges2.size() << endl;
  //
  ndbout << "map3: create via util from map1" << endl;
  UtilRowMap map3;
  UtilRowMap::Ranges& ranges3 = map3.m_ranges;
  {
    uint* reorder = new uint [maxranges];
    for (uint i = 0; i < maxranges; i++)
    {
      reorder[i] = i;
    }
    // swap about every tenth index with one at most 99 positions ahead
    for (uint i = 0; i < maxranges; i++)
    {
      if (myrandom(10) == 0)
      {
        uint m = 100;
        if (m > maxranges - i)
          m = maxranges - i;
        uint j = i + myrandom(m);
        // std::swap
        uint k = reorder[i];
        reorder[i] = reorder[j];
        reorder[j] = k;
      }
    }
    for (uint i = 0; i < maxranges; i++)
    {
      uint j = reorder[i];
      UtilRowMap::Range r = ranges1[j];
      map3.add(r);
    }
    delete [] reorder;
  }
  ndbout << "map3: size " << ranges3.size() << endl;
  //
  ndbout << "map2 vs map3: verify" << endl;
  require(ranges2.size() == ranges3.size());
  for (uint i = 0; i < ranges2.size(); i++)
  {
    const UtilRowMap::Range r2 = ranges2[i];
    const UtilRowMap::Range r3 = ranges3[i];
    require(r2.m_start == r3.m_start);
    require(r2.m_end == r3.m_end);
    require(r2.m_reject == r3.m_reject);
  }
  //
  ndbout << "mark existing" << endl;
  // one bool per row id plus a count of the ids currently set
  struct Mark {
    Mark(uint64 size) {
      m_mark = new bool [size];
      m_size = size;
      for (uint64 i = 0; i < m_size; i++)
        m_mark[i] = false;
      m_cnt = 0;
    }
    ~Mark() {
      delete [] m_mark;
    }
    void set(uint64 pos, uint64 cnt) {
      require(pos + cnt <= m_size);
      for(uint64 i = pos; i < pos + cnt; i++) {
        require(m_mark[i] == false);
        m_mark[i] = true;
        m_cnt++;
      }
    }
    bool get(uint64 pos) {
      require(pos < m_size);
      return m_mark[pos];
    }
    void del(uint64 pos) {
      require(pos < m_size);
      require(m_mark[pos] == true);
      m_mark[pos] = false;
      require(m_cnt != 0);
      m_cnt--;
    }
    bool* m_mark;
    uint64 m_size;
    uint64 m_cnt;
  };
  uint64 maxid = ranges3.back().m_end;
  Mark mark(maxid);
  for (uint i = 0; i < ranges3.size(); i++)
  {
    const UtilRowMap::Range r3 = ranges3[i];
    mark.set(r3.m_start, r3.m_end - r3.m_start);
  }
  ndbout << "ids: size=" << mark.m_size << " cnt=" << mark.m_cnt << endl;
  //
  ndbout << "test find" << endl;
  for (uint64 id = 0; id < maxid; id++)
  {
    UtilRowMap::Iterator it;
    if (mark.get(id) == false)
      require(map3.find(id, it) == false);
    if (mark.get(id) == true)
      require(map3.find(id, it) == true);
  }
  //
  ndbout << "test remove: random 50%" << endl;
  uint64 oldcnt = mark.m_cnt;
  // loop until at most half of the original ids remain
  while (oldcnt < mark.m_cnt * 2)
  {
    uint64 id = myrandom() % mark.m_size;
    if (mark.get(id))
    {
      map3.remove(id);
      mark.del(id);
    }
  }
  //
  ndbout << "test remove: rest " << mark.m_cnt << endl;
  for (uint64 id = 0; id < mark.m_size; id++)
  {
    if (mark.get(id))
    {
      map3.remove(id);
      mark.del(id);
    }
  }
  require(map3.empty());
  require(mark.m_cnt == 0);
#endif
  return 0;
}
|
|
|
|
static int
|
|
testbuf()
|
|
{
|
|
ndbout << "testbuf" << endl;
|
|
uint loops = 128 * 1024;
|
|
uint min_off = UINT_MAX;
|
|
uint max_off = 0;
|
|
for (uint n = 0; n < loops; n++)
|
|
{
|
|
UtilBuf* buf = new UtilBuf;
|
|
uint pagesize_log2 = 1 + myrandom(15);
|
|
uint pagesize = (1 << pagesize_log2);
|
|
uint pagecnt = 1 + myrandom(1024);
|
|
buf->alloc(pagesize, pagecnt);
|
|
uint off = buf->m_data - buf->m_allocptr;
|
|
if (min_off > off)
|
|
min_off = off;
|
|
if (max_off < off)
|
|
max_off = off;
|
|
delete buf;
|
|
}
|
|
ndbout << "min_off=" << min_off << " max_off=" << max_off << endl;
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
testprint()
|
|
{
|
|
ndbout << "testprint" << endl;
|
|
uchar buf[256];
|
|
char dst[5 * 256];
|
|
for (uint i = 0; i < 256; i++)
|
|
buf[i] = i;
|
|
NdbImportUtil::pretty_print(dst, buf, 256);
|
|
ndbout << dst << endl;
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
testfile()
|
|
{
|
|
ndbout << "testfile" << endl;
|
|
NdbImportUtil util;
|
|
const char* path = "test.csv";
|
|
struct stat st;
|
|
if (stat(path, &st) == -1)
|
|
{
|
|
ndbout << path << ": skip on errno " << errno << endl;
|
|
return 0;
|
|
}
|
|
for (int split = 0; split <= 1; split++)
|
|
{
|
|
ndbout << "read " << path << " buf split=" << split << endl;
|
|
UtilBuf buf(split);
|
|
buf.alloc(4096, 8);
|
|
UtilFile file(util, util.c_error);
|
|
file.set_path(path);
|
|
require(file.do_open(UtilFile::Read_flags) == 0);
|
|
uint totlen = 0;
|
|
uint totread = 0;
|
|
while (1)
|
|
{
|
|
buf.reset();
|
|
int ret = file.do_read(buf);
|
|
require(ret == 0);
|
|
if (buf.m_eof)
|
|
break;
|
|
totlen += buf.m_len;
|
|
totread++;
|
|
}
|
|
require(totlen == st.st_size);
|
|
ndbout << "len=" << totlen << " reads=" << totread << endl;
|
|
require(file.do_close() == 0);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static int
|
|
teststat()
|
|
{
|
|
ndbout << "teststat" << endl;
|
|
NdbImportUtil util;
|
|
util.c_opt.m_log_level = 3;
|
|
UtilStats stats(util);
|
|
static const uint stattot = 256;
|
|
uint statcnt = stats.m_stats.size();
|
|
require(statcnt == 1);
|
|
{
|
|
const UtilStat* stat = stats.find("root");
|
|
require(stat != 0);
|
|
require(stat->m_id == 0);
|
|
}
|
|
for (uint i = 1; i < stattot; i++)
|
|
{
|
|
UtilName name("test", i);
|
|
uint parent = myrandom(statcnt);
|
|
uint flags = 0;
|
|
const UtilStat* stat = stats.create(name, parent, flags);
|
|
require(stat != 0);
|
|
require(stat->m_id == statcnt);
|
|
require(strcmp(stat->m_name, name) == 0);
|
|
statcnt++;
|
|
require(statcnt == stats.m_stats.size());
|
|
}
|
|
require(statcnt == stattot);
|
|
for (uint k = 0; k < 10 * stattot; k++)
|
|
{
|
|
uint i = myrandom(statcnt);
|
|
require(i < statcnt);
|
|
UtilStat* stat = stats.get(i);
|
|
require(stat != 0);
|
|
if (i == 0)
|
|
{
|
|
UtilName name("root");
|
|
require(strcmp(stat->m_name, name) == 0);
|
|
UtilStat* stat2 = stats.find(name);
|
|
require(stat == stat2);
|
|
}
|
|
else
|
|
{
|
|
UtilName name("test", i);
|
|
require(strcmp(stat->m_name, name) == 0);
|
|
UtilStat* stat2 = stats.find(name);
|
|
require(stat == stat2);
|
|
uint v =myrandom();
|
|
stat->add(v);
|
|
}
|
|
}
|
|
// iter
|
|
bool iterseen[stattot];
|
|
for (uint i = 0; i < stattot; i++)
|
|
iterseen[i] = false;
|
|
// root is skipped
|
|
const UtilStat* stat = stats.next(0);
|
|
iterseen[0] = true;
|
|
uint itercnt = 1;
|
|
while (stat != 0)
|
|
{
|
|
itercnt++;
|
|
require(itercnt <= statcnt);
|
|
const uint i = stat->m_id;
|
|
require(i < stattot);
|
|
require(iterseen[i] == false);
|
|
iterseen[i] = true;
|
|
for (uint j = i; j != 0; )
|
|
{
|
|
const UtilStat* stat = stats.get(j);
|
|
require(stat != 0);
|
|
require(iterseen[j] == true);
|
|
ndbout << j;
|
|
j = stat->m_parent;
|
|
if (j != 0)
|
|
ndbout << " ";
|
|
else
|
|
ndbout << endl;
|
|
}
|
|
stat = stats.next(i);
|
|
}
|
|
for (uint i = 0; i < stattot; i++)
|
|
require(iterseen[i] == true);
|
|
require(itercnt == statcnt);
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
myseed()
|
|
{
|
|
const char* p = NdbEnv_GetEnv("TEST_NDBIMPORTUTIL_SEED", (char*)0, 0);
|
|
unsigned seed = p != 0 ? (unsigned)atoi(p) : (uint)NdbHost_GetProcessId();
|
|
ndbout << "seed=" << seed << endl;
|
|
ndb_srand(seed);
|
|
}
|
|
|
|
static bool
|
|
mycase(const char* name)
|
|
{
|
|
const char* p = NdbEnv_GetEnv("TEST_NDBIMPORTUTIL_CASE", (char*)0, 0);
|
|
return p == 0 || strcmp(p, name) == 0;
|
|
}
|
|
|
|
static int
|
|
testmain()
|
|
{
|
|
ndb_init();
|
|
#ifdef VM_TRACE
|
|
signal(SIGABRT, SIG_DFL);
|
|
signal(SIGSEGV, SIG_DFL);
|
|
#endif
|
|
mybigtest =
|
|
#ifdef VM_TRACE
|
|
false;
|
|
#else
|
|
true;
|
|
#endif
|
|
myseed();
|
|
if (mycase("testlist") && testlist() != 0)
|
|
return -1;
|
|
if (mycase("testrowlist1") && testrowlist1() != 0)
|
|
return -1;
|
|
if (mycase("testrowlist2") && testrowlist2() != 0)
|
|
return -1;
|
|
if (mycase("testrowmap1") && testrowmap1() != 0)
|
|
return -1;
|
|
if (mycase("testrowmap2") && testrowmap2() != 0)
|
|
return -1;
|
|
if (mycase("testbuf") && testbuf() != 0)
|
|
return -1;
|
|
if (mycase("testfile") && testfile() != 0)
|
|
return -1;
|
|
if (mycase("testprint") && testprint() != 0)
|
|
return -1;
|
|
if (mycase("teststat") && teststat() != 0)
|
|
return -1;
|
|
struct ndb_rusage ru;
|
|
require(Ndb_GetRUsage(&ru, false) == 0);
|
|
ndbout << "utime=" << ru.ru_utime/1000
|
|
<< " stime=" << ru.ru_stime/1000 << " (ms)" << endl;
|
|
return 0;
|
|
}
|
|
|
|
// TAP entry point: the result is true exactly when testmain() returns 0
TAPTEST(NdbImportUtil)
{
  return (testmain() == 0);
}
|
|
|
|
#endif
|