/* Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved.
   This program is free software; you can redistribute it and/or modify it
   under the terms of the GNU General Public License, version 2.0, as
   published by the Free Software Foundation. This program is also distributed
   with certain software (including but not limited to OpenSSL) that is
   licensed under separate terms, as designated in a particular file or
   component or in included license documentation. The authors of MySQL hereby
   grant you an additional permission to link the program and your derivative
   works with the separately licensed software that they have included with
   MySQL. This program is distributed in the hope that it will be useful, but
   WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
   or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License,
   version 2.0, for more details. You should have received a copy of the GNU
   General Public License along with this program; if not, write to the Free
   Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   02110-1301 USA */

#include "NdbImportImpl.hpp"
// NOTE(review): the target of this #include was lost during text extraction
// (angle-bracket content stripped) -- restore the header name before building.
#include

// Implementation object behind the public NdbImport facade.
NdbImportImpl::NdbImportImpl(NdbImport& facade) :
  NdbImport(*this),
  m_facade(&facade),
  m_csv(m_util),
  m_error(m_util.c_error)
{
  c_connectionindex = 0;
  log_debug(1, "ctor");
}

NdbImportImpl::~NdbImportImpl()
{
  log_debug(1, "dtor");
}

NdbOut&
operator<<(NdbOut& out, const NdbImportImpl& impl)
{
  out << "impl ";
  return out;
}

// opt

// mgm

// Thin wrapper around a connection to the NDB management server (mgmd).
NdbImportImpl::Mgm::Mgm(NdbImportImpl& impl) :
  m_impl(impl),
  m_util(m_impl.m_util),
  m_error(m_util.c_error)
{
  m_handle = 0;
  m_connected = false;
  m_status = 0;
}

NdbImportImpl::Mgm::~Mgm()
{
  // dtor guarantees the handle and status struct are released
  do_disconnect();
}

NdbOut&
operator<<(NdbOut& out, const NdbImportImpl::Mgm& mgm)
{
  out << "mgm ";
  return out;
}

// Connect to the management server using the global connectstring and the
// standard retry options.  Returns 0 on success, -1 with m_error set.
int
NdbImportImpl::Mgm::do_connect()
{
  log_debug(1, "do_connect");
  require(m_handle == 0);
  m_handle = ndb_mgm_create_handle();
  require(m_handle != 0);
  ndb_mgm_set_connectstring(m_handle,
opt_ndb_connectstring);
  int retries = opt_connect_retries;
  int delay = opt_connect_retry_delay;
  if (ndb_mgm_connect(m_handle, retries, delay, 0) == -1)
  {
    m_util.set_error_mgm(m_error, __LINE__, m_handle);
    return -1;
  }
  m_connected = true;
  log_debug(1, "do_connect: success");
  return 0;
}

// Tear down the mgm connection: free the status struct, disconnect and
// destroy the handle.  Safe to call repeatedly (no-op once m_handle == 0).
void
NdbImportImpl::Mgm::do_disconnect()
{
  if (m_handle != 0)
  {
    if (m_status != 0)
    {
      // status struct is released with free(), not delete
      free(m_status);
      m_status = 0;
    }
    if (m_connected)
    {
      (void)ndb_mgm_disconnect(m_handle);
      m_connected = false;
    }
    ndb_mgm_destroy_handle(&m_handle);
    m_handle = 0;
    log_debug(1, "do_disconnect: done");
  }
}

// Fetch cluster status into m_status, retrying up to 10 times with a
// one-second sleep between attempts.  Returns 0 on success, -1 on error.
int
NdbImportImpl::Mgm::get_status()
{
  log_debug(1, "get_status");
  require(m_connected);
  require(m_status == 0);
  int retries = 0;
  while (retries < 10)
  {
    if ((m_status = ndb_mgm_get_status(m_handle)) != 0)
    {
      log_debug(1, "get_status: success");
      return 0;
    }
    NdbSleep_SecSleep(1);
    retries++;
    log_debug(1, "get_status: retries " << retries);
  }
  m_util.set_error_mgm(m_error, __LINE__, m_handle);
  return -1;
}

// node

NdbImportImpl::Node::Node()
{
  m_nodeid = Inval_uint;
}

NdbImportImpl::Nodes::Nodes()
{
  m_nodecnt = 0;
  // m_index maps nodeid -> slot in m_nodes; Inval_uint marks unused entries
  for (uint i = 0; i < g_max_nodes; i++)
    m_index[i] = Inval_uint;
}

// Query the management server for the list of data (NDB) nodes and fill
// in c.  Returns 0 on success, -1 on any mgm failure (error already set).
int
NdbImportImpl::get_nodes(Nodes& c)
{
  log_debug(1, "get_nodes");
  c.m_nodecnt = 0;
  do
  {
    Mgm mgm(*this);
    if (mgm.do_connect() == -1)
      break;
    if (mgm.get_status() == -1)
      break;
    const ndb_mgm_cluster_state* status = mgm.m_status;
    for (uint i = 0; i < (uint)status->no_of_nodes; i++)
    {
      const ndb_mgm_node_state* anynode = &status->node_states[i];
      if (anynode->node_type == NDB_MGM_NODE_TYPE_NDB)
      {
        Node& node = c.m_nodes[c.m_nodecnt];
        require(node.m_nodeid == Inval_uint);
        node.m_nodeid = anynode->node_id;
        require(node.m_nodeid < g_max_nodes);
        require(c.m_index[node.m_nodeid] == Inval_uint);
        c.m_index[node.m_nodeid] = c.m_nodecnt;
        log_debug(1, "node " << c.m_nodecnt << ": " << node.m_nodeid);
        c.m_nodecnt++;
      }
    }
    mgm.do_disconnect();
    return 0;
  } while (0);
  return -1;
}

// connect

NdbImportImpl::Connect::Connect()
{
  m_connectioncnt = 0;
m_connections = 0;
  m_mainconnection = 0;
  m_connected = false;
  m_mainndb = 0;
}

NdbImportImpl::Connect::~Connect()
{
}

// Create opt.m_connections cluster connections (the first one becomes the
// shared "main" connection), connect and wait until ready, then create and
// init the main Ndb object.  Returns 0 on success, -1 with m_error set.
int
NdbImportImpl::do_connect()
{
  log_debug(1, "do_connect");
  const Opt& opt = m_util.c_opt;
  Connect& c = c_connect;
  if (c.m_connected)
  {
    // connecting twice is a usage error
    m_util.set_error_usage(m_error, __LINE__);
    return -1;
  }
  {
    require(c.m_connections == 0 && c.m_mainconnection == 0);
    c.m_connectioncnt = opt.m_connections;
    c.m_connections = new Ndb_cluster_connection* [c.m_connectioncnt];
    for (uint i = 0; i < c.m_connectioncnt; i++)
    {
      /*
       * There is a single --ndb-nodeid value but there can be
       * several connections. Use nodeid values consecutively
       * starting with --ndb-nodeid. All must exist as API
       * nodes in the config.
       */
      int nodeid = opt_ndb_nodeid != 0 ? opt_ndb_nodeid + i : 0;
      c.m_connections[i] =
        new Ndb_cluster_connection(opt_ndb_connectstring,
                                   c.m_mainconnection,
                                   nodeid);
      if (i == 0)
        c.m_mainconnection = c.m_connections[i];
    }
    for (uint i = 0; i < c.m_connectioncnt; i++)
    {
      log_debug(1, "connection " << i << " of " << c.m_connectioncnt);
      Ndb_cluster_connection* con = c.m_connections[i];
      int retries = opt_connect_retries;
      int delay = opt_connect_retry_delay;
      if (con->connect(retries, delay, 1) != 0)
      {
        m_util.set_error_con(m_error, __LINE__, con);
        return -1;
      }
      log_debug(1, "connection " << i <<
                   " api nodeid " << con->node_id());
    }
  }
  for (uint i = 0; i < c.m_connectioncnt; i++)
  {
    Ndb_cluster_connection* con = c.m_connections[i];
    if (con->wait_until_ready(30, 0) < 0)
    {
      m_util.set_error_con(m_error, __LINE__, con);
      return -1;
    }
    log_debug(1, "connection " << i << " wait_until_ready done");
  }
  // the main Ndb object is used for dictionary operations
  require(c.m_mainndb == 0);
  c.m_mainndb = new Ndb(c.m_mainconnection);
  if (c.m_mainndb->init() != 0)
  {
    m_util.set_error_ndb(m_error, __LINE__, c.m_mainndb->getNdbError());
    return -1;
  }
  if (c.m_mainndb->waitUntilReady() != 0)
  {
    m_util.set_error_ndb(m_error, __LINE__, c.m_mainndb->getNdbError());
    return -1;
  }
  c.m_connected = true;
  log_debug(1, "do_connect: success");
  return 0;
}

void
NdbImportImpl::do_disconnect()
{
  log_debug(1, "do_disconnect");
  Connect& c = c_connect;
  // delete any ndb before delete connection
  delete c.m_mainndb;
  c.m_mainndb = 0;
  if (c.m_connections != 0)
  {
    for (uint i = 0; i < c.m_connectioncnt; i++)
    {
      log_debug(1, "delete connection " << i <<
                   " of " << c.m_connectioncnt);
      delete c.m_connections[i];
      c.m_connections[i] = 0;
    }
  }
  delete [] c.m_connections;
  c.m_connections = 0;
  c.m_connected = false;
  log_debug(1, "do_disconnect: done");
}

// tables

// Look up database.table in the NDB dictionary and register it with the
// util table registry, returning its id via tabid.  Returns 0 / -1.
int
NdbImportImpl::add_table(const char* database,
                         const char* table,
                         uint& tabid,
                         Error& error)
{
  Connect& c = c_connect;
  if (!c.m_connected)
  {
    m_util.set_error_usage(error, __LINE__);
    return -1;
  }
  if (database == 0 || table == 0)
  {
    m_util.set_error_usage(error, __LINE__);
    return -1;
  }
  log_debug(1, "add table " << database << "." << table);
  Ndb* ndb = c.m_mainndb;
  // switch the current database only when it differs
  if (strcmp(ndb->getDatabaseName(), database) != 0)
  {
    if (ndb->setDatabaseName(database) != 0)
    {
      m_util.set_error_ndb(error, __LINE__, ndb->getNdbError());
      return -1;
    }
  }
  NdbDictionary::Dictionary* dic = ndb->getDictionary();
  const NdbDictionary::Table* tab = dic->getTable(table);
  if (tab == 0)
  {
    m_util.set_error_ndb(error, __LINE__, dic->getNdbError());
    return -1;
  }
  if (m_util.add_table(dic, tab, tabid, error) != 0)
    return -1;
  return 0;
}

// Remove a previously added table from the util registry.
int
NdbImportImpl::remove_table(const uint table_id)
{
  Connect& c = c_connect;
  if (!c.m_connected)
  {
    m_util.set_error_usage(m_error, __LINE__);
    return -1;
  }
  Ndb* ndb = c.m_mainndb;
  if (ndb == NULL)
  {
    m_util.set_error_usage(m_error, __LINE__);
    return -1;
  }
  NdbDictionary::Dictionary* dic = ndb->getDictionary();
  m_util.remove_table(dic, table_id);
  return 0;
}

// files

NdbImportImpl::WorkerFile::WorkerFile(NdbImportUtil& util, Error& error) :
  File(util, error)
{
  // owning worker is assigned later; Inval_uint means "not yet owned"
  m_workerno = Inval_uint;
}

// job

// A Job owns the teams, row queues and statistics of one import run.
NdbImportImpl::Job::Job(NdbImportImpl& impl, uint jobno) :
  m_impl(impl),
  m_util(m_impl.m_util),
  m_jobno(jobno),
  m_name("job", m_jobno),
  m_stats(m_util),
  m_rowmap_in(m_util),
  m_rowmap_out(m_util)
{
m_runno = 0;
  m_state = JobState::State_null;
  m_tabid = Inval_uint;
  m_dostop = false;
  m_fatal = false;
  m_teamcnt = 0;
  for (uint i = 0; i < g_max_teams; i++)
    m_teams[i] = 0;
  for (int k = 0; k < g_teamstatecnt; k++)
    m_teamstates[k] = 0;
  m_rows_relay = 0;
  for (uint i = 0; i < g_max_ndb_nodes; i++)
    m_rows_exec[i] = 0;
  m_rows_reject = 0;
  // old_* hold counts from a previous run (resume), new_* from this run
  m_old_rows = 0;
  m_old_reject = 0;
  m_old_runtime = 0;
  m_new_rows = 0;
  m_new_reject = 0;
  m_new_runtime = 0;
  // stats
  Stats& stats = m_stats;
  {
    const Name name("job", "rows");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_rows = stat;
  }
  {
    const Name name("job", "reject");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_reject = stat;
  }
  {
    const Name name("job", "runtime");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_runtime = stat;
  }
  {
    const Name name("job", "rowssec");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_rowssec = stat;
  }
  {
    const Name name("job", "utime");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_utime = stat;
  }
  {
    const Name name("job", "stime");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_stime = stat;
  }
  {
    const Name name("job", "rowmap");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_rowmap = stat;
  }
  {
    const Name name("job", "rowmap-utime");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_rowmap_utime = stat;
  }
  log_debug(1, "ctor");
}

