/*
   Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation. The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/

#include "NdbImportImpl.hpp"

#include <inttypes.h>

NdbImportImpl::NdbImportImpl(NdbImport& facade) :
  NdbImport(*this),
  m_facade(&facade),
  m_csv(m_util),
  m_error(m_util.c_error)
{
  c_connectionindex = 0;
  log_debug(1, "ctor");
}

NdbImportImpl::~NdbImportImpl()
{
  log_debug(1, "dtor");
}

NdbOut&
operator<<(NdbOut& out, const NdbImportImpl& impl)
{
  out << "impl ";
  return out;
}

// opt

// mgm

NdbImportImpl::Mgm::Mgm(NdbImportImpl& impl) :
  m_impl(impl),
  m_util(m_impl.m_util),
  m_error(m_util.c_error)
{
  m_handle = 0;
  m_connected = false;
  m_status = 0;
}

NdbImportImpl::Mgm::~Mgm()
{
  do_disconnect();
}

NdbOut&
operator<<(NdbOut& out, const NdbImportImpl::Mgm& mgm)
{
  out << "mgm ";
  return out;
}

int
NdbImportImpl::Mgm::do_connect()
{
  log_debug(1, "do_connect");
  require(m_handle == 0);
  m_handle = ndb_mgm_create_handle();
  require(m_handle != 0);
  ndb_mgm_set_connectstring(m_handle, opt_ndb_connectstring);
  int retries = opt_connect_retries;
  int delay = opt_connect_retry_delay;
  if (ndb_mgm_connect(m_handle, retries, delay, 0) == -1)
  {
    m_util.set_error_mgm(m_error, __LINE__, m_handle);
    return -1;
  }
  m_connected = true;
  log_debug(1, "do_connect: success");
  return 0;
}

void
NdbImportImpl::Mgm::do_disconnect()
{
  if (m_handle != 0)
  {
    if (m_status != 0)
    {
      free(m_status);
      m_status = 0;
    }
    if (m_connected)
    {
      (void)ndb_mgm_disconnect(m_handle);
      m_connected = false;
    }
    ndb_mgm_destroy_handle(&m_handle);
    m_handle = 0;
    log_debug(1, "do_disconnect: done");
  }
}

int
NdbImportImpl::Mgm::get_status()
{
  log_debug(1, "get_status");
  require(m_connected);
  require(m_status == 0);
  int retries = 0;
  while (retries < 10)
  {
    if ((m_status = ndb_mgm_get_status(m_handle)) != 0)
    {
      log_debug(1, "get_status: success");
      return 0;
    }
    NdbSleep_SecSleep(1);
    retries++;
    log_debug(1, "get_status: retries " << retries);
  }
  m_util.set_error_mgm(m_error, __LINE__, m_handle);
  return -1;
}

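/*
 * Usage sketch (illustrative only, mirrors get_nodes() below):
 *
 *   Mgm mgm(impl);                // mgm handle is created in do_connect()
 *   if (mgm.do_connect() == 0 &&  // retries per --connect-retries
 *       mgm.get_status() == 0)    // polls ndb_mgm_get_status up to 10 times
 *     scan(mgm.m_status);         // scan() is a placeholder for the caller
 *   mgm.do_disconnect();          // idempotent; the dtor also calls it
 */
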
// node

NdbImportImpl::Node::Node()
{
  m_nodeid = Inval_uint;
}

NdbImportImpl::Nodes::Nodes()
{
  m_nodecnt = 0;
  for (uint i = 0; i < g_max_nodes; i++)
    m_index[i] = Inval_uint;
}

int
NdbImportImpl::get_nodes(Nodes& c)
{
  log_debug(1, "get_nodes");
  c.m_nodecnt = 0;
  do
  {
    Mgm mgm(*this);
    if (mgm.do_connect() == -1)
      break;
    if (mgm.get_status() == -1)
      break;
    const ndb_mgm_cluster_state* status = mgm.m_status;
    for (uint i = 0; i < (uint)status->no_of_nodes; i++)
    {
      const ndb_mgm_node_state* anynode = &status->node_states[i];
      if (anynode->node_type == NDB_MGM_NODE_TYPE_NDB)
      {
        Node& node = c.m_nodes[c.m_nodecnt];
        require(node.m_nodeid == Inval_uint);
        node.m_nodeid = anynode->node_id;
        require(node.m_nodeid < g_max_nodes);
        require(c.m_index[node.m_nodeid] == Inval_uint);
        c.m_index[node.m_nodeid] = c.m_nodecnt;
        log_debug(1, "node " << c.m_nodecnt << ": " << node.m_nodeid);
        c.m_nodecnt++;
      }
    }
    mgm.do_disconnect();
    return 0;
  } while (0);
  return -1;
}

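/*
 * Note on the result: m_nodes[] holds the data nodes in discovery order
 * and m_index[] maps a nodeid back to its slot.  For example, data nodes
 * with nodeids 1 and 2 give m_nodes[0].m_nodeid == 1, m_nodes[1].m_nodeid
 * == 2, m_index[1] == 0 and m_index[2] == 1; all other index entries stay
 * Inval_uint.
 */
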
// connect

NdbImportImpl::Connect::Connect()
{
  m_connectioncnt = 0;
  m_connections = 0;
  m_mainconnection = 0;
  m_connected = false;
  m_mainndb = 0;
}

NdbImportImpl::Connect::~Connect()
{
}

int
NdbImportImpl::do_connect()
{
  log_debug(1, "do_connect");
  const Opt& opt = m_util.c_opt;
  Connect& c = c_connect;
  if (c.m_connected)
  {
    m_util.set_error_usage(m_error, __LINE__);
    return -1;
  }
  {
    require(c.m_connections == 0 && c.m_mainconnection == 0);
    c.m_connectioncnt = opt.m_connections;
    c.m_connections = new Ndb_cluster_connection* [c.m_connectioncnt];
    for (uint i = 0; i < c.m_connectioncnt; i++)
    {
      /*
       * There is a single --ndb-nodeid value but there can be
       * several connections. Use nodeid values consecutively
       * starting with --ndb-nodeid. All must exist as API
       * nodes in the config.
       */
      int nodeid = opt_ndb_nodeid != 0 ? opt_ndb_nodeid + i : 0;
      c.m_connections[i] =
        new Ndb_cluster_connection(opt_ndb_connectstring,
                                   c.m_mainconnection,
                                   nodeid);
      if (i == 0)
        c.m_mainconnection = c.m_connections[i];
    }
    for (uint i = 0; i < c.m_connectioncnt; i++)
    {
      log_debug(1, "connection " << i << " of " << c.m_connectioncnt);
      Ndb_cluster_connection* con = c.m_connections[i];
      int retries = opt_connect_retries;
      int delay = opt_connect_retry_delay;
      if (con->connect(retries, delay, 1) != 0)
      {
        m_util.set_error_con(m_error, __LINE__, con);
        return -1;
      }
      log_debug(1, "connection " << i << " api nodeid " << con->node_id());
    }
  }
  for (uint i = 0; i < c.m_connectioncnt; i++)
  {
    Ndb_cluster_connection* con = c.m_connections[i];
    if (con->wait_until_ready(30, 0) < 0)
    {
      m_util.set_error_con(m_error, __LINE__, con);
      return -1;
    }
    log_debug(1, "connection " << i << " wait_until_ready done");
  }
  require(c.m_mainndb == 0);
  c.m_mainndb = new Ndb(c.m_mainconnection);
  if (c.m_mainndb->init() != 0)
  {
    m_util.set_error_ndb(m_error, __LINE__, c.m_mainndb->getNdbError());
    return -1;
  }
  if (c.m_mainndb->waitUntilReady() != 0)
  {
    m_util.set_error_ndb(m_error, __LINE__, c.m_mainndb->getNdbError());
    return -1;
  }
  c.m_connected = true;
  log_debug(1, "do_connect: success");
  return 0;
}

void
NdbImportImpl::do_disconnect()
{
  log_debug(1, "do_disconnect");
  Connect& c = c_connect;
  // delete any ndb before delete connection
  delete c.m_mainndb;
  c.m_mainndb = 0;
  if (c.m_connections != 0)
  {
    for (uint i = 0; i < c.m_connectioncnt; i++)
    {
      log_debug(1, "delete connection " << i << " of " << c.m_connectioncnt);
      delete c.m_connections[i];
      c.m_connections[i] = 0;
    }
  }
  delete [] c.m_connections;
  c.m_connections = 0;
  c.m_connected = false;
  log_debug(1, "do_disconnect: done");
}

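/*
 * Call order as seen from the facade (sketch only; error handling is
 * omitted and the database/table names are placeholders):
 *
 *   impl.do_connect();                          // all cluster connections
 *   uint tabid;
 *   impl.add_table("mydb", "mytab", tabid, e);  // resolve and cache metadata
 *   // ... create and run jobs ...
 *   impl.do_disconnect();                       // Ndb objects first, then connections
 */
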
// tables

int
NdbImportImpl::add_table(const char* database,
                         const char* table,
                         uint& tabid,
                         Error& error)
{
  Connect& c = c_connect;
  if (!c.m_connected)
  {
    m_util.set_error_usage(error, __LINE__);
    return -1;
  }
  if (database == 0 || table == 0)
  {
    m_util.set_error_usage(error, __LINE__);
    return -1;
  }
  log_debug(1, "add table " << database << "." << table);
  Ndb* ndb = c.m_mainndb;
  if (strcmp(ndb->getDatabaseName(), database) != 0)
  {
    if (ndb->setDatabaseName(database) != 0)
    {
      m_util.set_error_ndb(error, __LINE__, ndb->getNdbError());
      return -1;
    }
  }
  NdbDictionary::Dictionary* dic = ndb->getDictionary();
  const NdbDictionary::Table* tab = dic->getTable(table);
  if (tab == 0)
  {
    m_util.set_error_ndb(error, __LINE__, dic->getNdbError());
    return -1;
  }
  if (m_util.add_table(dic, tab, tabid, error) != 0)
    return -1;
  return 0;
}

int
NdbImportImpl::remove_table(const uint table_id)
{
  Connect& c = c_connect;
  if (!c.m_connected)
  {
    m_util.set_error_usage(m_error, __LINE__);
    return -1;
  }

  Ndb* ndb = c.m_mainndb;
  if (ndb == NULL)
  {
    m_util.set_error_usage(m_error, __LINE__);
    return -1;
  }

  NdbDictionary::Dictionary* dic = ndb->getDictionary();
  m_util.remove_table(dic, table_id);
  return 0;
}

// files

NdbImportImpl::WorkerFile::WorkerFile(NdbImportUtil& util, Error& error) :
  File(util, error)
{
  m_workerno = Inval_uint;
}

// job

NdbImportImpl::Job::Job(NdbImportImpl& impl, uint jobno) :
  m_impl(impl),
  m_util(m_impl.m_util),
  m_jobno(jobno),
  m_name("job", m_jobno),
  m_stats(m_util),
  m_rowmap_in(m_util),
  m_rowmap_out(m_util)
{
  m_runno = 0;
  m_state = JobState::State_null;
  m_tabid = Inval_uint;
  m_dostop = false;
  m_fatal = false;
  m_teamcnt = 0;
  for (uint i = 0; i < g_max_teams; i++)
    m_teams[i] = 0;
  for (int k = 0; k < g_teamstatecnt; k++)
    m_teamstates[k] = 0;
  m_rows_relay = 0;
  for (uint i = 0; i < g_max_ndb_nodes; i++)
    m_rows_exec[i] = 0;
  m_rows_reject = 0;
  m_old_rows = 0;
  m_old_reject = 0;
  m_old_runtime = 0;
  m_new_rows = 0;
  m_new_reject = 0;
  m_new_runtime = 0;
  // stats
  Stats& stats = m_stats;
  {
    const Name name("job", "rows");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_rows = stat;
  }
  {
    const Name name("job", "reject");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_reject = stat;
  }
  {
    const Name name("job", "runtime");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_runtime = stat;
  }
  {
    const Name name("job", "rowssec");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_rowssec = stat;
  }
  {
    const Name name("job", "utime");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_utime = stat;
  }
  {
    const Name name("job", "stime");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_stime = stat;
  }
  {
    const Name name("job", "rowmap");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_rowmap = stat;
  }
  {
    const Name name("job", "rowmap-utime");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_rowmap_utime = stat;
  }
  log_debug(1, "ctor");
}

NdbImportImpl::Job::~Job()
{
  log_debug(1, "dtor");
  for (uint i = 0; i < m_teamcnt; i++)
    delete m_teams[i];
  delete m_rows_relay;
  for (uint i = 0; i < g_max_ndb_nodes; i++)
    delete m_rows_exec[i];
  delete m_rows_reject;
}

void
NdbImportImpl::Job::do_create()
{
  const Opt& opt = m_util.c_opt;
  uint nodecnt = m_impl.c_nodes.m_nodecnt;
  require(nodecnt != 0);
  Job& job = *this;
  require(job.m_state == JobState::State_null);
  // diag team is team number 0
  {
    uint workercnt = 1;
    Team* team = new DiagTeam(job, workercnt);
    add_team(team);
  }
  // worker teams start at number 1
  if (opt.m_input_type != 0)
  {
    if (strcmp(opt.m_input_type, "random") == 0)
    {
      uint workercnt = opt.m_input_workers;
      RandomInputTeam* team = new RandomInputTeam(job, workercnt);
      add_team(team);
    }
    if (strcmp(opt.m_input_type, "csv") == 0)
    {
      uint workercnt = opt.m_input_workers;
      CsvInputTeam* team = new CsvInputTeam(job, workercnt);
      add_team(team);
    }
  }
  if (opt.m_output_type != 0)
  {
    if (strcmp(opt.m_output_type, "null") == 0)
    {
      uint workercnt = opt.m_output_workers;
      NullOutputTeam* team = new NullOutputTeam(job, workercnt);
      add_team(team);
    }
    if (strcmp(opt.m_output_type, "ndb") == 0)
    {
      uint workercnt = opt.m_output_workers;
      RelayOpTeam* team = new RelayOpTeam(job, workercnt);
      add_team(team);
    }
    if (strcmp(opt.m_output_type, "ndb") == 0)
    {
      require(opt.m_db_workers != 0);
      uint workercnt = opt.m_db_workers * nodecnt;
      ExecOpTeam* team = new ExecOpTeam(job, workercnt);
      add_team(team);
    }
  }
  // row queues
  Stats& stats = m_stats;
  {
    m_rows_relay = new RowList;
    m_rows_relay->set_stats(stats, "rows-relay");
    uint rowbatch = opt.m_rowbatch;
    if (rowbatch != 0)
      m_rows_relay->m_rowbatch = rowbatch;
    uint rowbytes = opt.m_rowbytes;
    if (rowbytes != 0)
      m_rows_relay->m_rowbytes = rowbytes;
  }
  {
    for (uint i = 0; i < nodecnt; i++)
    {
      Name name("rows-exec", i);
      m_rows_exec[i] = new RowList;
      m_rows_exec[i]->set_stats(stats, name);
      uint rowbatch = opt.m_rowbatch;
      if (rowbatch != 0)
        m_rows_exec[i]->m_rowbatch = rowbatch;
      uint rowbytes = opt.m_rowbytes;
      if (rowbytes != 0)
        m_rows_exec[i]->m_rowbytes = rowbytes;
    }
  }
  {
    m_rows_reject = new RowList;
    m_rows_reject->set_stats(stats, "rows-reject");
  }
  job.m_state = JobState::State_created;
}

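/*
 * Resulting team layout for a csv-to-ndb job, per the branches above:
 * team 0 diag, team 1 csv-input, team 2 relay-op, team 3 exec-op with
 * m_db_workers workers per data node.  Rows travel
 * csv-input -> m_rows_relay -> relay-op -> m_rows_exec[node] -> exec-op,
 * with rejected rows diverted to m_rows_reject.
 */
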
void
NdbImportImpl::Job::add_team(Team* team)
{
  require(team != 0);
  require(m_teamcnt < g_max_teams);
  m_teams[m_teamcnt] = team;
  m_teamcnt++;
}

int
NdbImportImpl::Job::add_table(const char* database,
                              const char* table,
                              uint& tabid)
{
  return m_impl.add_table(database, table, tabid, m_error);
}

void
NdbImportImpl::Job::set_table(uint tabid)
{
  (void)m_util.get_table(tabid);
  m_tabid = tabid;
}

int
NdbImportImpl::Job::remove_table(const uint table_id)
{
  return m_impl.remove_table(table_id);
}

void
NdbImportImpl::Job::do_start()
{
  const Opt& opt = m_util.c_opt;
  log_debug(1, "start");
  m_timer.start();
  do
  {
    m_state = JobState::State_starting;
    start_diag_team();
    if (has_error())
    {
      m_state = JobState::State_stop;
      break;
    }
    if (opt.m_resume)
      start_resume();
    start_teams();
    if (has_error())
    {
      m_state = JobState::State_stop;
      break;
    }
    m_state = JobState::State_running;
    while (m_state != JobState::State_stop)
    {
      log_debug(2, "running");
      check_teams(false);
      check_userstop();
      NdbSleep_MilliSleep(opt.m_checkloop);
    }
  } while (0);
  log_debug(1, "stop");
  while (m_state != JobState::State_stopped)
  {
    log_debug(2, "stopping");
    check_teams(true);
    if (m_state == JobState::State_stop)
      m_state = JobState::State_stopped;
    NdbSleep_MilliSleep(opt.m_checkloop);
  }
  log_debug(1, "stopped");
  collect_teams();
  collect_stats();
  stop_diag_team();
  log_debug(1, "rowmap out: " << m_rowmap_out);
  m_state = JobState::State_done;
  log_debug(1, "done");
}

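/*
 * Job state progression driven by do_start() above:
 *
 *   created -> starting -> running -> stop -> stopped -> done
 *
 * A team error or a user stop request moves the job to State_stop; the
 * second loop then calls check_teams(true) until every worker team has
 * reached State_stopped.
 */
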
void
NdbImportImpl::Job::start_diag_team()
{
  Team* team = m_teams[0];
  team->do_create();
  team->do_start();
  if (team->has_error())
  {
    m_util.set_error_gen(m_error, __LINE__,
                         "failed to start team %u-%s (state file manager)",
                         team->m_teamno, team->m_name.str());
    return;
  }
  team->do_run();
  log_debug(1, "diag team started");
}

void
NdbImportImpl::Job::start_resume()
{
  log_debug(1, "start_resume jobno=" << m_jobno);
  // verify old stats counts against old rowmap
  {
    uint64 old_rows;
    uint64 old_reject;
    m_rowmap_in.get_total(old_rows, old_reject);
    if (m_old_rows != old_rows ||
        m_old_reject != old_reject)
    {
      m_util.set_error_gen(m_error, __LINE__,
                           "inconsistent counts from old state files"
                           " (*.stt vs *.map)"
                           " rows %" PRIu64 " vs %" PRIu64 " reject %" PRIu64 " vs %" PRIu64,
                           m_old_rows, old_rows, m_old_reject, old_reject);
      return;
    }
  }
  // copy entire old rowmap
  require(m_rowmap_out.empty());
  m_rowmap_out.add(m_rowmap_in);
  // input worker handles seek in do_init()
}

void
NdbImportImpl::Job::start_teams()
{
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
    team->do_create();
  }
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
    team->do_start();
    if (team->has_error())
    {
      m_util.set_error_gen(m_error, __LINE__,
                           "failed to start team %u-%s",
                           team->m_teamno, team->m_name.str());
      return;
    }
  }
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
    team->do_run();
  }
  log_debug(1, "teams started");
}

void
NdbImportImpl::Job::check_teams(bool dostop)
{
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
    if (team->m_state == TeamState::State_null)
    {
      // never started
      team->m_state = TeamState::State_stopped;
      continue;
    }
    team->check_workers();
    if (team->m_state == TeamState::State_stop)
    {
      team->do_stop();
    }
    if (dostop && team->m_state != TeamState::State_stopped)
    {
      team->do_stop();
    }
    if (!team->m_rowmap_out.empty())
    {
      // lock since diag team also writes to job rowmap
      m_rowmap_out.lock();
      m_rowmap_out.add(team->m_rowmap_out);
      log_debug(1, "rowmap " << m_rowmap_out.size() << " <- "
                   << team->m_rowmap_out.size());
      m_stat_rowmap->add(m_rowmap_out.size());
      m_rowmap_out.unlock();
      team->m_rowmap_out.clear();
    }
  }
  for (int k = 0; k < g_teamstatecnt; k++)
    m_teamstates[k] = 0;
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
    int k = (int)team->m_state;
    require(k >= 0 && k < g_teamstatecnt);
    m_teamstates[k]++;
  }
  if (m_teamstates[TeamState::State_stopped] == m_teamcnt - 1)
    m_state = JobState::State_stop;
}

void
NdbImportImpl::Job::check_userstop()
{
  if (m_dostop && m_state != JobState::State_stop)
  {
    log_debug(1, "stop by user request");
    m_util.set_error_gen(m_error, __LINE__,
                         "stop by user request");
    m_state = JobState::State_stop;
  }
  if (NdbImportUtil::g_stop_all && m_state != JobState::State_stop)
  {
    log_debug(1, "stop by user interrupt");
    m_util.set_error_gen(m_error, __LINE__,
                         "stop by user interrupt");
    m_state = JobState::State_stop;
  }
}

void
NdbImportImpl::Job::collect_teams()
{
  char error_team[100] = "";
  {
    Team* team = m_teams[0];
    if (team->has_error())
      sprintf(error_team + strlen(error_team),
              " %u-%s", team->m_teamno, team->m_name.str());
  }
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
    require(team->m_state == TeamState::State_stopped);
    if (team->has_error())
      sprintf(error_team + strlen(error_team),
              " %u-%s", team->m_teamno, team->m_name.str());
  }
  if (strlen(error_team) != 0)
  {
    m_util.set_error_gen(m_error, __LINE__,
                         "error in teams:%s", error_team);
  }
}

void
NdbImportImpl::Job::collect_stats()
{
  m_timer.stop();
  uint64 msec = m_timer.elapsed_msec();
  if (msec == 0)
    msec = 1;
  // counts
  const RowMap& rowmap = m_rowmap_out;
  uint64 total_rows;
  uint64 total_reject;
  rowmap.get_total(total_rows, total_reject);
  require(total_rows >= m_old_rows);
  require(total_reject >= m_old_reject);
  m_new_rows = total_rows - m_old_rows;
  m_new_reject = total_reject - m_old_reject;
  m_stat_rows->add(m_new_rows);
  m_stat_reject->add(m_new_reject);
  // times
  m_new_runtime = msec;
  m_stat_runtime->add(m_new_runtime);
  m_stat_rowssec->add((m_new_rows * 1000) / msec);
  m_stat_rowmap_utime->add(m_timer.m_utime_msec);
}

void
NdbImportImpl::Job::stop_diag_team()
{
  const Opt& opt = m_util.c_opt;
  Team* team = m_teams[0];
  team->do_stop();
  while (team->m_state != TeamState::State_stopped)
  {
    NdbSleep_MilliSleep(opt.m_checkloop);
  }
  log_debug(1, "diag team stopped");
}

void
NdbImportImpl::Job::do_stop()
{
  m_dostop = true;
}

// team

NdbImportImpl::Team::Team(Job& job,
                          const char* name,
                          uint workercnt) :
  m_job(job),
  m_impl(job.m_impl),
  m_util(m_impl.m_util),
  m_teamno(job.m_teamcnt),
  m_name(name),
  m_workercnt(workercnt),
  m_rowmap_out(m_util)
{
  m_state = TeamState::State_null;
  m_workers = 0;
  for (int k = 0; k < g_workerstatecnt; k++)
    m_workerstates[k] = 0;
  m_tabid = Inval_uint;
  m_is_diag = false;
  // stats
  Stats& stats = m_job.m_stats;
  {
    const Name name(m_name, "runtime");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_runtime = stat;
  }
  {
    const Name name(m_name, "slice");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_slice = stat;
  }
  {
    const Name name(m_name, "idleslice");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_idleslice = stat;
  }
  {
    const Name name(m_name, "idlerun");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_idlerun = stat;
  }
  {
    const Name name(m_name, "utime");
    uint parent = m_job.m_stat_utime->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_utime = stat;
  }
  {
    const Name name(m_name, "stime");
    uint parent = m_job.m_stat_stime->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_stime = stat;
  }
  {
    const Name name(m_name, "rowmap");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_rowmap = stat;
  }
  log_debug(1, "ctor");
}

NdbImportImpl::Team::~Team()
{
  log_debug(1, "dtor");
  if (m_workers != 0)
  {
    for (uint n = 0; n < m_workercnt; n++)
    {
      Worker* w = m_workers[n];
      delete w;
    }
    delete [] m_workers;
  }
}

void
NdbImportImpl::Team::do_create()
{
  log_debug(1, "do_create");
  require(m_state == TeamState::State_null);
  require(m_workers == 0);
  require(m_workercnt > 0);
  m_workers = new Worker* [m_workercnt];
  for (uint n = 0; n < m_workercnt; n++)
  {
    m_workers[n] = 0;
  }
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = create_worker(n);
    require(w != 0);
    m_workers[n] = w;
  }
  m_state = TeamState::State_created;
}

void
NdbImportImpl::Team::do_start()
{
  log_debug(1, "start");
  m_timer.start();
  require(m_state == TeamState::State_created);
  require(m_workers != 0);
  do_init();
  if (has_error())
  {
    m_state = TeamState::State_stop;
    return;
  }
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    start_worker(w);
  }
  wait_workers(WorkerState::State_wait);
  m_state = TeamState::State_started;
}

extern "C" { static void* start_worker_c(void* data); }

static void*
start_worker_c(void* data)
{
  NdbImportImpl::Worker* w = (NdbImportImpl::Worker*)data;
  require(w != 0);
  w->do_start();
  return 0;
}

NdbImportImpl::Worker*
NdbImportImpl::Team::get_worker(uint n)
{
  require(n < m_workercnt);
  Worker* w = m_workers[n];
  require(w != 0);
  return w;
}

void
NdbImportImpl::Team::start_worker(Worker* w)
{
  NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_MEAN;
  uint stack_size = 64*1024;
  w->m_thread = NdbThread_Create(
    start_worker_c, (void**)w, stack_size, w->m_name, prio);
  require(w->m_thread != 0);
}

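/*
 * Thread startup handshake: do_start() above spawns one OS thread per
 * worker via start_worker_c(), each thread runs Worker::do_start() and
 * parks itself in State_wait.  The team then blocks in wait_workers()
 * below until every worker waits (or has already stopped), so do_run()
 * never signals a half-initialized worker.
 */
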
void
NdbImportImpl::Team::wait_workers(WorkerState::State state)
{
  log_debug(1, "wait_workers");
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    wait_worker(w, state);
  }
}

void
NdbImportImpl::Team::wait_worker(Worker* w, WorkerState::State state)
{
  log_debug(1, "wait_worker: " << *w << " for " << g_str_state(state));
  const Opt& opt = m_util.c_opt;
  uint timeout = opt.m_idlesleep;
  w->lock();
  while (1)
  {
    log_debug(1, *w << ": wait for " << g_str_state(state));
    if (w->m_state == state || w->m_state == WorkerState::State_stopped)
      break;
    w->wait(timeout);
  }
  w->unlock();
}

void
NdbImportImpl::Team::do_run()
{
  log_debug(1, "do_run");
  if (has_error())
  {
    if (m_state != TeamState::State_stopped)
      m_state = TeamState::State_stop;
    return;
  }
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    run_worker(w);
  }
  wait_workers(WorkerState::State_running);
  m_state = TeamState::State_running;
}

void
NdbImportImpl::Team::run_worker(Worker* w)
{
  log_debug(1, "run_worker: " << *w);
  w->lock();
  w->m_state = WorkerState::State_run;
  w->signal();
  w->unlock();
}

void
NdbImportImpl::Team::check_workers()
{
  log_debug(2, "check_workers");
  for (int k = 0; k < g_workerstatecnt; k++)
    m_workerstates[k] = 0;
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    w->lock();
    log_debug(2, "check_worker " << *w);
    int k = (int)w->m_state;
    require(k >= 0 && k < g_workerstatecnt);
    m_workerstates[k]++;
    // transfer rowmap while worker is locked
    if (!w->m_rowmap_out.empty())
    {
      m_rowmap_out.add(w->m_rowmap_out);
      log_debug(1, "rowmap " << m_rowmap_out.size() << " <- "
                   << w->m_rowmap_out.size());
      m_stat_rowmap->add(m_rowmap_out.size());
      w->m_rowmap_out.clear();
    }
    log_debug(2, "rowmap out: " << m_rowmap_out);
    w->unlock();
  }
  if (m_workerstates[WorkerState::State_stopped] == m_workercnt)
  {
    if (m_state != TeamState::State_stopped)
      m_state = TeamState::State_stop;
  }
  if (has_error())
  {
    if (m_state != TeamState::State_stopped)
      m_state = TeamState::State_stop;
  }
  log_debug(2, "check_workers done");
}

void
NdbImportImpl::Team::do_stop()
{
  log_debug(1, "do_stop");
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    stop_worker(w);
  }
  wait_workers(WorkerState::State_stopped);
  // transfer final rowmap entries
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    if (!w->m_rowmap_out.empty())
    {
      m_rowmap_out.add(w->m_rowmap_out);
      m_stat_rowmap->add(m_rowmap_out.size());
      w->m_rowmap_out.clear();
    }
  }
  do_end();
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    if (w->m_thread != 0)
      w->join();
    w->m_thread = 0;
  }
  m_state = TeamState::State_stopped;
  m_timer.stop();
  // stats
  {
    uint64 msec = m_timer.elapsed_msec();
    if (msec == 0)
      msec = 1;
    m_stat_runtime->add(msec);
  }
}

void
NdbImportImpl::Team::stop_worker(Worker* w)
{
  log_debug(1, "stop_worker: " << *w);
  w->lock();
  switch (w->m_state) {
  case WorkerState::State_null:
    w->m_state = WorkerState::State_stopped;
    w->signal();
    break;
  case WorkerState::State_wait:
    w->m_state = WorkerState::State_stopped;
    w->signal();
    break;
  case WorkerState::State_running:
    /*
     * Here we do not interfere with worker state but ask it
     * to stop when convenient. It is enough and simpler for
     * only producers to act on this flag.
     */
    w->m_dostop = true;
    w->signal();
    break;
  case WorkerState::State_stop:
    /*
     * Worker is about to stop, allow it to do so. It is either
     * ready or it is reacting to m_dostop.
     */
    break;
  case WorkerState::State_stopped:
    break;
  default:
    require(false);
    break;
  }
  w->unlock();
}

void
NdbImportImpl::Team::set_table(uint tabid)
{
  (void)m_util.get_table(tabid);
  m_tabid = tabid;
}

// worker

NdbImportImpl::Worker::Worker(NdbImportImpl::Team& team, uint n) :
  m_team(team),
  m_impl(m_team.m_impl),
  m_util(m_impl.m_util),
  m_workerno(n),
  m_name(team.m_name, m_workerno),
  m_rowmap_out(m_util),
  m_error(m_team.m_error)
{
  m_state = WorkerState::State_null;
  m_dostop = false;
  m_slice = 0;
  m_idleslice = 0;
  m_idle = false;
  m_idlerun = 0;
  m_seed = 0;
  // stats
  Stats& stats = m_team.m_job.m_stats;
  {
    const Name name(m_name, "slice");
    uint parent = m_team.m_stat_slice->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_slice = stat;
  }
  {
    const Name name(m_name, "idleslice");
    uint parent = m_team.m_stat_idleslice->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_idleslice = stat;
  }
  {
    const Name name(m_name, "idlerun");
    uint parent = m_team.m_stat_idlerun->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_idlerun = stat;
  }
  {
    const Name name(m_name, "utime");
    uint parent = m_team.m_stat_utime->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_utime = stat;
  }
  {
    const Name name(m_name, "stime");
    uint parent = m_team.m_stat_stime->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_stime = stat;
  }
  {
    const Name name(m_name, "rowmap");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_rowmap = stat;
  }
  log_debug(1, "ctor");
}

NdbImportImpl::Worker::~Worker()
{
  log_debug(1, "dtor");
}

void
NdbImportImpl::Worker::do_start()
{
  log_debug(1, "start");
  const Opt& opt = m_util.c_opt;
  uint timeout = opt.m_idlesleep;
  do_init();
  if (has_error())
  {
    m_state = WorkerState::State_stop;
  }
  m_seed = (unsigned)(NdbHost_GetProcessId() ^ m_workerno);
  while (m_state != WorkerState::State_stopped)
  {
    log_debug(2, "slice: " << m_slice);
    lock();
    m_idle = false;
    switch (m_state) {
    case WorkerState::State_null:
      m_state = WorkerState::State_wait;
      break;
    case WorkerState::State_wait:
      wait(timeout);
      break;
    case WorkerState::State_run:
      m_state = WorkerState::State_running;
      break;
    case WorkerState::State_running:
      do_run();
      m_slice++;
      if (m_idle)
      {
        m_idleslice++;
        m_idlerun++;
        m_stat_idlerun->add(m_idlerun);
      }
      else
      {
        m_idlerun = 0;
      }
      break;
    case WorkerState::State_stop:
      do_end();
      m_slice++;
      m_stat_slice->add(m_slice);
      m_stat_idleslice->add(m_idleslice);
      m_timer.stop();
      m_stat_utime->add(m_timer.m_utime_msec);
      m_stat_stime->add(m_timer.m_stime_msec);
      m_state = WorkerState::State_stopped;
      break;
    case WorkerState::State_stopped:
      require(false);
      break;
    default:
      require(false);
      break;
    }
    if (has_error())
    {
      if (m_state != WorkerState::State_stopped)
        m_state = WorkerState::State_stop;
    }
    signal();
    unlock();
    if (!m_team.m_is_diag)
    {
      if (m_idlerun > opt.m_idlespin && opt.m_idlesleep != 0)
      {
        NdbSleep_MilliSleep(opt.m_idlesleep);
      }
    }
    else
    {
      NdbSleep_MilliSleep(opt.m_checkloop);
    }
  }
  log_debug(1, "stopped");
}

NdbImportImpl::Worker*
NdbImportImpl::Worker::next_worker()
{
  Team& team = m_team;
  require(team.m_workercnt > 0);
  uint n = (m_workerno + 1) % team.m_workercnt;
  Worker* w = team.get_worker(n);
  return w;
}

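/*
 * The loop above is the per-worker state machine:
 *
 *   null -> wait -> run -> running -> stop -> stopped
 *
 * Each running slice calls the subclass do_run() once.  A slice that
 * makes no progress sets m_idle, and once m_idlerun exceeds the idlespin
 * option a non-diag worker sleeps idlesleep ms to avoid busy-spinning.
 */
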
// print

const char*
NdbImportImpl::g_str_state(JobState::State state)
{
  const char* str = 0;
  switch (state) {
  case JobState::State_null:
    str = "null";
    break;
  case JobState::State_created:
    str = "created";
    break;
  case JobState::State_starting:
    str = "starting";
    break;
  case JobState::State_running:
    str = "running";
    break;
  case JobState::State_stop:
    str = "stop";
    break;
  case JobState::State_stopped:
    str = "stopped";
    break;
  case JobState::State_done:
    str = "done";
    break;
  }
  require(str != 0);
  return str;
}

const char*
NdbImportImpl::g_str_state(TeamState::State state)
{
  const char* str = 0;
  switch (state) {
  case TeamState::State_null:
    str = "null";
    break;
  case TeamState::State_created:
    str = "created";
    break;
  case TeamState::State_started:
    str = "started";
    break;
  case TeamState::State_running:
    str = "running";
    break;
  case TeamState::State_stop:
    str = "stop";
    break;
  case TeamState::State_stopped:
    str = "stopped";
    break;
  }
  require(str != 0);
  return str;
}

const char*
NdbImportImpl::g_str_state(WorkerState::State state)
{
  const char* str = 0;
  switch (state) {
  case WorkerState::State_null:
    str = "null";
    break;
  case WorkerState::State_wait:
    str = "wait";
    break;
  case WorkerState::State_run:
    str = "run";
    break;
  case WorkerState::State_running:
    str = "running";
    break;
  case WorkerState::State_stop:
    str = "stop";
    break;
  case WorkerState::State_stopped:
    str = "stopped";
    break;
  }
  require(str != 0);
  return str;
}

void
NdbImportImpl::Job::str_state(char* str) const
{
  strcpy(str, g_str_state(m_state));
}

void
NdbImportImpl::Team::str_state(char* str) const
{
  strcpy(str, g_str_state(m_state));
}

void
NdbImportImpl::Worker::str_state(char* str) const
{
  strcpy(str, g_str_state(m_state));
}

NdbOut&
operator<<(NdbOut& out, const NdbImportImpl::Job& job)
{
  char str[100];
  job.str_state(str);
  out << "J-" << job.m_jobno << " [" << str << "] ";
  if (job.has_error())
  {
    const Error& error = job.m_error;
    const char* typetext = error.gettypetext();
    out << "error[" << typetext << "-" << error.code << "] ";
  }
  return out;
}

NdbOut&
operator<<(NdbOut& out, const NdbImportImpl::Team& team)
{
  char str[100];
  team.str_state(str);
  out << "T-" << team.m_teamno << " " << team.m_name << " [" << str << "] ";
  if (team.has_error())
  {
    const Error& error = team.m_error;
    const char* typetext = error.gettypetext();
    out << "error[" << typetext << "-" << error.code << "] ";
  }
  return out;
}

NdbOut&
operator<<(NdbOut& out, const NdbImportImpl::Worker& w)
{
  char str[100];
  w.str_state(str);
  out << "W " << w.m_name << " [" << str << "] ";
  return out;
}

// random input team

NdbImportImpl::RandomInputTeam::RandomInputTeam(Job& job,
                                                uint workercnt) :
  Team(job, "random-input", workercnt)
{
}

NdbImportImpl::RandomInputTeam::~RandomInputTeam()
{
}

NdbImportImpl::Worker*
NdbImportImpl::RandomInputTeam::create_worker(uint n)
{
  RandomInputWorker* w = new RandomInputWorker(*this, n);
  return w;
}

void
NdbImportImpl::RandomInputTeam::do_init()
{
  log_debug(1, "do_init");
  set_table(m_job.m_tabid);
}

void
NdbImportImpl::RandomInputTeam::do_end()
{
  log_debug(1, "do_end");
  RowList& rows_out = *m_job.m_rows_relay;
  rows_out.lock();
  require(!rows_out.m_eof);
  rows_out.m_eof = true;
  rows_out.unlock();
}

NdbImportImpl::RandomInputWorker::RandomInputWorker(Team& team, uint n) :
  Worker(team, n)
{
  m_seed = 0;
}

NdbImportImpl::RandomInputWorker::~RandomInputWorker()
{
}

void
NdbImportImpl::RandomInputWorker::do_init()
{
  log_debug(1, "do_init");
}

void
NdbImportImpl::RandomInputWorker::do_run()
{
  log_debug(2, "do_run");
  const Opt& opt = m_util.c_opt;
  const uint tabid = m_team.m_tabid;
  const Table& table = m_util.get_table(tabid);
  RowList& rows_out = *m_team.m_job.m_rows_relay;
  uint64 max_rows = opt.m_max_rows != 0 ? opt.m_max_rows : UINT64_MAX;
  rows_out.lock();
  for (uint i = 0; i < opt.m_rowbatch; i++)
  {
    if (rows_out.totcnt() >= max_rows)
    {
      log_debug(1, "stop at max rows " << max_rows);
      m_state = WorkerState::State_stop;
      break;
    }
    if (m_dostop)
    {
      log_debug(1, "stop by request");
      m_state = WorkerState::State_stop;
      break;
    }
    uint64 rowid = rows_out.totcnt();
    Row* row = create_row(rowid, table);
    if (row == 0)
    {
      require(has_error());
      break;
    }
    require(row->m_tabid == table.m_tabid);
    if (!rows_out.push_back(row))
    {
      m_idle = true;
      break;
    }
  }
  rows_out.unlock();
}

void
NdbImportImpl::RandomInputWorker::do_end()
{
  log_debug(1, "do_end");
}

NdbImportImpl::Row*
NdbImportImpl::RandomInputWorker::create_row(uint64 rowid, const Table& table)
{
  Row* row = m_util.alloc_row(table);
  row->m_rowid = rowid;
  const Attrs& attrs = table.m_attrs;
  const uint attrcnt = attrs.size();
  char keychr[100];
  uint keylen;
  sprintf(keychr, "%" PRIu64 ":", rowid);
  keylen = strlen(keychr);
  for (uint i = 0; i < attrcnt; i++)
  {
    const Attr& attr = attrs[i];
    switch (attr.m_type) {
    case NdbDictionary::Column::Unsigned:
      {
        uint32 val = (uint32)rowid;
        attr.set_value(row, &val, sizeof(val));
      }
      break;
    case NdbDictionary::Column::Bigunsigned:
      {
        uint64 val = rowid;
        attr.set_value(row, &val, sizeof(val));
      }
      break;
    case NdbDictionary::Column::Varchar:
      {
        const uint maxsize = 255;
        uchar val[maxsize];
        uint maxlen = attr.m_length;
        uint len = attr.m_pk ? maxlen : get_rand() % (maxlen + 1);
        for (uint i = 0; i < len; i++)
          val[i] = keychr[i % keylen];
        attr.set_value(row, val, len);
      }
      break;
    case NdbDictionary::Column::Longvarchar:
      {
        const uint maxsize = 65535;
        uchar val[maxsize];
        uint maxlen = attr.m_length;
        uint len = attr.m_pk ? maxlen : get_rand() % (maxlen + 1);
        for (uint i = 0; i < len; i++)
          val[i] = keychr[i % keylen];
        attr.set_value(row, val, len);
      }
      break;
    default:
      {
        m_util.set_error_usage(m_error, __LINE__,
                               "column type %d not supported for random input",
                               (int)attr.m_type);
        return 0;
      }
      break;
    }
  }
  return row;
}

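/*
 * Note on create_row() above: every column is derived from the rowid.
 * Numeric columns store the rowid itself and [long]varchar columns
 * repeat the pattern "<rowid>:" (e.g. rowid 7 gives "7:7:7:...").  Key
 * columns always use full length, which keeps generated keys distinct.
 */
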
// csv input team

NdbImportImpl::CsvInputTeam::CsvInputTeam(Job& job,
                                          uint workercnt) :
  Team(job, "csv-input", workercnt),
  m_file(m_util, m_error)
{
  // stats
  Stats& stats = m_job.m_stats;
  {
    const Name name(m_name, "waittail");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_waittail = stat;
  }
  {
    const Name name(m_name, "waitmove");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_waitmove = stat;
  }
  {
    const Name name(m_name, "movetail");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_movetail = stat;
  }
}

NdbImportImpl::CsvInputTeam::~CsvInputTeam()
{
}

NdbImportImpl::Worker*
NdbImportImpl::CsvInputTeam::create_worker(uint n)
{
  CsvInputWorker* w = new CsvInputWorker(*this, n);
  return w;
}

void
NdbImportImpl::CsvInputTeam::do_init()
{
  log_debug(1, "do_init");
  const Opt& opt = m_util.c_opt;
  const OptCsv& optcsv = opt.m_optcsv;
  if (m_impl.m_csv.set_spec(m_csvspec, optcsv, OptCsv::ModeInput) == -1)
  {
    require(m_util.has_error());
    return;
  }
  set_table(m_job.m_tabid);
  WorkerFile& file = m_file;
  file.set_path(opt.m_input_file);
  if (file.do_open(File::Read_flags) == -1)
  {
    require(has_error());
    m_job.m_fatal = true;
    return;
  }
  log_debug(1, "file: opened: " << file.get_path());
  const uint workerno = 0;
  file.m_workerno = workerno;
  CsvInputWorker* w = static_cast<CsvInputWorker*>(get_worker(workerno));
  w->m_firstread = true;
}

void
NdbImportImpl::CsvInputTeam::do_end()
{
  log_debug(1, "do_end");
  WorkerFile& file = m_file;
  if (file.do_close() == -1)
  {
    require(has_error());
    // continue
  }
  RowList& rows_out = *m_job.m_rows_relay;
  rows_out.lock();
  require(!rows_out.m_eof);
  rows_out.m_eof = true;
  rows_out.unlock();
}

NdbImportImpl::CsvInputWorker::CsvInputWorker(Team& team, uint n) :
  Worker(team, n),
  m_buf(true)
{
  m_inputstate = InputState::State_null;
  m_csvinput = 0;
  m_firstread = false;
  m_eof = false;
}

NdbImportImpl::CsvInputWorker::~CsvInputWorker()
{
  delete m_csvinput;
}

void
NdbImportImpl::CsvInputWorker::do_init()
{
  log_debug(1, "do_init");
  const Opt& opt = m_util.c_opt;
  const CsvSpec& csvspec = static_cast<CsvInputTeam&>(m_team).m_csvspec;
  const uint tabid = m_team.m_tabid;
  const Table& table = m_util.get_table(tabid);
  uint pagesize = opt.m_pagesize;
  uint pagecnt = opt.m_pagecnt;
  m_buf.alloc(pagesize, 2 * pagecnt);
  RowList& rows_out = *m_team.m_job.m_rows_relay;
  RowList& rows_reject = *m_team.m_job.m_rows_reject;
  RowMap& rowmap_in = m_team.m_job.m_rowmap_in;
  m_csvinput = new CsvInput(m_impl.m_csv,
                            Name("csvinput", m_workerno),
                            csvspec,
                            table,
                            m_buf,
                            rows_out,
                            rows_reject,
                            rowmap_in,
                            m_team.m_job.m_stats);
  m_csvinput->do_init();
  if (m_firstread)
  {
    // this worker does first read
    if (opt.m_resume)
    {
      CsvInputTeam& team = static_cast<CsvInputTeam&>(m_team);
      WorkerFile& file = team.m_file;
      RangeList& ranges_in = rowmap_in.m_ranges;
      require(!ranges_in.empty());
      Range range_in = *ranges_in.front();
      /*
       * First range is likely to be the big one. If the range
       * starts with rowid 0 seek to the end and erase it.
       * In rare cases rowid 0 may not yet have been processed
       * due to an early error and rejected out of order rows.
       */
      if (range_in.m_start == 0)
      {
        uint64 seekpos = range_in.m_endpos;
        if (file.do_seek(seekpos) == -1)
        {
          require(has_error());
          return;
        }
        log_debug(1, "file " << file.get_path() << ": "
                     "seek to pos " << seekpos << " done");
        m_csvinput->do_resume(range_in);
        (void)ranges_in.pop_front();
      }
      else
      {
        log_debug(1, "file " << file.get_path() << ": "
                     "cannot seek first rowid=" << range_in.m_start);
      }
    }
  }
}

void
NdbImportImpl::CsvInputWorker::do_run()
{
  log_debug(2, "do_run");
  switch (m_inputstate) {
  case InputState::State_null:
    m_inputstate = InputState::State_lock;
    break;
  case InputState::State_lock:
    state_lock();
    break;
  case InputState::State_read:
    state_read();
    break;
  case InputState::State_waittail:
    state_waittail();
    break;
  case InputState::State_parse:
    state_parse();
    break;
  case InputState::State_movetail:
    state_movetail();
    break;
  case InputState::State_eval:
    state_eval();
    break;
  case InputState::State_send:
    state_send();
    break;
  case InputState::State_eof:
    m_state = WorkerState::State_stop;
    break;
  default:
    require(false);
    break;
  }
}

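/*
 * CSV input pipeline per worker:
 *
 *   lock -> read -> (parse | waittail) -> movetail -> eval -> send -> lock ...
 *
 * Workers take turns reading the shared file (file.m_workerno is the
 * baton).  A buffer usually ends mid-record, so in movetail the unparsed
 * tail is handed to the worker that read the next buffer, preserving
 * records that straddle buffer boundaries.
 */
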
void
NdbImportImpl::CsvInputWorker::do_end()
{
  log_debug(1, "do_end");
}

void
NdbImportImpl::CsvInputWorker::state_lock()
{
  log_debug(2, "state_lock");
  if (m_dostop)
  {
    log_debug(1, "stop by request");
    m_state = WorkerState::State_stop;
    return;
  }
  WorkerFile& file = static_cast<CsvInputTeam&>(m_team).m_file;
  file.lock();
  if (file.m_workerno == m_workerno)
    m_inputstate = InputState::State_read;
  else
    m_idle = true;
  file.unlock();
}

void
NdbImportImpl::CsvInputWorker::state_read()
{
  log_debug(2, "state_read");
  if (m_dostop)
  {
    log_debug(1, "stop by request");
    m_state = WorkerState::State_stop;
    return;
  }
  WorkerFile& file = static_cast<CsvInputTeam&>(m_team).m_file;
  Buf& buf = m_buf;
  buf.reset();
  if (file.do_read(buf) == -1)
  {
    require(has_error());
    return;
  }
  log_debug(2, "file: read: " << buf.m_len);
  if (buf.m_eof)
  {
    log_debug(1, "eof");
    m_eof = true;
  }
  file.lock();
  CsvInputWorker* w2 = static_cast<CsvInputWorker*>(next_worker());
  file.m_workerno = w2->m_workerno;
  file.unlock();
  if (m_firstread)
  {
    m_inputstate = InputState::State_parse;
    m_firstread = false;
  }
  else
  {
    m_inputstate = InputState::State_waittail;
  }
}

void
NdbImportImpl::CsvInputWorker::state_waittail()
{
  log_debug(2, "state_waittail");
  if (m_dostop)
  {
    log_debug(1, "stop by request");
    m_state = WorkerState::State_stop;
    return;
  }
  CsvInputTeam& team = static_cast<CsvInputTeam&>(m_team);
  team.m_stat_waittail->add(1);
  m_idle = true;
}

void
NdbImportImpl::CsvInputWorker::state_parse()
{
  log_debug(2, "state_parse");
  m_csvinput->do_parse();
  log_debug(2, "lines parsed:" << m_csvinput->m_line_list.cnt());
  m_inputstate = InputState::State_movetail;
}

void
NdbImportImpl::CsvInputWorker::state_movetail()
{
  log_debug(2, "state_movetail");
  if (m_dostop)
  {
    log_debug(1, "stop by request");
    m_state = WorkerState::State_stop;
    return;
  }
  CsvInputTeam& team = static_cast<CsvInputTeam&>(m_team);
  CsvInputWorker* w2 = static_cast<CsvInputWorker*>(next_worker());
  w2->lock();
  log_debug(2, "next worker: " << *w2);
  if (w2->m_inputstate == InputState::State_waittail)
  {
    m_csvinput->do_movetail(*w2->m_csvinput);
    team.m_stat_movetail->add(1);
    m_inputstate = InputState::State_eval;
    w2->m_inputstate = InputState::State_parse;
  }
  else if (w2->m_inputstate == InputState::State_eof)
  {
    m_inputstate = InputState::State_eval;
  }
  else
  {
    // cannot move tail yet
    team.m_stat_waitmove->add(1);
    m_idle = true;
  }
  w2->unlock();
}

void
NdbImportImpl::CsvInputWorker::state_eval()
{
  log_debug(2, "state_eval");
  m_csvinput->do_eval();
  m_inputstate = InputState::State_send;
}

void
NdbImportImpl::CsvInputWorker::state_send()
{
  log_debug(2, "state_send");
  const Opt& opt = m_util.c_opt;
  do
  {
    // max-rows is a test option, it need not be exact
    if (opt.m_max_rows != 0)
    {
      RowList& rows_out = *m_team.m_job.m_rows_relay;
      if (rows_out.totcnt() >= opt.m_max_rows)
      {
        log_debug(1, "stop on max-rows option");
        m_inputstate = InputState::State_eof;
        break;
      }
    }
    uint curr = 0;
    uint left = 0;
    m_csvinput->do_send(curr, left);
    log_debug(2, "send: rows curr=" << curr << " left=" << left);
    if (m_csvinput->has_error())
    {
      m_util.copy_error(m_error, m_csvinput->m_error);
      break;
    }
    if (left != 0)
    {
      log_debug(2, "send not ready");
      m_idle = true;
      break;
    }
    if (!m_eof)
    {
      log_debug(2, "send ready and not eof");
      // stop if csv error
      if (m_csvinput->has_error())
      {
        m_util.copy_error(m_error, m_csvinput->m_error);
        break;
      }
      if (m_dostop)
      {
        log_debug(1, "stop by request");
        m_state = WorkerState::State_stop;
        break;
      }
      m_inputstate = InputState::State_lock;
      break;
    }
    log_debug(2, "send ready and eof");
    m_inputstate = InputState::State_eof;
  } while (0);
}

// print

const char*
NdbImportImpl::g_str_state(InputState::State state)
{
  const char* str = 0;
  switch (state) {
  case InputState::State_null:
    str = "null";
    break;
  case InputState::State_lock:
    str = "lock";
    break;
  case InputState::State_read:
    str = "read";
    break;
  case InputState::State_waittail:
    str = "waittail";
    break;
  case InputState::State_parse:
    str = "parse";
    break;
  case InputState::State_movetail:
    str = "movetail";
    break;
  case InputState::State_eval:
    str = "eval";
    break;
  case InputState::State_send:
    str = "send";
    break;
  case InputState::State_eof:
    str = "eof";
    break;
  }
  require(str != 0);
  return str;
}

void
NdbImportImpl::CsvInputWorker::str_state(char* str) const
{
  sprintf(str, "%s/%s", g_str_state(m_state), g_str_state(m_inputstate));
}

// null output team

NdbImportImpl::NullOutputTeam::NullOutputTeam(Job& job,
                                              uint workercnt) :
  Team(job, "null-output", workercnt)
{
}

NdbImportImpl::NullOutputTeam::~NullOutputTeam()
{
}

NdbImportImpl::Worker*
NdbImportImpl::NullOutputTeam::create_worker(uint n)
{
  NullOutputWorker* w = new NullOutputWorker(*this, n);
  return w;
}

void
NdbImportImpl::NullOutputTeam::do_init()
{
  log_debug(1, "do_init");
}

void
NdbImportImpl::NullOutputTeam::do_end()
{
  log_debug(1, "do_end");
}

NdbImportImpl::NullOutputWorker::NullOutputWorker(Team& team, uint n) :
  Worker(team, n)
{
}

NdbImportImpl::NullOutputWorker::~NullOutputWorker()
{
}

void
NdbImportImpl::NullOutputWorker::do_init()
{
  log_debug(1, "do_init");
}

void
NdbImportImpl::NullOutputWorker::do_run()
{
  log_debug(2, "do_run");
  RowList& rows_in = *m_team.m_job.m_rows_relay;
  rows_in.lock();
  Row* row = rows_in.pop_front();
  bool eof = (row == 0 && rows_in.m_eof);
  rows_in.unlock();
  if (eof)
  {
    m_state = WorkerState::State_stop;
    return;
  }
  if (row == 0)
  {
    m_idle = true;
    return;
  }
  const uint tabid = row->m_tabid;
  (void)m_util.get_table(tabid);
  m_impl.m_util.free_row(row);
}

void
NdbImportImpl::NullOutputWorker::do_end()
{
  log_debug(1, "do_end");
}

// op

NdbImportImpl::Op::Op()
{
  m_row = 0;
  m_rowop = 0;
  m_opcnt = 0;
  m_opsize = 0;
}

NdbImportImpl::OpList::OpList()
{
}

NdbImportImpl::OpList::~OpList()
{
  Op* one_op = NULL;
  while ((one_op = pop_front()) != NULL)
  {
    // See bug 30192989
    // require(one_op->m_row == NULL);
    delete one_op;
  }
}

// tx

NdbImportImpl::Tx::Tx(DbWorker* w) :
  m_worker(w)
{
  Stats& stats = w->m_team.m_job.m_stats;
  m_trans = 0;
  m_ops.set_stats(stats, "op-used");
}

NdbImportImpl::Tx::~Tx()
{
  require(m_trans == 0);
}

NdbImportImpl::TxList::TxList()
{
}

NdbImportImpl::TxList::~TxList()
{
}

// db team

NdbImportImpl::DbTeam::DbTeam(Job& job,
                              const char* name,
                              uint workercnt) :
  Team(job, name, workercnt)
{
}

NdbImportImpl::DbTeam::~DbTeam()
{
}

NdbImportImpl::DbWorker::DbWorker(Team& team, uint n) :
  Worker(team, n)
{
  m_ndb = 0;
  Stats& stats = team.m_job.m_stats;
  m_op_free.set_stats(stats, "op-free");
  m_tx_free.set_stats(stats, "tx-free");
  m_tx_open.set_stats(stats, "tx-open");
}

NdbImportImpl::DbWorker::~DbWorker()
{
  require(m_tx_open.cnt() == 0);
  delete m_ndb;
}

int
NdbImportImpl::DbWorker::create_ndb(uint transcnt)
{
  Connect& c = m_impl.c_connect;
  require(m_ndb == 0);
  Ndb* ndb = 0;
  do
  {
    uint index = m_impl.c_connectionindex;
    require(index < c.m_connectioncnt);
    ndb = new Ndb(c.m_connections[index]);
    m_impl.c_connectionindex = (index + 1) % c.m_connectioncnt;
    require(ndb != 0);
    if (ndb->init(transcnt) != 0)
    {
      m_util.set_error_ndb(m_error, __LINE__, ndb->getNdbError());
      break;
    }
    m_ndb = ndb;
    return 0;
  } while (0);
  delete ndb;
  return -1;
}

NdbImportImpl::Op*
NdbImportImpl::DbWorker::alloc_op()
{
  Op* op = m_op_free.pop_front();
  if (op == 0)
  {
    op = new Op;
  }
  return op;
}

void
NdbImportImpl::DbWorker::free_op(Op* op)
{
  m_op_free.push_back(op);
}

NdbImportImpl::Tx*
NdbImportImpl::DbWorker::start_trans()
{
  log_debug(2, "start_trans");
  TxList& tx_free = m_tx_free;
  TxList& tx_open = m_tx_open;
  require(m_ndb != 0);
  NdbTransaction* trans = m_ndb->startTransaction();
  if (trans == 0)
  {
    return 0;
  }
  Tx* tx = tx_free.pop_front();
  if (tx == 0)
  {
    tx = new Tx(this);
  }
  require(tx != 0);
  require(tx->m_trans == 0);
  require(tx->m_ops.cnt() == 0);
  tx->m_trans = trans;
  tx_open.push_back(tx);
  return tx;
}

NdbImportImpl::Tx*
NdbImportImpl::DbWorker::start_trans(const NdbRecord* keyrec,
                                     const char* keydata,
                                     uchar* xfrmbuf, uint xfrmbuflen)
{
  log_debug(2, "start_trans");
  TxList& tx_free = m_tx_free;
  TxList& tx_open = m_tx_open;
  require(m_ndb != 0);
  NdbTransaction* trans = m_ndb->startTransaction(keyrec, keydata,
                                                  xfrmbuf, xfrmbuflen);
  if (trans == 0)
  {
    return 0;
  }
  Tx* tx = tx_free.pop_front();
  if (tx == 0)
  {
    tx = new Tx(this);
  }
  require(tx != 0);
  require(tx->m_trans == 0);
  require(tx->m_ops.cnt() == 0);
  tx->m_trans = trans;
  tx_open.push_back(tx);
  return tx;
}

NdbImportImpl::Tx*
NdbImportImpl::DbWorker::start_trans(uint nodeid, uint instanceid)
{
  log_debug(2, "start_trans");
  TxList& tx_free = m_tx_free;
  TxList& tx_open = m_tx_open;
  require(m_ndb != 0);
  NdbTransaction* trans = m_ndb->startTransaction(nodeid, instanceid);
  if (trans == 0)
  {
    return 0;
  }
  Tx* tx = tx_free.pop_front();
  if (tx == 0)
  {
    tx = new Tx(this);
  }
  require(tx != 0);
  require(tx->m_trans == 0);
  require(tx->m_ops.cnt() == 0);
  tx->m_trans = trans;
  tx_open.push_back(tx);
  return tx;
}

void
NdbImportImpl::DbWorker::close_trans(Tx* tx)
{
  log_debug(2, "close_trans");
  TxList& tx_free = m_tx_free;
  TxList& tx_open = m_tx_open;
  require(tx->m_trans != 0);
  m_ndb->closeTransaction(tx->m_trans);
  tx->m_trans = 0;
  while (tx->m_ops.cnt() != 0)
  {
    Op* op = tx->m_ops.pop_front();
    require(op != 0);
    require(op->m_row != 0);
    m_rows_free.push_back(op->m_row);
    op->m_row = 0;
    op->m_rowop = 0;
    op->m_opcnt = 0;
    op->m_opsize = 0;
    free_op(op);
  }
  tx_open.remove(tx);
  tx_free.push_back(tx);
}

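/*
 * Tx and Op objects are pooled per worker to avoid allocation on the hot
 * path.  Lifecycle sketch (error paths omitted; execute() happens in the
 * exec-op code, which is not part of this excerpt):
 *
 *   Tx* tx = start_trans();    // wraps Ndb::startTransaction(), reuses a Tx
 *   Op* op = alloc_op();       // from m_op_free, or new
 *   // ... define the operation, then tx->m_ops.push_back(op) ...
 *   close_trans(tx);           // closes the trans, recycles rows and ops
 */
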
// relay op team

NdbImportImpl::RelayOpTeam::RelayOpTeam(Job& job,
                                        uint workercnt) :
  DbTeam(job, "relay-op", workercnt)
{
}

NdbImportImpl::RelayOpTeam::~RelayOpTeam()
{
}

NdbImportImpl::Worker*
NdbImportImpl::RelayOpTeam::create_worker(uint n)
{
  RelayOpWorker* w = new RelayOpWorker(*this, n);
  return w;
}

void
NdbImportImpl::RelayOpTeam::do_init()
{
  log_debug(1, "do_init");
}

void
NdbImportImpl::RelayOpTeam::do_end()
{
  log_debug(1, "do_end");
  RowList& rows_in = *m_job.m_rows_relay;
  rows_in.lock();
  require(!rows_in.m_foe);
  rows_in.m_foe = true;
  rows_in.unlock();
  for (uint i = 0; i < m_impl.c_nodes.m_nodecnt; i++)
  {
    RowList& rows_out = *m_job.m_rows_exec[i];
    rows_out.lock();
    require(!rows_out.m_eof);
    rows_out.m_eof = true;
    rows_out.unlock();
  }
}

NdbImportImpl::RelayOpWorker::RelayOpWorker(Team& team, uint n) :
  DbWorker(team, n)
{
  m_relaystate = RelayState::State_null;
  m_xfrmalloc = 0;
  m_xfrmbuf = 0;
  m_xfrmbuflen = 0;
  for (uint i = 0; i < g_max_ndb_nodes; i++)
    m_rows_exec[i] = 0;
}

NdbImportImpl::RelayOpWorker::~RelayOpWorker()
{
  delete [] m_xfrmalloc;
  for (uint i = 0; i < g_max_ndb_nodes; i++)
    delete m_rows_exec[i];
}

void
NdbImportImpl::RelayOpWorker::do_init()
{
  log_debug(1, "do_init");
  create_ndb(1);
  uint len = (MAX_KEY_SIZE_IN_WORDS << 2) + sizeof(uint64);
  m_xfrmalloc = new uchar [len];
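  /*
   * Align the buffer to 8 bytes: sizeof(uint64) spare bytes were
   * allocated above, the pointer is rounded up to the next multiple of
   * 8, and the usable length shrinks by the bytes skipped.  E.g. if org
   * == 0x1005 then use == 0x1008 and m_xfrmbuflen == len - 3.
   */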
  // copied from Ndb::computeHash()
  UintPtr org = UintPtr(m_xfrmalloc);
  UintPtr use = (org + 7) & ~(UintPtr)7;
  m_xfrmbuf = (uchar*)use;
  m_xfrmbuflen = len - uint(use - org);
  uint nodecnt = m_impl.c_nodes.m_nodecnt;
  require(nodecnt != 0);
  for (uint i = 0; i < nodecnt; i++)
  {
    m_rows_exec[i] = new RowList;
  }
}

void
|
|
NdbImportImpl::RelayOpWorker::do_run()
|
|
{
|
|
log_debug(2, "do_run");
|
|
switch (m_relaystate) {
|
|
case RelayState::State_null:
|
|
m_relaystate = RelayState::State_receive;
|
|
break;
|
|
case RelayState::State_receive:
|
|
state_receive();
|
|
break;
|
|
case RelayState::State_define:
|
|
state_define();
|
|
break;
|
|
case RelayState::State_send:
|
|
state_send();
|
|
break;
|
|
case RelayState::State_eof:
|
|
m_state = WorkerState::State_stop;
|
|
break;
|
|
}
|
|
}
|
|
|
|
void
|
|
NdbImportImpl::RelayOpWorker::state_receive()
|
|
{
|
|
log_debug(2, "state_receive");
|
|
const Opt& opt = m_util.c_opt;
|
|
RowList& rows_in = *m_team.m_job.m_rows_relay;
|
|
rows_in.lock();
|
|
RowCtl ctl(opt.m_rowswait);
|
|
m_rows.push_back_from(rows_in, ctl);
|
|
bool eof = rows_in.m_eof;
|
|
rows_in.unlock();
|
|
if (m_rows.empty())
|
|
{
|
|
if (!eof)
|
|
{
|
|
m_idle = true;
|
|
return;
|
|
}
|
|
m_relaystate = RelayState::State_eof;
|
|
return;
|
|
}
|
|
m_relaystate = RelayState::State_define;
|
|
}
|
|
|
|
void
|
|
NdbImportImpl::RelayOpWorker::state_define()
|
|
{
|
|
log_debug(2, "state_define");
|
|
const Opt& opt = m_util.c_opt;
|
|
Row* row;
|
|
while ((row = m_rows.pop_front()) != 0)
|
|
{
|
|
const Nodes& c = m_impl.c_nodes;
|
|
const Table& table = m_util.get_table(row->m_tabid);
|
|
const bool no_hint = opt.m_no_hint;
|
|
uint nodeid = 0;
|
|
if (no_hint)
|
|
{
|
|
uint i = get_rand() % c.m_nodecnt;
|
|
nodeid = c.m_nodes[i].m_nodeid;
|
|
}
|
|
else
|
|
{
|
|
Uint32 hash;
|
|
m_ndb->computeHash(&hash, table.m_keyrec, (const char*)row->m_data,
|
|
m_xfrmbuf, m_xfrmbuflen);
|
|
uint fragid = (uint)table.m_tab->getPartitionId(hash);
|
|
nodeid = table.get_nodeid(fragid);
|
|
}
|
|
require(nodeid < g_max_nodes);
|
|
uint nodeindex = c.m_index[nodeid];
|
|
require(nodeindex < c.m_nodecnt);
|
|
// move locally to per-node rows
|
|
RowList& rows_exec = *m_rows_exec[nodeindex];
|
|
rows_exec.push_back(row);
|
|
}
|
|
m_relaystate = RelayState::State_send;
|
|
}
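
/*
 * Distribution sketch: with hinting enabled each row is routed to the
 * data node owning its partition,
 *
 *   hash --getPartitionId()--> fragid --get_nodeid()--> nodeid
 *
 * so the exec worker bound to that node can start the transaction
 * with a matching TC hint.  With the no_hint option a node is picked
 * at random, which still spreads load but gives up locality.
 */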

void
NdbImportImpl::RelayOpWorker::state_send()
{
  log_debug(2, "state_send");
  const Opt& opt = m_util.c_opt;
  uint nodecnt = m_impl.c_nodes.m_nodecnt;
  uint left = 0;
  for (uint i = 0; i < nodecnt; i++)
  {
    RowList& rows_exec = *m_rows_exec[i];
    RowList& rows_out = *m_team.m_job.m_rows_exec[i];
    if (rows_exec.cnt() != 0)
    {
      rows_out.lock();
      RowCtl ctl(opt.m_rowswait);
      rows_exec.pop_front_to(rows_out, ctl);
      rows_out.unlock();
      left += rows_exec.cnt();
    }
  }
  if (!left)
  {
    m_relaystate = RelayState::State_receive;
    return;
  }
  m_idle = true;
}

void
NdbImportImpl::RelayOpWorker::do_end()
{
  log_debug(1, "do_end");
  if (!has_error())
  {
    require(m_tx_open.cnt() == 0);
  }
  else if (m_tx_open.cnt() != 0)
  {
    require(m_tx_open.cnt() == 1);
    Tx* tx = m_tx_open.front();
    close_trans(tx);
  }
}

// print

const char*
NdbImportImpl::g_str_state(RelayState::State state)
{
  const char* str = 0;
  switch (state) {
  case RelayState::State_null:
    str = "null";
    break;
  case RelayState::State_receive:
    str = "receive";
    break;
  case RelayState::State_define:
    str = "define";
    break;
  case RelayState::State_send:
    str = "send";
    break;
  case RelayState::State_eof:
    str = "eof";
    break;
  }
  require(str != 0);
  return str;
}

void
NdbImportImpl::RelayOpWorker::str_state(char* str) const
{
  sprintf(str, "%s/%s",
          g_str_state(m_state), g_str_state(m_relaystate));
}

// exec op team

NdbImportImpl::ExecOpTeam::ExecOpTeam(Job& job,
                                      uint workercnt) :
  DbTeam(job, "exec-op", workercnt)
{
  uint nodecnt = m_impl.c_nodes.m_nodecnt;
  require(nodecnt != 0);
  require(workercnt % nodecnt == 0);
}

NdbImportImpl::ExecOpTeam::~ExecOpTeam()
{
}

NdbImportImpl::Worker*
NdbImportImpl::ExecOpTeam::create_worker(uint n)
{
  ExecOpWorker* w = 0;
  const Opt& opt = m_util.c_opt;
  if (opt.m_no_asynch)
    w = new ExecOpWorkerSynch(*this, n);
  else
    w = new ExecOpWorkerAsynch(*this, n);
  return w;
}

void
NdbImportImpl::ExecOpTeam::do_init()
{
  log_debug(1, "do_init");
}

void
NdbImportImpl::ExecOpTeam::do_end()
{
  log_debug(1, "do_end");
  for (uint i = 0; i < m_impl.c_nodes.m_nodecnt; i++)
  {
    RowList& rows_in = *m_job.m_rows_exec[i];
    rows_in.lock();
    require(!rows_in.m_foe);
    rows_in.m_foe = true;
    rows_in.unlock();
  }
}

NdbImportImpl::ExecOpWorker::ExecOpWorker(Team& team, uint n) :
  DbWorker(team, n)
{
  m_execstate = ExecState::State_null;
  m_nodeindex = Inval_uint;
  m_nodeid = Inval_uint;
  m_eof = false;
  m_opcnt = 0;
  m_opsize = 0;
}

NdbImportImpl::ExecOpWorker::~ExecOpWorker()
{
}

void
NdbImportImpl::ExecOpWorker::do_init()
{
  log_debug(1, "do_init");
  const Nodes& c = m_impl.c_nodes;
  require(c.m_nodecnt > 0);
  m_nodeindex = m_workerno % c.m_nodecnt;
  m_nodeid = c.m_nodes[m_nodeindex].m_nodeid;
  /*
   * Option opbatch limits number of received rows and
   * therefore number of async transactions. Each row
   * creates one transaction (this is unlikely to change).
   */
  const Opt& opt = m_util.c_opt;
  require(opt.m_opbatch != 0);
  m_rows.m_rowbatch = opt.m_opbatch;
  m_rows.m_rowbytes = opt.m_opbytes != 0 ? opt.m_opbytes : UINT_MAX;
  create_ndb(opt.m_opbatch);
}
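
/*
 * Batch sizing sketch: a batch closes when either limit trips,
 * roughly
 *
 *   full = rows >= opbatch || bytes >= (opbytes != 0 ? opbytes : UINT_MAX);
 *
 * and create_ndb(opt.m_opbatch) presumably sizes the Ndb object for
 * up to opbatch concurrent transactions, matching the one transaction
 * per row noted above.
 */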

void
NdbImportImpl::ExecOpWorker::do_run()
{
  log_debug(2, "do_run");
  switch (m_execstate) {
  case ExecState::State_null:
    m_execstate = ExecState::State_receive;
    break;
  case ExecState::State_receive:
    state_receive();
    break;
  case ExecState::State_define:
    state_define();
    break;
  case ExecState::State_prepare:
    state_prepare();
    break;
  case ExecState::State_send:
    state_send();
    break;
  case ExecState::State_poll:
    state_poll();
    break;
  case ExecState::State_eof:
    m_state = WorkerState::State_stop;
    break;
  default:
    require(false);
    break;
  }
}

/*
 * Receive rows until a batch is full or eof is seen. At the end
 * convert the rows into ops. The ops are assigned to transactions
 * in state_define().
 */
void
NdbImportImpl::ExecOpWorker::state_receive()
{
  log_debug(2, "state_receive");
  const Opt& opt = m_util.c_opt;
  RowList& rows_in = *m_team.m_job.m_rows_exec[m_nodeindex];
  rows_in.lock();
  RowCtl ctl(opt.m_rowswait);
  m_rows.push_back_from(rows_in, ctl);
  bool eof = rows_in.m_eof;
  rows_in.unlock();
  do
  {
    if (m_rows.full())
    {
      log_debug(2, "got full batch");
      break;
    }
    if (eof)
    {
      if (m_rows.cnt() != 0)
      {
        log_debug(2, "got partial last batch");
        break;
      }
      log_debug(2, "no more rows");
      m_execstate = ExecState::State_eof;
      return;
    }
    log_debug(2, "wait for more rows");
    m_idle = true;
    return;
  } while (0);
  // assign op to each row and move the row under the op
  require(m_ops.cnt() == 0);
  Row* row;
  while ((row = m_rows.pop_front()) != 0)
  {
    Op* op = alloc_op();
    op->m_row = row;
    m_ops.push_back(op);
  }
  m_execstate = ExecState::State_define;
}

void
NdbImportImpl::ExecOpWorker::reject_row(Row* row, const Error& error)
{
  const Opt& opt = m_util.c_opt;
  RowList& rows_reject = *m_team.m_job.m_rows_reject;
  rows_reject.lock();
  // write reject row first
  const Table& reject_table = m_util.c_reject_table;
  Row* rejectrow = m_util.alloc_row(reject_table);
  rejectrow->m_rowid = row->m_rowid;
  rejectrow->m_linenr = row->m_linenr;
  rejectrow->m_startpos = row->m_startpos;
  rejectrow->m_endpos = row->m_endpos;
  const char* reject = "<row data not yet available>";
  uint32 rejectlen = strlen(reject);
  m_util.set_reject_row(rejectrow, m_team.m_job.m_runno, error, reject, rejectlen);
  require(rows_reject.push_back(rejectrow));
  // error if rejects exceeded
  if (rows_reject.totcnt() > opt.m_rejects)
  {
    // set team level error
    m_util.set_error_data(m_error, __LINE__, 0,
                          "reject limit %u exceeded", opt.m_rejects);
  }
  rows_reject.unlock();
}
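
/*
 * Reject accounting sketch: totcnt() counts rejects over the whole
 * job, not just rows currently queued, so with the default rejects
 * limit of 0 the first bad row already pushes totcnt() past the
 * limit and the team level error stops the job.
 */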

// synch

NdbImportImpl::ExecOpWorkerSynch::ExecOpWorkerSynch(Team& team, uint n) :
  ExecOpWorker(team, n)
{
}

NdbImportImpl::ExecOpWorkerSynch::~ExecOpWorkerSynch()
{
}

void
NdbImportImpl::ExecOpWorkerSynch::do_end()
{
  log_debug(1, "do_end/synch");
  if (!has_error())
  {
    require(m_tx_open.cnt() == 0);
  }
  else if (m_tx_open.cnt() != 0)
  {
    require(m_tx_open.cnt() == 1);
    Tx* tx = m_tx_open.front();
    close_trans(tx);
  }
}

void
NdbImportImpl::ExecOpWorkerSynch::state_define()
{
  log_debug(2, "state_define/synch");
  TxList& tx_open = m_tx_open;
  // single trans
  require(tx_open.cnt() == 0);
  Tx* tx = start_trans();
  if (tx == 0)
  {
    const NdbError& ndberror = m_ndb->getNdbError();
    require(ndberror.code != 0);
    // synch does not handle temporary errors yet
    m_util.set_error_ndb(m_error, __LINE__, ndberror);
    return;
  }
  NdbTransaction* trans = tx->m_trans;
  require(trans != 0);
  while (m_ops.cnt() != 0)
  {
    Op* op = m_ops.pop_front();
    Row* row = op->m_row;
    require(row != 0);
    const Table& table = m_util.get_table(row->m_tabid);
    const NdbOperation* rowop = 0;
    const char* rowdata = (const char*)row->m_data;
    if ((rowop = trans->insertTuple(table.m_rec, rowdata)) == 0)
    {
      m_util.set_error_ndb(m_error, __LINE__, trans->getNdbError());
      break;
    }
    for (uint j = 0; j < table.m_blobids.size(); j++)
    {
      uint i = table.m_blobids[j];
      require(i < table.m_attrs.size());
      const Attr& attr = table.m_attrs[i];
      require(attr.m_isblob);
      NdbBlob* bh = 0;
      if ((bh = rowop->getBlobHandle(i)) == 0)
      {
        m_util.set_error_ndb(m_error, __LINE__, rowop->getNdbError());
        break;
      }
      Blob* blob = row->m_blobs[attr.m_blobno];
      if (!attr.get_null(row))
      {
        if (bh->setValue(blob->m_data, blob->m_blobsize) == -1)
        {
          m_util.set_error_ndb(m_error, __LINE__, bh->getNdbError());
          break;
        }
      }
      else
      {
        if (bh->setValue((void*)0, 0) == -1)
        {
          m_util.set_error_ndb(m_error, __LINE__, bh->getNdbError());
          break;
        }
      }
    }
    op->m_rowop = rowop;
    tx->m_ops.push_back(op);
  }
  m_execstate = ExecState::State_prepare;
}

void
NdbImportImpl::ExecOpWorkerSynch::state_prepare()
{
  log_debug(2, "state_prepare/synch");
  // nothing to do
  m_execstate = ExecState::State_send;
}

void
NdbImportImpl::ExecOpWorkerSynch::state_send()
{
  log_debug(2, "state_send/synch");
  TxList& tx_open = m_tx_open;
  require(tx_open.cnt() == 1);
  Tx* tx = tx_open.front();
  require(tx != 0);
  NdbTransaction* trans = tx->m_trans;
  require(trans != 0);
  const NdbTransaction::ExecType et = NdbTransaction::Commit;
  if (trans->execute(et) == -1)
  {
    m_util.set_error_ndb(m_error, __LINE__, trans->getNdbError());
  }
  close_trans(tx);
  m_execstate = ExecState::State_poll;
}

void
NdbImportImpl::ExecOpWorkerSynch::state_poll()
{
  log_debug(2, "state_poll/synch");
  // nothing to poll
  m_opcnt = 0;
  m_opsize = 0;
  m_util.free_rows(m_rows_free);
  m_execstate = ExecState::State_receive;
}
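
/*
 * Synch vs asynch: the synch worker above puts the whole batch into
 * one NdbTransaction and blocks in execute(Commit), so State_send
 * does the real work and State_poll only recycles rows.  The asynch
 * worker below opens one transaction per row and overlaps them with
 * executeAsynchPrepare() + sendPreparedTransactions() + pollNdb().
 */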

// asynch

NdbImportImpl::ExecOpWorkerAsynch::ExecOpWorkerAsynch(Team& team, uint n) :
  ExecOpWorker(team, n)
{
}

NdbImportImpl::ExecOpWorkerAsynch::~ExecOpWorkerAsynch()
{
}

void
NdbImportImpl::ExecOpWorkerAsynch::do_end()
{
  log_debug(1, "do_end/asynch");
  // currently only way for "graceful" stop is fatal error
  if (!has_error())
  {
    require(m_execstate == ExecState::State_eof);
    require(m_tx_open.cnt() == 0);
  }
  else if (m_execstate == ExecState::State_prepare)
  {
    // error in State_define, simply close the txs
    while (m_tx_open.cnt() != 0)
    {
      Tx* tx = m_tx_open.front();
      close_trans(tx);
    }
  }
  else
  {
    // currently trans cannot be closed after executeAsynchPrepare
    if (m_execstate == ExecState::State_send)
    {
      log_debug(1, "send remaining transes");
      state_send();
    }
    while (m_execstate == ExecState::State_poll)
    {
      log_debug(1, "poll remaining transes");
      state_poll();
    }
  }
}

static void
asynch_callback(int result, NdbTransaction* trans, void* tx_void)
{
  NdbImportImpl::Tx* tx = (NdbImportImpl::Tx*)tx_void;
  require(trans == tx->m_trans);
  NdbImportImpl::ExecOpWorkerAsynch* w =
    (NdbImportImpl::ExecOpWorkerAsynch*)(tx->m_worker);
  w->asynch_callback(tx);
}
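
/*
 * Trampoline sketch: the asynch NDB API takes a plain C callback, so
 * the static function above only recovers the Tx from the void* and
 * forwards to the owning worker's member function; tx->m_worker is
 * presumably set by the Tx constructor, keeping the callback itself
 * stateless.
 */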

void
NdbImportImpl::ExecOpWorkerAsynch::asynch_callback(Tx* tx)
{
  NdbTransaction* trans = tx->m_trans;
  const NdbError& ndberror = trans->getNdbError();
  if (ndberror.status == NdbError::Success)
  {
    Op* op = tx->m_ops.front();
    while (op != 0)
    {
      Row* row = op->m_row;
      m_rowmap_out.add(row, false);
      op = op->next();
    }
  }
  else if (ndberror.status == NdbError::TemporaryError)
  {
    m_errormap.add_one(ndberror.code);
    /*
     * Move rows back to input for processing by new txs.
     * Check for too many temp errors later in state_poll().
     */
    RowList& rows_in = *m_team.m_job.m_rows_exec[m_nodeindex];
    rows_in.lock();
    while (tx->m_ops.cnt() != 0)
    {
      Op* op = tx->m_ops.pop_front();
      Row* row = op->m_row;
      require(row != 0);
      log_debug(1, "push back to input: rowid " << row->m_rowid);
      rows_in.push_back_force(row);
    }
    rows_in.unlock();
  }
  else if (ndberror.status == NdbError::PermanentError &&
           ndberror.classification == NdbError::ConstraintViolation)
  {
    Error error; // local error
    m_util.set_error_ndb(error, __LINE__, ndberror,
                         "permanent error");
    while (tx->m_ops.cnt() != 0)
    {
      Op* op = tx->m_ops.pop_front();
      require(op != 0);
      require(op->m_row != 0);
      reject_row(op->m_row, error);
      m_rows_free.push_back(op->m_row);
      op->m_row = NULL;
      free_op(op);
    }
  }
  else
  {
    m_util.set_error_ndb(m_error, __LINE__, ndberror);
  }
  close_trans(tx);
}
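
/*
 * Callback triage, as implemented above:
 *   Success             -> rows recorded in m_rowmap_out
 *   TemporaryError      -> rows pushed back to input and retried by
 *                          fresh transactions; the limit is checked
 *                          later in state_poll()
 *   ConstraintViolation -> rows written to the reject file
 *   anything else       -> worker level error, the job stops
 */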

void
NdbImportImpl::ExecOpWorkerAsynch::state_define()
{
  log_debug(2, "state_define/asynch");
  const Opt& opt = m_util.c_opt;
  TxList& tx_open = m_tx_open;
  // no transes yet
  require(tx_open.cnt() == 0);
  m_errormap.clear();
  /*
   * Temporary errors can occur at auto-incr and start trans. We
   * don't want to get stuck here on "permanent" temporary errors.
   * So we limit them by opt.m_temperrors (counted per op).
   */
  while (m_ops.cnt() != 0)
  {
    Op* op = m_ops.pop_front();
    Row* row = op->m_row;
    require(row != 0);
    const Table& table = m_util.get_table(row->m_tabid);
    if (table.m_has_hidden_pk)
    {
      // the hidden pk is the last attr; fill it with the next auto-inc value
      const Attrs& attrs = table.m_attrs;
      const uint attrcnt = attrs.size();
      const Attr& attr = attrs[attrcnt - 1];
      require(attr.m_type == NdbDictionary::Column::Bigunsigned);
      Uint64 val;
      if (m_ndb->getAutoIncrementValue(table.m_tab, val,
                                       opt.m_ai_prefetch_sz,
                                       opt.m_ai_increment,
                                       opt.m_ai_offset) == -1)
      {
        const NdbError& ndberror = m_ndb->getNdbError();
        require(ndberror.code != 0);
        if (ndberror.status == NdbError::TemporaryError)
        {
          m_errormap.add_one(ndberror.code);
          uint temperrors = m_errormap.get_sum();
          if (temperrors <= opt.m_temperrors)
          {
            log_debug(1, "get autoincr try " << temperrors << ": " << ndberror);
            m_ops.push_front(op);
            NdbSleep_MilliSleep(opt.m_tempdelay);
            continue;
          }
          m_util.set_error_gen(m_error, __LINE__,
                               "number of transaction tries with"
                               " temporary errors is %u (limit %u)",
                               temperrors, opt.m_temperrors);
          break;
        }
        else
        {
          m_util.set_error_ndb(m_error, __LINE__, ndberror,
                               "table %s: get autoincrement failed",
                               table.m_tab->getName());
          break;
        }
      }
      attr.set_value(row, &val, 8);
    }
    const bool no_hint = opt.m_no_hint;
    Tx* tx = 0;
    if (no_hint)
      tx = start_trans();
    else
      tx = start_trans(m_nodeid, 0);
    if (tx == 0)
    {
      const NdbError& ndberror = m_ndb->getNdbError();
      require(ndberror.code != 0);
      if (ndberror.status == NdbError::TemporaryError)
      {
        m_errormap.add_one(ndberror.code);
        uint temperrors = m_errormap.get_sum();
        if (temperrors <= opt.m_temperrors)
        {
          log_debug(1, "start trans try " << temperrors << ": " << ndberror);
          m_ops.push_front(op);
          NdbSleep_MilliSleep(opt.m_tempdelay);
          continue;
        }
        m_util.set_error_gen(m_error, __LINE__,
                             "number of transaction tries with"
                             " temporary errors is %u (limit %u)",
                             temperrors, opt.m_temperrors);
        break;
      }
      else
      {
        m_util.set_error_ndb(m_error, __LINE__, ndberror,
                             "table %s: start transaction failed",
                             table.m_tab->getName());
        break;
      }
    }
    NdbTransaction* trans = tx->m_trans;
    require(trans != 0);
    const NdbOperation* rowop = 0;
    const char* rowdata = (const char*)row->m_data;
    if ((rowop = trans->insertTuple(table.m_rec, rowdata)) == 0)
    {
      m_util.set_error_ndb(m_error, __LINE__, trans->getNdbError());
      break;
    }
    for (uint j = 0; j < table.m_blobids.size(); j++)
    {
      uint i = table.m_blobids[j];
      require(i < table.m_attrs.size());
      const Attr& attr = table.m_attrs[i];
      require(attr.m_isblob);
      NdbBlob* bh = 0;
      if ((bh = rowop->getBlobHandle(i)) == 0)
      {
        m_util.set_error_ndb(m_error, __LINE__, rowop->getNdbError());
        break;
      }
      Blob* blob = row->m_blobs[attr.m_blobno];
      if (!attr.get_null(row))
      {
        if (bh->setValue(blob->m_data, blob->m_blobsize) == -1)
        {
          m_util.set_error_ndb(m_error, __LINE__, bh->getNdbError());
          break;
        }
      }
      else
      {
        if (bh->setValue((void*)0, 0) == -1)
        {
          m_util.set_error_ndb(m_error, __LINE__, bh->getNdbError());
          break;
        }
      }
      bool batch = false;
      if (bh->preExecute(NdbTransaction::Commit, batch) == -1)
      {
        m_util.set_error_ndb(m_error, __LINE__, bh->getNdbError());
        break;
      }
    }
    op->m_rowop = rowop;
    tx->m_ops.push_back(op);
  }
  m_execstate = ExecState::State_prepare;
}
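
/*
 * Retry sketch: both getAutoIncrementValue() and start_trans() above
 * may hit temporary errors; each retry re-queues the op with
 * push_front() and sleeps opt.m_tempdelay ms, and the sum over
 * m_errormap is capped by opt.m_temperrors so a permanently degraded
 * cluster cannot spin this loop forever.
 */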

void
NdbImportImpl::ExecOpWorkerAsynch::state_prepare()
{
  Tx* tx = m_tx_open.front();
  while (tx != 0)
  {
    const NdbTransaction::ExecType et = NdbTransaction::Commit;
    NdbTransaction* trans = tx->m_trans;
    require(trans != 0);
    trans->executeAsynchPrepare(et, &::asynch_callback, (void*)tx);
    tx = tx->next();
  }
  m_execstate = ExecState::State_send;
}

void
NdbImportImpl::ExecOpWorkerAsynch::state_send()
{
  log_debug(2, "state_send/asynch");
  require(m_tx_open.cnt() != 0);
  int forceSend = 0;
  m_ndb->sendPreparedTransactions(forceSend);
  m_execstate = ExecState::State_poll;
}
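
/*
 * forceSend = 0 leaves the flush decision to the NDB API's adaptive
 * send mechanism, which may coalesce signals from several threads
 * into one wire send; pollNdb() in state_poll() then waits for up to
 * m_tx_open.cnt() transactions to complete.
 */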

void
NdbImportImpl::ExecOpWorkerAsynch::state_poll()
{
  log_debug(2, "state_poll/asynch");
  const Opt& opt = m_util.c_opt;
  int timeout = opt.m_polltimeout;
  require(m_tx_open.cnt() != 0);
  m_ndb->pollNdb(timeout, m_tx_open.cnt());
  if (m_tx_open.cnt() != 0)
  {
    log_debug(2, "poll not ready");
    return;
  }
  log_debug(2, "poll ready");
  m_opcnt = 0;
  m_opsize = 0;
  if (m_errormap.size() != 0)
  {
    Job& job = m_team.m_job;
    job.lock();
    job.m_errormap.add_one(m_errormap);
    uint temperrors = job.m_errormap.get_sum();
    job.unlock();
    if (temperrors <= opt.m_temperrors)
    {
      log_debug(1, "temp errors: sleep " << opt.m_tempdelay << "ms");
      NdbSleep_MilliSleep(opt.m_tempdelay);
    }
    else
    {
      if (!m_util.has_error(m_error))
        m_util.set_error_gen(m_error, __LINE__,
                             "number of db execution batches with"
                             " temporary errors is %u (limit %u)",
                             temperrors, opt.m_temperrors);
    }
  }
  log_debug(1, "rowmap " << m_rowmap_out.size());
  m_stat_rowmap->add(m_rowmap_out.size());
  m_util.free_rows(m_rows_free);
  m_execstate = ExecState::State_receive;
}

// print

const char*
NdbImportImpl::g_str_state(ExecState::State state)
{
  const char* str = 0;
  switch (state) {
  case ExecState::State_null:
    str = "null";
    break;
  case ExecState::State_receive:
    str = "receive";
    break;
  case ExecState::State_define:
    str = "define";
    break;
  case ExecState::State_prepare:
    str = "prepare";
    break;
  case ExecState::State_send:
    str = "send";
    break;
  case ExecState::State_poll:
    str = "wait";
    break;
  case ExecState::State_eof:
    str = "eof";
    break;
  }
  require(str != 0);
  return str;
}

void
NdbImportImpl::ExecOpWorker::str_state(char* str) const
{
  sprintf(str, "%s/%s tx:free=%u,open=%u",
          g_str_state(m_state), g_str_state(m_execstate),
          m_tx_free.cnt(), m_tx_open.cnt());
}

// diag team

NdbImportImpl::DiagTeam::DiagTeam(Job& job,
                                  uint workercnt) :
  Team(job, "diag", workercnt),
  // diag file errors are global
  m_result_file(m_util, m_error),
  m_reject_file(m_util, m_error),
  m_rowmap_file(m_util, m_error),
  m_stopt_file(m_util, m_error),
  m_stats_file(m_util, m_error)
{
  m_is_diag = true;
}

NdbImportImpl::DiagTeam::~DiagTeam()
{
}

NdbImportImpl::Worker*
NdbImportImpl::DiagTeam::create_worker(uint n)
{
  DiagWorker* w = new DiagWorker(*this, n);
  return w;
}

void
NdbImportImpl::DiagTeam::do_init()
{
  log_debug(1, "do_init");
  const Opt& opt = m_util.c_opt;
  if (opt.m_resume)
  {
    read_old_diags();
    if (has_error())
      return;
  }
  open_new_diags();
}

void
NdbImportImpl::DiagTeam::read_old_diags(const char* name,
                                        const char* path,
                                        const Table& table,
                                        RowList& rows_out)
{
  log_debug(1, "read_old_diags: " << name << " path=" << path);
  OptGuard optGuard(m_util);
  Opt& opt = m_util.c_opt;
  opt.m_ignore_lines = 1;
  // use default MySQL spec for diags (set by OptCsv ctor)
  OptCsv optcsv;
  CsvSpec csvspec;
  if (m_impl.m_csv.set_spec(csvspec, optcsv, OptCsv::ModeInput) == -1)
  {
    m_util.copy_error(m_error, m_util.c_error);
    require(has_error());
    return;
  }
  File file(m_util, m_error);
  file.set_path(path);
  if (file.do_open(File::Read_flags) == -1)
  {
    require(has_error());
    m_job.m_fatal = true;
    return;
  }
  // csv input requires at least 2 instances
  Buf* buf[2];
  CsvInput* csvinput[2];
  RowList rows_reject;
  RowMap rowmap_in[] = {m_util, m_util};
  for (uint i = 0; i < 2; i++)
  {
    uint pagesize = opt.m_pagesize;
    uint pagecnt = opt.m_pagecnt;
    buf[i] = new Buf(true);
    buf[i]->alloc(pagesize, 2 * pagecnt);
    csvinput[i] = new CsvInput(m_impl.m_csv,
                               Name(name, i),
                               csvspec,
                               table,
                               *buf[i],
                               rows_out,
                               rows_reject,
                               rowmap_in[i],
                               m_job.m_stats);
    csvinput[i]->do_init();
  }
  {
    uint i = 0; // current index
    uint n = 0; // number of buffer switches
    while (1)
    {
      uint j = 1 - i;
      CsvInput& csvinput1 = *csvinput[i];
      Buf& b1 = *buf[i];
      Buf& b2 = *buf[j];
      b1.reset();
      if (file.do_read(b1) == -1)
      {
        require(has_error());
        break;
      }
      // if not first read, move tail from previous
      if (n != 0)
      {
        require(b2.movetail(b1) == 0);
      }
      csvinput1.do_parse();
      if (csvinput1.has_error())
      {
        m_util.copy_error(m_error, csvinput1.m_error);
        require(has_error());
        break;
      }
      csvinput1.do_eval();
      if (csvinput1.has_error())
      {
        m_util.copy_error(m_error, csvinput1.m_error);
        require(has_error());
        break;
      }
      uint curr = 0;
      uint left = 0;
      csvinput1.do_send(curr, left);
      require(!csvinput1.has_error());
      require(left == 0);
      if (b1.m_eof)
        break;
      i = j;
      n++;
    }
    log_debug(1, "read_old_diags: " << name << " count=" << rows_out.cnt());
  }
  // XXX diag errors not yet handled
  require(rows_reject.cnt() == 0);
}
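
/*
 * Double-buffer sketch: a read may end in the middle of a CSV line,
 * so two Buf/CsvInput instances alternate and movetail() carries the
 * unparsed tail of the old buffer into the new one before the next
 * do_read(); every parse therefore sees whole lines only.
 */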

void
NdbImportImpl::DiagTeam::read_old_diags()
{
  log_debug(1, "read_old_diags");
  const Opt& opt = m_util.c_opt;
  Job& job = m_job;
  // result
  {
    const char* path = opt.m_result_file;
    const Table& table = m_util.c_result_table;
    RowList rows;
    read_old_diags("old-result", path, table, rows);
    if (has_error())
      return;
    uint32 runno = Inval_uint32;
    Row* row = 0;
    while ((row = rows.pop_front()) != 0)
    {
      // runno
      {
        const Attr& attr = table.get_attr("runno");
        uint32 x;
        attr.get_value(row, x);
        if (runno == Inval_uint32 || runno < x)
          runno = x;
      }
      m_util.free_row(row);
    }
    if (runno == Inval_uint32)
    {
      m_util.set_error_gen(m_error, __LINE__,
                           "%s: no valid records found", path);
      return;
    }
    job.m_runno = runno + 1;
  }
  // rowmap
  {
    const char* path = opt.m_rowmap_file;
    const Table& table = m_util.c_rowmap_table;
    RowList rows;
    read_old_diags("old-rowmap", path, table, rows);
    if (has_error())
      return;
    RowMap& rowmap_in = job.m_rowmap_in;
    require(rowmap_in.empty());
    Row* row = 0;
    while ((row = rows.pop_front()) != 0)
    {
      Range range;
      // runno
      {
        const Attr& attr = table.get_attr("runno");
        uint32 runno;
        attr.get_value(row, runno);
        if (runno != job.m_runno - 1)
        {
          m_util.free_row(row);
          continue;
        }
      }
      // start
      {
        const Attr& attr = table.get_attr("start");
        attr.get_value(row, range.m_start);
      }
      // end
      {
        const Attr& attr = table.get_attr("end");
        attr.get_value(row, range.m_end);
      }
      // startpos
      {
        const Attr& attr = table.get_attr("startpos");
        attr.get_value(row, range.m_startpos);
      }
      // endpos
      {
        const Attr& attr = table.get_attr("endpos");
        attr.get_value(row, range.m_endpos);
      }
      // reject
      {
        const Attr& attr = table.get_attr("reject");
        attr.get_value(row, range.m_reject);
      }
      m_util.free_row(row);
      // add to old rowmap
      rowmap_in.add(range);
    }
    if (rowmap_in.empty())
    {
      m_util.set_error_gen(m_error, __LINE__,
                           "%s: no records for run %u found",
                           path, job.m_runno - 1);
      return;
    }
    log_debug(1, "old rowmap:" << rowmap_in);
  }
  // old counts
  {
    const char* path = opt.m_stats_file;
    const Table& table = m_util.c_stats_table;
    RowList rows;
    read_old_diags("old-stats", path, table, rows);
    if (has_error())
      return;
    uint64 old_rows = 0;
    uint64 old_reject = 0;
    uint64 old_runtime = 0;
    Row* row = 0;
    while ((row = rows.pop_front()) != 0)
    {
      char name[200];
      {
        const Attr& attr = table.get_attr("name");
        attr.get_value(row, name, sizeof(name));
      }
      // rows
      if (strcmp(name, "job-rows") == 0)
      {
        const Attr& attr = table.get_attr("sum");
        uint64 value;
        attr.get_value(row, value);
        old_rows += value;
      }
      // reject
      if (strcmp(name, "job-reject") == 0)
      {
        const Attr& attr = table.get_attr("sum");
        uint64 value;
        attr.get_value(row, value);
        old_reject += value;
      }
      // runtime
      if (strcmp(name, "job-runtime") == 0)
      {
        const Attr& attr = table.get_attr("sum");
        uint64 value;
        attr.get_value(row, value);
        old_runtime += value;
      }
      m_util.free_row(row);
    }
    job.m_old_rows = old_rows;
    job.m_old_reject = old_reject;
    job.m_old_runtime = old_runtime;
    log_debug(1, "old stats:" <<
              " rows=" << old_rows <<
              " reject=" << old_reject <<
              " runtime=" << old_runtime);
  }
}
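
/*
 * Resume sketch: a resumed job gets runno = previous runno + 1, the
 * previous run's rowmap (which byte/row ranges were already handled
 * and how many rejects they held), and the previous rows, reject and
 * runtime totals, so the new run's counters continue where the old
 * run stopped.
 */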

void
NdbImportImpl::DiagTeam::open_new_diags()
{
  log_debug(1, "open_new_diags");
  const Opt& opt = m_util.c_opt;
  int openflags = 0;
  if (!opt.m_resume)
    openflags = File::Write_flags;
  else
    openflags = File::Append_flags;
  // use default MySQL spec for diags (set by OptCsv ctor)
  OptCsv optcsv;
  if (m_impl.m_csv.set_spec(m_csvspec, optcsv, OptCsv::ModeOutput) == -1)
  {
    m_util.copy_error(m_error, m_util.c_error);
    require(has_error());
    return;
  }
  // result
  m_result_file.set_path(opt.m_result_file);
  if (m_result_file.do_open(openflags) == -1)
  {
    require(has_error());
    m_job.m_fatal = true;
    return;
  }
  log_debug(1, "file: opened: " << m_result_file.get_path());
  // reject
  m_reject_file.set_path(opt.m_reject_file);
  if (m_reject_file.do_open(openflags) == -1)
  {
    require(has_error());
    m_job.m_fatal = true;
    return;
  }
  log_debug(1, "file: opened: " << m_reject_file.get_path());
  // rowmap
  m_rowmap_file.set_path(opt.m_rowmap_file);
  if (m_rowmap_file.do_open(openflags) == -1)
  {
    require(has_error());
    m_job.m_fatal = true;
    return;
  }
  log_debug(1, "file: opened: " << m_rowmap_file.get_path());
  // stats opt
  if (opt.m_stats)
  {
    m_stopt_file.set_path(opt.m_stopt_file);
    if (m_stopt_file.do_open(openflags) == -1)
    {
      require(has_error());
      m_job.m_fatal = true;
      return;
    }
    log_debug(1, "file: opened: " << m_stopt_file.get_path());
  }
  // stats
  if (opt.m_stats)
  {
    m_stats_file.set_path(opt.m_stats_file);
    if (m_stats_file.do_open(openflags) == -1)
    {
      require(has_error());
      m_job.m_fatal = true;
      return;
    }
    log_debug(1, "file: opened: " << m_stats_file.get_path());
  }
}

void
NdbImportImpl::DiagTeam::do_end()
{
  log_debug(1, "do_end");
  const Opt& opt = m_util.c_opt;
  if (m_result_file.do_close() == -1)
  {
    require(has_error());
    // continue
  }
  if (m_reject_file.do_close() == -1)
  {
    require(has_error());
    // continue
  }
  if (m_rowmap_file.do_close() == -1)
  {
    require(has_error());
    // continue
  }
  if (opt.m_stats)
  {
    if (m_stopt_file.do_close() == -1)
    {
      require(has_error());
      // continue
    }
  }
  if (opt.m_stats)
  {
    if (m_stats_file.do_close() == -1)
    {
      require(has_error());
      // continue
    }
  }
}

NdbImportImpl::DiagWorker::DiagWorker(Team& team, uint n) :
  Worker(team, n),
  m_result_csv(NULL),
  m_reject_csv(NULL),
  m_rowmap_csv(NULL),
  m_stopt_csv(NULL),
  m_stats_csv(NULL)
{}

NdbImportImpl::DiagWorker::~DiagWorker()
{
  delete m_result_csv;
  delete m_reject_csv;
  delete m_rowmap_csv;
  delete m_stopt_csv;
  delete m_stats_csv;
}

void
NdbImportImpl::DiagWorker::do_init()
{
  log_debug(1, "do_init");
  const Opt& opt = m_util.c_opt;
  const CsvSpec& csvspec = static_cast<DiagTeam&>(m_team).m_csvspec;
  if (has_error())
    return;
  // result
  {
    File& file = static_cast<DiagTeam&>(m_team).m_result_file;
    Buf& buf = m_result_buf;
    const Table& table = m_util.c_result_table;
    uint pagesize = opt.m_pagesize;
    uint pagecnt = opt.m_pagecnt;
    buf.alloc(pagesize, pagecnt);
    m_result_csv = new CsvOutput(m_impl.m_csv,
                                 csvspec,
                                 table,
                                 m_result_buf);
    m_result_csv->do_init();
    if (!opt.m_resume)
    {
      m_result_csv->add_header();
      if (file.do_write(buf) == -1)
      {
        require(has_error());
        m_team.m_job.m_fatal = true;
        return;
      }
    }
  }
  // reject
  {
    File& file = static_cast<DiagTeam&>(m_team).m_reject_file;
    Buf& buf = m_reject_buf;
    const Table& table = m_util.c_reject_table;
    uint pagesize = opt.m_pagesize;
    uint pagecnt = opt.m_pagecnt;
    buf.alloc(pagesize, pagecnt);
    m_reject_csv = new CsvOutput(m_impl.m_csv,
                                 csvspec,
                                 table,
                                 m_reject_buf);
    m_reject_csv->do_init();
    if (!opt.m_resume)
    {
      m_reject_csv->add_header();
      if (file.do_write(buf) == -1)
      {
        require(has_error());
        m_team.m_job.m_fatal = true;
        return;
      }
    }
  }
  // rowmap
  {
    File& file = static_cast<DiagTeam&>(m_team).m_rowmap_file;
    Buf& buf = m_rowmap_buf;
    const Table& table = m_util.c_rowmap_table;
    uint pagesize = opt.m_pagesize;
    uint pagecnt = opt.m_pagecnt;
    buf.alloc(pagesize, pagecnt);
    m_rowmap_csv = new CsvOutput(m_impl.m_csv,
                                 csvspec,
                                 table,
                                 m_rowmap_buf);
    m_rowmap_csv->do_init();
    if (!opt.m_resume)
    {
      m_rowmap_csv->add_header();
      if (file.do_write(buf) == -1)
      {
        require(has_error());
        m_team.m_job.m_fatal = true;
        return;
      }
    }
  }
  // stats opt
  if (opt.m_stats)
  {
    File& file = static_cast<DiagTeam&>(m_team).m_stopt_file;
    Buf& buf = m_stopt_buf;
    const Table& table = m_util.c_stopt_table;
    uint pagesize = opt.m_pagesize;
    uint pagecnt = opt.m_pagecnt;
    m_stopt_buf.alloc(pagesize, pagecnt);
    m_stopt_csv = new CsvOutput(m_impl.m_csv,
                                csvspec,
                                table,
                                m_stopt_buf);
    m_stopt_csv->do_init();
    if (!opt.m_resume)
    {
      m_stopt_csv->add_header();
      if (file.do_write(buf) == -1)
      {
        require(has_error());
        m_team.m_job.m_fatal = true;
        return;
      }
    }
  }
  // stats
  if (opt.m_stats)
  {
    File& file = static_cast<DiagTeam&>(m_team).m_stats_file;
    Buf& buf = m_stats_buf;
    const Table& table = m_util.c_stats_table;
    uint pagesize = opt.m_pagesize;
    uint pagecnt = opt.m_pagecnt;
    m_stats_buf.alloc(pagesize, pagecnt);
    m_stats_csv = new CsvOutput(m_impl.m_csv,
                                csvspec,
                                table,
                                m_stats_buf);
    m_stats_csv->do_init();
    if (!opt.m_resume)
    {
      m_stats_csv->add_header();
      if (file.do_write(buf) == -1)
      {
        require(has_error());
        m_team.m_job.m_fatal = true;
        return;
      }
    }
  }
}

void
NdbImportImpl::DiagWorker::do_run()
{
  log_debug(2, "do_run");
  // reject
  write_reject();
  // stop by request
  if (m_dostop)
  {
    log_debug(1, "stop by request");
    m_state = WorkerState::State_stop;
  }
}

void
NdbImportImpl::DiagWorker::do_end()
{
  const Opt& opt = m_util.c_opt;
  log_debug(1, "do_end");
  // result
  write_result();
  // rowmap
  write_rowmap();
  // stats opt
  if (opt.m_stats)
  {
    write_stopt();
  }
  // stats
  if (opt.m_stats)
  {
    write_stats();
  }
}

void
NdbImportImpl::DiagWorker::write_result()
{
  log_debug(1, "write_result");
  DiagTeam& team = static_cast<DiagTeam&>(m_team);
  const Job& job = team.m_job;
  File& file = team.m_result_file;
  Buf& buf = m_result_buf;
  buf.reset();
  const Table& table = m_util.c_result_table;
  // fatal global error, should not happen in job scope
  if (m_util.has_error())
  {
    Row* row = m_util.alloc_row(table);
    const char* name = "IMP";
    const char* desc = "";
    uint64 rows = 0;
    uint64 reject = 0;
    uint64 temperrors = 0;
    uint64 runtime = 0;
    uint64 utime = 0;
    const Error& error = m_util.c_error;
    m_util.set_result_row(row,
                          job.m_runno,
                          name,
                          desc,
                          rows,
                          reject,
                          temperrors,
                          runtime,
                          utime,
                          error);
    m_result_csv->add_line(row);
    m_util.free_row(row);
  }
  // job
  {
    Row* row = m_util.alloc_row(table);
    const Name name("job", job.m_jobno);
    const char* desc = "job";
    uint64 rows = job.m_stat_rows->m_max;
    uint64 reject = job.m_stat_reject->m_max;
    uint64 temperrors = job.m_errormap.get_sum();
    uint64 runtime = job.m_timer.elapsed_msec();
    uint64 utime = job.m_stat_utime->m_sum;
    const Error& error = job.m_error;
    m_util.set_result_row(row,
                          job.m_runno,
                          name,
                          desc,
                          rows,
                          reject,
                          temperrors,
                          runtime,
                          utime,
                          error);
    m_result_csv->add_line(row);
    m_util.free_row(row);
  }
  // teams
  for (uint teamno = 0; teamno < job.m_teamcnt; teamno++)
  {
    Row* row = m_util.alloc_row(table);
    const Team& team = *job.m_teams[teamno];
    const Name name("team", team.m_teamno);
    const Name desc(team.m_name);
    uint64 rows = 0;
    uint64 reject = 0;
    uint64 temperrors = 0;
    if (team.m_state != TeamState::State_stopped)
    {
      // not worth crashing
      team.m_timer.stop();
    }
    uint64 runtime = team.m_timer.elapsed_msec();
    uint64 utime = team.m_stat_utime->m_sum;
    const Error& error = team.m_error;
    m_util.set_result_row(row,
                          job.m_runno,
                          name,
                          desc,
                          rows,
                          reject,
                          temperrors,
                          runtime,
                          utime,
                          error);
    m_result_csv->add_line(row);
    m_util.free_row(row);
  }
  if (file.do_write(buf) == -1)
  {
    require(has_error());
    m_team.m_job.m_fatal = true;
  }
}

void
NdbImportImpl::DiagWorker::write_reject()
{
  log_debug(2, "write_reject");
  DiagTeam& team = static_cast<DiagTeam&>(m_team);
  Job& job = team.m_job;
  File& file = team.m_reject_file;
  Buf& buf = m_reject_buf;
  RowList& rows_reject = *job.m_rows_reject;
  rows_reject.lock();
  while (1)
  {
    Row* row = rows_reject.pop_front();
    require(!rows_reject.m_eof);
    if (row == 0)
    {
      m_idle = true;
      break;
    }
    // Csv does not know runno so fix it here
    {
      const Table& table = m_util.c_reject_table;
      const Attrs& attrs = table.m_attrs;
      const void* p = attrs[0].get_value(row);
      uint32 x = *(uint32*)p;
      if (x == Inval_uint32)
      {
        attrs[0].set_value(row, &job.m_runno, sizeof(uint32));
      }
      else
      {
        require(x == job.m_runno);
      }
    }
    buf.reset();
    m_reject_csv->add_line(row);
    if (file.do_write(buf) == -1)
    {
      require(has_error());
      m_team.m_job.m_fatal = true;
      m_util.free_row(row);
      return;
    }
    // add to job level rowmap
    job.m_rowmap_out.lock();
    job.m_rowmap_out.add(row, true);
    job.m_rowmap_out.unlock();
    m_util.free_row(row);
  }
  rows_reject.unlock();
}

void
NdbImportImpl::DiagWorker::write_rowmap()
{
  log_debug(1, "write_rowmap");
  DiagTeam& team = static_cast<DiagTeam&>(m_team);
  const Job& job = team.m_job;
  File& file = team.m_rowmap_file;
  Buf& buf = m_rowmap_buf;
  const Table& table = m_util.c_rowmap_table;
  const RowMap& rowmap = job.m_rowmap_out;
  const RangeList& ranges = rowmap.m_ranges;
  const Range* r = ranges.front();
  while (r != 0)
  {
    Row* row = m_util.alloc_row(table);
    const Range& range = *r;
    m_util.set_rowmap_row(row, job.m_runno, range);
    buf.reset();
    m_rowmap_csv->add_line(row);
    m_util.free_row(row);
    if (file.do_write(buf) == -1)
    {
      require(has_error());
      m_team.m_job.m_fatal = true;
      return;
    }
    r = r->next();
  }
}

void
NdbImportImpl::DiagWorker::write_stopt()
{
  const Opt& opt = m_util.c_opt;
  DiagTeam& team = static_cast<DiagTeam&>(m_team);
  const Job& job = team.m_job;
  File& file = team.m_stopt_file;
  Buf& buf = m_stopt_buf;
  const Table& table = m_util.c_stopt_table;
  // write performance related option values
  const struct ov_st {
    const char* m_option;
    uint m_value;
  } ov_list[] = {
    { "connections", opt.m_connections },
    { "input_workers", opt.m_input_workers },
    { "output_workers", opt.m_output_workers },
    { "db_workers", opt.m_db_workers },
    { "no_hint", (uint)opt.m_no_hint },
    { "pagesize", opt.m_pagesize },
    { "pagecnt", opt.m_pagecnt },
    { "pagebuffer", opt.m_pagebuffer },
    { "rowbatch", opt.m_rowbatch },
    { "rowbytes", opt.m_rowbytes },
    { "opbatch", opt.m_opbatch },
    { "opbytes", opt.m_opbytes },
    { "rowswait", opt.m_rowswait },
    { "idlespin", opt.m_idlespin },
    { "idlesleep", opt.m_idlesleep },
    { "checkloop", opt.m_checkloop },
    { "alloc_chunk", opt.m_alloc_chunk }
  };
  const uint ov_size = sizeof(ov_list) / sizeof(ov_list[0]);
  for (uint i = 0; i < ov_size; i++)
  {
    const struct ov_st& ov = ov_list[i];
    Row* row = m_util.alloc_row(table);
    m_util.set_stopt_row(row, job.m_runno, ov.m_option, ov.m_value);
    buf.reset();
    m_stopt_csv->add_line(row);
    m_util.free_row(row);
    if (file.do_write(buf) == -1)
    {
      require(has_error());
      m_team.m_job.m_fatal = true;
      return;
    }
  }
}

void
NdbImportImpl::DiagWorker::write_stats()
{
  DiagTeam& team = static_cast<DiagTeam&>(m_team);
  const Job& job = team.m_job;
  File& file = team.m_stats_file;
  Buf& buf = m_stats_buf;
  const Table& table = m_util.c_stats_table;
  // write job and global (accumulating) stats
  const Stats* stats_list[2] = { &job.m_stats, &m_util.c_stats };
  for (uint k = 0; k <= 1; k++)
  {
    const Stats& stats = *stats_list[k];
    const bool global = (k == 1);
    for (uint id = 0; id < stats.m_stats.size(); id++)
    {
      const Stat* stat = stats.get(id);
      buf.reset();
      Row* row = m_util.alloc_row(table);
      m_util.set_stats_row(row, job.m_runno, *stat, global);
      m_stats_csv->add_line(row);
      m_util.free_row(row);
      if (file.do_write(buf) == -1)
      {
        require(has_error());
        m_team.m_job.m_fatal = true;
        return;
      }
    }
  }
}

// global

NdbImportImpl::Jobs::Jobs()
{
  m_jobno = 0;
}

NdbImportImpl::Jobs::~Jobs()
{
  // XXX delete jobs
}

NdbImportImpl::Job*
NdbImportImpl::create_job()
{
  Jobs& jobs = c_jobs;
  // internal and external number from 1
  Job* job = new Job(*this, ++jobs.m_jobno);
  jobs.m_jobs.insert(std::pair<uint, Job*>(job->m_jobno, job));
  job->do_create();
  return job;
}

NdbImportImpl::Job*
NdbImportImpl::find_job(uint jobno)
{
  Job* job = 0;
  const Jobs& jobs = c_jobs;
  std::map<uint, Job*>::const_iterator it;
  it = jobs.m_jobs.find(jobno);
  if (it != jobs.m_jobs.end())
  {
    job = it->second;
    require(job->m_jobno == jobno);
  }
  return job;
}

extern "C" { static void* start_job_c(void* data); }

static void*
start_job_c(void* data)
{
  NdbImportImpl::Job* job = (NdbImportImpl::Job*)data;
  require(job != 0);
  job->do_start();
  return 0;
}

void
NdbImportImpl::start_job(Job* job)
{
  NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_MEAN;
  uint stack_size = 64*1024;
  job->m_thread = NdbThread_Create(
    start_job_c, (void**)job, stack_size, "job", prio);
  require(job->m_thread != 0);
}

void
NdbImportImpl::stop_job(Job* job)
{
  job->do_stop();
  log_debug(1, "done");
}

void
NdbImportImpl::wait_job(Job* job)
{
  const Opt& opt = m_util.c_opt;
  while (job->m_state != JobState::State_done)
  {
    log_debug(2, "wait for " << g_str_state(JobState::State_done));
    NdbSleep_MilliSleep(opt.m_checkloop);
  }
  log_debug(1, "done");
}

void
NdbImportImpl::destroy_job(Job* job)
{
  Jobs& jobs = c_jobs;
  require(job != 0);
  require(find_job(job->m_jobno) == job);
  require(jobs.m_jobs.erase(job->m_jobno) == 1);
  delete job;
}
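
/*
 * Facade usage sketch (hedged, mirrors the functions above): the
 * public NdbImport API is expected to drive a job roughly as
 *
 *   Job* job = impl.create_job();
 *   // ... configure the job ...
 *   impl.start_job(job);   // spawns the job thread
 *   impl.wait_job(job);    // polls until JobState::State_done
 *   impl.destroy_job(job);
 *
 * with stop_job() available from another thread to request an early
 * stop; wait_job() still observes the State_done transition.
 */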