925 lines
24 KiB
C++
925 lines
24 KiB
C++
/*
|
|
Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License, version 2.0,
|
|
as published by the Free Software Foundation.
|
|
|
|
This program is also distributed with certain software (including
|
|
but not limited to OpenSSL) that is licensed under separate terms,
|
|
as designated in a particular file or component or in included license
|
|
documentation. The authors of MySQL hereby grant you an additional
|
|
permission to link the program and your derivative works with the
|
|
separately licensed software that they have included with MySQL.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License, version 2.0, for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
*/
|
|
|
|
#ifndef NDB_IMPORT_UTIL_HPP
|
|
#define NDB_IMPORT_UTIL_HPP
|
|
|
|
#include <ndb_global.h>
|
|
#include <stdint.h>
|
|
#include <ndb_limits.h>
|
|
#include <mgmapi.h>
|
|
#include <NdbMutex.h>
|
|
#include <NdbCondition.h>
|
|
#include <NdbHost.h>
|
|
#include <NdbThread.h>
|
|
#include <NdbSleep.h>
|
|
#include <NdbGetRUsage.h>
|
|
#include <OutputStream.hpp>
|
|
#include <NdbOut.hpp>
|
|
#include <NdbApi.hpp>
|
|
#include "NdbImport.hpp"
|
|
#ifdef _WIN32
|
|
#include <io.h>
|
|
#endif
|
|
// STL
|
|
#include <string>
|
|
#include <vector>
|
|
#include <map>
|
|
#include <algorithm>
|
|
|
|
typedef NdbImport::Error Error;
|
|
|
|
#undef TEST_NDBIMPORT
|
|
#ifdef TEST_NDBIMPORTUTIL
|
|
#define TEST_NDBIMPORT
|
|
#endif
|
|
#ifdef TEST_NDBIMPORTCSV
|
|
#define TEST_NDBIMPORT
|
|
#endif
|
|
|
|
#define log_debug(n, x) \
|
|
if (unlikely(m_util.c_opt.m_log_level >= n)) \
|
|
(*m_util.c_log.out) << m_util.c_log.start << *this <<__LINE__ \
|
|
<< " " << x << m_util.c_log.stop
|
|
|
|
#if defined(VM_TRACE) || defined(TEST_NDBIMPORT)
|
|
#define log_debug_3(x) log_debug(3, x)
|
|
#else
|
|
#define log_debug_3(x)
|
|
#endif
|
|
|
|
#define Inval_uint (~(uint)0)
|
|
#define Inval_uint32 (~(uint32)0)
|
|
#define Inval_uint64 (~(uint64)0)
|
|
|
|
/*
|
|
* Utilities class. There is one Util instance attached to the
|
|
* single Impl plus Csv instance.
|
|
*/
|
|
|
|
class NdbImportUtil {
|
|
public:
|
|
NdbImportUtil();
|
|
~NdbImportUtil();
|
|
NdbImportUtil& m_util; // self
|
|
|
|
// opt
|
|
|
|
NdbImport::Opt c_opt;
|
|
|
|
// allow changing options within a scope
|
|
struct OptGuard {
|
|
OptGuard(NdbImportUtil& util) :
|
|
m_util(util) {
|
|
m_opt_save = m_util.c_opt;
|
|
}
|
|
~OptGuard() {
|
|
m_util.c_opt = m_opt_save;
|
|
}
|
|
NdbImportUtil& m_util;
|
|
NdbImport::Opt m_opt_save;
|
|
};
|
|
|
|
// name
|
|
|
|
/*
|
|
* Construct hierachical names where parts are separated by hyphens.
|
|
* Used to name teams, workers, stats.
|
|
*/
|
|
|
|
struct Name {
|
|
Name(const char* s);
|
|
Name(const char* s, const char* t);
|
|
Name(const char* s, uint t);
|
|
operator const char*() const {
|
|
return m_str.c_str();
|
|
}
|
|
const char* str() const {
|
|
return m_str.c_str();
|
|
}
|
|
std::string m_str;
|
|
};
|
|
|
|
// lockable
|
|
|
|
struct Lockable {
|
|
Lockable();
|
|
~Lockable();
|
|
void lock();
|
|
void unlock();
|
|
void wait(uint timeout);
|
|
void signal();
|
|
NdbMutex* m_mutex;
|
|
NdbCondition* m_condition;
|
|
};
|
|
|
|
// thread
|
|
|
|
struct Thread : Lockable {
|
|
Thread();
|
|
~Thread();
|
|
void join();
|
|
NdbThread* m_thread;
|
|
};
|
|
|
|
// list
|
|
|
|
/*
|
|
* Doubly-linked "intrusive" list, subclassed for various purposes.
|
|
* For example List/ListEnt are subclassed as RowList/Row where
|
|
* RowList stores only Row entries.
|
|
*
|
|
* For type-safety, subclasses of List use private inheritance and
|
|
* define explicitly the methods needed. This prevents putting an
|
|
* entry on a wrong type of list.
|
|
*
|
|
* A list has optional associated stats under a given name. These
|
|
* can be extended by subclasses.
|
|
*/
|
|
|
|
struct Stat;
|
|
struct Stats;
|
|
|
|
struct ListEnt {
|
|
ListEnt();
|
|
virtual ~ListEnt();
|
|
ListEnt* m_next;
|
|
ListEnt* m_prev;
|
|
};
|
|
|
|
struct List {
|
|
List();
|
|
virtual ~List();
|
|
void set_stats(Stats& stats, const char* name);
|
|
void push_back(ListEnt* ent);
|
|
void push_front(ListEnt* ent);
|
|
void push_after(ListEnt* ent1, ListEnt* ent2);
|
|
void push_before(ListEnt* ent1, ListEnt* ent2);
|
|
ListEnt* pop_front();
|
|
void remove(ListEnt* ent);
|
|
void push_back_from(List& src);
|
|
#if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL)
|
|
void validate() const;
|
|
#else
|
|
void validate() const {}
|
|
#endif
|
|
ListEnt* m_front;
|
|
ListEnt* m_back;
|
|
uint m_cnt;
|
|
uint m_maxcnt;
|
|
uint64 m_totcnt;
|
|
Stat* m_stat_occup;
|
|
Stat* m_stat_total;
|
|
};
|
|
|
|
// attrs
|
|
|
|
struct Row;
|
|
|
|
struct Attr {
|
|
Attr();
|
|
void set_value(Row* row, const void* data, uint len) const;
|
|
void set_blob(Row* row, const void* data, uint len) const;
|
|
void set_null(Row* row, bool null) const;
|
|
// get_value() is used only for pseudo-columns
|
|
const uchar* get_value(const Row* row) const;
|
|
void get_value(const Row* row, uint32& value) const;
|
|
void get_value(const Row* row, uint64& value) const;
|
|
void get_value(const Row* row, char* buf, uint bufsz) const;
|
|
bool get_null(const Row* row) const;
|
|
uint get_blob_parts(uint len) const;
|
|
void set_sqltype();
|
|
std::string m_attrname;
|
|
uint m_attrno;
|
|
uint m_attrid;
|
|
NdbDictionary::Column::Type m_type;
|
|
CHARSET_INFO* m_charset;
|
|
char m_sqltype[100];
|
|
bool m_pk;
|
|
bool m_nullable;
|
|
uint m_precision;
|
|
uint m_scale;
|
|
uint m_length;
|
|
uint m_charlength;
|
|
NdbDictionary::Column::ArrayType m_arraytype;
|
|
uint m_inlinesize;
|
|
uint m_partsize;
|
|
const NdbDictionary::Table* m_blobtable;
|
|
uint m_size;
|
|
bool m_pad;
|
|
uchar m_padchar;
|
|
bool m_quotable;
|
|
bool m_isblob;
|
|
uint m_blobno;
|
|
uint m_offset;
|
|
uint m_null_byte;
|
|
uint m_null_bit;
|
|
};
|
|
|
|
typedef std::vector<Attr> Attrs;
|
|
|
|
// tables
|
|
|
|
struct Table {
|
|
Table();
|
|
void add_pseudo_attr(const char* name,
|
|
NdbDictionary::Column::Type type,
|
|
uint length = 1);
|
|
const Attr& get_attr(const char* attrname) const;
|
|
uint get_nodeid(uint fragid) const;
|
|
uint m_tabid;
|
|
const NdbDictionary::Table* m_tab;
|
|
const NdbRecord* m_rec;
|
|
const NdbRecord* m_keyrec;
|
|
uint m_recsize; // size of main record
|
|
bool m_has_hidden_pk;
|
|
Attrs m_attrs;
|
|
std::vector<uint> m_blobids;
|
|
// map fragid to nodeid
|
|
std::vector<uint16> m_fragments;
|
|
};
|
|
|
|
// tables mapped by table id
|
|
struct Tables {
|
|
std::map<uint, Table> m_tables;
|
|
};
|
|
|
|
Tables c_tables;
|
|
|
|
int add_table(NdbDictionary::Dictionary* dic,
|
|
const NdbDictionary::Table* tab,
|
|
uint& tabid,
|
|
Error& error);
|
|
const Table& get_table(uint tabid);
|
|
void remove_table(NdbDictionary::Dictionary* dic, uint tabid);
|
|
|
|
// rows
|
|
|
|
struct Blob;
|
|
|
|
struct RowCtl {
|
|
RowCtl(uint timeout) {
|
|
m_timeout = timeout;
|
|
m_retries = timeout == 0 ? 0 : 1;
|
|
m_dosignal = (timeout != 0);
|
|
m_dowait = (timeout != 0);
|
|
m_cnt_out = 0;
|
|
m_bytes_out = 0;
|
|
}
|
|
uint m_timeout;
|
|
uint m_retries;
|
|
bool m_dosignal;
|
|
bool m_dowait;
|
|
uint m_cnt_out;
|
|
uint m_bytes_out;
|
|
};
|
|
|
|
struct Row : ListEnt {
|
|
Row();
|
|
virtual ~Row();
|
|
void init(const Table& table);
|
|
uint m_tabid;
|
|
uint m_recsize; // fixed
|
|
uint m_rowsize; // includes blobs, used to compute batches
|
|
uint m_allocsize;
|
|
uint64 m_rowid;
|
|
uint64 m_linenr; // file line number starting at 1
|
|
uint64 m_startpos;
|
|
uint64 m_endpos;
|
|
uchar* m_data;
|
|
std::vector<Blob*> m_blobs;
|
|
};
|
|
|
|
struct RowList : private List, Lockable {
|
|
RowList();
|
|
virtual ~RowList();
|
|
void set_stats(Stats& stats, const char* name);
|
|
bool push_back(Row* row);
|
|
void push_back_force(Row* row);
|
|
bool push_front(Row* row);
|
|
void push_front_force(Row* row);
|
|
Row* pop_front();
|
|
void remove(Row* row);
|
|
void push_back_from(RowList& src);
|
|
// here signal/wait can be used on a locked argument list
|
|
void push_back_from(RowList& src, RowCtl& ctl);
|
|
void pop_front_to(RowList& dst, RowCtl& ctl);
|
|
uint cnt() const {
|
|
return m_cnt;
|
|
}
|
|
uint64 totcnt() const {
|
|
return m_totcnt;
|
|
}
|
|
bool empty() const {
|
|
return m_cnt == 0;
|
|
}
|
|
bool full() const {
|
|
return m_cnt >= m_rowbatch || m_rowsize >= m_rowbytes;
|
|
}
|
|
void lock() {
|
|
Lockable::lock();
|
|
if (m_stat_locks != 0)
|
|
m_stat_locks->add(1);
|
|
}
|
|
void unlock() {
|
|
Lockable::unlock();
|
|
}
|
|
#if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL)
|
|
void validate() const;
|
|
#else
|
|
void validate() const {}
|
|
#endif
|
|
uint m_rowsize; // sum from row entries
|
|
uint m_rowbatch; // limit m_cnt
|
|
uint m_rowbytes; // limit m_rowsize
|
|
bool m_eof; // producer team has stopped
|
|
bool m_foe; // consumer team has stopped
|
|
uint64 m_overflow;
|
|
uint64 m_underflow;
|
|
Stat* m_stat_overflow; // failed to push due to size limit
|
|
Stat* m_stat_underflow; // failed to pop due to empty
|
|
Stat* m_stat_locks; // locks taken
|
|
};
|
|
|
|
// alloc and free shared rows
|
|
|
|
Row* alloc_row(const Table& Table, bool dolock = true);
|
|
void alloc_rows(const Table& table, uint cnt, RowList& dst);
|
|
void free_blobs_from_row(Row *row);
|
|
void free_row(Row* row);
|
|
void free_rows(RowList& src);
|
|
|
|
RowList* c_rows_free;
|
|
|
|
// blobs
|
|
|
|
struct Blob : ListEnt {
|
|
Blob();
|
|
virtual ~Blob();
|
|
void resize(uint size);
|
|
uint m_blobsize;
|
|
uint m_allocsize;
|
|
uchar* m_data;
|
|
};
|
|
|
|
struct BlobList : private List, Lockable {
|
|
BlobList();
|
|
virtual ~BlobList();
|
|
void push_back(Blob* blob) {
|
|
List::push_back(blob);
|
|
}
|
|
Blob* pop_front() {
|
|
return static_cast<Blob*>(List::pop_front());
|
|
}
|
|
};
|
|
|
|
Blob* alloc_blob();
|
|
void free_blob(Blob* blob);
|
|
|
|
BlobList* c_blobs_free;
|
|
|
|
// rowmap
|
|
|
|
/*
|
|
* A processed row is a row that has been inserted or rejected
|
|
* with permanent error. Processed rows tend to form ranges which
|
|
* merge together as processing continues. A row map represents
|
|
* such set of rows. Shared access requires mutex so workers
|
|
* should have private row maps merged periodically to a global
|
|
* row map. Contents of the row map are written to t1.map etc and
|
|
* are used to implement a --resume function.
|
|
*
|
|
* Implementation uses an ordered list. The main operation is
|
|
* merge. Lookup is used only when a resume is starting. Change
|
|
* to a skip list later if necessary.
|
|
*/
|
|
|
|
struct Range : ListEnt {
|
|
Range();
|
|
virtual ~Range();
|
|
void copy(const Range& range2);
|
|
bool equal(const Range& range2) const {
|
|
return
|
|
m_start == range2.m_start &&
|
|
m_end == range2.m_end &&
|
|
m_startpos == range2.m_startpos &&
|
|
m_endpos == range2.m_endpos &&
|
|
m_reject == range2.m_reject;
|
|
}
|
|
Range* next() {
|
|
return static_cast<Range*>(m_next);
|
|
}
|
|
const Range* next() const {
|
|
return static_cast<const Range*>(m_next);
|
|
}
|
|
Range* prev() {
|
|
return static_cast<Range*>(m_prev);
|
|
}
|
|
const Range* prev() const {
|
|
return static_cast<const Range*>(m_prev);
|
|
}
|
|
uint64 m_start; // starting rowid
|
|
uint64 m_end; // next rowid after
|
|
uint64 m_startpos; // byte offset of start in input
|
|
uint64 m_endpos; // byte offset of end in input
|
|
uint64 m_reject; // rejected rows (info only)
|
|
};
|
|
|
|
struct RangeList : private List, Lockable {
|
|
Range* front() {
|
|
return static_cast<Range*>(m_front);
|
|
}
|
|
const Range* front() const {
|
|
return static_cast<const Range*>(m_front);
|
|
}
|
|
Range* back() {
|
|
return static_cast<Range*>(m_back);
|
|
}
|
|
const Range* back() const {
|
|
return static_cast<const Range*>(m_back);
|
|
}
|
|
void push_back(Range* range) {
|
|
List::push_back(range);
|
|
}
|
|
void push_front(Range* range) {
|
|
List::push_front(range);
|
|
}
|
|
void push_after(Range* range1, Range* range2) {
|
|
List::push_after(range1, range2);
|
|
}
|
|
void push_before(Range* range1, Range* range2) {
|
|
List::push_before(range1, range2);
|
|
}
|
|
Range* pop_front() {
|
|
return static_cast<Range*>(List::pop_front());
|
|
}
|
|
void remove(Range* r) {
|
|
List::remove(r);
|
|
}
|
|
void push_back_from(RangeList& src) {
|
|
List::push_back_from(src);
|
|
}
|
|
uint cnt() const {
|
|
return m_cnt;
|
|
}
|
|
bool empty() const {
|
|
return m_cnt == 0;
|
|
}
|
|
void validate() const {
|
|
List::validate();
|
|
}
|
|
};
|
|
|
|
struct RowMap : Lockable {
|
|
RowMap(NdbImportUtil& util);
|
|
bool empty() const {
|
|
return m_ranges.empty();
|
|
}
|
|
uint size() const {
|
|
return m_ranges.cnt();
|
|
}
|
|
void clear() {
|
|
m_ranges_free.push_back_from(m_ranges);
|
|
}
|
|
bool equal(const RowMap& map2) const {
|
|
if (size() != map2.size())
|
|
return false;
|
|
const Range* r = m_ranges.front();
|
|
const Range* r2 = map2.m_ranges.front();
|
|
for (uint i = 0; i < size(); i++) {
|
|
require(r != 0 && r2 != 0);
|
|
if (!r->equal(*r2))
|
|
return false;
|
|
r = r->next();
|
|
r2 = r2->next();
|
|
}
|
|
require(r == 0 && r2 == 0);
|
|
return true;
|
|
}
|
|
void add(const Row* row, bool reject) {
|
|
Range r;
|
|
r.m_start = row->m_rowid;
|
|
r.m_end = row->m_rowid + 1;
|
|
r.m_startpos = row->m_startpos;
|
|
r.m_endpos = row->m_endpos;
|
|
r.m_reject = (uint)reject;
|
|
add(r);
|
|
}
|
|
void add(Range range2);
|
|
void add(const RowMap& map2);
|
|
Range* find(uint64 rowid);
|
|
bool remove(uint64 rowid);
|
|
// try to extend r upwards by r2
|
|
static bool merge_up(Range* r, const Range* r2) {
|
|
if (r->m_end == r2->m_start)
|
|
{
|
|
r->m_end = r2->m_end;
|
|
r->m_endpos = r2->m_endpos;
|
|
r->m_reject += r2->m_reject;
|
|
return true;
|
|
}
|
|
require(r->m_end < r2->m_start);
|
|
return false;
|
|
}
|
|
// try to extend r downwards by r2
|
|
static bool merge_down(Range* r, const Range* r2) {
|
|
if (r->m_start == r2->m_end)
|
|
{
|
|
r->m_start = r2->m_start;
|
|
r->m_startpos = r2->m_startpos;
|
|
r->m_reject += r2->m_reject;
|
|
return true;
|
|
}
|
|
require(r->m_start > r2->m_end);
|
|
return false;
|
|
}
|
|
void get_total(uint64& rows, uint64& reject) const;
|
|
Range* alloc_range() {
|
|
if (unlikely(m_ranges_free.empty())) {
|
|
Range* r = m_util.alloc_range(true);
|
|
m_ranges_free.push_back(r);
|
|
}
|
|
return m_ranges_free.pop_front();
|
|
}
|
|
void free_range(Range* r) {
|
|
m_ranges_free.push_back(r);
|
|
}
|
|
#if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL)
|
|
void validate() const;
|
|
#else
|
|
void validate() const {}
|
|
#endif
|
|
NdbImportUtil& m_util;
|
|
RangeList m_ranges;
|
|
// store free ranges locally to avoid mutexing
|
|
RangeList m_ranges_free;
|
|
};
|
|
|
|
Range* alloc_range(bool dolock);
|
|
void alloc_ranges(uint cnt, RangeList& dst);
|
|
void free_range(Range* r);
|
|
void free_ranges(RangeList& src);
|
|
|
|
RangeList c_ranges_free;
|
|
|
|
// errormap
|
|
|
|
/*
|
|
* Count temporary errors per error code. Any number of temporary
|
|
* errors per db execution batch is counted as 1 on job level.
|
|
* This is because usually individual transactions are not
|
|
* responsible and all tend to fail with same error.
|
|
*/
|
|
struct ErrorMap : Lockable {
|
|
void clear() {
|
|
m_map.clear();
|
|
}
|
|
uint size() const {
|
|
return m_map.size();
|
|
}
|
|
uint get_sum() const {
|
|
uint sum = 0;
|
|
std::map<uint, uint>::const_iterator it;
|
|
for (it = m_map.begin(); it != m_map.end(); it++)
|
|
sum += it->second;
|
|
return sum;
|
|
}
|
|
void add_one(uint key) {
|
|
std::map<uint, uint>::iterator it;
|
|
it = m_map.find(key);
|
|
if (it != m_map.end())
|
|
it->second += 1;
|
|
else
|
|
m_map.insert(std::pair<uint, uint>(key, 1));
|
|
}
|
|
void add_one(const ErrorMap& errormap) {
|
|
std::map<uint, uint>::const_iterator it;
|
|
for (it = errormap.m_map.begin(); it != errormap.m_map.end(); it++)
|
|
add_one(it->first);
|
|
}
|
|
std::map<uint, uint> m_map;
|
|
};
|
|
|
|
// pseudo-tables
|
|
|
|
static const uint g_result_tabid = 0xffff0000;
|
|
static const uint g_reject_tabid = 0xffff0001;
|
|
static const uint g_rowmap_tabid = 0xffff0002;
|
|
static const uint g_stopt_tabid = 0xffff0003;
|
|
static const uint g_stats_tabid = 0xffff0004;
|
|
Table c_result_table;
|
|
Table c_reject_table;
|
|
Table c_rowmap_table;
|
|
Table c_stopt_table;
|
|
Table c_stats_table;
|
|
|
|
void add_pseudo_tables();
|
|
void add_result_table();
|
|
void add_reject_table();
|
|
void add_rowmap_table();
|
|
void add_stopt_table();
|
|
void add_stats_table();
|
|
|
|
void add_error_attrs(Table& table);
|
|
|
|
void set_result_row(Row* row,
|
|
uint32 runno,
|
|
const char* name,
|
|
const char* desc,
|
|
uint64 rows,
|
|
uint64 reject,
|
|
uint64 temperrors,
|
|
uint64 runtime,
|
|
uint64 utime,
|
|
const Error& error);
|
|
|
|
void set_reject_row(Row* row,
|
|
uint32 runno,
|
|
const Error& error,
|
|
const char* reject,
|
|
uint rejectlen);
|
|
|
|
void set_rowmap_row(Row* row,
|
|
uint32 runno,
|
|
const Range& range);
|
|
|
|
void set_stopt_row(Row* row,
|
|
uint32 runno,
|
|
const char* option,
|
|
uint32 value);
|
|
|
|
void set_stats_row(Row* row,
|
|
uint32 runno,
|
|
const Stat& stat,
|
|
bool global);
|
|
|
|
void set_error_attrs(Row* row,
|
|
const Table& table,
|
|
const Error& error,
|
|
uint& id);
|
|
|
|
// buf
|
|
|
|
/*
|
|
* Buffer for I/O and parsing etc. A "split" buffer is divided
|
|
* into upper and lower halves. Lower half is for I/O but more
|
|
* data can be added above it in upper half. This is done in CSV
|
|
* parsing to avoid splitting lines and fields between buffers.
|
|
* The byte after data (at m_len) is valid and is set to NUL.
|
|
*/
|
|
struct Buf {
|
|
Buf(bool split = false);
|
|
~Buf();
|
|
void alloc(uint pagesize, uint pagecnt);
|
|
void copy(const uchar* src, uint len);
|
|
void reset();
|
|
int movetail(Buf& dst); // move m_tail..m_len to another buffer
|
|
const bool m_split;
|
|
uchar* m_allocptr;
|
|
uint m_allocsize;
|
|
uchar* m_data; // full data
|
|
uint m_size; // full size
|
|
uint m_top; // I/O top, zero or lower half if split
|
|
uint m_start; // data start
|
|
uint m_tail; // end of used data (relative to start)
|
|
uint m_len; // data length (relative to start)
|
|
bool m_eof; // mark eof on read
|
|
// following are optional and are maintained by caller
|
|
uint m_pos; // current position (relative to start)
|
|
uint m_lineno; // current line number (starting at 0)
|
|
};
|
|
|
|
// dst should hold 5 times len
|
|
static void pretty_print(char* dst, const void* ptr, uint len);
|
|
|
|
// files
|
|
|
|
struct File {
|
|
#ifndef _WIN32
|
|
const static int Read_flags = O_RDONLY;
|
|
const static int Write_flags = O_WRONLY | O_CREAT | O_TRUNC;
|
|
const static int Append_flags = O_WRONLY | O_APPEND;
|
|
const static int Creat_mode = 0644;
|
|
#else
|
|
const static int Read_flags = _O_RDONLY | _O_BINARY;
|
|
const static int Write_flags = _O_WRONLY | _O_CREAT | _O_TRUNC | _O_BINARY;
|
|
const static int Append_flags = _O_WRONLY | _O_APPEND | _O_BINARY;
|
|
const static int Creat_mode = _S_IREAD | _S_IWRITE;
|
|
#endif
|
|
File(NdbImportUtil& util, Error& error);
|
|
~File();
|
|
void set_path(const char* path) {
|
|
m_path = path;
|
|
}
|
|
const char* get_path() {
|
|
return m_path.c_str();
|
|
}
|
|
int do_open(int flags);
|
|
int do_read(uchar* dst, uint size, uint& len);
|
|
int do_read(Buf& buf);
|
|
int do_write(const uchar* src, uint size);
|
|
int do_write(const Buf& buf);
|
|
int do_close();
|
|
int do_seek(uint64 offset);
|
|
NdbImportUtil& m_util;
|
|
Error& m_error;
|
|
std::string m_path;
|
|
int m_fd;
|
|
int m_flags;
|
|
};
|
|
|
|
// stats
|
|
|
|
static const uint StatNULL = Inval_uint;
|
|
|
|
/*
|
|
* A stat entry is identified by name and has an id for fast
|
|
* update access. Child entries propagate their values to
|
|
* the parent. Entries with no children usually have a unique
|
|
* source of values but this is not required.
|
|
*
|
|
* Many stats are updated under some mutex. We do not use
|
|
* std::atomic here.
|
|
*/
|
|
struct Stat {
|
|
Stat(Stats& stats,
|
|
uint id,
|
|
const char* name,
|
|
uint parent,
|
|
uint level,
|
|
uint flags);
|
|
void add(uint64 val);
|
|
void reset();
|
|
// meta
|
|
Stats& m_stats;
|
|
const uint m_id;
|
|
const Name m_name;
|
|
const uint m_parent;
|
|
const uint m_level;
|
|
const uint m_flags; // future use e.g. display options
|
|
uint m_childcnt;
|
|
uint m_firstchild;
|
|
uint m_lastchild;
|
|
uint m_nextchild;
|
|
// data
|
|
uint64 m_obs; // observations
|
|
uint64 m_sum;
|
|
uint64 m_min;
|
|
uint64 m_max;
|
|
double m_sum1; // m_sum summed as float
|
|
double m_sum2;
|
|
};
|
|
|
|
struct Stats : Lockable {
|
|
Stats(NdbImportUtil& util);
|
|
~Stats();
|
|
Stat* create(const char* name, uint parent, uint flags);
|
|
Stat* get(uint i);
|
|
const Stat* get(uint i) const;
|
|
Stat* find(const char* name) const;
|
|
void add(uint id, uint64 val);
|
|
const Stat* next(uint id) const;
|
|
void reset();
|
|
#if defined(VM_TRACE) || defined(TEST_NDBIMPORTUTIL)
|
|
struct Validate {
|
|
Validate(uint parent, uint id, uint level, bool* seen) :
|
|
m_parent(parent),
|
|
m_id(id),
|
|
m_level(level),
|
|
m_seen(seen)
|
|
{}
|
|
uint m_parent;
|
|
uint m_id;
|
|
uint m_cnt;
|
|
uint m_level;
|
|
bool* m_seen;
|
|
};
|
|
void validate() const;
|
|
const Stat* validate(Validate& v) const;
|
|
#else
|
|
void validate() const {}
|
|
#endif
|
|
NdbImportUtil& m_util;
|
|
std::vector<Stat*> m_stats; // root at 0
|
|
};
|
|
|
|
Stats c_stats;
|
|
|
|
// timer - like NdbTimer but we do not link to libndbNDBT
|
|
|
|
struct Timer {
|
|
Timer();
|
|
void start();
|
|
void stop();
|
|
uint64 elapsed_sec() const;
|
|
uint64 elapsed_msec() const;
|
|
uint64 elapsed_usec() const;
|
|
NDB_TICKS m_start;
|
|
NDB_TICKS m_stop;
|
|
uint64 m_utime_msec;
|
|
uint64 m_stime_msec;
|
|
};
|
|
|
|
// log
|
|
|
|
struct DebugLogger {
|
|
DebugLogger();
|
|
~DebugLogger();
|
|
|
|
struct MessageStart {
|
|
Timer *timer;
|
|
NdbMutex *mutex;
|
|
};
|
|
|
|
struct MessageStop {
|
|
NdbMutex *mutex;
|
|
};
|
|
|
|
Timer timer;
|
|
MessageStart start;
|
|
MessageStop stop;
|
|
FileOutputStream* logfile;
|
|
NdbOut* out;
|
|
NdbMutex* mutex;
|
|
};
|
|
|
|
DebugLogger c_log;
|
|
|
|
// error
|
|
|
|
/*
|
|
* Errors are written on several levels.
|
|
* - global error (c_error here), fatal to entire import
|
|
* - team level error, fatal for the job
|
|
* - local non-fatal error added to rejected rows
|
|
*/
|
|
|
|
Error c_error;
|
|
Lockable c_error_lock;
|
|
|
|
void set_error_gen(Error& error, int line,
|
|
const char* fmt = 0, ...)
|
|
ATTRIBUTE_FORMAT(printf, 4, 5);
|
|
void set_error_usage(Error& error, int line,
|
|
const char* fmt = 0, ...)
|
|
ATTRIBUTE_FORMAT(printf, 4, 5);
|
|
void set_error_alloc(Error& error, int line);
|
|
void set_error_mgm(Error& error, int line,
|
|
NdbMgmHandle handle);
|
|
void set_error_con(Error& error, int line,
|
|
const Ndb_cluster_connection* con);
|
|
void set_error_ndb(Error& error, int line,
|
|
const NdbError& ndberror, const char* fmt = 0, ...);
|
|
void set_error_os(Error& error, int line,
|
|
const char* fmt = 0, ...)
|
|
ATTRIBUTE_FORMAT(printf, 4, 5);
|
|
void set_error_data(Error& error, int line,
|
|
int code, const char* fmt = 0, ...)
|
|
ATTRIBUTE_FORMAT(printf, 5, 6);
|
|
void copy_error(Error& error, const Error& error2);
|
|
bool has_error(const Error& error) {
|
|
return error.type != Error::Type_noerror;
|
|
}
|
|
bool has_error() {
|
|
return has_error(c_error);
|
|
}
|
|
|
|
// global flag to stop all jobs
|
|
static bool g_stop_all;
|
|
|
|
// convert milliseconds to hours,minutes,seconds string
|
|
static void fmt_msec_to_hhmmss(char* str, uint64 msec);
|
|
};
|
|
|
|
NdbOut& operator<<(NdbOut& out, const NdbImportUtil& util);
|
|
NdbOut& operator<<(NdbOut& out, const NdbImportUtil::Name& name);
|
|
NdbOut& operator<<(NdbOut& out, const NdbImportUtil::RowMap& rowmap);
|
|
NdbOut& operator<<(NdbOut& out, const NdbImportUtil::Range& range);
|
|
NdbOut& operator<<(NdbOut& out, const NdbImportUtil::Buf& buf);
|
|
NdbOut& operator<<(NdbOut& out, const NdbImportUtil::Stats& stats);
|
|
NdbOut& operator<<(NdbOut& out, const NdbImportUtil::DebugLogger::MessageStart &);
|
|
NdbOut& operator<<(NdbOut& out, const NdbImportUtil::DebugLogger::MessageStop &);
|
|
|
|
#endif
|