NdbImportImpl::Job::~Job()
{
  log_debug(1, "dtor");
  for (uint i = 0; i < m_teamcnt; i++)
    delete m_teams[i];
  delete m_rows_relay;
  for (uint i = 0; i < g_max_ndb_nodes; i++)
    delete m_rows_exec[i];
  delete m_rows_reject;
}

// Instantiate the teams and row queues for this job based on options.
void
NdbImportImpl::Job::do_create()
{
  const Opt& opt = m_util.c_opt;
  uint nodecnt = m_impl.c_nodes.m_nodecnt;
  require(nodecnt != 0);
  Job& job = *this;
  require(job.m_state == JobState::State_null);
  // diag team is team number 0
  {
    uint workercnt = 1;
    Team* team = new DiagTeam(job, workercnt);
    add_team(team);
  }
  // worker teams start at number 1
  if (opt.m_input_type != 0)
  {
    if (strcmp(opt.m_input_type, "random") == 0)
    {
      uint workercnt = opt.m_input_workers;
      RandomInputTeam* team =
new RandomInputTeam(job, workercnt);
      add_team(team);
    }
    if (strcmp(opt.m_input_type, "csv") == 0)
    {
      uint workercnt = opt.m_input_workers;
      CsvInputTeam* team = new CsvInputTeam(job, workercnt);
      add_team(team);
    }
  }
  if (opt.m_output_type != 0)
  {
    if (strcmp(opt.m_output_type, "null") == 0)
    {
      uint workercnt = opt.m_output_workers;
      NullOutputTeam* team = new NullOutputTeam(job, workercnt);
      add_team(team);
    }
    // "ndb" output creates two teams: relay workers plus per-node exec
    // workers (opt.m_db_workers per data node)
    if (strcmp(opt.m_output_type, "ndb") == 0)
    {
      uint workercnt = opt.m_output_workers;
      RelayOpTeam* team = new RelayOpTeam(job, workercnt);
      add_team(team);
    }
    if (strcmp(opt.m_output_type, "ndb") == 0)
    {
      require(opt.m_db_workers != 0);
      uint workercnt = opt.m_db_workers * nodecnt;
      ExecOpTeam* team = new ExecOpTeam(job, workercnt);
      add_team(team);
    }
  }
  // row queues
  Stats& stats = m_stats;
  {
    m_rows_relay = new RowList;
    m_rows_relay->set_stats(stats, "rows-relay");
    // option values of 0 keep the RowList defaults
    uint rowbatch = opt.m_rowbatch;
    if (rowbatch != 0)
      m_rows_relay->m_rowbatch = rowbatch;
    uint rowbytes = opt.m_rowbytes;
    if (rowbytes != 0)
      m_rows_relay->m_rowbytes = rowbytes;
  }
  {
    for (uint i = 0; i < nodecnt; i++)
    {
      Name name("rows-exec", i);
      m_rows_exec[i] = new RowList;
      m_rows_exec[i]->set_stats(stats, name);
      uint rowbatch = opt.m_rowbatch;
      if (rowbatch != 0)
        m_rows_exec[i]->m_rowbatch = rowbatch;
      uint rowbytes = opt.m_rowbytes;
      if (rowbytes != 0)
        m_rows_exec[i]->m_rowbytes = rowbytes;
    }
  }
  {
    m_rows_reject = new RowList;
    m_rows_reject->set_stats(stats, "rows-reject");
  }
  job.m_state = JobState::State_created;
}

void
NdbImportImpl::Job::add_team(Team* team)
{
  require(team != 0);
  require(m_teamcnt < g_max_teams);
  m_teams[m_teamcnt] = team;
  m_teamcnt++;
}

// Forward to the impl-level add_table, recording errors on this job.
int
NdbImportImpl::Job::add_table(const char* database,
                              const char* table,
                              uint& tabid)
{
  return m_impl.add_table(database, table, tabid, m_error);
}

void
NdbImportImpl::Job::set_table(uint tabid)
{
  // get_table() validates that tabid exists; result is discarded
  (void)m_util.get_table(tabid);
  m_tabid = tabid;
}

int
NdbImportImpl::Job::remove_table(const uint table_id)
{
  return m_impl.remove_table(table_id);
}

void
NdbImportImpl::Job::do_start()
{
  const Opt& opt = m_util.c_opt;
  log_debug(1, "start");
  m_timer.start();
  do
  {
    m_state = JobState::State_starting;
    start_diag_team();
    if (has_error())
    {
      m_state = JobState::State_stop;
      break;
    }
    if (opt.m_resume)
      start_resume();
    start_teams();
    if (has_error())
    {
      m_state = JobState::State_stop;
      break;
    }
    m_state = JobState::State_running;
    // main monitoring loop: poll teams until something requests a stop
    while (m_state != JobState::State_stop)
    {
      log_debug(2, "running");
      check_teams(false);
      check_userstop();
      NdbSleep_MilliSleep(opt.m_checkloop);
    }
  } while (0);
  log_debug(1, "stop");
  // shutdown loop: force-stop teams until all have stopped
  while (m_state != JobState::State_stopped)
  {
    log_debug(2, "stopping");
    check_teams(true);
    if (m_state == JobState::State_stop)
      m_state = JobState::State_stopped;
    NdbSleep_MilliSleep(opt.m_checkloop);
  }
  log_debug(1, "stopped");
  collect_teams();
  collect_stats();
  stop_diag_team();
  log_debug(1, "rowmap out: " << m_rowmap_out);
  m_state = JobState::State_done;
  log_debug(1, "done");
}

// Create, start and run the diag team (team 0, the state file manager).
void
NdbImportImpl::Job::start_diag_team()
{
  Team* team = m_teams[0];
  team->do_create();
  team->do_start();
  if (team->has_error())
  {
    m_util.set_error_gen(m_error, __LINE__,
                         "failed to start team %u-%s (state file manager)",
                         team->m_teamno, team->m_name.str());
    return;
  }
  team->do_run();
  log_debug(1, "diag team started");
}

// Prepare a resumed job: verify old state-file counts against the old
// rowmap and seed the output rowmap with it.
void
NdbImportImpl::Job::start_resume()
{
  log_debug(1, "start_resume jobno=" << m_jobno);
  // verify old stats counts against old rowmap
  {
    uint64 old_rows;
    uint64 old_reject;
    m_rowmap_in.get_total(old_rows, old_reject);
    if (m_old_rows != old_rows || m_old_reject != old_reject)
    {
      m_util.set_error_gen(m_error, __LINE__,
                           "inconsistent counts from old state files"
                           " (*.stt vs *.map)"
                           " rows %" PRIu64 " vs %" PRIu64
                           " reject %" PRIu64 " vs %" PRIu64,
                           m_old_rows, old_rows,
                           m_old_reject, old_reject);
      return;
    }
  }
  // copy entire old rowmap
  require(m_rowmap_out.empty());
  m_rowmap_out.add(m_rowmap_in);
  // input worker handles seek in do_init()
}

// Create, start and run every non-diag team (numbers 1..m_teamcnt-1).
void
NdbImportImpl::Job::start_teams()
{
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
team->do_create();
  }
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
    team->do_start();
    if (team->has_error())
    {
      m_util.set_error_gen(m_error, __LINE__,
                           "failed to start team %u-%s",
                           team->m_teamno, team->m_name.str());
      return;
    }
  }
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
    team->do_run();
  }
  log_debug(1, "teams started");
}

// Poll the worker teams (1..n): advance their state, merge their rowmaps
// into the job rowmap, and with dostop force remaining teams to stop.
void
NdbImportImpl::Job::check_teams(bool dostop)
{
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
    if (team->m_state == TeamState::State_null)
    {
      // never started
      team->m_state = TeamState::State_stopped;
      continue;
    }
    team->check_workers();
    if (team->m_state == TeamState::State_stop)
    {
      team->do_stop();
    }
    if (dostop && team->m_state != TeamState::State_stopped)
    {
      team->do_stop();
    }
    if (!team->m_rowmap_out.empty())
    {
      // lock since diag team also writes to job rowmap
      m_rowmap_out.lock();
      m_rowmap_out.add(team->m_rowmap_out);
      log_debug(1, "rowmap " << m_rowmap_out.size() <<
                   " <- " << team->m_rowmap_out.size());
      m_stat_rowmap->add(m_rowmap_out.size());
      m_rowmap_out.unlock();
      team->m_rowmap_out.clear();
    }
  }
  // recount teams per state
  for (int k = 0; k < g_teamstatecnt; k++)
    m_teamstates[k] = 0;
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
    int k = (int)team->m_state;
    require(k >= 0 && k < g_teamstatecnt);
    m_teamstates[k]++;
  }
  // m_teamcnt - 1 because diag team (number 0) is not counted here
  if (m_teamstates[TeamState::State_stopped] == m_teamcnt - 1)
    m_state = JobState::State_stop;
}

// Move the job to stop state on user request or global interrupt.
void
NdbImportImpl::Job::check_userstop()
{
  if (m_dostop && m_state != JobState::State_stop)
  {
    log_debug(1, "stop by user request");
    m_util.set_error_gen(m_error, __LINE__, "stop by user request");
    m_state = JobState::State_stop;
  }
  if (NdbImportUtil::g_stop_all && m_state != JobState::State_stop)
  {
    log_debug(1, "stop by user interrupt");
    m_util.set_error_gen(m_error, __LINE__, "stop by user interrupt");
    m_state = JobState::State_stop;
  }
}

// Gather per-team errors into a single job-level error message.
void
NdbImportImpl::Job::collect_teams()
{
  char error_team[100] = "";
  {
    Team* team = m_teams[0];
    if (team->has_error())
      sprintf(error_team +
strlen(error_team),
              " %u-%s", team->m_teamno, team->m_name.str());
  }
  for (uint i = 1; i < m_teamcnt; i++)
  {
    Team* team = m_teams[i];
    require(team->m_state == TeamState::State_stopped);
    if (team->has_error())
      sprintf(error_team + strlen(error_team),
              " %u-%s", team->m_teamno, team->m_name.str());
  }
  if (strlen(error_team) != 0)
  {
    m_util.set_error_gen(m_error, __LINE__,
                         "error in teams:%s", error_team);
  }
}

// Compute final job counters: row/reject deltas versus the previous run
// (resume case) and elapsed/cpu times.
void
NdbImportImpl::Job::collect_stats()
{
  m_timer.stop();
  uint64 msec = m_timer.elapsed_msec();
  if (msec == 0)
    msec = 1; // avoid division by zero in rows/sec below
  // counts
  const RowMap& rowmap = m_rowmap_out;
  uint64 total_rows;
  uint64 total_reject;
  rowmap.get_total(total_rows, total_reject);
  require(total_rows >= m_old_rows);
  require(total_reject >= m_old_reject);
  m_new_rows = total_rows - m_old_rows;
  m_new_reject = total_reject - m_old_reject;
  m_stat_rows->add(m_new_rows);
  m_stat_reject->add(m_new_reject);
  // times
  m_new_runtime = msec;
  m_stat_runtime->add(m_new_runtime);
  m_stat_rowssec->add((m_new_rows * 1000) / msec);
  m_stat_rowmap_utime->add(m_timer.m_utime_msec);
}

// Stop the diag team (team 0) and wait for it to reach stopped state.
void
NdbImportImpl::Job::stop_diag_team()
{
  const Opt& opt = m_util.c_opt;
  Team* team = m_teams[0];
  team->do_stop();
  while (team->m_state != TeamState::State_stopped)
  {
    NdbSleep_MilliSleep(opt.m_checkloop);
  }
  log_debug(1, "diag team stopped");
}

// Request asynchronous stop; the job loop reacts in check_userstop().
void
NdbImportImpl::Job::do_stop()
{
  m_dostop = true;
}

// team

