971 lines
29 KiB
C++
971 lines
29 KiB
C++
/*
|
|
Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License, version 2.0,
|
|
as published by the Free Software Foundation.
|
|
|
|
This program is also distributed with certain software (including
|
|
but not limited to OpenSSL) that is licensed under separate terms,
|
|
as designated in a particular file or component or in included license
|
|
documentation. The authors of MySQL hereby grant you an additional
|
|
permission to link the program and your derivative works with the
|
|
separately licensed software that they have included with MySQL.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License, version 2.0, for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
*/
|
|
|
|
#include <ndb_global.h>
|
|
#include <ndb_opts.h>
|
|
#include <OutputStream.hpp>
|
|
#include <NdbOut.hpp>
|
|
#include <ndb_rand.h>
|
|
#include "NdbImport.hpp"
|
|
#include "NdbImportUtil.hpp"
|
|
// STL
|
|
#include <string>
|
|
|
|
#include "my_alloc.h"
|
|
|
|
// Short aliases for NdbImport nested types used throughout this file.
typedef NdbImport::OptCsv OptCsv;
typedef NdbImport::JobStatus JobStatus;
typedef NdbImport::JobStats JobStats;

// Diagnostic output on stderr, written by the CHK2 and LOG macros.
static FileOutputStream g_err_out(stderr);
// Must be defined after g_err_out: it wraps that stream, and file-scope
// objects in one translation unit are initialized in definition order.
static NdbOut g_err(g_err_out);
|
|
|
|
// Error-check helpers for the "do { ... } while (0)" style used in this file.
// CHK1/CHK2 expect a local "int ret" and must appear inside a loop or switch:
// on failure they set ret = -1 and "break" out of the innermost enclosing
// loop/switch. They are deliberately NOT wrapped in do { } while (0), since
// that would turn the break into a no-op.

// CHK1(b): fail without a message if b is false.
#define CHK1(b) \
  if (!(b)) { \
    ret = -1; \
    break; \
  }

// CHK2(b, e): like CHK1, but first prints "<progname>: <e>" to stderr;
// e is an NdbOut stream expression and may contain "<<".
#define CHK2(b, e) \
  if (!(b)) { \
    g_err << my_progname << ": " << e << endl; \
    ret = -1; \
    break; \
  }

// LOG(x): print stream expression x and a newline to stderr.
#define LOG(x) \
  do { \
    g_err << x << endl; \
  } while (0)
