// polardbxengine/storage/ndb/tools/NdbImportImpl.hpp

/*
Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.

This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef NDB_IMPORT_IMPL_HPP
#define NDB_IMPORT_IMPL_HPP
#include <ndb_global.h>
#include <stdint.h>
#include <ndb_opts.h>
#include <ndb_limits.h>
#include <mgmapi.h>
#include <NdbApi.hpp>
#include <NdbError.hpp>
#include <NdbSleep.h>
#include <ndb_rand.h>
#include "NdbImport.hpp"
#include "NdbImportUtil.hpp"
#include "NdbImportCsv.hpp"
// STL
#include <map>
#include <algorithm>
class NdbImportImpl : public NdbImport {
public:
friend class NdbImport;
typedef NdbImportUtil::OptGuard OptGuard;
typedef NdbImportUtil::Name Name;
typedef NdbImportUtil::Lockable Lockable;
typedef NdbImportUtil::Thread Thread;
typedef NdbImportUtil::ListEnt ListEnt;
typedef NdbImportUtil::List List;
typedef NdbImportUtil::Attr Attr;
typedef NdbImportUtil::Attrs Attrs;
typedef NdbImportUtil::Table Table;
typedef NdbImportUtil::Tables Tables;
typedef NdbImportUtil::RowCtl RowCtl;
typedef NdbImportUtil::Row Row;
typedef NdbImportUtil::RowList RowList;
typedef NdbImportUtil::Blob Blob;
typedef NdbImportUtil::Range Range;
typedef NdbImportUtil::RangeList RangeList;
typedef NdbImportUtil::RowMap RowMap;
typedef NdbImportUtil::ErrorMap ErrorMap;
typedef NdbImportUtil::Buf Buf;
typedef NdbImportUtil::File File;
typedef NdbImportUtil::Stat Stat;
typedef NdbImportUtil::Stats Stats;
typedef NdbImportUtil::Timer Timer;
typedef NdbImportCsv::Spec CsvSpec;
typedef NdbImportCsv::Input CsvInput;
typedef NdbImportCsv::Output CsvOutput;
NdbImportImpl(NdbImport& facade);
~NdbImportImpl();
NdbImport* const m_facade;
NdbImportUtil m_util;
NdbImportCsv m_csv;
Error& m_error;
// mgm
struct Mgm {
Mgm(NdbImportImpl& impl);
~Mgm();
int do_connect();
void do_disconnect();
int get_status();
NdbImportImpl& m_impl;
NdbImportUtil& m_util;
Error& m_error;
NdbMgmHandle m_handle;
bool m_connected;
ndb_mgm_cluster_state* m_status;
};
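/*
* Illustrative sketch (not taken from this file): the mgm handle
* above is presumably driven with the standard mgmapi calls,
* roughly as below. The connect string and error handling are
* placeholders.
*
*   int NdbImportImpl::Mgm::do_connect() {
*     m_handle = ndb_mgm_create_handle();
*     ndb_mgm_set_connectstring(m_handle, "mgmhost:1186"); // example
*     if (ndb_mgm_connect(m_handle, 0, 0, 0) == -1)
*       return -1;              // details via ndb_mgm_get_latest_error()
*     m_connected = true;
*     return 0;
*   }
*
*   int NdbImportImpl::Mgm::get_status() {
*     m_status = ndb_mgm_get_status(m_handle); // malloc'ed, must be free()'d
*     return m_status != 0 ? 0 : -1;
*   }
*/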
// node
static const uint g_max_ndb_nodes = MAX_NDB_NODES;
static const uint g_max_nodes = MAX_NODES;
struct Node {
Node();
uint m_nodeid;
};
struct Nodes {
Nodes();
uint m_nodecnt;
Node m_nodes[g_max_ndb_nodes];
uint m_index[g_max_nodes];
};
Nodes c_nodes;
int get_nodes(Nodes& c);
// connect
struct Connect {
Connect();
~Connect();
uint m_connectioncnt;
Ndb_cluster_connection** m_connections;
Ndb_cluster_connection* m_mainconnection;
bool m_connected;
Ndb* m_mainndb;
};
Connect c_connect;
uint c_connectionindex;
int do_connect();
void do_disconnect();
// tables
int add_table(const char* database,
const char* table,
uint& tabid,
Error& error);
int remove_table(uint table_id);
// files
struct WorkerFile : File, Lockable {
WorkerFile(NdbImportUtil& util, Error& error);
uint m_workerno;
};
// job
/*
* Execution: Job - Team - Worker.
*
* A job does one task, for example loading a file of CSV data
* into a table. Multiple and parallel jobs are allowed. They
* share cluster connections, table definitions, and some data
* pools, but not threads.
*
* A job is defined as a set of teams. Basically there is
* a producer team and a consumer team. The CSV->NDB job also has
* a relay team between producer and consumer. Teams run in the
* same thread as the job owning them. A team only controls its
* workers and a job only controls its teams.
*
* A team has a set of workers cooperating on the same task, for
* example reading and parsing input in turns, or loading rows
* into NDB. Each worker runs in its own thread.
*
* Table data: Row, RowList.
*
* A row is data for one table row in NdbRecord format (blob data
* is included). A row has a unique row id starting at 0 (e.g.
* CSV line number). Teams are connected by row queues (lists).
* Rows are produced and consumed by workers. Access to a row
* queue usually requires a mutex, so rows should be moved in
* batches.
*
* A row queue has a preferred size limit to balance consumer and
* producer activities. When a producing worker sees the limit
* exceeded, it takes a short break. Similarly when a consuming
* worker finds no rows to consume, it takes a short break.
*
* Termination: A producing worker stops when done (e.g. end of
* CSV file). When the team sees that all workers have stopped it
* sets an "end-of-file" flag on the row queue. The queue need
* not be empty yet. A consuming worker stops when it finds no
* rows and also sees the end-of-file flag. When all workers have
* stopped, the team stops.
*
* A consuming team sets a "reverse" end-of-file flag on the row
* queue when it stops (which can be on fatal team error). This
* causes the previous producing team(s) to stop too.
*
* When all teams have stopped the job itself stops, waiting to be
* collected by the client application such as ndb_import.
*
* A job can also be asked to stop gracefully. This asks each
* team and worker to stop when they are in a consistent state.
* This is used to handle SIGHUP and similar user interaction.
*
* Diagnostics: There is a diagnostics team which starts before
* and stops after other teams. Its task is to record job and
* team termination status, rejected rows, a map of processed
* rows, and statistics. The records exist in "pseudo-tables" and
* are written to files in CSV format.
*
* The diagnostics team is also responsible for reading required
* old result files if a job is resumed.
*
* Errors can be global (e.g. connection failure) or specific to
* a job. Any error causes job termination. Workers record their
* errors at team level, and a team-level error causes a job error.
* A rejected row records its error with the row itself and does
* not cause a job error until a given rejection limit is exceeded.
*/
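/*
* Illustrative driving sequence, inferred from the declarations in
* this class (the real ndb_import client may differ in detail; the
* database and table names are examples):
*
*   NdbImportImpl impl(facade);
*   impl.do_connect();                    // cluster connection(s)
*   Job* job = impl.create_job();
*   job->do_create();                     // define teams and row queues
*   uint tabid = 0;
*   job->add_table("test", "t1", tabid);
*   job->set_table(tabid);
*   impl.start_job(job);                  // start diag team, then other teams
*   impl.wait_job(job);                   // block until the job has stopped
*   bool ok = !job->has_error();
*   impl.destroy_job(job);
*   impl.do_disconnect();
*/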
struct Job;
struct Team;
struct Worker;
struct JobState {
enum State {
State_null = 0,
State_created,
State_starting,
State_running,
State_stop,
State_stopped,
State_done
};
};
struct TeamState {
enum State {
State_null = 0,
State_created,
State_started,
State_running,
State_stop,
State_stopped
};
};
struct WorkerState {
enum State {
State_null = 0,
State_wait,
State_run,
State_running,
State_stop,
State_stopped
};
};
static const int g_teamstatecnt = TeamState::State_stopped + 1;
static const int g_workerstatecnt = WorkerState::State_stopped + 1;
static const char* g_str_state(JobState::State state);
static const char* g_str_state(TeamState::State state);
static const char* g_str_state(WorkerState::State state);
static const uint g_max_teams = 10;
struct Job : Thread {
Job(NdbImportImpl& impl, uint jobno);
~Job();
// define teams and row queues
void do_create();
void add_team(Team* team);
// add and set table
int add_table(const char* database, const char* table, uint& tabid);
void set_table(uint tabid);
int remove_table(uint table_id);
// start teams and run the job until done
void do_start();
void start_diag_team();
void start_resume();
void start_teams();
void check_teams(bool dostop);
void check_userstop();
void collect_teams();
void collect_stats();
void stop_diag_team();
// client request to stop the job
void do_stop();
void str_state(char* str) const;
NdbImportImpl& m_impl;
NdbImportUtil& m_util;
uint m_runno; // run number i.e. resume count
const uint m_jobno;
const Name m_name;
// per-job stats
Stats m_stats;
JobState::State m_state;
uint m_tabid;
bool m_dostop; // request graceful stop
bool m_fatal; // error is unlikely to be resumable
ErrorMap m_errormap; // temporary errors from exec-op
uint m_teamcnt;
Team* m_teams[g_max_teams];
uint m_teamstates[g_teamstatecnt];
RowList* m_rows_relay;
RowList* m_rows_exec[g_max_ndb_nodes];
RowList* m_rows_reject;
RowMap m_rowmap_in; // old rowmap on resume
RowMap m_rowmap_out;
// total from previous runs (if --resume)
uint64 m_old_rows;
uint64 m_old_reject;
uint64 m_old_runtime;
// counts from this run only
uint64 m_new_rows;
uint64 m_new_reject;
uint64 m_new_runtime;
mutable Timer m_timer;
Error m_error;
bool has_error() const {
return m_util.has_error(m_error);
}
// stats
Stat* m_stat_rows; // rows inserted
Stat* m_stat_reject; // rows rejected at some stage
Stat* m_stat_runtime; // total runtime in milliseconds
Stat* m_stat_rowssec; // rows inserted per second
Stat* m_stat_utime; // from workers
Stat* m_stat_stime;
Stat* m_stat_rowmap;
Stat* m_stat_rowmap_utime; // mainly from rowmap merges
};
struct Team {
Team(Job& job, const char* name, uint workercnt);
virtual ~Team() = 0;
// create workers
void do_create();
virtual Worker* create_worker(uint n) = 0;
// create worker threads
void do_start();
virtual void do_init() = 0;
Worker* get_worker(uint n);
void start_worker(Worker* w);
void wait_workers(WorkerState::State state);
void wait_worker(Worker* w, WorkerState::State state);
// start worker threads running
void do_run();
void run_worker(Worker* w);
void check_workers();
// stop and collect workers and stop this team
void do_stop();
void stop_worker(Worker* w);
virtual void do_end() = 0;
void set_table(uint tabid);
virtual void str_state(char* str) const;
Job& m_job;
NdbImportImpl& m_impl;
NdbImportUtil& m_util;
const uint m_teamno;
const Name m_name;
TeamState::State m_state;
const uint m_workercnt;
Worker** m_workers;
uint m_workerstates[g_workerstatecnt];
uint m_tabid;
RowMap m_rowmap_out;
bool m_is_diag;
mutable Timer m_timer;
Error m_error; // team level
bool has_error() const {
return m_util.has_error(m_error);
}
// stats
Stat* m_stat_runtime;
Stat* m_stat_slice;
Stat* m_stat_idleslice;
Stat* m_stat_idlerun;
Stat* m_stat_utime;
Stat* m_stat_stime;
Stat* m_stat_rowmap;
};
struct Worker : Thread {
Worker(Team& team, uint n);
virtual ~Worker();
// run the worker thread until done
void do_start();
virtual void do_init() = 0;
/*
* Run one "slice" of the task, e.g. parse a page of CSV or do
* a batch of database ops. Between slices the worker "comes up
* for air" to check status and to sleep for a while if the
* slice did no work. A sketch of the presumed driving loop
* follows this struct.
*/
virtual void do_run() = 0;
virtual void do_end() = 0;
Worker* next_worker();
virtual void str_state(char* str) const;
Team& m_team;
NdbImportImpl& m_impl;
NdbImportUtil& m_util;
const uint m_workerno;
const Name m_name;
WorkerState::State m_state;
bool m_dostop; // request graceful stop
uint m_slice;
uint m_idleslice;
bool m_idle; // last slice did no work
uint m_idlerun; // consecutive idle slices
RowMap m_rowmap_out;
mutable Timer m_timer;
Error& m_error; // team level
bool has_error() const {
return m_team.has_error();
}
// random for tests
uint get_rand() {
return (uint)ndb_rand_r(&m_seed);
}
unsigned m_seed;
// stats
Stat* m_stat_slice; // slices
Stat* m_stat_idleslice; // slices which did no work
Stat* m_stat_idlerun;
Stat* m_stat_utime;
Stat* m_stat_stime;
Stat* m_stat_rowmap;
};
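/*
* Presumed shape of the loop behind Worker::do_start(), based on
* the slice contract described above (the stop condition and sleep
* interval are illustrative only):
*
*   void NdbImportImpl::Worker::do_start() {
*     do_init();
*     while (m_state != WorkerState::State_stop && !m_dostop) {
*       m_idle = true;               // a slice clears this if it did work
*       do_run();
*       m_slice++;
*       if (m_idle) {
*         m_idleslice++;
*         m_idlerun++;
*         NdbSleep_MilliSleep(10);   // "come up for air"
*       } else {
*         m_idlerun = 0;
*       }
*     }
*     do_end();
*   }
*/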
// random input team
struct RandomInputTeam : Team {
RandomInputTeam(Job& job, uint workercnt);
virtual ~RandomInputTeam();
virtual Worker* create_worker(uint n);
virtual void do_init();
virtual void do_end();
};
struct RandomInputWorker : Worker {
RandomInputWorker(Team& team, uint n);
virtual ~RandomInputWorker();
virtual void do_init();
virtual void do_run();
virtual void do_end();
Row* create_row(uint64 rowid, const Table& t);
};
// csv input team
// see NdbImportCsv::Input for details
struct InputState {
enum State {
State_null = 0,
// try to lock the input file to this worker
State_lock,
// read a block from the locked file and release the lock
State_read,
// waiting for previous worker to transfer partial last line
State_waittail,
// parse the complete lines via Csv
State_parse,
// transfer partial last line to next worker
State_movetail,
// evaluate parsed lines and fields into rows via Csv
State_eval,
// send evaluated rows to relay row queue via Csv
State_send,
// end of CSV input
State_eof
};
};
static const char* g_str_state(InputState::State state);
struct CsvInputTeam : Team {
CsvInputTeam(Job& job, uint workercnt);
virtual ~CsvInputTeam();
virtual Worker* create_worker(uint n);
virtual void do_init();
virtual void do_end();
CsvSpec m_csvspec;
WorkerFile m_file;
// stats
Stat* m_stat_waittail;
Stat* m_stat_waitmove;
Stat* m_stat_movetail;
};
struct CsvInputWorker : Worker {
CsvInputWorker(Team& team, uint n);
virtual ~CsvInputWorker();
virtual void do_init();
virtual void do_run();
virtual void do_end();
void state_lock();
void state_read();
void state_waittail();
void state_parse();
void state_movetail();
void state_eval();
void state_send();
virtual void str_state(char* str) const;
InputState::State m_inputstate;
Buf m_buf;
CsvInput* m_csvinput;
bool m_firstread;
bool m_eof;
};
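/*
* The InputState machine above is presumably dispatched once per
* slice, roughly as below (sketch only, using the handlers declared
* in CsvInputWorker):
*
*   void NdbImportImpl::CsvInputWorker::do_run() {
*     switch (m_inputstate) {
*     case InputState::State_lock:     state_lock();     break;
*     case InputState::State_read:     state_read();     break;
*     case InputState::State_waittail: state_waittail(); break;
*     case InputState::State_parse:    state_parse();    break;
*     case InputState::State_movetail: state_movetail(); break;
*     case InputState::State_eval:     state_eval();     break;
*     case InputState::State_send:     state_send();     break;
*     case InputState::State_eof:      m_idle = true;    break;
*     default: break;
*     }
*   }
*/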
// null output team
struct NullOutputTeam : Team {
NullOutputTeam(Job& job, uint workercnt);
virtual ~NullOutputTeam();
virtual Worker* create_worker(uint n);
virtual void do_init();
virtual void do_end();
};
struct NullOutputWorker : Worker {
NullOutputWorker(Team& team, uint n);
virtual ~NullOutputWorker();
virtual void do_init();
virtual void do_run();
virtual void do_end();
};
// op
struct Op : ListEnt {
Op();
Op* next() {
return static_cast<Op*>(m_next);
}
Row* m_row;
const NdbOperation* m_rowop;
uint m_opcnt; // main and blob NDB ops
uint m_opsize;
};
struct OpList : private List {
OpList();
~OpList();
void set_stats(Stats& stats, const char* name) {
List::set_stats(stats, name);
}
Op* front() {
return static_cast<Op*>(m_front);
}
Op* pop_front() {
return static_cast<Op*>(List::pop_front());
}
void push_back(Op* op) {
List::push_back(op);
}
void push_front(Op* op) {
List::push_front(op);
}
uint cnt() const {
return m_cnt;
}
};
// tx
struct DbWorker;
struct Tx : ListEnt {
Tx(DbWorker* worker);
virtual ~Tx();
DbWorker* const m_worker;
NdbTransaction* m_trans;
OpList m_ops;
Tx* next() {
return static_cast<Tx*>(m_next);
}
};
struct TxList : private List {
TxList();
~TxList();
void set_stats(Stats& stats, const char* name) {
List::set_stats(stats, name);
}
Tx* front() {
return static_cast<Tx*>(m_front);
}
void push_back(Tx* tx) {
List::push_back(tx);
}
Tx* pop_front() {
return static_cast<Tx*>(List::pop_front());
}
void remove(Tx* tx) {
List::remove(tx);
}
uint cnt() const {
return m_cnt;
}
};
// db team
struct DbTeam : Team {
DbTeam(Job& job, const char* name, uint workercnt);
virtual ~DbTeam() = 0;
};
struct DbWorker : Worker {
DbWorker(Team& team, uint n);
virtual ~DbWorker() = 0;
int create_ndb(uint transcnt);
Op* alloc_op();
void free_op(Op* op);
Tx* start_trans();
Tx* start_trans(const NdbRecord* keyrec,
const char* keydata,
uchar* xfrmbuf, uint xfrmbuflen);
Tx* start_trans(uint nodeid, uint instanceid);
void close_trans(Tx* tx);
Ndb* m_ndb;
OpList m_op_free;
TxList m_tx_free;
TxList m_tx_open;
// rows to free at batch end under single mutex
RowList m_rows_free;
};
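/*
* Minimal sketch of how the helpers above fit together for one row
* (rec and rowdata are hypothetical placeholders; the real define
* and execute steps live in the relay/exec op workers below):
*
*   Op* op = alloc_op();                        // from m_op_free
*   Tx* tx = start_trans();                     // NdbTransaction from m_tx_free
*   op->m_rowop = tx->m_trans->insertTuple(rec, rowdata);
*   op->m_opcnt = 1;
*   tx->m_ops.push_back(op);
*   // ... execute the transaction (synch or asynch), then:
*   close_trans(tx);                            // back to m_tx_free
*   free_op(op);                                // back to m_op_free
*/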
// relay op team
/*
* A relay op worker consumes relay rows. It runs the hash
* calculation on the distribution keys to determine the optimal
* node to send each row to. It then pipes the row to the exec op
* worker(s) dedicated to that node.
*/
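/*
* Rough per-row sketch of the node selection described above
* (state_define() is declared in RelayOpWorker below; it assumes
* RowList has the push/pop interface of the list wrappers in this
* file, and hash_to_nodeindex() is a placeholder for the
* distribution-key hash, e.g. Ndb::computeHash() with m_xfrmbuf as
* scratch space):
*
*   void NdbImportImpl::RelayOpWorker::state_define() {
*     Row* row;
*     while ((row = m_rows.pop_front()) != 0) {
*       uint i = hash_to_nodeindex(row);  // 0..nodecnt-1, placeholder
*       m_rows_exec[i]->push_back(row);   // per-node outgoing rows
*     }
*   }
*/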
struct RelayState {
enum State {
State_null = 0,
// receive rows from e.g. CSV input
State_receive,
// select optimal node
State_define,
// send rows to each exec op worker
State_send,
// no more rows
State_eof
};
};
static const char* g_str_state(RelayState::State state);
struct RelayOpTeam : DbTeam {
RelayOpTeam(Job& job, uint workercnt);
virtual ~RelayOpTeam();
virtual Worker* create_worker(uint n);
virtual void do_init();
virtual void do_end();
};
struct RelayOpWorker : DbWorker {
RelayOpWorker(Team& team, uint n);
virtual ~RelayOpWorker();
virtual void do_init();
virtual void do_run();
virtual void do_end();
void state_receive();
void state_define();
void state_send();
virtual void str_state(char* str) const;
RelayState::State m_relaystate;
uchar* m_xfrmalloc;
uchar* m_xfrmbuf;
uint m_xfrmbuflen;
RowList m_rows; // rows received
RowList* m_rows_exec[g_max_ndb_nodes]; // sorted to per-node
};
// exec op team
/*
* An exec op worker is dedicated to a specific node (DBTC). This
* allows better use of the transporter. The worker receives rows
* from relay op workers. A row gives rise to a main operation
* and any blob part operations.
*
* The code has synch and asynch variants. The synch variant is
* mainly for performance comparison. It uses one transaction for
* all rows in the batch and does not check errors on individual
* operations. The asynch variant uses one transaction for each
* row and can detect rows to reject.
*/
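/*
* Sketch of the asynch variant described above: one transaction per
* row, prepared and polled in batches. The trampoline callback and
* the rec/rowdata/timeout_ms names are placeholders, not taken from
* the source.
*
*   // state_define: one Tx per row, hinted to this worker's node
*   Tx* tx = start_trans(m_nodeid, 0);      // instance id: placeholder
*   tx->m_trans->insertTuple(rec, rowdata);
*
*   // state_prepare: register a completion callback per transaction
*   tx->m_trans->executeAsynchPrepare(NdbTransaction::Commit,
*                                     trampoline, tx); // ends in asynch_callback(tx)
*
*   // state_send: push all prepared transactions to the node
*   m_ndb->sendPreparedTransactions(0);
*
*   // state_poll: completed transactions run the callback, which can
*   // reject a single row or record a temporary error for retry
*   m_ndb->pollNdb(timeout_ms, m_tx_open.cnt());
*/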
struct ExecState {
enum State {
State_null = 0,
// receive rows until a batch is full
State_receive,
// define transactions and operations
State_define,
// prepare the transactions (asynch)
State_prepare,
// execute (synch) or send (asynch) the transactions
State_send,
// poll for the transactions (asynch)
State_poll,
// no more incoming rows
State_eof
};
};
static const char* g_str_state(ExecState::State state);
struct ExecOpTeam : DbTeam {
ExecOpTeam(Job& job, uint workercnt);
virtual ~ExecOpTeam();
virtual Worker* create_worker(uint n);
virtual void do_init();
virtual void do_end();
};
struct ExecOpWorker : DbWorker {
ExecOpWorker(Team& team, uint n);
virtual ~ExecOpWorker() = 0;
virtual void do_init();
virtual void do_run();
virtual void do_end() = 0;
void state_receive(); // common to synch/asynch
virtual void state_define() = 0;
virtual void state_prepare() = 0;
virtual void state_send() = 0;
virtual void state_poll() = 0;
virtual void str_state(char* str) const;
void reject_row(Row* row, const Error& error);
ExecState::State m_execstate;
uint m_nodeindex; // index into ndb nodes array
uint m_nodeid;
RowList m_rows; // received rows
OpList m_ops; // received rows converted to ops
bool m_eof;
ErrorMap m_errormap; // temporary errors in current batch
uint m_opcnt; // current batch
uint m_opsize;
};
struct ExecOpWorkerSynch : ExecOpWorker {
ExecOpWorkerSynch(Team& team, uint n);
virtual ~ExecOpWorkerSynch();
virtual void do_end();
virtual void state_define();
virtual void state_prepare();
virtual void state_send();
virtual void state_poll();
};
struct ExecOpWorkerAsynch : ExecOpWorker {
ExecOpWorkerAsynch(Team& team, uint n);
virtual ~ExecOpWorkerAsynch();
virtual void do_end();
virtual void state_define();
virtual void state_prepare();
virtual void state_send();
virtual void state_poll();
void asynch_callback(Tx* tx);
};
// diag team
struct DiagTeam : Team {
DiagTeam(Job& job, uint workercnt);
virtual ~DiagTeam();
virtual Worker* create_worker(uint n);
virtual void do_init();
void read_old_diags(const char* name,
const char* path,
const Table& table,
RowList& rows_out);
void read_old_diags();
void open_new_diags();
virtual void do_end();
CsvSpec m_csvspec;
WorkerFile m_result_file;
WorkerFile m_reject_file;
WorkerFile m_rowmap_file;
WorkerFile m_stopt_file;
WorkerFile m_stats_file;
};
struct DiagWorker : Worker {
DiagWorker(Team& team, uint n);
virtual ~DiagWorker();
virtual void do_init();
virtual void do_run();
virtual void do_end();
void write_result();
void write_reject();
void write_rowmap();
void write_stopt();
void write_stats();
Buf m_result_buf;
Buf m_reject_buf;
Buf m_rowmap_buf;
Buf m_stopt_buf;
Buf m_stats_buf;
CsvOutput* m_result_csv;
CsvOutput* m_reject_csv;
CsvOutput* m_rowmap_csv;
CsvOutput* m_stopt_csv;
CsvOutput* m_stats_csv;
};
// global
struct Jobs {
Jobs();
~Jobs();
std::map<uint, Job*> m_jobs;
uint m_jobno; // next job number (forever increasing)
};
Jobs c_jobs;
Job* create_job();
Job* find_job(uint jobno);
void start_job(Job* job);
void stop_job(Job* job);
void wait_job(Job* job);
void destroy_job(Job* job);
};
NdbOut& operator<<(NdbOut& out, const NdbImportImpl& impl);
NdbOut& operator<<(NdbOut& out, const NdbImportImpl::Mgm& mgm);
NdbOut& operator<<(NdbOut& out, const NdbImportImpl::Job& job);
NdbOut& operator<<(NdbOut& out, const NdbImportImpl::Team& team);
NdbOut& operator<<(NdbOut& out, const NdbImportImpl::Worker& w);
#endif