NdbImportImpl::Team::Team(Job& job,
                          const char* name,
                          uint workercnt) :
  m_job(job),
  m_impl(job.m_impl),
  m_util(m_impl.m_util),
  m_teamno(job.m_teamcnt),
  m_name(name),
  m_workercnt(workercnt),
  m_rowmap_out(m_util)
{
  m_state = TeamState::State_null;
  m_workers = 0;
  for (int k = 0; k < g_workerstatecnt; k++)
    m_workerstates[k] = 0;
  m_tabid = Inval_uint;
  m_is_diag = false;
  // stats
  Stats& stats = m_job.m_stats;
  {
    const Name name(m_name, "runtime");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_runtime = stat;
  }
  {
    const Name name(m_name, "slice");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_slice = stat;
  }
  {
    const Name name(m_name,
"idleslice"); Stat* stat = stats.create(name, 0, 0); m_stat_idleslice = stat; } { const Name name(m_name, "idlerun"); Stat* stat = stats.create(name, 0, 0); m_stat_idlerun = stat; } { const Name name(m_name, "utime"); uint parent = m_job.m_stat_utime->m_id; Stat* stat = stats.create(name, parent, 0); m_stat_utime = stat; } { const Name name(m_name, "stime"); uint parent = m_job.m_stat_stime->m_id; Stat* stat = stats.create(name, parent, 0); m_stat_stime = stat; } { const Name name(m_name, "rowmap"); Stat* stat = stats.create(name, 0, 0); m_stat_rowmap = stat; } log_debug(1, "ctor"); } NdbImportImpl::Team::~Team() { log_debug(1, "dtor"); if (m_workers != 0) { for (uint n = 0; n < m_workercnt; n++) { Worker* w = m_workers[n]; delete w; } delete [] m_workers; } } void NdbImportImpl::Team::do_create() { log_debug(1, "do_create"); require(m_state == TeamState::State_null); require(m_workers == 0); require(m_workercnt > 0); m_workers = new Worker* [m_workercnt]; for (uint n = 0; n < m_workercnt; n++) { m_workers[n] = 0; } for (uint n = 0; n < m_workercnt; n++) { Worker* w = create_worker(n); require(w != 0); m_workers[n] = w; } m_state = TeamState::State_created; } void NdbImportImpl::Team::do_start() { log_debug(1, "start"); m_timer.start(); require(m_state == TeamState::State_created); require(m_workers != 0); do_init(); if (has_error()) { m_state = TeamState::State_stop; return; } for (uint n = 0; n < m_workercnt; n++) { Worker* w = get_worker(n); start_worker(w); } wait_workers(WorkerState::State_wait); m_state = TeamState::State_started; } extern "C" { static void* start_worker_c(void* data); } static void* start_worker_c(void* data) { NdbImportImpl::Worker* w = (NdbImportImpl::Worker*)data; require(w != 0); w->do_start(); return 0; } NdbImportImpl::Worker* NdbImportImpl::Team::get_worker(uint n) { require(n < m_workercnt); Worker* w = m_workers[n]; require(w != 0); return w; } void NdbImportImpl::Team::start_worker(Worker* w) { NDB_THREAD_PRIO prio = 
NDB_THREAD_PRIO_MEAN;
  uint stack_size = 64*1024;
  w->m_thread = NdbThread_Create(
      start_worker_c, (void**)w, stack_size, w->m_name, prio);
  require(w->m_thread != 0);
}

void
NdbImportImpl::Team::wait_workers(WorkerState::State state)
{
  log_debug(1, "wait_workers");
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    wait_worker(w, state);
  }
}

// Block (under the worker's lock, with timed waits) until the worker
// reaches the given state or has stopped.
void
NdbImportImpl::Team::wait_worker(Worker* w, WorkerState::State state)
{
  log_debug(1, "wait_worker: " << *w << " for " << g_str_state(state));
  const Opt& opt = m_util.c_opt;
  uint timeout = opt.m_idlesleep;
  w->lock();
  while (1)
  {
    log_debug(1, *w << ": wait for " << g_str_state(state));
    if (w->m_state == state ||
        w->m_state == WorkerState::State_stopped)
      break;
    w->wait(timeout);
  }
  w->unlock();
}

// Move all workers from wait to running state.
void
NdbImportImpl::Team::do_run()
{
  log_debug(1, "do_run");
  if (has_error())
  {
    if (m_state != TeamState::State_stopped)
      m_state = TeamState::State_stop;
    return;
  }
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    run_worker(w);
  }
  wait_workers(WorkerState::State_running);
  m_state = TeamState::State_running;
}

void
NdbImportImpl::Team::run_worker(Worker* w)
{
  log_debug(1, "run_worker: " << *w);
  w->lock();
  w->m_state = WorkerState::State_run;
  w->signal();
  w->unlock();
}

// Tally worker states and pull completed rowmap entries up to the team.
void
NdbImportImpl::Team::check_workers()
{
  log_debug(2, "check_workers");
  for (int k = 0; k < g_workerstatecnt; k++)
    m_workerstates[k] = 0;
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    w->lock();
    log_debug(2, "check_worker " << *w);
    int k = (int)w->m_state;
    require(k >= 0 && k < g_workerstatecnt);
    m_workerstates[k]++;
    // transfer rowmap while worker is locked
    if (!w->m_rowmap_out.empty())
    {
      m_rowmap_out.add(w->m_rowmap_out);
      log_debug(1, "rowmap " << m_rowmap_out.size() <<
                   " <- " << w->m_rowmap_out.size());
      m_stat_rowmap->add(m_rowmap_out.size());
      w->m_rowmap_out.clear();
    }
    log_debug(2, "rowmap out: " << m_rowmap_out);
    w->unlock();
  }
  // team stops once every worker has stopped (or on any error)
  if (m_workerstates[WorkerState::State_stopped] == m_workercnt)
  {
    if (m_state !=
TeamState::State_stopped)
      m_state = TeamState::State_stop;
  }
  if (has_error())
  {
    if (m_state != TeamState::State_stopped)
      m_state = TeamState::State_stop;
  }
  log_debug(2, "check_workers done");
}

// Stop all workers, join their threads, collect their final rowmaps and
// record the team runtime.
void
NdbImportImpl::Team::do_stop()
{
  log_debug(1, "do_stop");
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    stop_worker(w);
  }
  wait_workers(WorkerState::State_stopped);
  // transfer final rowmap entries
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    if (!w->m_rowmap_out.empty())
    {
      m_rowmap_out.add(w->m_rowmap_out);
      m_stat_rowmap->add(m_rowmap_out.size());
      w->m_rowmap_out.clear();
    }
  }
  do_end();
  for (uint n = 0; n < m_workercnt; n++)
  {
    Worker* w = get_worker(n);
    if (w->m_thread != 0)
      w->join();
    w->m_thread = 0;
  }
  m_state = TeamState::State_stopped;
  m_timer.stop();
  // stats
  {
    uint64 msec = m_timer.elapsed_msec();
    if (msec == 0)
      msec = 1;
    m_stat_runtime->add(msec);
  }
}

// Ask one worker to stop, adapting to whatever state it is currently in.
void
NdbImportImpl::Team::stop_worker(Worker* w)
{
  log_debug(1, "stop_worker: " << *w);
  w->lock();
  switch (w->m_state) {
  case WorkerState::State_null:
    // thread not yet active in its loop: mark stopped directly
    w->m_state = WorkerState::State_stopped;
    w->signal();
    break;
  case WorkerState::State_wait:
    w->m_state = WorkerState::State_stopped;
    w->signal();
    break;
  case WorkerState::State_running:
    /*
     * Here we do not interfere with worker state but ask it
     * to stop when convenient. It is enough and simpler for
     * only producers to act on this flag.
     */
    w->m_dostop = true;
    w->signal();
    break;
  case WorkerState::State_stop:
    /*
     * Worker is about to stop, allow it to do so. It is either
     * ready or it is reacting to m_dostop.
*/
    break;
  case WorkerState::State_stopped:
    break;
  default:
    require(false);
    break;
  }
  w->unlock();
}

void
NdbImportImpl::Team::set_table(uint tabid)
{
  // get_table() validates that tabid exists; result is discarded
  (void)m_util.get_table(tabid);
  m_tabid = tabid;
}

// worker

NdbImportImpl::Worker::Worker(NdbImportImpl::Team& team, uint n) :
  m_team(team),
  m_impl(m_team.m_impl),
  m_util(m_impl.m_util),
  m_workerno(n),
  m_name(team.m_name, m_workerno),
  m_rowmap_out(m_util),
  m_error(m_team.m_error)
{
  m_state = WorkerState::State_null;
  m_dostop = false;
  m_slice = 0;
  m_idleslice = 0;
  m_idle = false;
  m_idlerun = 0;
  m_seed = 0;
  // stats (parented to the corresponding team-level stats)
  Stats& stats = m_team.m_job.m_stats;
  {
    const Name name(m_name, "slice");
    uint parent = m_team.m_stat_slice->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_slice = stat;
  }
  {
    const Name name(m_name, "idleslice");
    uint parent = m_team.m_stat_idleslice->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_idleslice = stat;
  }
  {
    const Name name(m_name, "idlerun");
    uint parent = m_team.m_stat_idlerun->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_idlerun = stat;
  }
  {
    const Name name(m_name, "utime");
    uint parent = m_team.m_stat_utime->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_utime = stat;
  }
  {
    const Name name(m_name, "stime");
    uint parent = m_team.m_stat_stime->m_id;
    Stat* stat = stats.create(name, parent, 0);
    m_stat_stime = stat;
  }
  {
    const Name name(m_name, "rowmap");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_rowmap = stat;
  }
  log_debug(1, "ctor");
}

NdbImportImpl::Worker::~Worker()
{
  log_debug(1, "dtor");
}

// Worker thread main loop: a state machine advanced under the worker
// lock, performing one "slice" of work per iteration until stopped.
void
NdbImportImpl::Worker::do_start()
{
  log_debug(1, "start");
  const Opt& opt = m_util.c_opt;
  uint timeout = opt.m_idlesleep;
  do_init();
  if (has_error())
  {
    m_state = WorkerState::State_stop;
  }
  m_seed = (unsigned)(NdbHost_GetProcessId() ^ m_workerno);
  while (m_state != WorkerState::State_stopped)
  {
    log_debug(2, "slice: " << m_slice);
    lock();
    m_idle = false;
    switch (m_state) {
    case WorkerState::State_null:
      m_state = WorkerState::State_wait;
      break;
    case
WorkerState::State_wait:
      wait(timeout);
      break;
    case WorkerState::State_run:
      m_state = WorkerState::State_running;
      break;
    case WorkerState::State_running:
      do_run();
      m_slice++;
      if (m_idle)
      {
        // do_run() found nothing to do this slice
        m_idleslice++;
        m_idlerun++;
        m_stat_idlerun->add(m_idlerun);
      }
      else
      {
        m_idlerun = 0;
      }
      break;
    case WorkerState::State_stop:
      // final slice: clean up and record per-worker stats
      do_end();
      m_slice++;
      m_stat_slice->add(m_slice);
      m_stat_idleslice->add(m_idleslice);
      m_timer.stop();
      m_stat_utime->add(m_timer.m_utime_msec);
      m_stat_stime->add(m_timer.m_stime_msec);
      m_state = WorkerState::State_stopped;
      break;
    case WorkerState::State_stopped:
      require(false);
      break;
    default:
      require(false);
      break;
    }
    if (has_error())
    {
      if (m_state != WorkerState::State_stopped)
        m_state = WorkerState::State_stop;
    }
    signal();
    unlock();
    if (!m_team.m_is_diag)
    {
      // back off after a run of idle slices
      if (m_idlerun > opt.m_idlespin && opt.m_idlesleep != 0)
      {
        NdbSleep_MilliSleep(opt.m_idlesleep);
      }
    }
    else
    {
      NdbSleep_MilliSleep(opt.m_checkloop);
    }
  }
  log_debug(1, "stopped");
}

// Round-robin successor of this worker within its team.
NdbImportImpl::Worker*
NdbImportImpl::Worker::next_worker()
{
  Team& team = m_team;
  require(team.m_workercnt > 0);
  uint n = (m_workerno + 1) % team.m_workercnt;
  Worker* w = team.get_worker(n);
  return w;
}

// print

const char*
NdbImportImpl::g_str_state(JobState::State state)
{
  const char* str = 0;
  switch (state) {
  case JobState::State_null:
    str = "null";
    break;
  case JobState::State_created:
    str = "created";
    break;
  case JobState::State_starting:
    str = "starting";
    break;
  case JobState::State_running:
    str = "running";
    break;
  case JobState::State_stop:
    str = "stop";
    break;
  case JobState::State_stopped:
    str = "stopped";
    break;
  case JobState::State_done:
    str = "done";
    break;
  }
  require(str != 0);
  return str;
}

const char*
NdbImportImpl::g_str_state(TeamState::State state)
{
  const char* str = 0;
  switch (state) {
  case TeamState::State_null:
    str = "null";
    break;
  case TeamState::State_created:
    str = "created";
    break;
  case TeamState::State_started:
    str = "started";
    break;
  case TeamState::State_running:
    str = "running";
    break;
  case
TeamState::State_stop:
    str = "stop";
    break;
  case TeamState::State_stopped:
    str = "stopped";
    break;
  }
  require(str != 0);
  return str;
}

const char*
NdbImportImpl::g_str_state(WorkerState::State state)
{
  const char* str = 0;
  switch (state) {
  case WorkerState::State_null:
    str = "null";
    break;
  case WorkerState::State_wait:
    str = "wait";
    break;
  case WorkerState::State_run:
    str = "run";
    break;
  case WorkerState::State_running:
    str = "running";
    break;
  case WorkerState::State_stop:
    str = "stop";
    break;
  case WorkerState::State_stopped:
    str = "stopped";
    break;
  }
  require(str != 0);
  return str;
}

void
NdbImportImpl::Job::str_state(char* str) const
{
  strcpy(str, g_str_state(m_state));
}

void
NdbImportImpl::Team::str_state(char* str) const
{
  strcpy(str, g_str_state(m_state));
}

void
NdbImportImpl::Worker::str_state(char* str) const
{
  strcpy(str, g_str_state(m_state));
}

NdbOut&
operator<<(NdbOut& out, const NdbImportImpl::Job& job)
{
  char str[100];
  job.str_state(str);
  out << "J-" << job.m_jobno << " [" << str << "] ";
  if (job.has_error())
  {
    const Error& error = job.m_error;
    const char* typetext = error.gettypetext();
    out << "error[" << typetext << "-" << error.code << "] ";
  }
  return out;
}

NdbOut&
operator<<(NdbOut& out, const NdbImportImpl::Team& team)
{
  char str[100];
  team.str_state(str);
  out << "T-" << team.m_teamno << " " << team.m_name <<
         " [" << str << "] ";
  if (team.has_error())
  {
    const Error& error = team.m_error;
    const char* typetext = error.gettypetext();
    out << "error[" << typetext << "-" << error.code << "] ";
  }
  return out;
}