|
|
|
|
// opts
|
|
|
|
// Import options: filled in by option parsing (my_long_options points into
// it) and by checkopts(), updated per table by doimp(), and handed to
// NdbImport::set_opt().
static NdbImport::Opt g_opt;
|
|
|
|
static struct my_option
|
|
my_long_options[] =
|
|
{
|
|
NDB_STD_OPTS("ndb_import"),
|
|
{ "connections", NDB_OPT_NOSHORT,
|
|
"Number of cluster connections to create."
|
|
" If option --ndb-nodeid=N is given then this number of consecutive"
|
|
" API nodes starting at N must exist",
|
|
&g_opt.m_connections, &g_opt.m_connections, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_connections, 0, 0, 0, 0, 0 },
|
|
{ "state-dir", NDB_OPT_NOSHORT,
|
|
"Where to write state files (t1.res etc)."
|
|
" Default is \".\" (currect directory)",
|
|
&g_opt.m_state_dir, &g_opt.m_state_dir, 0,
|
|
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "keep-state", NDB_OPT_NOSHORT,
|
|
"By default state files are removed when the job completes"
|
|
" successfully, except if there were any rejects (within allowed limit)"
|
|
" then *.rej is kept. This option keeps all state files",
|
|
&g_opt.m_keep_state, &g_opt.m_keep_state, 0,
|
|
GET_BOOL, NO_ARG, false, 0, 0, 0, 0, 0 },
|
|
{ "stats", NDB_OPT_NOSHORT,
|
|
"Save performance related options and internal statistics into"
|
|
" additional state files with suffixes .sto and .stt. The files"
|
|
" are kept also on successful completion",
|
|
&g_opt.m_stats, &g_opt.m_stats, 0,
|
|
GET_BOOL, NO_ARG, false, 0, 0, 0, 0, 0 },
|
|
{ "input-type", NDB_OPT_NOSHORT,
|
|
"Input type: csv,random"
|
|
" (random is a test option)",
|
|
&g_opt.m_input_type, &g_opt.m_input_type, 0,
|
|
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "input-workers", NDB_OPT_NOSHORT,
|
|
"Number of threads processing input",
|
|
&g_opt.m_input_workers, &g_opt.m_input_workers, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_input_workers, 0, 0, 0, 0, 0 },
|
|
{ "output-type", NDB_OPT_NOSHORT,
|
|
"Output type: ndb,null"
|
|
" (null is a test option)",
|
|
&g_opt.m_output_type, &g_opt.m_output_type, 0,
|
|
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "output-workers", NDB_OPT_NOSHORT,
|
|
"Number of threads processing output or relaying db ops",
|
|
&g_opt.m_output_workers, &g_opt.m_output_workers, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_output_workers, 0, 0, 0, 0, 0 },
|
|
{ "db-workers", NDB_OPT_NOSHORT,
|
|
"Number of threads PER datanode executing db ops",
|
|
&g_opt.m_db_workers, &g_opt.m_db_workers, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_db_workers, 0, 0, 0, 0, 0 },
|
|
{ "ignore-lines", NDB_OPT_NOSHORT,
|
|
"Ignore given number of initial lines in input file."
|
|
" Used to skip a non-data header."
|
|
" To continue with an aborted job use --resume instead",
|
|
&g_opt.m_ignore_lines, &g_opt.m_ignore_lines, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_ignore_lines, 0, 0, 0, 0, 0 },
|
|
{ "max-rows", NDB_OPT_NOSHORT,
|
|
"Limit number of rows proccessed."
|
|
" Mainly a test option. Default 0 means no limit."
|
|
" More rows may be processed",
|
|
&g_opt.m_max_rows, &g_opt.m_max_rows, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_max_rows, 0, 0, 0, 0, 0 },
|
|
{ "continue", NDB_OPT_NOSHORT,
|
|
"If one job (e.g. CSV import) fails, continue to next job",
|
|
&g_opt.m_continue, &g_opt.m_continue, 0,
|
|
GET_BOOL, NO_ARG, false, 0, 0, 0, 0, 0 },
|
|
{ "resume", NDB_OPT_NOSHORT,
|
|
"If the job(s) are aborted due to e.g. too many rejects or"
|
|
" too many temporary NDB errors or user interrupt,"
|
|
" add this option to try to resume with rows not yet processed",
|
|
&g_opt.m_resume, &g_opt.m_resume, 0,
|
|
GET_BOOL, NO_ARG, false, 0, 0, 0, 0, 0 },
|
|
{ "monitor", NDB_OPT_NOSHORT,
|
|
"Periodically print status of running job if something has changed"
|
|
" (status, rejected rows, temporary errors)."
|
|
" Value 0 disables. Value 1 prints any change seen."
|
|
" Higher values reduce status printing exponentially"
|
|
" up to some pre-defined limit",
|
|
&g_opt.m_monitor, &g_opt.m_monitor, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_monitor, 0, 0, 0, 0, 0 },
|
|
{ "ai-prefetch-sz", NDB_OPT_NOSHORT,
|
|
"For table with hidden PK, specify number of autoincrement values"
|
|
" that are prefetched. See mysqld",
|
|
&g_opt.m_ai_prefetch_sz, &g_opt.m_ai_prefetch_sz, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_ai_prefetch_sz, 0, 0, 0, 0, 0 },
|
|
{ "ai-increment", NDB_OPT_NOSHORT,
|
|
"For table with hidden PK, specify autoincrement increment."
|
|
" See mysqld",
|
|
&g_opt.m_ai_increment, &g_opt.m_ai_increment, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_ai_increment, 0, 0, 0, 0, 0 },
|
|
{ "ai-offset", NDB_OPT_NOSHORT,
|
|
"For table with hidden PK, specify autoincrement offset."
|
|
" See mysqld",
|
|
&g_opt.m_ai_offset, &g_opt.m_ai_offset, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_ai_offset, 0, 0, 0, 0, 0 },
|
|
{ "no-asynch", NDB_OPT_NOSHORT,
|
|
"Run db ops as batches under single trans."
|
|
" Used for performance comparison and does not support all features"
|
|
" e.g. detecting individual rejected rows",
|
|
&g_opt.m_no_asynch, &g_opt.m_no_asynch, 0,
|
|
GET_BOOL, NO_ARG, false, 0, 0, 0, 0, 0 },
|
|
{ "no-hint", NDB_OPT_NOSHORT,
|
|
"Do not use distribution key hint to select db node (TC)",
|
|
&g_opt.m_no_hint, &g_opt.m_no_hint, 0,
|
|
GET_BOOL, NO_ARG, false, 0, 0, 0, 0, 0 },
|
|
{ "pagesize", NDB_OPT_NOSHORT,
|
|
"Align I/O buffers to given size",
|
|
&g_opt.m_pagesize, &g_opt.m_pagesize, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_pagesize, 0, 0, 0, 0, 0 },
|
|
{ "pagecnt", NDB_OPT_NOSHORT,
|
|
"Size of I/O buffers as multiple of pagesize."
|
|
" CSV input worker allocates a double sized buffer",
|
|
&g_opt.m_pagecnt, &g_opt.m_pagecnt, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_pagecnt, 0, 0, 0, 0, 0 },
|
|
{ "pagebuffer", NDB_OPT_NOSHORT,
|
|
"Size of I/O buffers in bytes. Rounded up to pagesize and"
|
|
" overrides pagecnt",
|
|
&g_opt.m_pagebuffer, &g_opt.m_pagebuffer, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_pagebuffer, 0, 0, 0, 0, 0 },
|
|
{ "rowbatch", NDB_OPT_NOSHORT,
|
|
"Limit rows in row queues (0 no limit)",
|
|
&g_opt.m_rowbatch, &g_opt.m_rowbatch, 0,
|
|
GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "rowbytes", NDB_OPT_NOSHORT,
|
|
"Limit bytes in row queues (0 no limit)",
|
|
&g_opt.m_rowbytes, &g_opt.m_rowbytes, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_rowbytes, 0, 0, 0, 0, 0 },
|
|
{ "opbatch", NDB_OPT_NOSHORT,
|
|
"A db execution batch is a set of transactions and operations"
|
|
" sent to NDB kernel."
|
|
" This option limits NDB operations (including blob operations)"
|
|
" in a db execution batch. Therefore it also limits number"
|
|
" of asynch transactions."
|
|
" Value 0 is not valid",
|
|
&g_opt.m_opbatch, &g_opt.m_opbatch, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_opbatch, 0, 0, 0, 0, 0 },
|
|
{ "opbytes", NDB_OPT_NOSHORT,
|
|
"Limit bytes in db execution batch (0 no limit)",
|
|
&g_opt.m_opbytes, &g_opt.m_opbytes, 0,
|
|
GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "polltimeout", NDB_OPT_NOSHORT,
|
|
"Millisecond timeout in one poll for completed asynch transactions."
|
|
" Polls continue until all are completed or an error has occurred",
|
|
&g_opt.m_polltimeout, &g_opt.m_polltimeout, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_polltimeout, 0, 0, 0, 0, 0 },
|
|
{ "temperrors", NDB_OPT_NOSHORT,
|
|
"Limit temporary NDB errors. Default is 0 which means that any"
|
|
" temporary error is fatal."
|
|
" The errors are counted per db execution batch, not per individual"
|
|
" operations, and do not cause rows to be rejected",
|
|
&g_opt.m_temperrors, &g_opt.m_temperrors, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_temperrors, 0, 0, 0, 0, 0 },
|
|
{ "tempdelay", NDB_OPT_NOSHORT,
|
|
"Number of milliseconds to sleep between temporary errors",
|
|
&g_opt.m_tempdelay, &g_opt.m_tempdelay, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_tempdelay, 0, 0, 0, 0, 0 },
|
|
{ "rowswait", NDB_OPT_NOSHORT,
|
|
"Number of milliseconds a worker waits for a signal that new rows"
|
|
" can be processed",
|
|
&g_opt.m_rowswait, &g_opt.m_rowswait, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_rowswait, 0, 0, 0, 0, 0 },
|
|
{ "idlespin", NDB_OPT_NOSHORT,
|
|
"Number of times to re-try before idlesleep",
|
|
&g_opt.m_idlespin, &g_opt.m_idlespin, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_idlespin, 0, 0, 0, 0, 0 },
|
|
{ "idlesleep", NDB_OPT_NOSHORT,
|
|
"Number of milliseconds to sleep waiting for more to do."
|
|
" Cause can be row queues stall (see --rowswait) or"
|
|
" e.g. passing control between CSV workers",
|
|
&g_opt.m_idlesleep, &g_opt.m_idlesleep, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_idlesleep, 0, 0, 0, 0, 0 },
|
|
{ "checkloop", NDB_OPT_NOSHORT,
|
|
"A job and its diagnostics team periodically check for"
|
|
" progress from lower levels. This option gives number of"
|
|
" milliseconds to wait between such checks."
|
|
" High values may cause data structures (rowmaps) to grow too much."
|
|
" Low values may interfere too much with the workers",
|
|
&g_opt.m_checkloop, &g_opt.m_checkloop, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_checkloop, 0, 0, 0, 0, 0 },
|
|
{ "alloc-chunk", NDB_OPT_NOSHORT,
|
|
"Number of free rows to alloc (seize) at a time."
|
|
" Higher values reduce mutexing but also may reduce parallelism",
|
|
&g_opt.m_alloc_chunk, &g_opt.m_alloc_chunk, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_alloc_chunk, 0, 0, 0, 0, 0 },
|
|
{ "rejects", NDB_OPT_NOSHORT,
|
|
"Limit number of rejected rows (rows with permanent error) in data load."
|
|
" Default is 0 which means that any rejected row causes a fatal error."
|
|
" The row(s) exceeding the limit are also added to *.rej."
|
|
" The limit is per current run (not all --resume'd runs)",
|
|
&g_opt.m_rejects, &g_opt.m_rejects, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_rejects, 0, 0, 0, 0, 0 },
|
|
{ "fields-terminated-by", NDB_OPT_NOSHORT,
|
|
"See MySQL LOAD DATA."
|
|
" This and other CSV controls are scanned for escapes"
|
|
" including \\\\,\\n,\\r,\\t",
|
|
&g_opt.m_optcsv.m_fields_terminated_by,
|
|
&g_opt.m_optcsv.m_fields_terminated_by, 0,
|
|
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "fields-enclosed-by", NDB_OPT_NOSHORT,
|
|
"See MySQL LOAD DATA."
|
|
" For CSV input this is same as --fields-optionally-enclosed-by",
|
|
&g_opt.m_optcsv.m_fields_enclosed_by,
|
|
&g_opt.m_optcsv.m_fields_enclosed_by, 0,
|
|
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "fields-optionally-enclosed-by", NDB_OPT_NOSHORT,
|
|
"See MySQL LOAD DATA",
|
|
&g_opt.m_optcsv.m_fields_optionally_enclosed_by,
|
|
&g_opt.m_optcsv.m_fields_optionally_enclosed_by, 0,
|
|
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "fields-escaped-by", NDB_OPT_NOSHORT,
|
|
"See MySQL LOAD DATA",
|
|
&g_opt.m_optcsv.m_fields_escaped_by,
|
|
&g_opt.m_optcsv.m_fields_escaped_by, 0,
|
|
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "lines-terminated-by", NDB_OPT_NOSHORT,
|
|
"See MySQL LOAD DATA but note that default is"
|
|
" \\n for unix and \\r\\n for windows",
|
|
&g_opt.m_optcsv.m_lines_terminated_by,
|
|
&g_opt.m_optcsv.m_lines_terminated_by, 0,
|
|
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "csvopt", NDB_OPT_NOSHORT,
|
|
"Set some typical CSV options."
|
|
" Useful for environments where command line quoting and escaping is hard."
|
|
" Argument is a string of letters:"
|
|
" d-defaults for the OS type"
|
|
" c-fields terminated by real comma (,)"
|
|
" q-fields optionally enclosed by double quotes (\")"
|
|
" n-lines terminated by \\n"
|
|
" r-lines terminated by \\r\\n",
|
|
&g_opt.m_csvopt, &g_opt.m_csvopt, 0,
|
|
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "log-level", NDB_OPT_NOSHORT,
|
|
"Print internal log at given level (0-2 or 0-4 if debug compiled)."
|
|
" Like --debug, this option is for developers",
|
|
&g_opt.m_log_level, &g_opt.m_log_level, 0,
|
|
GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "abort-on-error", NDB_OPT_NOSHORT,
|
|
"Dump core on any error, debug option",
|
|
&g_opt.m_abort_on_error, &g_opt.m_abort_on_error, 0,
|
|
GET_BOOL, NO_ARG, false, 0, 0, 0, 0, 0 },
|
|
{ "errins-type", NDB_OPT_NOSHORT,
|
|
"Error insert type (test option, give \"list\" to list)",
|
|
&g_opt.m_errins_type, &g_opt.m_errins_type, 0,
|
|
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
|
|
{ "errins-delay", NDB_OPT_NOSHORT,
|
|
"Error insert delay in milliseconds (random variation added)",
|
|
&g_opt.m_errins_delay, &g_opt.m_errins_delay, 0,
|
|
GET_UINT, REQUIRED_ARG, g_opt.m_errins_delay, 0, 0, 0, 0, 0 },
|
|
{ 0, 0,
|
|
0,
|
|
0, 0, 0,
|
|
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }
|
|
};
|
|
|
|
static void
|
|
short_usage_sub(void)
|
|
{
|
|
ndb_short_usage_sub("database textfile...");
|
|
printf(
|
|
"\n"
|
|
"Arguments give database and files of CSV table data.\n"
|
|
"The basename of each file specifies the table name.\n"
|
|
"E.g. %s test foo/t1.csv foo/t2.csv loads tables\n"
|
|
"test.t1 test.t2.\n"
|
|
"\n"
|
|
"For each job (load of one table), results, rejected rows,\n"
|
|
"and processed row ranges are written to \"state files\" with\n"
|
|
"suffixes .res, .rej, and .map. By default these are removed\n"
|
|
"when the job completes successfully with no rejects.\n"
|
|
"See options --state-dir and --keep-state.\n"
|
|
"\n"
|
|
"Windows notes: File paths are shown with forward slash (/).\n"
|
|
"Default line-terminator is \\r\\n.\n"
|
|
"Keyboard interrupt is not implemented.\n"
|
|
"\n",
|
|
my_progname);
|
|
}
|
|
|
|
static void
|
|
usage()
|
|
{
|
|
printf("%s: load data from files to tables\n", my_progname);
|
|
}
|
|
|
|
// check opts and args
|
|
|
|
// opts
|
|
// Owns the path-converted --state-dir value. checkopts() points
// g_opt.m_state_dir at this string's buffer, so it must not be modified
// afterwards.
static std::string g_state_dir;
|
|
|
|
// args
|
|
// One import job, derived from a file argument by checkarg(): the target
// table and the input/state file paths. State file paths are
// "<state-dir>/<stem><suffix>", without the directory prefix when the state
// dir is ".".
struct TableArg {
  std::string m_table;        // table name: file basename up to the first '.'
  std::string m_input_file;   // input path (after convertpath())
  std::string m_result_file;  // .res
  std::string m_reject_file;  // .rej
  std::string m_rowmap_file;  // .map
  std::string m_stopt_file;   // .sto
  std::string m_stats_file;   // .stt
};

// g_tablecnt entries, one per file argument; allocated by checkopts() and
// never freed.
static TableArg* g_tablearg = 0;
static uint g_tablecnt = 0;
|
|
|
|
// Suffixes of the state files checkarg() derives for each job. Input files
// whose name ends in one of these are rejected, presumably so an input file
// cannot be confused with or overwritten by a state file. Terminated by 0.
// Internal linkage like every other file-scope object in this file.
static const char* g_reserved_extension[] = {
  ".res",
  ".rej",
  ".map",
  ".sto",
  ".stt",
  0
};
|
|
|
|
/*
 * Normalize a path in place to use forward slashes (Windows only; a no-op
 * elsewhere).
 *
 * File I/O functions in the Windows API convert "/" to "\" (according to
 * Microsoft). MTR uses "/" and converts it to "\" when needed, but it does
 * not recognize paths in ndb_import options and arguments. We avoid the
 * mismatch by converting every "\" to "/". Messages therefore show the
 * converted paths; revisit if that matters.
 */
static void
convertpath(std::string& str)
{
#ifdef _WIN32
  for (std::string::iterator it = str.begin(); it != str.end(); ++it)
  {
    if (*it == '\\')
      *it = '/';
  }
#else
  (void)str;
#endif
}
|
|
|
|
static int
|
|
checkarg(TableArg& arg, const char* str)
|
|
{
|
|
int ret = 0;
|
|
do
|
|
{
|
|
std::string full = str; // foo/t1.bar.csv
|
|
convertpath(full);
|
|
std::string base = full; // t1.bar.csv
|
|
std::size_t slash = full.rfind("/");
|
|
if (slash != std::string::npos)
|
|
{
|
|
base = full.substr(slash + 1);
|
|
}
|
|
for (const char** p = g_reserved_extension; *p != 0; p++)
|
|
{
|
|
std::size_t pos = base.rfind(*p);
|
|
if (pos != std::string::npos &&
|
|
pos + strlen(*p) == base.length())
|
|
{
|
|
CHK2(false, full.c_str() << ": has reserved suffix: " << *p);
|
|
}
|
|
}
|
|
CHK1(ret == 0);
|
|
std::string stem = base; // t1.bar
|
|
std::size_t rdot = base.rfind(".");
|
|
if (rdot != std::string::npos)
|
|
{
|
|
stem = base.substr(0, rdot);
|
|
}
|
|
std::string table = stem; // t1
|
|
std::size_t ldot = stem.find(".");
|
|
if (ldot != std::string::npos)
|
|
{
|
|
table = stem.substr(0, ldot);
|
|
}
|
|
arg.m_table = table;
|
|
arg.m_input_file = full;
|
|
std::string path = "";
|
|
if (strcmp(g_opt.m_state_dir, ".") != 0)
|
|
{
|
|
path += g_opt.m_state_dir;
|
|
path += "/";
|
|
}
|
|
arg.m_result_file = path + stem + ".res";
|
|
arg.m_reject_file = path + stem + ".rej";
|
|
arg.m_rowmap_file = path + stem + ".map";
|
|
arg.m_stopt_file = path + stem + ".sto";
|
|
arg.m_stats_file = path + stem + ".stt";
|
|
} while (0);
|
|
return ret;
|
|
}
|
|
|
|
// Validates --errins-type; defined with the error insert code below and
// called from checkopts().
static int checkerrins();
|
|
|
|
static int
|
|
checkcsvopt()
|
|
{
|
|
int ret = 0;
|
|
for (const char* p = g_opt.m_csvopt; *p != 0; p++)
|
|
{
|
|
switch (*p) {
|
|
case 'd':
|
|
new (&g_opt.m_optcsv) OptCsv;
|
|
break;
|
|
case 'c':
|
|
g_opt.m_optcsv.m_fields_terminated_by = ",";
|
|
break;
|
|
case 'q':
|
|
g_opt.m_optcsv.m_fields_optionally_enclosed_by = "\"";
|
|
break;
|
|
case 'n':
|
|
g_opt.m_optcsv.m_lines_terminated_by = "\\n";
|
|
break;
|
|
case 'r':
|
|
g_opt.m_optcsv.m_lines_terminated_by = "\\r\\n";
|
|
break;
|
|
default:
|
|
{
|
|
char tmp[2];
|
|
sprintf(tmp, "%c", *p);
|
|
CHK2(false, "m_csvopt: undefined option: " << tmp);
|
|
}
|
|
break;
|
|
}
|
|
CHK1(ret == 0);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
static int
|
|
checkopts(int argc, char** argv)
|
|
{
|
|
int ret = 0;
|
|
do
|
|
{
|
|
CHK1(checkerrins() == 0);
|
|
if (g_opt.m_csvopt != 0)
|
|
CHK1(checkcsvopt() == 0);
|
|
g_state_dir = g_opt.m_state_dir;
|
|
convertpath(g_state_dir);
|
|
g_opt.m_state_dir = g_state_dir.c_str();
|
|
CHK2(argc >= 1, "database argument is required, use --help for help");
|
|
g_opt.m_database = argv[0];
|
|
argc--;
|
|
argv++;
|
|
g_tablecnt = argc;
|
|
g_tablearg = new TableArg [g_tablecnt];
|
|
for (uint i = 0; i < g_tablecnt; i++)
|
|
{
|
|
CHK1(checkarg(g_tablearg[i], argv[i]) == 0);
|
|
}
|
|
} while (0);
|
|
return ret;
|
|
}
|
|
|
|
// signal handlers
|
|
|
|
#ifndef _WIN32
|
|
|
|
static void
|
|
sighandler(int sig)
|
|
{
|
|
const char* signame = "unexpected";
|
|
switch (sig) {
|
|
case SIGHUP:
|
|
signame = "hangup";
|
|
break;
|
|
case SIGINT:
|
|
signame = "interrupt";
|
|
break;
|
|
}
|
|
LOG(my_progname << ": caught signal " << sig << " (" << signame << ")");
|
|
LOG(my_progname << ": please wait for any jobs to stop gracefully");
|
|
NdbImport::set_stop_all();
|
|
}
|
|
|
|
static void
|
|
setsighandler(bool on)
|
|
{
|
|
struct sigaction sa;
|
|
if (on)
|
|
sa.sa_handler = sighandler;
|
|
else
|
|
sa.sa_handler = SIG_DFL;
|
|
sigemptyset(&sa.sa_mask);
|
|
sigprocmask(SIG_SETMASK, &sa.sa_mask, NULL);
|
|
sa.sa_flags = SA_RESETHAND;
|
|
sigaction(SIGHUP, &sa, NULL);
|
|
sigaction(SIGINT, &sa, NULL);
|
|
}
|
|
|
|
#else
|
|
|
|
// TODO
|
|
|
|
// Signal handling is not implemented on Windows; this is a no-op.
static void
setsighandler(bool on)
{
  (void)on;  // unused
}
|
|
|
|
#endif
|
|
|
|
// error insert
|
|
|
|
// An error insert (test option --errins-type): its name and description.
struct Errins {
  const char* m_type;  // value accepted by --errins-type
  const char* m_desc;  // text printed by --errins-type=list
};

// Known error inserts, terminated by an entry with m_type == 0.
// "stopjob" is handled synchronously by doerrinsstop(); the others run in
// the thread started by doerrins().
static const Errins
g_errins_list[] = {
  { "stopjob", "stop current job (synchronous)" },
  { "stopall", "stop all jobs" },
  { "sighup", "trigger stopall via SIGHUP" },
  { "sigint", "trigger stopall via SIGINT" },
  { 0, 0 }
};
|
|
|
|
static bool
|
|
listerrins()
|
|
{
|
|
const char* type = g_opt.m_errins_type;
|
|
if (type == 0 || strcmp(type, "list") != 0)
|
|
return false;
|
|
LOG(my_progname << ": only listing error inserts");
|
|
const Errins* list = g_errins_list;
|
|
for (uint i = 0; list[i].m_type != 0; i++)
|
|
{
|
|
LOG(list[i].m_type << " - " << list[i].m_desc);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
static int
|
|
checkerrins()
|
|
{
|
|
int ret = 0;
|
|
do
|
|
{
|
|
const char* type = g_opt.m_errins_type;
|
|
if (type == 0)
|
|
break;
|
|
const Errins* list = g_errins_list;
|
|
bool found = false;
|
|
for (uint i = 0; list[i].m_type != 0; i++)
|
|
{
|
|
if (strcmp(list[i].m_type, type) == 0)
|
|
{
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
CHK2(found, "undefined error insert: " << type);
|
|
} while (0);
|
|
return ret;
|
|
}
|
|
|
|
static void
|
|
doerrinsstop(NdbImport::Job& job)
|
|
{
|
|
const char* type = g_opt.m_errins_type;
|
|
uint delay = g_opt.m_errins_delay;
|
|
uint ms = delay / 2 + ndb_rand() % (delay + 1);
|
|
if (type == 0)
|
|
return;
|
|
if (strcmp(type, "stopjob") != 0)
|
|
return;
|
|
NdbSleep_MilliSleep(ms);
|
|
job.do_stop();
|
|
}
|
|
|
|
// Thread running an asynchronous error insert (started by doerrins(),
// joined by clearerrins()); 0 when none is running.
static NdbThread* g_errins_thread = 0;

// Thread entry point passed to NdbThread_Create(); declared with C linkage,
// presumably as that API expects.
extern "C" { static void* doerrins_c(void* data); }
|
|
|
|
static void*
|
|
doerrins_c(void* data)
|
|
{
|
|
const char* type = g_opt.m_errins_type;
|
|
uint delay = g_opt.m_errins_delay;
|
|
uint ms = delay / 2 + ndb_rand() % (delay + 1);
|
|
NdbSleep_MilliSleep(ms);
|
|
if (strcmp(type, "stopall") == 0)
|
|
{
|
|
NdbImport::set_stop_all();
|
|
return 0;
|
|
}
|
|
#ifndef _WIN32
|
|
int pid = NdbHost_GetProcessId();
|
|
int sig = 0;
|
|
if (strcmp(type, "sighup") == 0)
|
|
sig = SIGHUP;
|
|
if (strcmp(type, "sigint") == 0)
|
|
sig = SIGINT;
|
|
require(sig != 0);
|
|
::kill(pid, sig);
|
|
#else
|
|
// TODO
|
|
#endif
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
doerrins()
|
|
{
|
|
const char* type = g_opt.m_errins_type;
|
|
if (type == 0)
|
|
return;
|
|
if (strcmp(type, "stopjob") == 0)
|
|
return;
|
|
NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_MEAN;
|
|
uint stack_size = 64*1024;
|
|
g_errins_thread = NdbThread_Create(
|
|
doerrins_c, (void**)0, stack_size, "errins", prio);
|
|
require(g_errins_thread != 0);
|
|
}
|
|
|
|
static void
|
|
clearerrins()
|
|
{
|
|
if (g_errins_thread == 0)
|
|
return;
|
|
NdbThread_WaitFor(g_errins_thread, (void**)0);
|
|
NdbThread_Destroy(&g_errins_thread);
|
|
}
|
|
|
|
// main program
|
|
|
|
// Flags selecting the parts printed by doreport().
static const uint g_rep_status = (1 << 0);      // " [status]" after job name
static const uint g_rep_resume = (1 << 1);      // " (resume #N)" if runno != 0
static const uint g_rep_stats = (1 << 2);       // rows, run time, rows/s
static const uint g_rep_reject = (1 << 3);      // rejected rows, if any
static const uint g_rep_temperrors = (1 << 4);  // temp error counts per code
static const uint g_rep_errortexts = (1 << 5);  // message text per temp error
|
|
|
|
// call job.get_status() first
|
|
static void
|
|
doreport(NdbImport::Job& job, uint flags)
|
|
{
|
|
require(job.m_status != JobStatus::Status_null);
|
|
char jobname[100];
|
|
sprintf(jobname, "job-%u", job.m_jobno);
|
|
const uint runno = job.m_runno;
|
|
char str_status[100] = "";
|
|
if (flags & g_rep_status)
|
|
{
|
|
// including status only if job has been started
|
|
sprintf(str_status, " [%s]", job.m_str_status);
|
|
}
|
|
if (runno == 0 || !(flags & g_rep_resume))
|
|
LOG(jobname << str_status <<
|
|
" import " << g_opt.m_database << "." << g_opt.m_table <<
|
|
" from " << g_opt.m_input_file);
|
|
else
|
|
LOG(jobname << str_status <<
|
|
" import " << g_opt.m_database << "." << g_opt.m_table <<
|
|
" from " << g_opt.m_input_file <<
|
|
" (resume #" << runno << ")");
|
|
const JobStats& stats = job.m_stats;
|
|
const uint64 rows = stats.m_rows;
|
|
const uint64 reject = stats.m_reject;
|
|
const uint64 new_rows = stats.m_new_rows;
|
|
const uint64 new_reject = stats.m_new_reject;
|
|
const uint temperrors = stats.m_temperrors;
|
|
const std::map<uint, uint>& errormap = stats.m_errormap;
|
|
const uint64 runtime = stats.m_runtime;
|
|
const uint64 rowssec = stats.m_rowssec;
|
|
if (flags & g_rep_stats)
|
|
{
|
|
char runtimestr[100];
|
|
NdbImportUtil::fmt_msec_to_hhmmss(runtimestr, runtime);
|
|
if (runno == 0)
|
|
LOG(jobname << " imported " << rows << " rows" <<
|
|
" in " << runtimestr <<
|
|
" at " << rowssec << " rows/s");
|
|
else
|
|
LOG(jobname << " imported " << rows << " rows" <<
|
|
" (new " << new_rows << ")" <<
|
|
" in " << runtimestr <<
|
|
" at " << rowssec << " rows/s");
|
|
}
|
|
if ((flags & g_rep_reject) &&
|
|
reject != 0)
|
|
{
|
|
if (runno == 0)
|
|
LOG(jobname << " rejected " << reject << " rows" <<
|
|
" (limit " << g_opt.m_rejects << ")," <<
|
|
" see " << g_opt.m_reject_file);
|
|
else
|
|
LOG(jobname << " rejected " << reject << " rows" <<
|
|
" (new " << new_reject << " limit " << g_opt.m_rejects << ")," <<
|
|
" see " << g_opt.m_reject_file);
|
|
}
|
|
if ((flags & g_rep_temperrors) &&
|
|
temperrors != 0)
|
|
{
|
|
std::string list;
|
|
std::map<uint, uint>::const_iterator it;
|
|
for (it = errormap.begin(); it != errormap.end(); it++)
|
|
{
|
|
char buf[100];
|
|
// showing count(code)
|
|
sprintf(buf, " %u(%u)", it->second, it->first);
|
|
list += buf;
|
|
}
|
|
LOG(jobname << " temporary errors " << temperrors <<
|
|
list.c_str() << " (limit " << g_opt.m_temperrors << ")");
|
|
}
|
|
if ((flags & g_rep_errortexts) &&
|
|
temperrors != 0)
|
|
{
|
|
std::map<uint, uint>::const_iterator it;
|
|
for (it = errormap.begin(); it != errormap.end(); it++)
|
|
{
|
|
char buf[100];
|
|
sprintf(buf, " %u(%u)", it->second, it->first);
|
|
ndberror_struct error;
|
|
error.code = it->first;
|
|
ndberror_update(&error);
|
|
LOG(jobname << buf << ": " << error.message);
|
|
}
|
|
}
|
|
if (job.m_status == JobStatus::Status_error ||
|
|
job.m_status == JobStatus::Status_fatal)
|
|
{
|
|
LOG(jobname << " " << job.get_error());
|
|
for (uint i = 0; i < job.m_teamcnt; i++)
|
|
{
|
|
NdbImport::Team* team = job.m_teams[i];
|
|
char teamname[100];
|
|
sprintf(teamname, "%u-%s", team->m_teamno, team->get_name());
|
|
if (team->has_error())
|
|
LOG(jobname << " " << teamname << " " << team->get_error());
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
 * Monitoring weight of a stats snapshot: new rejected rows plus temporary
 * errors. domonitor() reports when this grows by its current threshold.
 * The sum is converted to uint.
 */
static uint
getweigth(const JobStats& stats)
{
  const uint weight = stats.m_new_reject + stats.m_temperrors;
  return weight;
}
|
|
|
|
static void
|
|
domonitor(NdbImport::Job& job)
|
|
{
|
|
job.get_status();
|
|
JobStatus::Status old_status = job.m_status;
|
|
JobStats old_stats = job.m_stats;
|
|
const uint maxtreshold = 1000;
|
|
uint treshold = 1;
|
|
while (1)
|
|
{
|
|
NdbSleep_MilliSleep(g_opt.m_checkloop);
|
|
job.get_status();
|
|
JobStatus::Status status = job.m_status;
|
|
JobStats stats = job.m_stats;
|
|
if (job.has_error())
|
|
break;
|
|
if (status == JobStatus::Status_success ||
|
|
status == JobStatus::Status_error ||
|
|
status == JobStatus::Status_fatal)
|
|
break;
|
|
bool report =
|
|
status != old_status ||
|
|
getweigth(stats) >= getweigth(old_stats) + treshold;
|
|
if (report)
|
|
{
|
|
uint flags = g_rep_status |
|
|
(stats.m_new_reject ? g_rep_reject : 0) |
|
|
g_rep_temperrors |
|
|
g_rep_errortexts;
|
|
doreport(job, flags);
|
|
old_status = status;
|
|
old_stats = stats;
|
|
treshold *= g_opt.m_monitor;
|
|
if (treshold > maxtreshold)
|
|
treshold = maxtreshold;
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
removefile(const char* path)
|
|
{
|
|
if (unlink(path) == -1)
|
|
{
|
|
LOG(my_progname << ": remove " << path << " failed: " << strerror(errno));
|
|
}
|
|
}
|
|
|
|
/*
 * Connect to NDB and run one import job per file argument, in order.
 * For each job: print a start report, add the table, start the job (with
 * the optional "stopjob" error insert and --monitor polling), wait for it,
 * then print a full report. On success, and unless --keep-state, the .res
 * and .map state files are removed, and the .rej file too if no rows were
 * rejected (.sto/.stt are not touched here).
 * A failed job stops the loop unless --continue; a global NdbImport error
 * always stops it. Prints a jobs summary and returns 0 if everything
 * succeeded, else -1.
 */
static int
doimp()
{
  int ret = 0;
  uint jobs_defined = g_tablecnt;
  uint jobs_run = 0;
  uint jobs_fail = 0;
  NdbImport imp;
  if (g_tablecnt == 0)
  {
    LOG("note: no files to import specified");
  }
  do
  {
    CHK2(imp.set_opt(g_opt) == 0, "invalid options: " << imp.get_error());
    CHK2(imp.do_connect() == 0, "connect to NDB failed: " << imp.get_error());
    for (uint i = 0; i < g_tablecnt; i++)
    {
      const TableArg& arg = g_tablearg[i];
      // no parallel jobs yet so can use global g_opt
      g_opt.m_table = arg.m_table.c_str();
      g_opt.m_input_file = arg.m_input_file.c_str();
      g_opt.m_result_file = arg.m_result_file.c_str();
      g_opt.m_reject_file = arg.m_reject_file.c_str();
      g_opt.m_rowmap_file = arg.m_rowmap_file.c_str();
      g_opt.m_stopt_file = arg.m_stopt_file.c_str();
      g_opt.m_stats_file = arg.m_stats_file.c_str();
      // Re-apply options with this table's file names. On failure CHK2
      // leaves the for loop (the job is not counted as run) and CHK1 below
      // leaves the outer do-while.
      CHK2(imp.set_opt(g_opt) == 0, "invalid options: "<< imp.get_error());
      NdbImport::Job job(imp);
      // "break" in this do-while skips the remaining steps of this job;
      // reporting and cleanup below still run.
      do
      {
        job.do_create();
        job.get_status();
        doreport(job, 0);
        uint tabid;
        if (job.add_table(g_opt.m_database, g_opt.m_table, tabid) == -1)
          break;
        job.set_table(tabid);
        job.do_start();
        doerrinsstop(job);
        if (g_opt.m_monitor != 0)
          domonitor(job);
        job.do_wait();
        if (job.remove_table(tabid) == -1)
        {
          break;
        }
      } while (0);
      bool imp_error = imp.has_error();
      bool job_error = job.has_error();
      job.get_status();
      uint flags = g_rep_status |
                   g_rep_resume |
                   g_rep_stats |
                   g_rep_reject |
                   g_rep_temperrors |
                   g_rep_errortexts;
      doreport(job, flags);
      if (imp_error)
      {
        LOG("global error: " << imp.get_error());
      }
      if (job.m_status == JobStatus::Status_success &&
          !g_opt.m_keep_state)
      {
        removefile(g_opt.m_result_file);
        // rejected rows (within --rejects) keep the .rej file
        if (job.m_stats.m_reject == 0)
          removefile(g_opt.m_reject_file);
        removefile(g_opt.m_rowmap_file);
      }
      job.do_destroy();
      jobs_run++;
      if (imp_error || job_error)
      {
        jobs_fail++;
        ret = -1;
        // a global error ends all jobs; a job error only without --continue
        if (imp_error || !g_opt.m_continue)
          break;
      }
    }
    CHK1(ret == 0);
  } while (0);
  imp.do_disconnect();
  LOG("jobs summary:" <<
      " defined: " << jobs_defined <<
      " run: " << jobs_run <<
      " with success: " << jobs_run - jobs_fail <<
      " with failure: " << jobs_fail);
  return ret;
}
|
|
|
|
static int
|
|
doall()
|
|
{
|
|
int ret = 0;
|
|
do
|
|
{
|
|
setsighandler(true);
|
|
doerrins();
|
|
CHK1(doimp() == 0);
|
|
} while (0);
|
|
clearerrins();
|
|
setsighandler(false);
|
|
return ret;
|
|
}
|
|
|
|
int
|
|
main(int argc, char** argv)
|
|
{
|
|
NDB_INIT(argv[0]);
|
|
Ndb_opts opts(argc, argv, my_long_options);
|
|
opts.set_usage_funcs(short_usage_sub, usage);
|
|
if (opts.handle_options() != 0)
|
|
return 1;
|
|
if (listerrins())
|
|
return 0;
|
|
if (checkopts(argc, argv) != 0)
|
|
return 1;
|
|
if (doall() != 0)
|
|
return 1;
|
|
return 0;
|
|
}
|