/* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. This program is also distributed with certain software (including but not limited to OpenSSL) that is licensed under separate terms, as designated in a particular file or component or in included license documentation. The authors of MySQL hereby grant you an additional permission to link the program and your derivative works with the separately licensed software that they have included with MySQL. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include "NdbImportUtil.hpp" #include "m_ctype.h" #include "my_sys.h" #include #include #include // STL #include NdbImportUtil::NdbImportUtil() : m_util(*this), c_stats(*this) { log_debug(1, "ctor"); c_rows_free = new RowList; c_rows_free->set_stats(m_util.c_stats, "rows-free"); c_blobs_free = new BlobList; add_pseudo_tables(); } NdbImportUtil::~NdbImportUtil() { log_debug(1, "dtor"); delete c_blobs_free; delete c_rows_free; require(c_tables.m_tables.empty()); } NdbOut& operator<<(NdbOut& out, const NdbImportUtil& util) { out << "util "; return out; } // DebugLogger NdbImportUtil::DebugLogger::DebugLogger() { logfile = new FileOutputStream(stderr); out = new NdbOut(* logfile); mutex = NdbMutex_Create(); require(mutex != 0); timer.start(); start.timer = & timer; start.mutex = mutex; stop.mutex = mutex; } NdbImportUtil::DebugLogger::~DebugLogger() { delete logfile; delete out; NdbMutex_Destroy(mutex); } NdbOut& operator<<(NdbOut& out, const NdbImportUtil::DebugLogger::MessageStart & start) { /* Lock the logger mutex, obtain a timestamp, and print it. */ char buf[32]; NdbMutex_Lock(start.mutex); start.timer->stop(); double t = (double) start.timer->elapsed_msec() / (double)1000.0; sprintf(buf, "%8.3f: ", t); out << buf; return out; } NdbOut& operator<<(NdbOut& out, const NdbImportUtil::DebugLogger::MessageStop & stop) { /* Print a final newline, then unlock the logger mutex. */ out << "\n"; NdbMutex_Unlock(stop.mutex); return out; } // name NdbImportUtil::Name::Name(const char* s) { require(s != 0); m_str = s; } NdbImportUtil::Name::Name(const char* s, const char* t) { require(s != 0 && t != 0); m_str = s; m_str += "-"; m_str += t; } NdbImportUtil::Name::Name(const char* s, uint t) { require(s != 0); m_str = s; m_str += "-"; char b[100]; sprintf(b, "%u", t); m_str += b; } NdbOut& operator<<(NdbOut& out, const NdbImportUtil::Name& name) { out << (const char*)name; return out; } // lockable NdbImportUtil::Lockable::Lockable() { m_mutex = NdbMutex_Create(); m_condition = NdbCondition_Create(); require(m_mutex != 0 && m_condition != 0); } NdbImportUtil::Lockable::~Lockable() { NdbCondition_Destroy(m_condition); NdbMutex_Destroy(m_mutex); } void NdbImportUtil::Lockable::lock() { NdbMutex_Lock(m_mutex); } void NdbImportUtil::Lockable::unlock() { NdbMutex_Unlock(m_mutex); } void NdbImportUtil::Lockable::wait(uint timeout) { NdbCondition_WaitTimeout(m_condition, m_mutex, timeout); } void NdbImportUtil::Lockable::signal() { NdbCondition_Signal(m_condition); } // thread NdbImportUtil::Thread::Thread() { m_thread = 0; } NdbImportUtil::Thread::~Thread() { if (m_thread != 0) NdbThread_Destroy(&m_thread); } void NdbImportUtil::Thread::join() { require(m_thread != 0); NdbThread_WaitFor(m_thread, (void**)0); NdbThread_Destroy(&m_thread); } // list NdbImportUtil::ListEnt::ListEnt() { m_next = 0; m_prev = 0; } NdbImportUtil::ListEnt::~ListEnt() { } NdbImportUtil::List::List() { m_front = 0; m_back = 0; m_cnt = 0; m_maxcnt = 0; m_totcnt = 0; m_stat_occup = 0; m_stat_total = 0; } NdbImportUtil::List::~List() { ListEnt* ent; while ((ent = pop_front()) != 0) delete ent; } void NdbImportUtil::List::set_stats(Stats& stats, const char* name) { { const Name statname(name, "occup"); Stat* stat = stats.create(statname, 0, 0); m_stat_occup = stat; } { const Name statname(name, "total"); Stat* stat = stats.create(statname, 0, 0); m_stat_total = stat; } } void NdbImportUtil::List::push_back(ListEnt* ent) { require(ent != 0); require(ent->m_next == 0 && ent->m_prev == 0); if (m_cnt == 0) { m_front = m_back = ent; ent->m_next = 0; ent->m_prev = 0; } else { m_back->m_next = ent; ent->m_next = 0; ent->m_prev = m_back; m_back = ent; } m_cnt++; if (m_maxcnt < m_cnt) { m_maxcnt++; require(m_maxcnt == m_cnt); } m_totcnt++; validate(); if (m_stat_occup != 0) m_stat_occup->add(m_cnt); if (m_stat_total != 0) m_stat_total->add(1); } void NdbImportUtil::List::push_front(ListEnt* ent) { require(ent != 0); require(ent->m_next == 0 && ent->m_prev == 0); if (m_cnt == 0) { m_front = m_back = ent; ent->m_next = 0; ent->m_prev = 0; } else { m_front->m_prev = ent; ent->m_prev = 0; ent->m_next = m_front; m_front = ent; } m_cnt++; if (m_maxcnt < m_cnt) { m_maxcnt++; require(m_maxcnt == m_cnt); } m_totcnt++; validate(); if (m_stat_occup != 0) m_stat_occup->add(m_cnt); if (m_stat_total != 0) m_stat_total->add(1); } void NdbImportUtil::List::push_after(ListEnt* ent1, ListEnt* ent2) { require(ent1 != 0 && ent2 != 0); require(ent2->m_next == 0 && ent2->m_prev == 0); ListEnt* ent3 = ent1->m_next; if (ent3 == 0) { push_back(ent2); return; } ent1->m_next = ent2; ent2->m_prev = ent1; ent2->m_next = ent3; ent3->m_prev = ent2; m_cnt++; if (m_maxcnt < m_cnt) { m_maxcnt++; require(m_maxcnt == m_cnt); } m_totcnt++; validate(); if (m_stat_occup != 0) m_stat_occup->add(m_cnt); if (m_stat_total != 0) m_stat_total->add(1); } void NdbImportUtil::List::push_before(ListEnt* ent1, ListEnt* ent2) { require(ent1 != 0 && ent2 != 0); require(ent2->m_next == 0 && ent2->m_prev == 0); ListEnt* ent3 = ent1->m_prev; if (ent3 == 0) { push_front(ent2); return; } ent1->m_prev = ent2; ent2->m_next = ent1; ent2->m_prev = ent3; ent3->m_next = ent2; m_cnt++; if (m_maxcnt < m_cnt) { m_maxcnt++; require(m_maxcnt == m_cnt); } m_totcnt++; validate(); if (m_stat_occup != 0) m_stat_occup->add(m_cnt); if (m_stat_total != 0) m_stat_total->add(1); } NdbImportUtil::ListEnt* NdbImportUtil::List::pop_front() { ListEnt* ent = 0; if (m_cnt != 0) { if (m_cnt == 1) { ent = m_front; m_front = m_back = 0; } else { ent = m_front; m_front = ent->m_next; m_front->m_prev = 0; ent->m_next = 0; ent->m_prev = 0; } m_cnt--; validate(); if (m_stat_occup != 0) m_stat_occup->add(m_cnt); } return ent; } void NdbImportUtil::List::remove(ListEnt* ent) { ListEnt* prev = ent->m_prev; ListEnt* next = ent->m_next; ent->m_prev = 0; ent->m_next = 0; if (prev != 0) prev->m_next = next; if (next != 0) next->m_prev = prev; if (m_front == ent) m_front = next; if (m_back == ent) m_back = prev; require(m_cnt != 0); m_cnt--; validate(); if (m_stat_occup != 0) m_stat_occup->add(m_cnt); } void NdbImportUtil::List::push_back_from(List& src) { if (src.m_cnt != 0) { if (m_cnt != 0) { ListEnt* ent1 = m_back; ListEnt* ent2 = src.m_front; require(ent1 != 0 && ent2 != 0); require(ent1->m_next == 0 && ent2->m_prev == 0); // push src to the back ent1->m_next = ent2; ent2->m_prev = ent1; m_back = src.m_back; m_cnt += src.m_cnt; } else { m_front = src.m_front; m_back = src.m_back; m_cnt = src.m_cnt; } if (m_maxcnt < m_cnt) m_maxcnt = m_cnt; m_totcnt += src.m_cnt; } validate(); // erase src but leave stats alone src.m_front = 0; src.m_back = 0; src.m_cnt = 0; } #if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL) void NdbImportUtil::List::validate() const { if (m_cnt == 0) { require(m_front == 0); require(m_back == 0); } else { require(m_front != 0); require(m_front->m_prev == 0); require(m_back != 0); require(m_back->m_next == 0); if (m_cnt == 1) require(m_front == m_back); else require(m_front != m_back); } #if defined(VM_TRACE) && defined(TEST_NDBIMPORTUTIL) uint cnt = 0; ListEnt* ent1 = m_front; ListEnt* ent2 = 0; while (ent1 != 0) { require(ent1->m_prev == ent2); if (ent2 != 0) require(ent2->m_next == ent1); ent2 = ent1; ent1 = ent1->m_next; cnt++; } require(m_cnt == cnt); #endif } #endif // attrs NdbImportUtil::Attr::Attr() { m_attrname = ""; m_attrno = Inval_uint; m_attrid = Inval_uint; m_type = NdbDictionary::Column::Undefined; m_charset = 0; memset(m_sqltype, 0, sizeof(m_sqltype)); m_pk = false; m_nullable = false; m_precision = 0; m_scale = 0; m_length = 0; m_charlength = 0; m_arraytype = NdbDictionary::Column::ArrayTypeFixed; m_inlinesize = 0; m_partsize = 0; m_blobtable = 0; m_size = 0; m_pad = false; m_padchar = 0; m_quotable = false; m_isblob = false; m_blobno = Inval_uint; m_offset = 0; m_null_byte = Inval_uint; m_null_bit = Inval_uint; } void NdbImportUtil::Attr::set_value(Row* row, const void* data, uint len) const { require(!m_isblob); require(data != 0); uint totlen = m_arraytype + len; require(totlen <= m_size); require(m_offset + totlen <= row->m_recsize); uchar* p = &row->m_data[m_offset]; switch (m_arraytype) { case 0: break; case 1: require(len <= 0xff); p[0] = (uchar)len; p += 1; break; case 2: require(len <= 0xffff); p[0] = (uchar)(len & 0xff); p[1] = (uchar)(len >> 8); p += 2; break; default: require(false); break; } memcpy(p, data, len); if (m_pad) memset(&p[totlen], m_padchar, m_size - totlen); if (m_nullable) set_null(row, false); } void NdbImportUtil::Attr::set_blob(Row* row, const void* data, uint len) const { require(m_isblob); require(data != 0); require(m_blobno < row->m_blobs.size()); Blob* blob = row->m_blobs[m_blobno]; require(blob != 0); blob->resize(len); memcpy(blob->m_data, data, len); blob->m_blobsize = len; if (m_nullable) set_null(row, false); // add to rowsize which already includes recsize require(row->m_rowsize >= row->m_recsize); row->m_rowsize += len; } void NdbImportUtil::Attr::set_null(Row* row, bool null) const { uchar* data = row->m_data; uchar mask = (1 << m_null_bit); if (null) data[m_null_byte] |= mask; else data[m_null_byte] &= ~mask; } const uchar* NdbImportUtil::Attr::get_value(const Row* row) const { const uchar* p = &row->m_data[m_offset]; return p; } void NdbImportUtil::Attr::get_value(const Row* row, uint32& value) const { const uchar* p = get_value(row); if (m_type == NdbDictionary::Column::Unsigned) { memcpy(&value, p, sizeof(value)); return; } require(false); } void NdbImportUtil::Attr::get_value(const Row* row, uint64& value) const { const uchar* p = get_value(row); if (m_type == NdbDictionary::Column::Bigunsigned) { memcpy(&value, p, sizeof(value)); return; } require(false); } void NdbImportUtil::Attr::get_value(const Row* row, char* buf, uint bufsz) const { const uchar* p = get_value(row); memset(buf, 0, bufsz); if (m_type == NdbDictionary::Column::Varchar) { uint sz = p[0]; require(sz < bufsz); memcpy(buf, &p[1], sz); return; } if (m_type == NdbDictionary::Column::Longvarchar) { uint sz = p[0] | (p[1] << 8); require(sz < bufsz); memcpy(buf, &p[2], sz); return; } require(false); } bool NdbImportUtil::Attr::get_null(const Row* row) const { bool null = false; if (m_nullable) { const uchar* data = row->m_data; uchar mask = (1 << m_null_bit); null = (bool)(data[m_null_byte] & mask); } return null; } uint NdbImportUtil::Attr::get_blob_parts(uint len) const { require(m_isblob); uint parts = 0; if (len > m_inlinesize) { require(m_partsize != 0); parts = (len -m_inlinesize + m_partsize - 1) / m_partsize; } return parts; } // could go in NdbDictionary void NdbImportUtil::Attr::set_sqltype() { m_sqltype[0] = 0; switch (m_type) { case NdbDictionary::Column::Tinyint: sprintf(m_sqltype, "tinyint"); break; case NdbDictionary::Column::Smallint: sprintf(m_sqltype, "smallint"); break; case NdbDictionary::Column::Mediumint: sprintf(m_sqltype, "mediumint"); break; case NdbDictionary::Column::Int: sprintf(m_sqltype, "int"); break; case NdbDictionary::Column::Bigint: sprintf(m_sqltype, "bigint"); break; case NdbDictionary::Column::Tinyunsigned: sprintf(m_sqltype, "tinyint unsigned"); break; case NdbDictionary::Column::Smallunsigned: sprintf(m_sqltype, "smallint unsigned"); break; case NdbDictionary::Column::Mediumunsigned: sprintf(m_sqltype, "mediumint unsigned"); break; case NdbDictionary::Column::Unsigned: sprintf(m_sqltype, "int unsigned"); break; case NdbDictionary::Column::Bigunsigned: sprintf(m_sqltype, "bigint unsigned"); break; case NdbDictionary::Column::Decimal: sprintf(m_sqltype, "decimal"); break; case NdbDictionary::Column::Decimalunsigned: sprintf(m_sqltype, "decimal unsigned"); break; case NdbDictionary::Column::Float: sprintf(m_sqltype, "float"); break; case NdbDictionary::Column::Double: sprintf(m_sqltype, "double"); break; case NdbDictionary::Column::Char: require(m_charset != 0 && m_charset->csname != 0); sprintf(m_sqltype, "char(%u) %s", m_charlength, m_charset->csname); break; case NdbDictionary::Column::Varchar: require(m_charset != 0 && m_charset->csname != 0); sprintf(m_sqltype, "varchar(%u) %s", m_charlength, m_charset->csname); break; case NdbDictionary::Column::Longvarchar: require(m_charset != 0 && m_charset->csname != 0); sprintf(m_sqltype, "varchar(%u) %s", m_charlength, m_charset->csname); break; case NdbDictionary::Column::Binary: sprintf(m_sqltype, "binary(%u)", m_length); break; case NdbDictionary::Column::Varbinary: sprintf(m_sqltype, "varbinary(%u)", m_length); break; case NdbDictionary::Column::Longvarbinary: sprintf(m_sqltype, "varbinary(%u)", m_length); break; case NdbDictionary::Column::Bit: sprintf(m_sqltype, "bit(%u)", m_length); break; case NdbDictionary::Column::Year: sprintf(m_sqltype, "year"); break; case NdbDictionary::Column::Date: sprintf(m_sqltype, "date"); break; case NdbDictionary::Column::Time2: if (m_precision == 0) sprintf(m_sqltype, "time"); else sprintf(m_sqltype, "time(%u)", m_precision); break; case NdbDictionary::Column::Datetime2: if (m_precision == 0) sprintf(m_sqltype, "datetime"); else sprintf(m_sqltype, "datetime(%u)", m_precision); break; case NdbDictionary::Column::Timestamp2: if (m_precision == 0) sprintf(m_sqltype, "timestamp"); else sprintf(m_sqltype, "timestamp(%u)", m_precision); break; case NdbDictionary::Column::Blob: sprintf(m_sqltype, "blob"); break; case NdbDictionary::Column::Text: sprintf(m_sqltype, "text"); break; default: sprintf(m_sqltype, "unknown type=%d", (int)m_type); break; } } // tables NdbImportUtil::Table::Table() { m_tabid = Inval_uint; m_tab = 0; m_rec = 0; m_keyrec = NULL; m_recsize = 0; m_has_hidden_pk = false; } void NdbImportUtil::Table::add_pseudo_attr(const char* name, NdbDictionary::Column::Type type, uint length) { const uint id = m_attrs.size(); Attr attr; // ctor sets defaults attr.m_attrname = name; attr.m_attrno = id; attr.m_attrid = id; attr.m_type = type; attr.m_length = length; attr.m_charlength = length; switch (type) { case NdbDictionary::Column::Unsigned: require(length == 1); attr.m_arraytype = NdbDictionary::Column::ArrayTypeFixed; attr.m_size = 4; attr.m_quotable = false; break; case NdbDictionary::Column::Bigunsigned: require(length == 1); attr.m_arraytype = NdbDictionary::Column::ArrayTypeFixed; attr.m_size = 8; attr.m_quotable = false; break; case NdbDictionary::Column::Double: require(length == 1); attr.m_arraytype = NdbDictionary::Column::ArrayTypeFixed; attr.m_size = 8; attr.m_quotable = false; break; case NdbDictionary::Column::Varchar: require(length > 1); attr.m_charset = &my_charset_bin; attr.m_arraytype = NdbDictionary::Column::ArrayTypeShortVar; attr.m_size = 1 + length; attr.m_quotable = true; break; case NdbDictionary::Column::Longvarchar: require(length > 1); attr.m_charset = &my_charset_bin; attr.m_arraytype = NdbDictionary::Column::ArrayTypeMediumVar; attr.m_size = 2 + length; attr.m_quotable = true; break; case NdbDictionary::Column::Text: attr.m_charset = &my_charset_bin; attr.m_inlinesize = 256; attr.m_partsize = 2000; attr.m_isblob = true; attr.m_blobno = m_blobids.size(); m_blobids.push_back(id); attr.m_quotable = true; break; default: require(false); break; }; attr.set_sqltype(); if (id == 0) attr.m_offset = 0; else { const Attr& prevattr = m_attrs[id - 1]; attr.m_offset = prevattr.m_offset + prevattr.m_size; } attr.m_null_byte = Inval_uint; attr.m_null_bit = Inval_uint; m_recsize += attr.m_size; m_attrs.push_back(attr); } const NdbImportUtil::Attr& NdbImportUtil::Table::get_attr(const char* attrname) const { uint i = 0; const uint n = m_attrs.size(); while (i < n) { if (strcmp(m_attrs[i].m_attrname.c_str(), attrname) == 0) break; i++; } require(i < n); return m_attrs[i]; } uint NdbImportUtil::Table::get_nodeid(uint fragid) const { require(fragid < m_fragments.size()); uint nodeid = m_fragments[fragid]; return nodeid; } int NdbImportUtil::add_table(NdbDictionary::Dictionary* dic, const NdbDictionary::Table* tab, uint& tabid, Error& error) { require(tab != 0); require(tab->getObjectStatus() == NdbDictionary::Object::Retrieved); log_debug(1, "add_table: " << tab->getName()); tabid = tab->getObjectId(); // check if mapped already { std::map::const_iterator it; it = c_tables.m_tables.find(tabid); if (it != c_tables.m_tables.end()) { const Table& table = it->second; require(table.m_tab == tab); return 0; } } Table table; do { const NdbRecord* rec = tab->getDefaultRecord(); table.m_tabid = tabid; table.m_tab = tab; table.m_rec = rec; table.m_recsize = NdbDictionary::getRecordRowLength(rec); Attrs& attrs = table.m_attrs; const uint attrcnt = tab->getNoOfColumns(); attrs.reserve(attrcnt); bool ok = true; Uint32 recAttrId; for (uint i = 0; i < attrcnt && ok; i++) { if (i == 0) require(NdbDictionary::getFirstAttrId(rec, recAttrId)); else require(NdbDictionary::getNextAttrId(rec, recAttrId)); require(recAttrId == i); Attr attr; const NdbDictionary::Column* col = tab->getColumn(i); require(col !=0); attr.m_attrname = col->getName(); attr.m_attrno = i; attr.m_attrid = i; attr.m_type = col->getType(); attr.m_pk = col->getPrimaryKey(); attr.m_nullable = col->getNullable(); attr.m_precision = col->getPrecision(); attr.m_scale = col->getScale(); attr.m_length = col->getLength(); attr.m_arraytype = col->getArrayType(); require(attr.m_arraytype <= 2); attr.m_size = col->getSizeInBytes(); switch (attr.m_type) { case NdbDictionary::Column::Char: attr.m_pad = true; attr.m_padchar = 0x20; break; case NdbDictionary::Column::Binary: attr.m_pad = true; attr.m_padchar = 0x0; break; default: attr.m_pad = false; break; } switch (attr.m_type) { case NdbDictionary::Column::Char: case NdbDictionary::Column::Varchar: case NdbDictionary::Column::Longvarchar: attr.m_charset = col->getCharset(); require(attr.m_charset != 0); uint mbmaxlen; mbmaxlen = attr.m_charset->mbmaxlen; require(mbmaxlen != 0); require(attr.m_length % mbmaxlen == 0); attr.m_charlength = attr.m_length / mbmaxlen; attr.m_quotable = true; break; case NdbDictionary::Column::Text: attr.m_charset = col->getCharset(); require(attr.m_charset != 0); break; case NdbDictionary::Column::Binary: case NdbDictionary::Column::Varbinary: case NdbDictionary::Column::Longvarbinary: attr.m_charset = 0; attr.m_charlength = attr.m_length; attr.m_quotable = true; break; default: attr.m_charset = 0; attr.m_charlength = attr.m_length; attr.m_quotable = false; break; } switch (attr.m_type) { case NdbDictionary::Column::Blob: case NdbDictionary::Column::Text: attr.m_isblob = true; attr.m_inlinesize = col->getInlineSize(); attr.m_partsize = col->getPartSize(); attr.m_blobno = table.m_blobids.size(); attr.m_blobtable = col->getBlobTable(); if (attr.m_partsize == 0) require(attr.m_blobtable == 0); else require(attr.m_blobtable != 0); table.m_blobids.push_back(i); break; default: attr.m_isblob = false; break; } attr.set_sqltype(); Uint32 offset; require(NdbDictionary::getOffset(rec, i, offset)); attr.m_offset = offset; Uint32 null_byte, null_bit; require(NdbDictionary::getNullBitOffset(rec, i, null_byte, null_bit)); attr.m_null_byte = null_byte; attr.m_null_bit = null_bit; attrs.push_back(attr); } if (!ok) break; require(!NdbDictionary::getNextAttrId(rec, recAttrId)); NdbDictionary::RecordSpecification speclist[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY]; uint nkey = 0; for (uint i = 0; i < attrcnt && ok; i++) { const Attr& attr = attrs[i]; if (attr.m_pk) { const NdbDictionary::Column* col = tab->getColumn(i); require(col->getPrimaryKey()); NdbDictionary::RecordSpecification& spec = speclist[nkey]; spec.column = col; spec.offset = attr.m_offset; spec.nullbit_byte_offset = attr.m_null_byte; spec.nullbit_bit_in_byte = attr.m_null_bit; nkey++; // guess hidden pk if (strcmp(attr.m_attrname.c_str(), "$PK") == 0) { if (i + 1 == attrcnt && nkey == 1 && attr.m_type == NdbDictionary::Column::Bigunsigned) table.m_has_hidden_pk = true; else { set_error_usage(error, __LINE__, "column %u: " "invalid use of reserved column name $PK", i); ok = false; break; } } } } if (!ok) break; require(nkey == (uint)tab->getNoOfPrimaryKeys()); const NdbRecord* keyrec = dic->createRecord(tab, speclist, nkey, sizeof(NdbDictionary::RecordSpecification)); if (keyrec == 0) { m_util.set_error_ndb(error, __LINE__, dic->getNdbError()); break; } table.m_keyrec = keyrec; { NdbTableImpl& tabImpl = NdbTableImpl::getImpl(*tab); const Vector& fragments = tabImpl.m_fragments; for (uint i = 0; i < fragments.size(); i++) { uint16 nodeid = fragments[i]; table.m_fragments.push_back(nodeid); } } c_tables.m_tables.insert(std::pair(tabid, table)); return 0; } while (0); return -1; } const NdbImportUtil::Table& NdbImportUtil::get_table(uint tabid) { std::map::const_iterator it; it = c_tables.m_tables.find(tabid); require(it != c_tables.m_tables.end()); const Table& table = it->second; return table; } void NdbImportUtil::remove_table(NdbDictionary::Dictionary* dic, uint tabid) { std::map::const_iterator it; it = c_tables.m_tables.find(tabid); require(it != c_tables.m_tables.end()); const Table& table = it->second; dic->releaseRecord(const_cast(table.m_keyrec)); c_tables.m_tables.erase(it); } // rows NdbImportUtil::Row::Row() { m_tabid = Inval_uint; m_recsize = 0; m_rowsize = 0; m_allocsize = 0; m_rowid = Inval_uint64; m_linenr = Inval_uint64; m_startpos = Inval_uint64; m_endpos = Inval_uint64; m_data = 0; } NdbImportUtil::Row::~Row() { delete [] m_data; for (uint i = 0; i < m_blobs.size(); ++i) { Blob* blob = m_blobs[i]; if (blob != NULL) { delete blob; } } m_blobs.clear(); } void NdbImportUtil::Row::init(const Table& table) { m_tabid = table.m_tabid; uint recsize = table.m_recsize; require(recsize > 0); m_recsize = recsize; m_rowsize = recsize; // full main record is always included if (m_allocsize < recsize) { delete [] m_data; m_data = new uchar [recsize]; m_allocsize = recsize; } } NdbImportUtil::RowList::RowList() { m_rowsize = 0; m_rowbatch = UINT_MAX; m_rowbytes = UINT_MAX; m_eof = false; m_foe = false; m_overflow = 0; m_underflow = 0; m_stat_overflow = 0; m_stat_underflow = 0; m_stat_locks = 0; } NdbImportUtil::RowList::~RowList () { Row *one_row; while ((one_row = pop_front()) != NULL) { delete one_row; } } void NdbImportUtil::RowList::set_stats(Stats& stats, const char* name) { List::set_stats(stats, name); { const Name statname(name, "overflow"); Stat* stat = stats.create(statname, 0, 0); m_stat_overflow = stat; } { const Name statname(name, "underflow"); Stat* stat = stats.create(statname, 0, 0); m_stat_underflow = stat; } { const Name statname(name, "locks"); Stat* stat = stats.create(statname, 0, 0); m_stat_locks = stat; } } bool NdbImportUtil::RowList::push_back(Row* row) { bool ret = false; if (m_cnt < m_rowbatch && m_rowsize < m_rowbytes) { List::push_back(row); m_rowsize += row->m_rowsize; ret = true; } else { m_overflow++; if (m_stat_overflow != 0) m_stat_overflow->add(1); } return ret; } void NdbImportUtil::RowList::push_back_force(Row* row) { List::push_back(row); m_rowsize += row->m_rowsize; } bool NdbImportUtil::RowList::push_front(Row* row) { bool ret = false; if (m_cnt < m_rowbatch && m_rowsize < m_rowbytes) { List::push_front(row); m_rowsize += row->m_rowsize; ret = true; } else { m_overflow++; if (m_stat_overflow != 0) m_stat_overflow->add(1); } return ret; } void NdbImportUtil::RowList::push_front_force(Row* row) { List::push_front(row); m_rowsize += row->m_rowsize; } NdbImportUtil::Row* NdbImportUtil::RowList::pop_front() { Row* row = 0; do { row = static_cast(List::pop_front()); if (row != 0) { require(m_rowsize >= row->m_rowsize); m_rowsize -= row->m_rowsize; break; } m_underflow++; if (m_stat_underflow != 0) m_stat_underflow->add(1); } while (0); return row; } void NdbImportUtil::RowList::remove(Row* row) { List::remove(row); require(m_rowsize >= row->m_rowsize); m_rowsize -= row->m_rowsize; } void NdbImportUtil::RowList::push_back_from(RowList& src) { List::push_back_from(src); m_rowsize += src.m_rowsize; src.m_rowsize = 0; validate(); src.validate(); } /* * Transfer rows from a shared list src to our list. If src is * empty, try to wait. Terminate if our list is full. If any rows * were transferred, do not wait for more, and signal that src now * has fewer rows. */ void NdbImportUtil::RowList::push_back_from(RowList& src, RowCtl& ctl) { uint retries = ctl.m_retries; uint cnt_out = 0; uint bytes_out = 0; if (unlikely(full())) return; while (src.empty() && retries != 0) { if (ctl.m_dowait) src.wait(ctl.m_timeout); retries--; } while (!src.empty()) { // pop because row cannot be on 2 lists Row* row = src.pop_front(); if (push_back(row)) { cnt_out++; bytes_out += row->m_rowsize; continue; } src.push_front_force(row); // our list is full break; } if (cnt_out != 0 && ctl.m_dosignal) { // signal that we removed some rows from src src.signal(); } ctl.m_cnt_out += cnt_out; ctl.m_bytes_out += bytes_out; } /* * Transfer rows from our list to a shared list dst. If dst is * full, try to wait. Terminate if our list is empty. If any rows * were transferred, do not wait for more, and signal that dst now * has more rows. */ void NdbImportUtil::RowList::pop_front_to(RowList& dst, RowCtl& ctl) { uint retries = ctl.m_retries; uint cnt_out = 0; uint bytes_out = 0; if (unlikely(empty())) return; while (dst.full() && retries != 0) { if (ctl.m_dowait) dst.wait(ctl.m_timeout); retries--; } while (!empty()) { // pop because row cannot be on 2 lists Row* row = pop_front(); if (dst.push_back(row)) { cnt_out++; bytes_out += row->m_rowsize; continue; } push_front_force(row); // dst is full break; } if (cnt_out != 0 && ctl.m_dosignal) { // signal that we added some rows to dst dst.signal(); } ctl.m_cnt_out += cnt_out; ctl.m_bytes_out += bytes_out; } #if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL) void NdbImportUtil::RowList::validate() const { List::validate(); if (m_cnt == 0) require(m_rowsize == 0); if (m_rowsize == 0) require(m_cnt == 0); #if defined(VM_TRACE) && defined(TEST_NDBIMPORTUTIL) uint rowsize = 0; const Row* row = static_cast(m_front); while (row != 0) { rowsize += row->m_rowsize; row = static_cast(row->m_next); } require(m_rowsize == rowsize); #endif } #endif // alloc and free shared rows NdbImportUtil::Row* NdbImportUtil::alloc_row(const Table& table, bool dolock) { RowList& rows = *c_rows_free; if (dolock) rows.lock(); Row* row = rows.pop_front(); if (dolock) rows.unlock(); if (row == 0) { row = new Row; } row->init(table); while (row->m_blobs.size() < table.m_blobids.size()) { Blob* blob = alloc_blob(); row->m_blobs.push_back(blob); } return row; } void NdbImportUtil::alloc_rows(const Table& table, uint cnt, RowList& dst) { RowList& rows = *c_rows_free; rows.lock(); for (uint i = 0; i < cnt; i++) { Row* row = alloc_row(table, false); dst.push_back_force(row); // ignore limits } rows.unlock(); } void NdbImportUtil::free_blobs_from_row(Row *row) { for (uint i = 0; i < row->m_blobs.size(); ++i) { Blob* blob = row->m_blobs[i]; if (blob != NULL) { free_blob(blob); } } row->m_blobs.clear(); } void NdbImportUtil::free_row(Row* row) { free_blobs_from_row(row); RowList& rows = *c_rows_free; rows.lock(); rows.push_back(row); rows.unlock(); } void NdbImportUtil::free_rows(RowList& src) { RowList blob_freed_rows; while (!src.empty()) { Row *one_row = src.pop_front(); free_blobs_from_row(one_row); blob_freed_rows.push_back(one_row); } RowList& rows = *c_rows_free; rows.lock(); rows.push_back_from(blob_freed_rows); rows.unlock(); } // blobs NdbImportUtil::Blob::Blob() { m_blobsize = 0; m_allocsize = 0; m_data = new uchar [0]; } NdbImportUtil::Blob::~Blob() { delete [] m_data; } NdbImportUtil::BlobList::BlobList() { } NdbImportUtil::BlobList::~BlobList() { Blob *blob = NULL; while ((blob = pop_front()) != NULL) { delete blob; } } void NdbImportUtil::Blob::resize(uint size) { if (m_allocsize < size) { delete [] m_data; m_data = new uchar [size]; m_allocsize = size; } } NdbImportUtil::Blob* NdbImportUtil::alloc_blob() { BlobList& blobs = *c_blobs_free; blobs.lock(); Blob* blob = static_cast(blobs.pop_front()); blobs.unlock(); if (blob == 0) { blob = new Blob; } return blob; } void NdbImportUtil::free_blob(Blob* blob) { BlobList& blobs = *c_blobs_free; blobs.lock(); blobs.push_back(blob); blobs.unlock(); } // rowmap NdbImportUtil::RowMap::RowMap(NdbImportUtil& util) : m_util(util) { } NdbImportUtil::Range::Range() { m_start = 0; m_end = 0; m_startpos = 0; m_endpos = 0; m_reject = 0; } NdbImportUtil::Range::~Range() { } void NdbImportUtil::Range::copy(const Range& range2) { m_start = range2.m_start; m_end = range2.m_end; m_startpos = range2.m_startpos; m_endpos = range2.m_endpos; m_reject = range2.m_reject; } /* * This is typically used by a worker to add a row to its * private rowmap. The row is likely to go near the end * so search is done backwards. */ void NdbImportUtil::RowMap::add(Range range2) { RangeList& ranges = m_ranges; Range* r2 = alloc_range(); r2->copy(range2); if (ranges.empty()) { ranges.push_back(r2); } else { Range* rback = ranges.back(); Range* rfront = ranges.front(); if (rback->m_start < r2->m_start) { if (merge_up(rback, r2)) { // rback grows up to include r2 free_range(r2); } else { ranges.push_back(r2); } } else if (r2->m_start < rfront->m_start) { if (merge_down(rfront, r2)) { // rfront grows down to include r2 free_range(r2); } else { ranges.push_front(r2); } } else { // r2 is between 2 entries rprev rnext Range* rprev = rback; Range* rnext = 0; while (1) { if (r2->m_start > rprev->m_start) { // found the place require(rnext != 0); if (merge_up(rprev, r2)) { // rprev grows up to include r2 free_range(r2); if (merge_up(rprev, rnext)) { // rprev and rnext have been joined via r2 // rnext is now obsolete ranges.remove(rnext); free_range(rnext); } } else if (merge_down(rnext, r2)) { // rnext grows down to include r2 free_range(r2); } else { // r2 becomes new entry after rprev ranges.push_after(rprev, r2); } break; } rnext = rprev; rprev = rprev->prev(); require(rprev != 0); } } } validate(); } /* * Merge from another rowmap. Walks through both maps * in ascending order. The argument map2 is not modified * here but is normally cleared afterwards by caller. */ void NdbImportUtil::RowMap::add(const RowMap& map2) { RangeList& ranges = m_ranges; const RangeList& ranges2 = map2.m_ranges; Range* r = ranges.front(); const Range* r2 = ranges2.front(); while (1) { if (r == 0) { // copy rest of map2 using our free ranges while (r2 != 0) { r = alloc_range(); r->copy(*r2); ranges.push_back(r); r2 = r2->next(); } break; } if (r2 == 0) { // nothing more to do break; } if (r->m_start < r2->m_start) { { Range* rnext = r->next(); if (rnext != 0 && rnext->m_start < r2->m_start) { // still below r2 r = rnext; continue; } } if (merge_up(r, r2)) { // r grows up to include r2 Range* rnext = r->next(); if (rnext != 0 && merge_up(r, rnext)) { // r and rnext have been joined via r2 // rnext is now obsolete ranges.remove(rnext); free_range(rnext); } // leave r unchanged as next r2 may also apply { const Range* r2next = r2->next(); // even in the join case r2next cannot overlap r if (r2next != 0) require(r->m_end <= r2next->m_start); } } else { // r2 creates new entry Range* rnext = alloc_range(); rnext->copy(*r2); ranges.push_after(r, rnext); require(rnext == r->next()); // move to the new entry r = r->next(); { Range* rnext = r->next(); if (rnext != 0 && merge_up(r, rnext)) { // r and rnext have been joined via r2 // rnext is now obsolete ranges.remove(rnext); free_range(rnext); } } // leave current r unchanged } // r2 has been consumed r2 = r2->next(); continue; } if (r->m_start > r2->m_start) { if (merge_down(r, r2)) { // r grows down to include r2 // no more entries below r but there can be one above } else { // r2 creates new entry Range* rprev = alloc_range(); rprev->copy(*r2); ranges.push_before(r, rprev); // can be more entries below r } // r2 has been consumed r2 = r2->next(); continue; } require(false); } validate(); } /* * find() and remove() are used only on --resume, which consumes * the old rowmap. They need not be afficient. */ NdbImportUtil::Range* NdbImportUtil::RowMap::find(uint64 rowid) { RangeList& ranges = m_ranges; Range* r = ranges.front(); while (r != 0) { if (r->m_start <= rowid && rowid < r->m_end) break; r = r->next(); } return r; } bool NdbImportUtil::RowMap::remove(uint64 rowid) { RangeList& ranges = m_ranges; Range* r = find(rowid); if (r == 0) return false; if (rowid == r->m_start) { r->m_start++; if (r->m_start == r->m_end) { ranges.remove(r); free_range(r); } } else if (rowid == r->m_end - 1) { r->m_end -= 1; require(r->m_start < r->m_end); } else { Range* r2 = alloc_range(); r2->m_start = rowid + 1; r2->m_end = r->m_end; r2->m_reject = 0; // not relevant require(r2->m_start < r2->m_end); r->m_end = rowid; require(r->m_start < r->m_end); ranges.push_after(r, r2); } return true; } void NdbImportUtil::RowMap::get_total(uint64& rows, uint64& reject) const { uint64 trows = 0; uint64 treject = 0; const Range* r = m_ranges.front(); while (r != 0) { trows += r->m_end - r->m_start - r->m_reject; treject += r->m_reject; r = r->next(); } rows = trows; reject = treject; } NdbImportUtil::Range* NdbImportUtil::alloc_range(bool dolock) { RangeList& ranges = c_ranges_free; if (dolock) ranges.lock(); Range* range = ranges.pop_front(); if (dolock) ranges.unlock(); if (range == 0) { range = new Range; } return range; } void NdbImportUtil::alloc_ranges(uint cnt, RangeList& dst) { RangeList& ranges = c_ranges_free; ranges.lock(); for (uint i = 0; i < cnt; i++) { Range* range = alloc_range(false); dst.push_back(range); } ranges.unlock(); } void NdbImportUtil::free_range(Range* range) { RangeList& ranges = c_ranges_free; ranges.lock(); ranges.push_back(range); ranges.unlock(); } void NdbImportUtil::free_ranges(RangeList& src) { RangeList& ranges = c_ranges_free; ranges.lock(); ranges.push_back_from(src); ranges.unlock(); } #if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL) void NdbImportUtil::RowMap::validate() const { m_ranges.validate(); #if defined(VM_TRACE) && defined(TEST_NDBIMPORTUTIL) const RangeList& ranges = m_ranges; const Range* r2 = 0; const Range* r1 = ranges.front(); while (r1 != 0) { require(r1->m_start < r1->m_end); if (r2 != 0) require(r2->m_end < r1->m_start); r2 = r1; r1 = r1->next(); } #endif } #endif NdbOut& operator<<(NdbOut& out, const NdbImportUtil::Range& range) { out << "start=" << range.m_start << " end=" << range.m_end << " rows=" << range.m_end - range.m_start << " startpos=" << range.m_startpos << " endpos=" << range.m_endpos << " bytes=" << range.m_endpos - range.m_startpos << " reject=" << range.m_reject; return out; } NdbOut& operator<<(NdbOut& out, const NdbImportUtil::RowMap& rowmap) { const NdbImportUtil::RangeList& ranges = rowmap.m_ranges; const NdbImportUtil::Range* r = ranges.front(); uint i = 0; while (r != 0) { out << i << ": " << *r << endl; r = r->next(); i++; } return out; } // pseudo-tables void NdbImportUtil::add_pseudo_tables() { add_result_table(); add_reject_table(); add_rowmap_table(); add_stopt_table(); add_stats_table(); } void NdbImportUtil::add_result_table() { Table& table = c_result_table; table.m_tabid = g_result_tabid; require(table.m_recsize == 0); table.add_pseudo_attr("runno", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("name", NdbDictionary::Column::Varchar, 10); table.add_pseudo_attr("desc", NdbDictionary::Column::Varchar, 100); table.add_pseudo_attr("result", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("rows", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("reject", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("temperrors", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("runtime", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("utime", NdbDictionary::Column::Bigunsigned); add_error_attrs(table); } void NdbImportUtil::add_reject_table() { Table& table = c_reject_table; table.m_tabid = g_reject_tabid; require(table.m_recsize == 0); table.add_pseudo_attr("runno", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("rowid", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("linenr", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("startpos", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("endpos", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("bytes", NdbDictionary::Column::Bigunsigned); add_error_attrs(table); table.add_pseudo_attr("reject", NdbDictionary::Column::Text); } void NdbImportUtil::add_rowmap_table() { Table& table = c_rowmap_table; table.m_tabid = g_rowmap_tabid; require(table.m_recsize == 0); table.add_pseudo_attr("runno", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("start", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("end", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("rows", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("startpos", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("endpos", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("bytes", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("reject", NdbDictionary::Column::Bigunsigned); } void NdbImportUtil::add_stopt_table() { Table& table = c_stopt_table; table.m_tabid = g_stopt_tabid; require(table.m_recsize == 0); table.add_pseudo_attr("runno", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("option", NdbDictionary::Column::Varchar, 100); table.add_pseudo_attr("value", NdbDictionary::Column::Unsigned); } void NdbImportUtil::add_stats_table() { Table& table = c_stats_table; table.m_tabid = g_stats_tabid; require(table.m_recsize == 0); table.add_pseudo_attr("runno", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("id", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("name", NdbDictionary::Column::Varchar, 100); table.add_pseudo_attr("parent", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("obs", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("sum", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("mean", NdbDictionary::Column::Double); table.add_pseudo_attr("min", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("max", NdbDictionary::Column::Bigunsigned); table.add_pseudo_attr("stddev", NdbDictionary::Column::Double); } #include void NdbImportUtil::add_error_attrs(Table& table) { table.add_pseudo_attr("errortype", NdbDictionary::Column::Varchar, 10); table.add_pseudo_attr("errorcode", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("sourceline", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("errortext", NdbDictionary::Column::Longvarchar, 1024); } void NdbImportUtil::set_result_row(Row* row, uint32 runno, const char* name, const char* desc, uint64 rows, uint64 reject, uint64 temperrors, uint64 runtime, uint64 utime, const Error& error) { const Table& table = c_result_table; const Attrs& attrs = table.m_attrs; uint id = 0; // runno { const Attr& attr = attrs[id]; attr.set_value(row, &runno, sizeof(runno)); id++; } // name { const Attr& attr = attrs[id]; attr.set_value(row, name, strlen(name)); id++; } // desc { const Attr& attr = attrs[id]; attr.set_value(row, desc, strlen(desc)); id++; } // result { const Attr& attr = attrs[id]; uint32 value = m_util.has_error(error); attr.set_value(row, &value, sizeof(value)); id++; } // rows { const Attr& attr = attrs[id]; attr.set_value(row, &rows, sizeof(rows)); id++; } // reject { const Attr& attr = attrs[id]; attr.set_value(row, &reject, sizeof(reject)); id++; } // temperrors { const Attr& attr = attrs[id]; attr.set_value(row, &temperrors, sizeof(temperrors)); id++; } // runtime { const Attr& attr = attrs[id]; attr.set_value(row, &runtime, sizeof(runtime)); id++; } // utime { const Attr& attr = attrs[id]; attr.set_value(row, &utime, sizeof(utime)); id++; } // error set_error_attrs(row, table, error, id); require(id == attrs.size()); } void NdbImportUtil::set_reject_row(Row* row, uint32 runno, const Error& error, const char* reject, uint rejectlen) { const Table& table = c_reject_table; const Attrs& attrs = table.m_attrs; uint id = 0; // runno { const Attr& attr = attrs[id]; attr.set_value(row, &runno, sizeof(runno)); id++; } // rowid { const Attr& attr = attrs[id]; attr.set_value(row, &row->m_rowid, sizeof(row->m_rowid)); id++; } // linenr { const Attr& attr = attrs[id]; attr.set_value(row, &row->m_linenr, sizeof(row->m_linenr)); id++; } // startpos { const Attr& attr = attrs[id]; attr.set_value(row, &row->m_startpos, sizeof(row->m_startpos)); id++; } // endpos { const Attr& attr = attrs[id]; attr.set_value(row, &row->m_endpos, sizeof(row->m_endpos)); id++; } // bytes { const Attr& attr = attrs[id]; uint64 bytes = row->m_endpos - row->m_startpos; attr.set_value(row, &bytes, sizeof(bytes)); id++; } // error set_error_attrs(row, table, error, id); // reject { const Attr& attr = attrs[id]; attr.set_blob(row, reject, rejectlen); id++; } require(id == attrs.size()); } void NdbImportUtil::set_rowmap_row(Row* row, uint32 runno, const Range& range) { const Table& table = c_rowmap_table; const Attrs& attrs = table.m_attrs; uint id = 0; // runno { const Attr& attr = attrs[id]; attr.set_value(row, &runno, sizeof(runno)); id++; } // start { const Attr& attr = attrs[id]; attr.set_value(row, &range.m_start, sizeof(range.m_start)); id++; } // end { const Attr& attr = attrs[id]; attr.set_value(row, &range.m_end, sizeof(range.m_end)); id++; } // rows { const Attr& attr = attrs[id]; uint64 rows = range.m_end - range.m_start; attr.set_value(row, &rows, sizeof(rows)); id++; } // startpos { const Attr& attr = attrs[id]; attr.set_value(row, &range.m_startpos, sizeof(range.m_startpos)); id++; } // end { const Attr& attr = attrs[id]; attr.set_value(row, &range.m_endpos, sizeof(range.m_endpos)); id++; } // bytes { const Attr& attr = attrs[id]; uint64 bytes = range.m_endpos - range.m_startpos; attr.set_value(row, &bytes, sizeof(bytes)); id++; } // reject { const Attr& attr = attrs[id]; attr.set_value(row, &range.m_reject, sizeof(range.m_reject)); id++; } require(id == attrs.size()); } void NdbImportUtil::set_stopt_row(Row* row, uint32 runno, const char* option, uint32 value) { const Table& table = c_stopt_table; const Attrs& attrs = table.m_attrs; uint id = 0; // runno { const Attr& attr = attrs[id]; attr.set_value(row, &runno, sizeof(runno)); id++; } // option { const Attr& attr = attrs[id]; uint len = strlen(option); attr.set_value(row, option, len); id++; } // value { const Attr& attr = attrs[id]; attr.set_value(row, &value, sizeof(value)); id++; } require(id == attrs.size()); } void NdbImportUtil::set_stats_row(Row* row, uint32 runno, const Stat& stat, bool global) { const Table& table = c_stats_table; const Attrs& attrs = table.m_attrs; const uint g_offset = !global ? 0 : 1000; const char* g_prefix = !global ? 0 : "g"; // floats double obsf = (double)stat.m_obs; double sum1 = stat.m_sum1; double sum2 = stat.m_sum2; uint id = 0; // runno { const Attr& attr = attrs[id]; attr.set_value(row, &runno, sizeof(runno)); id++; } // id { const Attr& attr = attrs[id]; uint idval = stat.m_id + g_offset; attr.set_value(row, &idval, sizeof(idval)); id++; } // name { const Attr& attr = attrs[id]; if (g_prefix == 0) { uint namelen = strlen(stat.m_name); attr.set_value(row, stat.m_name, namelen); } else { Name name(g_prefix, stat.m_name); uint namelen = strlen(name.str()); attr.set_value(row, name.str(), namelen); } id++; } // parent { const Attr& attr = attrs[id]; uint parentval = stat.m_id == 0 ? stat.m_parent : stat.m_parent + g_offset; attr.set_value(row, &parentval, sizeof(parentval)); id++; } // obs { const Attr& attr = attrs[id]; attr.set_value(row, &stat.m_obs, sizeof(stat.m_obs)); id++; } // sum { const Attr& attr = attrs[id]; attr.set_value(row, &stat.m_sum, sizeof(stat.m_sum)); id++; } // mean { const Attr& attr = attrs[id]; double mean = 0.0; if (stat.m_obs != 0) mean = sum1 / obsf; if (!std::isfinite(mean)) mean = 0.0; attr.set_value(row, &mean, sizeof(mean)); id++; } // min { const Attr& attr = attrs[id]; attr.set_value(row, &stat.m_min, sizeof(stat.m_min)); id++; } // max { const Attr& attr = attrs[id]; attr.set_value(row, &stat.m_max, sizeof(stat.m_max)); id++; } // stddev { const Attr& attr = attrs[id]; double stddev = 0.0; if (stat.m_obs != 0) stddev = ::sqrt((obsf * sum2 - (sum1 * sum1)) / (obsf * obsf)); if (!std::isfinite(stddev)) stddev = 0.0; attr.set_value(row, &stddev, sizeof(stddev)); id++; } require(id == attrs.size()); } void NdbImportUtil::set_error_attrs(Row* row, const Table& table, const Error& error, uint& id) { const Attrs& attrs = table.m_attrs; // errortype { const Attr& attr = attrs[id]; const char* errortype = error.gettypetext(); attr.set_value(row, errortype, strlen(errortype)); id++; } // errorcode { const Attr& attr = attrs[id]; int32 errorcode = error.code; attr.set_value(row, &errorcode, sizeof(errorcode)); id++; } // sourceline { const Attr& attr = attrs[id]; uint32 errorline = error.line; attr.set_value(row, &errorline, sizeof(errorline)); id++; } // errortext { const Attr& attr = attrs[id]; attr.set_value(row, error.text, strlen(error.text)); id++; } } // buf NdbImportUtil::Buf::Buf(bool split) : m_split(split) { m_allocptr = 0; m_allocsize = 0; m_data = 0; m_size = 0; m_top = 0; m_start = 0; m_tail = 0; m_len = 0; m_eof = false; m_pos = 0; m_lineno = 0; } NdbImportUtil::Buf::~Buf() { delete [] m_allocptr; } void NdbImportUtil::Buf::alloc(uint pagesize, uint pagecnt) { require(m_allocptr == 0); require(pagesize != 0 && (pagesize & (pagesize - 1)) == 0); require(pagecnt != 0); uint size = pagesize * pagecnt; uint allocsize = size + (pagesize - 1) + 1; uchar* allocptr = new uchar [allocsize]; uchar* data = allocptr; uint misalign = (UintPtr)data & (pagesize - 1); if (misalign != 0) { data += (pagesize - misalign); uint misalign2 = (UintPtr)data & (pagesize - 1); require(misalign2 == 0); } require(data + size < allocptr + allocsize); m_allocptr = allocptr; m_allocsize = allocsize; m_data = data; m_size = size; m_top = 0; m_start = 0; m_tail = 0; m_len = 0; if (m_split) { require(pagecnt % 2 == 0); m_top = size / 2; m_start = m_top; } } void NdbImportUtil::Buf::copy(const uchar* src, uint len) { require(m_start + m_len + len < m_allocsize); memcpy(&m_data[m_start + m_len], src, len); m_len += len; m_data[m_start + m_len] = 0; } void NdbImportUtil::Buf::reset() { m_start = 0; if (m_split) { require(2 * m_top == m_size); m_start = m_top; } m_tail = 0; m_len = 0; m_eof = false; m_pos = 0; m_lineno = 0; } int NdbImportUtil::Buf::movetail(Buf& dst) { require(m_tail <= m_len); uint bytes = m_len - m_tail; if (bytes > dst.m_start) { return -1; } const uchar* srcptr = &m_data[m_start + m_tail]; uchar* dstptr = &dst.m_data[dst.m_start - bytes]; memcpy(dstptr, srcptr, bytes); m_len = m_tail; dst.m_start -= bytes; dst.m_len += bytes; return 0; } NdbOut& operator<<(NdbOut& out, const NdbImportUtil::Buf& buf) { out << "allocsize=" << buf.m_allocsize; out << " size=" << buf.m_size; out << " top=" << buf.m_top; out << " start=" << buf.m_start; out << " tail=" << buf.m_tail; out << " len=" << buf.m_len; out << " eof=" << buf.m_eof; const uchar* dataptr = &buf.m_data[buf.m_start]; { char dst[100]; uint len = buf.m_len; if (len > 10) len = 10; NdbImportUtil::pretty_print(dst, &dataptr[0], len); out << " buf=" << "0:" << dst; } { char dst[100]; require(buf.m_len >= buf.m_pos); uint n = buf.m_len - buf.m_pos; if (n > 10) n = 10; NdbImportUtil::pretty_print(dst, &dataptr[buf.m_pos], n); out << " pos=" << buf.m_pos << ":" << dst; } { out << " lineno=" << buf.m_lineno; } return out; } void NdbImportUtil::pretty_print(char* dst, const void* ptr, uint len) { const char* p = (const char*)ptr; char *s = dst; for (uint i = 0; i < len; i++) { int c = p[i]; if (isascii(c) && isprint(c)) sprintf(s, "%c", c); else if (c == '\n') sprintf(s, "%s", "\\n"); else sprintf(s, "<%02X>", (uchar)c); s += strlen(s); } } // files NdbImportUtil::File::File(NdbImportUtil& util, Error& error) : m_util(util), m_error(error) { m_fd = -1; m_flags = 0; } NdbImportUtil::File::~File() { if (m_fd != -1) (void)::close(m_fd); } int NdbImportUtil::File::do_open(int flags) { const char* path = get_path(); require(m_fd == -1); #ifndef _WIN32 int fd = ::open(path, flags, Creat_mode); #else int fd = ::_open(path, flags, Creat_mode); #endif if (fd == -1) { const char* type = "unknown"; if (flags == Read_flags) type = "read"; if (flags == Write_flags) type = "write"; if (flags == Append_flags) type = "append"; m_util.set_error_os(m_error, __LINE__, "%s: open for %s failed", path, type); return -1; } m_fd = fd; m_flags = flags; return 0; } int NdbImportUtil::File::do_read(uchar* dst, uint size, uint& len) { const char* path = get_path(); require(m_fd != -1); len = 0; while (len < size) { // short read is possible on pipe #ifndef _WIN32 int ret = ::read(m_fd, dst + len, size - len); #else int ret = ::_read(m_fd, dst + len, size - len); #endif if (ret == -1) { m_util.set_error_os(m_error, __LINE__, "%s: read %u bytes failed", path, size - len); return -1; } uint n = (uint)ret; if (n == 0) break; len += n; require(len <= size); } return 0; } int NdbImportUtil::File::do_read(Buf& buf) { uint dstpos = buf.m_start + buf.m_len; require(dstpos == buf.m_top); require(dstpos <= buf.m_size); uchar* dst = buf.m_data + dstpos; uint size = buf.m_size - dstpos; uint len = 0; if (do_read(dst, size, len) == -1) return -1; buf.m_eof = (len == 0); buf.m_len += len; uint endpos = buf.m_start + buf.m_len; require(endpos <= buf.m_size); require(endpos < buf.m_allocsize); buf.m_data[endpos] = 0; return 0; } int NdbImportUtil::File::do_write(const uchar* src, uint size) { const char* path = get_path(); require(m_fd != -1); #ifndef _WIN32 int ret = ::write(m_fd, src, size); #else int ret = ::_write(m_fd, src, size); #endif if (ret == -1) { m_util.set_error_os(m_error, __LINE__, "%s: write %u bytes failed", path, size); return -1; } // short write is considered error if (uint(ret) != size) { m_util.set_error_os(m_error, __LINE__, "%s: short write %u < %u", path, (uint)ret, size); return -1; } return 0; } int NdbImportUtil::File::do_write(const Buf& buf) { const uchar* src = &buf.m_data[buf.m_start]; uint len = buf.m_len; if (do_write(src, len) == -1) return -1; return 0; } int NdbImportUtil::File::do_close() { const char* path = get_path(); if (m_fd == -1) return 0; #ifndef _WIN32 if (::close(m_fd) == -1) #else if (::_close(m_fd) == -1) #endif { m_util.set_error_os(m_error, __LINE__, "%s: close failed", path); return -1; } m_fd = -1; return 0; } int NdbImportUtil::File::do_seek(uint64 offset) { const char* path = get_path(); require(m_fd != -1); #ifndef _WIN32 off_t off = (off_t)offset; if (::lseek(m_fd, off, SEEK_SET) == -1) #else __int64 off = (__int64)offset; if (::_lseeki64(m_fd, off, SEEK_SET) == -1) #endif { m_util.set_error_os(m_error, __LINE__, "%s: lseek %" PRIu64 " failed", path, offset); return -1; } return 0; } // stats NdbImportUtil::Stat::Stat(Stats& stats, uint id, const char* name, uint parent, uint level, uint flags) : m_stats(stats), m_id(id), m_name(name), m_parent(parent), m_level(level), m_flags(flags) { m_childcnt = 0; m_firstchild = StatNULL; m_lastchild = StatNULL; m_nextchild = StatNULL; if (m_parent != StatNULL) { Stat* parentstat = m_stats.get(m_parent); if (parentstat->m_childcnt == 0) { parentstat->m_firstchild = m_id; parentstat->m_lastchild = m_id; } else { Stat* lastchildstat = m_stats.get(parentstat->m_lastchild); require(lastchildstat->m_nextchild == StatNULL); lastchildstat->m_nextchild = m_id; parentstat->m_lastchild = m_id; } parentstat->m_childcnt++; } reset(); } void NdbImportUtil::Stat::add(uint64 val) { uint id = m_id; do { Stat* stat = m_stats.get(id); stat->m_obs++; stat->m_sum += val; if (stat->m_obs == 1) { stat->m_min = val; stat->m_max = val; } else { if (stat->m_min > val) stat->m_min = val; if (stat->m_max < val) stat->m_max = val; } stat->m_sum1 += double(val); stat->m_sum2 += double(val) * double(val); id = stat->m_parent; // root level stats are not useful } while (id != 0); } void NdbImportUtil::Stat::reset() { m_obs = 0; m_sum = 0; m_min = 0; m_max = 0; m_sum1 = 0.0; m_sum2 = 0.0; } NdbImportUtil::Stats::Stats(NdbImportUtil& util) : m_util(util) { Stat* rootstat = new Stat(*this, 0, "root", StatNULL, 0, 0); m_stats.push_back(rootstat); validate(); } NdbImportUtil::Stats::~Stats() { for (uint i = 0; i < m_stats.size(); i++) { Stat* stat = get(i); delete stat; } } NdbImportUtil::Stat* NdbImportUtil::Stats::create(const char* name, uint parent, uint flags) { lock(); { Stat* stat = find(name); if (stat != 0) { log_debug(2, "use existing " << stat->m_name << " id=" << stat->m_id); unlock(); return stat; } } Stat* parentstat = get(parent); uint parentlevel = parentstat->m_level; uint id = m_stats.size(); Stat* stat = new Stat(*this, id, name, parent, parentlevel + 1, flags); m_stats.push_back(stat); log_debug(2, "created stat id=" << stat->m_id << " name=" << stat->m_name); validate(); unlock(); return stat; } NdbImportUtil::Stat* NdbImportUtil::Stats::get(uint i) { require(i < m_stats.size()); Stat* stat = m_stats[i]; require(stat != 0); require(stat->m_id == i); return stat; } const NdbImportUtil::Stat* NdbImportUtil::Stats::get(uint i) const { require(i < m_stats.size()); const Stat* stat = m_stats[i]; require(stat != 0); require(stat->m_id == i); return stat; } NdbImportUtil::Stat* NdbImportUtil::Stats::find(const char* name) const { for (uint i = 0; i < m_stats.size(); i++) { Stat* stat = m_stats[i]; require(stat != 0); if (strcmp(stat->m_name, name) == 0) return stat; } return 0; } void NdbImportUtil::Stats::add(uint id, uint64 val) { Stat* stat = get(id); stat->add(val); } const NdbImportUtil::Stat* NdbImportUtil::Stats::next(uint id) const { require(id < m_stats.size()); const Stat* stat = m_stats[id]; require(stat != 0); if (stat->m_firstchild != StatNULL) { stat = get(stat->m_firstchild); return stat; } while (1) { if (stat->m_nextchild != StatNULL) { stat = get(stat->m_nextchild); return stat; } if (stat->m_parent == StatNULL) { break; } stat = get(stat->m_parent); } return 0; } void NdbImportUtil::Stats::reset() { for (uint i = 0; i < m_stats.size(); i++) { Stat* stat = get(i); stat->reset(); } } #if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL) void NdbImportUtil::Stats::validate() const { bool* seen = new bool [m_stats.size()]; Validate v(StatNULL, 0, 0, seen); for (uint i = 0; i < m_stats.size(); i++) v.m_seen[i] = false; (void)validate(v); for (uint i = 0; i < m_stats.size(); i++) require(v.m_seen[i] == true); delete [] seen; } const NdbImportUtil::Stat* NdbImportUtil::Stats::validate(Validate& v) const { const Stat* stat = get(v.m_id); require(stat->m_parent == v.m_parent); require(stat->m_id == v.m_id); require(stat->m_level == v.m_level); const Stat* stat2 = find(stat->m_name); require(stat == stat2); require(v.m_seen[v.m_id] == false); v.m_seen[v.m_id] = true; uint sibling = stat->m_nextchild; if (sibling != StatNULL) { Validate v2(v.m_parent, sibling, v.m_level, v.m_seen); validate(v2); } uint child = stat->m_firstchild; if (child != StatNULL) { Validate v2(v.m_id, child, v.m_level + 1, v.m_seen); validate(v2); } return stat; } #endif NdbOut& operator<<(NdbOut& out, const NdbImportUtil::Stats& stats) { out << "stats "; return out; } // timer NdbImportUtil::Timer::Timer() { // make sure initialized to avoid assert start(); stop(); } void NdbImportUtil::Timer::start() { m_start = NdbTick_getCurrentTicks(); } void NdbImportUtil::Timer::stop() { m_stop = NdbTick_getCurrentTicks(); if (NdbTick_Compare(m_start, m_stop) > 0) { // not worth crashing m_start = m_stop; } struct ndb_rusage ru; if (Ndb_GetRUsage(&ru, false) == 0) { m_utime_msec = ru.ru_utime / 1000; m_stime_msec = ru.ru_stime / 1000; } } uint64 NdbImportUtil::Timer::elapsed_sec() const { return NdbTick_Elapsed(m_start, m_stop).seconds(); } uint64 NdbImportUtil::Timer::elapsed_msec() const { return NdbTick_Elapsed(m_start, m_stop).milliSec(); } uint64 NdbImportUtil::Timer::elapsed_usec() const { return NdbTick_Elapsed(m_start, m_stop).microSec(); } // error void NdbImportUtil::set_error_gen(Error& error, int line, const char* fmt, ...) { c_error_lock.lock(); new (&error) Error; error.line = line; error.type = Error::Type_gen; if (fmt != 0) { va_list ap; va_start(ap, fmt); vsnprintf(error.text, sizeof(error.text), fmt, ap); va_end(ap); } log_debug(1, "E " << error); if (c_opt.m_abort_on_error) abort(); c_error_lock.unlock(); } void NdbImportUtil::set_error_usage(Error& error, int line, const char* fmt, ...) { c_error_lock.lock(); new (&error) Error; error.line = line; error.type = Error::Type_usage; if (fmt != 0) { va_list ap; va_start(ap, fmt); vsnprintf(error.text, sizeof(error.text), fmt, ap); va_end(ap); } log_debug(1, "E " << error); if (c_opt.m_abort_on_error) abort(); c_error_lock.unlock(); } void NdbImportUtil::set_error_alloc(Error& error, int line) { c_error_lock.lock(); new (&error) Error; error.line = line; error.type = Error::Type_alloc; log_debug(1, "E " << error); if (c_opt.m_abort_on_error) abort(); c_error_lock.unlock(); } void NdbImportUtil::set_error_mgm(Error& error, int line, NdbMgmHandle handle) { c_error_lock.lock(); new (&error) Error; error.line = line; error.type = Error::Type_mgm; error.code = ndb_mgm_get_latest_error(handle); snprintf(error.text, sizeof(error.text), "%s", ndb_mgm_get_latest_error_msg(handle)); log_debug(1, "E " << error); if (c_opt.m_abort_on_error) abort(); c_error_lock.unlock(); } void NdbImportUtil::set_error_con(Error& error, int line, const Ndb_cluster_connection* con) { c_error_lock.lock(); new (&error) Error; error.line = line; error.type = Error::Type_con; error.code = con->get_latest_error(); snprintf(error.text, sizeof(error.text), "%s", con->get_latest_error_msg()); log_debug(1, "E " << error); if (c_opt.m_abort_on_error) abort(); c_error_lock.unlock(); } void NdbImportUtil::set_error_ndb(Error& error, int line, const NdbError& ndberror, const char* fmt, ...) { c_error_lock.lock(); new (&error) Error; error.line = line; error.type = Error::Type_ndb; error.code = ndberror.code; snprintf(error.text, sizeof(error.text), "%s", ndberror.message); log_debug(1, "E " << error); if (c_opt.m_abort_on_error) abort(); c_error_lock.unlock(); } void NdbImportUtil::set_error_os(Error& error, int line, const char* fmt, ...) { c_error_lock.lock(); new (&error) Error; error.line = line; error.type = Error::Type_os; error.code = errno; if (fmt != 0) { va_list ap; va_start(ap, fmt); vsnprintf(error.text, sizeof(error.text), fmt, ap); va_end(ap); } { uint len = strlen(error.text); if (len < sizeof(error.text)) { uint left = sizeof(error.text) - len; snprintf(&error.text[len], left, ": errno=%u: %s", errno, strerror(errno)); } } log_debug(1, "E " << error); if (c_opt.m_abort_on_error) abort(); c_error_lock.unlock(); } void NdbImportUtil::set_error_data(Error& error, int line, int code, const char* fmt, ...) { c_error_lock.lock(); new (&error) Error; error.line = line; error.type = Error::Type_data; error.code = code; if (fmt != 0) { va_list ap; va_start(ap, fmt); vsnprintf(error.text, sizeof(error.text), fmt, ap); va_end(ap); } log_debug(1, "E " << error); if (c_opt.m_abort_on_error) abort(); c_error_lock.unlock(); } void NdbImportUtil::copy_error(Error& error, const Error& error2) { error.type = error2.type; error.code = error2.code; error.line = error2.line; memcpy(error.text, error2.text, sizeof(error.text)); } bool NdbImportUtil::g_stop_all = false; void NdbImportUtil::fmt_msec_to_hhmmss(char* str, uint64 msec) { uint sec = msec / 1000; uint hh = sec / 3600; sec -= hh * 3600; uint mm = sec / 60; sec -= mm * 60; uint ss = sec; uint ff = msec - sec * 1000; (void)ff; sprintf(str, "%uh%um%us", hh, mm, ss); } // unittest #ifdef TEST_NDBIMPORTUTIL typedef NdbImportUtil::ListEnt UtilListEnt; typedef NdbImportUtil::List UtilList; typedef NdbImportUtil::Table UtilTable; typedef NdbImportUtil::RowCtl UtilRowCtl; typedef NdbImportUtil::Row UtilRow; typedef NdbImportUtil::RowList UtilRowList; typedef NdbImportUtil::Range UtilRange; typedef NdbImportUtil::RangeList UtilRangeList; typedef NdbImportUtil::RowMap UtilRowMap; typedef NdbImportUtil::Buf UtilBuf; typedef NdbImportUtil::File UtilFile; typedef NdbImportUtil::Name UtilName; typedef NdbImportUtil::Stat UtilStat; typedef NdbImportUtil::Stats UtilStats; #include #include #include // increase size of some tests if release-compiled static bool mybigtest = false; static uint myrandom() { return (uint)ndb_rand() * (uint)ndb_rand(); // < 2**30 } static uint myrandom(uint m) { require(m != 0); uint n = myrandom(); return n % m; } struct MyRec : UtilListEnt { uint m_index; bool m_member; MyRec() { m_index = Inval_uint; m_member = false; } }; struct MyRecs : UtilList { }; static int testlist() { ndbout << "testlist" << endl; NdbImportUtil util; util.c_opt.m_log_level = 3; MyRecs recs; const uint poolsize = 256; MyRec* recpool = new MyRec [poolsize]; for (uint n = 0; n < poolsize; n++) { MyRec* rec = &recpool[n]; rec->m_index = n; } const uint numops = 1024 * poolsize; uint ops[5]; ops[0] = 0; // push back ops[1] = 0; // pop front ops[2] = 0; // remove ops[3] = 0; // push after ops[4] = 0; // push before uint max_occup = 0; for (uint numop = 0; numop < numops; numop++) { MyRec* rec = 0; while (1) { uint n = myrandom(poolsize); rec = &recpool[n]; if (rec->m_member) { if (myrandom(100) < 80) continue; } break; } if (!rec->m_member) { recs.push_back(rec); rec->m_member = true; ops[0]++; } else if (myrandom(100) < 50) { MyRec* rec = static_cast(recs.pop_front()); require(rec != 0); rec->m_member = false; ops[1]++; } else if (myrandom(100) < 50) { recs.remove(rec); rec->m_member = false; ops[2]++; } else { uint n2 = myrandom(poolsize); MyRec* rec2 = &recpool[n2]; if (!rec2->m_member) { if (myrandom(100) < 50) { recs.push_after(rec, rec2); rec2->m_member = true; ops[3]++; } else { recs.push_before(rec, rec2); rec2->m_member = true; ops[4]++; } } } if (max_occup < recs.m_cnt) max_occup = recs.m_cnt; } uint last_occup = recs.m_cnt; for (MyRec* rec = static_cast(recs.m_front); rec != 0; rec = static_cast(rec->m_next)) { uint n = rec->m_index; require(n < poolsize); require(rec == &recpool[n]); require(rec->m_member); } for (uint n = 0; n < poolsize; n++) { MyRec* rec = &recpool[n]; if (rec->m_member) { recs.remove(rec); rec->m_member = false; } } require(recs.m_cnt == 0); uint ins = ops[0] + ops[3] + ops[4]; uint del = ops[1] + ops[2]; require(last_occup == ins - del); delete [] recpool; ndbout << "max_occup=" << max_occup << endl; ndbout << "last_occup=" << last_occup << endl; ndbout << "push_back: " << ops[0] << endl; ndbout << "pop_front: " << ops[1] << endl; ndbout << "remove: " << ops[2] << endl; ndbout << "push_after: " << ops[3] << endl; ndbout << "push_before: " << ops[4] << endl; return 0; } static int testrowlist1() { ndbout << "testrowlist1" << endl; NdbImportUtil util; UtilTable table; table.add_pseudo_attr("a", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("b", NdbDictionary::Column::Varchar, 10); const uint loops = !mybigtest ? 100 : 1000; const uint rows = !mybigtest ? 1000 : 10000; for (uint loop = 0; loop < loops; loop++) { UtilRowList list1; UtilRowList list2; while (list1.cnt() < rows) { uint cnt = 1 + myrandom(rows - list1.cnt()); util.alloc_rows(table, cnt, list1); } require(list1.cnt() == rows); list2.push_back_from(list1); require(list1.cnt() == 0); require(list2.cnt() == rows); util.free_rows(list2); require(list2.cnt() == 0); require(util.c_rows_free->cnt() == rows); } return 0; } static int testrowlist2() { ndbout << "testrowlist2" << endl; NdbImportUtil util; UtilTable table; table.add_pseudo_attr("a", NdbDictionary::Column::Unsigned); table.add_pseudo_attr("b", NdbDictionary::Column::Varchar, 10); const uint loops = !mybigtest ? 100 : 1000; const uint rows = !mybigtest ? 1000 : 10000; { UtilRowList list; util.alloc_rows(table, rows, list); util.free_rows(list); } for (uint loop = 0; loop < loops; loop++) { UtilRowList list1; UtilRowList list2; list1.m_rowbatch = 1 + myrandom(rows); list2.m_rowbatch = 1 + myrandom(rows); const uint cnt = myrandom(rows + 1); util.alloc_rows(table, cnt, list1); require(list1.cnt() == cnt); do { uint timeout = myrandom(10) != 0 ? 0 : 10; UtilRowCtl ctl(timeout); list2.push_back_from(list1, ctl); util.free_rows(list2); } while (list1.cnt() != 0); require(util.c_rows_free->cnt() == rows); util.alloc_rows(table, cnt, list1); require(list1.cnt() == cnt); do { uint timeout = myrandom(10) != 0 ? 0 : 10; UtilRowCtl ctl(timeout); list1.pop_front_to(list2, ctl); util.free_rows(list2); } while (list1.cnt() != 0); require(util.c_rows_free->cnt() == rows); } return 0; } static int testrowmap1() { ndbout << "testrowmap1" << endl; NdbImportUtil util; /* * Create random ascending ranges. This does not represent * a valid rowmap because gaps between ranges can be zero. */ const uint maxranges = 1000; const uint numranges = myrandom(maxranges); ndbout << "numranges = " << numranges << endl; const uint maxgap = 5; const uint maxcount = 10; const uint maxrowid = maxranges * (maxcount + maxgap); UtilRange tstranges[maxranges]; bool rowexist[maxrowid]; uint toprowid = 0; for (uint k = 0; k < maxrowid; k++) rowexist[k] = false; { uint start = 0; for (uint i = 0; i < numranges; i++) { UtilRange& r = tstranges[i]; uint gap = myrandom(maxgap + 1); uint count = 1 + myrandom(maxcount); r.m_start = start + gap; r.m_end = r.m_start + count; r.m_reject = myrandom(1 + count); start = r.m_end; for (uint k = r.m_start; k < r.m_end; k++) rowexist[k] = true; toprowid = r.m_end; } } ndbout << "toprowid=" << toprowid << endl; // ndbout << "map1: create in ascending order" << endl; UtilRowMap map1(util); for (uint i = 0; i < numranges; i++) { UtilRange r = tstranges[i]; map1.add(r); } ndbout << "map1: " << map1.size() << endl; for (uint k = 0; k < toprowid; k++) { if (rowexist[k]) require(map1.find(k) != 0); else require(map1.find(k) == 0); } uint reorder[maxranges]; { for (uint i = 0; i < numranges; i++) reorder[i] = i; for (uint i = 0; i < numranges; i++) { uint j = myrandom(numranges); uint k = reorder[i]; reorder[i] = reorder[j]; reorder[j] = k; } } // ndbout << "map2: create in random order" << endl; UtilRowMap map2(util); for (uint i = 0; i < numranges; i++) { uint j = reorder[i]; UtilRange r = tstranges[j]; map2.add(r); } ndbout << "map2: " << map2.size() << endl; require(map1.equal(map2)); // ndbout << "map3: create from 2 random pieces" << endl; UtilRowMap map3(util); UtilRowMap map3a(util); UtilRowMap map3b(util); for (uint i = 0; i < numranges; i++) { UtilRange r = tstranges[i]; if (myrandom(100) < 50) map3a.add(r); else map3b.add(r); } ndbout << "map3a: " << map3a.size() << endl; //ndbout << map3a; ndbout << "map3b: " << map3b.size() << endl; //ndbout << map3b; ndbout << "add map3a" << endl; map3.add(map3a); ndbout << "add map3b" << endl; map3.add(map3b); ndbout << "map3: " << map3.size() << endl; //ndbout << map3; require(map1.equal(map3)); // ndbout << "map4: delete all in random order" << endl; UtilRowMap map4(util); map4.add(map1); ndbout << "map4: " << map4.size() << endl; bool rowexist4[maxrowid]; for (uint k = 0; k < maxrowid; k++) rowexist4[k] = rowexist[k]; while (map4.size() != 0) { uint k = myrandom(toprowid); if (rowexist4[k]) { require(map4.remove(k) == true); rowexist4[k] = false; } } ndbout << "map4: " << map4.size() << endl; return 0; } // an old "intrusive" test, fix later static int testrowmap2() { #if 0 ndbout << "testrowmap2" << endl; const uint maxranges = 10000; // ndbout << "map1: create manually" << endl; UtilRowMap map1; UtilRowMap::Ranges& ranges1 = map1.m_ranges; { uint64 start = 0; for (uint i = 0; i < maxranges; i++) { UtilRowMap::Range r; uint64 gap = myrandom(10); uint64 count = 1 + myrandom(10); r.m_start = start + gap; r.m_end = r.m_start + count; r.m_reject = myrandom(1 + count / 10); ranges1.push_back(r); start = r.m_end; } } ndbout << "map1: size " << ranges1.size() << endl; require(ranges1.size() == maxranges); // ndbout << "map2: create via merge from map1" << endl; UtilRowMap map2; UtilRowMap::Ranges& ranges2 = map2.m_ranges; { uint i = 0; while (i < maxranges) { UtilRowMap::Range r = ranges1[i]; uint j = i + 1; while (j < maxranges) { UtilRowMap::Range r2 = ranges1[j]; if (r.m_end < r2.m_start) break; r.m_end += r2.m_end - r2.m_start; r.m_reject += r2.m_reject; j++; } ranges2.push_back(r); i = j; } } ndbout << "map2: size " << ranges2.size() << endl; // ndbout << "map3: create via util from map1" << endl; UtilRowMap map3; UtilRowMap::Ranges& ranges3 = map3.m_ranges; { uint* reorder = new uint [maxranges]; for (uint i = 0; i < maxranges; i++) { reorder[i] = i; } for (uint i = 0; i < maxranges; i++) { if (myrandom(10) == 0) { uint m = 100; if (m > maxranges - i) m = maxranges - i; uint j = i + myrandom(m); // std::swap uint k = reorder[i]; reorder[i] = reorder[j]; reorder[j] = k; } } for (uint i = 0; i < maxranges; i++) { uint j = reorder[i]; UtilRowMap::Range r = ranges1[j]; map3.add(r); } delete [] reorder; } ndbout << "map3: size " << ranges3.size() << endl; // ndbout << "map2 vs map3: verify" << endl; require(ranges2.size() == ranges3.size()); for (uint i = 0; i < ranges2.size(); i++) { const UtilRowMap::Range r2 = ranges2[i]; const UtilRowMap::Range r3 = ranges3[i]; require(r2.m_start == r3.m_start); require(r2.m_end == r3.m_end); require(r2.m_reject == r2.m_reject); } // ndbout << "mark existing" << endl; struct Mark { Mark(uint64 size) { m_mark = new bool [size]; m_size = size; for (uint64 i = 0; i < m_size; i++) m_mark[i] = false; m_cnt = 0; } ~Mark() { delete [] m_mark; }; void set(uint64 pos, uint64 cnt) { require(pos + cnt <= m_size); for(uint64 i = pos; i < pos + cnt; i++) { require(m_mark[i] == false); m_mark[i] = true; m_cnt++; } } bool get(uint64 pos) { require(pos < m_size); return m_mark[pos]; } void del(uint64 pos) { require(pos < m_size); require(m_mark[pos] == true); m_mark[pos] = false; require(m_cnt != 0); m_cnt--; } bool* m_mark; uint64 m_size; uint64 m_cnt; }; uint64 maxid = ranges3.back().m_end; Mark mark(maxid); for (uint i = 0; i < ranges3.size(); i++) { const UtilRowMap::Range r3 = ranges3[i]; mark.set(r3.m_start, r3.m_end - r3.m_start); } ndbout << "ids: size=" << mark.m_size << " cnt=" << mark.m_cnt << endl; // ndbout << "test find" << endl; for (uint64 id = 0; id < maxid; id++) { UtilRowMap::Iterator it; if (mark.get(id) == false) require(map3.find(id, it) == false); if (mark.get(id) == true) require(map3.find(id, it) == true); } // ndbout << "test remove: random 50%" << endl; uint64 oldcnt = mark.m_cnt; while (oldcnt < mark.m_cnt * 2) { uint64 id = myrandom() % mark.m_size; if (mark.get(id)) { map3.remove(id); mark.del(id); } } // ndbout << "test remove: rest " << mark.m_cnt << endl; for (uint64 id = 0; id < mark.m_size; id++) { if (mark.get(id)) { map3.remove(id); mark.del(id); } } require(map3.empty()); require(mark.m_cnt == 0); #endif return 0; } static int testbuf() { ndbout << "testbuf" << endl; uint loops = 128 * 1024; uint min_off = UINT_MAX; uint max_off = 0; for (uint n = 0; n < loops; n++) { UtilBuf* buf = new UtilBuf; uint pagesize_log2 = 1 + myrandom(15); uint pagesize = (1 << pagesize_log2); uint pagecnt = 1 + myrandom(1024); buf->alloc(pagesize, pagecnt); uint off = buf->m_data - buf->m_allocptr; if (min_off > off) min_off = off; if (max_off < off) max_off = off; delete buf; } ndbout << "min_off=" << min_off << " max_off=" << max_off << endl; return 0; } static int testprint() { ndbout << "testprint" << endl; uchar buf[256]; char dst[5 * 256]; for (uint i = 0; i < 256; i++) buf[i] = i; NdbImportUtil::pretty_print(dst, buf, 256); ndbout << dst << endl; return 0; } static int testfile() { ndbout << "testfile" << endl; NdbImportUtil util; const char* path = "test.csv"; struct stat st; if (stat(path, &st) == -1) { ndbout << path << ": skip on errno " << errno << endl; return 0; } for (int split = 0; split <= 1; split++) { ndbout << "read " << path << " buf split=" << split << endl; UtilBuf buf(split); buf.alloc(4096, 8); UtilFile file(util, util.c_error); file.set_path(path); require(file.do_open(UtilFile::Read_flags) == 0); uint totlen = 0; uint totread = 0; while (1) { buf.reset(); int ret = file.do_read(buf); require(ret == 0); if (buf.m_eof) break; totlen += buf.m_len; totread++; } require(totlen == st.st_size); ndbout << "len=" << totlen << " reads=" << totread << endl; require(file.do_close() == 0); } return 0; } static int teststat() { ndbout << "teststat" << endl; NdbImportUtil util; util.c_opt.m_log_level = 3; UtilStats stats(util); static const uint stattot = 256; uint statcnt = stats.m_stats.size(); require(statcnt == 1); { const UtilStat* stat = stats.find("root"); require(stat != 0); require(stat->m_id == 0); } for (uint i = 1; i < stattot; i++) { UtilName name("test", i); uint parent = myrandom(statcnt); uint flags = 0; const UtilStat* stat = stats.create(name, parent, flags); require(stat != 0); require(stat->m_id == statcnt); require(strcmp(stat->m_name, name) == 0); statcnt++; require(statcnt == stats.m_stats.size()); } require(statcnt == stattot); for (uint k = 0; k < 10 * stattot; k++) { uint i = myrandom(statcnt); require(i < statcnt); UtilStat* stat = stats.get(i); require(stat != 0); if (i == 0) { UtilName name("root"); require(strcmp(stat->m_name, name) == 0); UtilStat* stat2 = stats.find(name); require(stat == stat2); } else { UtilName name("test", i); require(strcmp(stat->m_name, name) == 0); UtilStat* stat2 = stats.find(name); require(stat == stat2); uint v =myrandom(); stat->add(v); } } // iter bool iterseen[stattot]; for (uint i = 0; i < stattot; i++) iterseen[i] = false; // root is skipped const UtilStat* stat = stats.next(0); iterseen[0] = true; uint itercnt = 1; while (stat != 0) { itercnt++; require(itercnt <= statcnt); const uint i = stat->m_id; require(i < stattot); require(iterseen[i] == false); iterseen[i] = true; for (uint j = i; j != 0; ) { const UtilStat* stat = stats.get(j); require(stat != 0); require(iterseen[j] == true); ndbout << j; j = stat->m_parent; if (j != 0) ndbout << " "; else ndbout << endl; } stat = stats.next(i); } for (uint i = 0; i < stattot; i++) require(iterseen[i] == true); require(itercnt == statcnt); return 0; } static void myseed() { const char* p = NdbEnv_GetEnv("TEST_NDBIMPORTUTIL_SEED", (char*)0, 0); unsigned seed = p != 0 ? (unsigned)atoi(p) : (uint)NdbHost_GetProcessId(); ndbout << "seed=" << seed << endl; ndb_srand(seed); } static bool mycase(const char* name) { const char* p = NdbEnv_GetEnv("TEST_NDBIMPORTUTIL_CASE", (char*)0, 0); return p == 0 || strcmp(p, name) == 0; } static int testmain() { ndb_init(); #ifdef VM_TRACE signal(SIGABRT, SIG_DFL); signal(SIGSEGV, SIG_DFL); #endif mybigtest = #ifdef VM_TRACE false; #else true; #endif myseed(); if (mycase("testlist") && testlist() != 0) return -1; if (mycase("testrowlist1") && testrowlist1() != 0) return -1; if (mycase("testrowlist2") && testrowlist2() != 0) return -1; if (mycase("testrowmap1") && testrowmap1() != 0) return -1; if (mycase("testrowmap2") && testrowmap2() != 0) return -1; if (mycase("testbuf") && testbuf() != 0) return -1; if (mycase("testfile") && testfile() != 0) return -1; if (mycase("testprint") && testprint() != 0) return -1; if (mycase("teststat") && teststat() != 0) return -1; struct ndb_rusage ru; require(Ndb_GetRUsage(&ru, false) == 0); ndbout << "utime=" << ru.ru_utime/1000 << " stime=" << ru.ru_stime/1000 << " (ms)" << endl; return 0; } TAPTEST(NdbImportUtil) { int ret = testmain(); return (ret == 0); } #endif