NdbOut&
operator<<(NdbOut& out, const NdbImportImpl::Worker& w)
{
  char str[100];
  w.str_state(str);
  out << "W " << w.m_name << " [" << str << "] ";
  return out;
}

// random input team

NdbImportImpl::RandomInputTeam::RandomInputTeam(Job& job, uint workercnt) :
  Team(job, "random-input", workercnt)
{
}

NdbImportImpl::RandomInputTeam::~RandomInputTeam()
{
}

NdbImportImpl::Worker*
NdbImportImpl::RandomInputTeam::create_worker(uint n)
{
  RandomInputWorker*
w = new RandomInputWorker(*this, n);
  return w;
}

void
NdbImportImpl::RandomInputTeam::do_init()
{
  log_debug(1, "do_init");
  set_table(m_job.m_tabid);
}

// Mark end-of-file on the shared relay row queue so consumers terminate.
void
NdbImportImpl::RandomInputTeam::do_end()
{
  log_debug(1, "do_end");
  RowList& rows_out = *m_job.m_rows_relay;
  rows_out.lock();
  require(!rows_out.m_eof);
  rows_out.m_eof = true;
  rows_out.unlock();
}

NdbImportImpl::RandomInputWorker::RandomInputWorker(Team& team, uint n) :
  Worker(team, n)
{
  m_seed = 0;
}

NdbImportImpl::RandomInputWorker::~RandomInputWorker()
{
}

void
NdbImportImpl::RandomInputWorker::do_init()
{
  log_debug(1, "do_init");
}

// Generate up to one batch of random rows into the relay queue, stopping
// at opt.m_max_rows or on a stop request.
void
NdbImportImpl::RandomInputWorker::do_run()
{
  log_debug(2, "do_run");
  const Opt& opt = m_util.c_opt;
  const uint tabid = m_team.m_tabid;
  const Table& table = m_util.get_table(tabid);
  RowList& rows_out = *m_team.m_job.m_rows_relay;
  // m_max_rows == 0 means unlimited
  uint64 max_rows = opt.m_max_rows != 0 ? opt.m_max_rows : UINT64_MAX;
  rows_out.lock();
  for (uint i = 0; i < opt.m_rowbatch; i++)
  {
    if (rows_out.totcnt() >= max_rows)
    {
      log_debug(1, "stop at max rows " << max_rows);
      m_state = WorkerState::State_stop;
      break;
    }
    if (m_dostop)
    {
      log_debug(1, "stop by request");
      m_state = WorkerState::State_stop;
      break;
    }
    uint64 rowid = rows_out.totcnt();
    Row* row = create_row(rowid, table);
    if (row == 0)
    {
      require(has_error());
      break;
    }
    require(row->m_tabid == table.m_tabid);
    if (!rows_out.push_back(row))
    {
      // queue full: mark this slice idle and retry in the next slice
      m_idle = true;
      break;
    }
  }
  rows_out.unlock();
}

void
NdbImportImpl::RandomInputWorker::do_end()
{
  log_debug(1, "do_end");
}

// Build one random row for the given table; rowid is encoded into each
// column value.  Returns 0 with error set on unsupported column types.
NdbImportImpl::Row*
NdbImportImpl::RandomInputWorker::create_row(uint64 rowid, const Table& table)
{
  Row* row = m_util.alloc_row(table);
  row->m_rowid = rowid;
  const Attrs& attrs = table.m_attrs;
  const uint attrcnt = attrs.size();
  // "rowid:" prefix repeated to fill text columns
  char keychr[100];
  uint keylen;
  sprintf(keychr, "%" PRIu64 ":", rowid);
  keylen = strlen(keychr);
  for (uint i = 0; i < attrcnt; i++)
  {
    const Attr& attr = attrs[i];
    switch (attr.m_type) {
    case NdbDictionary::Column::Unsigned:
      {
        uint32 val = (uint32)rowid;
        attr.set_value(row,
&val, sizeof(val));
      }
      break;
    case NdbDictionary::Column::Bigunsigned:
      {
        uint64 val = rowid;
        attr.set_value(row, &val, sizeof(val));
      }
      break;
    case NdbDictionary::Column::Varchar:
      {
        const uint maxsize = 255;
        uchar val[maxsize];
        uint maxlen = attr.m_length;
        // pk columns use the full length; others get a random length
        uint len = attr.m_pk ? maxlen : get_rand() % (maxlen + 1);
        for (uint i = 0; i < len; i++)
          val[i] = keychr[i % keylen];
        attr.set_value(row, val, len);
      }
      break;
    case NdbDictionary::Column::Longvarchar:
      {
        const uint maxsize = 65535;
        uchar val[maxsize];
        uint maxlen = attr.m_length;
        uint len = attr.m_pk ? maxlen : get_rand() % (maxlen + 1);
        for (uint i = 0; i < len; i++)
          val[i] = keychr[i % keylen];
        attr.set_value(row, val, len);
      }
      break;
    default:
      {
        m_util.set_error_usage(m_error, __LINE__,
                               "column type %d not supported for random input",
                               (int)attr.m_type);
        return 0;
      }
      break;
    }
  }
  return row;
}

// csv input team

NdbImportImpl::CsvInputTeam::CsvInputTeam(Job& job, uint workercnt) :
  Team(job, "csv-input", workercnt),
  m_file(m_util, m_error)
{
  // stats
  Stats& stats = m_job.m_stats;
  {
    const Name name(m_name, "waittail");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_waittail = stat;
  }
  {
    const Name name(m_name, "waitmove");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_waitmove = stat;
  }
  {
    const Name name(m_name, "movetail");
    Stat* stat = stats.create(name, 0, 0);
    m_stat_movetail = stat;
  }
}

NdbImportImpl::CsvInputTeam::~CsvInputTeam()
{
}

NdbImportImpl::Worker*
NdbImportImpl::CsvInputTeam::create_worker(uint n)
{
  CsvInputWorker* w = new CsvInputWorker(*this, n);
  return w;
}

// Set up the CSV spec, bind the target table and open the input file.
void
NdbImportImpl::CsvInputTeam::do_init()
{
  log_debug(1, "do_init");
  const Opt& opt = m_util.c_opt;
  const OptCsv& optcsv = opt.m_optcsv;
  if (m_impl.m_csv.set_spec(m_csvspec, optcsv, OptCsv::ModeInput) == -1)
  {
    require(m_util.has_error());
    return;
  }
  set_table(m_job.m_tabid);
  WorkerFile& file = m_file;
  file.set_path(opt.m_input_file);
  if (file.do_open(File::Read_flags) == -1)
  {
    require(has_error());
    // an unopenable input file aborts the whole job
    m_job.m_fatal = true;
    return;
  }
  log_debug(1, "file:
opened: " << file.get_path()); const uint workerno = 0; file.m_workerno = workerno; CsvInputWorker* w = static_cast(get_worker(workerno)); w->m_firstread = true; } void NdbImportImpl::CsvInputTeam::do_end() { log_debug(1, "do_end"); WorkerFile& file = m_file; if (file.do_close() == -1) { require(has_error()); // continue } RowList& rows_out = *m_job.m_rows_relay; rows_out.lock(); require(!rows_out.m_eof); rows_out.m_eof = true; rows_out.unlock(); } NdbImportImpl::CsvInputWorker::CsvInputWorker(Team& team, uint n) : Worker(team, n), m_buf(true) { m_inputstate = InputState::State_null; m_csvinput = 0; m_firstread = false; m_eof = false; } NdbImportImpl::CsvInputWorker::~CsvInputWorker() { delete m_csvinput; } void NdbImportImpl::CsvInputWorker::do_init() { log_debug(1, "do_init"); const Opt& opt = m_util.c_opt; const CsvSpec& csvspec = static_cast(m_team).m_csvspec; const uint tabid = m_team.m_tabid; const Table& table = m_util.get_table(tabid); uint pagesize = opt.m_pagesize; uint pagecnt = opt.m_pagecnt; m_buf.alloc(pagesize, 2 * pagecnt); RowList& rows_out = *m_team.m_job.m_rows_relay; RowList& rows_reject = *m_team.m_job.m_rows_reject; RowMap& rowmap_in = m_team.m_job.m_rowmap_in; m_csvinput = new CsvInput(m_impl.m_csv, Name("csvinput", m_workerno), csvspec, table, m_buf, rows_out, rows_reject, rowmap_in, m_team.m_job.m_stats); m_csvinput->do_init(); if (m_firstread) { // this worker does first read if (opt.m_resume) { CsvInputTeam& team = static_cast(m_team); WorkerFile& file = team.m_file; RangeList& ranges_in = rowmap_in.m_ranges; require(!ranges_in.empty()); Range range_in = *ranges_in.front(); /* * First range is likely to be the big one. If the range * starts with rowid 0 seek to the end and erase it. * In rare cases rowid 0 may not yet have been processed * due to an early error and rejected out of order rows. 
*/ if (range_in.m_start == 0) { uint64 seekpos = range_in.m_endpos; if (file.do_seek(seekpos) == -1) { require(has_error()); return; } log_debug(1, "file " << file.get_path() << ": " "seek to pos " << seekpos << " done"); m_csvinput->do_resume(range_in); (void)ranges_in.pop_front(); } else { log_debug(1, "file " << file.get_path() << ": " "cannot seek first rowid=" << range_in.m_start); } } } } void NdbImportImpl::CsvInputWorker::do_run() { log_debug(2, "do_run"); switch (m_inputstate) { case InputState::State_null: m_inputstate = InputState::State_lock; break; case InputState::State_lock: state_lock(); break; case InputState::State_read: state_read(); break; case InputState::State_waittail: state_waittail(); break; case InputState::State_parse: state_parse(); break; case InputState::State_movetail: state_movetail(); break; case InputState::State_eval: state_eval(); break; case InputState::State_send: state_send(); break; case InputState::State_eof: m_state = WorkerState::State_stop; break; default: require(false); break; } } void NdbImportImpl::CsvInputWorker::do_end() { log_debug(1, "do_end"); } void NdbImportImpl::CsvInputWorker::state_lock() { log_debug(2, "state_lock"); if (m_dostop) { log_debug(1, "stop by request"); m_state = WorkerState::State_stop; return; } WorkerFile& file = static_cast(m_team).m_file; file.lock(); if (file.m_workerno == m_workerno) m_inputstate = InputState::State_read; else m_idle = true; file.unlock(); } void NdbImportImpl::CsvInputWorker::state_read() { log_debug(2, "state_read"); if (m_dostop) { log_debug(1, "stop by request"); m_state = WorkerState::State_stop; return; } WorkerFile& file = static_cast(m_team).m_file; Buf& buf = m_buf; buf.reset(); if (file.do_read(buf) == -1) { require(has_error()); return; } log_debug(2, "file: read: " << buf.m_len); if (buf.m_eof) { log_debug(1, "eof"); m_eof = true; } file.lock(); CsvInputWorker* w2 = static_cast(next_worker()); file.m_workerno = w2->m_workerno; file.unlock(); if (m_firstread) { 
m_inputstate = InputState::State_parse; m_firstread = false; } else { m_inputstate = InputState::State_waittail; } } void NdbImportImpl::CsvInputWorker::state_waittail() { log_debug(2, "state_waittail"); if (m_dostop) { log_debug(1, "stop by request"); m_state = WorkerState::State_stop; return; } CsvInputTeam& team = static_cast(m_team); team.m_stat_waittail->add(1); m_idle = true; } void NdbImportImpl::CsvInputWorker::state_parse() { log_debug(2, "state_parse"); m_csvinput->do_parse(); log_debug(2, "lines parsed:" << m_csvinput->m_line_list.cnt()); m_inputstate = InputState::State_movetail; } void NdbImportImpl::CsvInputWorker::state_movetail() { log_debug(2, "state_movetail"); if (m_dostop) { log_debug(1, "stop by request"); m_state = WorkerState::State_stop; return; } CsvInputTeam& team = static_cast(m_team); CsvInputWorker* w2 = static_cast(next_worker()); w2->lock(); log_debug(2, "next worker: " << *w2); if (w2->m_inputstate == InputState::State_waittail) { m_csvinput->do_movetail(*w2->m_csvinput); team.m_stat_movetail->add(1); m_inputstate = InputState::State_eval; w2->m_inputstate = InputState::State_parse; } else if (w2->m_inputstate == InputState::State_eof) { m_inputstate = InputState::State_eval; } else { // cannot move tail yet team.m_stat_waitmove->add(1); m_idle = true; } w2->unlock(); } void NdbImportImpl::CsvInputWorker::state_eval() { log_debug(2, "state_eval"); m_csvinput->do_eval(); m_inputstate = InputState::State_send; } void NdbImportImpl::CsvInputWorker::state_send() { log_debug(2, "state_send"); const Opt& opt = m_util.c_opt; do { // max-rows is a test option, it need not be exact if (opt.m_max_rows != 0) { RowList& rows_out = *m_team.m_job.m_rows_relay; if (rows_out.totcnt() >= opt.m_max_rows) { log_debug(1, "stop on max-rows option"); m_inputstate = InputState::State_eof; break; } } uint curr = 0; uint left = 0; m_csvinput->do_send(curr, left); log_debug(2, "send: rows curr=" << curr << " left=" << left); if (m_csvinput->has_error()) { 
m_util.copy_error(m_error, m_csvinput->m_error); break; } if (left != 0) { log_debug(2, "send not ready"); m_idle = true; break; } if (!m_eof) { log_debug(2, "send ready and not eof"); // stop if csv error if (m_csvinput->has_error()) { m_util.copy_error(m_error, m_csvinput->m_error); break; } if (m_dostop) { log_debug(1, "stop by request"); m_state = WorkerState::State_stop; break; } m_inputstate = InputState::State_lock; break; } log_debug(2, "send ready and eof"); m_inputstate = InputState::State_eof; } while (0); } // print const char* NdbImportImpl::g_str_state(InputState::State state) { const char* str = 0; switch (state) { case InputState::State_null: str = "null"; break; case InputState::State_lock: str = "lock"; break; case InputState::State_read: str = "read"; break; case InputState::State_waittail: str = "waittail"; break; case InputState::State_parse: str = "parse"; break; case InputState::State_movetail: str = "movetail"; break; case InputState::State_eval: str = "eval"; break; case InputState::State_send: str = "send"; break; case InputState::State_eof: str = "eof"; break; } require(str != 0); return str; } void NdbImportImpl::CsvInputWorker::str_state(char* str) const { sprintf(str, "%s/%s", g_str_state(m_state), g_str_state(m_inputstate)); } // null output team NdbImportImpl::NullOutputTeam::NullOutputTeam(Job& job, uint workercnt) : Team(job, "null-output", workercnt) { } NdbImportImpl::NullOutputTeam::~NullOutputTeam() { } NdbImportImpl::Worker* NdbImportImpl::NullOutputTeam::create_worker(uint n) { NullOutputWorker* w = new NullOutputWorker(*this, n); return w; } void NdbImportImpl::NullOutputTeam::do_init() { log_debug(1, "do_init"); } void NdbImportImpl::NullOutputTeam::do_end() { log_debug(1, "do_end"); } NdbImportImpl::NullOutputWorker::NullOutputWorker(Team& team, uint n) : Worker(team, n) { } NdbImportImpl::NullOutputWorker::~NullOutputWorker() { } void NdbImportImpl::NullOutputWorker::do_init() { log_debug(1, "do_init"); } void 
NdbImportImpl::NullOutputWorker::do_run() { log_debug(2, "do_run"); RowList& rows_in = *m_team.m_job.m_rows_relay; rows_in.lock(); Row* row = rows_in.pop_front(); bool eof = (row == 0 && rows_in.m_eof); rows_in.unlock(); if (eof) { m_state = WorkerState::State_stop; return; } if (row == 0) { m_idle = true; return; } const uint tabid = row->m_tabid; (void)m_util.get_table(tabid); m_impl.m_util.free_row(row); } void NdbImportImpl::NullOutputWorker::do_end() { log_debug(1, "do_end"); } // op NdbImportImpl::Op::Op() { m_row = 0; m_rowop = 0; m_opcnt = 0; m_opsize = 0; } NdbImportImpl::OpList::OpList() { } NdbImportImpl::OpList::~OpList() { Op* one_op = NULL; while ((one_op = pop_front()) != NULL) { // See bug 30192989 // require(one_op->m_row == NULL); delete one_op; } } // tx NdbImportImpl::Tx::Tx(DbWorker* w) : m_worker(w) { Stats& stats = w->m_team.m_job.m_stats; m_trans = 0; m_ops.set_stats(stats, "op-used"); } NdbImportImpl::Tx::~Tx() { require(m_trans == 0); } NdbImportImpl::TxList::TxList() { } NdbImportImpl::TxList::~TxList() { } // db team NdbImportImpl::DbTeam::DbTeam(Job& job, const char* name, uint workercnt) : Team(job, name, workercnt) { } NdbImportImpl::DbTeam::~DbTeam() { } NdbImportImpl::DbWorker::DbWorker(Team& team, uint n) : Worker(team, n) { m_ndb = 0; Stats& stats = team.m_job.m_stats; m_op_free.set_stats(stats, "op-free"); m_tx_free.set_stats(stats, "tx-free"); m_tx_open.set_stats(stats, "tx-open"); } NdbImportImpl::DbWorker::~DbWorker() { require(m_tx_open.cnt() == 0); delete m_ndb; } int NdbImportImpl::DbWorker::create_ndb(uint transcnt) { Connect& c = m_impl.c_connect; require(m_ndb == 0); Ndb* ndb = 0; do { uint index = m_impl.c_connectionindex; require(index < c.m_connectioncnt); ndb = new Ndb(c.m_connections[index]); m_impl.c_connectionindex = (index + 1) % c.m_connectioncnt; require(ndb != 0); if (ndb->init(transcnt) != 0) { m_util.set_error_ndb(m_error, __LINE__, ndb->getNdbError()); break; } m_ndb = ndb; return 0; } while (0); delete 
ndb; return -1; } NdbImportImpl::Op* NdbImportImpl::DbWorker::alloc_op() { Op* op = m_op_free.pop_front(); if (op == 0) { op = new Op; } return op; } void NdbImportImpl::DbWorker::free_op(Op* op) { m_op_free.push_back(op); } NdbImportImpl::Tx* NdbImportImpl::DbWorker::start_trans() { log_debug(2, "start_trans"); TxList& tx_free = m_tx_free; TxList& tx_open = m_tx_open; require(m_ndb != 0); NdbTransaction* trans = m_ndb->startTransaction(); if (trans == 0) { return 0; } Tx* tx = tx_free.pop_front(); if (tx == 0) { tx = new Tx(this); } require(tx != 0); require(tx->m_trans == 0); require(tx->m_ops.cnt() == 0); tx->m_trans = trans; tx_open.push_back(tx); return tx; } NdbImportImpl::Tx* NdbImportImpl::DbWorker::start_trans(const NdbRecord* keyrec, const char* keydata, uchar* xfrmbuf, uint xfrmbuflen) { log_debug(2, "start_trans"); TxList& tx_free = m_tx_free; TxList& tx_open = m_tx_open; require(m_ndb != 0); NdbTransaction* trans = m_ndb->startTransaction(keyrec, keydata, xfrmbuf, xfrmbuflen); if (trans == 0) { return 0; } Tx* tx = tx_free.pop_front(); if (tx == 0) { tx = new Tx(this); } require(tx != 0); require(tx->m_trans == 0); require(tx->m_ops.cnt() == 0); tx->m_trans = trans; tx_open.push_back(tx); return tx; } NdbImportImpl::Tx* NdbImportImpl::DbWorker::start_trans(uint nodeid, uint instanceid) { log_debug(2, "start_trans"); TxList& tx_free = m_tx_free; TxList& tx_open = m_tx_open; require(m_ndb != 0); NdbTransaction* trans = m_ndb->startTransaction(nodeid, instanceid); if (trans == 0) { return 0; } Tx* tx = tx_free.pop_front(); if (tx == 0) { tx = new Tx(this); } require(tx != 0); require(tx->m_trans == 0); require(tx->m_ops.cnt() == 0); tx->m_trans = trans; tx_open.push_back(tx); return tx; } void NdbImportImpl::DbWorker::close_trans(Tx* tx) { log_debug(2, "close_trans"); TxList& tx_free = m_tx_free; TxList& tx_open = m_tx_open; require(tx->m_trans != 0); m_ndb->closeTransaction(tx->m_trans); tx->m_trans = 0; while (tx->m_ops.cnt() != 0) { Op* op = 
tx->m_ops.pop_front(); require(op != 0); require(op->m_row != 0); m_rows_free.push_back(op->m_row); op->m_row = 0; op->m_rowop = 0; op->m_opcnt = 0; op->m_opsize = 0; free_op(op); } tx_open.remove(tx); tx_free.push_back(tx); } // relay op team NdbImportImpl::RelayOpTeam::RelayOpTeam(Job& job, uint workercnt) : DbTeam(job, "relay-op", workercnt) { } NdbImportImpl::RelayOpTeam::~RelayOpTeam() { } NdbImportImpl::Worker* NdbImportImpl::RelayOpTeam::create_worker(uint n) { RelayOpWorker* w = new RelayOpWorker(*this, n); return w; } void NdbImportImpl::RelayOpTeam::do_init() { log_debug(1, "do_init"); } void NdbImportImpl::RelayOpTeam::do_end() { log_debug(1, "do_end"); RowList& rows_in = *m_job.m_rows_relay; rows_in.lock(); require(!rows_in.m_foe); rows_in.m_foe = true; rows_in.unlock(); for (uint i = 0; i < m_impl.c_nodes.m_nodecnt; i++) { RowList& rows_out = *m_job.m_rows_exec[i]; rows_out.lock(); require(!rows_out.m_eof); rows_out.m_eof = true; rows_out.unlock(); } } NdbImportImpl::RelayOpWorker::RelayOpWorker(Team& team, uint n) : DbWorker(team, n) { m_relaystate = RelayState::State_null; m_xfrmalloc = 0; m_xfrmbuf = 0; m_xfrmbuflen = 0; for (uint i = 0; i < g_max_ndb_nodes; i++) m_rows_exec[i] = 0; } NdbImportImpl::RelayOpWorker::~RelayOpWorker() { delete [] m_xfrmalloc; for (uint i = 0; i < g_max_ndb_nodes; i++) delete m_rows_exec[i]; } void NdbImportImpl::RelayOpWorker::do_init() { log_debug(1, "do_init"); create_ndb(1); uint len = (MAX_KEY_SIZE_IN_WORDS << 2) + sizeof(uint64); m_xfrmalloc = new uchar [len]; // copied from Ndb::computeHash() UintPtr org = UintPtr(m_xfrmalloc); UintPtr use = (org + 7) & ~(UintPtr)7; m_xfrmbuf = (uchar*)use; m_xfrmbuflen = len - uint(use - org); uint nodecnt = m_impl.c_nodes.m_nodecnt; require(nodecnt != 0); for (uint i = 0; i < nodecnt; i++) { m_rows_exec[i] = new RowList; } } void NdbImportImpl::RelayOpWorker::do_run() { log_debug(2, "do_run"); switch (m_relaystate) { case RelayState::State_null: m_relaystate = 
RelayState::State_receive; break; case RelayState::State_receive: state_receive(); break; case RelayState::State_define: state_define(); break; case RelayState::State_send: state_send(); break; case RelayState::State_eof: m_state = WorkerState::State_stop; break; } } void NdbImportImpl::RelayOpWorker::state_receive() { log_debug(2, "state_receive"); const Opt& opt = m_util.c_opt; RowList& rows_in = *m_team.m_job.m_rows_relay; rows_in.lock(); RowCtl ctl(opt.m_rowswait); m_rows.push_back_from(rows_in, ctl); bool eof = rows_in.m_eof; rows_in.unlock(); if (m_rows.empty()) { if (!eof) { m_idle = true; return; } m_relaystate = RelayState::State_eof; return; } m_relaystate = RelayState::State_define; } void NdbImportImpl::RelayOpWorker::state_define() { log_debug(2, "state_define"); const Opt& opt = m_util.c_opt; Row* row; while ((row = m_rows.pop_front()) != 0) { const Nodes& c = m_impl.c_nodes; const Table& table = m_util.get_table(row->m_tabid); const bool no_hint = opt.m_no_hint; uint nodeid = 0; if (no_hint) { uint i = get_rand() % c.m_nodecnt; nodeid = c.m_nodes[i].m_nodeid; } else { Uint32 hash; m_ndb->computeHash(&hash, table.m_keyrec, (const char*)row->m_data, m_xfrmbuf, m_xfrmbuflen); uint fragid = (uint)table.m_tab->getPartitionId(hash); nodeid = table.get_nodeid(fragid); } require(nodeid < g_max_nodes); uint nodeindex = c.m_index[nodeid]; require(nodeindex < c.m_nodecnt); // move locally to per-node rows RowList& rows_exec = *m_rows_exec[nodeindex]; rows_exec.push_back(row); } m_relaystate = RelayState::State_send; } void NdbImportImpl::RelayOpWorker::state_send() { log_debug(2, "state_send"); const Opt& opt = m_util.c_opt; uint nodecnt = m_impl.c_nodes.m_nodecnt; uint left = 0; for (uint i = 0; i < nodecnt; i++) { RowList& rows_exec = *m_rows_exec[i]; RowList& rows_out = *m_team.m_job.m_rows_exec[i]; if (rows_exec.cnt() != 0) { rows_out.lock(); RowCtl ctl(opt.m_rowswait); rows_exec.pop_front_to(rows_out, ctl); rows_out.unlock(); left += rows_exec.cnt(); } } 
if (!left) { m_relaystate = RelayState::State_receive; return; } m_idle = true; } void NdbImportImpl::RelayOpWorker::do_end() { log_debug(1, "do_end"); if (!has_error()) { require(m_tx_open.cnt() == 0); } else if (m_tx_open.cnt() != 0) { require(m_tx_open.cnt() == 1); Tx* tx = m_tx_open.front(); close_trans(tx); } } // print const char* NdbImportImpl::g_str_state(RelayState::State state) { const char* str = 0; switch (state) { case RelayState::State_null: str = "null"; break; case RelayState::State_receive: str = "receive"; break; case RelayState::State_define: str = "define"; break; case RelayState::State_send: str = "send"; break; case RelayState::State_eof: str = "eof"; break; } require(str != 0); return str; } void NdbImportImpl::RelayOpWorker::str_state(char* str) const { sprintf(str, "%s/%s", g_str_state(m_state), g_str_state(m_relaystate)); } // exec op team NdbImportImpl::ExecOpTeam::ExecOpTeam(Job& job, uint workercnt) : DbTeam(job, "exec-op", workercnt) { uint nodecnt = m_impl.c_nodes.m_nodecnt; require(nodecnt != 0); require(workercnt % nodecnt == 0); } NdbImportImpl::ExecOpTeam::~ExecOpTeam() { } NdbImportImpl::Worker* NdbImportImpl::ExecOpTeam::create_worker(uint n) { ExecOpWorker* w = 0; const Opt& opt = m_util.c_opt; if (opt.m_no_asynch) w = new ExecOpWorkerSynch(*this, n); else w = new ExecOpWorkerAsynch(*this, n); return w; } void NdbImportImpl::ExecOpTeam::do_init() { log_debug(1, "do_init"); } void NdbImportImpl::ExecOpTeam::do_end() { log_debug(1, "do_end"); for (uint i = 0; i < m_impl.c_nodes.m_nodecnt; i++) { RowList& rows_in = *m_job.m_rows_exec[i]; rows_in.lock(); require(!rows_in.m_foe); rows_in.m_foe = true; rows_in.unlock(); } } NdbImportImpl::ExecOpWorker::ExecOpWorker(Team& team, uint n) : DbWorker(team, n) { m_execstate = ExecState::State_null; m_nodeindex = Inval_uint; m_nodeid = Inval_uint; m_eof = false; m_opcnt = 0; m_opsize = 0; } NdbImportImpl::ExecOpWorker::~ExecOpWorker() { } void NdbImportImpl::ExecOpWorker::do_init() { 
log_debug(1, "do_init"); const Nodes& c = m_impl.c_nodes; require(c.m_nodecnt > 0); m_nodeindex = m_workerno % c.m_nodecnt; m_nodeid = c.m_nodes[m_nodeindex].m_nodeid; /* * Option opbatch limits number of received rows and * therefore number of async transactions. Each row * creates one transaction (this is unlikely to change). */ const Opt& opt = m_util.c_opt; require(opt.m_opbatch != 0); m_rows.m_rowbatch = opt.m_opbatch; m_rows.m_rowbytes = opt.m_opbytes != 0 ? opt.m_opbytes : UINT_MAX; create_ndb(opt.m_opbatch); } void NdbImportImpl::ExecOpWorker::do_run() { log_debug(2, "do_run"); switch (m_execstate) { case ExecState::State_null: m_execstate = ExecState::State_receive; break; case ExecState::State_receive: state_receive(); break; case ExecState::State_define: state_define(); break; case ExecState::State_prepare: state_prepare(); break; case ExecState::State_send: state_send(); break; case ExecState::State_poll: state_poll(); break; case ExecState::State_eof: m_state = WorkerState::State_stop; break; default: require(false); break; } } /* * Receive rows until a batch is full or eof is seen. At the end * convert the rows into ops. The ops are assigned to transactions * in state_define(). 
*/ void NdbImportImpl::ExecOpWorker::state_receive() { log_debug(2, "state_receive"); const Opt& opt = m_util.c_opt; RowList& rows_in = *m_team.m_job.m_rows_exec[m_nodeindex]; rows_in.lock(); RowCtl ctl(opt.m_rowswait); m_rows.push_back_from(rows_in, ctl); bool eof = rows_in.m_eof; rows_in.unlock(); do { if (m_rows.full()) { log_debug(2, "got full batch"); break; } if (eof) { if (m_rows.cnt() != 0) { log_debug(2, "got partial last batch"); break; } log_debug(2, "no more rows"); m_execstate = ExecState::State_eof; return; } log_debug(2, "wait for more rows"); m_idle = true; return; } while (0); // assign op to each row and move the row under the op require(m_ops.cnt() == 0); Row* row; while ((row = m_rows.pop_front()) != 0) { Op* op = alloc_op(); op->m_row = row; m_ops.push_back(op); } m_execstate = ExecState::State_define; } void NdbImportImpl::ExecOpWorker::reject_row(Row* row, const Error& error) { const Opt& opt = m_util.c_opt; RowList& rows_reject = *m_team.m_job.m_rows_reject; rows_reject.lock(); // write reject row first const Table& reject_table = m_util.c_reject_table; Row* rejectrow = m_util.alloc_row(reject_table); rejectrow->m_rowid = row->m_rowid; rejectrow->m_linenr = row->m_linenr; rejectrow->m_startpos = row->m_startpos; rejectrow->m_endpos = row->m_endpos; const char* reject = ""; uint32 rejectlen = strlen(reject); m_util.set_reject_row(rejectrow, m_team.m_job.m_runno, error, reject, rejectlen); require(rows_reject.push_back(rejectrow)); // error if rejects exceeded if (rows_reject.totcnt() > opt.m_rejects) { // set team level error m_util.set_error_data(m_error, __LINE__, 0, "reject limit %u exceeded", opt.m_rejects); } rows_reject.unlock(); } // synch NdbImportImpl::ExecOpWorkerSynch::ExecOpWorkerSynch(Team& team, uint n) : ExecOpWorker(team, n) { } NdbImportImpl::ExecOpWorkerSynch::~ExecOpWorkerSynch() { } void NdbImportImpl::ExecOpWorkerSynch::do_end() { log_debug(1, "do_end/synch"); if (!has_error()) { require(m_tx_open.cnt() == 0); } else if 
(m_tx_open.cnt() != 0) { require(m_tx_open.cnt() == 1); Tx* tx = m_tx_open.front(); close_trans(tx); } } void NdbImportImpl::ExecOpWorkerSynch::state_define() { log_debug(2, "state_define/synch"); TxList& tx_open = m_tx_open; // single trans require(tx_open.cnt() == 0); Tx* tx = start_trans(); if (tx == 0) { const NdbError& ndberror = m_ndb->getNdbError(); require(ndberror.code != 0); // synch does not handle temporary errors yet m_util.set_error_ndb(m_error, __LINE__, ndberror); return; } NdbTransaction* trans = tx->m_trans; require(trans != 0); while (m_ops.cnt() != 0) { Op* op = m_ops.pop_front(); Row* row = op->m_row; require(row != 0); const Table& table = m_util.get_table(row->m_tabid); const NdbOperation* rowop = 0; const char* rowdata = (const char*)row->m_data; if ((rowop = trans->insertTuple(table.m_rec, rowdata)) == 0) { m_util.set_error_ndb(m_error, __LINE__, trans->getNdbError()); break; } for (uint j = 0; j < table.m_blobids.size(); j++) { uint i = table.m_blobids[j]; require(i < table.m_attrs.size()); const Attr& attr = table.m_attrs[i]; require(attr.m_isblob); NdbBlob* bh = 0; if ((bh = rowop->getBlobHandle(i)) == 0) { m_util.set_error_ndb(m_error, __LINE__, rowop->getNdbError()); break; } Blob* blob = row->m_blobs[attr.m_blobno]; if (!attr.get_null(row)) { if (bh->setValue(blob->m_data, blob->m_blobsize) == -1) { m_util.set_error_ndb(m_error, __LINE__, bh->getNdbError()); break; } } else { if (bh->setValue((void*)0, 0) == -1) { m_util.set_error_ndb(m_error, __LINE__, bh->getNdbError()); break; } } } op->m_rowop = rowop; tx->m_ops.push_back(op); } m_execstate = ExecState::State_prepare; } void NdbImportImpl::ExecOpWorkerSynch::state_prepare() { log_debug(2, "state_prepare/synch"); // nothing to do m_execstate = ExecState::State_send; } void NdbImportImpl::ExecOpWorkerSynch::state_send() { log_debug(2, "state_send/synch"); TxList& tx_open = m_tx_open; require(tx_open.cnt() == 1); Tx* tx = tx_open.front(); require(tx != 0); NdbTransaction* trans = 
tx->m_trans; require(trans != 0); const NdbTransaction::ExecType et = NdbTransaction::Commit; if (trans->execute(et) == -1) { m_util.set_error_ndb(m_error, __LINE__, trans->getNdbError()); } close_trans(tx); m_execstate = ExecState::State_poll; } void NdbImportImpl::ExecOpWorkerSynch::state_poll() { log_debug(2, "state_poll/synch"); // nothing to poll m_opcnt = 0; m_opsize = 0; m_util.free_rows(m_rows_free); m_execstate = ExecState::State_receive; } // asynch NdbImportImpl::ExecOpWorkerAsynch::ExecOpWorkerAsynch(Team& team, uint n) : ExecOpWorker(team, n) { } NdbImportImpl::ExecOpWorkerAsynch::~ExecOpWorkerAsynch() { } void NdbImportImpl::ExecOpWorkerAsynch::do_end() { log_debug(1, "do_end/asynch"); // currently only way for "graceful" stop is fatal error if (!has_error()) { require(m_execstate == ExecState::State_eof); require(m_tx_open.cnt() == 0); } else if (m_execstate == ExecState::State_prepare) { // error in State_define, simply close the txs while (m_tx_open.cnt() != 0) { Tx* tx = m_tx_open.front(); close_trans(tx); } } else { // currently trans cannot be closed after executeAsynchPrepare if (m_execstate == ExecState::State_send) { log_debug(1, "send remaining transes"); state_send(); } while (m_execstate == ExecState::State_poll) { log_debug(1, "poll remaining transes"); state_poll(); } } } static void asynch_callback(int result, NdbTransaction* trans, void* tx_void) { NdbImportImpl::Tx* tx = (NdbImportImpl::Tx*)tx_void; require(trans == tx->m_trans); NdbImportImpl::ExecOpWorkerAsynch* w = (NdbImportImpl::ExecOpWorkerAsynch*)(tx->m_worker); w->asynch_callback(tx); } void NdbImportImpl::ExecOpWorkerAsynch::asynch_callback(Tx* tx) { NdbTransaction* trans = tx->m_trans; const NdbError& ndberror = trans->getNdbError(); if (ndberror.status == NdbError::Success) { Op* op = tx->m_ops.front(); while (op != 0) { Row* row = op->m_row; m_rowmap_out.add(row, false); op = op->next(); } } else if (ndberror.status == NdbError::TemporaryError) { 
m_errormap.add_one(ndberror.code); /* * Move rows back to input for processing by new txs. * Check for too many temp errors later in state_poll(). */ RowList& rows_in = *m_team.m_job.m_rows_exec[m_nodeindex]; rows_in.lock(); while (tx->m_ops.cnt() != 0) { Op* op = tx->m_ops.pop_front(); Row* row = op->m_row; require(row != 0); log_debug(1, "push back to input: rowid " << row->m_rowid); rows_in.push_back_force(row); } rows_in.unlock(); } else if (ndberror.status == NdbError::PermanentError && ndberror.classification == NdbError::ConstraintViolation) { Error error; // local error m_util.set_error_ndb(error, __LINE__, ndberror, "permanent error"); while (tx->m_ops.cnt() != 0) { Op* op = tx->m_ops.pop_front(); require(op != 0); require(op->m_row != 0); reject_row(op->m_row, error); m_rows_free.push_back(op->m_row); op->m_row = NULL; free_op(op); } } else { m_util.set_error_ndb(m_error, __LINE__, ndberror); } close_trans(tx); } void NdbImportImpl::ExecOpWorkerAsynch::state_define() { log_debug(2, "state_define/asynch"); const Opt& opt = m_util.c_opt; TxList& tx_open = m_tx_open; // no transes yet require(tx_open.cnt() == 0); m_errormap.clear(); /* * Temporary errors can occur at auto-incr and start trans. We * don't want to get stuck here on "permanent" temporary errors. * So we limit them by opt.m_tmperrors (counted per op). 
*/ while (m_ops.cnt() != 0) { Op* op = m_ops.pop_front(); Row* row = op->m_row; require(row != 0); const Table& table = m_util.get_table(row->m_tabid); if (table.m_has_hidden_pk) { const Attrs& attrs = table.m_attrs; const uint attrcnt = attrs.size(); const Attr& attr = attrs[attrcnt - 1]; require(attr.m_type == NdbDictionary::Column::Bigunsigned); Uint64 val; if (m_ndb->getAutoIncrementValue(table.m_tab, val, opt.m_ai_prefetch_sz, opt.m_ai_increment, opt.m_ai_offset) == -1) { const NdbError& ndberror = m_ndb->getNdbError(); require(ndberror.code != 0); if (ndberror.status == NdbError::TemporaryError) { m_errormap.add_one(ndberror.code); uint temperrors = m_errormap.get_sum(); if (temperrors <= opt.m_temperrors) { log_debug(1, "get autoincr try " << temperrors << ": " << ndberror); m_ops.push_front(op); NdbSleep_MilliSleep(opt.m_tempdelay); continue; } m_util.set_error_gen(m_error, __LINE__, "number of transaction tries with" " temporary errors is %u (limit %u)", temperrors, opt.m_temperrors); break; } else { m_util.set_error_ndb(m_error, __LINE__, ndberror, "table %s: get autoincrement failed", table.m_tab->getName()); break; } } attr.set_value(row, &val, 8); } const bool no_hint = opt.m_no_hint; Tx* tx = 0; if (no_hint) tx = start_trans(); else tx = start_trans(m_nodeid, 0); if (tx == 0) { const NdbError& ndberror = m_ndb->getNdbError(); require(ndberror.code != 0); if (ndberror.status == NdbError::TemporaryError) { m_errormap.add_one(ndberror.code); uint temperrors = m_errormap.get_sum(); if (temperrors <= opt.m_temperrors) { log_debug(1, "start trans try " << temperrors << ": " << ndberror); m_ops.push_front(op); NdbSleep_MilliSleep(opt.m_tempdelay); continue; } m_util.set_error_gen(m_error, __LINE__, "number of transaction tries with" " temporary errors is %u (limit %u)", temperrors, opt.m_temperrors); break; } else { m_util.set_error_ndb(m_error, __LINE__, ndberror, "table %s: start transaction failed", table.m_tab->getName()); break; } } NdbTransaction* 
trans = tx->m_trans; require(trans != 0); const NdbOperation* rowop = 0; const char* rowdata = (const char*)row->m_data; if ((rowop = trans->insertTuple(table.m_rec, rowdata)) == 0) { m_util.set_error_ndb(m_error, __LINE__, trans->getNdbError()); break; } for (uint j = 0; j < table.m_blobids.size(); j++) { uint i = table.m_blobids[j]; require(i < table.m_attrs.size()); const Attr& attr = table.m_attrs[i]; require(attr.m_isblob); NdbBlob* bh = 0; if ((bh = rowop->getBlobHandle(i)) == 0) { m_util.set_error_ndb(m_error, __LINE__, rowop->getNdbError()); break; } Blob* blob = row->m_blobs[attr.m_blobno]; if (!attr.get_null(row)) { if (bh->setValue(blob->m_data, blob->m_blobsize) == -1) { m_util.set_error_ndb(m_error, __LINE__, bh->getNdbError()); break; } } else { if (bh->setValue((void*)0, 0) == -1) { m_util.set_error_ndb(m_error, __LINE__, bh->getNdbError()); break; } } bool batch = false; if (bh->preExecute(NdbTransaction::Commit, batch) == -1) { m_util.set_error_ndb(m_error, __LINE__, bh->getNdbError()); break; } } op->m_rowop = rowop; tx->m_ops.push_back(op); } m_execstate = ExecState::State_prepare; } void NdbImportImpl::ExecOpWorkerAsynch::state_prepare() { Tx* tx = m_tx_open.front(); while (tx != 0) { const NdbTransaction::ExecType et = NdbTransaction::Commit; NdbTransaction* trans = tx->m_trans; require(trans != 0); trans->executeAsynchPrepare(et, &::asynch_callback, (void*)tx); tx = tx->next(); } m_execstate = ExecState::State_send; } void NdbImportImpl::ExecOpWorkerAsynch::state_send() { log_debug(2, "state_send/asynch"); require(m_tx_open.cnt() != 0); int forceSend = 0; m_ndb->sendPreparedTransactions(forceSend); m_execstate = ExecState::State_poll; } void NdbImportImpl::ExecOpWorkerAsynch::state_poll() { log_debug(2, "state_poll/asynch"); const Opt& opt = m_util.c_opt; int timeout = opt.m_polltimeout; require(m_tx_open.cnt() != 0); m_ndb->pollNdb(timeout, m_tx_open.cnt()); if (m_tx_open.cnt() != 0) { log_debug(2, "poll not ready"); return; } log_debug(2, 
"poll ready"); m_opcnt = 0; m_opsize = 0; if (m_errormap.size() != 0) { Job& job = m_team.m_job; job.lock(); job.m_errormap.add_one(m_errormap); uint temperrors = job.m_errormap.get_sum(); job.unlock(); if (temperrors <= opt.m_temperrors) { log_debug(1, "temp errors: sleep " << opt.m_tempdelay << "ms"); NdbSleep_MilliSleep(opt.m_tempdelay); } else { if (!m_util.has_error(m_error)) m_util.set_error_gen(m_error, __LINE__, "number of db execution batches with" " temporary errors is %u (limit %u)", temperrors, opt.m_temperrors); } } log_debug(1, "rowmap " << m_rowmap_out.size()); m_stat_rowmap->add(m_rowmap_out.size()); m_util.free_rows(m_rows_free); m_execstate = ExecState::State_receive; } // print const char* NdbImportImpl::g_str_state(ExecState::State state) { const char* str = 0; switch (state) { case ExecState::State_null: str = "null"; break; case ExecState::State_receive: str = "receive"; break; case ExecState::State_define: str = "define"; break; case ExecState::State_prepare: str = "prepare"; break; case ExecState::State_send: str = "send"; break; case ExecState::State_poll: str = "wait"; break; case ExecState::State_eof: str = "eof"; break; } require(str != 0); return str; } void NdbImportImpl::ExecOpWorker::str_state(char* str) const { sprintf(str, "%s/%s tx:free=%u,open=%u", g_str_state(m_state), g_str_state(m_execstate), m_tx_free.cnt(), m_tx_open.cnt()); } // diag team NdbImportImpl::DiagTeam::DiagTeam(Job& job, uint workercnt) : Team(job, "diag", workercnt), // diag file errors are global m_result_file(m_util, m_error), m_reject_file(m_util, m_error), m_rowmap_file(m_util, m_error), m_stopt_file(m_util, m_error), m_stats_file(m_util, m_error) { m_is_diag = true; } NdbImportImpl::DiagTeam::~DiagTeam() { } NdbImportImpl::Worker* NdbImportImpl::DiagTeam::create_worker(uint n) { DiagWorker* w = new DiagWorker(*this, n); return w; } void NdbImportImpl::DiagTeam::do_init() { log_debug(1, "do_init"); const Opt& opt = m_util.c_opt; if (opt.m_resume) { 
read_old_diags(); if (has_error()) return; } open_new_diags(); } void NdbImportImpl::DiagTeam::read_old_diags(const char* name, const char* path, const Table& table, RowList& rows_out) { log_debug(1, "read_old_diags: " << name << " path=" << path); OptGuard optGuard(m_util); Opt& opt = m_util.c_opt; opt.m_ignore_lines = 1; // use default MySQL spec for diags (set by OptCsv ctor) OptCsv optcsv; CsvSpec csvspec; if (m_impl.m_csv.set_spec(csvspec, optcsv, OptCsv::ModeInput) == -1) { m_util.copy_error(m_error, m_util.c_error); require(has_error()); return; } File file(m_util, m_error); file.set_path(path); if (file.do_open(File::Read_flags) == -1) { require(has_error()); m_job.m_fatal = true; return; } // csv input requires at least 2 instances Buf* buf[2]; CsvInput* csvinput[2]; RowList rows_reject; RowMap rowmap_in[] = {m_util, m_util}; for (uint i = 0; i < 2; i++) { uint pagesize = opt.m_pagesize; uint pagecnt = opt.m_pagecnt; buf[i] = new Buf(true); buf[i]->alloc(pagesize, 2 * pagecnt); csvinput[i] = new CsvInput(m_impl.m_csv, Name(name, i), csvspec, table, *buf[i], rows_out, rows_reject, rowmap_in[i], m_job.m_stats); csvinput[i]->do_init(); } { uint i = 0; // current index uint n = 0; // number of buffer switches while (1) { uint j = 1 - i; CsvInput& csvinput1 = *csvinput[i]; Buf& b1 = *buf[i]; Buf& b2 = *buf[j]; b1.reset(); if (file.do_read(b1) == -1) { require(has_error()); break; } // if not first read, move tail from previous if (n != 0) { require(b2.movetail(b1) == 0); } csvinput1.do_parse(); if (csvinput1.has_error()) { m_util.copy_error(m_error, csvinput1.m_error); require(has_error()); break; } csvinput1.do_eval(); if (csvinput1.has_error()) { m_util.copy_error(m_error, csvinput1.m_error); require(has_error()); break; } uint curr = 0; uint left = 0; csvinput1.do_send(curr, left); require(!csvinput1.has_error()); require(left == 0); if (b1.m_eof) break; i = j; n++; } log_debug(1, "read_old_diags: " << name << " count=" << rows_out.cnt()); } // XXX diag 
errors not yet handled require(rows_reject.cnt() == 0); } void NdbImportImpl::DiagTeam::read_old_diags() { log_debug(1, "read_old_diags"); const Opt& opt = m_util.c_opt; Job& job = m_job; // result { const char* path = opt.m_result_file; const Table& table = m_util.c_result_table; RowList rows; read_old_diags("old-result", path, table, rows); if (has_error()) return; uint32 runno = Inval_uint32; Row* row = 0; while ((row = rows.pop_front()) != 0) { // runno { const Attr& attr = table.get_attr("runno"); uint32 x; attr.get_value(row, x); if (runno == Inval_uint32 || runno < x) runno = x; } m_util.free_row(row); } if (runno == Inval_uint32) { m_util.set_error_gen(m_error, __LINE__, "%s: no valid records found", path); return; } job.m_runno = runno + 1; } // rowmap { const char* path = opt.m_rowmap_file; const Table& table = m_util.c_rowmap_table; RowList rows; read_old_diags("old-rowmap", path, table, rows); if (has_error()) return; RowMap& rowmap_in = job.m_rowmap_in; require(rowmap_in.empty()); Row* row = 0; while ((row = rows.pop_front()) != 0) { Range range; // runno { const Attr& attr = table.get_attr("runno"); uint32 runno; attr.get_value(row, runno); if (runno != job.m_runno - 1) { m_util.free_row(row); continue; } } // start { const Attr& attr = table.get_attr("start"); attr.get_value(row, range.m_start); } // end { const Attr& attr = table.get_attr("end"); attr.get_value(row, range.m_end); } // startpos { const Attr& attr = table.get_attr("startpos"); attr.get_value(row, range.m_startpos); } // endpos { const Attr& attr = table.get_attr("endpos"); attr.get_value(row, range.m_endpos); } // reject { const Attr& attr = table.get_attr("reject"); attr.get_value(row, range.m_reject); } m_util.free_row(row); // add to old rowmap rowmap_in.add(range); } if (rowmap_in.empty()) { m_util.set_error_gen(m_error, __LINE__, "%s: no records for run %u found", path, job.m_runno - 1); return; } log_debug(1, "old rowmap:" << rowmap_in); } // old counts { const char* path = 
opt.m_stats_file; const Table& table = m_util.c_stats_table; RowList rows; read_old_diags("old-stats", path, table, rows); if (has_error()) return; uint64 old_rows = 0; uint64 old_reject = 0; uint64 old_runtime = 0; Row* row = 0; while ((row = rows.pop_front()) != 0) { char name[200]; { const Attr& attr = table.get_attr("name"); attr.get_value(row, name, sizeof(name)); } // rows if (strcmp(name, "job-rows") == 0) { const Attr& attr = table.get_attr("sum"); uint64 value; attr.get_value(row, value); old_rows += value; } // reject if (strcmp(name, "job-reject") == 0) { const Attr& attr = table.get_attr("sum"); uint64 value; attr.get_value(row, value); old_reject += value; } // runtime if (strcmp(name, "job-runtime") == 0) { const Attr& attr = table.get_attr("sum"); uint64 value; attr.get_value(row, value); old_runtime += value; } m_util.free_row(row); } job.m_old_rows = old_rows; job.m_old_reject = old_reject; job.m_old_runtime = old_runtime; log_debug(1, "old stats:" << " rows=" << old_rows << " reject=" << old_reject << " runtime=" << old_runtime); } } void NdbImportImpl::DiagTeam::open_new_diags() { log_debug(1, "open_new_diags"); const Opt& opt = m_util.c_opt; int openflags = 0; if (!opt.m_resume) openflags = File::Write_flags; else openflags = File::Append_flags; // use default MySQL spec for diags (set by OptCsv ctor) OptCsv optcsv; if (m_impl.m_csv.set_spec(m_csvspec, optcsv, OptCsv::ModeOutput) == -1) { m_util.copy_error(m_error, m_util.c_error); require(has_error()); return; } // result m_result_file.set_path(opt.m_result_file); if (m_result_file.do_open(openflags) == -1) { require(has_error()); m_job.m_fatal = true; return; } log_debug(1, "file: opened: " << m_result_file.get_path()); // reject m_reject_file.set_path(opt.m_reject_file); if (m_reject_file.do_open(openflags) == -1) { require(has_error()); m_job.m_fatal = true; return; } log_debug(1, "file: opened: " << m_reject_file.get_path()); // rowmap m_rowmap_file.set_path(opt.m_rowmap_file); if 
(m_rowmap_file.do_open(openflags) == -1) { require(has_error()); m_job.m_fatal = true; return; } log_debug(1, "file: opened: " << m_rowmap_file.get_path()); // stats opt if (opt.m_stats) { m_stopt_file.set_path(opt.m_stopt_file); if (m_stopt_file.do_open(openflags) == -1) { require(has_error()); m_job.m_fatal = true; return; } log_debug(1, "file: opened: " << m_stopt_file.get_path()); } // stats if (opt.m_stats) { m_stats_file.set_path(opt.m_stats_file); if (m_stats_file.do_open(openflags) == -1) { require(has_error()); m_job.m_fatal = true; return; } log_debug(1, "file: opened: " << m_stats_file.get_path()); } } void NdbImportImpl::DiagTeam::do_end() { log_debug(1, "do_end"); const Opt& opt = m_util.c_opt; if (m_result_file.do_close() == -1) { require(has_error()); // continue } if (m_reject_file.do_close() == -1) { require(has_error()); // continue } if (m_rowmap_file.do_close() == -1) { require(has_error()); // continue } if (opt.m_stats) { if (m_stopt_file.do_close() == -1) { require(has_error()); // continue } } if (opt.m_stats) { if (m_stats_file.do_close() == -1) { require(has_error()); // continue } } } NdbImportImpl::DiagWorker::DiagWorker(Team& team, uint n) : Worker(team, n), m_result_csv(NULL), m_reject_csv(NULL), m_rowmap_csv(NULL), m_stopt_csv(NULL), m_stats_csv(NULL) {} NdbImportImpl::DiagWorker::~DiagWorker() { delete m_result_csv; delete m_reject_csv; delete m_rowmap_csv; delete m_stopt_csv; delete m_stats_csv; } void NdbImportImpl::DiagWorker::do_init() { log_debug(1, "do_init"); const Opt& opt = m_util.c_opt; const CsvSpec& csvspec = static_cast(m_team).m_csvspec; if (has_error()) return; // result { File& file = static_cast(m_team).m_result_file; Buf& buf = m_result_buf; const Table& table = m_util.c_result_table; uint pagesize = opt.m_pagesize; uint pagecnt = opt.m_pagecnt; buf.alloc(pagesize, pagecnt); m_result_csv = new CsvOutput(m_impl.m_csv, csvspec, table, m_result_buf); m_result_csv->do_init(); if (!opt.m_resume) { 
m_result_csv->add_header(); if (file.do_write(buf) == -1) { require(has_error()); m_team.m_job.m_fatal = true; return; } } } // reject { File& file = static_cast(m_team).m_reject_file; Buf& buf = m_reject_buf; const Table& table = m_util.c_reject_table; uint pagesize = opt.m_pagesize; uint pagecnt = opt.m_pagecnt; buf.alloc(pagesize, pagecnt); m_reject_csv = new CsvOutput(m_impl.m_csv, csvspec, table, m_reject_buf); m_reject_csv->do_init(); if (!opt.m_resume) { m_reject_csv->add_header(); if (file.do_write(buf) == -1) { require(has_error()); m_team.m_job.m_fatal = true; return; } } } // rowmap { File& file = static_cast(m_team).m_rowmap_file; Buf& buf = m_rowmap_buf; const Table& table = m_util.c_rowmap_table; uint pagesize = opt.m_pagesize; uint pagecnt = opt.m_pagecnt; buf.alloc(pagesize, pagecnt); m_rowmap_csv = new CsvOutput(m_impl.m_csv, csvspec, table, m_rowmap_buf); m_rowmap_csv->do_init(); if (!opt.m_resume) { m_rowmap_csv->add_header(); if (file.do_write(buf) == -1) { require(has_error()); m_team.m_job.m_fatal = true; return; } } } // stats opt if (opt.m_stats) { File& file = static_cast(m_team).m_stopt_file; Buf& buf = m_stopt_buf; const Table& table = m_util.c_stopt_table; uint pagesize = opt.m_pagesize; uint pagecnt = opt.m_pagecnt; m_stopt_buf.alloc(pagesize, pagecnt); m_stopt_csv = new CsvOutput(m_impl.m_csv, csvspec, table, m_stopt_buf); m_stopt_csv->do_init(); if (!opt.m_resume) { m_stopt_csv->add_header(); if (file.do_write(buf) == -1) { require(has_error()); m_team.m_job.m_fatal = true; return; } } } // stats if (opt.m_stats) { File& file = static_cast(m_team).m_stats_file; Buf& buf = m_stats_buf; const Table& table = m_util.c_stats_table; uint pagesize = opt.m_pagesize; uint pagecnt = opt.m_pagecnt; m_stats_buf.alloc(pagesize, pagecnt); m_stats_csv = new CsvOutput(m_impl.m_csv, csvspec, table, m_stats_buf); m_stats_csv->do_init(); if (!opt.m_resume) { m_stats_csv->add_header(); if (file.do_write(buf) == -1) { require(has_error()); 
m_team.m_job.m_fatal = true; return; } } } } void NdbImportImpl::DiagWorker::do_run() { log_debug(2, "do_run"); // reject write_reject(); // stop by request if (m_dostop) { log_debug(1, "stop by request"); m_state = WorkerState::State_stop; } } void NdbImportImpl::DiagWorker::do_end() { const Opt& opt = m_util.c_opt; log_debug(1, "do_end"); // result write_result(); // rowmap write_rowmap(); // stats opt if (opt.m_stats) { write_stopt(); } // stats if (opt.m_stats) { write_stats(); } } void NdbImportImpl::DiagWorker::write_result() { log_debug(1, "write_result"); DiagTeam& team = static_cast(m_team); const Job& job = team.m_job; File& file = team.m_result_file; Buf& buf = m_result_buf; buf.reset(); const Table& table = m_util.c_result_table; // fatal global error, should not happen in job scope if (m_util.has_error()) { Row* row = m_util.alloc_row(table); const char* name = "IMP"; const char* desc = ""; uint64 rows = 0; uint64 reject = 0; uint64 temperrors = 0; uint64 runtime = 0; uint64 utime = 0; const Error& error = m_util.c_error; m_util.set_result_row(row, job.m_runno, name, desc, rows, reject, temperrors, runtime, utime, error); m_result_csv->add_line(row); m_util.free_row(row); } // job { Row* row = m_util.alloc_row(table); const Name name("job", job.m_jobno); const char* desc = "job"; uint64 rows = job.m_stat_rows->m_max; uint64 reject = job.m_stat_reject->m_max; uint64 temperrors = job.m_errormap.get_sum(); uint64 runtime = job.m_timer.elapsed_msec(); uint64 utime = job.m_stat_utime->m_sum; const Error& error = job.m_error; m_util.set_result_row(row, job.m_runno, name, desc, rows, reject, temperrors, runtime, utime, error); m_result_csv->add_line(row); m_util.free_row(row); } // teams for (uint teamno = 0; teamno < job.m_teamcnt; teamno++) { Row* row = m_util.alloc_row(table); const Team& team = *job.m_teams[teamno]; const Name name("team", team.m_teamno); const Name desc(team.m_name); uint64 rows = 0; uint64 reject = 0; uint64 temperrors = 0; if 
(team.m_state != TeamState::State_stopped) { // not worth crashing team.m_timer.stop(); } uint64 runtime = team.m_timer.elapsed_msec(); uint64 utime = team.m_stat_utime->m_sum; const Error& error = team.m_error; m_util.set_result_row(row, job.m_runno, name, desc, rows, reject, temperrors, runtime, utime, error); m_result_csv->add_line(row); m_util.free_row(row); } if (file.do_write(buf) == -1) { require(has_error()); m_team.m_job.m_fatal = true; } } void NdbImportImpl::DiagWorker::write_reject() { log_debug(2, "write_reject"); DiagTeam& team = static_cast(m_team); Job& job = team.m_job; File& file = team.m_reject_file; Buf& buf = m_reject_buf; RowList& rows_reject = *job.m_rows_reject; rows_reject.lock(); while (1) { Row* row = rows_reject.pop_front(); require(!rows_reject.m_eof); if (row == 0) { m_idle = true; break; } // Csv does not know runno so fix it here { const Table& table = m_util.c_reject_table; const Attrs& attrs = table.m_attrs; const void* p = attrs[0].get_value(row); uint32 x = *(uint32*)p; if (x == Inval_uint32) { attrs[0].set_value(row, &job.m_runno, sizeof(uint32)); } else { require(x == job.m_runno); } } buf.reset(); m_reject_csv->add_line(row); if (file.do_write(buf) == -1) { require(has_error()); m_team.m_job.m_fatal = true; m_util.free_row(row); return; } // add to job level rowmap job.m_rowmap_out.lock(); job.m_rowmap_out.add(row, true); job.m_rowmap_out.unlock(); m_util.free_row(row); } rows_reject.unlock(); } void NdbImportImpl::DiagWorker::write_rowmap() { log_debug(1, "write_rowmap"); DiagTeam& team = static_cast(m_team); const Job& job = team.m_job; File& file = team.m_rowmap_file; Buf& buf = m_rowmap_buf; const Table& table = m_util.c_rowmap_table; const RowMap& rowmap = job.m_rowmap_out; const RangeList& ranges = rowmap.m_ranges; const Range* r = ranges.front(); while (r != 0) { Row* row = m_util.alloc_row(table); const Range& range = *r; m_util.set_rowmap_row(row, job.m_runno, range); buf.reset(); m_rowmap_csv->add_line(row); 
m_util.free_row(row); if (file.do_write(buf) == -1) { require(has_error()); m_team.m_job.m_fatal = true; return; } r = r->next(); } } void NdbImportImpl::DiagWorker::write_stopt() { static const Opt& opt = m_util.c_opt; DiagTeam& team = static_cast(m_team); const Job& job = team.m_job; File& file = team.m_stopt_file; Buf& buf = m_stopt_buf; const Table& table = m_util.c_stopt_table; // write performance related option values const struct ov_st { const char* m_option; uint m_value; } ov_list[] = { { "connections", opt.m_connections }, { "input_workers", opt.m_input_workers }, { "output_workers", opt.m_output_workers }, { "db_workers", opt.m_db_workers }, { "no_hint", (uint)opt.m_no_hint }, { "pagesize", opt.m_pagesize }, { "pagecnt", opt.m_pagecnt }, { "pagebuffer", opt.m_pagebuffer }, { "rowbatch", opt.m_rowbatch }, { "rowbytes", opt.m_rowbytes }, { "opbatch", opt.m_opbatch }, { "opbytes", opt.m_opbytes }, { "rowswait", opt.m_rowswait }, { "idlespin", opt.m_idlespin }, { "idlesleep", opt.m_idlesleep }, { "checkloop", opt.m_checkloop }, { "alloc_chunk", opt.m_alloc_chunk } }; const uint ov_size = sizeof(ov_list) / sizeof(ov_list[0]); for (uint i = 0; i < ov_size; i++) { const struct ov_st& ov = ov_list[i]; Row* row = m_util.alloc_row(table); m_util.set_stopt_row(row, job.m_runno, ov.m_option, ov.m_value); buf.reset(); m_stopt_csv->add_line(row); m_util.free_row(row); if (file.do_write(buf) == -1) { require(has_error()); m_team.m_job.m_fatal = true; return; } } } void NdbImportImpl::DiagWorker::write_stats() { DiagTeam& team = static_cast(m_team); const Job& job = team.m_job; File& file = team.m_stats_file; Buf& buf = m_stats_buf; const Table& table = m_util.c_stats_table; // write job and global (accumulating) stats const Stats* stats_list[2] = { &job.m_stats, &m_util.c_stats }; for (uint k = 0; k <= 1; k++) { const Stats& stats = *stats_list[k]; const bool global = (k == 1); for (uint id = 0; id < stats.m_stats.size(); id++) { const Stat* stat = stats.get(id); 
buf.reset(); Row* row = m_util.alloc_row(table); m_util.set_stats_row(row, job.m_runno, *stat, global); m_stats_csv->add_line(row); m_util.free_row(row); if (file.do_write(buf) == -1) { require(has_error()); m_team.m_job.m_fatal = true; return; } } } } // global NdbImportImpl::Jobs::Jobs() { m_jobno = 0; } NdbImportImpl::Jobs::~Jobs() { // XXX delete jobs } NdbImportImpl::Job* NdbImportImpl::create_job() { Jobs& jobs = c_jobs; // internal and external number from 1 Job* job = new Job(*this, ++jobs.m_jobno); jobs.m_jobs.insert(std::pair(job->m_jobno, job)); job->do_create(); return job; } NdbImportImpl::Job* NdbImportImpl::find_job(uint jobno) { Job* job = 0; const Jobs& jobs = c_jobs; std::map::const_iterator it; it = jobs.m_jobs.find(jobno); if (it != jobs.m_jobs.end()) { job = it->second; require(job->m_jobno == jobno); } return job; } extern "C" { static void* start_job_c(void* data); } static void* start_job_c(void* data) { NdbImportImpl::Job* job = (NdbImportImpl::Job*)data; require(job != 0); job->do_start(); return 0; } void NdbImportImpl::start_job(Job* job) { NDB_THREAD_PRIO prio = NDB_THREAD_PRIO_MEAN; uint stack_size = 64*1024; job->m_thread = NdbThread_Create( start_job_c, (void**)job, stack_size, "job", prio); require(job->m_thread != 0); } void NdbImportImpl::stop_job(Job* job) { job->do_stop(); log_debug(1, "done"); } void NdbImportImpl::wait_job(Job* job) { const Opt& opt = m_util.c_opt; while (job->m_state != JobState::State_done) { log_debug(2, "wait for " << g_str_state(JobState::State_done)); NdbSleep_MilliSleep(opt.m_checkloop); } log_debug(1, "done"); } void NdbImportImpl::destroy_job(Job* job) { Jobs& jobs = c_jobs; require(job != 0); require(find_job(job->m_jobno) == job); require(jobs.m_jobs.erase(job->m_jobno) == 1); delete job; }